
1. Fix issues found by Sonar inspection

lilb3 committed 5 years ago
commit 9e4bfbbd84

+ 26 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/JavaDroppedWordsCounter.java

@@ -0,0 +1,26 @@
1
package com.ai.ipu.example.spark;
2
3
4
import org.apache.spark.api.java.JavaSparkContext;
5
import org.apache.spark.util.LongAccumulator;
6
7
/**
8
 * Use this singleton to get or register an Accumulator.
9
 *
10
 * @author lilb3@asiainfo.com
11
 * @since 2020/4/27
12
 **/
13
final class JavaDroppedWordsCounter {
14
  private static LongAccumulator instance;
15
16
  private JavaDroppedWordsCounter() {
17
18
  }
19
20
  static synchronized LongAccumulator getInstance(JavaSparkContext jsc) {
21
    if (instance == null) {
22
      instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
23
    }
24
    return instance;
25
  }
26
}
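This new class replaces the double-checked-locking singleton removed from JavaRecoverableNetworkWordCountTest.java below; the accumulator is looked up again in every batch so a driver recovered from a checkpoint re-registers it. A minimal usage sketch, assuming an existing JavaPairDStream<String, Integer> named wordCounts (the name and the filter predicate are illustrative, not part of the commit):

  wordCounts.foreachRDD((rdd, time) -> {
    // Re-acquire (or lazily register) the accumulator from the RDD's own context,
    // so the reference stays valid after the StreamingContext is restored from a checkpoint.
    LongAccumulator dropped =
            JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    dropped.add(rdd.filter(wordCount -> "a".equals(wordCount._1())).count());
  });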

+ 123 - 130
comp-example/src/main/java/com/ai/ipu/example/spark/JavaRecoverableNetworkWordCountTest.java

@@ -16,12 +16,15 @@
16 16
 */
17 17
package com.ai.ipu.example.spark;
18 18
19
import com.google.common.io.Files;
19
import com.ai.ipu.basic.log.ILogger;
20
import com.ai.ipu.basic.log.IpuLoggerFactory;
20 21
import org.apache.spark.SparkConf;
22
import org.apache.spark.api.java.JavaPairRDD;
21 23
import org.apache.spark.api.java.JavaSparkContext;
22 24
import org.apache.spark.api.java.function.Function0;
23 25
import org.apache.spark.broadcast.Broadcast;
24 26
import org.apache.spark.streaming.Durations;
27
import org.apache.spark.streaming.Time;
25 28
import org.apache.spark.streaming.api.java.JavaDStream;
26 29
import org.apache.spark.streaming.api.java.JavaPairDStream;
27 30
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -30,51 +33,15 @@ import org.apache.spark.util.LongAccumulator;
30 33
import scala.Tuple2;
31 34
32 35
import java.io.File;
33
import java.nio.charset.Charset;
36
import java.io.FileOutputStream;
37
import java.io.IOException;
38
import java.io.OutputStreamWriter;
39
import java.nio.charset.StandardCharsets;
34 40
import java.util.Arrays;
35 41
import java.util.List;
36 42
import java.util.regex.Pattern;
37 43
38 44
/**
39
 * Use this singleton to get or register a Broadcast variable.
40
 */
41
class JavaWordBlacklist {
42
43
    private static volatile Broadcast<List<String>> instance = null;
44
45
    public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
46
        if (instance == null) {
47
            synchronized (JavaWordBlacklist.class) {
48
                if (instance == null) {
49
                    List<String> wordBlacklist = Arrays.asList("a", "b", "c");
50
                    instance = jsc.broadcast(wordBlacklist);
51
                }
52
            }
53
        }
54
        return instance;
55
    }
56
}
57
58
/**
59
 * Use this singleton to get or register an Accumulator.
60
 */
61
class JavaDroppedWordsCounter {
62
63
    private static volatile LongAccumulator instance = null;
64
65
    public static LongAccumulator getInstance(JavaSparkContext jsc) {
66
        if (instance == null) {
67
            synchronized (JavaDroppedWordsCounter.class) {
68
                if (instance == null) {
69
                    instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
70
                }
71
            }
72
        }
73
        return instance;
74
    }
75
}
76
77
/**
78 45
 * Counts words in text encoded with UTF8 received from the network every second. This example also
79 46
 * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
80 47
 * they can be registered on driver failures.
@@ -101,109 +68,135 @@ class JavaDroppedWordsCounter {
101 68
 * the checkpoint data.
102 69
 * <p>
103 70
 * Refer to the online documentation for more details.
104
 *
71
 * <p>
105 72
 * Spark广播变量 累加器 可恢复网络计数测试
106 73
 *
107 74
 * @author lilb3@asiainfo.com
108 75
 * @since 2019-05-24 17:10
109 76
 * 普通打包部署 需要上传原包和依赖包 如框架日志ipu-basic-3.1-SNAPSHOT.jar, 多个jar以逗号隔开
110 77
 * 删除CheckPoint文件:
111
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest
78
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R \
79
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest
112 80
 * 查看CheckPoint文件:
113
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -ls -R hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest
81
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -ls -R \
82
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest
114 83
 * 开启监听:
115 84
 * nc -lk iZm5e5xe1w25avi0io1f5aZ 9100
116 85
 * 执行程序:
117
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.JavaRecoverableNetworkWordCountTest /home/mysql/test/comp-example-1.0.jar.original iZm5e5xe1w25avi0io1f5aZ 9100 hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest/ ~/logs/spark/JavaRecoverableNetworkWordCountTest.txt
86
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit \
87
 * --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar \
88
 * --class com.ai.ipu.example.spark.JavaRecoverableNetworkWordCountTest \
89
 * /home/mysql/test/comp-example-1.0.jar.original iZm5e5xe1w25avi0io1f5aZ 9100 \
90
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest/ \
91
 * ~/logs/spark/JavaRecoverableNetworkWordCountTest.txt
118 92
 * 查看结果:
119 93
 * cat ~/logs/spark/JavaRecoverableNetworkWordCountTest.txt
120 94
 **/
121 95
public final class JavaRecoverableNetworkWordCountTest {
122
    private static final Pattern SPACE = Pattern.compile(" ");
123
124
    private static JavaStreamingContext createContext(String ip,
125
                                                      int port,
126
                                                      String checkpointDirectory,
127
                                                      String outputPath) {
128
129
        // If you do not see this printed, that means the StreamingContext has been loaded
130
        // from the new checkpoint
131
        System.out.println("Creating new context");
132
        File outputFile = new File(outputPath);
133
        if (outputFile.exists()) {
134
            outputFile.delete();
135
        }
136
        SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
137
        // Create the context with a 1 second batch size
138
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
139
        ssc.checkpoint(checkpointDirectory);
140
141
        // Create a socket stream on target ip:port and count the
142
        // words in input stream of \n delimited text (eg. generated by 'nc')
143
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
144
        JavaDStream<String> words = lines.flatMap(x -> {
145
            System.out.println("flatMap call: " + x);
146
            return Arrays.asList(SPACE.split(x)).iterator();
147
        });
148
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
149
                .reduceByKey((i1, i2) -> i1 + i2);
150
151
        wordCounts.foreachRDD((rdd, time) -> {
152
            // Get or register the blacklist Broadcast
153
            Broadcast<List<String>> blacklist =
154
                    JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
155
            // Get or register the droppedWordsCounter Accumulator
156
            LongAccumulator droppedWordsCounter =
157
                    JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
158
            // Use blacklist to drop words and use droppedWordsCounter to count them
159
            String counts = rdd.filter(wordCount -> {
160
                System.out.println("foreachRDD call: " + wordCount._1() + ", " + wordCount._2());
161
                if (blacklist.value().contains(wordCount._1())) {
162
                    droppedWordsCounter.add(wordCount._2());
163
                    return false;
164
                } else {
165
                    return true;
166
                }
167
            }).collect().toString();
168
            String output = "Counts at time " + time + " " + counts;
169
            System.out.println(output);
170
            System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
171
            // droppedWordsCounter.
172
            System.out.println("Appending to " + outputFile.getAbsolutePath());
173
            Files.append(output + "\n", outputFile, Charset.defaultCharset());
174
        });
175
176
        return ssc;
96
  private static final ILogger LOGGER = IpuLoggerFactory.createLogger(JavaRecoverableNetworkWordCountTest.class);
97
  private static final int ARGS_LENGTH = 4;
98
  private static final int ARGS_1_INDEX = 0;
99
  private static final int ARGS_2_INDEX = 1;
100
  private static final int ARGS_3_INDEX = 2;
101
  private static final int ARGS_4_INDEX = 3;
102
  private static final long DURATIONS = 30L;
103
104
  private static final Pattern SPACE = Pattern.compile(" ");
105
106
  private static JavaStreamingContext createContext(String ip,
107
                                                    int port,
108
                                                    String checkpointDirectory,
109
                                                    String outputPath) throws IOException {
110
111
    // If you do not see this printed, that means the StreamingContext has been loaded
112
    // from the new checkpoint
113
    LOGGER.debug("Creating new context");
114
    File outputFile = new File(outputPath);
115
    if (outputFile.exists()) {
116
      java.nio.file.Files.delete(outputFile.toPath());
117
    }
118
    SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
119
    // Create the context with a 1 second batch size
120
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(DURATIONS));
121
    ssc.checkpoint(checkpointDirectory);
122
123
    // Create a socket stream on target ip:port and count the
124
    // words in input stream of \n delimited text (eg. generated by 'nc')
125
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
126
    JavaDStream<String> words = lines.flatMap((String x) -> {
127
      LOGGER.debug("flatMap call: " + x);
128
      return Arrays.asList(SPACE.split(x)).iterator();
129
    });
130
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
131
            .reduceByKey(Integer::sum);
132
133
    wordCounts.foreachRDD((rdd, time) -> doWithRdd(rdd, time, outputFile));
134
135
    return ssc;
136
  }
137
138
  private static void doWithRdd(JavaPairRDD<String, Integer> rdd, Time time, File outputFile) throws IOException {
139
    // Get or register the blacklist Broadcast
140
    Broadcast<List<String>> blacklist =
141
            JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
142
    // Get or register the droppedWordsCounter Accumulator
143
    LongAccumulator droppedWordsCounter =
144
            JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
145
    // Use blacklist to drop words and use droppedWordsCounter to count them
146
    String counts = rdd.filter((Tuple2<String, Integer> wordCount) -> {
147
      LOGGER.debug("foreachRDD call: " + wordCount._1() + ", " + wordCount._2());
148
      if (blacklist.value().contains(wordCount._1())) {
149
        droppedWordsCounter.add(wordCount._2());
150
        return false;
151
      } else {
152
        return true;
153
      }
154
    }).collect().toString();
155
    String output = "Counts at time " + time + " " + counts;
156
    LOGGER.debug(output);
157
    LOGGER.debug("Dropped " + droppedWordsCounter.value() + " word(s) totally");
158
    // droppedWordsCounter.
159
    LOGGER.debug("Appending to " + outputFile.getAbsolutePath());
160
161
    try (OutputStreamWriter outputStreamWriter = new OutputStreamWriter(
162
            new FileOutputStream(outputFile, true), StandardCharsets.UTF_8)) {
163
      outputStreamWriter.write(output);
164
    }
165
  }
166
167
  public static void main(String[] args) {
168
    if (args.length != ARGS_LENGTH) {
169
      LOGGER.error("You arguments were " + Arrays.asList(args));
170
      LOGGER.error(
171
              "Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
172
                      "     <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
173
                      "     Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
174
                      "     HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
175
                      "     the word counts will be appended\n" +
176
                      "\n" +
177
                      "In local mode, <master> should be 'local[n]' with n > 1\n" +
178
                      "Both <checkpoint-directory> and <output-file> must be absolute paths");
179
      System.exit(1);
177 180
    }
178 181
179
    public static void main(String[] args) throws Exception {
180
        if (args.length != 4) {
181
            System.err.println("You arguments were " + Arrays.asList(args));
182
            System.err.println(
183
                    "Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
184
                            "     <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
185
                            "     Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
186
                            "     HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
187
                            "     the word counts will be appended\n" +
188
                            "\n" +
189
                            "In local mode, <master> should be 'local[n]' with n > 1\n" +
190
                            "Both <checkpoint-directory> and <output-file> must be absolute paths");
191
            System.exit(1);
192
        }
193
194
        String ip = args[0];
195
        int port = Integer.parseInt(args[1]);
196
        String checkpointDirectory = args[2];
197
        String outputPath = args[3];
198
199
        // Function to create JavaStreamingContext without any output operations
200
        // (used to detect the new context)
201
        Function0<JavaStreamingContext> createContextFunc =
202
                () -> createContext(ip, port, checkpointDirectory, outputPath);
203
204
        JavaStreamingContext ssc =
205
                JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
206
        ssc.start();
207
        ssc.awaitTermination();
182
    String ip = args[ARGS_1_INDEX];
183
    int port = Integer.parseInt(args[ARGS_2_INDEX]);
184
    String checkpointDirectory = args[ARGS_3_INDEX];
185
    String outputPath = args[ARGS_4_INDEX];
186
187
    try {
188
      // Function to create JavaStreamingContext without any output operations
189
      // (used to detect the new context)
190
      Function0<JavaStreamingContext> createContextFunc =
191
              () -> createContext(ip, port, checkpointDirectory, outputPath);
192
193
      JavaStreamingContext ssc =
194
              JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
195
      ssc.start();
196
      ssc.awaitTermination();
197
    } catch (InterruptedException e) {
198
      LOGGER.error(e.getMessage(), e);
199
      Thread.currentThread().interrupt();
208 200
    }
209
}
201
  }
202
}
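Among the changes in this file, Guava's Files.append with Charset.defaultCharset() is replaced by an explicit UTF-8 writer opened in append mode inside try-with-resources. The same idiom as a self-contained sketch (AppendUtil and appendLine are hypothetical names, not part of the commit):

  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.io.OutputStreamWriter;
  import java.nio.charset.StandardCharsets;

  final class AppendUtil {
    private AppendUtil() {
    }

    /** Appends one line to the file with an explicit charset; the writer is always closed. */
    static void appendLine(File file, String line) throws IOException {
      try (OutputStreamWriter writer = new OutputStreamWriter(
              new FileOutputStream(file, true), StandardCharsets.UTF_8)) {
        writer.write(line + System.lineSeparator());
      }
    }
  }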

+ 29 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/JavaWordBlacklist.java

@@ -0,0 +1,29 @@
1
package com.ai.ipu.example.spark;
2
3
import org.apache.spark.api.java.JavaSparkContext;
4
import org.apache.spark.broadcast.Broadcast;
5
6
import java.util.Arrays;
7
import java.util.List;
8
9
/**
10
 * Use this singleton to get or register a Broadcast variable.
11
 *
12
 * @author lilb3@asiainfo.com
13
 * @since 2020/4/27
14
 **/
15
final class JavaWordBlacklist {
16
  private static Broadcast<List<String>> instance;
17
18
  private JavaWordBlacklist() {
19
20
  }
21
22
  static synchronized Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
23
    if (instance == null) {
24
      List<String> wordBlacklist = Arrays.asList("a", "b", "c");
25
      instance = jsc.broadcast(wordBlacklist);
26
    }
27
    return instance;
28
  }
29
}
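A short usage sketch for the broadcast singleton, assuming an existing JavaSparkContext jsc and a JavaRDD<String> words (both illustrative): the broadcast value is read on the executors and only non-blacklisted words are kept.

  Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(jsc);
  // The broadcast list is shipped once per executor instead of once per task.
  JavaRDD<String> kept = words.filter(word -> !blacklist.value().contains(word));
  long keptCount = kept.count();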

+ 23 - 31
comp-example/src/main/java/com/ai/ipu/example/spark/RemoteFileSparkTest.java

@@ -5,7 +5,6 @@ import com.ai.ipu.basic.log.IpuLoggerFactory;
5 5
import org.apache.spark.SparkConf;
6 6
import org.apache.spark.api.java.JavaRDD;
7 7
import org.apache.spark.api.java.JavaSparkContext;
8
import org.apache.spark.api.java.function.Function;
9 8
10 9
/**
11 10
 * Spark统计包含a b的单词个数示例
@@ -13,37 +12,30 @@ import org.apache.spark.api.java.function.Function;
13 12
 * @author lilb3@asiainfo.com
14 13
 * @since 2019-05-24 17:10
15 14
 * 普通打包部署 需要上传原包和依赖包 如框架日志ipu-basic-3.1-SNAPSHOT.jar, 多个jar以逗号隔开
16
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.RemoteFileSparkTest /home/mysql/test/comp-example-1.0.jar.original
15
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit \
16
 * --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar \
17
 * --class com.ai.ipu.example.spark.RemoteFileSparkTest \
18
 * /home/mysql/test/comp-example-1.0.jar.original
17 19
 **/
18 20
public class RemoteFileSparkTest {
19
    private static final ILogger LOGGER = IpuLoggerFactory.createLogger(RemoteFileSparkTest.class);
20
21
    public static void main(String[] args) {
22
        JavaSparkContext jsc = null;
23
        try {
24
            SparkConf conf = new SparkConf().setMaster(SparkConfig.getSparkMaster()).setAppName("RemoteFileSparkTest");
25
26
            jsc = new JavaSparkContext(conf);
27
28
            String logFile = SparkConfig.getHdfsUri() + "/aaa.txt";
29
            JavaRDD<String> jddData = jsc.textFile(logFile).cache();
30
31
            for (String str : jddData.collect()) {
32
                System.out.println("jddData: " + str);
33
            }
34
            jddData.saveAsTextFile(SparkConfig.getHdfsUri() + "/test/" + System.currentTimeMillis());
35
            long numAs = jddData.filter((Function<String, Boolean>) s -> s.contains("1")).count();
36
37
            long numBs = jddData.filter((Function<String, Boolean>) s -> s.contains("2")).count();
38
39
            LOGGER.info("Lines with a: " + numAs + ", lines with b: " + numBs);
40
            jsc.stop();
41
        } catch (Exception e) {
42
            LOGGER.error("Exception" + e);
43
        } finally {
44
            if (null != jsc) {
45
                jsc.close();
46
            }
47
        }
21
  private static final ILogger LOGGER = IpuLoggerFactory.createLogger(RemoteFileSparkTest.class);
22
23
  public static void main(String[] args) {
24
    SparkConf conf = new SparkConf().setMaster(SparkConfig.getSparkMaster()).setAppName("RemoteFileSparkTest");
25
    try (JavaSparkContext javaSparkContext = new JavaSparkContext(conf)) {
26
      String logFile = SparkConfig.getHdfsUri() + "/aaa.txt";
27
      JavaRDD<String> jddData = javaSparkContext.textFile(logFile).cache();
28
29
      for (String str : jddData.collect()) {
30
        LOGGER.debug("jddData: " + str);
31
      }
32
      jddData.saveAsTextFile(SparkConfig.getHdfsUri() + "/test/" + System.currentTimeMillis());
33
      long numAs = jddData.filter((String s) -> s.contains("1")).count();
34
      long numBs = jddData.filter((String s) -> s.contains("2")).count();
35
      LOGGER.info("Lines with a: " + numAs + ", lines with b: " + numBs);
36
      javaSparkContext.stop();
37
    } catch (Exception e) {
38
      LOGGER.error("Exception" + e.getMessage(), e);
48 39
    }
40
  }
49 41
}
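The rewrite relies on JavaSparkContext implementing java.io.Closeable, so try-with-resources replaces the explicit null check and close() in finally. A minimal local sketch of the same pattern (the local[2] master and the sample data are assumptions for illustration):

  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TryWithResourcesDemo");
  try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
    // The context is closed automatically, even if one of the jobs below throws.
    long linesWithOne = jsc.parallelize(Arrays.asList("1", "2", "12"))
            .filter(s -> s.contains("1"))
            .count(); // 2
  }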

+ 38 - 25
comp-example/src/main/java/com/ai/ipu/example/spark/SparkConfig.java

@@ -1,6 +1,8 @@
1 1
package com.ai.ipu.example.spark;
2 2
3 3
import com.ai.ipu.basic.file.ResourceBundleUtil;
4
import com.ai.ipu.basic.log.ILogger;
5
import com.ai.ipu.basic.log.IpuLoggerFactory;
4 6
5 7
/**
6 8
 * 类描述
@@ -8,33 +10,44 @@ import com.ai.ipu.basic.file.ResourceBundleUtil;
8 10
 * @author lilb3@asiainfo.com
9 11
 * @since 2019-07-01 15:36
10 12
 **/
11
public class SparkConfig {
12
    private static final String CONFIG_FILE_PATH = "spark";
13
    /*Spark集群*/
14
    private static String sparkMaster;
15
    /*Hadoop地址*/
16
    private static String hdfsUri;
17
    /*测试文件*/
18
    private static String testFileName;
19
20
    public static String getSparkMaster() {
21
        return sparkMaster;
22
    }
13
final class SparkConfig {
14
  private static final ILogger LOGGER = IpuLoggerFactory.createLogger(SparkConfig.class);
15
  private static final String CONFIG_FILE_PATH = "spark";
16
  /**
17
   * Spark集群
18
   */
19
  private static String sparkMaster;
20
  /**
21
   * Hadoop地址
22
   */
23
  private static String hdfsUri;
24
  /**
25
   * 测试文件
26
   */
27
  private static String testFileName;
23 28
24
    public static String getHdfsUri() {
25
        return hdfsUri;
29
  /*加载配置文件*/
30
  static {
31
    try {
32
      ResourceBundleUtil.initialize(CONFIG_FILE_PATH, SparkConfig.class);
33
    } catch (Exception e) {
34
      LOGGER.error(CONFIG_FILE_PATH + "配置文件读取失败" + e);
26 35
    }
36
  }
27 37
28
    public static String getTestFileName() {
29
        return testFileName;
30
    }
38
  private SparkConfig() {
31 39
32
    /*加载配置文件*/
33
    static {
34
        try {
35
            ResourceBundleUtil.initialize(CONFIG_FILE_PATH, SparkConfig.class);
36
        } catch (Exception e) {
37
            System.out.println(CONFIG_FILE_PATH + "配置文件读取失败" + e);
38
        }
39
    }
40
  }
41
42
  public static String getSparkMaster() {
43
    return sparkMaster;
44
  }
45
46
  public static String getHdfsUri() {
47
    return hdfsUri;
48
  }
49
50
  public static String getTestFileName() {
51
    return testFileName;
52
  }
40 53
}
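Configuration is now loaded once in the static initializer and is read-only afterwards; callers just use the getters. A usage sketch, assuming a spark properties bundle on the classpath whose keys match the static field names:

  // Values are populated by ResourceBundleUtil in the static block above.
  String master = SparkConfig.getSparkMaster();
  String hdfsUri = SparkConfig.getHdfsUri();
  String testFile = SparkConfig.getTestFileName();
  SparkConf conf = new SparkConf().setMaster(master).setAppName("ConfigDemo");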

+ 126 - 105
comp-example/src/main/java/com/ai/ipu/example/spark/SparkExample.java

@@ -6,12 +6,9 @@ import org.apache.spark.SparkConf;
6 6
import org.apache.spark.api.java.JavaPairRDD;
7 7
import org.apache.spark.api.java.JavaRDD;
8 8
import org.apache.spark.api.java.JavaSparkContext;
9
import org.apache.spark.api.java.function.FlatMapFunction;
10
import org.apache.spark.api.java.function.Function2;
11
import org.apache.spark.api.java.function.PairFunction;
12
import org.apache.spark.api.java.function.VoidFunction;
13 9
import scala.Tuple2;
14 10
11
import java.io.File;
15 12
import java.util.Arrays;
16 13
import java.util.Iterator;
17 14
import java.util.List;
@@ -23,111 +20,135 @@ import java.util.List;
23 20
 * @since 2019-05-24 17:10
24 21
 * 部署服务器方式:
25 22
 * a) SpringBoot打包部署 需要在pom指定startClass为com.ai.ipu.example.spark.SparkExample
26
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --class org.springframework.boot.loader.JarLauncher /home/mysql/test/comp-example-1.0.jar
23
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit \
24
 * --class org.springframework.boot.loader.JarLauncher /home/mysql/test/comp-example-1.0.jar
27 25
 * b) 普通打包部署 需要上传原包和依赖包 如框架日志ipu-basic-3.1-SNAPSHOT.jar, 多个jar以逗号隔开
28
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.SparkExample /home/mysql/test/comp-example-1.0.jar.original
29
 **/
26
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit \
27
 * --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar \
28
 * --class com.ai.ipu.example.spark.SparkExample /home/mysql/test/comp-example-1.0.jar.original
29
 */
30 30
public class SparkExample {
31
    private static final ILogger LOGGER = IpuLoggerFactory.createLogger(SparkExample.class);
32
33
    public static void main(String[] args) {
34
        /*
35
         * 对于所有的spark程序所言,要进行所有的操作,首先要创建一个spark上下文。
36
         * 在创建上下文的过程中,程序会向集群申请资源及构建相应的运行环境。
37
         * 设置spark应用程序名称
38
         * 创建的 sparkContext 唯一需要的参数就是 sparkConf,它是一组 K-V 属性对。
39
         */
40
        SparkConf sparkConf = new SparkConf().setAppName("SparkExample");
41
        /*java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$3 of type org.apache.spark.api.java.function.FlatMapFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1*/
42
        //sparkConf.setJars(new String[]{SparkConfig.getJarFilePath()});
43
44
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
45
46
        // 用List构建JavaRDD
47
        List<String> data = Arrays.asList("1", "2", "3", "4", "5");
48
        JavaRDD<String> distData = jsc.parallelize(data);
49
        List<String> intLine = distData.collect();
50
        for (String string : intLine) {
51
            LOGGER.debug("List构建: " + string);
52
        }
31
  private static final ILogger LOGGER = IpuLoggerFactory.createLogger(SparkExample.class);
53 32
54
        // 从hdfs读取文件形成RDD
55
        String textFilePath = SparkConfig.getHdfsUri() + "/" + SparkConfig.getTestFileName();
56
        /*
57
         * 利用textFile接口从文件系统中读入指定的文件,返回一个RDD实例对象
58
         * RDD的初始创建都是由SparkContext来负责的,将内存中的集合或者外部文件系统作为输入源。
59
         * RDD:弹性分布式数据集,即一个 RDD 代表一个被分区的只读数据集。一个 RDD 的生成只有两种途径,
60
         * 一是来自于内存集合和外部存储系统,另一种是通过转换操作来自于其他 RDD,比如 Map、Filter、Join,等等。
61
         * textFile()方法可将本地文件或HDFS文件转换成RDD,读取本地文件需要各节点上都存在,或者通过网络共享该文件
62
         */
63
        JavaRDD<String> lines = jsc.textFile(textFilePath);
64
        List<String> stringLine = lines.collect();
65
        for (String string : stringLine) {
66
            LOGGER.debug("文件读取: " + string);
67
        }
33
  public static void main(String[] args) {
34
    /*
35
     * 对于所有的spark程序所言,要进行所有的操作,首先要创建一个spark上下文。
36
     * 在创建上下文的过程中,程序会向集群申请资源及构建相应的运行环境。
37
     * 设置spark应用程序名称
38
     * 创建的 sparkContext 唯一需要的参数就是 sparkConf,它是一组 K-V 属性对。
39
     */
40
    SparkConf sparkConf = new SparkConf().setAppName("SparkExample");
68 41
69
        // 行数据的分割,调用flatMap函数
70
        /*
71
         * new FlatMapFunction<String, String>两个string分别代表输入和输出类型
72
         * Override的call方法需要自己实现一个转换的方法,并返回一个Iterable的结构
73
         *
74
         * flatmap属于一类非常常用的spark函数,简单的说作用就是将一条rdd数据使用你定义的函数给分解成多条rdd数据
75
         * 例如,当前状态下,lines这个rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话,
76
         */
77
        //flatMap与map的区别是,对每个输入,flatMap会生成一个或多个的输出,而map只是生成单一的输出
78
        //用空格分割各个单词,输入一行,输出多个对象,所以用flatMap
79
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) s -> {
80
            String[] words1 = s.split(" ");
81
            return Arrays.asList(words1).iterator();
82
        });
83
84
        // 将数据转换为key/value键值对
85
        /*
86
         * pairFunction<T,K,V>: T:输入类型;K,V:输出键值对
87
         * 表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数)
88
         * 需要重写call方法实现转换
89
         */
90
        //scala.Tuple2<K,V> call(T t)
91
        //Tuple2为scala中的一个对象,call方法的输入参数为T,即输入一个单词s,新的Tuple2对象的key为这个单词,计数为1
92
        JavaPairRDD<String, Integer> ones = words.mapToPair((PairFunction<String, String, Integer>) s -> {
93
            LOGGER.debug("mapToPair call: " + s);
94
            return new Tuple2<>(s, 1);
95
        });
96
97
        // 聚合结果
98
        /*
99
         * 调用reduceByKey方法,按key值进行reduce
100
         *  reduceByKey方法,类似于MR的reduce
101
         *  要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算
102
         *  若ones有<"one", 1>, <"one", 1>,会根据"one"将相同的pair单词个数进行统计,输入为Integer,输出也为Integer输出<"one", 2>
103
         */
104
        // reduce阶段,key相同的value怎么处理的问题
105
        // 备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对,
106
        // reduce方法会对输入进来的所有数据进行两两运算
107
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> {
108
            LOGGER.debug("reduceByKey call:" + i1 + ", " + i2);
109
            return i1 + i2;
110
        });
111
112
        // 结果输出到HDFS,Windows对应的是真实目录,如:D:/logs/spark_test/1555319746196,父目录必须存在,否则不执行
113
        counts.saveAsTextFile(SparkConfig.getHdfsUri() + "/spark/" + System.currentTimeMillis());
114
115
        /*
116
         * 结果转化为常见类型输出
117
         * collect方法用于将spark的RDD类型转化为我们熟知的java常见类型
118
         */
119
        List<Tuple2<String, Integer>> output = counts.collect();
120
        for (Tuple2<?, ?> tuple : output) {
121
            LOGGER.debug(tuple._1() + ": " + tuple._2());
122
        }
42
    /*
43
     * java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field
44
     * org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$3 of type org.apache.spark.api.java.function
45
     * .FlatMapFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1
46
     */
47
    // sparkConf.setJars(new String[]{SparkConfig.getJarFilePath()})
48
49
    try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
50
      // 用List构建JavaRDD
51
      List<String> data = Arrays.asList("1", "2", "3", "4", "5");
52
      JavaRDD<String> distData = jsc.parallelize(data);
53
      List<String> intLine = distData.collect();
54
55
      for (String string : intLine) {
56
        LOGGER.debug("List构建: " + string);
57
      }
58
59
      // 从hdfs读取文件形成RDD
60
      String textFilePath = SparkConfig.getHdfsUri() + File.pathSeparator + SparkConfig.getTestFileName();
61
62
      /*
63
       * 利用textFile接口从文件系统中读入指定的文件,返回一个RDD实例对象。
64
       * RDD的初始创建都是由SparkContext来负责的,将内存中的集合或者外部文件系统作为输入源。
65
       * RDD:弹性分布式数据集,即一个 RDD 代表一个被分区的只读数据集。一个 RDD 的生成只有两种途径,
66
       * 一是来自于内存集合和外部存储系统,另一种是通过转换操作来自于其他 RDD,比如 Map、Filter、Join,等等。
67
       * textFile()方法可将本地文件或HDFS文件转换成RDD,读取本地文件需要各节点上都存在,或者通过网络共享该文件
68
       */
69
      JavaRDD<String> lines = jsc.textFile(textFilePath);
70
      List<String> stringLine = lines.collect();
71
72
      for (String string : stringLine) {
73
        LOGGER.debug("文件读取: " + string);
74
      }
75
76
      // 行数据的分割,调用flatMap函数
77
78
      /*
79
       * new FlatMapFunction<String, String>两个string分别代表输入和输出类型
80
       * Override的call方法需要自己实现一个转换的方法,并返回一个Iterable的结构
81
       *
82
       * flatMap属于一类非常常用的spark函数,简单的说作用就是将一条rdd数据使用你定义的函数给分解成多条rdd数据
83
       * 例如,当前状态下,lines这个rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话,
84
       */
85
86
      // flatMap与map的区别是,对每个输入,flatMap会生成一个或多个的输出,而map只是生成单一的输出
87
      // 用空格分割各个单词,输入一行,输出多个对象,所以用flatMap
88
      JavaRDD<String> words = lines.flatMap(
89
              (String s) -> {
90
                String[] words1 = s.split(" ");
91
92
                return Arrays.asList(words1).iterator();
93
              });
94
95
      // 将数据转换为key/value键值对
123 96
124
        // 直接输出
125
        counts.foreachPartition((VoidFunction<Iterator<Tuple2<String, Integer>>>) tuple2Iterator -> {
126
            while (tuple2Iterator.hasNext()) {
127
                Tuple2<String, Integer> t2 = tuple2Iterator.next();
128
                LOGGER.debug(t2._1() + ": " + t2._2());
129
            }
130
        });
131
        jsc.stop();
97
      /*
98
       * pairFunction<T,K,V>: T:输入类型;K,V:输出键值对
99
       * 表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数)
100
       * 需要重写call方法实现转换
101
       */
102
103
      // scala.Tuple2<K,V> call(T t)
104
      // Tuple2为scala中的一个对象,call方法的输入参数为T,即输入一个单词s,新的Tuple2对象的key为这个单词,计数为1
105
      JavaPairRDD<String, Integer> ones = words.mapToPair(
106
              (String s) -> {
107
                LOGGER.debug("mapToPair call: " + s);
108
109
                return new Tuple2<>(s, 1);
110
              });
111
112
      // 聚合结果
113
114
      /*
115
       * 调用reduceByKey方法,按key值进行reduce
116
       *  reduceByKey方法,类似于MR的reduce
117
       *  要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算
118
       *  若ones有<"one", 1>, <"one", 1>,会根据"one"将相同的pair单词个数进行统计,输入为Integer,输出也为Integer输出<"one", 2>
119
       */
120
121
      // reduce阶段,key相同的value怎么处理的问题
122
      // 备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对,
123
      // reduce方法会对输入进来的所有数据进行两两运算
124
      JavaPairRDD<String, Integer> counts = ones.reduceByKey((Integer i1, Integer i2) -> {
125
        LOGGER.debug("reduceByKey call:" + i1 + ", " + i2);
126
127
        return i1 + i2;
128
      });
129
130
      // 结果输出到HDFS,Windows对应的是真实目录,如:D:/logs/spark_test/1555319746196,父目录必须存在,否则不执行
131
      counts.saveAsTextFile(SparkConfig.getHdfsUri() + "/spark/" + System.currentTimeMillis());
132
133
      /*
134
       * 结果转化为常见类型输出
135
       * collect方法用于将spark的RDD类型转化为我们熟知的java常见类型
136
       */
137
      List<Tuple2<String, Integer>> output = counts.collect();
138
139
      for (Tuple2<?, ?> tuple : output) {
140
        LOGGER.debug(tuple._1() + ": " + tuple._2());
141
      }
142
143
      // 直接输出
144
      counts.foreachPartition((Iterator<Tuple2<String, Integer>> tuple2Iterator) -> {
145
        while (tuple2Iterator.hasNext()) {
146
          Tuple2<String, Integer> t2 = tuple2Iterator.next();
147
148
          LOGGER.debug(t2._1() + ": " + t2._2());
149
        }
150
      });
151
      jsc.stop();
132 152
    }
153
  }
133 154
}
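The comments above walk through flatMap, mapToPair and reduceByKey; the same pipeline condensed into a self-contained local example (the local[2] master and the in-memory input replace the HDFS input and are assumptions, not part of the commit):

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import scala.Tuple2;

  import java.util.Arrays;

  public final class LocalWordCount {
    public static void main(String[] args) {
      SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("LocalWordCount");
      try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
        JavaRDD<String> lines = jsc.parallelize(Arrays.asList("a b a", "b c"));
        // One input line can produce several words, hence flatMap rather than map.
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Pair every word with 1, then sum the values per key.
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);
        // Expected output (order may vary): a: 2, b: 2, c: 1
        counts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
      }
    }
  }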

+ 68 - 53
comp-example/src/main/java/com/ai/ipu/example/spark/SparkReadHbaseTest.java

@@ -1,5 +1,7 @@
1 1
package com.ai.ipu.example.spark;
2 2
3
import com.ai.ipu.basic.log.ILogger;
4
import com.ai.ipu.basic.log.IpuLoggerFactory;
3 5
import org.apache.hadoop.conf.Configuration;
4 6
import org.apache.hadoop.hbase.HBaseConfiguration;
5 7
import org.apache.hadoop.hbase.client.Result;
@@ -13,13 +15,13 @@ import org.apache.hadoop.hbase.util.Bytes;
13 15
import org.apache.spark.SparkConf;
14 16
import org.apache.spark.api.java.JavaPairRDD;
15 17
import org.apache.spark.api.java.JavaSparkContext;
16
import org.apache.spark.api.java.function.PairFunction;
17 18
import scala.Tuple2;
18 19
19 20
import java.io.IOException;
20 21
import java.util.ArrayList;
21 22
import java.util.Arrays;
22 23
import java.util.List;
24
import java.util.regex.Pattern;
23 25
24 26
/**
25 27
 * Spark读取Hbase的数据
@@ -37,60 +39,73 @@ import java.util.List;
37 39
 * 退出执行命令:
38 40
 * ctrl+c
39 41
 * 执行命令:
40
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar,/home/mysql/test/lib/*.jar --class com.ai.ipu.example.spark.SparkReadHbaseTest /home/mysql/test/comp-example-1.0.jar.original "iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103" 0 mytable cf msg
42
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit \
43
 * --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar,/home/mysql/test/lib/*.jar \
44
 * --class com.ai.ipu.example.spark.SparkReadHbaseTest \
45
 * /home/mysql/test/comp-example-1.0.jar.original \
46
 * "iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103" 0 mytable cf msg
41 47
 **/
42 48
public class SparkReadHbaseTest {
43
    public static void main(String[] args) {
44
        if (args.length < 3) {
45
            System.err.println("You arguments were " + Arrays.asList(args));
46
            System.err.println("参数格式:\n" +
47
                    "\t第一个参数:Zookeeper连接信息;\n" +
48
                    "\t第二个参数:Zookeeper连接端口,为0表示不传此参数;\n" +
49
                    "\t第三个参数:HBase表名;\n" +
50
                    "\t第四个参数:列族名;\n" +
51
                    "\t第五个参数:列名。");
52
            System.exit(1);
53
        }
54
        SparkConf sparkConf = new SparkConf();
55
        sparkConf.setMaster(SparkConfig.getSparkMaster()).setAppName("SparkReadHbaseTest");
56
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
49
  private static final ILogger LOGGER = IpuLoggerFactory.createLogger(SparkReadHbaseTest.class);
50
  private static final int MIN_ARGS_LENGTH = 3;
51
  private static final Pattern SERVER_REG = Pattern.compile("\\d+");
52
  private static final String PORT_0 = "0";
53
  private static final int ZOOKEEPER_SERVER_INDEX = 0;
54
  private static final int ZOOKEEPER_PORT_INDEX = 1;
55
  private static final int HBASE_TABLE_INDEX = 2;
56
  private static final int FAMILY_INDEX = 3;
57
  private static final int COLUMN_INDEX = 4;
57 58
58
        Configuration conf = HBaseConfiguration.create();
59
        String hbaseZookeeperQuorum = args[0];
60
        String hbaseZookeeperPropertyClientPort = args[1];
61
        conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
62
        if (!"0".equals(hbaseZookeeperPropertyClientPort) && hbaseZookeeperPropertyClientPort.matches("\\d+")) {
63
            conf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
64
        }
65
        Scan scan = new Scan();
66
            scan.addFamily(Bytes.toBytes(args[3]));
67
            scan.addColumn(Bytes.toBytes(args[3]), Bytes.toBytes(args[4]));
68
        String scanToString;
69
        try {
70
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
71
            scanToString = Base64.encodeBytes(proto.toByteArray());
72
            String tableName = args[2];
73
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
74
            conf.set(TableInputFormat.SCAN, scanToString);
75
            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(conf,
76
                    TableInputFormat.class, ImmutableBytesWritable.class,
77
                    Result.class);
78
            JavaPairRDD<String, List<String>> javaPairRDD = hBaseRDD.mapToPair(
79
                    (PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<String>>) results -> {
80
                        List<String> list = new ArrayList<>();
81
                        byte[] msg = results._2().getValue(Bytes.toBytes(args[3]), Bytes.toBytes(args[4]));
82
                        list.add(Bytes.toString(msg));
83
                        return new Tuple2<>(Bytes.toString(results._1().get()), list);
84
                    }
85
            );
86
            List<Tuple2<String, List<String>>> list = javaPairRDD.collect();
87
            for (Tuple2<String, List<String>> tuple2 : list) {
88
                System.out.println("javaPairRDD.collect:" + tuple2._1 + " , " + tuple2._2);
89
            }
90
        } catch (IOException io) {
91
            io.printStackTrace();
92
        } catch (Exception e) {
93
            e.printStackTrace();
94
        }
59
  public static void main(String[] args) {
60
    if (args.length < MIN_ARGS_LENGTH) {
61
      LOGGER.error("You arguments were " + Arrays.asList(args));
62
      LOGGER.error("参数格式:\n" +
63
              "\t第一个参数:Zookeeper连接信息;\n" +
64
              "\t第二个参数:Zookeeper连接端口,为0表示不传此参数;\n" +
65
              "\t第三个参数:HBase表名;\n" +
66
              "\t第四个参数:列族名;\n" +
67
              "\t第五个参数:列名。");
68
      System.exit(1);
95 69
    }
70
    SparkConf sparkConf = new SparkConf();
71
    sparkConf.setMaster(SparkConfig.getSparkMaster()).setAppName("SparkReadHbaseTest");
72
    try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
73
      Configuration conf = HBaseConfiguration.create();
74
      String hbaseZookeeperQuorum = args[ZOOKEEPER_SERVER_INDEX];
75
      String hbaseZookeeperPropertyClientPort = args[ZOOKEEPER_PORT_INDEX];
76
      conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
77
      if (!PORT_0.equals(hbaseZookeeperPropertyClientPort)
78
              && SERVER_REG.matcher(hbaseZookeeperPropertyClientPort).matches()) {
79
        conf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
80
      }
81
      Scan scan = new Scan();
82
      scan.addFamily(Bytes.toBytes(args[FAMILY_INDEX]));
83
      scan.addColumn(Bytes.toBytes(args[FAMILY_INDEX]), Bytes.toBytes(args[COLUMN_INDEX]));
84
      ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
85
      String scanToString = Base64.encodeBytes(proto.toByteArray());
86
      String tableName = args[HBASE_TABLE_INDEX];
87
      conf.set(TableInputFormat.INPUT_TABLE, tableName);
88
      conf.set(TableInputFormat.SCAN, scanToString);
89
      JavaPairRDD<ImmutableBytesWritable, Result> hbaseRdd = jsc.newAPIHadoopRDD(conf,
90
              TableInputFormat.class, ImmutableBytesWritable.class,
91
              Result.class);
92
      JavaPairRDD<String, List<String>> javaPairRdd = hbaseRdd.mapToPair(
93
              (Tuple2<ImmutableBytesWritable, Result> results) -> {
94
                List<String> list = new ArrayList<>();
95
                byte[] msg = results._2().getValue(Bytes.toBytes(args[FAMILY_INDEX]),
96
                        Bytes.toBytes(args[COLUMN_INDEX]));
97
                list.add(Bytes.toString(msg));
98
                return new Tuple2<>(Bytes.toString(results._1().get()), list);
99
              }
100
      );
101
      List<Tuple2<String, List<String>>> list = javaPairRdd.collect();
102
      for (Tuple2<String, List<String>> tuple2 : list) {
103
        LOGGER.debug("javaPairRDD.collect:" + tuple2._1 + " , " + tuple2._2);
104
      }
105
    } catch (IOException e) {
106
      LOGGER.error("IOException:" + e.getMessage(), e);
107
    } catch (Exception e) {
108
      LOGGER.error("Exception:" + e.getMessage(), e);
109
    }
110
  }
96 111
}
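A small follow-on sketch for the pair RDD built above (javaPairRdd as in the new code; illustrative only), counting rows and collecting only the row keys so the cell values stay on the executors:

  long rowCount = javaPairRdd.count();
  // keys() avoids shipping the column values back to the driver.
  List<String> rowKeys = javaPairRdd.keys().collect();
  LOGGER.debug("rows=" + rowCount + ", keys=" + rowKeys);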

+ 74 - 61
comp-example/src/main/java/com/ai/ipu/example/spark/SparkStreamKafkaCountTest.java

@@ -1,11 +1,10 @@
1 1
package com.ai.ipu.example.spark;
2 2
3
import com.ai.ipu.basic.log.ILogger;
4
import com.ai.ipu.basic.log.IpuLoggerFactory;
3 5
import org.apache.kafka.clients.consumer.ConsumerRecord;
4 6
import org.apache.kafka.common.serialization.StringDeserializer;
5 7
import org.apache.spark.SparkConf;
6
import org.apache.spark.api.java.function.FlatMapFunction;
7
import org.apache.spark.api.java.function.Function2;
8
import org.apache.spark.api.java.function.PairFunction;
9 8
import org.apache.spark.streaming.Duration;
10 9
import org.apache.spark.streaming.api.java.JavaDStream;
11 10
import org.apache.spark.streaming.api.java.JavaInputDStream;
@@ -32,68 +31,82 @@ import java.util.List;
32 31
 * 连接Kafka:
33 32
 * ~/software/kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
34 33
 * 执行命令:
35
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar,/home/mysql/test/lib/*.jar --class com.ai.ipu.example.spark.SparkStreamKafkaCountTest /home/mysql/test/comp-example-1.0.jar.original hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamKafkaCountTest/ test "iZm5e5xe1w25avi0io1f5aZ:9091,iZm5e5xe1w25avi0io1f5aZ:9092,iZm5e5xe1w25avi0io1f5aZ:9093"
34
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit \
35
 * --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar,/home/mysql/test/lib/*.jar \
36
 * --class com.ai.ipu.example.spark.SparkStreamKafkaCountTest \
37
 * /home/mysql/test/comp-example-1.0.jar.original \
38
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamKafkaCountTest/ \
39
 * test "iZm5e5xe1w25avi0io1f5aZ:9091,iZm5e5xe1w25avi0io1f5aZ:9092,iZm5e5xe1w25avi0io1f5aZ:9093"
36 40
 * 输入测试数据:
37 41
 * ------随便输入------
38 42
 **/
39 43
public class SparkStreamKafkaCountTest {
44
  private static final ILogger LOGGER = IpuLoggerFactory.createLogger(SparkStreamKafkaCountTest.class);
45
  private static final int ARGS_LENGTH = 3;
46
  private static final int CHECKPOINT_INDEX = 0;
47
  private static final int TOPIC_INDEX = 1;
48
  private static final int BROKER_INDEX = 2;
49
  private static final long DURATIONS = 10000L;
50
  private static final int INITIAL_CAPACITY = 16;
40 51
41
    public static void main(String[] args) {
42
        if (args.length < 3) {
43
            System.err.println("You arguments were " + Arrays.asList(args));
44
            System.err.println("参数格式:\n" +
45
                    "\t第一个参数:CheckPoint文件路径;\n" +
46
                    "\t第二个参数:topic kafka主题;\n" +
47
                    "\t第三个参数:Kafka的连接信息,ip:port。");
48
            System.exit(1);
49
        }
50
        SparkConf sparkConf = new SparkConf().setAppName("SparkStreamKafkaCountTest").setMaster(SparkConfig.getSparkMaster());
51
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
52
        jssc.checkpoint(args[0]);
53
        //存放话题跟分片的映射关系
54
        String[] topicsArr = new String[]{args[1]};
55
        Collection<String> topicSet = new HashSet<>(Arrays.asList(topicsArr));
56
        HashMap kafkaParams = new HashMap<>();
57
        // 构建kafka参数map
58
        // 主要要放置的是连接的kafka集群的地址(broker集群的地址列表)
59
        // Kafka服务监听端口
60
        kafkaParams.put("bootstrap.servers", args[2]);
61
        // 指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
62
        kafkaParams.put("key.deserializer", StringDeserializer.class);
63
        // 指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
64
        kafkaParams.put("value.deserializer", StringDeserializer.class);
65
        // 消费者ID,随意指定
66
        kafkaParams.put("group.id", "SparkStreamKafkaCountTest");
67
        // 指定从latest(最新,其他版本的是largest这里不行)还是smallest(最早)处开始读取数据
68
        kafkaParams.put("auto.offset.reset", "latest");
69
        // 如果true,consumer定期地往zookeeper写入每个分区的offset
70
        kafkaParams.put("enable.auto.commit", false);
71
        //从Kafka中获取数据转换成RDD
72
        JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
73
                jssc,
74
                LocationStrategies.PreferConsistent(),
75
                ConsumerStrategies.Subscribe(topicSet, kafkaParams)
76
        );
77
        JavaDStream<String> words = lines.flatMap((FlatMapFunction<ConsumerRecord<String, String>, String>) s -> {
78
            List<String> list = new ArrayList<>();
79
            list.add(s.value());
80
            return list.iterator();
81
        });
82
        //对其中的单词进行统计
83
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
84
                (PairFunction<String, String, Integer>) s -> {
85
                    System.out.println("mapToPair call: " + s);
86
                    return new Tuple2<>(s, 1);
87
                }).reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> {
88
            System.out.println("reduceByKey call: " + i1 + ", " + i2);
89
            return i1 + i2;
90
        });
91
        wordCounts.print();
92
        jssc.start();
93
        try {
94
            jssc.awaitTermination();
95
        } catch (InterruptedException e) {
96
            e.printStackTrace();
97
        }
52
  public static void main(String[] args) {
53
    if (args.length < ARGS_LENGTH) {
54
      LOGGER.error("You arguments were " + Arrays.asList(args));
55
      LOGGER.error("参数格式:\n" +
56
              "\t第一个参数:CheckPoint文件路径;\n" +
57
              "\t第二个参数:topic kafka主题;\n" +
58
              "\t第三个参数:Kafka的连接信息,ip:port。");
59
      System.exit(1);
98 60
    }
61
    SparkConf sparkConf = new SparkConf().setAppName("SparkStreamKafkaCountTest")
62
            .setMaster(SparkConfig.getSparkMaster());
63
    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(DURATIONS));
64
    javaStreamingContext.checkpoint(args[CHECKPOINT_INDEX]);
65
    //存放话题跟分片的映射关系
66
    String[] topicsArr = new String[]{args[TOPIC_INDEX]};
67
    Collection<String> topicSet = new HashSet<>(Arrays.asList(topicsArr));
68
    HashMap<String, Object> kafkaParams = new HashMap<>(INITIAL_CAPACITY);
69
    // 构建kafka参数map
70
    // 主要要放置的是连接的kafka集群的地址(broker集群的地址列表)
71
    // Kafka服务监听端口
72
    kafkaParams.put("bootstrap.servers", args[BROKER_INDEX]);
73
    // 指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
74
    kafkaParams.put("key.deserializer", StringDeserializer.class);
75
    // 指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
76
    kafkaParams.put("value.deserializer", StringDeserializer.class);
77
    // 消费者ID,随意指定
78
    kafkaParams.put("group.id", "SparkStreamKafkaCountTest");
79
    // 指定从latest(最新,其他版本的是largest这里不行)还是smallest(最早)处开始读取数据
80
    kafkaParams.put("auto.offset.reset", "latest");
81
    // 如果true,consumer定期地往zookeeper写入每个分区的offset
82
    kafkaParams.put("enable.auto.commit", false);
83
    //从Kafka中获取数据转换成RDD
84
    JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
85
            javaStreamingContext,
86
            LocationStrategies.PreferConsistent(),
87
            ConsumerStrategies.Subscribe(topicSet, kafkaParams)
88
    );
89
    JavaDStream<String> words = lines.flatMap((ConsumerRecord<String, String> s) -> {
90
      List<String> list = new ArrayList<>();
91
      list.add(s.value());
92
      return list.iterator();
93
    });
94
    //对其中的单词进行统计
95
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
96
            (String s) -> {
97
              LOGGER.debug("mapToPair call: " + s);
98
              return new Tuple2<>(s, 1);
99
            }).reduceByKey((Integer i1, Integer i2) -> {
100
      LOGGER.debug("reduceByKey call: " + i1 + ", " + i2);
101
      return i1 + i2;
102
    });
103
    wordCounts.print();
104
    javaStreamingContext.start();
105
    try {
106
      javaStreamingContext.awaitTermination();
107
    } catch (InterruptedException e) {
108
      LOGGER.error("InterruptedException:" + e.getMessage(), e);
109
      Thread.currentThread().interrupt();
110
    }
111
  }
99 112
}
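The Kafka parameter map is now a typed HashMap<String, Object>; the same entries can also be keyed with the constants from kafka-clients' ConsumerConfig, sketched below as an alternative (not what the commit does; assumes org.apache.kafka.clients.consumer.ConsumerConfig is available on the classpath):

  HashMap<String, Object> kafkaParams = new HashMap<>(INITIAL_CAPACITY);
  kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args[BROKER_INDEX]);
  kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "SparkStreamKafkaCountTest");
  kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);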

+ 124 - 74
comp-example/src/main/java/com/ai/ipu/example/spark/SparkStreamSocketOrFileFolderCountTest.java

@@ -1,10 +1,9 @@
1 1
package com.ai.ipu.example.spark;
2 2
3
import com.ai.ipu.basic.log.ILogger;
4
import com.ai.ipu.basic.log.IpuLoggerFactory;
3 5
import org.apache.spark.SparkConf;
4 6
import org.apache.spark.api.java.Optional;
5
import org.apache.spark.api.java.function.FlatMapFunction;
6
import org.apache.spark.api.java.function.Function2;
7
import org.apache.spark.api.java.function.PairFunction;
8 7
import org.apache.spark.streaming.Durations;
9 8
import org.apache.spark.streaming.api.java.JavaDStream;
10 9
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -13,27 +12,46 @@ import scala.Tuple2;
13 12
14 13
import java.util.Arrays;
15 14
import java.util.List;
15
import java.util.regex.Pattern;
16 16
17 17
/**
18 18
 * Spark Stream 从HDSF系统或者Socket读取数据,并统计单词出现次数   累计出现次数
19 19
 *
20 20
 * @author lilb3@asiainfo.com
21 21
 * @since 2019-05-24 17:10
22
 * 普通打包部署
23
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamSocketCountTest/
24 22
 * Socket:
25 23
 * 开启监听:
26 24
 * nc -lk iZm5e5xe1w25avi0io1f5aZ 9100
27
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.SparkStreamSocketOrFileFolderCountTest /home/mysql/test/comp-example-1.0.jar.original hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamSocketCountTest/ 0 iZm5e5xe1w25avi0io1f5aZ:9100 hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/SparkStreamSocketCountTest/ ~/logs/spark/SparkStreamSocketCountTest.txt
28
 *
29
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamFileFolderCountTest/
25
 * 删除数据文件:
26
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R \
27
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamSocketCountTest/
28
 * 执行命令:
29
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit \
30
 * --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar \
31
 * --class com.ai.ipu.example.spark.SparkStreamSocketOrFileFolderCountTest \
32
 * /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar.original \
33
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamSocketCountTest/ \
34
 * 0 iZm5e5xe1w25avi0io1f5aZ:9100 \
35
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/SparkStreamSocketCountTest/ \
36
 * ~/logs/spark/SparkStreamSocketCountTest.txt
37
 * <p>
38
 * 删除数据文件:
39
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R \
40
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamFileFolderCountTest/
30 41
 * 文件夹:
31 42
 * 创建备用文件:
32 43
 * echo "a aa aaa b bb bbb c cc ccc d dd ddd e ee eee f ff fff g gg ggg" > ~/test1.txt
33 44
 * echo "1 11 111 2 22 222 3 33 333 4 44 444 5 55 555 6 66 666 7 77 777" > ~/test2.txt
34 45
 * echo "~ ~~ ~~~ ! !! !!! @ @@ @@@ # ## ### $ $$ $$$ % %% %%% ^ ^^ ^^^" > ~/test3.txt
35 46
 * 执行命令:
36
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.SparkStreamSocketOrFileFolderCountTest /home/mysql/test/comp-example-1.0.jar.original hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamFileFolderCountTest/ 1 hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/testFileFolder/ hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/SparkStreamFileFolderCountTest/ ~/logs/spark/SparkStreamFileFolderCountTest.txt
47
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit \
48
 * --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar \
49
 * --class com.ai.ipu.example.spark.SparkStreamSocketOrFileFolderCountTest \
50
 * /home/mysql/test/comp-example-1.0.jar.original \
51
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamFileFolderCountTest/ \
52
 * 1 hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/testFileFolder/ \
53
 * hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/SparkStreamFileFolderCountTest/ \
54
 * ~/logs/spark/SparkStreamFileFolderCountTest.txt
37 55
 * 上传文件做计算:
38 56
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -mkdir /spark/testFileFolder
39 57
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test1.txt /spark/testFileFolder/test10.txt
@@ -47,76 +65,108 @@ import java.util.List;
47 65
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test3.txt /spark/testFileFolder/test33.txt
48 66
 **/
49 67
public class SparkStreamSocketOrFileFolderCountTest {
50
    public static void main(String[] args) {
51
        if(args.length < 5){
52
            System.err.println("You arguments were " + Arrays.asList(args));
53
            System.err.println("参数格式:\n" +
54
                    "\t第一个参数:CheckPoint文件路径;\n" +
55
                    "\t第二个参数:计数类型 0表示Socket,1表示文件系统;\n" +
56
                    "\t第三个参数:Socket的ip:port 或者 需要统计的文件夹Path;\n" +
57
                    "\t第四个参数:执行结果保存到hdfs的目录;\n" +
58
                    "\t第五个参数:执行结果保存到本地的目录。");
59
            System.exit(1);
60
        }
68
  private static final ILogger LOGGER = IpuLoggerFactory.createLogger(SparkStreamSocketOrFileFolderCountTest.class);
69
  private static final Pattern SERVER_REG = Pattern.compile("\\w+:\\d+");
70
  private static final int ARGS_LENGTH = 5;
71
  private static final String SOCKET_FLAG = "0";
72
  /**
73
   * 测试类型Index
74
   */
75
  private static final int TYPE_INDEX = 1;
76
  /**
77
   * 连接信息Index
78
   */
79
  private static final int BROKER_INDEX = 2;
80
  /**
81
   * 保存HDFS文件Index
82
   */
83
  private static final int SAVE_HDFS_FILE_NAME_INDEX = 3;
84
  /**
85
   * 保存本地文件路径Index
86
   */
87
  private static final int SAVE_LOCAL_FILE_NAME_INDEX = 4;
88
  /**
89
   * 持续时长
90
   */
91
  private static final long DURATIONS = 30L;
92
93
94
  public static void main(String[] args) {
95
    if (args.length < ARGS_LENGTH) {
96
      LOGGER.error("You arguments were " + Arrays.asList(args));
97
      LOGGER.error("参数格式:\n" +
98
              "\t第一个参数:CheckPoint文件路径;\n" +
99
              "\t第二个参数:计数类型 0表示Socket,1表示文件系统;\n" +
100
              "\t第三个参数:Socket的ip:port 或者 需要统计的文件夹Path;\n" +
101
              "\t第四个参数:执行结果保存到hdfs的目录;\n" +
102
              "\t第五个参数:执行结果保存到本地的目录。");
103
      System.exit(1);
104
    }
61 105
        /*创建一个本地StreamingContext两个工作线程和批间隔1秒(原作者是间隔一秒,产生的速度太快了,我这里改为30秒,
62 106
        还有就是产生了在将处理结果存放的目录下产生了大量的小文件,这样在生产环境中肯定是不行的,
63 107
        我感觉应该是按文件的大小来产生而不应该是按时间间隔产生)*/
64
        SparkConf conf = new SparkConf().setMaster(SparkConfig.getSparkMaster()).setAppName("SparkStreamSocketOrFileFolderCountTest").set("spark.testing.memory", "2147480000");
65
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
66
67
        jssc.checkpoint(args[0]);
68
        //下面两行代码是数据来源:第一行是通过socketTextStream套接字,第二行是直接通过hdfs上的某个文件目录来作为输入数据源
69
        JavaDStream<String> words;
70
        if("0".equals(args[1])){
71
            // 在u0终端上面执行 nc -lk 9999
72
            if(!args[2].matches("\\w+\\:\\d+")){
73
                System.out.println("第三个参数不对,格式为Socket的ip:port");
74
                System.exit(1);
75
            }
76
            words = jssc.socketTextStream(args[2].split(":")[0], Integer.parseInt(args[2].split(":")[1])).flatMap((FlatMapFunction<String, String>) x -> {
77
                System.out.println(Arrays.asList(x.split(" ")).get(0));
78
                return Arrays.asList(x.split(" ")).iterator();
79
            });
80
        } else {
81
            words = jssc.textFileStream(args[2]).flatMap((FlatMapFunction<String, String>) x -> {
82
                System.out.println(Arrays.asList(x.split(" ")).get(0));
83
                return Arrays.asList(x.split(" ")).iterator();
84
            });
108
    SparkConf conf = new SparkConf().setMaster(SparkConfig.getSparkMaster())
109
            .setAppName("SparkStreamSocketOrFileFolderCountTest").set("spark.testing.memory", "2147480000");
110
    try (JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(DURATIONS))) {
111
      javaStreamingContext.checkpoint(args[0]);
112
      //下面两行代码是数据来源:第一行是通过socketTextStream套接字,第二行是直接通过hdfs上的某个文件目录来作为输入数据源
113
      JavaDStream<String> words;
114
      if (SOCKET_FLAG.equals(args[TYPE_INDEX])) {
115
        // 在u0终端上面执行 nc -lk 9999
116
        if (!SERVER_REG.matcher(args[BROKER_INDEX]).matches()) {
117
          LOGGER.debug("第三个参数不对,格式为Socket的ip:port");
118
          System.exit(1);
85 119
        }
86
        // Count each word in each batch
87
        JavaPairDStream<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
88
89
        // 使用updateStateByKey来更新状态
90
        // 参数valueList:相当于这个batch,这个key新的值,可能有多个,比如(hadoop,1)(hadoop,1)传入的可能是(1,1)
91
        // 参数oldState:就是指这个key之前的状态
92
        JavaPairDStream<String, Integer> stateDStream = pairs.updateStateByKey((Function2<List<Integer>, Optional<Integer>, Optional<Integer>>) (valueList, oldState) -> {
93
            Integer newState = 0;
94
            // 如果oldState之前已经存在,那么这个key可能之前已经被统计过,否则说明这个key第一次出现
95
            if (oldState.isPresent()) {
96
                newState = oldState.get();
97
            }
98
99
            // 更新state
100
            for (Integer value : valueList) {
101
                newState += value;
102
            }
103
            return Optional.of(newState);
120
        words = javaStreamingContext
121
                .socketTextStream(args[BROKER_INDEX].split(":")[0],
122
                        Integer.parseInt(args[BROKER_INDEX].split(":")[1]))
123
                .flatMap((String x) -> {
124
                  LOGGER.debug(Arrays.asList(x.split(" ")).get(0));
125
                  return Arrays.asList(x.split(" ")).iterator();
126
                });
127
      } else {
128
        words = javaStreamingContext.textFileStream(args[BROKER_INDEX]).flatMap((String x) -> {
129
          LOGGER.debug(Arrays.asList(x.split(" ")).get(0));
130
          return Arrays.asList(x.split(" ")).iterator();
104 131
        });
132
      }
133
      // Count each word in each batch
134
      JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
105 135
106
        // 打印前十的元素每个抽样生成DStream到控制台
107
        stateDStream.print();
108
        // 将处理结果保存在hdfs中
109
        stateDStream.dstream().saveAsTextFiles(args[3], System.currentTimeMillis() + "_count");
110
        // 将处理结果保存在Linux本地中
111
        stateDStream.dstream().saveAsTextFiles(args[4], "test");
136
      // 使用updateStateByKey来更新状态
137
      // 参数valueList:相当于这个batch,这个key新的值,可能有多个,比如(hadoop,1)(hadoop,1)传入的可能是(1,1)
138
      // 参数oldState:就是指这个key之前的状态
139
      JavaPairDStream<String, Integer> javaPairDataStream = pairs
140
              .updateStateByKey((List<Integer> valueList, Optional<Integer> oldState) -> {
141
                Integer newState = 0;
142
                // 如果oldState之前已经存在,那么这个key可能之前已经被统计过,否则说明这个key第一次出现
143
                if (oldState.isPresent()) {
144
                  newState = oldState.get();
145
                }
112 146
113
        jssc.start();
114
        try {
115
            // Wait for the computation to terminate
116
            jssc.awaitTermination();
117
        } catch (InterruptedException e) {
118
            e.printStackTrace();
119
        }
120
    }
147
                // 更新state
148
                for (Integer value : valueList) {
149
                  newState += value;
150
                }
151
                return Optional.of(newState);
152
              });
121 153
122
}
154
      // 打印前十的元素每个抽样生成DStream到控制台
155
      javaPairDataStream.print();
156
      // 将处理结果保存在hdfs中
157
      javaPairDataStream.dstream()
158
              .saveAsTextFiles(args[SAVE_HDFS_FILE_NAME_INDEX], System.currentTimeMillis() + "_count");
159
      // 将处理结果保存在Linux本地中
160
      javaPairDataStream.dstream().saveAsTextFiles(args[SAVE_LOCAL_FILE_NAME_INDEX], "test");
161
      //启动
162
      javaStreamingContext.start();
163
      // Wait for the computation to terminate
164
      javaStreamingContext.awaitTermination();
165
    } catch (NumberFormatException e) {
166
      LOGGER.error("NumberFormatException:" + e.getMessage(), e);
167
    } catch (InterruptedException e) {
168
      LOGGER.error("InterruptedException:" + e.getMessage(), e);
169
      Thread.currentThread().interrupt();
170
    }
171
  }
172
}
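The updateStateByKey call above keeps a running total per key across batches, which is why checkpointing is enabled. The same update function in isolation, written against an assumed pairs stream (sketch only):

  JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
          (List<Integer> newValues, Optional<Integer> previousState) -> {
            // Start from the previous total (0 the first time a key is seen)
            // and add every value observed for this key in the current batch.
            int total = previousState.isPresent() ? previousState.get() : 0;
            for (Integer value : newValues) {
              total += value;
            }
            return Optional.of(total);
          });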