22
@Service
20
@Service
23
public class MockManageServiceImpl implements MockManageService {
21
public class MockManageServiceImpl implements MockManageService {
24
22
25
	protected static Map MACK_STATUS = new HashMap();
26
	
27
    @Autowired
28
    private MockScenarioDataRepository mockScenarioDataRepository;
29
    @Autowired
30
    private MockProcess mockProcess;
31
32
//    @Value("${kafka.producer.servers:Empty}")
33
    private static String kafkaServers = "47.105.160.21:9090";
34
35
    @Override
36
    public Map findMockStatusByScenarioId()  {
37
        List<Long> result = new ArrayList(MACK_STATUS.keySet());
38
        Map  tempmap = new HashMap();
39
        for(Long sid:result){
40
            tempmap.put(sid,"场景 "+sid+" 执行中...");
41
        }
42
        return tempmap;
43
    }
44
    @Override
45
    public String  startMackData(Long sId,  Long frequency,String topic0,String topic1)  {
46
        //        验证场景是否正在执行
47
        if(MACK_STATUS.get(sId)!=null){
48
            return "场景" + sId + " 正在执行,请耐心等待...";
49
        }
50
        //根据场景ID从数据库中查询模拟数据
51
        List<MockScenarioData>  mockScenarioDataList = mockScenarioDataRepository.findByScenarioIdOrderByOrderNo(sId);
52
53
        List<Long> orderNoList = mockScenarioDataRepository.findByOrderNoByScenarioId(sId);
54
        if(CollectionUtils.isEmpty(mockScenarioDataList) || CollectionUtils.isEmpty(orderNoList)){
55
            return "场景" + sId + " 没有配置模拟 ,请在数据库中添加数据再执行...";
56
        }
57
        
58
        //记录场景执行状态
59
        MACK_STATUS.put(sId,frequency);
60
        //异步执行模拟场景 通过IOT-DMP连接服务模拟
23
	protected static Map<Long, Long> MACK_STATUS = new HashMap();
24
25
	@Autowired
26
	private MockScenarioDataRepository mockScenarioDataRepository;
27
	@Autowired
28
	private MockProcess mockProcess;
29
30
	@Override
31
	public Map<Long, String> findMockStatusByScenarioId() {
32
		List<Long> result = new ArrayList<Long>(MACK_STATUS.keySet());
33
		Map<Long, String> tempmap = new HashMap<Long, String>();
34
		for (Long sid : result) {
35
			tempmap.put(sid, "场景 " + sid + " 执行中...");
36
		}
37
		return tempmap;
38
	}
39
40
	@Override
41
	public String startMackData(Long sId, Long frequency, String topic) {
42
		// 验证场景是否正在执行
43
		if (MACK_STATUS.get(sId) != null) {
44
			return "场景" + sId + " 正在执行,请耐心等待...";
45
		}
46
		// 根据场景ID从数据库中查询模拟数据
47
		List<MockScenarioData> mockScenarioDataList = mockScenarioDataRepository.findByScenarioIdOrderByOrderNo(sId);
48
49
		List<Long> orderNoList = mockScenarioDataRepository.findByOrderNoByScenarioId(sId);
50
		if (CollectionUtils.isEmpty(mockScenarioDataList) || CollectionUtils.isEmpty(orderNoList)) {
51
			return "场景" + sId + " 没有配置模拟 ,请在数据库中添加数据再执行...";
52
		}
53
54
		// 记录场景执行状态
55
		MACK_STATUS.put(sId, frequency);
56
		// 异步执行模拟场景 通过IOT-DMP连接服务模拟
61
//        mockProcess.processMock(sId,mockScenarioDataList,frequency);
57
//        mockProcess.processMock(sId,mockScenarioDataList,frequency);
62
58
63
        Map<Long, List<LocationBean>> dataMap=getMapListData(mockScenarioDataList);
64
        
65
        //异步执行模拟场景  直接模拟最终数据发送到kafka
66
        mockProcess.processMockKafka(sId,dataMap,orderNoList,frequency,topic0, topic1);
67
68
        return "场景" + sId + " 开始执行...";
69
    }
70
71
72
    @Override
73
    public String  stopMackData(Long sId){
74
        MACK_STATUS.remove(sId);
75
        return "场景" + sId + " 终止执行...";
76
    }
77
78
    /**
79
          * 按orderNo分组
80
     * @param mockScenarioDataList
81
     * @return
82
     */
83
    private Map<Long, List<LocationBean>> getMapListData(List<MockScenarioData> mockScenarioDataList) {
84
    	Map<Long, List<LocationBean>> map=new HashMap<Long, List<LocationBean>>();
85
    	
86
    	for (MockScenarioData mockScenarioData : mockScenarioDataList) {
87
    		LocationBean locationBean=new LocationBean();
88
        	locationBean.setFloor(1);
89
        	locationBean.setMac(mockScenarioData.getDeviceId());
90
        	locationBean.setxAxis(Double.valueOf(mockScenarioData.getLongitude()));
91
        	locationBean.setyAxis(Double.valueOf(mockScenarioData.getLatitude()));
92
        	locationBean.setzAxis(0d);
93
        	locationBean.setTimeStamp(new Timestamp(System.currentTimeMillis()));
94
    		
95
        	
96
    		long orderNo=mockScenarioData.getOrderNo();
97
    		
98
    		List<LocationBean> list=map.get(orderNo);
99
    		if (list==null) {
100
    			list=new ArrayList<LocationBean>();
101
    			list.add(locationBean);
102
    			map.put(orderNo, list);
103
			}else {
59
		Map<Long, List<LocationBean>> dataMap = getMapListData(mockScenarioDataList);
60
61
		// 异步执行模拟场景 直接模拟最终数据发送到kafka
62
		mockProcess.processMockKafka(sId, dataMap, orderNoList, frequency, topic);
63
64
		return "场景" + sId + " 开始执行...";
65
	}
66
67
	@Override
68
	public String stopMackData(Long sId) {
69
		MACK_STATUS.remove(sId);
70
		return "场景" + sId + " 终止执行...";
71
	}
72
73
	/**
74
	      * 按orderNo分组
75
	 * @param mockScenarioDataList
76
	 * @return
77
	 */
78
	private Map<Long, List<LocationBean>> getMapListData(List<MockScenarioData> mockScenarioDataList) {
79
		Map<Long, List<LocationBean>> map = new HashMap<Long, List<LocationBean>>();
80
81
		for (MockScenarioData mockScenarioData : mockScenarioDataList) {
82
			LocationBean locationBean = new LocationBean(mockScenarioData.getDeviceId(),
83
					Double.valueOf(mockScenarioData.getLongitude()), Double.valueOf(mockScenarioData.getLatitude()));
84
			
85
			if (StringUtils.isNotBlank(mockScenarioData.getFloor()))
86
				locationBean.setFloor(Long.valueOf(mockScenarioData.getFloor()));
87
88
			if (StringUtils.isNotBlank(mockScenarioData.getHeight()))
89
				locationBean.setZAxis(Double.valueOf(mockScenarioData.getHeight()));
90
			
91
			long orderNo = mockScenarioData.getOrderNo();
92
93
			List<LocationBean> list = map.get(orderNo);
94
			if (list == null) {
95
				list = new ArrayList<LocationBean>();
96
				list.add(locationBean);
97
				map.put(orderNo, list);
98
			} else {
104
				list.add(locationBean);
99
				list.add(locationBean);
105
			}
100
			}
106
		}
101
		}
107
    	
108
    	return map;
109
    }
110
    
102
103
		return map;
104
	}
105
106
	@Override
107
	public String sendMsgData(String msg, String topic) {
108
		String result = "";
109
110
		try {
111
			mockProcess.sendKafkaData(topic, msg);
112
			result = "消息发送成功:" + msg;
113
		} catch (Exception e) {
114
			result = "消息发送失败:" + msg + ",原因:" + e.getMessage();
115
		}
116
117
		return result;
118
	}
119
111
}
120
}

