diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CdcConstants.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CdcConstants.java index 25b997f8..c5c01d1c 100644 --- a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CdcConstants.java +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CdcConstants.java @@ -31,4 +31,6 @@ public interface CdcConstants { String COL_SOURCE = "source"; String COL_TARGET = "target"; String COL_KEY = "key"; + + String COL_TXID = "txid"; } diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java index 1e79e211..3ba88e82 100644 --- a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java @@ -179,22 +179,31 @@ private String mapOperation(String originalOp) { private void setTableAndTargetFromSourceStruct(Struct source, Struct cdcMetadata) { String db; + Long txid = null; + if (source.schema().field("schema") != null) { // prefer schema if present, e.g. for Postgres db = source.getString("schema"); } else { db = source.getString("db"); } + String table = source.getString("table"); + if (source.schema().field("txId") != null) { + txid = source.getInt64("txId"); + } + cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table); cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table)); + cdcMetadata.put(CdcConstants.COL_TXID, txid); } private void setTableAndTargetFromSourceMap(Object source, Map cdcMetadata) { Map map = Requirements.requireMap(source, "Debezium transform"); String db; + Long txid = null; if (map.containsKey("schema")) { // prefer schema if present, e.g. for Postgres db = map.get("schema").toString(); @@ -203,8 +212,13 @@ private void setTableAndTargetFromSourceMap(Object source, Map c } String table = map.get("table").toString(); + if (map.containsKey("txid")) { + txid = Long.valueOf(map.get("txId").toString()); + } + cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table); cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table)); + cdcMetadata.put(CdcConstants.COL_TXID, txid); } private String target(String db, String table) { @@ -220,7 +234,8 @@ private Schema makeCdcSchema(Schema keySchema) { .field(CdcConstants.COL_TS, Timestamp.SCHEMA) .field(CdcConstants.COL_OFFSET, Schema.OPTIONAL_INT64_SCHEMA) .field(CdcConstants.COL_SOURCE, Schema.STRING_SCHEMA) - .field(CdcConstants.COL_TARGET, Schema.STRING_SCHEMA); + .field(CdcConstants.COL_TARGET, Schema.STRING_SCHEMA) + .field(CdcConstants.COL_TXID, Schema.OPTIONAL_INT64_SCHEMA); if (keySchema != null) { builder.field(CdcConstants.COL_KEY, keySchema); diff --git a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java index c9f34848..d90af5d7 100644 --- a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java +++ b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java @@ -57,6 +57,7 @@ public class DebeziumTransformTest { .field("source", SOURCE_SCHEMA) .field("before", ROW_SCHEMA) .field("after", ROW_SCHEMA) + .field("txid", Schema.INT64_SCHEMA) .build(); @Test