diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index 9dbde25b0..f617acd55 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -150,6 +150,7 @@ public DatabaseDialect create(AbstractConfig config) { protected final String jdbcUrl; protected final DatabaseDialectProvider.JdbcUrlInfo jdbcUrlInfo; protected final QuoteMethod quoteSqlIdentifiers; + protected boolean enabledSinkNullKeyProtection; private final IdentifierRules defaultIdentifierRules; private final AtomicReference identifierRules = new AtomicReference<>(); private final Queue connections = new ConcurrentLinkedQueue<>(); @@ -190,6 +191,7 @@ protected GenericDatabaseDialect( quoteSqlIdentifiers = QuoteMethod.get( config.getString(JdbcSinkConfig.QUOTE_SQL_IDENTIFIERS_CONFIG) ); + enabledSinkNullKeyProtection = config.getBoolean(JdbcSinkConfig.ENABLE_NULL_KEY_PROTECTION); } else { catalogPattern = config.getString(JdbcSourceTaskConfig.CATALOG_PATTERN_CONFIG); schemaPattern = config.getString(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG); diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java index 21618e09b..1f2185006 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java @@ -314,6 +314,7 @@ public String buildUpsertQueryStatement( Collection keyColumns, Collection nonKeyColumns ) { + // https://blogs.oracle.com/cmar/entry/using_merge_to_do_an final Transform transform = (builder, col) -> { builder.append(table) @@ -323,6 +324,25 @@ public String buildUpsertQueryStatement( .appendColumnName(col.name()); }; + final Transform nullableKeyTransform = (builder, col) -> { + builder + .append("(") + .append(table) + .append(".") + .appendColumnName(col.name()) + .append("=incoming.") + .appendColumnName(col.name()) + .append(" OR (") + .append(table) + .append(".") + .appendColumnName(col.name()) + .append(" IS NULL AND ") + .append("incoming.") + .appendColumnName(col.name()) + .append(" IS NULL))") + ; + }; + ExpressionBuilder builder = expressionBuilder(); builder.append("merge into "); builder.append(table); @@ -334,7 +354,7 @@ public String buildUpsertQueryStatement( builder.append(" FROM dual) incoming on("); builder.appendList() .delimitedBy(" and ") - .transformedBy(transform) + .transformedBy(enabledSinkNullKeyProtection ? nullableKeyTransform : transform) .of(keyColumns); builder.append(")"); if (nonKeyColumns != null && !nonKeyColumns.isEmpty()) { diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index 81f078622..89a4c0f35 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -166,6 +166,13 @@ public enum PrimaryKeyMode { + "the connector, e.g. ``UPDATE``."; private static final String INSERT_MODE_DISPLAY = "Insert Mode"; + public static final String ENABLE_NULL_KEY_PROTECTION = "enable.null.key.protection"; + private static final String ENABLE_NULL_KEY_PROTECTION_DEFAULT = "false"; + private static final String ENABLE_NULL_KEY_PROTECTION_DOC = + "If upsert mode is enabled (Only in OracleDialect), the system will protect \n" + + "= to prevent 0 row results in nullable keys."; + private static final String ENABLE_NULL_KEY_PROTECTION_DISPLAY = "Enable Null Key Protection"; + public static final String PK_FIELDS = "pk.fields"; private static final String PK_FIELDS_DEFAULT = ""; private static final String PK_FIELDS_DOC = @@ -388,6 +395,17 @@ public enum PrimaryKeyMode { ConfigDef.Width.MEDIUM, TABLE_TYPES_DISPLAY ) + .define( + ENABLE_NULL_KEY_PROTECTION, + ConfigDef.Type.BOOLEAN, + ENABLE_NULL_KEY_PROTECTION_DEFAULT, + ConfigDef.Importance.MEDIUM, + ENABLE_NULL_KEY_PROTECTION_DOC, + WRITES_GROUP, + 5, + ConfigDef.Width.MEDIUM, + ENABLE_NULL_KEY_PROTECTION_DISPLAY + ) // Data Mapping .define( TABLE_NAME_FORMAT, @@ -545,6 +563,7 @@ public enum PrimaryKeyMode { public final boolean useHoldlockInMerge; public final boolean trimSensitiveLogsEnabled; + public final boolean enableNullKeyProtection; public JdbcSinkConfig(Map props) { super(CONFIG_DEF, props); @@ -575,6 +594,7 @@ public JdbcSinkConfig(Map props) { "Primary key mode must be 'record_key' when delete support is enabled"); } tableTypes = TableType.parse(getList(TABLE_TYPES_CONFIG)); + enableNullKeyProtection = getBoolean(ENABLE_NULL_KEY_PROTECTION); } private String getPasswordValue(String key) { diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java index 447e403f1..a178284da 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java @@ -15,6 +15,7 @@ package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.sink.JdbcSinkConfig; import io.confluent.connect.jdbc.util.ColumnDefinition; import io.confluent.connect.jdbc.util.DateTimeUtils; import java.io.ByteArrayInputStream; @@ -72,6 +73,10 @@ public void bindFieldBytesValue() throws SQLException { verifyBindField(++index, Schema.BYTES_SCHEMA, ByteBuffer.wrap(new byte[]{42})).setBlob(eq(index), any(ByteArrayInputStream.class)); } + protected OracleDatabaseDialect createSinkDialect(String... propertyPairs) { + return new OracleDatabaseDialect(sinkConfigWithUrl("jdbc:oracle:thin://something", propertyPairs)); + } + @Test public void shouldMapPrimitiveSchemaTypeToSqlTypes() { assertPrimitiveMapping(Type.INT8, "NUMBER(3,0)"); @@ -256,6 +261,26 @@ public void upsert() { assertEquals(expected, actual); } + @Test + public void protectedNullKeyUpsert() { + TableId article = tableId("ARTICLE"); + String expected = "merge into \"ARTICLE\" " + + "using (select ? \"title\", ? \"author\", ? \"body\" FROM dual) incoming on" + + "((\"ARTICLE\".\"title\"=incoming.\"title\" OR (\"ARTICLE\".\"title\" IS " + + "NULL AND incoming.\"title\" IS NULL)) and (\"ARTICLE\"" + + ".\"author\"=incoming.\"author\" OR (\"ARTICLE\".\"author\" IS NULL AND " + + "incoming.\"author\" IS NULL))) " + + "when matched then update set \"ARTICLE\".\"body\"=incoming.\"body\" " + + "when not matched then insert(\"ARTICLE\".\"body\",\"ARTICLE\".\"title\"," + + "\"ARTICLE\".\"author\") " + + "values(incoming.\"body\",incoming.\"title\",incoming.\"author\")"; + + OracleDatabaseDialect protectedSinkDialect = createSinkDialect(JdbcSinkConfig.ENABLE_NULL_KEY_PROTECTION, "true"); + String actual = protectedSinkDialect.buildUpsertQueryStatement(article, columns(article, "title", "author"), + columns(article, "body")); + assertEquals(expected, actual); + } + @Test public void shouldSanitizeUrlWithCredentialsInHosts() { assertSanitizedUrl(