Skip to content

Commit

Permalink
simplify offset tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot committed Oct 31, 2023
1 parent 1dca9f4 commit dcf3275
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 103 deletions.
35 changes: 35 additions & 0 deletions connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.questdb.kafka;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.Collection;
import java.util.Map;

public final class EmptyOffsetTracker implements OffsetTracker {
@Override
public void onPartitionsOpened(Collection<TopicPartition> partitions) {

}

@Override
public void onPartitionsClosed(Collection<TopicPartition> partitions) {

}

@Override
public void onObservedOffset(int partition, String topic, long offset) {

}

@Override
public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) {
assert rewindOffset == 0;
}

@Override
public void transformPreCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets, long rewindOffset) {
assert rewindOffset == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Collection;
import java.util.Map;

public final class MultiTopicPartitionOffsetTracker implements TopicPartitionOffsetTracker {
public final class MultiOffsetTracker implements OffsetTracker {
private static final int EMPTY = -1;
private static final int CLOSED = -2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.Collection;
import java.util.Map;

public interface TopicPartitionOffsetTracker {
public interface OffsetTracker {
void onPartitionsOpened(Collection<TopicPartition> partitions);

void onPartitionsClosed(Collection<TopicPartition> partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class QuestDBSinkTask extends SinkTask {
private long batchesSinceLastError = 0;
private DateFormat dataFormat;
private boolean kafkaTimestampsEnabled;
private final TopicPartitionOffsetTracker tracker = new MultiTopicPartitionOffsetTracker();
private final OffsetTracker tracker = new MultiOffsetTracker();
//private final TopicPartitionOffsetTracker tracker = new SingleTopicPartitionOffsetTracker();

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.containers.wait.strategy.ShellStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.math.BigDecimal;
Expand Down Expand Up @@ -66,34 +65,9 @@ public static void createContainer() {
@AfterAll
public static void stopContainer() {
questDBContainer.stop();
// deleteFromContainer("questdb");
Files.rmdir(io.questdb.std.str.Path.getThreadLocal(dbRoot.toAbsolutePath().toString()));
}

private static void deleteFromContainer(String directory) {
GenericContainer<?> cleanup = new GenericContainer<>("alpine:3.18.4")
.withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete")
.withCommand("ls -l /var/lib/delete/")
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup1")));
cleanup.start();
cleanup.stop();

cleanup = new GenericContainer<>("alpine:3.18.4")
.withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete")
.withCommand("rm -rf /var/lib/delete/" + directory)
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup2")))
.waitingFor(new ShellStrategy().withCommand("rm -rf /var/lib/delete/" + directory));
cleanup.start();
cleanup.stop();

cleanup = new GenericContainer<>("alpine:3.18.4")
.withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete")
.withCommand("ls -l /var/lib/delete/")
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup3")));
cleanup.start();
cleanup.stop();
}

private static String questDBDirectory() {
return dbRoot.resolve("questdb").toAbsolutePath().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ private static String newPayload() {
UUID uuid = UUID.randomUUID();
int val = ThreadLocalRandom.current().nextInt(100);

String jsonVal = "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}";
return jsonVal;
return "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}";
}

private static void startKillingRandomContainers(CyclicBarrier barrier) {
Expand Down

0 comments on commit dcf3275

Please sign in to comment.