
HBase Region Routing When Writing Data

How a single HBase put is routed to a Region
1. The client put interface
org.apache.hadoop.hbase.client.HTableInterface.put(Put put)
   org.apache.hadoop.hbase.client.HTable.put
   public void put(final Put put) throws IOException {
     // add the Put to the client-side write buffer
     doPut(put);
     if (autoFlush) {
       // autoFlush is on by default: flush the buffered writes to the servers now
       flushCommits();
     }
   }
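
For orientation, here is a minimal sketch of the calling side against this 0.9x-era client API. The table name "t1", the column family "cf" and the row key are made-up values used only for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t1");           // "t1" is a hypothetical table
    Put put = new Put(Bytes.toBytes("row-0001"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
    table.put(put);   // with the default autoFlush = true this ends in flushCommits()
    table.close();
  }
}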
 
2. Submitting the write request
org.apache.hadoop.hbase.client.HTable.flushCommits
 public void flushCommits() throws IOException {
    try {
      Object[] results = new Object[writeBuffer.size()];
      try {
        // submit the buffered writes as one batch request
        this.connection.processBatch(writeBuffer, tableName, pool, results);
      } catch (InterruptedException e) {
        throw new IOException(e);
      } finally {
        // (elided) successful Puts are removed from the write buffer here
        ...
      }
    } finally {
      if (clearBufferOnFail) {
        writeBuffer.clear(); // drop the client-side buffered data
        currentWriteBufferSize = 0;
      } else {
        // recompute the size of the data still buffered on the client
        currentWriteBufferSize = 0;
        for (Put aPut : writeBuffer) {
          currentWriteBufferSize += aPut.heapSize();
        }
      }
    }
  }
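
Since autoFlush decides whether every put() immediately calls flushCommits(), bulk writers normally turn it off and flush explicitly. A sketch of that pattern, reusing the imports, Configuration and made-up table/family names from the sketch in step 1 (the statements would live in a method such as the main() above):

HTable table = new HTable(conf, "t1");
table.setAutoFlush(false);                  // buffer puts on the client side
table.setWriteBufferSize(2 * 1024 * 1024);  // doPut() also flushes once this size is exceeded
for (int i = 0; i < 10000; i++) {
  Put put = new Put(Bytes.toBytes(String.format("row-%08d", i)));
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
  table.put(put);                           // accumulates in writeBuffer via doPut()
}
table.flushCommits();                       // drain whatever is still buffered
table.close();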
 
3. Processing the batch write
org.apache.hadoop.hbase.client.HConnection.processBatch()
org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation.processBatch()
    public void processBatch(List<? extends Row> list,
        final byte[] tableName,
        ExecutorService pool,
        Object[] results) throws IOException, InterruptedException {
      // This belongs in HTable!!! Not in here. St.Ack
      // results must be the same size as list
      if (results.length != list.size()) {
        throw new IllegalArgumentException("argument results must be the same size as argument list");
      }
      // delegate to processBatchCallback(), which performs the actual routing
      processBatchCallback(list, tableName, pool, results, null);
    }
    // This method is the important one: it works out which Region each row key belongs to,
    // then submits the corresponding data to the RegionServer that serves that Region
    public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException {
        ...
        // step 1: break the work up into per-RegionServer chunks and build the data structures
        // HRegionLocation is the crucial object here: it carries the Region's HRegionInfo
        // plus the name and port of the RegionServer that serves it
        Map<HRegionLocation, MultiAction<R>> actionsByServer =
          new HashMap<HRegionLocation, MultiAction<R>>();
        for (int i = 0; i < workingList.size(); i++) {
          Row row = workingList.get(i);
          if (row != null) {
            // the core of put routing: classify the submitted data by row into the Region that owns it
            HRegionLocation loc = locateRegion(tableName, row.getRow());
            byte[] regionName = loc.getRegionInfo().getRegionName();
            MultiAction<R> actions = actionsByServer.get(loc);
            if (actions == null) {
              actions = new MultiAction<R>();
              actionsByServer.put(loc, actions);
            }
            Action<R> action = new Action<R>(row, i);
            lastServers[i] = loc;
            actions.add(regionName, action);
          }
        }
        // step 2: submit the requests to the corresponding RegionServers
        Map<HRegionLocation, Future<MultiResponse>> futures =
            new HashMap<HRegionLocation, Future<MultiResponse>>(
                actionsByServer.size());
        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
        }
        // step 3: collect the success/failure results of the writes
        ...
        // step 4: retry the writes that failed.
        ...
    }
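
The grouping above keys everything off HRegionLocation. A quick way to see which location a given row resolves to is HTable.getRegionLocation(), which answers through the same client connection; a sketch reusing the setup from step 1 (the row key is made up, and org.apache.hadoop.hbase.HRegionLocation / HRegionInfo need to be imported):

HRegionLocation loc = table.getRegionLocation(Bytes.toBytes("row-0001"));
HRegionInfo info = loc.getRegionInfo();
System.out.println("region   : " + info.getRegionNameAsString());
System.out.println("startKey : " + Bytes.toStringBinary(info.getStartKey()));
System.out.println("endKey   : " + Bytes.toStringBinary(info.getEndKey()));
System.out.println("location : " + loc);    // toString() includes the server address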
 
 
 
    // relocateRegion() forces a fresh lookup by calling locateRegion() with useCache = false
    public HRegionLocation relocateRegion(final byte [] tableName,
        final byte [] row)
    throws IOException {
      ...
      return locateRegion(tableName, row, false, true);
    }
 
4. Locating the Region for a row
org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation.locateRegion()
    private HRegionLocation locateRegion(final byte [] tableName,
      final byte [] row, boolean useCache, boolean retry)
    throws IOException {
   ...
      // make sure the ZooKeeper trackers are initialized
      ensureZookeeperTrackers();
      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
        ...
      } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
        ...
      } else { // inserts into user tables take this branch
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
            useCache, userRegionLock, retry);
      }
    }
    private HRegionLocation locateRegionInMeta(final byte [] parentTable,
      final byte [] tableName, final byte [] row, boolean useCache,
      Object regionLockObject, boolean retry)
    throws IOException {
      HRegionLocation location;
      // if the client has a cached location for this row, return it from the cache directly
      if (useCache) {
        location = getCachedLocation(tableName, row);
        if (location != null) {
          return location;
        }
      }
      // otherwise (no cached location) walk ZooKeeper -> -ROOT- -> .META., cache those
      // results on the client, and then use the .META. data to decide which Region the row routes to
     ...
    }
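
The .META. catalog table that this lookup reads can also be scanned directly, which makes the routing table visible: one row per user-table region, with the hosting server stored under the info:server column. A sketch, reusing the Configuration from step 1 and importing Scan, Result and ResultScanner from org.apache.hadoop.hbase.client (column layout as in the 0.9x catalog tables):

HTable meta = new HTable(conf, ".META.");
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info"));
ResultScanner scanner = meta.getScanner(scan);
try {
  for (Result r : scanner) {
    byte[] server = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("server"));
    // the .META. row key encodes: tableName,startKey,regionId
    System.out.println(Bytes.toStringBinary(r.getRow()) + " -> "
        + (server == null ? "unassigned" : Bytes.toString(server)));
  }
} finally {
  scanner.close();
  meta.close();
}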
 
  HRegionLocation getCachedLocation(final byte [] tableName,
        final byte [] row) {
      SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
        getTableLocations(tableName);
      ...
      // decide which Region this row key falls into, i.e. which RegionServer
      // should receive the request
      byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
      if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
          KeyValue.getRowComparator(tableName).compareRows(
              endKey, 0, endKey.length, row, 0, row.length) > 0) {
        return possibleRegion;
      }
      return null;
    }
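
The end-key comparison above is one half of the general membership rule: a row belongs to a region when startKey <= row and the end key is either empty (the last region of the table) or strictly greater than the row; the start-key half is already satisfied by the closest-key-at-or-before lookup into the cached map (elided above). For a user table the row comparator reduces to a byte-wise comparison, so the rule can be sketched with Bytes.compareTo (the helper name below is made up; HRegionInfo.containsRow() performs the equivalent check):

// true when row is in [startKey, endKey); an empty endKey means "up to the end of the table"
static boolean regionContainsRow(byte[] startKey, byte[] endKey, byte[] row) {
  boolean afterStart = Bytes.compareTo(row, startKey) >= 0;
  boolean beforeEnd  = endKey.length == 0 || Bytes.compareTo(row, endKey) < 0;
  return afterStart && beforeEnd;
}

// e.g. for a region covering ["row-0500", "row-1000") (byte[] arguments shown as strings):
//   regionContainsRow("row-0500", "row-1000", "row-0777")  -> true
//   regionContainsRow("row-0500", "row-1000", "row-1000")  -> false (the end key is exclusive)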
 
5. From here on the RegionServer takes over: it writes the data into the Region's MemStore, which is later flushed to StoreFiles (HFiles) on disk.