From f4c0286c51f639e68498efaf19b337bcc5dc8a47 Mon Sep 17 00:00:00 2001 From: DmitriyBrashevets Date: Wed, 6 Sep 2023 23:04:49 +0300 Subject: [PATCH] handle io.debezium.time.ZonedTimestamp schema name --- pom.xml | 2 +- .../jdbc/dialect/GenericDatabaseDialect.java | 18 +++++++++++++++ .../dialect/PostgreSqlDatabaseDialect.java | 23 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0edd0c2d1..6234633a5 100644 --- a/pom.xml +++ b/pom.xml @@ -337,7 +337,7 @@ -Xlint:all - -Werror + diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index 9dbde25b0..2c90bdc3b 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -16,6 +16,8 @@ package io.confluent.connect.jdbc.dialect; import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.TimeZone; import org.apache.kafka.common.config.AbstractConfig; @@ -114,6 +116,15 @@ public class GenericDatabaseDialect implements DatabaseDialect { private static final String PRECISION_FIELD = "connect.decimal.precision"; + + /** + * The ISO date-time format includes the date, time (including fractional parts), + * and offset from UTC, such as '2011-12-03T10:15:30.030431+01:00'. + */ + public static final DateTimeFormatter ZONED_DATE_TIME_FORMATTER = + DateTimeFormatter.ISO_OFFSET_DATE_TIME; + + /** * The provider for {@link GenericDatabaseDialect}. */ @@ -1751,6 +1762,10 @@ protected boolean maybeBindLogical( DateTimeUtils.getTimeZoneCalendar(timeZone) ); return true; + case "io.debezium.time.ZonedTimestamp": + ZonedDateTime zonedDateTime = (ZonedDateTime) value; + statement.setObject(index, zonedDateTime.toOffsetDateTime()); + return true; default: return false; } @@ -1906,6 +1921,9 @@ protected void formatColumnValue( DateTimeUtils.formatTimestamp((java.util.Date) value, timeZone) ); return; + case "io.debezium.time.ZonedTimestamp": + builder.appendStringQuoted(ZonedDateTime.parse((String)value, ZONED_DATE_TIME_FORMATTER)); + return; default: // fall through to regular types break; diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java index 2f710a808..6aed6fbea 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java @@ -44,6 +44,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; +import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -307,6 +308,8 @@ protected String getSqlType(SinkRecordField field) { return "TIME"; case Timestamp.LOGICAL_NAME: return "TIMESTAMP"; + case "io.debezium.time.ZonedTimestamp": + return "TIMESTAMPTZ"; default: // fall through to normal types } @@ -433,6 +436,26 @@ public String buildUpsertQueryStatement( return builder.toString(); } + @Override + public void bindField( + PreparedStatement statement, + int index, + Schema schema, + Object value, + ColumnDefinition colDef + ) throws SQLException { + if (schema != null + && "io.debezium.time.ZonedTimestamp".equals(schema.name()) + && value != null) { + ZonedDateTime zonedDateTimeValue = ZonedDateTime.parse( + (String)value, ZONED_DATE_TIME_FORMATTER + ); + super.bindField(statement, index, schema, zonedDateTimeValue, colDef); + } else { + super.bindField(statement, index, schema, value, colDef); + } + } + @Override protected void formatColumnValue( ExpressionBuilder builder,