diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java index 246279af..6fe9c1e3 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java @@ -24,11 +24,12 @@ import com.alibaba.fluss.record.LogRecord; import com.alibaba.fluss.record.LogRecordBatch; import com.alibaba.fluss.record.LogRecordReadContext; +import com.alibaba.fluss.row.GenericRow; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.row.InternalRow.FieldGetter; +import com.alibaba.fluss.row.ProjectedRow; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.rpc.protocol.ApiError; -import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.CloseableIterator; import com.alibaba.fluss.utils.Projection; @@ -60,7 +61,7 @@ abstract class CompletedFetch { private final boolean isCheckCrcs; private final Iterator batches; private final LogScannerStatus logScannerStatus; - private final LogRecordReadContext readContext; + protected final LogRecordReadContext readContext; @Nullable protected final Projection projection; protected final InternalRow.FieldGetter[] fieldGetters; @@ -95,15 +96,37 @@ public CompletedFetch( this.logScannerStatus = logScannerStatus; this.projection = projection; this.nextFetchOffset = fetchOffset; - RowType rowType = readContext.getRowType(); - this.fieldGetters = new FieldGetter[rowType.getFieldCount()]; + this.fieldGetters = buildFieldGetters(); + } + + /** + * Builds the field getters for fetch. + * + * @return the field getters + */ + protected abstract FieldGetter[] buildFieldGetters(); + + // TODO: optimize this to avoid deep copying the record. + // refactor #fetchRecords to return an iterator which lazily deserialize + // from underlying record stream and arrow buffer. + ScanRecord toScanRecord(LogRecord record) { + GenericRow newRow = new GenericRow(fieldGetters.length); + InternalRow internalRow = record.getRow(); for (int i = 0; i < fieldGetters.length; i++) { - fieldGetters[i] = InternalRow.createFieldGetter(rowType.getChildren().get(i), i); + newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow)); + } + if (projection != null && projection.isReorderingNeeded()) { + return new ScanRecord( + record.logOffset(), + record.timestamp(), + record.getRowKind(), + ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow)); + } else { + return new ScanRecord( + record.logOffset(), record.timestamp(), record.getRowKind(), newRow); } } - protected abstract ScanRecord toScanRecord(LogRecord record); - boolean isConsumed() { return isConsumed; } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetch.java index 9a01ed39..fc480ae1 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetch.java @@ -17,15 +17,14 @@ package com.alibaba.fluss.client.scanner.log; import com.alibaba.fluss.annotation.Internal; -import com.alibaba.fluss.client.scanner.ScanRecord; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.TableBucket; -import com.alibaba.fluss.record.LogRecord; import com.alibaba.fluss.record.LogRecordReadContext; -import com.alibaba.fluss.row.GenericRow; import com.alibaba.fluss.row.InternalRow; -import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.row.InternalRow.FieldGetter; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; import com.alibaba.fluss.rpc.messages.FetchLogRequest; +import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.Projection; import javax.annotation.Nullable; @@ -58,25 +57,28 @@ public DefaultCompletedFetch( projection); } - // TODO: optimize this to avoid deep copying the record. - // refactor #fetchRecords to return an iterator which lazily deserialize - // from underlying record stream and arrow buffer. @Override - protected ScanRecord toScanRecord(LogRecord record) { - GenericRow newRow = new GenericRow(fieldGetters.length); - InternalRow internalRow = record.getRow(); - for (int i = 0; i < fieldGetters.length; i++) { - newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow)); - } - if (projection != null && projection.isReorderingNeeded()) { - return new ScanRecord( - record.logOffset(), - record.timestamp(), - record.getRowKind(), - ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow)); + protected FieldGetter[] buildFieldGetters() { + RowType rowType = readContext.getRowType(); + LogFormat logFormat = readContext.getLogFormat(); + FieldGetter[] fieldGetters; + // Arrow log format already project in the server side. + if (projection != null && logFormat != LogFormat.ARROW) { + int[] projectionInOrder = projection.getProjectionInOrder(); + fieldGetters = new FieldGetter[projectionInOrder.length]; + for (int i = 0; i < fieldGetters.length; i++) { + fieldGetters[i] = + InternalRow.createFieldGetter( + rowType.getChildren().get(projectionInOrder[i]), + projectionInOrder[i]); + } } else { - return new ScanRecord( - record.logOffset(), record.timestamp(), record.getRowKind(), newRow); + fieldGetters = new FieldGetter[rowType.getChildren().size()]; + for (int i = 0; i < fieldGetters.length; i++) { + fieldGetters[i] = InternalRow.createFieldGetter(rowType.getChildren().get(i), i); + } } + + return fieldGetters; } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java index 70fa10a8..0d98129e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java @@ -22,7 +22,6 @@ import com.alibaba.fluss.client.scanner.RemoteFileDownloader; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; @@ -113,13 +112,6 @@ public FlussLogScanner( private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo tableInfo) { RowType tableRowType = tableInfo.getTableDescriptor().getSchema().toRowType(); if (projectedFields != null) { - LogFormat logFormat = tableInfo.getTableDescriptor().getLogFormat(); - if (logFormat != LogFormat.ARROW) { - throw new IllegalArgumentException( - String.format( - "Only ARROW log format supports column projection, but the log format of table '%s' is %s", - tableInfo.getTablePath(), logFormat)); - } for (int projectedField : projectedFields) { if (projectedField < 0 || projectedField >= tableRowType.getFieldCount()) { throw new IllegalArgumentException( diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java index f047572b..d98f35b4 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java @@ -27,6 +27,7 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.InvalidMetadataException; import com.alibaba.fluss.fs.FsPath; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableInfo; @@ -87,7 +88,6 @@ public class LogFetcher implements Closeable { private final LogRecordReadContext remoteReadContext; @Nullable private final Projection projection; private final RpcClient rpcClient; - private final Configuration conf; private final int maxFetchBytes; private final int maxBucketFetchBytes; private final boolean isCheckCrcs; @@ -95,6 +95,7 @@ public class LogFetcher implements Closeable { private final LogFetchBuffer logFetchBuffer; private final LogFetchCollector logFetchCollector; private final RemoteLogDownloader remoteLogDownloader; + private final LogFormat logFormat; @GuardedBy("this") private final Set nodesWithPendingFetchRequests; @@ -116,12 +117,12 @@ public LogFetcher( RemoteFileDownloader remoteFileDownloader) { this.tablePath = tableInfo.getTablePath(); this.isPartitioned = tableInfo.getTableDescriptor().isPartitioned(); + this.logFormat = tableInfo.getTableDescriptor().getLogFormat(); this.readContext = LogRecordReadContext.createReadContext(tableInfo, projection); this.remoteReadContext = LogRecordReadContext.createReadContext(tableInfo, null); this.projection = projection; this.rpcClient = rpcClient; this.logScannerStatus = logScannerStatus; - this.conf = conf; this.maxFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes(); this.maxBucketFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes(); @@ -416,7 +417,7 @@ private Map prepareFetchLogRequests() { .setMaxBytes(maxFetchBytes); PbFetchLogReqForTable reqForTable = new PbFetchLogReqForTable().setTableId(finalTableId); - if (projection != null) { + if (projectPushDownEnable()) { reqForTable .setProjectionPushdownEnabled(true) .setProjectedFields(projection.getProjectionInOrder()); @@ -442,6 +443,13 @@ private List fetchableBuckets() { return logScannerStatus.fetchableBuckets(tableBucket -> !exclude.contains(tableBucket)); } + private boolean projectPushDownEnable() { + // Currently, only ARROW log format supports projection push down to server. Other log + // formats will do project in client, see DefaultCompletedFetch#toScanRecord() for more + // details. + return projection != null && logFormat == LogFormat.ARROW; + } + private Integer getTableBucketLeader(TableBucket tableBucket) { metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket); if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetch.java index c2778914..39e0ecfb 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetch.java @@ -17,15 +17,13 @@ package com.alibaba.fluss.client.scanner.log; import com.alibaba.fluss.annotation.Internal; -import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.record.FileLogRecords; -import com.alibaba.fluss.record.LogRecord; import com.alibaba.fluss.record.LogRecordReadContext; -import com.alibaba.fluss.row.GenericRow; import com.alibaba.fluss.row.InternalRow; -import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.row.InternalRow.FieldGetter; import com.alibaba.fluss.rpc.protocol.ApiError; +import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.Projection; import javax.annotation.Nullable; @@ -82,25 +80,26 @@ void drain() { recycleCallback.run(); } - // TODO: optimize this to avoid deep copying the record. - // refactor #fetchRecords to return an iterator which lazily deserialize - // from underlying record stream and arrow buffer. @Override - protected ScanRecord toScanRecord(LogRecord record) { - GenericRow newRow = new GenericRow(fieldGetters.length); - InternalRow internalRow = record.getRow(); - for (int i = 0; i < fieldGetters.length; i++) { - newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow)); - } + protected FieldGetter[] buildFieldGetters() { + RowType rowType = readContext.getRowType(); + FieldGetter[] fieldGetters; if (projection != null) { - return new ScanRecord( - record.logOffset(), - record.timestamp(), - record.getRowKind(), - ProjectedRow.from(projection.projection()).replaceRow(newRow)); + int[] projectionInOrder = projection.getProjectionInOrder(); + fieldGetters = new FieldGetter[projectionInOrder.length]; + for (int i = 0; i < fieldGetters.length; i++) { + fieldGetters[i] = + InternalRow.createFieldGetter( + rowType.getChildren().get(projectionInOrder[i]), + projectionInOrder[i]); + } } else { - return new ScanRecord( - record.logOffset(), record.timestamp(), record.getRowKind(), newRow); + fieldGetters = new FieldGetter[rowType.getChildren().size()]; + for (int i = 0; i < fieldGetters.length; i++) { + fieldGetters[i] = InternalRow.createFieldGetter(rowType.getChildren().get(i), i); + } } + + return fieldGetters; } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java index 007ba676..4a1780f2 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java @@ -17,7 +17,10 @@ package com.alibaba.fluss.client.scanner.log; import com.alibaba.fluss.client.scanner.ScanRecord; +import com.alibaba.fluss.metadata.LogFormat; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.record.FileLogProjection; import com.alibaba.fluss.record.FileLogRecords; @@ -26,6 +29,7 @@ import com.alibaba.fluss.record.RowKind; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; +import com.alibaba.fluss.types.DataTypes; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.FlussPaths; import com.alibaba.fluss.utils.Projection; @@ -33,6 +37,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.nio.ByteBuffer; @@ -46,6 +52,7 @@ import static com.alibaba.fluss.record.TestData.DATA2_ROW_TYPE; import static com.alibaba.fluss.record.TestData.DATA2_TABLE_ID; import static com.alibaba.fluss.record.TestData.DATA2_TABLE_INFO; +import static com.alibaba.fluss.record.TestData.DATA2_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static com.alibaba.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset; import static org.assertj.core.api.Assertions.assertThat; @@ -75,7 +82,8 @@ void testSimple() throws Exception { int bucketId = 0; // records for 0-10. TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); FetchLogResultForBucket resultForBucket0 = - new FetchLogResultForBucket(tb, createMemoryLogRecords(DATA2), 10L); + new FetchLogResultForBucket( + tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L); DefaultCompletedFetch defaultCompletedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset); List scanRecords = defaultCompletedFetch.fetchRecords(8); @@ -96,7 +104,8 @@ void testNegativeFetchCount() throws Exception { int bucketId = 0; // records for 0-10. TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); FetchLogResultForBucket resultForBucket0 = - new FetchLogResultForBucket(tb, createMemoryLogRecords(DATA2), 10L); + new FetchLogResultForBucket( + tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L); DefaultCompletedFetch defaultCompletedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset); List scanRecords = defaultCompletedFetch.fetchRecords(-10); @@ -116,14 +125,37 @@ void testNoRecordsInFetch() { assertThat(scanRecords.size()).isEqualTo(0); } - @Test - void testProjection() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"INDEXED", "ARROW"}) + void testProjection(String format) throws Exception { + LogFormat logFormat = LogFormat.fromString(format); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column("b", DataTypes.STRING()) + .withComment("b is second column") + .column("c", DataTypes.STRING()) + .withComment("c is adding column") + .build(); + tableInfo = + new TableInfo( + DATA2_TABLE_PATH, + DATA2_TABLE_ID, + TableDescriptor.builder().schema(schema).logFormat(logFormat).build(), + 1); long fetchOffset = 0L; int bucketId = 0; // records for 0-10. TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); Projection projection = Projection.of(new int[] {0, 2}); + MemoryLogRecords memoryLogRecords; + if (logFormat == LogFormat.ARROW) { + memoryLogRecords = genRecordsWithProjection(DATA2, projection); + } else { + memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED); + } FetchLogResultForBucket resultForBucket0 = - new FetchLogResultForBucket(tb, genRecordsWithProjection(DATA2, projection), 10L); + new FetchLogResultForBucket(tb, memoryLogRecords, 10L); DefaultCompletedFetch defaultCompletedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset, projection); List scanRecords = defaultCompletedFetch.fetchRecords(8); @@ -148,6 +180,7 @@ void testProjection() throws Exception { assertThat(row.getString(1).toString()).isEqualTo(expectObject[1]); } + // test projection reorder. defaultCompletedFetch = makeCompletedFetch( tb, resultForBucket0, fetchOffset, Projection.of(new int[] {2, 0})); @@ -184,8 +217,10 @@ private DefaultCompletedFetch makeCompletedFetch( projection); } - private MemoryLogRecords createMemoryLogRecords(List objects) throws Exception { - return createRecordsWithoutBaseLogOffset(rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects); + private MemoryLogRecords createMemoryLogRecords(List objects, LogFormat logFormat) + throws Exception { + return createRecordsWithoutBaseLogOffset( + rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, logFormat); } private MemoryLogRecords genRecordsWithProjection(List objects, Projection projection) @@ -193,7 +228,8 @@ private MemoryLogRecords genRecordsWithProjection(List objects, Projec File logFile = FlussPaths.logFile(tempDir, 0L); FileLogRecords fileLogRecords = FileLogRecords.open(logFile, false, 1024 * 1024, false); fileLogRecords.append( - createRecordsWithoutBaseLogOffset(rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects)); + createRecordsWithoutBaseLogOffset( + rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, LogFormat.ARROW)); fileLogRecords.flush(); FileLogProjection fileLogProjection = new FileLogProjection(); diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetchTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetchTest.java index 0920f925..bbca0aa0 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetchTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetchTest.java @@ -18,14 +18,18 @@ import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.fs.FsPath; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.record.FileLogRecords; import com.alibaba.fluss.record.LogRecordReadContext; import com.alibaba.fluss.record.RowKind; import com.alibaba.fluss.remote.RemoteLogSegment; import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.types.DataTypes; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.Projection; @@ -33,6 +37,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; @@ -90,7 +96,8 @@ void testSimple() throws Exception { TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); AtomicBoolean recycleCalled = new AtomicBoolean(false); FileLogRecords fileLogRecords = - createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2); + createFileLogRecords( + tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch( tableBucket, @@ -123,7 +130,11 @@ void testFetchForPartitionTable() throws Exception { TableBucket tb = new TableBucket(DATA2_TABLE_ID, (long) 0, 0); AtomicBoolean recycleCalled = new AtomicBoolean(false); FileLogRecords fileLogRecords = - createFileLogRecords(tb, PhysicalTablePath.of(DATA2_TABLE_PATH, "20240904"), DATA2); + createFileLogRecords( + tb, + PhysicalTablePath.of(DATA2_TABLE_PATH, "20240904"), + DATA2, + LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch( tb, fileLogRecords, fetchOffset, null, () -> recycleCalled.set(true)); @@ -151,7 +162,8 @@ void testNegativeFetchCount() throws Exception { long fetchOffset = 0L; TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); FileLogRecords fileLogRecords = - createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2); + createFileLogRecords( + tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch(tableBucket, fileLogRecords, fetchOffset, null); @@ -165,7 +177,10 @@ void testNoRecordsInFetch() throws Exception { TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); FileLogRecords fileLogRecords = createFileLogRecords( - tableBucket, DATA2_PHYSICAL_TABLE_PATH, Collections.emptyList()); + tableBucket, + DATA2_PHYSICAL_TABLE_PATH, + Collections.emptyList(), + LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch(tableBucket, fileLogRecords, fetchOffset, null); @@ -173,12 +188,29 @@ void testNoRecordsInFetch() throws Exception { assertThat(scanRecords.size()).isEqualTo(0); } - @Test - void testProjection() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"INDEXED", "ARROW"}) + void testProjection(String format) throws Exception { + LogFormat logFormat = LogFormat.fromString(format); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column("b", DataTypes.STRING()) + .withComment("b is second column") + .column("c", DataTypes.STRING()) + .withComment("c is adding column") + .build(); + tableInfo = + new TableInfo( + DATA2_TABLE_PATH, + DATA2_TABLE_ID, + TableDescriptor.builder().schema(schema).logFormat(logFormat).build(), + 1); long fetchOffset = 0L; TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); FileLogRecords fileLogRecords = - createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2); + createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, logFormat); RemoteCompletedFetch completedFetch = makeCompletedFetch( tableBucket, fileLogRecords, fetchOffset, Projection.of(new int[] {0, 2})); @@ -222,7 +254,10 @@ void testProjection() throws Exception { } private FileLogRecords createFileLogRecords( - TableBucket tableBucket, PhysicalTablePath physicalTablePath, List objects) + TableBucket tableBucket, + PhysicalTablePath physicalTablePath, + List objects, + LogFormat logFormat) throws Exception { UUID segmentId = UUID.randomUUID(); RemoteLogSegment remoteLogSegment = @@ -235,7 +270,8 @@ private FileLogRecords createFileLogRecords( .segmentSizeInBytes(Integer.MAX_VALUE) .build(); File logFile = - genRemoteLogSegmentFile(DATA2_ROW_TYPE, tempDir, remoteLogSegment, objects, 0L); + genRemoteLogSegmentFile( + DATA2_ROW_TYPE, tempDir, remoteLogSegment, objects, 0L, logFormat); return FileLogRecords.open(logFile, false); } @@ -270,12 +306,14 @@ private static File genRemoteLogSegmentFile( File remoteLogTabletDir, RemoteLogSegment remoteLogSegment, List objects, - long baseOffset) + long baseOffset, + LogFormat logFormat) throws Exception { FsPath remoteLogSegmentDir = remoteLogSegmentDir( new FsPath(remoteLogTabletDir.getAbsolutePath()), remoteLogSegment.remoteLogSegmentId()); - return genLogFile(rowType, new File(remoteLogSegmentDir.toString()), objects, baseOffset); + return genLogFile( + rowType, new File(remoteLogSegmentDir.toString()), objects, baseOffset, logFormat); } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java index 9ac8b544..c166ebb4 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java @@ -27,6 +27,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.MemorySize; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; @@ -40,6 +41,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; import java.util.ArrayList; @@ -109,6 +112,88 @@ void testScanFromRemote() throws Exception { assertThat(rowList).containsExactlyInAnyOrderElementsOf(expectedRows); } + @ParameterizedTest + @ValueSource(strings = {"INDEXED", "ARROW"}) + void testScanFromRemoteAndProject(String format) throws Exception { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .column("c", DataTypes.STRING()) + .column("d", DataTypes.BIGINT()) + .build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(1) + .logFormat(LogFormat.fromString(format)) + .build(); + long tableId = createTable(DATA1_TABLE_PATH, tableDescriptor); + + // append a batch of data. + List expectedRows = new ArrayList<>(); + Table table = conn.getTable(DATA1_TABLE_PATH); + AppendWriter appendWriter = table.getAppendWriter(); + int expectedSize = 30; + for (int i = 0; i < expectedSize; i++) { + String value = i % 2 == 0 ? "hello, friend" + i : null; + InternalRow row = row(schema.toRowType(), new Object[] {i, 100, value, i * 10L}); + appendWriter.append(row); + if (i % 10 == 0) { + // insert 3 bathes, each batch has 10 rows + appendWriter.flush(); + } + } + + FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(new TableBucket(tableId, 0)); + + // test fetch. + LogScanner logScanner = createLogScanner(table, new int[] {0, 2}); + logScanner.subscribeFromBeginning(0); + int count = 0; + while (count < expectedSize) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + for (ScanRecord scanRecord : scanRecords) { + assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.APPEND_ONLY); + assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2); + assertThat(scanRecord.getRow().getInt(0)).isEqualTo(count); + if (count % 2 == 0) { + assertThat(scanRecord.getRow().getString(1).toString()) + .isEqualTo("hello, friend" + count); + } else { + // check null values + assertThat(scanRecord.getRow().isNullAt(1)).isTrue(); + } + count++; + } + } + assertThat(count).isEqualTo(expectedSize); + logScanner.close(); + + // fetch data with projection reorder. + logScanner = createLogScanner(table, new int[] {2, 0}); + logScanner.subscribeFromBeginning(0); + count = 0; + while (count < expectedSize) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + for (ScanRecord scanRecord : scanRecords) { + assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.APPEND_ONLY); + assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2); + assertThat(scanRecord.getRow().getInt(1)).isEqualTo(count); + if (count % 2 == 0) { + assertThat(scanRecord.getRow().getString(0).toString()) + .isEqualTo("hello, friend" + count); + } else { + // check null values + assertThat(scanRecord.getRow().isNullAt(0)).isTrue(); + } + count++; + } + } + assertThat(count).isEqualTo(expectedSize); + logScanner.close(); + } + @Test void testPartitionTableFetchFromRemote() throws Exception { final Schema data2NonPkSchema = @@ -186,4 +271,8 @@ private static Configuration initConfig() { conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("1kb")); return conf; } + + private static LogScanner createLogScanner(Table table, int[] projectFields) { + return table.getLogScanner(new LogScan().withProjectedFields(projectFields)); + } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 9d26a9b3..48abf51c 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -610,8 +610,9 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm } } - @Test - void testAppendAndProject() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"INDEXED", "ARROW"}) + void testAppendAndProject(String format) throws Exception { Schema schema = Schema.newBuilder() .column("a", DataTypes.INT()) @@ -619,7 +620,11 @@ void testAppendAndProject() throws Exception { .column("c", DataTypes.STRING()) .column("d", DataTypes.BIGINT()) .build(); - TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .logFormat(LogFormat.fromString(format)) + .build(); TablePath tablePath = TablePath.of("test_db_1", "test_append_and_project"); createTable(tablePath, tableDescriptor, false); @@ -658,6 +663,29 @@ void testAppendAndProject() throws Exception { } assertThat(count).isEqualTo(expectedSize); logScanner.close(); + + // fetch data with projection reorder. + logScanner = createLogScanner(table, new int[] {2, 0}); + subscribeFromBeginning(logScanner, table); + count = 0; + while (count < expectedSize) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + for (ScanRecord scanRecord : scanRecords) { + assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.APPEND_ONLY); + assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2); + assertThat(scanRecord.getRow().getInt(1)).isEqualTo(count); + if (count % 2 == 0) { + assertThat(scanRecord.getRow().getString(0).toString()) + .isEqualTo("hello, friend" + count); + } else { + // check null values + assertThat(scanRecord.getRow().isNullAt(0)).isTrue(); + } + count++; + } + } + assertThat(count).isEqualTo(expectedSize); + logScanner.close(); } } @@ -784,10 +812,9 @@ void testInvalidColumnProjection() throws Exception { Table table = conn.getTable(DATA1_TABLE_PATH); // validation on projection - assertThatThrownBy(() -> createLogScanner(table, new int[] {1})) + assertThatThrownBy(() -> createLogScanner(table, new int[] {1, 2, 3, 4, 5})) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Only ARROW log format supports column projection, but the log format " - + "of table 'test_db_1.test_non_pk_table_1' is INDEXED"); + "Projected field index 2 is out of bound for schema ROW<`a` INT, `b` STRING>"); } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java index f2a9460f..d136f510 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java @@ -17,7 +17,6 @@ package com.alibaba.fluss.record; import com.alibaba.fluss.annotation.VisibleForTesting; -import com.alibaba.fluss.exception.InvalidColumnProjectionException; import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; @@ -72,10 +71,6 @@ public static LogRecordReadContext createReadContext( return createArrowReadContext(projectedRowType, schemaId, vectorRoot, allocator); } } else if (logFormat == LogFormat.INDEXED) { - if (projection != null) { - throw new InvalidColumnProjectionException( - "Column projection is not supported for INDEXED log format."); - } return createIndexedReadContext(rowType, schemaId); } else { throw new IllegalArgumentException("Unsupported log format: " + logFormat); diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java index 04d7470c..34e8bea8 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.record; import com.alibaba.fluss.exception.InvalidColumnProjectionException; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.CloseableIterator; @@ -209,7 +210,8 @@ private final FileLogRecords createFileLogRecords(RowType rowType, List objects) throws Exception { return createRecordsWithoutBaseLogOffset( - DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, 0, System.currentTimeMillis(), objects); + DATA1_ROW_TYPE, + DEFAULT_SCHEMA_ID, + 0, + System.currentTimeMillis(), + objects, + LogFormat.ARROW); } public static MemoryLogRecords genMemoryLogRecordsWithWriterId( @@ -148,7 +154,8 @@ public static MemoryLogRecords genMemoryLogRecordsWithWriterId( writerId, batchSequence, rowKinds, - objects); + objects, + LogFormat.ARROW); } public static MemoryLogRecords genIndexedMemoryLogRecords(List rows) @@ -168,13 +175,18 @@ public static MemoryLogRecords genIndexedMemoryLogRecords(List rows) public static MemoryLogRecords genMemoryLogRecordsWithBaseOffset( long offsetBase, List objects) throws Exception { return createRecordsWithoutBaseLogOffset( - DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, -1L, objects); + DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, -1L, objects, LogFormat.ARROW); } public static MemoryLogRecords genLogRecordsWithBaseOffsetAndTimestamp( long offsetBase, long maxTimestamp, List objects) throws Exception { return createRecordsWithoutBaseLogOffset( - DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, maxTimestamp, objects); + DATA1_ROW_TYPE, + DEFAULT_SCHEMA_ID, + offsetBase, + maxTimestamp, + objects, + LogFormat.ARROW); } public static KvRecordBatch genKvRecordBatch(List> keyAndValues) @@ -246,11 +258,20 @@ public static void genRemoteLogSegmentFile( remoteLogTabletDir(remoteLogDir(conf), physicalTablePath, tableBucket); FsPath remoteLogSegmentDir = remoteLogSegmentDir(remoteLogTabletDir, remoteLogSegment.remoteLogSegmentId()); - genLogFile(DATA1_ROW_TYPE, new File(remoteLogSegmentDir.toString()), DATA1, baseOffset); + genLogFile( + DATA1_ROW_TYPE, + new File(remoteLogSegmentDir.toString()), + DATA1, + baseOffset, + LogFormat.ARROW); } public static File genLogFile( - RowType rowType, File segmentDir, List objects, long baseOffset) + RowType rowType, + File segmentDir, + List objects, + long baseOffset, + LogFormat logFormat) throws Exception { if (!segmentDir.exists()) { segmentDir.mkdirs(); @@ -264,7 +285,8 @@ public static File genLogFile( DEFAULT_SCHEMA_ID, baseOffset, System.currentTimeMillis(), - objects)); + objects, + logFormat)); fileLogRecords.flush(); fileLogRecords.close(); return logFile; @@ -305,7 +327,8 @@ public static MemoryLogRecords createRecordsWithoutBaseLogOffset( int schemaId, long offsetBase, long maxTimestamp, - List objects) + List objects, + LogFormat logFormat) throws Exception { List rowKinds = objects.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList()); @@ -317,7 +340,8 @@ public static MemoryLogRecords createRecordsWithoutBaseLogOffset( NO_WRITER_ID, NO_BATCH_SEQUENCE, rowKinds, - objects); + objects, + logFormat); } public static MemoryLogRecords createBasicMemoryLogRecords( @@ -328,19 +352,58 @@ public static MemoryLogRecords createBasicMemoryLogRecords( long writerId, int batchSequence, List rowKinds, - List objects) + List objects, + LogFormat logFormat) throws Exception { - List rows = - objects.stream().map(object -> row(rowType, object)).collect(Collectors.toList()); - return createArrowMemoryLogRecords( + return createMemoryLogRecords( rowType, + schemaId, offsetBase, maxTimestamp, - schemaId, writerId, batchSequence, rowKinds, - rows); + objects, + logFormat); + } + + public static MemoryLogRecords createMemoryLogRecords( + RowType rowType, + int schemaId, + long offsetBase, + long maxTimestamp, + long writerId, + int batchSequence, + List rowKinds, + List objects, + LogFormat logFormat) + throws Exception { + if (logFormat == LogFormat.ARROW) { + List rows = + objects.stream() + .map(object -> row(rowType, object)) + .collect(Collectors.toList()); + return createArrowMemoryLogRecords( + rowType, + offsetBase, + maxTimestamp, + schemaId, + writerId, + batchSequence, + rowKinds, + rows); + } else { + return createIndexedMemoryLogRecords( + offsetBase, + maxTimestamp, + schemaId, + writerId, + batchSequence, + rowKinds, + objects.stream() + .map(object -> row(rowType, object)) + .collect(Collectors.toList())); + } } private static MemoryLogRecords createIndexedMemoryLogRecords( diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 63b126fb..89064e59 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -556,7 +556,8 @@ private MemoryLogRecords logRecords( NO_WRITER_ID, NO_BATCH_SEQUENCE, rowKinds, - values); + values, + LogFormat.ARROW); } private MemoryLogRecords logRecords( @@ -570,7 +571,8 @@ private MemoryLogRecords logRecords( NO_WRITER_ID, NO_BATCH_SEQUENCE, rowKinds, - values); + values, + LogFormat.ARROW); } private void checkEqual( diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java index 5b791e62..d9cf963c 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.server.replica; import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TablePath; @@ -527,7 +528,8 @@ private static MemoryLogRecords logRecords( NO_WRITER_ID, NO_BATCH_SEQUENCE, rowKinds, - values); + values, + LogFormat.ARROW); } private LogAppendInfo putRecordsToLeader(