Skip to content

Commit

Permalink
v016-RC2 release. Contains: (#47)
Browse files Browse the repository at this point in the history
- Bug fixes:
    - active schema name rewrite for DDLs that use the fully qualified table name `schema`.`table`
    - bug fix for schema load in the case when a table is dropped from the active schema;
    - bug fix for table swap schema change case
    - bug fix for GTID checkpoint processing in the case of out-of-order transactions

- Consistency improvements, cleanup and tests:
    - replace warning with RuntimeException in the case when the ActiveSchema cannot be synced.
    - always require binlog_row_image=full; detect and throw RuntimeException if this changes during the replication.
    - more consistent metric naming.
    - JSONKafka integration test
  • Loading branch information
bdevetak authored Dec 5, 2019
1 parent 82926d4 commit ac19718
Show file tree
Hide file tree
Showing 43 changed files with 497 additions and 219 deletions.
2 changes: 1 addition & 1 deletion mysql-replicator-applier/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.booking.replication</groupId>
<artifactId>replicator</artifactId>
<version>0.16.0-rc1-SNAPSHOT</version>
<version>0.16.0-rc2-SNAPSHOT</version>
</parent>
<artifactId>mysql-replicator-applier</artifactId>
<name>mysql-replicator-applier</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ public Boolean apply(Collection<AugmentedEvent> events) {

this.metrics
.getRegistry()
.histogram("hbase.thread_" + threadID + ".applier.events.apply.batchsize")
.histogram("applier.hbase.thread_" + threadID + ".events.apply.batchsize")
.update(events.size());

for (AugmentedEvent event : events) {
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.events.seen").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".events.seen").inc(1L);
}

if (dryRun) {
Expand Down Expand Up @@ -240,23 +240,23 @@ public Boolean apply(Collection<AugmentedEvent> events) {
if ((dataEvents.size() >= FLUSH_BUFFER_SIZE) || hbaseApplierWriter.getThreadBufferSize(threadID) >= FLUSH_BUFFER_SIZE) {
hbaseApplierWriter.buffer(threadID, transactionUUID, dataEvents);
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.buffer.buffered").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".buffer.buffered").inc(1L);
this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.buffer.flush.attempt").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".buffer.flush.attempt").inc(1L);
boolean isFlushSuccess = hbaseApplierWriter.flushThreadBuffer(threadID);

if (isFlushSuccess) {
this.metrics.getRegistry()
.counter("thread_" + threadID + ".hbase_applier.buffer.flush.success").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".buffer.flush.success").inc(1L);
return true; // <- committed, will advance safe checkpoint
} else {
this.metrics.getRegistry()
.counter("thread_" + threadID + ".hbase_applier.buffer.flush.failure").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".buffer.flush.failure").inc(1L);
throw new RuntimeException("Failed to write buffer to HBase");
}
} else {
this.metrics.getRegistry()
.counter("thread_" + threadID + ".hbase_applier.buffer.buffered").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".buffer.buffered").inc(1L);
hbaseApplierWriter.buffer(threadID, transactionUUID, dataEvents);
return false; // buffered
}
Expand All @@ -266,17 +266,17 @@ public Boolean apply(Collection<AugmentedEvent> events) {
for (String transactionUUID : transactionUUIDs) {
hbaseApplierWriter.buffer(threadID, transactionUUID, dataEvents);
this.metrics.getRegistry()
.counter("thread_" + threadID + ".hbase_applier.buffer.buffered").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".buffer.buffered").inc(1L);
}
this.metrics.getRegistry()
.counter("thread_" + threadID + ".hbase_applier.buffer.flush.force.attempt").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".buffer.flush.force.attempt").inc(1L);
forceFlush();
this.metrics.getRegistry()
.counter("thread_" + threadID + ".hbase_applier.buffer.flush.force.success").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".buffer.flush.force.success").inc(1L);
return true;
} else {
this.metrics.getRegistry()
.counter("thread_" + threadID + ".hbase_applier.events.empty").inc(1L);
.counter("applier.hbase.thread_" + threadID + ".events.empty").inc(1L);
return false; // treat empty transaction as buffered
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
Bytes.toBytes(columnValue)
);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.delete.count").inc(1L);
.counter("applier.hbase.columns.mutations.delete.count").inc(1L);

if (uuid != null) {
put.addColumn(
Expand All @@ -147,9 +147,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
);

this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.delete.count").inc(1L);
.counter("applier.hbase.columns.mutations.delete.count").inc(1L);
}
if (xid != null) {
put.addColumn(
Expand All @@ -160,9 +160,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
);

this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.delete.count").inc(1L);
.counter("applier.hbase.columns.mutations.delete.count").inc(1L);
}
break;
}
Expand Down Expand Up @@ -194,9 +194,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
);

