diff --git a/kafka-questdb-connector-samples/confluent-docker-images/docker-compose.yaml b/kafka-questdb-connector-samples/confluent-docker-images/docker-compose.yaml index 2e89df6..f47daa5 100644 --- a/kafka-questdb-connector-samples/confluent-docker-images/docker-compose.yaml +++ b/kafka-questdb-connector-samples/confluent-docker-images/docker-compose.yaml @@ -65,4 +65,5 @@ services: CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false" - CONNECT_REST_ADVERTISED_HOST_NAME: "connect" \ No newline at end of file + CONNECT_REST_ADVERTISED_HOST_NAME: "connect" + OFFSET_FLUSH_INTERVAL_MS: "1000" \ No newline at end of file diff --git a/kafka-questdb-connector-samples/faker/docker-compose.yml b/kafka-questdb-connector-samples/faker/docker-compose.yml index d42df9d..b55b59a 100644 --- a/kafka-questdb-connector-samples/faker/docker-compose.yml +++ b/kafka-questdb-connector-samples/faker/docker-compose.yml @@ -58,3 +58,4 @@ services: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_OFFSET_FLUSH_INTERVAL_MS: "1000" diff --git a/kafka-questdb-connector-samples/faker/readme.md b/kafka-questdb-connector-samples/faker/readme.md index 9e94ee4..0e5e2d6 100644 --- a/kafka-questdb-connector-samples/faker/readme.md +++ b/kafka-questdb-connector-samples/faker/readme.md @@ -12,12 +12,12 @@ The project was tested on MacOS with M1, but it should work on other platforms t ## Usage: 1. Clone this repository via `git clone https://github.com/questdb/kafka-questdb-connector.git` 2. `cd kafka-questdb-connector/kafka-questdb-connector-samples/faker/` to enter the directory with this sample. -3. Run `docker-compose build` to build a docker image with the sample project. -4. Run `docker-compose up` to start the node.js producer, Apache Kafka and QuestDB containers. +3. Run `docker compose build` to build a docker image with the sample project. +4. Run `docker compose up` to start the node.js producer, Apache Kafka and QuestDB containers. 5. The previous command will generate a lot of log messages. Eventually logging should cease. This means both Apache Kafka and QuestDB are running. The last log message should contain the following text: `Session key updated` 6. Execute the following command in shell: ```shell - $ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","host":"questdb", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors + $ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","client.conf.string":"http::addr=questdb;", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors ``` 7. The command above will create a new Kafka connector that will read data from the `People` topic and write it to a QuestDB table called `People`. The connector will also convert the `birthday` field to a timestamp. 8. Go to the QuestDB console running at http://localhost:19000 and run `select * from 'People';` and you should see some rows. @@ -34,7 +34,7 @@ The sample project consists of 3 components: The Kafka Connect configuration looks complex, but it's quite simple. Let's have a closer look. This is how the `curl` command looks like: ```shell -$ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","host":"questdb", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors +$ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","client.conf.string":"http::addr=questdb;", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors ``` It uses `curl` to submit a following JSON to Kafka Connect: @@ -48,7 +48,7 @@ It uses `curl` to submit a following JSON to Kafka Connect: "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", - "host": "questdb", + "client.conf.string": "http::addr=questdb;", "timestamp.field.name": "birthday", "transforms": "convert_birthday", "transforms.convert_birthday.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", @@ -62,6 +62,6 @@ Most of the fields are self-explanatory. The `transforms` field is a bit more co The other potentially non-obvious configuration elements are: 1. `"value.converter.schemas.enable": "false"` It disables the schema support in the Kafka Connect JSON converter. The sample project doesn't use schemas. -2. `"host": "questdb"` It defines the hostname of the QuestDB instance. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file. +2. `"client.conf.string": "http::addr=questdb;"` It configures QuestDB client to use the HTTP transport and connect to a hostname `questdb`. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file. 3. `"timestamp.field.name": "birthday"` It defines the name of the field that should be used as a timestamp. It uses the field that was converted by the Kafka Connect transformation described above. 4. The ugly value in `"transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"`. This part looks funny: `'"'"'T'"'"'`. In fact, it's a way to submit an apostrophe via shell which uses apostrophes to define strings. The apostrophe is required to escape the `T` character in the date format. The date format is `yyyy-MM-dd'T'HH:mm:ss.SSSX`. If you know a better way to submit the same JSON then please [open a new issue](https://github.com/questdb/kafka-questdb-connector/issues/new). \ No newline at end of file diff --git a/kafka-questdb-connector-samples/stocks/docker-compose.yml b/kafka-questdb-connector-samples/stocks/docker-compose.yml index 422cf3b..b52a2b1 100644 --- a/kafka-questdb-connector-samples/stocks/docker-compose.yml +++ b/kafka-questdb-connector-samples/stocks/docker-compose.yml @@ -87,3 +87,4 @@ services: CONFIG_STORAGE_TOPIC: "debezium_connect_config" OFFSET_STORAGE_TOPIC: "debezium_connect_offsets" STATUS_STORAGE_TOPIC: "debezium_connect_status" + OFFSET_FLUSH_INTERVAL_MS: "1000" diff --git a/kafka-questdb-connector-samples/stocks/readme.md b/kafka-questdb-connector-samples/stocks/readme.md index d795ae3..5121401 100644 --- a/kafka-questdb-connector-samples/stocks/readme.md +++ b/kafka-questdb-connector-samples/stocks/readme.md @@ -29,7 +29,7 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach It starts the Debezium connector that will capture changes from Postgres and feed them to Kafka. 8. Execute following command to start QuestDB Kafka Connect sink: ```shell - curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","host":"questdb", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors + curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","client.conf.string":"http::addr=questdb;auto_flush_interval=1000000;", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors ``` It starts the QuestDB Kafka Connect sink that will read changes from Kafka and write them to QuestDB. 9. Go to QuestDB Web Console running at http://localhost:19000/ and execute following query: @@ -129,7 +129,7 @@ Most of the fields are self-explanatory. The only non-obvious one is `database.s ### Kafka QuestDB connector The Kafka QuestDB connector re-uses the same Kafka Connect runtime as the Debezium connector. It's also started using `curl` command. This is how we started the QuestDB connector: ```shell -curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","host":"questdb", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors +curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","client.conf.string":"http::addr=questdb;auto_flush_interval=1000000;", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors ``` This is the connector JSON configuration nicely formatted: ```json @@ -142,7 +142,7 @@ This is the connector JSON configuration nicely formatted: "tasks.max": "1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "host": "questdb", + "client.conf.string": "http::addr=questdb;auto_flush_interval=1000000;", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "include.key": "false", @@ -153,8 +153,9 @@ This is the connector JSON configuration nicely formatted: ``` Again, most of the fields are obvious. Let's focus on the non-obvious ones. 1. `"symbols": "symbol"` this instruct to connector to use the [QuestDB symbol type](https://questdb.io/docs/concept/symbol/) for a column named "symbols". This column has low cardinality thus it's a good candidate for symbol type. -2. `"timestamp.field.name": "last_update"` this instructs the connector to use the `last_update` column as the [designated timestamp](https://questdb.io/docs/concept/designated-timestamp/) column. -3. `"transforms":"unwrap"` and `"transforms.unwrap.type"` this instructs the connector to use Debezium's ExtractNewRecordState. +2. `"client.conf.string": "http::addr=questdb;auto_flush_interval=1000000;"` this configures the QuestDB client to use the HTTP transport and connect to a hostname `questdb`. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file. The `auto_flush_interval=1000000` controls frequency of automatic flushing data to QuestDB. It's set to 1000 seconds which effectively disables automatic interval-based flushing. +3. `"timestamp.field.name": "last_update"` this instructs the connector to use the `last_update` column as the [designated timestamp](https://questdb.io/docs/concept/designated-timestamp/) column. +4. `"transforms":"unwrap"` and `"transforms.unwrap.type"` this instructs the connector to use Debezium's ExtractNewRecordState. Let's focus on the ExtractNewRecordState transform a bit more. Why is it needed at all? For every change in the Postgres table the Debezium emits a JSON message to a Kafka topic. Messages look like this: ```json