From 8b62653dd3b771d7bf75d7f1f70cfdcdea28d71f Mon Sep 17 00:00:00 2001
From: yunhong <337361684@qq.com>
Date: Mon, 9 Dec 2024 20:50:49 +0800
Subject: [PATCH] [client] Support column projection for the INDEXED row format

---
 .../client/scanner/log/CompletedFetch.java    | 32 +++-
 .../scanner/log/DefaultCompletedFetch.java    | 36 +----
 .../client/scanner/log/FlussLogScanner.java   |  8 -
 .../fluss/client/scanner/log/LogFetcher.java  | 23 ++-
 .../scanner/log/RemoteCompletedFetch.java     | 34 +----
 .../scanner/log/RemotePendingFetch.java       |  9 +-
 .../fluss/client/table/FlussTable.java        |  2 +-
 .../log/DefaultCompletedFetchTest.java        | 57 +++++--
 .../scanner/log/LogFetchBufferTest.java       |  3 +-
 .../scanner/log/LogFetchCollectorTest.java    |  2 +-
 .../scanner/log/RemoteCompletedFetchTest.java | 63 ++++++--
 .../scanner/log/RemoteLogScannerITCase.java   | 89 +++++++++++
 .../fluss/client/table/FlussTableITCase.java  | 39 ++++-
 .../fluss/record/LogRecordReadContext.java    | 139 ++++++++++++++----
 .../fluss/record/FileLogProjectionTest.java   |  4 +-
 .../fluss/testutils/DataTestUtils.java        | 93 ++++++++++--
 .../flink/source/FlinkTableSourceITCase.java  | 63 ++++++++
 .../alibaba/fluss/server/kv/KvTabletTest.java |  6 +-
 .../fluss/server/replica/ReplicaTest.java     |  4 +-
 19 files changed, 526 insertions(+), 180 deletions(-)

diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java
index f0f9dbec..529aad54 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/CompletedFetch.java
@@ -24,7 +24,9 @@
 import com.alibaba.fluss.record.LogRecord;
 import com.alibaba.fluss.record.LogRecordBatch;
 import com.alibaba.fluss.record.LogRecordReadContext;
+import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
 import com.alibaba.fluss.rpc.messages.FetchLogRequest;
 import com.alibaba.fluss.rpc.protocol.ApiError;
 import com.alibaba.fluss.utils.CloseableIterator;
@@ -58,7 +60,7 @@ abstract class CompletedFetch {
     private final boolean isCheckCrcs;
     private final Iterator batches;
     private final LogScannerStatus logScannerStatus;
-    private final LogRecordReadContext readContext;
+    protected final LogRecordReadContext readContext;

     @Nullable protected final Projection projection;
     protected final InternalRow.FieldGetter[] fieldGetters;
@@ -81,8 +83,7 @@ public CompletedFetch(
             LogRecordReadContext readContext,
             LogScannerStatus logScannerStatus,
             boolean isCheckCrcs,
-            long fetchOffset,
-            @Nullable Projection projection) {
+            long fetchOffset) {
         this.tableBucket = tableBucket;
         this.error = error;
         this.sizeInBytes = sizeInBytes;
@@ -91,12 +92,31 @@ public CompletedFetch(
         this.readContext = readContext;
         this.isCheckCrcs = isCheckCrcs;
         this.logScannerStatus = logScannerStatus;
-        this.projection = projection;
+        this.projection = readContext.getProjection();
         this.nextFetchOffset = fetchOffset;
-        this.fieldGetters = readContext.getFieldGetters();
+        this.fieldGetters = readContext.getProjectedFieldGetters();
     }

-    protected abstract ScanRecord toScanRecord(LogRecord record);
+    // TODO: optimize this to avoid deep copying the record.
+    //  refactor #fetchRecords to return an iterator which lazily deserializes
+    //  from the underlying record stream and arrow buffer.
+ ScanRecord toScanRecord(LogRecord record) { + GenericRow newRow = new GenericRow(fieldGetters.length); + InternalRow internalRow = record.getRow(); + for (int i = 0; i < fieldGetters.length; i++) { + newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow)); + } + if (projection != null && projection.isReorderingNeeded()) { + return new ScanRecord( + record.logOffset(), + record.timestamp(), + record.getRowKind(), + ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow)); + } else { + return new ScanRecord( + record.logOffset(), record.timestamp(), record.getRowKind(), newRow); + } + } boolean isConsumed() { return isConsumed; diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetch.java index 9a01ed39..363998d1 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetch.java @@ -17,18 +17,10 @@ package com.alibaba.fluss.client.scanner.log; import com.alibaba.fluss.annotation.Internal; -import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.metadata.TableBucket; -import com.alibaba.fluss.record.LogRecord; import com.alibaba.fluss.record.LogRecordReadContext; -import com.alibaba.fluss.row.GenericRow; -import com.alibaba.fluss.row.InternalRow; -import com.alibaba.fluss.row.ProjectedRow; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; import com.alibaba.fluss.rpc.messages.FetchLogRequest; -import com.alibaba.fluss.utils.Projection; - -import javax.annotation.Nullable; /** * {@link DefaultCompletedFetch} is a {@link CompletedFetch} that represents a completed fetch that @@ -43,8 +35,7 @@ public DefaultCompletedFetch( LogRecordReadContext readContext, LogScannerStatus logScannerStatus, boolean isCheckCrc, - Long fetchOffset, - @Nullable Projection projection) { + Long fetchOffset) { super( tableBucket, fetchLogResultForBucket.getError(), @@ -54,29 +45,6 @@ public DefaultCompletedFetch( readContext, logScannerStatus, isCheckCrc, - fetchOffset, - projection); - } - - // TODO: optimize this to avoid deep copying the record. - // refactor #fetchRecords to return an iterator which lazily deserialize - // from underlying record stream and arrow buffer. 
- @Override - protected ScanRecord toScanRecord(LogRecord record) { - GenericRow newRow = new GenericRow(fieldGetters.length); - InternalRow internalRow = record.getRow(); - for (int i = 0; i < fieldGetters.length; i++) { - newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow)); - } - if (projection != null && projection.isReorderingNeeded()) { - return new ScanRecord( - record.logOffset(), - record.timestamp(), - record.getRowKind(), - ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow)); - } else { - return new ScanRecord( - record.logOffset(), record.timestamp(), record.getRowKind(), newRow); - } + fetchOffset); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java index 70fa10a8..0d98129e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java @@ -22,7 +22,6 @@ import com.alibaba.fluss.client.scanner.RemoteFileDownloader; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; @@ -113,13 +112,6 @@ public FlussLogScanner( private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo tableInfo) { RowType tableRowType = tableInfo.getTableDescriptor().getSchema().toRowType(); if (projectedFields != null) { - LogFormat logFormat = tableInfo.getTableDescriptor().getLogFormat(); - if (logFormat != LogFormat.ARROW) { - throw new IllegalArgumentException( - String.format( - "Only ARROW log format supports column projection, but the log format of table '%s' is %s", - tableInfo.getTablePath(), logFormat)); - } for (int projectedField : projectedFields) { if (projectedField < 0 || projectedField >= tableRowType.getFieldCount()) { throw new IllegalArgumentException( diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java index f4d209f1..0a741aeb 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java @@ -27,6 +27,7 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.InvalidMetadataException; import com.alibaba.fluss.fs.FsPath; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableInfo; @@ -96,6 +97,7 @@ public class LogFetcher implements Closeable { private final LogFetchBuffer logFetchBuffer; private final LogFetchCollector logFetchCollector; private final RemoteLogDownloader remoteLogDownloader; + private final LogFormat logFormat; @GuardedBy("this") private final Set nodesWithPendingFetchRequests; @@ -117,8 +119,10 @@ public LogFetcher( RemoteFileDownloader remoteFileDownloader) { this.tablePath = tableInfo.getTablePath(); this.isPartitioned = tableInfo.getTableDescriptor().isPartitioned(); - this.readContext = LogRecordReadContext.createReadContext(tableInfo, projection); - this.remoteReadContext = LogRecordReadContext.createReadContext(tableInfo, null); + 
this.logFormat = tableInfo.getTableDescriptor().getLogFormat();
+        this.readContext = LogRecordReadContext.createReadContext(tableInfo, false, projection);
+        this.remoteReadContext =
+                LogRecordReadContext.createReadContext(tableInfo, true, projection);
         this.projection = projection;
         this.rpcClient = rpcClient;
         this.logScannerStatus = logScannerStatus;
@@ -314,8 +318,7 @@ private synchronized void handleFetchLogResponse(
                                 readContext,
                                 logScannerStatus,
                                 isCheckCrcs,
-                                fetchOffset,
-                                projection);
+                                fetchOffset);
                 logFetchBuffer.add(completedFetch);
             }
         }
@@ -352,8 +355,7 @@ private void pendRemoteFetches(
                             highWatermark,
                             remoteReadContext,
                             logScannerStatus,
-                            isCheckCrcs,
-                            projection);
+                            isCheckCrcs);
             logFetchBuffer.pend(pendingFetch);
             downloadFuture.onComplete(logFetchBuffer::tryComplete);
         }
@@ -421,7 +423,7 @@ private Map prepareFetchLogRequests() {
                             .setMaxBytes(maxFetchBytes);
             PbFetchLogReqForTable reqForTable =
                     new PbFetchLogReqForTable().setTableId(finalTableId);
-            if (projection != null) {
+            if (projectionPushDownEnable()) {
                 reqForTable
                         .setProjectionPushdownEnabled(true)
                         .setProjectedFields(projection.getProjectionInOrder());
@@ -447,6 +449,13 @@ private List fetchableBuckets() {
         return logScannerStatus.fetchableBuckets(tableBucket -> !exclude.contains(tableBucket));
     }

+    private boolean projectionPushDownEnable() {
+        // Currently, only the ARROW log format supports projection push-down to the server.
+        // Other log formats do the projection on the client; see CompletedFetch#toScanRecord()
+        // for details.
+        return projection != null && logFormat == LogFormat.ARROW;
+    }
+
     private Integer getTableBucketLeader(TableBucket tableBucket) {
         metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
         if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetch.java
index c2778914..f2d48fd4 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetch.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetch.java
@@ -17,18 +17,10 @@
 package com.alibaba.fluss.client.scanner.log;

 import com.alibaba.fluss.annotation.Internal;
-import com.alibaba.fluss.client.scanner.ScanRecord;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.record.FileLogRecords;
-import com.alibaba.fluss.record.LogRecord;
 import com.alibaba.fluss.record.LogRecordReadContext;
-import com.alibaba.fluss.row.GenericRow;
-import com.alibaba.fluss.row.InternalRow;
-import com.alibaba.fluss.row.ProjectedRow;
 import com.alibaba.fluss.rpc.protocol.ApiError;
-import com.alibaba.fluss.utils.Projection;
-
-import javax.annotation.Nullable;

 import java.io.IOException;

@@ -52,7 +44,6 @@ class RemoteCompletedFetch extends CompletedFetch {
             LogScannerStatus logScannerStatus,
             boolean isCheckCrc,
             long fetchOffset,
-            @Nullable Projection projection,
             Runnable recycleCallback) {
         super(
                 tableBucket,
@@ -63,8 +54,7 @@ class RemoteCompletedFetch extends CompletedFetch {
                 readContext,
                 logScannerStatus,
                 isCheckCrc,
-                fetchOffset,
-                projection);
+                fetchOffset);
         this.fileLogRecords = fileLogRecords;
         this.recycleCallback = recycleCallback;
     }
@@ -81,26 +71,4 @@ void drain() {
         // call recycle to remove the fetched files and increment the prefetch semaphore
         recycleCallback.run();
     }
-
-    // TODO: optimize this to avoid deep copying the record.
- // refactor #fetchRecords to return an iterator which lazily deserialize - // from underlying record stream and arrow buffer. - @Override - protected ScanRecord toScanRecord(LogRecord record) { - GenericRow newRow = new GenericRow(fieldGetters.length); - InternalRow internalRow = record.getRow(); - for (int i = 0; i < fieldGetters.length; i++) { - newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow)); - } - if (projection != null) { - return new ScanRecord( - record.logOffset(), - record.timestamp(), - record.getRowKind(), - ProjectedRow.from(projection.projection()).replaceRow(newRow)); - } else { - return new ScanRecord( - record.logOffset(), record.timestamp(), record.getRowKind(), newRow); - } - } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemotePendingFetch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemotePendingFetch.java index e02e76d2..4ae9a0c4 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemotePendingFetch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/RemotePendingFetch.java @@ -20,9 +20,6 @@ import com.alibaba.fluss.record.FileLogRecords; import com.alibaba.fluss.record.LogRecordReadContext; import com.alibaba.fluss.remote.RemoteLogSegment; -import com.alibaba.fluss.utils.Projection; - -import javax.annotation.Nullable; /** * {@link RemotePendingFetch} is a {@link PendingFetch} that represents a pending fetch that waiting @@ -39,7 +36,6 @@ class RemotePendingFetch implements PendingFetch { private final LogRecordReadContext readContext; private final LogScannerStatus logScannerStatus; private final boolean isCheckCrc; - private final @Nullable Projection projection; RemotePendingFetch( RemoteLogSegment remoteLogSegment, @@ -49,8 +45,7 @@ class RemotePendingFetch implements PendingFetch { long highWatermark, LogRecordReadContext readContext, LogScannerStatus logScannerStatus, - boolean isCheckCrc, - @Nullable Projection projection) { + boolean isCheckCrc) { this.remoteLogSegment = remoteLogSegment; this.downloadFuture = downloadFuture; this.posInLogSegment = posInLogSegment; @@ -59,7 +54,6 @@ class RemotePendingFetch implements PendingFetch { this.readContext = readContext; this.logScannerStatus = logScannerStatus; this.isCheckCrc = isCheckCrc; - this.projection = projection; } @Override @@ -83,7 +77,6 @@ public CompletedFetch toCompletedFetch() { logScannerStatus, isCheckCrc, fetchOffset, - projection, downloadFuture.getRecycleCallback()); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 54dda698..16dc10e0 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -279,7 +279,7 @@ private List parseLimitScanResponse( } } else { LogRecordReadContext readContext = - LogRecordReadContext.createReadContext(tableInfo, null); + LogRecordReadContext.createReadContext(tableInfo, false, null); LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer); for (LogRecordBatch logRecordBatch : records.batches()) { // A batch of log record maybe little more than limit, thus we need slice the diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java index 007ba676..16a2cf3e 
100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java @@ -17,7 +17,10 @@ package com.alibaba.fluss.client.scanner.log; import com.alibaba.fluss.client.scanner.ScanRecord; +import com.alibaba.fluss.metadata.LogFormat; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.record.FileLogProjection; import com.alibaba.fluss.record.FileLogRecords; @@ -26,6 +29,7 @@ import com.alibaba.fluss.record.RowKind; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; +import com.alibaba.fluss.types.DataTypes; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.FlussPaths; import com.alibaba.fluss.utils.Projection; @@ -33,6 +37,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.nio.ByteBuffer; @@ -46,6 +52,7 @@ import static com.alibaba.fluss.record.TestData.DATA2_ROW_TYPE; import static com.alibaba.fluss.record.TestData.DATA2_TABLE_ID; import static com.alibaba.fluss.record.TestData.DATA2_TABLE_INFO; +import static com.alibaba.fluss.record.TestData.DATA2_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static com.alibaba.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset; import static org.assertj.core.api.Assertions.assertThat; @@ -75,7 +82,8 @@ void testSimple() throws Exception { int bucketId = 0; // records for 0-10. TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); FetchLogResultForBucket resultForBucket0 = - new FetchLogResultForBucket(tb, createMemoryLogRecords(DATA2), 10L); + new FetchLogResultForBucket( + tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L); DefaultCompletedFetch defaultCompletedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset); List scanRecords = defaultCompletedFetch.fetchRecords(8); @@ -96,7 +104,8 @@ void testNegativeFetchCount() throws Exception { int bucketId = 0; // records for 0-10. 
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); FetchLogResultForBucket resultForBucket0 = - new FetchLogResultForBucket(tb, createMemoryLogRecords(DATA2), 10L); + new FetchLogResultForBucket( + tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L); DefaultCompletedFetch defaultCompletedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset); List scanRecords = defaultCompletedFetch.fetchRecords(-10); @@ -116,14 +125,37 @@ void testNoRecordsInFetch() { assertThat(scanRecords.size()).isEqualTo(0); } - @Test - void testProjection() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"INDEXED", "ARROW"}) + void testProjection(String format) throws Exception { + LogFormat logFormat = LogFormat.fromString(format); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column("b", DataTypes.STRING()) + .withComment("b is second column") + .column("c", DataTypes.STRING()) + .withComment("c is adding column") + .build(); + tableInfo = + new TableInfo( + DATA2_TABLE_PATH, + DATA2_TABLE_ID, + TableDescriptor.builder().schema(schema).logFormat(logFormat).build(), + 1); long fetchOffset = 0L; int bucketId = 0; // records for 0-10. TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); Projection projection = Projection.of(new int[] {0, 2}); + MemoryLogRecords memoryLogRecords; + if (logFormat == LogFormat.ARROW) { + memoryLogRecords = genRecordsWithProjection(DATA2, projection); + } else { + memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED); + } FetchLogResultForBucket resultForBucket0 = - new FetchLogResultForBucket(tb, genRecordsWithProjection(DATA2, projection), 10L); + new FetchLogResultForBucket(tb, memoryLogRecords, 10L); DefaultCompletedFetch defaultCompletedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset, projection); List scanRecords = defaultCompletedFetch.fetchRecords(8); @@ -148,6 +180,7 @@ void testProjection() throws Exception { assertThat(row.getString(1).toString()).isEqualTo(expectObject[1]); } + // test projection reorder. 
defaultCompletedFetch = makeCompletedFetch( tb, resultForBucket0, fetchOffset, Projection.of(new int[] {2, 0})); @@ -177,15 +210,16 @@ private DefaultCompletedFetch makeCompletedFetch( return new DefaultCompletedFetch( tableBucket, resultForBucket, - LogRecordReadContext.createReadContext(tableInfo, projection), + LogRecordReadContext.createReadContext(tableInfo, false, projection), logScannerStatus, true, - offset, - projection); + offset); } - private MemoryLogRecords createMemoryLogRecords(List objects) throws Exception { - return createRecordsWithoutBaseLogOffset(rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects); + private MemoryLogRecords createMemoryLogRecords(List objects, LogFormat logFormat) + throws Exception { + return createRecordsWithoutBaseLogOffset( + rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, logFormat); } private MemoryLogRecords genRecordsWithProjection(List objects, Projection projection) @@ -193,7 +227,8 @@ private MemoryLogRecords genRecordsWithProjection(List objects, Projec File logFile = FlussPaths.logFile(tempDir, 0L); FileLogRecords fileLogRecords = FileLogRecords.open(logFile, false, 1024 * 1024, false); fileLogRecords.append( - createRecordsWithoutBaseLogOffset(rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects)); + createRecordsWithoutBaseLogOffset( + rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, LogFormat.ARROW)); fileLogRecords.flush(); FileLogProjection fileLogProjection = new FileLogProjection(); diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetchBufferTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetchBufferTest.java index aa54ba68..da2d7417 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetchBufferTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetchBufferTest.java @@ -250,8 +250,7 @@ private DefaultCompletedFetch makeCompletedFetch(TableBucket tableBucket) throws readContext, logScannerStatus, true, - 0L, - null); + 0L); } private PendingFetch makePendingFetch(TableBucket tableBucket) throws Exception { diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetchCollectorTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetchCollectorTest.java index 8266adde..1825f5b9 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetchCollectorTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetchCollectorTest.java @@ -157,6 +157,6 @@ void testCollectAfterUnassign() throws Exception { private DefaultCompletedFetch makeCompletedFetch( TableBucket tableBucket, FetchLogResultForBucket resultForBucket, long offset) { return new DefaultCompletedFetch( - tableBucket, resultForBucket, readContext, logScannerStatus, true, offset, null); + tableBucket, resultForBucket, readContext, logScannerStatus, true, offset); } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetchTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetchTest.java index 0920f925..9a75cfc0 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetchTest.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteCompletedFetchTest.java @@ -18,14 +18,18 @@ import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.fs.FsPath; +import com.alibaba.fluss.metadata.LogFormat; import 
com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.record.FileLogRecords; import com.alibaba.fluss.record.LogRecordReadContext; import com.alibaba.fluss.record.RowKind; import com.alibaba.fluss.remote.RemoteLogSegment; import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.types.DataTypes; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.Projection; @@ -33,6 +37,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; @@ -90,7 +96,8 @@ void testSimple() throws Exception { TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); AtomicBoolean recycleCalled = new AtomicBoolean(false); FileLogRecords fileLogRecords = - createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2); + createFileLogRecords( + tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch( tableBucket, @@ -123,7 +130,11 @@ void testFetchForPartitionTable() throws Exception { TableBucket tb = new TableBucket(DATA2_TABLE_ID, (long) 0, 0); AtomicBoolean recycleCalled = new AtomicBoolean(false); FileLogRecords fileLogRecords = - createFileLogRecords(tb, PhysicalTablePath.of(DATA2_TABLE_PATH, "20240904"), DATA2); + createFileLogRecords( + tb, + PhysicalTablePath.of(DATA2_TABLE_PATH, "20240904"), + DATA2, + LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch( tb, fileLogRecords, fetchOffset, null, () -> recycleCalled.set(true)); @@ -151,7 +162,8 @@ void testNegativeFetchCount() throws Exception { long fetchOffset = 0L; TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); FileLogRecords fileLogRecords = - createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2); + createFileLogRecords( + tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch(tableBucket, fileLogRecords, fetchOffset, null); @@ -165,7 +177,10 @@ void testNoRecordsInFetch() throws Exception { TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); FileLogRecords fileLogRecords = createFileLogRecords( - tableBucket, DATA2_PHYSICAL_TABLE_PATH, Collections.emptyList()); + tableBucket, + DATA2_PHYSICAL_TABLE_PATH, + Collections.emptyList(), + LogFormat.ARROW); RemoteCompletedFetch completedFetch = makeCompletedFetch(tableBucket, fileLogRecords, fetchOffset, null); @@ -173,12 +188,29 @@ void testNoRecordsInFetch() throws Exception { assertThat(scanRecords.size()).isEqualTo(0); } - @Test - void testProjection() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"INDEXED", "ARROW"}) + void testProjection(String format) throws Exception { + LogFormat logFormat = LogFormat.fromString(format); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column("b", DataTypes.STRING()) + .withComment("b is second column") + .column("c", DataTypes.STRING()) + .withComment("c is adding column") + .build(); + tableInfo = + new TableInfo( + DATA2_TABLE_PATH, + DATA2_TABLE_ID, + TableDescriptor.builder().schema(schema).logFormat(logFormat).build(), + 1); long fetchOffset = 0L; 
TableBucket tableBucket = new TableBucket(DATA2_TABLE_ID, 0); FileLogRecords fileLogRecords = - createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2); + createFileLogRecords(tableBucket, DATA2_PHYSICAL_TABLE_PATH, DATA2, logFormat); RemoteCompletedFetch completedFetch = makeCompletedFetch( tableBucket, fileLogRecords, fetchOffset, Projection.of(new int[] {0, 2})); @@ -222,7 +254,10 @@ void testProjection() throws Exception { } private FileLogRecords createFileLogRecords( - TableBucket tableBucket, PhysicalTablePath physicalTablePath, List objects) + TableBucket tableBucket, + PhysicalTablePath physicalTablePath, + List objects, + LogFormat logFormat) throws Exception { UUID segmentId = UUID.randomUUID(); RemoteLogSegment remoteLogSegment = @@ -235,7 +270,8 @@ private FileLogRecords createFileLogRecords( .segmentSizeInBytes(Integer.MAX_VALUE) .build(); File logFile = - genRemoteLogSegmentFile(DATA2_ROW_TYPE, tempDir, remoteLogSegment, objects, 0L); + genRemoteLogSegmentFile( + DATA2_ROW_TYPE, tempDir, remoteLogSegment, objects, 0L, logFormat); return FileLogRecords.open(logFile, false); } @@ -249,11 +285,10 @@ private RemoteCompletedFetch makeCompletedFetch( tableBucket, fileLogRecords, 10L, - LogRecordReadContext.createReadContext(tableInfo, null), + LogRecordReadContext.createReadContext(tableInfo, true, projection), logScannerStatus, true, fetchOffset, - projection, recycle); } @@ -270,12 +305,14 @@ private static File genRemoteLogSegmentFile( File remoteLogTabletDir, RemoteLogSegment remoteLogSegment, List objects, - long baseOffset) + long baseOffset, + LogFormat logFormat) throws Exception { FsPath remoteLogSegmentDir = remoteLogSegmentDir( new FsPath(remoteLogTabletDir.getAbsolutePath()), remoteLogSegment.remoteLogSegmentId()); - return genLogFile(rowType, new File(remoteLogSegmentDir.toString()), objects, baseOffset); + return genLogFile( + rowType, new File(remoteLogSegmentDir.toString()), objects, baseOffset, logFormat); } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java index 9ac8b544..c166ebb4 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java @@ -27,6 +27,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.MemorySize; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; @@ -40,6 +41,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; import java.util.ArrayList; @@ -109,6 +112,88 @@ void testScanFromRemote() throws Exception { assertThat(rowList).containsExactlyInAnyOrderElementsOf(expectedRows); } + @ParameterizedTest + @ValueSource(strings = {"INDEXED", "ARROW"}) + void testScanFromRemoteAndProject(String format) throws Exception { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .column("c", DataTypes.STRING()) + .column("d", DataTypes.BIGINT()) + .build(); + TableDescriptor 
tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(1)
+                        .logFormat(LogFormat.fromString(format))
+                        .build();
+        long tableId = createTable(DATA1_TABLE_PATH, tableDescriptor);
+
+        // append a batch of data.
+        List expectedRows = new ArrayList<>();
+        Table table = conn.getTable(DATA1_TABLE_PATH);
+        AppendWriter appendWriter = table.getAppendWriter();
+        int expectedSize = 30;
+        for (int i = 0; i < expectedSize; i++) {
+            String value = i % 2 == 0 ? "hello, friend" + i : null;
+            InternalRow row = row(schema.toRowType(), new Object[] {i, 100, value, i * 10L});
+            appendWriter.append(row);
+            if (i % 10 == 0) {
+                // insert 3 batches, each batch has 10 rows
+                appendWriter.flush();
+            }
+        }
+
+        FLUSS_CLUSTER_EXTENSION.waitUtilSomeLogSegmentsCopyToRemote(new TableBucket(tableId, 0));
+
+        // test fetch.
+        LogScanner logScanner = createLogScanner(table, new int[] {0, 2});
+        logScanner.subscribeFromBeginning(0);
+        int count = 0;
+        while (count < expectedSize) {
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (ScanRecord scanRecord : scanRecords) {
+                assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.APPEND_ONLY);
+                assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2);
+                assertThat(scanRecord.getRow().getInt(0)).isEqualTo(count);
+                if (count % 2 == 0) {
+                    assertThat(scanRecord.getRow().getString(1).toString())
+                            .isEqualTo("hello, friend" + count);
+                } else {
+                    // check null values
+                    assertThat(scanRecord.getRow().isNullAt(1)).isTrue();
+                }
+                count++;
+            }
+        }
+        assertThat(count).isEqualTo(expectedSize);
+        logScanner.close();
+
+        // fetch data with projection reorder.
+        logScanner = createLogScanner(table, new int[] {2, 0});
+        logScanner.subscribeFromBeginning(0);
+        count = 0;
+        while (count < expectedSize) {
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (ScanRecord scanRecord : scanRecords) {
+                assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.APPEND_ONLY);
+                assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2);
+                assertThat(scanRecord.getRow().getInt(1)).isEqualTo(count);
+                if (count % 2 == 0) {
+                    assertThat(scanRecord.getRow().getString(0).toString())
+                            .isEqualTo("hello, friend" + count);
+                } else {
+                    // check null values
+                    assertThat(scanRecord.getRow().isNullAt(0)).isTrue();
+                }
+                count++;
+            }
+        }
+        assertThat(count).isEqualTo(expectedSize);
+        logScanner.close();
+    }
+
     @Test
     void testPartitionTableFetchFromRemote() throws Exception {
         final Schema data2NonPkSchema =
@@ -186,4 +271,8 @@ private static Configuration initConfig() {
         conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("1kb"));
         return conf;
     }
+
+    private static LogScanner createLogScanner(Table table, int[] projectFields) {
+        return table.getLogScanner(new LogScan().withProjectedFields(projectFields));
+    }
 }
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
index 9d26a9b3..48abf51c 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
@@ -610,8 +610,9 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm
         }
     }

-    @Test
-    void testAppendAndProject() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"INDEXED", "ARROW"})
+    void testAppendAndProject(String format) throws Exception {
         Schema schema =
                 Schema.newBuilder()
                         .column("a",
DataTypes.INT()) @@ -619,7 +620,11 @@ void testAppendAndProject() throws Exception { .column("c", DataTypes.STRING()) .column("d", DataTypes.BIGINT()) .build(); - TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build(); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(schema) + .logFormat(LogFormat.fromString(format)) + .build(); TablePath tablePath = TablePath.of("test_db_1", "test_append_and_project"); createTable(tablePath, tableDescriptor, false); @@ -658,6 +663,29 @@ void testAppendAndProject() throws Exception { } assertThat(count).isEqualTo(expectedSize); logScanner.close(); + + // fetch data with projection reorder. + logScanner = createLogScanner(table, new int[] {2, 0}); + subscribeFromBeginning(logScanner, table); + count = 0; + while (count < expectedSize) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + for (ScanRecord scanRecord : scanRecords) { + assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.APPEND_ONLY); + assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2); + assertThat(scanRecord.getRow().getInt(1)).isEqualTo(count); + if (count % 2 == 0) { + assertThat(scanRecord.getRow().getString(0).toString()) + .isEqualTo("hello, friend" + count); + } else { + // check null values + assertThat(scanRecord.getRow().isNullAt(0)).isTrue(); + } + count++; + } + } + assertThat(count).isEqualTo(expectedSize); + logScanner.close(); } } @@ -784,10 +812,9 @@ void testInvalidColumnProjection() throws Exception { Table table = conn.getTable(DATA1_TABLE_PATH); // validation on projection - assertThatThrownBy(() -> createLogScanner(table, new int[] {1})) + assertThatThrownBy(() -> createLogScanner(table, new int[] {1, 2, 3, 4, 5})) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Only ARROW log format supports column projection, but the log format " - + "of table 'test_db_1.test_non_pk_table_1' is INDEXED"); + "Projected field index 2 is out of bound for schema ROW<`a` INT, `b` STRING>"); } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java index 8178c0ca..c27eebf5 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/LogRecordReadContext.java @@ -17,7 +17,6 @@ package com.alibaba.fluss.record; import com.alibaba.fluss.annotation.VisibleForTesting; -import com.alibaba.fluss.exception.InvalidColumnProjectionException; import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; @@ -25,12 +24,15 @@ import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; +import com.alibaba.fluss.types.DataType; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.ArrowUtils; import com.alibaba.fluss.utils.Projection; import javax.annotation.Nullable; +import java.util.List; + import static com.alibaba.fluss.utils.Preconditions.checkArgument; import static com.alibaba.fluss.utils.Preconditions.checkNotNull; @@ -39,22 +41,26 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo // the log format of the table private final LogFormat logFormat; - // the static schema of the table - private final 
RowType rowType;
+    // the schema of the data read from the server or remote storage (already projected on the
+    // server side)
+    private final RowType dataRowType;
     // the static schemaId of the table, should support dynamic schema evolution in the future
     private final int schemaId;
-    // the fieldGetter to get the log value of the table;
-    private final InternalRow.FieldGetter[] fieldGetters;
+    // the field getters to get the log values of the table after projection
+    private final InternalRow.FieldGetter[] projectedFieldGetters;
     // the Arrow vector schema root of the table, should be null if not ARROW log format
     @Nullable private final VectorSchemaRoot vectorSchemaRoot;
     // the Arrow memory buffer allocator for the table, should be null if not ARROW log format
     @Nullable private final BufferAllocator bufferAllocator;
+    // whether context is to read from remote
+    private final boolean readFromRemote;
+    // the projection info
+    @Nullable private final Projection projection;

     /**
      * Creates a LogRecordReadContext for the given table information and projection information.
      */
     public static LogRecordReadContext createReadContext(
-            TableInfo tableInfo, @Nullable Projection projection) {
+            TableInfo tableInfo, boolean readFromRemote, @Nullable Projection projection) {
         TableDescriptor desc = tableInfo.getTableDescriptor();
         RowType rowType = desc.getSchema().toRowType();
         LogFormat logFormat = desc.getLogFormat();
@@ -63,23 +69,23 @@ public static LogRecordReadContext createReadContext(
         if (logFormat == LogFormat.ARROW) {
             // TODO: use a more reasonable memory limit
             BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
-            if (projection == null) {
+
+            // Currently, remote reads of the ARROW log format do not support projection,
+            // so the full schema is read and the projection is applied on the client.
+            if (projection == null || readFromRemote) {
                 VectorSchemaRoot vectorRoot =
                         VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
-                return createArrowReadContext(rowType, schemaId, vectorRoot, allocator);
+                return createArrowReadContext(
+                        rowType, schemaId, vectorRoot, allocator, readFromRemote, projection);
             } else {
                 RowType projectedRowType = projection.projectInOrder(rowType);
                 VectorSchemaRoot vectorRoot =
                         VectorSchemaRoot.create(
                                 ArrowUtils.toArrowSchema(projectedRowType), allocator);
-                return createArrowReadContext(projectedRowType, schemaId, vectorRoot, allocator);
+                return createArrowReadContext(
+                        projectedRowType, schemaId, vectorRoot, allocator, false, projection);
             }
         } else if (logFormat == LogFormat.INDEXED) {
-            if (projection != null) {
-                throw new InvalidColumnProjectionException(
-                        "Column projection is not supported for INDEXED log format.");
-            }
-            return createIndexedReadContext(rowType, schemaId);
+            return createIndexedReadContext(rowType, schemaId, readFromRemote, projection);
         } else {
             throw new IllegalArgumentException("Unsupported log format: " + logFormat);
         }
@@ -88,20 +94,30 @@
     /**
      * Creates a LogRecordReadContext for ARROW log format.
*
-     * @param rowType the schema of the table
+     * @param dataRowType the schema of the data read from the server or remote storage
      * @param schemaId the schemaId of the table
      * @param vectorSchemaRoot the shared vector schema root for the table
      * @param bufferAllocator the shared buffer allocator for the table
+     * @param readFromRemote whether the context is used to read from remote storage
+     * @param projection the projection info
      */
     public static LogRecordReadContext createArrowReadContext(
-            RowType rowType,
+            RowType dataRowType,
             int schemaId,
             VectorSchemaRoot vectorSchemaRoot,
-            BufferAllocator bufferAllocator) {
+            BufferAllocator bufferAllocator,
+            boolean readFromRemote,
+            @Nullable Projection projection) {
         checkNotNull(vectorSchemaRoot);
         checkNotNull(bufferAllocator);
         return new LogRecordReadContext(
-                LogFormat.ARROW, rowType, schemaId, vectorSchemaRoot, bufferAllocator);
+                LogFormat.ARROW,
+                dataRowType,
+                schemaId,
+                vectorSchemaRoot,
+                bufferAllocator,
+                readFromRemote,
+                projection);
     }

     /**
@@ -116,7 +132,7 @@ public static LogRecordReadContext createArrowReadContext(RowType rowType, int s
         BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         VectorSchemaRoot vectorRoot =
                 VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
-        return createArrowReadContext(rowType, schemaId, vectorRoot, allocator);
+        return createArrowReadContext(rowType, schemaId, vectorRoot, allocator, false, null);
     }

     /**
@@ -126,24 +142,43 @@ public static LogRecordReadContext createArrowReadContext(RowType rowType, int s
      * @param schemaId the schemaId of the table
      */
     public static LogRecordReadContext createIndexedReadContext(RowType rowType, int schemaId) {
-        return new LogRecordReadContext(LogFormat.INDEXED, rowType, schemaId, null, null);
+        return new LogRecordReadContext(
+                LogFormat.INDEXED, rowType, schemaId, null, null, false, null);
+    }
+
+    /**
+     * Creates a LogRecordReadContext for INDEXED log format.
+     *
+     * @param rowType the schema of the table
+     * @param schemaId the schemaId of the table
+     * @param readFromRemote whether the context is used to read from remote storage
+     * @param projection the projection info
+     */
+    public static LogRecordReadContext createIndexedReadContext(
+            RowType rowType,
+            int schemaId,
+            boolean readFromRemote,
+            @Nullable Projection projection) {
+        return new LogRecordReadContext(
+                LogFormat.INDEXED, rowType, schemaId, null, null, readFromRemote, projection);
    }

    private LogRecordReadContext(
            LogFormat logFormat,
-            RowType rowType,
+            RowType dataRowType,
            int schemaId,
            VectorSchemaRoot vectorSchemaRoot,
-            BufferAllocator bufferAllocator) {
+            BufferAllocator bufferAllocator,
+            boolean readFromRemote,
+            @Nullable Projection projection) {
        this.logFormat = logFormat;
-        this.rowType = rowType;
+        this.dataRowType = dataRowType;
        this.schemaId = schemaId;
        this.vectorSchemaRoot = vectorSchemaRoot;
        this.bufferAllocator = bufferAllocator;
-        this.fieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()];
-        for (int i = 0; i < fieldGetters.length; i++) {
-            fieldGetters[i] = InternalRow.createFieldGetter(rowType.getChildren().get(i), i);
-        }
+        this.readFromRemote = readFromRemote;
+        this.projection = projection;
+        this.projectedFieldGetters = buildProjectedFieldGetters();
    }

    @Override
@@ -158,17 +193,59 @@ public RowType getRowType(int schemaId) {
                "The schemaId (%s) in the record batch is not the same as the context (%s).",
                schemaId,
                this.schemaId);
-        return rowType;
+        return dataRowType;
    }

-    public RowType getRowType() {
-        return rowType;
+    public RowType getDataRowType() {
+        return dataRowType;
    }

-    public InternalRow.FieldGetter[] getFieldGetters() {
+    private InternalRow.FieldGetter[] buildProjectedFieldGetters() {
+        InternalRow.FieldGetter[] fieldGetters;
+        List dataTypeList = dataRowType.getChildren();
+        if (readFromRemote) {
+            if (projection != null) {
+                int[] projectionInOrder = projection.getProjectionInOrder();
+                fieldGetters = new InternalRow.FieldGetter[projectionInOrder.length];
+                for (int i = 0; i < fieldGetters.length; i++) {
+                    fieldGetters[i] =
+                            InternalRow.createFieldGetter(
+                                    dataTypeList.get(projectionInOrder[i]), projectionInOrder[i]);
+                }
+            } else {
+                fieldGetters = new InternalRow.FieldGetter[dataTypeList.size()];
+                for (int i = 0; i < fieldGetters.length; i++) {
+                    fieldGetters[i] = InternalRow.createFieldGetter(dataTypeList.get(i), i);
+                }
+            }
+        } else {
+            // The ARROW log format has already been projected on the server side.
+ if (projection != null && logFormat != LogFormat.ARROW) { + int[] projectionInOrder = projection.getProjectionInOrder(); + fieldGetters = new InternalRow.FieldGetter[projectionInOrder.length]; + for (int i = 0; i < fieldGetters.length; i++) { + fieldGetters[i] = + InternalRow.createFieldGetter( + dataTypeList.get(projectionInOrder[i]), projectionInOrder[i]); + } + } else { + fieldGetters = new InternalRow.FieldGetter[dataTypeList.size()]; + for (int i = 0; i < fieldGetters.length; i++) { + fieldGetters[i] = InternalRow.createFieldGetter(dataTypeList.get(i), i); + } + } + } return fieldGetters; } + public InternalRow.FieldGetter[] getProjectedFieldGetters() { + return projectedFieldGetters; + } + + public @Nullable Projection getProjection() { + return projection; + } + @Override public VectorSchemaRoot getVectorSchemaRoot(int schemaId) { checkArgument( diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java index 04d7470c..34e8bea8 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.record; import com.alibaba.fluss.exception.InvalidColumnProjectionException; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.types.RowType; import com.alibaba.fluss.utils.CloseableIterator; @@ -209,7 +210,8 @@ private final FileLogRecords createFileLogRecords(RowType rowType, List objects) throws Exception { return createRecordsWithoutBaseLogOffset( - DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, 0, System.currentTimeMillis(), objects); + DATA1_ROW_TYPE, + DEFAULT_SCHEMA_ID, + 0, + System.currentTimeMillis(), + objects, + LogFormat.ARROW); } public static MemoryLogRecords genMemoryLogRecordsWithWriterId( @@ -148,7 +154,8 @@ public static MemoryLogRecords genMemoryLogRecordsWithWriterId( writerId, batchSequence, rowKinds, - objects); + objects, + LogFormat.ARROW); } public static MemoryLogRecords genIndexedMemoryLogRecords(List rows) @@ -168,13 +175,18 @@ public static MemoryLogRecords genIndexedMemoryLogRecords(List rows) public static MemoryLogRecords genMemoryLogRecordsWithBaseOffset( long offsetBase, List objects) throws Exception { return createRecordsWithoutBaseLogOffset( - DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, -1L, objects); + DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, -1L, objects, LogFormat.ARROW); } public static MemoryLogRecords genLogRecordsWithBaseOffsetAndTimestamp( long offsetBase, long maxTimestamp, List objects) throws Exception { return createRecordsWithoutBaseLogOffset( - DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, maxTimestamp, objects); + DATA1_ROW_TYPE, + DEFAULT_SCHEMA_ID, + offsetBase, + maxTimestamp, + objects, + LogFormat.ARROW); } public static KvRecordBatch genKvRecordBatch(List> keyAndValues) @@ -246,11 +258,20 @@ public static void genRemoteLogSegmentFile( remoteLogTabletDir(remoteLogDir(conf), physicalTablePath, tableBucket); FsPath remoteLogSegmentDir = remoteLogSegmentDir(remoteLogTabletDir, remoteLogSegment.remoteLogSegmentId()); - genLogFile(DATA1_ROW_TYPE, new File(remoteLogSegmentDir.toString()), DATA1, baseOffset); + genLogFile( + DATA1_ROW_TYPE, + new File(remoteLogSegmentDir.toString()), + DATA1, + baseOffset, + LogFormat.ARROW); } public static File genLogFile( - RowType rowType, File segmentDir, List objects, long 
baseOffset) + RowType rowType, + File segmentDir, + List objects, + long baseOffset, + LogFormat logFormat) throws Exception { if (!segmentDir.exists()) { segmentDir.mkdirs(); @@ -264,7 +285,8 @@ public static File genLogFile( DEFAULT_SCHEMA_ID, baseOffset, System.currentTimeMillis(), - objects)); + objects, + logFormat)); fileLogRecords.flush(); fileLogRecords.close(); return logFile; @@ -305,7 +327,8 @@ public static MemoryLogRecords createRecordsWithoutBaseLogOffset( int schemaId, long offsetBase, long maxTimestamp, - List objects) + List objects, + LogFormat logFormat) throws Exception { List rowKinds = objects.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList()); @@ -317,7 +340,8 @@ public static MemoryLogRecords createRecordsWithoutBaseLogOffset( NO_WRITER_ID, NO_BATCH_SEQUENCE, rowKinds, - objects); + objects, + logFormat); } public static MemoryLogRecords createBasicMemoryLogRecords( @@ -328,19 +352,58 @@ public static MemoryLogRecords createBasicMemoryLogRecords( long writerId, int batchSequence, List rowKinds, - List objects) + List objects, + LogFormat logFormat) throws Exception { - List rows = - objects.stream().map(object -> row(rowType, object)).collect(Collectors.toList()); - return createArrowMemoryLogRecords( + return createMemoryLogRecords( rowType, + schemaId, offsetBase, maxTimestamp, - schemaId, writerId, batchSequence, rowKinds, - rows); + objects, + logFormat); + } + + public static MemoryLogRecords createMemoryLogRecords( + RowType rowType, + int schemaId, + long offsetBase, + long maxTimestamp, + long writerId, + int batchSequence, + List rowKinds, + List objects, + LogFormat logFormat) + throws Exception { + if (logFormat == LogFormat.ARROW) { + List rows = + objects.stream() + .map(object -> row(rowType, object)) + .collect(Collectors.toList()); + return createArrowMemoryLogRecords( + rowType, + offsetBase, + maxTimestamp, + schemaId, + writerId, + batchSequence, + rowKinds, + rows); + } else { + return createIndexedMemoryLogRecords( + offsetBase, + maxTimestamp, + schemaId, + writerId, + batchSequence, + rowKinds, + objects.stream() + .map(object -> row(rowType, object)) + .collect(Collectors.toList())); + } } private static MemoryLogRecords createIndexedMemoryLogRecords( diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java index c88d5635..a07df25e 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java @@ -191,6 +191,69 @@ void testNonPkTableRead() throws Exception { } } + @ParameterizedTest + @ValueSource(strings = {"ARROW", "INDEXED"}) + void testAppendTableProjectPushDown(String logFormat) throws Exception { + String tableName = "append_table_project_push_down_" + logFormat; + tEnv.executeSql( + String.format( + "create table %s (a int, b varchar, c bigint, d int, e int, f bigint) with" + + " ('connector' = 'fluss', 'table.log.format' = '%s')", + tableName, logFormat)); + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + Table table = conn.getTable(tablePath); + RowType dataType = table.getDescriptor().getSchema().toRowType(); + List rows = + Arrays.asList( + genRow(false, dataType, new Object[] {1, "v1", 100L, 1000, 100, 
1000L}), + genRow(false, dataType, new Object[] {2, "v2", 200L, 2000, 200, 2000L}), + genRow(false, dataType, new Object[] {3, "v3", 300L, 3000, 300, 3000L}), + genRow(false, dataType, new Object[] {4, "v4", 400L, 4000, 400, 4000L}), + genRow(false, dataType, new Object[] {5, "v5", 500L, 5000, 500, 5000L}), + genRow(false, dataType, new Object[] {6, "v6", 600L, 6000, 600, 6000L}), + genRow(false, dataType, new Object[] {7, "v7", 700L, 7000, 700, 7000L}), + genRow(false, dataType, new Object[] {8, "v8", 800L, 8000, 800, 8000L}), + genRow(false, dataType, new Object[] {9, "v9", 900L, 9000, 900, 9000L}), + genRow( + false, + dataType, + new Object[] {10, "v10", 1000L, 10000, 1000, 10000L})); + writeRows(tablePath, rows, true); + + // projection + reorder. + String query = "select b, d, c from " + tableName; + // make sure the plan has pushed down the projection into source + assertThat(tEnv.explainSql(query)) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, " + + tableName + + ", project=[b, d, c]]], fields=[b, d, c])"); + + List expected = + Arrays.asList( + "+I[v1, 1000, 100]", + "+I[v2, 2000, 200]", + "+I[v3, 3000, 300]", + "+I[v4, 4000, 400]", + "+I[v5, 5000, 500]", + "+I[v6, 6000, 600]", + "+I[v7, 7000, 700]", + "+I[v8, 8000, 800]", + "+I[v9, 9000, 900]", + "+I[v10, 10000, 1000]"); + try (org.apache.flink.util.CloseableIterator rowIter = + tEnv.executeSql(query).collect()) { + int expectRecords = expected.size(); + List actual = new ArrayList<>(expectRecords); + for (int i = 0; i < expectRecords; i++) { + Row r = rowIter.next(); + String row = r.toString(); + actual.add(row); + } + assertThat(actual).containsExactlyElementsOf(expected); + } + } + @ParameterizedTest @ValueSource(strings = {"PK_SNAPSHOT", "PK_LOG", "LOG"}) void testTableProjectPushDown(String mode) throws Exception { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 63b126fb..89064e59 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -556,7 +556,8 @@ private MemoryLogRecords logRecords( NO_WRITER_ID, NO_BATCH_SEQUENCE, rowKinds, - values); + values, + LogFormat.ARROW); } private MemoryLogRecords logRecords( @@ -570,7 +571,8 @@ private MemoryLogRecords logRecords( NO_WRITER_ID, NO_BATCH_SEQUENCE, rowKinds, - values); + values, + LogFormat.ARROW); } private void checkEqual( diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java index 5b791e62..d9cf963c 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.server.replica; import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.metadata.LogFormat; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TablePath; @@ -527,7 +528,8 @@ private static MemoryLogRecords logRecords( NO_WRITER_ID, NO_BATCH_SEQUENCE, rowKinds, - values); + values, + LogFormat.ARROW); } private LogAppendInfo putRecordsToLeader(
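
Notes on the projection flow in this patch. LogFetcher now builds two read
contexts, one for server fetches and one for downloaded remote log segments,
and the place where projection happens falls out of
createReadContext(tableInfo, readFromRemote, projection) together with
projectionPushDownEnable(). A minimal sketch of the setup, mirroring the
LogFetcher constructor above; the summary table is an illustration derived
from the code, not code from the patch:

    // Two read contexts, as created in the LogFetcher constructor:
    LogRecordReadContext readContext =
            LogRecordReadContext.createReadContext(tableInfo, /* readFromRemote */ false, projection);
    LogRecordReadContext remoteReadContext =
            LogRecordReadContext.createReadContext(tableInfo, /* readFromRemote */ true, projection);

    // Where the projection work is done:
    //
    //   log format | local fetch                     | remote fetch
    //   -----------+---------------------------------+----------------------------
    //   ARROW      | pushed down, server trims batch | full rows, client projects
    //   INDEXED    | full rows, client projects      | full rows, client projects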
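The client-side projection itself lives in the shared CompletedFetch#toScanRecord
above: the field getters read only the projected columns (in ascending field
order), and a reordering ProjectedRow is layered on top only when the requested
order differs, e.g. projection {2, 0}. A condensed, illustrative sketch of that
path using the same Fluss classes; the helper name projectToScanRecord is
hypothetical:

    import com.alibaba.fluss.client.scanner.ScanRecord;
    import com.alibaba.fluss.record.LogRecord;
    import com.alibaba.fluss.row.GenericRow;
    import com.alibaba.fluss.row.InternalRow;
    import com.alibaba.fluss.row.ProjectedRow;
    import com.alibaba.fluss.utils.Projection;

    static ScanRecord projectToScanRecord(
            LogRecord record, InternalRow.FieldGetter[] fieldGetters, Projection projection) {
        // deep-copy only the projected columns out of the underlying row
        GenericRow newRow = new GenericRow(fieldGetters.length);
        InternalRow row = record.getRow();
        for (int i = 0; i < fieldGetters.length; i++) {
            newRow.setField(i, fieldGetters[i].getFieldOrNull(row));
        }
        // reorder on top of the copy only if the caller asked for a different order
        InternalRow result =
                (projection != null && projection.isReorderingNeeded())
                        ? ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow)
                        : newRow;
        return new ScanRecord(
                record.logOffset(), record.timestamp(), record.getRowKind(), result);
    }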
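The two branches of buildProjectedFieldGetters above run the same getter-building
loops and differ only in when they consult the projection, so they reduce to a
single condition. An equivalent, behavior-preserving condensation (illustrative
only, not part of the patch):

    private InternalRow.FieldGetter[] buildProjectedFieldGetters() {
        List<DataType> types = dataRowType.getChildren();
        // The client must project whenever the fetched rows still carry the full
        // schema: every remote read, and any non-ARROW local read. For local
        // ARROW reads the server has already trimmed the columns.
        boolean projectOnClient =
                projection != null && (readFromRemote || logFormat != LogFormat.ARROW);
        if (projectOnClient) {
            int[] inOrder = projection.getProjectionInOrder();
            InternalRow.FieldGetter[] getters = new InternalRow.FieldGetter[inOrder.length];
            for (int i = 0; i < getters.length; i++) {
                getters[i] = InternalRow.createFieldGetter(types.get(inOrder[i]), inOrder[i]);
            }
            return getters;
        }
        InternalRow.FieldGetter[] getters = new InternalRow.FieldGetter[types.size()];
        for (int i = 0; i < getters.length; i++) {
            getters[i] = InternalRow.createFieldGetter(types.get(i), i);
        }
        return getters;
    }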
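End to end, this is what the new ITCases exercise: projection, including
reordered projection, now works on INDEXED tables for both live fetches and
segments read back from remote storage. A minimal usage sketch patterned on
testAppendAndProject above; the table path is hypothetical and connection
setup/teardown is elided:

    // scan only columns {c, a}, in that order, from an INDEXED-format table
    Table table = conn.getTable(TablePath.of("test_db_1", "my_indexed_table"));
    LogScanner scanner =
            table.getLogScanner(new LogScan().withProjectedFields(new int[] {2, 0}));
    scanner.subscribeFromBeginning(0);
    ScanRecords records = scanner.poll(Duration.ofSeconds(1));
    for (ScanRecord record : records) {
        // each row has 2 fields: c at index 0, a at index 1
        System.out.println(record.getRow().getFieldCount());
    }
    scanner.close();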