Przeglądaj źródła

增加Mock服务指定topic功能

wangdong6 4 lat temu
rodzic
commit
54ac2ea546

+ 1 - 9
ebc-mock-service/src/main/java/com/ai/bss/mock/MockApp.java

@ -1,6 +1,5 @@
1 1
package com.ai.bss.mock;
2 2
3
import com.ai.bss.mock.utils.tcp.IpuTcpLongConnectClient;
4 3
import org.springframework.boot.SpringApplication;
5 4
import org.springframework.boot.autoconfigure.SpringBootApplication;
6 5
import org.springframework.boot.autoconfigure.domain.EntityScan;
@ -9,7 +8,7 @@ import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
9 8
import org.springframework.scheduling.annotation.EnableAsync;
10 9
11 10
/**
12
 * @author zhangfeng
11
 * @author wangdong
13 12
 */
14 13
15 14
@EnableJpaRepositories(basePackages = "com.ai.bss")
@ -19,15 +18,8 @@ import org.springframework.scheduling.annotation.EnableAsync;
19 18
@SpringBootApplication
20 19
21 20
public class MockApp {
22
    private static final String HOST = "47.105.130.83";
23
    private static final int PORT = 8042;
24 21
25 22
    public static void main(String[] args) throws Exception {
26
27
//        IpuTcpLongConnectClient.getInstance(HOST, PORT).test();
28
29 23
        SpringApplication.run(MockApp.class, args);
30
31
32 24
    }
33 25
}

+ 24 - 21
ebc-mock-service/src/main/java/com/ai/bss/mock/service/impl/MockProcessImpl.java

