From 6b33b82ca91c3ae8f545f9e6ca687087fdfe1088 Mon Sep 17 00:00:00 2001 From: Yufei Gu Date: Tue, 26 Sep 2023 13:04:09 -0700 Subject: [PATCH] =?UTF-8?q?Spark=203.5:=20Drop=20the=20=E2=80=9Cremove=5Fc?= =?UTF-8?q?arryovers=E2=80=9D=20flag=20for=20CDC=20view=20creation=20(#865?= =?UTF-8?q?6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../TestCreateChangelogViewProcedure.java | 72 ------------------- .../CreateChangelogViewProcedure.java | 19 +---- 2 files changed, 1 insertion(+), 90 deletions(-) diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java index 015f2ad5fd17..9aa4bd3d7c8c 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() { sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); } - @Test - public void testWithCarryovers() { - createTableWithTwoColumns(); - sql("INSERT INTO %s VALUES (1, 'a')", tableName); - Table table = validationCatalog.loadTable(tableIdent); - Snapshot snap0 = table.currentSnapshot(); - - sql("INSERT INTO %s VALUES (2, 'b')", tableName); - table.refresh(); - Snapshot snap1 = table.currentSnapshot(); - - sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName); - table.refresh(); - Snapshot snap2 = table.currentSnapshot(); - - List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName, "cdc_view"); - - String viewName = (String) returns.get(0)[0]; - assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, _change_type", viewName)); - } - @Test public void testUpdate() { createTableWithTwoColumns(); @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() { "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)", catalogName, tableName)); } - - @Test - public void testNotRemoveCarryOvers() { - createTableWithThreeColumns(); - - sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName); - Table table = validationCatalog.loadTable(tableIdent); - Snapshot snap1 = table.currentSnapshot(); - - // carry-over row (2, 'e', 12) - sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName); - table.refresh(); - Snapshot snap2 = table.currentSnapshot(); - - List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName); - - String viewName = (String) returns.get(0)[0]; - - assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, INSERT, 0, snap1.snapshotId()), - row(2, "e", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, DELETE, 1, snap2.snapshotId()), - row(2, "d", 11, INSERT, 1, snap2.snapshotId()), - // the following two rows are carry-over rows - row(2, "e", 12, DELETE, 1, snap2.snapshotId()), - row(2, "e", 12, INSERT, 1, snap2.snapshotId()), - row(3, "c", 13, INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName)); - } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index 3b51cf79d524..d46e3cab0bfc 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure { ProcedureParameter.optional("options", STRING_MAP); private static final ProcedureParameter COMPUTE_UPDATES_PARAM = ProcedureParameter.optional("compute_updates", DataTypes.BooleanType); - - /** - * Enable or disable the remove carry-over rows. - * - * @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will always remove carry-over - * rows. Please query {@link SparkChangelogTable} instead for the use cases doesn't remove - * carry-over rows. - */ - @Deprecated - private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM = - ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType); - private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM = ProcedureParameter.optional("identifier_columns", STRING_ARRAY); private static final ProcedureParameter NET_CHANGES = @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure { CHANGELOG_VIEW_PARAM, OPTIONS_PARAM, COMPUTE_UPDATES_PARAM, - REMOVE_CARRYOVERS_PARAM, IDENTIFIER_COLUMNS_PARAM, NET_CHANGES, }; @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) { if (shouldComputeUpdateImages(input)) { Preconditions.checkArgument(!netChanges, "Not support net changes with update images"); df = computeUpdateImages(identifierColumns(input, tableIdent), df); - } else if (shouldRemoveCarryoverRows(input)) { + } else { df = removeCarryoverRows(df, netChanges); } @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) { return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue); } - private boolean shouldRemoveCarryoverRows(ProcedureInput input) { - return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true); - } - private Dataset removeCarryoverRows(Dataset df, boolean netChanges) { Predicate columnsToKeep; if (netChanges) {