+ 34 - 19
indoor-mock-service/src/main/java/com/ai/bss/mock/service/impl/MockProcessImpl.java

1
package com.ai.bss.mock.service.impl;
1
package com.ai.bss.mock.service.impl;
2
2
3
import java.sql.Timestamp;
4
import java.util.ArrayList;
5
import java.util.Date;
6
import java.util.HashMap;
3
import java.util.HashMap;
7
import java.util.List;
4
import java.util.List;
8
import java.util.Map;
5
import java.util.Map;
10
import java.util.concurrent.Executors;
7
import java.util.concurrent.Executors;
11
import java.util.concurrent.TimeUnit;
8
import java.util.concurrent.TimeUnit;
12
9
10
import org.apache.commons.lang3.StringUtils;
13
import org.slf4j.Logger;
11
import org.slf4j.Logger;
14
import org.slf4j.LoggerFactory;
12
import org.slf4j.LoggerFactory;
15
import org.springframework.beans.factory.annotation.Value;
13
import org.springframework.beans.factory.annotation.Value;
16
import org.springframework.kafka.core.KafkaTemplate;
14
import org.springframework.kafka.core.KafkaTemplate;
17
import org.springframework.scheduling.annotation.Async;
15
import org.springframework.scheduling.annotation.Async;
18
import org.springframework.stereotype.Service;
16
import org.springframework.stereotype.Service;
19
import org.springframework.util.StringUtils;
20
17
21
import com.ai.bss.mock.model.Data;
22
import com.ai.bss.mock.model.DataPoint;
23
import com.ai.bss.mock.model.EBCData;
24
import com.ai.bss.mock.model.LocationBean;
18
import com.ai.bss.mock.model.LocationBean;
25
import com.ai.bss.mock.model.MockScenarioData;
19
import com.ai.bss.mock.model.MockScenarioData;
26
import com.ai.bss.mock.service.interfaces.MockProcess;
20
import com.ai.bss.mock.service.interfaces.MockProcess;
28
import com.ai.bss.mock.utils.tcp.IpuTcpLongConnectClient;
22
import com.ai.bss.mock.utils.tcp.IpuTcpLongConnectClient;
29
import com.ailk.common.data.impl.DataMap;
23
import com.ailk.common.data.impl.DataMap;
30
import com.alibaba.fastjson.JSON;
24
import com.alibaba.fastjson.JSON;
31
import com.alibaba.fastjson.JSONObject;
32
25
33
import lombok.extern.slf4j.Slf4j;
26
import lombok.extern.slf4j.Slf4j;
34
27
38
public class MockProcessImpl implements MockProcess {
31
public class MockProcessImpl implements MockProcess {
39
	private static final Logger logger = LoggerFactory.getLogger(MockProcessImpl.class);
32
	private static final Logger logger = LoggerFactory.getLogger(MockProcessImpl.class);
40
33
41
//    EBC设备tcp连接服务地址
42
    private static final String HOST = "47.105.130.83";
43
    private static final int PORT = 8042;
34
	@Value("${kafka.servers.host}")
35
    private String HOST ;
36
	@Value("${kafka.servers.port}")
37
    private int PORT;
44
38
45
    private static String kafkaServers = "47.105.160.21:9091";
46
47
    @Value("${kafka.producer.servers}")
48
    private static String servers;
39
    @Value("${kafka.bootstrap-servers}")
40
    private String kafkaServers;
49
    
41
    
50
    @Value("${kafka.producer.topic}")
51
    private String topic;
42
    @Value("${kafka.topic.request}")
43
    private String requestTopic;
52
44
53
    @Async
45
    @Async
54
    @Override
46
    @Override
68
    
60
    
69
    @Async
61
    @Async
70
    @Override
62
    @Override
71
    public void processMockKafka(Long sId,Map<Long, List<LocationBean>> dataMap,List<Long> orderNoList, Long frequency,String topic0,String topic1){
63
    public void processMockKafka(Long sId,Map<Long, List<LocationBean>> dataMap,List<Long> orderNoList, Long frequency,String myTopic){
72
    	String clientId="XBG_AC67B2C23CFC";
64
    	String clientId="XBG_AC67B2C23CFC";
73
		String userName="abc";
65
		String userName="abc";
74
		topic="uploadInfoTest";
66
		String topic=StringUtils.isNotBlank(myTopic)?myTopic:requestTopic;
75
		
67
		
76
    	try {
68
    	try {
77
            for (Long orderNo : orderNoList) {
69
            for (Long orderNo : orderNoList) {
133
		}
125
		}
134
	}
126
	}
135
    
127
    
128
    @Async
129
    @Override
130
    public void sendKafkaData(String topic, String msgStr) throws Exception{
131
		KafkaTemplate kafkaTemplate = kafkaTemplateMap.get(kafkaServers);
132
133
		if (kafkaTemplate == null) {
134
			// new 实例
135
			KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(kafkaServers);
136
			kafkaTemplate = kafkaProducerConfig.kafkaTemplate();
137
			kafkaTemplateMap.put(kafkaServers, kafkaTemplate);
138
		}
139
140
		if (kafkaTemplate == null) {
141
			log.error("kafkaTemplate is null");
142
			 throw new RuntimeException("kafkaTemplate is null");
143
		}
144
145
			log.info("发送kafka消息:kafkaServers="+kafkaServers+", topic=" + topic + ", msg=" + msgStr);
146
			kafkaTemplate.send(topic, msgStr);
147
148
			kafkaTemplate.flush();
149
			log.info("发送kafka消息成功");
150
	}
136
}
151
}

+ 10 - 2
indoor-mock-service/src/main/java/com/ai/bss/mock/service/interfaces/MockManageService.java

8
     * 查询当前正在 的场景
8
     * 查询当前正在 的场景
9
     * @return
9
     * @return
10
     */
10
     */
11
    Map findMockStatusByScenarioId();
11
    Map<Long,String> findMockStatusByScenarioId();
12
12
13
    /**
13
    /**
14
     * 开始执行模拟数据
14
     * 开始执行模拟数据
16
     * @param f
16
     * @param f
17
     * @return
17
     * @return
18
     */
18
     */
19
    String  startMackData(Long sId,  Long f,String topic0,String topic1) ;
19
    String  startMackData(Long sId,  Long f,String topic) ;
20
20
21
    /**
21
    /**
22
     * 停止执行模拟数据
22
     * 停止执行模拟数据
24
     * @return
24
     * @return
25
     */
25
     */
26
    String  stopMackData(Long sId);
26
    String  stopMackData(Long sId);
27
    
28
    /**
29
     * 发送kafka消息
30
     * @param msg
31
     * @param topic
32
     * @return
33
     */
34
    String  sendMsgData(String msg,String topic);
27
}
35
}

