Tested with Confluent Platform 6.1, ksqlDB 0.15, Kafka Connect JDBC connector 5.5.3
CREATE SOURCE CONNECTOR SOURCE_DATA WITH (
'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'genkp.customers.with' = '#{Internet.uuid}',
'genv.customers.name.with' = '#{HitchhikersGuideToTheGalaxy.character}',
'genv.customers.email.with' = '#{Internet.emailAddress}',
'genv.customers.location->city.with' = '#{HitchhikersGuideToTheGalaxy.location}',
'genv.customers.location->planet.with' = '#{HitchhikersGuideToTheGalaxy.planet}',
'topic.customers.records.exactly' = 10,
'genkp.transactions.with' = '#{Internet.uuid}',
'genv.transactions.customer_id.matching' = 'customers.key',
'genv.transactions.cost.with' = '#{Commerce.price}',
'genv.transactions.card_type.with' = '#{Business.creditCardType}',
'genv.transactions.item.with' = '#{Beer.name}',
'topic.transactions.throttle.ms' = 500
);
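Before building anything on top of these topics, it’s worth a quick sanity check that Voluble is actually generating data. From the ksqlDB CLI (output omitted here; the data is random so yours will differ):
PRINT 'customers' FROM BEGINNING LIMIT 2;
PRINT 'transactions' LIMIT 2;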
CREATE TABLE CUSTOMER (ID VARCHAR PRIMARY KEY,
NAME VARCHAR,
EMAIL VARCHAR,
LOCATION STRUCT<CITY VARCHAR,
PLANET VARCHAR>)
WITH (KAFKA_TOPIC='customers',
VALUE_FORMAT='JSON');
CREATE STREAM TXNS (ID VARCHAR KEY,
CUSTOMER_ID VARCHAR,
COST DOUBLE,
CARD_TYPE VARCHAR,
ITEM VARCHAR)
WITH (KAFKA_TOPIC='transactions',
VALUE_FORMAT='JSON');
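To confirm that ksqlDB has registered both objects against their topics before querying them, DESCRIBE shows the declared schemas (standard commands; output elided):
DESCRIBE CUSTOMER;
DESCRIBE TXNS;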
ksql> SELECT * FROM CUSTOMER EMIT CHANGES LIMIT 2;
+---------------------+---------------------+---------------------+---------------------+
|ID |NAME |EMAIL |LOCATION |
+---------------------+---------------------+---------------------+---------------------+
|84e2c8d1-3ac6-461b-8a|Agrajag |benny.skiles@hotmail.|{CITY=Lamuella, PLANE|
|6c-2a765be6bd82 | |com |T=Nano} |
|ee4cb838-cbf3-4cb7-8d|Pasta Fasta |peter.nikolaus@yahoo.|{CITY=Betelgeuse, PLA|
|d2-b7b3a9e9da11 | |com |NET=Krikkit} |
Limit Reached
Query terminated
ksql> SELECT * FROM TXNS EMIT CHANGES LIMIT 5;
+-----------------+-----------------+-----------------+-----------------+-----------------+
|ID |CUSTOMER_ID |COST |CARD_TYPE |ITEM |
+-----------------+-----------------+-----------------+-----------------+-----------------+
|1a14a135-d9c3-42b|84e2c8d1-3ac6-461|44.05 |visa |Alpha King Pale A|
|e-b692-9bfd58beec|b-8a6c-2a765be6bd| | |le |
|63 |82 | | | |
|a01765a8-6d1e-40d|ee4cb838-cbf3-4cb|25.62 |visa |Samuel Smith’s Im|
|8-9bc2-8d8166e020|7-8dd2-b7b3a9e9da| | |perial IPA |
|14 |11 | | | |
|d6047271-d28e-4c2|d83c7c98-c2f1-449|81.8 |jcb |Storm King Stout |
|7-bfc0-8dbf31855c|e-b80a-19c38d06b4| | | |
|03 |76 | | | |
|aaf68dbd-48d0-48e|9058ef36-3ab8-491|37.54 |visa |Nugget Nectar |
|1-a1c2-c28d9365ae|a-8142-c059b74f06| | | |
|e9 |f9 | | | |
|a4ea8cb5-b89f-48e|9058ef36-3ab8-491|27.8 |jcb |Racer 5 India Pal|
|5-a023-7ab33a55b5|a-8142-c059b74f06| | |e Ale, Bear Repub|
|9f |f9 | | |lic Bre |
Limit Reached
Query terminated
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM TXNS_ENRICHED WITH (FORMAT='AVRO') AS
SELECT T.ID,
AS_VALUE(T.ID) AS TXN_ID,
T.ROWTIME AS TXN_TS,
T.ITEM,
T.COST,
T.CARD_TYPE,
C.ID AS CUST_ID,
C.NAME,
C.EMAIL,
C.LOCATION
FROM TXNS T
LEFT OUTER JOIN
CUSTOMER C
ON T.CUSTOMER_ID=C.ID
PARTITION BY T.ID;
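Two things worth calling out in this statement: PARTITION BY T.ID re-keys the output so that the transaction ID becomes the Kafka message key, and AS_VALUE(T.ID) keeps a copy of it in the message value as TXN_ID, which is useful because the JDBC sink below maps only the message value to columns unless told otherwise. You can check the resulting schema with (output elided):
DESCRIBE TXNS_ENRICHED;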
ksql> SELECT * FROM TXNS_ENRICHED EMIT CHANGES LIMIT 2;
+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|T_ID |TXN_ID |TXN_TS |ITEM |COST |CARD_TYPE |CUST_ID |NAME |EMAIL |LOCATION |
+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|8640648b-961f-48|8640648b-961f-48|1615464205090 |Edmund Fitzgeral|87.52 |diners_club |dae12f7a-3b49-43|Vroomfondel |shaun.sauer@gmai|{CITY=Seventh Ga|
|64-82d7-5606c7cf|64-82d7-5606c7cf| |d Porter | | |4a-9b92-2e4ea087| |l.com |laxy of Light an|
|3a68 |3a68 | | | | |a2b1 | | |d Ingenuity, PLA|
| | | | | | | | | |NET=Kria} |
|07626615-d656-4f|07626615-d656-4f|1615464205468 |Nugget Nectar |7.21 |laser |c87d82e3-0f38-4e|Pizpot Gargravar|tristan.upton@ho|{CITY=Evildrome |
|0f-8e32-ed02365c|0f-8e32-ed02365c| | | | |4b-8bd3-9ec61648|r |tmail.com |Boozarama, PLANE|
|011e |011e | | | | |1813 | | |T=Xaxis} |
Limit Reached
Query terminated
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'TXNS_ENRICHED',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'auto.evolve' = 'true'
);
This doesn’t work; the task fails because the JDBC sink has no mapping from the LOCATION STRUCT to a SQL column type:
ksql> DESCRIBE CONNECTOR SINK_TXNS_ENRICHED_PG;
Name : SINK_TXNS_ENRICHED_PG
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : kafka-connect:8083
Task ID | State | Error Trace
---------------------------------------------------------------------------------------------------------------------------------------------
0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: io.confluent.ksql.avro_schemas.KsqlDataSourceSchema_LOCATION (STRUCT) type doesn't have a mapping to the SQL database column type
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1753)
at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:221)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1669)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$33(GenericDatabaseDialect.java:1658)
at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:558)
at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:597)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1660)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1583)
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:91)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:120)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more
---------------------------------------------------------------------------------------------------------------------------------------------
ksql>
DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'TXNS_ENRICHED',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'auto.evolve' = 'true',
'table.name.format' = '${topic}',
'transforms' = 'flatten',
'transforms.flatten.type' = 'org.apache.kafka.connect.transforms.Flatten$Value'
);
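The Flatten$Value transform collapses the LOCATION struct into top-level fields, joining the names with a . delimiter by default (hence the LOCATION.CITY and LOCATION.PLANET column names below). If you’d rather avoid dots in your Postgres column names, the SMT also takes a delimiter setting, e.g. adding this to the config above:
'transforms.flatten.delimiter' = '_'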
✅ Table is created and populated in Postgres:
docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
postgres=# \d+ "TXNS_ENRICHED"
Table "public.TXNS_ENRICHED"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
-----------------+------------------+-----------+----------+---------+----------+--------------+-------------
TXN_ID | text | | | | extended | |
TXN_TS | bigint | | | | plain | |
ITEM | text | | | | extended | |
COST | double precision | | | | plain | |
CARD_TYPE | text | | | | extended | |
CUST_ID | text | | | | extended | |
NAME | text | | | | extended | |
EMAIL | text | | | | extended | |
LOCATION.CITY | text | | | | extended | |
LOCATION.PLANET | text | | | | extended | |
Access method: heap
postgres=# SELECT * FROM "TXNS_ENRICHED" LIMIT 2;
TXN_ID | TXN_TS | ITEM | COST | CARD_TYPE | CUST_ID | NAME | EMAIL | LOCATION.CITY | LOCATION.PLANET
--------------------------------------+---------------+--------------------------+-------+-------------+--------------------------------------+-------------------+---------------------------+---------------------------------------+-----------------
8640648b-961f-4864-82d7-5606c7cf3a68 | 1615464205090 | Edmund Fitzgerald Porter | 87.52 | diners_club | dae12f7a-3b49-434a-9b92-2e4ea087a2b1 | Vroomfondel | shaun.sauer@gmail.com | Seventh Galaxy of Light and Ingenuity | Kria
07626615-d656-4f0f-8e32-ed02365c011e | 1615464205468 | Nugget Nectar | 7.21 | laser | c87d82e3-0f38-4e4b-8bd3-9ec616481813 | Pizpot Gargravarr | tristan.upton@hotmail.com | Evildrome Boozarama | Xaxis
(2 rows)
Note that TXN_TS is a bigint (epoch milliseconds), not an actual timestamp type.
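If you were stuck with the bigint you could always convert it at query time in Postgres, since the value is epoch milliseconds (a workaround sketch using the standard to_timestamp function):
SELECT "TXN_ID", to_timestamp("TXN_TS"/1000.0) AS "TXN_TS" FROM "TXNS_ENRICHED" LIMIT 1;
But it’s cleaner to fix the type in the pipeline itself, which is what the TimestampConverter Single Message Transform is for: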
DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'TXNS_ENRICHED',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'auto.evolve' = 'true',
'table.name.format' = '${topic}',
'transforms' = 'flatten,setTimestampType',
'transforms.flatten.type' = 'org.apache.kafka.connect.transforms.Flatten$Value',
'transforms.setTimestampType.type' = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
'transforms.setTimestampType.field' = 'TXN_TS',
'transforms.setTimestampType.target.type' = 'Timestamp'
);
The connector doesn’t work though (status is WARNING):
ksql> SHOW CONNECTORS;
Connector Name | Type | Class | Status
------------------------------------------------------------------------------------------------------------
SOURCE_DATA | SOURCE | io.mdrogalis.voluble.VolubleSourceConnector | RUNNING (1/1 tasks RUNNING)
SINK_TXNS_ENRICHED_PG | SINK | io.confluent.connect.jdbc.JdbcSinkConnector | WARNING (0/1 tasks RUNNING)
------------------------------------------------------------------------------------------------------------
Check the error:
ksql> DESCRIBE CONNECTOR SINK_TXNS_ENRICHED_PG;
Name : SINK_TXNS_ENRICHED_PG
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : kafka-connect:8083
Task ID | State | Error Trace
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "TXNS_ENRICHED"("TXN_TS","ITEM","COST","CARD_TYPE","CUST_ID","NAME","EMAIL","LOCATION.CITY","LOCATION.PLANET") VALUES('2020-03-27 23:58:30.431+00','Shakespeare Oatmeal',69.72,'mastercard','ee4cb838-cbf3-4cb7-8dd2-b7b3a9e9da11','Pasta Fasta','peter.nikolaus@yahoo.com','Betelgeuse','Krikkit') was aborted: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00" Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "TXNS_ENRICHED"("TXN_TS","ITEM","COST","CARD_TYPE","CUST_ID","NAME","EMAIL","LOCATION.CITY","LOCATION.PLANET") VALUES('2020-03-27 23:58:30.431+00','Shakespeare Oatmeal',69.72,'mastercard','ee4cb838-cbf3-4cb7-8dd2-b7b3a9e9da11','Pasta Fasta','peter.nikolaus@yahoo.com','Betelgeuse','Krikkit') was aborted: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00" Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
... 12 more
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ksql>
Now that TXN_TS is coming through as a timestamp, the Postgres INSERT is failing because we’re trying to write it to the existing bigint column (auto.evolve will add missing columns but won’t change the type of an existing one):
ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
So here we’ll ditch the previous table and instead populate a new one (taking advantage of table.name.format to modify the target table name), using all of the existing data in the source Kafka topic:
DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG_01 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'TXNS_ENRICHED',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'auto.evolve' = 'true',
'table.name.format' = '${topic}-01',
'transforms' = 'flatten,setTimestampType',
'transforms.flatten.type' = 'org.apache.kafka.connect.transforms.Flatten$Value',
'transforms.setTimestampType.type' = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
'transforms.setTimestampType.field' = 'TXN_TS',
'transforms.setTimestampType.target.type' = 'Timestamp'
);
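Before heading back to Postgres, confirm that the task stays up this time; SHOW CONNECTORS should now report the sink as RUNNING (1/1 tasks RUNNING):
SHOW CONNECTORS;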
Now the Postgres table is built and populated with a proper timestamp column:
postgres=# \d+ "TXNS_ENRICHED-01"
Table "public.TXNS_ENRICHED-01"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
-----------------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
TXN_ID | text | | | | extended | |
TXN_TS | timestamp without time zone | | | | plain | |
ITEM | text | | | | extended | |
COST | double precision | | | | plain | |
CARD_TYPE | text | | | | extended | |
CUST_ID | text | | | | extended | |
NAME | text | | | | extended | |
EMAIL | text | | | | extended | |
LOCATION.CITY | text | | | | extended | |
LOCATION.PLANET | text | | | | extended | |
Access method: heap
postgres=# SELECT * FROM "TXNS_ENRICHED-01" LIMIT 2;
TXN_ID | TXN_TS | ITEM | COST | CARD_TYPE | CUST_ID | NAME | EMAIL | LOCATION.CITY | LOCATION.PLANET
--------------------------------------+-------------------------+--------------------------+-------+-------------+--------------------------------------+-------------------+---------------------------+---------------------------------------+-----------------
8640648b-961f-4864-82d7-5606c7cf3a68 | 2021-03-11 12:03:25.09 | Edmund Fitzgerald Porter | 87.52 | diners_club | dae12f7a-3b49-434a-9b92-2e4ea087a2b1 | Vroomfondel | shaun.sauer@gmail.com | Seventh Galaxy of Light and Ingenuity | Kria
07626615-d656-4f0f-8e32-ed02365c011e | 2021-03-11 12:03:25.468 | Nugget Nectar | 7.21 | laser | c87d82e3-0f38-4e4b-8bd3-9ec616481813 | Pizpot Gargravarr | tristan.upton@hotmail.com | Evildrome Boozarama | Xaxis
(2 rows)
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE BEERS_HOUR_AGG WITH (FORMAT='AVRO') AS
SELECT WINDOWSTART AS WINDOW_START_TS,
ITEM,
SUM(COST) AS TOTAL_SOLD,
COUNT(*) AS NUMBER_SOLD
FROM TXNS WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY ITEM;
- Push query (stream of aggregate changes):
SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, ITEM, TOTAL_SOLD, NUMBER_SOLD FROM BEERS_HOUR_AGG EMIT CHANGES;
- Pull query (current state of the aggregate):
SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, TOTAL_SOLD, NUMBER_SOLD FROM BEERS_HOUR_AGG WHERE ITEM='Duvel';
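Pull queries on a windowed table can also bound the window itself by filtering on WINDOWSTART, for example (timestamp purely illustrative):
SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, TOTAL_SOLD, NUMBER_SOLD FROM BEERS_HOUR_AGG WHERE ITEM='Duvel' AND WINDOWSTART >= '2021-03-11T12:00:00';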
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE CUSTOMER_SUMMARY WITH (FORMAT='AVRO') AS
SELECT CUST_ID,
NAME,
MAX(TXN_TS) AS MOST_RECENT_ORDER_TS,
COUNT(*) AS NUM_ORDERS,
COUNT_DISTINCT(ITEM) AS UNIQUE_ITEMS,
SUM(COST) AS TOTAL_COST
FROM TXNS_ENRICHED
GROUP BY CUST_ID, NAME;
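Since ksqlDB 0.15 supports multiple key columns, both GROUP BY columns end up in the Kafka message key rather than the value; DESCRIBE should show CUST_ID and NAME as PRIMARY KEY columns (output elided):
DESCRIBE CUSTOMER_SUMMARY;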
CREATE SINK CONNECTOR SINK_CUSTOMER_SUMMARY_PG_01 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'CUSTOMER_SUMMARY',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'auto.evolve' = 'true',
'table.name.format' = '${topic}-01',
'transforms' = 'setTimestampType',
'transforms.setTimestampType.type' = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
'transforms.setTimestampType.field' = 'MOST_RECENT_ORDER_TS',
'transforms.setTimestampType.target.type' = 'Timestamp'
);
You’ll notice you don’t get the key columns in the data, only the aggregate values (as noted above, CUST_ID and NAME are held in the message key, which this connector ignores):
postgres=# select * from "CUSTOMER_SUMMARY-01" LIMIT 5;
MOST_RECENT_ORDER_TS | NUM_ORDERS | UNIQUE_ITEMS | TOTAL_COST
-------------------------+------------+--------------+--------------------
2021-03-11 12:04:21.399 | 13 | 13 | 642.1600000000001
2021-03-11 12:04:45.403 | 19 | 16 | 1013.25
2021-03-11 12:04:48.869 | 15 | 13 | 747.2699999999999
2021-03-11 12:04:49.369 | 17 | 14 | 1117.6200000000001
2021-03-11 12:04:49.869 | 21 | 14 | 1057.1499999999999
(5 rows)
CREATE SINK CONNECTOR SINK_CUSTOMER_SUMMARY_PG_02 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'CUSTOMER_SUMMARY',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'auto.evolve' = 'true',
'insert.mode' = 'upsert',
'pk.mode' = 'record_key',
'pk.fields' = 'CUST_ID,NAME',
'table.name.format' = '${topic}-02',
'transforms' = 'setTimestampType',
'transforms.setTimestampType.type' = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
'transforms.setTimestampType.field' = 'MOST_RECENT_ORDER_TS',
'transforms.setTimestampType.target.type' = 'Timestamp'
);
Important settings:
- insert.mode is upsert (not the default insert)
- pk.mode is record_key, which says we’re going to define the target table’s primary key based on field(s) from the record’s key
- pk.fields specifies which field(s) from the record’s key we’d like to use as the PK in the target table (for a ksqlDB aggregate table this is going to be whichever columns you declared for the GROUP BY)
Note that the target database table is created by the sink with a composite primary key:
postgres=# \d+ "CUSTOMER_SUMMARY-02"
Table "public.CUSTOMER_SUMMARY-02"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
----------------------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
MOST_RECENT_ORDER_TS | timestamp without time zone | | | | plain | |
NUM_ORDERS | bigint | | | | plain | |
UNIQUE_ITEMS | bigint | | | | plain | |
TOTAL_COST | double precision | | | | plain | |
CUST_ID | text | | not null | | extended | |
NAME | text | | not null | | extended | |
Indexes:
"CUSTOMER_SUMMARY-02_pkey" PRIMARY KEY, btree ("CUST_ID", "NAME")
Access method: heap
One row per unique key, with the aggregate values updated in place; running the same query a few seconds apart shows the values changing:
postgres=# SELECT * FROM "CUSTOMER_SUMMARY-02" WHERE "NAME" = 'Vroomfondel' ;
MOST_RECENT_ORDER_TS | NUM_ORDERS | UNIQUE_ITEMS | TOTAL_COST | CUST_ID | NAME
-------------------------+------------+--------------+--------------------+--------------------------------------+-------------
2021-03-11 12:29:51.282 | 331 | 53 | 16004.209999999983 | dae12f7a-3b49-434a-9b92-2e4ea087a2b1 | Vroomfondel
(1 row)
postgres=# SELECT * FROM "CUSTOMER_SUMMARY-02" WHERE "NAME" = 'Vroomfondel' ;
MOST_RECENT_ORDER_TS | NUM_ORDERS | UNIQUE_ITEMS | TOTAL_COST | CUST_ID | NAME
-------------------------+------------+--------------+--------------------+--------------------------------------+-------------
2021-03-11 12:30:11.786 | 336 | 53 | 16269.619999999983 | dae12f7a-3b49-434a-9b92-2e4ea087a2b1 | Vroomfondel
(1 row)