diff --git a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java index 1d1743177..7998611d4 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java @@ -311,6 +311,9 @@ protected void writeToDB(PreparedStatement stmt, @Nullable Schema.Field field, i case DECIMAL: stmt.setBigDecimal(sqlIndex, record.getDecimal(fieldName)); break; + case DATETIME: + stmt.setTimestamp(sqlIndex, Timestamp.valueOf(record.getDateTime(fieldName))); + break; } return; } diff --git a/mssql-plugin/src/e2e-test/features/mssql/DatatypeDateTime.feature b/mssql-plugin/src/e2e-test/features/mssql/DatatypeDateTime.feature new file mode 100644 index 000000000..0e82b034e --- /dev/null +++ b/mssql-plugin/src/e2e-test/features/mssql/DatatypeDateTime.feature @@ -0,0 +1,65 @@ +# +# Copyright © 2023 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +# + +@Mssql +Feature: Mssql - Verify Mssql source data transfer + @MSSQL_SOURCE_DATATYPES_DATETIME_TEST @MSSQL_SINK_TEST @Mssql_Required + Scenario: To verify data is getting transferred from Mssql to Mssql successfully + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "SQL Server" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "SQL Server" from the plugins list as: "Sink" + Then Connect plugins: "SQL Server" and "SQL Server2" to establish connection + Then Navigate to the properties page of plugin: "SQL Server" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "database" with value: "databaseName" + Then Enter textarea plugin property: "importQuery" with value: "selectQuery" + Then Click on the Get Schema button + Then Verify the Output Schema matches the Expected Schema: "outputDatatypesSchema4" + Then Validate "SQL Server" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "SQL Server2" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "database" with value: "databaseName" + Then Replace input plugin property: "tableName" with value: "targetTable" + Then Replace input plugin property: "dbSchemaName" with value: "schema" + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Enter input plugin property: "referenceName" with value: "targetRef" + Then Validate "SQL Server2" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Verify the preview of pipeline is "success" + Then Click on preview data for Mssql sink + Then Verify preview output schema matches the outputSchema captured in properties + Then Close the preview data + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate records transferred to target table are equal to number of records from the source table + + diff --git a/mssql-plugin/src/e2e-test/java/io.cdap.plugin/MssqlClient.java b/mssql-plugin/src/e2e-test/java/io.cdap.plugin/MssqlClient.java index 941149549..14cd3d2d3 100644 --- a/mssql-plugin/src/e2e-test/java/io.cdap.plugin/MssqlClient.java +++ b/mssql-plugin/src/e2e-test/java/io.cdap.plugin/MssqlClient.java @@ -155,6 +155,30 @@ public static void createTargetUniqueIdentifierTable(String targetTable, String } } + public static void createSourceDateTimeTable(String sourceTable, String schema) throws SQLException, + ClassNotFoundException { + try (Connection connect = getMssqlConnection(); Statement statement = connect.createStatement()) { + String dateTimeColumns = PluginPropertyUtils.pluginProp("dateTimeColumns"); + String createSourceTableQuery3 = createTableQuery(sourceTable, schema, dateTimeColumns); + statement.executeUpdate(createSourceTableQuery3); + + // Insert dummy data. + String dateTimeValues = PluginPropertyUtils.pluginProp("dateTimeValues"); + String dateTimeColumnsList = PluginPropertyUtils.pluginProp("dateTimeColumnsList"); + statement.executeUpdate(insertQuery(sourceTable, schema, dateTimeColumnsList, + dateTimeValues)); + } + } + + public static void createTargetDateTimeTable(String targetTable, String schema) throws SQLException, + ClassNotFoundException { + try (Connection connect = getMssqlConnection(); Statement statement = connect.createStatement()) { + String dateTimeColumns = PluginPropertyUtils.pluginProp("dateTimeColumns"); + String createTargetTableQuery3 = createTableQuery(targetTable, schema, dateTimeColumns); + statement.executeUpdate(createTargetTableQuery3); + } + } + public static void deleteTables(String schema, String[] tables) throws SQLException, ClassNotFoundException { try (Connection connect = getMssqlConnection(); Statement statement = connect.createStatement()) { diff --git a/mssql-plugin/src/e2e-test/java/io.cdap.plugin/common.stepsdesign/TestSetupHooks.java b/mssql-plugin/src/e2e-test/java/io.cdap.plugin/common.stepsdesign/TestSetupHooks.java index 95e625f82..02737fe1f 100644 --- a/mssql-plugin/src/e2e-test/java/io.cdap.plugin/common.stepsdesign/TestSetupHooks.java +++ b/mssql-plugin/src/e2e-test/java/io.cdap.plugin/common.stepsdesign/TestSetupHooks.java @@ -73,6 +73,14 @@ public static void createDatatypesTablesUniqueIdentifier() throws SQLException, PluginPropertyUtils.pluginProp("schema")); } + @Before(order = 2, value = "@MSSQL_SOURCE_DATATYPES_DATETIME_TEST") + public static void createDatatypesTablesDateTime() throws SQLException, ClassNotFoundException { + MssqlClient.createSourceDateTimeTable(PluginPropertyUtils.pluginProp("sourceTable"), + PluginPropertyUtils.pluginProp("schema")); + MssqlClient.createTargetDateTimeTable(PluginPropertyUtils.pluginProp("targetTable"), + PluginPropertyUtils.pluginProp("schema")); + } + @After(order = 1, value = "@MSSQL_SINK_TEST") public static void dropTables() throws SQLException, ClassNotFoundException { MssqlClient.deleteTables(PluginPropertyUtils.pluginProp("schema"), diff --git a/mssql-plugin/src/e2e-test/resources/pluginParameters.properties b/mssql-plugin/src/e2e-test/resources/pluginParameters.properties index c22b6b6c6..e75cefa47 100644 --- a/mssql-plugin/src/e2e-test/resources/pluginParameters.properties +++ b/mssql-plugin/src/e2e-test/resources/pluginParameters.properties @@ -35,8 +35,13 @@ imageColumnsList=(ID,COL1) imageValues=VALUES ('User1', '0x48692054686572652120486F772061726520796F75206665656C696E6720746F646179203F') outputDatatypesSchema2=[{"key":"ID","value":"string"},{"key":"COL1","value":"bytes"}] - uniqueIdentifierColumns=(ID VARCHAR(100) PRIMARY KEY, COL1 UNIQUEIDENTIFIER) uniqueIdentifierColumnsList=(ID, COL1) uniqueIdentifierValues=VALUES ('User1', '6F9619FF-8B86-D011-B42D-00C04FC964FF') outputDatatypesSchema3=[{"key":"ID","value":"string"},{"key":"COL1","value":"string"}] + +dateTimeColumns=(ID VARCHAR(100) PRIMARY KEY, COL1 DATETIME, COL2 DATETIME2(0), COL3 DATETIMEOFFSET(0)) +dateTimeColumnsList=(ID, COL1, COL2, COL3) +dateTimeValues=VALUES ('User1', '2023-01-01 01:00:00.000', '2023-01-01 01:00:00.000', '2025-12-10 12:32:10 +01:00') +outputDatatypesSchema4=[{"key":"ID","value":"string"},{"key":"COL1","value":"datetime"},\ + {"key":"COL2","value":"datetime"},{"key":"COL3","value":"datetime"}] diff --git a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlFieldsValidator.java b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlFieldsValidator.java index b9c95edd9..d5d7ae9b1 100644 --- a/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlFieldsValidator.java +++ b/mssql-plugin/src/main/java/io/cdap/plugin/mssql/SqlFieldsValidator.java @@ -38,6 +38,14 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata, boolean isSigned = metadata.isSigned(index); int precision = metadata.getPrecision(index); + // Handles datetime datatypes + // Case when Timestamp maps to datetime + // Case when Datetime2 maps to datetime + // Case when DatetimeOffset maps to datetime + if ((sqlType == Types.TIMESTAMP || sqlType == SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE) + && fieldLogicalType.equals(Schema.LogicalType.DATETIME)) { + return true; + } // Handle logical types first if (fieldLogicalType != null) { return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);