From b6ed5aa09d839a3c85c710934d7257116977a4df Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Thu, 18 Jan 2024 19:27:48 +0530 Subject: [PATCH 1/3] Spark: backport #8656 --- .../TestCreateChangelogViewProcedure.java | 72 ------------------- .../CreateChangelogViewProcedure.java | 19 +---- .../TestCreateChangelogViewProcedure.java | 72 ------------------- .../CreateChangelogViewProcedure.java | 19 +---- 4 files changed, 2 insertions(+), 180 deletions(-) diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java index 015f2ad5fd17..9aa4bd3d7c8c 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() { sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); } - @Test - public void testWithCarryovers() { - createTableWithTwoColumns(); - sql("INSERT INTO %s VALUES (1, 'a')", tableName); - Table table = validationCatalog.loadTable(tableIdent); - Snapshot snap0 = table.currentSnapshot(); - - sql("INSERT INTO %s VALUES (2, 'b')", tableName); - table.refresh(); - Snapshot snap1 = table.currentSnapshot(); - - sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName); - table.refresh(); - Snapshot snap2 = table.currentSnapshot(); - - List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName, "cdc_view"); - - String viewName = (String) returns.get(0)[0]; - assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, _change_type", viewName)); - } - @Test public void testUpdate() { createTableWithTwoColumns(); @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() { "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)", catalogName, tableName)); } - - @Test - public void testNotRemoveCarryOvers() { - createTableWithThreeColumns(); - - sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName); - Table table = validationCatalog.loadTable(tableIdent); - Snapshot snap1 = table.currentSnapshot(); - - // carry-over row (2, 'e', 12) - sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName); - table.refresh(); - Snapshot snap2 = table.currentSnapshot(); - - List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName); - - String viewName = (String) returns.get(0)[0]; - - assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, INSERT, 0, snap1.snapshotId()), - row(2, "e", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, DELETE, 1, snap2.snapshotId()), - row(2, "d", 11, INSERT, 1, snap2.snapshotId()), - // the following two rows are carry-over rows - row(2, 
"e", 12, DELETE, 1, snap2.snapshotId()), - row(2, "e", 12, INSERT, 1, snap2.snapshotId()), - row(3, "c", 13, INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName)); - } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index 259254aa2d51..2ece0790d143 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure { ProcedureParameter.optional("options", STRING_MAP); private static final ProcedureParameter COMPUTE_UPDATES_PARAM = ProcedureParameter.optional("compute_updates", DataTypes.BooleanType); - - /** - * Enable or disable the remove carry-over rows. - * - * @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will always remove carry-over - * rows. Please query {@link SparkChangelogTable} instead for the use cases doesn't remove - * carry-over rows. - */ - @Deprecated - private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM = - ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType); - private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM = ProcedureParameter.optional("identifier_columns", STRING_ARRAY); private static final ProcedureParameter NET_CHANGES = @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure { CHANGELOG_VIEW_PARAM, OPTIONS_PARAM, COMPUTE_UPDATES_PARAM, - REMOVE_CARRYOVERS_PARAM, IDENTIFIER_COLUMNS_PARAM, NET_CHANGES, }; @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) { if (shouldComputeUpdateImages(input)) { Preconditions.checkArgument(!netChanges, "Not support net changes with update images"); df = computeUpdateImages(identifierColumns(input, tableIdent), df); - } else if (shouldRemoveCarryoverRows(input)) { + } else { df = removeCarryoverRows(df, netChanges); } @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) { return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue); } - private boolean shouldRemoveCarryoverRows(ProcedureInput input) { - return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true); - } - private Dataset removeCarryoverRows(Dataset df, boolean netChanges) { Predicate columnsToKeep; if (netChanges) { diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java index 015f2ad5fd17..9aa4bd3d7c8c 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() { sql("select * from %s order by _change_ordinal, id", returns.get(0)[0])); } - @Test - public void testWithCarryovers() { - createTableWithTwoColumns(); - sql("INSERT INTO %s VALUES (1, 'a')", tableName); - Table table = validationCatalog.loadTable(tableIdent); - Snapshot snap0 = table.currentSnapshot(); - - sql("INSERT INTO %s VALUES (2, 'b')", tableName); - table.refresh(); - 
Snapshot snap1 = table.currentSnapshot(); - - sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName); - table.refresh(); - Snapshot snap2 = table.currentSnapshot(); - - List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName, "cdc_view"); - - String viewName = (String) returns.get(0)[0]; - assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", INSERT, 0, snap0.snapshotId()), - row(2, "b", INSERT, 1, snap1.snapshotId()), - row(-2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", DELETE, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId()), - row(2, "b", INSERT, 2, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, _change_type", viewName)); - } - @Test public void testUpdate() { createTableWithTwoColumns(); @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() { "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)", catalogName, tableName)); } - - @Test - public void testNotRemoveCarryOvers() { - createTableWithThreeColumns(); - - sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName); - Table table = validationCatalog.loadTable(tableIdent); - Snapshot snap1 = table.currentSnapshot(); - - // carry-over row (2, 'e', 12) - sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName); - table.refresh(); - Snapshot snap2 = table.currentSnapshot(); - - List returns = - sql( - "CALL %s.system.create_changelog_view(" - + "remove_carryovers => false," - + "table => '%s')", - catalogName, tableName); - - String viewName = (String) returns.get(0)[0]; - - assertEquals( - "Rows should match", - ImmutableList.of( - row(1, "a", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, INSERT, 0, snap1.snapshotId()), - row(2, "e", 12, INSERT, 0, snap1.snapshotId()), - row(2, "b", 11, DELETE, 1, snap2.snapshotId()), - row(2, "d", 11, INSERT, 1, snap2.snapshotId()), - // the following two rows are carry-over rows - row(2, "e", 12, DELETE, 1, snap2.snapshotId()), - row(2, "e", 12, INSERT, 1, snap2.snapshotId()), - row(3, "c", 13, INSERT, 1, snap2.snapshotId())), - sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName)); - } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index 259254aa2d51..2ece0790d143 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure { ProcedureParameter.optional("options", STRING_MAP); private static final ProcedureParameter COMPUTE_UPDATES_PARAM = ProcedureParameter.optional("compute_updates", DataTypes.BooleanType); - - /** - * Enable or disable the remove carry-over rows. - * - * @deprecated since 1.4.0, will be removed in 1.5.0; The procedure will always remove carry-over - * rows. Please query {@link SparkChangelogTable} instead for the use cases doesn't remove - * carry-over rows. 
- */ - @Deprecated - private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM = - ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType); - private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM = ProcedureParameter.optional("identifier_columns", STRING_ARRAY); private static final ProcedureParameter NET_CHANGES = @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure { CHANGELOG_VIEW_PARAM, OPTIONS_PARAM, COMPUTE_UPDATES_PARAM, - REMOVE_CARRYOVERS_PARAM, IDENTIFIER_COLUMNS_PARAM, NET_CHANGES, }; @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) { if (shouldComputeUpdateImages(input)) { Preconditions.checkArgument(!netChanges, "Not support net changes with update images"); df = computeUpdateImages(identifierColumns(input, tableIdent), df); - } else if (shouldRemoveCarryoverRows(input)) { + } else { df = removeCarryoverRows(df, netChanges); } @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) { return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue); } - private boolean shouldRemoveCarryoverRows(ProcedureInput input) { - return input.asBoolean(REMOVE_CARRYOVERS_PARAM, true); - } - private Dataset removeCarryoverRows(Dataset df, boolean netChanges) { Predicate columnsToKeep; if (netChanges) { From ce05f37085cd741ab1abfdf9ef4b746f90aba8f0 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Thu, 18 Jan 2024 19:34:49 +0530 Subject: [PATCH 2/3] Update docs --- docs/spark-procedures.md | 1 - .../spark/procedures/CreateChangelogViewProcedure.java | 4 ++-- .../spark/procedures/CreateChangelogViewProcedure.java | 4 ++-- .../spark/procedures/CreateChangelogViewProcedure.java | 4 ++-- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/docs/spark-procedures.md b/docs/spark-procedures.md index 45a9f80ea633..ee25de8f3349 100644 --- a/docs/spark-procedures.md +++ b/docs/spark-procedures.md @@ -770,7 +770,6 @@ Creates a view that contains the changes from a given table. | `net_changes` | | boolean | Whether to output net changes (see below for more information). Defaults to false. | | `compute_updates` | | boolean | Whether to compute pre/post update images (see below for more information). Defaults to false. | | `identifier_columns` | | array | The list of identifier columns to compute updates. If the argument `compute_updates` is set to true and `identifier_columns` are not provided, the table’s current identifier fields will be used. | -| `remove_carryovers` | | boolean | Whether to remove carry-over rows (see below for more information). Defaults to true. Deprecated since 1.4.0, will be removed in 1.5.0; Please query `SparkChangelogTable` to view carry-over rows. | Here is a list of commonly used Spark read options: * `start-snapshot-id`: the exclusive start snapshot ID. If not provided, it reads from the table’s first snapshot inclusively. diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index 2ece0790d143..638c66737223 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -49,8 +49,8 @@ /** * A procedure that creates a view for changed rows. * - *

The procedure removes the carry-over rows by default. If you want to keep them, you can set - * "remove_carryovers" to be false in the options. + *

The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable} + * instead for the use cases doesn't remove carry-over rows. * *
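+ * <p>For example, carry-over rows can still be inspected by reading the changelog table
+ * directly. A sketch, assuming the {@code changes} metadata-table suffix and a placeholder
+ * table {@code db.tbl}:
+ *
+ * <pre>
+ *   SELECT * FROM db.tbl.changes;
+ * </pre>
+ *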

The procedure doesn't compute the pre/post update images by default. If you want to compute * them, you can set "compute_updates" to be true in the options. diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index 2ece0790d143..638c66737223 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -49,8 +49,8 @@ /** * A procedure that creates a view for changed rows. * - *

The procedure removes the carry-over rows by default. If you want to keep them, you can set - * "remove_carryovers" to be false in the options. + *

The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable} + * instead for the use cases doesn't remove carry-over rows. * *

The procedure doesn't compute the pre/post update images by default. If you want to compute * them, you can set "compute_updates" to be true in the options. diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index d46e3cab0bfc..aaa95f912b7c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -49,8 +49,8 @@ /** * A procedure that creates a view for changed rows. * - *

The procedure removes the carry-over rows by default. If you want to keep them, you can set - * "remove_carryovers" to be false in the options. + *

The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable} + * instead for the use cases doesn't remove carry-over rows. * *

The procedure doesn't compute the pre/post update images by default. If you want to compute * them, you can set "compute_updates" to be true in the options. From 2e924902a6e300d64504409a30f595ecf2fd70bf Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 19 Jan 2024 09:58:54 +0530 Subject: [PATCH 3/3] reword --- .../iceberg/spark/procedures/CreateChangelogViewProcedure.java | 2 +- .../iceberg/spark/procedures/CreateChangelogViewProcedure.java | 2 +- .../iceberg/spark/procedures/CreateChangelogViewProcedure.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index 638c66737223..44ec8e719378 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -50,7 +50,7 @@ * A procedure that creates a view for changed rows. * *

The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable} - * instead for the use cases doesn't remove carry-over rows. + * instead when carry-over rows are required. * *
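+ * <p>A minimal invocation returns the name of the created view. A sketch; the catalog and
+ * table names below are placeholders:
+ *
+ * <pre>
+ *   CALL my_catalog.system.create_changelog_view(table => 'db.tbl');
+ * </pre>
+ *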

The procedure doesn't compute the pre/post update images by default. If you want to compute * them, you can set "compute_updates" to be true in the options. diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index 638c66737223..44ec8e719378 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -50,7 +50,7 @@ * A procedure that creates a view for changed rows. * *

The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable} - * instead for the use cases doesn't remove carry-over rows. + * instead when carry-over rows are required. * *

The procedure doesn't compute the pre/post update images by default. If you want to compute * them, you can set "compute_updates" to be true in the options. diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index aaa95f912b7c..b4594d91c0ef 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -50,7 +50,7 @@ * A procedure that creates a view for changed rows. * *

The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable} - * instead for the use cases doesn't remove carry-over rows. + * instead when carry-over rows are required. * *

The procedure doesn't compute the pre/post update images by default. If you want to compute * them, you can set "compute_updates" to be true in the options.
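
Usage notes: the parameters retained by this change can be exercised as follows. This is a
sketch rather than part of the patch: `my_catalog`, `db.tbl`, and the snapshot IDs are
placeholders, and the calls mirror the patterns in TestCreateChangelogViewProcedure.

    -- Pre/post update images, computed over the 'id' identifier column.
    -- The procedure rejects compute_updates combined with net_changes.
    CALL my_catalog.system.create_changelog_view(
      table => 'db.tbl',
      compute_updates => true,
      identifier_columns => array('id'));

    -- Net changes over a snapshot range supplied as Spark read options
    -- (start-snapshot-id is exclusive; the IDs here are placeholders).
    CALL my_catalog.system.create_changelog_view(
      table => 'db.tbl',
      net_changes => true,
      options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'));

Each call returns the created view's name in its first output row; querying that view ordered
by _change_ordinal reproduces the change sequence asserted in the tests.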