Sfoglia il codice sorgente

提交ipu-database和msgframe结合使用示例

weihf 7 anni fa
parent
commit
f90e053c4a

+ 36 - 0
ipu-ddmp-data/.classpath

@ -0,0 +1,36 @@
1
<?xml version="1.0" encoding="UTF-8"?>
2
<classpath>
3
	<classpathentry kind="src" output="target/classes" path="src/main/java">
4
		<attributes>
5
			<attribute name="optional" value="true"/>
6
			<attribute name="maven.pomderived" value="true"/>
7
		</attributes>
8
	</classpathentry>
9
	<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
10
		<attributes>
11
			<attribute name="maven.pomderived" value="true"/>
12
		</attributes>
13
	</classpathentry>
14
	<classpathentry kind="src" output="target/test-classes" path="src/test/java">
15
		<attributes>
16
			<attribute name="optional" value="true"/>
17
			<attribute name="maven.pomderived" value="true"/>
18
		</attributes>
19
	</classpathentry>
20
	<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
21
		<attributes>
22
			<attribute name="maven.pomderived" value="true"/>
23
		</attributes>
24
	</classpathentry>
25
	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jdk1.8.0_131">
26
		<attributes>
27
			<attribute name="maven.pomderived" value="true"/>
28
		</attributes>
29
	</classpathentry>
30
	<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
31
		<attributes>
32
			<attribute name="maven.pomderived" value="true"/>
33
		</attributes>
34
	</classpathentry>
35
	<classpathentry kind="output" path="target/classes"/>
36
</classpath>

+ 23 - 0
ipu-ddmp-data/.project

@ -0,0 +1,23 @@
1
<?xml version="1.0" encoding="UTF-8"?>
2
<projectDescription>
3
	<name>ipu-ddmp-data</name>
4
	<comment></comment>
5
	<projects>
6
	</projects>
7
	<buildSpec>
8
		<buildCommand>
9
			<name>org.eclipse.jdt.core.javabuilder</name>
10
			<arguments>
11
			</arguments>
12
		</buildCommand>
13
		<buildCommand>
14
			<name>org.eclipse.m2e.core.maven2Builder</name>
15
			<arguments>
16
			</arguments>
17
		</buildCommand>
18
	</buildSpec>
19
	<natures>
20
		<nature>org.eclipse.jdt.core.javanature</nature>
21
		<nature>org.eclipse.m2e.core.maven2Nature</nature>
22
	</natures>
23
</projectDescription>

+ 154 - 0
ipu-ddmp-data/pom.xml

@ -0,0 +1,154 @@
1
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3
	<modelVersion>4.0.0</modelVersion>
4
	<parent>
5
		<groupId>com.ai.ipu</groupId>
6
		<artifactId>ipu-spring-libs</artifactId>
7
		<version>3.0</version>
8
	</parent>
9

10
	<groupId>com.ai.ipu</groupId>
11
	<artifactId>ipu-ddmp-data</artifactId>
12
	<version>0.0.1-SNAPSHOT</version>
13
	<name>ipu-ddmp-data</name>
14

15
	<repositories>
16
		<repository>
17
			<id>ipu</id>
18
			<name>ipu repository</name>
19
			<url>http://114.215.100.48:9090/nexus/content/groups/public/</url>
20
			<releases>
21
				<enabled>true</enabled>
22
			</releases>
23
			<snapshots>
24
				<enabled>true</enabled>
25
				<updatePolicy>always</updatePolicy>
26
			</snapshots>
27
		</repository>
28
	</repositories>
29

30
	<properties>
31
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
32
		<msgframe>1.9.2</msgframe>
33
	</properties>
34
	<dependencies>
35
		<dependency>
36
			<groupId>org.springframework.boot</groupId>
37
			<artifactId>spring-boot-starter-web</artifactId>
38
			<exclusions>
39
				<exclusion>
40
					<groupId>org.hibernate</groupId>
41
					<artifactId>hibernate-validator</artifactId>
42
				</exclusion>
43
				<exclusion>
44
					<groupId>com.fasterxml.jackson.core</groupId>
45
					<artifactId>jackson-databind</artifactId>
46
				</exclusion>
47
			</exclusions>
48
		</dependency>
49
		<!-- 缺省用tomcat容器 -->
50
		<dependency>
51
			<groupId>org.springframework.boot</groupId>
52
			<artifactId>spring-boot-starter-tomcat</artifactId>
53
			<scope>provided</scope>
54
		</dependency>
55
		<!-- =============公共必选依赖============= -->
56
		<dependency>
57
			<groupId>javax.servlet</groupId>
58
			<artifactId>servlet-api</artifactId>
59
			<scope>provided</scope>
60
		</dependency>
61
		<dependency>
62
			<groupId>org.apache.commons</groupId>
63
			<artifactId>commons-lang3</artifactId>
64
		</dependency>
65
		<dependency>
66
			<groupId>junit</groupId>
67
			<artifactId>junit</artifactId>
68
			<scope>test</scope>
69
			<version>4.12</version>
70
		</dependency>
71
		<dependency>
72
			<groupId>log4j</groupId>
73
			<artifactId>log4j</artifactId>
74
		</dependency>
75
		<dependency>
76
			<groupId>dom4j</groupId>
77
			<artifactId>dom4j</artifactId>
78
		</dependency>
79
		<!-- msgframe相关依赖 -->
80
		<dependency>
81
			<groupId>com.ai.aif.msgframe</groupId>
82
			<artifactId>msgframe-common</artifactId>
83
			<version>${msgframe}</version>
84
		</dependency>
85
		<dependency>
86
			<groupId>com.ai.aif.msgframe</groupId>
87
			<artifactId>msgframe-client</artifactId>
88
			<version>${msgframe}</version>
89
		</dependency>
90
		<dependency>
91
			<groupId>com.ai.aif.msgframe</groupId>
92
			<artifactId>msgframe-server</artifactId>
93
			<version>${msgframe}</version>
94
		</dependency>
95
		<dependency>
96
			<groupId>com.ai.aif.msgframe</groupId>
97
			<artifactId>xmlmsgframe</artifactId>
98
			<version>2.6.4</version>
99
		</dependency>
100
		<dependency>
101
			<groupId>org.apache.xmlbeans</groupId>
102
			<artifactId>xmlbeans</artifactId>
