
Commit

chore: update code samples
except the Confluent sample, where the new connector must be uploaded to Confluent Hub first
jerrinot committed Apr 6, 2024
1 parent f0350f5 commit 01bb74a
Showing 5 changed files with 16 additions and 12 deletions.
@@ -65,4 +65,5 @@ services:
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
+OFFSET_FLUSH_INTERVAL_MS: "1000"
1 change: 1 addition & 0 deletions kafka-questdb-connector-samples/faker/docker-compose.yml
@@ -58,3 +58,4 @@ services:
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
+CONNECT_OFFSET_FLUSH_INTERVAL_MS: "1000"
12 changes: 6 additions & 6 deletions kafka-questdb-connector-samples/faker/readme.md
@@ -12,12 +12,12 @@ The project was tested on MacOS with M1, but it should work on other platforms t
## Usage:
1. Clone this repository via `git clone https://github.com/questdb/kafka-questdb-connector.git`
2. `cd kafka-questdb-connector/kafka-questdb-connector-samples/faker/` to enter the directory with this sample.
-3. Run `docker-compose build` to build a docker image with the sample project.
-4. Run `docker-compose up` to start the node.js producer, Apache Kafka and QuestDB containers.
+3. Run `docker compose build` to build a Docker image with the sample project.
+4. Run `docker compose up` to start the Node.js producer, Apache Kafka, and QuestDB containers.
5. The previous command will generate a lot of log messages. Eventually logging should cease. This means both Apache Kafka and QuestDB are running. The last log message should contain the following text: `Session key updated`
6. Execute the following command in a shell:
```shell
-$ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","host":"questdb", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors
+$ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","client.conf.string":"http::addr=questdb;", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors
```
7. The command above creates a new Kafka connector that reads data from the `People` topic and writes it to a QuestDB table called `People`. The connector also converts the `birthday` field to a timestamp. (A quick status check is sketched right after this list.)
8. Go to the QuestDB console running at http://localhost:19000 and run `select * from 'People';` and you should see some rows.
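If the `curl` call succeeds but no rows show up, Kafka Connect's REST API can tell you whether the sink is actually running. This is just a minimal sketch using the standard Connect endpoints, not part of the sample itself; the connector name `questdb-connect` is the one registered in step 6.
```shell
# List all connectors registered with this Kafka Connect instance
curl -s localhost:8083/connectors

# Show the state of the QuestDB sink and its task(s);
# a healthy connector reports "RUNNING" for both
curl -s localhost:8083/connectors/questdb-connect/status
```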
@@ -34,7 +34,7 @@ The sample project consists of 3 components:

The Kafka Connect configuration looks complex, but it's actually quite simple. Let's take a closer look. This is what the `curl` command looks like:
```shell
-$ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","host":"questdb", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors
+$ curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"People","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","client.conf.string":"http::addr=questdb;", "timestamp.field.name": "birthday", "transforms":"convert_birthday","transforms.convert_birthday.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","transforms.convert_birthday.target.type":"Timestamp","transforms.convert_birthday.field":"birthday","transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"}}' localhost:8083/connectors
```

It uses `curl` to submit the following JSON to Kafka Connect:
@@ -48,7 +48,7 @@ It uses `curl` to submit a following JSON to Kafka Connect:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"host": "questdb",
"client.conf.string": "http::addr=questdb;",
"timestamp.field.name": "birthday",
"transforms": "convert_birthday",
"transforms.convert_birthday.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
@@ -62,6 +62,6 @@ Most of the fields are self-explanatory. The `transforms` field is a bit more co
The other potentially non-obvious configuration elements are:
1. `"value.converter.schemas.enable": "false"` It disables the schema support in the Kafka Connect JSON converter. The sample project doesn't use schemas.
2. `"host": "questdb"` It defines the hostname of the QuestDB instance. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file.
2. `"client.conf.string": "http::addr=questdb;"` It configures QuestDB client to use the HTTP transport and connect to a hostname `questdb`. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file.
3. `"timestamp.field.name": "birthday"` It defines the name of the field that should be used as a timestamp. It uses the field that was converted by the Kafka Connect transformation described above.
4. The ugly value in `"transforms.convert_birthday.format": "yyyy-MM-dd'"'"'T'"'"'HH:mm:ss.SSSX"`. The part that looks funny, `'"'"'T'"'"'`, is a way to get an apostrophe through a shell that itself uses apostrophes to delimit strings. The apostrophe is needed to escape the literal `T` in the date format, which is really `yyyy-MM-dd'T'HH:mm:ss.SSSX`. If you know a better way to submit the same JSON then please [open a new issue](https://github.com/questdb/kafka-questdb-connector/issues/new); one possible workaround is sketched below.
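One way to sidestep the quoting gymnastics entirely (just a sketch, not something this commit changes) is to keep the connector configuration in a plain JSON file and let `curl` read it from there. The file name `connector.json` is arbitrary; the payload is the same one used in step 6.
```shell
# connector.json holds the same payload as the inline command above,
# so the date format can be written naturally as yyyy-MM-dd'T'HH:mm:ss.SSSX.
cat > connector.json <<'EOF'
{
  "name": "questdb-connect",
  "config": {
    "topics": "People",
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "client.conf.string": "http::addr=questdb;",
    "timestamp.field.name": "birthday",
    "transforms": "convert_birthday",
    "transforms.convert_birthday.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_birthday.target.type": "Timestamp",
    "transforms.convert_birthday.field": "birthday",
    "transforms.convert_birthday.format": "yyyy-MM-dd'T'HH:mm:ss.SSSX"
  }
}
EOF

# POST the file instead of an inline string
curl -X POST -H "Content-Type: application/json" -d @connector.json localhost:8083/connectors
```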
1 change: 1 addition & 0 deletions kafka-questdb-connector-samples/stocks/docker-compose.yml
@@ -87,3 +87,4 @@ services:
CONFIG_STORAGE_TOPIC: "debezium_connect_config"
OFFSET_STORAGE_TOPIC: "debezium_connect_offsets"
STATUS_STORAGE_TOPIC: "debezium_connect_status"
+OFFSET_FLUSH_INTERVAL_MS: "1000"
11 changes: 6 additions & 5 deletions kafka-questdb-connector-samples/stocks/readme.md
@@ -29,7 +29,7 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach
It starts the Debezium connector that will capture changes from Postgres and feed them to Kafka.
8. Execute the following command to start the QuestDB Kafka Connect sink:
```shell
-curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","host":"questdb", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
+curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","client.conf.string":"http::addr=questdb;auto_flush_interval=1000000;", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
```
It starts the QuestDB Kafka Connect sink that will read changes from Kafka and write them to QuestDB.
9. Go to the QuestDB Web Console running at http://localhost:19000/ and execute the following query:
@@ -129,7 +129,7 @@ Most of the fields are self-explanatory. The only non-obvious one is `database.s
### Kafka QuestDB connector
The Kafka QuestDB connector re-uses the same Kafka Connect runtime as the Debezium connector. It's also started with a `curl` command. This is how we started the QuestDB connector:
```shell
-curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","host":"questdb", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
+curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","client.conf.string":"http::addr=questdb;auto_flush_interval=1000000;", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
```
This is the connector JSON configuration nicely formatted:
```json
@@ -142,7 +142,7 @@ This is the connector JSON configuration nicely formatted:
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"host": "questdb",
"client.conf.string": "http::addr=questdb;auto_flush_interval=1000000;",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"include.key": "false",
@@ -153,8 +153,9 @@ This is the connector JSON configuration nicely formatted:
```
Again, most of the fields are obvious. Let's focus on the non-obvious ones.
1. `"symbols": "symbol"` this instruct to connector to use the [QuestDB symbol type](https://questdb.io/docs/concept/symbol/) for a column named "symbols". This column has low cardinality thus it's a good candidate for symbol type.
2. `"timestamp.field.name": "last_update"` this instructs the connector to use the `last_update` column as the [designated timestamp](https://questdb.io/docs/concept/designated-timestamp/) column.
3. `"transforms":"unwrap"` and `"transforms.unwrap.type"` this instructs the connector to use Debezium's ExtractNewRecordState.
2. `"client.conf.string": "http::addr=questdb;auto_flush_interval=1000000;"` this configures the QuestDB client to use the HTTP transport and connect to a hostname `questdb`. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file. The `auto_flush_interval=1000000` controls frequency of automatic flushing data to QuestDB. It's set to 1000 seconds which effectively disables automatic interval-based flushing.
3. `"timestamp.field.name": "last_update"` this instructs the connector to use the `last_update` column as the [designated timestamp](https://questdb.io/docs/concept/designated-timestamp/) column.
4. `"transforms":"unwrap"` and `"transforms.unwrap.type"` this instructs the connector to use Debezium's ExtractNewRecordState.
Let's focus on the ExtractNewRecordState transform a bit more. Why is it needed at all? For every change in the Postgres table, Debezium emits a JSON message to a Kafka topic. Messages look like this:
```json
