Skip to content

Commit

Permalink
Flink: Adds the ability to read from a branch on the Flink Iceberg Source
Browse files Browse the repository at this point in the history
  • Loading branch information
rodmeneses committed Jan 23, 2024
1 parent 057f887 commit 3385b3f
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ private void validate() {
"Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null");
}

Preconditions.checkArgument(
branch == null,
String.format(
"Cannot scan table using ref %s configured for streaming reader yet", branch));

Preconditions.checkArgument(
tag == null,
String.format("Cannot scan table using ref %s configured for streaming reader", tag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ public void initializeState(FunctionInitializationContext context) throws Except
Preconditions.checkArgument(
!(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
"START_SNAPSHOT_ID and START_TAG cannot both be set.");
Preconditions.checkArgument(
scanContext.branch() == null,
"Cannot scan table using ref %s configured for streaming reader yet.");
Preconditions.checkNotNull(
table.currentSnapshot(), "Don't have any available snapshot in table.");

Expand Down Expand Up @@ -195,7 +192,10 @@ void monitorAndForwardSplits() {
// Refresh the table to get the latest committed snapshot.
table.refresh();

Snapshot snapshot = table.currentSnapshot();
Snapshot snapshot =
scanContext.branch() != null
? table.snapshot(scanContext.branch())
: table.currentSnapshot();
if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) {
long snapshotId = snapshot.snapshotId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ private Snapshot toSnapshotInclusive(

private ContinuousEnumerationResult discoverIncrementalSplits(
IcebergEnumeratorPosition lastPosition) {
Snapshot currentSnapshot = table.currentSnapshot();
Snapshot currentSnapshot =
scanContext.branch() != null
? table.snapshot(scanContext.branch())
: table.currentSnapshot();

if (currentSnapshot == null) {
// empty table
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,88 @@ public void testSpecificSnapshotTimestamp() throws Exception {
}
}

@Test
public void testReadingFromBranch() throws Exception {
String branch = "b1";
GenericAppenderHelper dataAppender =
new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER);

List<Record> batchBase =
RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
dataAppender.appendToTable(batchBase);

// create branch
tableResource
.table()
.manageSnapshots()
.createBranch(branch, tableResource.table().currentSnapshot().snapshotId())
.commit();

// snapshot1 to branch
List<Record> batch1 =
RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
dataAppender.appendToTable(branch, batch1);

// snapshot2 to branch
List<Record> batch2 =
RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet());
dataAppender.appendToTable(branch, batch2);

List<Record> branchExpectedRecords = Lists.newArrayList();
branchExpectedRecords.addAll(batchBase);
branchExpectedRecords.addAll(batch1);
branchExpectedRecords.addAll(batch2);
// reads from branch: it should contain the first snapshot (before the branch creation) followed
// by the next 2
ScanContext scanContext =
ScanContext.builder()
.streaming(true)
.monitorInterval(Duration.ofMillis(10L))
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.useBranch(branch)
.build();

try (CloseableIterator<Row> iter =
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
List<Row> resultMain = waitForResult(iter, 6);
TestHelpers.assertRecords(resultMain, branchExpectedRecords, tableResource.table().schema());

// snapshot3 to branch
List<Record> batch3 =
RandomGenericData.generate(
tableResource.table().schema(), 2, randomSeed.incrementAndGet());
dataAppender.appendToTable(branch, batch3);

List<Record> batch4 =
RandomGenericData.generate(
tableResource.table().schema(), 2, randomSeed.incrementAndGet());
dataAppender.appendToTable(branch, batch4);

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
}

// read only from main branch. Should contain only the first snapshot
scanContext =
ScanContext.builder()
.streaming(true)
.monitorInterval(Duration.ofMillis(10L))
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
try (CloseableIterator<Row> iter =
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
List<Row> resultMain = waitForResult(iter, 2);
TestHelpers.assertRecords(resultMain, batchBase, tableResource.table().schema());

List<Record> batchMain2 =
RandomGenericData.generate(
tableResource.table().schema(), 2, randomSeed.incrementAndGet());
dataAppender.appendToTable(batchMain2);
resultMain = waitForResult(iter, 2);
TestHelpers.assertRecords(resultMain, batchMain2, tableResource.table().schema());
}
}

private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
// start the source and collect output
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -384,6 +466,7 @@ private DataStream<Row> createStream(ScanContext scanContext) throws Exception {
.startSnapshotTimestamp(scanContext.startSnapshotTimestamp())
.startSnapshotId(scanContext.startSnapshotId())
.monitorInterval(Duration.ofMillis(10L))
.branch(scanContext.branch())
.build(),
WatermarkStrategy.noWatermarks(),
"icebergSource",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,31 @@ public void clean() {
super.clean();
}

private void insertRows(String partition, String branch, Table table, Row... rows)
throws IOException {
GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory);

GenericRecord gRecord = GenericRecord.create(table.schema());
List<Record> records = Lists.newArrayList();
for (Row row : rows) {
records.add(
gRecord.copy(
"id", row.getField(0),
"data", row.getField(1),
"dt", row.getField(2)));
}

if (partition != null) {
appender.appendToTable(TestHelpers.Row.of(partition, 0), records);
} else {
if (branch != null) {
appender.appendToTable(branch, records);
} else {
appender.appendToTable(records);
}
}
}

private void insertRows(String partition, Table table, Row... rows) throws IOException {
GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory);

Expand All @@ -117,6 +142,10 @@ private void insertRows(String partition, Table table, Row... rows) throws IOExc
}
}

private void insertRowsInBranch(String branch, Table table, Row... rows) throws IOException {
insertRows(null, branch, table, rows);
}

private void insertRows(Table table, Row... rows) throws IOException {
insertRows(null, table, rows);
}
Expand Down Expand Up @@ -205,20 +234,132 @@ public void testConsumeFromBeginning() throws Exception {
}

@TestTemplate
public void testConsumeFilesWithBranch() throws Exception {
/**
* Insert records on the main branch. Then, insert in a named branch. Reads from the main branch
* and assert that the only records from main are returned
*/
public void testConsumeFilesFromMainOnlyWithBranch() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));

// Produce two snapshots on main branch
Row row1 = Row.of(1, "aaa", "2021-01-01");
Row row2 = Row.of(2, "bbb", "2021-01-01");

insertRows(table, row1, row2);
String branchName = "b1";
table.manageSnapshots().createBranch(branchName).commit();

Assertions.assertThatThrownBy(
() ->
exec(
"SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
TABLE))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot scan table using ref b1 configured for streaming reader yet");
// insert on the 'b1' branch
Row row3 = Row.of(3, "ccc", "2021-01-01");
Row row4 = Row.of(4, "ddd", "2021-01-01");

insertRowsInBranch(branchName, table, row3, row4);

// read from main
TableResult result =
exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE);

