
HBase Source Code Analysis (1): Client-Side Data Ingestion

 

 

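The HBase write path is roughly as follows:

  1. The client submits the request to the region server (with some client-side buffering along the way)
  2. The region server receives the request and, if it is a put, writes it to the memstore
  3. After each memstore operation, the region checks whether the memstore has exceeded a threshold; if it has, flush() is run, which persists the in-memory KeyValue pairs to the HStore (that is, to an HFile)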
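Step 3 can be illustrated with a toy model. This is only a sketch under assumptions: the class `ToyMemstore`, its size estimate, and the use of a `TreeMap` snapshot as a stand-in for an HFile are all hypothetical and are not taken from the HBase source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of step 3: a sorted in-memory buffer that is flushed to an
// immutable snapshot once it grows past a threshold. All names are hypothetical.
class ToyMemstore {
    private final long flushThresholdBytes;
    private TreeMap<String, String> memstore = new TreeMap<>();
    private long memstoreBytes = 0;
    // Each flushed snapshot stands in for one HFile.
    private final List<TreeMap<String, String>> flushedFiles = new ArrayList<>();

    ToyMemstore(long flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
    }

    void put(String key, String value) {
        memstore.put(key, value);
        memstoreBytes += key.length() + value.length(); // crude size estimate
        if (memstoreBytes > flushThresholdBytes) {
            flush();
        }
    }

    private void flush() {
        flushedFiles.add(memstore);   // "persist" the sorted snapshot
        memstore = new TreeMap<>();   // start a fresh in-memory buffer
        memstoreBytes = 0;
    }

    int flushedFileCount() { return flushedFiles.size(); }

    int memstoreEntries() { return memstore.size(); }
}
```

With a threshold of 10, two puts of 6 estimated bytes each push the buffer past the limit, so the second put triggers one flush and leaves the memstore empty.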

Let's follow a single record on its way into HBase.

Client side:

 

  • HTable.java performs the put operation
  public void put(final Put put) throws IOException {
    doPut(Arrays.asList(put));
  }
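  • put() delegates to doPut()

doPut() first validates the Put and checks that no KeyValue exceeds the maximum size. This limit is configured with the hbase.client.keyvalue.maxsize parameter and is unlimited by default. It then calls writeBuffer.add(put) to append the data to the local write buffer. The buffer is flushed when its size exceeds writeBufferSize (2097152 bytes by default), when autoFlush is enabled (it is true by default), or when you call flushCommits() manually.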

  private void doPut(final List<Put> puts) throws IOException {
    int n = 0;
    for (Put put : puts) {
      validatePut(put);
      writeBuffer.add(put); // append the Put to the local write buffer
      currentWriteBufferSize += put.heapSize();
     
      // we need to periodically see if the writebuffer is full instead of waiting until the end of the List
      n++;
      if (n % DOPUT_WB_CHECK == 0 && currentWriteBufferSize > writeBufferSize) {
        flushCommits();
      }
    }
    if (autoFlush || currentWriteBufferSize > writeBufferSize) {
      flushCommits();
    }
  }
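
The three flush triggers (autoFlush, buffer size, manual call) can be tried out in isolation. The sketch below is a simplified model, not the HTable implementation: `ToyWriteBuffer` and its methods are hypothetical, payload length replaces `heapSize()`, and a "flush" just empties the buffer and counts.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the client-side write buffer. Hypothetical names;
// a flush here only clears the buffer and increments a counter.
class ToyWriteBuffer {
    private final List<byte[]> buffer = new ArrayList<>();
    private final long writeBufferSize;
    private final boolean autoFlush;
    private long currentSize = 0;
    private int flushCount = 0;

    ToyWriteBuffer(long writeBufferSize, boolean autoFlush) {
        this.writeBufferSize = writeBufferSize;
        this.autoFlush = autoFlush;
    }

    void put(byte[] payload) {
        buffer.add(payload);
        currentSize += payload.length;
        // Same condition as at the end of doPut(): flush on every put when
        // autoFlush is on, otherwise only when the buffer is over its limit.
        if (autoFlush || currentSize > writeBufferSize) {
            flushCommits();
        }
    }

    void flushCommits() {
        if (buffer.isEmpty()) {
            return; // nothing to send
        }
        buffer.clear();
        currentSize = 0;
        flushCount++;
    }

    int flushCount() { return flushCount; }

    int buffered() { return buffer.size(); }
}
```

With autoFlush on, every put costs one round trip. With it off, small puts accumulate until the size limit is crossed, and any remainder stays in the buffer until flushCommits() is called explicitly.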

 

  • The flushCommits() code:
  public void flushCommits() throws IOException {
    try {
      Object[] results = new Object[writeBuffer.size()];
      try {
        // connect to the remote region servers and submit the requests here
        this.connection.processBatch(writeBuffer, tableName, pool, results);
      } catch (InterruptedException e) {
        throw new IOException(e);
      } finally {
        // mutate list so that it is empty for complete success, or contains
        // only failed records results are returned in the same order as the
        // requests in list walk the list backwards, so we can remove from list
        // without impacting the indexes of earlier members
        for (int i = results.length - 1; i>=0; i--) {
          if (results[i] instanceof Result) {
            // successful Puts are removed from the list here.
            writeBuffer.remove(i);
          }
        }
      }
    } finally {
      if (clearBufferOnFail) {
        writeBuffer.clear();
        currentWriteBufferSize = 0;
      } else {
        // the write buffer was adjusted by processBatchOfPuts
        currentWriteBufferSize = 0;
        for (Put aPut : writeBuffer) {
          currentWriteBufferSize += aPut.heapSize();
        }
      }
    }
  }
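
The finally block above walks the results array backwards so that removing an entry never shifts the index of an entry still to be examined. A minimal standalone sketch of that idiom follows. The helper `keepFailures` is hypothetical, and it treats any non-null result as a success, whereas the original checks `instanceof Result`.

```java
import java.util.List;

class BackwardRemoval {
    // Removes every entry whose result slot is non-null (treated as success
    // in this sketch), leaving only the failed entries in their original order.
    static <T> void keepFailures(List<T> pending, Object[] results) {
        for (int i = results.length - 1; i >= 0; i--) {
            if (results[i] != null) {
                pending.remove(i); // index-based removal; earlier indexes are unaffected
            }
        }
    }
}
```

Iterating forwards instead would skip elements, because each removal moves the following entries one position to the left.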

Now let's see how HConnectionImplementation, the implementing class of HConnection.java, implements processBatch:

 

    public void processBatch(List<? extends Row> list,
        final byte[] tableName,
        ExecutorService pool,
        Object[] results) throws IOException, InterruptedException {

      // results must be the same size as list
      if (results.length != list.size()) {
        throw new IllegalArgumentException("argument results must be the same size as argument list");
      }

      processBatchCallback(list, tableName, pool, results, null);
    }
    public <R> void processBatchCallback(
        List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException {

      // results must be the same size as list
      if (results.length != list.size()) {
        throw new IllegalArgumentException(
            "argument results must be the same size as argument list");
      }
      if (list.isEmpty()) {
        return;
      }

      // Keep track of the most recent servers for any given item for better
      // exceptional reporting.  We keep HRegionLocation to save on parsing.
      // Later below when we use lastServers, we'll pull what we need from
      // lastServers.
      HRegionLocation [] lastServers = new HRegionLocation[results.length];
      List<Row> workingList = new ArrayList<Row>(list);
      boolean retry = true;
      // count that helps presize actions array
      int actionCount = 0;
      Throwable singleRowCause = null;

      for (int tries = 0; tries < numRetries && retry; ++tries) {

        // sleep first, if this is a retry
        if (tries >= 1) {
          long sleepTime = getPauseTime(tries);
          LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
          Thread.sleep(sleepTime);
        }
        // step 1: break the list into regionserver-sized chunks and build the data structures
        Map<HRegionLocation, MultiAction<R>> actionsByServer =
          new HashMap<HRegionLocation, MultiAction<R>>();
        for (int i = 0; i < workingList.size(); i++) {
          Row row = workingList.get(i);
          if (row != null) {
            HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
            byte[] regionName = loc.getRegionInfo().getRegionName();

            MultiAction<R> actions = actionsByServer.get(loc);
            if (actions == null) {
              actions = new MultiAction<R>();
              actionsByServer.put(loc, actions);
            }

            Action<R> action = new Action<R>(row, i);
            lastServers[i] = loc;
            actions.add(regionName, action);
          }
        }

        // step 2: send the requests

        Map<HRegionLocation, Future<MultiResponse>> futures =
            new HashMap<HRegionLocation, Future<MultiResponse>>(
                actionsByServer.size());

        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
          // submit the write request asynchronously
          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
        }

        // step 3: collect the failures and successes, and prepare the failures for a retry

        for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
             : futures.entrySet()) {
          HRegionLocation loc = responsePerServer.getKey();

          try {
            Future<MultiResponse> future = responsePerServer.getValue();
            MultiResponse resp = future.get();

            if (resp == null) {
              // Entire server failed
              LOG.debug("Failed all for server: " + loc.getHostnamePort() +
                ", removing from cache");
              continue;
            }

            for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
              byte[] regionName = e.getKey();
              List<Pair<Integer, Object>> regionResults = e.getValue();
              for (Pair<Integer, Object> regionResult : regionResults) {
                if (regionResult == null) {
                  // if the first/only record is 'null' the entire region failed.
                  LOG.debug("Failures for region: " +
                      Bytes.toStringBinary(regionName) +
                      ", removing from cache");
                } else {
                  // Result might be an Exception, including DNRIOE
                  results[regionResult.getFirst()] = regionResult.getSecond();
                  if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
                    callback.update(e.getKey(),
                        list.get(regionResult.getFirst()).getRow(),
                        (R)regionResult.getSecond());
                  }
                }
              }
            }
          } catch (ExecutionException e) {
            LOG.warn("Failed all from " + loc, e);
          }
        }

        // step 4: identify the failed rows and prepare to retry them

        // Find failures (i.e. null Result), and add them to the workingList (in
        // order), so they can be retried.
        retry = false;
        workingList.clear();
        actionCount = 0;
        for (int i = 0; i < results.length; i++) {
          // if null (fail) or instanceof Throwable && not instanceof DNRIOE
          // then retry that row. else dont.
          if (results[i] == null ||
              (results[i] instanceof Throwable &&
                  !(results[i] instanceof DoNotRetryIOException))) {

            retry = true;
            actionCount++;
            Row row = list.get(i);
            workingList.add(row);
            deleteCachedLocation(tableName, row.getRow());
          } else {
            if (results[i] != null && results[i] instanceof Throwable) {
              actionCount++;
            }
            // add null to workingList, so the order remains consistent with the original list argument.
            workingList.add(null);
          }
        }
      }

      if (retry) {
        // Simple little check for 1 item failures.
        if (singleRowCause != null) {
          throw new IOException(singleRowCause);
        }
      }


      List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
      List<Row> actions = new ArrayList<Row>(actionCount);
      List<String> addresses = new ArrayList<String>(actionCount);

      for (int i = 0 ; i < results.length; i++) {
        if (results[i] == null || results[i] instanceof Throwable) {
          exceptions.add((Throwable)results[i]);
          actions.add(list.get(i));
          addresses.add(lastServers[i].getHostnamePort());
        }
      }

      if (!exceptions.isEmpty()) {
        throw new RetriesExhaustedWithDetailsException(exceptions,
            actions,
            addresses);
      }
    } 
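
The four steps of processBatchCallback (group by server, submit asynchronously, collect, retry what is still unresolved) can be condensed into a small self-contained model. Everything here is an assumption made for illustration: `ToyBatcher`, the `Server` interface, and the `locate` function are hypothetical, a null result means "failed, retry", and the back-off sleep and location-cache invalidation of the original are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

class ToyBatcher {
    /** Hypothetical stand-in for a region server: one result per row, null meaning "failed". */
    interface Server {
        List<String> multi(List<String> rows) throws Exception;
    }

    static String[] processBatch(List<String> rows, Function<String, Server> locate,
                                 ExecutorService pool, int numRetries) {
        String[] results = new String[rows.size()];
        for (int tries = 0; tries < numRetries; tries++) {
            // Step 1: group the indexes of still-unresolved rows by server.
            Map<Server, List<Integer>> byServer = new HashMap<>();
            for (int i = 0; i < rows.size(); i++) {
                if (results[i] == null) {
                    byServer.computeIfAbsent(locate.apply(rows.get(i)),
                            s -> new ArrayList<>()).add(i);
                }
            }
            if (byServer.isEmpty()) {
                break; // everything succeeded
            }
            // Step 2: one asynchronous request per server.
            List<List<Integer>> groups = new ArrayList<>();
            List<Future<List<String>>> futures = new ArrayList<>();
            for (Map.Entry<Server, List<Integer>> entry : byServer.entrySet()) {
                Server server = entry.getKey();
                List<String> batch = new ArrayList<>();
                for (int idx : entry.getValue()) {
                    batch.add(rows.get(idx));
                }
                Callable<List<String>> task = () -> server.multi(batch);
                groups.add(entry.getValue());
                futures.add(pool.submit(task));
            }
            // Step 3: copy each response back to the slot of the original row.
            for (int g = 0; g < groups.size(); g++) {
                try {
                    List<String> resp = futures.get(g).get();
                    List<Integer> idxs = groups.get(g);
                    for (int k = 0; k < idxs.size(); k++) {
                        results[idxs.get(k)] = resp.get(k);
                    }
                } catch (ExecutionException e) {
                    // The whole server failed; its rows stay null and are retried.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            // Step 4 is implicit: slots that are still null are regrouped on the next pass.
        }
        return results;
    }
}
```

Keeping the original index next to each row is what allows results to be returned in request order even though rows are regrouped per server on every attempt.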

 

The data is submitted to the region server over RPC:

 

    private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
        final MultiAction<R> multi, final byte [] tableName) {
      final HConnection connection = this;
      return new Callable<MultiResponse>() {
       public MultiResponse call() throws IOException {
         return getRegionServerWithoutRetries(
             new ServerCallable<MultiResponse>(connection, tableName, null) {
               public MultiResponse call() throws IOException {
                 return server.multi(multi);
               }
               @Override
               public void connect(boolean reload) throws IOException {
                 server =
                   connection.getHRegionConnection(loc.getHostname(), loc.getPort());
               }
             }
         );
       }
     };
   }

 

 

Obtaining the RPC proxy instance:

 

 HRegionInterface getHRegionConnection(final String hostname, final int port,
        final InetSocketAddress isa, final boolean master)
    throws IOException {
      if (master) getMaster();
      HRegionInterface server;
      String rsName = null;
      if (isa != null) {
        rsName = Addressing.createHostAndPortStr(isa.getHostName(),
            isa.getPort());
      } else {
        rsName = Addressing.createHostAndPortStr(hostname, port);
      }
      // See if we already have a connection (common case)
      server = this.servers.get(rsName);
      if (server == null) {
        // create a unique lock for this RS (if necessary)
        this.connectionLock.putIfAbsent(rsName, rsName);
        // get the RS lock
        synchronized (this.connectionLock.get(rsName)) {
          // do one more lookup in case we were stalled above
          server = this.servers.get(rsName);
          if (server == null) {
            try {
              if (clusterId.hasId()) {
                conf.set(HConstants.CLUSTER_ID, clusterId.getId());
              }
              // Only create isa when we need to.
              InetSocketAddress address = isa != null? isa:
                new InetSocketAddress(hostname, port);
              // definitely a cache miss. establish an RPC for this RS
              server = (HRegionInterface) HBaseRPC.waitForProxy(
                  serverInterfaceClass, HRegionInterface.VERSION,
                  address, this.conf,
                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
              this.servers.put(Addressing.createHostAndPortStr(
                  address.getHostName(), address.getPort()), server);
            } catch (RemoteException e) {
              LOG.warn("RemoteException connecting to RS", e);
              // Throw what the RemoteException was carrying.
              throw e.unwrapRemoteException();
            }
          }
        }
      }
      return server;
    }
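
The method above is a per-key variant of double-checked caching: a lock-free lookup first, then a lock that belongs to one region server only, then a second lookup before the expensive proxy creation. A generic sketch of the pattern follows. `PerKeyCache` and its factory function are hypothetical, and a dedicated lock object is used per key, whereas the original locks on the rsName string stored in connectionLock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Generic per-key double-checked cache. Hypothetical class, for illustration only.
class PerKeyCache<V> {
    private final ConcurrentMap<String, V> cache = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    PerKeyCache(Function<String, V> factory) {
        this.factory = factory;
    }

    V get(String key) {
        V value = cache.get(key);                  // fast path: already cached
        if (value == null) {
            locks.putIfAbsent(key, new Object());  // one lock object per key
            synchronized (locks.get(key)) {
                value = cache.get(key);            // re-check after acquiring the lock
                if (value == null) {
                    value = factory.apply(key);    // expensive creation, once per key
                    cache.put(key, value);
                }
            }
        }
        return value;
    }
}
```

Locking per key means that creating a connection to one slow server does not block callers that need a different server. On Java 8 and later, `ConcurrentHashMap.computeIfAbsent` offers a similar guarantee in a single call.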

 

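  • When performing an insert, the HBase client caches the region server locations it has used recently. If the cache already holds the location for the target row, the client uses it and connects to that region server directly; otherwise it issues an RPC to look the location up (in this code base the lookup goes through the catalog tables -ROOT- and .META. rather than through the master). Client operations such as put, get, and delete are each wrapped in an Action object, and a series of actions is submitted together as a MultiAction.
Server side: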
 
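Operations submitted by the client over RPC arrive at HRegionServer.multi(MultiAction<R> multi), which handles the insert request:
  • Take out each Action and determine its type (put/get/delete) in order to run the corresponding operation
  • Obtain a lock for each put
  • Call HRegion.put to write the data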
HRegionServer.java
  @SuppressWarnings("unchecked")
  @Override
  public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {
    checkOpen();
    MultiResponse response = new MultiResponse();
    for (Map.Entry<byte[], List<Action<R>>> e : multi.actions.entrySet()) {
      byte[] regionName = e.getKey();
      List<Action<R>> actionsForRegion = e.getValue();
      // sort based on the row id - this helps in the case where we reach the
      // end of a region, so that we don't have to try the rest of the
      // actions in the list.
      Collections.sort(actionsForRegion);
      Row action;
      List<Action<R>> puts = new ArrayList<Action<R>>();
      for (Action<R> a : actionsForRegion) {
        action = a.getAction();
        int originalIndex = a.getOriginalIndex();

        try {
          // determine which kind of operation this action is
          if (action instanceof Delete) {
            delete(regionName, (Delete) action);
            response.add(regionName, originalIndex, new Result());
          } else if (action instanceof Get) {
            response.add(regionName, originalIndex, get(regionName, (Get) action));
          } else if (action instanceof Put) {
            puts.add(a);  // wont throw.
          } else if (action instanceof Exec) {
            ExecResult result = execCoprocessor(regionName, (Exec)action);
            response.add(regionName, new Pair<Integer, Object>(
                a.getOriginalIndex(), result.getValue()
            ));
          } else {
            LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
                "Put or Exec.");
            throw new DoNotRetryIOException("Invalid Action, row must be a " +
                "Get, Delete or Put.");
          }
        } catch (IOException ex) {
          response.add(regionName, originalIndex, ex);
        }
      }

      // We do the puts with result.put so we can get the batching efficiency
      // we so need. All this data munging doesn't seem great, but at least
      // we arent copying bytes or anything.
      if (!puts.isEmpty()) {
        try {
          HRegion region = getRegion(regionName);

          if (!region.getRegionInfo().isMetaTable()) {
            this.cacheFlusher.reclaimMemStoreMemory();
          }

          List<Pair<Put,Integer>> putsWithLocks =
              Lists.newArrayListWithCapacity(puts.size());
          for (Action<R> a : puts) {
            Put p = (Put) a.getAction();

            Integer lock;
            try {
              // obtain the lock
              lock = getLockFromId(p.getLockId());
            } catch (UnknownRowLockException ex) {
              response.add(regionName, a.getOriginalIndex(), ex);
              continue;
            }
            putsWithLocks.add(new Pair<Put, Integer>(p, lock));
          }

          this.requestCount.addAndGet(puts.size());
          // write the data into the region
          OperationStatus[] codes =
              region.put(putsWithLocks.toArray(new Pair[]{}));

          for( int i = 0 ; i < codes.length ; i++) {
            OperationStatus code = codes[i];

            Action<R> theAction = puts.get(i);
            Object result = null;

            if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
              result = new Result();
            } else if (code.getOperationStatusCode()
                == OperationStatusCode.BAD_FAMILY) {
              result = new NoSuchColumnFamilyException(code.getExceptionMsg());
            }
            // FAILURE && NOT_RUN becomes null, aka: need to run again.

            response.add(regionName, theAction.getOriginalIndex(), result);
          }
        } catch (IOException ioe) {
          // fail all the puts with the ioe in question.
          for (Action<R> a: puts) {
            response.add(regionName, a.getOriginalIndex(), ioe);
          }
        }
      }
    }
    return response;
  }
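
The shape of multi() is: handle gets and deletes one at a time, set the puts aside, apply them as a single batch, and always report results under the original index of the action. The sketch below models only that shape. `ToyMulti`, its `Action` class, and the string-valued store are hypothetical, and the row sorting and locking of the original are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ToyMulti {
    // Hypothetical action type standing in for Get/Put/Delete plus its original index.
    static class Action {
        final String kind;
        final String row;
        final String value;
        final int originalIndex;

        Action(String kind, String row, String value, int originalIndex) {
            this.kind = kind;
            this.row = row;
            this.value = value;
            this.originalIndex = originalIndex;
        }
    }

    /** Returns originalIndex -> result; an exception object marks a failed action. */
    static Map<Integer, Object> multi(List<Action> actions, Map<String, String> store) {
        Map<Integer, Object> response = new TreeMap<>();
        List<Action> puts = new ArrayList<>();
        for (Action a : actions) {
            switch (a.kind) {
                case "get":
                    response.put(a.originalIndex, store.get(a.row));
                    break;
                case "delete":
                    store.remove(a.row);
                    response.put(a.originalIndex, "deleted");
                    break;
                case "put":
                    puts.add(a); // deferred so that all puts are applied together
                    break;
                default:
                    response.put(a.originalIndex,
                            new IllegalArgumentException("invalid action: " + a.kind));
            }
        }
        for (Action p : puts) {
            store.put(p.row, p.value);
            response.put(p.originalIndex, "ok");
        }
        return response;
    }
}
```

One consequence of deferring the puts in this sketch is that a get in the same batch does not observe a put listed before it.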
 
The put operation in HRegion.java:
  /**
   * @param put
   * @param lockid
   * @param writeToWAL
   * @throws IOException
   */
  public void put(Put put, Integer lockid, boolean writeToWAL)
  throws IOException {
    // check whether the region is read-only; if it is, an exception is thrown
    checkReadOnly();

    // Do a rough check that we have resources to accept a write.  The check is
    // 'rough' in that between the resource check and the call to obtain a
    // read lock, resources may run out.  For now, the thought is that this
    // will be extremely rare; we'll deal with it when it happens.
    checkResources();
    // acquire the lock for this region operation
    startRegionOperation();
    this.writeRequestsCount.increment();
    try {
      // We obtain a per-row lock, so other clients will block while one client
      // performs an update. The read lock is released by the client calling
      // #commit or #abort or if the HRegionServer lease on the lock expires.
      // See HRegionServer#RegionListener for how the expire on HRegionServer
      // invokes a HRegion#abort.
      byte [] row = put.getRow();
      // If we did not pass an existing row lock, obtain a new one
      Integer lid = getLock(lockid, row, true);

      try {
        // All edits for the given row (across all column families) must happen atomically.
        internalPut(put, put.getClusterId(), writeToWAL);
      } finally {
        if(lockid == null) releaseRowLock(lid);
      }
    } finally {
      closeRegionOperation();
    }
  }
 

The checkResources() operation:

Before the put is actually executed, some necessary checks are performed. Let's look at the checkResources() method.

 

  private void checkResources() {

    // If catalog region, do not impose resource constraints or block updates.
    if (this.getRegionInfo().isMetaRegion()) return;

    boolean blocked = false;
    while (this.memstoreSize.get() > this.blockingMemStoreSize) {
      requestFlush();
      if (!blocked) {
        LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
          "' on region " + Bytes.toStringBinary(getRegionName()) +
          ": memstore size " +
          StringUtils.humanReadableInt(this.memstoreSize.get()) +
          " is >= than blocking " +
          StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
      }
      blocked = true;
      synchronized(this) {
        try {
          wait(threadWakeFrequency);
        } catch (InterruptedException e) {
          // continue;
        }
      }
    }
    if (blocked) {
      LOG.info("Unblocking updates for region " + this + " '"
          + Thread.currentThread().getName() + "'");
    }
  }

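As the code shows, when the total memstore size of the HRegion exceeds blockingMemStoreSize, a flush is requested and the writing thread blocks until the memstore size drops back into an acceptable range.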
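
The blocking behaviour is a classic guarded wait: loop while the condition holds, wait with a timeout, and re-check after every wake-up. A minimal sketch, assuming a hypothetical `ToyMemstoreGate` whose flusher calls `onFlushed` from another thread. Unlike the original, this sketch does not request the flush itself, and it returns early if the thread is interrupted.

```java
// Guarded-wait sketch of checkResources(). Hypothetical class, for illustration only.
class ToyMemstoreGate {
    private long memstoreSize;
    private final long blockingSize;

    ToyMemstoreGate(long initialSize, long blockingSize) {
        this.memstoreSize = initialSize;
        this.blockingSize = blockingSize;
    }

    /** Blocks the caller while the memstore is above the blocking size. */
    synchronized void checkResources() {
        while (memstoreSize > blockingSize) {
            try {
                wait(100); // wake up periodically and re-check, like threadWakeFrequency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Called by the (hypothetical) flusher once data has been written out. */
    synchronized void onFlushed(long bytesFreed) {
        memstoreSize -= bytesFreed;
        notifyAll(); // let blocked writers re-check the condition
    }

    synchronized long size() {
        return memstoreSize;
    }
}
```

The timeout on wait() means a writer still makes progress if a notification is missed, at the cost of a short delay.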

 

 

The internalPut operation consists of:

 

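  • checkFamilies: check the column families
  • updateKVTimestamps: update the KeyValue timestamps
  • addFamilyMapToWALEdit: write to the write-ahead log
  • applyFamilyMapToMemstore: write the data into the memstore
  • isFlushSize: decide whether the memstore needs to be flushed to an HFile
  • Release the lock
  • Flush the memstore data to an HFile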
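
The ordering in this list (log first, then memstore, then the flush-size check) can be sketched as follows. This is a toy model under assumptions: `ToyRegion`, its string-based WAL entries, and its size estimate are hypothetical, and the column family check and locking are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of the internalPut ordering. Hypothetical names throughout.
class ToyRegion {
    final List<String> wal = new ArrayList<>();             // stand-in for the write-ahead log
    final TreeMap<String, String> memstore = new TreeMap<>();
    private final long flushSize;
    private long memstoreBytes = 0;

    ToyRegion(long flushSize) {
        this.flushSize = flushSize;
    }

    /** Applies one edit and returns true when the caller should request a flush. */
    boolean internalPut(String row, String value, boolean writeToWAL) {
        long now = System.currentTimeMillis();               // timestamp assigned to the edit
        if (writeToWAL) {
            wal.add(now + " " + row + "=" + value);          // 1. log the edit first
        }
        memstore.put(row, value);                            // 2. then apply it in memory
        memstoreBytes += row.length() + value.length();      // crude size estimate
        return memstoreBytes > flushSize;                    // 3. report whether a flush is due
    }
}
```

Writing the log entry before touching the memstore is what makes it possible to replay an edit that was acknowledged but not yet flushed.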

This article reflects only my personal understanding; corrections are welcome.

 
