Skip to content

Commit

Permalink
Safe checkpoint management: do not save safe checkpoint in rotate eve…
Browse files Browse the repository at this point in the history
…nt; the only source of truth for safe checkpoint is pseudoGTID
  • Loading branch information
bdevetak committed Mar 21, 2018
1 parent 11c5629 commit 3e53407
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public Connection createHBaseConnection(int retry) throws IOException {
}
retry--;
}
LOGGER.error("Failed to create hbase connection from HBaseApplier, attempt " + retry + "/10. Giving up. Last expcetion was:" + lastException);
LOGGER.error("Failed to create hbase connection from HBaseApplier, attempt " + retry + "/10. Giving up. Last exception was:" + lastException);
throw new IOException("Could not create HBase connection, all retry attempts failed. Last exception was:", lastException);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,45 +40,16 @@ public void apply(BinlogEventV4 binlogEventV4, CurrentTransaction currentTransac
} catch (IOException e) {
throw new EventHandlerApplyException("Failed to apply event", e);
}
LOGGER.info("End of binlog file. Waiting for all tasks to finish before moving forward...");

//TODO: Investigate if this is the right thing to do.

eventHandlerConfiguration.getApplier().waitUntilAllRowsAreCommitted();


String currentBinlogFileName = pipelinePosition.getCurrentPosition().getBinlogFilename();
long currentBinlogPosition = pipelinePosition.getCurrentPosition().getBinlogPosition();
// binlog begins on position 4
if (currentBinlogPosition <= 0L) currentBinlogPosition = 4;

String nextBinlogFileName = event.getBinlogFileName().toString();

LOGGER.info("All rows committed, moving to next binlog " + nextBinlogFileName);

String pseudoGTID = pipelinePosition.getCurrentPseudoGTID();
String pseudoGTIDFullQuery = pipelinePosition.getCurrentPseudoGTIDFullQuery();
int currentSlaveId = pipelinePosition.getCurrentPosition().getServerID();

PseudoGTIDCheckpoint marker = new PseudoGTIDCheckpoint(
pipelinePosition.getCurrentPosition().getHost(),
currentSlaveId,
currentBinlogFileName,
currentBinlogPosition,
pseudoGTID,
pseudoGTIDFullQuery,
pipelineOrchestrator.getFakeMicrosecondCounter()
);

try {
Coordinator.saveCheckpointMarker(marker);
} catch (Exception e) {
LOGGER.error("Failed to save Checkpoint!", e);
pipelineOrchestrator.requestShutdown();
}
LOGGER.info("Rotate Event: moving to the processing of the next binlog file" + nextBinlogFileName);

if (currentBinlogFileName.equals(lastBinlogFileName)) {
LOGGER.info("processed the last binlog file " + lastBinlogFileName);
LOGGER.info("Processed the last binlog file " + lastBinlogFileName);
pipelineOrchestrator.requestShutdown();
}
}
Expand Down

0 comments on commit 3e53407

Please sign in to comment.