Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1366] - Sink null control (upsert OracleDialect) #1403

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public DatabaseDialect create(AbstractConfig config) {
protected final String jdbcUrl;
protected final DatabaseDialectProvider.JdbcUrlInfo jdbcUrlInfo;
protected final QuoteMethod quoteSqlIdentifiers;
protected boolean enabledSinkNullKeyProtection;
private final IdentifierRules defaultIdentifierRules;
private final AtomicReference<IdentifierRules> identifierRules = new AtomicReference<>();
private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -190,6 +191,7 @@ protected GenericDatabaseDialect(
quoteSqlIdentifiers = QuoteMethod.get(
config.getString(JdbcSinkConfig.QUOTE_SQL_IDENTIFIERS_CONFIG)
);
enabledSinkNullKeyProtection = config.getBoolean(JdbcSinkConfig.ENABLE_NULL_KEY_PROTECTION);
} else {
catalogPattern = config.getString(JdbcSourceTaskConfig.CATALOG_PATTERN_CONFIG);
schemaPattern = config.getString(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ public String buildUpsertQueryStatement(
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns
) {

// https://blogs.oracle.com/cmar/entry/using_merge_to_do_an
final Transform<ColumnId> transform = (builder, col) -> {
builder.append(table)
Expand All @@ -323,6 +324,25 @@ public String buildUpsertQueryStatement(
.appendColumnName(col.name());
};

final Transform<ColumnId> nullableKeyTransform = (builder, col) -> {
builder
.append("(")
.append(table)
.append(".")
.appendColumnName(col.name())
.append("=incoming.")
.appendColumnName(col.name())
.append(" OR (")
.append(table)
.append(".")
.appendColumnName(col.name())
.append(" IS NULL AND ")
.append("incoming.")
.appendColumnName(col.name())
.append(" IS NULL))")
;
};

ExpressionBuilder builder = expressionBuilder();
builder.append("merge into ");
builder.append(table);
Expand All @@ -334,7 +354,7 @@ public String buildUpsertQueryStatement(
builder.append(" FROM dual) incoming on(");
builder.appendList()
.delimitedBy(" and ")
.transformedBy(transform)
.transformedBy(enabledSinkNullKeyProtection ? nullableKeyTransform : transform)
.of(keyColumns);
builder.append(")");
if (nonKeyColumns != null && !nonKeyColumns.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public enum PrimaryKeyMode {
+ "the connector, e.g. ``UPDATE``.";
private static final String INSERT_MODE_DISPLAY = "Insert Mode";

public static final String ENABLE_NULL_KEY_PROTECTION = "enable.null.key.protection";
private static final String ENABLE_NULL_KEY_PROTECTION_DEFAULT = "false";
private static final String ENABLE_NULL_KEY_PROTECTION_DOC =
"If upsert mode is enabled (Only in OracleDialect), the system will protect \n"
+ "<table.column>=<incoming.column> to prevent 0 row results in nullable keys.";
private static final String ENABLE_NULL_KEY_PROTECTION_DISPLAY = "Enable Null Key Protection";

public static final String PK_FIELDS = "pk.fields";
private static final String PK_FIELDS_DEFAULT = "";
private static final String PK_FIELDS_DOC =
Expand Down Expand Up @@ -388,6 +395,17 @@ public enum PrimaryKeyMode {
ConfigDef.Width.MEDIUM,
TABLE_TYPES_DISPLAY
)
.define(
ENABLE_NULL_KEY_PROTECTION,
ConfigDef.Type.BOOLEAN,
ENABLE_NULL_KEY_PROTECTION_DEFAULT,
ConfigDef.Importance.MEDIUM,
ENABLE_NULL_KEY_PROTECTION_DOC,
WRITES_GROUP,
5,
ConfigDef.Width.MEDIUM,
ENABLE_NULL_KEY_PROTECTION_DISPLAY
)
// Data Mapping
.define(
TABLE_NAME_FORMAT,
Expand Down Expand Up @@ -545,6 +563,7 @@ public enum PrimaryKeyMode {
public final boolean useHoldlockInMerge;

public final boolean trimSensitiveLogsEnabled;
public final boolean enableNullKeyProtection;

public JdbcSinkConfig(Map<?, ?> props) {
super(CONFIG_DEF, props);
Expand Down Expand Up @@ -575,6 +594,7 @@ public JdbcSinkConfig(Map<?, ?> props) {
"Primary key mode must be 'record_key' when delete support is enabled");
}
tableTypes = TableType.parse(getList(TABLE_TYPES_CONFIG));
enableNullKeyProtection = getBoolean(ENABLE_NULL_KEY_PROTECTION);
}

private String getPasswordValue(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.connect.jdbc.dialect;

import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.DateTimeUtils;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -72,6 +73,10 @@ public void bindFieldBytesValue() throws SQLException {
verifyBindField(++index, Schema.BYTES_SCHEMA, ByteBuffer.wrap(new byte[]{42})).setBlob(eq(index), any(ByteArrayInputStream.class));
}

protected OracleDatabaseDialect createSinkDialect(String... propertyPairs) {
return new OracleDatabaseDialect(sinkConfigWithUrl("jdbc:oracle:thin://something", propertyPairs));
}

@Test
public void shouldMapPrimitiveSchemaTypeToSqlTypes() {
assertPrimitiveMapping(Type.INT8, "NUMBER(3,0)");
Expand Down Expand Up @@ -256,6 +261,26 @@ public void upsert() {
assertEquals(expected, actual);
}

@Test
public void protectedNullKeyUpsert() {
TableId article = tableId("ARTICLE");
String expected = "merge into \"ARTICLE\" " +
"using (select ? \"title\", ? \"author\", ? \"body\" FROM dual) incoming on" +
"((\"ARTICLE\".\"title\"=incoming.\"title\" OR (\"ARTICLE\".\"title\" IS " +
"NULL AND incoming.\"title\" IS NULL)) and (\"ARTICLE\"" +
".\"author\"=incoming.\"author\" OR (\"ARTICLE\".\"author\" IS NULL AND " +
"incoming.\"author\" IS NULL))) " +
"when matched then update set \"ARTICLE\".\"body\"=incoming.\"body\" " +
"when not matched then insert(\"ARTICLE\".\"body\",\"ARTICLE\".\"title\"," +
"\"ARTICLE\".\"author\") " +
"values(incoming.\"body\",incoming.\"title\",incoming.\"author\")";

OracleDatabaseDialect protectedSinkDialect = createSinkDialect(JdbcSinkConfig.ENABLE_NULL_KEY_PROTECTION, "true");
String actual = protectedSinkDialect.buildUpsertQueryStatement(article, columns(article, "title", "author"),
columns(article, "body"));
assertEquals(expected, actual);
}

@Test
public void shouldSanitizeUrlWithCredentialsInHosts() {
assertSanitizedUrl(
Expand Down