From 2cf923e56b2c0466d5d294ab995248507a4236dc Mon Sep 17 00:00:00 2001 From: Felix Wu Date: Sat, 27 Jan 2024 21:01:52 -0800 Subject: [PATCH 1/4] ClickHouse CDC MV demo [ pg -> kafka -> ch ] --- solutions/clickhouse-cdc/.gitignore | 7 + solutions/clickhouse-cdc/ch_drop.sql | 8 + solutions/clickhouse-cdc/ch_users.sql | 15 ++ solutions/clickhouse-cdc/kcat.config.template | 5 + solutions/clickhouse-cdc/lab.env | 4 + solutions/clickhouse-cdc/lab.sh | 145 +++++++++++++ solutions/clickhouse-cdc/mv.sql.template | 62 ++++++ solutions/clickhouse-cdc/pg_tables.sql | 20 ++ .../clickhouse-cdc/terraform/avn-token.sh | 6 + solutions/clickhouse-cdc/terraform/cdc.tf | 193 ++++++++++++++++++ solutions/clickhouse-cdc/terraform/data.tf | 3 + .../clickhouse-cdc/terraform/provider.tf | 11 + .../clickhouse-cdc/terraform/variables.tf | 24 +++ 13 files changed, 503 insertions(+) create mode 100644 solutions/clickhouse-cdc/.gitignore create mode 100644 solutions/clickhouse-cdc/ch_drop.sql create mode 100644 solutions/clickhouse-cdc/ch_users.sql create mode 100644 solutions/clickhouse-cdc/kcat.config.template create mode 100644 solutions/clickhouse-cdc/lab.env create mode 100755 solutions/clickhouse-cdc/lab.sh create mode 100644 solutions/clickhouse-cdc/mv.sql.template create mode 100644 solutions/clickhouse-cdc/pg_tables.sql create mode 100755 solutions/clickhouse-cdc/terraform/avn-token.sh create mode 100644 solutions/clickhouse-cdc/terraform/cdc.tf create mode 100644 solutions/clickhouse-cdc/terraform/data.tf create mode 100644 solutions/clickhouse-cdc/terraform/provider.tf create mode 100644 solutions/clickhouse-cdc/terraform/variables.tf diff --git a/solutions/clickhouse-cdc/.gitignore b/solutions/clickhouse-cdc/.gitignore new file mode 100644 index 0000000..cb642d2 --- /dev/null +++ b/solutions/clickhouse-cdc/.gitignore @@ -0,0 +1,7 @@ +clickhouse +kcat.config +mv-*.sql +ca.pem +service.cert +service.key +*/terraform.tfstate* diff --git a/solutions/clickhouse-cdc/ch_drop.sql 
b/solutions/clickhouse-cdc/ch_drop.sql new file mode 100644 index 0000000..a75dd4e --- /dev/null +++ b/solutions/clickhouse-cdc/ch_drop.sql @@ -0,0 +1,8 @@ +DROP TABLE IF EXISTS shire.population; +DROP TABLE IF EXISTS shire.population_mv; +DROP TABLE IF EXISTS shire.weather; +DROP TABLE IF EXISTS shire.weather_mv; +DROP TABLE IF EXISTS rivendell.population; +DROP TABLE IF EXISTS rivendell.population_mv; +DROP TABLE IF EXISTS rivendell.weather; +DROP TABLE IF EXISTS rivendell.weather_mv; diff --git a/solutions/clickhouse-cdc/ch_users.sql b/solutions/clickhouse-cdc/ch_users.sql new file mode 100644 index 0000000..ede30c6 --- /dev/null +++ b/solutions/clickhouse-cdc/ch_users.sql @@ -0,0 +1,15 @@ +CREATE USER OR REPLACE frodo IDENTIFIED BY 'your_password'; + +CREATE USER OR REPLACE sam IDENTIFIED BY 'your_password'; + +CREATE USER OR REPLACE arwen IDENTIFIED BY 'your_password'; + +CREATE USER OR REPLACE elrond IDENTIFIED BY 'your_password'; + +GRANT shire to frodo; + +GRANT shire to sam; + +GRANT rivendell to arwen; + +GRANT rivendell to elrond; \ No newline at end of file diff --git a/solutions/clickhouse-cdc/kcat.config.template b/solutions/clickhouse-cdc/kcat.config.template new file mode 100644 index 0000000..4c674e2 --- /dev/null +++ b/solutions/clickhouse-cdc/kcat.config.template @@ -0,0 +1,5 @@ +bootstrap.servers=address:port +security.protocol=ssl +ssl.key.location=service.key +ssl.certificate.location=service.cert +ssl.ca.location=ca.pem diff --git a/solutions/clickhouse-cdc/lab.env b/solutions/clickhouse-cdc/lab.env new file mode 100644 index 0000000..aa8cc42 --- /dev/null +++ b/solutions/clickhouse-cdc/lab.env @@ -0,0 +1,4 @@ +PROJECT="felixwu-demo" +SERVICE_KAFKA="cdc-kafka" +SERVICE_CH="cdc-clickhouse" +SERVICE_PG="cdc-pg" \ No newline at end of file diff --git a/solutions/clickhouse-cdc/lab.sh b/solutions/clickhouse-cdc/lab.sh new file mode 100755 index 0000000..a9086b3 --- /dev/null +++ b/solutions/clickhouse-cdc/lab.sh @@ -0,0 +1,145 @@ +#!/bin/bash 
+source ./lab.env + +setup_env() { +CH_JSON="$(avn service get --json ${SERVICE_CH} --project ${PROJECT})" + +CH_USER=avnadmin +CH_PASS=$(echo ${CH_JSON} | jq -r '.users[] | select(.username=="avnadmin") | .password') +CH_HOST=$(echo ${CH_JSON} | jq -r '.components[] | select(.component=="clickhouse") | .host') +CH_PORT=$(echo ${CH_JSON} | jq -r '.components[] | select(.component=="clickhouse") | .port') +CH_HTTPS_PORT=$(echo ${CH_JSON} | jq -r '.components[] | select(.component=="clickhouse_https") | .port') +CH_CLI="./clickhouse client --user ${CH_USER} --password ${CH_PASS} --host ${CH_HOST} --port ${CH_PORT} --secure" + +PG_SERVICE_URI=$(avn service get ${SERVICE_PG} --json | jq -r '.service_uri') +} + +lab_clickhouse() { + setup_env + echo $CH_CLI + $CH_CLI $@ +} + +lab_psql() { + setup_env + echo $PG_SERVICE_URI + psql ${PG_SERVICE_URI} $@ +} + +lab_setup() { +avn service user-creds-download ${SERVICE_KAFKA} --username avnadmin -d . \ +&& printf "✅ " || echo "❌ " +echo "certificates and keys downloaded from ${SERVICE_KAFKA}" + +echo +KAFKA_SERVICE_URI=$(avn service list --json ${SERVICE_KAFKA} | jq -r '.[].service_uri') +echo ${KAFKA_SERVICE_URI} +cat kcat.config.template > kcat.config +sed -i '' -e "s/address:port/${KAFKA_SERVICE_URI}/" kcat.config \ +&& printf "✅ " || echo "❌ " +echo "kcat.config setup completed" + +echo +PG_SERVICE_URI=$(avn service get ${SERVICE_PG} --json | jq -r '.service_uri') +cat pg_tables.sql | psql ${PG_SERVICE_URI} \ +&& printf "✅ " || echo "❌ " +echo "pg_tables.sql imported into postgres ${SERVICE_PG}" + + +[ -e "./clickhouse" ] || curl https://clickhouse.com/ | sh + +# echo +# OS_SERVICE_URI=$(avn service get ${SERVICE_OS} --json | jq -r '.service_uri') +# curl -X PUT ${OS_SERVICE_URI}/suspecious-logins -H 'Content-Type: application/json' --data @suspecious-logins-mapping.json \ +# && printf "\n✅ " || echo "❌ " +# echo "suspecious-logins index mapping created in opensearch ${SERVICE_OS}" +} + +lab_teardown() { +rm -f ca.pem 
service.cert service.key os-connector.json kcat.config +# echo ${SERVICE_KAFKA} | avn service terminate ${SERVICE_KAFKA} +# echo ${SERVICE_CH} | avn service terminate ${SERVICE_CH} +# echo ${SERVICE_PG} | avn service terminate ${SERVICE_PG} +} + +lab_pgload() { + setup_env +# while true; do + # num_entries=$((1 + RANDOM % 100)) + num_entries=$1 + + echo "---${num_entries} entries---" + SQL="\c middleearth;\n" + for _ in $(seq $num_entries); do + # time_stamp=$(date +%s) + # user_id=$((190 + RANDOM % 10)) + # action=("login" "attempt") + # random_action=${action[RANDOM % ${#action[@]}]} + # source_ip="192.168.123.16$((RANDOM % 10))" + + # echo "{\"time_stamp\": $time_stamp, \"user_id\": $user_id, \"action\": \"$random_action\", \"source_ip\": \"$source_ip\"}" | kcat -T -F kcat.config -P -t test00 + + region="1$((RANDOM % 2))" + total="1$((RANDOM % 1000))" + temperature=$((RANDOM % 66 - 20)).$((RANDOM % 100)) + + PSQL+="INSERT INTO population (region, total) VALUES (${region}, ${total});\n" + WSQL+="INSERT INTO weather (region, temperature) VALUES (${region}, ${temperature});\n" + done + SQL+=${PSQL}${WSQL}; + # SQL+=" | psql ${PG_SERVICE_URI} + # sleep 1 + printf "${SQL}" + printf "${SQL}" | psql ${PG_SERVICE_URI} +# sleep 10; +# done +} + +lab_chmv() { + setup_env + ROLE=$1 + REGION=$2 + echo "REGION: ${REGION}" + avn service clickhouse database create ${SERVICE_CH} ${ROLE} \ + && printf "✅ " || echo "❌ " + echo "${ROLE} created in clickhouse ${SERVICE_CH}" + sed -e "s/role_name/${ROLE}/g" mv.sql.template > mv-${ROLE}.sql + sed -i '' -e "s/region_id/${REGION}/g" mv-${ROLE}.sql \ + && printf "✅ " || echo "❌ " + echo " mv-${ROLE}.sql created successfully." 
+ ${CH_CLI} --queries-file ./mv-${ROLE}.sql --progress=tty --processed-rows --echo -t 2>&1 +} + +lab_reset() { + setup_env + printf "\c middleearth;\nDELETE FROM population;\nDELETE FROM weather;\n" | psql ${PG_SERVICE_URI} + # ${CH_CLI} --queries-file ./ch_drop.sql --progress=tty --processed-rows --echo -t 2>&1 + + avn service clickhouse database delete cdc-clickhouse rivendell + avn service clickhouse database delete cdc-clickhouse shire + + lab_chmv rivendell 10 + lab_chmv shire 11 + ${CH_CLI} --queries-file ./ch_users.sql --progress=tty --processed-rows --echo -t 2>&1 + + lab_pgload 10 +} + +case $1 in + clickhouse) + lab_clickhouse "${@:2}" ;; + psql) + lab_psql "${@:2}" ;; + reset) + lab_reset ;; + setup) + lab_setup ;; + teardown) + lab_teardown ;; + pgload) + lab_pgload "${@:2}" ;; + chmv) + lab_chmv "${@:2}" ;; + *) + printf "Usage: ./lab.sh [ setup | pgload n | teardown]\n" ;; +esac diff --git a/solutions/clickhouse-cdc/mv.sql.template b/solutions/clickhouse-cdc/mv.sql.template new file mode 100644 index 0000000..1048709 --- /dev/null +++ b/solutions/clickhouse-cdc/mv.sql.template @@ -0,0 +1,62 @@ +CREATE TABLE `role_name`.population ( + `id` UInt64, + `ts` DateTime64, + `region` UInt64, + `total` UInt64, + `version` UInt64, + `deleted` UInt8 +) ENGINE = ReplacingMergeTree(version, deleted) +PRIMARY KEY(id) +ORDER BY (id); + +CREATE MATERIALIZED VIEW `role_name`.population_mv TO `role_name`.population ( + `id` UInt64, + `ts` DateTime64, + `region` UInt64, + `total` UInt64, + `version` UInt64, + `deleted` UInt8 +) AS +SELECT + if(op = 'd', before.id, after.id) AS id, + toDateTime64(ts_ms / 1000, 3) as ts, + if(op = 'd', before.region, after.region) AS region, + if(op = 'd', before.total, after.total) AS total, + source.lsn, source.lsn AS version, + if(op = 'd', 1, 0) AS deleted +FROM `service_cdc-kafka`.population_cdc +WHERE region = region_id AND ((op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd')); + +CREATE TABLE `role_name`.weather ( + `id` UInt64, + 
`ts` DateTime64, + `region` UInt64, + `temperature` Float64, + `version` UInt64, + `deleted` UInt8 +) ENGINE = ReplacingMergeTree(version, deleted) +PRIMARY KEY(id) +ORDER BY (id); + +CREATE MATERIALIZED VIEW `role_name`.weather_mv TO `role_name`.weather ( + `id` UInt64, + `ts` DateTime64, + `region` UInt64, + `temperature` Float64, + `version` UInt64, + `deleted` UInt8 +) AS +SELECT + if(op = 'd', before.id, after.id) AS id, + toDateTime64(ts_ms / 1000, 3) as ts, + if(op = 'd', before.region, after.region) AS region, + if(op = 'd', before.temperature, after.temperature) AS temperature, + source.lsn AS version, + if(op = 'd', 1, 0) AS deleted +FROM `service_cdc-kafka`.weather_cdc +WHERE region = region_id AND ((op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd')); + +CREATE role OR REPLACE role_name; + +GRANT SELECT ON role_name.* TO role_name; +GRANT SELECT ON role_name.* TO role_name; diff --git a/solutions/clickhouse-cdc/pg_tables.sql b/solutions/clickhouse-cdc/pg_tables.sql new file mode 100644 index 0000000..0684ef1 --- /dev/null +++ b/solutions/clickhouse-cdc/pg_tables.sql @@ -0,0 +1,20 @@ +CREATE DATABASE middleearth; + +\c middleearth; + +CREATE TABLE IF NOT EXISTS population ( + id SERIAL PRIMARY KEY, + region INT NOT NULL, + total INT NOT NULL +); + +CREATE TABLE IF NOT EXISTS weather ( + id SERIAL PRIMARY KEY, + region INT NOT NULL, + temperature FLOAT NOT NULL +); + +-- FULL - Emitted events for update and delete operations contain the previous values of all columns in the table. +-- This is needed if you need to support delete operations. +ALTER TABLE population REPLICA IDENTITY FULL; +ALTER TABLE weather REPLICA IDENTITY FULL; \ No newline at end of file diff --git a/solutions/clickhouse-cdc/terraform/avn-token.sh b/solutions/clickhouse-cdc/terraform/avn-token.sh new file mode 100755 index 0000000..3407162 --- /dev/null +++ b/solutions/clickhouse-cdc/terraform/avn-token.sh @@ -0,0 +1,6 @@ +#! 
/bin/bash +if [ $(avn user info --json | jq -r '.[].state') != "active" ]; then + avn user login +fi + +cat ~/.config/aiven/aiven-credentials.json diff --git a/solutions/clickhouse-cdc/terraform/cdc.tf b/solutions/clickhouse-cdc/terraform/cdc.tf new file mode 100644 index 0000000..9068859 --- /dev/null +++ b/solutions/clickhouse-cdc/terraform/cdc.tf @@ -0,0 +1,193 @@ +resource "aiven_pg" "pg" { + project = var.aiven_project + cloud_name = var.cloud_name + plan = var.pg_plan + service_name = "cdc-pg" +} + +resource "aiven_pg_database" "pgdb" { + project = var.aiven_project + service_name = "cdc-pg" + database_name = "middleearth" +} + +resource "aiven_kafka" "kafka" { + project = var.aiven_project + cloud_name = var.cloud_name + plan = var.kafka_plan + service_name = "cdc-kafka" + kafka_user_config { + kafka_rest = true + kafka_connect = true + schema_registry = true + kafka_version = "3.6" + kafka { + auto_create_topics_enable = true + num_partitions = 3 + default_replication_factor = 2 + min_insync_replicas = 2 + } + } +} + +# resource "aiven_kafka_topic" "source" { +# project = aiven_kafka.kafka.project +# service_name = aiven_kafka.kafka.service_name +# partitions = 2 +# replication = 3 +# topic_name = "source_topic" +# } + +# resource "aiven_kafka_topic" "sink" { +# project = aiven_kafka.kafka.project +# service_name = aiven_kafka.kafka.service_name +# partitions = 2 +# replication = 3 +# topic_name = "sink_topic" +# } + +resource "aiven_kafka_connector" "kafka-pg-source" { + project = var.aiven_project + service_name = aiven_kafka.kafka.service_name + connector_name = "kafka-pg-source" + + config = { + "name" = "kafka-pg-source" + "connector.class" = "io.debezium.connector.postgresql.PostgresConnector" + "snapshot.mode" = "initial" + "database.hostname" = sensitive(aiven_pg.pg.service_host) + "database.port" = sensitive(aiven_pg.pg.service_port) + "database.password" = sensitive(aiven_pg.pg.service_password) + "database.user" = 
sensitive(aiven_pg.pg.service_username) + "database.dbname" = "middleearth" + "database.server.name" = "middleearth-replicator" + "database.ssl.mode" = "require" + "include.schema.changes" = true + "include.query" = true + # "plugin.name" = "wal2json" + "plugin.name" = "pgoutput" + # tables needs to be specified for pgoutput plugin + # see details https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/debezium-source-connector-pg# + "publication.autocreate.mode" = "filtered" + "table.include.list" = "public.population,public.weather" + "slot.name" = "dbz" + "decimal.handling.mode" = "double" + "_aiven.restart.on.failure" = "true" + "transforms" = "flatten" + "transforms.flatten.type" = "org.apache.kafka.connect.transforms.Flatten$Value" + } +} + +resource "aiven_clickhouse" "clickhouse" { + project = var.aiven_project + cloud_name = var.cloud_name + plan = var.clickhouse_plan + service_name = "cdc-clickhouse" +} + +resource "aiven_service_integration" "clickhouse_kafka_source" { + project = var.aiven_project + integration_type = "clickhouse_kafka" + source_service_name = aiven_kafka.kafka.service_name + destination_service_name = aiven_clickhouse.clickhouse.service_name + clickhouse_kafka_user_config { + tables { + name = "population_cdc" + group_name = "clickhouse-ingestion" + data_format = "JSONEachRow" + columns { + name = "before.id" + type = "UInt64" + } + columns { + name = "before.region" + type = "UInt64" + } + columns { + name = "before.total" + type = "UInt64" + } + columns { + name = "after.id" + type = "UInt64" + } + columns { + name = "after.region" + type = "UInt64" + } + columns { + name = "after.total" + type = "UInt64" + } + columns { + name = "op" + type = "LowCardinality(String)" + } + columns { + name = "ts_ms" + type = "UInt64" + } + columns { + name = "source.sequence" + type = "String" + } + columns { + name = "source.lsn" + type = "UInt64" + } + + topics { + name = "middleearth-replicator.public.population" + } + } + tables { + name = 
"weather_cdc" + group_name = "clickhouse-ingestion" + data_format = "JSONEachRow" + columns { + name = "before.id" + type = "UInt64" + } + columns { + name = "before.region" + type = "UInt64" + } + columns { + name = "before.temperature" + type = "Float64" + } + columns { + name = "after.id" + type = "UInt64" + } + columns { + name = "after.region" + type = "UInt64" + } + columns { + name = "after.temperature" + type = "Float64" + } + columns { + name = "op" + type = "LowCardinality(String)" + } + columns { + name = "ts_ms" + type = "UInt64" + } + columns { + name = "source.sequence" + type = "String" + } + columns { + name = "source.lsn" + type = "UInt64" + } + + topics { + name = "middleearth-replicator.public.weather" + } + } + } +} diff --git a/solutions/clickhouse-cdc/terraform/data.tf b/solutions/clickhouse-cdc/terraform/data.tf new file mode 100644 index 0000000..e597d79 --- /dev/null +++ b/solutions/clickhouse-cdc/terraform/data.tf @@ -0,0 +1,3 @@ +data "external" "env" { + program = [ "./avn-token.sh" ] +} diff --git a/solutions/clickhouse-cdc/terraform/provider.tf b/solutions/clickhouse-cdc/terraform/provider.tf new file mode 100644 index 0000000..b2d3a32 --- /dev/null +++ b/solutions/clickhouse-cdc/terraform/provider.tf @@ -0,0 +1,11 @@ +terraform { + required_providers { + aiven = { + source = "aiven/aiven" + version = ">=4.0.0, < 5.0.0" } + } +} + +provider "aiven" { + api_token = data.external.env.result["auth_token"] +} diff --git a/solutions/clickhouse-cdc/terraform/variables.tf b/solutions/clickhouse-cdc/terraform/variables.tf new file mode 100644 index 0000000..0ececbd --- /dev/null +++ b/solutions/clickhouse-cdc/terraform/variables.tf @@ -0,0 +1,24 @@ +variable "aiven_project" { + type = string + default = "felixwu-demo" +} + +variable "cloud_name" { + type = string + default = "google-us-west1" +} + +variable "clickhouse_plan" { + type = string + default = "startup-16" +} + +variable "kafka_plan" { + type = string + default = "business-4" +} + 
+variable "pg_plan" { + type = string + default = "startup-4" +} From 8afd41a2c059ea806e9c2f12e469d0fac346930d Mon Sep 17 00:00:00 2001 From: Felix Wu Date: Sun, 28 Jan 2024 22:35:24 -0800 Subject: [PATCH 2/4] clickhouse-cdc updated terraform depends_on and various fixes --- solutions/clickhouse-cdc/ch_users.sql | 10 ++-- solutions/clickhouse-cdc/lab.sh | 73 +++++++++++------------ solutions/clickhouse-cdc/mv.sql.template | 3 + solutions/clickhouse-cdc/terraform/cdc.tf | 27 +++------ 4 files changed, 50 insertions(+), 63 deletions(-) diff --git a/solutions/clickhouse-cdc/ch_users.sql b/solutions/clickhouse-cdc/ch_users.sql index ede30c6..602e30e 100644 --- a/solutions/clickhouse-cdc/ch_users.sql +++ b/solutions/clickhouse-cdc/ch_users.sql @@ -1,10 +1,10 @@ -CREATE USER OR REPLACE frodo IDENTIFIED BY 'your_password'; +CREATE USER OR REPLACE frodo IDENTIFIED BY 'password'; -CREATE USER OR REPLACE sam IDENTIFIED BY 'your_password'; +CREATE USER OR REPLACE sam IDENTIFIED BY 'password'; -CREATE USER OR REPLACE arwen IDENTIFIED BY 'your_password'; +CREATE USER OR REPLACE arwen IDENTIFIED BY 'password'; -CREATE USER OR REPLACE elrond IDENTIFIED BY 'your_password'; +CREATE USER OR REPLACE elrond IDENTIFIED BY 'password'; GRANT shire to frodo; @@ -12,4 +12,4 @@ GRANT shire to sam; GRANT rivendell to arwen; -GRANT rivendell to elrond; \ No newline at end of file +GRANT rivendell to elrond; diff --git a/solutions/clickhouse-cdc/lab.sh b/solutions/clickhouse-cdc/lab.sh index a9086b3..811fab2 100755 --- a/solutions/clickhouse-cdc/lab.sh +++ b/solutions/clickhouse-cdc/lab.sh @@ -26,37 +26,42 @@ lab_psql() { psql ${PG_SERVICE_URI} $@ } +lab_setuppg() { + echo + PG_SERVICE_URI=$(avn service get ${SERVICE_PG} --json | jq -r '.service_uri') + cat pg_tables.sql | psql ${PG_SERVICE_URI} \ + && printf "✅ " || echo "❌ " + echo "pg_tables.sql imported into postgres ${SERVICE_PG}" +} + lab_setup() { -avn service user-creds-download ${SERVICE_KAFKA} --username avnadmin -d . 
\ -&& printf "✅ " || echo "❌ " -echo "certificates and keys downloaded from ${SERVICE_KAFKA}" - -echo -KAFKA_SERVICE_URI=$(avn service list --json ${SERVICE_KAFKA} | jq -r '.[].service_uri') -echo ${KAFKA_SERVICE_URI} -cat kcat.config.template > kcat.config -sed -i '' -e "s/address:port/${KAFKA_SERVICE_URI}/" kcat.config \ -&& printf "✅ " || echo "❌ " -echo "kcat.config setup completed" - -echo -PG_SERVICE_URI=$(avn service get ${SERVICE_PG} --json | jq -r '.service_uri') -cat pg_tables.sql | psql ${PG_SERVICE_URI} \ -&& printf "✅ " || echo "❌ " -echo "pg_tables.sql imported into postgres ${SERVICE_PG}" + setup_env + avn service user-creds-download ${SERVICE_KAFKA} --username avnadmin -d . \ + && printf "✅ " || echo "❌ " + echo "certificates and keys downloaded from ${SERVICE_KAFKA}" + echo + KAFKA_SERVICE_URI=$(avn service list --json ${SERVICE_KAFKA} | jq -r '.[].service_uri') + echo ${KAFKA_SERVICE_URI} + cat kcat.config.template > kcat.config + sed -i '' -e "s/address:port/${KAFKA_SERVICE_URI}/" kcat.config \ + && printf "✅ " || echo "❌ " + echo "kcat.config setup completed" + + echo + PG_SERVICE_URI=$(avn service get ${SERVICE_PG} --json | jq -r '.service_uri') + cat pg_tables.sql | psql ${PG_SERVICE_URI} \ + && printf "✅ " || echo "❌ " + echo "pg_tables.sql imported into postgres ${SERVICE_PG}" -[ -e "./clickhouse" ] || curl https://clickhouse.com/ | sh + [ -e "./clickhouse" ] || curl https://clickhouse.com/ | sh -# echo -# OS_SERVICE_URI=$(avn service get ${SERVICE_OS} --json | jq -r '.service_uri') -# curl -X PUT ${OS_SERVICE_URI}/suspecious-logins -H 'Content-Type: application/json' --data @suspecious-logins-mapping.json \ -# && printf "\n✅ " || echo "❌ " -# echo "suspecious-logins index mapping created in opensearch ${SERVICE_OS}" + ${CH_CLI} --queries-file ./ch_mv.sql --progress=tty --processed-rows --echo -t 2>&1 } lab_teardown() { rm -f ca.pem service.cert service.key os-connector.json kcat.config +cd terraform && terraform destroy # echo 
${SERVICE_KAFKA} | avn service terminate ${SERVICE_KAFKA} # echo ${SERVICE_CH} | avn service terminate ${SERVICE_CH} # echo ${SERVICE_PG} | avn service terminate ${SERVICE_PG} @@ -64,20 +69,12 @@ rm -f ca.pem service.cert service.key os-connector.json kcat.config lab_pgload() { setup_env -# while true; do - # num_entries=$((1 + RANDOM % 100)) - num_entries=$1 + num_entries=$1 - echo "---${num_entries} entries---" + echo "Generating ${num_entries} entries..." SQL="\c middleearth;\n" for _ in $(seq $num_entries); do # time_stamp=$(date +%s) - # user_id=$((190 + RANDOM % 10)) - # action=("login" "attempt") - # random_action=${action[RANDOM % ${#action[@]}]} - # source_ip="192.168.123.16$((RANDOM % 10))" - - # echo "{\"time_stamp\": $time_stamp, \"user_id\": $user_id, \"action\": \"$random_action\", \"source_ip\": \"$source_ip\"}" | kcat -T -F kcat.config -P -t test00 region="1$((RANDOM % 2))" total="1$((RANDOM % 1000))" @@ -87,12 +84,9 @@ lab_pgload() { WSQL+="INSERT INTO weather (region, temperature) VALUES (${region}, ${temperature});\n" done SQL+=${PSQL}${WSQL}; - # SQL+=" | psql ${PG_SERVICE_URI} - # sleep 1 + printf "${SQL}" printf "${SQL}" | psql ${PG_SERVICE_URI} -# sleep 10; -# done } lab_chmv() { @@ -112,6 +106,7 @@ lab_chmv() { lab_reset() { setup_env + printf "Reset all test data in postgres ${SERVICE_PG} and clickhouse ${SERVICE_CH}..." 
printf "\c middleearth;\nDELETE FROM population;\nDELETE FROM weather;\n" | psql ${PG_SERVICE_URI} # ${CH_CLI} --queries-file ./ch_drop.sql --progress=tty --processed-rows --echo -t 2>&1 @@ -134,6 +129,8 @@ case $1 in lab_reset ;; setup) lab_setup ;; + setuppg) + lab_setuppg ;; teardown) lab_teardown ;; pgload) @@ -141,5 +138,5 @@ case $1 in chmv) lab_chmv "${@:2}" ;; *) - printf "Usage: ./lab.sh [ setup | pgload n | teardown]\n" ;; + printf "Usage: ./lab.sh [ setup | clickhouse | psql | pgload n | reset | teardown]\n" ;; esac diff --git a/solutions/clickhouse-cdc/mv.sql.template b/solutions/clickhouse-cdc/mv.sql.template index 1048709..c8d5563 100644 --- a/solutions/clickhouse-cdc/mv.sql.template +++ b/solutions/clickhouse-cdc/mv.sql.template @@ -1,3 +1,6 @@ +-- ReplacingMergeTree +-- https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree + CREATE TABLE `role_name`.population ( `id` UInt64, `ts` DateTime64, diff --git a/solutions/clickhouse-cdc/terraform/cdc.tf b/solutions/clickhouse-cdc/terraform/cdc.tf index 9068859..1679f7b 100644 --- a/solutions/clickhouse-cdc/terraform/cdc.tf +++ b/solutions/clickhouse-cdc/terraform/cdc.tf @@ -6,6 +6,7 @@ resource "aiven_pg" "pg" { } resource "aiven_pg_database" "pgdb" { + depends_on = [aiven_pg.pg] project = var.aiven_project service_name = "cdc-pg" database_name = "middleearth" @@ -30,23 +31,8 @@ resource "aiven_kafka" "kafka" { } } -# resource "aiven_kafka_topic" "source" { -# project = aiven_kafka.kafka.project -# service_name = aiven_kafka.kafka.service_name -# partitions = 2 -# replication = 3 -# topic_name = "source_topic" -# } - -# resource "aiven_kafka_topic" "sink" { -# project = aiven_kafka.kafka.project -# service_name = aiven_kafka.kafka.service_name -# partitions = 2 -# replication = 3 -# topic_name = "sink_topic" -# } - resource "aiven_kafka_connector" "kafka-pg-source" { + depends_on = [aiven_pg_database.pgdb] project = var.aiven_project service_name = 
aiven_kafka.kafka.service_name connector_name = "kafka-pg-source" @@ -64,12 +50,12 @@ resource "aiven_kafka_connector" "kafka-pg-source" { "database.ssl.mode" = "require" "include.schema.changes" = true "include.query" = true - # "plugin.name" = "wal2json" - "plugin.name" = "pgoutput" + "plugin.name" = "wal2json" # tables needs to be specified for pgoutput plugin # see details https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/debezium-source-connector-pg# - "publication.autocreate.mode" = "filtered" - "table.include.list" = "public.population,public.weather" + # "plugin.name" = "pgoutput" + # "publication.autocreate.mode" = "filtered" + # "table.include.list" = "public.population,public.weather" "slot.name" = "dbz" "decimal.handling.mode" = "double" "_aiven.restart.on.failure" = "true" @@ -86,6 +72,7 @@ resource "aiven_clickhouse" "clickhouse" { } resource "aiven_service_integration" "clickhouse_kafka_source" { + depends_on = [aiven_kafka_connector.kafka-pg-source] project = var.aiven_project integration_type = "clickhouse_kafka" source_service_name = aiven_kafka.kafka.service_name From e38c9161bba44a694d9dc512327a302b4bd1ae3b Mon Sep 17 00:00:00 2001 From: Felix Wu Date: Sun, 28 Jan 2024 22:42:37 -0800 Subject: [PATCH 3/4] clickhouse-cdc added README --- solutions/clickhouse-cdc/README.md | 48 ++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 solutions/clickhouse-cdc/README.md diff --git a/solutions/clickhouse-cdc/README.md b/solutions/clickhouse-cdc/README.md new file mode 100644 index 0000000..1bf586a --- /dev/null +++ b/solutions/clickhouse-cdc/README.md @@ -0,0 +1,48 @@ +# clickhouse-cdc + +### PostgreSQL® -> Debezium -> Apache Kafka® -> ClickHouse® +This demo showcases a solution for real-time data integration and analytics using PostgreSQL, Debezium, Apache Kafka, and ClickHouse. 
The workflow involves capturing changes (supports insert, update and delete) from a PostgreSQL source table, streaming the changes into Kafka using Debezium, integrating Kafka topics into ClickHouse via ClickHouse Kafka Engine, and finally, leveraging ClickHouse's **ReplacingMergeTree** engine with **Materialized View** to isolate and efficiently manage data for different regions. + + +## Requirements + +- `avn` +- `jq` +- `terraform` +- `clickhouse` +- `psql` + + +## Steps + +- Update the `PROJECT` and `SERVICE` in `lab.env` + +Run the following to setup required services and download clickhouse CLI. + +```bash +cd terraform && terraform apply && cd .. +./lab.sh setup +``` + +Run the following to create clickhouse databases, tables materialized views and load sample data into postgres. + +```bash +./lab.sh reset +``` + +Run the following to delete all the resources created in this demo. + +```bash +./lab.sh teardown +``` + +## References + +- [Create a Debezium source connector from PostgreSQL® to Apache Kafka®](https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/debezium-source-connector-pg) +- [Connect Apache Kafka® to Aiven for ClickHouse®](https://docs.aiven.io/docs/products/clickhouse/howto/integrate-kafka) +- [Create materialized views in ClickHouse®](https://docs.aiven.io/docs/products/clickhouse/howto/materialized-views) +- [Change Data Capture (CDC) with PostgreSQL and ClickHouse - Part 1](https://clickhouse.com/blog/clickhouse-postgresql-change-data-capture-cdc-part-1) +- [Change Data Capture (CDC) with PostgreSQL and ClickHouse - Part 2](https://clickhouse.com/blog/clickhouse-postgresql-change-data-capture-cdc-part-2) +- [Materialized views in Aiven for ClickHouse® optimize queries for speed and freshness](https://aiven.io/blog/materialized-views-in-aiven-for-clickhouse) +- [ClickHouse® Kafka Engine](https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka) +- [ClickHouse® ReplacingMergeTree 
Engine](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree) \ No newline at end of file From b48e9a28358da13496b0a840002776bc21157484 Mon Sep 17 00:00:00 2001 From: Felix Wu Date: Mon, 29 Jan 2024 16:57:18 -0800 Subject: [PATCH 4/4] clickhouse-cdc README updates and minor fixes --- solutions/clickhouse-cdc/README.md | 256 +++++++++++++++++++++- solutions/clickhouse-cdc/lab.sh | 3 - solutions/clickhouse-cdc/mv.sql.template | 1 - solutions/clickhouse-cdc/terraform/cdc.tf | 8 +- 4 files changed, 257 insertions(+), 11 deletions(-) diff --git a/solutions/clickhouse-cdc/README.md b/solutions/clickhouse-cdc/README.md index 1bf586a..e4f121f 100644 --- a/solutions/clickhouse-cdc/README.md +++ b/solutions/clickhouse-cdc/README.md @@ -1,6 +1,6 @@ # clickhouse-cdc -### PostgreSQL® -> Debezium -> Apache Kafka® -> ClickHouse® +**PostgreSQL® -> Debezium -> Apache Kafka® -> ClickHouse®** This demo showcases a solution for real-time data integration and analytics using PostgreSQL, Debezium, Apache Kafka, and ClickHouse. The workflow involves capturing changes (supports insert, update and delete) from a PostgreSQL source table, streaming the changes into Kafka using Debezium, integrating Kafka topics into ClickHouse via ClickHouse Kafka Engine, and finally, leveraging ClickHouse's **ReplacingMergeTree** engine with **Materialized View** to isolate and efficiently manage data for different regions. @@ -17,14 +17,14 @@ This demo showcases a solution for real-time data integration and analytics usin - Update the `PROJECT` and `SERVICE` in `lab.env` -Run the following to setup required services and download clickhouse CLI. +Run the following to setup required services and download `clickhouse` CLI. ```bash cd terraform && terraform apply && cd .. ./lab.sh setup ``` -Run the following to create clickhouse databases, tables materialized views and load sample data into postgres. 
+Run the following to create clickhouse databases, tables, materialized views and load sample data into postgres. This step would create `mv-${ROLE}.sql` for creating materialized views in ClickHouse. ```bash ./lab.sh reset ``` @@ -36,6 +36,256 @@ Run the following to delete all the resources created in this demo. ./lab.sh teardown ``` +## Successful Demo Run + +### Run terraform to setup services + +```bash +cd terraform && terraform apply && cd .. +data.external.env: Reading... +. . . +Plan: 6 to add, 0 to change, 0 to destroy. + +Do you want to perform these actions? + Terraform will perform the actions described above. + Only 'yes' will be accepted to approve. + + Enter a value: yes + +aiven_clickhouse.clickhouse: Creating... +aiven_pg.pg: Creating... +aiven_kafka.kafka: Creating... +. . . +Apply complete! Resources: 6 added, 0 changed, 0 destroyed. + +``` + +### Configure security and setup databases and tables + +```bash +./lab.sh setup +Downloaded to directory '.': CA certificate, certificate, key + +✅ certificates and keys downloaded from cdc-kafka + +cdc-kafka-felixwu-demo.a.aivencloud.com:24949 +✅ kcat.config setup completed + +ERROR: database "middleearth" already exists +You are now connected to database "middleearth" as user "avnadmin". +CREATE TABLE +CREATE TABLE +ALTER TABLE +ALTER TABLE +✅ pg_tables.sql imported into postgres cdc-pg +. . . +Processed rows: 1 +``` + +### Reset all test data and re-populate 10 rows of random data + +```bash +./lab.sh reset +Reset all test data in postgres cdc-pg and clickhouse cdc-clickhouse...You are now connected to database "middleearth" as user "avnadmin". +. . . +✅ rivendell created in clickhouse cdc-clickhouse +✅ mv-rivendell.sql created successfully. +. . . +✅ shire created in clickhouse cdc-clickhouse +✅ mv-shire.sql created successfully. +. . . +Generating 10 entries... +\c middleearth; +INSERT INTO population (region, total) VALUES (11, 1493); +. . . +INSERT INTO weather (region, temperature) VALUES (11, 35.82); +. 
. . +INSERT 0 1 +``` + +### Use kcat to watch Debezium CDC into Kafka topic + +```bash +kcat -F kcat.config -t middleearth-replicator.public.population +% Reading configuration from file kcat.config +% Auto-selecting Consumer mode (use -P or -C to override) +{"before.id":0,"before.region":null,"before.total":null,"after.id":1,"after.region":11,"after.total":1493,"source.version":"1.9.7.aiven","source.connector":"postgresql","source.name":"middleearth-replicator","source.ts_ms":1706573591899,"source.snapshot":"true","source.db":"middleearth","source.sequence":"[null,\"150998520\"]","source.schema":"public","source.table":"population","source.txId":913,"source.lsn":150998520,"source.xmin":null,"op":"r","ts_ms":1706573592198,"transaction.id":null,"transaction.total_order":null,"transaction.data_collection_order":null} +. . . +% Reached end of topic middleearth-replicator.public.population [2] at offset 5 +``` + +### Check test data has been loaded into PostgreSQL® + +```bash +./lab.sh psql +. . . +defaultdb=> \c middleearth; +middleearth=> \dt + List of relations + Schema | Name | Type | Owner +--------+------------+-------+---------- + public | population | table | avnadmin + public | weather | table | avnadmin +(2 rows) + +middleearth=> select * from weather; + id | region | temperature +----+--------+------------- + 1 | 11 | 35.82 + 2 | 10 | 42.12 + 3 | 11 | 10.28 + 4 | 10 | 42.38 + 5 | 11 | -9.75 + 6 | 10 | 25.8 + 7 | 10 | 16.16 + 8 | 11 | -17.82 + 9 | 11 | 8.92 + 10 | 10 | 2.42 +(10 rows) + +middleearth=> select * from population; + id | region | total +----+--------+------- + 1 | 11 | 1493 + 2 | 10 | 174 + 3 | 11 | 1160 + 4 | 10 | 1127 + 5 | 11 | 1103 + 6 | 10 | 1264 + 7 | 10 | 1529 + 8 | 11 | 1386 + 9 | 11 | 1442 + 10 | 10 | 1912 +(10 rows) +``` + +### Check test data has been synced into ClickHouse® + +```bash +./lab.sh clickhouse +. . . 
+cdc-clickhouse-1 :) show databases; +SHOW DATABASES +Query id: f2bea58f-70b5-4085-99dc-ea0d4ecf6fff +┌─name───────────────┐ +│ INFORMATION_SCHEMA │ +│ default │ +│ information_schema │ +│ rivendell │ +│ service_cdc-kafka │ +│ shire │ +│ system │ +└────────────────────┘ +7 rows in set. Elapsed: 0.001 sec. + +cdc-clickhouse-1 :) select * from `shire`.weather final order by id; +. . . +┌─id─┬──────────────────────ts─┬─region─┬─temperature─┬───version─┬─deleted─┐ +│ 1 │ 2024-01-30 00:13:12.224 │ 11 │ 35.82 │ 150998520 │ 0 │ +│ 3 │ 2024-01-30 00:13:12.225 │ 11 │ 10.28 │ 150998520 │ 0 │ +│ 5 │ 2024-01-30 00:13:12.225 │ 11 │ -9.75 │ 150998520 │ 0 │ +│ 8 │ 2024-01-30 00:13:12.228 │ 11 │ -17.82 │ 150998520 │ 0 │ +│ 9 │ 2024-01-30 00:13:12.228 │ 11 │ 8.92 │ 150998520 │ 0 │ +└────┴─────────────────────────┴────────┴─────────────┴───────────┴─────────┘ +5 rows in set. Elapsed: 0.002 sec. + +cdc-clickhouse-1 :) select * from `rivendell`.weather final order by id; +. . . +┌─id─┬──────────────────────ts─┬─region─┬─temperature─┬───version─┬─deleted─┐ +│ 2 │ 2024-01-30 00:13:12.225 │ 10 │ 42.12 │ 150998520 │ 0 │ +│ 4 │ 2024-01-30 00:13:12.225 │ 10 │ 42.38 │ 150998520 │ 0 │ +│ 6 │ 2024-01-30 00:13:12.227 │ 10 │ 25.8 │ 150998520 │ 0 │ +│ 7 │ 2024-01-30 00:13:12.228 │ 10 │ 16.16 │ 150998520 │ 0 │ +│ 10 │ 2024-01-30 00:13:12.228 │ 10 │ 2.42 │ 150998520 │ 0 │ +└────┴─────────────────────────┴────────┴─────────────┴───────────┴─────────┘ +5 rows in set. Elapsed: 0.002 sec. +``` + +### Test UPDATE from PostgreSQL® + +```bash +middleearth=> update weather set temperature=99.99 where id > 3; +UPDATE 7 +``` + +### Validate updated values are in ClickHouse® + +```bash +cdc-clickhouse-1 :) select * from `shire`.weather final order by id; +. . . 
+┌─id─┬──────────────────────ts─┬─region─┬─temperature─┬───version─┬─deleted─┐ +│ 1 │ 2024-01-30 00:13:12.224 │ 11 │ 35.82 │ 150998520 │ 0 │ +│ 3 │ 2024-01-30 00:13:12.225 │ 11 │ 10.28 │ 150998520 │ 0 │ +│ 5 │ 2024-01-30 00:31:39.817 │ 11 │ 99.99 │ 201328456 │ 0 │ +│ 8 │ 2024-01-30 00:31:39.818 │ 11 │ 99.99 │ 201328768 │ 0 │ +│ 9 │ 2024-01-30 00:31:39.818 │ 11 │ 99.99 │ 201328872 │ 0 │ +└────┴─────────────────────────┴────────┴─────────────┴───────────┴─────────┘ + +5 rows in set. Elapsed: 0.003 sec. + +cdc-clickhouse-1 :) select * from `rivendell`.weather final order by id; +. . . +┌─id─┬──────────────────────ts─┬─region─┬─temperature─┬───version─┬─deleted─┐ +│ 2 │ 2024-01-30 00:13:12.225 │ 10 │ 42.12 │ 150998520 │ 0 │ +│ 4 │ 2024-01-30 00:31:39.816 │ 10 │ 99.99 │ 201328352 │ 0 │ +│ 6 │ 2024-01-30 00:31:39.817 │ 10 │ 99.99 │ 201328560 │ 0 │ +│ 7 │ 2024-01-30 00:31:39.818 │ 10 │ 99.99 │ 201328664 │ 0 │ +│ 10 │ 2024-01-30 00:31:39.819 │ 10 │ 99.99 │ 201328976 │ 0 │ +└────┴─────────────────────────┴────────┴─────────────┴───────────┴─────────┘ +5 rows in set. Elapsed: 0.002 sec. +``` + +### Test DELETE from PostgreSQL® + +```bash +middleearth=> delete from weather where id > 3; +DELETE 7 +``` + +### Validate deleted values are no longer in ClickHouse® + +```bash +cdc-clickhouse-1 :) select * from `shire`.weather final order by id; +. . . +┌─id─┬──────────────────────ts─┬─region─┬─temperature─┬───version─┬─deleted─┐ +│ 1 │ 2024-01-30 00:13:12.224 │ 11 │ 35.82 │ 150998520 │ 0 │ +│ 3 │ 2024-01-30 00:13:12.225 │ 11 │ 10.28 │ 150998520 │ 0 │ +└────┴─────────────────────────┴────────┴─────────────┴───────────┴─────────┘ +2 rows in set. Elapsed: 0.002 sec. + +cdc-clickhouse-1 :) select * from `rivendell`.weather final order by id; +. . . +┌─id─┬──────────────────────ts─┬─region─┬─temperature─┬───version─┬─deleted─┐ +│ 2 │ 2024-01-30 00:13:12.225 │ 10 │ 42.12 │ 150998520 │ 0 │ +└────┴─────────────────────────┴────────┴─────────────┴───────────┴─────────┘ +1 row in set. 
Elapsed: 0.002 sec. +``` + +### Validate users only have permission to their assigned database in ClickHouse® + +```bash +./clickhouse client --user sam --host **********.a.aivencloud.com --port 24947 --secure +Connecting to **********.a.aivencloud.com:24947 as user sam. +. . . +cdc-clickhouse-1 :) show databases; +. . . +┌─name──┐ +│ shire │ +└───────┘ +1 row in set. Elapsed: 0.001 sec. +cdc-clickhouse-1 :) select * from `shire`.weather final order by id; +. . . +┌─id─┬──────────────────────ts─┬─region─┬─temperature─┬───version─┬─deleted─┐ +│ 1 │ 2024-01-30 00:13:12.224 │ 11 │ 35.82 │ 150998520 │ 0 │ +│ 3 │ 2024-01-30 00:13:12.225 │ 11 │ 10.28 │ 150998520 │ 0 │ +└────┴─────────────────────────┴────────┴─────────────┴───────────┴─────────┘ +2 rows in set. Elapsed: 0.002 sec. +cdc-clickhouse-1 :) select * from `rivendell`.weather final order by id; +. . . +Received exception from server (version 23.8.8): +Code: 497. DB::Exception: Received from **********.a.aivencloud.com:24947. DB::Exception: sam: Not enough privileges. To execute this query, it's necessary to have the grant SELECT(id, ts, region, temperature, version, deleted) ON rivendell.weather. 
(ACCESS_DENIED) +``` + ## References - [Create a Debezium source connector from PostgreSQL® to Apache Kafka®](https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/debezium-source-connector-pg) diff --git a/solutions/clickhouse-cdc/lab.sh b/solutions/clickhouse-cdc/lab.sh index 811fab2..98d4504 100755 --- a/solutions/clickhouse-cdc/lab.sh +++ b/solutions/clickhouse-cdc/lab.sh @@ -62,9 +62,6 @@ lab_setup() { lab_teardown() { rm -f ca.pem service.cert service.key os-connector.json kcat.config cd terraform && terraform destroy -# echo ${SERVICE_KAFKA} | avn service terminate ${SERVICE_KAFKA} -# echo ${SERVICE_CH} | avn service terminate ${SERVICE_CH} -# echo ${SERVICE_PG} | avn service terminate ${SERVICE_PG} } lab_pgload() { diff --git a/solutions/clickhouse-cdc/mv.sql.template b/solutions/clickhouse-cdc/mv.sql.template index c8d5563..3b4022e 100644 --- a/solutions/clickhouse-cdc/mv.sql.template +++ b/solutions/clickhouse-cdc/mv.sql.template @@ -62,4 +62,3 @@ WHERE region = region_id AND ((op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd CREATE role OR REPLACE role_name; GRANT SELECT ON role_name.* TO role_name; -GRANT SELECT ON role_name.* TO role_name; diff --git a/solutions/clickhouse-cdc/terraform/cdc.tf b/solutions/clickhouse-cdc/terraform/cdc.tf index 1679f7b..1fad001 100644 --- a/solutions/clickhouse-cdc/terraform/cdc.tf +++ b/solutions/clickhouse-cdc/terraform/cdc.tf @@ -50,12 +50,12 @@ resource "aiven_kafka_connector" "kafka-pg-source" { "database.ssl.mode" = "require" "include.schema.changes" = true "include.query" = true - "plugin.name" = "wal2json" + # "plugin.name" = "wal2json" # tables needs to be specified for pgoutput plugin # see details https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/debezium-source-connector-pg# - # "plugin.name" = "pgoutput" - # "publication.autocreate.mode" = "filtered" - # "table.include.list" = "public.population,public.weather" + "plugin.name" = "pgoutput" + "publication.autocreate.mode" = "filtered" + 
"table.include.list" = "public.population,public.weather" "slot.name" = "dbz" "decimal.handling.mode" = "double" "_aiven.restart.on.failure" = "true"