Bladeren bron

msgframe的kafka测试代码

liutong3 6 jaren geleden
bovenliggende
commit
655b5fcdb8

+ 37 - 0
ipu-kafka-example/ipu-kafka-example.iml

@ -0,0 +1,37 @@
1
<?xml version="1.0" encoding="UTF-8"?>
2
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
3
  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5">
4
    <output url="file://$MODULE_DIR$/target/classes" />
5
    <output-test url="file://$MODULE_DIR$/target/test-classes" />
6
    <content url="file://$MODULE_DIR$">
7
      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
8
      <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
9
      <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
10
      <excludeFolder url="file://$MODULE_DIR$/target" />
11
    </content>
12
    <orderEntry type="inheritedJdk" />
13
    <orderEntry type="sourceFolder" forTests="false" />
14
    <orderEntry type="library" name="Maven: com.ai.aif.msgframe:msgframe-common:1.9.2" level="project" />
15
    <orderEntry type="library" name="Maven: com.ai.aif.amber:amber-client:1.1.3" level="project" />
16
    <orderEntry type="library" name="Maven: org.fusesource.hawtbuf:hawtbuf:1.9" level="project" />
17
    <orderEntry type="library" name="Maven: com.aliyun.openservices:ons-client:1.7.0.Final" level="project" />
18
    <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
19
    <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" />
20
    <orderEntry type="library" name="Maven: com.ai.aif.msgframe:msgframe-client:1.9.2" level="project" />
21
    <orderEntry type="library" name="Maven: com.ai.aif.msgframe:msgframe-server:1.9.2" level="project" />
22
    <orderEntry type="library" name="Maven: com.ai.aif.msgframe:xmlmsgframe:2.6.4" level="project" />
23
    <orderEntry type="library" name="Maven: org.apache.xmlbeans:xmlbeans:2.6.0" level="project" />
24
    <orderEntry type="library" name="Maven: stax:stax-api:1.0.1" level="project" />
25
    <orderEntry type="library" name="Maven: com.rabbitmq:amqp-client:3.6.5" level="project" />
26
    <orderEntry type="library" name="Maven: com.alibaba:fastjson:1.1.46" level="project" />
27
    <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
28
    <orderEntry type="library" name="Maven: javax.jms:jms:1.1" level="project" />
29
    <orderEntry type="library" name="Maven: org.apache.kafka:kafka-clients:0.10.2.0" level="project" />
30
    <orderEntry type="library" name="Maven: net.jpountz.lz4:lz4:1.3.0" level="project" />
31
    <orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.1.2.6" level="project" />
32
    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.21" level="project" />
33
    <orderEntry type="library" name="Maven: log4j:log4j:1.2.12" level="project" />
34
    <orderEntry type="library" name="Maven: com.alibaba.rocketmq:rocketmq-client:3.5.9" level="project" />
35
    <orderEntry type="library" name="Maven: junit:junit:4.12" level="project" />
36
  </component>
37
</module>

+ 86 - 0
ipu-kafka-example/pom.xml

@ -0,0 +1,86 @@
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0"
3
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5
    <modelVersion>4.0.0</modelVersion>
6
7
    <groupId>com.ai.ipu</groupId>
8
    <artifactId>ipu-kafka-example</artifactId>
9
    <version>1.0-SNAPSHOT</version>
10
11
    <properties>
12
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
13
        <ipu>3.1-SNAPSHOT</ipu>
14
        <jdk>1.8</jdk>
15
    </properties>
16
17
    <dependencies>
18
        <dependency>
19
            <groupId>com.ai.aif.msgframe</groupId>
20
            <artifactId>msgframe-common</artifactId>
21
            <version>1.9.2</version>
22
        </dependency>
23
        <dependency>
24
            <groupId>com.ai.aif.msgframe</groupId>
25
            <artifactId>msgframe-client</artifactId>
26
            <version>1.9.2</version>
27
        </dependency>
28
        <dependency>
29
            <groupId>com.ai.aif.msgframe</groupId>
30
            <artifactId>msgframe-server</artifactId>
31
            <version>1.9.2</version>
32
        </dependency>
33
        <dependency>
34
            <groupId>com.ai.aif.msgframe</groupId>
35
            <artifactId>xmlmsgframe</artifactId>
36
            <version>2.6.4</version>
37
        </dependency>
38
        <dependency>
39
            <groupId>org.apache.xmlbeans</groupId>
40
            <artifactId>xmlbeans</artifactId>
41
            <version>2.6.0</version>
42
        </dependency>
43
        <dependency>
44
            <groupId>com.rabbitmq</groupId>
45
            <artifactId>amqp-client</artifactId>
46
            <version>3.6.5</version>
47
        </dependency>
48
        <dependency>