103
			<version>2.6.0</version>
104
		</dependency>
105
		<dependency>
106
			<groupId>com.rabbitmq</groupId>
107
			<artifactId>amqp-client</artifactId>
108
			<version>3.6.5</version>
109
		</dependency>
110
		<dependency>
111
			<groupId>com.alibaba</groupId>
112
			<artifactId>fastjson</artifactId>
113
			<version>1.1.46</version>
114
		</dependency>
115
		<dependency>
116
			<groupId>org.hamcrest</groupId>
117
			<artifactId>hamcrest-core</artifactId>
118
			<version>1.3</version>
119
		</dependency>
120
		<dependency>
121
			<groupId>javax.jms</groupId>
122
			<artifactId>jms</artifactId>
123
			<version>1.1</version>
124
		</dependency>
125
		<dependency>
126
			<groupId>org.apache.kafka</groupId>
127
			<artifactId>kafka-clients</artifactId>
128
			<version>0.10.2.0</version>
129
		</dependency>
130

131
		<dependency>
132
			<groupId>com.ai.ipu</groupId>
133
			<artifactId>ipu-database</artifactId>
134
			<version>3.0</version>
135
		</dependency>
136

137
		<dependency>
138
			<groupId>org.aspectj</groupId>
139
			<artifactId>aspectjrt</artifactId>
140
			<version>1.8.9</version>
141
		</dependency>
142
		<!-- https://mvnrepository.com/artifact/org.aspectj/aspectjtools -->
143
		<dependency>
144
			<groupId>org.aspectj</groupId>
145
			<artifactId>aspectjtools</artifactId>
146
			<version>1.8.9</version>
147
		</dependency>
148
		<dependency>
149
			<groupId>org.aspectj</groupId>
150
			<artifactId>aspectjweaver</artifactId>
151
			<version>1.7.4</version>
152
		</dependency>
153
	</dependencies>
154
</project>

+ 37 - 0
ipu-ddmp-data/src/main/java/com/ai/ipu/ddmp/data/MainModule.java

@ -0,0 +1,37 @@
1
package com.ai.ipu.ddmp.data;
2

3
import org.springframework.beans.factory.annotation.Autowired;
4
import org.springframework.boot.SpringApplication;
5
import org.springframework.boot.autoconfigure.SpringBootApplication;
6
import org.springframework.boot.web.servlet.ServletComponentScan;
7

8
import com.ai.aif.msgframe.consumer.MfConsumerClient;
9
import com.ai.aif.msgframe.consumer.MfServiceStartup;
10
import com.ai.ipu.ddmp.data.config.DataConfig;
11
import com.ai.ipu.ddmp.data.service.DataManagerService;
12
import com.ai.ipu.ddmp.data.util.SpringUtil;
13

14

15
@SpringBootApplication
16
@ServletComponentScan
17
public class MainModule {
18
	@Autowired
19
    public DataConfig dataConfig;
20
	@Autowired
21
	public DataManagerService dataManagerService;
22
	
23
	public static void main(String[] args) {
24
		// TODO Auto-generated method stub
25
		SpringApplication.run(MainModule.class, args);
26
		
27
		//启动msgFrame消费者
28
		MfServiceStartup.main();
29
//		String[] topics = "tab_ipu_oper|tab_ipu_plugin|tab_ipu_page|tab_ipu_data|tab_ipu_crash|tab_ipu_device".split("\\|");
30
//		for (String topic:topics)
31
//		{
32
//			MfConsumerClient.subscribe(topic, "*",
33
//				"com.ai.ipu.ddmp.data.consumer.ConsumerProcessorImpl");
34
//		}
35
	}
36

37
}

+ 43 - 0
ipu-ddmp-data/src/main/java/com/ai/ipu/ddmp/data/aop/TranscationManager.java

@ -0,0 +1,43 @@
1
package com.ai.ipu.ddmp.data.aop;
2

3
import org.aspectj.lang.JoinPoint;
4
import org.aspectj.lang.annotation.After;
5
import org.aspectj.lang.annotation.AfterThrowing;
6
import org.aspectj.lang.annotation.Aspect;
7
import org.aspectj.lang.annotation.Before;
8
import org.springframework.stereotype.Component;
9

10
import com.ai.ipu.basic.log.ILogger;
11
import com.ai.ipu.basic.log.IpuLoggerFactory;
12
import com.ai.ipu.database.conn.SqlSessionManager;
13

14
@Aspect //声明该类是切面类
15
@Component//配置文件中启动自动扫描功能
16
public class TranscationManager {
17
	protected final transient ILogger log = IpuLoggerFactory.createLogger(TranscationManager.class);
18
	
19
	@Before("execution(* com.ai.ipu.ddmp.data.service.*.*(..))")
20
	public void startTranscation(){
21
		log.debug("开始数据库操作");
22
	}
23

24
	@AfterThrowing(pointcut = "execution(* com.ai.ipu.ddmp.data.service.*.*(..))", throwing = "e")
25
    private void doAfterThrow(JoinPoint joinPoint,  Throwable e) {
26
		e.printStackTrace();
27
		long start = System.currentTimeMillis();
28
		SqlSessionManager.rollbackAll();
29
		log.debug("回退数据库连接耗时:" + (System.currentTimeMillis() - start));
30
		
31
//		log.debug("数据库连接全量回收");
32
//		SqlSessionManager.closeAll();
33
	}
34

35
	@After("execution(* com.ai.ipu.ddmp.data.service.*.*(..))")
36
	public void commitTranscation(){
37
		long start = System.currentTimeMillis();
38
		SqlSessionManager.commitAll();
39
		log.debug("提交数据库连接耗时:" + (System.currentTimeMillis() - start));
40
		
41
	}
42
	
43
}

+ 55 - 0
ipu-ddmp-data/src/main/java/com/ai/ipu/ddmp/data/config/DataConfig.java

@ -0,0 +1,55 @@
1
package com.ai.ipu.ddmp.data.config;
2

3

4
import org.springframework.boot.context.properties.ConfigurationProperties;
5
import org.springframework.stereotype.Component;
6

7

8
/**
9
 * DataConfig
10
 * 
11
 * @author:weihf@asiainfo.com
12
 * @2018-06-28
13
 * @since 1.0
14
 */
