From e769addf75400e9dffb72e7e417bc752f804622a Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Thu, 21 Mar 2024 14:08:16 -0700
Subject: [PATCH] Kafka Connect: Record converters (#9641)

---
 .../connect/events/TableReference.java        |   7 +-
 .../iceberg/connect/IcebergSinkConfig.java    |   6 +-
 .../iceberg/connect/IcebergSinkConnector.java |   5 +-
 .../iceberg/connect/data/IcebergWriter.java   |  32 +-
 .../iceberg/connect/data/NoOpWriter.java      |   2 +-
 .../connect/data/PartitionedAppendWriter.java |   4 +-
 .../iceberg/connect/data/RecordConverter.java | 517 ++++++++++
 .../iceberg/connect/data/SchemaUpdate.java    |  44 +-
 .../iceberg/connect/data/SchemaUtils.java     |  14 +-
 .../iceberg/connect/data/Utilities.java       |  20 +-
 .../connect/IcebergSinkConnectorTest.java     |   4 +-
 .../connect/data/RecordConverterTest.java     | 936 ++++++++++++++++++
 12 files changed, 1526 insertions(+), 65 deletions(-)
 create mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
 create mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java

diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
index d1400f58f74c..50eaa1050485 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
@@ -18,10 +18,9 @@
  */
 package org.apache.iceberg.connect.events;
 
-import static java.util.stream.Collectors.toList;
-
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.util.Utf8;
@@ -96,7 +95,9 @@ public void put(int i, Object v) {
         return;
       case NAMESPACE:
         this.namespace =
-            v == null ? null : ((List) v).stream().map(Utf8::toString).collect(toList());
+            v == null
+                ? null
+                : ((List) v).stream().map(Utf8::toString).collect(Collectors.toList());
         return;
       case NAME:
         this.name = v == null ?
null : v.toString();
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index aa1ecdd5d1ba..d1572fbff37b 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.connect;
 
-import static java.util.stream.Collectors.toList;
-
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -28,6 +26,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.iceberg.IcebergBuild;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -72,7 +71,6 @@ public class IcebergSinkConfig extends AbstractConfig {
   private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch";
   private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns";
   private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by";
-  // FIXME: add config for CDC and upsert mode
   private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
       "iceberg.tables.auto-create-enabled";
   private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
@@ -365,7 +363,7 @@ static List stringToList(String value, String regex) {
       return ImmutableList.of();
     }
 
-    return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
+    return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList());
   }
 
   public String controlTopic() {
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
index 485b209302d5..8be8518f4407 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java
@@ -18,11 +18,10 @@
  */
 package org.apache.iceberg.connect;
 
-import static java.util.stream.Collectors.toList;
-
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.kafka.common.config.ConfigDef;
@@ -60,7 +59,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
               map.put(IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP, txnSuffix + i);
               return map;
             })
-        .collect(toList());
+        .collect(Collectors.toList());
   }
 
   @Override
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
index da88b3b50ffe..27ffc4de9973 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java
@@ -38,8 +38,7 @@ public class IcebergWriter implements RecordWriter {
   private final IcebergSinkConfig config;
   private final List writerResults;
-  // FIXME: update this when the record converter is added
-  //
private RecordConverter recordConverter;
+  private RecordConverter recordConverter;
   private TaskWriter writer;
 
   public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
@@ -52,19 +51,15 @@ public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {
 
   private void initNewWriter() {
     this.writer = Utilities.createTableWriter(table, tableName, config);
-    // FIXME: update this when the record converter is added
-    // this.recordConverter = new RecordConverter(table, config);
+    this.recordConverter = new RecordConverter(table, config);
   }
 
   @Override
   public void write(SinkRecord record) {
     try {
-      // TODO: config to handle tombstones instead of always ignoring?
+      // ignore tombstones...
      if (record.value() != null) {
         Record row = convertToRow(record);
-
-        // FIXME: add CDC operation support
-
         writer.write(row);
       }
     } catch (Exception e) {
@@ -77,8 +72,25 @@ public void write(SinkRecord record) {
   }
 
   private Record convertToRow(SinkRecord record) {
-    // FIXME: update this when the record converter is added
-    return null;
+    if (!config.evolveSchemaEnabled()) {
+      return recordConverter.convert(record.value());
+    }
+
+    SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
+    Record row = recordConverter.convert(record.value(), updates);
+
+    if (!updates.empty()) {
+      // complete the current file
+      flush();
+      // apply the schema updates, this will refresh the table
+      SchemaUtils.applySchemaUpdates(table, updates);
+      // initialize a new writer with the new schema
+      initNewWriter();
+      // convert the row again, this time using the new table schema
+      row = recordConverter.convert(record.value(), null);
+    }
+
+    return row;
   }
 
   private void flush() {
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
index 31abe09cf1a4..64ca44f03209 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/NoOpWriter.java
@@ -21,7 +21,7 @@
 import java.util.List;
 import org.apache.kafka.connect.sink.SinkRecord;
 
-public class NoOpWriter implements RecordWriter {
+class NoOpWriter implements RecordWriter {
   @Override
   public void write(SinkRecord record) {
     // NO-OP
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
index 1d429e44e675..ad8b5715a99b 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedAppendWriter.java
@@ -29,12 +29,12 @@
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitionedFanoutWriter;
 
-public class PartitionedAppendWriter extends PartitionedFanoutWriter {
+class PartitionedAppendWriter extends PartitionedFanoutWriter {
 
   private final PartitionKey partitionKey;
   private final InternalRecordWrapper wrapper;
 
-  public PartitionedAppendWriter(
+  PartitionedAppendWriter(
       PartitionSpec spec,
       FileFormat format,
       FileAppenderFactory appenderFactory,
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
new file mode 100644
index 000000000000..406a2cba4526 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; +import java.time.temporal.Temporal; +import java.util.Base64; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.kafka.connect.data.Struct; + +class RecordConverter { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final DateTimeFormatter OFFSET_TIMESTAMP_FORMAT = + new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + .appendOffset("+HHmm", "Z") + .toFormatter(); + + private final Schema tableSchema; + private final NameMapping nameMapping; + private final IcebergSinkConfig config; + private final Map> structNameMap = Maps.newHashMap(); + + RecordConverter(Table table, IcebergSinkConfig config) { + this.tableSchema = table.schema(); + this.nameMapping = createNameMapping(table); + this.config = config; + } + + Record convert(Object data) { + return convert(data, null); + } + + Record convert(Object 
data, SchemaUpdate.Consumer schemaUpdateConsumer) { + if (data instanceof Struct || data instanceof Map) { + return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer); + } + throw new UnsupportedOperationException("Cannot convert type: " + data.getClass().getName()); + } + + private NameMapping createNameMapping(Table table) { + String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null; + } + + private Object convertValue( + Object value, Type type, int fieldId, SchemaUpdate.Consumer schemaUpdateConsumer) { + if (value == null) { + return null; + } + switch (type.typeId()) { + case STRUCT: + return convertStructValue(value, type.asStructType(), fieldId, schemaUpdateConsumer); + case LIST: + return convertListValue(value, type.asListType(), schemaUpdateConsumer); + case MAP: + return convertMapValue(value, type.asMapType(), schemaUpdateConsumer); + case INTEGER: + return convertInt(value); + case LONG: + return convertLong(value); + case FLOAT: + return convertFloat(value); + case DOUBLE: + return convertDouble(value); + case DECIMAL: + return convertDecimal(value, (DecimalType) type); + case BOOLEAN: + return convertBoolean(value); + case STRING: + return convertString(value); + case UUID: + return convertUUID(value); + case BINARY: + case FIXED: + return convertBase64Binary(value); + case DATE: + return convertDateValue(value); + case TIME: + return convertTimeValue(value); + case TIMESTAMP: + return convertTimestampValue(value, (TimestampType) type); + } + throw new UnsupportedOperationException("Unsupported type: " + type.typeId()); + } + + protected GenericRecord convertStructValue( + Object value, + StructType schema, + int parentFieldId, + SchemaUpdate.Consumer schemaUpdateConsumer) { + if (value instanceof Map) { + return convertToStruct((Map) value, schema, parentFieldId, schemaUpdateConsumer); + } else if (value instanceof Struct) { + return convertToStruct((Struct) value, schema, parentFieldId, schemaUpdateConsumer); + } + throw new IllegalArgumentException("Cannot convert to struct: " + value.getClass().getName()); + } + + /** + * This method will be called for records when there is no record schema. Also, when there is no + * schema, we infer that map values are struct types. This method might also be called if the + * field value is a map but the Iceberg type is a struct. This can happen if the Iceberg table + * schema is not managed by the sink, i.e. created manually. + */ + private GenericRecord convertToStruct( + Map map, + StructType schema, + int structFieldId, + SchemaUpdate.Consumer schemaUpdateConsumer) { + GenericRecord result = GenericRecord.create(schema); + map.forEach( + (recordFieldNameObj, recordFieldValue) -> { + String recordFieldName = recordFieldNameObj.toString(); + NestedField tableField = lookupStructField(recordFieldName, schema, structFieldId); + if (tableField == null) { + // add the column if schema evolution is on, otherwise skip the value, + // skip the add column if we can't infer the type + if (schemaUpdateConsumer != null) { + Type type = SchemaUtils.inferIcebergType(recordFieldValue, config); + if (type != null) { + String parentFieldName = + structFieldId < 0 ? 
null : tableSchema.findColumnName(structFieldId); + schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type); + } + } + } else { + result.setField( + tableField.name(), + convertValue( + recordFieldValue, + tableField.type(), + tableField.fieldId(), + schemaUpdateConsumer)); + } + }); + return result; + } + + /** This method will be called for records and struct values when there is a record schema. */ + private GenericRecord convertToStruct( + Struct struct, + StructType schema, + int structFieldId, + SchemaUpdate.Consumer schemaUpdateConsumer) { + GenericRecord result = GenericRecord.create(schema); + struct + .schema() + .fields() + .forEach( + recordField -> { + NestedField tableField = lookupStructField(recordField.name(), schema, structFieldId); + if (tableField == null) { + // add the column if schema evolution is on, otherwise skip the value + if (schemaUpdateConsumer != null) { + String parentFieldName = + structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId); + Type type = SchemaUtils.toIcebergType(recordField.schema(), config); + schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type); + } + } else { + boolean hasSchemaUpdates = false; + if (schemaUpdateConsumer != null) { + // update the type if needed and schema evolution is on + PrimitiveType evolveDataType = + SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema()); + if (evolveDataType != null) { + String fieldName = tableSchema.findColumnName(tableField.fieldId()); + schemaUpdateConsumer.updateType(fieldName, evolveDataType); + hasSchemaUpdates = true; + } + // make optional if needed and schema evolution is on + if (tableField.isRequired() && recordField.schema().isOptional()) { + String fieldName = tableSchema.findColumnName(tableField.fieldId()); + schemaUpdateConsumer.makeOptional(fieldName); + hasSchemaUpdates = true; + } + } + if (!hasSchemaUpdates) { + result.setField( + tableField.name(), + convertValue( + struct.get(recordField), + tableField.type(), + tableField.fieldId(), + schemaUpdateConsumer)); + } + } + }); + return result; + } + + private NestedField lookupStructField(String fieldName, StructType schema, int structFieldId) { + if (nameMapping == null) { + return config.schemaCaseInsensitive() + ? 
schema.caseInsensitiveField(fieldName) + : schema.field(fieldName); + } + + return structNameMap + .computeIfAbsent(structFieldId, notUsed -> createStructNameMap(schema)) + .get(fieldName); + } + + private Map createStructNameMap(StructType schema) { + Map map = Maps.newHashMap(); + schema + .fields() + .forEach( + col -> { + MappedField mappedField = nameMapping.find(col.fieldId()); + if (mappedField != null && !mappedField.names().isEmpty()) { + mappedField.names().forEach(name -> map.put(name, col)); + } else { + map.put(col.name(), col); + } + }); + return map; + } + + protected List convertListValue( + Object value, ListType type, SchemaUpdate.Consumer schemaUpdateConsumer) { + Preconditions.checkArgument(value instanceof List); + List list = (List) value; + return list.stream() + .map( + element -> { + int fieldId = type.fields().get(0).fieldId(); + return convertValue(element, type.elementType(), fieldId, schemaUpdateConsumer); + }) + .collect(Collectors.toList()); + } + + protected Map convertMapValue( + Object value, MapType type, SchemaUpdate.Consumer schemaUpdateConsumer) { + Preconditions.checkArgument(value instanceof Map); + Map map = (Map) value; + Map result = Maps.newHashMap(); + map.forEach( + (k, v) -> { + int keyFieldId = type.fields().get(0).fieldId(); + int valueFieldId = type.fields().get(1).fieldId(); + result.put( + convertValue(k, type.keyType(), keyFieldId, schemaUpdateConsumer), + convertValue(v, type.valueType(), valueFieldId, schemaUpdateConsumer)); + }); + return result; + } + + protected int convertInt(Object value) { + if (value instanceof Number) { + return ((Number) value).intValue(); + } else if (value instanceof String) { + return Integer.parseInt((String) value); + } + throw new IllegalArgumentException("Cannot convert to int: " + value.getClass().getName()); + } + + protected long convertLong(Object value) { + if (value instanceof Number) { + return ((Number) value).longValue(); + } else if (value instanceof String) { + return Long.parseLong((String) value); + } + throw new IllegalArgumentException("Cannot convert to long: " + value.getClass().getName()); + } + + protected float convertFloat(Object value) { + if (value instanceof Number) { + return ((Number) value).floatValue(); + } else if (value instanceof String) { + return Float.parseFloat((String) value); + } + throw new IllegalArgumentException("Cannot convert to float: " + value.getClass().getName()); + } + + protected double convertDouble(Object value) { + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } else if (value instanceof String) { + return Double.parseDouble((String) value); + } + throw new IllegalArgumentException("Cannot convert to double: " + value.getClass().getName()); + } + + protected BigDecimal convertDecimal(Object value, DecimalType type) { + BigDecimal bigDecimal; + if (value instanceof BigDecimal) { + bigDecimal = (BigDecimal) value; + } else if (value instanceof Number) { + Number num = (Number) value; + Double dbl = num.doubleValue(); + if (dbl.equals(Math.floor(dbl))) { + bigDecimal = BigDecimal.valueOf(num.longValue()); + } else { + bigDecimal = BigDecimal.valueOf(dbl); + } + } else if (value instanceof String) { + bigDecimal = new BigDecimal((String) value); + } else { + throw new IllegalArgumentException( + "Cannot convert to BigDecimal: " + value.getClass().getName()); + } + return bigDecimal.setScale(type.scale(), RoundingMode.HALF_UP); + } + + protected boolean convertBoolean(Object value) { + if (value instanceof Boolean) { + return 
(boolean) value; + } else if (value instanceof String) { + return Boolean.parseBoolean((String) value); + } + throw new IllegalArgumentException("Cannot convert to boolean: " + value.getClass().getName()); + } + + protected String convertString(Object value) { + try { + if (value instanceof String) { + return (String) value; + } else if (value instanceof Number || value instanceof Boolean) { + return value.toString(); + } else if (value instanceof Map || value instanceof List) { + return MAPPER.writeValueAsString(value); + } else if (value instanceof Struct) { + Struct struct = (Struct) value; + byte[] data = config.jsonConverter().fromConnectData(null, struct.schema(), struct); + return new String(data, StandardCharsets.UTF_8); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + throw new IllegalArgumentException("Cannot convert to string: " + value.getClass().getName()); + } + + protected UUID convertUUID(Object value) { + if (value instanceof String) { + return UUID.fromString((String) value); + } else if (value instanceof UUID) { + return (UUID) value; + } + throw new IllegalArgumentException("Cannot convert to UUID: " + value.getClass().getName()); + } + + protected ByteBuffer convertBase64Binary(Object value) { + if (value instanceof String) { + return ByteBuffer.wrap(Base64.getDecoder().decode((String) value)); + } else if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); + } else if (value instanceof ByteBuffer) { + return (ByteBuffer) value; + } + throw new IllegalArgumentException("Cannot convert to binary: " + value.getClass().getName()); + } + + @SuppressWarnings("JavaUtilDate") + protected LocalDate convertDateValue(Object value) { + if (value instanceof Number) { + int days = ((Number) value).intValue(); + return DateTimeUtil.dateFromDays(days); + } else if (value instanceof String) { + return LocalDate.parse((String) value); + } else if (value instanceof LocalDate) { + return (LocalDate) value; + } else if (value instanceof Date) { + int days = (int) (((Date) value).getTime() / 1000 / 60 / 60 / 24); + return DateTimeUtil.dateFromDays(days); + } + throw new RuntimeException("Cannot convert date: " + value); + } + + @SuppressWarnings("JavaUtilDate") + protected LocalTime convertTimeValue(Object value) { + if (value instanceof Number) { + long millis = ((Number) value).longValue(); + return DateTimeUtil.timeFromMicros(millis * 1000); + } else if (value instanceof String) { + return LocalTime.parse((String) value); + } else if (value instanceof LocalTime) { + return (LocalTime) value; + } else if (value instanceof Date) { + long millis = ((Date) value).getTime(); + return DateTimeUtil.timeFromMicros(millis * 1000); + } + throw new RuntimeException("Cannot convert time: " + value); + } + + protected Temporal convertTimestampValue(Object value, TimestampType type) { + if (type.shouldAdjustToUTC()) { + return convertOffsetDateTime(value); + } + return convertLocalDateTime(value); + } + + @SuppressWarnings("JavaUtilDate") + private OffsetDateTime convertOffsetDateTime(Object value) { + if (value instanceof Number) { + long millis = ((Number) value).longValue(); + return DateTimeUtil.timestamptzFromMicros(millis * 1000); + } else if (value instanceof String) { + return parseOffsetDateTime((String) value); + } else if (value instanceof OffsetDateTime) { + return (OffsetDateTime) value; + } else if (value instanceof LocalDateTime) { + return ((LocalDateTime) value).atOffset(ZoneOffset.UTC); + } else if (value instanceof Date) { + return 
DateTimeUtil.timestamptzFromMicros(((Date) value).getTime() * 1000); + } + throw new RuntimeException( + "Cannot convert timestamptz: " + value + ", type: " + value.getClass()); + } + + private OffsetDateTime parseOffsetDateTime(String str) { + String tsStr = ensureTimestampFormat(str); + try { + return OFFSET_TIMESTAMP_FORMAT.parse(tsStr, OffsetDateTime::from); + } catch (DateTimeParseException e) { + return LocalDateTime.parse(tsStr, DateTimeFormatter.ISO_LOCAL_DATE_TIME) + .atOffset(ZoneOffset.UTC); + } + } + + @SuppressWarnings("JavaUtilDate") + private LocalDateTime convertLocalDateTime(Object value) { + if (value instanceof Number) { + long millis = ((Number) value).longValue(); + return DateTimeUtil.timestampFromMicros(millis * 1000); + } else if (value instanceof String) { + return parseLocalDateTime((String) value); + } else if (value instanceof LocalDateTime) { + return (LocalDateTime) value; + } else if (value instanceof OffsetDateTime) { + return ((OffsetDateTime) value).toLocalDateTime(); + } else if (value instanceof Date) { + return DateTimeUtil.timestampFromMicros(((Date) value).getTime() * 1000); + } + throw new RuntimeException( + "Cannot convert timestamp: " + value + ", type: " + value.getClass()); + } + + private LocalDateTime parseLocalDateTime(String str) { + String tsStr = ensureTimestampFormat(str); + try { + return LocalDateTime.parse(tsStr, DateTimeFormatter.ISO_LOCAL_DATE_TIME); + } catch (DateTimeParseException e) { + return OFFSET_TIMESTAMP_FORMAT.parse(tsStr, OffsetDateTime::from).toLocalDateTime(); + } + } + + private String ensureTimestampFormat(String str) { + String result = str; + if (result.charAt(10) == ' ') { + result = result.substring(0, 10) + 'T' + result.substring(11); + } + if (result.length() > 22 + && (result.charAt(19) == '+' || result.charAt(19) == '-') + && result.charAt(22) == ':') { + result = result.substring(0, 19) + result.substring(19).replace(":", ""); + } + return result; + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java index 2bb0e65f204b..809bea84dcc2 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java @@ -24,97 +24,97 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Type.PrimitiveType; -public class SchemaUpdate { +class SchemaUpdate { - public static class Consumer { + static class Consumer { private final Map addColumns = Maps.newHashMap(); private final Map updateTypes = Maps.newHashMap(); private final Map makeOptionals = Maps.newHashMap(); - public Collection addColumns() { + Collection addColumns() { return addColumns.values(); } - public Collection updateTypes() { + Collection updateTypes() { return updateTypes.values(); } - public Collection makeOptionals() { + Collection makeOptionals() { return makeOptionals.values(); } - public boolean empty() { + boolean empty() { return addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty(); } - public void addColumn(String parentName, String name, Type type) { + void addColumn(String parentName, String name, Type type) { AddColumn addCol = new AddColumn(parentName, name, type); addColumns.put(addCol.key(), addCol); } - public void updateType(String name, PrimitiveType type) { + void updateType(String name, PrimitiveType type) { 
updateTypes.put(name, new UpdateType(name, type)); } - public void makeOptional(String name) { + void makeOptional(String name) { makeOptionals.put(name, new MakeOptional(name)); } } - public static class AddColumn extends SchemaUpdate { + static class AddColumn extends SchemaUpdate { private final String parentName; private final String name; private final Type type; - public AddColumn(String parentName, String name, Type type) { + AddColumn(String parentName, String name, Type type) { this.parentName = parentName; this.name = name; this.type = type; } - public String parentName() { + String parentName() { return parentName; } - public String name() { + String name() { return name; } - public String key() { + String key() { return parentName == null ? name : parentName + "." + name; } - public Type type() { + Type type() { return type; } } - public static class UpdateType extends SchemaUpdate { + static class UpdateType extends SchemaUpdate { private final String name; private final PrimitiveType type; - public UpdateType(String name, PrimitiveType type) { + UpdateType(String name, PrimitiveType type) { this.name = name; this.type = type; } - public String name() { + String name() { return name; } - public PrimitiveType type() { + PrimitiveType type() { return type; } } - public static class MakeOptional extends SchemaUpdate { + static class MakeOptional extends SchemaUpdate { private final String name; - public MakeOptional(String name) { + MakeOptional(String name) { this.name = name; } - public String name() { + String name() { return name; } } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java index 64fa89041c29..a2e0729fd506 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java @@ -65,13 +65,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SchemaUtils { +class SchemaUtils { private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class); private static final Pattern TRANSFORM_REGEX = Pattern.compile("(\\w+)\\((.+)\\)"); - public static PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Schema valueSchema) { + static PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Schema valueSchema) { if (currentIcebergType.typeId() == TypeID.FLOAT && valueSchema.type() == Schema.Type.FLOAT64) { return DoubleType.get(); } @@ -81,7 +81,7 @@ public static PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Schema return null; } - public static void applySchemaUpdates(Table table, SchemaUpdate.Consumer updates) { + static void applySchemaUpdates(Table table, SchemaUpdate.Consumer updates) { if (updates == null || updates.empty()) { // no updates to apply return; @@ -150,7 +150,7 @@ private static boolean isOptional(org.apache.iceberg.Schema schema, MakeOptional return field.isOptional(); } - public static PartitionSpec createPartitionSpec( + static PartitionSpec createPartitionSpec( org.apache.iceberg.Schema schema, List partitionBy) { if (partitionBy.isEmpty()) { return PartitionSpec.unpartitioned(); @@ -209,11 +209,11 @@ private static Pair transformArgPair(String argsStr) { return Pair.of(parts.get(0).trim(), Integer.parseInt(parts.get(1).trim())); } - public static Type toIcebergType(Schema valueSchema, IcebergSinkConfig config) { + static Type 
toIcebergType(Schema valueSchema, IcebergSinkConfig config) { return new SchemaGenerator(config).toIcebergType(valueSchema); } - public static Type inferIcebergType(Object value, IcebergSinkConfig config) { + static Type inferIcebergType(Object value, IcebergSinkConfig config) { return new SchemaGenerator(config).inferIcebergType(value); } @@ -290,7 +290,7 @@ Type toIcebergType(Schema valueSchema) { } @SuppressWarnings("checkstyle:CyclomaticComplexity") - public Type inferIcebergType(Object value) { + Type inferIcebergType(Object value) { if (value == null) { return null; } else if (value instanceof String) { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java index ec13b003a21a..4ff83f777527 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java @@ -18,12 +18,6 @@ */ package org.apache.iceberg.connect.data; -import static java.util.stream.Collectors.toSet; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; - import java.io.IOException; import java.net.URL; import java.nio.file.Files; @@ -33,9 +27,11 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.common.DynConstructors; @@ -175,12 +171,16 @@ public static TaskWriter createTableWriter( Map tableProps = Maps.newHashMap(table.properties()); tableProps.putAll(config.writeProps()); - String formatStr = tableProps.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + String formatStr = + tableProps.getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); FileFormat format = FileFormat.fromString(formatStr); long targetFileSize = PropertyUtil.propertyAsLong( - tableProps, WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + tableProps, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); Set identifierFieldIds = table.schema().identifierFieldIds(); @@ -197,7 +197,7 @@ public static TaskWriter createTableWriter( } return field.fieldId(); }) - .collect(toSet()); + .collect(Collectors.toSet()); } FileAppenderFactory appenderFactory; @@ -224,8 +224,6 @@ public static TaskWriter createTableWriter( .format(format) .build(); - // FIXME: add delta writers - TaskWriter writer; if (table.spec().isUnpartitioned()) { writer = diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java index 86502794b224..c8f563a13911 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java +++ 
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.connect; -import static org.apache.iceberg.connect.IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP; import static org.assertj.core.api.Assertions.assertThat; import java.util.List; @@ -35,6 +34,7 @@ public void testTaskConfigs() { connector.start(ImmutableMap.of()); List> configs = connector.taskConfigs(3); assertThat(configs).hasSize(3); - configs.forEach(map -> assertThat(map).containsKey(INTERNAL_TRANSACTIONAL_SUFFIX_PROP)); + configs.forEach( + map -> assertThat(map).containsKey(IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP)); } } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java new file mode 100644 index 000000000000..b494a9da85d3 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java @@ -0,0 +1,936 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.Temporal; +import java.util.Base64; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.data.SchemaUpdate.AddColumn; +import org.apache.iceberg.connect.data.SchemaUpdate.UpdateType; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types.BinaryType; +import org.apache.iceberg.types.Types.BooleanType; +import org.apache.iceberg.types.Types.DateType; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.FixedType; +import org.apache.iceberg.types.Types.FloatType; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimeType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.types.Types.UUIDType; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class RecordConverterTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final org.apache.iceberg.Schema SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required(20, "i", IntegerType.get()), + NestedField.required(21, "l", LongType.get()), + NestedField.required(22, "d", DateType.get()), + NestedField.required(23, "t", 
TimeType.get()), + NestedField.required(24, "ts", TimestampType.withoutZone()), + NestedField.required(25, "tsz", TimestampType.withZone()), + NestedField.required(26, "fl", FloatType.get()), + NestedField.required(27, "do", DoubleType.get()), + NestedField.required(28, "dec", DecimalType.of(9, 2)), + NestedField.required(29, "s", StringType.get()), + NestedField.required(30, "b", BooleanType.get()), + NestedField.required(31, "u", UUIDType.get()), + NestedField.required(32, "f", FixedType.ofLength(3)), + NestedField.required(33, "bi", BinaryType.get()), + NestedField.required(34, "li", ListType.ofRequired(35, StringType.get())), + NestedField.required( + 36, "ma", MapType.ofRequired(37, 38, StringType.get(), StringType.get())), + NestedField.optional(39, "extra", StringType.get())); + + // we have 1 unmapped column so exclude that from the count + private static final int MAPPED_CNT = SCHEMA.columns().size() - 1; + + private static final org.apache.iceberg.Schema NESTED_SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required(1, "ii", IntegerType.get()), + NestedField.required(2, "st", SCHEMA.asStruct())); + + private static final org.apache.iceberg.Schema SIMPLE_SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required(1, "ii", IntegerType.get()), + NestedField.required(2, "st", StringType.get())); + + private static final org.apache.iceberg.Schema ID_SCHEMA = + new org.apache.iceberg.Schema(NestedField.required(1, "ii", IntegerType.get())); + + private static final org.apache.iceberg.Schema STRUCT_IN_LIST_SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required(100, "stli", ListType.ofRequired(101, NESTED_SCHEMA.asStruct()))); + + private static final org.apache.iceberg.Schema STRUCT_IN_LIST_BASIC_SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required(100, "stli", ListType.ofRequired(101, ID_SCHEMA.asStruct()))); + + private static final org.apache.iceberg.Schema STRUCT_IN_MAP_SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required( + 100, + "stma", + MapType.ofRequired(101, 102, StringType.get(), NESTED_SCHEMA.asStruct()))); + + private static final org.apache.iceberg.Schema STRUCT_IN_MAP_BASIC_SCHEMA = + new org.apache.iceberg.Schema( + NestedField.required( + 100, "stma", MapType.ofRequired(101, 102, StringType.get(), ID_SCHEMA.asStruct()))); + + private static final Schema CONNECT_SCHEMA = + SchemaBuilder.struct() + .field("i", Schema.INT32_SCHEMA) + .field("l", Schema.INT64_SCHEMA) + .field("d", org.apache.kafka.connect.data.Date.SCHEMA) + .field("t", Time.SCHEMA) + .field("ts", Timestamp.SCHEMA) + .field("tsz", Timestamp.SCHEMA) + .field("fl", Schema.FLOAT32_SCHEMA) + .field("do", Schema.FLOAT64_SCHEMA) + .field("dec", Decimal.schema(2)) + .field("s", Schema.STRING_SCHEMA) + .field("b", Schema.BOOLEAN_SCHEMA) + .field("u", Schema.STRING_SCHEMA) + .field("f", Schema.BYTES_SCHEMA) + .field("bi", Schema.BYTES_SCHEMA) + .field("li", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("ma", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)); + + private static final Schema CONNECT_NESTED_SCHEMA = + SchemaBuilder.struct().field("ii", Schema.INT32_SCHEMA).field("st", CONNECT_SCHEMA); + + private static final Schema CONNECT_STRUCT_IN_LIST_SCHEMA = + SchemaBuilder.struct().field("stli", SchemaBuilder.array(CONNECT_NESTED_SCHEMA)).build(); + + private static final Schema CONNECT_STRUCT_IN_MAP_SCHEMA = + SchemaBuilder.struct() + .field("stma", SchemaBuilder.map(Schema.STRING_SCHEMA, CONNECT_NESTED_SCHEMA)) + .build(); + + private static 
final LocalDate DATE_VAL = LocalDate.parse("2023-05-18"); + private static final LocalTime TIME_VAL = LocalTime.parse("07:14:21"); + private static final LocalDateTime TS_VAL = LocalDateTime.parse("2023-05-18T07:14:21"); + private static final OffsetDateTime TSZ_VAL = OffsetDateTime.parse("2023-05-18T07:14:21Z"); + private static final BigDecimal DEC_VAL = new BigDecimal("12.34"); + private static final String STR_VAL = "foobar"; + private static final UUID UUID_VAL = UUID.randomUUID(); + private static final ByteBuffer BYTES_VAL = ByteBuffer.wrap(new byte[] {1, 2, 3}); + private static final List LIST_VAL = ImmutableList.of("hello", "world"); + private static final Map MAP_VAL = ImmutableMap.of("one", "1", "two", "2"); + + private static final JsonConverter JSON_CONVERTER = new JsonConverter(); + + private IcebergSinkConfig config; + + @BeforeAll + public static void beforeAll() { + JSON_CONVERTER.configure( + ImmutableMap.of( + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, + false, + ConverterConfig.TYPE_CONFIG, + ConverterType.VALUE.getName())); + } + + @BeforeEach + public void before() { + this.config = mock(IcebergSinkConfig.class); + when(config.jsonConverter()).thenReturn(JSON_CONVERTER); + } + + @Test + public void testMapConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = createMapData(); + Record record = converter.convert(data); + assertRecordValues(record); + } + + @Test + public void testNestedMapConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(NESTED_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map nestedData = createNestedMapData(); + Record record = converter.convert(nestedData); + assertNestedRecordValues(record); + } + + @Test + @SuppressWarnings("unchecked") + public void testMapToString() throws Exception { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map nestedData = createNestedMapData(); + Record record = converter.convert(nestedData); + + String str = (String) record.getField("st"); + Map map = (Map) MAPPER.readValue(str, Map.class); + assertThat(map).hasSize(MAPPED_CNT); + } + + @Test + public void testMapValueInListConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_LIST_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = createNestedMapData(); + Record record = converter.convert(ImmutableMap.of("stli", ImmutableList.of(data, data))); + List fieldVal = (List) record.getField("stli"); + + Record elementVal = (Record) fieldVal.get(0); + assertNestedRecordValues(elementVal); + } + + @Test + public void testMapValueInMapConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_MAP_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = createNestedMapData(); + Record record = + converter.convert(ImmutableMap.of("stma", ImmutableMap.of("key1", data, "key2", data))); + + Map fieldVal = (Map) record.getField("stma"); + Record mapVal = (Record) fieldVal.get("key1"); + assertNestedRecordValues(mapVal); + } + + @Test + public void testStructConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct data = createStructData(); + Record 
record = converter.convert(data); + assertRecordValues(record); + } + + @Test + public void testNestedStructConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(NESTED_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct nestedData = createNestedStructData(); + Record record = converter.convert(nestedData); + assertNestedRecordValues(record); + } + + @Test + @SuppressWarnings("unchecked") + public void testStructToString() throws Exception { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct nestedData = createNestedStructData(); + Record record = converter.convert(nestedData); + + String str = (String) record.getField("st"); + Map map = (Map) MAPPER.readValue(str, Map.class); + assertThat(map).hasSize(MAPPED_CNT); + } + + @Test + public void testStructValueInListConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_LIST_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct data = createNestedStructData(); + Struct struct = + new Struct(CONNECT_STRUCT_IN_LIST_SCHEMA).put("stli", ImmutableList.of(data, data)); + Record record = converter.convert(struct); + + List fieldVal = (List) record.getField("stli"); + Record elementVal = (Record) fieldVal.get(0); + assertNestedRecordValues(elementVal); + } + + @Test + public void testStructValueInMapConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_MAP_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct data = createNestedStructData(); + Struct struct = + new Struct(CONNECT_STRUCT_IN_MAP_SCHEMA) + .put("stma", ImmutableMap.of("key1", data, "key2", data)); + Record record = converter.convert(struct); + + Map fieldVal = (Map) record.getField("stma"); + Record mapVal = (Record) fieldVal.get("key1"); + assertNestedRecordValues(mapVal); + } + + @Test + public void testNameMapping() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + NameMapping nameMapping = NameMapping.of(MappedField.of(1, ImmutableList.of("renamed_ii"))); + when(table.properties()) + .thenReturn( + ImmutableMap.of( + TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping))); + + RecordConverter converter = new RecordConverter(table, config); + + Map data = ImmutableMap.of("renamed_ii", 123); + Record record = converter.convert(data); + assertThat(record.getField("ii")).isEqualTo(123); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testCaseSensitivity(boolean caseInsensitive) { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + when(config.schemaCaseInsensitive()).thenReturn(caseInsensitive); + + RecordConverter converter = new RecordConverter(table, config); + + Map mapData = ImmutableMap.of("II", 123); + Record record1 = converter.convert(mapData); + + Struct structData = + new Struct(SchemaBuilder.struct().field("II", Schema.INT32_SCHEMA).build()).put("II", 123); + Record record2 = converter.convert(structData); + + if (caseInsensitive) { + assertThat(record1.getField("ii")).isEqualTo(123); + assertThat(record2.getField("ii")).isEqualTo(123); + } else { + assertThat(record1.getField("ii")).isEqualTo(null); + assertThat(record2.getField("ii")).isEqualTo(null); + } + } + + @Test + public void testIntConversion() { + Table table = mock(Table.class); 
+ when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + RecordConverter converter = new RecordConverter(table, config); + + int expectedInt = 123; + + ImmutableList.of("123", 123.0f, 123.0d, 123L, expectedInt) + .forEach( + input -> { + int val = converter.convertInt(input); + assertThat(val).isEqualTo(expectedInt); + }); + } + + @Test + public void testLongConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + RecordConverter converter = new RecordConverter(table, config); + + long expectedLong = 123L; + + ImmutableList.of("123", 123.0f, 123.0d, 123, expectedLong) + .forEach( + input -> { + long val = converter.convertLong(input); + assertThat(val).isEqualTo(expectedLong); + }); + } + + @Test + public void testFloatConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + RecordConverter converter = new RecordConverter(table, config); + + float expectedFloat = 123f; + + ImmutableList.of("123", 123, 123L, 123d, expectedFloat) + .forEach( + input -> { + float val = converter.convertFloat(input); + assertThat(val).isEqualTo(expectedFloat); + }); + } + + @Test + public void testDoubleConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + RecordConverter converter = new RecordConverter(table, config); + + double expectedDouble = 123d; + + ImmutableList.of("123", 123, 123L, 123f, expectedDouble) + .forEach( + input -> { + double val = converter.convertDouble(input); + assertThat(val).isEqualTo(expectedDouble); + }); + } + + @Test + public void testDecimalConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + RecordConverter converter = new RecordConverter(table, config); + + BigDecimal expected = new BigDecimal("123.45"); + + ImmutableList.of("123.45", 123.45d, expected) + .forEach( + input -> { + BigDecimal decimal = converter.convertDecimal(input, DecimalType.of(10, 2)); + assertThat(decimal).isEqualTo(expected); + }); + + BigDecimal expected2 = new BigDecimal(123); + + ImmutableList.of("123", 123, expected2) + .forEach( + input -> { + BigDecimal decimal = converter.convertDecimal(input, DecimalType.of(10, 0)); + assertThat(decimal).isEqualTo(expected2); + }); + } + + @Test + public void testDateConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + LocalDate expected = LocalDate.of(2023, 11, 15); + + List inputList = + ImmutableList.of( + "2023-11-15", + expected.toEpochDay(), + expected, + new Date(Duration.ofDays(expected.toEpochDay()).toMillis())); + + inputList.forEach( + input -> { + Temporal ts = converter.convertDateValue(input); + assertThat(ts).isEqualTo(expected); + }); + } + + @Test + public void testTimeConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + LocalTime expected = LocalTime.of(7, 51, 30, 888_000_000); + + List inputList = + ImmutableList.of( + "07:51:30.888", + expected.toNanoOfDay() / 1000 / 1000, + expected, + new Date(expected.toNanoOfDay() / 1000 / 1000)); + + inputList.forEach( + input -> { + Temporal ts = converter.convertTimeValue(input); + assertThat(ts).isEqualTo(expected); + }); + } + + @Test + public void testTimestampWithZoneConversion() { + OffsetDateTime expected = OffsetDateTime.parse("2023-05-18T11:22:33Z"); + long expectedMillis = 
expected.toInstant().toEpochMilli(); + assertTimestampConvert(expected, expectedMillis, TimestampType.withZone()); + + // zone should be respected + expected = OffsetDateTime.parse("2023-05-18T03:22:33-08:00"); + List additionalInput = + ImmutableList.of( + "2023-05-18T03:22:33-08", + "2023-05-18 03:22:33-08", + "2023-05-18T03:22:33-08:00", + "2023-05-18 03:22:33-08:00", + "2023-05-18T03:22:33-0800", + "2023-05-18 03:22:33-0800"); + assertTimestampConvert(expected, additionalInput, TimestampType.withZone()); + } + + @Test + public void testTimestampWithoutZoneConversion() { + LocalDateTime expected = LocalDateTime.parse("2023-05-18T11:22:33"); + long expectedMillis = expected.atZone(ZoneOffset.UTC).toInstant().toEpochMilli(); + assertTimestampConvert(expected, expectedMillis, TimestampType.withoutZone()); + + // zone should be ignored + List additionalInput = + ImmutableList.of( + "2023-05-18T11:22:33-08", + "2023-05-18 11:22:33-08", + "2023-05-18T11:22:33-08:00", + "2023-05-18 11:22:33-08:00", + "2023-05-18T11:22:33-0800", + "2023-05-18 11:22:33-0800"); + assertTimestampConvert(expected, additionalInput, TimestampType.withoutZone()); + } + + private void assertTimestampConvert(Temporal expected, long expectedMillis, TimestampType type) { + List inputList = + Lists.newArrayList( + "2023-05-18T11:22:33Z", + "2023-05-18 11:22:33Z", + "2023-05-18T11:22:33+00", + "2023-05-18 11:22:33+00", + "2023-05-18T11:22:33+00:00", + "2023-05-18 11:22:33+00:00", + "2023-05-18T11:22:33+0000", + "2023-05-18 11:22:33+0000", + "2023-05-18T11:22:33", + "2023-05-18 11:22:33", + expectedMillis, + new Date(expectedMillis), + OffsetDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC), + LocalDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC)); + + assertTimestampConvert(expected, inputList, type); + } + + private void assertTimestampConvert( + Temporal expected, List inputList, TimestampType type) { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + inputList.forEach( + input -> { + Temporal ts = converter.convertTimestampValue(input, type); + assertThat(ts).isEqualTo(expected); + }); + } + + @Test + public void testMissingColumnDetectionMap() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = Maps.newHashMap(createMapData()); + data.put("null", null); + + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(data, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(MAPPED_CNT); + + Map newColMap = Maps.newHashMap(); + addCols.forEach(addCol -> newColMap.put(addCol.name(), addCol)); + + assertTypesAddedFromMap(col -> newColMap.get(col).type()); + + // null values should be ignored + assertThat(newColMap).doesNotContainKey("null"); + } + + @Test + public void testMissingColumnDetectionMapNested() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map nestedData = createNestedMapData(); + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(nestedData, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(1); + + AddColumn addCol = addCols.iterator().next(); + assertThat(addCol.name()).isEqualTo("st"); + + StructType addedType = 
+  @Test
+  public void testMissingColumnDetectionMap() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Map<String, Object> data = Maps.newHashMap(createMapData());
+    data.put("null", null);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(MAPPED_CNT);
+
+    Map<String, AddColumn> newColMap = Maps.newHashMap();
+    addCols.forEach(addCol -> newColMap.put(addCol.name(), addCol));
+
+    assertTypesAddedFromMap(col -> newColMap.get(col).type());
+
+    // null values should be ignored
+    assertThat(newColMap).doesNotContainKey("null");
+  }
+
+  @Test
+  public void testMissingColumnDetectionMapNested() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Map<String, Object> nestedData = createNestedMapData();
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(nestedData, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType addedType = addCol.type().asStructType();
+    assertThat(addedType.fields()).hasSize(MAPPED_CNT);
+    assertTypesAddedFromMap(col -> addedType.field(col).type());
+  }
+
+  @Test
+  public void testMissingColumnDetectionMapListValue() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_LIST_BASIC_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Map<String, Object> nestedData = createNestedMapData();
+    Map<String, Object> map = ImmutableMap.of("stli", ImmutableList.of(nestedData, nestedData));
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(map, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.parentName()).isEqualTo("stli.element");
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType nestedElementType = addCol.type().asStructType();
+    assertThat(nestedElementType.fields()).hasSize(MAPPED_CNT);
+    assertTypesAddedFromMap(col -> nestedElementType.field(col).type());
+  }
+
+  private void assertTypesAddedFromMap(Function<String, Type> fn) {
+    assertThat(fn.apply("i")).isInstanceOf(LongType.class);
+    assertThat(fn.apply("l")).isInstanceOf(LongType.class);
+    assertThat(fn.apply("d")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("t")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("ts")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("tsz")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("fl")).isInstanceOf(DoubleType.class);
+    assertThat(fn.apply("do")).isInstanceOf(DoubleType.class);
+    assertThat(fn.apply("dec")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("s")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("b")).isInstanceOf(BooleanType.class);
+    assertThat(fn.apply("u")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("f")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("bi")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("li")).isInstanceOf(ListType.class);
+    assertThat(fn.apply("ma")).isInstanceOf(StructType.class);
+  }
+
+  @Test
+  public void testMissingColumnDetectionStruct() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct data = createStructData();
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(MAPPED_CNT);
+
+    Map<String, AddColumn> newColMap = Maps.newHashMap();
+    addCols.forEach(addCol -> newColMap.put(addCol.name(), addCol));
+
+    assertTypesAddedFromStruct(col -> newColMap.get(col).type());
+  }
+
+  @Test
+  public void testMissingColumnDetectionStructNested() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct nestedData = createNestedStructData();
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(nestedData, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType addedType = addCol.type().asStructType();
+    assertThat(addedType.fields()).hasSize(MAPPED_CNT);
+    assertTypesAddedFromStruct(col -> addedType.field(col).type());
+  }
+
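+  // The list- and map-valued variants below expect the new struct column to be reported against
+  // the element/value type of the containing collection, i.e. with parent names "stli.element"
+  // and "stma.value".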
+  @Test
+  public void testMissingColumnDetectionStructListValue() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_LIST_BASIC_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct nestedData = createNestedStructData();
+    Struct struct =
+        new Struct(CONNECT_STRUCT_IN_LIST_SCHEMA)
+            .put("stli", ImmutableList.of(nestedData, nestedData));
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(struct, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.parentName()).isEqualTo("stli.element");
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType nestedElementType = addCol.type().asStructType();
+    assertThat(nestedElementType.fields()).hasSize(MAPPED_CNT);
+    assertTypesAddedFromStruct(col -> nestedElementType.field(col).type());
+  }
+
+  @Test
+  public void testMissingColumnDetectionStructMapValue() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_MAP_BASIC_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct nestedData = createNestedStructData();
+    Struct struct =
+        new Struct(CONNECT_STRUCT_IN_MAP_SCHEMA)
+            .put("stma", ImmutableMap.of("key1", nestedData, "key2", nestedData));
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(struct, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.parentName()).isEqualTo("stma.value");
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType nestedValueType = addCol.type().asStructType();
+    assertThat(nestedValueType.fields()).hasSize(MAPPED_CNT);
+    assertTypesAddedFromStruct(col -> nestedValueType.field(col).type());
+  }
+
+  private void assertTypesAddedFromStruct(Function<String, Type> fn) {
+    assertThat(fn.apply("i")).isInstanceOf(IntegerType.class);
+    assertThat(fn.apply("l")).isInstanceOf(LongType.class);
+    assertThat(fn.apply("d")).isInstanceOf(DateType.class);
+    assertThat(fn.apply("t")).isInstanceOf(TimeType.class);
+    assertThat(fn.apply("ts")).isInstanceOf(TimestampType.class);
+    assertThat(fn.apply("tsz")).isInstanceOf(TimestampType.class);
+    assertThat(fn.apply("fl")).isInstanceOf(FloatType.class);
+    assertThat(fn.apply("do")).isInstanceOf(DoubleType.class);
+    assertThat(fn.apply("dec")).isInstanceOf(DecimalType.class);
+    assertThat(fn.apply("s")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("b")).isInstanceOf(BooleanType.class);
+    assertThat(fn.apply("u")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("f")).isInstanceOf(BinaryType.class);
+    assertThat(fn.apply("bi")).isInstanceOf(BinaryType.class);
+    assertThat(fn.apply("li")).isInstanceOf(ListType.class);
+    assertThat(fn.apply("ma")).isInstanceOf(MapType.class);
+  }
+
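+  // The evolve-type tests below write wider Connect values (int64, float64) into columns declared
+  // as int/float in the table schema; the consumer is expected to collect the corresponding
+  // int -> long and float -> double type updates rather than new columns, including for fields
+  // nested inside a struct column.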
+  @Test
+  public void testEvolveTypeDetectionStruct() {
+    org.apache.iceberg.Schema tableSchema =
+        new org.apache.iceberg.Schema(
+            NestedField.required(1, "ii", IntegerType.get()),
+            NestedField.required(2, "ff", FloatType.get()));
+
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(tableSchema);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema valueSchema =
+        SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA);
+    Struct data = new Struct(valueSchema).put("ii", 11L).put("ff", 22d);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<UpdateType> updates = consumer.updateTypes();
+
+    assertThat(updates).hasSize(2);
+
+    Map<String, UpdateType> updateMap = Maps.newHashMap();
+    updates.forEach(update -> updateMap.put(update.name(), update));
+
+    assertThat(updateMap.get("ii").type()).isInstanceOf(LongType.class);
+    assertThat(updateMap.get("ff").type()).isInstanceOf(DoubleType.class);
+  }
+
+  @Test
+  public void testEvolveTypeDetectionStructNested() {
+    org.apache.iceberg.Schema structColSchema =
+        new org.apache.iceberg.Schema(
+            NestedField.required(1, "ii", IntegerType.get()),
+            NestedField.required(2, "ff", FloatType.get()));
+
+    org.apache.iceberg.Schema tableSchema =
+        new org.apache.iceberg.Schema(
+            NestedField.required(3, "i", IntegerType.get()),
+            NestedField.required(4, "st", structColSchema.asStruct()));
+
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(tableSchema);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Schema structSchema =
+        SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA);
+    Schema schema =
+        SchemaBuilder.struct().field("i", Schema.INT32_SCHEMA).field("st", structSchema);
+    Struct structValue = new Struct(structSchema).put("ii", 11L).put("ff", 22d);
+    Struct data = new Struct(schema).put("i", 1).put("st", structValue);
+
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<UpdateType> updates = consumer.updateTypes();
+
+    assertThat(updates).hasSize(2);
+
+    Map<String, UpdateType> updateMap = Maps.newHashMap();
+    updates.forEach(update -> updateMap.put(update.name(), update));
+
+    assertThat(updateMap.get("st.ii").type()).isInstanceOf(LongType.class);
+    assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class);
+  }
+
+  private Map<String, Object> createMapData() {
+    return ImmutableMap.<String, Object>builder()
+        .put("i", 1)
+        .put("l", 2L)
+        .put("d", DATE_VAL.toString())
+        .put("t", TIME_VAL.toString())
+        .put("ts", TS_VAL.toString())
+        .put("tsz", TSZ_VAL.toString())
+        .put("fl", 1.1f)
+        .put("do", 2.2d)
+        .put("dec", DEC_VAL.toString())
+        .put("s", STR_VAL)
+        .put("b", true)
+        .put("u", UUID_VAL.toString())
+        .put("f", Base64.getEncoder().encodeToString(BYTES_VAL.array()))
+        .put("bi", Base64.getEncoder().encodeToString(BYTES_VAL.array()))
+        .put("li", LIST_VAL)
+        .put("ma", MAP_VAL)
+        .build();
+  }
+
+  private Map<String, Object> createNestedMapData() {
+    return ImmutableMap.<String, Object>builder().put("ii", 11).put("st", createMapData()).build();
+  }
+
+  private Struct createStructData() {
+    return new Struct(CONNECT_SCHEMA)
+        .put("i", 1)
+        .put("l", 2L)
+        .put("d", new Date(DATE_VAL.toEpochDay() * 24 * 60 * 60 * 1000L))
+        .put("t", new Date(TIME_VAL.toNanoOfDay() / 1_000_000))
+        .put("ts", Date.from(TS_VAL.atZone(ZoneOffset.UTC).toInstant()))
+        .put("tsz", Date.from(TSZ_VAL.toInstant()))
+        .put("fl", 1.1f)
+        .put("do", 2.2d)
+        .put("dec", DEC_VAL)
+        .put("s", STR_VAL)
+        .put("b", true)
+        .put("u", UUID_VAL.toString())
+        .put("f", BYTES_VAL.array())
+        .put("bi", BYTES_VAL.array())
+        .put("li", LIST_VAL)
+        .put("ma", MAP_VAL);
+  }
+
+  private Struct createNestedStructData() {
+    return new Struct(CONNECT_NESTED_SCHEMA).put("ii", 11).put("st", createStructData());
+  }
+
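+  // Field-by-field checks for a converted record; the nested variant additionally descends into
+  // the "st" struct field and re-runs the same checks on it.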
assertThat(rec.getField("fl")).isEqualTo(1.1f); + assertThat(rec.getField("do")).isEqualTo(2.2d); + assertThat(rec.getField("dec")).isEqualTo(DEC_VAL); + assertThat(rec.getField("s")).isEqualTo(STR_VAL); + assertThat(rec.getField("b")).isEqualTo(true); + assertThat(rec.getField("u")).isEqualTo(UUID_VAL); + assertThat(rec.getField("f")).isEqualTo(BYTES_VAL); + assertThat(rec.getField("bi")).isEqualTo(BYTES_VAL); + assertThat(rec.getField("li")).isEqualTo(LIST_VAL); + assertThat(rec.getField("ma")).isEqualTo(MAP_VAL); + } + + private void assertNestedRecordValues(Record record) { + GenericRecord rec = (GenericRecord) record; + assertThat(rec.getField("ii")).isEqualTo(11); + assertRecordValues((GenericRecord) rec.getField("st")); + } +}