liutong3 лет назад: 6
Родитель
Сommit
afeb47976f

+ 5 - 0
comp-example/pom.xml

@ -32,6 +32,11 @@
32 32
			<artifactId>zookeeper</artifactId>
33 33
			<version>3.4.12</version>
34 34
		</dependency>
35
		<dependency>
36
			<groupId>org.apache.kafka</groupId>
37
			<artifactId>kafka-clients</artifactId>
38
			<version>0.10.2.1</version>
39
		</dependency>
35 40
	</dependencies>
36 41
37 42

+ 34 - 0
comp-example/src/test/java/com/ai/ipu/example/kafka/KafkaCustomerExample.java

@ -0,0 +1,34 @@
1
package com.ai.ipu.example.kafka;
2
3
import org.apache.kafka.clients.consumer.ConsumerConfig;
4
import org.apache.kafka.clients.consumer.ConsumerRecord;
5
import org.apache.kafka.clients.consumer.ConsumerRecords;
6
import org.apache.kafka.clients.consumer.KafkaConsumer;
7
import java.util.Arrays;
8
import java.util.Properties;
9
10
public class KafkaCustomerExample {
11
    public static void main(String[] args) throws InterruptedException {
12
        Properties properties = new Properties();
13
        properties.put("bootstrap.servers", "47.105.160.21:9091,47.105.160.21:9092,47.105.160.21:9093");
14
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
15
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
16
        properties.put(ConsumerConfig.GROUP_ID_CONFIG ,"test") ;
17
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
18
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
19
20
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
21
        kafkaConsumer.subscribe(Arrays.asList("test"));
22
        try {
23
            while (true) {
24
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
25
                for (ConsumerRecord<String, String> record : records) {
26
                    System.out.printf("offset = %d, value = %s", record.offset(), record.value());
27
                    System.out.println("\n=================================");
28
                }
29
            }
30
        } finally {
31
            kafkaConsumer.close();
32
        }
33
    }
34
}

+ 33 - 0
comp-example/src/test/java/com/ai/ipu/example/kafka/KafkaProducerExample.java

@ -0,0 +1,33 @@
1
package com.ai.ipu.example.kafka;
2
3
import org.apache.kafka.clients.producer.KafkaProducer;
4
import org.apache.kafka.clients.producer.ProducerRecord;
5
6
import java.util.ArrayList;
7
import java.util.List;
8
import java.util.Properties;
9
import java.util.Scanner;
10
11
public class KafkaProducerExample {
12
    public static void main(String[] args){
13
        Properties properties = new Properties();
14
        properties.put("bootstrap.servers", "47.105.160.21:9091,47.105.160.21:9092,47.105.160.21:9093");
15
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
16
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
17
        properties.put("request.required.acks", "all");
18
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
19
        try {
20
            Scanner sc = new Scanner(System.in);
21
            int i = 0;
22
            while (true) {
23
                i++;
24
                System.out.println("请输入第" + i + "条信息的内容:");
25
                String message = sc.nextLine();
26
                producer.send(new ProducerRecord<String, String>("test", message));
27
                System.out.println("Sent:" + message);
28
            }
29
        } finally {
30
            producer.close();
31
        }
32
    }
33
}

+ 1 - 1
comp-example/src/test/java/com/ai/ipu/example/zookeeper/ZookeeperExample.java

@ -56,7 +56,7 @@ public class ZookeeperExample {
56 56
        System.out.println("---------------------------------");
57 57

58 58
        // 这里再次进行修改,则不会触发Watch事件,这就是我们验证ZK的一个特性“一次性触发”,也就是说设置一次监视,只会对下次操作起一次作用。
59
        System.out.println("3再次修改节点数据 ");
59
        System.out.println("再次修改节点数据 ");
60 60
        zk.setData("/myzktest", "liutong2".getBytes(), -1);
61 61
        System.out.println("/myzktest的数据为:");
62 62
        System.out.println(new String(zk.getData("/myzktest", false, null)));