Ver Código Fonte

Merge branch 'master' of http://10.1.235.20:3000/rest/rest-guide.git

huangbo 6 anos atrás
pai
commit
82869eb01c

+ 21 - 1
comp-example/pom.xml

@ -27,8 +27,28 @@
27 27
			<artifactId>ipu-cache</artifactId>
28 28
			<version>${ipu}</version>
29 29
		</dependency>
30
31
		<dependency>
32
			<groupId>org.apache.zookeeper</groupId>
33
			<artifactId>zookeeper</artifactId>
34
			<version>3.4.12</version>
35
		</dependency>
36
37
		<dependency>
38
			<groupId>org.apache.kafka</groupId>
39
			<artifactId>kafka-clients</artifactId>
40
			<version>0.10.2.1</version>
41
		</dependency>
42
43
		<dependency>
44
			<groupId>org.apache.rocketmq</groupId>
45
			<artifactId>rocketmq-client</artifactId>
46
			<version>4.2.0</version>
47
		</dependency>
30 48
	</dependencies>
31
	
49
50
51
32 52
	<build>
33 53
		<plugins>
34 54
			<plugin>

+ 34 - 0
comp-example/src/test/java/com/ai/ipu/example/kafka/KafkaCustomerExample.java

@ -0,0 +1,34 @@
1
package com.ai.ipu.example.kafka;
2
3
import org.apache.kafka.clients.consumer.ConsumerConfig;
4
import org.apache.kafka.clients.consumer.ConsumerRecord;
5
import org.apache.kafka.clients.consumer.ConsumerRecords;
6
import org.apache.kafka.clients.consumer.KafkaConsumer;
7
import java.util.Arrays;
8
import java.util.Properties;
9
10
public class KafkaCustomerExample {
11
    public static void main(String[] args) throws InterruptedException {
12
        Properties properties = new Properties();
13
        properties.put("bootstrap.servers", "47.105.160.21:9091,47.105.160.21:9092,47.105.160.21:9093");
14
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
15
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
16
        properties.put(ConsumerConfig.GROUP_ID_CONFIG ,"test") ;
17
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
18
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
19
20
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
21
        kafkaConsumer.subscribe(Arrays.asList("test"));
22
        try {
23
            while (true) {
24
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
25
                for (ConsumerRecord<String, String> record : records) {
26
                    System.out.printf("offset = %d, value = %s", record.offset(), record.value());
27
                    System.out.println("\n=================================");
28
                }
29
            }
30
        } finally {
31
            kafkaConsumer.close();
32
        }
33
    }
34
}

+ 33 - 0
comp-example/src/test/java/com/ai/ipu/example/kafka/KafkaProducerExample.java

@ -0,0 +1,33 @@
1
package com.ai.ipu.example.kafka;
2
3
import org.apache.kafka.clients.producer.KafkaProducer;
4
import org.apache.kafka.clients.producer.ProducerRecord;
5
6
import java.util.ArrayList;
7
import java.util.List;
8
import java.util.Properties;
9
import java.util.Scanner;
10
11
public class KafkaProducerExample {
12
    public static void main(String[] args){
13
        Properties properties = new Properties();
14
        properties.put("bootstrap.servers", "47.105.160.21:9091,47.105.160.21:9092,47.105.160.21:9093");
15
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
16
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
17
        properties.put("request.required.acks", "all");
18
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
19
        try {
20
            Scanner sc = new Scanner(System.in);
21
            int i = 0;
22
            while (true) {
23
                i++;
24
                System.out.println("请输入第" + i + "条信息的内容:");
25
                String message = sc.nextLine();
26
                producer.send(new ProducerRecord<String, String>("test", message));
27
                System.out.println("Sent:" + message);
28
            }
29
        } finally {
30
            producer.close();
31
        }
32
    }
33
}

+ 36 - 0
comp-example/src/test/java/com/ai/ipu/example/rocketmq/RocketMQCustomerExample.java

@ -0,0 +1,36 @@
1
package com.ai.ipu.example.rocketmq;
2
3
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7
import org.apache.rocketmq.client.exception.MQClientException;
8
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
9
import org.apache.rocketmq.common.message.MessageExt;
10
11
import java.util.List;
12
13
public class RocketMQCustomerExample {
14
    public static void main(String[] args) throws MQClientException {
15
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
16
        consumer.setNamesrvAddr("47.105.160.21:9876");
17
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
18
        consumer.subscribe("testTopic", "*");
19
        consumer.registerMessageListener(new MessageListenerConcurrently() {
20
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
21
                                                            ConsumeConcurrentlyContext context) {
22
                System.out.println("返回消息信息:");
23
                System.out.println(messages);
24
                int j = 0;
25
                for (MessageExt message : messages) {
26
                    j++;
27
                    System.out.println("返回的第" + j + "条消息为:");
28
                    System.out.println(new String(message.getBody()));
29
                }
30
                System.out.println("----------------------------------------------------------------------------------");
31
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
32
            }
33
        });
34
        consumer.start();
35
    }
36
}

+ 47 - 0
comp-example/src/test/java/com/ai/ipu/example/rocketmq/RocketMQProducerExample.java