15
@Component
16
@ConfigurationProperties(prefix = "data.config")
17
public class DataConfig {
18
	private int batchSize;
19
    private int lingerMs;
20
    private String topics;
21
    private String tables;
22
    private String seperator;
23
    
24
	public int getBatchSize() {
25
		return batchSize;
26
	}
27
	public void setBatchSize(int batchSize) {
28
		this.batchSize = batchSize;
29
	}
30
	public int getLingerMs() {
31
		return lingerMs;
32
	}
33
	public void setLingerMs(int lingerMs) {
34
		this.lingerMs = lingerMs;
35
	}
36
	public String[] getTopics() {
37
		return topics.split(seperator==null?"|":seperator);
38
	}
39
	public void setTopics(String topics) {
40
		this.topics = topics;
41
	}
42
	public String[] getTables() {
43
		return tables.split(seperator==null?"|":seperator);
44
	}
45
	public void setTables(String tables) {
46
		this.tables = tables;
47
	}
48
	public String getSeperator() {
49
		return seperator;
50
	}
51
	public void setSeperator(String seperator) {
52
		this.seperator = seperator;
53
	}
54

55
}

+ 25 - 0
ipu-ddmp-data/src/main/java/com/ai/ipu/ddmp/data/consumer/ConsumerProcessorImpl.java

@ -0,0 +1,25 @@
1
package com.ai.ipu.ddmp.data.consumer;
2
3
import com.ai.aif.msgframe.common.IConsumerProcessor;
4
import com.ai.aif.msgframe.common.exception.ConsumerException;
5
import com.ai.aif.msgframe.common.message.MsgFMessage;
6
import com.ai.aif.msgframe.common.message.MsgFTextMessage;
7
import com.ai.ipu.ddmp.data.manager.DataManager;
8
9
10
public class ConsumerProcessorImpl implements IConsumerProcessor {
11
	
12
	@SuppressWarnings("rawtypes")
13
	public Object process(MsgFMessage msg) throws ConsumerException {
14
	
15
		if (msg instanceof MsgFTextMessage) {
16
			System.out.println("receive message" + "   " +msg.getTopic() + msg.getMsgId() + "   " + msg.getFilterTag() + "       "
17
					+ ((MsgFTextMessage) msg).getText());
18
			DataManager.putSql(msg.getTopic(), ((MsgFTextMessage) msg).getText());
19
		}
20
21
		// 返回Boolean类型的false,代表消息消费失败,重新消费,其他表示消费成功
22
		return new Boolean(true);
23
	}
24
25
}

+ 32 - 0
ipu-ddmp-data/src/main/java/com/ai/ipu/ddmp/data/dao/DataDao.java

@ -0,0 +1,32 @@
1
package com.ai.ipu.ddmp.data.dao;
2

3
import java.io.IOException;
4
import java.util.HashMap;
5
import java.util.List;
6
import java.util.Map;
7

8
import com.ai.ipu.database.dao.impl.AbstractBizDao;
9

10
public class DataDao extends AbstractBizDao {
11

12
	public DataDao(String connName) throws IOException {
13
		super(connName);
14
	}
15

16
	public int save(String sql) throws Exception
17
	{
18
		return dao.executeInsert(sql);
19
	}
20
	
21
	public int save(String sql, Map<String, Object> param) throws Exception
22
	{
23
		return dao.executeInsert(sql, param);
24
	}
25
	
26
	public List<Map<String, Object>> takeTableCols(String tableName) throws Exception {
27
    	String sql = "select COLUMN_NAME, IS_NULLABLE, DATA_TYPE  from information_schema.COLUMNS where table_name =#{tableName} and EXTRA != 'auto_increment'";
28
    	Map<String, Object> param = new HashMap<String, Object>();
29
    	param.put("tableName", tableName);
30
    	return  dao.executeSelect(sql, param);   
31
    }
32
}

+ 139 - 0
ipu-ddmp-data/src/main/java/com/ai/ipu/ddmp/data/manager/DataManager.java

@ -0,0 +1,139 @@
1
package com.ai.ipu.ddmp.data.manager;
2

3

4
import java.util.ArrayList;
5
import java.util.List;
6
import java.util.Queue;
7
import java.util.Timer;
8
import java.util.TimerTask;
9
import java.util.concurrent.ConcurrentHashMap;
10
import java.util.concurrent.ConcurrentLinkedQueue;
11
import java.util.concurrent.ConcurrentMap;
12
import java.util.concurrent.atomic.AtomicBoolean;
13
import java.util.concurrent.atomic.AtomicInteger;
14

15
import org.slf4j.Logger;
16
import org.slf4j.LoggerFactory;
17
import org.springframework.boot.CommandLineRunner;
18
import org.springframework.stereotype.Component;
19

20
import com.ai.ipu.ddmp.data.config.DataConfig;
21
import com.ai.ipu.ddmp.data.service.DataManagerService;
22
import com.ai.ipu.ddmp.data.util.SpringUtil;
23

24

