PostGIS support using Debezium libraries #1413

Open · wants to merge 3 commits into base: master
2 changes: 1 addition & 1 deletion README.md
@@ -10,7 +10,7 @@ Documentation for this connector can be found [here](http://docs.confluent.io/cu
To build a development version you'll need a recent version of Kafka as well as a set of upstream Confluent projects, which you'll have to build from their appropriate snapshot branch. See the [FAQ](https://github.com/confluentinc/kafka-connect-jdbc/wiki/FAQ)
for guidance on this process.

You can build kafka-connect-jdbc with Maven using the standard lifecycle phases.
You can build kafka-connect-jdbc with Maven and Java 11 using the standard lifecycle phases.

# FAQ

6 changes: 6 additions & 0 deletions pom.xml
@@ -60,6 +60,7 @@
<oracle.jdbc.driver.version>19.7.0.0</oracle.jdbc.driver.version>
<mssqlserver.jdbc.driver.version>8.4.1.jre8</mssqlserver.jdbc.driver.version>
<postgresql.version>42.4.4</postgresql.version>
<debezium.version>2.6.0.Final</debezium.version>
<jtds.driver.version>1.3.1</jtds.driver.version>
<slf4j.version>1.7.36</slf4j.version>
<reload4j.version>1.2.19</reload4j.version>
@@ -190,6 +191,11 @@
<version>${jtds.driver.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
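For orientation (not part of this diff): the new debezium-core dependency supplies the logical schema types used in the dialect changes below — Geometry, Geography, Point, ZonedTime and ZonedTimestamp. A minimal sketch of how a PostGIS value is represented with that API, assuming only the calls this change itself uses (Geometry.schema(), Geometry.createValue, Geometry.WKB_FIELD); the byte values are placeholders for illustration:

import io.debezium.data.geometry.Geometry;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

public class GeometryValueSketch {
  public static void main(String[] args) {
    // A Debezium Geometry value is a Connect Struct holding the well-known-binary (WKB)
    // payload plus an optional SRID; the dialect changes below bind the WKB bytes directly.
    byte[] wkb = new byte[] {0x01, 0x01, 0x00, 0x00, 0x00};            // placeholder bytes, illustration only
    Schema geometrySchema = Geometry.schema();                         // logical name "io.debezium.data.geometry.Geometry"
    Struct geometry = Geometry.createValue(geometrySchema, wkb, 4326); // carries the WKB field plus the SRID
    byte[] toBind = geometry.getBytes(Geometry.WKB_FIELD);             // what PreparedStatement.setBytes(...) receives
    System.out.println(toBind.length + " WKB bytes");
  }
}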
PostgreSqlDatabaseDialect.java
@@ -26,13 +26,19 @@
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.TableDefinition;
import io.confluent.connect.jdbc.util.TableId;
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
@@ -78,6 +84,7 @@ public DatabaseDialect create(AbstractConfig config) {
static final String JSON_TYPE_NAME = "json";
static final String JSONB_TYPE_NAME = "jsonb";
static final String UUID_TYPE_NAME = "uuid";
static final String PRECISION_PARAMETER_KEY = "connect.decimal.precision";

/**
* Define the PG datatypes that require casting upon insert/update statements.
@@ -307,6 +314,16 @@ protected String getSqlType(SinkRecordField field) {
return "TIME";
case Timestamp.LOGICAL_NAME:
return "TIMESTAMP";
case ZonedTime.SCHEMA_NAME:
return "TIMETZ";
case ZonedTimestamp.SCHEMA_NAME:
return "TIMESTAMPTZ";
case Geometry.LOGICAL_NAME:
return "GEOMETRY";
case Geography.LOGICAL_NAME:
return "GEOGRAPHY";
case Point.LOGICAL_NAME:
return "GEOMETRY (POINT)";
default:
// fall through to normal types
}
@@ -518,6 +535,10 @@ protected boolean maybeBindPrimitive(
default:
break;
}

if (maybeBindPostgresDataType(statement, index, schema, value)) {
return true;
}
return super.maybeBindPrimitive(statement, index, schema, value);
}

@@ -580,6 +601,27 @@ protected String valueTypeCast(TableDefinition tableDefn, ColumnId columnId) {
return "";
}

  private boolean maybeBindPostgresDataType(
      PreparedStatement statement, int index, Schema schema, Object value) throws SQLException {
    if (schema.name() != null) {
      switch (schema.name()) {
        case ZonedTime.SCHEMA_NAME:
        case ZonedTimestamp.SCHEMA_NAME:
          statement.setObject(index, value, Types.OTHER);
          return true;
        case Geometry.LOGICAL_NAME:
        case Geography.LOGICAL_NAME:
        case Point.LOGICAL_NAME:
          byte[] wkb = ((Struct) value).getBytes(Geometry.WKB_FIELD);
          statement.setBytes(index, wkb);
          return true;
        default:
          return false;
      }
    }
    return false;
  }

@Override
protected int decimalScale(ColumnDefinition defn) {
if (defn.scale() == NUMERIC_TYPE_SCALE_UNSET) {
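As a rough sketch (not part of the patch) of what the new bind path amounts to at the JDBC level: Debezium's ZonedTime and ZonedTimestamp values are ISO-8601 strings, and binding them with Types.OTHER leaves the cast to TIMETZ/TIMESTAMPTZ to the PostgreSQL server. The connection string, table, and column below are hypothetical:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Types;

public class ZonedBindSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical connection and table, for illustration only.
    try (Connection conn = DriverManager.getConnection(
             "jdbc:postgresql://localhost:5432/demo", "demo", "demo");
         PreparedStatement ps = conn.prepareStatement(
             "INSERT INTO \"events\" (\"occurred_at\") VALUES (?)")) {
      // A Debezium ZonedTimestamp value is an ISO-8601 string like the one below;
      // Types.OTHER lets the server cast it to TIMESTAMPTZ, mirroring the dialect's bind logic.
      ps.setObject(1, "2021-05-01T18:00:00.030431+02:00", Types.OTHER);
      ps.executeUpdate();
    }
  }
}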
BaseDialectTest.java
@@ -21,6 +21,11 @@
import io.confluent.connect.jdbc.util.ColumnDefinition.Nullability;
import java.sql.Types;
import java.time.ZoneOffset;
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
@@ -62,10 +67,7 @@
import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

public abstract class BaseDialectTest<T extends GenericDatabaseDialect> {

@@ -484,7 +486,12 @@ public void bindFieldNull() throws SQLException {
Decimal.schema(0),
Date.SCHEMA,
Time.SCHEMA,
Timestamp.SCHEMA
Timestamp.SCHEMA,
ZonedTime.schema(),
ZonedTimestamp.schema(),
Geometry.schema(),
Geography.schema(),
Point.schema()
);
int index = 0;
for (Schema schema : nullableTypes) {
@@ -541,6 +548,15 @@ protected PreparedStatement verifyBindField(int index, Schema schema, Object val
case Timestamp.LOGICAL_NAME:
when(colDef.type()).thenReturn(Types.TIMESTAMP);
break;
case ZonedTime.SCHEMA_NAME:
case ZonedTimestamp.SCHEMA_NAME:
when(colDef.type()).thenReturn(Types.OTHER);
break;
case Geometry.LOGICAL_NAME:
case Geography.LOGICAL_NAME:
case Point.LOGICAL_NAME:
when(colDef.type()).thenReturn(Types.BLOB);
break;
default:
when(colDef.type()).thenThrow(
new UnsupportedOperationException(
PostgreSqlDatabaseDialectTest.java
@@ -22,16 +22,23 @@
import io.confluent.connect.jdbc.util.TableDefinitionBuilder;
import io.confluent.connect.jdbc.util.TableId;

import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.junit.Test;

import java.sql.Connection;
import java.io.IOException;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -45,12 +52,23 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import javax.xml.bind.DatatypeConverter;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;



public class PostgreSqlDatabaseDialectTest extends BaseDialectTest<PostgreSqlDatabaseDialect> {

// 'SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry
private static final Struct GEOMETRY_VALUE = Geometry.createValue(Geometry.schema(), DatatypeConverter.parseHexBinary("0101000020730C00001C7C613255DE6540787AA52C435C42C0"), 3187);

// 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography
private static final Struct GEOGRAPHY_VALUE = Geography.createValue(Geography.schema(), DatatypeConverter.parseHexBinary("0105000020E610000001000000010200000002000000A779C7293A2465400B462575025A46C0C66D3480B7FC6440C3D32B65195246C0"),4326);

private static final Struct POINT_VALUE = Point.createValue(Point.builder().build(), 1, 1);

@Override
protected PostgreSqlDatabaseDialect createDialect() {
return new PostgreSqlDatabaseDialect(sourceConfigWithUrl("jdbc:postgresql://something"));
@@ -99,6 +117,12 @@ public void shouldMapDataTypesForAddingColumnToTable() {
verifyDataTypeMapping("DATE", Date.SCHEMA);
verifyDataTypeMapping("TIME", Time.SCHEMA);
verifyDataTypeMapping("TIMESTAMP", Timestamp.SCHEMA);
verifyDataTypeMapping("TIMETZ", ZonedTime.schema());
verifyDataTypeMapping("TIMESTAMPTZ", ZonedTimestamp.schema());
verifyDataTypeMapping("GEOMETRY", Geometry.schema());
// Geography is also derived from Geometry
verifyDataTypeMapping("GEOMETRY", Geography.schema());
verifyDataTypeMapping("GEOMETRY", Point.schema());
}

@Test
@@ -116,6 +140,13 @@ public void shouldMapTimestampSchemaTypeToTimestampSqlType() {
assertTimestampMapping("TIMESTAMP");
}

@Test
public void shouldMapGeometryTypeToPostGisTypes() {
assertMapping("GEOMETRY", Geometry.schema());
assertMapping("GEOMETRY", Geography.schema());
assertMapping("GEOMETRY", Point.schema());
}

@Test
public void shouldBuildCreateQueryStatement() {
assertEquals(
@@ -426,6 +457,30 @@ public void shouldSanitizeUrlWithCredentialsInUrlProperties() {
public void bindFieldArrayUnsupported() throws SQLException {
// Overridden simply to dummy out the test.
}

@Test
public void bindFieldZonedTimeValue() throws SQLException {
int index = ThreadLocalRandom.current().nextInt();
String value = "10:15:30+01:00";
super.verifyBindField(++index, ZonedTime.schema(), value).setObject(index, value, Types.OTHER);
}

@Test
public void bindFieldZonedTimestampValue() throws SQLException {
int index = ThreadLocalRandom.current().nextInt();
String value = "2021-05-01T18:00:00.030431+02:00";
super.verifyBindField(++index, ZonedTimestamp.schema(), value).setObject(index, value, Types.OTHER);
}



@Test
public void bindFieldPostGisValues() throws SQLException, IOException {
int index = ThreadLocalRandom.current().nextInt();
super.verifyBindField(++index, Geometry.schema(), GEOMETRY_VALUE).setBytes(index, GEOMETRY_VALUE.getBytes(Geometry.WKB_FIELD));
super.verifyBindField(++index, Geography.schema(), GEOGRAPHY_VALUE).setBytes(index, GEOGRAPHY_VALUE.getBytes(Geometry.WKB_FIELD));
super.verifyBindField(++index, Geometry.schema(), POINT_VALUE).setBytes(index, POINT_VALUE.getBytes(Geometry.WKB_FIELD));
}

@Test
public void bindFieldPrimitiveValues() throws SQLException {