@ -0,0 +1,47 @@
1
package com.ai.ipu.example.rocketmq;
2
3
import org.apache.rocketmq.client.exception.MQBrokerException;
4
import org.apache.rocketmq.client.exception.MQClientException;
5
import org.apache.rocketmq.client.producer.DefaultMQProducer;
6
import org.apache.rocketmq.client.producer.SendResult;
7
import org.apache.rocketmq.common.message.Message;
8
import org.apache.rocketmq.remoting.common.RemotingHelper;
9
import org.apache.rocketmq.remoting.exception.RemotingException;
10
11
import java.io.UnsupportedEncodingException;
12
import java.util.Scanner;
13
14
public class RocketMQProducerExample {
15
    public static void main(String[] args) throws MQClientException, InterruptedException {
16
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
17
        producer.setVipChannelEnabled(false);
18
        producer.setNamesrvAddr("47.105.160.21:9876");
19
        producer.start();
20
21
22
        try {
23
            Scanner sc = new Scanner(System.in);
24
            int i = 0;
25
            while (true) {
26
                i++;
27
                System.out.println("请输入第" + i + "条信息的内容:");
28
                String message = sc.nextLine();
29
                Message msg = new Message("testTopic",
30
                        "testTag",
31
                        message.getBytes(RemotingHelper.DEFAULT_CHARSET)
32
                );
33
                SendResult sendResult = producer.send(msg);
34
                System.out.println("发出消息信息:");
35
                System.out.println(sendResult);
36
            }
37
        } catch (RemotingException e) {
38
            e.printStackTrace();
39
        } catch (MQBrokerException e) {
40
            e.printStackTrace();
41
        } catch (UnsupportedEncodingException e) {
42
            e.printStackTrace();
43
        } finally {
44
            producer.shutdown();
45
        }
46
    }
47
}

+ 76 - 0
comp-example/src/test/java/com/ai/ipu/example/zookeeper/ZookeeperExample.java

@ -1,5 +1,81 @@
1 1
package com.ai.ipu.example.zookeeper;
2 2

3
import org.apache.zookeeper.*;
4

5
import java.io.IOException;
6
import java.util.List;
7

3 8
public class ZookeeperExample {
9
    // 会话超时时间,设置为与系统默认时间一致
10
    private static final int SESSION_TIMEOUT = 2 * 1000;
11

12
    // 创建 ZooKeeper 实例
13
    private ZooKeeper zk;
14

15
    // 创建 Watcher 实例
16
    private Watcher wh = new Watcher() {
17
        public void process(WatchedEvent event) {
18
            System.out.println("触发了WatchedEvent的事件 >>> " + event.toString());
19
        }
20
    };
21

22
    // 初始化 ZooKeeper 实例
23
    private void createZKInstance() throws IOException {
24
        // 连接到ZK服务,多个可以用逗号分割写
25
        zk = new ZooKeeper("47.105.160.21:2181,47.105.160.21:2182,47.105.160.21:2183", ZookeeperExample.SESSION_TIMEOUT, this.wh);
26

27
    }
28

29
    private void showNodes(String path) throws InterruptedException, KeeperException {
30
        List<String> list = zk.getChildren(path, this.wh);
31
        int i = 1;
32
        for (String node : list) {
33
            System.out.println("(" + i + ")" + node);
34
            i++;
35
        }
36
    }
37
    
38
    private void ZKOperations() throws InterruptedException, KeeperException {
39
        System.out.println("初始节点:");
40
        showNodes("/");
41
        System.out.println("---------------------------------");
42

43
        System.out.println("创建 ZooKeeper 节点 (znode : myzktest, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent");
44
        zk.create("/myzktest", "myData2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
45
        System.out.println("查看是否创建成功: ");
46
        System.out.println("当前节点:");
47
        showNodes("/");
48
        System.out.println("/myzktest的数据为:");
49
        System.out.println(new String(zk.getData("/myzktest", this.wh, null)));// 添加Watch
50
        System.out.println("---------------------------------");
51

52
        System.out.println("修改节点数据 ");
53
        zk.setData("/myzktest", "liutong".getBytes(), -1);
54
        System.out.println("/myzktest的数据为:");
55
        System.out.println(new String(zk.getData("/myzktest", false, null)));
56
        System.out.println("---------------------------------");
57

58
        // 这里再次进行修改,则不会触发Watch事件,这就是我们验证ZK的一个特性“一次性触发”,也就是说设置一次监视,只会对下次操作起一次作用。
59
        System.out.println("再次修改节点数据 ");
60
        zk.setData("/myzktest", "liutong2".getBytes(), -1);
61
        System.out.println("/myzktest的数据为:");
62
        System.out.println(new String(zk.getData("/myzktest", false, null)));
63
        System.out.println("---------------------------------");
64

65
        System.out.println("删除节点 ");
66
        zk.delete("/myzktest", -1);
67
        System.out.println(" 节点状态: [" + zk.exists("/myzktest", false) + "]");
68
        showNodes("/");
69
    }
70

71
    private void ZKClose() throws InterruptedException {
72
        zk.close();
73
    }
4 74

75
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
76
        ZookeeperExample dm = new ZookeeperExample();
77
        dm.createZKInstance();
78
        dm.ZKOperations();
79
        dm.ZKClose();
80
    }
5 81
}