Skip to content

Commit

Permalink
HBASE-19504 Add TimeRange support into checkAndMutate
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Stack <[email protected]>
  • Loading branch information
chia7712 committed Mar 23, 2018
1 parent cd5a821 commit ad47c2d
Show file tree
Hide file tree
Showing 27 changed files with 529 additions and 366 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
public class Append extends Mutation {
private static final Logger LOG = LoggerFactory.getLogger(Append.class);
private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
private TimeRange tr = new TimeRange();
private TimeRange tr = TimeRange.allTime();

/**
* Sets the TimeRange to be used on the Get for this append.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;

import com.google.protobuf.RpcChannel;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -235,6 +234,11 @@ interface CheckAndMutateBuilder {
*/
CheckAndMutateBuilder qualifier(byte[] qualifier);

/**
* @param timeRange time range to check.
*/
CheckAndMutateBuilder timeRange(TimeRange timeRange);

/**
* Check for lack of column.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
import static java.util.stream.Collectors.toList;

import com.google.protobuf.RpcChannel;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -151,6 +150,12 @@ public CheckAndMutateBuilder qualifier(byte[] qualifier) {
return this;
}

@Override
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
builder.timeRange(timeRange);
return this;
}

@Override
public CheckAndMutateBuilder ifNotExists() {
builder.ifNotExists();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class Get extends Query implements Row {
private boolean cacheBlocks = true;
private int storeLimit = -1;
private int storeOffset = 0;
private TimeRange tr = new TimeRange();
private TimeRange tr = TimeRange.allTime();
private boolean checkExistenceOnly = false;
private boolean closestRowBefore = false;
private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Expand Down
111 changes: 59 additions & 52 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
Expand Down Expand Up @@ -692,14 +693,14 @@ protected Long rpcCall() throws Exception {
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, put);
return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
}

@Override
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
return doCheckAndPut(row, family, qualifier, compareOp.name(), value, put);
return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
}

@Override
Expand All @@ -708,11 +709,12 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final CompareOperator op, final byte [] value, final Put put) throws IOException {
// The name of the operators in CompareOperator are intentionally those of the
// operators in the filter's CompareOp enum.
return doCheckAndPut(row, family, qualifier, op.name(), value, put);
return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
}

private boolean doCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final String opName, final byte [] value, final Put put) throws IOException {
private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
final String opName, final byte[] value, final TimeRange timeRange, final Put put)
throws IOException {
ClientServiceCallable<Boolean> callable =
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), put.getPriority()) {
Expand All @@ -721,7 +723,7 @@ protected Boolean rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(opName);
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, put);
new BinaryComparator(value), compareType, timeRange, put);
MutateResponse response = doMutate(request);
return Boolean.valueOf(response.getProcessed());
}
Expand All @@ -732,89 +734,87 @@ protected Boolean rpcCall() throws Exception {

@Override
@Deprecated
public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, delete);
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final byte[] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
delete);
}

@Override
@Deprecated
public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, delete);
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
}

@Override
@Deprecated
public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOperator op, final byte [] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, op.name(), value, delete);
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
}

private boolean doCheckAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
final String opName, final byte [] value, final Delete delete) throws IOException {
private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
throws IOException {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
this.connection, getName(), row, this.rpcControllerFactory.newController(),
writeRpcTimeoutMs, new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(opName);
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, delete);
MutateResponse response = doMutate(request);
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(opName);
MutateRequest request = RequestConverter
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, new BinaryComparator(value), compareType, timeRange, delete);
MutateResponse response = doMutate(request);
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
List<Delete> rows = Collections.singletonList(delete);
Object[] results = new Object[1];
AsyncProcessTask task = AsyncProcessTask.newBuilder()
.setPool(pool)
.setTableName(tableName)
.setRowAccess(rows)
AsyncProcessTask task =
AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
.setCallable(callable)
// TODO any better timeout?
.setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.setResults(results)
.build();
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
return ((SingleResponse.Entry)results[0]).isProcessed();
return ((SingleResponse.Entry) results[0]).isProcessed();
}

@Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new CheckAndMutateBuilderImpl(row, family);
}

private boolean doCheckAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final String opName, final byte [] value, final RowMutations rm)
throws IOException {
private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm)
throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
rm.getMaxPriority()) {
@Override
protected MultiResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(opName);
MultiRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, rm);
MultiRequest request = RequestConverter
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, timeRange, rm);
ClientProtos.MultiResponse response = doMulti(request);
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
if (res.hasException()) {
Throwable ex = ProtobufUtil.toException(res.getException());
if (ex instanceof IOException) {
throw (IOException)ex;
throw (IOException) ex;
}
throw new IOException("Failed to checkAndMutate row: "+
Bytes.toStringBinary(rm.getRow()), ex);
throw new IOException(
"Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
}
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
}
Expand Down Expand Up @@ -850,14 +850,14 @@ protected MultiResponse rpcCall() throws Exception {
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm)
throws IOException {
return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, rm);
return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
}

@Override
@Deprecated
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
return doCheckAndMutate(row, family, qualifier, op.name(), value, rm);
return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
}

@Override
Expand Down Expand Up @@ -1234,6 +1234,7 @@ private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
private final byte[] row;
private final byte[] family;
private byte[] qualifier;
private TimeRange timeRange;
private CompareOperator op;
private byte[] value;

Expand All @@ -1249,6 +1250,12 @@ public CheckAndMutateBuilder qualifier(byte[] qualifier) {
return this;
}

@Override
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
this.timeRange = timeRange;
return this;
}

@Override
public CheckAndMutateBuilder ifNotExists() {
this.op = CompareOperator.EQUAL;
Expand All @@ -1271,19 +1278,19 @@ private void preCheck() {
@Override
public boolean thenPut(Put put) throws IOException {
preCheck();
return doCheckAndPut(row, family, qualifier, op.name(), value, put);
return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
}

@Override
public boolean thenDelete(Delete delete) throws IOException {
preCheck();
return doCheckAndDelete(row, family, qualifier, op.name(), value, delete);
return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete);
}

@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
preCheck();
return doCheckAndMutate(row, family, qualifier, op.name(), value, mutation);
return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
@InterfaceAudience.Public
public class Increment extends Mutation {
private static final int HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
private TimeRange tr = new TimeRange();
private TimeRange tr = TimeRange.allTime();

/**
* Create a Increment operation for the specified row.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand Down Expand Up @@ -265,6 +266,8 @@ private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {

private byte[] qualifier;

private TimeRange timeRange;

private CompareOperator op;

private byte[] value;
Expand All @@ -281,6 +284,12 @@ public CheckAndMutateBuilder qualifier(byte[] qualifier) {
return this;
}

@Override
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
this.timeRange = timeRange;
return this;
}

@Override
public CheckAndMutateBuilder ifNotExists() {
this.op = CompareOperator.EQUAL;
Expand All @@ -307,7 +316,7 @@ public CompletableFuture<Boolean> thenPut(Put put) {
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), p),
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
(c, r) -> r.getProcessed()))
.call();
}
Expand All @@ -319,7 +328,7 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), d),
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
(c, r) -> r.getProcessed()))
.call();
}
Expand All @@ -331,7 +340,7 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
resp -> resp.getExists()))
.call();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public class Scan extends Query {
private long maxResultSize = -1;
private boolean cacheBlocks = true;
private boolean reversed = false;
private TimeRange tr = new TimeRange();
private TimeRange tr = TimeRange.allTime();
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Boolean asyncPrefetch = null;
Expand Down
Loading

0 comments on commit ad47c2d

Please sign in to comment.