Skip to content

Commit

Permalink
DBZ-7903 Align snapshot modes
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale authored and jpechane committed Oct 31, 2024
1 parent a0e0301 commit 4699a1a
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import io.debezium.connector.SnapshotRecord;
import io.debezium.ibmi.db2.journal.retrieve.JournalProcessedPosition;
import io.debezium.ibmi.db2.journal.retrieve.JournalReceiver;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.spi.schema.DataCollectionId;

public class As400OffsetContext implements OffsetContext {
private static Logger log = LoggerFactory.getLogger(As400OffsetContext.class);
public class As400OffsetContext extends CommonOffsetContext<SourceInfo> {
private static final Logger log = LoggerFactory.getLogger(As400OffsetContext.class);
// TODO note believe there is a per journal offset
private static final String SERVER_PARTITION_KEY = "server";
public static final String EVENT_SEQUENCE = "offset.event_sequence";
Expand All @@ -47,52 +48,52 @@ public class As400OffsetContext implements OffsetContext {
private final As400ConnectorConfig connectorConfig;
private final SourceInfo sourceInfo;
private final JournalProcessedPosition position;
private final String inclueTables;
private final String includeTables;
private boolean hasNewTables = false;
private volatile boolean snapshotComplete = false;

public As400OffsetContext(As400ConnectorConfig connectorConfig) {
super();
super(new SourceInfo(connectorConfig), false);
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
this.position = connectorConfig.getOffset();
this.connectorConfig = connectorConfig;
sourceInfo = new SourceInfo(connectorConfig);
inclueTables = connectorConfig.tableIncludeList();
includeTables = connectorConfig.tableIncludeList();
}

public As400OffsetContext(As400ConnectorConfig connectorConfig, JournalProcessedPosition position) {
super();
super(new SourceInfo(connectorConfig), false);
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
this.position = position;
this.connectorConfig = connectorConfig;
sourceInfo = new SourceInfo(connectorConfig);
inclueTables = connectorConfig.tableIncludeList();
includeTables = connectorConfig.tableIncludeList();
}

public As400OffsetContext(As400ConnectorConfig connectorConfig, JournalProcessedPosition position, String includeTables,
boolean snapshotComplete) {
super();
super(new SourceInfo(connectorConfig), snapshotComplete);
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
this.position = position;
this.connectorConfig = connectorConfig;
sourceInfo = new SourceInfo(connectorConfig);
this.inclueTables = includeTables;
this.includeTables = includeTables;
this.snapshotComplete = snapshotComplete;
}

public void setPosition(JournalProcessedPosition newPosition) {
this.position.setPosition(newPosition);
}

public boolean isSnapshotCompplete() {
public boolean isSnapshotComplete() {
return this.snapshotComplete;
}

public JournalProcessedPosition getPosition() {
return position;
}

public boolean isPosisionSet() {
public boolean isPositionSet() {
return position != null && position.isOffsetSet();
}

Expand Down Expand Up @@ -124,7 +125,7 @@ public void endTransaction() {
As400OffsetContext.RECEIVER, position.getReceiver().name(),
As400OffsetContext.PROCESSED, Boolean.toString(position.processed()),
As400OffsetContext.RECEIVER_LIBRARY, position.getReceiver().library(),
RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST.name(), inclueTables,
RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST.name(), includeTables,
As400OffsetContext.SNAPSHOT_COMPLETED_KEY, Boolean.toString(snapshotComplete)));
}

Expand All @@ -142,28 +143,6 @@ public Struct getSourceInfo() {
return sourceInfo.struct();
}

@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot();
}

@Override
public void preSnapshotStart() {
snapshotComplete = false;
sourceInfo.setSnapshot(SnapshotRecord.TRUE);
}

@Override
public void preSnapshotCompletion() {
snapshotComplete = true;
}

@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
snapshotComplete = true;
}

@Override
public void event(DataCollectionId collectionId, Instant timestamp) {
sourceInfo.setSourceTime(timestamp);
Expand All @@ -176,7 +155,7 @@ public TransactionContext getTransactionContext() {
}

public String getIncludeTables() {
return inclueTables;
return includeTables;
}

public static class Loader implements OffsetContext.Loader<As400OffsetContext> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected void determineSnapshotOffset(
RelationalSnapshotContext<As400Partition, As400OffsetContext> snapshotContext,
As400OffsetContext previousOffset)
throws Exception {
if (previousOffset != null && previousOffset.isPosisionSet()) {
if (previousOffset != null && previousOffset.isPositionSet()) {
snapshotContext.offset = previousOffset;
}

Expand Down Expand Up @@ -213,7 +213,7 @@ public SnapshottingTask getSnapshottingTask(As400Partition partition, As400Offse
final Map<DataCollectionId, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable();

// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && previousOffset.isSnapshotCompplete()) {
if (previousOffset != null && previousOffset.isSnapshotComplete()) {
// when control tables in place
if (!previousOffset.hasNewTables()) {
log.info(
Expand Down

0 comments on commit 4699a1a

Please sign in to comment.