
Hadoop+Hbase+Spark test case code commit

lilb3, 6 years ago
Parent
Current commit
c621b8bb42

+ 60 - 1
comp-example/pom.xml

		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<ipu>3.1-SNAPSHOT</ipu>
		<jdk>1.8</jdk>
		<slf4j-api>1.7.16</slf4j-api>
		<startClass>com.ai.ipu.example.spark.SparkExample</startClass>
	</properties>

	<dependencies>
			<version>4.12</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.ai.ipu</groupId>
			<artifactId>ipu-cache</artifactId>

			<artifactId>ipu-zk</artifactId>
			<version>${ipu}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.7.3</version>
		</dependency>

		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.17.Final</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>1.2.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-server</artifactId>
			<version>1.2.6</version>
		</dependency>

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.11</artifactId>
			<version>2.4.1</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.16</version>
		</dependency>

		<!-- Failed to set setXIncludeAware(true) for parser -->
		<dependency>
			<groupId>xerces</groupId>
			<artifactId>xercesImpl</artifactId>
			<version>2.9.1</version>
		</dependency>

	</dependencies>

	<build>
		<plugins>
                    <skip>true</skip>
                </configuration>
            </plugin>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
							<goal>build-info</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<mainClass>${startClass}</mainClass>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

+ 41 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/SimpleApp.java

package com.ai.ipu.example.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * Simple Spark application that counts lines containing given characters
 *
 * @Author: lilb3@asiainfo.com
 * @Date: 2019-04-11 21:14
 * Server deployment options:
 * a) Spring Boot packaged deployment; requires startClass in the pom to be set to com.ai.ipu.example.spark.SimpleApp
 * $SPARK_HOME/bin/spark-submit --class org.springframework.boot.loader.JarLauncher --master local[4] /home/mysql/test/comp-example-1.0.jar iZm5e5xe1w25avi0io1f5aZ
 * b) Plain jar deployment
 * $SPARK_HOME/bin/spark-submit --class com.ai.ipu.example.spark.SimpleApp --master local[4] /home/mysql/test/comp-example-1.0.jar.original iZm5e5xe1w25avi0io1f5aZ
 **/
public class SimpleApp {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("Invalid arguments: the first argument is the IP of the host where Hadoop runs");
            return;
        }
        String host = args[0]; // e.g. iZm5e5xe1w25avi0io1f5aZ
        String logFile = "hdfs://" + host + ":9000/ccc.txt";
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        // conf.setJars(new String[] {"D:\\ideaws\\rest\\rest-guide\\comp-example\\target\\comp-example-1.0.jar.original"});
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();

        long numAs = logData.filter((Function<String, Boolean>) s -> s.contains("1")).count();

        long numBs = logData.filter((Function<String, Boolean>) s -> s.contains("2")).count();

        System.out.println("============================================================");
        System.out.println("Lines with 1: " + numAs + ", lines with 2: " + numBs);
        System.out.println("============================================================");
        sc.stop();
    }
}
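
For a quick check without spark-submit or HDFS, the same filter-and-count logic can also be run entirely in local mode. A minimal sketch, not part of this commit (the class name LocalSimpleAppRunner and the in-memory sample lines are made up for illustration):

package com.ai.ipu.example.spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical helper, not part of this commit: runs the same count logic in local mode.
public class LocalSimpleAppRunner {
    public static void main(String[] args) {
        // local[2] keeps everything inside the current JVM, so no cluster, jar upload or HDFS is needed
        SparkConf conf = new SparkConf().setAppName("Simple Application (local)").setMaster("local[2]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // A small in-memory dataset stands in for hdfs://<host>:9000/ccc.txt
            JavaRDD<String> logData = sc.parallelize(Arrays.asList("line 1", "line 2", "line 12")).cache();
            long numAs = logData.filter(s -> s.contains("1")).count();
            long numBs = logData.filter(s -> s.contains("2")).count();
            System.out.println("Lines with 1: " + numAs + ", lines with 2: " + numBs);
        }
    }
}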

+ 138 - 0
comp-example/src/main/java/com/ai/ipu/example/spark/SparkExample.java

package com.ai.ipu.example.spark;

import com.ai.ipu.basic.log.ILogger;
import com.ai.ipu.basic.log.IpuLoggerFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Spark word count example
 *
 * @Author: lilb3@asiainfo.com
 * @Date: 2019-05-24 17:10
 * Server deployment options:
 * a) Spring Boot packaged deployment; requires startClass in the pom to be set to com.ai.ipu.example.spark.SparkExample
 * $SPARK_HOME/bin/spark-submit --class org.springframework.boot.loader.JarLauncher --master local[4] /home/mysql/test/comp-example-1.0.jar iZm5e5xe1w25avi0io1f5aZ
 * b) Plain jar deployment; the original jar and its dependencies (e.g. the framework logging jar ipu-basic-3.1-SNAPSHOT.jar) must be uploaded; separate multiple jars with commas
 * $SPARK_HOME/bin/spark-submit --class com.ai.ipu.example.spark.SparkExample --master local[4] --jars /home/mysql/test/ipu-basic-3.1-SNAPSHOT.jar /home/mysql/test/comp-example-1.0.jar.original iZm5e5xe1w25avi0io1f5aZ
 **/
public class SparkExample {
    private static final ILogger LOGGER = IpuLoggerFactory.createLogger(SparkExample.class);

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("Invalid arguments: the first argument is the IP of the host where Hadoop runs");
            return;
        }
        String host = args[0];
        /*
         * Every Spark program starts by creating a Spark context.
         * While creating the context, the program requests resources from the cluster and builds the runtime environment.
         * Set the Spark application name here.
         * The only required argument for the SparkContext is a SparkConf, a set of key-value properties.
         */
        SparkConf sparkConf = new SparkConf().setAppName("SparkExample");
        // sparkConf.setJars(new String[]{"D:\\ideaws\\rest\\rest-guide\\comp-example\\target\\comp-example-1.0.jar.original"});

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Build a JavaRDD from a List
        List<String> data = Arrays.asList("1", "2", "3", "4", "5");
        JavaRDD<String> distData = jsc.parallelize(data);
        List<String> intLine = distData.collect();
        for (String string : intLine) {
            LOGGER.debug("Built from List: " + string);
        }

        // Build an RDD by reading a file from HDFS
        String textFilePath = "hdfs://" + host + ":9000/aaa.txt";
        /*
         * textFile() reads the given file from the file system and returns an RDD instance.
         * RDD creation is always handled by the SparkContext, taking an in-memory collection or an external file system as the input source.
         * RDD: resilient distributed dataset, i.e. a partitioned, read-only dataset. An RDD can be produced in only two ways:
         * from an in-memory collection or external storage, or by transforming another RDD, e.g. map, filter, join, and so on.
         * textFile() turns a local file or an HDFS file into an RDD; a local file must exist on every node, or be shared over the network.
         */
        JavaRDD<String> lines = jsc.textFile(textFilePath);
        List<String> stringLine = lines.collect();
        for (String string : stringLine) {
            LOGGER.debug("Read from file: " + string);
        }

        // Split each line by calling flatMap
        /*
         * In new FlatMapFunction<String, String>, the two Strings are the input and output types.
         * The overridden call method implements the conversion and returns an Iterator.
         *
         * flatMap is one of the most commonly used Spark functions; it breaks one RDD record into multiple records using the function you define.
         * For example, each record in lines is currently one line of text; flatMap lets us split it into individual words.
         */
        // The difference between flatMap and map: flatMap produces one or more outputs per input, while map produces exactly one.
        // Splitting on spaces turns one input line into several outputs, hence flatMap.
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) s -> {
            String[] words1 = s.split(" ");
            return Arrays.asList(words1).iterator();
        });

        // Convert the data into key/value pairs
        /*
         * PairFunction<T, K, V>: T is the input type; K and V are the key and value types of the generated pairs.
         * Here T = String, K = String, V = Integer (the count).
         * The call method implements the conversion.
         */
        // scala.Tuple2<K, V> call(T t)
        // Tuple2 is a Scala class; call takes a word s as input and returns a Tuple2 whose key is the word and whose count is 1.
        JavaPairRDD<String, Integer> ones = words.mapToPair((PairFunction<String, String, Integer>) s -> {
            LOGGER.debug("mapToPair call: " + s);
            return new Tuple2<>(s, 1);
        });

        // Aggregate the results
        /*
         * reduceByKey reduces by key, similar to the reduce phase in MapReduce.
         * It requires key/value input (ones here), groups records that share a key and combines them pairwise.
         * If ones contains <"one", 1>, <"one", 1>, the pairs with key "one" are combined; input is Integer, output is Integer: <"one", 2>.
         */
        // The reduce phase decides how values with the same key are combined.
        // Note: Spark also has a reduce method that works on any RDD, key/value pairs not required;
        // reduce combines all incoming records pairwise.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> {
            LOGGER.debug("reduceByKey call:" + i1 + ", " + i2);
            return i1 + i2;
        });

        // Write the result out under the user's home directory (on HDFS when submitted to the cluster).
        // On Windows this is a real local directory, e.g. D:/logs/spark_test/1555319746196; the parent directory must exist, otherwise nothing is written.
        counts.saveAsTextFile(System.getProperty("user.home") + "/" + System.currentTimeMillis());

        /*
         * Convert the result to a common type for output.
         * collect turns a Spark RDD into a familiar Java collection.
         */
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            LOGGER.debug(tuple._1() + ": " + tuple._2());
        }

        // Output directly
        counts.foreachPartition((VoidFunction<Iterator<Tuple2<String, Integer>>>) tuple2Iterator -> {
            while (tuple2Iterator.hasNext()) {
                Tuple2<String, Integer> t2 = tuple2Iterator.next();
                LOGGER.debug(t2._1() + ": " + t2._2());
            }
        });
        jsc.stop();
    }
}
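
The comments above contrast reduceByKey with Spark's plain reduce. A minimal local-mode sketch of the difference, not part of this commit (the class name ReduceComparison and the sample data are illustrative):

package com.ai.ipu.example.spark;

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Hypothetical class, not part of this commit: contrasts reduce and reduceByKey on tiny in-memory data.
public class ReduceComparison {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ReduceComparison").setMaster("local[2]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            // reduce: works on any RDD and combines all elements pairwise into a single value
            JavaRDD<Integer> numbers = jsc.parallelize(Arrays.asList(1, 2, 3, 4));
            Integer sum = numbers.reduce((a, b) -> a + b); // 10

            // reduceByKey: requires key/value pairs and combines only values that share the same key
            JavaPairRDD<String, Integer> pairs = jsc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("one", 1), new Tuple2<>("one", 1), new Tuple2<>("two", 1)));
            Map<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b).collectAsMap(); // {one=2, two=1}

            System.out.println("reduce sum = " + sum + ", reduceByKey counts = " + counts);
        }
    }
}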

+ 115 - 0
comp-example/src/test/java/com/ai/ipu/example/hadoop/HadoopExample.java

package com.ai.ipu.example.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;

/**
 * HDFS operation examples (mkdir, create file, copy to/from local, list, locate blocks)
 *
 * @Author: lilb3@asiainfo.com
 * @Date: 2019-05-24 17:10
 **/
public class HadoopExample {
    private FileSystem hdfs = null;

    @Before
    public void setBefore() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        // String hdfsPath = "hdfs://u0:9000";
        String hdfsPath = "hdfs://iZm5e5xe1w25avi0io1f5aZ:9000";
        hdfs = FileSystem.get(new URI(hdfsPath), configuration);
    }

    @After
    public void setAfter() throws Exception {
        if (hdfs != null) {
            hdfs.close();
        }
    }

    @Test
    public void mkdir() throws Exception {
        String newDir = "/hdfstest";
        boolean result = hdfs.mkdirs(new Path(newDir));
        if (result) {
            System.out.println("Success!");
        } else {
            System.out.println("Failed!");
        }
    }

    @Test
    public void createFile() throws Exception {
        String filePath = "/hdfstest/touchfile";
        FSDataOutputStream create = hdfs.create(new Path(filePath));
        create.writeBytes("abcdefghijklmnopqrstuvwxyz1234567890");
        create.close();
        System.out.println("Finish!");
    }

    @Test
    public void copyFromLocalFile() throws Exception {
        // "~" is not expanded by Java, so resolve the home directory explicitly
        String fromLinux = System.getProperty("user.home") + "/aaa.txt";
        String os = System.getProperty("os.name");
        if (os.toLowerCase().startsWith("win")) {
            fromLinux = "D:\\logs\\aaa.txt";
        }
        String toHDFS = "/hdfstest/";
        hdfs.copyFromLocalFile(new Path(fromLinux), new Path(toHDFS));
        System.out.println("Finish!");
    }

    @Test
    public void copyToLocalFile() throws Exception {
        String from_HDFS = "/hdfstest/aaa.txt";
        String toLinux = System.getProperty("user.home") + "/bbb.txt";
        String os = System.getProperty("os.name");
        if (os.toLowerCase().startsWith("win")) {
            toLinux = "D:\\logs\\bbb.txt";
        }
        hdfs.copyToLocalFile(false, new Path(from_HDFS), new Path(toLinux));
        System.out.println("Finish!");
    }

    @Test
    public void listFile() throws Exception {
        iteratorListFile(hdfs, new Path("/"));
    }

    @Test
    public void locateFile() throws Exception {
        Path file = new Path("/hdfstest/aaa.txt");
        FileStatus fileStatus = hdfs.getFileStatus(file);
        BlockLocation[] location = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        for (BlockLocation block : location) {
            String[] hosts = block.getHosts();
            for (String host : hosts) {
                System.out.println("block:" + block + " host:" + host);
            }
        }
    }

    private static void iteratorListFile(FileSystem hdfs, Path path) throws Exception {
        FileStatus[] files = hdfs.listStatus(path);
        for (FileStatus file : files) {
            if (file.isDirectory()) {
                System.out.println(file.getPermission() + " " + file.getOwner()
                        + " " + file.getGroup() + " " + file.getPath());
                iteratorListFile(hdfs, file.getPath());
            } else if (file.isFile()) {
                System.out.println(file.getPermission() + " " + file.getOwner()
                        + " " + file.getGroup() + " " + file.getPath());
            }
        }
    }
}
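
The tests above cover mkdir, create, copy and listing; reading a file back out of HDFS follows the same FileSystem pattern. A minimal sketch, not part of this commit (it reuses the NameNode address and the /hdfstest/aaa.txt path from the tests; the class name HdfsReadExample is made up):

package com.ai.ipu.example.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Hypothetical helper, not part of this commit: reads an HDFS file and prints it to stdout.
public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        // Same NameNode address as in HadoopExample above
        FileSystem hdfs = FileSystem.get(new URI("hdfs://iZm5e5xe1w25avi0io1f5aZ:9000"), configuration);
        try (FSDataInputStream in = hdfs.open(new Path("/hdfstest/aaa.txt"))) {
            // Copy the stream to stdout in 4 KB chunks without closing System.out
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            hdfs.close();
        }
    }
}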

+ 376 - 5
comp-example/src/test/java/com/ai/ipu/example/hbase/HbaseExample.java

Old file (placeholder):

package com.ai.ipu.example.hbase;

public class HbaseExample {

}

New file:

package com.ai.ipu.example.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class HbaseExample {
    private Connection connection = null;

    private Table table = null;

    private Admin admin = null;

    /**
     * Initialize the configuration
     *
     * @throws Exception exception
     */
    @Before
    public void setBefore() throws Exception {
        Configuration configuration = new Configuration();
        System.setProperty("hadoop.home.dir", "/home/mysql/software/hadoop273");
        // configuration.set("hbase.rootdir", "hdfs://u0:9000/home/hbase126");
        // configuration.set("hbase.zookeeper.quorum", "u0:2181");
        configuration.set("hbase.rootdir", "hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/home/hbase126");
        configuration.set("hbase.zookeeper.quorum", "iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103");
        connection = ConnectionFactory.createConnection(configuration);
        admin = connection.getAdmin();
    }

    /**
     * Close the connection
     *
     * @throws Exception exception
     */
    @After
    public void setAfter() throws Exception {
        if (connection != null) connection.close();
    }

    /**
     * Create a table
     *
     * @throws Exception exception
     */
    @Test
    public void createTable() throws Exception {
        String tableName = "school1";
        if (admin.tableExists(TableName.valueOf(tableName))) {
            System.out.println("Table already exists");
        } else {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // Add the column families; individual columns need not be declared
            hTableDescriptor.addFamily(new HColumnDescriptor("address"));
            hTableDescriptor.addFamily(new HColumnDescriptor("more"));
            hTableDescriptor.addFamily(new HColumnDescriptor("info"));
            admin.createTable(hTableDescriptor);
            System.out.println("Table " + tableName + " created successfully");
        }
    }

    /**
     * List tables and their structure
     *
     * @throws Exception exception
     */
    @Test
    public void listTable() throws Exception {
        HTableDescriptor[] tableList = admin.listTables();
        if (tableList.length > 0) {
            for (HTableDescriptor ta : tableList) {
                System.out.println(ta.getNameAsString());
                for (HColumnDescriptor column : ta.getColumnFamilies()) {
                    System.out.println("\t" + column.getNameAsString());
                }
            }
        }
    }

    /**
     * Insert a single record
     *
     * @throws Exception exception
     */
    @Test
    public void putOne() throws Exception {

        Table table = connection.getTable(TableName.valueOf("school1"));
        // Create the Put and specify its row key
        Put put = new Put(Bytes.toBytes("wangwu"));
        // byte[] family, byte[] qualifier, byte[] value
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("male"));
        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("changsha"));
        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("address"), Bytes.toBytes("yueyang"));
        put.addColumn(Bytes.toBytes("more"), Bytes.toBytes("hobby"), Bytes.toBytes("girl"));
        put.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"), Bytes.toBytes("walk"));

        table.put(put);
    }

    /**
     * Insert multiple records
     *
     * @throws Exception exception
     */
    @Test
    public void putList() throws Exception {

        Table table = connection.getTable(TableName.valueOf("school1"));
        // Create the Put and specify its row key
        Put put = new Put(Bytes.toBytes("zhangsan"));
        // byte[] family, byte[] qualifier, byte[] value
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("male"));
        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("beijing"));
        put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("address"), Bytes.toBytes("henan"));
        put.addColumn(Bytes.toBytes("more"), Bytes.toBytes("hobby"), Bytes.toBytes("girl"));
        put.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"), Bytes.toBytes("basketball"));

        Put put1 = new Put(Bytes.toBytes("lisi"));
        // The order family, qualifier, value must not be mixed up: column family, column, content
        put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("22"));
        put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes("female"));
        put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("nanjing"));
        put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("address"), Bytes.toBytes("hebei"));
        put1.addColumn(Bytes.toBytes("more"), Bytes.toBytes("hobby"), Bytes.toBytes("boy"));
        put1.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"), Bytes.toBytes("ball"));

        List<Put> list = new ArrayList<>();
        list.add(put);
        list.add(put1);

        table.put(list);
    }

    /**
     * Query by row key
     *
     * @throws Exception exception
     */
    @Test
    public void scanByRowKey() throws Exception {
        String rowKey = "zhangsan";
        table = connection.getTable(TableName.valueOf("school1"));
        Get get = new Get(rowKey.getBytes());

        Result result = table.get(get);
        System.out.println("Row key\tColumn family\tColumn\tValue");
        for (Cell cell : result.rawCells()) {
            System.out.println(
                    Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +          // row key
                            Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +      // column family
                            Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +   // qualifier
                            Bytes.toString(CellUtil.cloneValue(cell))                // value
            );
        }
    }

    /**
     * Query a value by row key, column family and column
     *
     * @throws Exception exception
     */
    @Test
    public void scanByRowKey1() throws Exception {
        String rowKey = "zhangsan";
        table = connection.getTable(TableName.valueOf("school1"));
        Get get = new Get(rowKey.getBytes());
        get.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"));

        Result result = table.get(get);
        System.out.println("Value of column family 'more', column 'good' for rowKey \"zhangsan\":");
        String value = Bytes.toString(result.getValue(Bytes.toBytes("more"), Bytes.toBytes("good")));
        System.out.println(value);
    }

    /**
     * Full table scan
     *
     * @throws Exception exception
     */
    @Test
    public void scanTable() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        Scan scan = new Scan();
        ResultScanner rs = table.getScanner(scan);
        for (Result result : rs) {
            printResult(result);
        }
    }

    /**
     * Scan with a condition; the Scan constructor arguments are the start and stop row keys
     *
     * @throws Exception exception
     */
    @Test
    public void scanTable1() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        // The stop row is exclusive, so append a zero byte to include the "wangwu" row itself
        Scan scan = new Scan(Bytes.toBytes("wangwu"), Bytes.add(Bytes.toBytes("wangwu"), new byte[]{0}));

        ResultScanner rs = table.getScanner(scan);
        for (Result result : rs) {
            printResult(result);
        }
    }

    /**
     * Private helper that prints the rows in a result
     *
     * @param result the result to print
     */
    private void printResult(Result result) {
        for (Cell cell : result.rawCells()) {
            System.out.println(
                    // row key
                    Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +
                            // column family
                            Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +
                            // qualifier
                            Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +
                            // value
                            Bytes.toString(CellUtil.cloneValue(cell))
            );
        }
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
    }

    /**
     * Conditional filtering (Filter)
     * Queries rows whose key ends with "wu".
     * Data in HBase is stored in lexicographic order of the row key,
     * regardless of insertion time.
     *
     * @throws Exception exception
     */
    @Test
    public void rowFilter() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        String reg = ".*wu$";
        org.apache.hadoop.hbase.filter.Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(reg));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Prefix filter
     *
     * @throws Exception exception
     */
    @Test
    public void prefixFilter() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        org.apache.hadoop.hbase.filter.Filter filter = new PrefixFilter(Bytes.toBytes("li"));
        Scan scan = new Scan();
        scan.setFilter(filter);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Combined filters (FilterList)
     * <p>
     * MUST_PASS_ALL: all conditions must be satisfied
     * <p>
     * MUST_PASS_ONE: satisfying any one condition is enough
     *
     * @throws Exception exception
     */
    @Test
    public void filterList() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        org.apache.hadoop.hbase.filter.Filter filter = new PrefixFilter(Bytes.toBytes("li"));
        org.apache.hadoop.hbase.filter.Filter filter2 = new PrefixFilter(Bytes.toBytes("zh"));
        filterList.addFilter(filter);
        filterList.addFilter(filter2);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            printResult(result);
        }
    }

    /**
     * Delete data
     *
     * @throws Exception exception
     */
    @Test
    public void delete() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        Delete delete = new Delete(Bytes.toBytes("lisi"));
        table.delete(delete);
    }

    /**
     * Delete data
     *
     * @throws Exception exception
     */
    @Test
    public void multiDelete() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        Delete delete = new Delete(Bytes.toBytes("zhangsan"));
        // Delete the current version of a cell
        // delete.addColumn(Bytes.toBytes("more"), Bytes.toBytes("good"));
        // Delete all versions of a cell
        // delete.addColumns(Bytes.toBytes("more"), Bytes.toBytes("good"));

        /*
         * addColumn and addColumns are not the same:
         *
         * addColumn deletes only the most recent version of the cell,
         *
         * addColumns deletes all versions of the cell.
         */
        // Delete a column family
        delete.addFamily(Bytes.toBytes("info"));

        table.delete(delete);
    }

    /**
     * Drop the table
     *
     * @throws Exception exception
     */
    @Test
    public void dropTable() throws Exception {
        table = connection.getTable(TableName.valueOf("school1"));
        if (admin.tableExists(TableName.valueOf("school1"))) {
            admin.disableTable(TableName.valueOf("school1"));
            admin.deleteTable(TableName.valueOf("school1"));
            System.out.println("Dropped successfully");
        } else {
            System.out.println("Table does not exist");
        }
    }
}
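
The filters above all match on the row key. Filtering on a cell value works the same way with SingleColumnValueFilter; a minimal sketch, not part of this commit (it assumes the school1 table and the info:age column written by the tests above; the class name HbaseValueFilterExample is made up):

package com.ai.ipu.example.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical class, not part of this commit: scans school1 for rows whose info:age equals "20".
public class HbaseValueFilterExample {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Same HBase/ZooKeeper settings as in HbaseExample above
        configuration.set("hbase.rootdir", "hdfs://iZm5e5xe1w25avi0io1f5aZ:9000/home/hbase126");
        configuration.set("hbase.zookeeper.quorum", "iZm5e5xe1w25avi0io1f5aZ:2101,iZm5e5xe1w25avi0io1f5aZ:2102,iZm5e5xe1w25avi0io1f5aZ:2103");
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf("school1"))) {
            SingleColumnValueFilter filter = new SingleColumnValueFilter(
                    Bytes.toBytes("info"), Bytes.toBytes("age"),
                    CompareFilter.CompareOp.EQUAL, Bytes.toBytes("20"));
            // Skip rows that do not have the info:age column at all
            filter.setFilterIfMissing(true);
            Scan scan = new Scan();
            scan.setFilter(filter);
            try (ResultScanner results = table.getScanner(scan)) {
                for (Result result : results) {
                    System.out.println(Bytes.toString(result.getRow()));
                }
            }
        }
    }
}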