@ -10,6 +10,7 @@ import com.ai.bss.mock.utils.KafkaProducerConfig;
10 10
import com.ai.bss.mock.utils.tcp.IpuTcpLongConnectClient;
11 11
import com.alibaba.fastjson.JSONObject;
12 12
import lombok.extern.slf4j.Slf4j;
13
import org.springframework.beans.factory.annotation.Value;
13 14
import org.springframework.context.annotation.Configuration;
14 15
import org.springframework.kafka.core.KafkaTemplate;
15 16
import org.springframework.scheduling.annotation.Async;
@ -24,18 +25,18 @@ import java.util.concurrent.TimeUnit;
24 25
25 26
26 27
@Slf4j
27
@Configuration
28 28
@Service
29 29
public class MockProcessImpl implements MockProcess {
30 30
//    EBC设备tcp连接服务地址
31 31
    private static final String HOST = "47.105.130.83";
32 32
    private static final int PORT = 8042;
33 33
34
//    private static String kafkaServers = "47.105.160.21:9090";
35
    private static String kafkaServers = "10.19.90.34:9090";
34 36
35
//    @Value("${kafka.producer.servers:Empty}")
36
//    private static String kafkaServers;
37
    private static String kafkaServers = "47.105.160.21:9090";
38 37
38
    @Value("${kafka.producer.servers}")
39
    private static String servers;
39 40
40 41
    @Async
41 42
    @Override
@ -174,19 +175,20 @@ public class MockProcessImpl implements MockProcess {
174 175
175 176
    private boolean sendKafkamsg(List<DataPoint> dataPointList,Long frequency){
176 177
//        String kafkaServers = "";
177
        KafkaTemplate kafkaTemplate = kafkaTemplateMap.get(kafkaServers);
178
        if(kafkaTemplate == null){
179
            //new 实例
180
            KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(kafkaServers);
181
            kafkaTemplate = kafkaProducerConfig.kafkaTemplate();
182
            kafkaTemplateMap.put(kafkaServers,kafkaTemplate);
183
        }
178
//        KafkaTemplate kafkaTemplate = kafkaTemplateMap.get(kafkaServers);
179
//        if(kafkaTemplate == null){
180
//            //new 实例
181
//            KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(kafkaServers);
182
//            kafkaTemplate = kafkaProducerConfig.kafkaTemplate();
183
//            kafkaTemplateMap.put(kafkaServers,kafkaTemplate);
184
//        }
185
186
        System.out.println(servers);
184 187
        try {
185 188
            for(DataPoint dataPoint:dataPointList){
186
                Object result = kafkaTemplate.send(dataPoint.getPushTopic(), JSONObject.toJSONString(dataPoint)).get();
187
//                Object result = KafkaProducerConfig.sendForKafka(dataPoint.getPushTopic(), JSONObject.toJSONString(dataPoint));
188
                kafkaTemplate.flush();
189
                log.debug("It's successful send msg to Kafka:" + kafkaServers + "服务器地址以及topic:" + dataPoint.getPushTopic() + " 内容:" + JSONObject.toJSONString(dataPoint) + " 结果" + result);
189
//                Object result = kafkaTemplate.send(dataPoint.getPushTopic(), JSONObject.toJSONString(dataPoint)).get();
190
                Object result = KafkaProducerConfig.sendForKafka(dataPoint.getPushTopic(), JSONObject.toJSONString(dataPoint));
191
//                kafkaTemplate.flush();
190 192
191 193
                TimeUnit.MILLISECONDS.sleep(frequency*1000);
192 194
@ -212,17 +214,18 @@ public class MockProcessImpl implements MockProcess {
212 214
            kafkaTemplate = kafkaProducerConfig.kafkaTemplate();
213 215
            kafkaTemplateMap.put(kafkaServers,kafkaTemplate);
214 216
        }
217
215 218
        try {
216
//            for(DataPoint dataPoint:dataPointList){
217 219
            Object result = kafkaTemplate.send(dataPoint.getPushTopic(), JSONObject.toJSONString(dataPoint)).get();
218
            kafkaTemplate.flush();
220
//            Object result = KafkaProducerConfig.sendForKafka(dataPoint.getPushTopic(), JSONObject.toJSONString(dataPoint));
221
//            kafkaTemplate.flush();
219 222
220 223
//                TimeUnit.MILLISECONDS.sleep(frequency*1000);
221
            log.debug("It's successful send msg to Kafka:" + kafkaServers + "服务器地址以及topic:" + dataPoint.getPushTopic() + " 内容:" + JSONObject.toJSONString(dataPoint) + " 结果" + result);
224
//            log.debug("It's successful send msg to Kafka:" + kafkaServers + "服务器地址以及topic:" + dataPoint.getPushTopic() + " 内容:" + JSONObject.toJSONString(dataPoint) + " 结果" + result);
222 225
//            }
223
        } catch (InterruptedException e) {
224
            e.printStackTrace();
225
            return false;
226
//        } catch (InterruptedException e) {
227
//            e.printStackTrace();
228
//            return false;
226 229
        } catch (Exception e) {
227 230
            e.printStackTrace();
228 231
            log.error(e.getMessage());

+ 0 - 3
ebc-mock-service/src/main/java/com/ai/bss/mock/service/interfaces/MockManageService.java

@ -1,8 +1,5 @@
1 1
package com.ai.bss.mock.service.interfaces;
2 2
3
import com.ai.bss.mock.model.MockScenarioData;
4
5
import java.util.List;
6 3
import java.util.Map;
7 4
8 5
public interface MockManageService {

+ 26 - 5
ebc-mock-service/src/main/java/com/ai/bss/mock/utils/KafkaProducerConfig.java

@ -1,5 +1,6 @@
1 1
package com.ai.bss.mock.utils;
2 2
3
import com.alibaba.fastjson.JSONObject;
3 4
import lombok.NoArgsConstructor;
4 5
import lombok.extern.slf4j.Slf4j;
5 6
import org.apache.kafka.clients.producer.ProducerConfig;
@ -11,6 +12,7 @@ import org.springframework.kafka.annotation.EnableKafka;
11 12
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
12 13
import org.springframework.kafka.core.KafkaTemplate;
13 14
import org.springframework.kafka.core.ProducerFactory;
15
import org.springframework.stereotype.Component;
14 16
15 17
import java.util.HashMap;
16 18
import java.util.Map;
@ -19,10 +21,12 @@ import java.util.Map;
19 21
@Configuration
20 22
@EnableKafka
21 23
@NoArgsConstructor
24
@Component
22 25
public class KafkaProducerConfig {
23 26
24 27
    @Value("${kafka.producer.servers:Empty}")
25
    private static String servers;
28
//    private static String servers = "47.105.160.21:9090";
29
    private static String servers = "10.19.90.34:9090";
26 30
    @Value("${kafka.producer.retries:0}")
27 31
    private  int retries;
28 32
    @Value("${kafka.producer.batch.size:4096}")
@ -37,9 +41,9 @@ public class KafkaProducerConfig {
37 41
    public KafkaProducerConfig(String inputServers){
38 42
        servers = inputServers;
39 43
        this.retries = 0;
40
        this.batchSize = 4096;
44
        this.batchSize = 16384;
41 45
        this.linger = 1;
42
        this.bufferMemory = 40960;
46
        this.bufferMemory = 33554432;
43 47
        this.maxBlockTime = 6000;
44 48
45 49
    }
@ -66,6 +70,10 @@ public class KafkaProducerConfig {
66 70
        return new KafkaTemplate<String, String>(producerFactory());
67 71
    }
68 72
//
73
74
75
    private static Map<String, KafkaTemplate>  kafkaTemplateMap = new HashMap<>();
76
    static KafkaTemplate  kafkaTemplate = null;
69 77
    /**
70 78
     * 推送消息到订阅的KAFKA服务器地址以及topic
71 79
     * @param kafkaTopic  订阅的kafka topic
@ -73,12 +81,25 @@ public class KafkaProducerConfig {
73 81
     * @return
74 82
     */
75 83
    public static Boolean sendForKafka(String kafkaTopic,String content) {
76
        KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(servers);
77
        KafkaTemplate kafkaTemplate =  kafkaProducerConfig.kafkaTemplate();
84
//        KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(servers);
85
//        KafkaTemplate kafkaTemplate =  kafkaProducerConfig.kafkaTemplate();
86
87
88
        kafkaTemplate = kafkaTemplateMap.get(servers);
89
        if(kafkaTemplate == null){
90
            //new 实例
91
//            KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(servers);
92
            KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();
93
            kafkaTemplate = kafkaProducerConfig.kafkaTemplate();
94
            kafkaTemplateMap.put(servers,kafkaTemplate);
95
        }
78 96
79 97
80 98
        try {
99
            System.out.println("send msg to Kafka:" + servers + "服务器地址以及topic:" + kafkaTopic + " 内容:" + content );
81 100
            Object result = kafkaTemplate.send(kafkaTopic,content ).get();
101
            log.debug("It's successful send msg to Kafka:" + servers + "服务器地址以及topic:" + kafkaTopic + " 内容:" + content + " 结果" + result);
102
82 103
            kafkaTemplate.flush();
83 104
        } catch (InterruptedException e) {
84 105
            log.error(e.getMessage());

+ 6 - 3
ebc-mock-service/src/main/resources/application.properties

@ -11,10 +11,13 @@ server.port=8086
11 11
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
12 12
#spring.datasource.username=comon_frm
13 13
#spring.datasource.password=1qaz@WSX
14
spring.datasource.url=jdbc:mysql://47.105.160.21:3309/dmp?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
14
#spring.datasource.url=jdbc:mysql://47.105.160.21:3309/dmp?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
15
spring.datasource.url=jdbc:mysql://10.19.90.34:3307/energy?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
15 16
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
16
spring.datasource.username=cmp
17
spring.datasource.password=cmp@123
17
#spring.datasource.username=cmp
18
#spring.datasource.password=cmp@123
19
spring.datasource.username=ebc
20
spring.datasource.password=ebc@123
18 21
19 22
# JPA (JpaBaseConfiguration, HibernateJpaAutoConfiguration)
20 23
#spring.jpa.database=default