Kafka Connect: Record converters #9641
Conversation
Force-pushed from 42c4eca to fefe257.
}

@Override
public void write(SinkRecord record) {
  try {
    // TODO: config to handle tombstones instead of always ignoring?
this line can probably be removed as it already exists right below this one
Whoops, thanks, I removed the duplicate comment.
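For context, the TODO above is about tombstone records, i.e. Kafka records whose value is null. A rough sketch of what a configurable handling path could look like (hypothetical only; handleTombstones is an invented flag, and the connector currently just ignores tombstones):

// Hypothetical sketch, not the PR's code: handleTombstones is an invented config flag.
if (record.value() == null) {
  // a tombstone record: non-null key, null value
  if (handleTombstones) {
    // e.g. translate the tombstone into a delete, once supported
  }
  return; // current behavior: always ignore tombstones
}
// ... otherwise convert and write the record value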
overall this LGTM, just a few minor things I noticed
@Override
public <T> void set(int pos, T value) {
  throw new UnsupportedOperationException();
I think it would be good if all of the UOEs had a short message saying what exactly isn't supported. This makes it easier to understand where the UOE is coming from if it's ever thrown and stack traces aren't shown in the logs.
Sure, I added some exception messages for these.
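For illustration, something along these lines (the message text itself is just a suggestion) makes the origin obvious even when only the log message is visible:

@Override
public <T> void set(int pos, T value) {
  // include a short message so the source of the UOE is clear from the log line alone
  throw new UnsupportedOperationException("Cannot set a field by position on this read-only record");
}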
private static final JsonConverter JSON_CONVERTER = new JsonConverter();

static {
nit: could also be done in a @BeforeAll, as I don't think static blocks are used that frequently in tests
Sure, I made this change.
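For reference, a minimal sketch of the @BeforeAll alternative, assuming a JUnit 5 test class; the configure call shown is illustrative and may differ from the test's actual converter settings:

import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;
import org.junit.jupiter.api.BeforeAll;

class RecordConverterTestBase {

  private static final JsonConverter JSON_CONVERTER = new JsonConverter();

  @BeforeAll
  static void setupConverter() {
    // one-time setup that previously lived in the static initializer block
    JSON_CONVERTER.configure(Map.of("schemas.enable", false), false);
  }
}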
case DECIMAL:
  return convertDecimal(value, (DecimalType) type);
case BOOLEAN:
  return convertBoolean(value);
Booleans don't seem to be tested in the existing test cases.
Thanks, I added this.
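For what it's worth, a boolean conversion would presumably mirror the convertInt pattern quoted just below (a plausible sketch, not necessarily the PR's exact code):

protected boolean convertBoolean(Object value) {
  if (value instanceof Boolean) {
    return (Boolean) value;
  } else if (value instanceof String) {
    return Boolean.parseBoolean((String) value);
  }
  throw new IllegalArgumentException("Cannot convert to boolean: " + value);
}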
protected int convertInt(Object value) {
  if (value instanceof Number) {
    return ((Number) value).intValue();
  } else if (value instanceof String) {
I've noticed that the String handling for ints/longs/floats/doubles isn't tested in the existing tests. Maybe it would be good to test those if that's easily possible.
Yes, I agree. I added test cases for these.
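As a sketch, such a test could follow the same mock-based pattern used in the testNewColumn example later in this conversation (config here stands in for however the test class builds its IcebergSinkConfig):

@Test
public void testStringToIntCoercion() {
  // hypothetical test sketch: an int column receiving a String value
  org.apache.iceberg.Schema tableSchema =
      new org.apache.iceberg.Schema(NestedField.required(1, "i", IntegerType.get()));
  Table table = mock(Table.class);
  when(table.schema()).thenReturn(tableSchema);

  Schema connectSchema = SchemaBuilder.struct().field("i", Schema.STRING_SCHEMA);
  Struct data = new Struct(connectSchema).put("i", "123");

  RecordConverter converter = new RecordConverter(table, config);
  Record record = converter.convert(data, null);

  assertThat(((GenericRecord) record).getField("i")).isEqualTo(123);
}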
Force-pushed from 53cfd92 to 374093a.
LGTM, @danielcweeks could you also take a look please?
Force-pushed from 6bb988f to 451b18b.
Force-pushed from 6cfc05d to b2e7352.
Maybe @fqaiser94 is also interested in reviewing this :)
Force-pushed from b2e7352 to 3e33382.
A few comments/questions, but overall looks good. You might want to double check all the access levels for classes as I think we're generally overexposing.
@Override
protected StructLike asStructLike(Record data) {
  return wrapper.wrap(data);
q: why do we need to wrap these data records?
I'm able to replace this line:
    return wrapper.wrap(data);
with:
    return data;
and still pass all the tests.
this class has been removed
private final InternalRecordWrapper wrapper;
private final InternalRecordWrapper keyWrapper;
I guess similar to the above, why do we have these wrappers at all?
I was able to remove these and their usages and was still able to pass the tests in the original tabular repo as well.
The wrapper adds some transforms that are needed in places where the expected value type doesn't match what the file writer expects. For example, the Parquet writer expects java.time objects for date/time values, but longs are expected in other places, like when the partition key is created. This is modeled after the Flink sink's BaseDeltaTaskWriter.
FYI this class has been removed
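For context, a small standalone illustration of the transform such a wrapper applies, using Iceberg's InternalRecordWrapper directly (this is not code from the PR):

import java.time.LocalDate;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class WrapperExample {
  public static void main(String[] args) {
    Schema schema = new Schema(Types.NestedField.required(1, "d", Types.DateType.get()));
    Record record = GenericRecord.create(schema);
    // the writer-facing record holds a java.time value
    record.setField("d", LocalDate.of(2023, 5, 18));

    InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
    StructLike wrapped = wrapper.wrap(record);
    // the wrapped view exposes the internal representation (days from epoch)
    // that partition key construction expects
    System.out.println(wrapped.get(0, Object.class)); // 19495
  }
}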
// FIXME: update this when the record converter is added
return null;
if (!config.evolveSchemaEnabled()) {
  return recordConverter.convert(record.value());
It seems we only really sink the value portion of the records. Is the idea that if users have valuable data in the key, they should combine this sink connector with an appropriate SMT to move the data from the key into the value?
Yes, that's right. The thought is we will have a subproject for useful SMTs and this might be one we would want to add.
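For illustration, the kind of SMT being described could look roughly like this (a hypothetical sketch, not part of this PR or any existing subproject; it assumes struct keys/values and no field-name collisions):

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class MergeKeyIntoValue<R extends ConnectRecord<R>> implements Transformation<R> {

  @Override
  public R apply(R record) {
    if (!(record.key() instanceof Struct) || !(record.value() instanceof Struct)) {
      return record; // this sketch only handles struct keys and values
    }
    Struct key = (Struct) record.key();
    Struct value = (Struct) record.value();

    // new value schema = value fields followed by key fields (assumes no name collisions)
    SchemaBuilder builder = SchemaBuilder.struct();
    value.schema().fields().forEach(f -> builder.field(f.name(), f.schema()));
    key.schema().fields().forEach(f -> builder.field(f.name(), f.schema()));
    Schema newSchema = builder.build();

    Struct newValue = new Struct(newSchema);
    for (Field f : value.schema().fields()) {
      newValue.put(f.name(), value.get(f));
    }
    for (Field f : key.schema().fields()) {
      newValue.put(f.name(), key.get(f));
    }

    return record.newRecord(
        record.topic(),
        record.kafkaPartition(),
        record.keySchema(),
        record.key(),
        newSchema,
        newValue,
        record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> configs) {}

  @Override
  public void close() {}
}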
if (schemaUpdateConsumer != null) {
  String parentFieldName =
      structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
  Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
  schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
}
CMIIW, but I'm wondering if there is a bug here? When we enter this branch, although we "update the table schema" to add a new column, we don't seem to add the value of that new column to the (GenericRecord) result.
I feel like this test should pass, but currently it does not:
@Test
public void testNewColumn() {
final org.apache.iceberg.Schema table_schema = new org.apache.iceberg.Schema(
NestedField.required(20, "i", IntegerType.get()));
final Table table = mock(Table.class);
when(table.schema()).thenReturn(table_schema);
final Schema connect_schema =
SchemaBuilder.struct()
.field("i", Schema.INT32_SCHEMA)
.field("l", Schema.INT32_SCHEMA);
final Struct data = new Struct(connect_schema).put("i", 1).put("l", 2);
final RecordConverter converter = new RecordConverter(table, config);
final SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
final Record record = converter.convert(data, consumer);
// should add column "l"
assertThat(consumer.empty()).isFalse();
GenericRecord rec = (GenericRecord) record;
assertThat(rec.getField("i")).isEqualTo(1);
assertThat(rec.getField("l")).isEqualTo(2); // this assertion fails, expected=2, actual=null
}
The value isn't added to the record until it exists in Iceberg. This will return the record to IcebergWriter without the value but with the requested schema change. IcebergWriter will apply the schema change and reconvert the record, at which point the value should be present in the record.
// convert the row again, this time using the new table schema
row = recordConverter.convert(record.value(), null);
Ahhh I see now, you convert it again afterwards with the new schema, and presumably this time you won't hit that branch and will include the value in the resulting row...
Is the fundamental reason we need to do this twice because we basically don't know the new field's ID before the schema evolution is executed and therefore can't add the new field to the GenericRecord?
Yes, that's right.
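Putting the pieces from this conversation together, the flow looks roughly like this (a condensed sketch, not the exact PR code; applySchemaUpdates is an assumed helper name in IcebergWriter):

private Record convertToRow(SinkRecord record) {
  if (!config.evolveSchemaEnabled()) {
    return recordConverter.convert(record.value());
  }

  SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
  Record row = recordConverter.convert(record.value(), updates);

  if (!updates.empty()) {
    // apply the requested changes (e.g. add columns) to the table schema,
    // then convert the row again, this time using the new table schema;
    // the new field IDs are only known after the schema evolution runs
    applySchemaUpdates(updates); // assumed helper
    row = recordConverter.convert(record.value(), null);
  }
  return row;
}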
if (opValue == null) {
  return Operation.INSERT;
}

String opStr = opValue.toString().trim().toUpperCase();
if (opStr.isEmpty()) {
  return Operation.INSERT;
}
Similar to Dan's comment, I feel this is also a bit overly permissive.
Wondering in general if it would be better to start off strict and relax constraints in the future if necessary?
This code has been removed
private String ensureTimestampFormat(String str) {
  String result = str;
  if (result.charAt(10) == ' ') {
    result = result.substring(0, 10) + 'T' + result.substring(11);
  }
  if (result.length() > 22 && result.charAt(19) == '+' && result.charAt(22) == ':') {
    result = result.substring(0, 19) + result.substring(19).replace(":", "");
  }
  return result;
}
Is there a strong reason for supporting more than just the ISO_OFFSET_DATE_TIME/ISO_LOCAL_DATE_TIME formats? e.g. do we do similar work in other connectors, e.g. Flink/Spark? IMO it would be preferable to leave it to users to write an SMT, if necessary, to convert their Strings to one of those timestamp formats.
This was requested by users and it seemed reasonable to permit some well-known variations of the ISO formats (e.g. a space instead of T, or offsets containing a colon).
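For reference, these are the normalizations the ensureTimestampFormat method shown above performs on a few of the accepted variants (illustrative, derived by reading the code rather than from test output):

String a = ensureTimestampFormat("2023-05-18 11:22:33Z");      // "2023-05-18T11:22:33Z"
String b = ensureTimestampFormat("2023-05-18T11:22:33+00:00"); // "2023-05-18T11:22:33+0000"
String c = ensureTimestampFormat("2023-05-18 11:22:33+00:00"); // "2023-05-18T11:22:33+0000"
String d = ensureTimestampFormat("2023-05-18T11:22:33Z");      // unchanged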
List<Object> inputList =
    ImmutableList.of(
        "2023-05-18T11:22:33Z",
        "2023-05-18 11:22:33Z",
        "2023-05-18T11:22:33+00",
        "2023-05-18 11:22:33+00",
        "2023-05-18T11:22:33+00:00",
        "2023-05-18 11:22:33+00:00",
        "2023-05-18T11:22:33+0000",
        "2023-05-18 11:22:33+0000",
        "2023-05-18T11:22:33",
        "2023-05-18 11:22:33",
        expectedMillis,
        new Date(expectedMillis),
        OffsetDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC),
        LocalDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC));
Following on from the above, these would be the expected/supported strings IMO:
List<Object> inputList =
ImmutableList.of(
"2023-05-18T11:22:33Z",
// "2023-05-18 11:22:33Z",
"2023-05-18T11:22:33+00",
// "2023-05-18 11:22:33+00",
"2023-05-18T11:22:33+00:00",
// "2023-05-18 11:22:33+00:00",
// "2023-05-18T11:22:33+0000",
// "2023-05-18 11:22:33+0000",
"2023-05-18T11:22:33",
// "2023-05-18 11:22:33",
expectedMillis,
new Date(expectedMillis),
OffsetDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC));
I left the format handling in; I felt it was OK to be a bit flexible here for convenience, as these date format variations are very commonly found.
ImmutableList.of(
    "2023-05-18T11:22:33Z",
    "2023-05-18 11:22:33Z",
    "2023-05-18T11:22:33+00",
nit: is it worth adding a test case somewhere with a timezone offset that isn't +00, e.g. "2023-05-18T12:22:33+01", or have I just missed it somewhere? :)
I added some tests for this.
public RecordConverter(Table table, IcebergSinkConfig config) {
  this.tableSchema = table.schema();
  this.nameMapping = createNameMapping(table);
Not particularly familiar with name-mapping but do we need to worry at all about the name-mapping property changing in the table in the background?
The name mapping would only get updated when the table is reloaded.
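For context, a name mapping is typically loaded from the table's properties; a plausible sketch of createNameMapping (an assumption, not necessarily the PR's exact code) is:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

class NameMappingSketch {
  static NameMapping createNameMapping(Table table) {
    // TableProperties.DEFAULT_NAME_MAPPING is "schema.name-mapping.default"
    String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
    return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
  }
}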
converter.convert(data, consumer);
Collection<AddColumn> addCols = consumer.addColumns();
nit: I found it helpful as a reader to see what it actually returns :)

converter.convert(data, consumer);
Collection<AddColumn> addCols = consumer.addColumns();

could become:

Record record = converter.convert(data, consumer);
GenericRecord rec = (GenericRecord) record;
assertThat(rec.size()).isEqualTo(1);
assertThat(rec.struct().fields().get(0).name()).isEqualTo("ii");
assertThat(rec.getField("ii")).isEqualTo(null);
Collection<AddColumn> addCols = consumer.addColumns();
I feel like this is overloading the test, which is testing the missing column detection, not record conversion which is covered in the other tests.
/**
 * This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types.
 */
public class RecordProjection implements Record {
nit: would it be worth it to write some unit tests for this class?
If so, feel free to use the following as a starting point (wrote a couple quickly while I was trying to understand things).
package org.apache.iceberg.connect.data;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import org.apache.iceberg.data.Record;
class RecordProjectionTest {
@Test
void testShouldReturnOnlyValuesInProjectionSchema() {
final org.apache.iceberg.Schema SIMPLE_SCHEMA =
new org.apache.iceberg.Schema(
Types.NestedField.required(1, "a", Types.IntegerType.get()),
Types.NestedField.required(2, "b", Types.StringType.get()));
final org.apache.iceberg.Schema PROJECTION_SCHEMA =
new org.apache.iceberg.Schema(
Types.NestedField.required(1, "a", Types.IntegerType.get()));
final RecordProjection projection = RecordProjection.create(SIMPLE_SCHEMA, PROJECTION_SCHEMA);
final Record record = GenericRecord.create(SIMPLE_SCHEMA);
record.setField("a", 234L);
record.setField("b", "foobar");
final Record projectedRecord = projection.wrap(record);
assertThat(projectedRecord.get(0)).isEqualTo(234L);
assertThatThrownBy(() -> projectedRecord.get(1))
.isInstanceOf(ArrayIndexOutOfBoundsException.class)
.hasMessageContaining("Index 1 out of bounds for length 1");
}
@Test
void testShouldReturnOnlyValuesInNestedProjectionSchema() {
final Types.StructType A_STRUCT = Types.StructType.of(
Types.NestedField.required(2, "b", Types.IntegerType.get()),
Types.NestedField.required(3, "c", Types.StringType.get())
);
final org.apache.iceberg.Schema NESTED_SCHEMA =
new org.apache.iceberg.Schema(
Types.NestedField.required(1, "a", A_STRUCT)
);
final org.apache.iceberg.Schema NESTED_PROJECTION_SCHEMA =
new org.apache.iceberg.Schema(
Types.NestedField.required(1, "a", Types.StructType.of(
Types.NestedField.required(2, "b", Types.IntegerType.get())
))
);
final RecordProjection projection = RecordProjection.create(NESTED_SCHEMA, NESTED_PROJECTION_SCHEMA);
final Record aRecord = GenericRecord.create(A_STRUCT);
aRecord.setField("b", 234L);
aRecord.setField("c", "foobar");
final Record record = GenericRecord.create(NESTED_SCHEMA);
record.setField("a", aRecord);
final Record projectedRecord = projection.wrap(record);
final Record aProjectedRecord = (Record) projectedRecord.get(0);
assertThat(aProjectedRecord.get(0)).isEqualTo(234L);
assertThatThrownBy(() -> aProjectedRecord.get(1))
.isInstanceOf(ArrayIndexOutOfBoundsException.class)
.hasMessageContaining("Index 1 out of bounds for length 1");
}
}
This test isn't needed anymore as the class was removed.
looks good to me overall, mostly questions/nits 👍
Force-pushed from 545fe3b to 96061ca.
FYI, I removed the delta writers along with the upsert and CDC-related code.
What's expected to be included in the next kafka-connect PR? Asking b/c I'm currently doing some work on implementing zombie-fencing in tabular/iceberg-kafka-connect, and we may want to wait to see what that looks like in terms of code changes before tackling the worker/coordinator portions of the code (in case it ends up being a large refactor).
The next submission will be the coordinator piece. I have already addressed zombie fencing in that upcoming code, so I feel we can proceed with it for consideration.
Force-pushed from c5ceee1 to bb88082.
Force-pushed from bb88082 to 1207d9c.
I was planning on merging this unless someone wants to give more feedback. cc @fqaiser94 @danielcweeks
This PR is the next stage in submitting the Iceberg Kafka Connect sink connector, and is a follow-up to #8701 and #9466. It includes the record converters with related config and tests.
Still not included in the sink are the commit controller, integration tests, or docs, which will be added in follow-up PRs. For reference, the current sink implementation can be found at https://github.com/tabular-io/iceberg-kafka-connect.