From 3e534078632d1d0712cbd45d840b334b8d716c26 Mon Sep 17 00:00:00 2001 From: Bosko Devetak Date: Wed, 21 Mar 2018 12:59:41 +0100 Subject: [PATCH] Safe checkpoint management: do not save safe checkpoint in rotate event; the only source of truth for safe checkpoint is pseudoGTID --- .../applier/hbase/HBaseApplierWriter.java | 2 +- .../event/handler/RotateEventHandler.java | 33 ++----------------- 2 files changed, 3 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/booking/replication/applier/hbase/HBaseApplierWriter.java b/src/main/java/com/booking/replication/applier/hbase/HBaseApplierWriter.java index ced8e3f1..3a196db5 100644 --- a/src/main/java/com/booking/replication/applier/hbase/HBaseApplierWriter.java +++ b/src/main/java/com/booking/replication/applier/hbase/HBaseApplierWriter.java @@ -240,7 +240,7 @@ public Connection createHBaseConnection(int retry) throws IOException { } retry--; } - LOGGER.error("Failed to create hbase connection from HBaseApplier, attempt " + retry + "/10. Giving up. Last expcetion was:" + lastException); + LOGGER.error("Failed to create hbase connection from HBaseApplier, attempt " + retry + "/10. Giving up. Last exception was:" + lastException); throw new IOException("Could not create HBase connection, all retry attempts failed. Last exception was:", lastException); } diff --git a/src/main/java/com/booking/replication/pipeline/event/handler/RotateEventHandler.java b/src/main/java/com/booking/replication/pipeline/event/handler/RotateEventHandler.java index 41733787..d52b24df 100644 --- a/src/main/java/com/booking/replication/pipeline/event/handler/RotateEventHandler.java +++ b/src/main/java/com/booking/replication/pipeline/event/handler/RotateEventHandler.java @@ -40,45 +40,16 @@ public void apply(BinlogEventV4 binlogEventV4, CurrentTransaction currentTransac } catch (IOException e) { throw new EventHandlerApplyException("Failed to apply event", e); } - LOGGER.info("End of binlog file. Waiting for all tasks to finish before moving forward..."); - - //TODO: Investigate if this is the right thing to do. - - eventHandlerConfiguration.getApplier().waitUntilAllRowsAreCommitted(); String currentBinlogFileName = pipelinePosition.getCurrentPosition().getBinlogFilename(); - long currentBinlogPosition = pipelinePosition.getCurrentPosition().getBinlogPosition(); - // binlog begins on position 4 - if (currentBinlogPosition <= 0L) currentBinlogPosition = 4; String nextBinlogFileName = event.getBinlogFileName().toString(); - LOGGER.info("All rows committed, moving to next binlog " + nextBinlogFileName); - - String pseudoGTID = pipelinePosition.getCurrentPseudoGTID(); - String pseudoGTIDFullQuery = pipelinePosition.getCurrentPseudoGTIDFullQuery(); - int currentSlaveId = pipelinePosition.getCurrentPosition().getServerID(); - - PseudoGTIDCheckpoint marker = new PseudoGTIDCheckpoint( - pipelinePosition.getCurrentPosition().getHost(), - currentSlaveId, - currentBinlogFileName, - currentBinlogPosition, - pseudoGTID, - pseudoGTIDFullQuery, - pipelineOrchestrator.getFakeMicrosecondCounter() - ); - - try { - Coordinator.saveCheckpointMarker(marker); - } catch (Exception e) { - LOGGER.error("Failed to save Checkpoint!", e); - pipelineOrchestrator.requestShutdown(); - } + LOGGER.info("Rotate Event: moving to the processing of the next binlog file" + nextBinlogFileName); if (currentBinlogFileName.equals(lastBinlogFileName)) { - LOGGER.info("processed the last binlog file " + lastBinlogFileName); + LOGGER.info("Processed the last binlog file " + lastBinlogFileName); pipelineOrchestrator.requestShutdown(); } }