Skip to content

Commit

Permalink
initial changes to add txid
Browse files Browse the repository at this point in the history
  • Loading branch information
syook-r7 committed Jul 2, 2024
1 parent 066bb79 commit 61d9e25
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface CdcConstants {
String COL_SOURCE = "source";
String COL_TARGET = "target";
String COL_KEY = "key";

String COL_TXID = "txid";
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,31 @@ private String mapOperation(String originalOp) {

private void setTableAndTargetFromSourceStruct(Struct source, Struct cdcMetadata) {
String db;
Long txid = null;

if (source.schema().field("schema") != null) {
// prefer schema if present, e.g. for Postgres
db = source.getString("schema");
} else {
db = source.getString("db");
}

String table = source.getString("table");

if (source.schema().field("txId") != null) {
txid = source.getInt64("txId");
}

cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table);
cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table));
cdcMetadata.put(CdcConstants.COL_TXID, txid);
}

private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> cdcMetadata) {
Map<String, Object> map = Requirements.requireMap(source, "Debezium transform");

String db;
Long txid = null;
if (map.containsKey("schema")) {
// prefer schema if present, e.g. for Postgres
db = map.get("schema").toString();
Expand All @@ -203,8 +212,13 @@ private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> c
}
String table = map.get("table").toString();

if (map.containsKey("txid")) {
txid = Long.valueOf(map.get("txId").toString());
}

cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table);
cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table));
cdcMetadata.put(CdcConstants.COL_TXID, txid);
}

private String target(String db, String table) {
Expand All @@ -220,7 +234,8 @@ private Schema makeCdcSchema(Schema keySchema) {
.field(CdcConstants.COL_TS, Timestamp.SCHEMA)
.field(CdcConstants.COL_OFFSET, Schema.OPTIONAL_INT64_SCHEMA)
.field(CdcConstants.COL_SOURCE, Schema.STRING_SCHEMA)
.field(CdcConstants.COL_TARGET, Schema.STRING_SCHEMA);
.field(CdcConstants.COL_TARGET, Schema.STRING_SCHEMA)
.field(CdcConstants.COL_TXID, Schema.OPTIONAL_INT64_SCHEMA);

if (keySchema != null) {
builder.field(CdcConstants.COL_KEY, keySchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class DebeziumTransformTest {
.field("source", SOURCE_SCHEMA)
.field("before", ROW_SCHEMA)
.field("after", ROW_SCHEMA)
.field("txid", Schema.INT64_SCHEMA)
.build();

@Test
Expand Down

0 comments on commit 61d9e25

Please sign in to comment.