HBase put一条数据 Region 路由规则
1.客户端put接口
org.apache.hadoop.hbase.client.HTableInterface.put(Put put) org.apache.hadoop.hbase.client.HTable.put public void put(final Put put) throws IOException { //缓存数据 doPut(put); if (autoFlush) { //提交数据刷写到磁盘请求 flushCommits(); } }
2.提交写请求
org.apache.hadoop.hbase.client.HTable.flushCommits public void flushCommits() throws IOException { try { Object[] results = new Object[writeBuffer.size()]; try { //提交数据刷下请求 this.connection.processBatch(writeBuffer, tableName, pool, results); } catch (InterruptedException e) { throw new IOException(e); } finally { ... } finally { if (clearBufferOnFail) { writeBuffer.clear();//清理客户端缓存数据 currentWriteBufferSize = 0; } else { // 计算客户端缓存数据大小 currentWriteBufferSize = 0; for (Put aPut : writeBuffer) { currentWriteBufferSize += aPut.heapSize(); } } } }
3.处理批量写
org.apache.hadoop.hbase.client.HConnection.processBatch() .HConnectionImplementation.processBatch() public void processBatch(List<? extends Row> list, final byte[] tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException { // This belongs in HTable!!! Not in here. St.Ack // results must be the same size as list if (results.length != list.size()) { throw new IllegalArgumentException("argument results must be the same size as argument list"); } //处理批量写 processBatchCallback(list, tableName, pool, results, null); } //这个方法非常重要,在这里定义了,哪些rowkey对应的数据应该存放到那个Region上,然后将相应的数据提交到Region对应的RegionServer上 public <R> void processBatchCallback( List<? extends Row> list, byte[] tableName, ExecutorService pool, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { ... // step 1: 分解Regionserver块并构建对应的数据结构 //HRegionLocation这个对象至关重要,他定义了Region相关的信息HRegionInfo,RegionServer Name、port Map<HRegionLocation, MultiAction<R>> actionsByServer = new HashMap<HRegionLocation, MultiAction<R>>(); for (int i = 0; i < workingList.size(); i++) { Row row = workingList.get(i); if (row != null) { //下面这句是整个put数据路由的核心,将提交的数据根据row分类到不同的Region上 HRegionLocation loc = locateRegion(tableName, row.getRow()); byte[] regionName = loc.getRegionInfo().getRegionName(); MultiAction<R> actions = actionsByServer.get(loc); if (actions == null) { actions = new MultiAction<R>(); actionsByServer.put(loc, actions); } Action<R> action = new Action<R>(row, i); lastServers[i] = loc; actions.add(regionName, action); } } // step 2: 提交请求到相应的RegionServer上去处理 Map<HRegionLocation, Future<MultiResponse>> futures = new HashMap<HRegionLocation, Future<MultiResponse>>( actionsByServer.size()); for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) { futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); } // step 3:收集写入成功是失败的返回结果 ... // step 4: 对写入失败的数据进行重试. ... }
public HRegionLocation relocateRegion(final byte [] tableName, final byte [] row) throws IOException{ ... return locateRegion(tableName, row, false, true); }
4.处理批量写
org.apache.hadoop.hbase.client.HConnectionImplementation.locateRegion() private HRegionLocation locateRegion(final byte [] tableName, final byte [] row, boolean useCache, boolean retry) throws IOException { ... //确保ZK是正常的 ensureZookeeperTrackers(); if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { ... } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { ... } else {//用户表的数据插入是调用下面的这个操作 // Region not in the cache - have to go to the meta RS return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row, useCache, userRegionLock, retry); } } private HRegionLocation locateRegionInMeta(final byte [] parentTable, final byte [] tableName, final byte [] row, boolean useCache, Object regionLockObject, boolean retry) throws IOException { HRegionLocation location; //如果客户端保存的缓存,从缓存中直接查询 if (useCache) { location = getCachedLocation(tableName, row); if (location != null) { return location; } } //以下是如果客户端没有ZK缓存,从ZOOKEEPER -> -ROOT- -> .META.将这些数据缓存到客户端,然后再去从.META.表中数去判定row应该路由到那个Region上 ... } HRegionLocation getCachedLocation(final byte [] tableName, final byte [] row) { SoftValueSortedMap<byte [], HRegionLocation> tableLocations = getTableLocations(tableName); .... //判断row对应的rowkey应该在哪个Region上,应该将请求发给哪个RegionServer byte[] endKey = possibleRegion.getRegionInfo().getEndKey(); if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || KeyValue.getRowComparator(tableName).compareRows( endKey, 0, endKey.length, row, 0, row.length) > 0) { return possibleRegion; } return null; }
5.下面的事情就是RegionServer去将数据写道Memstore,StoreFile,HFile了
相关推荐
kettle集群搭建以及使用kettle将mysql数据转换为Hbase数据
使用spark读取hbase中的数据,并插入到mysql中
从HDFS中读文件,用groupby进行sort,然后写入Hbase中
java操作Hbase之从Hbase中读取数据写入hdfs中源码,附带全部所需jar包,欢迎下载学习。
基于数据冗余的HBase合并机制研究_HBase列式数据库的所有操作均以追加数据的方式写入,导致其合并机制占用资源过多,影响系统读性能。
springboot搭建的hbase可视化界面 支持hbase的建表与删除 支持根据rowkey查询数据
1、region 拆分机制 region中存储的是大量的rowkey数据 ,当region中的数据条数过多的时候,直接影响查询效率.当region过大的时候.hbase会拆分region , 这也是Hbase的一个优点 . HBase的region split策略一共有以下几...
从HBase的集群搭建、HBaseshell操作、java编程、架构、原理、涉及的数据结构,并且结合陌陌海量消息存储案例来讲解实战HBase 课程亮点 1,知识体系完备,从小白到大神各阶段读者均能学有所获。 2,生动形象,化繁为...
HBASE的一个读取数据流程的解析,清晰的画出整个过程,十分有利于理解
python 连接hbase 打印数据。hbase 的一些源数据未转化
为解决现有的HBase数据压缩策略选择方法未考虑数据的冷热性,以及在选择过程中存在片面性和不可靠性的缺陷,提出了基于HBase数据分类的压缩策略选择方法。依据数据文件的访问频度将HBase数据划分为冷热数据,并限定具体...
分布式数据库HBase在大规模数据加载中较传统关系型数据库有较大的优势但也存在很大的优化空间.基于Hadoop分布式平台搭建HBase环境,并优化自定义数据加载算法.首先,分析HBase底层数据存储,实验得出HBase自带数据加载...
Hbase笔记 —— 利用JavaAPI的方式操作Hbase数据库(往hbase的表中批量插入数据)
hbase备份和数据恢复,hbase与hive的互导,hbase和hdfs互导。
将数据从Hadoop中向HBase载入数据,该过程大致可以分为两步: 一、将Hadoop中普通文本格式的数据转化为可被HBase识别的HFile文件,HFile相当于Oracle中的DBF数据 文件。 二、将HFile载入到HBase中,该过程实际就是...
hbase海量数据的全量导入方法,大数据导入。
HBase基本数据操作详解,分享给大家!
2. 请简述HBase中数据写入最后导致Region分裂的全过程 3. 如果设计一个笔记的表,表中要求有笔记的属性和笔记的内容,怎么做 4. HBase部署时如何指定多个zookeeper 5. HBase shell是基于哪种JVM运行的语言实现的 6. ...
简单的介绍了habse存储数据的样子和简单的hbase shell 使用