Browse Source

rocketMQ示例

liutong3 6 years ago
parent
commit
1c8cbafb8f

+ 8 - 0
comp-example/pom.xml

@ -27,16 +27,24 @@
27 27
			<artifactId>ipu-cache</artifactId>
28 28
			<version>${ipu}</version>
29 29
		</dependency>
30
30 31
		<dependency>
31 32
			<groupId>org.apache.zookeeper</groupId>
32 33
			<artifactId>zookeeper</artifactId>
33 34
			<version>3.4.12</version>
34 35
		</dependency>
36
35 37
		<dependency>
36 38
			<groupId>org.apache.kafka</groupId>
37 39
			<artifactId>kafka-clients</artifactId>
38 40
			<version>0.10.2.1</version>
39 41
		</dependency>
42
43
		<dependency>
44
			<groupId>org.apache.rocketmq</groupId>
45
			<artifactId>rocketmq-client</artifactId>
46
			<version>4.2.0</version>
47
		</dependency>
40 48
	</dependencies>
41 49
42 50

+ 36 - 0
comp-example/src/test/java/com/ai/ipu/example/rocketmq/RocketMQCustomerExample.java

@ -0,0 +1,36 @@
1
package com.ai.ipu.example.rocketmq;
2
3
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7
import org.apache.rocketmq.client.exception.MQClientException;
8
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
9
import org.apache.rocketmq.common.message.MessageExt;
10
11
import java.util.List;
12
13
public class RocketMQCustomerExample {
14
    public static void main(String[] args) throws MQClientException {
15
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
16
        consumer.setNamesrvAddr("47.105.160.21:9876");
17
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
18
        consumer.subscribe("testTopic", "*");
19
        consumer.registerMessageListener(new MessageListenerConcurrently() {
20
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
21
                                                            ConsumeConcurrentlyContext context) {
22
                System.out.println("返回消息信息:");
23
                System.out.println(messages);
24
                int j = 0;
25
                for (MessageExt message : messages) {
26
                    j++;
27
                    System.out.println("返回的第" + j + "条消息为:");
28
                    System.out.println(new String(message.getBody()));
29
                }
30
                System.out.println("----------------------------------------------------------------------------------");
31
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
32
            }
33
        });
34
        consumer.start();
35
    }
36
}

+ 47 - 0
comp-example/src/test/java/com/ai/ipu/example/rocketmq/RocketMQProducerExample.java

@ -0,0 +1,47 @@
1
package com.ai.ipu.example.rocketmq;
2
3
import org.apache.rocketmq.client.exception.MQBrokerException;
4
import org.apache.rocketmq.client.exception.MQClientException;
5
import org.apache.rocketmq.client.producer.DefaultMQProducer;
6
import org.apache.rocketmq.client.producer.SendResult;
7
import org.apache.rocketmq.common.message.Message;
8
import org.apache.rocketmq.remoting.common.RemotingHelper;
9
import org.apache.rocketmq.remoting.exception.RemotingException;
10
11
import java.io.UnsupportedEncodingException;
12
import java.util.Scanner;
13
14
public class RocketMQProducerExample {
15
    public static void main(String[] args) throws MQClientException, InterruptedException {
16
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
17
        producer.setVipChannelEnabled(false);
18
        producer.setNamesrvAddr("47.105.160.21:9876");
19
        producer.start();
20
21
22
        try {
23
            Scanner sc = new Scanner(System.in);
24
            int i = 0;
25
            while (true) {
26
                i++;
27
                System.out.println("请输入第" + i + "条信息的内容:");
28
                String message = sc.nextLine();
29
                Message msg = new Message("testTopic",
30
                        "testTag",
31
                        message.getBytes(RemotingHelper.DEFAULT_CHARSET)
32
                );
33
                SendResult sendResult = producer.send(msg);
34
                System.out.println("发出消息信息:");
35
                System.out.println(sendResult);
36
            }
37
        } catch (RemotingException e) {
38
            e.printStackTrace();
39
        } catch (MQBrokerException e) {
40
            e.printStackTrace();
41
        } catch (UnsupportedEncodingException e) {
42
            e.printStackTrace();
43
        } finally {
44
            producer.shutdown();
45
        }
46
    }
47
}