26
@Slf4j
27
public class AIAlarmKafkaTask {
28
29
	@Autowired
30
	private AiTaskCommand aiTaskCommand;
31
32
	@KafkaListener(containerFactory = "kafkaBatchListener3", topics = "${kafka.topic.aialarm:topic_ai_alarm}", groupId = "alarm_group")
33
	public void alarmListener(ConsumerRecord<String, String> records, Acknowledgment ack) throws Throwable {
34
		try {
35
			log.info("----------------AI监控触发违规信息消费开始---------------------------");
36
37
			String message = records.value();
38
			log.info("已接AI报违规消息,消息为:" + message);
39
40
			CommonRequest<AiIdenLog> aiIdenLogRequest = JSON.parseObject(message,
41
					new TypeReference<CommonRequest<AiIdenLog>>() {
42
					});
43
44
			CommonResponse runningResult = aiTaskCommand.aiIdenLogTriggerEvent(aiIdenLogRequest);
45
			log.info("runningResult: \n{}", JsonUtils.toJSONStringByDateFormat(runningResult, true));
46
			Assert.assertTrue(runningResult.isSuccess());
47
48
		} catch (Exception e) {
49
			log.error("kafka消费异常" + e.getMessage(), e);
50
		} finally {
51
			ack.acknowledge();// 手动提交偏移量
52
		}
53
	}
54
}

+ 54 - 0
security-protection-service/src/main/java/com/ai/bss/security/protection/service/task/AIAttendanceKafkaTask.java

@ -0,0 +1,54 @@
1
package com.ai.bss.security.protection.service.task;
2
3
import org.apache.kafka.clients.consumer.ConsumerRecord;
4
import org.junit.Assert;
5
import org.springframework.beans.factory.annotation.Autowired;
6
import org.springframework.kafka.annotation.KafkaListener;
7
import org.springframework.kafka.support.Acknowledgment;
8
import org.springframework.stereotype.Component;
9
10
import com.ai.abc.api.model.CommonRequest;
11
import com.ai.abc.api.model.CommonResponse;
12
import com.ai.abc.util.JsonUtils;
13
import com.ai.bss.work.safety.model.AiIdenLog;
14
import com.ai.bss.work.safety.service.api.AiTaskCommand;
15
import com.alibaba.fastjson.JSON;
16
import com.alibaba.fastjson.TypeReference;
17
18
import lombok.extern.slf4j.Slf4j;
19
20
/**
21
 * 人脸识别事件触发考勤
22
 * @author konghl@asiainfo.com
23
 * 2020-12-14
24
 */
25
@Component
26
@Slf4j
27
public class AIAttendanceKafkaTask {
28
29
	@Autowired
30
	private AiTaskCommand aiTaskCommand;
31
32
	@KafkaListener(containerFactory = "kafkaBatchListener3", topics = "${kafka.topic.alarm:Topic_IoT_IndividualAlarm}", groupId = "alarm_group")
33
	public void alarmListener(ConsumerRecord<String, String> records, Acknowledgment ack) throws Throwable {
34
		try {
35
			log.info("----------------AI人脸识别触发考勤信息消费开始---------------------------");
36
37
			String message = records.value();
38
			log.info("已接AI考勤信息,消息为:" + message);
39
40
			CommonRequest<AiIdenLog> aiIdenLogRequest = JSON.parseObject(message,
41
					new TypeReference<CommonRequest<AiIdenLog>>() {
42
					});
43
44
			CommonResponse runningResult = aiTaskCommand.aiIdenLogTriggerEvent(aiIdenLogRequest);
45
			log.info("runningResult: \n{}", JsonUtils.toJSONStringByDateFormat(runningResult, true));
46
			Assert.assertTrue(runningResult.isSuccess());
47
48
		} catch (Exception e) {
49
			log.error("kafka消费异常" + e.getMessage(), e);
50
		} finally {
51
			ack.acknowledge();// 手动提交偏移量
52
		}
53
	}
54
}

+ 54 - 0
security-protection-service/src/main/java/com/ai/bss/security/protection/service/task/AIResultRecordKafkaTask.java

@ -0,0 +1,54 @@
1
package com.ai.bss.security.protection.service.task;
2
3
import org.apache.kafka.clients.consumer.ConsumerRecord;
4
import org.junit.Assert;
5
import org.springframework.beans.factory.annotation.Autowired;
6
import org.springframework.kafka.annotation.KafkaListener;
7
import org.springframework.kafka.support.Acknowledgment;
8
import org.springframework.stereotype.Component;
9
10
import com.ai.abc.api.model.CommonRequest;
11
import com.ai.abc.api.model.CommonResponse;
12
import com.ai.abc.util.JsonUtils;
13
import com.ai.bss.work.safety.model.AiIdenLog;
14
import com.ai.bss.work.safety.service.api.AiTaskCommand;
15
import com.alibaba.fastjson.JSON;
16
import com.alibaba.fastjson.TypeReference;
17
18
import lombok.extern.slf4j.Slf4j;
19
20
/**
21
 * AI任务执行结果日志创建
22
 * @author konghl@asiainfo.com
23
 * 2020-12-14
24
 */
25
@Component
26
@Slf4j
27
public class AIResultRecordKafkaTask {
28
29
	@Autowired
30
	private AiTaskCommand aiTaskCommand;
31
32
	@KafkaListener(containerFactory = "kafkaBatchListener6", topics = "${kafka.topic.alarm:Topic_IoT_IndividualAlarm}", groupId = "alarm_group")
33
	public void alarmListener(ConsumerRecord<String, String> records, Acknowledgment ack) throws Throwable {
34
		try {
35
			log.info("----------------AI任务执行结果信息消费开始---------------------------");
36
37
			String message = records.value();
38
			log.info("已接AI任务执行结果消息,消息为:" + message);
39
40
			CommonRequest<AiIdenLog> aiIdenLogRequest = JSON.parseObject(message,
41
					new TypeReference<CommonRequest<AiIdenLog>>() {
42
					});
43
44
			CommonResponse<AiIdenLog> runningResult = aiTaskCommand.createAiIdenLog(aiIdenLogRequest);
45
			log.info("runningResult: \n{}", JsonUtils.toJSONStringByDateFormat(runningResult, true));
46
			Assert.assertTrue(runningResult.isSuccess());
47
48
		} catch (Exception e) {
49
			log.error("kafka消费异常" + e.getMessage(), e);
50
		} finally {
51
			ack.acknowledge();// 手动提交偏移量
52
		}
53
	}
54
}

+ 3 - 4
security-protection-service/src/main/resources/application.properties

@ -27,10 +27,9 @@ spring.main.allow-bean-definition-overriding=true
27 27
#kafka
28 28
#kafka.bootstrap-servers=47.105.160.21:9090
29 29
kafka.bootstrap-servers=10.19.90.34:9090
30
kafka.topic.deviceLocation=Topic_IoT_DeviceLocation_111
31
kafka.topic.alarm=Topic_IoT_IndividualAlarm_111
32
#kafka.topic.deviceLocation=DeviceLocationA
33
#kafka.topic.alarm=IndividualAlarmA
30
kafka.topic.aitask=topic_ai_task
31
kafka.topic.aialarm=topic_ai_alarm
32
kafka.topic.aiattendance=topic_ai_attendance
34 33
kafka.producer.batch-size=16785
35 34
kafka.producer.retries=1
36 35
kafka.producer.buffer-memory=33554432

Sign In - Nuosi Git Service

Sign In