
Kafka Connect: Record converters #9641

Merged (15 commits) on Mar 21, 2024

Conversation

bryanck (Contributor) commented Feb 4, 2024:

This PR is the next stage in submitting the Iceberg Kafka Connect sink connector, and is a follow-up to #8701 and #9466. It includes the record converters with related config and tests.

Still not included in the sink are the commit controller, integration tests, and docs, which will be added in follow-up PRs. For reference, the current sink implementation can be found at https://github.com/tabular-io/iceberg-kafka-connect.

}

@Override
public void write(SinkRecord record) {
  try {
    // TODO: config to handle tombstones instead of always ignoring?

Contributor:

this line can probably be removed as it already exists right below this one

bryanck (Contributor Author):

Whoops, thanks, I removed the duplicate comment.

nastra (Contributor) left a comment:

overall this LGTM, just a few minor things I noticed


@Override
public <T> void set(int pos, T value) {
  throw new UnsupportedOperationException();

Contributor:

I think it would be good if all of the UOEs had a short message saying what exactly isn't supported. That makes it easier to understand where the UOE is coming from whenever it is thrown and stack traces aren't shown in the logs.

bryanck (Contributor Author):

Sure, I added some exception messages for these.
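For example, the setter shown above now reads something like this (the exact message wording in the PR may differ):

@Override
public <T> void set(int pos, T value) {
  // a short message makes the origin clear even when stack traces are suppressed in the logs
  throw new UnsupportedOperationException("set is not supported on this read-only record");
}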


private static final JsonConverter JSON_CONVERTER = new JsonConverter();

static {

Contributor:

nit: could also be done in a @BeforeAll as I don't think static blocks are used that frequently in tests

bryanck (Contributor Author):

Sure, I made this change.
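A minimal sketch of that shape (assuming the converter only needs schemas.enable configured; the actual test setup may differ):

@BeforeAll
public static void beforeAll() {
  // runs once before all tests, replacing the static initializer block
  JSON_CONVERTER.configure(
      Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false), false);
}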

case DECIMAL:
  return convertDecimal(value, (DecimalType) type);
case BOOLEAN:
  return convertBoolean(value);

Contributor:

booleans don't seem to be tested in the existing test cases

bryanck (Contributor Author):

Thanks, I added this.
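For context, a boolean conversion in this style would plausibly look like the following, mirroring the convertInt method quoted just below (a sketch, not necessarily the PR's exact code):

protected boolean convertBoolean(Object value) {
  if (value instanceof Boolean) {
    return (boolean) value;
  } else if (value instanceof String) {
    return Boolean.parseBoolean((String) value);
  }
  throw new IllegalArgumentException("Cannot convert to boolean: " + value);
}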

protected int convertInt(Object value) {
  if (value instanceof Number) {
    return ((Number) value).intValue();
  } else if (value instanceof String) {

Contributor:

I've noticed that the String handling for ints/longs/floats/doubles isn't tested in the existing tests. Maybe it would be good to test those, if that's easily possible.

bryanck (Contributor Author):

Yes, I agree. I added test cases for these.
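A sketch of the kind of case added (only convertInt appears in the quoted code; the other helper names are assumed):

assertThat(converter.convertInt("123")).isEqualTo(123);
assertThat(converter.convertLong("123")).isEqualTo(123L);
assertThat(converter.convertFloat("1.5")).isEqualTo(1.5f);
assertThat(converter.convertDouble("1.5")).isEqualTo(1.5d);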

@bryanck force-pushed the kc-connector-convert branch from 53cfd92 to 374093a on February 6, 2024 17:49

nastra (Contributor) left a comment:

LGTM, @danielcweeks could you also take a look please?

@bryanck force-pushed the kc-connector-convert branch from 6bb988f to 451b18b on February 11, 2024 15:54
@bryanck force-pushed the kc-connector-convert branch from 6cfc05d to b2e7352 on February 19, 2024 02:23
@Fokko requested a review from danielcweeks on February 21, 2024 09:33

Fokko (Contributor) commented Feb 21, 2024:

Maybe @fqaiser94 is also interested in reviewing this :)

@bryanck force-pushed the kc-connector-convert branch from b2e7352 to 3e33382 on February 25, 2024 16:06

danielcweeks (Contributor):

A few comments/questions, but overall looks good. You might want to double-check all the access levels for classes, as I think we're generally overexposing.


@Override
protected StructLike asStructLike(Record data) {
  return wrapper.wrap(data);

Contributor:

q: why do we need to wrap these data records?

I'm able to replace this line with this:

Suggested change:
- return wrapper.wrap(data);
+ return data;

and still pass all the tests.

bryanck (Contributor Author):

this class has been removed

Comment on lines 41 to 42
private final InternalRecordWrapper wrapper;
private final InternalRecordWrapper keyWrapper;

Contributor:

I guess similar to the above, why do we have these wrappers at all?
I was able to remove these and their usages and was still able to pass the tests in the original tabular repo as well.

bryanck (Contributor Author):

The wrapper adds some transforms that are needed where the expected value type differs from what the file writer expects. For example, the Parquet writer expects java.time objects for date/time values, but longs are expected in other places, such as when the partition key is created. This is modeled after the Flink sink's BaseDeltaTaskWriter.
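To illustrate with a standalone sketch using Iceberg's InternalRecordWrapper (not the PR's code): partition key creation needs the micros-since-epoch representation, while the record itself keeps the java.time value for the writer.

Schema schema =
    new Schema(Types.NestedField.required(1, "ts", Types.TimestampType.withZone()));
PartitionSpec spec = PartitionSpec.builderFor(schema).day("ts").build();

GenericRecord record = GenericRecord.create(schema);
record.setField("ts", OffsetDateTime.now(ZoneOffset.UTC)); // what the Parquet writer consumes

InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
PartitionKey partitionKey = new PartitionKey(spec, schema);
// the wrapper exposes the timestamp as a micros-since-epoch long, which the day transform expects
partitionKey.partition(wrapper.wrap(record));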

bryanck (Contributor Author):

FYI this class has been removed

- // FIXME: update this when the record converter is added
- return null;
+ if (!config.evolveSchemaEnabled()) {
+   return recordConverter.convert(record.value());

Contributor:

It seems we only really sink the value portion of the records.
Is the idea that if users have valuable data in the key, they should combine this sink-connector with an appropriate SMT to move the data from the key into the value?

bryanck (Contributor Author):

Yes, that's right. The thought is we will have a subproject for useful SMTs and this might be one we would want to add.
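For illustration only, a key-to-value SMT could look roughly like this; the class is hypothetical (not part of this PR or any existing subproject) and handles just schemaless Map values:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class KeyToValue<R extends ConnectRecord<R>> implements Transformation<R> {

  @Override
  @SuppressWarnings("unchecked")
  public R apply(R record) {
    if (!(record.value() instanceof Map)) {
      return record; // this sketch only handles schemaless (Map) values
    }
    Map<String, Object> newValue = new HashMap<>((Map<String, Object>) record.value());
    newValue.put("_key", record.key()); // the "_key" field name is arbitrary
    return record.newRecord(
        record.topic(),
        record.kafkaPartition(),
        record.keySchema(),
        record.key(),
        null,
        newValue,
        record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}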

Comment on lines +204 to +216
if (schemaUpdateConsumer != null) {
  String parentFieldName =
      structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
  Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
  schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
}

fqaiser94 (Contributor) commented Feb 25, 2024:

CMIIW, but I'm wondering if there is a bug here? When we enter this branch, although we "update the table schema" to add a new column, we don't seem to add the value of that new column to the (GenericRecord) result.

I feel like this test should pass but currently it does not:

  @Test
  public void testNewColumn() {
    final org.apache.iceberg.Schema table_schema = new org.apache.iceberg.Schema(
            NestedField.required(20, "i", IntegerType.get()));
    final Table table = mock(Table.class);
    when(table.schema()).thenReturn(table_schema);

    final Schema connect_schema =
            SchemaBuilder.struct()
                    .field("i", Schema.INT32_SCHEMA)
                    .field("l", Schema.INT32_SCHEMA);
    final Struct data = new Struct(connect_schema).put("i", 1).put("l", 2);

    final RecordConverter converter = new RecordConverter(table, config);
    final SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
    final Record record = converter.convert(data, consumer);

    // should add column "l"
    assertThat(consumer.empty()).isFalse();

    GenericRecord rec = (GenericRecord) record;
    assertThat(rec.getField("i")).isEqualTo(1);
    assertThat(rec.getField("l")).isEqualTo(2); // this assertion fails, expected=2, actual=null
  }

bryanck (Contributor Author):

The value isn't added to the record until it exists in Iceberg. This will return the record to IcebergWriter without the value but with the requested schema change. IcebergWriter will apply the schema change and reconvert the record, at which point the value should be present in the record.

Comment on lines +95 to +90
// convert the row again, this time using the new table schema
row = recordConverter.convert(record.value(), null);

fqaiser94 (Contributor) commented Feb 25, 2024:

ahhh I see now, you convert it again afterwards with the new schema, and presumably this time you won't hit that branch and will include the value in the resulting row ...

Is the fundamental reason we need to do this twice because we basically don't know the new field's ID before the schema evolution is executed and therefore can't add the new field to the GenericRecord?

bryanck (Contributor Author):

Yes, that's right.
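In other words, the writer side roughly does the following (a sketch with simplified names; applySchemaUpdates stands in for however the writer applies the collected changes):

SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
Record row = recordConverter.convert(record.value(), updates);

if (!updates.empty()) {
  // applying the changes assigns the new column its field ID and refreshes the table schema
  applySchemaUpdates(updates);
  // convert the row again, this time using the new table schema;
  // the new column now exists, so its value is populated
  row = recordConverter.convert(record.value(), null);
}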

Comment on lines 105 to 112
if (opValue == null) {
  return Operation.INSERT;
}

String opStr = opValue.toString().trim().toUpperCase();
if (opStr.isEmpty()) {
  return Operation.INSERT;
}

Contributor:

Similar to Dan's comment, I feel this is also a bit overly permissive.

Wondering in general if it would be better to start off strict and relax constraints in the future if necessary?

bryanck (Contributor Author):

This code has been removed

Comment on lines 498 to 516
private String ensureTimestampFormat(String str) {
  String result = str;
  if (result.charAt(10) == ' ') {
    result = result.substring(0, 10) + 'T' + result.substring(11);
  }
  if (result.length() > 22 && result.charAt(19) == '+' && result.charAt(22) == ':') {
    result = result.substring(0, 19) + result.substring(19).replace(":", "");
  }
  return result;
}

Contributor:

Is there a strong reason for supporting more than just ISO_OFFSET_DATE_TIME/ISO_LOCAL_DATE_TIME formats? e.g. do we do similar work in other connectors e.g. Flink/Spark? IMO would be more preferable to leave it to users to write an SMT if necessary to convert their Strings to one of those timestamp formats.

bryanck (Contributor Author):

This was requested by users, and it seemed reasonable to permit some well-known variations of the ISO formats (e.g. a space instead of T, or offsets with a :).
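Concretely, per the ensureTimestampFormat method quoted above:

ensureTimestampFormat("2023-05-18 11:22:33+00:00"); // -> "2023-05-18T11:22:33+0000"
ensureTimestampFormat("2023-05-18 11:22:33");       // -> "2023-05-18T11:22:33"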

Comment on lines 549 to 584
List<Object> inputList =
    ImmutableList.of(
        "2023-05-18T11:22:33Z",
        "2023-05-18 11:22:33Z",
        "2023-05-18T11:22:33+00",
        "2023-05-18 11:22:33+00",
        "2023-05-18T11:22:33+00:00",
        "2023-05-18 11:22:33+00:00",
        "2023-05-18T11:22:33+0000",
        "2023-05-18 11:22:33+0000",
        "2023-05-18T11:22:33",
        "2023-05-18 11:22:33",
        expectedMillis,
        new Date(expectedMillis),
        OffsetDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC),
        LocalDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC));

Contributor:

Following on from the above, these would be the expected/supported strings IMO.

    List<Object> inputList =
        ImmutableList.of(
            "2023-05-18T11:22:33Z",
//            "2023-05-18 11:22:33Z",
            "2023-05-18T11:22:33+00",
//            "2023-05-18 11:22:33+00",
            "2023-05-18T11:22:33+00:00",
//            "2023-05-18 11:22:33+00:00",
//            "2023-05-18T11:22:33+0000",
//            "2023-05-18 11:22:33+0000",
            "2023-05-18T11:22:33",
//            "2023-05-18 11:22:33",
            expectedMillis,
            new Date(expectedMillis),
            OffsetDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC),
            LocalDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC));

bryanck (Contributor Author):

I left the formats in; I felt it was OK to be a bit flexible here for convenience, as these date format variations are very commonly found.

ImmutableList.of(
    "2023-05-18T11:22:33Z",
    "2023-05-18 11:22:33Z",
    "2023-05-18T11:22:33+00",

Contributor:

nit: is it worth adding a test case somewhere with a timezone offset that isn't +00, e.g.

            "2023-05-18T12:22:33+01", 

or have I just missed it somewhere? :)

bryanck (Contributor Author):

I added some tests for this.


public RecordConverter(Table table, IcebergSinkConfig config) {
  this.tableSchema = table.schema();
  this.nameMapping = createNameMapping(table);

Contributor:

Not particularly familiar with name-mapping but do we need to worry at all about the name-mapping property changing in the table in the background?

bryanck (Contributor Author):

The name mapping would only get updated when the table is reloaded.
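For reference, the mapping is presumably read from the table's properties when the converter is constructed, roughly like this (a sketch; schema.name-mapping.default is Iceberg's standard property key):

private NameMapping createNameMapping(Table table) {
  // only re-evaluated when a new RecordConverter is built from a (re)loaded table
  String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
  return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
}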

Comment on lines +668 to +698
converter.convert(data, consumer);
Collection<AddColumn> addCols = consumer.addColumns();

Contributor:

nit: I found it helpful as a reader to see what it actually returns :)

Suggested change:
- converter.convert(data, consumer);
- Collection<AddColumn> addCols = consumer.addColumns();
+ Record record = converter.convert(data, consumer);
+ GenericRecord rec = (GenericRecord) record;
+ assertThat(rec.size()).isEqualTo(1);
+ assertThat(rec.struct().fields().get(0).name()).isEqualTo("ii");
+ assertThat(rec.getField("ii")).isEqualTo(null);
+ Collection<AddColumn> addCols = consumer.addColumns();

bryanck (Contributor Author):

I feel like this is overloading the test, which is testing the missing column detection, not record conversion which is covered in the other tests.

/**
 * This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types.
 */
public class RecordProjection implements Record {

Contributor:

nit: would it be worth it to write some unit tests for this class?

If so, feel free to use the following as a starting point (wrote a couple quickly while I was trying to understand things).

package org.apache.iceberg.connect.data;

import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.iceberg.data.Record;

class RecordProjectionTest {

    @Test
    void testShouldReturnOnlyValuesInProjectionSchema() {
        final org.apache.iceberg.Schema SIMPLE_SCHEMA =
                new org.apache.iceberg.Schema(
                        Types.NestedField.required(1, "a", Types.IntegerType.get()),
                        Types.NestedField.required(2, "b", Types.StringType.get()));

        final org.apache.iceberg.Schema PROJECTION_SCHEMA =
                new org.apache.iceberg.Schema(
                        Types.NestedField.required(1, "a", Types.IntegerType.get()));

        final RecordProjection projection = RecordProjection.create(SIMPLE_SCHEMA, PROJECTION_SCHEMA);
        final Record record = GenericRecord.create(SIMPLE_SCHEMA);
        record.setField("a", 234L);
        record.setField("b", "foobar");
        final Record projectedRecord = projection.wrap(record);

        assertThat(projectedRecord.get(0)).isEqualTo(234L);
        assertThatThrownBy(() -> projectedRecord.get(1))
                .isInstanceOf(ArrayIndexOutOfBoundsException.class)
                .hasMessageContaining("Index 1 out of bounds for length 1");
    }

    @Test
    void testShouldReturnOnlyValuesInNestedProjectionSchema() {
        final Types.StructType A_STRUCT = Types.StructType.of(
                Types.NestedField.required(2, "b", Types.IntegerType.get()),
                Types.NestedField.required(3, "c", Types.StringType.get())
        );

        final org.apache.iceberg.Schema NESTED_SCHEMA =
                new org.apache.iceberg.Schema(
                        Types.NestedField.required(1, "a", A_STRUCT)
                );

        final org.apache.iceberg.Schema NESTED_PROJECTION_SCHEMA =
                new org.apache.iceberg.Schema(
                        Types.NestedField.required(1, "a", Types.StructType.of(
                                Types.NestedField.required(2, "b", Types.IntegerType.get())
                        ))
                );

        final RecordProjection projection = RecordProjection.create(NESTED_SCHEMA, NESTED_PROJECTION_SCHEMA);

        final Record aRecord = GenericRecord.create(A_STRUCT);
        aRecord.setField("b", 234L);
        aRecord.setField("c", "foobar");

        final Record record = GenericRecord.create(NESTED_SCHEMA);
        record.setField("a", aRecord);

        final Record projectedRecord = projection.wrap(record);
        final Record aProjectedRecord = (Record) projectedRecord.get(0);

        assertThat(aProjectedRecord.get(0)).isEqualTo(234L);
        assertThatThrownBy(() -> aProjectedRecord.get(1))
                .isInstanceOf(ArrayIndexOutOfBoundsException.class)
                .hasMessageContaining("Index 1 out of bounds for length 1");
    }
}

bryanck (Contributor Author):

This test isn't needed anymore as the class was removed.

fqaiser94 (Contributor):

looks good to me overall, mostly questions/nits 👍

@bryanck force-pushed the kc-connector-convert branch from 545fe3b to 96061ca on February 29, 2024 00:32

bryanck (Contributor Author) commented Feb 29, 2024:

FYI, I removed the delta writers along with the upsert and CDC-related code.

@bryanck changed the title from "Kafka Connect: Record converters and delta writers" to "Kafka Connect: Record converters" on Mar 2, 2024

fqaiser94 (Contributor):

What's expected to be included in the next kafka-connect PR?

Asking b/c I'm doing some work on implementing zombie-fencing in tabular/iceberg-kafka-connect currently and we may want to wait to see what that looks like in terms of code changes before tackling the worker/coordinator portions of the code (in case it ends up being a large refactor).

bryanck (Contributor Author) commented Mar 12, 2024:

The next submission will be the coordinator piece. I have already addressed zombie fencing in that upcoming code, so I feel we can proceed with it for consideration.

@bryanck force-pushed the kc-connector-convert branch from bb88082 to 1207d9c on March 21, 2024 13:50

bryanck (Contributor Author) commented Mar 21, 2024:

I was planning on merging this, unless someone wants to give more feedback, cc @fqaiser94 @danielcweeks