try (CloseableIterator<Row> iterator = result.collect()) {
// the start snapshot(row2) is exclusive.
assertRows(ImmutableList.of(row1, row2), iterator);

Row row5 = Row.of(5, "eee", "2021-01-01");
Row row6 = Row.of(6, "fff", "2021-01-01");
insertRows(table, row5, row6);
assertRows(ImmutableList.of(row5, row6), iterator);

Row row7 = Row.of(7, "ggg", "2021-01-01");
insertRows(table, row7);
assertRows(ImmutableList.of(row7), iterator);
}
result.getJobClient().ifPresent(JobClient::cancel);
}

@TestTemplate
/**
* Insert records on the main branch. Then insert record on named branch. Then select from the
* named branch and assert all the records are returned
*/
public void testConsumeFilesFromBranch() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));

// Produce two snapshots on main branch
Row row1 = Row.of(1, "aaa", "2021-01-01");
Row row2 = Row.of(2, "bbb", "2021-01-01");

insertRows(table, row1, row2);
String branchName = "b1";
table.manageSnapshots().createBranch(branchName).commit();

// read from main
TableResult result =
exec(
"SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
TABLE, branchName);

try (CloseableIterator<Row> iterator = result.collect()) {
assertRows(ImmutableList.of(row1, row2), iterator);
// insert on the 'b1' branch
Row row3 = Row.of(3, "ccc", "2021-01-01");
Row row4 = Row.of(4, "ddd", "2021-01-01");
insertRowsInBranch(branchName, table, row3, row4);
assertRows(ImmutableList.of(row3, row4), iterator);
}
result.getJobClient().ifPresent(JobClient::cancel);
}

@TestTemplate
/**
* Insert records on branch b1. Then insert record on b2. Then select from each branch and assert
* the correct records are returned
*/
public void testConsumeFilesFromTwoBranches() throws Exception {
sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));

String branch1 = "b1";
String branch2 = "b2";
table.manageSnapshots().createBranch(branch1).commit();
table.manageSnapshots().createBranch(branch2).commit();

// Produce two snapshots on main branch
Row row1B1 = Row.of(1, "b1", "2021-01-01");
Row row2B1 = Row.of(2, "b1", "2021-01-01");

Row row1B2 = Row.of(2, "b2", "2021-01-01");
Row row2B2 = Row.of(3, "b3", "2021-01-01");

insertRowsInBranch(branch1, table, row1B1, row2B1);
insertRowsInBranch(branch2, table, row1B2, row2B2);

// read from main
TableResult resultBranch1 =
exec(
"SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
TABLE, branch1);

try (CloseableIterator<Row> iterator = resultBranch1.collect()) {
assertRows(ImmutableList.of(row1B1, row2B1), iterator);
Row another = Row.of(4, "ccc", "2021-01-01");
insertRowsInBranch(branch1, table, another);
assertRows(ImmutableList.of(another), iterator);
}

TableResult resultBranch2 =
exec(
"SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ",
TABLE, branch2);
try (CloseableIterator<Row> iterator = resultBranch2.collect()) {
assertRows(ImmutableList.of(row1B2, row2B2), iterator);
Row another = Row.of(4, "ccc", "2021-01-01");
insertRowsInBranch(branch2, table, another);
assertRows(ImmutableList.of(another), iterator);
}

resultBranch1.getJobClient().ifPresent(JobClient::cancel);
resultBranch2.getJobClient().ifPresent(JobClient::cancel);
}

@TestTemplate
Expand Down

0 comments on commit 3385b3f

Please sign in to comment.