Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark: backport #8656 and update docs #9512

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/spark-procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,6 @@ Creates a view that contains the changes from a given table.
| `net_changes` | | boolean | Whether to output net changes (see below for more information). Defaults to false. |
| `compute_updates` | | boolean | Whether to compute pre/post update images (see below for more information). Defaults to false. |
| `identifier_columns` | | array<string> | The list of identifier columns to compute updates. If the argument `compute_updates` is set to true and `identifier_columns` are not provided, the table’s current identifier fields will be used. |
| `remove_carryovers` | | boolean | Whether to remove carry-over rows (see below for more information). Defaults to true. Deprecated since 1.4.0 and will be removed in 1.5.0; please query `SparkChangelogTable` to view carry-over rows. |

Here is a list of commonly used Spark read options:
* `start-snapshot-id`: the exclusive start snapshot ID. If not provided, it reads from the table’s first snapshot inclusively.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() {
sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
}

/**
 * Creates a changelog view with {@code remove_carryovers => false} over three snapshots and
 * checks the rows it returns.
 *
 * <p>The third snapshot is an INSERT OVERWRITE; the expected rows for it contain both a DELETE
 * and INSERTs of (2, 'b'), i.e. the view is expected to keep those rows rather than drop them.
 */
@Test
public void testWithCarryovers() {
  createTableWithTwoColumns();
  sql("INSERT INTO %s VALUES (1, 'a')", tableName);
  Table table = validationCatalog.loadTable(tableIdent);
  Snapshot snap0 = table.currentSnapshot();

  sql("INSERT INTO %s VALUES (2, 'b')", tableName);
  table.refresh();
  Snapshot snap1 = table.currentSnapshot();

  sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
  table.refresh();
  Snapshot snap2 = table.currentSnapshot();

  // The query has exactly two %s placeholders, so exactly two arguments are supplied.
  List<Object[]> returns =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);

  // The procedure reports the name of the created view in the first column of the first row.
  String viewName = (String) returns.get(0)[0];
  assertEquals(
      "Rows should match",
      ImmutableList.of(
          row(1, "a", INSERT, 0, snap0.snapshotId()),
          row(2, "b", INSERT, 1, snap1.snapshotId()),
          row(-2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", DELETE, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId())),
      sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
}

@Test
public void testUpdate() {
createTableWithTwoColumns();
Expand Down Expand Up @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() {
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}

/**
 * Checks the rows of a changelog view created with {@code remove_carryovers => false} on a
 * three-column table after an INSERT followed by an INSERT OVERWRITE.
 */
@Test
public void testNotRemoveCarryOvers() {
  createTableWithThreeColumns();

  sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
  Table icebergTable = validationCatalog.loadTable(tableIdent);
  Snapshot insertSnapshot = icebergTable.currentSnapshot();

  // (2, 'e', 12) is present both before and after the overwrite: the carry-over row
  sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
  icebergTable.refresh();
  Snapshot overwriteSnapshot = icebergTable.currentSnapshot();

  List<Object[]> procedureOutput =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);
  String changelogView = (String) procedureOutput.get(0)[0];

  List<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "a", 12, INSERT, 0, insertSnapshot.snapshotId()),
          row(2, "b", 11, INSERT, 0, insertSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 0, insertSnapshot.snapshotId()),
          row(2, "b", 11, DELETE, 1, overwriteSnapshot.snapshotId()),
          row(2, "d", 11, INSERT, 1, overwriteSnapshot.snapshotId()),
          // carry-over pair: the same row deleted and re-inserted by the overwrite
          row(2, "e", 12, DELETE, 1, overwriteSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 1, overwriteSnapshot.snapshotId()),
          row(3, "c", 13, INSERT, 1, overwriteSnapshot.snapshotId()));
  List<Object[]> actualRows =
      sql("select * from %s order by _change_ordinal, id, data, _change_type", changelogView);

  assertEquals("Rows should match", expectedRows, actualRows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
/**
* A procedure that creates a view for changed rows.
*
* <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
* "remove_carryovers" to be false in the options.
* <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
* instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.
Expand Down Expand Up @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);

/**
* Optional boolean argument that enables or disables the removal of carry-over rows.
*
* @deprecated since 1.4.0, will be removed in 1.5.0; the procedure will always remove carry-over
* rows. Please query {@link SparkChangelogTable} instead for use cases that need the
* carry-over rows to be kept.
*/
@Deprecated
private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);

private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
private static final ProcedureParameter NET_CHANGES =
Expand All @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
CHANGELOG_VIEW_PARAM,
OPTIONS_PARAM,
COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
};
Expand Down Expand Up @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) {
if (shouldComputeUpdateImages(input)) {
Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
} else if (shouldRemoveCarryoverRows(input)) {
} else {
df = removeCarryoverRows(df, netChanges);
}

Expand Down Expand Up @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) {
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}

/**
 * Reads the "remove_carryovers" argument from the procedure input, passing {@code true} as the
 * default value.
 */
private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
  boolean removeByDefault = true;
  return input.asBoolean(REMOVE_CARRYOVERS_PARAM, removeByDefault);
}

private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
Predicate<String> columnsToKeep;
if (netChanges) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,41 +186,6 @@ public void testTimestampsBasedQuery() {
sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
}

/**
 * Creates a changelog view with {@code remove_carryovers => false} over three snapshots and
 * checks the rows it returns.
 *
 * <p>The third snapshot is an INSERT OVERWRITE; the expected rows for it contain both a DELETE
 * and INSERTs of (2, 'b'), i.e. the view is expected to keep those rows rather than drop them.
 */