+ 3 - 1
indoor-mock-service/src/main/java/com/ai/bss/mock/service/interfaces/MockProcess.java

10
    void processMock(Long sId,List<MockScenarioData> mockScenarioDataList, Long frequency);
10
    void processMock(Long sId,List<MockScenarioData> mockScenarioDataList, Long frequency);
11
11
12
12
13
    void processMockKafka(Long sId,Map<Long, List<LocationBean>> dataMap,List<Long> orderNoList, Long frequency,String topic0,String topic1);
13
    void processMockKafka(Long sId,Map<Long, List<LocationBean>> dataMap,List<Long> orderNoList, Long frequency,String myTopic);
14
15
    void sendKafkaData(String topic, String msgStr) throws Exception;
14
}
16
}

+ 3 - 7
indoor-mock-service/src/main/java/com/ai/bss/mock/utils/makedata/MakeBluetoothData.java

1
package com.ai.bss.mock.utils.makedata;
1
package com.ai.bss.mock.utils.makedata;
2
2
3
import java.sql.Timestamp;
3
import java.sql.Timestamp;
4
import java.util.Date;
4
5
5
import com.ai.bss.mock.model.LocationBean;
6
import com.ai.bss.mock.model.LocationBean;
6
import com.alibaba.fastjson.JSON;
7
import com.alibaba.fastjson.JSON;
13
	 * @param yAxis
