From c2fd77a3f4c804572c4fbe6081e150510ca9e262 Mon Sep 17 00:00:00 2001 From: abharath9 Date: Sat, 14 Dec 2024 16:49:11 -0600 Subject: [PATCH] Flink: Add RowConverter for Iceberg Source (#11301) Co-authored-by: Bharath Kumar Avusherla --- .../flink/source/reader/RowConverter.java | 59 +++++ ...TestIcebergSourceBoundedConverterBase.java | 223 ++++++++++++++++++ ...TestIcebergSourceBoundedGenericRecord.java | 192 +++------------ .../source/TestIcebergSourceBoundedRow.java | 52 ++++ 4 files changed, 367 insertions(+), 159 deletions(-) create mode 100644 flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java create mode 100644 flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java create mode 100644 flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java new file mode 100644 index 000000000000..a84384fe17bf --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; +import org.apache.iceberg.flink.FlinkSchemaUtil; + +public class RowConverter implements RowDataConverter { + private final DataStructureConverter converter; + private final TypeInformation outputTypeInfo; + + private RowConverter(RowType rowType, TypeInformation rowTypeInfo) { + this.converter = + DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(rowType)); + this.outputTypeInfo = rowTypeInfo; + } + + public static RowConverter fromIcebergSchema(org.apache.iceberg.Schema icebergSchema) { + RowType rowType = FlinkSchemaUtil.convert(icebergSchema); + TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema); + RowTypeInfo rowTypeInfo = + new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()); + return new RowConverter(rowType, rowTypeInfo); + } + + @Override + public Row apply(RowData rowData) { + return (Row) converter.toExternal(rowData); + } + + @Override + public TypeInformation getProducedType() { + return outputTypeInfo; + } +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java new file mode 100644 index 000000000000..5ef387864b90 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; +import org.apache.iceberg.flink.source.reader.ReaderFunction; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public abstract class TestIcebergSourceBoundedConverterBase { + @TempDir protected Path temporaryFolder; + + @RegisterExtension + static final HadoopCatalogExtension CATALOG_EXTENSION = + new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE); + + @Parameters(name = "format={0}, parallelism = {1}, useConverter = {2}") + public static Object[][] parameters() { + return new Object[][] { + {FileFormat.AVRO, 2, true}, + {FileFormat.PARQUET, 2, true}, + {FileFormat.ORC, 2, true} + }; + } + + @Parameter(index = 0) + FileFormat fileFormat; + + @Parameter(index = 1) + int parallelism; + + @Parameter(index = 2) + boolean useConverter; + + @TestTemplate + public void testUnpartitionedTable() throws Exception { + Table table = + CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); + new GenericAppenderHelper(table, fileFormat, temporaryFolder).appendToTable(expectedRecords); + TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA); + } + + @TestTemplate + public void testPartitionedTable() throws Exception { + String dateStr = "2020-03-20"; + Table table = getPartitionedTable(); + List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); + for (Record expectedRecord : expectedRecords) { + expectedRecord.setField("dt", dateStr); + } + addRecordsToPartitionedTable(table, dateStr, expectedRecords); + TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA); + } + + @TestTemplate + public void testProjection() throws Exception { + Table table = getPartitionedTable(); + List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); + addRecordsToPartitionedTable(table, "2020-03-20", expectedRecords); + // select the "data" field (fieldId == 1) + Schema projectedSchema = TypeUtil.select(TestFixtures.SCHEMA, Sets.newHashSet(1)); + List expectedRows = + Arrays.asList(Row.of(expectedRecords.get(0).get(0)), Row.of(expectedRecords.get(1).get(0))); + TestHelpers.assertRows( + run(projectedSchema, Collections.emptyList(), Collections.emptyMap()), expectedRows); + } + + static Table getPartitionedTable() { + return CATALOG_EXTENSION + .catalog() + .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC); + } + + static TableLoader tableLoader() { + return CATALOG_EXTENSION.tableLoader(); + } + + private void addRecordsToPartitionedTable( + Table table, String dateStr, List expectedRecords) throws IOException { + new GenericAppenderHelper(table, fileFormat, temporaryFolder) + .appendToTable(org.apache.iceberg.TestHelpers.Row.of(dateStr, 0), expectedRecords); + } + + private List run() throws Exception { + return run(null, Collections.emptyList(), Collections.emptyMap()); + } + + private List run( + Schema projectedSchema, List filters, Map options) + throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + env.getConfig().enableObjectReuse(); + + Configuration config = new Configuration(); + config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128); + Table table; + try (TableLoader tableLoader = tableLoader()) { + tableLoader.open(); + table = tableLoader.loadTable(); + } + + Schema readSchema = projectedSchema != null ? projectedSchema : table.schema(); + IcebergSource.Builder sourceBuilder = + getSourceBuilder(projectedSchema, filters, readSchema, config, table); + + if (projectedSchema != null) { + sourceBuilder.project(projectedSchema); + } + + sourceBuilder.filters(filters); + sourceBuilder.setAll(options); + + DataStream inputStream = + env.fromSource( + sourceBuilder.build(), + WatermarkStrategy.noWatermarks(), + "testBasicRead", + getTypeInfo(readSchema)); + + DataStream stream = mapToRow(inputStream, readSchema); + + try (CloseableIterator iter = stream.executeAndCollect()) { + return Lists.newArrayList(iter); + } + } + + private IcebergSource.Builder getSourceBuilder( + Schema projectedSchema, + List filters, + Schema readSchema, + Configuration config, + Table table) + throws Exception { + if (useConverter) { + return createSourceBuilderWithConverter(readSchema, config, table); + } + return createSourceBuilderWithReaderFunction(table, projectedSchema, filters, config); + } + + private IcebergSource.Builder createSourceBuilderWithConverter( + Schema readSchema, Configuration config, Table table) throws Exception { + return IcebergSource.forOutputType(getConverter(readSchema, table)) + .tableLoader(tableLoader()) + .assignerFactory(new SimpleSplitAssignerFactory()) + .flinkConfig(config); + } + + private IcebergSource.Builder createSourceBuilderWithReaderFunction( + Table table, Schema projected, List filters, Configuration config) + throws Exception { + return IcebergSource.builder() + .tableLoader(tableLoader()) + .readerFunction(getReaderFunction(projected, table, filters)) + .assignerFactory(new SimpleSplitAssignerFactory()) + .flinkConfig(config); + } + + protected abstract org.apache.iceberg.flink.source.reader.RowDataConverter getConverter( + org.apache.iceberg.Schema icebergSchema, Table table) throws Exception; + + protected ReaderFunction getReaderFunction( + org.apache.iceberg.Schema icebergSchema, Table table, List filters) + throws Exception { + throw new UnsupportedOperationException("No default implementation for getReaderFunction"); + } + + protected abstract TypeInformation getTypeInfo(Schema icebergSchema); + + protected abstract DataStream mapToRow(DataStream inputStream, Schema icebergSchema); +} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java index 4e649d15b1ce..faddce542285 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java @@ -18,57 +18,34 @@ */ package org.apache.iceberg.flink.source; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; import org.apache.avro.generic.GenericRecord; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.data.GenericAppenderHelper; -import org.apache.iceberg.data.RandomGenericData; -import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.FlinkSchemaUtil; -import org.apache.iceberg.flink.HadoopCatalogExtension; -import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; -import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.flink.data.RowDataToRowMapper; import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper; -import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; import org.apache.iceberg.flink.source.reader.AvroGenericRecordConverter; import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.TypeUtil; -import org.junit.jupiter.api.TestTemplate; +import org.apache.iceberg.flink.source.reader.ReaderFunction; +import org.apache.iceberg.flink.source.reader.RowDataConverter; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.api.io.TempDir; @ExtendWith(ParameterizedTestExtension.class) -public class TestIcebergSourceBoundedGenericRecord { - @TempDir protected Path temporaryFolder; - - @RegisterExtension - private static final HadoopCatalogExtension CATALOG_EXTENSION = - new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE); +public class TestIcebergSourceBoundedGenericRecord + extends TestIcebergSourceBoundedConverterBase { @Parameters(name = "format={0}, parallelism = {1}, useConverter = {2}") public static Object[][] parameters() { @@ -80,143 +57,40 @@ public static Object[][] parameters() { }; } - @Parameter(index = 0) - private FileFormat fileFormat; - - @Parameter(index = 1) - private int parallelism; - - @Parameter(index = 2) - private boolean useConverter; - - @TestTemplate - public void testUnpartitionedTable() throws Exception { - Table table = - CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); - List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); - new GenericAppenderHelper(table, fileFormat, temporaryFolder).appendToTable(expectedRecords); - TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA); + @Override + protected RowDataConverter getConverter(Schema icebergSchema, Table table) { + return AvroGenericRecordConverter.fromIcebergSchema(icebergSchema, table.name()); } - @TestTemplate - public void testPartitionedTable() throws Exception { - String dateStr = "2020-03-20"; - Table table = - CATALOG_EXTENSION - .catalog() - .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC); - List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); - for (int i = 0; i < expectedRecords.size(); ++i) { - expectedRecords.get(i).setField("dt", dateStr); - } - - new GenericAppenderHelper(table, fileFormat, temporaryFolder) - .appendToTable(org.apache.iceberg.TestHelpers.Row.of(dateStr, 0), expectedRecords); - TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA); + @Override + protected ReaderFunction getReaderFunction( + Schema icebergSchema, Table table, List filters) throws Exception { + return new AvroGenericRecordReaderFunction( + TestFixtures.TABLE_IDENTIFIER.name(), + new Configuration(), + table.schema(), + icebergSchema, + null, + false, + table.io(), + table.encryption(), + filters); } - @TestTemplate - public void testProjection() throws Exception { - Table table = - CATALOG_EXTENSION - .catalog() - .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC); - List expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L); - new GenericAppenderHelper(table, fileFormat, temporaryFolder) - .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); - // select the "data" field (fieldId == 1) - Schema projectedSchema = TypeUtil.select(TestFixtures.SCHEMA, Sets.newHashSet(1)); - List expectedRows = - Arrays.asList(Row.of(expectedRecords.get(0).get(0)), Row.of(expectedRecords.get(1).get(0))); - TestHelpers.assertRows( - run(projectedSchema, Collections.emptyList(), Collections.emptyMap()), expectedRows); - } - - private List run() throws Exception { - return run(null, Collections.emptyList(), Collections.emptyMap()); - } - - private List run( - Schema projectedSchema, List filters, Map options) - throws Exception { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(parallelism); - env.getConfig().enableObjectReuse(); - - Configuration config = new Configuration(); - config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128); - Table table; - try (TableLoader tableLoader = CATALOG_EXTENSION.tableLoader()) { - tableLoader.open(); - table = tableLoader.loadTable(); - } - - Schema readSchema = projectedSchema != null ? projectedSchema : table.schema(); - IcebergSource.Builder sourceBuilder; - if (useConverter) { - sourceBuilder = createSourceBuilderWithConverter(table, readSchema, config); - } else { - sourceBuilder = - createSourceBuilderWithReaderFunction(table, projectedSchema, filters, config); - } - - if (projectedSchema != null) { - sourceBuilder.project(projectedSchema); - } - - sourceBuilder.filters(filters); - sourceBuilder.setAll(options); - - RowType rowType = FlinkSchemaUtil.convert(readSchema); + @Override + protected TypeInformation getTypeInfo(Schema icebergSchema) { org.apache.avro.Schema avroSchema = - AvroSchemaUtil.convert(readSchema, TestFixtures.TABLE_IDENTIFIER.name()); - - DataStream stream = - env.fromSource( - sourceBuilder.build(), - WatermarkStrategy.noWatermarks(), - "testBasicRead", - new GenericRecordAvroTypeInfo(avroSchema)) - // There are two reasons for converting GenericRecord back to Row. - // 1. Avro GenericRecord/Schema is not serializable. - // 2. leverage the TestHelpers.assertRecords for validation. - .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema)) - .map(new RowDataToRowMapper(rowType)); - - try (CloseableIterator iter = stream.executeAndCollect()) { - return Lists.newArrayList(iter); - } + AvroSchemaUtil.convert(icebergSchema, TestFixtures.TABLE_IDENTIFIER.name()); + return new GenericRecordAvroTypeInfo(avroSchema); } - private IcebergSource.Builder createSourceBuilderWithReaderFunction( - Table table, Schema projected, List filters, Configuration config) { - AvroGenericRecordReaderFunction readerFunction = - new AvroGenericRecordReaderFunction( - TestFixtures.TABLE_IDENTIFIER.name(), - new Configuration(), - table.schema(), - projected, - null, - false, - table.io(), - table.encryption(), - filters); - - return IcebergSource.builder() - .tableLoader(CATALOG_EXTENSION.tableLoader()) - .readerFunction(readerFunction) - .assignerFactory(new SimpleSplitAssignerFactory()) - .flinkConfig(config); - } - - private IcebergSource.Builder createSourceBuilderWithConverter( - Table table, Schema readSchema, Configuration config) { - AvroGenericRecordConverter converter = - AvroGenericRecordConverter.fromIcebergSchema(readSchema, table.name()); - return IcebergSource.forOutputType(converter) - .tableLoader(CATALOG_EXTENSION.tableLoader()) - .assignerFactory(new SimpleSplitAssignerFactory()) - .flinkConfig(config); + @Override + protected DataStream mapToRow(DataStream inputStream, Schema icebergSchema) { + RowType rowType = FlinkSchemaUtil.convert(icebergSchema); + org.apache.avro.Schema avroSchema = + AvroSchemaUtil.convert(icebergSchema, TestFixtures.TABLE_IDENTIFIER.name()); + return inputStream + .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema)) + .map(new RowDataToRowMapper(rowType)); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java new file mode 100644 index 000000000000..170069fecb0e --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.source.reader.RowConverter; +import org.apache.iceberg.flink.source.reader.RowDataConverter; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestIcebergSourceBoundedRow extends TestIcebergSourceBoundedConverterBase { + + @Override + protected RowDataConverter getConverter(Schema icebergSchema, Table table) { + return RowConverter.fromIcebergSchema(icebergSchema); + } + + @Override + protected TypeInformation getTypeInfo(Schema icebergSchema) { + TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema); + return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()); + } + + @Override + protected DataStream mapToRow(DataStream inputStream, Schema icebergSchema) { + return inputStream; + } +}