Skip to content

Commit

Permalink
handle io.debezium.time.ZonedTimestamp schema name
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitriyBrashevets committed Sep 8, 2023
1 parent aa8466c commit f4c0286
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@
<configuration>
<compilerArgs>
<arg>-Xlint:all</arg>
<arg>-Werror</arg>
<!-- <arg>-Werror</arg>-->
</compilerArgs>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.confluent.connect.jdbc.dialect;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;

import org.apache.kafka.common.config.AbstractConfig;
Expand Down Expand Up @@ -114,6 +116,15 @@ public class GenericDatabaseDialect implements DatabaseDialect {

private static final String PRECISION_FIELD = "connect.decimal.precision";


/**
* The ISO date-time format includes the date, time (including fractional parts),
* and offset from UTC, such as '2011-12-03T10:15:30.030431+01:00'.
*/
public static final DateTimeFormatter ZONED_DATE_TIME_FORMATTER =
DateTimeFormatter.ISO_OFFSET_DATE_TIME;


/**
* The provider for {@link GenericDatabaseDialect}.
*/
Expand Down Expand Up @@ -1751,6 +1762,10 @@ protected boolean maybeBindLogical(
DateTimeUtils.getTimeZoneCalendar(timeZone)
);
return true;
case "io.debezium.time.ZonedTimestamp":
ZonedDateTime zonedDateTime = (ZonedDateTime) value;
statement.setObject(index, zonedDateTime.toOffsetDateTime());
return true;
default:
return false;
}
Expand Down Expand Up @@ -1906,6 +1921,9 @@ protected void formatColumnValue(
DateTimeUtils.formatTimestamp((java.util.Date) value, timeZone)
);
return;
case "io.debezium.time.ZonedTimestamp":
builder.appendStringQuoted(ZonedDateTime.parse((String)value, ZONED_DATE_TIME_FORMATTER));
return;
default:
// fall through to regular types
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -307,6 +308,8 @@ protected String getSqlType(SinkRecordField field) {
return "TIME";
case Timestamp.LOGICAL_NAME:
return "TIMESTAMP";
case "io.debezium.time.ZonedTimestamp":
return "TIMESTAMPTZ";
default:
// fall through to normal types
}
Expand Down Expand Up @@ -433,6 +436,26 @@ public String buildUpsertQueryStatement(
return builder.toString();
}

@Override
public void bindField(
PreparedStatement statement,
int index,
Schema schema,
Object value,
ColumnDefinition colDef
) throws SQLException {
if (schema != null
&& "io.debezium.time.ZonedTimestamp".equals(schema.name())
&& value != null) {
ZonedDateTime zonedDateTimeValue = ZonedDateTime.parse(
(String)value, ZONED_DATE_TIME_FORMATTER
);
super.bindField(statement, index, schema, zonedDateTimeValue, colDef);
} else {
super.bindField(statement, index, schema, value, colDef);
}
}

@Override
protected void formatColumnValue(
ExpressionBuilder builder,
Expand Down

0 comments on commit f4c0286

Please sign in to comment.