From 96061cac275b4bda926bc536cf2379f7be508114 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Wed, 28 Feb 2024 16:30:30 -0800 Subject: [PATCH] remove delta writers --- .../iceberg/connect/IcebergSinkConfig.java | 23 -- .../connect/data/BaseDeltaTaskWriter.java | 102 --------- .../iceberg/connect/data/IcebergWriter.java | 30 +-- .../iceberg/connect/data/Operation.java | 25 --- .../connect/data/PartitionedDeltaWriter.java | 93 -------- .../connect/data/RecordProjection.java | 200 ------------------ .../iceberg/connect/data/RecordWrapper.java | 83 -------- .../data/UnpartitionedDeltaWriter.java | 66 ------ .../iceberg/connect/data/Utilities.java | 52 ++--- .../data/PartitionedDeltaWriterTest.java | 70 ------ .../data/UnpartitionedDeltaWriterTest.java | 62 ------ 11 files changed, 13 insertions(+), 793 deletions(-) delete mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java delete mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java delete mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java delete mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java delete mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java delete mode 100644 kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java delete mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java delete mode 100644 kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java index dd9f9e401446..d1572fbff37b 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java @@ -71,9 +71,6 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String TABLES_DEFAULT_COMMIT_BRANCH = "iceberg.tables.default-commit-branch"; private static final String TABLES_DEFAULT_ID_COLUMNS = "iceberg.tables.default-id-columns"; private static final String TABLES_DEFAULT_PARTITION_BY = "iceberg.tables.default-partition-by"; - private static final String TABLES_CDC_FIELD_PROP = "iceberg.tables.cdc-field"; - private static final String TABLES_UPSERT_MODE_ENABLED_PROP = - "iceberg.tables.upsert-mode-enabled"; private static final String TABLES_AUTO_CREATE_ENABLED_PROP = "iceberg.tables.auto-create-enabled"; private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP = @@ -152,18 +149,6 @@ private static ConfigDef newConfigDef() { null, Importance.MEDIUM, "Default partition spec to use when creating tables, comma-separated"); - configDef.define( - TABLES_CDC_FIELD_PROP, - ConfigDef.Type.STRING, - null, - Importance.MEDIUM, - "Source record field that identifies the type of operation (insert, update, or delete)"); - configDef.define( - TABLES_UPSERT_MODE_ENABLED_PROP, - ConfigDef.Type.BOOLEAN, - false, - Importance.MEDIUM, - "Set to true to treat all appends as upserts, false otherwise"); configDef.define( TABLES_AUTO_CREATE_ENABLED_PROP, ConfigDef.Type.BOOLEAN, @@ -422,14 +407,6 @@ public 
String hadoopConfDir() { return getString(HADDOP_CONF_DIR_PROP); } - public String tablesCdcField() { return getString(TABLES_CDC_FIELD_PROP); } - - public boolean upsertModeEnabled() { return getBoolean(TABLES_UPSERT_MODE_ENABLED_PROP); } - - public boolean autoCreateEnabled() { return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP); } diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java deleted file mode 100644 index 09cedea693f1..000000000000 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/BaseDeltaTaskWriter.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.connect.data; - -import java.io.IOException; -import java.util.Set; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.data.InternalRecordWrapper; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.BaseTaskWriter; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; - -abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> { - - private final Schema schema; - private final Schema deleteSchema; - private final InternalRecordWrapper wrapper; - private final InternalRecordWrapper keyWrapper; - private final RecordProjection keyProjection; - private final boolean upsertMode; - - BaseDeltaTaskWriter( - PartitionSpec spec, - FileFormat format, - FileAppenderFactory<Record> appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - Set<Integer> identifierFieldIds, - boolean upsertMode) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - this.schema = schema; - this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(identifierFieldIds)); - this.wrapper = new InternalRecordWrapper(schema.asStruct()); - this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct()); - this.keyProjection = RecordProjection.create(schema, deleteSchema); - this.upsertMode = upsertMode; - } - - abstract RowDataDeltaWriter route(Record row); - - InternalRecordWrapper wrapper() { - return wrapper; - } - - @Override - public void write(Record row) throws IOException { - Operation op = - row instanceof RecordWrapper - ? ((RecordWrapper) row).op() - : upsertMode ?
Operation.UPDATE : Operation.INSERT; - RowDataDeltaWriter writer = route(row); - if (op == Operation.UPDATE || op == Operation.DELETE) { - writer.deleteKey(keyProjection.wrap(row)); - } - if (op == Operation.UPDATE || op == Operation.INSERT) { - writer.write(row); - } - } - - class RowDataDeltaWriter extends BaseEqualityDeltaWriter { - - RowDataDeltaWriter(PartitionKey partition) { - super(partition, schema, deleteSchema); - } - - @Override - protected StructLike asStructLike(Record data) { - return wrapper.wrap(data); - } - - @Override - protected StructLike asStructLikeKey(Record data) { - return keyWrapper.wrap(data); - } - } -} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java index 8c8ef9e67332..27ffc4de9973 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriter.java @@ -60,13 +60,7 @@ public void write(SinkRecord record) { // ignore tombstones... if (record.value() != null) { Record row = convertToRow(record); - String cdcField = config.tablesCdcField(); - if (cdcField == null) { - writer.write(row); - } else { - Operation op = extractCdcOperation(record.value(), cdcField); - writer.write(new RecordWrapper(row, op)); - } + writer.write(row); } } catch (Exception e) { throw new DataException( @@ -99,28 +93,6 @@ private Record convertToRow(SinkRecord record) { return row; } - private Operation extractCdcOperation(Object recordValue, String cdcField) { - Object opValue = Utilities.extractFromRecordValue(recordValue, cdcField); - - if (opValue == null) { - return Operation.INSERT; - } - - String opStr = opValue.toString().trim().toUpperCase(); - if (opStr.isEmpty()) { - return Operation.INSERT; - } - - switch (opStr.charAt(0)) { - case 'U': - return Operation.UPDATE; - case 'D': - return Operation.DELETE; - default: - return Operation.INSERT; - } - } - private void flush() { WriteResult writeResult; try { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java deleted file mode 100644 index 7f428a6dc2b6..000000000000 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Operation.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.connect.data; - -public enum Operation { - INSERT, - UPDATE, - DELETE -} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java deleted file mode 100644 index eae5d116c6c1..000000000000 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/PartitionedDeltaWriter.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.connect.data; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Tasks; - -public class PartitionedDeltaWriter extends BaseDeltaTaskWriter { - private final PartitionKey partitionKey; - - private final Map<PartitionKey, RowDataDeltaWriter> writers = Maps.newHashMap(); - - PartitionedDeltaWriter( - PartitionSpec spec, - FileFormat format, - FileAppenderFactory<Record> appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - Set<Integer> identifierFieldIds, - boolean upsertMode) { - super( - spec, - format, - appenderFactory, - fileFactory, - io, - targetFileSize, - schema, - identifierFieldIds, - upsertMode); - this.partitionKey = new PartitionKey(spec, schema); - } - - @Override - RowDataDeltaWriter route(Record row) { - partitionKey.partition(wrapper().wrap(row)); - - RowDataDeltaWriter writer = writers.get(partitionKey); - if (writer == null) { - // NOTE: pass a copy of the partition key to the writer to prevent it from - // being modified - PartitionKey copiedKey = partitionKey.copy(); - writer = new RowDataDeltaWriter(copiedKey); - writers.put(copiedKey, writer); - } - - return writer; - } - - @Override - public void close() { - try { - Tasks.foreach(writers.values()) - .throwFailureWhenFinished() - .noRetry() - .run(RowDataDeltaWriter::close, IOException.class); - - writers.clear(); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close equality delta writer", e); - } - } -} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java deleted file mode 100644 index
41972f2ccede..000000000000 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordProjection.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.connect.data; - -import java.util.List; -import java.util.Map; -import org.apache.iceberg.Schema; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.types.Types.ListType; -import org.apache.iceberg.types.Types.MapType; -import org.apache.iceberg.types.Types.NestedField; -import org.apache.iceberg.types.Types.StructType; - -/** - * This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types. - */ -public class RecordProjection implements Record { - - /** - * Creates a projecting wrapper for {@link Record} rows. - * - *

<p>This projection does not work with repeated types like lists and maps. - * - * @param dataSchema schema of rows wrapped by this projection - * @param projectedSchema result schema of the projected rows - * @return a wrapper to project rows - */ - public static RecordProjection create(Schema dataSchema, Schema projectedSchema) { - return new RecordProjection(dataSchema.asStruct(), projectedSchema.asStruct()); - } - - private final StructType type; - private final int[] positionMap; - private final RecordProjection[] nestedProjections; - private Record record; - - private RecordProjection(StructType structType, StructType projection) { - this(structType, projection, false); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - private RecordProjection(StructType structType, StructType projection, boolean allowMissing) { - this.type = projection; - this.positionMap = new int[projection.fields().size()]; - this.nestedProjections = new RecordProjection[projection.fields().size()]; - - // set up the projection positions and any nested projections that are needed - List<NestedField> dataFields = structType.fields(); - for (int pos = 0; pos < positionMap.length; pos += 1) { - NestedField projectedField = projection.fields().get(pos); - - boolean found = false; - for (int i = 0; !found && i < dataFields.size(); i += 1) { - NestedField dataField = dataFields.get(i); - if (projectedField.fieldId() == dataField.fieldId()) { - found = true; - positionMap[pos] = i; - switch (projectedField.type().typeId()) { - case STRUCT: - nestedProjections[pos] = - new RecordProjection( - dataField.type().asStructType(), projectedField.type().asStructType()); - break; - case MAP: - MapType projectedMap = projectedField.type().asMapType(); - MapType originalMap = dataField.type().asMapType(); - - boolean keyProjectable = - !projectedMap.keyType().isNestedType() - || projectedMap.keyType().equals(originalMap.keyType()); - boolean valueProjectable = - !projectedMap.valueType().isNestedType() - || projectedMap.valueType().equals(originalMap.valueType()); - Preconditions.checkArgument( - keyProjectable && valueProjectable, - "Cannot project a partial map key or value struct. Trying to project %s out of %s", - projectedField, - dataField); - - nestedProjections[pos] = null; - break; - case LIST: - ListType projectedList = projectedField.type().asListType(); - ListType originalList = dataField.type().asListType(); - - boolean elementProjectable = - !projectedList.elementType().isNestedType() - || projectedList.elementType().equals(originalList.elementType()); - Preconditions.checkArgument( - elementProjectable, - "Cannot project a partial list element struct. Trying to project %s out of %s", - projectedField, - dataField); - - nestedProjections[pos] = null; - break; - default: - nestedProjections[pos] = null; - } - } - } - - if (!found && projectedField.isOptional() && allowMissing) { - positionMap[pos] = -1; - nestedProjections[pos] = null; - } else if (!found) { - throw new IllegalArgumentException( - String.format("Cannot find field %s in %s", projectedField, structType)); - } - } - } - - public RecordProjection wrap(Record newRecord) { - this.record = newRecord; - return this; - } - - @Override - public int size() { - return type.fields().size(); - } - - @Override - public <T> T get(int pos, Class<T> javaClass) { - // struct can be null if wrap is not called first before the get call - // or if a null struct is wrapped.
- if (record == null) { - return null; - } - - int recordPos = positionMap[pos]; - if (nestedProjections[pos] != null) { - Record nestedStruct = record.get(recordPos, Record.class); - if (nestedStruct == null) { - return null; - } - - return javaClass.cast(nestedProjections[pos].wrap(nestedStruct)); - } - - if (recordPos != -1) { - return record.get(recordPos, javaClass); - } else { - return null; - } - } - - @Override - public <T> void set(int pos, T value) { - throw new UnsupportedOperationException("RecordProjection.set(int, Object) is not supported"); - } - - @Override - public StructType struct() { - return type; - } - - @Override - public Object getField(String name) { - throw new UnsupportedOperationException("RecordProjection.getField(String) is not supported"); - } - - @Override - public void setField(String name, Object value) { - throw new UnsupportedOperationException( - "RecordProjection.setField(String, Object) is not supported"); - } - - @Override - public Object get(int pos) { - return get(pos, Object.class); - } - - @Override - public Record copy() { - throw new UnsupportedOperationException("RecordProjection.copy() is not supported"); - } - - @Override - public Record copy(Map<String, Object> overwriteValues) { - throw new UnsupportedOperationException("RecordProjection.copy(Map) is not supported"); - } -} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java deleted file mode 100644 index 915608562034..000000000000 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordWrapper.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ -package org.apache.iceberg.connect.data; - -import java.util.Map; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.types.Types.StructType; - -public class RecordWrapper implements Record { - - private final Record delegate; - private final Operation op; - - public RecordWrapper(Record delegate, Operation op) { - this.delegate = delegate; - this.op = op; - } - - public Operation op() { - return op; - } - - @Override - public StructType struct() { - return delegate.struct(); - } - - @Override - public Object getField(String name) { - return delegate.getField(name); - } - - @Override - public void setField(String name, Object value) { - delegate.setField(name, value); - } - - @Override - public Object get(int pos) { - return delegate.get(pos); - } - - @Override - public Record copy() { - return new RecordWrapper(delegate.copy(), op); - } - - @Override - public Record copy(Map<String, Object> overwriteValues) { - return new RecordWrapper(delegate.copy(overwriteValues), op); - } - - @Override - public int size() { - return delegate.size(); - } - - @Override - public <T> T get(int pos, Class<T> javaClass) { - return delegate.get(pos, javaClass); - } - - @Override - public <T> void set(int pos, T value) { - delegate.set(pos, value); - } -} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java deleted file mode 100644 index 46b0a45d532d..000000000000 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriter.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ -package org.apache.iceberg.connect.data; - -import java.io.IOException; -import java.util.Set; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.FileAppenderFactory; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFileFactory; - -public class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { - private final RowDataDeltaWriter writer; - - UnpartitionedDeltaWriter( - PartitionSpec spec, - FileFormat format, - FileAppenderFactory<Record> appenderFactory, - OutputFileFactory fileFactory, - FileIO io, - long targetFileSize, - Schema schema, - Set<Integer> identifierFieldIds, - boolean upsertMode) { - super( - spec, - format, - appenderFactory, - fileFactory, - io, - targetFileSize, - schema, - identifierFieldIds, - upsertMode); - this.writer = new RowDataDeltaWriter(null); - } - - @Override - RowDataDeltaWriter route(Record row) { - return writer; - } - - @Override - public void close() throws IOException { - writer.close(); - } -} diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java index 2de58d0d7486..4ff83f777527 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java @@ -226,47 +226,19 @@ public static TaskWriter<Record> createTableWriter( TaskWriter<Record> writer; if (table.spec().isUnpartitioned()) { - if (config.tablesCdcField() == null && !config.upsertModeEnabled()) { - writer = - new UnpartitionedWriter<>( - table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize); - } else { - writer = - new UnpartitionedDeltaWriter( - table.spec(), - format, - appenderFactory, - fileFactory, - table.io(), - targetFileSize, - table.schema(), - identifierFieldIds, - config.upsertModeEnabled()); - } + writer = + new UnpartitionedWriter<>( + table.spec(), format, appenderFactory, fileFactory, table.io(), targetFileSize); } else { - if (config.tablesCdcField() == null && !config.upsertModeEnabled()) { - writer = - new PartitionedAppendWriter( - table.spec(), - format, - appenderFactory, - fileFactory, - table.io(), - targetFileSize, - table.schema()); - } else { - writer = - new PartitionedDeltaWriter( - table.spec(), - format, - appenderFactory, - fileFactory, - table.io(), - targetFileSize, - table.schema(), - identifierFieldIds, - config.upsertModeEnabled()); - } + writer = + new PartitionedAppendWriter( + table.spec(), + format, + appenderFactory, + fileFactory, + table.io(), + targetFileSize, + table.schema()); } return writer; } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java deleted file mode 100644 index b8b7f0b66f95..000000000000 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/PartitionedDeltaWriterTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.connect.data; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.connect.IcebergSinkConfig; -import org.apache.iceberg.connect.TableSinkConfig; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.WriteResult; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -public class PartitionedDeltaWriterTest extends BaseWriterTest { - - @ParameterizedTest - @ValueSource(strings = {"parquet", "orc"}) - public void testPartitionedDeltaWriter(String format) { - IcebergSinkConfig config = mock(IcebergSinkConfig.class); - when(config.upsertModeEnabled()).thenReturn(true); - when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); - when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format)); - - when(table.spec()).thenReturn(SPEC); - - Record row1 = GenericRecord.create(SCHEMA); - row1.setField("id", 123L); - row1.setField("data", "hello world!"); - row1.setField("id2", 123L); - - Record row2 = GenericRecord.create(SCHEMA); - row2.setField("id", 234L); - row2.setField("data", "foobar"); - row2.setField("id2", 234L); - - WriteResult result = - writeTest(ImmutableList.of(row1, row2), config, PartitionedDeltaWriter.class); - - // in upsert mode, each write is a delete + append, so we'll have 1 data file - // and 1 delete file for each partition (2 total) - assertThat(result.dataFiles()).hasSize(2); - assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format)); - assertThat(result.deleteFiles()).hasSize(2); - assertThat(result.deleteFiles()) - .allMatch(file -> file.format() == FileFormat.fromString(format)); - } -} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java deleted file mode 100644 index 947d965bffcd..000000000000 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UnpartitionedDeltaWriterTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.connect.data; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.connect.IcebergSinkConfig; -import org.apache.iceberg.connect.TableSinkConfig; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.WriteResult; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -public class UnpartitionedDeltaWriterTest extends BaseWriterTest { - - @ParameterizedTest - @ValueSource(strings = {"parquet", "orc"}) - public void testUnpartitionedDeltaWriter(String format) { - IcebergSinkConfig config = mock(IcebergSinkConfig.class); - when(config.upsertModeEnabled()).thenReturn(true); - when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); - when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format)); - - Record row = GenericRecord.create(SCHEMA); - row.setField("id", 123L); - row.setField("data", "hello world!"); - row.setField("id2", 123L); - - WriteResult result = writeTest(ImmutableList.of(row), config, UnpartitionedDeltaWriter.class); - - // in upsert mode, each write is a delete + append, so we'll have 1 data file - // and 1 delete file - assertThat(result.dataFiles()).hasSize(1); - assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format)); - assertThat(result.deleteFiles()).hasSize(1); - assertThat(result.deleteFiles()) - .allMatch(file -> file.format() == FileFormat.fromString(format)); - } -}
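With the delta writers removed, the write path is append-only for every table: each record lands as a plain insert, and a completed writer reports data files but no delete files. Below is a minimal standalone sketch of the surviving unpartitioned path, constructing UnpartitionedWriter with the same arguments Utilities.createTableWriter now passes; the schema and the local warehouse location are invented for illustration and are not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;

public class AppendOnlyWriteSketch {
  public static void main(String[] args) throws Exception {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // hypothetical local warehouse path, for illustration only
    Table table = new HadoopTables(new Configuration()).create(schema, "/tmp/warehouse/db/tbl");

    // same construction used by Utilities.createTableWriter for unpartitioned tables
    TaskWriter<Record> writer =
        new UnpartitionedWriter<>(
            table.spec(),
            FileFormat.PARQUET,
            new GenericAppenderFactory(table.schema(), table.spec()),
            OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build(),
            table.io(),
            128 * 1024 * 1024); // 128 MB target file size

    Record row = GenericRecord.create(schema);
    row.setField("id", 123L);
    row.setField("data", "hello world!");
    writer.write(row); // always an append; no CDC op or upsert handling remains

    WriteResult result = writer.complete();
    System.out.println("data files: " + result.dataFiles().length); // 1
    System.out.println("delete files: " + result.deleteFiles().length); // 0
  }
}

For partitioned tables the only difference is that createTableWriter now always builds a PartitionedAppendWriter; identifier field IDs and the removed CDC/upsert settings no longer influence writer selection.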