diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 3dce5dd5901e..cf57a126ae59 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -152,11 +152,6 @@ private void validate() { "Invalid starting snapshot id for SPECIFIC_START_SNAPSHOT_ID strategy: not null"); } - Preconditions.checkArgument( - branch == null, - String.format( - "Cannot scan table using ref %s configured for streaming reader yet", branch)); - Preconditions.checkArgument( tag == null, String.format("Cannot scan table using ref %s configured for streaming reader", tag)); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java index c27e29613fed..a07613aee59b 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java @@ -130,9 +130,6 @@ public void initializeState(FunctionInitializationContext context) throws Except Preconditions.checkArgument( !(scanContext.startTag() != null && scanContext.startSnapshotId() != null), "START_SNAPSHOT_ID and START_TAG cannot both be set."); - Preconditions.checkArgument( - scanContext.branch() == null, - "Cannot scan table using ref %s configured for streaming reader yet."); Preconditions.checkNotNull( table.currentSnapshot(), "Don't have any available snapshot in table."); @@ -195,7 +192,10 @@ void monitorAndForwardSplits() { // Refresh the table to get the latest committed snapshot. table.refresh(); - Snapshot snapshot = table.currentSnapshot(); + Snapshot snapshot = + scanContext.branch() != null + ? 
table.snapshot(scanContext.branch()) + : table.currentSnapshot(); if (snapshot != null && snapshot.snapshotId() != lastSnapshotId) { long snapshotId = snapshot.snapshotId(); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java index 450b649253a4..e9e3c159b07b 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java @@ -104,7 +104,11 @@ private Snapshot toSnapshotInclusive( private ContinuousEnumerationResult discoverIncrementalSplits( IcebergEnumeratorPosition lastPosition) { - Snapshot currentSnapshot = table.currentSnapshot(); + Snapshot currentSnapshot = + scanContext.branch() != null + ? table.snapshot(scanContext.branch()) + : table.currentSnapshot(); + if (currentSnapshot == null) { // empty table Preconditions.checkArgument( diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java index 31e9733fcd60..d29829c5ca5a 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java @@ -370,6 +370,88 @@ public void testSpecificSnapshotTimestamp() throws Exception { } } + @Test + public void testReadingFromBranch() throws Exception { + String branch = "b1"; + GenericAppenderHelper dataAppender = + new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, TEMPORARY_FOLDER); + + List batchBase = + RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + 
dataAppender.appendToTable(batchBase); + + // create branch + tableResource + .table() + .manageSnapshots() + .createBranch(branch, tableResource.table().currentSnapshot().snapshotId()) + .commit(); + + // snapshot1 to branch + List batch1 = + RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + dataAppender.appendToTable(branch, batch1); + + // snapshot2 to branch + List batch2 = + RandomGenericData.generate(tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + dataAppender.appendToTable(branch, batch2); + + List branchExpectedRecords = Lists.newArrayList(); + branchExpectedRecords.addAll(batchBase); + branchExpectedRecords.addAll(batch1); + branchExpectedRecords.addAll(batch2); + // reads from branch: it should contain the first snapshot (before the branch creation) followed + // by the next 2 + ScanContext scanContext = + ScanContext.builder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10L)) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .useBranch(branch) + .build(); + + try (CloseableIterator iter = + createStream(scanContext).executeAndCollect(getClass().getSimpleName())) { + List resultMain = waitForResult(iter, 6); + TestHelpers.assertRecords(resultMain, branchExpectedRecords, tableResource.table().schema()); + + // snapshot3 to branch + List batch3 = + RandomGenericData.generate( + tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + dataAppender.appendToTable(branch, batch3); + + List batch4 = + RandomGenericData.generate( + tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + dataAppender.appendToTable(branch, batch4); + + List result3 = waitForResult(iter, 2); + TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); + } + + // read only from main branch. 
Should contain only the first snapshot + scanContext = + ScanContext.builder() + .streaming(true) + .monitorInterval(Duration.ofMillis(10L)) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + try (CloseableIterator iter = + createStream(scanContext).executeAndCollect(getClass().getSimpleName())) { + List resultMain = waitForResult(iter, 2); + TestHelpers.assertRecords(resultMain, batchBase, tableResource.table().schema()); + + List batchMain2 = + RandomGenericData.generate( + tableResource.table().schema(), 2, randomSeed.incrementAndGet()); + dataAppender.appendToTable(batchMain2); + resultMain = waitForResult(iter, 2); + TestHelpers.assertRecords(resultMain, batchMain2, tableResource.table().schema()); + } + } + private DataStream createStream(ScanContext scanContext) throws Exception { // start the source and collect output StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -384,6 +466,7 @@ private DataStream createStream(ScanContext scanContext) throws Exception { .startSnapshotTimestamp(scanContext.startSnapshotTimestamp()) .startSnapshotId(scanContext.startSnapshotId()) .monitorInterval(Duration.ofMillis(10L)) + .branch(scanContext.branch()) .build(), WatermarkStrategy.noWatermarks(), "icebergSource", diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java index 09d5a5481aee..9a7d174afcb3 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java @@ -97,6 +97,31 @@ public void clean() { super.clean(); } + private void insertRows(String partition, String branch, Table table, Row... 
rows) + throws IOException { + GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory); + + GenericRecord gRecord = GenericRecord.create(table.schema()); + List records = Lists.newArrayList(); + for (Row row : rows) { + records.add( + gRecord.copy( + "id", row.getField(0), + "data", row.getField(1), + "dt", row.getField(2))); + } + + if (partition != null) { + appender.appendToTable(TestHelpers.Row.of(partition, 0), records); + } else { + if (branch != null) { + appender.appendToTable(branch, records); + } else { + appender.appendToTable(records); + } + } + } + private void insertRows(String partition, Table table, Row... rows) throws IOException { GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, temporaryDirectory); @@ -117,6 +142,10 @@ private void insertRows(String partition, Table table, Row... rows) throws IOExc } } + private void insertRowsInBranch(String branch, Table table, Row... rows) throws IOException { + insertRows(null, branch, table, rows); + } + private void insertRows(Table table, Row... rows) throws IOException { insertRows(null, table, rows); } @@ -205,20 +234,132 @@ public void testConsumeFromBeginning() throws Exception { } @TestTemplate - public void testConsumeFilesWithBranch() throws Exception { + /** + * Insert records on the main branch. Then, insert in a named branch. 
Reads from the main branch + * and assert that only the records from main are returned + */ + public void testConsumeFilesFromMainOnlyWithBranch() throws Exception { sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + // Seed the main branch with two rows Row row1 = Row.of(1, "aaa", "2021-01-01"); Row row2 = Row.of(2, "bbb", "2021-01-01"); + insertRows(table, row1, row2); + String branchName = "b1"; + table.manageSnapshots().createBranch(branchName).commit(); - Assertions.assertThatThrownBy( - () -> - exec( - "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/", - TABLE)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot scan table using ref b1 configured for streaming reader yet"); + // insert on the 'b1' branch + Row row3 = Row.of(3, "ccc", "2021-01-01"); + Row row4 = Row.of(4, "ddd", "2021-01-01"); + + insertRowsInBranch(branchName, table, row3, row4); + + // read from main + TableResult result = + exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/", TABLE); + + try (CloseableIterator iterator = result.collect()) { + // the initial scan returns the rows committed to main (row1, row2); the branch-only rows are not visible + assertRows(ImmutableList.of(row1, row2), iterator); + + Row row5 = Row.of(5, "eee", "2021-01-01"); + Row row6 = Row.of(6, "fff", "2021-01-01"); + insertRows(table, row5, row6); + assertRows(ImmutableList.of(row5, row6), iterator); + + Row row7 = Row.of(7, "ggg", "2021-01-01"); + insertRows(table, row7); + assertRows(ImmutableList.of(row7), iterator); + } + result.getJobClient().ifPresent(JobClient::cancel); + } + + @TestTemplate + /** + * Insert records on the main branch. Then insert record on named branch. 
Then select from the + * named branch and assert all the records are returned + */ + public void testConsumeFilesFromBranch() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + // Seed the main branch with two rows + Row row1 = Row.of(1, "aaa", "2021-01-01"); + Row row2 = Row.of(2, "bbb", "2021-01-01"); + + insertRows(table, row1, row2); + String branchName = "b1"; + table.manageSnapshots().createBranch(branchName).commit(); + + // read from the 'b1' branch + TableResult result = + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ", + TABLE, branchName); + + try (CloseableIterator iterator = result.collect()) { + assertRows(ImmutableList.of(row1, row2), iterator); + // insert on the 'b1' branch + Row row3 = Row.of(3, "ccc", "2021-01-01"); + Row row4 = Row.of(4, "ddd", "2021-01-01"); + insertRowsInBranch(branchName, table, row3, row4); + assertRows(ImmutableList.of(row3, row4), iterator); + } + result.getJobClient().ifPresent(JobClient::cancel); + } + + @TestTemplate + /** + * Insert records on branch b1. Then insert record on b2. 
Then select from each branch and assert + * the correct records are returned + */ + public void testConsumeFilesFromTwoBranches() throws Exception { + sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE); + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE)); + + String branch1 = "b1"; + String branch2 = "b2"; + table.manageSnapshots().createBranch(branch1).commit(); + table.manageSnapshots().createBranch(branch2).commit(); + + // Produce rows destined for each branch + Row row1B1 = Row.of(1, "b1", "2021-01-01"); + Row row2B1 = Row.of(2, "b1", "2021-01-01"); + + Row row1B2 = Row.of(2, "b2", "2021-01-01"); + Row row2B2 = Row.of(3, "b3", "2021-01-01"); + + insertRowsInBranch(branch1, table, row1B1, row2B1); + insertRowsInBranch(branch2, table, row1B2, row2B2); + + // read from branch1 + TableResult resultBranch1 = + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ", + TABLE, branch1); + + try (CloseableIterator iterator = resultBranch1.collect()) { + assertRows(ImmutableList.of(row1B1, row2B1), iterator); + Row another = Row.of(4, "ccc", "2021-01-01"); + insertRowsInBranch(branch1, table, another); + assertRows(ImmutableList.of(another), iterator); + } + + TableResult resultBranch2 = + exec( + "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='%s')*/ ", + TABLE, branch2); + try (CloseableIterator iterator = resultBranch2.collect()) { + assertRows(ImmutableList.of(row1B2, row2B2), iterator); + Row another = Row.of(4, "ccc", "2021-01-01"); + insertRowsInBranch(branch2, table, another); + assertRows(ImmutableList.of(another), iterator); + } + + resultBranch1.getJobClient().ifPresent(JobClient::cancel); + resultBranch2.getJobClient().ifPresent(JobClient::cancel); } @TestTemplate