diff --git a/src/main/java/com/booking/replication/Configuration.java b/src/main/java/com/booking/replication/Configuration.java index 87ebd6fc..a349abc4 100644 --- a/src/main/java/com/booking/replication/Configuration.java +++ b/src/main/java/com/booking/replication/Configuration.java @@ -45,10 +45,13 @@ private static class ReplicationSchema implements Serializable { } @JsonDeserialize - private Payload payload; + @JsonProperty("payload") + private Payload payload = new Payload(); private static class Payload implements Serializable { - public String table_name; + + @JsonDeserialize + public String table_name = ""; } diff --git a/src/main/java/com/booking/replication/applier/hbase/ApplierTask.java b/src/main/java/com/booking/replication/applier/hbase/ApplierTask.java index 461ed16c..4f486ab7 100644 --- a/src/main/java/com/booking/replication/applier/hbase/ApplierTask.java +++ b/src/main/java/com/booking/replication/applier/hbase/ApplierTask.java @@ -10,10 +10,8 @@ class ApplierTask extends HashMap { private Future taskFuture; private TaskStatus taskStatus; - // TODO: rename PseudoGTIDCheckpoint since its no longer just - // for committed positions - // One task can contain row ops sequence that spance across more than - // one pGTID, so the last seen is maintained in the task + // One task can contain row ops sequence that spans across more than + // one pGTID, so the last seen is maintained in the task private PseudoGTIDCheckpoint pseudoGTIDCheckPoint; // <- latest one withing the task event range ApplierTask(TaskStatus taskStatus) { diff --git a/src/main/java/com/booking/replication/applier/hbase/HBaseWriterTask.java b/src/main/java/com/booking/replication/applier/hbase/HBaseWriterTask.java index 22802b50..703cf62a 100644 --- a/src/main/java/com/booking/replication/applier/hbase/HBaseWriterTask.java +++ b/src/main/java/com/booking/replication/applier/hbase/HBaseWriterTask.java @@ -88,8 +88,12 @@ public HBaseTaskResult call() throws Exception { if (chaosMonkey.feelsLikeThrowingExceptionForTaskInProgress()) { throw new Exception("Chaos monkey exception for task in progress!"); } + if (chaosMonkey.feelsLikeFailingTaskInProgessWithoutException()) { + + LOGGER.debug("Chaos monkey failing task in progress without exception"); return new HBaseTaskResult(taskUuid, TaskStatus.WRITE_FAILED, false); + } for (final String transactionUuid : taskTransactionBuffer.keySet()) { @@ -99,20 +103,36 @@ public HBaseTaskResult call() throws Exception { int numberOfFlushedTablesInCurrentTransaction = 0; final Timer.Context timerContext = putLatencyTimer.time(); + for (final String bufferedMySQLTableName : taskTransactionBuffer.get(transactionUuid).keySet()) { if (chaosMonkey.feelsLikeThrowingExceptionBeforeFlushingData()) { + throw new Exception("Chaos monkey is here to prevent call to flush!!!"); + } else if (chaosMonkey.feelsLikeFailingDataFlushWithoutException()) { + + LOGGER.debug("Chaos monkey failing data flush without throwing exception"); return new HBaseTaskResult(taskUuid, TaskStatus.WRITE_FAILED, false); + } else { + + LOGGER.debug("Passed the chaos monkey army."); + LOGGER.debug("Try to read transaction " + transactionUuid + " from taskTransactionBuffer"); + List rowOps = taskTransactionBuffer.get(transactionUuid).get(bufferedMySQLTableName); - Map> mutationsByTable = mutationGenerator.generateMutations(rowOps).stream() - .collect( - Collectors.groupingBy( mutation->mutation.getTable() - ) - ); + LOGGER.debug("Got rowOps from taskTransactionBuffer for table " + bufferedMySQLTableName); + + Map> mutationsByTable = + mutationGenerator + .generateMutations(rowOps) + .stream() + .collect( + Collectors.groupingBy( mutation -> mutation.getTable() ) + ); + + LOGGER.debug("Generated HBase mutations for { transactionUuid => " + transactionUuid + ", table => " + bufferedMySQLTableName + " }"); for (Map.Entry> entry : mutationsByTable.entrySet()){