Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option for not inserting primary keys into Sql Server tables #1343

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -431,19 +431,61 @@ public String buildUpsertQueryStatement(
.of(nonKeyColumns);
}
builder.append(" when not matched then insert (");

// potential primary key columns
Iterable<ColumnId> pkColumns = Collections.emptyList();

if (((JdbcSinkConfig) this.config).insertPrimaryKeys) {
pkColumns = keyColumns;
}

builder.appendList()
.delimitedBy(", ")
.transformedBy(ExpressionBuilder.columnNames())
.of(nonKeyColumns, keyColumns);
.of(nonKeyColumns, pkColumns);
builder.append(") values (");

builder.appendList()
.delimitedBy(",")
.transformedBy(ExpressionBuilder.columnNamesWithPrefix("incoming."))
.of(nonKeyColumns, keyColumns);
.of(nonKeyColumns, pkColumns);
builder.append(");");


return builder.toString();
}


@Override
public String buildInsertStatement(
TableId table,
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns
) {
ExpressionBuilder builder = expressionBuilder();
builder.append("INSERT INTO ");
builder.append(table);
builder.append("(");

// potential primary key columns
Collection<ColumnId> pkColumns = Collections.emptyList();

if (((JdbcSinkConfig) this.config).insertPrimaryKeys) {
pkColumns = keyColumns;
}

builder.appendList()
.delimitedBy(",")
.transformedBy(ExpressionBuilder.columnNames())
.of(pkColumns, nonKeyColumns);
builder.append(") VALUES(");
builder.appendMultiple(",", "?", pkColumns.size() + nonKeyColumns.size());

builder.append(")");
return builder.toString();
}


/**
* If Sql Server is 2016 or newer, and time stamp mode configured against a datetime column
* kill task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,15 @@ public enum PrimaryKeyMode {
+ "Note that it is only applicable to SQL Server.";
private static final String MSSQL_USE_MERGE_HOLDLOCK_DISPLAY =
"SQL Server - Use HOLDLOCK in MERGE";

public static final String MSSQL_INSERT_PRIMARY_KEYS = "mssql.insert.primary.keys";
private static final String MSSQL_INSERT_PRIMARY_KEYS_DEFAULT = "true";
private static final String MSSQL_INSERT_PRIMARY_KEYS_DOC =
"This is only implemented for SQL Server: If false the primary key(s) "
+ "will only be used for UPDATE but not for INSERT. This also applies to MERGE(mode=upsert)."
+ "Background: SQL Server datbases reject inserting values into a auto-incremental columns "
+ "of type IDENTITY.";
private static final String MSSQL_INSERT_PRIMARY_KEYS_DISPLAY =
"SQL Server - Whether to insert primary keys in INSERT or MERGE scenario";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
// Connection
.define(
Expand Down Expand Up @@ -446,6 +454,17 @@ public enum PrimaryKeyMode {
ConfigDef.Width.MEDIUM,
DB_TIMEZONE_CONFIG_DISPLAY
)
.define(
MSSQL_INSERT_PRIMARY_KEYS,
ConfigDef.Type.BOOLEAN,
MSSQL_INSERT_PRIMARY_KEYS_DEFAULT,
ConfigDef.Importance.LOW,
MSSQL_INSERT_PRIMARY_KEYS_DOC,
DATAMAPPING_GROUP,
6,
ConfigDef.Width.MEDIUM,
MSSQL_INSERT_PRIMARY_KEYS_DISPLAY
)
// DDL
.define(
AUTO_CREATE,
Expand Down Expand Up @@ -546,6 +565,8 @@ public enum PrimaryKeyMode {

public final boolean trimSensitiveLogsEnabled;

public final boolean insertPrimaryKeys;

public JdbcSinkConfig(Map<?, ?> props) {
super(CONFIG_DEF, props);
connectorName = ConfigUtils.connectorName(props);
Expand All @@ -564,6 +585,7 @@ public JdbcSinkConfig(Map<?, ?> props) {
insertMode = InsertMode.valueOf(getString(INSERT_MODE).toUpperCase());
pkMode = PrimaryKeyMode.valueOf(getString(PK_MODE).toUpperCase());
pkFields = getList(PK_FIELDS);
insertPrimaryKeys = getBoolean(MSSQL_INSERT_PRIMARY_KEYS);
dialectName = getString(DIALECT_NAME_CONFIG);
fieldsWhitelist = new HashSet<>(getList(FIELDS_WHITELIST));
String dbTimeZone = getString(DB_TIMEZONE_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.TimeZone;
import java.util.concurrent.ThreadLocalRandom;

import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.ColumnId;
import org.apache.kafka.common.config.AbstractConfig;
Expand All @@ -42,7 +43,7 @@
import io.confluent.connect.jdbc.util.TableId;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -402,6 +403,61 @@ public void upsert2() {
);
}

@Test
public void upsertSkipsPrimaryKeyInsertsIfConfiguredSo() {
JdbcSinkConfig sinkConfig = sinkConfigWithUrl("jdbc:jtds:sqlsserver://something","mssql.insert.primary.keys","false");
assertFalse(sinkConfig.insertPrimaryKeys);
dialect = createDialect(sinkConfig);
TableId customer = tableId("Customer");
assertEquals(
"merge into [Customer] with (HOLDLOCK) AS target using (select ? AS [id], ? AS [name], ? " +
"AS [salary], ? AS [address]) AS incoming on (target.[id]=incoming.[id]) when matched then update set " +
"[name]=incoming.[name],[salary]=incoming.[salary],[address]=incoming" +
".[address] when not matched then insert " +
"([name], [salary], [address])"+
" values (incoming.[name],incoming.[salary],incoming.[address]);",
dialect.buildUpsertQueryStatement(
customer,
columns(customer, "id"),
columns(customer, "name", "salary", "address")
)
);

}
@Test
public void insertSkipsPrimaryKeyIfConfiguredSo() {
JdbcSinkConfig sinkConfig = sinkConfigWithUrl("jdbc:jtds:sqlsserver://something","mssql.insert.primary.keys","false");
assertFalse(sinkConfig.insertPrimaryKeys);
dialect = createDialect(sinkConfig);
TableId customer = tableId("Customer");
assertEquals(
"INSERT INTO [Customer]([name],[salary],[address]) VALUES(?,?,?)",
dialect.buildInsertStatement(
customer,
columns(customer, "id"),
columns(customer, "name", "salary", "address")
)
);

}

@Test
public void insertAddsPrimaryKeysIfConfiguredSo() {
JdbcSinkConfig sinkConfig = sinkConfigWithUrl("jdbc:jtds:sqlsserver://something");
assertTrue(sinkConfig.insertPrimaryKeys);
dialect = createDialect(sinkConfig);
TableId customer = tableId("Customer");
assertEquals(
"INSERT INTO [Customer]([id],[name],[salary],[address]) VALUES(?,?,?,?)",
dialect.buildInsertStatement(
customer,
columns(customer, "id"),
columns(customer, "name", "salary", "address")
)
);

}

@Test(expected=ConnectException.class)
public void shouldFailDatetimeColumnAsTimeStampColumn() throws SQLException, ConnectException {
String timeStampColumnName = "start_time";
Expand Down