49
            <groupId>com.alibaba</groupId>
50
            <artifactId>fastjson</artifactId>
51
            <version>1.1.46</version>
52
        </dependency>
53
        <dependency>
54
            <groupId>org.hamcrest</groupId>
55
            <artifactId>hamcrest-core</artifactId>
56
            <version>1.3</version>
57
        </dependency>
58
        <dependency>
59
            <groupId>javax.jms</groupId>
60
            <artifactId>jms</artifactId>
61
            <version>1.1</version>
62
        </dependency>
63
        <dependency>
64
            <groupId>org.apache.kafka</groupId>
65
            <artifactId>kafka-clients</artifactId>
66
            <version>0.10.2.0</version>
67
        </dependency>
68
        <dependency>
69
            <groupId>log4j</groupId>
70
            <artifactId>log4j</artifactId>
71
            <version>1.2.12</version>
72
        </dependency>
73
        <dependency>
74
            <groupId>com.alibaba.rocketmq</groupId>
75
            <artifactId>rocketmq-client</artifactId>
76
            <version>3.5.9</version>
77
        </dependency>
78
        <dependency>
79
            <groupId>junit</groupId>
80
            <artifactId>junit</artifactId>
81
            <version>4.12</version>
82
            <scope>compile</scope>
83
        </dependency>
84
    </dependencies>
85
86
</project>

+ 15 - 0
ipu-kafka-example/src/main/java/com/ai/ipu/example/kafka/KafkaCustomer.java

@ -0,0 +1,15 @@
1
package com.ai.ipu.example.kafka;
2
3
import com.ai.aif.msgframe.consumer.MfServiceStartup;
4
5
/**
6
 * @author liutong3
7
 * @team IPU
8
 * @date 2019/6/10 10:13
9
 * @desc kafka消费者启动
10
 */
11
public class KafkaCustomer {
12
    public static void main(String[] args){
13
        MfServiceStartup.main();
14
    }
15
}

+ 22 - 0
ipu-kafka-example/src/main/java/com/ai/ipu/example/kafka/KafkaCustomerExample.java

@ -0,0 +1,22 @@
1
package com.ai.ipu.example.kafka;
2
3
import com.ai.aif.msgframe.common.IConsumerProcessor;
4
import com.ai.aif.msgframe.common.exception.ConsumerException;
5
import com.ai.aif.msgframe.common.message.MsgFMessage;
6
import com.ai.aif.msgframe.common.message.MsgFTextMessage;
7
8
/**
9
 * @author liutong3
10
 * @team IPU
11
 * @date 2019/6/6 14:59
12
 * @desc kafka消费者
13
 */
14
public class KafkaCustomerExample implements IConsumerProcessor {
15
    @SuppressWarnings("rawtypes")
16
    public Object process(MsgFMessage message) throws ConsumerException {
17
        if (message instanceof MsgFTextMessage) {
18
            System.out.println("{topic:" + message.getTopic() + ",value:" + ((MsgFTextMessage) message).getText() + "}");
19
        }
20
        return true;
21
    }
22
}

+ 91 - 0
ipu-kafka-example/src/main/java/com/ai/ipu/example/kafka/KafkaProducerExample.java

@ -0,0 +1,91 @@
1
package com.ai.ipu.example.kafka;
2
3
import com.ai.aif.msgframe.MfProducerClient;
4
import com.ai.aif.msgframe.common.CompletionListener;
5
import com.ai.aif.msgframe.common.message.MsgFMessage;
6
import com.ai.aif.msgframe.common.message.MsgFTextMessage;
7
import org.apache.log4j.Logger;
8
import org.junit.Test;
9
10
import java.util.Date;
11
12
13
/**
14
 * @author liutong3
15
 * @team IPU
16
 * @date 2019/6/6 16:07
17
 * @desc kafka生产者
18
 */
