|
package com.ai.ipu.example.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* Hbase测试
*
* @author lilb3@asiainfo.com
* @since 2019-07-01 15:36
*/
public class HbaseExample {
private static final String SPLIT_STRING = "###";
private static final String[] TABLE_COLUMNS = HbaseConfig.getTableCloumns().split(SPLIT_STRING);
private static final String[] KEYS = HbaseConfig.getKey().split(SPLIT_STRING);
private static final String TABLE_NAME = HbaseConfig.getTableName();
private Connection connection = null;
private Table table = null;
private Admin admin = null;
/**
* 初始化配置
*
* @throws Exception 异常
*/
@Before
public void setBefore() throws Exception {
Configuration configuration = new Configuration();
System.setProperty("hadoop.home.dir", HbaseConfig.getHadoopHomeDir());
configuration.set("hbase.rootdir", HbaseConfig.getHbaseRootdir());
configuration.set("hbase.zookeeper.quorum", HbaseConfig.getHbaseZookeeperQuorum());
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
}
/**
* 关闭连接
*
* @throws Exception 异常
*/
@After
public void setAfter() throws Exception {
if (connection != null) connection.close();
}
/**
* 创建表
*
* @throws Exception 异常
*/
@Test
public void createTable() throws Exception {
if (admin.tableExists(TableName.valueOf(TABLE_NAME))) {
System.out.println("表已经存在");
} else {
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
//添加列族columnFamily ,不必指定列
String[] tableColumns = HbaseConfig.getTableCloumns().split(SPLIT_STRING);
for (String column : tableColumns) {
hTableDescriptor.addFamily(new HColumnDescriptor(column));
}
admin.createTable(hTableDescriptor);
System.out.println("表" + TABLE_NAME + "创建成功");
}
}
/**
* 查看表和表结构
*
* @throws Exception 异常
*/
@Test
public void listTable() throws Exception {
HTableDescriptor[] tableList = admin.listTables();
if (tableList.length > 0) {
for (HTableDescriptor ta : tableList) {
System.out.println(ta.getNameAsString());
for (HColumnDescriptor column :
ta.getColumnFamilies()) {
System.out.println("\t" + column.getNameAsString());
}
}
}
}
/**
* 添加单条记录
*
* @throws Exception 异常
*/
@Test
public void putOne() throws Exception {
Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
//创建 put,并制定 put 的Rowkey
Put put = new Put(Bytes.toBytes("wangwu"));
String[] values = HbaseConfig.getWangWu().split(SPLIT_STRING);
for (int i = 0; i < TABLE_COLUMNS.length; i++) {
put.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i]), Bytes.toBytes(values[2 * i]));
put.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i + 1]), Bytes.toBytes(values[2 * i + 1]));
}
table.put(put);
System.out.println("添加成功。");
}
/**
* 添加多条记录
*
* @throws Exception 异常
*/
@Test
public void putList() throws Exception {
Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
//创建 put,并制定 put 的Rowkey
Put put = new Put(Bytes.toBytes("zhangsan"));
String[] values = HbaseConfig.getZhangShan().split(SPLIT_STRING);
for (int i = 0; i < TABLE_COLUMNS.length; i++) {
put.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i]), Bytes.toBytes(values[2 * i]));
put.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i + 1]), Bytes.toBytes(values[2 * i + 1]));
}
Put put1 = new Put(Bytes.toBytes("lisi"));
String[] values1 = HbaseConfig.getLiSi().split(SPLIT_STRING);
for (int i = 0; i < TABLE_COLUMNS.length; i++) {
put1.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i]), Bytes.toBytes(values1[2 * i]));
put1.addColumn(Bytes.toBytes(TABLE_COLUMNS[i]), Bytes.toBytes(KEYS[2 * i + 1]), Bytes.toBytes(values1[2 * i + 1]));
}
List<Put> list = new ArrayList<>();
list.add(put);
list.add(put1);
table.put(list);
System.out.println("添加多条记录成功。");
}
/**
* 根据Rowkey进行查询
*
* @throws Exception 异常
*/
@Test
public void scanByRowKey() throws Exception {
String rowKey = "zhangsan";
table = connection.getTable(TableName.valueOf(TABLE_NAME));
Get get = new Get(rowKey.getBytes());
Result result = table.get(get);
System.out.println("列族\t列名\t内容");
for (Cell cell : result.rawCells()) {
System.out.println(
Bytes.toString(CellUtil.cloneRow(cell)) + "\t" + //Rowkey
Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" + //CF
Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +//qualifier
Bytes.toString(CellUtil.cloneValue(cell)) //value
);
}
}
/**
* 根据Rowkey和列族和列进行查询内容
*
* @throws Exception 异常
*/
@Test
public void scanByRowKey1() throws Exception {
String rowKey = "zhangsan";
table = connection.getTable(TableName.valueOf(TABLE_NAME));
Get get = new Get(rowKey.getBytes());
get.addColumn(Bytes.toBytes(TABLE_COLUMNS[2]), Bytes.toBytes(KEYS[5]));
Result result = table.get(get);
System.out.println("列族rowKey = \"zhangsan\"列族为" + TABLE_COLUMNS[2] + ",列为" + KEYS[5] + "的值是:");
String value = Bytes.toString(result.getValue(Bytes.toBytes(TABLE_COLUMNS[2]), Bytes.toBytes(KEYS[5])));
System.out.println(value);
}
/**
* 全表查询
*
* @throws Exception 异常
*/
@Test
public void scanTable() throws Exception {
table = connection.getTable(TableName.valueOf(TABLE_NAME));
Scan scan = new Scan();
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
printResult(result);
}
}
/**
* Scan 查询 添加个条件,Scan 对象参数是 Rowkey
*
* @throws Exception 异常
*/
@Test
public void scanTable1() throws Exception {
table = connection.getTable(TableName.valueOf(TABLE_NAME));
Scan scan = new Scan(Bytes.toBytes("wangwu"), Bytes.toBytes("wangwu"));
ResultScanner rs = table.getScanner(scan);
for (Result result : rs) {
printResult(result);
}
}
/**
* 定义私有方法 用于打印表的信息
*
* @param result 需要打印的结果
*/
private void printResult(Result result) {
for (Cell cell :
result.rawCells()) {
System.out.println(
//Rowkey
Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +
//CF
Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +
//qualifier
Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +
//value
Bytes.toString(CellUtil.cloneValue(cell))
);
}
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
/**
* 条件过滤 Filter
* 查询后最名 为 wu的
* Hbase 中数据是按照字典顺序进行排列的
* 不会因为插入的时间不同而不同
*
* @throws Exception 异常
*/
@Test
public void rowFilter() throws Exception {
table = connection.getTable(TableName.valueOf(TABLE_NAME));
String reg = "^*wu";
org.apache.hadoop.hbase.filter.Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(reg));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner results = table.getScanner(scan);
for (Result result : results) {
printResult(result);
}
}
/**
* 前缀名过滤
*
* @throws Exception 异常
*/
@Test
public void prefixFilter() throws Exception {
table = connection.getTable(TableName.valueOf(TABLE_NAME));
org.apache.hadoop.hbase.filter.Filter filter = new PrefixFilter(Bytes.toBytes("li"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner results = table.getScanner(scan);
for (Result result : results) {
printResult(result);
}
}
/**
* 多条件过滤 FilterList
* <p>
* MUST_PASS_ALL 全都满足条件
* <p>
* MUST_PASS_ONE 满足一个就可以
*
* @throws Exception 异常
*/
@Test
public void filterList() throws Exception {
table = connection.getTable(TableName.valueOf(TABLE_NAME));
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
org.apache.hadoop.hbase.filter.Filter filter = new PrefixFilter(Bytes.toBytes("li"));
org.apache.hadoop.hbase.filter.Filter filter2 = new PrefixFilter(Bytes.toBytes("zh"));
filterList.addFilter(filter);
filterList.addFilter(filter2);
Scan scan = new Scan();
scan.setFilter(filterList);
ResultScanner results = table.getScanner(scan);
for (Result result : results) {
printResult(result);
}
}
/**
* 删除数据
*
* @throws Exception 异常
*/
@Test
public void delete() throws Exception {
table = connection.getTable(TableName.valueOf(TABLE_NAME));
Delete delete = new Delete(Bytes.toBytes("wangwu"));
table.delete(delete);
System.out.println("删除成功。");
}
/**
* 删除数据
*
* @throws Exception 异常
*/
@Test
public void multiDelete() throws Exception {
table = connection.getTable(TableName.valueOf(TABLE_NAME));
Delete delete = new Delete(Bytes.toBytes("zhangsan"));
//删除当前版本的内容
// delete.addColumn(Bytes.toBytes("more"),Bytes.toBytes("good"));
//删除所有版本
//delete.addColumns(Bytes.toBytes("more"),Bytes.toBytes("good"));
/*
* addColumn addColumns 是不一样的 ,记住了,
*
* addColumn 是删除单元格当前最近版本的 内容,
*
* addColumns 是删除单元格所有版本的内容
* */
//删除列族
delete.addFamily(Bytes.toBytes("info"));
table.delete(delete);
System.out.println("删除列成功。");
}
/**
* 删除表
*
* @throws Exception 异常
*/
@Test
public void dropTable() throws Exception {
table = connection.getTable(TableName.valueOf(TABLE_NAME));
if (admin.tableExists(TableName.valueOf(TABLE_NAME))) {
admin.disableTable(TableName.valueOf(TABLE_NAME));
admin.deleteTable(TableName.valueOf(TABLE_NAME));
System.out.println("删除成功");
} else {
System.out.println("表不存在");
}
}
}
|