this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.update.count").inc(1L);
.counter("applier.hbase.columns.mutations.update.count").inc(1L);

} else {
// no change, skip
Expand All @@ -210,9 +210,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
Bytes.toBytes("U")
);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.update.count").inc(1L);
.counter("applier.hbase.columns.mutations.update.count").inc(1L);

if (uuid != null) {
put.addColumn(
Expand All @@ -222,9 +222,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
Bytes.toBytes(uuid)
);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.update.count").inc(1L);
.counter("applier.hbase.columns.mutations.update.count").inc(1L);
}

if (xid != null) {
Expand All @@ -235,9 +235,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
Bytes.toBytes(xid.toString())
);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.update.count").inc(1L);
.counter("applier.hbase.columns.mutations.update.count").inc(1L);
}
break;
}
Expand All @@ -259,9 +259,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
);

this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.insert.count").inc(1L);
.counter("applier.hbase.columns.mutations.insert.count").inc(1L);
}

put.addColumn(
Expand All @@ -272,9 +272,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
);

this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.insert.count").inc(1L);
.counter("applier.hbase.columns.mutations.insert.count").inc(1L);

if (uuid != null) {
put.addColumn(
Expand All @@ -284,9 +284,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
Bytes.toBytes(uuid)
);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.insert.count").inc(1L);
.counter("applier.hbase.columns.mutations.insert.count").inc(1L);
}

