Skip to content

Commit

Permalink
Bdevetak/checkpoint (#29)
Browse files Browse the repository at this point in the history
* make active schema bootstraping configurable

* fix for 100% cpu usage if queues are empty; use takeFirst() instead of poll()

* Removed currentTransactionSchemaName variable and the logic surrounding it to prevent logging of non-replicated statements

* Remove two unnecessary messages and upgrade the level of logging for push timeouts

* fix for active schema bootstrap test

* This branch contains:
1. Fix for handling microseconds sufix for the case of a split transaction
that spans multiple seconds from begin to commit.
2. Added dry-run mode for hbase applier

* some todos

* integration test for split-transaction microseconds

* enforce string type in expected values for split-transaction intengration test

* remove pseudoGTID support & code cleenup
  • Loading branch information
bdevetak authored Mar 14, 2019
1 parent 2e01343 commit b2308c1
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 62 deletions.
11 changes: 1 addition & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
[![][Build Status img]][Build Status]
[![][Coverage Status img]][Coverage Status]
[![][Known Vulnerabilities img]][Known Vulnerabilities]
[![][Quality Gate img]][Quality Gate]
[![][license img]][license]

## MySQL Replicator
Replicates data changes from MySQL binlog to HBase or Kafka. In case of HBase, preserves the previous data versions. HBase storage is intended for auditing and analysis of historical data. In addition, special daily-changes tables can be maintained in HBase, which are convenient for fast and cheap imports from HBase to Hive. Replication to Kafka is intended for easy real-time access to a stream of data changes.

## Intro
This readme file provides some basic documentation on how to get started. For more details, refer to official documentation at [mysql-time-machine](https://mysql-time-machine.github.io/).
This readme file provides basic introduction on how to get started. For more details, refer to official documentation at [mysql-time-machine](https://mysql-time-machine.github.io/).

### Building required Docker images
1. Run `mvn clean package` from the root of the `replicator` repository to build the MySQL Replicator distribution that will be used later;
Expand Down Expand Up @@ -161,13 +160,5 @@ limitations under the License.
[Coverage Status img]:https://codecov.io/gh/mysql-time-machine/replicator/branch/master/graph/badge.svg
[Known Vulnerabilities img]:https://snyk.io/test/github/mysql-time-machine/replicator/badge.svg
[Known Vulnerabilities]:https://snyk.io/test/github/mysql-time-machine/replicator
[Quality Gate img]:https://sonarcloud.io/api/badges/gate?key=com.booking%3Amysql-replicator
[Quality Gate]:https://sonarcloud.io/dashboard?id=com.booking%3Amysql-replicator
[Maven Central]:https://maven-badges.herokuapp.com/maven-central/com.booking/mysql-replicator
[Maven Central img]:https://maven-badges.herokuapp.com/maven-central/com.booking/mysql-replicator/badge.svg
[license]:LICENSE
[license img]:https://img.shields.io/badge/license-Apache%202-blue.svg
[License Check img]:https://app.fossa.io/api/projects/git%2Bgithub.com%2Fmysql-time-machine%2Freplicator.svg?type=shield
[License Check]:https://app.fossa.io/projects/git%2Bgithub.com%2Fmysql-time-machine%2Freplicator?ref=badge_shield
[Javadocs]:http://javadoc.io/doc/com.booking/mysql-replicator
[Javadocs img]:http://javadoc.io/badge/com.booking/mysql-replicator.svg
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,16 @@ public Boolean apply(Collection<AugmentedEvent> events) {

if (s) {
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.buffer.flush.success").inc(1L);
.counter("thread_" + threadID + ".hbase_applier.buffer.flush.success").inc(1L);
return true; // <- committed, will advance safe checkpoint
} else {
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.buffer.flush.failure").inc(1L);
.counter("thread_" + threadID + ".hbase_applier.buffer.flush.failure").inc(1L);
throw new RuntimeException("Failed to write buffer to HBase");
}
} else {
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".buffer.buffered").inc(1L);
.counter("thread_" + threadID + ".hbase_applier.buffer.buffered").inc(1L);
hBaseApplierWriter.buffer(threadID, transactionUUID, dataEvents);
return false; // buffered
}
Expand All @@ -197,18 +197,17 @@ public Boolean apply(Collection<AugmentedEvent> events) {
for (String transactionUUID : transactionUUIDs) {
hBaseApplierWriter.buffer(threadID, transactionUUID, dataEvents);
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.buffer.buffered").inc(1L);
.counter("thread_" + threadID + ".hbase_applier.buffer.buffered").inc(1L);
}
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.buffer.flush.force.attempt").inc(1L);
.counter("thread_" + threadID + ".hbase_applier.buffer.flush.force.attempt").inc(1L);
forceFlush();
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.buffer.flush.force.success").inc(1L);
.counter("thread_" + threadID + ".hbase_applier.buffer.flush.force.success").inc(1L);
return true;
} else {
LOG.warn("Empty transaction");
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.events.empty").inc(1L);
.counter("thread_" + threadID + ".hbase_applier.events.empty").inc(1L);
return false; // treat empty transaction as buffered
}
}
Expand All @@ -227,8 +226,6 @@ private void doSchemaLog(Collection<AugmentedEvent> events) throws IOException,
for (AugmentedEvent ev : events) {
if (ev.getOptionalPayload() != null) {

LOG.info("AugmentedEvent contains optionalPayload");

SchemaSnapshot schemaSnapshot = ((SchemaSnapshot)ev.getOptionalPayload());

hbaseSchemaManager.writeSchemaSnapshot(schemaSnapshot, this.configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ public synchronized Collection<AugmentedEvent> apply(RawEvent rawEvent) {

try {

this.metrics.getRegistry()
.counter("hbase.augmenter.apply.attempt").inc(1L);
this.metrics.getRegistry().counter("augmenter.apply.attempt").inc(1L);

RawEventHeaderV4 eventHeader = rawEvent.getHeader();
RawEventData eventData = rawEvent.getData();
Expand All @@ -118,15 +117,24 @@ public synchronized Collection<AugmentedEvent> apply(RawEvent rawEvent) {

if (this.context.shouldProcess()) {

this.metrics.getRegistry().counter("augmenter.apply.should_process.true").inc(1L);

if(this.context.isTransactionsEnabled()){

return processTransactionFlow(eventHeader, eventData);
}

AugmentedEvent augmentedEvent = getAugmentedEvent(eventHeader, eventData);

if (augmentedEvent == null) return null;

return Collections.singletonList(augmentedEvent);
}

this.metrics.getRegistry().counter("augmenter.apply.should_process.false").inc(1L);

return null;

} finally {
this.context.updatePosition();
}
Expand Down Expand Up @@ -161,11 +169,8 @@ private Collection<AugmentedEvent> processTransactionFlow(RawEventHeaderV4 event
if (augmentedEvent == null) return null;

if (this.context.getTransaction().started()) {

if (this.context.getTransaction().resuming() && this.context.getTransaction().sizeLimitExceeded()) {

Collection<AugmentedEvent> augmentedEvents = this.context.getTransaction().getAndClear();

this.context.getTransaction().add(augmentedEvent);
return augmentedEvents;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public interface Configuration {
String DDL_TEMPORARY_TABLE_PATTERN = "augmenter.context.pattern.ddl.temporary.table";
String DDL_VIEW_PATTERN = "augmenter.context.pattern.ddl.view";
String DDL_ANALYZE_PATTERN = "augmenter.context.pattern.ddl.analyze";
String PSEUDO_GTID_PATTERN = "augmenter.context.pattern.pseudo.gtid";
String ENUM_PATTERN = "augmenter.context.pattern.enum";
String SET_PATTERN = "augmenter.context.pattern.set";
String EXCLUDE_TABLE = "augmenter.context.exclude.table";
Expand All @@ -59,7 +58,6 @@ public interface Configuration {
private static final String DEFAULT_DDL_TEMPORARY_TABLE_PATTERN = "^(/\\*.*?\\*/\\s*)?(alter|drop|create|rename|truncate|modify)\\s+(temporary)\\s+(table)\\s+(\\S+)";
private static final String DEFAULT_DDL_VIEW_PATTERN = "^(/\\*.*?\\*/\\s*)?(alter|drop|create|rename|truncate|modify)\\s+(view)\\s+(\\S+)";
private static final String DEFAULT_DDL_ANALYZE_PATTERN = "^(/\\*.*?\\*/\\s*)?(analyze)\\s+(table)\\s+(\\S+)";
private static final String DEFAULT_PSEUDO_GTID_PATTERN = "(?<=_pseudo_gtid_hint__asc\\:)(.{8}\\:.{16}\\:.{8})";
private static final String DEFAULT_ENUM_PATTERN = "(?<=enum\\()(.*?)(?=\\))";
private static final String DEFAULT_SET_PATTERN = "(?<=set\\()(.*?)(?=\\))";

Expand All @@ -74,7 +72,6 @@ public interface Configuration {
private final Pattern ddlTemporaryTablePattern;
private final Pattern ddlViewPattern;
private final Pattern ddlAnalyzePattern;
private final Pattern pseudoGTIDPattern;
private final Pattern enumPattern;
private final Pattern setPattern;
private final boolean transactionsEnabled;
Expand Down Expand Up @@ -143,7 +140,6 @@ public AugmenterContext(SchemaManager schemaManager, Map<String, Object> configu
this.ddlTemporaryTablePattern = this.getPattern(configuration, Configuration.DDL_TEMPORARY_TABLE_PATTERN, AugmenterContext.DEFAULT_DDL_TEMPORARY_TABLE_PATTERN);
this.ddlViewPattern = this.getPattern(configuration, Configuration.DDL_VIEW_PATTERN, AugmenterContext.DEFAULT_DDL_VIEW_PATTERN);
this.ddlAnalyzePattern = this.getPattern(configuration, Configuration.DDL_ANALYZE_PATTERN, AugmenterContext.DEFAULT_DDL_ANALYZE_PATTERN);
this.pseudoGTIDPattern = this.getPattern(configuration, Configuration.PSEUDO_GTID_PATTERN, AugmenterContext.DEFAULT_PSEUDO_GTID_PATTERN);
this.enumPattern = this.getPattern(configuration, Configuration.ENUM_PATTERN, AugmenterContext.DEFAULT_ENUM_PATTERN);
this.setPattern = this.getPattern(configuration, Configuration.SET_PATTERN, AugmenterContext.DEFAULT_SET_PATTERN);
this.transactionsEnabled = Boolean.parseBoolean(configuration.getOrDefault(Configuration.TRANSACTIONS_ENABLED, "true").toString());
Expand Down Expand Up @@ -251,7 +247,7 @@ public synchronized void updateContext(RawEventHeaderV4 eventHeader, RawEventDat
Matcher matcher;

this.metrics.getRegistry()
.counter("hbase.augmenter_context.type.query").inc(1L);
.counter("augmenter_context.type.query").inc(1L);

// begin
if (this.beginPattern.matcher(query).find()) {
Expand All @@ -262,12 +258,12 @@ public synchronized void updateContext(RawEventHeaderV4 eventHeader, RawEventDat
queryRawEventData.getDatabase(),
null
);

if (!this.transaction.begin()) {
AugmenterContext.LOG.log(Level.WARNING, "transaction already started");
}

}

// commit
else if (this.commitPattern.matcher(query).find()) {
this.updateCommons(
Expand Down Expand Up @@ -300,8 +296,8 @@ else if ((matcher = this.ddlDefinerPattern.matcher(query)).find()) {
// ddl table
else if ((matcher = this.ddlTablePattern.matcher(query)).find()) {
this.metrics.getRegistry()
.counter("hbase.augmenter_context.type.ddl_table").inc(1L);
Boolean shouldProcess = ( queryRawEventData.getDatabase().equals(replicatedSchema) );
.counter("augmenter_context.type.ddl_table").inc(1L);
Boolean shouldProcess = (queryRawEventData.getDatabase().equals(replicatedSchema) );
this.updateCommons(
shouldProcess,
QueryAugmentedEventDataType.DDL_TABLE,
Expand Down Expand Up @@ -356,26 +352,6 @@ else if ((matcher = this.ddlViewPattern.matcher(query)).find()) {
queryRawEventData.getDatabase(),
null
);
}

// pseudoGTID
else if ((matcher = this.pseudoGTIDPattern.matcher(query)).find()) {
this.metrics.getRegistry()
.counter("hbase.augmenter_context.type.pseudo_gtid").inc(1L);
this.updateCommons(
false,
QueryAugmentedEventDataType.PSEUDO_GTID,
null,
queryRawEventData.getDatabase(),
null
);

this.updateGTID(
GTIDType.PSEUDO,
matcher.group(0),
(byte) 0,
0
);
} else {
this.metrics.getRegistry()
.counter("hbase.augmenter_context.type.unknown").inc(1L);
Expand Down Expand Up @@ -658,6 +634,7 @@ public CurrentTransaction getTransaction() {
return this.transaction;
}


public boolean shouldProcess() {
return this.continueFlag.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public boolean begin() {
}
}

public int getCurrentBufferSize() {
if (buffer != null && buffer.get() != null) {
return buffer.get().size();
} else {
return 0;
}
}

public boolean add(AugmentedEvent event) {
if (this.started.get() && !this.sizeLimitExceeded()) {
return this.buffer.get().add(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public BinaryLogSupplier(Map<String, Object> configuration) {
Object schema = configuration.get(Configuration.MYSQL_SCHEMA);
Object username = configuration.get(Configuration.MYSQL_USERNAME);
Object password = configuration.get(Configuration.MYSQL_PASSWORD);
Object positionType = configuration.getOrDefault(Configuration.POSITION_TYPE, PositionType.ANY.name());
Object positionType = configuration.getOrDefault(Configuration.POSITION_TYPE, PositionType.GTID);

Objects.requireNonNull(hostname, String.format("Configuration required: %s", Configuration.MYSQL_HOSTNAME));
Objects.requireNonNull(schema, String.format("Configuration required: %s", Configuration.MYSQL_SCHEMA));
Expand Down Expand Up @@ -174,14 +174,28 @@ public void onDisconnect(BinaryLogClient client) {
}

if (checkpoint != null) {

if (checkpoint.getGTID() != null && checkpoint.getGTID().getType() == GTIDType.PSEUDO) {
throw new RuntimeException("PseudoGTID checkpoints are no longer supported. Last version of the replicator that supports pseudoGTIDs is v0.14.6. For all subsequent versions please use native GTIDs instead.");
}

this.client.setServerId(checkpoint.getServerId());

// start from binlog position and filename
if ((this.positionType == PositionType.ANY || this.positionType == PositionType.BINLOG) && checkpoint.getBinlog() != null) {
LOG.info("Starting Binlog Client from binlog:position -> " +
checkpoint.getBinlog().getFilename() +
":" +
checkpoint.getBinlog().getPosition()
);

this.client.setBinlogFilename(checkpoint.getBinlog().getFilename());
this.client.setBinlogPosition(checkpoint.getBinlog().getPosition());
}

if ((this.positionType == PositionType.ANY || this.positionType == PositionType.GTID) && checkpoint.getGTID() != null && checkpoint.getGTID().getType() == GTIDType.REAL) {
// start from GTID
if ((this.positionType == PositionType.GTID) && checkpoint.getGTID() != null && checkpoint.getGTID().getType() == GTIDType.REAL) {
LOG.info("Starting Binlog Client from GTID checkpoint: " + checkpoint.getGTID().getValue());
this.client.setGtidSet(checkpoint.getGTID().getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ public Replicator(final Map<String, Object> configuration) {
Map<Integer, Collection<AugmentedEvent>> splitEventsMap = new HashMap<>();
for (AugmentedEvent event : events) {
this.metrics.getRegistry()
.counter("hbase.streams.partitioner.event.apply.attempt").inc(1L);
.counter("streams.partitioner.event.apply.attempt").inc(1L);
splitEventsMap.computeIfAbsent(
this.partitioner.apply(event, tasks), partition -> new ArrayList<>()
).add(event);
metrics.getRegistry()
.counter("hbase.streams.partitioner.event.apply.success").inc(1L);
.counter("streams.partitioner.event.apply.success").inc(1L);
}
for (Collection<AugmentedEvent> splitEvents : splitEventsMap.values()) {
this.destinationStream.push(splitEvents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ enum Type {
NONE {
@Override
protected CheckpointApplier newInstance(CheckpointStorage checkpointStorage, String checkpointPath, long period, boolean transactionEnabled) {
// BiConsumer<AugmentedEvent, Integer> cap = (event, map) -> {
//
// };
// return (CheckpointApplier) cap;
return new DummyCheckPointApplier();
}
},
Expand Down Expand Up @@ -46,7 +42,7 @@ static CheckpointApplier build(Map<String, Object> configuration, CheckpointStor
).newInstance(
checkpointStorage,
checkpointPath,
Long.parseLong(configuration.getOrDefault(Configuration.PERIOD, "60000").toString()),
Long.parseLong(configuration.getOrDefault(Configuration.PERIOD, "5000").toString()),
transactionEnabled
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ class ReplicatorHBasePipelineIntegrationTestRunner extends Specification {

// Augmenter
configuration.put(AugmenterContext.Configuration.TRANSACTION_BUFFER_LIMIT, String.valueOf(AUGMENTER_TRANSACTION_BUFFER_SIZE_LIMIT))
configuration.put(AugmenterContext.Configuration.TRANSACTIONS_ENABLED, true);

configuration.put(AugmenterFilter.Configuration.FILTER_TYPE, AUGMENTER_FILTER_TYPE)
configuration.put(AugmenterFilter.Configuration.FILTER_CONFIGURATION, AUGMENTER_FILTER_CONFIGURATION)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ private Map<String, Object> getConfiguration() {
configuration.put(Coordinator.Configuration.TYPE, Coordinator.Type.ZOOKEEPER.name());

configuration.put(Supplier.Configuration.TYPE, Supplier.Type.BINLOG.name());
configuration.put(BinaryLogSupplier.Configuration.POSITION_TYPE, BinaryLogSupplier.PositionType.BINLOG);

configuration.put(Augmenter.Configuration.SCHEMA_TYPE, Augmenter.SchemaType.ACTIVE.name());
configuration.put(Seeker.Configuration.TYPE, Seeker.Type.KAFKA.name());
Expand Down

0 comments on commit b2308c1

Please sign in to comment.