diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index ce8c8043ae40..deb005292ff2 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -241,7 +241,10 @@ All available procedures are listed below.
matched_upsert_setting => 'matchedUpsertSetting',
not_matched_insert_condition => 'notMatchedInsertCondition',
not_matched_insert_values => 'notMatchedInsertValues',
- matched_delete_condition => 'matchedDeleteCondition')
+ matched_delete_condition => 'matchedDeleteCondition',
+ not_matched_by_source_upsert_condition => 'notMatchedBySourceUpsertCondition',
+ not_matched_by_source_upsert_setting => 'notMatchedBySourceUpsertSetting',
+ not_matched_by_source_delete_condition => 'notMatchedBySourceDeleteCondition')
To perform "MERGE INTO" syntax. See merge_into action for
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index cf8d7191953e..e297c0bdbb4c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -102,7 +102,19 @@ public class MergeIntoProcedure extends ProcedureBase {
@ArgumentHint(
name = "matched_delete_condition",
type = @DataTypeHint("STRING"),
- isOptional = true)
+ isOptional = true),
+ @ArgumentHint(
+ name = "not_matched_by_source_upsert_condition",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "not_matched_by_source_upsert_setting",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "not_matched_by_source_delete_condition",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
})
public String[] call(
ProcedureContext procedureContext,
@@ -115,7 +127,10 @@ public String[] call(
String matchedUpsertSetting,
String notMatchedInsertCondition,
String notMatchedInsertValues,
- String matchedDeleteCondition) {
+ String matchedDeleteCondition,
+ String notMatchedBySourceUpsertCondition,
+ String notMatchedBySourceUpsertSetting,
+ String notMatchedBySourceDeleteCondition) {
targetAlias = notnull(targetAlias);
sourceSqls = notnull(sourceSqls);
sourceTable = notnull(sourceTable);
@@ -125,6 +140,9 @@ public String[] call(
notMatchedInsertCondition = notnull(notMatchedInsertCondition);
notMatchedInsertValues = notnull(notMatchedInsertValues);
matchedDeleteCondition = notnull(matchedDeleteCondition);
+ notMatchedBySourceUpsertCondition = notnull(notMatchedBySourceUpsertCondition);
+ notMatchedBySourceUpsertSetting = notnull(notMatchedBySourceUpsertSetting);
+ notMatchedBySourceDeleteCondition = notnull(notMatchedBySourceDeleteCondition);
String warehouse = catalog.warehouse();
Map catalogOptions = catalog.options();
@@ -166,6 +184,20 @@ public String[] call(
action.withMatchedDelete(matchedDeleteCondition);
}
+ if (!notMatchedBySourceUpsertCondition.isEmpty()
+ || !notMatchedBySourceUpsertSetting.isEmpty()) {
+ String condition = nullable(notMatchedBySourceUpsertCondition);
+ String values = nullable(notMatchedBySourceUpsertSetting);
+ checkArgument(
+ !"*".equals(values),
+ "not-matched-by-source-upsert does not support setting notMatchedBySourceUpsertSetting to *.");
+ action.withNotMatchedBySourceUpsert(condition, values);
+ }
+
+ if (!notMatchedBySourceDeleteCondition.isEmpty()) {
+ action.withNotMatchedBySourceDelete(notMatchedBySourceDeleteCondition);
+ }
+
action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
action.validate();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 41d607fac5f6..3907c0398532 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -571,6 +571,123 @@ public void testIllegalSourceNameSqlCase() {
.satisfies(anyCauseMatches(ValidationException.class, "Object 'S' not found"));
}
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void testNotMatchedBySourceUpsert(boolean qualified, String invoker) throws Exception {
+ sEnv.executeSql("DROP TABLE T");
+ prepareTargetTable(CoreOptions.ChangelogProducer.INPUT);
+
+ // build MergeIntoAction
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
+ action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S")
+ .withSourceTable(qualified ? "default.SS" : "SS")
+ .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+ .withNotMatchedBySourceUpsert(
+ "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'");
+
+ String procedureStatement = "";
+ if ("procedure_indexed".equals(invoker)) {
+ procedureStatement =
+ String.format(
+ "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '', '', '', '', 'dt < ''02-28''', 'v = v || ''_nmu'', last_action = ''not_matched_upsert''')",
+ database,
+ "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S",
+ qualified ? "default.SS" : "SS");
+ } else if ("procedure_named".equals(invoker)) {
+ procedureStatement =
+ String.format(
+ "CALL sys.merge_into("
+ + "target_table => '%s.T', "
+ + "source_sqls => '%s', "
+ + "source_table => '%s', "
+ + "merge_condition => 'T.k = SS.k AND T.dt = SS.dt', "
+ + "not_matched_by_source_upsert_condition => 'dt < ''02-28''',"
+ + "not_matched_by_source_upsert_setting => 'v = v || ''_nmu'', last_action = ''not_matched_upsert''')",
+ database,
+ "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S",
+ qualified ? "default.SS" : "SS");
+ }
+
+ List streamingExpected =
+ Arrays.asList(
+ changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+ changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"));
+
+ List batchExpected =
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "creation", "02-27"),
+ changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+ changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+ changelogRow("+I", 4, "v_4", "creation", "02-27"),
+ changelogRow("+I", 5, "v_5", "creation", "02-28"),
+ changelogRow("+I", 6, "v_6", "creation", "02-28"),
+ changelogRow("+I", 7, "v_7", "creation", "02-28"),
+ changelogRow("+I", 8, "v_8", "creation", "02-28"),
+ changelogRow("+I", 9, "v_9", "creation", "02-28"),
+ changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+ if ("action".equals(invoker)) {
+ validateActionRunResult(action.build(), streamingExpected, batchExpected);
+ } else {
+ validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void testNotMatchedBySourceDelete(boolean qualified, String invoker) throws Exception {
+ // build MergeIntoAction
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T");
+ action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S")
+ .withSourceTable(qualified ? "default.SS" : "SS")
+ .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+ .withNotMatchedBySourceDelete(null);
+
+ String procedureStatement = "";
+ if ("procedure_indexed".equals(invoker)) {
+ procedureStatement =
+ String.format(
+ "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '', '', '', '', '', '', 'TRUE')",
+ database,
+ "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S",
+ qualified ? "default.SS" : "SS");
+ } else if ("procedure_named".equals(invoker)) {
+ procedureStatement =
+ String.format(
+ "CALL sys.merge_into("
+ + "target_table => '%s.T', "
+ + "source_sqls => '%s', "
+ + "source_table => '%s', "
+ + "merge_condition => 'T.k = SS.k AND T.dt = SS.dt', "
+ + "not_matched_by_source_delete_condition => 'TRUE')",
+ database,
+ "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S",
+ qualified ? "default.SS" : "SS");
+ }
+
+ List streamingExpected =
+ Arrays.asList(
+ changelogRow("-D", 2, "v_2", "creation", "02-27"),
+ changelogRow("-D", 3, "v_3", "creation", "02-27"),
+ changelogRow("-D", 5, "v_5", "creation", "02-28"),
+ changelogRow("-D", 6, "v_6", "creation", "02-28"),
+ changelogRow("-D", 9, "v_9", "creation", "02-28"),
+ changelogRow("-D", 10, "v_10", "creation", "02-28"));
+
+ List batchExpected =
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "creation", "02-27"),
+ changelogRow("+I", 4, "v_4", "creation", "02-27"),
+ changelogRow("+I", 7, "v_7", "creation", "02-28"),
+ changelogRow("+I", 8, "v_8", "creation", "02-28"));
+
+ if ("action".equals(invoker)) {
+ validateActionRunResult(action.build(), streamingExpected, batchExpected);
+ } else {
+ validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
+ }
+ }
+
private void validateActionRunResult(
MergeIntoAction action, List streamingExpected, List batchExpected)
throws Exception {
|