hbase
hbase介绍
HBase产生背景介绍:
由于 HAOOP 不支持随机读写的操作, 仅支持顺序性读写操作, 适合于进行批量化处理操作
HBase是采用 java 语言开发, HBase基于HDFS , 是一个 支持高效的随机读写能力的noSQL型数据库
HBase支持三种方式进行查询数据:
- 1) 支持主键查询
- 2) 支持主键的范围查询
- 3) 支持全表查询
HBase本质上就是一个存储容器, 不支持多行事务, 仅支持单行事务, 不支持SQL语句, 数据存储格式都是一切皆字节
HBase集群, 可以向hadoop一样, 通过横向扩展方式, 来提升HBase处理和存储的能力
HBase的表具有以下特征:
- 大: 表支持存储上十亿行数据, 上百万列
- 面向列: 数据是面向于列(列族)式的存储方案
- 稀疏性: 在HBase的表存储数据, 如果某个字段为null, 并不会占用磁盘任何空间, 所以可以将表构建非常稀疏
应用场景:
- 数据需要进行随机读写操作
- 数据量比较的大
- 数据比较的稀疏
hbase和其他软件的区别
hbase和RDBMS的区别
- HBase: 以表形式存储数据, 不支持SQL 不支持事务, 仅支持单行事务操作, 数据存储是分布式存储 ,存储是结构化和半结构数据,不支持join
- RDBMS:以表形式存储数据, 支持SQL 支持多行事务操作, 数据存储中心化存储, 存储主要是以结构化数据为主,支持join
hbase 和 HDFS的区别
- HBASE: 强依赖于HDFS , 数据最终存储在HDFS之上的, 支持高效的随机读写操作
- HDFS: 分布式存储容器, 适合于批量化数据存储, 不支持随机读写能力
说明:
注意到 HBASE 和HDFS 既有联系 又有矛盾, HBASE基于HDFS , 而HDFS不支持随机读写, 但是HBASE支持随机读写
hbase和hive的区别
HBASE: 基于hadoop的软件 , hbase是nosql存储容器, HBASE延迟型较低, 接入在线业务
HIVE: 基于hadoop的软件, 数仓分析工具 , hive延迟较高, 接入离线业务, 用于 OLAP操作
hbase部署
hbase的安装操作
安装HBase的易错点:
- 修改hbase-site.xml的时候, 没有检查 zookeeper的目录位置
- 没有将 htrace-core-3.1.0-incubating.jar 放置到hbase的lib目录下
- 没有讲conf/regionserves中的localhost信息删除, 以及此文件存在空行
如何启动HBase:
- 先去启动 zookeeper
#第一步: 三台节点都要执行:
cd /export/server/zookeeper-3.4.6/bin
./zkServer.sh start
#第二步: 通过 jps 查询 三个节点是否都出现了以下这个进程
QuorumPeerMain
#第三步: 三台节点, 依次检查启动状态, 必须看到 两个 follower 一个 leader- 启动 hadoop集群:
1) 在node1的任意位置下执行:
start-all.sh
2) 检查 三个节点是否都启动
node1:
namenode
datanode
resourceManager
nodemanager
node2:
seconderyNamenode
datanode
nodemanager
node3:
datanode
nodemanager
3) 分别打开 50070 和 8088 的端口号 检查
在50070 检查是否退出安全模式
三个datanode是否存在
在8088端口号, 检查 active node 是否为 3- 启动 HBASE
1) 在node1节点上, 任意目录下执行:
start-hbase.sh
2) 检查: 在三个节点依次 通过 jps查询
node1:
HMaster
HRegionServer
node2:
HRegionServer
node3:
HRegionServer
此检查, 如果第三步正常, 不需要在检查, 如果第三步一直无法显示, 请在2分钟后, 在此查询, 是否有减少进程
如果那个节点没有启动, 请查询其日志文件:
日志存储的目录: /export/server/hbase-2.1.0/logs
查看两个文件:
hbase-root-regionserver-node[N].log
hbase-root-regionserver-node[N].out
查看日志命令:
tail -200f xxx.文件
3) 登录 hbase的管理界面: 端口号 16010
访问: http://node1:16010hbase的表模型
- Table: hbase的表, 在hbase中, 数据也是通过表形式组织在一起, hbase可以构建多张表
- rowkey: 行键(主键) 类似于RDBMS中的PK , 保证唯一且非空 , rowkey仅会安装 字典序方案进行排序
- 列: 列族 + 列限定符(列名)组成的
- 列族: 在一个表可以构建多个列族的, 每个列族下可以有多个列名, 支持有上百万个列
- 注意:
- 在建表的时候, 必须要指定列族
- 在建表的时候, 建议列族越少越好, 能用一个解决的, 坚决不使用两个的
- 注意:
- 列名: 一个列名必然属于某一个列族的, 列名不需要在建表的时候指定, 在后期添加数据的时候动态指定
- 时间戳:
- 每一个数据都是由时间戳的概念, 默认为添加数据的时间, 当然也可以手动指定时间
- 版本号: 是否需要保留每个数据的历史变更信息, 以及保留多少个
- 默认值: 1 表示只保留最新的版本
- 版本号是基于时间戳来记录各个历史版本的
- 单元格: 如何确定一个单元格
- 通过: rowkey + 列族 + 列名 + 值
说明: 在建表的时候,必须指定二个参数 一个 表名 一个是列族
hbase常用操作
hbase的相关操作_shell命令
hbase的基本shell操作
- 如何进入HBase的命令行客户端
[root@node2 ~]# hbase shell- 查看帮助文档的命令
hbase(main):001:0> help #查看某一个命令如何使用:
#格式: help '命令名称'
hbase(main):012:0> help 'create'- 查询集群的状态: status
- 查询Hbase有那些表: list
- 如何创建一张表
#格式:
create '表名' , '列族名称1','列族名称2' ....
- 如何向表中添加数据: put
#格式:
put '表名','rowkey值','列族:列名','值'- 如何读取某一个rowkey的数据呢?
格式:
get '表名','rowkey',['列族1','列族2' ...],['列族1:列名'],['列族1','列族2:列名' ...]- 扫描查询 scan
格式:
+,[{COLUMNS =>['列族1','列族2']} | {COLUMNS =>['列族1:列名','列族2'],VERSIONS=>N} ] , [{FORMATTER =>'toString'}] , [{LIMIT =>N}]
范围查询的格式:
scan '表名', {COLUNMS=>['列族1','列族2']} | {COLUMNS =>['列族1:列名','列族2'], STARTROW =>'起始rowkey的值', ENDROW=>'结束rowkey'}
注意: 包头不包尾
说明:
{FORMATTER =>'toString'} : 用于显示中文
{LIMIT =>N} : 显示前 N条数据- 如何修改表中数据
修改与添加数据的操作 是一致的, 只需要保证 rowkey相同 就是 修改操作- 删除数据: delete
格式:
delete '表名','rowkey','列族:列名' 格式:
deleteall '表名','rowkey',['列族:列名'] delete 和 deleteall区别:
两个操作都是用来执行删除数据操作
区别点:
1) delete操作 只能删除表中某个列的数据 deleteall支持删除某行数据
2) 通过delete删除某个列的时候, 默认只是删除其最新的版本, 而deleteall直接将其所有的版本数据全部都删除- 如何清空表
格式:
truncate '表名'- 如何删除表
格式:
drop '表名'说明:
在删除表的时候, 必须先禁用表, 然后才能删除表
- 如何查看表中有多少行数据
格式:
count '表名'hbase的高级shell命令
- HBase的过滤器查询操作
格式:
scan '表名',{FILTER=>"过滤器名称(比较运算符,比较器表达式)" }
常见的过滤器:
rowkey过滤器:
RowFilter : 实现行键字符串的比较和过滤操作
PrefixFilter : rowkey 前缀过滤器
列族过滤器:
FamilyFilter: 列族过滤器
列名过滤器:
QualifierFilter : 列名过滤器, 中显示对应列名的数据
列值过滤器:
ValueFilter: 列值过滤器, 找到符合值的数据
SingleColumnValueFilter: 在执行的列族和列名中进行比较具体的值, 将符合条数据整行数据全部都返回(包含条件的内容字段)
SingleColumnVlaueExcludeFilter: 在执行的列族和列名中进行比较具体的值, 将符合条数据整行数据全部都返回(去除条件内容的字段)
其他过滤器:
PageFilter : 用来执行分页操作
比较运算符 : = > < >= <= !=
比较器:
BinaryComparator : 完整匹配字节数组
BinaryPrefixComparator : 匹配字节数组前缀
NullComparator : 匹配Null
SubstringComparator : 模糊匹配字符串
比较器表达式:
BinaryComparator : binary:值
BinaryPrefixComparator : binaryprefix:值
NullComparator : null
SubstringComparator: substring:值
如果不知道过滤器的构造参数, 可以查看此地址:
http://hbase.apache.org/2.2/devapidocs/index.html- 高级的shell管理命令:
2.1: whoami : 显示Hbase当前使用用户
2.2: describe: 展示表的结构信息
2.3: exists 判断表是否存在
2.4: is_enabled 和 is_disabled 判断表是否启用和是否禁用
2.5 : alter: 该命令可以改变表和列族信息
如何增加列族:
alter '表名' ,NAME=>'列族名',[VERSIONS=>N]
如何删除列族:
alter '表名','delete'=>'列族名'
hbase的javaAPI的操作
项目的准备工作
- 创建maven项目:
- 1.1) 先构建一个父工程: bigdata_parent_01
- 1.2) 接着将父工程中 src 删除
- 1.3) 创建一个子工程: day01_hbase
- 导入相关的依赖:pom
<repositories><!--代码库-->
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version></dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.14.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>- 创建包结构:
- 名称: com.gordon.hbase
创建表
实现步骤:
1) 创建HBase连接对象:
2) 从连接对象中获取相关的管理对象: Admin(对表的操作) 和 Table(对表数据操作)
3) 执行相关的操作
4) 处理结果集 -- 此步骤只有查询操作
5) 释放资源- 具体实现
// 创建表
@Test
public void test01() throws Exception{
// 1) 创建HBase连接对象:
//Configuration conf = new Configuration();
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
Connection hbConn = ConnectionFactory.createConnection(conf);
// 2) 从连接对象中获取相关的管理对象: Admin(对表的操作) 和 Table(对表数据操作)
Admin admin = hbConn.getAdmin();
// 3) 执行相关的操作
boolean flag = admin.tableExists(TableName.valueOf("WATER_BILL")); // 为true 表示存在, 否则为不存在
if(!flag){
// 说明 表不存在
//3.1: 创建 表的构建器对象
TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("WATER_BILL"));
//3.2: 在构建器 设置表的列族信息
ColumnFamilyDescriptorBuilder familyDesc = ColumnFamilyDescriptorBuilder.newBuilder("C1".getBytes());
descBuilder.setColumnFamily(familyDesc.build());
//3.3: 基于表构建器 构建表基本信息封装对象
TableDescriptor desc = descBuilder.build();
admin.createTable(desc);
}
// 4) 处理结果集 -- 此步骤只有查询操作
// 5) 释放资源
admin.close();
hbConn.close();
}添加数据
- 操作步骤
1) 创建Hbase的连接对象
2) 通过连接对象 获取相关的管理对象: admin 和 table
3) 执行相关的操作: 添加数据
4) 处理结果集: -- 此步骤不需要
5) 释放资源- 具体操作代码
// 添加数据
@Test
public void test02() throws Exception{
//1) 创建Hbase的连接对象
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
Connection hbConn = ConnectionFactory.createConnection(conf);
//2) 通过连接对象 获取相关的管理对象: admin 和 table
Table table = hbConn.getTable(TableName.valueOf("WATER_BILL"));
//3) 执行相关的操作: 添加数据
Put put = new Put(Bytes.toBytes(4944191));
put.addColumn("C1".getBytes(),"NAME".getBytes(),"登卫红".getBytes());
put.addColumn("C1".getBytes(),"ADDRESS".getBytes(),"贵州省铜仁市德江县7单元267室".getBytes());
put.addColumn("C1".getBytes(),"SEX".getBytes(),"男".getBytes());
put.addColumn("C1".getBytes(),"PAY_TIME".getBytes(),"2020-05-10".getBytes());
table.put(put);
//4) 处理结果集: -- 此步骤不需要
//5) 释放资源
table.close();
hbConn.close();
}抽取一些公共的方法
private Connection hbConn;
private Table table;
private Admin admin;
private String tableName = "WATER_BILL";
@Before
public void before() throws Exception{
//1) 创建Hbase的连接对象
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
hbConn = ConnectionFactory.createConnection(conf);
//2) 通过连接对象 获取相关的管理对象: admin 和 table
admin = hbConn.getAdmin();
table = hbConn.getTable(TableName.valueOf(tableName));
}
@Test
public void test03(){
// 3) 执行相关的操作
//4) 处理结果集
}
@After
public void after() throws Exception{
//5. 释放资源
admin.close();
table.close();
hbConn.close();
}查询某一条数据
- 代码实现
// 根据rowkey 查询某一条数据
@Test
public void test03() throws Exception{
// 3) 执行相关的操作 get
Get get = new Get(Bytes.toBytes(4944191));
Result result = table.get(get); // 一个Result 表示 就是一行数据
//4) 处理结果集
//4.1: 获取一行数据中每一个单元格
List<Cell> listCells = result.listCells();
//4.2: 遍历单元格, 从单元格获取数据
for (Cell cell : listCells) {
// 从单元格可以获取那些内容: rowkey 列族 列名 列值
//4.2.1 获取rowkey
/*byte[] rowArray = cell.getRowArray();
int rowkey = Bytes.toInt(rowArray, cell.getRowOffset(), cell.getRowLength());*/
byte[] rowBytes = CellUtil.cloneRow(cell);
int rowkey = Bytes.toInt(rowBytes);
// 4.2.2 获取 列族
byte[] familyBytes = CellUtil.cloneFamily(cell);
String family = Bytes.toString(familyBytes);
// 4.2.3 获取 列名(列限定符)
byte[] qualifierBytes = CellUtil.cloneQualifier(cell);
String qualifier = Bytes.toString(qualifierBytes);
// 4.2.4 获取 列值
byte[] valueBytes = CellUtil.cloneValue(cell);
String value = Bytes.toString(valueBytes);
System.out.println("rowkey:"+rowkey +"; 列族为:"+family+"; 列名为:"+qualifier+"; 列值为:"+value);
}
}删除数据
- 代码实现
// 删除数据操作:
@Test
public void test04() throws Exception{
// 3) 执行相关的操作 : delete 和 deleteall
Delete delete = new Delete(Bytes.toBytes(4944191));
//delete.addColumn("C1".getBytes(),"NAME".getBytes());
delete.addFamily("C1".getBytes());
table.delete(delete);
//4) 处理结果集 -- 此步骤不需要
}删除表
- 代码实现
// 删除表操作:
@Test
public void test05() throws Exception{
// 3) 执行相关的操作
boolean flag = admin.isTableEnabled(TableName.valueOf(tableName)); // 判断是否启用表
if(flag){
admin.disableTable(TableName.valueOf(tableName)); //如果启用, 将其先禁用
}
admin.deleteTable(TableName.valueOf(tableName));
//4) 处理结果集 -- 此步骤不需要
}导入数据的操作
如何导入数据操作
- 语法格式:
hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径注意: 此操作需要在shell命令窗口下执行, 不要在hbase的命令行窗口行
操作步骤:
接下来即可执行导入命令
hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/input基于scan的扫描查询
//scan操作
@Test
public void scan() throws Exception{
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
Connection hbConn = ConnectionFactory.createConnection(conf);
Table table = hbConn.getTable(TableName.valueOf("WATER_BILL"));
//命令行的写法 scan 表名 ,{FILTER=>"QualifierFilter(=,'substring:a')"}
Scan scan = new Scan();
FilterList filter = new FilterList();
SingleColumnValueFilter startValueFilter = new SingleColumnValueFilter("C1".getBytes(),"RECORD_DATE".getBytes(), CompareOperator.GREATER_OR_EQUAL,"2020-06-01".getBytes());
SingleColumnValueFilter endValueFilter = new SingleColumnValueFilter("C1".getBytes(),"RECORD_DATE".getBytes(), CompareOperator.LESS_OR_EQUAL,"2020-07-01".getBytes()) ;
filter.addFilter(startValueFilter);
filter.addFilter(endValueFilter);
scan.setFilter(filter);
ResultScanner results = table.getScanner(scan);
for (Result result : results) {
List<Cell> cells = result.listCells();
for (Cell cell : cells) {
String qulifier = Bytes.toString(CellUtil.cloneQualifier(cell));
if("NAME".equals(qulifier)){
String name = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(name);
}
}
}
table.close();
hbConn.close();
}hbase的架构及原理
hbase高可用架构部署
hbase的高可用, 主要指的是让HBase的主节点, 有多台, 当其中一台出现故障后, 可以让其他的节点顶上来
如何配置呢?
- 在node1 进入 hbase的conf目录下, 创建一个 backup-masters
cd /export/server/hbase-2.1.0/conf/
vim backup-masters
内容如下:
node2.itcast.cn
node3.itcast.cn- 将 backup-master 发送其他两台节点
cd /export/server/hbase-2.1.0/conf/
scp backup-masters node2:$PWD
scp backup-masters node3:$PWD- 重启 HBase的集群
在 node1执行:
stop-hbase.sh
start-hbase.shhbase的集群架构图
region server的上线和下线
region server的上线流程:
说明:
当regionServer上线后, master会立即感知到, 此时regionServer需要上master汇报自己当下管理那些region, 然后master会根据各个regionServer汇报的region的管理情况, 然后在读取meta表获取到所有的region, 与之比较, 查看是否还有未分配的region, 如果有, 将这个未分配的region, 均匀的分配给各个regionServer上, 保证负载均衡
regionServer的下线流程
说明: 当HregionServer下线后, 对应这个regionServer所管理的region就处于无人管理的状态(无分配状态),此时master就需要将这些没有分配的region, 重新分配给其他的regionServer上 即可, 当regionServer有重新上线之后, 从之前regionServer上, 解除一些region, 将region分配给当前这个新启动regionServer上 (存在时间间隔, 不是立即执行)
master的上线和下线
说明:
master短暂的下线, 并不会太大的影响HBase的集群, 因为hbase的读写操作是不经过HMaster, 而大多数的请求都是读写的请求,
master下线主要是会影响到对元数据操作的请求, 比如说 创建表 删除表 修改表
HBase的读写原理
读取数据的流程
Meat表内具体存放哪些信息:
rowkey:由四个字段拼接起来,分别是 表名-StratRow-TimeStamp-EncodedName。
数据分为4列:
info:regioninfo:EncodedName、RegionName、Region的StartRow、Region的StopRow;
info:seqnumDuringOpen:存储Region打开时的sequenceId;
info:server:存储Region落在哪个RegionServer上;
info:serverstartcode:存储所在的RegionServer启动时间戳;- HBase读取数据的流程
由客户端发起读取数据的请求: scan '表名'
1) 首先第一步连接zookeeper, 从zookeeper中获取 HBase:Meta 表的位置信息(meta被那个regionServer所管理)
HBase:Meta 表 是 hbase的管理表, 有且只有一个region 主要是用于存储 hbase的元数据信息, 比如 有那些表, 每个表有那些region, 每个region被那些regionServer管理 ......
2) 连接对应(hbase meta表的)regionServer, 获取meta表中数据, 从meta表获取对应要查询的表有几个region, 以及这些region被那些regionserver所管理, 与当下要读取表的元数据信息全部都获取到
3) 并发的连接(查询表的)regionServer, 从各个regionServer的region中读取表的数据即可, 如果是基于scan扫描 此时会将所有的数据全部扫描的到客户端(边读边处理)), 如果是get操作, 此时从region中, 找到具体的rowkey
从region读取数据顺序: memStore --> blockCache --->StoreFile(小File) --> 大HFile数据的写入流程
- HBase的数据写入流程
客户端发起写入数据请求: put 'user' ,'rk001','C1:name','张三'
1) 首先连接zookeeper, 获取HBase:Meta表所在的regionServer的地址
2) 连接regionServer, 从meta表获取要写入数据的表,根据rowkey找到对应的region被那个regionServer所管理
3) 连接对应regionServer, 开始进行数据写入操作
4) 首先先将数据写入到这个regionServer的HLog(WAL)日志中, 然后再将数据写入到memStore(可能会写入到多个memStore)中
5) 当HLog 和 memStore都成功将数据写入完成, 此时 客户端的写入流程就结束了, 客户端返回写入成功....
-------------------------以上为客户端流程------------------------------------
服务端流程:
6) 随着客户端不断的写入, memStore中数据越来越多, 当memStore数据达到一定的阈值(128M/1小时)后, 就会执行flush刷新机制, 将memStore数据 "最终" 刷新到HDFS上, 形成一个storeFile(小File)文件
7) 随着不断的flush的刷新操作, 在HDFS上, 会存储越来越多的小File文件, 当这些小的Hfile文件达到一定的阈值(3个及以上)后, 就会启动compact(合并压缩)机制, 将多个小Hfile "最终" 合并为一个大的HFile文件
8) 随着不断的compact的合并压缩, 这个大的Hfile 也会越来越大, 当这个大的Hfile达到一定的阈值("最终"10GB)后, 就会触发 split机制, 将大的Hfile 进行一分为二, 形成两个新的Hfile, 此时对应region 也会进行一份为二, 形成两个新的region, 每个region管理其中一个新的大Hfile即可, 一旦split分裂完毕, 此时旧的region就会下线(注意: 在执行split分裂过程中, 当下分裂的表是不接受读写请求)Hbase三大机制
HBase的flush刷新机制
- HBase的flush刷新机制:
flush机制: 刷新机制
目的:
将memStore中最终写入到HDFS上, 形成一个storeFile文件
阈值:
大小阈值: 128M
时间阈值: 1小时
注意: 满足了那个阈值, 都会触发flush机制
flush流程: hbase 2.0 flush
1) 关闭掉当前这个达到阈值的memStore空间, 然后开启一个新的memStore, 以便于正常写入
2) 将这个关闭的memStore 首先放入一个 队列容器 (内存)中, 在HBase的2.0版本后, 会让这个队列容器,尽可能晚的刷新到磁盘上, 此时在队列的memStore变更为只读状态, 在读取数据时候, 依然可以从队列中进行读取操作, 以保证读取的效率
3) 随着memStore不断的达到阈值 , 在队列容器中存储的memStore的数量也会越来越多, 当内存达到一定的阈值(?)后, 触发flush操作, 将队列中所有的数据 进行合并操作,然后 一次性刷新到磁盘上, 形成一个storeFile文件;
注意: 此操作, 仅在hbase2.0版本后才支持, 在2.0版本下, 写入到队列之后, 直接将数据刷新HDFS上, 形成一个个storeFile, 即使刷新慢了, 导致队列中有了多个memStore, 依然一个memStore就是一个storeFile说明:
内存合并操作, 在hbase2.0后就开始支持了, 但是hbase2.x版本, 默认是不开启内存合并的, 如果开启, 需要手动设置
如何开启内存合并操作:
方式1: 全局配置
- 配置在: hbase-site.xml ,建议配置 adaptive
方式2: 针对某一个表来设置
- 配置在: 在建表的时候, 可以针对设置操作
合并的方案: 共计有三种
- basic(基础型):
- 作用: 在合并的过程中, 不关心是否有重复数据, 或者过期的版本数据, 直接进行合并即可, 效率最高的
- eager(饥渴型):
- 作用: 在合并的过程中, 会积极的判断数据是否有重复, 以及是否有过期, 会将重复的 过期的版本, 全部清洗掉, 然后合并在一起
- adaptive(适应型):
- 作用: 检查数据, 如果发现数据重复和过期版本 的比例以及达到 eager方案, 就采用饥渴型, 否则就采用基础型
HBase的storeFile的合并机制
- compact机制流程说明
compact 合并压缩机制:
目的: 将多个storeFile(小Hfile) 合并为一个更大的HFile文件
阈值: 达到3个及以上
minor:
目的: 将多个小的storeFile 合并为一个较大的Hfile文件过程
流程:
1) 当storeFile文件数量达到阈值(3个及以上)后, 首先先将这几个小storeFile进行合并操作, 形成一个较大的storeFile文件
2) 在合并过程中, minor 操作不会对数据进行任何的删除, 去掉重复操作, 仅仅做一个基本排序合并工作即可, 整体执行效率是非常快的
major:
目的: 将这个较大的Hfile 和之前大的Hfile 进行合并形成一个最终的大Hfile操作
流程:
1) 默认情况下, 当达到一定的阈值(7天|手动)后触发major操作, 将磁盘中较大的HFile和之前大的Hfile进行合并, 形成一个最终的大Hfile文件即可
2) 在合并的过程中, 打上重复标记的数据, 打上过期版本标记的数据, 在major执行过程中, 就会进行全部处理掉
注意: 由于major在合并的过程中, 需要对所有数据进行合并操作, 此时无法对数据进行相关的操作, 而且此操作由于数据量较大, 执行时间较大, 此操作会Hbase性能影响较大
所以在实际生产中, 一般是关闭major, 改为手动执行
合并做法:
将HDFS数据读取出来, 边读边进行处理, 边将数据通过追加的形式添加到HDFS上Hbase的split机制(region分裂)
此公式主要是用于计算 表对应region, 何时执行分裂的操作
比如说:
当最初始的时候, 表只有一个Region , 此时 1^2 * 128 与 10GB 做比较, 那个小, 我们就会在那个值上执行分裂, 此时第一次应该在 region的Hfile数据量达到 128M 的时候执行分裂
当第二次分裂, R=2, 经过计算后, 当region的Hfile数据量达到 512M的时候, 就会执行分裂
以此类推
直到表的region数量达到 9个及以上的时候, 此时region分裂按照 10GB 分裂一次HBase的Bulk Load 批量加载操作
假设: 目前有一批数据, 此数据量比较大, 需要将这些数据写入到HBase中, 如果采用hbase的 普通的javaAPI操作, 会将数据正常的写入流程 进入到HBase中, 而正常数据流程: 先将数据写入到HLog 然后将数据写入memStore, 然后从memStore到storeFile, 再从storeFile到Hfile, 而且在整个写入流程中, 需要大量的占用服务器的资源
如果这个时候, 还有大量的请求, 从Hbase的这个表中读取数据, 由于服务器的资源都被写入的请求占用了, 此时读取的请求可能无法实施, 或者返回结果会很慢, 此时对网络的带宽造成较大的影响
思考 如何解决呢?
1) 将这一批数据 先转换为 HFile的文件
2) 将HFile文件直接导入HBase中, 让Hbase直接加载即可
此操作不需要先写入HLog 然后到内存, 然后HDFS过程, 直接将数据到达HDFS操作解决的应用场景
- 需求一次性写入大量数据到达HBase的操作
5.1 需求说明
目前在HDFS上有一份 CSV文件, 此文件中记录大量的转账数据, 要求将这些转换数据 存储到Hbase中, 由于初始数据量过于庞大, 可以采用 bulk_load 将数据批量加载HBase中
准备工作
- 在Hbase中创建目标表:
create 'TRANSFER_RECORD','C1'- 将数据 上传到 HDFS中
hdfs dfs -mkdir -p /hbase/bulkload/input
rz将数据上传到 Linux中
hdfs dfs -put bank_record.csv /hbase/bulkload/input- 在IDEA中构建项目: day02_hbase_bulk_load
- 导入相关的pom依赖
<repositories><!--代码库-->
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>- 创建包结构
- com.hbase.bulkLoad
MR-bulkload
将CSV数据转换为HFile文件格式数据(天龙八部)==>内存多的话,需要改成spark来完成
- 编写 Mapper程序
package com.hbase.bulkLoad;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class BulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable, Put> {
private ImmutableBytesWritable k2 = new ImmutableBytesWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 获取一行数据
String line = value.toString();
//2. 判断是否接收到数据
if(line != null && !"".equals(line.trim())){
//3. 对这一行数据执行切割操作
String[] fields = line.split(",");
//4. 封装 k2 和 v2的数据
byte[] rowkey = fields[0].getBytes();
k2.set(rowkey);
Put v2 = new Put(rowkey);
v2.addColumn("C1".getBytes(),"code".getBytes(),fields[1].getBytes());
v2.addColumn("C1".getBytes(),"rec_account".getBytes(),fields[2].getBytes());
v2.addColumn("C1".getBytes(),"rec_bank_name".getBytes(),fields[3].getBytes());
v2.addColumn("C1".getBytes(),"rec_name".getBytes(),fields[4].getBytes());
v2.addColumn("C1".getBytes(),"pay_account".getBytes(),fields[5].getBytes());
v2.addColumn("C1".getBytes(),"pay_name".getBytes(),fields[6].getBytes());
v2.addColumn("C1".getBytes(),"pay_comments".getBytes(),fields[7].getBytes());
v2.addColumn("C1".getBytes(),"pay_channel".getBytes(),fields[8].getBytes());
v2.addColumn("C1".getBytes(),"pay_way".getBytes(),fields[9].getBytes());
v2.addColumn("C1".getBytes(),"status".getBytes(),fields[10].getBytes());
v2.addColumn("C1".getBytes(),"timestamp".getBytes(),fields[11].getBytes());
v2.addColumn("C1".getBytes(),"money".getBytes(),fields[12].getBytes());
//5. 写出去
context.write(k2,v2);
}
}
}- 编写 驱动类
package com.hbase.bulkLoad;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class BulkLoadDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//1. 创建 Job对象
Job job = Job.getInstance(super.getConf(), "BulkLoadDriver");
// 设置提交yarn 必备参数
job.setJarByClass(BulkLoadDriver.class);
//2. 设置 天龙八部
//2.1: 设置 输入类 和输入路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://node1:8020/hbase/bulkload/input"));
//2.2: 设置 mapper类和 map输出k2和v2的类型
job.setMapperClass(BulkLoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//2.3: 设置 shuffle操作: 分区 排序 规约 分组
//2.7: 设置 reduce类 和 reduce 输出 k3和v3的类型
job.setNumReduceTasks(0); // 没有reduce
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
//2.8: 设置输出类, 及输出路径 Hfile
job.setOutputFormatClass(HFileOutputFormat2.class);
// 获取 连接对象 和 table对象
Connection hbConn = ConnectionFactory.createConnection(super.getConf());
Table table = hbConn.getTable(TableName.valueOf("TRANSFER_RECORD"));
HFileOutputFormat2.configureIncrementalLoad(job,table,hbConn.getRegionLocator(TableName.valueOf("TRANSFER_RECORD")));
HFileOutputFormat2.setOutputPath(job, new Path("hdfs://node1:8020/hbase/bulkload/output"));
//3. 提交任务
boolean flag = job.waitForCompletion(true);
return flag ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
int i = ToolRunner.run(conf, new BulkLoadDriver(), args);
System.exit(i);
}
}- 测试操作:
将Hfile文件格式数据加载HBase中
语法格式要求
hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles MR输出路径 HBase表名执行导入操作:
hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /hbase/bulkload/output TRANSFER_RECORD但是, 各位 你们今天执行可能会报错: 尝试了10次 依然无法导入错误
- 检查:
查看 Hbase的日志: regionServer的日志
优先查看 当下执行导入操作的这个regionServer的日志, 如果没有错误
在查询导入的表, 对应region属于哪个regionServer, 查询这个regionServer的日志- 错误原因:
hbase的采用 域名 和 hdfs的采用的域名不一致导致的, hbase到导入数据的时候, 发现这个域名不一致, 以为不是同一个集群 导致失败解决方案:
- 查询 hbase-site.xml中 hbase.root.dir配置的hdfs的域名是什么?
- 查询 hdfs的 core-site.xml中 fs.defaultFS的配置的hdfs的域名是什么?
- 如果两个不一致, 建议大家修改 core-site.xml
- 修改后, 将这这个配置发送给 node2 和 node3
- 重启 hadoop 和 hbase 即可,然后重新尝试导入操作
Spark bulkLoad
/**
* @Author bigdatalearnshare
*/
object App {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkSession = SparkSession
.builder()
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.master("local[*]")
.getOrCreate()
val rowKeyField = "id"
val df = sparkSession.read.format("json").load("/people.json")
val fields = df.columns.filterNot(_ == "id").sorted
val data = df.rdd.map { row =>
val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)
val kvs = fields.map { field =>
new KeyValue(rowKey, Bytes.toBytes("hfile-fy"), Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
}
(new ImmutableBytesWritable(rowKey), kvs)
}.flatMapValues(x => x).sortByKey()
val hbaseConf = HBaseConfiguration.create(sparkSession.sessionState.newHadoopConf())
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hfile")
val connection = ConnectionFactory.createConnection(hbaseConf)
val tableName = TableName.valueOf("hfile")
//没有HBase表则创建
creteHTable(tableName, connection)
val table = connection.getTable(tableName)
try {
val regionLocator = connection.getRegionLocator(tableName)
val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
val savePath = "hdfs://linux-1:9000/hfile_save"
delHdfsPath(savePath, sparkSession)
job.getConfiguration.set("mapred.output.dir", savePath)
data.saveAsNewAPIHadoopDataset(job.getConfiguration)
val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
bulkLoader.doBulkLoad(new Path(savePath), connection.getAdmin, table, regionLocator)
} finally {
//WARN LoadIncrementalHFiles: Skipping non-directory hdfs://linux-1:9000/hfile_save/_SUCCESS 不影响,直接把文件移到HBASE对应HDFS地址了
table.close()
connection.close()
}
sparkSession.stop()
}
def creteHTable(tableName: TableName, connection: Connection): Unit = {
val admin = connection.getAdmin
if (!admin.tableExists(tableName)) {
val tableDescriptor = new HTableDescriptor(tableName)
tableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("hfile-fy")))
admin.createTable(tableDescriptor)
}
}
def delHdfsPath(path: String, sparkSession: SparkSession) {
val hdfs = FileSystem.get(sparkSession.sessionState.newHadoopConf())
val hdfsPath = new Path(path)
if (hdfs.exists(hdfsPath)) {
//val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
hdfs.delete(hdfsPath, true)
}
}
}HBase和Hive的集成操作
hbase和hive的对比说明
- hive: 就是一个数据仓库的工具 , 基于HADOOP, 数据存储在Datanode, 执行翻译为MR, 支持SQL, 支持join 主要是用于离线分析操作 与清洗操作, 延迟较高
- hbase: 是一个nosql型数据库, 用于存储数据, 基于hadoop, 数据最终存储在datanode, 不支持SQL, 不支持join 主要是用于接入在线业务, 延迟较低 , 具有高效的随机读写能力
说明:
hive 和 hbase都是基于hadoop的不同的工具, hive和hbase可以集成在一起,hive on hbase ,使用hql进行批量分析查询,若是要求随机读写需集成phoenix。
1.2 hbase如何hive进行集成操作
集成步骤:
- 将hive提供的一个和hbase整合的通信包, 导入到Hbase的lib目录下
cd /export/server/hive-2.1.0/lib/
cp hive-hbase-handler-2.1.0.jar /export/server/hbase-2.1.0/lib/- 将 这个通信包 发送给 node1 和 node2的hbase的lib目录下
cd /export/server/hbase-2.1.0/lib/
scp hive-hbase-handler-2.1.0.jar node1:/export/server/hbase-2.1.0/lib/
scp hive-hbase-handler-2.1.0.jar node2:/export/server/hbase-2.1.0/lib/- 修改 hive的 配置文件:
- hive-site.xml
cd /export/server/hive-2.1.0/conf/
vim hive-site.xml
添加以下内容:
<property>
<name>hive.zookeeper.quorum</name>
<value>node1,node2,node3</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>node1,node2,node3</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>- hive-env.sh
cd /export/server/hive-2.1.0/conf/
vim hive-env.sh
添加以下内容:
export HBASE_HOME=/export/server/hbase-2.1.0- 启动
1) 先启动 zookeeper , 保证zookeeper启动良好
2) 接着启动 hadoop集群, 保证hadoop是启动良好
3) 然后启动 hbase集群, 保证hbase集群是启动良好的
4) 最后启动 hive , 保证hive是启动良好的
说明: 3 和 4 可以调换- 测试 查看是否可用:
第一步: 在hbase的shell客户端下, 创建一个表 并添加相关的数据
hbase(main):007:0> create 'hbase_hive_score','cf'
hbase(main):007:0> put 'hbase_hive_score' ,'1','cf:name','zhangsan'
hbase(main):007:0> put 'hbase_hive_score' ,'1','cf:age','25'
hbase(main):008:0> put 'hbase_hive_score' ,'2','cf:name','lisi'
hbase(main):009:0> put 'hbase_hive_score' ,'2','cf:age','30'
hbase(main):010:0> put 'hbase_hive_score' ,'3','cf:name','wangwu'
hbase(main):011:0> put 'hbase_hive_score' ,'3','cf:age','18'
hbase(main):012:0> scan 'hbase_hive_score'
ROW COLUMN+CELL
1 column=cf:age, timestamp=1615427034130, value=25
1 column=cf:name, timestamp=1615427024464, value=zhangsan
2 column=cf:age, timestamp=1615427052348, value=30
2 column=cf:name, timestamp=1615427045923, value=lisi
3 column=cf:age, timestamp=1615427082291, value=18
3 column=cf:name, timestamp=1615427073970, value=wangwu
第二步: 在hive中对hbase的这个表进行映射匹配操作, 由于数据是被hbase所管理, 在hive中建表选择外部表
语法格式:
create external table 表名 (
字段1 类型,
字段2 类型,
字段3 类型
....
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties('hbase.columns.mapping'=':key,列族1:列名1...') tblproperties('hbase.table.name'='hbase的表名');
注意:
表名 : 建议和hbase表名保持一致 (不一致也是OK的)
字段 : 建议和hbase列名保持一致
字段的第一个, 建议放置的主键字段, 不需要加primary key
create external table day03_hivehbase.hbase_hive_score (
id int,
name string,
age int
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties('hbase.columns.mapping'=':key,cf:name,cf:age') tblproperties('hbase.table.name'='hbase_hive_score');hive集成存在的问题:
Apache Phoenix
Apache Phoenix 仅仅是一款可以通过SQL的方式来操作(CRUD)hbase的工具,底层大量的利用hbase的协处理器。
phoenix集成hbase与hive、impala集成的对比
HBase的协处理器
3.1 协处理器的基本介绍
hbase提供的协处理器主要有二大类
- observer:
可以将observer看做是 数据库的触发器 或者可以理解为监听器, 可以通过observer提供一些钩子(事件), 对某些事件进行监听操作, 一旦触发了这个事件, 理解通知绑定监听事件的人即可
这类协处理器还可以做什么事情呢?
1) 操作日志记录
2) 权限的管理 <img src="https://onedrive.ipfscan.us.kg/_layouts/52/download.aspx?share=EYN0_O-4wL5EqYbr6x4K3AYBdXJsQuSPqzXRcJuolzlAug" alt="hbase.assets/image-20210311153033576.png" title="image-20210311153033576.png">
- endpoint:
这类协处理器 可以将其看做是 数据库的中存储过程, 也可以类似于在java中定义一个方法, 将相关的功能放置在这个方法中 即可
一旦定义这样协处理器, 可以将这个协处理器提交到server(服务)端, 有各个服务端来执行这段操作, 将执行的结果返回给客户端, 客户端在根据返回结果做相应的处理即可
作用: 做 聚合操作 sum count max ...3.2 如何设置协处理器
设置方式一: 静态设置 全局设置
此配置需要配置到hbase-site.xml中 全局有效, 每个hbase表 都会有这个协处理器设置方式二: 动态设置, 只针对某个表有效
第一步: 禁用表
第二步: 添加协处理器
第三步: 启用表
如何卸载动态设置协处理器:
第一步: 禁用表
第二步: 删除协处理器
第三步: 启用表
apache Phoenix的安装
注意:
1) 在安装完成后, 如果hbase无法启动, 请检查 hbase的配置文件 以及lib目录,3个节点都需要检查 是否OK
2) 在安装完成后, 如果Phoenix无法启动, 一启动就报错, 检查 Phoenix的bin目录下的hbase-site.xml 其内容是否是hbase的conf目录下的那个hbase-site.xml的内容Apache Phoenix的基本入门操作
- 如何在Phoenix创建表
--格式:
create table [if not exists] 表名 (
rowkey名称 数据类型 primary key,
列族名.列名1 数据类型 ,
列族名.列名2 数据类型 ,
列族名.列名3 数据类型 ,
列族名.列名4 数据类型
.....
);- 案例: 创建一张订单表
create table order_dtl (
id varchar primary key,
c1.status varchar ,
c1.money integer ,
c1.pay_way integer ,
c1.user_id varchar,
c1.operation varchar,
c1.category varchar
);执行完成后, 可以在hbase的webui中查看到, 多出来一张表, 在此表中默认的region数量为1, 同时给表加入很多协处理器
注意: Phoenix会自动将小写变更为大写: 表名 列族 列名
需求: 字段必须为小写, 不使用大写 , 如何做
create table "order_dtl_01" (
"id" varchar primary key,
"c1"."status" varchar ,
"c1".money integer ,
c1."pay_way" integer ,
c1.user_id varchar,
c1.operation varchar,
c1.category varchar
);
注意: 如果想要使用小写, 只需要在需要小写的内容两端加上双引号(必须为双引号)
单引号 表示是普通字符串
推荐: 建议使用大写, 如果为小写, 后续所有的操作, 只要用到这个小写的内容, 必须加双引号
- 如何在Phoenix中查看所有的表
格式:
!table- 查看某一个表的结构信息
格式:
!desc 表名注意: Phoenix会自动将小写变更为大写: 表名 列族 列名
需求: 字段必须为小写, 不使用大写 , 如何做
- 如何向表中插入数据
格式:
upsert into 表名(列族.列名1,列族.列名2,... ) values(值1,值2 ....)
案例:
upsert into ORDER_DTL values('000002','未提交',4070,1,'4944191','2020/04/25 12:09:16','手机');- 查询操作: 与标准的SQL是一致的
- 只不过 不支持 join 不支持多表关联 仅支持单表查询
- 删除数据: 与标准SQL是一致的
- 分页查询
语法:
select * from 表 limit 每页显示n条 offset(m-1)
(从第m条开始显示n条)
案例:
1) 首先先添加一坨数据
UPSERT INTO "ORDER_DTL" VALUES('000002','已提交',4070,1,'4944191','2020-04-25 12:09:16','手机;');
UPSERT INTO "ORDER_DTL" VALUES('000003','已完成',4350,1,'1625615','2020-04-25 12:09:37','家用电器;;电脑;');
UPSERT INTO "ORDER_DTL" VALUES('000004','已提交',6370,3,'3919700','2020-04-25 12:09:39','男装;男鞋;');
UPSERT INTO "ORDER_DTL" VALUES('000005','已付款',6370,3,'3919700','2020-04-25 12:09:44','男装;男鞋;');
UPSERT INTO "ORDER_DTL" VALUES('000006','已提交',9380,1,'2993700','2020-04-25 12:09:41','维修;手机;');
UPSERT INTO "ORDER_DTL" VALUES('000007','已付款',9380,1,'2993700','2020-04-25 12:09:46','维修;手机;');
UPSERT INTO "ORDER_DTL" VALUES('000008','已完成',6400,2,'5037058','2020-04-25 12:10:13','数码;女装;');
UPSERT INTO "ORDER_DTL" VALUES('000009','已付款',280,1,'3018827','2020-04-25 12:09:53','男鞋;汽车;');
UPSERT INTO "ORDER_DTL" VALUES('000010','已完成',5600,1,'6489579','2020-04-25 12:08:55','食品;家用电器;');
UPSERT INTO "ORDER_DTL" VALUES('000011','已付款',5600,1,'6489579','2020-04-25 12:09:00','食品;家用电器;');
UPSERT INTO "ORDER_DTL" VALUES('000012','已提交',8340,2,'2948003','2020-04-25 12:09:26','男装;男鞋;');
UPSERT INTO "ORDER_DTL" VALUES('000013','已付款',8340,2,'2948003','2020-04-25 12:09:30','男装;男鞋;');
UPSERT INTO "ORDER_DTL" VALUES('000014','已提交',7060,2,'2092774','2020-04-25 12:09:38','酒店;旅游;');
UPSERT INTO "ORDER_DTL" VALUES('000015','已提交',640,3,'7152356','2020-04-25 12:09:49','维修;手机;');
UPSERT INTO "ORDER_DTL" VALUES('000016','已付款',9410,3,'7152356','2020-04-25 12:10:01','维修;手机;');
UPSERT INTO "ORDER_DTL" VALUES('000017','已提交',9390,3,'8237476','2020-04-25 12:10:08','男鞋;汽车;');
UPSERT INTO "ORDER_DTL" VALUES('000018','已提交',7490,2,'7813118','2020-04-25 12:09:05','机票;文娱;');
UPSERT INTO "ORDER_DTL" VALUES('000019','已付款',7490,2,'7813118','2020-04-25 12:09:06','机票;文娱;');
UPSERT INTO "ORDER_DTL" VALUES('000020','已付款',5360,2,'5301038','2020-04-25 12:08:50','维修;手机;');
UPSERT INTO "ORDER_DTL" VALUES('000021','已提交',5360,2,'5301038','2020-04-25 12:08:53','维修;手机;');
UPSERT INTO "ORDER_DTL" VALUES('000022','已取消',5360,2,'5301038','2020-04-25 12:08:58','维修;手机;');
UPSERT INTO "ORDER_DTL" VALUES('000023','已付款',6490,0,'3141181','2020-04-25 12:09:22','食品;家用电器;');
UPSERT INTO "ORDER_DTL" VALUES('000024','已付款',3820,1,'9054826','2020-04-25 12:10:04','家用电器;;电脑;');
UPSERT INTO "ORDER_DTL" VALUES('000025','已提交',4650,2,'5837271','2020-04-25 12:08:52','机票;文娱;');
UPSERT INTO "ORDER_DTL" VALUES('000026','已付款',4650,2,'5837271','2020-04-25 12:08:57','机票;文娱;');
2) 采用分页查询: 每页显示 5 条 , 显示第一页- 删除表: 和标准SQL是一致
Apache Phoenix的预分区操作
通过Phoenix来构建表, 默认情况下, 只有一个region
Phoenix预分区的方式:
- 手动预分区:
语法格式:
create table [if not exists] 表名 (
rowkey名称 数据类型 primary key,
列族名.列名1 数据类型 ,
列族名.列名2 数据类型 ,
列族名.列名3 数据类型 ,
列族名.列名4 数据类型
.....
)
compression='GZ' -- 压缩方式
split on(region分区方案) -- 定义手动预分区操作
;
案例:
drop table order_dtl;
create table order_dtl (
id varchar primary key,
c1.status varchar ,
c1.money integer ,
c1.pay_way integer ,
c1.user_id varchar,
c1.operation varchar,
c1.category varchar
)
compression='GZ'
split on('10','20','30');- hash预分区:
格式:
create table [if not exists] 表名 (
rowkey名称 数据类型 primary key,
列族名.列名1 数据类型 ,
列族名.列名2 数据类型 ,
列族名.列名3 数据类型 ,
列族名.列名4 数据类型
.....
)
compression='GZ', -- 压缩方式
salt_buckets=N -- 加盐预分区 (hash + rowkey自动加盐)
;
案例
drop table order_dtl;
create table order_dtl (
id varchar primary key,
c1.status varchar ,
c1.money integer ,
c1.pay_way integer ,
c1.user_id varchar,
c1.operation varchar,
c1.category varchar
)
compression='GZ' ,
salt_buckets=10; 总结: 如果使用Phoenix的加盐预分区方案, Phoenix在添加数据的时候, 会自动在rowkey的前面进行加盐处理, 但是对用户从操作Phoenix角度来说是无感操作,除非我们去hbase查看原始内容
apache Phoenix的视图
默认情况下, Phoenix中只展示由Phoenix自己创建表, 如果说hbase的表是通过hbase自己来构建的, 在Phoenix中无法查看到, 那么也就意味着, 无法通过Phoenix来操作hbase原有表
如果想通过Phoenix对hbase原有表进行SQL的操作, 此时可以利用Phoenix提供的视图来解决
如何实现视图呢?
格式:
create view "名称空间"."hbase对应的表的表名" (
key varchar primary key,
"列族"."列名" 类型,
.....
) [default_colunm_family='列族名'];
注意事项:
视图的名称 一定要与 需要建立视图的hbase的表名是一致的
key的名称是可以任意的,但是必须添加primary key
普通的列, 需要和 hbase的对应表保持一致案例: 针对 WATER_BILL 表 构建视图
create view "WATER_BILL" (
ID varchar primary key,
C1.NAME varchar,
C1.ADDRESS varchar,
C1.LATEST_DATE varchar,
C1.NUM_CURRENT UNSIGNED_DOUBLE ,
C1.NUM_PREVIOUS UNSIGNED_DOUBLE ,
C1.NUM_USAGE UNSIGNED_DOUBLE ,
C1.PAY_DATE varchar,
C1.RECORD_DATE varchar,
C1.SEX varchar,
C1.TOTAL_MONEY UNSIGNED_DOUBLE
);
UNSIGNED_DOUBLE: 无符号的double类型查询视图, 观察是否有数据:
查询 六月份的用户的用水量
select name,num_usage, record_date from water_bill where record_date between '2020-06-01' and '2020-06-30';Apache Phoenix的二级索引
索引目的: 提高查询的效率
2.1 Phoenix索引的分类
在Phoenix中共计提供了四种索引:
- 全局索引
- 特点:
在构建了全局索引之后, 会单独的形成一张索引表,单独的索引表与目标表拥有相同的region数量, 当查询数据时候, 可以先到索引表查询, 查询到之后, 再到目标中进行查询即可, 但是如果对数据进行修改, 此时索引表的数据也会随之进行修改操作
注意:
在修改索引的表, 对于全局索引而言, 需要做全局更新操作, 代价较大, 效率较低
在查询的过程中, 如果SQL语句中出现了非索引的字段, 此时全局索引无法生效
全局索引一般和覆盖索引搭配使用,读的效率很高,但对写入的效率影响较大, 所以说 全局索引 比较适用于 读多 写少的场景- 适用于: 读多 写少的场景
- 如何来构建索引呢?
格式:
create index 索引名称 on 表名(列名1,列名2 ....)- 如何删除索引呢?
格式:
drop index 索引名称 on 表名;- 本地索引
- 特点:
在构建了本地索引后, 不会单独创建一张索引表, 索引数据直接附带在目标表对应字段的后面, 这样在进行修改(增 删 改)数据操作的时候 , 直接在目标对索引数据一次性就处理掉了, 适用于写的多场景
在执行查询的时候, Phoenix会自动选择是否使用本地索引, 即使有非索引的字段, 依然可用
注意:
如果表在构建的时候, 采用的加盐预分区的方案, 建议大家不要使用本地索引, 因为有部分操作是不支持的- 适用于: 写多 读少的场景
- 如何创建本地索引呢?
格式:
create local index 索引名称 on 表名(列名1,列名2 ....)- 如何删除索引
格式:
drop index 索引名称 on 表名;- 覆盖索引
- 特点:
覆盖索引无法单独使用, 必须和全局索引或者本地索引配合使用, 在构建覆盖索引后, 将对应索引字段数据, 直接放置在本地索引或者全局索引字段的后面即可, 这样在查询的时候, 可以不去查询目标表, 直接在索引表中可以所有需要的字段数据, 这样可以提升读取数据时间
一般会和全局索引组合使用- 适用于: 将一些不参与条件的字段 但是会参与展示的字段 放置在覆盖索引上
- 如何构建覆盖索引呢?
格式:
create [local] index 索引名称 on 表名(列名1, 列名2....) include(列1,列2...)如何删除覆盖索引: 随着全局或者本地索引的删除而删除
- 函数索引
- 特点
无法单独使用, 在本地索引或者全局索引中使用, 主要用于针对某个函数的结果来构建索引, 这样以后用到这个对应结果数据的函数, 那么也可以执行索引优化- 适用于: 多条SQL语句中, 需要频繁的使用某个函数的结果
- 如何构建呢?
格式:
create [local] index 索引名称 on 表名(列1,列2,函数名称(字段...)...)2.2 案例一: 创建全局索引+覆盖索引
需求: 查询 用户id为 8237476 的订单数据
SQL:
select USER_ID,ID,MONEY from order_dtl where USER_ID='8237476';
使用 explain 检查其执行的效率:
explain select USER_ID,ID,MONEY from order_dtl where USER_ID='8237476';
结论: 通过扫描全表 获取的结果数据- 索引的方式来提升效率
1) 创建索引:
create index idx_index_order_dtl on order_dtl(user_id) include(ID,MONEY);2) 重新检查刚刚SQL, 是否会执行操作
explain select USER_ID,ID,MONEY from order_dtl where USER_ID='8237476';如果查询的中, 出现了非索引的字段, 会出现什么问题呢?
总结: 全局索引无法生效
要求: 就想使用全局索引, 而且不想对这个非索引的字段构建索引
解决方案: 通过强制使用全局索引方案 来生效
explain select /*+INDEX(order_dtl idx_index_order_dtl) */ * from order_dtl where USER_ID='8237476';删除索引
drop index idx_index_order_dtl on order_dtl;2.3 案例二: 本地索引
- 需求: 根据 订单ID 订单状态 支付金额 支付方式 用户ID 查询数据
1) 构建索引:
create local index IDX_LOCAL_order_dtl on order_dtl(ID,STATUS,MONEY,PAY_WAY,USER_ID);说明: 虽然可以通过!table看到索引表, 本质上在hbase上没有构建索引表
2) 测试1: 所有字段都是本地索引
explain select USER_ID,ID,MONEY from order_dtl where USER_ID='8237476';3) 测试2: 有部分字段没有索引
explain select USER_ID,ID,MONEY,CATEGORY from order_dtl where USER_ID='8237476';3) 测试3: 查询所有的字段
explain select * from order_dtl where USER_ID='8237476';
原因:
表是加盐预分区的操作注意: 本地索引 会直接对目标表数据产生影响, 所以一旦构建本地索引, 将不再支持通过原生API 查询数据
2.4 案例三: 实现WATER_BILL查询操作
- 原来的查询WATER_BILL的SQL语句:
select name,num_usage, record_date from water_bill where record_date between '2020-06-01' and '2020-06-30';
花费时间为: 8s多- 接下来对其进行优化:
1) 创建索引: 全局|本地 + 覆盖
create index IDX_INDEX_WATER_BILL ON WATER_BILL(record_date) include (name,num_usage);
2) 执行查询操作:
select name,num_usage, record_date from water_bill where record_date between '2020-06-01' and '2020-06-30';
花费时间: 7s左右总结索引的使用
1) 给那些经常查询使用字段添加索引
2) 如果前期经常使用, 但是后期不使用, 请将索引信息删除
3) 如果前期不经常使用,但是后期经常使用, 后期还需要添加索引
索引好处: 提升查询的效率
弊端: 数据会冗余存储 占用磁盘空间
索引: 以空间换时间操作hbase的表结构设计
hbase的名称空间(命名空间)
hbase的名称空间 类似于 在mysql 或者 hive中的数据库, 只不过在hbase将其称为叫做名称空间(命名空间)
思考: mysql或者hive 为什么会有数据库的概念呢?
- 利于管理,利于维护
- 业务划分更加明确
- 可以针对性设置权限
同样的, hbase的名称空间也具有相似作用
在hbase中, 默认情况下, 是有两个名称空间的, 一个 hbase 和 default
- 名称为 hbase的名称空间: 专门用于放置hbase内部的管理表的, 此名称空间, 一般不操作, hbase自己维护即可
- 内部有一张核心的元数据表: meta表
- 此表存储了hbase中所有自定义表的相关的元数据信息
- 内部有一张核心的元数据表: meta表
- 名称为 default 的名称空间: 默认的名称空间, 在创建hbase表的时候, 如果没有指定名称空间, 默认就创建到此空间下
- 类似于 hive中也有一个default数据库
- 在测试 学习的时候可以使用
如何操作HBase的名称空间呢?
# 如何创建名称空间:
create_namespace '名称空间'
#查看当下有那些名称空间
list_namespace
#查看某一个名称空间
describe_namespace '名称空间'
#删除名称空间
drop_namespace '名称空间'
注意: 在删除这个空间的时候, 需要保证此空间下没有任何表
#如何在某个名称空间下创建表
create '名称空间名称:表名','列族' ...
注意: 一旦建立在default以外的名称空间下, 在后续操作这个表, 必须携带名称空间,否则hbase会到default空间下hbase表的列族的设计
结论: 在建表的时候, 如果能用一个列族来解决的, 坚决使用一个即可, 能少则少
原因:
过多的列族会导致在region中出现过多的store模块, 而每个store模块中由一个memStore 和 多个storeFile构成的, 这样会导致数据在存储或者读取的时候, 需要跨域多个内存和多个文件才能将整行的数据都获取到, 此时增大了IO操作, 从而导致效率比较低
而且这个列族多了之后, 会导致region可能会频繁的要进行刷新 和 compact合并操作
多个列族在文件层次上存储在不同的HFile文件中。
思考: 什么场景下 可能会构建多个列族呢? 一般为 2~5
- 假设有一个表, 表中字段比较多, 但是某些字段不常使用, 此时可以将经常使用的字段放入某一个列族中, 另一个放置不常使用字段即可
- 假设一个表要对接多个不同业务, 而多个不同业务可能会用到不同的字段, 可以根据业务 划分不同的列族中
hbase表的预分区
在hbase中 表默认情况下 只有一个region, 而一个region只能被一个regionServer所管理
思考:
假设通过默认的方式, 创建了一张hbase的表, 接着需要向这个表写入大量的数据, 同时也有大量的读请求在查询这个表中数据, 此时会发生什么问题呢?
出现问题: 对应管理region的regionServer的服务器可能出现延迟较大, 甚至出现宕机的问题
原因: 只有一个region, 对应只有一个regionServer, 一个regionServer需要承载这个表所有并发读写请求
如何解决呢?
如果可以在建表的时候, 一次性构建出多个region, 然后多个region能够均匀的分布在不同的regionServer上, 这样问题及迎刃而解了实现方式: 通过HBase的预分区
预分区的目的: 让hbase在建表的时候, 就可以拥有多个region
如何实现预分区的操作:
- 手动预分区操作
create '表名' ,'列族1', SPLITS=>[分区范围即可] 例子: create 't1_split','f1',SPLITS=>['10','20','30','40']- 采用 hash预分区方案
格式 create '表名' ,'列族1', {NUMREGIONS=>N,SPLITALGO=>'HexStringSplit'} 案例: create 't2_split','f1',{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}hbase.hregion.max.filesize不宜过大或过小,经过实战,生产高并发运行下,最佳大小5-10GB!关闭某些重要场景的HBase表的major_compact!在非高峰期的时候再去调用major_compact,这样可以减少split的同时,显著提供集群的性能,吞吐量、非常有用。 一般情况下,单个region的大小建议控制在5GB以内,可以通过参数hbase.hregion.max.filesize来设置,单个regionserver下的regions个数控制在200个以内。regions过多会导致集群不堪重负、regionserver 频繁FullGC的情况,进而影响集群的稳定性甚至上层的应用服务。
hbase的版本确界和TTL
什么是数据版本确界
数据版本的确界 所描述的就是在hbase中 数据的历史版本的数量
- 下界: hbase中对应单元格 至少需要保留多少版本, 即使数据已经过期了 ,
- 默认值为 0
- 上界: hbase中对应单元格 最多可以保留多少个版本, 如果比设置多了, 最早期本部会被覆盖掉
- 默认值为 1
2.6.2 什么是数据的TTL
在hbase中, 可以针对数据设置过期时间, 当时间过期后, hbase会自动将过期数据给清理掉
2.6.3 代码演示数据版本确界和TTL
- 代码内容:
public class HBaseTTLTest {
//1. 创建一个Hbase的表 (设置 版本确界 以及 TTL)
//2. 向表添加数据, 添加后 进行多次修改操作(产生历史版本)
//3. 查询数据: 需要将其历史版本全部都查询到
public static void main(String[] args) throws Exception {
//1. 根据连接工厂获取hbase的连接对象
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
Connection hbConn = ConnectionFactory.createConnection(conf);
//2. 获取相关的管理对象
Admin admin = hbConn.getAdmin();
//3. 执行相关的操作:
//3.1: 判断表是否存在
if( !admin.tableExists(TableName.valueOf("day03_hbaseTTL"))){
// 只要能进来 说明 表不存在
TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("day03_hbaseTTL"));
ColumnFamilyDescriptorBuilder familyDesc = ColumnFamilyDescriptorBuilder.newBuilder("C1".getBytes());
// 设置 版本 确界 和 TTL
familyDesc.setMinVersions(3); // 设置 下界
familyDesc.setMaxVersions(5); // 设置 上界
familyDesc.setTimeToLive(30); // 设置 ttl 有效时间为30s
ColumnFamilyDescriptor family = familyDesc.build();
descBuilder.setColumnFamily(family);
TableDescriptor desc = descBuilder.build();
//3.2: 创建表
admin.createTable(desc);
}
//3.3: 添加数据操作
//3.3.1: 获取 table管理对象
Table table = hbConn.getTable(TableName.valueOf("day03_hbaseTTL"));
//3.3.2: 执行添加数据操作
for(int i = 1 ; i<= 2 ; i++){
Put put = new Put("rk001".getBytes());
put.addColumn("C1".getBytes(),"NAME".getBytes(),("zhangsan"+i).getBytes());
table.put(put);
}
//3.4: 查询数据
Get get = new Get("rk001".getBytes());
get.readAllVersions(); // 获取所有的历史版本
Result result = table.get(get);
//Cell[] rawCells = result.rawCells();
List<Cell> cells = result.listCells();
for (Cell cell : rawCells) {
System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
}
//4. 释放资源
table.close();
admin.close();
hbConn.close();
}
}总结:
即使所有的历史版本的数据都过期了, hbase也会至少保留 min_version个最新版本数据, 以保证我们在查询数据的时候. 可以有数据返回
hbase的中rowkey的设计原则
说明:
单纯的通过预分区 是无法解决 向hbase存储数据的高并发的读写问题, 因为如果所有的数据都是某一个region内的数据, 此时依然相当于表只有一个region
解决方案:
需要在预分区的基础, 让rowkey的值 能够均匀的落在不同的region上, 才可以解决
所以: rowkey设计良好, 直接关系到, 数据应该存储到那个region中, 如果设置不良好, 可能会导致数据倾斜问题
设置rowkey有什么样原则呢?
- 官方rowkey设计原则
1) 避免将 递增行键/时序数据 放置在rowkey最前面
2) 避免rowkey和列的长度过大
说明, 太长rowkey 和列 会导致 占用空间更大, 导致在内存中存储数据行数会更少, 从而最终导致提前进行flush操作, 影响效率
rowkey支持大小: 64kb
一般情况下: 建议在 100字节以下 , 大部分长度为 10~20左右
3) 使用Long等类型 比String类型更省空间
4) 保证rowkey的唯一性如何避免热点的问题呢?
- 反转操作
- 弊端 : 如果是相关性比较强的数据, 此种打乱会导致读写效率降低
- 加盐策略(随机在rowkey前面加一个随机字符串或者随机数)
- 弊端: 如果是相关性比较强的数据, 此种打乱会导致读写效率降低
- 哈希策略(MurmurHash3)
hbase的表的压缩方案的选择
在生产中如何选择压缩方式呢?
1) 必须要压缩, 如果表的数据, 需要进行频繁的数据读写操作, 而且数据量比较大, 建议采用 snappy压缩方案
2) 必须要压缩, 如果表的数据, 需要进行大量写入操作, 但是读取操作不是特别频繁, 建议采用 GZIP|GZ如何在hbase设置表的压缩方案? 默认情况下HBASE是不压缩的, 压缩方案是针对列族来说的
配置操作:
格式:
建表时: create '表名' , {NAME=>'列族',COMPRESSION=>'GZ|SNAPPY'}
对已经存在表, 通过修改表结构: alter '表名', {NAME=>'列族',COMPRESSION=>'GZ|SNAPPY'}注意:
如果需要采用SNAPPY 压缩方案, 可能需要对HBase进行重新编译, 在编译时类似于 Hadoop, 将支持压缩的C接口放入hbase编译包中 才可以
如果要采用LZO的压缩方案, 需要放置LZO的压缩的jar包
HBase数据结构
简介
传统关系型数据库,一般都选择使用B+树作为索引结构,而在大数据场景下,HBase、Kudu这些存储引擎选择的是LSM树。LSM树,即日志结构合并树(Log-Structured Merge-Tree)。
B+树是建立索引的通用技术,但如果并发写入压力较大时,B+树需要大量的磁盘随机IO,而严重影响索引创建的速度,在一些写入操作非常频繁的应用场景中,就不太适合了
LSM树通过磁盘的顺序写,来实现最好的写性能
LSM树设计思想
LSM 的主要思想是划分不同等级的结构,换句话来理解,就是LSM中不止一个数据结构,而是存在多种结构
一个结构在内存、其他结构在磁盘(HBase存储结构中,有内存——MemStore、也有磁盘——StoreFile)
内存的结构可以是B树、红黑树、跳表等结构(HBase中是跳表),磁盘中的树就是一颗B+树
C0层保存了最近写入的数据,数据都是有序的,而且可以随机更新、随机查询
C1到CK层的数据都是存在磁盘中,每一层中key都是有序存储的