Skip to content

Commit

Permalink
[client] Refactor code to avoid doing projection for rows twice
Browse files Browse the repository at this point in the history
This simplifies the code and reduces CPU costs as well.
  • Loading branch information
wuchong committed Dec 24, 2024
1 parent 27e140e commit 55e7bb9
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,13 @@
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.ProjectedRow;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.utils.CloseableIterator;
import com.alibaba.fluss.utils.Projection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -61,8 +57,7 @@ abstract class CompletedFetch {
private final Iterator<LogRecordBatch> batches;
private final LogScannerStatus logScannerStatus;
protected final LogRecordReadContext readContext;
@Nullable protected final Projection projection;
protected final InternalRow.FieldGetter[] fieldGetters;
protected final InternalRow.FieldGetter[] selectedFieldGetters;

private LogRecordBatch currentBatch;
private LogRecord lastRecord;
Expand Down Expand Up @@ -92,30 +87,20 @@ public CompletedFetch(
this.readContext = readContext;
this.isCheckCrcs = isCheckCrcs;
this.logScannerStatus = logScannerStatus;
this.projection = readContext.getProjection();
this.nextFetchOffset = fetchOffset;
this.fieldGetters = readContext.getProjectedFieldGetters();
this.selectedFieldGetters = readContext.getSelectedFieldGetters();
}

// TODO: optimize this to avoid deep copying the record.
// Refactor #fetchRecords to return an iterator that lazily deserializes
// from the underlying record stream and Arrow buffer.
ScanRecord toScanRecord(LogRecord record) {
GenericRow newRow = new GenericRow(fieldGetters.length);
GenericRow newRow = new GenericRow(selectedFieldGetters.length);
InternalRow internalRow = record.getRow();
for (int i = 0; i < fieldGetters.length; i++) {
newRow.setField(i, fieldGetters[i].getFieldOrNull(internalRow));
}
if (projection != null && projection.isReorderingNeeded()) {
return new ScanRecord(
record.logOffset(),
record.timestamp(),
record.getRowKind(),
ProjectedRow.from(projection.getReorderingIndexes()).replaceRow(newRow));
} else {
return new ScanRecord(
record.logOffset(), record.timestamp(), record.getRowKind(), newRow);
for (int i = 0; i < selectedFieldGetters.length; i++) {
newRow.setField(i, selectedFieldGetters[i].getFieldOrNull(internalRow));
}
return new ScanRecord(record.logOffset(), record.timestamp(), record.getRowKind(), newRow);
}

boolean isConsumed() {
Expand Down Expand Up @@ -243,7 +228,7 @@ private LogRecord nextFetchedRecord() throws Exception {

private void maybeEnsureValid(LogRecordBatch batch) {
if (isCheckCrcs) {
if (projection != null) {
if (readContext.isProjectionPushDowned()) {
LOG.debug("Skipping CRC check for column projected log record batch.");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.InvalidMetadataException;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
Expand Down Expand Up @@ -85,8 +84,8 @@ public class LogFetcher implements Closeable {
private final boolean isPartitioned;
private final LogRecordReadContext readContext;
// TODO: this context can be merged with readContext. It is introduced only because remote log
// reads can currently only apply the projection when generating the ScanRecord, instead of
// applying it while reading bytes from the remote file.
// reads can currently only apply the projection when generating the ScanRecord, instead of
// applying it while reading bytes from the remote file.
private final LogRecordReadContext remoteReadContext;
@Nullable private final Projection projection;
private final RpcClient rpcClient;
Expand All @@ -97,7 +96,6 @@ public class LogFetcher implements Closeable {
private final LogFetchBuffer logFetchBuffer;
private final LogFetchCollector logFetchCollector;
private final RemoteLogDownloader remoteLogDownloader;
private final LogFormat logFormat;

@GuardedBy("this")
private final Set<Integer> nodesWithPendingFetchRequests;
Expand All @@ -119,7 +117,6 @@ public LogFetcher(
RemoteFileDownloader remoteFileDownloader) {
this.tablePath = tableInfo.getTablePath();
this.isPartitioned = tableInfo.getTableDescriptor().isPartitioned();
this.logFormat = tableInfo.getTableDescriptor().getLogFormat();
this.readContext = LogRecordReadContext.createReadContext(tableInfo, false, projection);
this.remoteReadContext =
LogRecordReadContext.createReadContext(tableInfo, true, projection);
Expand Down Expand Up @@ -317,6 +314,8 @@ private synchronized void handleFetchLogResponse(
fetchResultForBucket,
readContext,
logScannerStatus,
// skip the CRC check if the projection is pushed down, as
// the data is pruned
isCheckCrcs,
fetchOffset);
logFetchBuffer.add(completedFetch);
Expand Down Expand Up @@ -423,7 +422,8 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
.setMaxBytes(maxFetchBytes);
PbFetchLogReqForTable reqForTable =
new PbFetchLogReqForTable().setTableId(finalTableId);
if (projectionPushDownEnable()) {
if (readContext.isProjectionPushDowned()) {
assert projection != null;
reqForTable
.setProjectionPushdownEnabled(true)
.setProjectedFields(projection.getProjectionInOrder());
Expand All @@ -449,13 +449,6 @@ private List<TableBucket> fetchableBuckets() {
return logScannerStatus.fetchableBuckets(tableBucket -> !exclude.contains(tableBucket));
}

private boolean projectionPushDownEnable() {
// Currently, only the ARROW log format supports projection push down to the server. Other
// log formats apply the projection on the client, see DefaultCompletedFetch#toScanRecord()
// for more details.
return projection != null && logFormat == LogFormat.ARROW;
}

private Integer getTableBucketLeader(TableBucket tableBucket) {
metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {
Expand Down
Loading

0 comments on commit 55e7bb9

Please sign in to comment.