From 36b1c579ff863bf6d4a0780aadf46a8dbb6e0201 Mon Sep 17 00:00:00 2001 From: Volodymyr Pytak Date: Mon, 9 Apr 2018 15:11:15 +0200 Subject: [PATCH] disable HBase timestamp organizing logic during initial snapshot --- pom.xml | 2 +- .../replication/applier/hbase/HBaseApplierWriter.java | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 4ce1d03d..14d72b65 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.booking mysql-replicator - 0.14.4_02 + 0.14.4_04 jar mysql-replicator diff --git a/src/main/java/com/booking/replication/applier/hbase/HBaseApplierWriter.java b/src/main/java/com/booking/replication/applier/hbase/HBaseApplierWriter.java index 3a196db5..afb0293f 100644 --- a/src/main/java/com/booking/replication/applier/hbase/HBaseApplierWriter.java +++ b/src/main/java/com/booking/replication/applier/hbase/HBaseApplierWriter.java @@ -83,7 +83,7 @@ public class HBaseApplierWriter { private static final ConcurrentHashMap taskUUIDToPseudoGTID = new ConcurrentHashMap<>(); - private final RowTimestampOrganizer timestampOrganizer; + private RowTimestampOrganizer timestampOrganizer; private static PseudoGTIDCheckpoint latestCommittedPseudoGTIDCheckPoint; /** * Shared connection used by all tasks in applier. @@ -172,7 +172,9 @@ public HBaseApplierWriter( taskPool = Executors.newFixedThreadPool(this.poolSize); mutationGenerator = new HBaseApplierMutationGenerator(configuration); - timestampOrganizer = new RowTimestampOrganizer(); + if (!configuration.isInitialSnapshotMode()) { + timestampOrganizer = new RowTimestampOrganizer(); + } hbaseConf.set("hbase.zookeeper.quorum", configuration.getHBaseQuorum()); hbaseConf.set("hbase.client.keyvalue.maxsize", "0"); @@ -290,7 +292,9 @@ public synchronized void pushToCurrentTaskBuffer(AugmentedRowsEvent augmentedRow } List augmentedRows = augmentedRowsEvent.getSingleRowEvents(); - timestampOrganizer.organizeTimestamps(augmentedRows, mySQLTableName, currentTransactionUUID); + if (timestampOrganizer != null) { + timestampOrganizer.organizeTimestamps(augmentedRows, mySQLTableName, currentTransactionUUID); + } // Add to buffer for (AugmentedRow augmentedRow : augmentedRows) {