From dcf32750b388a3a4825736d4c949101b3bc4f36f Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 31 Oct 2023 15:21:04 +0100 Subject: [PATCH] simplify offset tracker --- .../io/questdb/kafka/EmptyOffsetTracker.java | 35 +++++++++ ...etTracker.java => MultiOffsetTracker.java} | 2 +- ...nOffsetTracker.java => OffsetTracker.java} | 2 +- .../io/questdb/kafka/QuestDBSinkTask.java | 2 +- .../SingleTopicPartitionOffsetTracker.java | 72 ------------------- .../QuestDBSinkConnectorEmbeddedTest.java | 26 ------- .../java/io/questdb/kafka/ExactlyOnceIT.java | 3 +- 7 files changed, 39 insertions(+), 103 deletions(-) create mode 100644 connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java rename connector/src/main/java/io/questdb/kafka/{MultiTopicPartitionOffsetTracker.java => MultiOffsetTracker.java} (98%) rename connector/src/main/java/io/questdb/kafka/{TopicPartitionOffsetTracker.java => OffsetTracker.java} (93%) delete mode 100644 connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java diff --git a/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java new file mode 100644 index 0000000..aef8315 --- /dev/null +++ b/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java @@ -0,0 +1,35 @@ +package io.questdb.kafka; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; + +import java.util.Collection; +import java.util.Map; + +public final class EmptyOffsetTracker implements OffsetTracker { + @Override + public void onPartitionsOpened(Collection partitions) { + + } + + @Override + public void onPartitionsClosed(Collection partitions) { + + } + + @Override + public void onObservedOffset(int partition, String topic, long offset) { + + } + + @Override + public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { + assert rewindOffset == 0; + } + + @Override + public void transformPreCommit(Map currentOffsets, long rewindOffset) { + assert rewindOffset == 0; + } +} diff --git a/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java similarity index 98% rename from connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java rename to connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java index 4758a35..b9dfe76 100644 --- a/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java +++ b/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java @@ -10,7 +10,7 @@ import java.util.Collection; import java.util.Map; -public final class MultiTopicPartitionOffsetTracker implements TopicPartitionOffsetTracker { +public final class MultiOffsetTracker implements OffsetTracker { private static final int EMPTY = -1; private static final int CLOSED = -2; diff --git a/connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/OffsetTracker.java similarity index 93% rename from connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java rename to connector/src/main/java/io/questdb/kafka/OffsetTracker.java index 7791ef2..a268f4f 100644 --- a/connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java +++ b/connector/src/main/java/io/questdb/kafka/OffsetTracker.java @@ -7,7 +7,7 @@ import java.util.Collection; import java.util.Map; -public interface TopicPartitionOffsetTracker { +public interface OffsetTracker { void onPartitionsOpened(Collection partitions); void onPartitionsClosed(Collection partitions); diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 78507e7..5c77692 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -38,7 +38,7 @@ public final class QuestDBSinkTask extends SinkTask { private long batchesSinceLastError = 0; private DateFormat dataFormat; private boolean kafkaTimestampsEnabled; - private final TopicPartitionOffsetTracker tracker = new MultiTopicPartitionOffsetTracker(); + private final OffsetTracker tracker = new MultiOffsetTracker(); //private final TopicPartitionOffsetTracker tracker = new SingleTopicPartitionOffsetTracker(); @Override diff --git a/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java deleted file mode 100644 index b2d44a2..0000000 --- a/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.questdb.kafka; - -import io.questdb.std.LongList; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.sink.SinkTaskContext; - -import java.util.Collection; -import java.util.Map; - -public final class SingleTopicPartitionOffsetTracker implements TopicPartitionOffsetTracker { - private final LongList offsets = new LongList(); - private String topic; - - @Override - public void onPartitionsOpened(Collection partitions) { - for (TopicPartition partition : partitions) { - if (topic == null) { - topic = partition.topic(); - } else if (!topic.equals(partition.topic())) { - throw new IllegalArgumentException("SingleTopicPartitionOffsetTracker can only track a single topic"); - } - offsets.extendAndSet(partition.partition(), -1); - } - } - - @Override - public void onPartitionsClosed(Collection partitions) { - for (TopicPartition partition : partitions) { - offsets.extendAndSet(partition.partition(), -1); - } - } - - @Override - public void onObservedOffset(int partition, String topic, long offset) { - long currentOffset = offsets.get(partition); - if (currentOffset < offset) { - offsets.extendAndSet(partition, offset); - } - } - - @Override - public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { - assert topic != null; - - for (int partition = 0; partition < offsets.size(); partition++) { - long offset = offsets.get(partition); - if (offset != -1) { - long newOffset = Math.max(0, offset - rewindOffset); - sinkTaskContext.offset(new TopicPartition(topic, partition), newOffset); - } - } - } - - @Override - public void transformPreCommit(Map currentOffsets, long rewindOffset) { - assert topic != null; - - for (Map.Entry entry : currentOffsets.entrySet()) { - TopicPartition topicPartition = entry.getKey(); - if (!topicPartition.topic().equals(topic)) { - throw new IllegalArgumentException("SingleTopicPartitionOffsetTracker can only track a single topic"); - } - long offset = offsets.get(topicPartition.partition()); - if (offset != -1) { - long newOffset = Math.max(0, offset - rewindOffset); - OffsetAndMetadata offsetAndMetadata = entry.getValue(); - currentOffsets.put(topicPartition, new OffsetAndMetadata(newOffset, offsetAndMetadata.leaderEpoch(), offsetAndMetadata.metadata())); - } - } - } -} diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 96569df..1462fbb 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -24,7 +24,6 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; -import org.testcontainers.containers.wait.strategy.ShellStrategy; import org.testcontainers.junit.jupiter.Testcontainers; import java.math.BigDecimal; @@ -66,34 +65,9 @@ public static void createContainer() { @AfterAll public static void stopContainer() { questDBContainer.stop(); -// deleteFromContainer("questdb"); Files.rmdir(io.questdb.std.str.Path.getThreadLocal(dbRoot.toAbsolutePath().toString())); } - private static void deleteFromContainer(String directory) { - GenericContainer cleanup = new GenericContainer<>("alpine:3.18.4") - .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") - .withCommand("ls -l /var/lib/delete/") - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup1"))); - cleanup.start(); - cleanup.stop(); - - cleanup = new GenericContainer<>("alpine:3.18.4") - .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") - .withCommand("rm -rf /var/lib/delete/" + directory) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup2"))) - .waitingFor(new ShellStrategy().withCommand("rm -rf /var/lib/delete/" + directory)); - cleanup.start(); - cleanup.stop(); - - cleanup = new GenericContainer<>("alpine:3.18.4") - .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") - .withCommand("ls -l /var/lib/delete/") - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup3"))); - cleanup.start(); - cleanup.stop(); - } - private static String questDBDirectory() { return dbRoot.resolve("questdb").toAbsolutePath().toString(); } diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java index 458d85f..aa07fd1 100644 --- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java +++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java @@ -246,8 +246,7 @@ private static String newPayload() { UUID uuid = UUID.randomUUID(); int val = ThreadLocalRandom.current().nextInt(100); - String jsonVal = "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}"; - return jsonVal; + return "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}"; } private static void startKillingRandomContainers(CyclicBarrier barrier) {