From 04cb41c12425d8bbea65bf4f5de1dc6bbecebbc2 Mon Sep 17 00:00:00 2001 From: "Benjamin.Berhault@lausanne.ch" Date: Wed, 22 May 2024 09:22:58 +0200 Subject: [PATCH 1/3] PostGIS support using Debezium libraries --- .../dialect/PostgreSqlDatabaseDialect.java | 42 +++++++++++++++ .../connect/jdbc/dialect/BaseDialectTest.java | 21 +++++++- .../PostgreSqlDatabaseDialectTest.java | 54 +++++++++++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java index 2f710a808..fecbbf83b 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java @@ -26,6 +26,11 @@ import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.TableDefinition; import io.confluent.connect.jdbc.util.TableId; +import io.debezium.data.geometry.Geography; +import io.debezium.data.geometry.Geometry; +import io.debezium.data.geometry.Point; +import io.debezium.time.ZonedTime; +import io.debezium.time.ZonedTimestamp; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Date; @@ -33,6 +38,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.DataException; @@ -78,6 +84,7 @@ public DatabaseDialect create(AbstractConfig config) { static final String JSON_TYPE_NAME = "json"; static final String JSONB_TYPE_NAME = "jsonb"; static final String UUID_TYPE_NAME = "uuid"; + static final String PRECISION_PARAMETER_KEY = "connect.decimal.precision"; /** * 
Define the PG datatypes that require casting upon insert/update statements. @@ -307,6 +314,16 @@ protected String getSqlType(SinkRecordField field) { return "TIME"; case Timestamp.LOGICAL_NAME: return "TIMESTAMP"; + case ZonedTime.SCHEMA_NAME: + return "TIMETZ"; + case ZonedTimestamp.SCHEMA_NAME: + return "TIMESTAMPTZ"; + case Geometry.LOGICAL_NAME: + return "GEOMETRY"; + case Geography.LOGICAL_NAME: + return "GEOGRAPHY"; + case Point.LOGICAL_NAME: + return "GEOMETRY (POINT)"; default: // fall through to normal types } @@ -518,6 +535,10 @@ protected boolean maybeBindPrimitive( default: break; } + + if (maybeBindPostgresDataType(statement, index, schema, value)) { + return true; + } return super.maybeBindPrimitive(statement, index, schema, value); } @@ -580,6 +601,27 @@ protected String valueTypeCast(TableDefinition tableDefn, ColumnId columnId) { return ""; } + private boolean maybeBindPostgresDataType( + PreparedStatement statement, int index, Schema schema, Object value) throws SQLException { + if (schema.name() != null) { + switch (schema.name()) { + case ZonedTime.SCHEMA_NAME: + case ZonedTimestamp.SCHEMA_NAME: + statement.setObject(index, value, Types.OTHER); + return true; + case Geometry.LOGICAL_NAME: + case Geography.LOGICAL_NAME: + case Point.LOGICAL_NAME: + byte[] wkb = ((Struct) value).getBytes(Geometry.WKB_FIELD); + statement.setBytes(index, wkb); + return true; + default: + return false; + } + } + return false; + } + @Override protected int decimalScale(ColumnDefinition defn) { if (defn.scale() == NUMERIC_TYPE_SCALE_UNSET) { diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java index 50354c50d..94e8e7e1d 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java @@ -21,6 +21,11 @@ import io.confluent.connect.jdbc.util.ColumnDefinition.Nullability; import 
java.sql.Types; import java.time.ZoneOffset; +import io.debezium.data.geometry.Geography; +import io.debezium.data.geometry.Geometry; +import io.debezium.data.geometry.Point; +import io.debezium.time.ZonedTime; +import io.debezium.time.ZonedTimestamp; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; @@ -484,7 +489,12 @@ public void bindFieldNull() throws SQLException { Decimal.schema(0), Date.SCHEMA, Time.SCHEMA, - Timestamp.SCHEMA + Timestamp.SCHEMA, + ZonedTime.schema(), + ZonedTimestamp.schema(), + Geometry.schema(), + Geography.schema(), + Point.schema() ); int index = 0; for (Schema schema : nullableTypes) { @@ -541,6 +551,15 @@ protected PreparedStatement verifyBindField(int index, Schema schema, Object val case Timestamp.LOGICAL_NAME: when(colDef.type()).thenReturn(Types.TIMESTAMP); break; + case ZonedTime.SCHEMA_NAME: + case ZonedTimestamp.SCHEMA_NAME: + when(colDef.type()).thenReturn(Types.OTHER); + break; + case Geometry.LOGICAL_NAME: + case Geography.LOGICAL_NAME: + case Point.LOGICAL_NAME: + when(colDef.type()).thenReturn(Types.BLOB); + break; default: when(colDef.type()).thenThrow( new UnsupportedOperationException( diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java index c8e993b4e..2de15e48c 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java @@ -22,6 +22,11 @@ import io.confluent.connect.jdbc.util.TableDefinitionBuilder; import io.confluent.connect.jdbc.util.TableId; +import io.debezium.data.geometry.Geography; +import io.debezium.data.geometry.Geometry; +import io.debezium.data.geometry.Point; +import io.debezium.time.ZonedTime; +import io.debezium.time.ZonedTimestamp; import 
org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -32,6 +37,7 @@ import org.junit.Test; import java.sql.Connection; +import java.io.IOException; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -45,12 +51,23 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import javax.xml.bind.DatatypeConverter; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + + public class PostgreSqlDatabaseDialectTest extends BaseDialectTest { + // 'SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry + private static final Struct GEOMETRY_VALUE = Geometry.createValue(Geometry.schema(), DatatypeConverter.parseHexBinary("0101000020730C00001C7C613255DE6540787AA52C435C42C0"), 3187); + + // 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography + private static final Struct GEOGRAPHY_VALUE = Geography.createValue(Geography.schema(), DatatypeConverter.parseHexBinary("0105000020E610000001000000010200000002000000A779C7293A2465400B462575025A46C0C66D3480B7FC6440C3D32B65195246C0"),4326); + + private static final Struct POINT_VALUE = Point.createValue(Point.builder().build(), 1, 1); + @Override protected PostgreSqlDatabaseDialect createDialect() { return new PostgreSqlDatabaseDialect(sourceConfigWithUrl("jdbc:postgresql://something")); @@ -99,6 +116,12 @@ public void shouldMapDataTypesForAddingColumnToTable() { verifyDataTypeMapping("DATE", Date.SCHEMA); verifyDataTypeMapping("TIME", Time.SCHEMA); verifyDataTypeMapping("TIMESTAMP", Timestamp.SCHEMA); + verifyDataTypeMapping("TIMETZ", ZonedTime.schema()); + verifyDataTypeMapping("TIMESTAMPTZ", ZonedTimestamp.schema()); + verifyDataTypeMapping("GEOMETRY", Geometry.schema()); + // Geography is also derived from Geometry + verifyDataTypeMapping("GEOMETRY", Geography.schema()); + verifyDataTypeMapping("GEOMETRY", 
Point.schema()); } @Test @@ -116,6 +139,13 @@ public void shouldMapTimestampSchemaTypeToTimestampSqlType() { assertTimestampMapping("TIMESTAMP"); } + @Test + public void shouldMapGeometryTypeToPostGisTypes() { + assertMapping("GEOMETRY", Geometry.schema()); + assertMapping("GEOMETRY", Geography.schema()); + assertMapping("GEOMETRY", Point.schema()); + } + @Test public void shouldBuildCreateQueryStatement() { assertEquals( @@ -426,6 +456,30 @@ public void shouldSanitizeUrlWithCredentialsInUrlProperties() { public void bindFieldArrayUnsupported() throws SQLException { // Overridden simply to dummy out the test. } + + @Test + public void bindFieldZonedTimeValue() throws SQLException { + int index = ThreadLocalRandom.current().nextInt(); + String value = "10:15:30+01:00"; + super.verifyBindField(++index, ZonedTime.schema(), value).setObject(index, value, Types.OTHER); + } + + @Test + public void bindFieldZonedTimestampValue() throws SQLException { + int index = ThreadLocalRandom.current().nextInt(); + String value = "2021-05-01T18:00:00.030431+02:00"; + super.verifyBindField(++index, ZonedTimestamp.schema(), value).setObject(index, value, Types.OTHER); + } + + + + @Test + public void bindFieldPostGisValues() throws SQLException, IOException { + int index = ThreadLocalRandom.current().nextInt(); + super.verifyBindField(++index, Geometry.schema(), GEOMETRY_VALUE).setBytes(index, GEOMETRY_VALUE.getBytes(Geometry.WKB_FIELD)); + super.verifyBindField(++index, Geography.schema(), GEOGRAPHY_VALUE).setBytes(index, GEOGRAPHY_VALUE.getBytes(Geometry.WKB_FIELD)); + super.verifyBindField(++index, Geometry.schema(), POINT_VALUE).setBytes(index, POINT_VALUE.getBytes(Geometry.WKB_FIELD)); + } @Test public void bindFieldPrimitiveValues() throws SQLException { From eb0ad961bb01157e0bbb88d0dc4b02a3f2b9a651 Mon Sep 17 00:00:00 2001 From: "Benjamin.Berhault@lausanne.ch" Date: Wed, 22 May 2024 11:35:34 +0200 Subject: [PATCH 2/3] PostGIS support pom.xml update --- pom.xml | 6 ++++++ 
.../io/confluent/connect/jdbc/dialect/BaseDialectTest.java | 5 +---- .../connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java | 1 + 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 3390268cd..f02ff6e39 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,7 @@ 19.7.0.0 8.4.1.jre8 42.4.4 + 2.6.0.Final 1.3.1 1.7.36 1.2.19 @@ -190,6 +191,11 @@ ${jtds.driver.version} runtime + + io.debezium + debezium-core + ${debezium.version} + junit diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java index 94e8e7e1d..b83869ab8 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java @@ -67,10 +67,7 @@ import static junit.framework.TestCase.assertNotNull; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public abstract class BaseDialectTest { diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java index 2de15e48c..f2fdae170 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; import org.junit.Test; From f6ae3efa07d0c48843617c06efe6d0d5c947b237 Mon Sep 17 
00:00:00 2001 From: "Benjamin.Berhault@lausanne.ch" Date: Wed, 22 May 2024 11:38:37 +0200 Subject: [PATCH 3/3] README.md Java version required --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 71d80a2a6..6817d25e6 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Documentation for this connector can be found [here](http://docs.confluent.io/cu To build a development version you'll need a recent version of Kafka as well as a set of upstream Confluent projects, which you'll have to build from their appropriate snapshot branch. See the [FAQ](https://github.com/confluentinc/kafka-connect-jdbc/wiki/FAQ) for guidance on this process. -You can build kafka-connect-jdbc with Maven using the standard lifecycle phases. +You can build kafka-connect-jdbc with Maven and Java 11 using the standard lifecycle phases. # FAQ