@Test
public void testWithCarryovers() {
  createTableWithTwoColumns();
  sql("INSERT INTO %s VALUES (1, 'a')", tableName);
  Table table = validationCatalog.loadTable(tableIdent);
  Snapshot snap0 = table.currentSnapshot();

  sql("INSERT INTO %s VALUES (2, 'b')", tableName);
  table.refresh();
  Snapshot snap1 = table.currentSnapshot();

  sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
  table.refresh();
  Snapshot snap2 = table.currentSnapshot();

  // The query has exactly two %s placeholders, so exactly two arguments are supplied.
  List<Object[]> returns =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);

  // The procedure reports the name of the created view in the first column of the first row.
  String viewName = (String) returns.get(0)[0];
  assertEquals(
      "Rows should match",
      ImmutableList.of(
          row(1, "a", INSERT, 0, snap0.snapshotId()),
          row(2, "b", INSERT, 1, snap1.snapshotId()),
          row(-2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", DELETE, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId()),
          row(2, "b", INSERT, 2, snap2.snapshotId())),
      sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
}

@Test
public void testUpdate() {
createTableWithTwoColumns();
Expand Down Expand Up @@ -474,41 +439,4 @@ public void testNetChangesWithComputeUpdates() {
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}

/**
 * Checks the rows of a changelog view created with {@code remove_carryovers => false} on a
 * three-column table after an INSERT followed by an INSERT OVERWRITE.
 */
@Test
public void testNotRemoveCarryOvers() {
  createTableWithThreeColumns();

  sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
  Table icebergTable = validationCatalog.loadTable(tableIdent);
  Snapshot insertSnapshot = icebergTable.currentSnapshot();

  // (2, 'e', 12) is present both before and after the overwrite: the carry-over row
  sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
  icebergTable.refresh();
  Snapshot overwriteSnapshot = icebergTable.currentSnapshot();

  List<Object[]> procedureOutput =
      sql(
          "CALL %s.system.create_changelog_view("
              + "remove_carryovers => false,"
              + "table => '%s')",
          catalogName, tableName);
  String changelogView = (String) procedureOutput.get(0)[0];

  List<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "a", 12, INSERT, 0, insertSnapshot.snapshotId()),
          row(2, "b", 11, INSERT, 0, insertSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 0, insertSnapshot.snapshotId()),
          row(2, "b", 11, DELETE, 1, overwriteSnapshot.snapshotId()),
          row(2, "d", 11, INSERT, 1, overwriteSnapshot.snapshotId()),
          // carry-over pair: the same row deleted and re-inserted by the overwrite
          row(2, "e", 12, DELETE, 1, overwriteSnapshot.snapshotId()),
          row(2, "e", 12, INSERT, 1, overwriteSnapshot.snapshotId()),
          row(3, "c", 13, INSERT, 1, overwriteSnapshot.snapshotId()));
  List<Object[]> actualRows =
      sql("select * from %s order by _change_ordinal, id, data, _change_type", changelogView);

  assertEquals("Rows should match", expectedRows, actualRows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
/**
* A procedure that creates a view for changed rows.
*
* <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
* "remove_carryovers" to be false in the options.
* <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
* instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.
Expand Down Expand Up @@ -91,18 +91,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
ProcedureParameter.optional("options", STRING_MAP);
private static final ProcedureParameter COMPUTE_UPDATES_PARAM =
ProcedureParameter.optional("compute_updates", DataTypes.BooleanType);

/**
* Optional boolean argument that enables or disables the removal of carry-over rows.
*
* @deprecated since 1.4.0, will be removed in 1.5.0; the procedure will always remove carry-over
* rows. Please query {@link SparkChangelogTable} instead for use cases that need the
* carry-over rows to be kept.
*/
@Deprecated
private static final ProcedureParameter REMOVE_CARRYOVERS_PARAM =
ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType);

private static final ProcedureParameter IDENTIFIER_COLUMNS_PARAM =
ProcedureParameter.optional("identifier_columns", STRING_ARRAY);
private static final ProcedureParameter NET_CHANGES =
Expand All @@ -114,7 +102,6 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
CHANGELOG_VIEW_PARAM,
OPTIONS_PARAM,
COMPUTE_UPDATES_PARAM,
REMOVE_CARRYOVERS_PARAM,
IDENTIFIER_COLUMNS_PARAM,
NET_CHANGES,
};
Expand Down Expand Up @@ -163,7 +150,7 @@ public InternalRow[] call(InternalRow args) {
if (shouldComputeUpdateImages(input)) {
Preconditions.checkArgument(!netChanges, "Not support net changes with update images");
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
} else if (shouldRemoveCarryoverRows(input)) {
} else {
df = removeCarryoverRows(df, netChanges);
}

Expand Down Expand Up @@ -195,10 +182,6 @@ private boolean shouldComputeUpdateImages(ProcedureInput input) {
return input.asBoolean(COMPUTE_UPDATES_PARAM, defaultValue);
}

/**
 * Reads the "remove_carryovers" argument from the procedure input, passing {@code true} as the
 * default value.
 */
private boolean shouldRemoveCarryoverRows(ProcedureInput input) {
  boolean removeByDefault = true;
  return input.asBoolean(REMOVE_CARRYOVERS_PARAM, removeByDefault);
}

private Dataset<Row> removeCarryoverRows(Dataset<Row> df, boolean netChanges) {
Predicate<String> columnsToKeep;
if (netChanges) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
/**
* A procedure that creates a view for changed rows.
*
* <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
* "remove_carryovers" to be false in the options.
* <p>The procedure always removes the carry-over rows. Please query {@link SparkChangelogTable}
* instead when carry-over rows are required.
*
* <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
* them, you can set "compute_updates" to be true in the options.
Expand Down