
Merge branch 'master' of http://10.1.235.20:3000/rest/code-example

liutong3 6 years ago
commit 1729fae314

+ 8 - 0
ipu-hadoop-example/aaa.txt

@@ -0,0 +1,8 @@
111
222 333
444 555 666
777 888
999
0000000000

+ 57 - 0
ipu-hadoop-example/pom.xml

@@ -0,0 +1,57 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>com.ai.ipu</groupId>
		<artifactId>ipu-aggregator</artifactId>
		<version>3.1-SNAPSHOT</version>
	</parent>

	<groupId>com.ai.ipu.example</groupId>
	<artifactId>ipu-hadoop-example</artifactId>
	<version>1.0</version>
	<packaging>jar</packaging>

	<name>ipu-hadoop-example</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<ipu>3.1-SNAPSHOT</ipu>
		<jdk>1.8</jdk>
		<junit>4.12</junit>
		<hadoop-client>2.7.3</hadoop-client>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>${hadoop-client}</version>
		</dependency>

		<dependency>
			<groupId>com.ai.ipu</groupId>
			<artifactId>ipu-basic</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-surefire-plugin</artifactId>
				<configuration>
					<skip>true</skip>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

+ 67 - 0
ipu-hadoop-example/src/test/java/com/ai/ipu/example/hadoop/HadoopConfig.java

@@ -0,0 +1,67 @@
package com.ai.ipu.example.hadoop;

import com.ai.ipu.basic.file.ResourceBundleUtil;
import com.ai.ipu.basic.log.ILogger;
import com.ai.ipu.basic.log.IpuLoggerFactory;

/**
 * Configuration holder for the Hadoop example
 *
 * @author lilb3@asiainfo.com
 * @since 2019-07-01 15:36
 **/
public class HadoopConfig {
    private static final ILogger LOGGER = IpuLoggerFactory.createLogger(HadoopConfig.class);
    private static final String CONFIG_FILE_PATH = "hadoop";
    /* URI of the Hadoop instance to connect to, e.g. hdfs://ip:port */
    private static String hdfsPath;
    /* Whether to connect to Hadoop data nodes by host name; true: yes, false: no */
    private static String dfsClientUseDatanodeHostname;
    /* Name of the directory created by the test */
    private static String dirName;
    /* Name of the file created by the test; created under /<dir.name>/ */
    private static String fileName;
    /* String written to the test file */
    private static String writeString;
    /* Name of the file uploaded to HDFS; a matching local file must exist */
    private static String uploadLocalFile;
    /* Name of the file downloaded to the local machine */
    private static String downloadLocalFile;

    public static String getHdfsPath() {
        return hdfsPath;
    }

    public static String getDfsClientUseDatanodeHostname() {
        return dfsClientUseDatanodeHostname;
    }

    public static String getDirName() {
        return dirName;
    }

    public static String getFileName() {
        return fileName;
    }

    public static String getWriteString() {
        return writeString;
    }

    public static String getUploadLocalFile() {
        return uploadLocalFile;
    }

    public static String getDownloadLocalFile() {
        return downloadLocalFile;
    }

    /* Load the configuration file */
    static {
        try {
            ResourceBundleUtil.initialize(CONFIG_FILE_PATH, HadoopConfig.class);
        } catch (Exception e) {
            LOGGER.error("Failed to read configuration file: " + CONFIG_FILE_PATH, e);
        }
    }
}
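
HadoopConfig (like the HbaseConfig and SparkConfig classes later in this commit) delegates to ResourceBundleUtil.initialize from ipu-basic, which presumably injects values from the matching .properties file into the static fields (hdfs.path into hdfsPath, and so on). That mapping is not documented here, so for readers without the ipu-basic dependency a rough plain-JDK sketch of the same lookup might look like the following; the class name and helper are hypothetical, not part of the commit.

import java.util.ResourceBundle;

public class HadoopConfigFallback {
    // Reads hadoop.properties from the classpath, the same file HadoopConfig points at.
    private static final ResourceBundle BUNDLE = ResourceBundle.getBundle("hadoop");

    // Hypothetical helper: returns the raw property value, or null when the key is absent.
    static String get(String key) {
        return BUNDLE.containsKey(key) ? BUNDLE.getString(key) : null;
    }

    public static void main(String[] args) {
        // Mirrors what HadoopConfig.getHdfsPath() is expected to return.
        System.out.println(get("hdfs.path"));
        System.out.println(get("dfs.client.use.datanode.hostname"));
    }
}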

+ 108 - 0
ipu-hadoop-example/src/test/java/com/ai/ipu/example/hadoop/HadoopExample.java

@@ -0,0 +1,108 @@
package com.ai.ipu.example.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;

/**
 * HDFS examples: create a directory, create a file, upload, download, list and locate files
 *
 * @author lilb3@asiainfo.com
 * @since 2019-07-01 15:36
 **/
public class HadoopExample {
    private FileSystem hdfs = null;
    private static final String HDFS_SPLIT_STRING = "/";


    @Before
    public void setBefore() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", HadoopConfig.getDfsClientUseDatanodeHostname());
        String hdfsPath = HadoopConfig.getHdfsPath();
        hdfs = FileSystem.get(new URI(hdfsPath), configuration);
    }

    @After
    public void setAfter() throws Exception {
        if (hdfs != null) {
            hdfs.close();
        }
    }

    @Test
    public void mkdir() throws Exception {
        String newDir = HDFS_SPLIT_STRING + HadoopConfig.getDirName();
        boolean result = hdfs.mkdirs(new Path(newDir));
        if (result) {
            System.out.println("Success!");
        } else {
            System.out.println("Failed!");
        }
    }

    @Test
    public void createFile() throws Exception {
        String filePath = HDFS_SPLIT_STRING + HadoopConfig.getDirName() + HDFS_SPLIT_STRING + HadoopConfig.getFileName();
        FSDataOutputStream create = hdfs.create(new Path(filePath));
        create.writeBytes(HadoopConfig.getWriteString());
        create.close();
        System.out.println("Finish!");
    }

    @Test
    public void copyFromLocalFile() throws Exception {
        String localFile = HadoopConfig.getUploadLocalFile();
        String toHDFS = HDFS_SPLIT_STRING + HadoopConfig.getDirName() + HDFS_SPLIT_STRING;
        hdfs.copyFromLocalFile(new Path(localFile), new Path(toHDFS));
        System.out.println("Finish!");
    }

    @Test
    public void copyToLocalFile() throws Exception {
        String hdfsFile = HDFS_SPLIT_STRING + HadoopConfig.getDirName() + HDFS_SPLIT_STRING + HadoopConfig.getFileName();
        String localFile = HadoopConfig.getDownloadLocalFile();
        hdfs.copyToLocalFile(false, new Path(hdfsFile), new Path(localFile));
        System.out.println("Finish!");
    }

    @Test
    public void listFile() throws Exception {
        iteratorListFile(hdfs, new Path(HDFS_SPLIT_STRING));
    }

    @Test
    public void locateFile() throws Exception {
        Path file = new Path(HDFS_SPLIT_STRING + HadoopConfig.getDirName() + HDFS_SPLIT_STRING + HadoopConfig.getFileName());
        FileStatus fileStatus = hdfs.getFileStatus(file);
        BlockLocation[] location = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        for (BlockLocation block : location) {
            String[] hosts = block.getHosts();
            for (String host : hosts) {
                System.out.println("block:" + block + " host:" + host);
            }
        }
    }

    private static void iteratorListFile(FileSystem hdfs, Path path) throws Exception {
        FileStatus[] files = hdfs.listStatus(path);
        for (FileStatus file : files) {
            if (file.isDirectory()) {
                System.out.println(file.getPermission() + " " + file.getOwner()
                        + " " + file.getGroup() + " " + file.getPath());
                iteratorListFile(hdfs, file.getPath());
            } else if (file.isFile()) {
                System.out.println(file.getPermission() + " " + file.getOwner()
                        + " " + file.getGroup() + " " + file.getPath());
            }
        }
    }
}
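
The test class covers mkdir, create, upload, download, list and locate, but never reads a file's contents back from HDFS. A minimal sketch of that missing step, assuming it were added to HadoopExample above and run after createFile(); the method name readFile is an illustration, not part of the commit.

    @Test
    public void readFile() throws Exception {
        // Open the file written by createFile() and stream its contents to stdout.
        String filePath = HDFS_SPLIT_STRING + HadoopConfig.getDirName() + HDFS_SPLIT_STRING + HadoopConfig.getFileName();
        try (org.apache.hadoop.fs.FSDataInputStream in = hdfs.open(new Path(filePath))) {
            // copyBytes streams the file to System.out without closing it (last argument = false).
            org.apache.hadoop.io.IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }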

+ 14 - 0
ipu-hadoop-example/src/test/resources/hadoop.properties

@@ -0,0 +1,14 @@
# URI of the Hadoop instance to connect to, e.g. hdfs://ip:port
hdfs.path=hdfs://iZm5e5xe1w25avi0io1f5aZ:9000
# Whether to connect to Hadoop data nodes by host name; true: yes (should be set to true here), false: no
dfs.client.use.datanode.hostname=true
# Name of the directory created by the test
dir.name=hdfstest
# Name of the file created by the test; created under /<dir.name>/
file.name=touchfile
# String written to the test file
write.string=abcdefghijklmnopqrstuvwxyz1234567890
# Name of the file uploaded to HDFS; a matching local file must exist
upload.local.file=aaa.txt
# Name of the downloaded file on the local machine
download.local.file=bbb.txt

+ 8 - 0
ipu-hadoop-example/src/test/resources/log4j.properties

@@ -0,0 +1,8 @@
log4j.rootLogger=error, console
log4j.logger.org.apache.zookeeper=error
log4j.logger.com.ai.ipu.example=debug

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n

+ 61 - 0
ipu-hbase-example/pom.xml

@@ -0,0 +1,61 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>com.ai.ipu</groupId>
		<artifactId>ipu-aggregator</artifactId>
		<version>3.1-SNAPSHOT</version>
	</parent>

	<groupId>com.ai.ipu.example</groupId>
	<artifactId>ipu-hbase-example</artifactId>
	<version>1.0</version>
	<packaging>jar</packaging>

	<name>ipu-hbase-example</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<ipu>3.1-SNAPSHOT</ipu>
		<jdk>1.8</jdk>
		<junit>4.12</junit>
		<hbase>1.2.6</hbase>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>${hbase}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-server</artifactId>
			<version>${hbase}</version>
		</dependency>

		<dependency>
			<groupId>com.ai.ipu</groupId>
			<artifactId>ipu-basic</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-surefire-plugin</artifactId>
				<configuration>
					<skip>true</skip>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

+ 76 - 0
ipu-hbase-example/src/test/java/com/ai/ipu/example/hbase/HbaseConfig.java

@@ -0,0 +1,76 @@
package com.ai.ipu.example.hbase;

import com.ai.ipu.basic.file.ResourceBundleUtil;
import com.ai.ipu.basic.log.ILogger;
import com.ai.ipu.basic.log.IpuLoggerFactory;

/**
 * Configuration holder for the HBase example
 *
 * @author lilb3@asiainfo.com
 * @since 2019-07-01 15:36
 **/
public class HbaseConfig {
    private static final ILogger LOGGER = IpuLoggerFactory.createLogger(HbaseConfig.class);
    private static final String CONFIG_FILE_PATH = "hbase";
    /* Hadoop installation directory */
    private static String hadoopHomeDir;
    /* Storage path of HBase on Hadoop (HDFS) */
    private static String hbaseRootdir;
    /* ZooKeeper quorum addresses used by HBase */
    private static String hbaseZookeeperQuorum;
    /* HBase table name */
    private static String tableName;
    /* Column families of the HBase table */
    private static String tableCloumns;
    /* Layout of a student record (column qualifiers) */
    private static String key;
    private static String zhangShan;
    private static String liSi;
    private static String wangWu;

    public static String getHadoopHomeDir() {
        return hadoopHomeDir;
    }

    public static String getHbaseRootdir() {
        return hbaseRootdir;
    }

    public static String getHbaseZookeeperQuorum() {
        return hbaseZookeeperQuorum;
    }

    public static String getTableName() {
        return tableName;
    }

    public static String getTableCloumns() {
        return tableCloumns;
    }

    public static String getKey() {
        return key;
    }

    public static String getZhangShan() {
        return zhangShan;
    }

    public static String getLiSi() {
        return liSi;
    }

    public static String getWangWu() {
        return wangWu;
    }

    /* Load the configuration file */
    static {
        try {
            ResourceBundleUtil.initialize(CONFIG_FILE_PATH, HbaseConfig.class);
        } catch (Exception e) {
            LOGGER.error("Failed to read configuration file: " + CONFIG_FILE_PATH, e);
        }
    }
}

+ 381 - 0
ipu-hbase-example/src/test/java/com/ai/ipu/example/hbase/HbaseExample.java

@@ -0,0 +1,381 @@
package com.ai.ipu.example.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
 * HBase examples
 *
 * @author lilb3@asiainfo.com
 * @since 2019-07-01 15:36
 */
public class HbaseExample {
    private static final String SPLIT_STRING = "###";
    private static final String[] TABLE_COLUMNS = HbaseConfig.getTableCloumns().split(SPLIT_STRING);
    private static final String[] KEYS = HbaseConfig.getKey().split(SPLIT_STRING);
    private static final String TABLE_NAME = HbaseConfig.getTableName();
    private Connection connection = null;

    private Table table = null;

    private Admin admin = null;

    /**
     * Initialize the configuration
     *
     * @throws Exception on error
     */
    @Before
    public void setBefore() throws Exception {
        Configuration configuration = new Configuration();
        System.setProperty("hadoop.home.dir", HbaseConfig.getHadoopHomeDir());
        configuration.set("hbase.rootdir", HbaseConfig.getHbaseRootdir());
        configuration.set("hbase.zookeeper.quorum", HbaseConfig.getHbaseZookeeperQuorum());
        connection = ConnectionFactory.createConnection(configuration);
        admin = connection.getAdmin();
    }

    /**
     * Close the connection
     *
     * @throws Exception on error
     */
    @After
    public void setAfter() throws Exception {
        if (connection != null) connection.close();
    }

    /**
     * Create the table
     *
     * @throws Exception on error
     */
    @Test
    public void createTable() throws Exception {
        if (admin.tableExists(TableName.valueOf(TABLE_NAME))) {
            System.out.println("Table already exists");
        } else {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
            // Add the column families; individual columns need not be specified
            String[] tableColumns = HbaseConfig.getTableCloumns().split(SPLIT_STRING);
            for (String column : tableColumns) {
                hTableDescriptor.addFamily(new HColumnDescriptor(column));
            }
            admin.createTable(hTableDescriptor);
            System.out.println("Table " + TABLE_NAME + " created successfully");
        }
    }

    /**
     * List tables and their structure
     *
     * @throws Exception on error
     */
    @Test
    public void listTable() throws Exception {
        HTableDescriptor[] tableList = admin.listTables();
        if (tableList.length > 0) {
            for (HTableDescriptor ta : tableList) {
                System.out.println(ta.getNameAsString());
                for (HColumnDescriptor column :
                        ta.getColumnFamilies()) {
                    System.out.println("\t" + column.getNameAsString());
                }
            }
        }
    }

    /**
     * Insert a single record
     *
     * @throws Exception on error
     */
    @Test
    public void putOne() throws Exception {

        Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
        // Create the Put and specify its row key
        Put put = new Put(Bytes.toBytes("wangwu"));
        String[] values = HbaseConfig.getWangWu().split(SPLIT_STRING);
        for (int i = 0; i < TABLE_COLUMNS.length; i++) {
            put.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i]), Bytes.toBytes(values[2 * i]));
            put.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i + 1]), Bytes.toBytes(values[2 * i + 1]));
        }
        table.put(put);
        System.out.println("Insert succeeded.");
    }

    /**
     * Insert multiple records
     *
     * @throws Exception on error
     */
    @Test
    public void putList() throws Exception {

        Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
        // Create the Put and specify its row key
        Put put = new Put(Bytes.toBytes("zhangsan"));
        String[] values = HbaseConfig.getZhangShan().split(SPLIT_STRING);
        for (int i = 0; i < TABLE_COLUMNS.length; i++) {
            put.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i]), Bytes.toBytes(values[2 * i]));
            put.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i + 1]), Bytes.toBytes(values[2 * i + 1]));
        }

        Put put1 = new Put(Bytes.toBytes("lisi"));
        String[] values1 = HbaseConfig.getLiSi().split(SPLIT_STRING);
        for (int i = 0; i < TABLE_COLUMNS.length; i++) {
            put1.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i]), Bytes.toBytes(values1[2 * i]));
            put1.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i + 1]), Bytes.toBytes(values1[2 * i + 1]));
        }

        List<Put> list = new ArrayList<>();
        list.add(put);
        list.add(put1);

        table.put(list);
        System.out.println("Batch insert succeeded.");
    }

    /**
     * Query by row key
     *
     * @throws Exception on error
     */
    @Test
    public void scanByRowKey() throws Exception {
        String rowKey = "zhangsan";
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Get get = new Get(rowKey.getBytes());

        Result result = table.get(get);
        System.out.println("RowKey\tFamily\tQualifier\tValue");
        for (Cell cell : result.rawCells()) {
            System.out.println(
                    Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +  //Rowkey
                            Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" + //CF
                            Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +//qualifier
                            Bytes.toString(CellUtil.cloneValue(cell)) //value
            );
        }
    }

    /**
     * Query a value by row key, column family and column
     *
     * @throws Exception on error
     */
    @Test
    public void scanByRowKey1() throws Exception {
        String rowKey = "zhangsan";
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Get get = new Get(rowKey.getBytes());
        get.addColumn(Bytes.toBytes(TABLE_COLUMNS[2]), Bytes.toBytes(KEYS[5]));

        Result result = table.get(get);
        System.out.println("Value for rowKey = \"zhangsan\", family " + TABLE_COLUMNS[2] + ", qualifier " + KEYS[5] + ":");
        String value = Bytes.toString(result.getValue(Bytes.toBytes(TABLE_COLUMNS[2]), Bytes.toBytes(KEYS[5])));
        System.out.println(value);
    }

    /**
     * Full table scan
     *
     * @throws Exception on error
     */
    @Test
    public void scanTable() throws Exception {
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Scan scan = new Scan();
        ResultScanner rs = table.getScanner(scan);
        for (Result result : rs) {
            printResult(result);
        }
    }

    /**
     * Scan with a condition; the Scan constructor arguments are the start and stop row keys
     *
     * @throws Exception on error
     */
    @Test
    public void scanTable1() throws Exception {
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Scan scan = new Scan(Bytes.toBytes("wangwu"), Bytes.toBytes("wangwu"));

        ResultScanner rs = table.getScanner(scan);
        for (Result result : rs) {
            printResult(result);
        }
    }

    /**
     * Private helper that prints the cells of a result
     *
     * @param result the result to print
     */
    private void printResult(Result result) {
        for (Cell cell :
                result.rawCells()) {
            System.out.println(
                    //Rowkey
                    Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +
                            //CF
                            Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +
                            //qualifier
                            Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +
                            //value
                            Bytes.toString(CellUtil.cloneValue(cell))
            );
        }
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
    }


    /**
     * Conditional filtering with a RowFilter:
     * queries rows whose row key ends with "wu".
     * Data in HBase is sorted lexicographically by row key,
     * regardless of insertion order.
     *
     * @throws Exception on error
     */
    @Test
    public void rowFilter() throws Exception {
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        String reg = "^*wu";
        org.apache.hadoop.hbase.filter.Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(reg));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Prefix filter
     *
     * @throws Exception on error
     */
    @Test
    public void prefixFilter() throws Exception {
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        org.apache.hadoop.hbase.filter.Filter filter = new PrefixFilter(Bytes.toBytes("li"));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Combining conditions with a FilterList
     * <p>
     * MUST_PASS_ALL: all filters must match
     * <p>
     * MUST_PASS_ONE: matching a single filter is enough
     *
     * @throws Exception on error
     */
    @Test
    public void filterList() throws Exception {
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        org.apache.hadoop.hbase.filter.Filter filter = new PrefixFilter(Bytes.toBytes("li"));
        org.apache.hadoop.hbase.filter.Filter filter2 = new PrefixFilter(Bytes.toBytes("zh"));
        filterList.addFilter(filter);
        filterList.addFilter(filter2);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Delete a row
     *
     * @throws Exception on error
     */
    @Test
    public void delete() throws Exception {
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Delete delete = new Delete(Bytes.toBytes("wangwu"));
        table.delete(delete);
        System.out.println("Delete succeeded.");
    }

    /**
     * Delete selected cells or a column family
     *
     * @throws Exception on error
     */
    @Test
    public void multiDelete() throws Exception {
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Delete delete = new Delete(Bytes.toBytes("zhangsan"));
        // Delete the latest version of a cell
        // delete.addColumn(Bytes.toBytes("more"),Bytes.toBytes("good"));
        // Delete all versions of a cell
        //delete.addColumns(Bytes.toBytes("more"),Bytes.toBytes("good"));

        /*
         * Note that addColumn and addColumns behave differently:
         *
         * addColumn deletes only the most recent version of a cell,
         *
         * addColumns deletes all versions of the cell
         * */
        // Delete an entire column family
        delete.addFamily(Bytes.toBytes("info"));

        table.delete(delete);
        System.out.println("Delete succeeded.");
    }

    /**
     * Drop the table
     *
     * @throws Exception on error
     */
    @Test
    public void dropTable() throws Exception {
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
        if (admin.tableExists(TableName.valueOf(TABLE_NAME))) {
            admin.disableTable(TableName.valueOf(TABLE_NAME));
            admin.deleteTable(TableName.valueOf(TABLE_NAME));
            System.out.println("Table dropped successfully");
        } else {
            System.out.println("Table does not exist");
        }
    }
}
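
The KEYS[2 * i] / KEYS[2 * i + 1] indexing in putOne() and putList() only works because hbase.properties defines exactly twice as many qualifiers in key as it defines families in table.cloumns: family TABLE_COLUMNS[i] owns qualifiers 2*i and 2*i+1. A small stand-alone sketch that prints the resulting layout, using the property values exactly as they appear in hbase.properties below; the class name is illustrative only.

public class ColumnLayoutSketch {
    public static void main(String[] args) {
        // Values copied from hbase.properties; the split marker is "###".
        String[] families = "info###address###more".split("###");
        String[] qualifiers = "age###sex###city###address###hobby###good".split("###");
        String[] wangWu = "20###男###changsha###yueyang###girl###walk".split("###");

        // Each family i owns qualifiers 2*i and 2*i+1, mirroring the loop in putOne().
        for (int i = 0; i < families.length; i++) {
            System.out.println(families[i] + ":" + qualifiers[2 * i] + " = " + wangWu[2 * i]);
            System.out.println(families[i] + ":" + qualifiers[2 * i + 1] + " = " + wangWu[2 * i + 1]);
        }
    }
}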

+ 15 - 0
ipu-hbase-example/src/test/resources/hbase.properties

@@ -0,0 +1,15 @@
# Hadoop installation directory (under the mysql user)
hadoop.home.dir=/home/mysql/software/hadoop273
# Storage path of HBase on Hadoop, host iZm5e5xe1w25avi0io1f5aZ
hbase.rootdir=hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/hbase
# ZooKeeper quorum used by HBase: iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103
hbase.zookeeper.quorum=iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103
# HBase table name
table.name=school
# Column families of the HBase table <do not modify>
table.cloumns=info###address###more
# Layout of a student record (column qualifiers)
key=age###sex###city###address###hobby###good
zhang.shan=18###男###beijing###tianjing###girl###basketball
li.si=22###女###nanjing###fuzhou###boy###ball
wang.wu=20###男###changsha###yueyang###girl###walk

+ 8 - 0
ipu-hbase-example/src/test/resources/log4j.properties

@@ -0,0 +1,8 @@
log4j.rootLogger=error, console
log4j.logger.org.apache.zookeeper=error
log4j.logger.com.ai.ipu.example=debug

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n

+ 86 - 0
ipu-spark-example/pom.xml

@@ -0,0 +1,86 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>com.ai.ipu</groupId>
		<artifactId>ipu-aggregator</artifactId>
		<version>3.1-SNAPSHOT</version>
	</parent>

	<groupId>com.ai.ipu.example</groupId>
	<artifactId>ipu-spark-example</artifactId>
	<version>1.0</version>
	<packaging>jar</packaging>

	<name>ipu-spark-example</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<ipu>3.1-SNAPSHOT</ipu>
		<jdk>1.8</jdk>
		<junit>4.12</junit>
		<spark>2.4.1</spark>
		<slf4j-api>1.7.16</slf4j-api>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit}</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.ai.ipu</groupId>
			<artifactId>ipu-cache</artifactId>
			<version>${ipu}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.3</version>
		</dependency>

		<!--<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.17.Final</version>
		</dependency>-->

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.11</artifactId>
			<version>${spark}</version>
		</dependency>

		<!-- java.lang.NoSuchMethodError: org.slf4j.helpers.Util.reportFailure(Ljava/lang/String;)V -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>${slf4j-api}</version>
		</dependency>

		<!-- Failed to set setXIncludeAware(true) for parser -->
		<!--<dependency>
			<groupId>xerces</groupId>
			<artifactId>xercesImpl</artifactId>
			<version>2.9.1</version>
		</dependency>-->
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-surefire-plugin</artifactId>
				<configuration>
					<skip>true</skip>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

+ 46 - 0
ipu-spark-example/src/main/java/com/ai/ipu/example/spark/SparkConfig.java

@@ -0,0 +1,46 @@
package com.ai.ipu.example.spark;

import com.ai.ipu.basic.file.ResourceBundleUtil;

/**
 * Configuration holder for the Spark example
 *
 * @author lilb3@asiainfo.com
 * @since 2019-07-01 15:36
 **/
public class SparkConfig {
    private static final String CONFIG_FILE_PATH = "spark";
    /* Spark master URL */
    private static String sparkMaster;
    /* HDFS URI */
    private static String hdfsUri;
    /* Test file name */
    private static String testFileName;
    /* Path of the packaged project jar */
    private static String jarFilePath;

    public static String getSparkMaster() {
        return sparkMaster;
    }

    public static String getHdfsUri() {
        return hdfsUri;
    }

    public static String getTestFileName() {
        return testFileName;
    }

    public static String getJarFilePath() {
        return jarFilePath;
    }

    /* Load the configuration file */
    static {
        try {
            ResourceBundleUtil.initialize(CONFIG_FILE_PATH, SparkConfig.class);
        } catch (Exception e) {
            System.out.println("Failed to read configuration file: " + CONFIG_FILE_PATH + " " + e);
        }
    }
}

+ 125 - 0
ipu-spark-example/src/main/java/com/ai/ipu/example/spark/SparkExample.java

@@ -0,0 +1,125 @@
package com.ai.ipu.example.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Spark word-count example
 *
 * @author lilb3@asiainfo.com
 * @since 2019-07-01 15:36
 **/
public class SparkExample {
    public static void main(String[] args) {
        /*
         * Every Spark program begins by creating a Spark context; while the context is being created,
         * the program requests resources from the cluster and sets up the runtime environment.
         * The application name is set here.
         * The only argument the SparkContext needs is a SparkConf, which is a set of key-value properties.
         */
        SparkConf sparkConf = new SparkConf().setAppName("SparkExample").setMaster(SparkConfig.getSparkMaster());
        /*java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.f$3 of type org.apache.spark.api.java.function.FlatMapFunction in instance of org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1*/
        sparkConf.setJars(new String[]{SparkConfig.getJarFilePath()});

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Build a JavaRDD from a List
        List<String> data = Arrays.asList("1", "2", "3", "4", "5");
        JavaRDD<String> distData = jsc.parallelize(data);
        List<String> intLine = distData.collect();
        for (String string : intLine) {
            System.out.println("Built from List: " + string);
        }

        // Build an RDD by reading a file from HDFS
        String textFilePath = SparkConfig.getHdfsUri() + "/" + SparkConfig.getTestFileName();
        /*
         * textFile() reads the given file from the file system and returns an RDD instance.
         * Initial RDD creation is always done by the SparkContext, with an in-memory collection or an external file system as the input source.
         * An RDD (resilient distributed dataset) is a partitioned, read-only data set. An RDD can be produced in only two ways:
         * from an in-memory collection or external storage, or by transforming another RDD with operations such as map, filter, join, and so on.
         * textFile() can turn a local file or an HDFS file into an RDD; a local file must exist on every node, or be shared over the network.
         */
        JavaRDD<String> lines = jsc.textFile(textFilePath);
        List<String> stringLine = lines.collect();
        for (String string : stringLine) {
            System.out.println("Read from file: " + string);
        }

        // Split each line by calling flatMap
        /*
         * In FlatMapFunction<String, String> the two type parameters are the input and output types.
         * The overridden call method implements the transformation and returns an Iterator.
         *
         * flatMap is one of the most commonly used Spark functions: it uses the supplied function to break one RDD element into several.
         * Here each element of lines is one line of text, and we want to split it into individual words.
         */
        //Unlike map, which produces exactly one output per input, flatMap may produce one or more outputs per input
        //Splitting on spaces turns one input line into several words, hence flatMap
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) s -> {
            String[] words1 = s.split(" ");
            return Arrays.asList(words1).iterator();
        });

        // Convert the data into key/value pairs
        /*
         * PairFunction<T, K, V>: T is the input type; K and V are the output key and value types.
         * Here T = String, K = String, V = Integer (the count).
         * The call method is overridden to implement the conversion.
         */
        //scala.Tuple2<K,V> call(T t)
        //Tuple2 is a Scala class; call takes one word s and returns a new Tuple2 whose key is the word and whose count is 1
        JavaPairRDD<String, Integer> ones = words.mapToPair((PairFunction<String, String, Integer>) s -> {
            System.out.println("mapToPair call: " + s);
            return new Tuple2<>(s, 1);
        });

        // Aggregate the results
        /*
         * reduceByKey reduces by key, much like the reduce phase in MapReduce.
         * The data being operated on (ones below) must be key/value pairs; entries with the same key are aggregated pairwise.
         * If ones contains <"one", 1> and <"one", 1>, the counts for "one" are combined: the inputs are Integers and so is the output, <"one", 2>.
         */
        // This defines how values with the same key are combined in the reduce stage
        // Note: Spark also has a reduce method that works on any RDD, with no key/value pairs required;
        // it combines all incoming elements pairwise
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> {
            System.out.println("reduceByKey call:" + i1 + ", " + i2);
            return i1 + i2;
        });

        // Write the result to HDFS. On Windows this maps to a real directory such as D:/logs/spark_test/1555319746196; the parent directory must exist, otherwise nothing is written
        counts.saveAsTextFile(SparkConfig.getHdfsUri() + "/spark/" + System.currentTimeMillis());


        /*
         * Convert the result to ordinary Java types for output.
         * collect() turns a Spark RDD into a familiar Java collection.
         */
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        // Print directly
        counts.foreachPartition((VoidFunction<Iterator<Tuple2<String, Integer>>>) tuple2Iterator -> {
            while (tuple2Iterator.hasNext()) {
                Tuple2<String, Integer> t2 = tuple2Iterator.next();
                System.out.println(t2._1() + ": " + t2._2());
            }
        });
        jsc.stop();
    }
}
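
Running this class as committed requires the standalone master from spark.properties and the packaged jar named by jar.file.path, because the lambdas must be shipped to the executors (the SerializedLambda ClassCastException cited in the comment above). For a quick smoke test without a cluster, a local-mode variant of the same word count might look like the sketch below, reusing the imports already present in SparkExample; "local[*]" and the local "aaa.txt" path are assumptions, not part of the committed example.

        // Minimal local-mode sketch: runs the same word count inside this JVM, no cluster or jar upload needed.
        SparkConf localConf = new SparkConf()
                .setAppName("SparkExampleLocal")
                .setMaster("local[*]");   // assumption: local mode instead of SparkConfig.getSparkMaster()
        JavaSparkContext localJsc = new JavaSparkContext(localConf);

        // Read aaa.txt from the local working directory instead of HDFS (assumed path).
        JavaRDD<String> localLines = localJsc.textFile("aaa.txt");
        localLines.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                .mapToPair(w -> new Tuple2<String, Integer>(w, 1))
                .reduceByKey(Integer::sum)
                .collect()
                .forEach(t -> System.out.println(t._1() + ": " + t._2()));
        localJsc.stop();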

+ 8 - 0
ipu-spark-example/src/main/resources/log4j.properties

@@ -0,0 +1,8 @@
log4j.rootLogger=error, console
log4j.logger.org.apache.zookeeper=error
log4j.logger.com.ai.ipu.example=debug

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %5p [%t] (%F:%L) - %m%n

+ 8 - 0
ipu-spark-example/src/main/resources/spark.properties

@@ -0,0 +1,8 @@
# Spark master
spark.master=spark://iZm5e5xe1w25avi0io1f5aZ:7077
# HDFS URI
hdfs.uri=hdfs://iZm5e5xe1w25avi0io1f5aZ:9000
# Test file
test.file.name=aaa.txt
# Path of the packaged project jar
jar.file.path=D:\\ideaws\\rest\\code-example\\ipu-spark-example\\target\\ipu-spark-example-1.0.jar