Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new transform prop to keep db name in target pattern #207

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions kafka-connect-transforms/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ It will promote the `before` or `after` element fields to top level and add the

## Configuration

| Property | Description |
|---------------------|-----------------------------------------------------------------------------------|
| cdc.target.pattern | Pattern to use for setting the CDC target field value, default is `{db}.{table}` |
| Property | Description |
|-----------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
| cdc.target.pattern                | Pattern to use for setting the CDC target field value. Available placeholders: `{db}`, `{schema}`, `{table}`. Default is `{db}.{table}`. |
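| cdc.target.schema.replace.dbname  | If the source has a schema, use the schema name in place of the db name in the CDC source and target field values (the `{db}` placeholder then resolves to the schema name). Default is `true`. |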

# JsonToMapTransform
_(Experimental)_
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,35 @@ public class DebeziumTransform<R extends ConnectRecord<R>> implements Transforma
private static final Logger LOG = LoggerFactory.getLogger(DebeziumTransform.class.getName());

private static final String CDC_TARGET_PATTERN = "cdc.target.pattern";
private static final String CDC_TARGET_SCHEMA_REPLACE_DBNAME = "cdc.target.schema.replace.dbname";
private static final String DB_PLACEHOLDER = "{db}";
private static final String SCHEMA_PLACEHOLDER = "{schema}";
private static final String TABLE_PLACEHOLDER = "{table}";

public static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
CDC_TARGET_PATTERN,
ConfigDef.Type.STRING,
null,
"",
Importance.MEDIUM,
"Pattern to use for setting the CDC target field value.");
"Pattern to use for setting the CDC target field value.")
.define(
CDC_TARGET_SCHEMA_REPLACE_DBNAME,
ConfigDef.Type.STRING,
"true",
Importance.LOW,
"Replace db name with schema in CDC target field value, if schema exists.");

private String cdcTargetPattern;
private Boolean cdcTargetSchemaReplaceDbname;

@Override
public void configure(Map<String, ?> props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
cdcTargetPattern = config.getString(CDC_TARGET_PATTERN);
cdcTargetSchemaReplaceDbname =
config.getString(CDC_TARGET_SCHEMA_REPLACE_DBNAME).equals("true");
}

@Override
Expand Down Expand Up @@ -178,39 +189,62 @@ private String mapOperation(String originalOp) {
}

/**
 * Fills the CDC source and target metadata fields from a Debezium {@code source} struct.
 *
 * <p>Three cases are handled:
 *
 * <ul>
 *   <li>no {@code schema} field (or a null value): source is {@code db.table};
 *   <li>schema present and db-name replacement enabled: the schema name is used in place of the
 *       db name, so source is {@code schema.table};
 *   <li>schema present and replacement disabled: source is {@code db.schema.table}.
 * </ul>
 *
 * @param source the Debezium source info struct; must contain {@code table}
 * @param cdcMetadata the struct receiving the source and target values
 */
private void setTableAndTargetFromSourceStruct(Struct source, Struct cdcMetadata) {
  String table = source.getString("table");

  // A missing field and a null field value are both treated as "no schema".
  String schema = source.schema().field("schema") == null ? null : source.getString("schema");

  String db;
  String colSource;
  if (schema == null) {
    schema = "";
    db = source.getString("db");
    colSource = db + "." + table;
  } else if (cdcTargetSchemaReplaceDbname) {
    // prefer schema if present, e.g. for Postgres; "db" is deliberately not read in this
    // branch so that sources without a usable db field keep working
    db = schema;
    colSource = schema + "." + table;
  } else {
    db = source.getString("db");
    colSource = db + "." + schema + "." + table;
  }

  cdcMetadata.put(CdcConstants.COL_SOURCE, colSource);
  cdcMetadata.put(CdcConstants.COL_TARGET, target(db, schema, table));
}

/**
 * Fills the CDC source and target metadata entries from a schemaless Debezium {@code source} map.
 *
 * <p>Three cases are handled:
 *
 * <ul>
 *   <li>no {@code schema} entry (or a null value): source is {@code db.table};
 *   <li>schema present and db-name replacement enabled: the schema name is used in place of the
 *       db name, so source is {@code schema.table};
 *   <li>schema present and replacement disabled: source is {@code db.schema.table}.
 * </ul>
 *
 * @param source the Debezium source info; must be a map containing {@code table}
 * @param cdcMetadata the map receiving the source and target values
 */
private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> cdcMetadata) {
  Map<String, Object> map = Requirements.requireMap(source, "Debezium transform");

  String table = map.get("table").toString();

  // An absent key and a null value are both treated as "no schema".
  Object schemaValue = map.get("schema");

  String db;
  String schema;
  String colSource;
  if (schemaValue == null) {
    schema = "";
    db = map.get("db").toString();
    colSource = db + "." + table;
  } else if (cdcTargetSchemaReplaceDbname) {
    // prefer schema if present, e.g. for Postgres; "db" is deliberately not read in this
    // branch so that sources without a db entry keep working
    schema = schemaValue.toString();
    db = schema;
    colSource = schema + "." + table;
  } else {
    schema = schemaValue.toString();
    db = map.get("db").toString();
    colSource = db + "." + schema + "." + table;
  }

  cdcMetadata.put(CdcConstants.COL_SOURCE, colSource);
  cdcMetadata.put(CdcConstants.COL_TARGET, target(db, schema, table));
}

private String target(String db, String table) {
return cdcTargetPattern == null || cdcTargetPattern.isEmpty()
? db + "." + table
: cdcTargetPattern.replace(DB_PLACEHOLDER, db).replace(TABLE_PLACEHOLDER, table);
private String target(String db, String schema, String table) {
if (cdcTargetPattern.isEmpty()) {
return schema.isEmpty() ? db + "." + table : db + "." + schema + "." + table;
}
return cdcTargetPattern
.replace(DB_PLACEHOLDER, db)
.replace(SCHEMA_PLACEHOLDER, schema)
.replace(TABLE_PLACEHOLDER, table);
}

private Schema makeCdcSchema(Schema keySchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testDmsTransformNull() {
@SuppressWarnings("unchecked")
public void testDebeziumTransformSchemaless() {
try (DebeziumTransform<SinkRecord> smt = new DebeziumTransform<>()) {
smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x"));
smt.configure(ImmutableMap.of("cdc.target.pattern", "pattern.{db}_{table}"));

Map<String, Object> event = createDebeziumEventMap("u");
Map<String, Object> key = ImmutableMap.of("account_id", 1L);
Expand All @@ -86,16 +86,93 @@ public void testDebeziumTransformSchemaless() {

Map<String, Object> cdcMetadata = (Map<String, Object>) value.get("_cdc");
assertThat(cdcMetadata.get("op")).isEqualTo("U");
assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl");
assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x");
assertThat(cdcMetadata.get("source")).isEqualTo("test_db.test_table");
assertThat(cdcMetadata.get("target")).isEqualTo("pattern.test_db_test_table");
assertThat(cdcMetadata.get("key")).isInstanceOf(Map.class);
}
}

/**
 * Schemaless event with db-name replacement switched off: the expected source and target are the
 * plain db/table values.
 */
@Test
@SuppressWarnings("unchecked")
public void testDebeziumTransformSchemalessIncludeDbname() {
  Map<String, String> props =
      ImmutableMap.of(
          "cdc.target.pattern", "pattern.{db}_{table}",
          // expected to have no effect on this event
          "cdc.target.schema.replace.dbname", "false");

  try (DebeziumTransform<SinkRecord> transform = new DebeziumTransform<>()) {
    transform.configure(props);

    Map<String, Object> payload = createDebeziumEventMap("u");
    Map<String, Object> recordKey = ImmutableMap.of("account_id", 1L);
    SinkRecord input = new SinkRecord("topic", 0, null, recordKey, null, payload, 0);

    Object transformed = transform.apply(input).value();
    assertThat(transformed).isInstanceOf(Map.class);

    Map<String, Object> row = (Map<String, Object>) transformed;
    assertThat(row.get("account_id")).isEqualTo(1);

    Map<String, Object> cdc = (Map<String, Object>) row.get("_cdc");
    assertThat(cdc.get("op")).isEqualTo("U");
    assertThat(cdc.get("source")).isEqualTo("test_db.test_table");
    assertThat(cdc.get("target")).isEqualTo("pattern.test_db_test_table");
    assertThat(cdc.get("key")).isInstanceOf(Map.class);
  }
}

/**
 * Struct event with a schema field and default settings: the schema name is expected in place of
 * the db name in both the source and the pattern-built target.
 */
@Test
public void testDebeziumTransformWithSchema() {
  try (DebeziumTransform<SinkRecord> smt = new DebeziumTransform<>()) {
    // Configure once, with the pattern the assertions below are written against.
    smt.configure(ImmutableMap.of("cdc.target.pattern", "pattern.{db}_{table}"));

    Struct event = createDebeziumEventStruct("u");
    Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L);
    SinkRecord record = new SinkRecord("topic", 0, KEY_SCHEMA, key, VALUE_SCHEMA, event, 0);

    SinkRecord result = smt.apply(record);
    assertThat(result.value()).isInstanceOf(Struct.class);
    Struct value = (Struct) result.value();

    assertThat(value.get("account_id")).isEqualTo(1L);

    Struct cdcMetadata = value.getStruct("_cdc");
    assertThat(cdcMetadata.get("op")).isEqualTo("U");
    assertThat(cdcMetadata.get("source")).isEqualTo("test_schema.test_table");
    assertThat(cdcMetadata.get("target")).isEqualTo("pattern.test_schema_test_table");
    assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class);
  }
}

/**
 * Struct event with a schema field, db-name replacement switched off and a pattern using all
 * three placeholders: db, schema and table are all expected in the source and target.
 */
@Test
public void testDebeziumTransformWithSchemaIncludeDbname() {
  Map<String, String> props =
      ImmutableMap.of(
          "cdc.target.pattern", "pattern.{db}_{schema}_{table}",
          "cdc.target.schema.replace.dbname", "false");

  try (DebeziumTransform<SinkRecord> transform = new DebeziumTransform<>()) {
    transform.configure(props);

    Struct payload = createDebeziumEventStruct("u");
    Struct recordKey = new Struct(KEY_SCHEMA).put("account_id", 1L);
    SinkRecord input =
        new SinkRecord("topic", 0, KEY_SCHEMA, recordKey, VALUE_SCHEMA, payload, 0);

    Object transformed = transform.apply(input).value();
    assertThat(transformed).isInstanceOf(Struct.class);

    Struct row = (Struct) transformed;
    assertThat(row.get("account_id")).isEqualTo(1L);

    Struct cdc = row.getStruct("_cdc");
    assertThat(cdc.get("op")).isEqualTo("U");
    assertThat(cdc.get("source")).isEqualTo("test_db.test_schema.test_table");
    assertThat(cdc.get("target")).isEqualTo("pattern.test_db_test_schema_test_table");
    assertThat(cdc.get("key")).isInstanceOf(Struct.class);
  }
}

@Test
public void testDebeziumTransformWithSchemaNotReplaceDbname() {
try (DebeziumTransform<SinkRecord> smt = new DebeziumTransform<>()) {
smt.configure(ImmutableMap.of("cdc.target.schema.replace.dbname", "false"));

Struct event = createDebeziumEventStruct("u");
Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L);
Expand All @@ -109,18 +186,18 @@ public void testDebeziumTransformWithSchema() {

Struct cdcMetadata = value.getStruct("_cdc");
assertThat(cdcMetadata.get("op")).isEqualTo("U");
assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl");
assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x");
assertThat(cdcMetadata.get("source")).isEqualTo("test_db.test_schema.test_table");
assertThat(cdcMetadata.get("target")).isEqualTo("test_db.test_schema.test_table");
assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class);
}
}

private Map<String, Object> createDebeziumEventMap(String operation) {
Map<String, Object> source =
ImmutableMap.of(
"db", "db",
"schema", "schema",
"table", "tbl");
"db", "test_db",
// "schema", "test_schema",
"table", "test_table");

Map<String, Object> data =
ImmutableMap.of(
Expand All @@ -138,7 +215,10 @@ private Map<String, Object> createDebeziumEventMap(String operation) {

private Struct createDebeziumEventStruct(String operation) {
Struct source =
new Struct(SOURCE_SCHEMA).put("db", "db").put("schema", "schema").put("table", "tbl");
new Struct(SOURCE_SCHEMA)
.put("db", "test_db")
.put("schema", "test_schema")
.put("table", "test_table");

Struct data =
new Struct(ROW_SCHEMA)
Expand Down