Skip to content

Commit

Permalink
Update release 0.14.4._03: contains bug fix in configuration class fo…
Browse files Browse the repository at this point in the history
…r bug that triggers null pointer exception in HBaseApplier when payload table is not specified in config file
  • Loading branch information
bdevetak committed Mar 21, 2018
1 parent 3e53407 commit b8a2063
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
7 changes: 5 additions & 2 deletions src/main/java/com/booking/replication/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ private static class ReplicationSchema implements Serializable {
}

@JsonDeserialize
private Payload payload;
@JsonProperty("payload")
private Payload payload = new Payload();

private static class Payload implements Serializable {
public String table_name;

@JsonDeserialize
public String table_name = "";
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ class ApplierTask extends HashMap<String, TransactionProxy> {
private Future<HBaseTaskResult> taskFuture;
private TaskStatus taskStatus;

// TODO: rename PseudoGTIDCheckpoint since its no longer just
// for committed positions
// One task can contain row ops sequence that spance across more than
// one pGTID, so the last seen is maintained in the task
// One task can contain row ops sequence that spans across more than
// one pGTID, so the last seen is maintained in the task
private PseudoGTIDCheckpoint pseudoGTIDCheckPoint; // <- latest one withing the task event range

ApplierTask(TaskStatus taskStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ public HBaseTaskResult call() throws Exception {
if (chaosMonkey.feelsLikeThrowingExceptionForTaskInProgress()) {
throw new Exception("Chaos monkey exception for task in progress!");
}

if (chaosMonkey.feelsLikeFailingTaskInProgessWithoutException()) {

LOGGER.debug("Chaos monkey failing task in progress without exception");
return new HBaseTaskResult(taskUuid, TaskStatus.WRITE_FAILED, false);

}

for (final String transactionUuid : taskTransactionBuffer.keySet()) {
Expand All @@ -99,20 +103,36 @@ public HBaseTaskResult call() throws Exception {
int numberOfFlushedTablesInCurrentTransaction = 0;

final Timer.Context timerContext = putLatencyTimer.time();

for (final String bufferedMySQLTableName : taskTransactionBuffer.get(transactionUuid).keySet()) {

if (chaosMonkey.feelsLikeThrowingExceptionBeforeFlushingData()) {

throw new Exception("Chaos monkey is here to prevent call to flush!!!");

} else if (chaosMonkey.feelsLikeFailingDataFlushWithoutException()) {

LOGGER.debug("Chaos monkey failing data flush without throwing exception");
return new HBaseTaskResult(taskUuid, TaskStatus.WRITE_FAILED, false);

} else {

LOGGER.debug("Passed the chaos monkey army.");
LOGGER.debug("Try to read transaction " + transactionUuid + " from taskTransactionBuffer");

List<AugmentedRow> rowOps = taskTransactionBuffer.get(transactionUuid).get(bufferedMySQLTableName);

Map<String, List<HBaseApplierMutationGenerator.PutMutation>> mutationsByTable = mutationGenerator.generateMutations(rowOps).stream()
.collect(
Collectors.groupingBy( mutation->mutation.getTable()
)
);
LOGGER.debug("Got rowOps from taskTransactionBuffer for table " + bufferedMySQLTableName);

Map<String, List<HBaseApplierMutationGenerator.PutMutation>> mutationsByTable =
mutationGenerator
.generateMutations(rowOps)
.stream()
.collect(
Collectors.groupingBy( mutation -> mutation.getTable() )
);

LOGGER.debug("Generated HBase mutations for { transactionUuid => " + transactionUuid + ", table => " + bufferedMySQLTableName + " }");

for (Map.Entry<String, List<HBaseApplierMutationGenerator.PutMutation>> entry : mutationsByTable.entrySet()){

Expand Down

0 comments on commit b8a2063

Please sign in to comment.