团队对封装组件的代码范例

HbaseExample.java 13KB

    package com.ai.ipu.example.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.filter.RegexStringComparator;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import java.util.ArrayList;
    import java.util.List;

    /**
     * JUnit-driven usage examples for the HBase client API: table administration,
     * puts, gets, scans, filters and deletes against the table named by
     * {@code HbaseConfig.getTableName()}.
     *
     * <p>Every test opens its own {@link Table} (and {@link ResultScanner}) in a
     * try-with-resources block so that no handle outlives the test; the shared
     * {@link Connection} and {@link Admin} are opened in {@link #setBefore()} and
     * closed in {@link #setAfter()}.
     *
     * @author lilb3@asiainfo.com
     * @since 2019-07-01 15:36
     */
    public class HbaseExample {
        /** Separator used by the {@code HbaseConfig} values that pack several items into one string. */
        private static final String SPLIT_STRING = "###";
        /** Column family names, split from {@code HbaseConfig.getTableCloumns()}. */
        private static final String[] TABLE_COLUMNS = HbaseConfig.getTableCloumns().split(SPLIT_STRING);
        /** Column qualifiers, split from {@code HbaseConfig.getKey()}; two consecutive entries per column family. */
        private static final String[] KEYS = HbaseConfig.getKey().split(SPLIT_STRING);
        /** Name of the table every example operates on. */
        private static final String TABLE_NAME = HbaseConfig.getTableName();

        private Connection connection = null;
        private Admin admin = null;

        /**
         * Builds the client configuration from {@code HbaseConfig}, then opens the
         * connection and its admin handle.
         *
         * @throws Exception if the connection or the admin handle cannot be created
         */
        @Before
        public void setBefore() throws Exception {
            Configuration configuration = new Configuration();
            System.setProperty("hadoop.home.dir", HbaseConfig.getHadoopHomeDir());
            configuration.set("hbase.rootdir", HbaseConfig.getHbaseRootdir());
            configuration.set("hbase.zookeeper.quorum", HbaseConfig.getHbaseZookeeperQuorum());
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        }

        /**
         * Closes the admin handle and the connection opened by {@link #setBefore()}.
         *
         * @throws Exception if closing either resource fails
         */
        @After
        public void setAfter() throws Exception {
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                // Close the connection even when closing the admin handle failed.
                if (connection != null) {
                    connection.close();
                }
            }
        }

        /**
         * Creates the table with one column family per entry of {@code TABLE_COLUMNS},
         * unless the table already exists.
         *
         * @throws Exception if the admin call fails
         */
        @Test
        public void createTable() throws Exception {
            TableName tableName = TableName.valueOf(TABLE_NAME);
            if (admin.tableExists(tableName)) {
                System.out.println("表已经存在");
                return;
            }
            HTableDescriptor descriptor = new HTableDescriptor(tableName);
            // Only column families are declared here; individual columns need no declaration.
            for (String family : TABLE_COLUMNS) {
                descriptor.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(descriptor);
            System.out.println("表" + TABLE_NAME + "创建成功");
        }

        /**
         * Prints every table name followed by its column families (tab-indented).
         *
         * @throws Exception if the admin call fails
         */
        @Test
        public void listTable() throws Exception {
            for (HTableDescriptor descriptor : admin.listTables()) {
                System.out.println(descriptor.getNameAsString());
                for (HColumnDescriptor family : descriptor.getColumnFamilies()) {
                    System.out.println("\t" + family.getNameAsString());
                }
            }
        }

        /**
         * Inserts a single row with row key {@code "wangwu"}.
         *
         * @throws Exception if the put fails
         */
        @Test
        public void putOne() throws Exception {
            Put put = buildPut("wangwu", HbaseConfig.getWangWu());
            try (Table table = openTable()) {
                table.put(put);
            }
            System.out.println("添加成功。");
        }

        /**
         * Inserts the rows {@code "zhangsan"} and {@code "lisi"} with one batched put.
         *
         * @throws Exception if the put fails
         */
        @Test
        public void putList() throws Exception {
            List<Put> puts = new ArrayList<>();
            puts.add(buildPut("zhangsan", HbaseConfig.getZhangShan()));
            puts.add(buildPut("lisi", HbaseConfig.getLiSi()));
            try (Table table = openTable()) {
                table.put(puts);
            }
            System.out.println("添加多条记录成功。");
        }

        /**
         * Fetches the row {@code "zhangsan"} by row key and prints all of its cells.
         *
         * @throws Exception if the get fails
         */
        @Test
        public void scanByRowKey() throws Exception {
            String rowKey = "zhangsan";
            try (Table table = openTable()) {
                // Bytes.toBytes instead of String.getBytes(): the latter depends on the platform charset.
                Result result = table.get(new Get(Bytes.toBytes(rowKey)));
                // The header lists the same four columns that formatCell emits.
                System.out.println("Rowkey\t列族\t列名\t内容");
                for (Cell cell : result.rawCells()) {
                    System.out.println(formatCell(cell));
                }
            }
        }

        /**
         * Fetches one cell of row {@code "zhangsan"}: column family {@code TABLE_COLUMNS[2]},
         * qualifier {@code KEYS[5]}, and prints its value.
         *
         * @throws Exception if the get fails
         */
        @Test
        public void scanByRowKey1() throws Exception {
            String rowKey = "zhangsan";
            byte[] family = Bytes.toBytes(TABLE_COLUMNS[2]);
            byte[] qualifier = Bytes.toBytes(KEYS[5]);
            try (Table table = openTable()) {
                Get get = new Get(Bytes.toBytes(rowKey));
                get.addColumn(family, qualifier);
                Result result = table.get(get);
                System.out.println("列族rowKey = \"zhangsan\"列族为" + TABLE_COLUMNS[2] + ",列为" + KEYS[5] + "的值是:");
                System.out.println(Bytes.toString(result.getValue(family, qualifier)));
            }
        }

        /**
         * Scans the whole table and prints every row.
         *
         * @throws Exception if the scan fails
         */
        @Test
        public void scanTable() throws Exception {
            scanAndPrint(new Scan());
        }

        /**
         * Scans with a row-key range; both the start row and the stop row are {@code "wangwu"}.
         *
         * @throws Exception if the scan fails
         */
        @Test
        public void scanTable1() throws Exception {
            // NOTE(review): start row and stop row are identical; how such a range is
            // interpreted depends on the HBase client version - confirm against the
            // Scan documentation of the version in use.
            scanAndPrint(new Scan(Bytes.toBytes("wangwu"), Bytes.toBytes("wangwu")));
        }

        /**
         * Row-key filter example: prints the rows whose key ends with {@code "wu"}.
         *
         * <p>Rows in HBase are ordered lexicographically by row key, independent of
         * the order in which they were inserted.
         *
         * @throws Exception if the scan fails
         */
        @Test
        public void rowFilter() throws Exception {
            // Anchor at the end of the key. The previous pattern "^*wu" put a quantifier
            // on the '^' anchor and therefore accepted any key merely containing "wu".
            String reg = "wu$";
            org.apache.hadoop.hbase.filter.Filter filter =
                    new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(reg));
            Scan scan = new Scan();
            scan.setFilter(filter);
            scanAndPrint(scan);
        }

        /**
         * Prefix filter example: prints the rows whose key starts with {@code "li"}.
         *
         * @throws Exception if the scan fails
         */
        @Test
        public void prefixFilter() throws Exception {
            Scan scan = new Scan();
            scan.setFilter(new PrefixFilter(Bytes.toBytes("li")));
            scanAndPrint(scan);
        }

        /**
         * Combined filter example using {@link FilterList}.
         *
         * <p>{@code MUST_PASS_ALL} requires every filter to accept the row;
         * {@code MUST_PASS_ONE}, used here, requires only one. The list holds the
         * prefixes {@code "li"} and {@code "zh"}.
         *
         * @throws Exception if the scan fails
         */
        @Test
        public void filterList() throws Exception {
            FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
            filters.addFilter(new PrefixFilter(Bytes.toBytes("li")));
            filters.addFilter(new PrefixFilter(Bytes.toBytes("zh")));
            Scan scan = new Scan();
            scan.setFilter(filters);
            scanAndPrint(scan);
        }

        /**
         * Deletes the whole row {@code "wangwu"}.
         *
         * @throws Exception if the delete fails
         */
        @Test
        public void delete() throws Exception {
            try (Table table = openTable()) {
                table.delete(new Delete(Bytes.toBytes("wangwu")));
            }
            System.out.println("删除成功。");
        }

        /**
         * Deletes the column family {@code "info"} from row {@code "zhangsan"}.
         *
         * <p>Finer-grained alternatives on {@link Delete}: {@code addColumn(family, qualifier)}
         * removes only the most recent version of a cell, whereas
         * {@code addColumns(family, qualifier)} removes every version of it.
         *
         * @throws Exception if the delete fails
         */
        @Test
        public void multiDelete() throws Exception {
            Delete delete = new Delete(Bytes.toBytes("zhangsan"));
            // Remove the entire column family for this row.
            delete.addFamily(Bytes.toBytes("info"));
            try (Table table = openTable()) {
                table.delete(delete);
            }
            System.out.println("删除列成功。");
        }

        /**
         * Disables and then deletes the table if it exists.
         *
         * @throws Exception if an admin call fails
         */
        @Test
        public void dropTable() throws Exception {
            TableName tableName = TableName.valueOf(TABLE_NAME);
            if (admin.tableExists(tableName)) {
                // The table is disabled first and only then deleted.
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
                System.out.println("删除成功");
            } else {
                System.out.println("表不存在");
            }
        }

        /**
         * Opens a handle on the example table; the caller is responsible for closing it.
         *
         * @return a new table handle obtained from the shared connection
         * @throws Exception if the handle cannot be obtained
         */
        private Table openTable() throws Exception {
            return connection.getTable(TableName.valueOf(TABLE_NAME));
        }

        /**
         * Builds a {@link Put} for one row. Column family {@code TABLE_COLUMNS[i]}
         * receives two cells: qualifiers {@code KEYS[2i]} and {@code KEYS[2i + 1]}
         * with the values at the same two positions of the split value string.
         *
         * @param rowKey       row key of the row to write
         * @param joinedValues cell values joined with {@code SPLIT_STRING}
         * @return the populated put
         * @throws IllegalArgumentException if fewer than two qualifiers or two values
         *                                  per column family are available
         */
        private Put buildPut(String rowKey, String joinedValues) {
            String[] values = joinedValues.split(SPLIT_STRING);
            int required = 2 * TABLE_COLUMNS.length;
            if (KEYS.length < required || values.length < required) {
                throw new IllegalArgumentException("Row '" + rowKey + "' needs " + required
                        + " qualifiers and values but got " + KEYS.length + " qualifiers and "
                        + values.length + " values");
            }
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int i = 0; i < TABLE_COLUMNS.length; i++) {
                byte[] family = Bytes.toBytes(TABLE_COLUMNS[i]);
                put.addColumn(family, Bytes.toBytes(KEYS[2 * i]), Bytes.toBytes(values[2 * i]));
                put.addColumn(family, Bytes.toBytes(KEYS[2 * i + 1]), Bytes.toBytes(values[2 * i + 1]));
            }
            return put;
        }

        /**
         * Runs the scan against the example table and prints every returned row,
         * closing the scanner and the table handle afterwards.
         *
         * @param scan the scan to execute
         * @throws Exception if the scan fails
         */
        private void scanAndPrint(Scan scan) throws Exception {
            try (Table table = openTable();
                 ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    printResult(result);
                }
            }
        }

        /**
         * Prints every cell of a row, one line per cell, followed by a separator line.
         *
         * @param result the row to print
         */
        private void printResult(Result result) {
            for (Cell cell : result.rawCells()) {
                System.out.println(formatCell(cell));
            }
            System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
        }

        /**
         * Renders a cell as tab-separated row key, column family, qualifier and value.
         *
         * @param cell the cell to render
         * @return the tab-separated text form of the cell
         */
        private static String formatCell(Cell cell) {
            return Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                    + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                    + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                    + Bytes.toString(CellUtil.cloneValue(cell));
        }
    }