25
@Component
26
public class DataManager implements CommandLineRunner {
27
	private static final Logger logger = LoggerFactory.getLogger(DataManager.class);
28
	private static final AtomicInteger batchSize = new AtomicInteger(0);
29
	private static final ConcurrentMap<String /* table name*/, Queue<String /* sql*/>> sqlTable = new ConcurrentHashMap<String, Queue<String>>();
30
	private static final ConcurrentMap<String /* table name*/, AtomicBoolean /* lock */> tableLock = new ConcurrentHashMap<String, AtomicBoolean>();
31
	private static final ConcurrentMap<String /* table name*/, AtomicInteger /* size */> tableSize = new ConcurrentHashMap<String, AtomicInteger>();
32

33
	
34
	public void run(String... args) throws Exception {
35
		// TODO Auto-generated method stub
36
		DataConfig dataConfig = (DataConfig) SpringUtil.getBean("dataConfig");
37
		/*Queue<String> sqlQue = new ConcurrentLinkedQueue();
38
		AtomicBoolean lock = new AtomicBoolean(false);
39
		AtomicInteger queSize = new AtomicInteger(0);
40
		
41
		sqlQue.add("a");
42
		queSize.incrementAndGet();
43
		
44
		sqlQue.add("b");
45
		queSize.incrementAndGet();
46
		sqlQue.add("c");
47
		queSize.incrementAndGet();
48
		
49
		sqlTable.put("test", sqlQue);
50
		tableLock.put("test", lock);
51
		tableSize.put("test", queSize);*/
52
		
53
		if (dataConfig.getBatchSize()<=0)
54
			batchSize.set(1000);
55
		else
56
			batchSize.set(dataConfig.getBatchSize());
57
		
58
		if (dataConfig.getLingerMs() == 0)
59
			return;
60
        
61
		Timer timer = new Timer();
62
        timer.scheduleAtFixedRate(new TimerTask() {
63
            public void run() {
64
            	logger.debug("开始做定时任务");
65
            	
66
            	/* 如果到达队列定义长度或时间间隔时,通过tableLock判断是否已经在对队列做取处理。不允许同时对队列做取操作*/
67
            	for(String key : sqlTable.keySet())
68
            	{
69
            		if (tableSize.get(key) == null)
70
            			tableSize.put(key, new AtomicInteger(0));
71

72
            		if (tableLock.get(key) == null)
73
            			tableLock.put(key, new AtomicBoolean(false));
74
            		
75
            		tableLock.get(key).set(true);
76
            		List sqlList = new ArrayList();
77
           			String sql;
78
    				while ((sql = sqlTable.get(key).poll())  != null)
79
           			{
80
    					tableSize.get(key).decrementAndGet();
81
    					sqlList.add(sql);
82
           			}
83
    				DataManagerService service = (DataManagerService) SpringUtil.getBean("dataManagerService");
84
    				try {
85
    					service.saveBatch(key, sqlList);
86
					} catch (Exception e) {
87
						// TODO Auto-generated catch block
88
						e.printStackTrace();
89
					}
90
    				tableLock.get(key).set(false);
91
            	}
92
    			
93
            	logger.debug("定时任务完成");
94
            }
95
        }, 0, dataConfig.getLingerMs()*1000);        
96
	}
97

98
	public static boolean putSql(String topicName, String sql) {
99
		// TODO Auto-generated method stub
100
		if (sql == null || "".equals(sql.trim())) {
101
			return false;
102
		}
103

104
		if (tableSize.get(topicName) == null)
105
			tableSize.put(topicName, new AtomicInteger(0));
106

107
		if (tableLock.get(topicName) == null)
108
			tableLock.put(topicName, new AtomicBoolean(false));
109
		
110
		if (sqlTable.get(topicName) == null)
111
			sqlTable.put(topicName, new ConcurrentLinkedQueue());
112
		
113
		/* 如果到达队列定义长度或时间间隔时,通过tableLock判断是否已经在对队列做取处理。不允许同时对队列做取操作*/
114
		if (tableSize.get(topicName).get()>=batchSize.intValue() && !tableLock.get(topicName).get())
115
		{
116
			tableLock.get(topicName).set(true);
117
			List sqlList = new ArrayList();
118
   			String sqlStr;
119
			while ((sqlStr = sqlTable.get(topicName).poll())  != null)
120
   			{
121
				tableSize.get(topicName).decrementAndGet();
122
				sqlList.add(sqlStr);
123
   			}
124
			DataManagerService service = (DataManagerService) SpringUtil.getBean("dataManagerService");
125
			try {
126
				service.saveBatch(topicName, sqlList);
127
			} catch (Exception e) {
128
				// TODO Auto-generated catch block
129
				e.printStackTrace();
130
			}
131
			tableLock.get(topicName).set(false);
132
		}
133
		sqlTable.get(topicName).add(sql);
134
		int size = tableSize.get(topicName).incrementAndGet();
135
		logger.debug("sqlTable size:" + size);
136
		return true;
137
	}
138
	
139
}

+ 180 - 0
ipu-ddmp-data/src/main/java/com/ai/ipu/ddmp/data/service/DataManagerService.java

@ -0,0 +1,180 @@
1
package com.ai.ipu.ddmp.data.service;
2

3
import java.util.HashMap;
4
import java.util.List;
5
import java.util.Map;
6

7
import org.springframework.stereotype.Service;
8

9
import com.ai.ipu.database.dao.IpuDaoManager;
10
import com.ai.ipu.ddmp.data.config.DataConfig;
11
import com.ai.ipu.ddmp.data.dao.DataDao;
12
import com.ai.ipu.ddmp.data.util.SpringUtil;
13
import com.alibaba.fastjson.JSONObject;
14

