Browse Source

增加rabbitmq的消费者、生产者代码

liutong3 6 years ago
parent
commit
f3aa8d34c9

+ 6 - 0
comp-example/pom.xml

@ -47,6 +47,12 @@
47 47
		</dependency>
48 48
49 49
		<dependency>
50
			<groupId>com.rabbitmq</groupId>
51
			<artifactId>amqp-client</artifactId>
52
			<version>3.6.5</version>
53
		</dependency>
54
55
		<dependency>
50 56
			<groupId>com.ai.ipu</groupId>
51 57
			<artifactId>ipu-zk</artifactId>
52 58
			<version>${ipu}</version>

+ 45 - 0
comp-example/src/test/java/com/ai/ipu/example/rabbitmq/RabbitMQCustomerExample.java

@ -0,0 +1,45 @@
1
package com.ai.ipu.example.rabbitmq;
2
3
import com.rabbitmq.client.*;
4
5
import java.io.IOException;
6
import java.util.concurrent.TimeoutException;
7
8
/**
9
 * @author liutong3
10
 * @team IPU
11
 * @date 2019/5/28 11:00
12
 * @desc rabbitmq消费者
13
 */
14
public class RabbitMQCustomerExample {
15
    private final static String QUEUE_NAME = "rabbitMQ.test";
16
17
    public static void main(String[] args) throws IOException, TimeoutException {
18
        // 创建连接工厂
19
        ConnectionFactory factory = new ConnectionFactory();
20
        //设置RabbitMQ地址
21
        factory.setHost("47.105.160.21");
22
        factory.setUsername("admin");
23
        factory.setPassword("admin");
24
        //创建一个新的连接
25
        Connection connection = factory.newConnection();
26
        //创建一个通道
27
        Channel channel = connection.createChannel();
28
        //声明要关注的队列
29
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
30
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
31
        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
32
        Consumer consumer = new DefaultConsumer(channel) {
33
            @Override
34
            public void handleDelivery(String consumerTag, Envelope envelope,
35
                                       AMQP.BasicProperties properties, byte[] body)
36
                    throws IOException {
37
                String message = new String(body, "UTF-8");
38
                System.out.println("返回消息信息:");
39
                System.out.println(message);
40
            }
41
        };
42
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
43
        channel.basicConsume(QUEUE_NAME, true, consumer);
44
    }
45
}

+ 50 - 0
comp-example/src/test/java/com/ai/ipu/example/rabbitmq/RabbitMQProducerExample.java

@ -0,0 +1,50 @@
1
package com.ai.ipu.example.rabbitmq;
2
3
import com.rabbitmq.client.Channel;
4
import com.rabbitmq.client.Connection;
5
import com.rabbitmq.client.ConnectionFactory;
6
import org.apache.kafka.clients.producer.ProducerRecord;
7
8
import java.io.IOException;
9
import java.util.Scanner;
10
import java.util.concurrent.TimeoutException;
11
12
/**
13
 * @author liutong3
14
 * @team IPU
15
 * @date 2019/5/28 11:02
16
 * @desc tabbitmq生产者
17
 */
18
public class RabbitMQProducerExample {
19
    public final static String QUEUE_NAME="rabbitMQ.test";
20
21
    public static void main(String[] args) throws IOException, TimeoutException {
22
        //创建连接工厂
23
        ConnectionFactory factory = new ConnectionFactory();
24
        //设置RabbitMQ相关信息
25
        factory.setHost("47.105.160.21");
26
        factory.setUsername("admin");
27
        factory.setPassword("admin");
28
//        factory.setPort();
29
        //创建一个新的连接
30
        Connection connection = factory.newConnection();
31
        //创建一个通道
32
        Channel channel = connection.createChannel();
33
        //  声明一个队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
34
        try {
35
            Scanner sc = new Scanner(System.in);
36
            int i = 0;
37
            while (true) {
38
                i++;
39
                System.out.println("请输入第" + i + "条信息的内容:");
40
                String message = sc.nextLine();
41
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
42
                System.out.println("Sent:" + message);
43
            }
44
        } finally {
45
            //关闭通道和连接
46
            channel.close();
47
            connection.close();
48
        }
49
    }
50
}