update for 0.11
jerrinot committed Apr 7, 2024
1 parent 4ac96f7 commit 413334b
Showing 3 changed files with 6 additions and 8 deletions.
3 changes: 1 addition & 2 deletions kafka-questdb-connector-samples/faker/docker-compose.yml
@@ -57,5 +57,4 @@ services:
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
-CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
-CONNECT_OFFSET_FLUSH_INTERVAL_MS: "1000"
+CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
3 changes: 1 addition & 2 deletions kafka-questdb-connector-samples/stocks/docker-compose.yml
@@ -86,5 +86,4 @@ services:
BOOTSTRAP_SERVERS: "kafka:9092"
CONFIG_STORAGE_TOPIC: "debezium_connect_config"
OFFSET_STORAGE_TOPIC: "debezium_connect_offsets"
-STATUS_STORAGE_TOPIC: "debezium_connect_status"
-OFFSET_FLUSH_INTERVAL_MS: "1000"
+STATUS_STORAGE_TOPIC: "debezium_connect_status"
8 changes: 4 additions & 4 deletions kafka-questdb-connector-samples/stocks/readme.md
@@ -29,7 +29,7 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach
It starts the Debezium connector that will capture changes from Postgres and feed them to Kafka.
8. Execute the following command to start the QuestDB Kafka Connect sink:
```shell
-curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","client.conf.string":"http::addr=questdb;auto_flush_interval=1000000;", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
+curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","client.conf.string":"http::addr=questdb;", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
```
It starts the QuestDB Kafka Connect sink that will read changes from Kafka and write them to QuestDB.
9. Go to the QuestDB Web Console running at http://localhost:19000/ and execute the following query:
@@ -129,7 +129,7 @@ Most of the fields are self-explanatory. The only non-obvious one is `database.s
### Kafka QuestDB connector
The Kafka QuestDB connector re-uses the same Kafka Connect runtime as the Debezium connector and is likewise started with a `curl` command. This is how we started the QuestDB connector:
```shell
-curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","client.conf.string":"http::addr=questdb;auto_flush_interval=1000000;", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
+curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public.stock","table":"stock", "connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","client.conf.string":"http::addr=questdb;", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors
```
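To confirm the sink was registered and is healthy, you can query the standard Kafka Connect status endpoint; a minimal check, using the connector name `questdb-connect` and port `8083` from the command above:
```shell
# Returns JSON describing the connector and its tasks; both should report state "RUNNING".
curl -s localhost:8083/connectors/questdb-connect/status
```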
This is the connector JSON configuration nicely formatted:
```json
@@ -142,7 +142,7 @@
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"client.conf.string": "http::addr=questdb;auto_flush_interval=1000000;",
"client.conf.string": "http::addr=questdb;",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"include.key": "false",
@@ -153,7 +153,7 @@
```
Again, most of the fields are obvious. Let's focus on the non-obvious ones.
1. `"symbols": "symbol"` this instruct to connector to use the [QuestDB symbol type](https://questdb.io/docs/concept/symbol/) for a column named "symbols". This column has low cardinality thus it's a good candidate for symbol type.
2. `"client.conf.string": "http::addr=questdb;auto_flush_interval=1000000;"` this configures the QuestDB client to use the HTTP transport and connect to a hostname `questdb`. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file. The `auto_flush_interval=1000000` controls frequency of automatic flushing data to QuestDB. It's set to 1000 seconds which effectively disables automatic interval-based flushing.
2. `"client.conf.string": "http::addr=questdb;"` this configures the QuestDB client to use the HTTP transport and connect to a hostname `questdb`. The hostname is defined in the [docker-compose.yml](docker-compose.yml) file.
3. `"timestamp.field.name": "last_update"` this instructs the connector to use the `last_update` column as the [designated timestamp](https://questdb.io/docs/concept/designated-timestamp/) column.
4. `"transforms":"unwrap"` and `"transforms.unwrap.type"` this instructs the connector to use Debezium's ExtractNewRecordState.