14
	 * @param yAxis
14
	 */
15
	 */
15
	public static String sendBluetoothKafka(double xAxis,double yAxis) {
16
	public static String sendBluetoothKafka(double xAxis,double yAxis) {
16
		String clientId="XBG_AC67B2C23CFC";
17
		 String userName="abc";
18
		
19
		 String mac="d3e656a4573d";
17
		 String id="d3e656a4573d";
20
		 
18
		 
21
		 Integer floor=1;
22
		 Timestamp timeStamp=new Timestamp(System.currentTimeMillis());
23
		 LocationBean location=new LocationBean(mac, floor, xAxis, yAxis, timeStamp);
19
		 LocationBean location=new LocationBean(id, xAxis, yAxis);
24
		 
20
		 
25
		 String msg=JSON.toJSONString(location);
21
		 String msg=JSON.toJSONString(location);
26
		 
22
		 

+ 48 - 19
indoor-mock-service/src/main/resources/application.properties

1
spring.application.name=WorkTaskSpec
1
spring.application.name=WorkTaskSpec
2
server.port=8086
2
server.port=8090
3
3
4
# DATASOURCE (DataSourceAutoConfiguration & DataSourceProperties)
4
# DATASOURCE (DataSourceAutoConfiguration & DataSourceProperties)
5
#spring.datasource.url=jdbc:mysql://localhost:3306/cmp
5
#spring.datasource.url=jdbc:mysql://localhost:3306/cmp
15
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
15
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
16
#spring.datasource.username=cmp
16
#spring.datasource.username=cmp
17
#spring.datasource.password=cmp@123
17
#spring.datasource.password=cmp@123
18
spring.datasource.url=jdbc:mysql://10.19.90.34:3307/energy?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
18
#spring.datasource.url=jdbc:mysql://10.19.90.34:3307/indoor?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
19
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
20
#spring.datasource.username=ebc
21
#spring.datasource.password=ebc@123
22
spring.datasource.url=jdbc:mysql://8.130.50.1:6000/indoor?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
19
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
23
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
20
spring.datasource.username=ebc
21
spring.datasource.password=ebc@123
24
spring.datasource.username=cmp
25
spring.datasource.password=cmp@123
26
22
27
23
# JPA (JpaBaseConfiguration, HibernateJpaAutoConfiguration)
28
# JPA (JpaBaseConfiguration, HibernateJpaAutoConfiguration)
24
#spring.jpa.database=default
29
#spring.jpa.database=default
41
46
42
47
43
#============== kafka ===================
48
#============== kafka ===================
44
kafka.consumer.zookeeper.connect=47.105.160.21:2100
45
kafka.consumer.servers=47.105.160.21:9090
46
kafka.consumer.enable.auto.commit=true
47
kafka.consumer.session.timeout=6000
48
kafka.consumer.auto.commit.interval=100
49
kafka.consumer.auto.offset.reset=latest
50
kafka.consumer.topic=productMessage
51
kafka.consumer.group.id=productMessage
52
kafka.consumer.concurrency=10
53
54
kafka.producer.servers=47.105.160.21:9091
55
kafka.producer.retries=0
56
kafka.producer.batch.size=4096
49
#kafka.consumer.zookeeper.connect=47.105.160.21:2100
50
#kafka.consumer.servers=47.105.160.21:9090
51
#kafka.consumer.enable.auto.commit=true
52
#kafka.consumer.session.timeout=6000
53
#kafka.consumer.auto.commit.interval=100
54
#kafka.consumer.auto.offset.reset=latest
55
#kafka.consumer.topic=productMessage
56
#kafka.consumer.group.id=productMessage
57
#kafka.consumer.concurrency=10
58
59
#kafka.producer.servers=47.105.160.21:9091
60
#kafka.producer.retries=0
61
#kafka.producer.batch.size=4096
62
#kafka.producer.linger=1
63
#kafka.producer.buffer.memory=40960
64
#kafka.producer.topic=uploadInfoTest1
65
66
kafka.servers.host=8.130.50.1
67
kafka.servers.port=9091
68
kafka.bootstrap-servers=8.130.50.1:9091
69
70
#kafka.topic=bluetoothTopic
71
#kafka.topic=uwbTopic
72
kafka.topic.request=locationTopic
73
74
kafka.producer.batch-size=16785
75
kafka.producer.retries=1
76
kafka.producer.buffer-memory=33554432
57
kafka.producer.linger=1
77
kafka.producer.linger=1
58
kafka.producer.buffer.memory=40960
59
kafka.producer.topic=uploadInfoTest
78
kafka.consumer.auto-offset-reset=latest
79
kafka.consumer.max-poll-records=3100
80
kafka.consumer.enable-auto-commit=false
81
kafka.consumer.auto-commit-interval=1000
82
kafka.consumer.session-timeout=20000
83
kafka.consumer.max-poll-interval=15000
84
kafka.consumer.max-partition-fetch-bytes=15728640
85
kafka.listener.batch-listener=true
86
kafka.listener.concurrencys=3,6
87
kafka.listener.poll-timeout=1500
88

code-example - Nuosi Git Service

团队对封装组件的代码范例

liutong3 69fef4d69a 添加注释 5 年之前
..
src 69fef4d69a 添加注释 5 年之前
.gitignore cbfab71f08 gitignore调整 5 年之前
ipu-kafka-example.iml 655b5fcdb8 msgframe的kafka测试代码 6 年之前
pom.xml 69fef4d69a 添加注释 5 年之前