Kafka Connect: Record converters #9641

Merged (15 commits) on Mar 21, 2024
@@ -18,10 +18,9 @@
*/
package org.apache.iceberg.connect.events;

import static java.util.stream.Collectors.toList;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.util.Utf8;
@@ -96,7 +95,9 @@ public void put(int i, Object v) {
return;
case NAMESPACE:
this.namespace =
v == null ? null : ((List<Utf8>) v).stream().map(Utf8::toString).collect(toList());
v == null
? null
: ((List<Utf8>) v).stream().map(Utf8::toString).collect(Collectors.toList());
return;
case NAME:
this.name = v == null ? null : v.toString();

@@ -18,8 +18,6 @@
*/
package org.apache.iceberg.connect;

import static java.util.stream.Collectors.toList;

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -28,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -72,7 +71,6 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns";
private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by";
// FIXME: add config for CDC and upsert mode
private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
"iceberg.tables.auto-create-enabled";
private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
@@ -365,7 +363,7 @@ static List<String> stringToList(String value, String regex) {
return ImmutableList.of();
}

return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList());
}

public String controlTopic() {
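For illustration, the stringToList helper above turns a delimited config string into a trimmed list. Below is a minimal standalone sketch of the same pattern, assuming a comma delimiter and a made-up table list; the real property names and split regexes come from the connector config, not from this example.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StringToListSketch {
  public static void main(String[] args) {
    // a hypothetical comma-separated tables setting, e.g. "iceberg.tables"
    String value = "db.tbl1, db.tbl2";

    // same split-and-trim pattern as stringToList(value, ",")
    List<String> tables =
        Arrays.stream(value.split(",")).map(String::trim).collect(Collectors.toList());

    System.out.println(tables); // [db.tbl1, db.tbl2]
  }
}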

@@ -18,11 +18,10 @@
*/
package org.apache.iceberg.connect;

import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
@@ -60,7 +59,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
map.put(IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP, txnSuffix + i);
return map;
})
.collect(toList());
.collect(Collectors.toList());
}

@Override
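A rough, self-contained sketch of the taskConfigs fan-out above: each task receives a copy of the connector properties plus a unique transactional suffix. The property keys and values here are invented for illustration; in the connector the suffix is stored under IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TaskConfigFanOutSketch {
  public static void main(String[] args) {
    // stand-in for the connector's configured properties
    Map<String, String> connectorProps = new HashMap<>();
    connectorProps.put("iceberg.tables", "db.tbl1");

    String txnSuffix = "-txn-" + UUID.randomUUID() + "-";
    int maxTasks = 2;

    List<Map<String, String>> taskConfigs =
        IntStream.range(0, maxTasks)
            .mapToObj(
                i -> {
                  Map<String, String> map = new HashMap<>(connectorProps);
                  // invented key; stands in for INTERNAL_TRANSACTIONAL_SUFFIX_PROP
                  map.put("transactional.suffix", txnSuffix + i);
                  return map;
                })
            .collect(Collectors.toList());

    taskConfigs.forEach(System.out::println);
  }
}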

@@ -38,8 +38,7 @@ public class IcebergWriter implements RecordWriter {
private final IcebergSinkConfig config;
private final List<WriterResult> writerResults;

// FIXME: update this when the record converter is added
// private RecordConverter recordConverter;
private RecordConverter recordConverter;
private TaskWriter<Record> writer;

public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
@@ -52,19 +51,15 @@ public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {

private void initNewWriter() {
this.writer = Utilities.createTableWriter(table, tableName, config);
// FIXME: update this when the record converter is added
// this.recordConverter = new RecordConverter(table, config);
this.recordConverter = new RecordConverter(table, config);
}

@Override
public void write(SinkRecord record) {
try {
// TODO: config to handle tombstones instead of always ignoring?
// ignore tombstones...
if (record.value() != null) {
Record row = convertToRow(record);

// FIXME: add CDC operation support

writer.write(row);
}
} catch (Exception e) {
@@ -77,8 +72,25 @@ public void write(SinkRecord record) {
}

private Record convertToRow(SinkRecord record) {
// FIXME: update this when the record converter is added
return null;
if (!config.evolveSchemaEnabled()) {
return recordConverter.convert(record.value());
Reviewer (Contributor): It seems we only sink the value portion of the records. Is the idea that if users have valuable data in the key, they should combine this sink connector with an appropriate SMT to move the data from the key into the value?

Author: Yes, that's right. The thought is we will have a subproject for useful SMTs, and this might be one we would want to add.

(A sketch of such an SMT follows this file's diff.)

}

SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
Record row = recordConverter.convert(record.value(), updates);

if (!updates.empty()) {
// complete the current file
flush();
// apply the schema updates, this will refresh the table
SchemaUtils.applySchemaUpdates(table, updates);
// initialize a new writer with the new schema
initNewWriter();
// convert the row again, this time using the new table schema
row = recordConverter.convert(record.value(), null);
fqaiser94 (Contributor), Feb 25, 2024, on lines +89 to +90: Ah, I see now: you convert the record again afterwards with the new schema, and presumably this time you won't hit that branch and will include the value in the resulting row. Is the fundamental reason we need to do this twice that we don't know the new field's ID before the schema evolution is executed, and therefore can't add the new field to the GenericRecord?

Author: Yes, that's right.

}

return row;
}

private void flush() {
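Regarding the review thread above about key data: below is a minimal sketch of a custom SMT that copies fields from a Struct key into the Struct value, so key data is preserved when only the value is written to the table. This is not part of this PR; the package, class name, and the flat-Struct assumption are all hypothetical.

package com.example.transforms; // hypothetical package

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class MergeKeyIntoValue<R extends ConnectRecord<R>> implements Transformation<R> {

  @Override
  public R apply(R record) {
    // only handle flat Struct keys and values; pass everything else through untouched
    if (!(record.key() instanceof Struct) || !(record.value() instanceof Struct)) {
      return record;
    }
    Struct key = (Struct) record.key();
    Struct value = (Struct) record.value();

    // build a new value schema: all value fields plus any key fields not already present
    SchemaBuilder builder = SchemaBuilder.struct();
    for (Field f : value.schema().fields()) {
      builder.field(f.name(), f.schema());
    }
    for (Field f : key.schema().fields()) {
      if (value.schema().field(f.name()) == null) {
        builder.field(f.name(), f.schema());
      }
    }
    Schema newSchema = builder.build();

    // copy the value fields, then the missing key fields, into the new value struct
    Struct newValue = new Struct(newSchema);
    for (Field f : value.schema().fields()) {
      newValue.put(f.name(), value.get(f));
    }
    for (Field f : key.schema().fields()) {
      if (value.schema().field(f.name()) == null) {
        newValue.put(f.name(), key.get(f));
      }
    }

    return record.newRecord(
        record.topic(),
        record.kafkaPartition(),
        record.keySchema(),
        record.key(),
        newSchema,
        newValue,
        record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> configs) {}

  @Override
  public void close() {}
}

A connector config could then reference it with transforms=mergeKey and transforms.mergeKey.type=com.example.transforms.MergeKeyIntoValue (names are again illustrative).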

@@ -21,7 +21,7 @@
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

public class NoOpWriter implements RecordWriter {
class NoOpWriter implements RecordWriter {
@Override
public void write(SinkRecord record) {
// NO-OP

@@ -29,12 +29,12 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;

public class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {
class PartitionedAppendWriter extends PartitionedFanoutWriter<Record> {

private final PartitionKey partitionKey;
private final InternalRecordWrapper wrapper;

public PartitionedAppendWriter(
PartitionedAppendWriter(
PartitionSpec spec,
FileFormat format,
FileAppenderFactory<Record> appenderFactory,