diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java index d1400f58f74c..50eaa1050485 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java @@ -18,10 +18,9 @@ */ package org.apache.iceberg.connect.events; -import static java.util.stream.Collectors.toList; - import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.util.Utf8; @@ -96,7 +95,9 @@ public void put(int i, Object v) { return; case NAMESPACE: this.namespace = - v == null ? null : ((List) v).stream().map(Utf8::toString).collect(toList()); + v == null + ? null + : ((List) v).stream().map(Utf8::toString).collect(Collectors.toList()); return; case NAME: this.name = v == null ? null : v.toString(); diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index aa1ecdd5d1ba..dd9f9e401446 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg.connect; -import static java.util.stream.Collectors.toList; - import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; @@ -28,6 +26,7 @@ import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.iceberg.IcebergBuild; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -72,7 +71,9 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch"; private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns"; private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by"; - // FIXME: add config for CDC and upsert mode + private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field"; + private static final String TABLES_UPSERT_MODE_ENABLED_PROP = + "iceberg.tables.upsert-mode-enabled"; private static final String TABLES_AUTO_CREATE_ENABLED_PROP = "iceberg.tables.auto-create-enabled"; private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP = @@ -151,6 +152,18 @@ private static ConfigDef newConfigDef() { null, Importance.MEDIUM, "Default partition spec to use when creating tables, comma-separated"); + configDef.define( + TABLES_CDC_FIELD_PROP, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Source record field that identifies the type of operation (insert, update, or delete)"); + configDef.define( + TABLES_UPSERT_MODE_ENABLED_PROP, + ConfigDef.Type.BOOLEAN, + false, + Importance.MEDIUM, + "Set to true to treat all appends as upserts, false otherwise"); configDef.define( TABLES_AUTO_CREATE_ENABLED_PROP, ConfigDef.Type.BOOLEAN, @@ -365,7 +378,7 @@ static List stringToList(String value, String regex) { return 
ImmutableList.of(); } - return Arrays.stream(value.split(regex)).map(String::trim).collect(toList()); + return Arrays.stream(value.split(regex)).map(String::trim).collect(Collectors.toList()); } public String controlTopic() { @@ -409,6 +422,14 @@ public String hadoopConfDir() { return getString(HADDOP_CONF_DIR_PROP); } + public String tablesCdcField() { + return getString(TABLES_CDC_FIELD_PROP); + } + + public boolean upsertModeEnabled() { + return getBoolean(TABLES_UPSERT_MODE_ENABLED_PROP); + } + public boolean autoCreateEnabled() { return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java index 485b209302d5..8be8518f4407 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConnector.java @@ -18,11 +18,10 @@ */ package org.apache.iceberg.connect; -import static java.util.stream.Collectors.toList; - import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.config.ConfigDef; @@ -60,7 +59,7 @@ public List> taskConfigs(int maxTasks) { map.put(IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP, txnSuffix + i); return map; }) - .collect(toList()); + .collect(Collectors.toList()); } @Override diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java new file mode 100644 index 000000000000..09cedea693f1 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
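
A minimal sketch (not part of this change) of a sink worker configuration that exercises the two new properties defined above, `iceberg.tables.cdc-field` and `iceberg.tables.upsert-mode-enabled`. The `connector.class` value and the `iceberg.tables` key come from this module and standard Kafka Connect sink config; the topic name, table name, and CDC column name (`_cdc_op`) are made-up examples.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConfigExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "org.apache.iceberg.connect.IcebergSinkConnector");
    props.put("topics", "ops.events");                        // example source topic
    props.put("iceberg.tables", "db.events");                 // example target table
    props.put("iceberg.tables.cdc-field", "_cdc_op");         // new in this change
    props.put("iceberg.tables.upsert-mode-enabled", "true");  // new in this change
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```

Per the writer changes later in this diff, setting either property switches from the plain append writers to the equality-delta writers, and when both are set the per-record CDC value takes precedence over blanket upsert behavior.
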
+ */ +package org.apache.iceberg.connect.data; + +import java.io.IOException; +import java.util.Set; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; + +abstract class BaseDeltaTaskWriter extends BaseTaskWriter { + + private final Schema schema; + private final Schema deleteSchema; + private final InternalRecordWrapper wrapper; + private final InternalRecordWrapper keyWrapper; + private final RecordProjection keyProjection; + private final boolean upsertMode; + + BaseDeltaTaskWriter( + PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + Set identifierFieldIds, + boolean upsertMode) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.schema = schema; + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(identifierFieldIds)); + this.wrapper = new InternalRecordWrapper(schema.asStruct()); + this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct()); + this.keyProjection = RecordProjection.create(schema, deleteSchema); + this.upsertMode = upsertMode; + } + + abstract RowDataDeltaWriter route(Record row); + + InternalRecordWrapper wrapper() { + return wrapper; + } + + @Override + public void write(Record row) throws IOException { + Operation op = + row instanceof RecordWrapper + ? ((RecordWrapper) row).op() + : upsertMode ? 
Operation.UPDATE : Operation.INSERT; + RowDataDeltaWriter writer = route(row); + if (op == Operation.UPDATE || op == Operation.DELETE) { + writer.deleteKey(keyProjection.wrap(row)); + } + if (op == Operation.UPDATE || op == Operation.INSERT) { + writer.write(row); + } + } + + class RowDataDeltaWriter extends BaseEqualityDeltaWriter { + + RowDataDeltaWriter(PartitionKey partition) { + super(partition, schema, deleteSchema); + } + + @Override + protected StructLike asStructLike(Record data) { + return wrapper.wrap(data); + } + + @Override + protected StructLike asStructLikeKey(Record data) { + return keyWrapper.wrap(data); + } + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java index da88b3b50ffe..d0c61e016889 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -38,8 +38,7 @@ public class IcebergWriter implements RecordWriter { private final IcebergSinkConfig config; private final List writerResults; - // FIXME: update this when the record converter is added - // private RecordConverter recordConverter; + private RecordConverter recordConverter; private TaskWriter writer; public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) { @@ -52,20 +51,23 @@ public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) { private void initNewWriter() { this.writer = Utilities.createTableWriter(table, tableName, config); - // FIXME: update this when the record converter is added - // this.recordConverter = new RecordConverter(table, config); + this.recordConverter = new RecordConverter(table, config); } @Override public void write(SinkRecord record) { try { // TODO: config to handle tombstones instead of always ignoring?
if (record.value() != null) { Record row = convertToRow(record); - - // FIXME: add CDC operation support - - writer.write(row); + String cdcField = config.tablesCdcField(); + if (cdcField == null) { + writer.write(row); + } else { + Operation op = extractCdcOperation(record.value(), cdcField); + writer.write(new RecordWrapper(row, op)); + } } } catch (Exception e) { throw new DataException( @@ -77,8 +79,49 @@ public void write(SinkRecord record) { } private Record convertToRow(SinkRecord record) { - // FIXME: update this when the record converter is added - return null; + if (!config.evolveSchemaEnabled()) { + return recordConverter.convert(record.value()); + } + + SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer(); + Record row = recordConverter.convert(record.value(), updates); + + if (!updates.empty()) { + // complete the current file + flush(); + // apply the schema updates, this will refresh the table + SchemaUtils.applySchemaUpdates(table, updates); + // initialize a new writer with the new schema + initNewWriter(); + // convert the row again, this time using the new table schema + row = recordConverter.convert(record.value(), null); + } + + return row; + } + + private Operation extractCdcOperation(Object recordValue, String cdcField) { + Object opValue = Utilities.extractFromRecordValue(recordValue, cdcField); + + if (opValue == null) { + return Operation.INSERT; + } + + String opStr = opValue.toString().trim().toUpperCase(); + if (opStr.isEmpty()) { + return Operation.INSERT; + } + + // TODO: define value mapping in config? + + switch (opStr.charAt(0)) { + case 'U': + return Operation.UPDATE; + case 'D': + return Operation.DELETE; + default: + return Operation.INSERT; + } } private void flush() { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java new file mode 100644 index 000000000000..7f428a6dc2b6 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
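
The `extractCdcOperation` method above keys only off the first character of the trimmed, upper-cased op value, so Debezium-style single letters and spelled-out words behave the same. Below is a standalone sketch of that mapping; the local enum and the sample values are illustrative only, not the connector's own classes.

```java
public class CdcOpMappingExample {
  enum Op { INSERT, UPDATE, DELETE }

  // Mirrors the mapping above: only the first character of the trimmed,
  // upper-cased value matters; null or empty values default to INSERT.
  static Op map(Object opValue) {
    if (opValue == null) {
      return Op.INSERT;
    }
    String s = opValue.toString().trim().toUpperCase();
    if (s.isEmpty()) {
      return Op.INSERT;
    }
    switch (s.charAt(0)) {
      case 'U': return Op.UPDATE;  // e.g. "u", "update"
      case 'D': return Op.DELETE;  // e.g. "d", "delete"
      default:  return Op.INSERT;  // e.g. "c", "i", "insert", "r"
    }
  }

  public static void main(String[] args) {
    System.out.println(map("u"));       // UPDATE
    System.out.println(map("DELETE"));  // DELETE
    System.out.println(map("r"));       // INSERT
    System.out.println(map(null));      // INSERT
  }
}
```
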
+ */ +package org.apache.iceberg.connect.data; + +public enum Operation { + INSERT, + UPDATE, + DELETE +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java new file mode 100644 index 000000000000..eae5d116c6c1 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; + +public class PartitionedDeltaWriter extends BaseDeltaTaskWriter { + private final PartitionKey partitionKey; + + private final Map writers = Maps.newHashMap(); + + PartitionedDeltaWriter( + PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + Set identifierFieldIds, + boolean upsertMode) { + super( + spec, + format, + appenderFactory, + fileFactory, + io, + targetFileSize, + schema, + identifierFieldIds, + upsertMode); + this.partitionKey = new PartitionKey(spec, schema); + } + + @Override + RowDataDeltaWriter route(Record row) { + partitionKey.partition(wrapper().wrap(row)); + + RowDataDeltaWriter writer = writers.get(partitionKey); + if (writer == null) { + // NOTE: pass a copy of the partition key to the writer to prevent it from + // being modified + PartitionKey copiedKey = partitionKey.copy(); + writer = new RowDataDeltaWriter(copiedKey); + writers.put(copiedKey, writer); + } + + return writer; + } + + @Override + public void close() { + try { + Tasks.foreach(writers.values()) + .throwFailureWhenFinished() + .noRetry() + .run(RowDataDeltaWriter::close, IOException.class); + + writers.clear(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close equality delta writer", e); + } + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java new file mode 100644 index 000000000000..7c6ae76e9b49 --- 
/dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.data; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; +import java.time.temporal.Temporal; +import java.util.Base64; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.kafka.connect.data.Struct; + +public class RecordConverter { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final DateTimeFormatter OFFSET_TS_FMT = + new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + .appendOffset("+HHmm", "Z") + .toFormatter(); + + private final Schema tableSchema; + private final NameMapping nameMapping; + private final IcebergSinkConfig config; + private final Map> structNameMap = Maps.newHashMap(); + + public RecordConverter(Table table, IcebergSinkConfig config) { + this.tableSchema = table.schema(); + this.nameMapping = createNameMapping(table); + this.config = config; + } + + public Record convert(Object data) { + return convert(data, null); + } + + public Record convert(Object data, 
SchemaUpdate.Consumer schemaUpdateConsumer) { + if (data instanceof Struct || data instanceof Map) { + return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer); + } + throw new UnsupportedOperationException("Cannot convert type: " + data.getClass().getName()); + } + + private NameMapping createNameMapping(Table table) { + String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + return nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null; + } + + private Object convertValue( + Object value, Type type, int fieldId, SchemaUpdate.Consumer schemaUpdateConsumer) { + if (value == null) { + return null; + } + switch (type.typeId()) { + case STRUCT: + return convertStructValue(value, type.asStructType(), fieldId, schemaUpdateConsumer); + case LIST: + return convertListValue(value, type.asListType(), schemaUpdateConsumer); + case MAP: + return convertMapValue(value, type.asMapType(), schemaUpdateConsumer); + case INTEGER: + return convertInt(value); + case LONG: + return convertLong(value); + case FLOAT: + return convertFloat(value); + case DOUBLE: + return convertDouble(value); + case DECIMAL: + return convertDecimal(value, (DecimalType) type); + case BOOLEAN: + return convertBoolean(value); + case STRING: + return convertString(value); + case UUID: + return convertUUID(value); + case BINARY: + case FIXED: + return convertBase64Binary(value); + case DATE: + return convertDateValue(value); + case TIME: + return convertTimeValue(value); + case TIMESTAMP: + return convertTimestampValue(value, (TimestampType) type); + } + throw new UnsupportedOperationException("Unsupported type: " + type.typeId()); + } + + protected GenericRecord convertStructValue( + Object value, + StructType schema, + int parentFieldId, + SchemaUpdate.Consumer schemaUpdateConsumer) { + if (value instanceof Map) { + return convertToStruct((Map) value, schema, parentFieldId, schemaUpdateConsumer); + } else if (value instanceof Struct) { + return convertToStruct((Struct) value, schema, parentFieldId, schemaUpdateConsumer); + } + throw new IllegalArgumentException("Cannot convert to struct: " + value.getClass().getName()); + } + + private GenericRecord convertToStruct( + Map map, + StructType schema, + int structFieldId, + SchemaUpdate.Consumer schemaUpdateConsumer) { + GenericRecord result = GenericRecord.create(schema); + map.forEach( + (recordFieldNameObj, recordFieldValue) -> { + String recordFieldName = recordFieldNameObj.toString(); + NestedField tableField = lookupStructField(recordFieldName, schema, structFieldId); + if (tableField == null) { + // add the column if schema evolution is on, otherwise skip the value, + // skip the add column if we can't infer the type + if (schemaUpdateConsumer != null) { + Type type = SchemaUtils.inferIcebergType(recordFieldValue, config); + if (type != null) { + String parentFieldName = + structFieldId < 0 ? 
null : tableSchema.findColumnName(structFieldId); + schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type); + } + } + } else { + result.setField( + tableField.name(), + convertValue( + recordFieldValue, + tableField.type(), + tableField.fieldId(), + schemaUpdateConsumer)); + } + }); + return result; + } + + private GenericRecord convertToStruct( + Struct struct, + StructType schema, + int structFieldId, + SchemaUpdate.Consumer schemaUpdateConsumer) { + GenericRecord result = GenericRecord.create(schema); + struct + .schema() + .fields() + .forEach( + recordField -> { + NestedField tableField = lookupStructField(recordField.name(), schema, structFieldId); + if (tableField == null) { + // add the column if schema evolution is on, otherwise skip the value + if (schemaUpdateConsumer != null) { + String parentFieldName = + structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId); + Type type = SchemaUtils.toIcebergType(recordField.schema(), config); + schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type); + } + } else { + boolean hasSchemaUpdates = false; + if (schemaUpdateConsumer != null) { + // update the type if needed and schema evolution is on + PrimitiveType evolveDataType = + SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema()); + if (evolveDataType != null) { + String fieldName = tableSchema.findColumnName(tableField.fieldId()); + schemaUpdateConsumer.updateType(fieldName, evolveDataType); + hasSchemaUpdates = true; + } + // make optional if needed and schema evolution is on + if (tableField.isRequired() && recordField.schema().isOptional()) { + String fieldName = tableSchema.findColumnName(tableField.fieldId()); + schemaUpdateConsumer.makeOptional(fieldName); + hasSchemaUpdates = true; + } + } + if (!hasSchemaUpdates) { + result.setField( + tableField.name(), + convertValue( + struct.get(recordField), + tableField.type(), + tableField.fieldId(), + schemaUpdateConsumer)); + } + } + }); + return result; + } + + private NestedField lookupStructField(String fieldName, StructType schema, int structFieldId) { + if (nameMapping == null) { + return config.schemaCaseInsensitive() + ? 
schema.caseInsensitiveField(fieldName) + : schema.field(fieldName); + } + + return structNameMap + .computeIfAbsent(structFieldId, notUsed -> createStructNameMap(schema)) + .get(fieldName); + } + + private Map createStructNameMap(StructType schema) { + Map map = Maps.newHashMap(); + schema + .fields() + .forEach( + col -> { + MappedField mappedField = nameMapping.find(col.fieldId()); + if (mappedField != null && !mappedField.names().isEmpty()) { + mappedField.names().forEach(name -> map.put(name, col)); + } else { + map.put(col.name(), col); + } + }); + return map; + } + + protected List convertListValue( + Object value, ListType type, SchemaUpdate.Consumer schemaUpdateConsumer) { + Preconditions.checkArgument(value instanceof List); + List list = (List) value; + return list.stream() + .map( + element -> { + int fieldId = type.fields().get(0).fieldId(); + return convertValue(element, type.elementType(), fieldId, schemaUpdateConsumer); + }) + .collect(Collectors.toList()); + } + + protected Map convertMapValue( + Object value, MapType type, SchemaUpdate.Consumer schemaUpdateConsumer) { + Preconditions.checkArgument(value instanceof Map); + Map map = (Map) value; + Map result = Maps.newHashMap(); + map.forEach( + (k, v) -> { + int keyFieldId = type.fields().get(0).fieldId(); + int valueFieldId = type.fields().get(1).fieldId(); + result.put( + convertValue(k, type.keyType(), keyFieldId, schemaUpdateConsumer), + convertValue(v, type.valueType(), valueFieldId, schemaUpdateConsumer)); + }); + return result; + } + + protected int convertInt(Object value) { + if (value instanceof Number) { + return ((Number) value).intValue(); + } else if (value instanceof String) { + return Integer.parseInt((String) value); + } + throw new IllegalArgumentException("Cannot convert to int: " + value.getClass().getName()); + } + + protected long convertLong(Object value) { + if (value instanceof Number) { + return ((Number) value).longValue(); + } else if (value instanceof String) { + return Long.parseLong((String) value); + } + throw new IllegalArgumentException("Cannot convert to long: " + value.getClass().getName()); + } + + protected float convertFloat(Object value) { + if (value instanceof Number) { + return ((Number) value).floatValue(); + } else if (value instanceof String) { + return Float.parseFloat((String) value); + } + throw new IllegalArgumentException("Cannot convert to float: " + value.getClass().getName()); + } + + protected double convertDouble(Object value) { + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } else if (value instanceof String) { + return Double.parseDouble((String) value); + } + throw new IllegalArgumentException("Cannot convert to double: " + value.getClass().getName()); + } + + protected BigDecimal convertDecimal(Object value, DecimalType type) { + BigDecimal bigDecimal; + if (value instanceof BigDecimal) { + bigDecimal = (BigDecimal) value; + } else if (value instanceof Number) { + Number num = (Number) value; + Double dbl = num.doubleValue(); + if (dbl.equals(Math.floor(dbl))) { + bigDecimal = BigDecimal.valueOf(num.longValue()); + } else { + bigDecimal = BigDecimal.valueOf(dbl); + } + } else if (value instanceof String) { + bigDecimal = new BigDecimal((String) value); + } else { + throw new IllegalArgumentException( + "Cannot convert to BigDecimal: " + value.getClass().getName()); + } + return bigDecimal.setScale(type.scale(), RoundingMode.HALF_UP); + } + + protected boolean convertBoolean(Object value) { + if (value instanceof Boolean) { + return 
(boolean) value; + } else if (value instanceof String) { + return Boolean.parseBoolean((String) value); + } + throw new IllegalArgumentException("Cannot convert to boolean: " + value.getClass().getName()); + } + + protected String convertString(Object value) { + try { + if (value instanceof String) { + return (String) value; + } else if (value instanceof Number || value instanceof Boolean) { + return value.toString(); + } else if (value instanceof Map || value instanceof List) { + return MAPPER.writeValueAsString(value); + } else if (value instanceof Struct) { + Struct struct = (Struct) value; + byte[] data = config.jsonConverter().fromConnectData(null, struct.schema(), struct); + return new String(data, StandardCharsets.UTF_8); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + throw new IllegalArgumentException("Cannot convert to string: " + value.getClass().getName()); + } + + protected UUID convertUUID(Object value) { + if (value instanceof String) { + return UUID.fromString((String) value); + } else if (value instanceof UUID) { + return (UUID) value; + } + throw new IllegalArgumentException("Cannot convert to UUID: " + value.getClass().getName()); + } + + protected ByteBuffer convertBase64Binary(Object value) { + if (value instanceof String) { + return ByteBuffer.wrap(Base64.getDecoder().decode((String) value)); + } else if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); + } else if (value instanceof ByteBuffer) { + return (ByteBuffer) value; + } + throw new IllegalArgumentException("Cannot convert to binary: " + value.getClass().getName()); + } + + @SuppressWarnings("JavaUtilDate") + protected LocalDate convertDateValue(Object value) { + if (value instanceof Number) { + int days = ((Number) value).intValue(); + return DateTimeUtil.dateFromDays(days); + } else if (value instanceof String) { + return LocalDate.parse((String) value); + } else if (value instanceof LocalDate) { + return (LocalDate) value; + } else if (value instanceof Date) { + int days = (int) (((Date) value).getTime() / 1000 / 60 / 60 / 24); + return DateTimeUtil.dateFromDays(days); + } + throw new RuntimeException("Cannot convert date: " + value); + } + + @SuppressWarnings("JavaUtilDate") + protected LocalTime convertTimeValue(Object value) { + if (value instanceof Number) { + long millis = ((Number) value).longValue(); + return DateTimeUtil.timeFromMicros(millis * 1000); + } else if (value instanceof String) { + return LocalTime.parse((String) value); + } else if (value instanceof LocalTime) { + return (LocalTime) value; + } else if (value instanceof Date) { + long millis = ((Date) value).getTime(); + return DateTimeUtil.timeFromMicros(millis * 1000); + } + throw new RuntimeException("Cannot convert time: " + value); + } + + protected Temporal convertTimestampValue(Object value, TimestampType type) { + if (type.shouldAdjustToUTC()) { + return convertOffsetDateTime(value); + } + return convertLocalDateTime(value); + } + + @SuppressWarnings("JavaUtilDate") + private OffsetDateTime convertOffsetDateTime(Object value) { + if (value instanceof Number) { + long millis = ((Number) value).longValue(); + return DateTimeUtil.timestamptzFromMicros(millis * 1000); + } else if (value instanceof String) { + return parseOffsetDateTime((String) value); + } else if (value instanceof OffsetDateTime) { + return (OffsetDateTime) value; + } else if (value instanceof LocalDateTime) { + return ((LocalDateTime) value).atOffset(ZoneOffset.UTC); + } else if (value instanceof Date) { + return 
DateTimeUtil.timestamptzFromMicros(((Date) value).getTime() * 1000); + } + throw new RuntimeException( + "Cannot convert timestamptz: " + value + ", type: " + value.getClass()); + } + + private OffsetDateTime parseOffsetDateTime(String str) { + String tsStr = ensureTimestampFormat(str); + try { + return OFFSET_TS_FMT.parse(tsStr, OffsetDateTime::from); + } catch (DateTimeParseException e) { + return LocalDateTime.parse(tsStr, DateTimeFormatter.ISO_LOCAL_DATE_TIME) + .atOffset(ZoneOffset.UTC); + } + } + + @SuppressWarnings("JavaUtilDate") + private LocalDateTime convertLocalDateTime(Object value) { + if (value instanceof Number) { + long millis = ((Number) value).longValue(); + return DateTimeUtil.timestampFromMicros(millis * 1000); + } else if (value instanceof String) { + return parseLocalDateTime((String) value); + } else if (value instanceof LocalDateTime) { + return (LocalDateTime) value; + } else if (value instanceof OffsetDateTime) { + return ((OffsetDateTime) value).toLocalDateTime(); + } else if (value instanceof Date) { + return DateTimeUtil.timestampFromMicros(((Date) value).getTime() * 1000); + } + throw new RuntimeException( + "Cannot convert timestamp: " + value + ", type: " + value.getClass()); + } + + private LocalDateTime parseLocalDateTime(String str) { + String tsStr = ensureTimestampFormat(str); + try { + return LocalDateTime.parse(tsStr, DateTimeFormatter.ISO_LOCAL_DATE_TIME); + } catch (DateTimeParseException e) { + return OFFSET_TS_FMT.parse(tsStr, OffsetDateTime::from).toLocalDateTime(); + } + } + + private String ensureTimestampFormat(String str) { + String result = str; + if (result.charAt(10) == ' ') { + result = result.substring(0, 10) + 'T' + result.substring(11); + } + if (result.length() > 22 && result.charAt(19) == '+' && result.charAt(22) == ':') { + result = result.substring(0, 19) + result.substring(19).replace(":", ""); + } + return result; + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java new file mode 100644 index 000000000000..79ce2c111a3a --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
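
The string handling above (`parseOffsetDateTime`, `parseLocalDateTime`, `ensureTimestampFormat`) accepts a few common timestamp shapes: a space separator becomes `T`, a `+hh:mm` offset is collapsed to `+hhmm`, and a string with no offset is read as UTC. Below is a self-contained sketch of those same normalization rules with made-up inputs; it is a simplified re-statement, not the converter itself.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;

public class TimestampParseExample {
  private static final DateTimeFormatter OFFSET_TS_FMT =
      new DateTimeFormatterBuilder()
          .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
          .appendOffset("+HHmm", "Z")
          .toFormatter();

  static OffsetDateTime parse(String str) {
    String ts = str;
    // space separator -> 'T'
    if (ts.charAt(10) == ' ') {
      ts = ts.substring(0, 10) + 'T' + ts.substring(11);
    }
    // "+hh:mm" offset -> "+hhmm"
    if (ts.length() > 22 && ts.charAt(19) == '+' && ts.charAt(22) == ':') {
      ts = ts.substring(0, 19) + ts.substring(19).replace(":", "");
    }
    try {
      return OFFSET_TS_FMT.parse(ts, OffsetDateTime::from);
    } catch (DateTimeParseException e) {
      // no offset present: assume UTC
      return LocalDateTime.parse(ts, DateTimeFormatter.ISO_LOCAL_DATE_TIME)
          .atOffset(ZoneOffset.UTC);
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("2023-05-18 07:14:21"));        // 2023-05-18T07:14:21Z
    System.out.println(parse("2023-05-18T07:14:21+00:00"));  // 2023-05-18T07:14:21Z
    System.out.println(parse("2023-05-18T07:14:21Z"));       // 2023-05-18T07:14:21Z
  }
}
```
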
+ */ +package org.apache.iceberg.connect.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; + +/** + * This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types. + */ +public class RecordProjection implements Record { + + /** + * Creates a projecting wrapper for {@link Record} rows. + * + *
+ * <p>
This projection does not work with repeated types like lists and maps. + * + * @param dataSchema schema of rows wrapped by this projection + * @param projectedSchema result schema of the projected rows + * @return a wrapper to project rows + */ + public static RecordProjection create(Schema dataSchema, Schema projectedSchema) { + return new RecordProjection(dataSchema.asStruct(), projectedSchema.asStruct()); + } + + private final StructType type; + private final int[] positionMap; + private final RecordProjection[] nestedProjections; + private Record record; + + private RecordProjection(StructType structType, StructType projection) { + this(structType, projection, false); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private RecordProjection(StructType structType, StructType projection, boolean allowMissing) { + this.type = projection; + this.positionMap = new int[projection.fields().size()]; + this.nestedProjections = new RecordProjection[projection.fields().size()]; + + // set up the projection positions and any nested projections that are needed + List dataFields = structType.fields(); + for (int pos = 0; pos < positionMap.length; pos += 1) { + NestedField projectedField = projection.fields().get(pos); + + boolean found = false; + for (int i = 0; !found && i < dataFields.size(); i += 1) { + NestedField dataField = dataFields.get(i); + if (projectedField.fieldId() == dataField.fieldId()) { + found = true; + positionMap[pos] = i; + switch (projectedField.type().typeId()) { + case STRUCT: + nestedProjections[pos] = + new RecordProjection( + dataField.type().asStructType(), projectedField.type().asStructType()); + break; + case MAP: + MapType projectedMap = projectedField.type().asMapType(); + MapType originalMap = dataField.type().asMapType(); + + boolean keyProjectable = + !projectedMap.keyType().isNestedType() + || projectedMap.keyType().equals(originalMap.keyType()); + boolean valueProjectable = + !projectedMap.valueType().isNestedType() + || projectedMap.valueType().equals(originalMap.valueType()); + Preconditions.checkArgument( + keyProjectable && valueProjectable, + "Cannot project a partial map key or value struct. Trying to project %s out of %s", + projectedField, + dataField); + + nestedProjections[pos] = null; + break; + case LIST: + ListType projectedList = projectedField.type().asListType(); + ListType originalList = dataField.type().asListType(); + + boolean elementProjectable = + !projectedList.elementType().isNestedType() + || projectedList.elementType().equals(originalList.elementType()); + Preconditions.checkArgument( + elementProjectable, + "Cannot project a partial list element struct. Trying to project %s out of %s", + projectedField, + dataField); + + nestedProjections[pos] = null; + break; + default: + nestedProjections[pos] = null; + } + } + } + + if (!found && projectedField.isOptional() && allowMissing) { + positionMap[pos] = -1; + nestedProjections[pos] = null; + } else if (!found) { + throw new IllegalArgumentException( + String.format("Cannot find field %s in %s", projectedField, structType)); + } + } + } + + public RecordProjection wrap(Record newRecord) { + this.record = newRecord; + return this; + } + + @Override + public int size() { + return type.fields().size(); + } + + @Override + public T get(int pos, Class javaClass) { + // struct can be null if wrap is not called first before the get call + // or if a null struct is wrapped. 
+ if (record == null) { + return null; + } + + int recordPos = positionMap[pos]; + if (nestedProjections[pos] != null) { + Record nestedStruct = record.get(recordPos, Record.class); + if (nestedStruct == null) { + return null; + } + + return javaClass.cast(nestedProjections[pos].wrap(nestedStruct)); + } + + if (recordPos != -1) { + return record.get(recordPos, javaClass); + } else { + return null; + } + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException(); + } + + @Override + public StructType struct() { + return type; + } + + @Override + public Object getField(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public void setField(String name, Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public Object get(int pos) { + return get(pos, Object.class); + } + + @Override + public Record copy() { + throw new UnsupportedOperationException(); + } + + @Override + public Record copy(Map overwriteValues) { + throw new UnsupportedOperationException(); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java new file mode 100644 index 000000000000..915608562034 --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import java.util.Map; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types.StructType; + +public class RecordWrapper implements Record { + + private final Record delegate; + private final Operation op; + + public RecordWrapper(Record delegate, Operation op) { + this.delegate = delegate; + this.op = op; + } + + public Operation op() { + return op; + } + + @Override + public StructType struct() { + return delegate.struct(); + } + + @Override + public Object getField(String name) { + return delegate.getField(name); + } + + @Override + public void setField(String name, Object value) { + delegate.setField(name, value); + } + + @Override + public Object get(int pos) { + return delegate.get(pos); + } + + @Override + public Record copy() { + return new RecordWrapper(delegate.copy(), op); + } + + @Override + public Record copy(Map overwriteValues) { + return new RecordWrapper(delegate.copy(overwriteValues), op); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public T get(int pos, Class javaClass) { + return delegate.get(pos, javaClass); + } + + @Override + public void set(int pos, T value) { + delegate.set(pos, value); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java new file mode 100644 index 000000000000..46b0a45d532d --- /dev/null +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
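
`RecordWrapper` above is a pass-through `Record` that carries the CDC `Operation` so the delta writers can route it. A small illustrative snippet follows; the table schema and values are made up, and it assumes this module's classes are on the classpath.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.connect.data.Operation;
import org.apache.iceberg.connect.data.RecordWrapper;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class RecordWrapperExample {
  public static void main(String[] args) {
    // Made-up schema for illustration only.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    Record row = GenericRecord.create(schema);
    row.setField("id", 42L);
    row.setField("data", "updated value");

    // Tag the converted row with its CDC operation before handing it to a writer;
    // BaseDeltaTaskWriter.write() would issue deleteKey(...) then write(...) for UPDATE.
    Record tagged = new RecordWrapper(row, Operation.UPDATE);
    System.out.println(((RecordWrapper) tagged).op() + " -> id=" + tagged.getField("id"));
  }
}
```
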
+ */ +package org.apache.iceberg.connect.data; + +import java.io.IOException; +import java.util.Set; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; + +public class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { + private final RowDataDeltaWriter writer; + + UnpartitionedDeltaWriter( + PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + Set identifierFieldIds, + boolean upsertMode) { + super( + spec, + format, + appenderFactory, + fileFactory, + io, + targetFileSize, + schema, + identifierFieldIds, + upsertMode); + this.writer = new RowDataDeltaWriter(null); + } + + @Override + RowDataDeltaWriter route(Record row) { + return writer; + } + + @Override + public void close() throws IOException { + writer.close(); + } +} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java index ec13b003a21a..2de58d0d7486 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java @@ -18,12 +18,6 @@ */ package org.apache.iceberg.connect.data; -import static java.util.stream.Collectors.toSet; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; -import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; - import java.io.IOException; import java.net.URL; import java.nio.file.Files; @@ -33,9 +27,11 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.common.DynConstructors; @@ -175,12 +171,16 @@ public static TaskWriter createTableWriter( Map tableProps = Maps.newHashMap(table.properties()); tableProps.putAll(config.writeProps()); - String formatStr = tableProps.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + String formatStr = + tableProps.getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); FileFormat format = FileFormat.fromString(formatStr); long targetFileSize = PropertyUtil.propertyAsLong( - tableProps, WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + tableProps, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); Set identifierFieldIds = table.schema().identifierFieldIds(); @@ -197,7 +197,7 @@ public static TaskWriter createTableWriter( } return field.fieldId(); }) - .collect(toSet()); + .collect(Collectors.toSet()); } FileAppenderFactory appenderFactory; @@ -224,23 +224,49 @@ public static TaskWriter createTableWriter( .format(format) .build(); - // FIXME: add delta writers - TaskWriter writer; if 
(table.spec().isUnpartitioned()) { - writer = - new UnpartitionedWriter<>( - table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize); + if (config.tablesCdcField() == null && !config.upsertModeEnabled()) { + writer = + new UnpartitionedWriter<>( + table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize); + } else { + writer = + new UnpartitionedDeltaWriter( + table.spec(), + format, + appenderFactory, + fileFactory, + table.io(), + targetFileSize, + table.schema(), + identifierFieldIds, + config.upsertModeEnabled()); + } } else { - writer = - new PartitionedAppendWriter( - table.spec(), - format, - appenderFactory, - fileFactory, - table.io(), - targetFileSize, - table.schema()); + if (config.tablesCdcField() == null && !config.upsertModeEnabled()) { + writer = + new PartitionedAppendWriter( + table.spec(), + format, + appenderFactory, + fileFactory, + table.io(), + targetFileSize, + table.schema()); + } else { + writer = + new PartitionedDeltaWriter( + table.spec(), + format, + appenderFactory, + fileFactory, + table.io(), + targetFileSize, + table.schema(), + identifierFieldIds, + config.upsertModeEnabled()); + } } return writer; } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java index 86502794b224..c8f563a13911 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/IcebergSinkConnectorTest.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.connect; -import static org.apache.iceberg.connect.IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP; import static org.assertj.core.api.Assertions.assertThat; import java.util.List; @@ -35,6 +34,7 @@ public void testTaskConfigs() { connector.start(ImmutableMap.of()); List> configs = connector.taskConfigs(3); assertThat(configs).hasSize(3); - configs.forEach(map -> assertThat(map).containsKey(INTERNAL_TRANSACTIONAL_SUFFIX_PROP)); + configs.forEach( + map -> assertThat(map).containsKey(IcebergSinkConfig.INTERNAL_TRANSACTIONAL_SUFFIX_PROP)); } } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java new file mode 100644 index 000000000000..b8b7f0b66f95 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
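
`Utilities.createTableWriter` above only switches to the new delta writers when a CDC field or upsert mode is configured; otherwise the existing append writers are kept, and partitioned versus unpartitioned is chosen from the table spec as before. A tiny sketch of that decision rule (the enum and method names here are illustrative, not the real API):

```java
public class WriterSelectionExample {
  enum WriterKind { APPEND, EQUALITY_DELTA }

  // Mirrors the condition above: delta writers only when CDC or upsert is on.
  static WriterKind select(String cdcField, boolean upsertEnabled) {
    return (cdcField == null && !upsertEnabled) ? WriterKind.APPEND : WriterKind.EQUALITY_DELTA;
  }

  public static void main(String[] args) {
    System.out.println(select(null, false));       // APPEND
    System.out.println(select("_cdc_op", false));  // EQUALITY_DELTA
    System.out.println(select(null, true));        // EQUALITY_DELTA
  }
}
```
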
+ */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class PartitionedDeltaWriterTest extends BaseWriterTest { + + @ParameterizedTest + @ValueSource(strings = {"parquet", "orc"}) + public void testPartitionedDeltaWriter(String format) { + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.upsertModeEnabled()).thenReturn(true); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format)); + + when(table.spec()).thenReturn(SPEC); + + Record row1 = GenericRecord.create(SCHEMA); + row1.setField("id", 123L); + row1.setField("data", "hello world!"); + row1.setField("id2", 123L); + + Record row2 = GenericRecord.create(SCHEMA); + row2.setField("id", 234L); + row2.setField("data", "foobar"); + row2.setField("id2", 234L); + + WriteResult result = + writeTest(ImmutableList.of(row1, row2), config, PartitionedDeltaWriter.class); + + // in upsert mode, each write is a delete + append, so we'll have 1 data file + // and 1 delete file for each partition (2 total) + assertThat(result.dataFiles()).hasSize(2); + assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format)); + assertThat(result.deleteFiles()).hasSize(2); + assertThat(result.deleteFiles()) + .allMatch(file -> file.format() == FileFormat.fromString(format)); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java new file mode 100644 index 000000000000..a61e7324a778 --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java @@ -0,0 +1,828 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.Temporal; +import java.util.Base64; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.connect.IcebergSinkConfig; +import org.apache.iceberg.connect.data.SchemaUpdate.AddColumn; +import org.apache.iceberg.connect.data.SchemaUpdate.UpdateType; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.BinaryType; +import org.apache.iceberg.types.Types.DateType; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.FloatType; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimeType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class RecordConverterTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final org.apache.iceberg.Schema SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required(21, "i", IntegerType.get()), + Types.NestedField.required(22, "l", LongType.get()), + Types.NestedField.required(23, "d", DateType.get()), + Types.NestedField.required(24, "t", TimeType.get()), + Types.NestedField.required(25, "ts", TimestampType.withoutZone()), + Types.NestedField.required(26, "tsz", TimestampType.withZone()), + Types.NestedField.required(27, "fl", FloatType.get()), + Types.NestedField.required(28, "do", 
DoubleType.get()), + Types.NestedField.required(29, "dec", DecimalType.of(9, 2)), + Types.NestedField.required(30, "s", StringType.get()), + Types.NestedField.required(31, "u", Types.UUIDType.get()), + Types.NestedField.required(32, "f", Types.FixedType.ofLength(3)), + Types.NestedField.required(33, "b", BinaryType.get()), + Types.NestedField.required(34, "li", ListType.ofRequired(35, StringType.get())), + Types.NestedField.required( + 36, "ma", MapType.ofRequired(37, 38, StringType.get(), StringType.get())), + Types.NestedField.optional(39, "extra", StringType.get())); + + // we have 1 unmapped column so exclude that from the count + private static final int MAPPED_CNT = SCHEMA.columns().size() - 1; + + private static final org.apache.iceberg.Schema NESTED_SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "ii", IntegerType.get()), + Types.NestedField.required(2, "st", SCHEMA.asStruct())); + + private static final org.apache.iceberg.Schema SIMPLE_SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "ii", IntegerType.get()), + Types.NestedField.required(2, "st", StringType.get())); + + private static final org.apache.iceberg.Schema ID_SCHEMA = + new org.apache.iceberg.Schema(Types.NestedField.required(1, "ii", IntegerType.get())); + + private static final org.apache.iceberg.Schema STRUCT_IN_LIST_SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required( + 100, "stli", ListType.ofRequired(101, NESTED_SCHEMA.asStruct()))); + + private static final org.apache.iceberg.Schema STRUCT_IN_LIST_BASIC_SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required(100, "stli", ListType.ofRequired(101, ID_SCHEMA.asStruct()))); + + private static final org.apache.iceberg.Schema STRUCT_IN_MAP_SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required( + 100, + "stma", + MapType.ofRequired(101, 102, StringType.get(), NESTED_SCHEMA.asStruct()))); + + private static final org.apache.iceberg.Schema STRUCT_IN_MAP_BASIC_SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required( + 100, "stma", MapType.ofRequired(101, 102, StringType.get(), ID_SCHEMA.asStruct()))); + + private static final Schema CONNECT_SCHEMA = + SchemaBuilder.struct() + .field("i", Schema.INT32_SCHEMA) + .field("l", Schema.INT64_SCHEMA) + .field("d", org.apache.kafka.connect.data.Date.SCHEMA) + .field("t", Time.SCHEMA) + .field("ts", Timestamp.SCHEMA) + .field("tsz", Timestamp.SCHEMA) + .field("fl", Schema.FLOAT32_SCHEMA) + .field("do", Schema.FLOAT64_SCHEMA) + .field("dec", Decimal.schema(2)) + .field("s", Schema.STRING_SCHEMA) + .field("u", Schema.STRING_SCHEMA) + .field("f", Schema.BYTES_SCHEMA) + .field("b", Schema.BYTES_SCHEMA) + .field("li", SchemaBuilder.array(Schema.STRING_SCHEMA)) + .field("ma", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)); + + private static final Schema CONNECT_NESTED_SCHEMA = + SchemaBuilder.struct().field("ii", Schema.INT32_SCHEMA).field("st", CONNECT_SCHEMA); + + private static final Schema CONNECT_STRUCT_IN_LIST_SCHEMA = + SchemaBuilder.struct().field("stli", SchemaBuilder.array(CONNECT_NESTED_SCHEMA)).build(); + + private static final Schema CONNECT_STRUCT_IN_MAP_SCHEMA = + SchemaBuilder.struct() + .field("stma", SchemaBuilder.map(Schema.STRING_SCHEMA, CONNECT_NESTED_SCHEMA)) + .build(); + + private static final LocalDate DATE_VAL = LocalDate.parse("2023-05-18"); + private static final LocalTime TIME_VAL = LocalTime.parse("07:14:21"); + private static final LocalDateTime TS_VAL = 
LocalDateTime.parse("2023-05-18T07:14:21"); + private static final OffsetDateTime TSZ_VAL = OffsetDateTime.parse("2023-05-18T07:14:21Z"); + private static final BigDecimal DEC_VAL = new BigDecimal("12.34"); + private static final String STR_VAL = "foobar"; + private static final UUID UUID_VAL = UUID.randomUUID(); + private static final ByteBuffer BYTES_VAL = ByteBuffer.wrap(new byte[] {1, 2, 3}); + private static final List LIST_VAL = ImmutableList.of("hello", "world"); + private static final Map MAP_VAL = ImmutableMap.of("one", "1", "two", "2"); + + private static final JsonConverter JSON_CONVERTER = new JsonConverter(); + + static { + JSON_CONVERTER.configure( + ImmutableMap.of( + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, + false, + ConverterConfig.TYPE_CONFIG, + ConverterType.VALUE.getName())); + } + + private IcebergSinkConfig config; + + @BeforeEach + public void before() { + this.config = mock(IcebergSinkConfig.class); + when(config.jsonConverter()).thenReturn(JSON_CONVERTER); + } + + @Test + public void testMapConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = createMapData(); + Record record = converter.convert(data); + assertRecordValues(record); + } + + @Test + public void testNestedMapConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(NESTED_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map nestedData = createNestedMapData(); + Record record = converter.convert(nestedData); + assertNestedRecordValues(record); + } + + @Test + @SuppressWarnings("unchecked") + public void testMapToString() throws Exception { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map nestedData = createNestedMapData(); + Record record = converter.convert(nestedData); + + String str = (String) record.getField("st"); + Map map = (Map) MAPPER.readValue(str, Map.class); + assertThat(map).hasSize(MAPPED_CNT); + } + + @Test + public void testMapValueInListConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_LIST_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = createNestedMapData(); + Record record = converter.convert(ImmutableMap.of("stli", ImmutableList.of(data, data))); + List fieldVal = (List) record.getField("stli"); + + Record elementVal = (Record) fieldVal.get(0); + assertNestedRecordValues(elementVal); + } + + @Test + public void testMapValueInMapConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_MAP_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = createNestedMapData(); + Record record = + converter.convert(ImmutableMap.of("stma", ImmutableMap.of("key1", data, "key2", data))); + + Map fieldVal = (Map) record.getField("stma"); + Record mapVal = (Record) fieldVal.get("key1"); + assertNestedRecordValues(mapVal); + } + + @Test + public void testStructConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct data = createStructData(); + Record record = converter.convert(data); + assertRecordValues(record); + } + + @Test + public void testNestedStructConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(NESTED_SCHEMA); + 
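+    // convert a Connect Struct that contains a nested struct field and verify every nested value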
RecordConverter converter = new RecordConverter(table, config); + + Struct nestedData = createNestedStructData(); + Record record = converter.convert(nestedData); + assertNestedRecordValues(record); + } + + @Test + @SuppressWarnings("unchecked") + public void testStructToString() throws Exception { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct nestedData = createNestedStructData(); + Record record = converter.convert(nestedData); + + String str = (String) record.getField("st"); + Map map = (Map) MAPPER.readValue(str, Map.class); + assertThat(map).hasSize(MAPPED_CNT); + } + + @Test + public void testStructValueInListConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_LIST_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct data = createNestedStructData(); + Struct struct = + new Struct(CONNECT_STRUCT_IN_LIST_SCHEMA).put("stli", ImmutableList.of(data, data)); + Record record = converter.convert(struct); + + List fieldVal = (List) record.getField("stli"); + Record elementVal = (Record) fieldVal.get(0); + assertNestedRecordValues(elementVal); + } + + @Test + public void testStructValueInMapConvert() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_MAP_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct data = createNestedStructData(); + Struct struct = + new Struct(CONNECT_STRUCT_IN_MAP_SCHEMA) + .put("stma", ImmutableMap.of("key1", data, "key2", data)); + Record record = converter.convert(struct); + + Map fieldVal = (Map) record.getField("stma"); + Record mapVal = (Record) fieldVal.get("key1"); + assertNestedRecordValues(mapVal); + } + + @Test + public void testNameMapping() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + NameMapping nameMapping = NameMapping.of(MappedField.of(1, ImmutableList.of("renamed_ii"))); + when(table.properties()) + .thenReturn( + ImmutableMap.of( + TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping))); + + RecordConverter converter = new RecordConverter(table, config); + + Map data = ImmutableMap.of("renamed_ii", 123); + Record record = converter.convert(data); + assertThat(record.getField("ii")).isEqualTo(123); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testCaseSensitivity(boolean caseInsensitive) { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + when(config.schemaCaseInsensitive()).thenReturn(caseInsensitive); + + RecordConverter converter = new RecordConverter(table, config); + + Map mapData = ImmutableMap.of("II", 123); + Record record1 = converter.convert(mapData); + + Struct structData = + new Struct(SchemaBuilder.struct().field("II", Schema.INT32_SCHEMA).build()).put("II", 123); + Record record2 = converter.convert(structData); + + if (caseInsensitive) { + assertThat(record1.getField("ii")).isEqualTo(123); + assertThat(record2.getField("ii")).isEqualTo(123); + } else { + assertThat(record1.getField("ii")).isEqualTo(null); + assertThat(record2.getField("ii")).isEqualTo(null); + } + } + + @Test + public void testDecimalConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + RecordConverter converter = new RecordConverter(table, config); + + BigDecimal expected = new BigDecimal("123.45"); + + ImmutableList.of("123.45", 
123.45d, expected) + .forEach( + input -> { + BigDecimal decimal = converter.convertDecimal(input, DecimalType.of(10, 2)); + assertThat(decimal).isEqualTo(expected); + }); + + BigDecimal expected2 = new BigDecimal(123); + + ImmutableList.of("123", 123, expected2) + .forEach( + input -> { + BigDecimal decimal = converter.convertDecimal(input, DecimalType.of(10, 0)); + assertThat(decimal).isEqualTo(expected2); + }); + } + + @Test + public void testDateConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + LocalDate expected = LocalDate.of(2023, 11, 15); + + List inputList = + ImmutableList.of( + "2023-11-15", + expected.toEpochDay(), + expected, + new Date(Duration.ofDays(expected.toEpochDay()).toMillis())); + + inputList.forEach( + input -> { + Temporal ts = converter.convertDateValue(input); + assertThat(ts).isEqualTo(expected); + }); + } + + @Test + public void testTimeConversion() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + LocalTime expected = LocalTime.of(7, 51, 30, 888_000_000); + + List inputList = + ImmutableList.of( + "07:51:30.888", + expected.toNanoOfDay() / 1000 / 1000, + expected, + new Date(expected.toNanoOfDay() / 1000 / 1000)); + + inputList.forEach( + input -> { + Temporal ts = converter.convertTimeValue(input); + assertThat(ts).isEqualTo(expected); + }); + } + + @Test + public void testTimestampWithZoneConversion() { + OffsetDateTime expected = OffsetDateTime.parse("2023-05-18T11:22:33Z"); + long expectedMillis = expected.toInstant().toEpochMilli(); + convertToTimestamps(expected, expectedMillis, TimestampType.withZone()); + } + + @Test + public void testTimestampWithoutZoneConversion() { + LocalDateTime expected = LocalDateTime.parse("2023-05-18T11:22:33"); + long expectedMillis = expected.atZone(ZoneOffset.UTC).toInstant().toEpochMilli(); + convertToTimestamps(expected, expectedMillis, TimestampType.withoutZone()); + } + + private void convertToTimestamps(Temporal expected, long expectedMillis, TimestampType type) { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + List inputList = + ImmutableList.of( + "2023-05-18T11:22:33Z", + "2023-05-18 11:22:33Z", + "2023-05-18T11:22:33+00", + "2023-05-18 11:22:33+00", + "2023-05-18T11:22:33+00:00", + "2023-05-18 11:22:33+00:00", + "2023-05-18T11:22:33+0000", + "2023-05-18 11:22:33+0000", + "2023-05-18T11:22:33", + "2023-05-18 11:22:33", + expectedMillis, + new Date(expectedMillis), + OffsetDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC), + LocalDateTime.ofInstant(Instant.ofEpochMilli(expectedMillis), ZoneOffset.UTC)); + + inputList.forEach( + input -> { + Temporal ts = converter.convertTimestampValue(input, type); + assertThat(ts).isEqualTo(expected); + }); + } + + @Test + public void testMissingColumnDetectionMap() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map data = Maps.newHashMap(createMapData()); + data.put("null", null); + + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(data, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(15); + + Map newColMap = Maps.newHashMap(); + addCols.forEach(addCol 
-> newColMap.put(addCol.name(), addCol)); + + assertTypesAddedFromMap(col -> newColMap.get(col).type()); + + // null values should be ignored + assertThat(newColMap).doesNotContainKey("null"); + } + + @Test + public void testMissingColumnDetectionMapNested() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map nestedData = createNestedMapData(); + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(nestedData, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(1); + + AddColumn addCol = addCols.iterator().next(); + assertThat(addCol.name()).isEqualTo("st"); + + StructType addedType = addCol.type().asStructType(); + assertThat(addedType.fields()).hasSize(15); + assertTypesAddedFromMap(col -> addedType.field(col).type()); + } + + @Test + public void testMissingColumnDetectionMapListValue() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_LIST_BASIC_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Map nestedData = createNestedMapData(); + Map map = ImmutableMap.of("stli", ImmutableList.of(nestedData, nestedData)); + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(map, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(1); + + AddColumn addCol = addCols.iterator().next(); + assertThat(addCol.parentName()).isEqualTo("stli.element"); + assertThat(addCol.name()).isEqualTo("st"); + + StructType nestedElementType = addCol.type().asStructType(); + assertThat(nestedElementType.fields()).hasSize(15); + assertTypesAddedFromMap(col -> nestedElementType.field(col).type()); + } + + private void assertTypesAddedFromMap(Function fn) { + assertThat(fn.apply("i")).isInstanceOf(LongType.class); + assertThat(fn.apply("l")).isInstanceOf(LongType.class); + assertThat(fn.apply("d")).isInstanceOf(StringType.class); + assertThat(fn.apply("t")).isInstanceOf(StringType.class); + assertThat(fn.apply("ts")).isInstanceOf(StringType.class); + assertThat(fn.apply("tsz")).isInstanceOf(StringType.class); + assertThat(fn.apply("fl")).isInstanceOf(DoubleType.class); + assertThat(fn.apply("do")).isInstanceOf(DoubleType.class); + assertThat(fn.apply("dec")).isInstanceOf(StringType.class); + assertThat(fn.apply("s")).isInstanceOf(StringType.class); + assertThat(fn.apply("u")).isInstanceOf(StringType.class); + assertThat(fn.apply("f")).isInstanceOf(StringType.class); + assertThat(fn.apply("b")).isInstanceOf(StringType.class); + assertThat(fn.apply("li")).isInstanceOf(ListType.class); + assertThat(fn.apply("ma")).isInstanceOf(StructType.class); + } + + @Test + public void testMissingColumnDetectionStruct() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct data = createStructData(); + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(data, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(15); + + Map newColMap = Maps.newHashMap(); + addCols.forEach(addCol -> newColMap.put(addCol.name(), addCol)); + + assertTypesAddedFromStruct(col -> newColMap.get(col).type()); + } + + @Test + public void testMissingColumnDetectionStructNested() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(ID_SCHEMA); + RecordConverter converter = new 
RecordConverter(table, config); + + Struct nestedData = createNestedStructData(); + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(nestedData, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(1); + + AddColumn addCol = addCols.iterator().next(); + assertThat(addCol.name()).isEqualTo("st"); + + StructType addedType = addCol.type().asStructType(); + assertThat(addedType.fields()).hasSize(15); + assertTypesAddedFromStruct(col -> addedType.field(col).type()); + } + + @Test + public void testMissingColumnDetectionStructListValue() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_LIST_BASIC_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct nestedData = createNestedStructData(); + Struct struct = + new Struct(CONNECT_STRUCT_IN_LIST_SCHEMA) + .put("stli", ImmutableList.of(nestedData, nestedData)); + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(struct, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(1); + + AddColumn addCol = addCols.iterator().next(); + assertThat(addCol.parentName()).isEqualTo("stli.element"); + assertThat(addCol.name()).isEqualTo("st"); + + StructType nestedElementType = addCol.type().asStructType(); + assertThat(nestedElementType.fields()).hasSize(15); + assertTypesAddedFromStruct(col -> nestedElementType.field(col).type()); + } + + @Test + public void testMissingColumnDetectionStructMapValue() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(STRUCT_IN_MAP_BASIC_SCHEMA); + RecordConverter converter = new RecordConverter(table, config); + + Struct nestedData = createNestedStructData(); + Struct struct = + new Struct(CONNECT_STRUCT_IN_MAP_SCHEMA) + .put("stma", ImmutableMap.of("key1", nestedData, "key2", nestedData)); + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(struct, consumer); + Collection addCols = consumer.addColumns(); + + assertThat(addCols).hasSize(1); + + AddColumn addCol = addCols.iterator().next(); + assertThat(addCol.parentName()).isEqualTo("stma.value"); + assertThat(addCol.name()).isEqualTo("st"); + + StructType nestedValueType = addCol.type().asStructType(); + assertThat(nestedValueType.fields()).hasSize(15); + assertTypesAddedFromStruct(col -> nestedValueType.field(col).type()); + } + + private void assertTypesAddedFromStruct(Function fn) { + assertThat(fn.apply("i")).isInstanceOf(IntegerType.class); + assertThat(fn.apply("l")).isInstanceOf(LongType.class); + assertThat(fn.apply("d")).isInstanceOf(DateType.class); + assertThat(fn.apply("t")).isInstanceOf(TimeType.class); + assertThat(fn.apply("ts")).isInstanceOf(TimestampType.class); + assertThat(fn.apply("tsz")).isInstanceOf(TimestampType.class); + assertThat(fn.apply("fl")).isInstanceOf(FloatType.class); + assertThat(fn.apply("do")).isInstanceOf(DoubleType.class); + assertThat(fn.apply("dec")).isInstanceOf(DecimalType.class); + assertThat(fn.apply("s")).isInstanceOf(StringType.class); + assertThat(fn.apply("u")).isInstanceOf(StringType.class); + assertThat(fn.apply("f")).isInstanceOf(BinaryType.class); + assertThat(fn.apply("b")).isInstanceOf(BinaryType.class); + assertThat(fn.apply("li")).isInstanceOf(ListType.class); + assertThat(fn.apply("ma")).isInstanceOf(MapType.class); + } + + @Test + public void testEvolveTypeDetectionStruct() { + org.apache.iceberg.Schema tableSchema = + new org.apache.iceberg.Schema( + 
Types.NestedField.required(1, "ii", IntegerType.get()), + Types.NestedField.required(2, "ff", FloatType.get())); + + Table table = mock(Table.class); + when(table.schema()).thenReturn(tableSchema); + RecordConverter converter = new RecordConverter(table, config); + + Schema valueSchema = + SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA); + Struct data = new Struct(valueSchema).put("ii", 11L).put("ff", 22d); + + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(data, consumer); + Collection updates = consumer.updateTypes(); + + assertThat(updates).hasSize(2); + + Map updateMap = Maps.newHashMap(); + updates.forEach(update -> updateMap.put(update.name(), update)); + + assertThat(updateMap.get("ii").type()).isInstanceOf(LongType.class); + assertThat(updateMap.get("ff").type()).isInstanceOf(DoubleType.class); + } + + @Test + public void testEvolveTypeDetectionStructNested() { + org.apache.iceberg.Schema structColSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "ii", IntegerType.get()), + Types.NestedField.required(2, "ff", FloatType.get())); + + org.apache.iceberg.Schema tableSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(3, "i", IntegerType.get()), + Types.NestedField.required(4, "st", structColSchema.asStruct())); + + Table table = mock(Table.class); + when(table.schema()).thenReturn(tableSchema); + RecordConverter converter = new RecordConverter(table, config); + + Schema structSchema = + SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA); + Schema schema = + SchemaBuilder.struct().field("i", Schema.INT32_SCHEMA).field("st", structSchema); + Struct structValue = new Struct(structSchema).put("ii", 11L).put("ff", 22d); + Struct data = new Struct(schema).put("i", 1).put("st", structValue); + + SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer(); + converter.convert(data, consumer); + Collection updates = consumer.updateTypes(); + + assertThat(updates).hasSize(2); + + Map updateMap = Maps.newHashMap(); + updates.forEach(update -> updateMap.put(update.name(), update)); + + assertThat(updateMap.get("st.ii").type()).isInstanceOf(LongType.class); + assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class); + } + + private Map createMapData() { + return ImmutableMap.builder() + .put("i", 1) + .put("l", 2L) + .put("d", DATE_VAL.toString()) + .put("t", TIME_VAL.toString()) + .put("ts", TS_VAL.toString()) + .put("tsz", TSZ_VAL.toString()) + .put("fl", 1.1f) + .put("do", 2.2d) + .put("dec", DEC_VAL.toString()) + .put("s", STR_VAL) + .put("u", UUID_VAL.toString()) + .put("f", Base64.getEncoder().encodeToString(BYTES_VAL.array())) + .put("b", Base64.getEncoder().encodeToString(BYTES_VAL.array())) + .put("li", LIST_VAL) + .put("ma", MAP_VAL) + .build(); + } + + private Map createNestedMapData() { + return ImmutableMap.builder().put("ii", 11).put("st", createMapData()).build(); + } + + private Struct createStructData() { + return new Struct(CONNECT_SCHEMA) + .put("i", 1) + .put("l", 2L) + .put("d", new Date(DATE_VAL.toEpochDay() * 24 * 60 * 60 * 1000L)) + .put("t", new Date(TIME_VAL.toNanoOfDay() / 1_000_000)) + .put("ts", Date.from(TS_VAL.atZone(ZoneOffset.UTC).toInstant())) + .put("tsz", Date.from(TSZ_VAL.toInstant())) + .put("fl", 1.1f) + .put("do", 2.2d) + .put("dec", DEC_VAL) + .put("s", STR_VAL) + .put("u", UUID_VAL.toString()) + .put("f", BYTES_VAL.array()) + .put("b", BYTES_VAL.array()) + .put("li", LIST_VAL) + 
.put("ma", MAP_VAL); + } + + private Struct createNestedStructData() { + return new Struct(CONNECT_NESTED_SCHEMA).put("ii", 11).put("st", createStructData()); + } + + private void assertRecordValues(Record record) { + GenericRecord rec = (GenericRecord) record; + assertThat(rec.getField("i")).isEqualTo(1); + assertThat(rec.getField("l")).isEqualTo(2L); + assertThat(rec.getField("d")).isEqualTo(DATE_VAL); + assertThat(rec.getField("t")).isEqualTo(TIME_VAL); + assertThat(rec.getField("ts")).isEqualTo(TS_VAL); + assertThat(rec.getField("tsz")).isEqualTo(TSZ_VAL); + assertThat(rec.getField("fl")).isEqualTo(1.1f); + assertThat(rec.getField("do")).isEqualTo(2.2d); + assertThat(rec.getField("dec")).isEqualTo(DEC_VAL); + assertThat(rec.getField("s")).isEqualTo(STR_VAL); + assertThat(rec.getField("u")).isEqualTo(UUID_VAL); + assertThat(rec.getField("f")).isEqualTo(BYTES_VAL); + assertThat(rec.getField("b")).isEqualTo(BYTES_VAL); + assertThat(rec.getField("li")).isEqualTo(LIST_VAL); + assertThat(rec.getField("ma")).isEqualTo(MAP_VAL); + } + + private void assertNestedRecordValues(Record record) { + GenericRecord rec = (GenericRecord) record; + assertThat(rec.getField("ii")).isEqualTo(11); + assertRecordValues((GenericRecord) rec.getField("st")); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java new file mode 100644 index 000000000000..947d965bffcd --- /dev/null +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.connect.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.TableSinkConfig;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class UnpartitionedDeltaWriterTest extends BaseWriterTest {
+
+  @ParameterizedTest
+  @ValueSource(strings = {"parquet", "orc"})
+  public void testUnpartitionedDeltaWriter(String format) {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.upsertModeEnabled()).thenReturn(true);
+    when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
+    when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format));
+
+    Record row = GenericRecord.create(SCHEMA);
+    row.setField("id", 123L);
+    row.setField("data", "hello world!");
+    row.setField("id2", 123L);
+
+    WriteResult result = writeTest(ImmutableList.of(row), config, UnpartitionedDeltaWriter.class);
+
+    // in upsert mode, each write is a delete + append, so we'll have 1 data file
+    // and 1 delete file
+    assertThat(result.dataFiles()).hasSize(1);
+    assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format));
+    assertThat(result.deleteFiles()).hasSize(1);
+    assertThat(result.deleteFiles())
+        .allMatch(file -> file.format() == FileFormat.fromString(format));
+  }
+}
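For readers of the two delta writer tests above: the data files and equality-delete files collected in a WriteResult are what the sink eventually commits to the Iceberg table as a single row delta. The sketch below shows that commit step in isolation; RowDeltaCommitSketch and its commit helper are illustrative names only, and the real connector aggregates WriteResults from all tasks through its commit coordinator rather than committing per writer.

// Illustrative sketch only, not part of this patch: commit a writer's completed
// WriteResult as one atomic row delta using Iceberg's public API.
import java.util.Arrays;

import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.WriteResult;

class RowDeltaCommitSketch {
  static void commit(Table table, WriteResult result) {
    RowDelta rowDelta = table.newRowDelta();
    // data files hold the appended rows (the "append" half of an upsert)
    Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
    // delete files hold the equality deletes written before each upserted row
    Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
    rowDelta.commit();
  }
}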