Skip to content

Commit

Permalink
remove delta writers
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Feb 29, 2024
1 parent 3e33382 commit 96061ca
Show file tree
Hide file tree
Showing 11 changed files with 13 additions and 793 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns";
private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by";
private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field";
private static final String TABLES_UPSERT_MODE_ENABLED_PROP =
"iceberg.tables.upsert-mode-enabled";
private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
"iceberg.tables.auto-create-enabled";
private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
Expand Down Expand Up @@ -152,18 +149,6 @@ private static ConfigDef newConfigDef() {
null,
Importance.MEDIUM,
"Default partition spec to use when creating tables, comma-separated");
configDef.define(
TABLES_CDC_FIELD_PROP,
ConfigDef.Type.STRING,
null,
Importance.MEDIUM,
"Source record field that identifies the type of operation (insert, update, or delete)");
configDef.define(
TABLES_UPSERT_MODE_ENABLED_PROP,
ConfigDef.Type.BOOLEAN,
false,
Importance.MEDIUM,
"Set to true to treat all appends as upserts, false otherwise");
configDef.define(
TABLES_AUTO_CREATE_ENABLED_PROP,
ConfigDef.Type.BOOLEAN,
Expand Down Expand Up @@ -422,14 +407,6 @@ public String hadoopConfDir() {
return getString(HADDOP_CONF_DIR_PROP);
}

public String tablesCdcField() {
return getString(TABLES_CDC_FIELD_PROP);
}

public boolean upsertModeEnabled() {
return getBoolean(TABLES_UPSERT_MODE_ENABLED_PROP);
}

public boolean autoCreateEnabled() {
return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ public void write(SinkRecord record) {
// ignore tombstones...
if (record.value() != null) {
Record row = convertToRow(record);
String cdcField = config.tablesCdcField();
if (cdcField == null) {
writer.write(row);
} else {
Operation op = extractCdcOperation(record.value(), cdcField);
writer.write(new RecordWrapper(row, op));
}
writer.write(row);
}
} catch (Exception e) {
throw new DataException(
Expand Down Expand Up @@ -99,28 +93,6 @@ private Record convertToRow(SinkRecord record) {
return row;
}

private Operation extractCdcOperation(Object recordValue, String cdcField) {
Object opValue = Utilities.extractFromRecordValue(recordValue, cdcField);

if (opValue == null) {
return Operation.INSERT;
}

String opStr = opValue.toString().trim().toUpperCase();
if (opStr.isEmpty()) {
return Operation.INSERT;
}

switch (opStr.charAt(0)) {
case 'U':
return Operation.UPDATE;
case 'D':
return Operation.DELETE;
default:
return Operation.INSERT;
}
}

private void flush() {
WriteResult writeResult;
try {
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 96061ca

Please sign in to comment.