Skip to content

Commit

Permalink
[client] Support projection for indexed row log format
Browse files Browse the repository at this point in the history
  • Loading branch information
swuferhong committed Dec 13, 2024
1 parent 945fee0 commit 9188502
Show file tree
Hide file tree
Showing 14 changed files with 386 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.record.LogRecordBatch;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.InternalRow.FieldGetter;
import com.alibaba.fluss.row.ProjectedRow;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.CloseableIterator;
import com.alibaba.fluss.utils.Projection;

Expand Down Expand Up @@ -60,7 +61,7 @@ abstract class CompletedFetch {
private final boolean isCheckCrcs;
private final Iterator<LogRecordBatch> batches;
private final LogScannerStatus logScannerStatus;
private final LogRecordReadContext readContext;
protected final LogRecordReadContext readContext;
@Nullable protected final Projection projection;
protected final InternalRow.FieldGetter[] fieldGetters;

Expand Down Expand Up @@ -95,15 +96,37 @@ public CompletedFetch(
this.logScannerStatus = logScannerStatus;
this.projection = projection;
this.nextFetchOffset = fetchOffset;
RowType rowType = readContext.getRowType();
this.fieldGetters = new FieldGetter[rowType.getFieldCount()];
this.fieldGetters = buildFieldGetters();
}

/**
 * Builds the {@link FieldGetter}s used to copy fields out of each fetched row.
 *
 * <p>Implementations decide which fields get getters: when a projection must be applied on the
 * client side, getters are created only for the projected fields; otherwise one getter per field
 * of the read schema. Called once from the constructor and cached in {@code fieldGetters}.
 *
 * @return the field getters, one per output field of the scan record
 */
protected abstract FieldGetter[] buildFieldGetters();

// TODO: optimize this to avoid deep copying the record.
// refactor #fetchRecords to return an iterator which lazily deserialize
// from underlying record stream and arrow buffer.
/**
 * Converts a fetched {@link LogRecord} into a {@link ScanRecord}.
 *
 * <p>The row is deep-copied through the pre-built {@code fieldGetters} (built once via
 * {@code buildFieldGetters()} during construction — they must NOT be rebuilt per record). When
 * the projection requires reordering, the copied row is additionally wrapped in a
 * {@link ProjectedRow} carrying the reordering indexes.
 */
ScanRecord toScanRecord(LogRecord record) {
    GenericRow newRow = new GenericRow(fieldGetters.length);
    InternalRow internalRow = record.getRow();
    for (int i = 0; i < fieldGetters.length; i++) {
        newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow));
    }
    if (projection != null && projection.isReorderingNeeded()) {
        return new ScanRecord(
                record.logOffset(),
                record.timestamp(),
                record.getRowKind(),
                ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow));
    } else {
        return new ScanRecord(
                record.logOffset(), record.timestamp(), record.getRowKind(), newRow);
    }
}

boolean isConsumed() {
return isConsumed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
package com.alibaba.fluss.client.scanner.log;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.ProjectedRow;
import com.alibaba.fluss.row.InternalRow.FieldGetter;
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.Projection;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -58,25 +57,28 @@ public DefaultCompletedFetch(
projection);
}

/**
 * Builds the field getters for rows fetched directly from the server.
 *
 * <p>For the ARROW log format the projection has already been pushed down and applied on the
 * server side, so the fetched rows arrive in the projected schema and plain positional getters
 * suffice. For every other log format the server returns full rows, so the projection is applied
 * here on the client by creating getters only for the projected field positions.
 *
 * @return the field getters, one per output field
 */