if (xid != null) {
Expand All @@ -297,9 +297,9 @@ public PutMutation getPutForMirroredTable(AugmentedRow augmentedRow) {
Bytes.toBytes(xid.toString())
);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.count").inc(1L);
.counter("applier.hbase.columns.mutations.count").inc(1L);
this.metrics.getRegistry()
.counter("hbase.applier.columns.mutations.insert.count").inc(1L);
.counter("applier.hbase.columns.mutations.insert.count").inc(1L);
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public HBaseTimeMachineWriter(Configuration hbaseConfig,
this.hbaseSchemaManager = hbaseSchemaManager;

this.metrics.getRegistry()
.counter("hbase.applier.connection.attempt").inc(1L);
.counter("applier.hbase.connection.attempt").inc(1L);
connection = ConnectionFactory.createConnection(hbaseConfig);
this.metrics.getRegistry()
.counter("hbase.applier.connection.success").inc(1L);
.counter("applier.hbase.connection.success").inc(1L);

admin = connection.getAdmin();

Expand Down Expand Up @@ -192,7 +192,7 @@ private String extractTableName(List<AugmentedRow> augmentedRows) {
private void writeToHBase(Long threadID) throws IOException {

this.metrics.getRegistry()
.histogram("hbase.thread_" + threadID + "_applier.writer.buffer.thread_" + threadID + ".nr_transactions_buffered")
.histogram("applier.hbase.writer.buffer.thread_" + threadID + ".nr_transactions_buffered")
.update( buffered.get(threadID).size() );

List<HBaseApplierMutationGenerator.PutMutation> mutations = new ArrayList<>();
Expand All @@ -206,7 +206,7 @@ private void writeToHBase(Long threadID) throws IOException {
hbaseSchemaManager.createHBaseTableIfNotExists(augmentedRowsTableName);

this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.writer.rows.received").inc(augmentedRows.size());
.counter("applier.hbase.writer.thread_" + threadID + ".rows.received").inc(augmentedRows.size());

if (timestampOrganizer != null) {
timestampOrganizer.organizeTimestamps(augmentedRows, augmentedRowsTableName, threadID, transactionUUID);
Expand All @@ -218,7 +218,7 @@ private void writeToHBase(Long threadID) throws IOException {
mutations.addAll(eventMutations);

this.metrics.getRegistry()
.counter("hbase.thread_" + threadID + ".applier.writer.mutations_generated").inc(eventMutations.size());
.counter("applier.hbase.writer.thread_" + threadID + ".mutations_generated").inc(eventMutations.size());

}
}
Expand Down Expand Up @@ -255,12 +255,12 @@ private void writeToHBase(Long threadID) throws IOException {

this.metrics
.getRegistry()
.histogram("hbase.thread_" + threadID + ".applier.writer.put.latency")
.histogram("applier.writer.hbase.thread_" + threadID + ".put.latency")
.update(latency);

this.metrics
.getRegistry()
.histogram("hbase.thread_" + threadID + ".applier.writer.put.nr-mutations")
.histogram("applier.writer.hbase.thread_" + threadID + ".put.nr-mutations")
.update(nrMutations);

// TODO: send sample to validator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import com.booking.replication.applier.Applier;
import com.booking.replication.applier.Partitioner;
import com.booking.replication.applier.schema.registry.BCachedSchemaRegistryClient;
import com.booking.replication.augmenter.model.event.AugmentedEvent;
import com.booking.replication.augmenter.model.event.QueryAugmentedEventData;
import com.booking.replication.augmenter.model.event.*;
import com.booking.replication.commons.map.MapFilter;
import com.booking.replication.commons.metrics.Metrics;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -37,6 +37,7 @@
import java.io.UncheckedIOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class KafkaApplier implements Applier {
private static final Logger LOG = LogManager.getLogger(KafkaApplier.class);
Expand Down Expand Up @@ -65,18 +66,22 @@ public interface MessageFormat {
String JSON = "json";
}

private final String METRIC_APPLIER_DELAY;

private final Map<Integer, Producer<byte[], byte[]>> producers;
private final Map<String, Object> configuration;

private final Metrics<?> metrics;
private final Partitioner partitioner;

private final String topic;
private final String delayName;

private final int totalPartitions;

private final AtomicReference<AugmentedEvent> lastEventSent = new AtomicReference<>(null);

public KafkaApplier(Map<String, Object> configuration) {

Object topic = configuration.get(Configuration.TOPIC);

Objects.requireNonNull(topic, String.format("Configuration required: %s", Configuration.TOPIC));
Expand All @@ -99,17 +104,27 @@ public KafkaApplier(Map<String, Object> configuration) {

Objects.requireNonNull(topic, String.format("Configuration required: %s", Configuration.TOPIC));

this.delayName = MetricRegistry.name(
String.valueOf(configuration.getOrDefault(Metrics.Configuration.BASE_PATH, "events")),
"delay"
this.metricBase = MetricRegistry.name(this.metrics.basePath());

this.setupColumnsFilter(configuration);

METRIC_APPLIER_DELAY = MetricRegistry.name(
String.valueOf(configuration.getOrDefault(Metrics.Configuration.BASE_PATH, "")),
"applier","kafka","delay"
);

this.metricBase = MetricRegistry.name(this.metrics.basePath(), "kafka");
this.metrics.register(METRIC_APPLIER_DELAY, (Gauge<Long>) () -> {
if (lastEventSent.get() != null) {
return System.currentTimeMillis() - lastEventSent.get().getHeader().getTimestamp();
} else {
return 0L;
}
});

this.setupColumnsFilter(configuration);
}

private void setupColumnsFilter(Map<String, Object> configuration) {

this.includeInColumns.add("name");
this.includeInColumns.add("columnType");

Expand All @@ -118,7 +133,9 @@ private void setupColumnsFilter(Map<String, Object> configuration) {
LOG.info("Adding " + this.includeInColumns.toString() + " fields in metadata.columns.");

SimpleFilterProvider filterProvider = new SimpleFilterProvider();

filterProvider.addFilter("column", SimpleBeanPropertyFilter.filterOutAllExcept(this.includeInColumns));

MAPPER.setFilterProvider(filterProvider);
}

Expand Down Expand Up @@ -179,6 +196,8 @@ public Boolean apply(Collection<AugmentedEvent> events) {
});
}

this.lastEventSent.set(event);

writeMetrics(event, numRows);
}

Expand All @@ -203,7 +222,11 @@ public Boolean apply(Collection<AugmentedEvent> events) {
data
));

writeMetrics(event, 1);
this.lastEventSent.set(event);

int numRows = getNumberOfRowsInAugmentedEvent(event);

writeMetrics(event, numRows);
}

return true;
Expand All @@ -213,10 +236,24 @@ public Boolean apply(Collection<AugmentedEvent> events) {
}
}

private int getNumberOfRowsInAugmentedEvent(AugmentedEvent event) {
if (event.getHeader().getEventType().equals(AugmentedEventType.INSERT)) {
return ((RowsAugmentedEventData) event.getData()).getRows().size();
}

if (event.getHeader().getEventType().equals(AugmentedEventType.UPDATE)) {
return ((UpdateRowsAugmentedEventData) event.getData()).getRows().size();
}

if (event.getHeader().getEventType().equals(AugmentedEventType.DELETE)) {
return ((DeleteRowsAugmentedEventData) event.getData()).getRows().size();
}
return 0;
}

private void writeMetrics(AugmentedEvent event, int numRows) {
this.metrics.updateMeter(this.delayName, System.currentTimeMillis() - event.getHeader().getTimestamp());
String tblName = String.valueOf(event.getHeader().getTableName()); //convert null to "null"
String rowCounter = MetricRegistry.name(this.metricBase, tblName, event.getHeader().getEventType().name());
String rowCounter = MetricRegistry.name(this.metricBase, "applier", "kafka", "rows", tblName, event.getHeader().getEventType().name());
this.metrics.incrementCounter(rowCounter, numRows);
}

Expand Down
Loading

0 comments on commit ac19718

Please sign in to comment.