15
@Service
16
public class DataManagerService {
17

18
	public int save(String topicName, String value) throws Exception {
19
		// TODO Auto-generated method stub
20
		DataDao dao = IpuDaoManager.takeDao(DataDao.class, "test");
21
		String[] topics = ((DataConfig) SpringUtil.getBean("dataConfig")).getTopics();
22
		String[] tables = ((DataConfig) SpringUtil.getBean("dataConfig")).getTables();
23
		String tableName = "";
24
		for (int i=0;i<tables.length;i++)
25
		{
26
			if (topics[i].equals(topicName))
27
			{
28
				tableName = tables[i];
29
				break;
30
			}
31
		}
32
		List<Map<String, Object>> cols = dao.takeTableCols(tableName);
33
		StringBuffer sql = new StringBuffer();
34
		sql.append("insert into ")
35
		.append(tableName)
36
		.append(" ( ");
37
		
38
		int j=0;
39
		for (Map<String, Object> col:cols)
40
		{
41
			if (j>0)
42
				sql.append(",");
43
			sql.append(col.get("COLUMN_NAME"));
44
			j++;
45
		}
46
		
47
		sql.append(" ) ") 
48
		.append(" values ");
49
		
50
		Map<String, Object> param = new HashMap<String, Object>();
51
		JSONObject jsValue= JSONObject.parseObject(value);
52
		sql.append("(");
53
		int k=0;
54
		for (Map<String, Object> col:cols)
55
		{
56
			if (jsValue.get(col.get("COLUMN_NAME")) != null)
57
			{
58
				
59
				if (k>0)
60
					sql.append(",");
61
				param.put((String)col.get("COLUMN_NAME"), jsValue.get(col.get("COLUMN_NAME")));
62
				sql.append("#{")
63
				.append(col.get("COLUMN_NAME"))
64
				.append("}");
65
				k++;
66
			}
67
			else if (col.get("IS_NULLABLE").equals("Y"))
68
			{
69
				if (k>0)
70
					sql.append(",");
71
				if ("datetime".equalsIgnoreCase((String) col.get("DATA_TYPE")))
72
					;
73
				else if ("date".equalsIgnoreCase((String) col.get("DATA_TYPE")))
74
					;
75
				else if ("timestamp".equalsIgnoreCase((String) col.get("DATA_TYPE")))
76
					;
77
				else
78
					param.put((String)col.get("COLUMN_NAME"),"null");
79
				sql.append("#{")
80
				.append(col.get("COLUMN_NAME"))
81
				.append("}");
82
				k++;
83
			}
84
			else
85
				throw new Exception(col.get("COLUMN_NAME")+" is not null.");
86
				
87
		}
88
		sql.append(")");
89

90
		return dao.save(sql.toString(), param);
91
	}
92

93
	public int saveBatch(String topicName, List<String> valueList) throws Exception {
94
		// TODO Auto-generated method stub
95
		if (valueList == null || valueList.size() == 0)
96
			return -1;
97
		else if (valueList.size() == 1)
98
			return save(topicName, valueList.get(0));
99
		
100
		DataDao dao = IpuDaoManager.takeDao(DataDao.class, "test");
101
		String[] topics = ((DataConfig) SpringUtil.getBean("dataConfig")).getTopics();
102
		String[] tables = ((DataConfig) SpringUtil.getBean("dataConfig")).getTables();
103
		String tableName = "";
104
		for (int i=0;i<tables.length;i++)
105
		{
106
			if (topics[i].equals(topicName))
107
			{
108
				tableName = tables[i];
109
				break;
110
			}
111
		}
112
		List<Map<String, Object>> cols = dao.takeTableCols(tableName);
113
		StringBuffer sql = new StringBuffer();
114
		sql.append("insert into ")
115
		.append(tableName)
116
		.append(" ( ");
117
		
118
		int j=0;
119
		for (Map<String, Object> col:cols)
120
		{
121
			if (j>0)
122
				sql.append(",");
123
			sql.append(col.get("COLUMN_NAME"));
124
			j++;
125
		}
126
		
127
		sql.append(" ) ") 
128
		.append(" values ");
129
		j=0;
130
		for (String value:valueList)
131
		{
132
			JSONObject jsValue= JSONObject.parseObject(value);
133
			if (j>0)
134
				sql.append(",");
135
			sql.append("(");
136
			int k=0;
137
			for (Map<String, Object> col:cols)
138
			{
139
				if (jsValue.get(col.get("COLUMN_NAME")) != null)
140
				{
141
					
142
					if (k>0)
143
						sql.append(",");
144
					if ("varchar".equalsIgnoreCase((String) col.get("DATA_TYPE")) || "char".equalsIgnoreCase((String) col.get("DATA_TYPE")))
145
						sql.append("'").append(jsValue.get(col.get("COLUMN_NAME"))).append("'");
146
					else if ("datetime".equalsIgnoreCase((String) col.get("DATA_TYPE")))
147
						;
148
					else if ("date".equalsIgnoreCase((String) col.get("DATA_TYPE")))
149
						;
150
					else if ("timestamp".equalsIgnoreCase((String) col.get("DATA_TYPE")))
151
						;
152
					else
153
						sql.append(jsValue.get(col.get("COLUMN_NAME")));
154
					k++;
155
				}
156
				else if (col.get("IS_NULLABLE").equals("Y"))
157
				{
158
					if (k>0)
159
						sql.append(",");
160
					if ("datetime".equalsIgnoreCase((String) col.get("DATA_TYPE")))
161
						;
162
					else if ("date".equalsIgnoreCase((String) col.get("DATA_TYPE")))
163
						;
164
					else if ("timestamp".equalsIgnoreCase((String) col.get("DATA_TYPE")))
165
						;
166
					else
167
						sql.append("null");
168
					k++;
169
				}
170
				else
171
					throw new Exception(col.get("COLUMN_NAME")+" is not null.");
172
					
173
			}
174
			sql.append(")");
175
			j++;
176
		}
177
		return dao.save(sql.toString());
178
	}
179
	
180
}

+ 40 - 0
ipu-ddmp-data/src/main/java/com/ai/ipu/ddmp/data/util/SpringUtil.java

@ -0,0 +1,40 @@
1
package com.ai.ipu.ddmp.data.util;
2

3
import org.springframework.beans.BeansException;
4
import org.springframework.context.ApplicationContext;
5
import org.springframework.context.ApplicationContextAware;
6
import org.springframework.context.support.ClassPathXmlApplicationContext;
7
import org.springframework.stereotype.Component;
8

9
@Component
10
public class SpringUtil implements ApplicationContextAware {
11
	private static ApplicationContext applicationContext = null;
12
	
13
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
14
		if(SpringUtil.applicationContext == null){
15
            SpringUtil.applicationContext  = applicationContext;
16
        }
17
	}
18

19
	//获取applicationContext
20
    public static ApplicationContext getApplicationContext() {
21
        return applicationContext;
22
    }
23

24
    //通过name获取 Bean.
25
    public static Object getBean(String name){
26
        return getApplicationContext().getBean(name);
27

28
    }
29

30
    //通过class获取Bean.
31
    public static <T> T getBean(Class<T> clazz){
32
        return getApplicationContext().getBean(clazz);
33
    }
34

35
    //通过name,以及Clazz返回指定的Bean
36
    public static <T> T getBean(String name,Class<T> clazz){
37
        return getApplicationContext().getBean(name, clazz);
38
    }
39

40
}

+ 30 - 0
ipu-ddmp-data/src/main/resources/application.properties

@ -0,0 +1,30 @@
1
#设置服务器端口
2
#也可以通过启动命令行参数实现:java -jar myproject.jar --server.port=9084
3
server.port=9999
4

