Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added datetime data types #378

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ protected void writeToDB(PreparedStatement stmt, @Nullable Schema.Field field, i
case DECIMAL:
stmt.setBigDecimal(sqlIndex, record.getDecimal(fieldName));
break;
case DATETIME:
stmt.setTimestamp(sqlIndex, Timestamp.valueOf(record.getDateTime(fieldName)));
break;
}
return;
}
Expand Down
65 changes: 65 additions & 0 deletions mssql-plugin/src/e2e-test/features/mssql/DatatypeDateTime.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Copyright © 2023 Cask Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#

@Mssql
Feature: Mssql - Verify Mssql source data transfer
@MSSQL_SOURCE_DATATYPES_DATETIME_TEST @MSSQL_SINK_TEST @Mssql_Required
Scenario: To verify data is getting transferred from Mssql to Mssql successfully
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "SQL Server" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "SQL Server" from the plugins list as: "Sink"
Then Connect plugins: "SQL Server" and "SQL Server2" to establish connection
Then Navigate to the properties page of plugin: "SQL Server"
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Enter input plugin property: "referenceName" with value: "sourceRef"
Then Replace input plugin property: "database" with value: "databaseName"
Then Enter textarea plugin property: "importQuery" with value: "selectQuery"
Then Click on the Get Schema button
Then Verify the Output Schema matches the Expected Schema: "outputDatatypesSchema4"
Then Validate "SQL Server" plugin properties
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "SQL Server2"
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
Then Replace input plugin property: "database" with value: "databaseName"
Then Replace input plugin property: "tableName" with value: "targetTable"
Then Replace input plugin property: "dbSchemaName" with value: "schema"
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Enter input plugin property: "referenceName" with value: "targetRef"
Then Validate "SQL Server2" plugin properties
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Verify the preview of pipeline is "success"
Then Click on preview data for Mssql sink
Then Verify preview output schema matches the outputSchema captured in properties
Then Close the preview data
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table are equal to number of records from the source table


24 changes: 24 additions & 0 deletions mssql-plugin/src/e2e-test/java/io.cdap.plugin/MssqlClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,30 @@ public static void createTargetUniqueIdentifierTable(String targetTable, String
}
}

public static void createSourceDateTimeTable(String sourceTable, String schema) throws SQLException,
ClassNotFoundException {
try (Connection connect = getMssqlConnection(); Statement statement = connect.createStatement()) {
String dateTimeColumns = PluginPropertyUtils.pluginProp("dateTimeColumns");
String createSourceTableQuery3 = createTableQuery(sourceTable, schema, dateTimeColumns);
statement.executeUpdate(createSourceTableQuery3);

// Insert dummy data.
String dateTimeValues = PluginPropertyUtils.pluginProp("dateTimeValues");
String dateTimeColumnsList = PluginPropertyUtils.pluginProp("dateTimeColumnsList");
statement.executeUpdate(insertQuery(sourceTable, schema, dateTimeColumnsList,
dateTimeValues));
}
}

public static void createTargetDateTimeTable(String targetTable, String schema) throws SQLException,
ClassNotFoundException {
try (Connection connect = getMssqlConnection(); Statement statement = connect.createStatement()) {
String dateTimeColumns = PluginPropertyUtils.pluginProp("dateTimeColumns");
String createTargetTableQuery3 = createTableQuery(targetTable, schema, dateTimeColumns);
statement.executeUpdate(createTargetTableQuery3);
}
}

public static void deleteTables(String schema, String[] tables)
throws SQLException, ClassNotFoundException {
try (Connection connect = getMssqlConnection(); Statement statement = connect.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public static void createDatatypesTablesUniqueIdentifier() throws SQLException,
PluginPropertyUtils.pluginProp("schema"));
}

@Before(order = 2, value = "@MSSQL_SOURCE_DATATYPES_DATETIME_TEST")
public static void createDatatypesTablesDateTime() throws SQLException, ClassNotFoundException {
MssqlClient.createSourceDateTimeTable(PluginPropertyUtils.pluginProp("sourceTable"),
PluginPropertyUtils.pluginProp("schema"));
MssqlClient.createTargetDateTimeTable(PluginPropertyUtils.pluginProp("targetTable"),
PluginPropertyUtils.pluginProp("schema"));
}

@After(order = 1, value = "@MSSQL_SINK_TEST")
public static void dropTables() throws SQLException, ClassNotFoundException {
MssqlClient.deleteTables(PluginPropertyUtils.pluginProp("schema"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ imageColumnsList=(ID,COL1)
imageValues=VALUES ('User1', '0x48692054686572652120486F772061726520796F75206665656C696E6720746F646179203F')
outputDatatypesSchema2=[{"key":"ID","value":"string"},{"key":"COL1","value":"bytes"}]


uniqueIdentifierColumns=(ID VARCHAR(100) PRIMARY KEY, COL1 UNIQUEIDENTIFIER)
uniqueIdentifierColumnsList=(ID, COL1)
uniqueIdentifierValues=VALUES ('User1', '6F9619FF-8B86-D011-B42D-00C04FC964FF')
outputDatatypesSchema3=[{"key":"ID","value":"string"},{"key":"COL1","value":"string"}]

dateTimeColumns=(ID VARCHAR(100) PRIMARY KEY, COL1 DATETIME, COL2 DATETIME2(0), COL3 DATETIMEOFFSET(0))
dateTimeColumnsList=(ID, COL1, COL2, COL3)
dateTimeValues=VALUES ('User1', '2023-01-01 01:00:00.000', '2023-01-01 01:00:00.000', '2025-12-10 12:32:10 +01:00')
outputDatatypesSchema4=[{"key":"ID","value":"string"},{"key":"COL1","value":"datetime"},\
{"key":"COL2","value":"datetime"},{"key":"COL3","value":"datetime"}]
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public boolean isFieldCompatible(Schema.Field field, ResultSetMetaData metadata,
boolean isSigned = metadata.isSigned(index);
int precision = metadata.getPrecision(index);

// Handles datetime datatypes
// Case when Timestamp maps to datetime
// Case when Datetime2 maps to datetime
// Case when DatetimeOffset maps to datetime
if ((sqlType == Types.TIMESTAMP || sqlType == SqlServerSourceSchemaReader.DATETIME_OFFSET_TYPE)
&& fieldLogicalType.equals(Schema.LogicalType.DATETIME)) {
return true;
}
// Handle logical types first
if (fieldLogicalType != null) {
return super.isFieldCompatible(fieldType, fieldLogicalType, sqlType, precision, isSigned);
Expand Down