19
public class KafkaProducerExample {
20
    private static final Logger LOG = Logger.getLogger(KafkaProducerExample.class);
21
    private final String TOPIC = "test";
22
    private final String MESSAGE = "test";
23
    private final String ORDERID = "1000";
24
25
    @Test
26
    public void testSendMessage() {
27
        String message = getMessage();
28
        sendMessage(TOPIC, message);
29
    }
30
31
    @Test
32
    public void testSendAsyncMessage() {
33
        String message = getMessage();
34
        sendAsyncMessage(TOPIC, message);
35
    }
36
37
    @Test
38
    public void testSendOrderMessage() {
39
        String message = getMessage();
40
        sendOrderMessage(TOPIC, message, ORDERID);
41
    }
42
43
    private String getMessage() {
44
        return MESSAGE + "_" + new Date().getTime();
45
    }
46
47
    private void sendMessage(String topic, String text) {
48
        try {
49
            MfProducerClient client = new MfProducerClient();
50
            MsgFTextMessage message = new MsgFTextMessage();
51
            message.setText(text);
52
            client.send(topic, message);
53
            LOG.debug("Sent:" + message);
54
            Thread.currentThread().sleep(1000);
55
        } catch (Exception e) {
56
            throw new RuntimeException(e);
57
        }
58
    }
59
60
    private void sendAsyncMessage(String topic, final String text) {
61
        try {
62
            MfProducerClient client = new MfProducerClient();
63
            final MsgFTextMessage message = new MsgFTextMessage();
64
            message.setText(text);
65
            client.asyncSend(topic, message, new CompletionListener() {
66
                public void onCompletion(MsgFMessage msgFMessage) {
67
                    LOG.debug("Sent Success:" + msgFMessage.getMsgId() + "-" + text);
68
                }
69
70
                public void onException(MsgFMessage msgFMessage, Exception e) {
71
                    LOG.debug("Sent Error:" + msgFMessage.getMsgId() + "-" + text);
72
                }
73
            });
74
            Thread.currentThread().sleep(1000);
75
        } catch (Exception e) {
76
            throw new RuntimeException(e);
77
        }
78
    }
79
80
    private void sendOrderMessage(String topic, final String text, String orderId) {
81
        try {
82
            MfProducerClient client = new MfProducerClient();
83
            final MsgFTextMessage message = new MsgFTextMessage();
84
            message.setText(text);
85
            client.sendOrderMsg(topic, message, orderId);
86
            Thread.currentThread().sleep(1000);
87
        } catch (Exception e) {
88
            throw new RuntimeException(e);
89
        }
90
    }
91
}

+ 8 - 0
ipu-kafka-example/src/main/resources/log4j.properties

@ -0,0 +1,8 @@
1
log4j.rootLogger=error, console
2
log4j.logger.org.apache.zookeeper=error
3
log4j.logger.com.ai.ipu.example=debug
4
5
log4j.appender.console=org.apache.log4j.ConsoleAppender
6
log4j.appender.console.target=System.out
7
log4j.appender.console.layout=org.apache.log4j.PatternLayout
8
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n

+ 28 - 0
ipu-kafka-example/src/main/resources/msgframe-config.xml

@ -0,0 +1,28 @@
1
<?xml version="1.0" encoding="UTF-8"?>
2
<msgframeCfg xmlns="http://www.asiainfo.com/msgframe">
3
    <centerCfg>
4
        <name>ipu-kafka-example</name>
5
        <destinations>
6
            <queue name="test" belong="myCenter"/>
7
        </destinations>
8
        <subscribes>
9
            <subscribe subDestination="test">
10
                <implclass>com.ai.ipu.example.kafka.KafkaCustomerExample</implclass>
11
            </subscribe>
12
        </subscribes>
13
        <centers>
14
            <center name="myCenter" containClusters="kafka-cluster"/>
15
        </centers>
16
        <clusters>
17
            <cluster name="kafka-cluster" type="Kafka">
18
                <url>47.105.160.21:9091,47.105.160.21:9092,47.105.160.21:9093</url>
19
            </cluster>
20
        </clusters>
21
        <!--<persistence>-->
22
            <!--<exceptionPersistence>-->
23
                <!--<producerExceptionClass>com.ai.ipu.example.kafka.exception.ProducerExceprionHandle</producerExceptionClass>-->
24
                <!--<consumerExceptionClass>com.ai.ipu.example.kafka.exception.ConsumerExceprionHandle</consumerExceptionClass>-->
25
            <!--</exceptionPersistence>-->
26
        <!--</persistence>-->
27
    </centerCfg>
28
</msgframeCfg>

+ 2 - 2
ipu-zk-example/ipu-zk-example.iml

@ -11,8 +11,6 @@
11 11
    </content>
12 12
    <orderEntry type="inheritedJdk" />
13 13
    <orderEntry type="sourceFolder" forTests="false" />
14
    <orderEntry type="library" name="Maven: junit:junit:4.12" level="project" />
15
    <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
16 14
    <orderEntry type="library" name="Maven: com.ai.ipu:ipu-zk:3.1-SNAPSHOT" level="project" />
17 15
    <orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.9" level="project" />
18 16
    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.6.1" level="project" />
@ -51,5 +49,7 @@
51 49
    <orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.38" level="project" />
52 50
    <orderEntry type="library" name="Maven: org.yaml:snakeyaml:1.23" level="project" />
53 51
    <orderEntry type="library" name="Maven: com.ai.wade:wade-data:1.0" level="project" />
52
    <orderEntry type="library" name="Maven: junit:junit:4.12" level="project" />
53
    <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
54 54
  </component>
55 55
</module>