diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java index 87f38d07..09160081 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -28,6 +28,7 @@ import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.ProjectedRowData; @@ -171,9 +172,9 @@ private void fetchResult( if (row == null) { resultFuture.complete(Collections.emptyList()); } else { - // TODO: we can project fluss row first, - // to avoid deserialize unnecessary fields - RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); + RowData flinkRow = + flussRowToFlinkRowConverter.toFlinkRowData( + ProjectedRow.from(projection).replaceRow(row)); if (remainingFilter != null && !remainingFilter.isMatch(flinkRow)) { resultFuture.complete(Collections.emptyList()); } else { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java index 2e28e856..e0846340 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java @@ -25,6 +25,7 @@ import com.alibaba.fluss.connector.flink.utils.FlussRowToFlinkRowConverter; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.ProjectedRowData; @@ -124,8 +125,9 @@ public Collection lookup(RowData keyRow) { try { InternalRow row = table.lookup(flussKeyRow).get().getRow(); if (row != null) { - // TODO: we can project fluss row first, to avoid deserialize unnecessary fields - RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); + RowData flinkRow = + flussRowToFlinkRowConverter.toFlinkRowData( + ProjectedRow.from(projection).replaceRow(row)); if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { return Collections.singletonList(maybeProject(flinkRow)); } else {