Skip to content

Commit

Permalink
[core] Refactor MergeTreeReaders.readerForMergeTree to unify invoking
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Mar 13, 2024
1 parent 6d723a4 commit 48c135e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.FieldsComparator;

Expand All @@ -42,16 +40,15 @@ public class MergeTreeReaders {

private MergeTreeReaders() {}

public static RecordReader<KeyValue> readerForMergeTree(
public static <T> RecordReader<T> readerForMergeTree(
List<List<SortedRun>> sections,
boolean dropDelete,
KeyValueFileReaderFactory readerFactory,
Comparator<InternalRow> userKeyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunction<KeyValue> mergeFunction,
MergeFunctionWrapper<T> mergeFunctionWrapper,
MergeSorter mergeSorter)
throws IOException {
List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
List<ReaderSupplier<T>> readers = new ArrayList<>();
for (List<SortedRun> section : sections) {
readers.add(
() ->
Expand All @@ -60,14 +57,10 @@ public static RecordReader<KeyValue> readerForMergeTree(
readerFactory,
userKeyComparator,
userDefinedSeqComparator,
new ReducerMergeFunctionWrapper(mergeFunction),
mergeFunctionWrapper,
mergeSorter));
}
RecordReader<KeyValue> reader = ConcatRecordReader.create(readers);
if (dropDelete) {
reader = new DropDeleteReader(reader);
}
return reader;
return ConcatRecordReader.create(readers);
}

public static <T> RecordReader<T> readerForSection(
Expand All @@ -86,7 +79,7 @@ public static <T> RecordReader<T> readerForSection(
readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper);
}

public static RecordReader<KeyValue> readerForRun(
private static RecordReader<KeyValue> readerForRun(
SortedRun run, KeyValueFileReaderFactory readerFactory) throws IOException {
List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
for (DataFileMeta file : run.files()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.FieldsComparator;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -120,25 +118,15 @@ private CompactResult rewriteChangelogCompaction(
boolean dropDelete,
boolean rewriteCompactFile)
throws Exception {
List<ConcatRecordReader.ReaderSupplier<ChangelogResult>> sectionReaders = new ArrayList<>();
for (List<SortedRun> section : sections) {
sectionReaders.add(
() ->
MergeTreeReaders.readerForSection(
section,
readerFactory,
keyComparator,
userDefinedSeqComparator,
createMergeWrapper(outputLevel),
mergeSorter));
}

RecordReaderIterator<ChangelogResult> iterator = null;
CloseableIterator<ChangelogResult> iterator = null;
RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;

try {
iterator = new RecordReaderIterator<>(ConcatRecordReader.create(sectionReaders));
iterator =
readerForMergeTree(sections, createMergeWrapper(outputLevel))
.toCloseableIterator();
if (rewriteCompactFile) {
compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.mergetree.DropDeleteReader;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
Expand All @@ -34,6 +35,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;

Expand Down Expand Up @@ -72,21 +74,29 @@ protected CompactResult rewriteCompaction(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {
RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
RecordReader<KeyValue> sectionsReader =
MergeTreeReaders.readerForMergeTree(
sections,
dropDelete,
readerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory.create(),
mergeSorter);
writer.write(new RecordReaderIterator<>(sectionsReader));
RecordReader<KeyValue> reader =
readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create()));
if (dropDelete) {
reader = new DropDeleteReader(reader);
}
writer.write(new RecordReaderIterator<>(reader));
writer.close();
List<DataFileMeta> before = extractFilesFromSections(sections);
notifyCompactBefore(before);
return new CompactResult(before, writer.result());
}

/**
 * Creates a reader over the given sections by delegating to {@link
 * MergeTreeReaders#readerForMergeTree}, passing this rewriter's {@code readerFactory}, {@code
 * keyComparator}, {@code userDefinedSeqComparator} and {@code mergeSorter} along with the
 * supplied wrapper.
 *
 * <p>This method does not apply any delete filtering itself; callers that need it wrap the
 * returned reader (see {@code rewriteCompaction}).
 *
 * @param sections the sections of sorted runs to read
 * @param mergeFunctionWrapper the wrapper forwarded to the delegate; its type parameter
 *     determines the element type of the returned reader
 * @param <T> element type produced by the returned reader
 * @return the reader returned by {@code MergeTreeReaders.readerForMergeTree}
 * @throws IOException if the delegate call throws it
 */
protected <T> RecordReader<T> readerForMergeTree(
List<List<SortedRun>> sections, MergeFunctionWrapper<T> mergeFunctionWrapper)
throws IOException {
return MergeTreeReaders.readerForMergeTree(
sections,
readerFactory,
keyComparator,
userDefinedSeqComparator,
mergeFunctionWrapper,
mergeSorter);
}

/**
 * Hook called by {@code rewriteCompaction} with the files extracted from the compacted
 * sections, after the writer has been closed and before the {@code CompactResult} is built.
 *
 * <p>This implementation does nothing. NOTE(review): presumably intended to be overridden by
 * subclasses that need to react to the input files; confirm against the subclasses.
 *
 * @param files the files extracted from the sections that were rewritten
 */
protected void notifyCompactBefore(List<DataFileMeta> files) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -556,12 +557,15 @@ private List<TestRecord> readAll(List<DataFileMeta> files, boolean dropDelete)
RecordReader<KeyValue> reader =
MergeTreeReaders.readerForMergeTree(
new IntervalPartition(files, comparator).partition(),
dropDelete,
readerFactory,
comparator,
null,
DeduplicateMergeFunction.factory().create(),
new ReducerMergeFunctionWrapper(
DeduplicateMergeFunction.factory().create()),
new MergeSorter(options, null, null, null));
if (dropDelete) {
reader = new DropDeleteReader(reader);
}
List<TestRecord> records = new ArrayList<>();
try (RecordReaderIterator<KeyValue> iterator = new RecordReaderIterator<>(reader)) {
while (iterator.hasNext()) {
Expand Down Expand Up @@ -600,16 +604,19 @@ public CompactResult rewrite(
throws Exception {
RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
RecordReader<KeyValue> sectionsReader =
RecordReader<KeyValue> reader =
MergeTreeReaders.readerForMergeTree(
sections,
dropDelete,
compactReaderFactory,
comparator,
null,
DeduplicateMergeFunction.factory().create(),
new ReducerMergeFunctionWrapper(
DeduplicateMergeFunction.factory().create()),
new MergeSorter(options, null, null, null));
writer.write(new RecordReaderIterator<>(sectionsReader));
if (dropDelete) {
reader = new DropDeleteReader(reader);
}
writer.write(new RecordReaderIterator<>(reader));
writer.close();
return new CompactResult(extractFilesFromSections(sections), writer.result());
}
Expand Down

0 comments on commit 48c135e

Please sign in to comment.