63 Commits bac07c34ef ... b7d4c9d278

Author SHA1 Message Date
  lilb3 b7d4c9d278 Merge remote-tracking branch 'origin/master' 6 years ago
  lilb3 76cf5b7d79 1. Hadoop/HBase test case code has been moved to the corresponding directories, so it is deleted here. 6 years ago
  lilb3 c621b8bb42 Commit Hadoop + HBase + Spark test case code 6 years ago
  liutong3 ef5ba788a7 Merge branch 'master' of http://10.1.235.20:3000/rest/rest-guide 6 years ago
  liutong3 f3aa8d34c9 Add RabbitMQ consumer and producer code 6 years ago
  liutong3 054c7af470 Merge branch 'master' of http://10.1.235.20:3000/rest/rest-guide 6 years ago
  liutong3 e6ebab4a15 Reinstalled ZooKeeper; configuration info changed 6 years ago
  liutong3 c2df84bca8 Merge branch 'master' of http://10.1.235.20:3000/rest/rest-guide 6 years ago
  liutong3 b73972457b Switch ZooKeeper to connect via ipu-zk 6 years ago
  liutong3 1c8cbafb8f RocketMQ example 6 years ago
  liutong3 afeb47976f Kafka example 6 years ago
  liutong3 0c878383ab ZooKeeper example 6 years ago
  huangbo da13ffafd9 Add exception codes and context operations. 6 years ago
  huangbo e365aa7af3 Finish the initial version of the file upload example; supports single and multiple file uploads. 6 years ago
  huangbo 9671698074 Global exception improvements: custom exceptions and default exception handling 6 years ago
  huangbo 0b845642d9 Add three kinds of aspects: validation, permission, and transaction aspects 6 years ago
  huangbo 47541f520d Move the custom adaptation of JMap and IData from configuration files into code 6 years ago
  huangbo 2c1d65c096 pom bug fixes and optimization 6 years ago
  huangbo e0e46714fb Initialize the comp-example project 6 years ago
  huangbo c6bb31dad1 Version change: 3.1-SNAPSHOT 6 years ago
  huangbo 5a81f339f0 Optimize the hierarchical packaging pom code 6 years ago
  huangbo 4a741177e4 Important: initial version of the separated-jar packaging scheme 6 years ago
  huangbo 1cc0e6568c Minor pom optimization 6 years ago
  huangbo 7da3c05382 Minor pom structure adjustment 6 years ago
  huangbo 5ee7e43074 Introduce the Druid component 6 years ago
  huangbo 0191dd254f New feature: Controller parameters support JMap 6 years ago
  huangbo fc694e8d6d Code optimization 6 years ago
  huangbo b4db991651 Improve the logic handling of the IpuContextData context object. 6 years ago
  huangbo 3b96f790d8 Optimization: modify configuration 6 years ago
  huangbo 8414a0f6ad Add file upload code; still has bugs. 6 years ago
  huangbo 473c0ab28e Develop the common Dubbo invocation method callCommonService 6 years ago
  huangbo 4f5997c2ab Add Dubbo invocation support under the rest and service architecture 6 years ago
  huangbo 679cbfb5f2 Move IpuController down to a lower layer 6 years ago
  huangbo c8be4f8358 Commit license (lic) file 6 years ago
  huangbo 1fca57cd0e Initialize the IPU server-side scaffold project 6 years ago
  huangbo fcbb66cac1 Optimization 6 years ago
  huangbo 0b5e503a7c Add a single-table CRUD example 6 years ago
  miaozy 6b9ee44935 Remove the aif intranet private repository address 7 years ago
  miaozy 9a92966bb5 Introduce msgframe-related dependencies 7 years ago
  huangbo 7039cfabfd Introduce a demo use case for msgframe 7 years ago
  huangbo 9ece25612e Distinguish context-path, servlet-path, and static-path; configure the interceptor exclude-mapping correctly 7 years ago
  huangbo 0e0419f4c0 Merge branch 'master' of http://10.1.235.20:3000/rest/rest-guide.git 7 years ago
  huangbo 2feb78415e Introduce the IPU MVC framework, used together with Spring MVC. 7 years ago
  huangbo ed04772fc2 Add unified database management to the filter, with a corresponding example 7 years ago
  huangbo 27d4ec055f Log adaptation; add automatic thread-stack dump configuration; add Netty4 communication 7 years ago
  huangbo afcceedf91 Add a token validation example 7 years ago
  huangbo 277716a327 Add lazy connection lazy="true" 7 years ago
  huangbo 8f10c195a5 Add a service concurrency control example 7 years ago
  huangbo ebfd0f4f5e Add a service mocking example 7 years ago
  huangbo 67375cbb3d Add a service stub example 7 years ago
  huangbo 0a4bf8791b Fix the Dubbo oninvoke attribute bug; modified DubboBeanDefinitionParser and FutureFilter 7 years ago
  huangbo 4e73226787 Dubbo interceptor example 7 years ago
  huangbo a31e945d23 Dubbo interceptor example 7 years ago
  huangbo 80381cd8a0 Service callback example 7 years ago
  huangbo 0189ab35d2 Adjust the example project code structure 7 years ago
  huangbo 0f4c2f4104 Update readme to explain the IPU license. 7 years ago
  huangbo 1f85105a33 Commit the IPU license certificate 7 years ago
  huangbo 7ae2e05013 Fixed data API response format: return message, return code, return exception. 7 years ago
  huangbo 509c3cca74 pom optimization; production and test environment improvements. 7 years ago
  huangbo 9096fb8b98 Distinguish development, test, and production environments 7 years ago
  huangbo e17c2812aa Code adjustments and optimization 7 years ago
  huangbo 3c08dfb933 Initialize the rest guide project 7 years ago
  rest 6d6df65ecb initial commit 7 years ago

+ 13 - 1
comp-example/pom.xml

@ -21,6 +21,7 @@
 		<ipu>3.1-SNAPSHOT</ipu>
 		<jdk>1.8</jdk>
 		<slf4j-api>1.7.16</slf4j-api>
+		<spark>2.4.1</spark>
 		<startClass>com.ai.ipu.example.spark.SparkExample</startClass>
 	</properties>

@ -94,7 +95,7 @@
 		<dependency>
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-core_2.11</artifactId>
-			<version>2.4.1</version>
+			<version>${spark}</version>
 		</dependency>

 		<dependency>
@ -110,6 +111,17 @@
 			<version>2.9.1</version>
 		</dependency>

+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-streaming_2.11</artifactId>
+			<version>${spark}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
+			<version>${spark}</version>
+		</dependency>
 	</dependencies>

 	<build>

+ 209 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/JavaRecoverableNetworkWordCountTest.java

@ -0,0 +1,209 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.ai.ipu.example.spark;

import com.google.common.io.Files;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;

import java.io.File;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Use this singleton to get or register a Broadcast variable.
 */
class JavaWordBlacklist {

    private static volatile Broadcast<List<String>> instance = null;

    public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaWordBlacklist.class) {
                if (instance == null) {
                    List<String> wordBlacklist = Arrays.asList("a", "b", "c");
                    instance = jsc.broadcast(wordBlacklist);
                }
            }
        }
        return instance;
    }
}

/**
 * Use this singleton to get or register an Accumulator.
 */
class JavaDroppedWordsCounter {

    private static volatile LongAccumulator instance = null;

    public static LongAccumulator getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaDroppedWordsCounter.class) {
                if (instance == null) {
                    instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
                }
            }
        }
        return instance;
    }
}

/**
 * Counts words in text encoded with UTF8 received from the network every second. This example also
 * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that
 * they can be registered on driver failures.
 * <p>
 * Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
 * data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
 * <output-file> file to which the word counts will be appended
 * <p>
 * <checkpoint-directory> and <output-file> must be absolute paths
 * <p>
 * To run this on your local machine, you need to first run a Netcat server
 * <p>
 * `$ nc -lk 9999`
 * <p>
 * and run the example as
 * <p>
 * `$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
 * localhost 9999 ~/checkpoint/ ~/out`
 * <p>
 * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
 * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
 * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
 * the checkpoint data.
 * <p>
 * Refer to the online documentation for more details.
 *
 * Spark broadcast variable / accumulator: recoverable network word count test
 *
 * @author lilb3@asiainfo.com
 * @since 2019-05-24 17:10
 * Plain packaging deployment: upload the original jar plus its dependencies, e.g. the framework logging jar ipu-basic-3.1-SNAPSHOT.jar; separate multiple jars with commas
 * Delete the checkpoint files:
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest
 * List the checkpoint files:
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -ls -R hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest
 * Start the listener:
 * nc -lk iZm5e5xe1w25avi0io1f5aZ 9100
 * Run the program:
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.JavaRecoverableNetworkWordCountTest /home/mysql/test/comp-example-1.0.jar.original iZm5e5xe1w25avi0io1f5aZ 9100 hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/JavaRecoverableNetworkWordCountTest/ ~/logs/spark/JavaRecoverableNetworkWordCountTest.txt
 * Check the result:
 * cat ~/logs/spark/JavaRecoverableNetworkWordCountTest.txt
 **/
public final class JavaRecoverableNetworkWordCountTest {
    private static final Pattern SPACE = Pattern.compile(" ");

    private static JavaStreamingContext createContext(String ip,
                                                      int port,
                                                      String checkpointDirectory,
                                                      String outputPath) {

        // If you do not see this printed, that means the StreamingContext has been loaded
        // from the new checkpoint
        System.out.println("Creating new context");
        File outputFile = new File(outputPath);
        if (outputFile.exists()) {
            outputFile.delete();
        }
        SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
        // Create the context with a 1 second batch size
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
        ssc.checkpoint(checkpointDirectory);

        // Create a socket stream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by 'nc')
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
        JavaDStream<String> words = lines.flatMap(x -> {
            System.out.println("flatMap call: " + x);
            return Arrays.asList(SPACE.split(x)).iterator();
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        wordCounts.foreachRDD((rdd, time) -> {
            // Get or register the blacklist Broadcast
            Broadcast<List<String>> blacklist =
                    JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
            // Get or register the droppedWordsCounter Accumulator
            LongAccumulator droppedWordsCounter =
                    JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
            // Use blacklist to drop words and use droppedWordsCounter to count them
            String counts = rdd.filter(wordCount -> {
                System.out.println("foreachRDD call: " + wordCount._1() + ", " + wordCount._2());
                if (blacklist.value().contains(wordCount._1())) {
                    droppedWordsCounter.add(wordCount._2());
                    return false;
                } else {
                    return true;
                }
            }).collect().toString();
            String output = "Counts at time " + time + " " + counts;
            System.out.println(output);
            System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
            System.out.println("Appending to " + outputFile.getAbsolutePath());
            Files.append(output + "\n", outputFile, Charset.defaultCharset());
        });

        return ssc;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.err.println("Your arguments were " + Arrays.asList(args));
            System.err.println(
                    "Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
                            "     <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
                            "     Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
                            "     HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
                            "     the word counts will be appended\n" +
                            "\n" +
                            "In local mode, <master> should be 'local[n]' with n > 1\n" +
                            "Both <checkpoint-directory> and <output-file> must be absolute paths");
            System.exit(1);
        }

        String ip = args[0];
        int port = Integer.parseInt(args[1]);
        String checkpointDirectory = args[2];
        String outputPath = args[3];

        // Function to create JavaStreamingContext without any output operations
        // (used to detect the new context)
        Function0<JavaStreamingContext> createContextFunc =
                () -> createContext(ip, port, checkpointDirectory, outputPath);

        JavaStreamingContext ssc =
                JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
        ssc.start();
        ssc.awaitTermination();
    }
}

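Note: to exercise the getOrCreate/checkpoint recovery pattern above without the cluster, a stripped-down local variant can be used. This is only a sketch under assumed values (local[2] master, a /tmp checkpoint directory, localhost:9100 fed by nc -lk 9100); it is not part of the commit.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class LocalRecoverySketch {
    public static void main(String[] args) throws Exception {
        String checkpointDir = "/tmp/JavaRecoverableNetworkWordCountTest"; // assumed local path
        Function0<JavaStreamingContext> factory = () -> {
            // Only reached when no usable checkpoint exists yet.
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("LocalRecoverySketch");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(30));
            ssc.checkpoint(checkpointDir);
            // Feed with: nc -lk 9100; print() is the output operation.
            ssc.socketTextStream("localhost", 9100).print();
            return ssc;
        };
        // On restart after a driver failure the context is rebuilt from the checkpoint data.
        JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);
        ssc.start();
        ssc.awaitTermination();
    }
}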
+ 49 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/RemoteFileSparkTest.java

@ -0,0 +1,49 @@
package com.ai.ipu.example.spark;

import com.ai.ipu.basic.log.ILogger;
import com.ai.ipu.basic.log.IpuLoggerFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * Spark example that counts the words containing "a" and "b"
 *
 * @author lilb3@asiainfo.com
 * @since 2019-05-24 17:10
 * Plain packaging deployment: upload the original jar plus its dependencies, e.g. the framework logging jar ipu-basic-3.1-SNAPSHOT.jar; separate multiple jars with commas
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.RemoteFileSparkTest /home/mysql/test/comp-example-1.0.jar.original
 **/
public class RemoteFileSparkTest {
    private static final ILogger LOGGER = IpuLoggerFactory.createLogger(RemoteFileSparkTest.class);

    public static void main(String[] args) {
        JavaSparkContext jsc = null;
        try {
            SparkConf conf = new SparkConf().setMaster(SparkConfig.getSparkMaster()).setAppName("RemoteFileSparkTest");

            jsc = new JavaSparkContext(conf);

            String logFile = SparkConfig.getHdfsUri() + "/aaa.txt";
            JavaRDD<String> jddData = jsc.textFile(logFile).cache();

            for (String str : jddData.collect()) {
                System.out.println("jddData: " + str);
            }
            jddData.saveAsTextFile(SparkConfig.getHdfsUri() + "/test/" + System.currentTimeMillis());
            long numAs = jddData.filter((Function<String, Boolean>) s -> s.contains("1")).count();

            long numBs = jddData.filter((Function<String, Boolean>) s -> s.contains("2")).count();

            LOGGER.info("Lines with a: " + numAs + ", lines with b: " + numBs);
            jsc.stop();
        } catch (Exception e) {
            LOGGER.error("Exception: " + e);
        } finally {
            if (null != jsc) {
                jsc.close();
            }
        }
    }
}

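RemoteFileSparkTest assumes that /aaa.txt already exists under the configured HDFS URI. A minimal sketch for creating it with the plain HDFS FileSystem API (the same calls the removed HadoopExample used); the URI and file contents are illustrative assumptions, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class CreateAaaTxtSketch {
    public static void main(String[] args) throws Exception {
        // Assumed HDFS URI; in the project it comes from spark.properties (hdfs.uri).
        FileSystem hdfs = FileSystem.get(new URI("hdfs://iZm5e5xe1w25avi0io1f5aZ:9000"), new Configuration());
        try (FSDataOutputStream out = hdfs.create(new Path("/aaa.txt"))) {
            // A few lines containing "1" and "2" so the filter counts are non-zero.
            out.writeBytes("line 1\nline 2\nline 1 2\n");
        }
        hdfs.close();
    }
}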
+ 0 - 41
comp-example/src/main/java/com/ai/ipu/example/spark/SimpleApp.java

@ -1,41 +0,0 @@
package com.ai.ipu.example.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * Class description
 *
 * @Author: lilb3@asiainfo.com
 * @Date: 2019-04-11 21:14
 * Server deployment options:
 * a) SpringBoot packaging: the pom must set startClass to com.ai.ipu.example.spark.SimpleApp
 * $SPARK_HOME/bin/spark-submit --class org.springframework.boot.loader.JarLauncher --master local[4] /home/mysql/test/comp-example-1.0.jar iZm5e5xe1w25avi0io1f5aZ
 * b) Plain packaging deployment
 * $SPARK_HOME/bin/spark-submit --class com.ai.ipu.example.spark.SimpleApp --master local[4] /home/mysql/test/comp-example-1.0.jar.original iZm5e5xe1w25avi0io1f5aZ
 **/
public class SimpleApp {
    public static void main(String[] args) {
        if(args.length < 1){
            System.out.println("Wrong arguments: the first argument is the IP of the Hadoop host");
            return;
        }
        String host = args[0];//iZm5e5xe1w25avi0io1f5aZ
        String logFile = "hdfs://"+ host +":9000/ccc.txt";
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        // conf.setJars(new String[] {"D:\\ideaws\\rest\\rest-guide\\comp-example\\target\\comp-example-1.0.jar.original"});
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();

        long numAs = logData.filter((Function<String, Boolean>) s -> s.contains("1")).count();

        long numBs = logData.filter((Function<String, Boolean>) s -> s.contains("2")).count();

        System.out.println("============================================================");
        System.out.println("Lines with 1: " + numAs + ", lines with 2: " + numBs);
        System.out.println("============================================================");
        sc.stop();
    }
}

+ 40 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/SparkConfig.java

@ -0,0 +1,40 @@
package com.ai.ipu.example.spark;

import com.ai.ipu.basic.file.ResourceBundleUtil;

/**
 * Configuration holder for the Spark examples, loaded from spark.properties
 *
 * @author lilb3@asiainfo.com
 * @since 2019-07-01 15:36
 **/
public class SparkConfig {
    private static final String CONFIG_FILE_PATH = "spark";
    /* Spark cluster */
    private static String sparkMaster;
    /* Hadoop (HDFS) address */
    private static String hdfsUri;
    /* Test file */
    private static String testFileName;

    public static String getSparkMaster() {
        return sparkMaster;
    }

    public static String getHdfsUri() {
        return hdfsUri;
    }

    public static String getTestFileName() {
        return testFileName;
    }

    /* Load the configuration file */
    static {
        try {
            ResourceBundleUtil.initialize(CONFIG_FILE_PATH, SparkConfig.class);
        } catch (Exception e) {
            System.out.println("Failed to read configuration file " + CONFIG_FILE_PATH + ": " + e);
        }
    }
}

+ 8 - 13
comp-example/src/main/java/com/ai/ipu/example/spark/SparkExample.java

@ -19,23 +19,18 @@ import java.util.List;
 /**
  * Spark word count example
  *
- * @Author: lilb3@asiainfo.com
- * @Date: 2019-05-24 17:10
+ * @author lilb3@asiainfo.com
+ * @since 2019-05-24 17:10
  * Server deployment options:
  * a) SpringBoot packaging: the pom must set startClass to com.ai.ipu.example.spark.SparkExample
- * $SPARK_HOME/bin/spark-submit --class org.springframework.boot.loader.JarLauncher --master local[4] /home/mysql/test/comp-example-1.0.jar iZm5e5xe1w25avi0io1f5aZ
+ * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --class org.springframework.boot.loader.JarLauncher /home/mysql/test/comp-example-1.0.jar
  * b) Plain packaging deployment: upload the original jar plus its dependencies, e.g. the framework logging jar ipu-basic-3.1-SNAPSHOT.jar; separate multiple jars with commas
- * $SPARK_HOME/bin/spark-submit --class com.ai.ipu.example.spark.SparkExample --master local[4] --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar /home/mysql/test/comp-example-1.0.jar.original iZm5e5xe1w25avi0io1f5aZ
+ * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.SparkExample /home/mysql/test/comp-example-1.0.jar.original
  **/
 public class SparkExample {
     private static final ILogger LOGGER = IpuLoggerFactory.createLogger(SparkExample.class);

     public static void main(String[] args) {
-        if(args.length < 1){
-            System.out.println("Wrong arguments: the first argument is the IP of the Hadoop host");
-            return;
-        }
-        String host = args[0];
         /*
          * Every Spark program must first create a Spark context before doing any work.
          * While creating the context, the program requests resources from the cluster and builds the runtime environment.
@ -43,7 +38,8 @@ public class SparkExample {
          * The only parameter the SparkContext needs is a SparkConf, which is a set of key-value properties.
          */
         SparkConf sparkConf = new SparkConf().setAppName("SparkExample");
-        // sparkConf .setJars(new String[]{"D:\\ideaws\\rest\\rest-guide\\comp-example\\target\\comp-example-1.0.jar.original"});
+        /* java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$3 of type org.apache.spark.api.java.function.FlatMapFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1 */
+        //sparkConf.setJars(new String[]{SparkConfig.getJarFilePath()});

         JavaSparkContext jsc = new JavaSparkContext(sparkConf);

@ -56,7 +52,7 @@
         }

         // Read the file from HDFS to create an RDD
-        String textFilePath = "hdfs://" + host + ":9000/aaa.txt";
+        String textFilePath = SparkConfig.getHdfsUri() + "/" + SparkConfig.getTestFileName();
         /*
          * textFile reads the specified file from the file system and returns an RDD instance.
          * Initial RDD creation is handled by the SparkContext, with an in-memory collection or an external file system as the input source.
@ -114,8 +110,7 @@
         });

         // Write the result to HDFS. On Windows this maps to a real local directory, e.g. D:/logs/spark_test/1555319746196; the parent directory must exist, otherwise nothing is written
-        counts.saveAsTextFile(System.getProperty("USER_HOME") + System.lineSeparator()+ System.currentTimeMillis());
-
+        counts.saveAsTextFile(SparkConfig.getHdfsUri() + "/spark/" + System.currentTimeMillis());

         /*
          * Convert the result to common types for output

+ 96 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/SparkReadHbaseTest.java

@ -0,0 +1,96 @@
package com.ai.ipu.example.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Spark reads data from HBase
 *
 * @author lilb3@asiainfo.com
 * @since 2019-05-24 17:10
 * Plain packaging deployment
 * Connect to HBase:
 * ~/shell/hbase_distribution/16000/conn_hbase.sh
 * Create the table if it does not exist:
 * create 'mytable', 'cf'
 * Add two records:
 * put 'mytable', '1', 'cf:msg', 'Hello World'
 * put 'mytable', '2', 'cf:blog', 'Hello Tester'
 * Exit the shell with:
 * ctrl+c
 * Run the job:
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar,/home/mysql/test/lib/*.jar --class com.ai.ipu.example.spark.SparkReadHbaseTest /home/mysql/test/comp-example-1.0.jar.original "iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103" 0 mytable cf msg
 **/
public class SparkReadHbaseTest {
    public static void main(String[] args) {
        if (args.length < 5) {
            System.err.println("Your arguments were " + Arrays.asList(args));
            System.err.println("Argument format:\n" +
                    "\t1st argument: ZooKeeper connection info;\n" +
                    "\t2nd argument: ZooKeeper client port, 0 means do not set it;\n" +
                    "\t3rd argument: HBase table name;\n" +
                    "\t4th argument: column family name;\n" +
                    "\t5th argument: column name.");
            System.exit(1);
        }
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster(SparkConfig.getSparkMaster()).setAppName("SparkReadHbaseTest");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        String hbaseZookeeperQuorum = args[0];
        String hbaseZookeeperPropertyClientPort = args[1];
        conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        if (!"0".equals(hbaseZookeeperPropertyClientPort) && hbaseZookeeperPropertyClientPort.matches("\\d+")) {
            conf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
        }
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(args[3]));
        scan.addColumn(Bytes.toBytes(args[3]), Bytes.toBytes(args[4]));
        String scanToString;
        try {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            scanToString = Base64.encodeBytes(proto.toByteArray());
            String tableName = args[2];
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            conf.set(TableInputFormat.SCAN, scanToString);
            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class,
                    Result.class);
            JavaPairRDD<String, List<String>> javaPairRDD = hBaseRDD.mapToPair(
                    (PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<String>>) results -> {
                        List<String> list = new ArrayList<>();
                        byte[] msg = results._2().getValue(Bytes.toBytes(args[3]), Bytes.toBytes(args[4]));
                        list.add(Bytes.toString(msg));
                        return new Tuple2<>(Bytes.toString(results._1().get()), list);
                    }
            );
            List<Tuple2<String, List<String>>> list = javaPairRDD.collect();
            for (Tuple2<String, List<String>> tuple2 : list) {
                System.out.println("javaPairRDD.collect:" + tuple2._1 + " , " + tuple2._2);
            }
        } catch (IOException io) {
            io.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

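The Javadoc above seeds the test table through the HBase shell. The same two rows can also be inserted with the Java client API (the same Put/Table calls the removed HbaseExample used). A sketch only: it assumes the table 'mytable' with family 'cf' already exists and that the quorum below matches your environment.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutMytableRowsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Assumed quorum; the test passes the same value as its first argument.
        conf.set("hbase.zookeeper.quorum",
                "iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("mytable"))) {
            // Same two rows as the shell commands above: cf:msg and cf:blog.
            Put row1 = new Put(Bytes.toBytes("1"));
            row1.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("msg"), Bytes.toBytes("Hello World"));
            Put row2 = new Put(Bytes.toBytes("2"));
            row2.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("blog"), Bytes.toBytes("Hello Tester"));
            table.put(row1);
            table.put(row2);
        }
    }
}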
+ 99 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/SparkStreamKafkaCountTest.java

@ -0,0 +1,99 @@
package com.ai.ipu.example.spark;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

/**
 * Spark Streaming reads data from Kafka
 *
 * @author lilb3@asiainfo.com
 * @since 2019-05-24 17:10
 * Plain packaging deployment
 * Connect to Kafka:
 * ~/software/kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 * Run the job:
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar,/home/mysql/test/lib/*.jar --class com.ai.ipu.example.spark.SparkStreamKafkaCountTest /home/mysql/test/comp-example-1.0.jar.original hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamKafkaCountTest/ test "iZm5e5xe1w25avi0io1f5aZ:9091,iZm5e5xe1w25avi0io1f5aZ:9092,iZm5e5xe1w25avi0io1f5aZ:9093"
 * Enter test data:
 * ------ type anything ------
 **/
public class SparkStreamKafkaCountTest {

    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Your arguments were " + Arrays.asList(args));
            System.err.println("Argument format:\n" +
                    "\t1st argument: checkpoint path;\n" +
                    "\t2nd argument: Kafka topic;\n" +
                    "\t3rd argument: Kafka connection info, ip:port.");
            System.exit(1);
        }
        SparkConf sparkConf = new SparkConf().setAppName("SparkStreamKafkaCountTest").setMaster(SparkConfig.getSparkMaster());
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
        jssc.checkpoint(args[0]);
        // Holds the mapping between topics and partitions
        String[] topicsArr = new String[]{args[1]};
        Collection<String> topicSet = new HashSet<>(Arrays.asList(topicsArr));
        HashMap<String, Object> kafkaParams = new HashMap<>();
        // Build the Kafka parameter map
        // Mainly the address of the Kafka cluster to connect to (the broker list)
        // Kafka listening address and port
        kafkaParams.put("bootstrap.servers", args[2]);
        // Data type and encoding of the record key (default: String, UTF-8)
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        // Data type and encoding of the record value (default: String, UTF-8)
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        // Consumer group id, can be any value
        kafkaParams.put("group.id", "SparkStreamKafkaCountTest");
        // Start reading from latest (newest; older clients use largest, which does not work here) or smallest (earliest)
        kafkaParams.put("auto.offset.reset", "latest");
        // If true, the consumer periodically commits each partition's offset to ZooKeeper
        kafkaParams.put("enable.auto.commit", false);
        // Pull data from Kafka and turn it into RDDs
        JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicSet, kafkaParams)
        );
        JavaDStream<String> words = lines.flatMap((FlatMapFunction<ConsumerRecord<String, String>, String>) s -> {
            List<String> list = new ArrayList<>();
            list.add(s.value());
            return list.iterator();
        });
        // Count the words
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                (PairFunction<String, String, Integer>) s -> {
                    System.out.println("mapToPair call: " + s);
                    return new Tuple2<>(s, 1);
                }).reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> {
            System.out.println("reduceByKey call: " + i1 + ", " + i2);
            return i1 + i2;
        });
        wordCounts.print();
        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

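Instead of kafka-console-producer.sh, test data can be pushed with a minimal Java producer. A sketch only, assuming the kafka-clients dependency is on the classpath; the broker list and topic reuse the values from the spark-submit command above, and the sample words are arbitrary.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TestTopicProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list and topic mirror the spark-submit arguments above.
        props.put("bootstrap.servers", "iZm5e5xe1w25avi0io1f5aZ:9091,iZm5e5xe1w25avi0io1f5aZ:9092,iZm5e5xe1w25avi0io1f5aZ:9093");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each record value becomes one "word" in the streaming count.
            for (String word : new String[]{"hello", "world", "hello"}) {
                producer.send(new ProducerRecord<>("test", word));
            }
            producer.flush();
        }
    }
}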
+ 122 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/SparkStreamSocketOrFileFolderCountTest.java

@ -0,0 +1,122 @@
package com.ai.ipu.example.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * Spark Streaming reads data from HDFS or from a socket and keeps a cumulative count of word occurrences
 *
 * @author lilb3@asiainfo.com
 * @since 2019-05-24 17:10
 * Plain packaging deployment
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamSocketCountTest/
 * Socket:
 * Start the listener:
 * nc -lk iZm5e5xe1w25avi0io1f5aZ 9100
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.SparkStreamSocketOrFileFolderCountTest /home/mysql/test/comp-example-1.0.jar.original hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamSocketCountTest/ 0 iZm5e5xe1w25avi0io1f5aZ:9100 hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/SparkStreamSocketCountTest/ ~/logs/spark/SparkStreamSocketCountTest.txt
 *
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -rm -R hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamFileFolderCountTest/
 * File folder:
 * Create the test files:
 * echo "a aa aaa b bb bbb c cc ccc d dd ddd e ee eee f ff fff g gg ggg" > ~/test1.txt
 * echo "1 11 111 2 22 222 3 33 333 4 44 444 5 55 555 6 66 666 7 77 777" > ~/test2.txt
 * echo "~ ~~ ~~~ ! !! !!! @ @@ @@@ # ## ### $ $$ $$$ % %% %%% ^ ^^ ^^^" > ~/test3.txt
 * Run the job:
 * ~/software/spark-2.4.1-bin-hadoop2.7/bin/spark-submit --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar --class com.ai.ipu.example.spark.SparkStreamSocketOrFileFolderCountTest /home/mysql/test/comp-example-1.0.jar.original hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/checkpoint/SparkStreamFileFolderCountTest/ 1 hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/testFileFolder/ hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/spark/SparkStreamFileFolderCountTest/ ~/logs/spark/SparkStreamFileFolderCountTest.txt
 * Upload the files to be counted:
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -mkdir /spark/testFileFolder
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test1.txt /spark/testFileFolder/test10.txt
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test1.txt /spark/testFileFolder/test11.txt
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test2.txt /spark/testFileFolder/test20.txt
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test2.txt /spark/testFileFolder/test21.txt
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test2.txt /spark/testFileFolder/test22.txt
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test3.txt /spark/testFileFolder/test30.txt
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test3.txt /spark/testFileFolder/test31.txt
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test3.txt /spark/testFileFolder/test32.txt
 * ~/shell/hadoop/9000/conn_hadoop.sh fs -put ~/test3.txt /spark/testFileFolder/test33.txt
 **/
public class SparkStreamSocketOrFileFolderCountTest {
    public static void main(String[] args) {
        if(args.length < 5){
            System.err.println("Your arguments were " + Arrays.asList(args));
            System.err.println("Argument format:\n" +
                    "\t1st argument: checkpoint path;\n" +
                    "\t2nd argument: source type, 0 = socket, 1 = file system;\n" +
                    "\t3rd argument: socket ip:port, or the folder path to count;\n" +
                    "\t4th argument: HDFS directory to save the results;\n" +
                    "\t5th argument: local directory to save the results.");
            System.exit(1);
        }
        /* Create a StreamingContext with two worker threads and a batch interval (the original example uses 1 second,
        which produces output too fast, so it is changed to 30 seconds here. It also creates a large number of small
        files under the output directory, which would not be acceptable in production; output should probably be
        rolled by file size rather than by time interval.) */
        SparkConf conf = new SparkConf().setMaster(SparkConfig.getSparkMaster()).setAppName("SparkStreamSocketOrFileFolderCountTest").set("spark.testing.memory", "2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));

        jssc.checkpoint(args[0]);
        // Two possible data sources: the first branch uses a socketTextStream socket, the second uses a directory on HDFS as the input source
        JavaDStream<String> words;
        if("0".equals(args[1])){
            // Run nc -lk 9999 on the u0 host first
            if(!args[2].matches("\\w+\\:\\d+")){
                System.out.println("The third argument is invalid; expected socket ip:port");
                System.exit(1);
            }
            words = jssc.socketTextStream(args[2].split(":")[0], Integer.parseInt(args[2].split(":")[1])).flatMap((FlatMapFunction<String, String>) x -> {
                System.out.println(Arrays.asList(x.split(" ")).get(0));
                return Arrays.asList(x.split(" ")).iterator();
            });
        } else {
            words = jssc.textFileStream(args[2]).flatMap((FlatMapFunction<String, String>) x -> {
                System.out.println(Arrays.asList(x.split(" ")).get(0));
                return Arrays.asList(x.split(" ")).iterator();
            });
        }
        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));

        // Use updateStateByKey to maintain the running count
        // valueList: the new values for this key in this batch; there may be several, e.g. (hadoop,1)(hadoop,1) arrives as (1,1)
        // oldState: the previous state of this key
        JavaPairDStream<String, Integer> stateDStream = pairs.updateStateByKey((Function2<List<Integer>, Optional<Integer>, Optional<Integer>>) (valueList, oldState) -> {
            Integer newState = 0;
            // If oldState already exists, this key has been counted before; otherwise this is its first appearance
            if (oldState.isPresent()) {
                newState = oldState.get();
            }

            // Update the state
            for (Integer value : valueList) {
                newState += value;
            }
            return Optional.of(newState);
        });

        // Print the first ten elements of each batch of the DStream to the console
        stateDStream.print();
        // Save the results to HDFS
        stateDStream.dstream().saveAsTextFiles(args[3], System.currentTimeMillis() + "_count");
        // Save the results to the local Linux file system
        stateDStream.dstream().saveAsTextFiles(args[4], "test");

        jssc.start();
        try {
            // Wait for the computation to terminate
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

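For the socket branch, nc -lk can be replaced by a tiny line-emitting server when testing from Java. A sketch with the plain JDK socket API; the port 9100, the emitted words and the 3-second pacing are assumptions, not part of the commit.

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class NcLikeServerSketch {
    public static void main(String[] args) throws Exception {
        // Listens where the streaming job connects (the ip:port passed as the third argument).
        try (ServerSocket server = new ServerSocket(9100);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            // One line every few seconds; each whitespace-separated token is counted.
            for (int i = 0; i < 100; i++) {
                out.println("a aa aaa b bb bbb " + i);
                Thread.sleep(3000);
            }
        }
    }
}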
+ 8 - 0
comp-example/src/main/resources/log4j.properties

@ -0,0 +1,8 @@
log4j.rootLogger=error, console
log4j.logger.org.apache.zookeeper=error
log4j.logger.com.ai.ipu.example=debug

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n

+ 6 - 0
comp-example/src/main/resources/spark.properties

@ -0,0 +1,6 @@
# Spark cluster
spark.master=spark://iZm5e5xe1w25avi0io1f5aZ:7077
# Hadoop (HDFS) address
hdfs.uri=hdfs://iZm5e5xe1w25avi0io1f5aZ:9000
# Test file
test.file.name=aaa.txt

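For reference, a minimal sketch of reading the same keys with the plain JDK ResourceBundle API. ResourceBundleUtil.initialize in SparkConfig presumably maps these keys onto the static fields sparkMaster, hdfsUri and testFileName, but that mapping is an assumption here; only the property names are taken from this file.

import java.util.ResourceBundle;

public class SparkPropertiesSketch {
    public static void main(String[] args) {
        // Loads spark.properties from the classpath, the same bundle name SparkConfig uses ("spark").
        ResourceBundle bundle = ResourceBundle.getBundle("spark");
        System.out.println(bundle.getString("spark.master"));
        System.out.println(bundle.getString("hdfs.uri"));
        System.out.println(bundle.getString("test.file.name"));
    }
}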
+ 0 - 115
comp-example/src/test/java/com/ai/ipu/example/hadoop/HadoopExample.java

@ -1,115 +0,0 @@
package com.ai.ipu.example.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;

/**
 * Class description
 *
 * @Author: lilb3@asiainfo.com
 * @Date: 2019-05-24 17:10
 **/
public class HadoopExample {
    private FileSystem hdfs = null;

    @Before
    public void setBefore() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        // String hdfsPath = "hdfs://u0:9000";
        String hdfsPath = "hdfs://iZm5e5xe1w25avi0io1f5aZ:9000";
        hdfs = FileSystem.get(new URI(hdfsPath), configuration);
    }

    @After
    public void setAfter() throws Exception {
        if (hdfs != null) {
            hdfs.close();
        }
    }

    @Test
    public void mkdir() throws Exception {
        String newDir = "/hdfstest";
        boolean result = hdfs.mkdirs(new Path(newDir));
        if (result) {
            System.out.println("Success!");
        } else {
            System.out.println("Failed!");
        }
    }

    @Test
    public void createFile() throws Exception {
        String filePath = "/hdfstest/touchfile";
        FSDataOutputStream create = hdfs.create(new Path(filePath));
        create.writeBytes("abcdefghijklmnopqrstuvwxyz1234567890");
        create.close();
        System.out.println("Finish!");
    }

    @Test
    public void copyFromLocalFile() throws Exception {
        String fromLinux = "~/aaa.txt";
        String os = System.getProperty("os.name");
        if(os.toLowerCase().startsWith("win")){
            fromLinux = "D:\\logs\\aaa.txt";
        }
        String toHDFS = "/hdfstest/";
        hdfs.copyFromLocalFile(new Path(fromLinux), new Path(toHDFS));
        System.out.println("Finish!");
    }

    @Test
    public void copyToLocalFile() throws Exception {
        String from_HDFS = "/hdfstest/aaa.txt";
        String toLinux = "~/bbb.txt";
        String os = System.getProperty("os.name");
        if(os.toLowerCase().startsWith("win")){
            toLinux = "D:\\logs\\bbb.txt";
        }
        hdfs.copyToLocalFile(false, new Path(from_HDFS), new Path(toLinux));
        System.out.println("Finish!");
    }

    @Test
    public void listFile() throws Exception {
        iteratorListFile(hdfs, new Path("/"));
    }

    @Test
    public void locateFile() throws Exception{
        Path file = new Path("/hdfstest/aaa.txt");
        FileStatus fileStatus = hdfs.getFileStatus(file);
        BlockLocation[] location = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        for (BlockLocation block : location) {
            String[] hosts = block.getHosts();
            for (String host : hosts) {
                System.out.println("block:" + block + " host:" + host);
            }
        }
    }

    private static void iteratorListFile(FileSystem hdfs, Path path) throws Exception {
        FileStatus[] files = hdfs.listStatus(path);
        for (FileStatus file : files) {
            if (file.isDirectory()) {
                System.out.println(file.getPermission() + " " + file.getOwner()
                        + " " + file.getGroup() + " " + file.getPath());
                iteratorListFile(hdfs, file.getPath());
            } else if (file.isFile()) {
                System.out.println(file.getPermission() + " " + file.getOwner()
                        + " " + file.getGroup() + " " + file.getPath());
            }
        }
    }
}

+ 0 - 376
comp-example/src/test/java/com/ai/ipu/example/hbase/HbaseExample.java

@ -1,376 +0,0 @@
package com.ai.ipu.example.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class HbaseExample {
    private Connection connection = null;

    private Table table = null;

    private Admin admin = null;

    /**
     * Initialize the configuration
     *
     * @throws Exception on error
     */
    @Before
    public void setBefore() throws Exception {
        Configuration configuration = new Configuration();
        System.setProperty("hadoop.home.dir", "/home/mysql/software/hadoop273");
        // configuration.set("hbase.rootdir", "hdfs://u0:9000/home/hbase126");
        // configuration.set("hbase.zookeeper.quorum", "u0:2181");
        configuration.set("hbase.rootdir", "hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/home/hbase126");
        configuration.set("hbase.zookeeper.quorum", "iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103");
        connection = ConnectionFactory.createConnection(configuration);
        admin = connection.getAdmin();
    }

    /**
     * Close the connection
     *
     * @throws Exception on error
     */
    @After
    public void setAfter() throws Exception {
        if (connection != null) connection.close();
    }

    /**
     * Create a table
     *
     * @throws Exception on error
     */
    @Test
    public void createTable() throws Exception {
        String tableName = "school1";
        if (admin.tableExists(TableName.valueOf(tableName))) {
            System.out.println("Table already exists");
        } else {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // Add column families; columns do not need to be specified
            hTableDescriptor.addFamily(new HColumnDescriptor("address"));
            hTableDescriptor.addFamily(new HColumnDescriptor("more"));
            hTableDescriptor.addFamily(new HColumnDescriptor("info"));
            admin.createTable(hTableDescriptor);
            System.out.println("Table " + tableName + " created successfully");
        }
    }

    /**
     * List tables and their structure
     *
     * @throws Exception on error
     */
    @Test
    public void listTable() throws Exception {
        HTableDescriptor[] tableList = admin.listTables();
        if (tableList.length > 0) {
            for (HTableDescriptor ta : tableList) {
                System.out.println(ta.getNameAsString());
                for (HColumnDescriptor column :
                        ta.getColumnFamilies()) {
                    System.out.println("\t" + column.getNameAsString());
                }
            }
        }
    }

    /**
     * Add a single record
     *
     * @throws Exception on error
     */
    @Test
    public void putOne() throws Exception {

        Table table = connection.getTable(TableName.valueOf("school1"));
        // Create a Put and specify its row key
        Put put = new Put(Bytes.toBytes("wangwu"));
        //byte [] family, byte [] qualifier, byte [] value
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("男"));
        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("changsha"));
        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("address"), Bytes.toBytes("yueyang"));
        put.addColumn(Bytes.toBytes("more"), Bytes.toBytes("hobby"), Bytes.toBytes("girl"));
        put.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"), Bytes.toBytes("walk"));

        table.put(put);
    }

    /**
     * Add multiple records
     *
     * @throws Exception on error
     */
    @Test
    public void putList() throws Exception {

        Table table = connection.getTable(TableName.valueOf("school1"));
        // Create a Put and specify its row key
        Put put = new Put(Bytes.toBytes("zhangsan"));
        //byte [] family, byte [] qualifier, byte [] value
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("男"));
        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("beijing"));
        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("address"), Bytes.toBytes("henan"));
        put.addColumn(Bytes.toBytes("more"), Bytes.toBytes("hobby"), Bytes.toBytes("girl"));
        put.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"), Bytes.toBytes("basketball"));

        Put put1 = new Put(Bytes.toBytes("lisi"));
        // family, qualifier, value: the order must not be mixed up (column family, column, content)
        put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("22"));
        put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("女"));
        put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("nanjing"));
        put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("address"), Bytes.toBytes("hebei"));
        put1.addColumn(Bytes.toBytes("more"), Bytes.toBytes("hobby"), Bytes.toBytes("boy"));
        put1.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"), Bytes.toBytes("ball"));

        List<Put> list = new ArrayList<>();
        list.add(put);
        list.add(put1);

        table.put(list);
    }

    /**
     * Query by row key
     *
     * @throws Exception on error
     */
    @Test
    public void scanByRowKey() throws Exception {
        String rowKey = "zhangsan";
        table = connection.getTable(TableName.valueOf("school1"));
        Get get = new Get(rowKey.getBytes());

        Result result = table.get(get);
        System.out.println("Column family\tColumn\tValue");
        for (Cell cell : result.rawCells()) {
            System.out.println(
                    Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +  //Rowkey
                            Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" + //CF
                            Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +//qualifier
                            Bytes.toString(CellUtil.cloneValue(cell)) //value
            );
        }
    }

    /**
     * Query a value by row key, column family and column
     *
     * @throws Exception on error
     */
    @Test
    public void scanByRowKey1() throws Exception {
        String rowKey = "zhangsan";
        table = connection.getTable(TableName.valueOf("school1"));
        Get get = new Get(rowKey.getBytes());
        get.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"));

        Result result = table.get(get);
        System.out.println("Value for rowKey = \"zhangsan\", column family more, column good:");
        String age = Bytes.toString(result.getValue(Bytes.toBytes("more"), Bytes.toBytes("good")));
        System.out.println(age);
    }

    /**
     * Full table scan
     *
     * @throws Exception on error
     */
    @Test
    public void scanTable() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        Scan scan = new Scan();
        ResultScanner rs = table.getScanner(scan);
        for (Result result : rs) {
            printResult(result);
        }
    }

    /**
     * Scan query with a condition; the Scan constructor arguments are row keys
     *
     * @throws Exception on error
     */
    @Test
    public void scanTable1() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        Scan scan = new Scan(Bytes.toBytes("wangwu"), Bytes.toBytes("wangwu"));

        ResultScanner rs = table.getScanner(scan);
        for (Result result : rs) {
            printResult(result);
        }
    }

    /**
     * Private helper that prints a result's cells
     *
     * @param result the result to print
     */
    private void printResult(Result result) {
        for (Cell cell :
                result.rawCells()) {
            System.out.println(
                    //Rowkey
                    Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +
                            //CF
                            Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +
                            //qualifier
                            Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +
                            //value
                            Bytes.toString(CellUtil.cloneValue(cell))
            );
        }
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
    }


    /**
     * Conditional filtering with Filter
     * Queries rows whose row key ends with "wu"
     * Data in HBase is sorted in lexicographical order
     * and does not depend on insertion time
     *
     * @throws Exception on error
     */
    @Test
    public void rowFilter() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        String reg = "^*wu";
        org.apache.hadoop.hbase.filter.Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(reg));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Prefix filtering
     *
     * @throws Exception on error
     */
    @Test
    public void prefixFilter() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        org.apache.hadoop.hbase.filter.Filter filter = new PrefixFilter(Bytes.toBytes("li"));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Multi-condition filtering with FilterList
     * <p>
     * MUST_PASS_ALL: all conditions must be satisfied
     * <p>
     * MUST_PASS_ONE: satisfying one condition is enough
     *
     * @throws Exception on error
     */
    @Test
    public void filterList() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        org.apache.hadoop.hbase.filter.Filter filter = new PrefixFilter(Bytes.toBytes("li"));
        org.apache.hadoop.hbase.filter.Filter filter2 = new PrefixFilter(Bytes.toBytes("zh"));
        filterList.addFilter(filter);
        filterList.addFilter(filter2);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Delete data
     *
     * @throws Exception on error
     */
    @Test
    public void delete() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        Delete delete = new Delete(Bytes.toBytes("lisi"));
        table.delete(delete);
    }

    /**
     * Delete data
     *
     * @throws Exception on error
     */
    @Test
    public void multiDelete() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        Delete delete = new Delete(Bytes.toBytes("zhangsan"));
        // Delete the content of the current version
        // delete.addColumn(Bytes.toBytes("more"),Bytes.toBytes("good"));
        // Delete all versions
        //delete.addColumns(Bytes.toBytes("more"),Bytes.toBytes("good"));

        /*
         * addColumn and addColumns are different; remember:
         *
         * addColumn deletes only the most recent version of a cell,
         *
         * addColumns deletes all versions of a cell
         * */
        // Delete a column family
        delete.addFamily(Bytes.toBytes("info"));

        table.delete(delete);
    }

    /**
     * Delete a table
     *
     * @throws Exception on error
     */
    @Test
    public void dropTable() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        if (admin.tableExists(TableName.valueOf("school1"))) {
            admin.disableTable(TableName.valueOf("school1"));
            admin.deleteTable(TableName.valueOf("school1"));
            System.out.println("Deleted successfully");
        } else {
            System.out.println("Table does not exist");
        }
    }
}