💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] # 依赖的maven ~~~ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> <version>1.4.3</version> <type>pom</type> </dependency> ~~~ # CURD代码 注意: 全表扫描不是对某个时间点表的快照的扫描.如果扫描已经开始,但是在行R被扫描器对象读出之前,行R被改变了,那么扫描器读出行R更新后的版本.但是扫描器读出的数据是一致的,得到R更新后的完整行 ## 前置操作 ~~~ package com.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; public class HbaseDemo { private Configuration conf = null; private Connection conn = null; @Before public void init() throws IOException { //构建个配置 conf = HBaseConfiguration.create(); //对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群就可以了 //因为hbase的客户端找hbase读写数据完全不用经过hmaster conf.set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181"); conn = ConnectionFactory.createConnection(conf); } } ~~~ 添加测试数据 ~~~ //添加数据 @Test public void testPut() throws IOException { Table table = conn.getTable(TableName.valueOf("t_user_info")); ArrayList<Put> puts = new ArrayList<Put>(); //构建一个put对象(kv),指定行键 Put put01 = new Put(Bytes.toBytes("user001")); put01.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhangsan")); Put put02 = new Put("user001".getBytes()); put02.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("password"), Bytes.toBytes("123456")); Put put03 = new Put("user002".getBytes()); put03.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("lisi")); put03.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put04 = new Put("zhang_sh_01".getBytes()); put04.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang01")); put04.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put05 = new Put("zhang_sh_02".getBytes()); put05.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang02")); put05.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put06 = new Put("liu_sh_01".getBytes()); put06.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("liu01")); put06.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put07 = new Put("zhang_bj_01".getBytes()); put07.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang03")); put07.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put08 = new Put("zhang_bj_01".getBytes()); put08.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang04")); put08.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); puts.add(put01); puts.add(put02); puts.add(put03); puts.add(put04); puts.add(put05); puts.add(put06); puts.add(put07); puts.add(put08); table.put(puts); table.close(); conn.close(); } ~~~ ## 表是否存在 ~~~ @Test public boolean testExists(String tableName) throws IOException { //老API //HBaseAdmin admin = new HBaseAdmin(conf); //新API Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); return admin.tableExists(TableName.valueOf(tableName)); } ~~~ ## 创建表 这是不需要命名空间的 ~~~ //建表 @Test public void testCreate() throws IOException { //获取一个表管理器 Admin admin = conn.getAdmin(); //构造一个表描述器,并指定表名 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t_user_info")); //构造一个列族描述器,并指定列族名 HColumnDescriptor hcd1 = new HColumnDescriptor("base_info"); //为该列族设定一个布隆过滤器类型参数/版本数量 hcd1.setBloomFilterType(BloomType.ROW).setVersions(1, 3); //构造第二个列族描述器,并指定列族名 HColumnDescriptor hcd2 = new HColumnDescriptor("extra_info"); //为该列族设定一个布隆过滤器类型参数/版本数量 hcd2.setBloomFilterType(BloomType.ROW).setVersions(1, 3); //将列族描述器添加到表描述器中 htd.addFamily(hcd1).addFamily(hcd2); admin.createTable(htd); admin.close(); conn.close(); } ~~~ ## 命名空间管理 命名空间可以被创建、移除、修改。 表和命名空间的隶属关系在在创建表时决定,通过以下格式指定: `<namespace>:<table>` Example:hbase shell中创建命名空间、创建命名空间中的表、移除命名空间、修改命名空间 ~~~ #Create a namespace create_namespace 'my_ns' ~~~ ~~~ #create my_table in my_ns namespace create 'my_ns:my_table', 'fam' ~~~ ~~~ #drop namespace drop_namespace 'my_ns' ~~~ ~~~ #alter namespace alter_namespace 'my_ns', {METHOD => 'set', 'PROPERTY_NAME' => 'PROPERTY_VALUE'} ~~~ 预定义的命名空间 有两个系统内置的预定义命名空间: * hbase:系统命名空间,用于包含hbase的内部表 * default:所有未指定命名空间的表都自动进入该命名空间 Example:指定命名空间和默认命名空间 ~~~ #namespace=foo and table qualifier=bar create 'foo:bar', 'fam' ~~~ ~~~ #namespace=default and table qualifier=bar create 'bar', 'fam' ~~~ **代码** ~~~ Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); //create namespace named "my_ns" admin.createNamespace(NamespaceDescriptor.create("my_ns").build()); //create tableDesc, with namespace name "my_ns" and table name "mytable" HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("my_ns:mytable")); tableDesc.setDurability(Durability.SYNC_WAL); //add a column family "mycf" HColumnDescriptor hcd = new HColumnDescriptor("mycf"); tableDesc.addFamily(hcd); admin.createTable(tableDesc); admin.close(); ~~~ **关键知识点** 1. 必须将HBase集群的hbase-site.xml文件添加进工程的classpath中,或者通过Configuration对象设置相关属性,否则程序获取不到集群相关信息,也就无法找到集群,运行程序时会报错; 2. HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("my_ns:mytable"))代码是描述表mytable,并将mytable放到了my_ns命名空间中,前提是该命名空间已存在,如果指定的是不存在命名空间,则会报错org.apache.hadoop.hbase.NamespaceNotFoundException; 3. 命名空间一般在建模阶段通过命令行创建,在java代码中通过admin.createNamespace(NamespaceDescriptor.create("my_ns").build())创建的机会不多; 4. **创建HBaseAdmin对象时就已经建立了客户端程序与HBase集群的connection**,所以在程序执行完成后,务必通过admin.close()关闭connection; 5. 可以**通过HTableDescriptor对象设置表的特性,比如:通过tableDesc.setMaxFileSize(512)设置一个region中的store文件的最大size,当一个region中的最大store文件达到这个size时,region就开始分裂;通过tableDesc.setMemStoreFlushSize(512)设置region内存中的memstore的最大值**,当memstore达到这个值时,开始往磁盘中刷数据。更多特性请自行查阅官网API; 6. 可以通过HColumnDescriptor对象设置列族的特性,比如:**通过hcd.setTimeToLive(5184000)设置数据保存的最长时间;通过hcd.setInMemory(true)设置数据保存在内存中以提高响应速度;通过 hcd.setMaxVersions(10)设置数据保存的最大版本数;通过hcd.setMinVersions(5)设置数据保存的最小版本数(配合TimeToLive使用)**。更多特性请自行查阅官网API; 7. 数据的版本数只能通过HColumnDescriptor对象设置,不能通过HTableDescriptor对象设置; 8. 由于HBase的数据是先写入内存,数据累计达到内存阀值时才往磁盘中flush数据,所以,如果在数据还没有flush进硬盘时,regionserver down掉了,内存中的数据将丢失。要想解决这个场景的问题就需要用到WAL(Write-Ahead-Log),tableDesc.setDurability(Durability.SYNC_WAL)就是设置写WAL日志的级别,示例中设置的是同步写WAL,该方式安全性较高,但无疑会一定程度影响性能,请根据具体场景选择使用; 9. setDurability(Durability d)方法可以在相关的三个对象中使用,分别是:HTableDescriptor,Delete,Put(其中Delete和Put的该方法都是继承自父类org.apache.hadoop.hbase.client.Mutation)。分别针对表、插入操作、删除操作设定WAL日志写入级别。需要注意的是,D**elete和Put并不会继承Table的Durability级别(已实测验证)**。Durability是一个枚举变量,可选值参见4.2节。如果不通过该方法指定WAL日志级别,则为默认USE_DEFAULT级别。 ## 删除表 删除表没创建表那么多学问,直接上代码 ~~~ Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); String tablename = "my_ns:mytable"; if(admin.tableExists(tablename)) { try { if (! admin.isTableDisabled(TableName.valueOf(tableName))) { admin.disableTable(tablename); } admin.deleteTable(tablename); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } admin.close(); ~~~ 删除表前必须先disable表 ## 删除表中的数据 Delete类用于删除表中的一行数据,通过HTable.delete来执行该动作。 在执行Delete操作时,HBase并不会立即删除数据,而是对需要删除的数据打上一个“墓碑”标记,直到当Storefile合并时,再清除这些被标记上“墓碑”的数据。 如果希望删除整行,用行键来初始化一个Delete对象即可。如果希望进一步定义删除的具体内容,可以使用以下这些Delete对象的方法: * 为了删除指定的列族,可以使用deleteFamily * 为了删除指定列的多个版本,可以使用deleteColumns * 为了删除指定列的指定版本,可以使用deleteColumn,这样的话就只会删除版本号(时间戳)与指定版本相同的列。如果不指定时间戳,默认只删除最新的版本 **构造函数** 1. 指定要删除的行键 ~~~ Delete(byte[] row) ~~~ 删除行键指定行的数据。 如果没有进一步的操作,使用该构造函数将删除行键指定的行中**所有列族中所有列的所有版本!** 2. 指定要删除的行键和时间戳 ~~~ Delete(byte[] row, long timestamp) ~~~ 删除行键和时间戳共同确定行的数据。 如果没有进一步的操作,使用该构造函数将删除行键指定的行中,所有列族中所有列的**时间戳小于等于指定时间戳的数据版本**。 注意:该时间戳仅仅和删除行有关,如果需要进一步指定列族或者列,你必须分别为它们指定时间戳。 3. 给定一个字符串,目标行键的偏移,截取的长度 ~~~ Delete(byte[] rowArray, int rowOffset, int rowLength) ~~~ 4. 给定一个字符串,目标行键的偏移,截取的长度,时间戳 ~~~ Delete(byte[] rowArray, int rowOffset, int rowLength, long ts) ~~~ **常用方法** * `Delete deleteColumn(byte[] family, byte[] qualifier)` 删除指定列的**最新版本**的数据。 * `Delete deleteColumns(byte[] family, byte[] qualifier) ` 删除指定列的**所有版本**的数据。 * `Delete deleteColumn(byte[] family, byte[] qualifier, long timestamp)` 删除指定列的**指定版本**的数据。 * `Delete deleteColumns(byte[] family, byte[] qualifier, long timestamp)` 删除指定列的,时间戳**小于等于给定时间戳**的**所有**版本的数据。 * `Delete deleteFamily(byte[] family)` 删除指定列族的所有列的**所有**版本数据。 * `Delete deleteFamily(byte[] family, long timestamp)` 删除指定列族的所有列中**时间戳小于等于指定时间戳**的所有数据。 * `Delete deleteFamilyVersion(byte[] family, long timestamp)` 删除指定列族中所有**列的时间戳等于指定时间戳**的版本数据。 * `void setTimestamp(long timestamp)` 为Delete对象设置时间戳。 **实例代码** **删除整行的所有列族、所有行、所有版本** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Delete delete = new Delete(Bytes.toBytes("000")); table.delete(delete); table.close(); ~~~ **删除指定列的最新版本** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Delete delete = new Delete(Bytes.toBytes("100003")); delete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes("address")); table.delete(delete); table.close(); ~~~ **删除指定列的所有版本** 接以上场景,执行以下代码: ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Delete delete = new Delete(Bytes.toBytes("100003")); delete.deleteColumns(Bytes.toBytes("info"), Bytes.toBytes("address")); table.delete(delete); table.close(); ~~~ **删除指定列族中所有列的时间戳等于指定时间戳的版本数据** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Delete delete = new Delete(Bytes.toBytes("100003")); delete.deleteFamilyVersion(Bytes.toBytes("info"), 1405390959464L); table.delete(delete); table.close(); ~~~ ## 修改表 修改现有列族的属性 ~~~ @Test public void testModify() throws IOException { Admin admin = conn.getAdmin(); // admin.disableTable(TableName.valueOf("t_user_info")); // 修改已有的ColumnFamily HTableDescriptor table = admin.getTableDescriptor(TableName.valueOf("t_user_info")); HColumnDescriptor f2 = table.getFamily("extra_info".getBytes()); //设置布隆过滤器 f2.setBloomFilterType(BloomType.ROWCOL); //设置版本 f2.setVersions(1, 5); // 添加新的ColumnFamily table.addFamily(new HColumnDescriptor("other_info")); //将修改后的描述对象应用到目标表 admin.modifyTable(TableName.valueOf("t_user_info"), table); admin.close(); conn.close(); } ~~~ 修改表,删除三个列族,新增一个列族 ~~~ Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); String tablename = "rd_ns:itable"; if(admin.tableExists(tablename)) { try { admin.disableTable(tablename); //get the TableDescriptor of target table HTableDescriptor newtd = admin.getTableDescriptor(Bytes.toBytes("rd_ns:itable")); //remove 3 useless column families newtd.removeFamily(Bytes.toBytes("note")); newtd.removeFamily(Bytes.toBytes("newcf")); newtd.removeFamily(Bytes.toBytes("sysinfo")); //create HColumnDescriptor for new column family HColumnDescriptor newhcd = new HColumnDescriptor("action_log"); newhcd.setMaxVersions(10); newhcd.setKeepDeletedCells(true); //add the new column family(HColumnDescriptor) to HTableDescriptor newtd.addFamily(newhcd); //modify target table struture admin.modifyTable(Bytes.toBytes("rd_ns:itable"),newtd); admin.enableTable(tablename); } catch (Exception e) { e.printStackTrace(); } } admin.close(); ~~~ 逻辑很简单: 1. 通过admin.getTableDescriptor(Bytes.toBytes("rd_ns:itable"))取得目标表的描述对象,应该就是取得指向该对象的指针了; 2. 修改目标表描述对象; 3. 通过admin.modifyTable(Bytes.toBytes("rd_ns:itable"),newtd)将修改后的描述对象应用到目标表。 ## 添加数据 **新增、更新数据Put** **常用构造函数** 1. 指定行键 ~~~ public Put(byte[] row) ~~~ 参数:row 行键 2. 指定行键和时间戳 ~~~ public Put(byte[] row, long ts) ~~~ 参数:row 行键,ts 时间戳 3. 从目标字符串中提取子串,作为行键 ~~~ Put(byte[] rowArray, int rowOffset, int rowLength) ~~~ 4. 从目标字符串中提取子串,作为行键,并加上时间戳 ~~~ Put(byte[] rowArray, int rowOffset, int rowLength, long ts) ~~~ **常用方法** 1. 指定列族、限定符,添加值 ~~~ add(byte[] family, byte[] qualifier, byte[] value) ~~~ 2. 指定列族、限定符、时间戳,添加值 ~~~ add(byte[] family, byte[] qualifier, long ts, byte[] value) ~~~ 3. 设置写WAL(Write-Ahead-Log)的级别 ~~~ public void setDurability(Durability d) ~~~ 参数是一个枚举值,可以有以下几种选择: * ASYNC_WAL : 当数据变动时,异步写WAL日志 * SYNC_WAL : 当数据变动时,同步写WAL日志 * FSYNC_WAL : 当数据变动时,同步写WAL日志,并且,强制将数据写入磁盘 * SKIP_WAL : 不写WAL日志 * USE_DEFAULT : 使用HBase全局默认的WAL写入级别,即SYNC_WAL **实例代码** **插入行** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Put put = new Put(Bytes.toBytes("100001")); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("lion")); put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("shangdi")); put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("30")); put.setDurability(Durability.SYNC_WAL); table.put(put); table.close(); ~~~ **更新行** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Put put = new Put(Bytes.toBytes("100001")); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("lee")); put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("longze")); put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("31")); put.setDurability(Durability.SYNC_WAL); table.put(put); table.close(); ~~~ 注意: 1. **Put的构造函数都需要指定行键,如果是全新的行键,则新增一行;如果是已有的行键,则更新现有行** 2. **创建Put对象及put.add过程都是在构建一行的数据,创建Put对象时相当于创建了行对象,add的过程就是往目标行里添加cell,直到table.put才将数据插入表格**; 3. 以上代码创建Put对象用的是构造函数1,也可用构造函数2,第二个参数是时间戳; 4. Put还有别的构造函数,请查阅官网API。 **从目标字符串中提取子串,作为行键,构建Put** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Put put = new Put(Bytes.toBytes("100001_100002"),7,6); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("show")); put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("caofang")); put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("30")); table.put(put); table.close(); ~~~ 注意,关于:Put put = new Put(Bytes.toBytes("100001_100002"),7,6) **第二个参数是偏移量,也就是行键从第一个参数的第几个字符开始截取;** **第三个参数是截取长度;** **这个代码实际是从 100001_100002 中截取了100002子串作为目标行的行键** ## 读取数据 读取,get一次读取一行 ~~~ @Test public void testGet() throws IOException { Table table = conn.getTable(TableName.valueOf("t_user_info")); //构造一个get查询对象.指定要get的是那一行 Get get = new Get("user001".getBytes()); Result result = table.get(get); CellScanner cellScanner = result.cellScanner(); //迭代 while (cellScanner.advance()) { Cell current = cellScanner.current(); //列族名 byte[] familyArray = current.getFamilyArray(); //列标识符的名称 byte[] qualifierArray = current.getQualifierArray(); //具体的值 byte[] valueArray = current.getValueArray(); //获取有用字符 System.out.printf(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength())); System.out.printf(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength())); System.out.printf(" " + new String(valueArray, current.getValueOffset(), current.getValueLength())); System.out.println(); } table.close(); conn.close(); } ~~~ 批量查询数据 ~~~ @Test public void testScan() throws IOException { Table t_user_info = conn.getTable(TableName.valueOf("t_user_info")); //表是liu_sh_01,row key是zhang_bj_01 //数据(字典排序,从liu_sh_01到zhang_bj_01之间的row key全部遍历)("\000"不加这个是包头不包尾,加了是全部包,原因是这个字段排序是排在zhang_bj_01后面),因为永远不知道下一个rowkey是什么,就加个\000来表示下一个rowkey Scan scan = new Scan(Bytes.toBytes("liu_sh_01"), Bytes.toBytes("zhang_bj_01" + "\000")); ResultScanner scanner = t_user_info.getScanner(scan); //迭代器 Iterator<Result> iter = scanner.iterator(); while (iter.hasNext()) { //获取一行记录 Result result = iter.next(); //获取到每一个cell CellScanner cellScanner = result.cellScanner(); //遍历cell while (cellScanner.advance()) { Cell current = cellScanner.current(); byte[] familyArray = current.getFamilyArray(); byte[] valueArray = current.getValueArray(); byte[] qualifierArray = current.getQualifierArray(); byte[] rowArray = current.getRowArray(); System.out.print(new String(rowArray, current.getRowOffset(), current.getRowLength())+" "); System.out.print(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength())); System.out.print(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength())); System.out.print(" " + new String(valueArray, current.getValueOffset(), current.getValueLength())); System.out.println(); } System.out.println("-----------------------------"); } } ~~~ 读取指定的列,多版本 ~~~ @Test public void testGetColumn() throws IOException { Table table = conn.getTable(TableName.valueOf("t_user_info")); //构造一个get查询对象.指定要get的是那一行 Get get = new Get("zhang_sh_02".getBytes()); //设置一次性取多少个版本的数据 get.setMaxVersions(4); // 获取指定列族和列修饰符对应的列 get.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username")); Result result = table.get(get); for (KeyValue kv : result.list()) { System.out.println("family:" + Bytes.toString(kv.getFamily())); System.out.println("qualifier:" + Bytes.toString(kv.getQualifier())); System.out.println("value:" + Bytes.toString(kv.getValue())); System.out.println("Timestamp:" + kv.getTimestamp()); System.out.println("-------------------------------------------"); } table.close(); conn.close(); } ~~~ ## Get获取单行 如果希望获取整行数据,用行键初始化一个Get对象就可以,如果希望进一步缩小获取的数据范围,可以使用Get对象的以下方法: * 如果希望取得指定列族的所有列数据,使用**addFamily**添加所有的目标列族即可; * 如果希望取得指定列的数据,使用**addColumn**添加所有的目标列即可; * 如果希望取得目标列的指定时间戳范围的数据版本,使用**setTimeRange**; * 如果仅希望获取目标列的指定时间戳版本,则使用**setTimestamp**; * 如果希望限制每个列返回的版本数,使用**setMaxVersions**; * 如果希望添加过滤器,使用**setFilte** 上述讲述了如何使用Get从HBase中获取数据,并将数据进行展示,其实Get对象中的很多属性可以控制在进行查询时的细节控制,从而控制数据从HBase服务器返回时的数据量,从而可以进行数据优化 1. `Get(byte[] row) / Get(byte[] row, RowLock lock)` 初始化函数。在初始化函数时必须要指定Get将要获取的行键,第二个函数则是允许用户自己对Get上一个行锁,但是系统并不赞成用户这么使用。因为在多个客户端进行操作,且都上了自定义的行锁以后,可能会出现因为彼此的行锁需要对方的资源而死锁现象。但是两个客户端的长时间等待与系统连接资源的占用。 2. `addFamily(byte[] family) / addColumn(byte[] family, byte[] qualifier)` 添加列簇 / 添加列函数。通过该函数Get在数据获取时,获取的数据范围:两个函数都不设定时获取正行的所有数据。 使用 addFamily时获取制定列簇的所有列的数据。 addColumn则获取制定列的数据 3. `setTimeStamp(long timestamp)` 设置获取数据的时间戳 4. `setTimeRange(long minTime,long maxTime)` 设置获取数据的时间戳范围 5. `setMaxVersion(int version) / setMaxVersion()` 在默认情况下,Get方法之获取一列的最新的版本。但是有时需要的话则会一次获取多个版本的数据。 第一个函数可以指定确切的返回的版本数量。第二个函数则相当于setMaxVersion(Integer.MAX_VALUE)。即获取列中所有版本的 数据。 6. `setCacheBlock(boolean open)` 是否打开服务器端快缓存。设置该Get获取的数据是否缓存在内存中 在HBase中,整个表以region分块的方式被分布式的存在不同的region服务器中。每一个region服务器将会维护多个region。而在每一个region中都会存在快缓存区域。当每次去读某一个KeyValue数据块时,则会将整个数据加载到缓存区中。又因为加载的数据远大于一个KeyValue所含的数据大小。所以一般情况下缓存区域内都会存放当前KeyValue对象的连续的数据。但是如果在随机读写的程序中,这种数据加载进入缓存区并没有任何的作用,反而会因为在家时间而使得数据获取时间增长。因此我们要根据实际情况去选择是否开启region上的缓存区。连续读写时,开始缓存区可以增加搜索速度。在随机读写时,关闭缓存区可以缩小读取时间。 7. `setFilter(Filter f)` 添加过滤器。因为HBase并没有原声的SQL指定环境,因此在SQL语句中的where条件语句就需要通过特定的借口去实现,而Filter则就是顶替了where 语句的作用。能够实现在在数据查询中的一些精细的控制。 8. 设置获取数据的版本 `Get setMaxVersions(int maxVersions)` 设定获取数据的版本数 `Get setMaxVersions()` 设定获取数据的所有版本 **代码** **获取行键指定行的所有列族、所有列的最新版本数据** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Get get = new Get(Bytes.toBytes("100003")); Result r = table.get(get); for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell)) ); } table.close(); ~~~ **获取行键指定行中,指定列的最新版本数据** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Get get = new Get(Bytes.toBytes("100003")); get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); Result r = table.get(get); for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell)) ); } table.close(); ~~~ **获取行键指定的行中,指定时间戳的数据** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Get get = new Get(Bytes.toBytes("100003")); get.setTimeStamp(1405407854374L); Result r = table.get(get); for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell)) ); } table.close(); ~~~ **获取行键指定的行中,所有版本的数据** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); Get get = new Get(Bytes.toBytes("100003")); get.setMaxVersions(); Result r = table.get(get); for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } table.close(); ~~~ 注意: **能输出多版本数据的前提是当前列族能保存多版本数据,列族可以保存的数据版本数通过HColumnDescriptor的setMaxVersions(Int)方法设置** ## scan获取多行 Scan对象可以返回满足给定条件的多行数据。如果希望获取所有的行,直接初始化一个Scan对象即可。如果希望限制扫描的行范围,可以使用以下方法: * 如果希望获取指定列族的所有列,可使用addFamily方法来添加所有希望获取的列族 * 如果希望获取指定列,使用addColumn方法来添加所有列 * 通过setTimeRange方法设定获取列的时间范围 * 通过setTimestamp方法指定具体的时间戳,只返回该时间戳的数据 * 通过setMaxVersions方法设定最大返回的版本数 * 通过setBatch方法设定返回数据的最大行数 * 通过setFilter方法为Scan对象添加过滤器,过滤器详解请参见:http://blog.csdn.net/u010967382/article/details/37653177 * Scan的结果数据是可以缓存在内存中的,可以通过getCaching()方法来查看当前设定的缓存条数,也可以通过setCaching(int caching)来设定缓存在内存中的行数,缓存得越多,以后查询结果越快,同时也消耗更多内存。此外,通过setCacheBlocks方法设置是否缓存Scan的结果数据块,默认为true * 我们可以通过setMaxResultSize(long)方法来设定Scan返回的结果行数 **常用构造函数** 1. 创建扫描所有行的Scan ~~~ Scan() ~~~ 2. 创建Scan,从指定行开始扫描 ~~~ Scan(byte[] startRow) ~~~ 参数:startRow行键 注意:如果指定行不存在,从下一个最近的行开始 3. 创建Scan,指定起止行 ~~~ Scan(byte[] startRow, byte[] stopRow) ~~~ 参数:startRow起始行,stopRow终止行 注意:`startRow <= 结果集 < stopRow` 4. 创建Scan,指定起始行和过滤器 ~~~ Scan(byte[] startRow, Filter filter) ~~~ 参数:startRow起始行,filter过滤器 注意:过滤器的功能和构造参见http://blog.csdn.net/u010967382/article/details/37653177 **常用方法** * `Scan setStartRow(byte[] startRow)` 设置Scan的开始行,**默认结果集包含该行**。如果希望结果集不包含该行,可以在行键末尾加上0。 * `Scan setStopRow(byte[] stopRow)` 设置Scan的结束行,**默认结果集不包含该行**。如果希望结果集包含该行,可以在行键末尾加上0。 * `Scan setTimeRange(long minStamp, long maxStamp)` 扫描指定**时间范围**的数据 * `Scan setTimeStamp(long timestamp)` 扫描指定**时间**的数据 * `Scan addColumn(byte[] family, byte[] qualifier)` 指定扫描的列 * `Scan addFamily(byte[] family)` 指定扫描的列族 * `Scan setFilter(Filter filter)` 为Scan设置过滤器 * `Scan setReversed(boolean reversed)` 设置Scan的扫描顺序,默认是正向扫描(false),可以设置为逆向扫描(true)。注意:该方法0.98版本以后才可用!! * `Scan setMaxVersions()` 获取所有版本的数据 * `Scan setMaxVersions(int maxVersions)` 设置获取的最大版本数 * `void setCaching(int caching)` 设定缓存在内存中的行数,缓存得越多,以后查询结果越快,同时也消耗更多内存 * `void setRaw(boolean raw)` 激活或者禁用raw模式。如果raw模式被激活,Scan将返回所有已经被**打上删除标记但尚未被真正删除的数据**。该功能仅用于激活了KEEP_DELETED_ROWS的列族,即列族开启了hcd.setKeepDeletedCells(true)。Scan激活raw模式后,就不能指定任意的列,否则会报错 **代码** **扫描表中的所有行的最新版本数据** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } } table.close(); ~~~ **扫描指定行键范围,通过末尾加0,使得结果集包含StopRow** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); Scan s = new Scan(); s.setStartRow(Bytes.toBytes("100001")); s.setStopRow(Bytes.toBytes("1000020")); ResultScanner rs = table.getScanner(s); for (Result r : rs) { for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } } table.close(); ~~~ **返回所有已经被打上删除标记但尚未被真正删除的数据** 然而,使用Scan强大的s.setRaw(true)方法,可以获得所有已经被打上删除标记但尚未被真正删除的数据。 代码如下: ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); Scan s = new Scan(); s.setStartRow(Bytes.toBytes("100003")); s.setRaw(true); s.setMaxVersions(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } } table.close(); ~~~ **结合过滤器,获取所有age在25到30之间的行** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); SingleColumnValueFilter filter1 = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes("25") ); SingleColumnValueFilter filter2 = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes("30") ); filterList.addFilter(filter1); filterList.addFilter(filter2); Scan scan = new Scan(); scan.setFilter(filterList); ResultScanner rs = table.getScanner(scan); for (Result r : rs) { for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } } table.close(); ~~~ ## 计数器 计数器可以-1也可以是0 **在mapreduce中要注意,mapreduce任务失败可能会重试,而导致如果用这个可能会不准.因为在mapreduce中可能不是幂等运算** ### 单计数器 ~~~ Table table = conn.getTable(TableName.valueOf("t_user_info")); //记住这个值初始的时候不要用put去设置,会导致后面的错误 原因是'1'会转换成Bytes.toBytes() long rel = table.incrementColumnValue(Bytes.toBytes("user001"), Bytes.toBytes("base_info"), Bytes.toBytes("hit"), 2L); //返回这一列的结果 System.out.println(rel); //存储成功会变成 //column=base_info:hit, timestamp=1532337393697, value=\x00\x00\x00\x00\x00\x00\x00\x01 table.close(); ~~~ ### 复合计数器 ~~~ Table table = connection.getTable(TableName.valueOf("counters")); Increment increment1 = new Increment(Bytes.toBytes("20160101")); increment1.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("clicks"),1); increment1.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("hits"),1); increment1.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("clicks"),10); increment1.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"),10); Result result = table.increment(increment1); for(Cell cell:result.rawCells()){ System.out.println("Cell: " + cell + " Value: " + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(),cell.getValueLength())); } Increment increment2 = new Increment(Bytes.toBytes("20160101")); increment2.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("clicks"), 5); increment2.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("hits"), 1); increment2.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("clicks"), 0); increment2.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"), -5); Result result2 = table.increment(increment2); for (Cell cell : result2.rawCells()) { System.out.println("Cell: " + cell + " Value: " + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } table.close(); connection.close(); ~~~ ### 获取计数器的值 ~~~ @Test public void testGet()throws Exception{ HTable table = new HTable(conf,"wc"); Get get =new Get("apple01".getBytes()); get.addColumn("cf".getBytes(),"hits".getBytes()); Result result = table.get(get); for (KeyValue kv : result.list()) { System.out.println("family:" + Bytes.toString(kv.getFamily())); System.out .println("qualifier:" + Bytes.toString(kv.getQualifier())); System.out.println("value:" + Bytes.toLong(kv.getValue())); 计数器的值获取 } table.close(); } ~~~ # 扫描器缓存 在Hbase的设置里扫描每次RPC调用得到一批数据.这可以在扫描对象上使用setCaching(int)在每个扫描器(scanner)层次上设置,也可以在hbase-site.xml配置文件里使用HBase.client.scanner.caching属性来设置. 如果缓存值设置为n,每次RPC调用扫描器返回n行,然后这些数据缓存在客户端.这个设置的默认值是1,这意味着客户端对HBase的每次RPC调用在扫描整张表后仅仅返回一行.这个数字很保守,可以调整它以获得更好的性能. 但是该值设置过高意味着客户端和hbase的交互会出现较长的暂停,这会导致hbase端的超时. ResultScanner接口也有一个next(int)调用,你可以用来要求返回扫描的下面n行.这是在API层面提供的遍历,与为了获取那n行数据客户端对HBase的RPC调用次数无关. 在内部机制中,ResultScanner使用了多次RPC调用来满足这个请求,每次RPC调用返回的行数只取决于为扫描器设置的缓存值