@Override
protected FieldGetter[] buildFieldGetters() {
    RowType rowType = readContext.getRowType();
    LogFormat logFormat = readContext.getLogFormat();
    FieldGetter[] fieldGetters;
    // Arrow log format is already projected on the server side.
    if (projection != null && logFormat != LogFormat.ARROW) {
        int[] projectionInOrder = projection.getProjectionInOrder();
        fieldGetters = new FieldGetter[projectionInOrder.length];
        for (int i = 0; i < fieldGetters.length; i++) {
            fieldGetters[i] =
                    InternalRow.createFieldGetter(
                            rowType.getChildren().get(projectionInOrder[i]),
                            projectionInOrder[i]);
        }
    } else {
        fieldGetters = new FieldGetter[rowType.getChildren().size()];
        for (int i = 0; i < fieldGetters.length; i++) {
            fieldGetters[i] = InternalRow.createFieldGetter(rowType.getChildren().get(i), i);
        }
    }
    return fieldGetters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.alibaba.fluss.client.scanner.RemoteFileDownloader;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
Expand Down Expand Up @@ -113,13 +112,6 @@ public FlussLogScanner(
private Projection sanityProjection(@Nullable int[] projectedFields, TableInfo tableInfo) {
RowType tableRowType = tableInfo.getTableDescriptor().getSchema().toRowType();
if (projectedFields != null) {
LogFormat logFormat = tableInfo.getTableDescriptor().getLogFormat();
if (logFormat != LogFormat.ARROW) {
throw new IllegalArgumentException(
String.format(
"Only ARROW log format supports column projection, but the log format of table '%s' is %s",
tableInfo.getTablePath(), logFormat));
}
for (int projectedField : projectedFields) {
if (projectedField < 0 || projectedField >= tableRowType.getFieldCount()) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.InvalidMetadataException;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
Expand Down Expand Up @@ -87,14 +88,14 @@ public class LogFetcher implements Closeable {
private final LogRecordReadContext remoteReadContext;
@Nullable private final Projection projection;
private final RpcClient rpcClient;
private final Configuration conf;
private final int maxFetchBytes;
private final int maxBucketFetchBytes;
private final boolean isCheckCrcs;
private final LogScannerStatus logScannerStatus;
private final LogFetchBuffer logFetchBuffer;
private final LogFetchCollector logFetchCollector;
private final RemoteLogDownloader remoteLogDownloader;
private final LogFormat logFormat;

@GuardedBy("this")
private final Set<Integer> nodesWithPendingFetchRequests;
Expand All @@ -116,12 +117,12 @@ public LogFetcher(
RemoteFileDownloader remoteFileDownloader) {
this.tablePath = tableInfo.getTablePath();
this.isPartitioned = tableInfo.getTableDescriptor().isPartitioned();
this.logFormat = tableInfo.getTableDescriptor().getLogFormat();
this.readContext = LogRecordReadContext.createReadContext(tableInfo, projection);
this.remoteReadContext = LogRecordReadContext.createReadContext(tableInfo, null);
this.projection = projection;
this.rpcClient = rpcClient;
this.logScannerStatus = logScannerStatus;
this.conf = conf;
this.maxFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes();
this.maxBucketFetchBytes =
(int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes();
Expand Down Expand Up @@ -416,7 +417,7 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
.setMaxBytes(maxFetchBytes);
PbFetchLogReqForTable reqForTable =
new PbFetchLogReqForTable().setTableId(finalTableId);
if (projection != null) {
if (projectPushDownEnable()) {
reqForTable
.setProjectionPushdownEnabled(true)
.setProjectedFields(projection.getProjectionInOrder());
Expand All @@ -442,6 +443,13 @@ private List<TableBucket> fetchableBuckets() {
return logScannerStatus.fetchableBuckets(tableBucket -> !exclude.contains(tableBucket));
}

/** Whether the projection can be pushed down to the server for this table's log format. */
private boolean projectPushDownEnable() {
    // Only the ARROW log format supports server-side projection push down. All other log
    // formats perform the projection on the client while building the field getters (see
    // DefaultCompletedFetch#buildFieldGetters()).
    return logFormat == LogFormat.ARROW && projection != null;
}

private Integer getTableBucketLeader(TableBucket tableBucket) {
metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
package com.alibaba.fluss.client.scanner.log;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.FileLogRecords;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.ProjectedRow;
import com.alibaba.fluss.row.InternalRow.FieldGetter;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.Projection;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -82,25 +80,26 @@ void drain() {
recycleCallback.run();
}

/**
 * Builds the field getters for rows read from downloaded remote log segments.
 *
 * <p>Remote log files are read with a full-schema read context (no server-side projection is
 * involved here), so whenever a projection is specified it is applied on the client by creating
 * getters only for the projected field positions; otherwise one getter per field of the row type.
 *
 * @return the field getters, one per output field
 */
@Override
protected FieldGetter[] buildFieldGetters() {
    RowType rowType = readContext.getRowType();
    FieldGetter[] fieldGetters;
    if (projection != null) {
        int[] projectionInOrder = projection.getProjectionInOrder();
        fieldGetters = new FieldGetter[projectionInOrder.length];
        for (int i = 0; i < fieldGetters.length; i++) {
            fieldGetters[i] =
                    InternalRow.createFieldGetter(
                            rowType.getChildren().get(projectionInOrder[i]),
                            projectionInOrder[i]);
        }
    } else {
        fieldGetters = new FieldGetter[rowType.getChildren().size()];
        for (int i = 0; i < fieldGetters.length; i++) {
            fieldGetters[i] = InternalRow.createFieldGetter(rowType.getChildren().get(i), i);
        }
    }
    return fieldGetters;
}
}
Loading

0 comments on commit 9188502

Please sign in to comment.