Commit e769add

Kafka Connect: Record converters (#9641)

bryanck authored Mar 21, 2024
1 parent 59ffa33 commit e769add

Showing 12 changed files with 1,526 additions and 65 deletions.
@@ -18,10 +18,9 @@
  */
 package org.apache.iceberg.connect.events;
 
-import static java.util.stream.Collectors.toList;
-
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.util.Utf8;
@@ -96,7 +95,9 @@ public void put(int i, Object v) {
         return;
       case NAMESPACE:
         this.namespace =
-            v == null ? null : ((List<Utf8>) v).stream().map(Utf8::toString).collect(toList());
+            v == null
+                ? null
+                : ((List<Utf8>) v).stream().map(Utf8::toString).collect(Collectors.toList());
         return;
       case NAME:
         this.name = v == null ? null : v.toString();
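The hunks above swap a static `toList` import for a qualified `Collectors.toList()` call where Avro `Utf8` values are converted back to Java strings. A minimal standalone sketch of that conversion follows; the class name is hypothetical and only the stream pipeline mirrors the diff:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.util.Utf8;

// Hypothetical demo class; only the stream pipeline comes from the diff.
public class Utf8ListConversionSketch {
  public static void main(String[] args) {
    // Avro deserializes string fields as org.apache.avro.util.Utf8,
    // not java.lang.String, so each element needs an explicit toString().
    List<Utf8> raw = Arrays.asList(new Utf8("db"), new Utf8("schema"));
    List<String> namespace =
        raw.stream().map(Utf8::toString).collect(Collectors.toList());
    System.out.println(namespace); // prints [db, schema]
  }
}
```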
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.connect;
 
-import static java.util.stream.Collectors.toList;
-
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -28,6 +26,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.iceberg.IcebergBuild;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -72,7 +71,6 @@ public class IcebergSinkConfig extends AbstractConfig {
   private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
   private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns";
   private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by";
-  // FIXME: add config for CDC and upsert mode
   private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
       "iceberg.tables.auto-create-enabled";
   private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
@@ -365,7 +363,7 @@ static List<String> stringToList(String value, String regex) {
       return ImmutableList.of();
     }
 
-    return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
+    return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList());
   }
 
   public String controlTopic() {
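For reference, a standalone sketch of what `stringToList` does with a typical comma-separated property value. The null/empty guard is assumed from the `ImmutableList.of()` context line; everything outside the return statement is illustrative:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class StringToListSketch {
  // Mirrors the diff: split on the regex, trim each piece.
  static List<String> stringToList(String value, String regex) {
    if (value == null || value.isEmpty()) { // assumed guard; the connector returns ImmutableList.of()
      return Collections.emptyList();
    }
    return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // e.g. parsing a config value like "db.tbl1, db.tbl2 ,db.tbl3"
    System.out.println(stringToList("db.tbl1, db.tbl2 ,db.tbl3", ","));
    // prints [db.tbl1, db.tbl2, db.tbl3]
  }
}
```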
@@ -18,11 +18,10 @@
  */
 package org.apache.iceberg.connect;
 
-import static java.util.stream.Collectors.toList;
-
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.kafka.common.config.ConfigDef;
@@ -60,7 +59,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
               map.put(IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP, txnSuffix + i);
               return map;
             })
-        .collect(toList());
+        .collect(Collectors.toList());
   }
 
   @Override
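The `taskConfigs` hunk shows each task receiving a copy of the connector config plus a per-task transactional suffix. A simplified sketch of that pattern; the property key and suffix format below are stand-ins, and only the `IntStream`/copy/suffix shape comes from the diff:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TaskConfigsSketch {
  // Stand-in for IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP;
  // the real key is internal to the connector.
  static final String TXN_SUFFIX_PROP = "transactional.suffix";

  static List<Map<String, String>> taskConfigs(Map<String, String> props, int maxTasks) {
    // One random base suffix per connector start; index i makes it unique per task.
    String txnSuffix = UUID.randomUUID() + "-";
    return IntStream.range(0, maxTasks)
        .mapToObj(
            i -> {
              Map<String, String> map = new HashMap<>(props);
              map.put(TXN_SUFFIX_PROP, txnSuffix + i);
              return map;
            })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    taskConfigs(Map.of("topics", "events"), 3).forEach(System.out::println);
  }
}
```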
@@ -38,8 +38,7 @@ public class IcebergWriter implements RecordWriter {
   private final IcebergSinkConfig config;
   private final List<WriterResult> writerResults;
 
-  // FIXME: update this when the record converter is added
-  // private RecordConverter recordConverter;
+  private RecordConverter recordConverter;
   private TaskWriter<Record> writer;
 
   public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
@@ -52,19 +51,15 @@ public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
 
   private void initNewWriter() {
     this.writer = Utilities.createTableWriter(table, tableName, config);
-    // FIXME: update this when the record converter is added
-    // this.recordConverter = new RecordConverter(table, config);
+    this.recordConverter = new RecordConverter(table, config);
   }
 
   @Override
   public void write(SinkRecord record) {
     try {
-      // TODO: config to handle tombstones instead of always ignoring?
+      // ignore tombstones...
      if (record.value() != null) {
         Record row = convertToRow(record);
-
-        // FIXME: add CDC operation support
-
         writer.write(row);
       }
     } catch (Exception e) {
@@ -77,8 +72,25 @@ public void write(SinkRecord record) {
   }
 
   private Record convertToRow(SinkRecord record) {
-    // FIXME: update this when the record converter is added
-    return null;
+    if (!config.evolveSchemaEnabled()) {
+      return recordConverter.convert(record.value());
+    }
+
+    SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
+    Record row = recordConverter.convert(record.value(), updates);
+
+    if (!updates.empty()) {
+      // complete the current file
+      flush();
+      // apply the schema updates, this will refresh the table
+      SchemaUtils.applySchemaUpdates(table, updates);
+      // initialize a new writer with the new schema
+      initNewWriter();
+      // convert the row again, this time using the new table schema
+      row = recordConverter.convert(record.value(), null);
+    }
+
+    return row;
   }
 
   private void flush() {
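The new `convertToRow` is the heart of this commit: convert once while collecting schema updates, and if any were recorded, flush the current file, evolve the table, reopen the writer, and convert again. A compile-on-its-own sketch of that retry shape; all types below are stand-ins, and only the control flow mirrors the diff:

```java
import java.util.ArrayList;
import java.util.List;

public class EvolveRetrySketch {

  // Stand-in for SchemaUpdate.Consumer: records schema changes seen during conversion.
  static class SchemaUpdates {
    final List<String> changes = new ArrayList<>();
    boolean empty() { return changes.isEmpty(); }
  }

  // Stand-in converter: pretend any value mentioning "new_col" needs a schema change.
  static String convert(Object value, SchemaUpdates updates) {
    String s = String.valueOf(value);
    if (updates != null && s.contains("new_col")) {
      updates.changes.add("add column new_col");
    }
    return "row(" + s + ")";
  }

  static String convertToRow(Object value) {
    SchemaUpdates updates = new SchemaUpdates();
    String row = convert(value, updates);
    if (!updates.empty()) {
      // In the connector this is where flush(), SchemaUtils.applySchemaUpdates(...)
      // and initNewWriter() run before the second conversion.
      row = convert(value, null); // re-convert against the evolved schema
    }
    return row;
  }

  public static void main(String[] args) {
    System.out.println(convertToRow("{\"new_col\": 1}"));
  }
}
```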
@@ -21,7 +21,7 @@
 import java.util.List;
 import org.apache.kafka.connect.sink.SinkRecord;
 
-public class NoOpWriter implements RecordWriter {
+class NoOpWriter implements RecordWriter {
   @Override
   public void write(SinkRecord record) {
     // NO-OP
@@ -29,12 +29,12 @@
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitionedFanoutWriter;
 
-public class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {
+class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {
 
   private final PartitionKey partitionKey;
   private final InternalRecordWrapper wrapper;
 
-  public PartitionedAppendWriter(
+  PartitionedAppendWriter(
       PartitionSpec spec,
       FileFormat format,
       FileAppenderFactory<Record> appenderFactory,
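The visible hunk cuts off before the class body. For context, the usual shape of a `PartitionedFanoutWriter` subclass is to compute a reusable `PartitionKey` per row; the sketch below follows that standard Iceberg pattern and is not the file's actual elided body (the constructor tail and field wiring are assumptions):

```java
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;

// Sketch only: field wiring and constructor tail are assumed, not from the diff.
class PartitionedAppendWriterSketch extends PartitionedFanoutWriter<Record> {

  private final PartitionKey partitionKey;
  private final InternalRecordWrapper wrapper;

  PartitionedAppendWriterSketch(
      PartitionSpec spec,
      FileFormat format,
      FileAppenderFactory<Record> appenderFactory,
      OutputFileFactory fileFactory,
      FileIO io,
      long targetFileSize,
      Schema schema) {
    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    this.partitionKey = new PartitionKey(spec, schema);
    this.wrapper = new InternalRecordWrapper(schema.asStruct());
  }

  @Override
  protected PartitionKey partition(Record row) {
    // Wrap the generic Record so partition transforms see the expected
    // internal representation, then fill the reusable partition key.
    partitionKey.partition(wrapper.wrap(row));
    return partitionKey;
  }
}
```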