5
is.spring.boot=true
6
web.root=webapp/
7
spring.mvc.static-path-pattern=/**
8
spring.resources.static-locations=classpath:/${web.root}
9

10
#context-path必须以/开头
11
server.context-path=/ipuDdmpData
12
spring.application.name=ipuDdmpData
13
spring.application.index=ipuDdmpData
14

15
logging.file=./logs/ipuDdmpData.log
16
#logging.config=classpath:logback.xml
17

18
#data.config相关配置,所有配置都可以通过--启动参数覆盖
19
#队列长度
20
data.config.batchSize=10000
21
#时间间隔,单位:秒
22
data.config.lingerMs=5
23
#设置接收主题
24
#data.config.topics=ipu_oper|ipu_plugin|ipu_page|ipu_data|ipu_crash|ipu_device
25
data.config.topics=ipu_data
26
#每个主题对应的消息需要导入的数据库表。不同主题对应的表之间用|分隔
27
#data.config.tables=tab_ipu_oper|tab_ipu_plugin|tab_ipu_page|tab_ipu_data|tab_ipu_crash|tab_ipu_device
28
data.config.tables=tmp_info
29
#topics和tables的分隔符,由于|有特殊含义,因此需要写为\\|
30
data.config.seperator=\\|

+ 17 - 0
ipu-ddmp-data/src/main/resources/applicationContext.xml

@ -0,0 +1,17 @@
1
<?xml version="1.0" encoding="UTF-8"?>
2
<beans xmlns="http://www.springframework.org/schema/beans"
3
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4
    xmlns:aop="http://www.springframework.org/schema/aop"
5
    xmlns:context="http://www.springframework.org/schema/context"
6
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
7
        http://www.springframework.org/schema/beans/spring-beans.xsd
8
        http://www.springframework.org/schema/context 
9
        http://www.springframework.org/schema/context/spring-context-4.3.xsd
10
        http://www.springframework.org/schema/aop 
11
        http://www.springframework.org/schema/aop/spring-aop-4.3.xsd" >
12

13
        <!-- 自动扫描包下的类,并将其实例化。多个包之间用,隔开 -->
14
        <context:component-scan base-package="com.ai.ipu.ddmp.data"></context:component-scan>
15
        <!-- 配置文件中启动AspectJ的注解功能 ,默认是false,要将其改为true -->
16
        <aop:aspectj-autoproxy proxy-target-class="true"></aop:aspectj-autoproxy> 
17
</beans>

+ 6 - 0
ipu-ddmp-data/src/main/resources/destination_rule.xml

@ -0,0 +1,6 @@
1
<?xml version="1.0" encoding="utf-8"?>
2
<destinationRuleCfg xmlns="http://www.asiainfo.com/msgframe/destinationrule">
3
	<subject name="TagTest">
4
		<area name="tag1" desc="tag1" number="10"/>
5
	</subject>
6
</destinationRuleCfg>	

+ 40 - 0
ipu-ddmp-data/src/main/resources/ipu-mybatis-config.xml

@ -0,0 +1,40 @@
1
<?xml version="1.0" encoding="UTF-8" ?>
2
<!DOCTYPE configuration
3
    PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
4
    "http://mybatis.org/dtd/mybatis-3-config.dtd">
5

6
<configuration>
7
    <settings>
8
		<setting name="defaultFetchSize" value="1000" /> <!-- 结果集获取数量提示值,分批传输 -->
9
	</settings>
10
    <plugins>
11
        <!-- 分页插件,可根据参数定制化 -->
12
	    <plugin interceptor="com.github.pagehelper.PageInterceptor">
13
	        <!-- config params as the following -->
14
		</plugin>
15
	</plugins>
16
	<environments default="test">
17
		<environment id="test">
18
			<transactionManager type="JDBC" />
19
			<dataSource type="com.ai.ipu.database.datasource.C3P0DataSourceFactory">
20
				<property name="driverClass" value="com.mysql.jdbc.Driver" />
21
				<!--<property name="jdbcUrl" value="jdbc:mysql://121.42.183.206:3317/test?useSSL=false" />  -->
22
				<property name="jdbcUrl" value="jdbc:mysql://10.19.13.54:6601/test?useSSL=false" />
23
				<property name="user" value="ipu" />
24
				<property name="password" value="ipumysql" />
25
				<!-- 连接池用完时,等待获取新连接的时间 (毫秒) -->
26
				<property name="checkoutTimeout" value="5000" />
27
				<!--定义在从数据库获取新连接失败后重复尝试的次数。Default: 30 -->
28
				<property name="acquireRetryAttempts" value="5" />
29
				<!--两次连接中间隔时间,单位毫秒。Default: 1000 -->
30
				<property name="acquireRetryDelay" value="1000" />
31
				<property name="initialPoolSize" value="50" />
32
				<property name="minPoolSize" value="50" />
33
				<property name="maxPoolSize" value="50" />
34
				<property name="maxIdleTime" value="600" />
35
				<property name="idleConnectionTestPeriod" value="60" />
36
				<property name="preferredTestQuery" value="SELECT 1" />
37
			</dataSource>
38
		</environment>
39
	</environments>
40
</configuration>

+ 53 - 0
ipu-ddmp-data/src/main/resources/logback.xml

@ -0,0 +1,53 @@
1
<?xml version="1.0" encoding="UTF-8"?>
2
<configuration>
3
	<include resource="org/springframework/boot/logging/logback/base.xml" /> <!-- logback 提供的基本配置 -->
4

5
	<!-- 控制台 -->
6
	<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
7
		<encoder charset="UTF-8">
8
			<!-- <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern> -->
9
			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} {%thread} %-5level %logger{50}-%msg%n</pattern>
10
			<charset>UTF-8</charset> <!-- 解决中文乱码问题 -->
11
		</encoder>
12
	</appender>
13

14
	<!-- 记录到文件 (每天一个文件) -->
15
	<appender name="FILE"
16
		class="ch.qos.logback.core.rolling.RollingFileAppender">
17
		<file>/data/test/logs/ipuDdmpData.log</file><!-- 
18
			最新的log文件名 -->
19
		<append>true</append>
20
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
21
			<!-- 历史文件名 -->
22
			<fileNamePattern>/data/test/logs/ipuDdmpData-%d{yyyy-MM-dd}.%i.log
23
			</fileNamePattern>
24
			<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
25
				<maxFileSize>104857600</maxFileSize>
26
			</timeBasedFileNamingAndTriggeringPolicy>
27
			<MaxHistory>30</MaxHistory><!-- 保留 30 天的日志 -->
28
		</rollingPolicy>
29
		<encoder>
30
			<!-- <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %5p %m%n</pattern> -->
31
			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} {%thread} %-5level %logger{50}-%msg%n</pattern>
32
			<!-- <charset class="java.nio.charset.Charset">UTF-8</charset> -->
33
			<charset>UTF-8</charset>  <!--解决中文乱码问题 -->
34
		</encoder>
35
	</appender>
36

37
	<!-- <root level="DEBUG"> -->
38
	<!-- <root level="INFO"> <appender-ref ref="CONSOLE" /> <appender-ref ref="FILE" 
39
		/> </root> -->
40

41
	<!-- 将上面两个 appender 关联到我们的项目 -->
42
	<logger name="com.ai.ipu.ddmp.data" level="DEBUG"
43
		additivity="false"> <!-- name 是项目包名,为了方便调试,输出 DEBUG 级别及其以上的log -->
44
		<appender-ref ref="CONSOLE" />
45
		<appender-ref ref="FILE" />
46
	</logger>
47

48
	<!-- 其他框架的日志输出 -->
49
	<property name="LOG_FILE"
50
		value="/data/test/logs/spring-boot-tmp.log" />
51
	<logger name="org.springframework" level="INFO" />   <!-- spring 包下的 logger, 只输出 INFO 级别的 -->
52

53
</configuration> 

+ 66 - 0
ipu-ddmp-data/src/main/resources/msgframe-config.xml

@ -0,0 +1,66 @@
1
<?xml version="1.0" encoding="UTF-8"?>
2
<msgframeCfg xmlns="http://www.asiainfo.com/msgframe">
3
	<centerCfg>
4
		<!--[1-1] 中心名称  定义当前中心或者当前系统的名称编码,全局配置唯一性-->
5
		<name>orderCenter</name>
6
		<!-- 全局配置  影响全局的一些参数配置,如入职中心的开关配置,整体的序列化方式等,全局配置唯一性 -->
7
		<globalCfg>
8
			<!-- 消息生产端消息注入处理类,可以不配或者多配,在发送时可以在消息发送前执行,配置类的全路径且继承接口 com.cmos.msgframe.common.IProsInjectionProcessor -->
9
			<!-- 		<prodInjection>sadsa</prodInjection> -->
10
			<!-- 消息消费端消息注入处理类,可以不配或者多配,在消费时可以在消息处理业务前执行,配置类的全路径且继承接口 com.cmos.msgframe.common.IConsInjectionProcessor -->
11
			<!-- 		<consInjection>sadsa</consInjection> -->
12
		</globalCfg>
13

14
		<!-- 总的主题列表配置信息,包含所有的主题配置信息,全局配置唯一性 -->
15
		<destinations>
16
		    <queue name="ipu_data" belong="orderCenter"/>
17
			<queue name="PerfTest" belong="orderCenter"/>
18
			<queue name="TagTest" belong="orderCenter" ruleClass="com.ai.aif.msgframe.common.route.impl.CustomDestinationRule"/>
19
			<queue name="OrderTest" order="true" belong="orderCenter" number="1"/>
20
			<queue name="TransactionTest" belong="orderCenter"/>
21
		</destinations>
22
		<!-- 消费端的配置信息,主要包含订阅的主题和消息的业务实现类,全局配置唯一性 -->
23
		<subscribes>
24
		    <!--   主题订阅信息,订阅在<destinations></destinations>标签中的主题消息 -->
25
		    <subscribe subDestination="ipu_data" consumeType="push">
26
				<implclass>com.ai.ipu.ddmp.data.consumer.ConsumerProcessorImpl</implclass>
27
			</subscribe>
28
			<!-- tag:可选,默认是*。配合destinations->ruleClass,如果有多个过滤条件,请按照a||b -->
29
			<subscribe subDestination="PerfTest" consumeType="push">
30
				<!-- 配置实现类,利用管道流的思想,按照配置顺序依次处理消息.只支持异步消息 -->
31
				<implclass>com.ai.ipu.ddmp.data.consumer.ConsumerProcessorImpl</implclass>
32
			</subscribe>
33
			<subscribe subDestination="TagTest" consumeType="push">
34
				<implclass>com.ai.ipu.ddmp.data.consumer.ConsumerProcessorImpl</implclass>
35
			</subscribe>
36
			<subscribe subDestination="OrderTest" consumeType="push">
37
				<implclass>com.ai.ipu.ddmp.data.consumer.ConsumerProcessorImpl</implclass>
38
			</subscribe>
39
			<subscribe subDestination="TransactionTest" consumeType="push">
40
				<implclass>com.ai.ipu.ddmp.data.consumer.ConsumerProcessorImpl</implclass>
41
			</subscribe>
42
		</subscribes>
43
		<!-- 主要配置中心(系统)和其所属集群的关系,全局配置唯一性 -->
44
		<centers>
45
			<center name="orderCenter" containClusters="cluster-1" />
46
		</centers>
47
		<!-- 集群配置,主要配置当前中心或者系统用到的所有集群信息,全局配置唯一性 -->
48
		<clusters>
49
		<!--RocketMQ、Kafka、ActiveMQ  -->
50
			<cluster name="cluster-1" type="Kafka">
51
  				<!-- <url>10.1.243.19:9876;10.1.243.20:9876</url> -->
52
  				     <url>10.19.13.54:9092</url>
53
			</cluster>
54
		</clusters>
55

56
		<!-- 持久化配置区,主要包括生产端和消费端的异常持久化配置、生产端和消费端的日志持久化配置,全局配置唯一性 -->
57
		<persistence>
58
			<exceptionPersistence>
59
				<!-- 生产者异常处理,必须实现com.ai.aif.msgframe.common.ex.exception.IExceptionPersitence接口 -->
60
				<producerExceptionClass>com.ai.aif.msgframe.producer.ex.ProducerExceptionHandle</producerExceptionClass>
61
				<!-- 消费者异常处理,必须实现com.ai.aif.msgframe.common.ex.exception.IExceptionPersitence接口 -->
62
				<consumerExceptionClass>com.ai.aif.msgframe.consumer.ex.ConsumerExceptionHandle</consumerExceptionClass>
63
			</exceptionPersistence>
64
		</persistence>
65
	</centerCfg>
66
</msgframeCfg>

+ 62 - 0
ipu-ddmp-data/src/test/java/SendMessageTest.java

@ -0,0 +1,62 @@
1
2
3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5
6
import org.slf4j.Logger;
7
import org.slf4j.LoggerFactory;
8
9
import com.ai.aif.msgframe.MfProducerClient;
10
import com.ai.aif.msgframe.common.message.MsgFTextMessage;
11
12
public class SendMessageTest {
13
14
	private static final Logger logger = LoggerFactory.getLogger(SendMessageTest.class);
15
16
	private static final ExecutorService pool = Executors.newFixedThreadPool(5);
17
18
	@org.junit.Test
19
	public void sendMessageTest() {
20
21
		try {
22
//			for (int i = 0; i < 10; i++) {
23
24
				pool.submit(new ProducerThreadTest());
25
//			}
26
27
		} catch (Exception e) {
28
			throw new RuntimeException(e);
29
		}
30
		
31
		while(true){
32
			
33
		}
34
35
36
	}
37
38
	class ProducerThreadTest implements Runnable {
39
40
		public void run() {
41
42
			try {
43
44
				MfProducerClient client = new MfProducerClient();
45
46
				for (int i = 0; i < 10; i++) {
47
					MsgFTextMessage message = new MsgFTextMessage();
48
					message.setText("{info_name:\"abc"+i+"\",info_class:"+i+"}");
49
					client.send("ipu_data", message);
50
					logger.info("发送成功," + message);
51
52
					Thread.currentThread().sleep(1000);
53
				}
54
55
			} catch (Exception e) {
56
				throw new RuntimeException(e);
57
			}
58
59
		}
60
	}
61
62
}

+ 62 - 0
ipu-ddmp-data/src/test/java/SendOnewayMessageTest.java

@ -0,0 +1,62 @@
1

2

3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5

6
import org.slf4j.Logger;
7
import org.slf4j.LoggerFactory;
8

9
import com.ai.aif.msgframe.MfProducerClient;
10
import com.ai.aif.msgframe.common.message.MsgFTextMessage;
11

12
public class SendOnewayMessageTest {
13
	
14
	private static final Logger logger = LoggerFactory.getLogger(SendOnewayMessageTest.class);
15

16
	private static final ExecutorService pool = Executors.newFixedThreadPool(5);
17

18
	@org.junit.Test
19
	public void sendMessageTest() {
20

21
		try {
22
			for (int i = 0; i < 10; i++) {
23

24
				pool.submit(new ProducerThreadTest());
25
			}
26

27
		} catch (Exception e) {
28
			throw new RuntimeException(e);
29
		}
30
		
31
		while(true){
32
			
33
		}
34

35

36
	}
37

38
	class ProducerThreadTest implements Runnable {
39

40
		public void run() {
41

42
			try {
43

44
				MfProducerClient client = new MfProducerClient();
45
				for (int i = 0; i < 10000; i++) {
46
					
47
					MsgFTextMessage message = new MsgFTextMessage();
48
					message.setText("oneway message test " + i);
49
					client.sendOneway("PerfTest", message);
50
					logger.info("发送成功," + message);
51

52
					Thread.currentThread().sleep(1000);
53
				}
54

55
			} catch (Exception e) {
56
				throw new RuntimeException(e);
57
			}
58

59
		}
60
	}
61

62
}

+ 63 - 0
ipu-ddmp-data/src/test/java/SendOrderMessageTest.java

@ -0,0 +1,63 @@
1

2

3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5

6
import org.slf4j.Logger;
7
import org.slf4j.LoggerFactory;
8

9
import com.ai.aif.msgframe.MfProducerClient;
10
import com.ai.aif.msgframe.common.message.MsgFTextMessage;
11

12
public class SendOrderMessageTest {
13

14
	private static final Logger logger = LoggerFactory.getLogger(SendOrderMessageTest.class);
15

16
	private static final ExecutorService pool = Executors.newFixedThreadPool(5);
17

18
	@org.junit.Test
19
	public void sendMessageTest() {
20

21
		try {
22
			for (int i = 0; i < 10; i++) {
23

24
				pool.submit(new ProducerThreadTest());
25
			}
26

27
		} catch (Exception e) {
28
			throw new RuntimeException(e);
29
		}
30
		
31
		while(true){
32
			
33
		}
34

35

36
	}
37

38
	class ProducerThreadTest implements Runnable {
39

40
		public void run() {
41

42
			try {
43
				MfProducerClient client = new MfProducerClient();
44
				String orderId = "2013";// orderId对于同一组顺序消息必须一致
45
				for (int i = 0; i < 3; i++) {
46

47
					MsgFTextMessage orderMessage = new MsgFTextMessage();
48
					orderMessage.setText("order message " + i);
49
					client.sendOrderMsg("OrderTest", orderMessage, orderId);
50
					
51
					logger.info("发送成功," + orderMessage);
52

53
					Thread.currentThread().sleep(1000);
54
				}
55

56
			} catch (Exception e) {
57
				throw new RuntimeException(e);
58
			}
59

60
		}
61
	}
62

63
}

+ 63 - 0
ipu-ddmp-data/src/test/java/SendTagMessageTest.java

@ -0,0 +1,63 @@
1

2
import java.util.concurrent.ExecutorService;
3
import java.util.concurrent.Executors;
4

5
import org.slf4j.Logger;
6
import org.slf4j.LoggerFactory;
7

8
import com.ai.aif.msgframe.MfProducerClient;
9
import com.ai.aif.msgframe.common.message.MsgFTextMessage;
10

11
public class SendTagMessageTest {
12

13
	private static final Logger logger = LoggerFactory.getLogger(SendTagMessageTest.class);
14

15
	private static final ExecutorService pool = Executors.newFixedThreadPool(5);
16

17
	@org.junit.Test
18
	public void sendMessageTest() {
19

20
		try {
21
			for (int i = 0; i < 10; i++) {
22

23
				pool.submit(new ProducerThreadTest());
24
			}
25

26
		} catch (Exception e) {
27
			throw new RuntimeException(e);
28
		}
29
		
30
		while(true){
31
			
32
		}
33

34

35
	}
36

37
	class ProducerThreadTest implements Runnable {
38

39
		public void run() {
40

41
			try {
42
				MfProducerClient client = new MfProducerClient();
43
				
44
				for (int i = 0; i < 10000; i++) {
45

46
					MsgFTextMessage message = new MsgFTextMessage();
47
					message.setText("message tag test" + i);
48
					message.setFilterTag("tag1");// 设置消息tag
49
					client.send("TagTest", message);
50

51
					logger.info("发送成功," + message);
52

53
					Thread.currentThread().sleep(1000);
54
				}
55

56
			} catch (Exception e) {
57
				throw new RuntimeException(e);
58
			}
59

60
		}
61
	}
62

63
}