[core] KeyValueTableRead should always project keys (apache#2992)
JingsongLi authored Mar 13, 2024
1 parent 132e09a commit 6d723a4
Showing 4 changed files with 63 additions and 83 deletions.
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java

@@ -24,6 +24,7 @@
 import javax.annotation.Nullable;

 import java.io.IOException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -50,6 +51,11 @@ public static <R> RecordReader<R> create(List<ReaderSupplier<R>> readers) throws
         return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader<>(readers);
     }

+    public static <R> RecordReader<R> create(ReaderSupplier<R> reader1, ReaderSupplier<R> reader2)
+            throws IOException {
+        return create(Arrays.asList(reader1, reader2));
+    }
+
     @Nullable
     @Override
     public RecordIterator<T> readBatch() throws IOException {
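A minimal sketch of a call site for the new two-supplier overload (the method and parameter names below are hypothetical, not part of this commit). ConcatRecordReader holds a queue of suppliers, so the second reader is only created once the first is exhausted:

import java.io.IOException;

import org.apache.paimon.KeyValue;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;

class ConcatExample {

    // Equivalent to ConcatRecordReader.create(Arrays.asList(first, second)):
    // records from the first reader are fully drained before the second
    // supplier is ever invoked.
    static RecordReader<KeyValue> concatInOrder(
            ReaderSupplier<KeyValue> first, ReaderSupplier<KeyValue> second) throws IOException {
        return ConcatRecordReader.create(first, second);
    }
}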
paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java

@@ -53,7 +53,6 @@

 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -180,77 +179,78 @@ public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {

     private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit split)
             throws IOException {
-        ReaderSupplier<KeyValue> beforeSupplier = null;
-        if (split.beforeFiles().size() > 0) {
-            if (split.isStreaming() || split.beforeDeletionFiles().isPresent()) {
-                beforeSupplier =
-                        () ->
-                                new ReverseReader(
-                                        noMergeRead(
-                                                split.partition(),
-                                                split.bucket(),
-                                                split.beforeFiles(),
-                                                split.beforeDeletionFiles().orElse(null),
-                                                split.isStreaming()));
+        if (split.beforeFiles().isEmpty()) {
+            if (split.isStreaming() || split.deletionFiles().isPresent()) {
+                return noMergeRead(
+                        split.partition(),
+                        split.bucket(),
+                        split.dataFiles(),
+                        split.deletionFiles().orElse(null),
+                        split.isStreaming());
             } else {
-                beforeSupplier =
-                        () ->
-                                mergeRead(
-                                        split.partition(),
-                                        split.bucket(),
-                                        split.beforeFiles(),
-                                        false);
+                return projectKey(
+                        mergeRead(
+                                split.partition(),
+                                split.bucket(),
+                                split.dataFiles(),
+                                null,
+                                forceKeepDelete));
             }
-        }
-
-        ReaderSupplier<KeyValue> dataSupplier;
-        if (split.isStreaming() || split.deletionFiles().isPresent()) {
-            dataSupplier =
+        } else if (split.isStreaming()) {
+            // streaming concat read
+            return ConcatRecordReader.create(
+                    () ->
+                            new ReverseReader(
+                                    noMergeRead(
+                                            split.partition(),
+                                            split.bucket(),
+                                            split.beforeFiles(),
+                                            split.beforeDeletionFiles().orElse(null),
+                                            true)),
                     () ->
                             noMergeRead(
                                     split.partition(),
                                     split.bucket(),
                                     split.dataFiles(),
                                     split.deletionFiles().orElse(null),
-                                    split.isStreaming());
+                                    true));
         } else {
-            dataSupplier =
-                    () ->
+            // batch diff read
+            return projectKey(
+                    DiffReader.readDiff(
                             mergeRead(
                                     split.partition(),
                                     split.bucket(),
+                                    split.beforeFiles(),
+                                    split.beforeDeletionFiles().orElse(null),
+                                    false),
+                            mergeRead(
+                                    split.partition(),
+                                    split.bucket(),
                                     split.dataFiles(),
-                                    split.beforeFiles().isEmpty() && forceKeepDelete);
-        }
-
-        if (split.isStreaming()) {
-            return beforeSupplier == null
-                    ? dataSupplier.get()
-                    : ConcatRecordReader.create(Arrays.asList(beforeSupplier, dataSupplier));
-        } else {
-            return beforeSupplier == null
-                    ? dataSupplier.get()
-                    : DiffReader.readDiff(
-                            beforeSupplier.get(),
-                            dataSupplier.get(),
+                                    split.deletionFiles().orElse(null),
+                                    false),
                             keyComparator,
                             userDefinedSeqComparator,
                             mergeSorter,
-                            forceKeepDelete);
+                            forceKeepDelete));
         }
     }

     private RecordReader<KeyValue> mergeRead(
-            BinaryRow partition, int bucket, List<DataFileMeta> files, boolean keepDelete)
+            BinaryRow partition,
+            int bucket,
+            List<DataFileMeta> files,
+            @Nullable List<DeletionFile> deletionFiles,
+            boolean keepDelete)
             throws IOException {
         // Sections are read by SortMergeReader, which sorts and merges records by keys.
         // So we cannot project keys or else the sorting will be incorrect.
+        DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles);
         KeyValueFileReaderFactory overlappedSectionFactory =
-                readerFactoryBuilder.build(
-                        partition, bucket, DeletionVector.emptyFactory(), false, filtersForKeys);
+                readerFactoryBuilder.build(partition, bucket, dvFactory, false, filtersForKeys);
         KeyValueFileReaderFactory nonOverlappedSectionFactory =
-                readerFactoryBuilder.build(
-                        partition, bucket, DeletionVector.emptyFactory(), false, filtersForAll);
+                readerFactoryBuilder.build(partition, bucket, dvFactory, false, filtersForAll);

         List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
         MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
@@ -274,8 +274,7 @@ private RecordReader<KeyValue> mergeRead(
             reader = new DropDeleteReader(reader);
         }

-        // Project results from SortMergeReader using ProjectKeyRecordReader.
-        return keyProjectedFields == null ? reader : projectKey(reader, keyProjectedFields);
+        return reader;
     }

     private RecordReader<KeyValue> noMergeRead(
@@ -285,12 +284,11 @@ private RecordReader<KeyValue> noMergeRead(
             @Nullable List<DeletionFile> deletionFiles,
             boolean onlyFilterKey)
             throws IOException {
-        DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles);
         KeyValueFileReaderFactory readerFactory =
                 readerFactoryBuilder.build(
                         partition,
                         bucket,
-                        dvFactory,
+                        DeletionVector.factory(fileIO, files, deletionFiles),
                         true,
                         onlyFilterKey ? filtersForKeys : filtersForAll);
         List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
@@ -316,8 +314,11 @@ private Optional<String> changelogFile(DataFileMeta fileMeta) {
         return Optional.empty();
     }

-    private RecordReader<KeyValue> projectKey(
-            RecordReader<KeyValue> reader, int[][] keyProjectedFields) {
+    private RecordReader<KeyValue> projectKey(RecordReader<KeyValue> reader) {
+        if (keyProjectedFields == null) {
+            return reader;
+        }
+
         ProjectedRow projectedRow = ProjectedRow.from(keyProjectedFields);
         return reader.transform(kv -> kv.replaceKey(projectedRow.replaceRow(kv.key())));
     }
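The retained comment in mergeRead is the crux of this change: SortMergeReader merges sections by comparing full keys, so key projection has to happen after merging, which is why projectKey(...) now wraps the results of mergeRead and DiffReader instead of running inside mergeRead. A self-contained illustration, using a made-up composite key (dt, id) that is not from this commit, of why projecting before the merge breaks ordering:

import java.util.Arrays;
import java.util.List;

class ProjectAfterMergeExample {
    public static void main(String[] args) {
        // Rows sorted by the full key (dt, id), the order SortMergeReader relies on.
        List<int[]> sortedByFullKey =
                Arrays.asList(
                        new int[] {20240313, 7},
                        new int[] {20240313, 9},
                        new int[] {20240314, 1});
        // Projected down to (id) the sequence is 7, 9, 1 -- no longer sorted,
        // so a merge that compared only the projected key would misbehave.
        for (int[] key : sortedByFullKey) {
            System.out.println("projected key = " + key[1]);
        }
    }
}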
paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java

@@ -39,7 +39,9 @@ public abstract class KeyValueTableRead extends AbstractDataTableRead<KeyValue>

     protected KeyValueTableRead(KeyValueFileStoreRead read, TableSchema schema) {
         super(read, schema);
-        this.read = read;
+        // We don't need any key fields, the columns that need to be read are already included in
+        // the value
+        this.read = read.withKeyProjection(new int[0][]);
     }

     @Override
@@ -53,10 +55,6 @@ public final RecordReader<InternalRow> reader(Split split) throws IOException {
         return new RowDataRecordReader(read.createReader((DataSplit) split));
     }

-    public final RecordReader<KeyValue> kvReader(Split split) throws IOException {
-        return read.createReader((DataSplit) split);
-    }
-
     protected abstract RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(
             RecordReader.RecordIterator<KeyValue> kvRecordIterator);

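A small sketch of what the empty key projection means at runtime, assuming it flows into projectKey's ProjectedRow.from(...) as shown in KeyValueFileStoreRead above (the class name here is illustrative):

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.utils.ProjectedRow;

class EmptyKeyProjectionExample {
    public static void main(String[] args) {
        // new int[0][] maps every key row to a zero-field row: the key carries
        // no columns, and readers get everything they need from the value.
        ProjectedRow emptyKey = ProjectedRow.from(new int[0][]);
        GenericRow key = GenericRow.of(1, 100L);
        System.out.println(emptyKey.replaceRow(key).getFieldCount()); // prints 0
    }
}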
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java

@@ -19,9 +19,7 @@
 package org.apache.paimon.flink.lookup;

 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.io.SplitsParallelReadUtil;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.options.ConfigOption;
@@ -31,11 +29,9 @@
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
-import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FunctionWithIOException;
 import org.apache.paimon.utils.TypeUtils;
@@ -148,25 +144,4 @@ public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
         }
         return reader;
     }
-
-    private FunctionWithIOException<Split, RecordReader<InternalRow>>
-            createReaderWithSequenceSupplier() {
-        return split -> {
-            TableRead read = readBuilder.newRead();
-            if (!(read instanceof KeyValueTableRead)) {
-                throw new IllegalArgumentException(
-                        "Only KeyValueTableRead supports sequence read, but it is: " + read);
-            }
-
-            KeyValueTableRead kvRead = (KeyValueTableRead) read;
-            JoinedRow reused = new JoinedRow();
-            return kvRead.kvReader(split)
-                    .transform(
-                            kv -> {
-                                reused.replace(kv.value(), GenericRow.of(kv.sequenceNumber()));
-                                reused.setRowKind(kv.valueKind());
-                                return reused;
-                            });
-        };
-    }
 }
