clickhouse-cdc updated terraform depends_on and various fixes
runwuf committed Jan 29, 2024
1 parent 2cf923e commit 8afd41a
Showing 4 changed files with 50 additions and 63 deletions.
10 changes: 5 additions & 5 deletions solutions/clickhouse-cdc/ch_users.sql
@@ -1,15 +1,15 @@
CREATE USER OR REPLACE frodo IDENTIFIED BY 'your_password';
CREATE USER OR REPLACE frodo IDENTIFIED BY 'password';

CREATE USER OR REPLACE sam IDENTIFIED BY 'your_password';
CREATE USER OR REPLACE sam IDENTIFIED BY 'password';

CREATE USER OR REPLACE arwen IDENTIFIED BY 'your_password';
CREATE USER OR REPLACE arwen IDENTIFIED BY 'password';

CREATE USER OR REPLACE elrond IDENTIFIED BY 'your_password';
CREATE USER OR REPLACE elrond IDENTIFIED BY 'password';

GRANT shire to frodo;

GRANT shire to sam;

GRANT rivendell to arwen;

GRANT rivendell to elrond;
GRANT rivendell to elrond;
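
For context, the GRANT statements above assume the shire and rivendell roles already exist on the ClickHouse service, and the script has to be run against that service. A minimal, hypothetical way to apply it with the downloaded clickhouse binary (host, port and password here are placeholders, not values from this commit):

    # Illustrative only: connection details come from the Aiven service, not from this repo.
    ./clickhouse client \
      --host "$CH_HOST" --port "$CH_PORT" --secure \
      --user avnadmin --password "$CH_PASSWORD" \
      --queries-file ./ch_users.sql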
73 changes: 35 additions & 38 deletions solutions/clickhouse-cdc/lab.sh
@@ -26,58 +26,55 @@ lab_psql() {
psql ${PG_SERVICE_URI} $@
}

lab_setuppg() {
echo
PG_SERVICE_URI=$(avn service get ${SERVICE_PG} --json | jq -r '.service_uri')
cat pg_tables.sql | psql ${PG_SERVICE_URI} \
&& printf "" || echo ""
echo "pg_tables.sql imported into postgres ${SERVICE_PG}"
}

lab_setup() {
avn service user-creds-download ${SERVICE_KAFKA} --username avnadmin -d . \
&& printf "" || echo ""
echo "certificates and keys downloaded from ${SERVICE_KAFKA}"

echo
KAFKA_SERVICE_URI=$(avn service list --json ${SERVICE_KAFKA} | jq -r '.[].service_uri')
echo ${KAFKA_SERVICE_URI}
cat kcat.config.template > kcat.config
sed -i '' -e "s/address:port/${KAFKA_SERVICE_URI}/" kcat.config \
&& printf "" || echo ""
echo "kcat.config setup completed"

echo
PG_SERVICE_URI=$(avn service get ${SERVICE_PG} --json | jq -r '.service_uri')
cat pg_tables.sql | psql ${PG_SERVICE_URI} \
&& printf "" || echo ""
echo "pg_tables.sql imported into postgres ${SERVICE_PG}"
setup_env
avn service user-creds-download ${SERVICE_KAFKA} --username avnadmin -d . \
&& printf "" || echo ""
echo "certificates and keys downloaded from ${SERVICE_KAFKA}"

echo
KAFKA_SERVICE_URI=$(avn service list --json ${SERVICE_KAFKA} | jq -r '.[].service_uri')
echo ${KAFKA_SERVICE_URI}
cat kcat.config.template > kcat.config
sed -i '' -e "s/address:port/${KAFKA_SERVICE_URI}/" kcat.config \
&& printf "" || echo ""
echo "kcat.config setup completed"

echo
PG_SERVICE_URI=$(avn service get ${SERVICE_PG} --json | jq -r '.service_uri')
cat pg_tables.sql | psql ${PG_SERVICE_URI} \
&& printf "" || echo ""
echo "pg_tables.sql imported into postgres ${SERVICE_PG}"

[ -e "./clickhouse" ] || curl https://clickhouse.com/ | sh
[ -e "./clickhouse" ] || curl https://clickhouse.com/ | sh

# echo
# OS_SERVICE_URI=$(avn service get ${SERVICE_OS} --json | jq -r '.service_uri')
# curl -X PUT ${OS_SERVICE_URI}/suspecious-logins -H 'Content-Type: application/json' --data @suspecious-logins-mapping.json \
# && printf "\n✅ " || echo "❌ "
# echo "suspecious-logins index mapping created in opensearch ${SERVICE_OS}"
${CH_CLI} --queries-file ./ch_mv.sql --progress=tty --processed-rows --echo -t 2>&1
}

lab_teardown() {
rm -f ca.pem service.cert service.key os-connector.json kcat.config
cd terraform && terraform destroy
# echo ${SERVICE_KAFKA} | avn service terminate ${SERVICE_KAFKA}
# echo ${SERVICE_CH} | avn service terminate ${SERVICE_CH}
# echo ${SERVICE_PG} | avn service terminate ${SERVICE_PG}
}

lab_pgload() {
setup_env
# while true; do
# num_entries=$((1 + RANDOM % 100))
num_entries=$1
num_entries=$1

echo "---${num_entries} entries---"
echo "Generating ${num_entries} entries..."
SQL="\c middleearth;\n"
for _ in $(seq $num_entries); do
# time_stamp=$(date +%s)
# user_id=$((190 + RANDOM % 10))
# action=("login" "attempt")
# random_action=${action[RANDOM % ${#action[@]}]}
# source_ip="192.168.123.16$((RANDOM % 10))"

# echo "{\"time_stamp\": $time_stamp, \"user_id\": $user_id, \"action\": \"$random_action\", \"source_ip\": \"$source_ip\"}" | kcat -T -F kcat.config -P -t test00

region="1$((RANDOM % 2))"
total="1$((RANDOM % 1000))"
@@ -87,12 +84,9 @@ lab_pgload() {
WSQL+="INSERT INTO weather (region, temperature) VALUES (${region}, ${temperature});\n"
done
SQL+=${PSQL}${WSQL};
# SQL+=" | psql ${PG_SERVICE_URI}
# sleep 1

printf "${SQL}"
printf "${SQL}" | psql ${PG_SERVICE_URI}
# sleep 10;
# done
}

lab_chmv() {
@@ -112,6 +106,7 @@ lab_chmv() {

lab_reset() {
setup_env
printf "Reset all test data in postgres ${SERVICE_PG} and clickhouse ${SERVICE_CH}..."
printf "\c middleearth;\nDELETE FROM population;\nDELETE FROM weather;\n" | psql ${PG_SERVICE_URI}
# ${CH_CLI} --queries-file ./ch_drop.sql --progress=tty --processed-rows --echo -t 2>&1

@@ -134,12 +129,14 @@ case $1 in
lab_reset ;;
setup)
lab_setup ;;
setuppg)
lab_setuppg ;;
teardown)
lab_teardown ;;
pgload)
lab_pgload "${@:2}" ;;
chmv)
lab_chmv "${@:2}" ;;
*)
printf "Usage: ./lab.sh [ setup | pgload n | teardown]\n" ;;
printf "Usage: ./lab.sh [ setup | clickhouse | psql | pgload n | reset | teardown]\n" ;;
esac
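
Taken together, the dispatcher above supports a simple end-to-end flow. A sketch of a typical run, assuming the services have already been provisioned (the pgload count and the chmv argument are illustrative):

    ./lab.sh setup        # download certs, render kcat.config, load pg_tables.sql
    ./lab.sh pgload 50    # generate 50 random population/weather rows in Postgres
    ./lab.sh chmv shire   # argument assumed; creates the ClickHouse tables and materialized views
    ./lab.sh reset        # wipe test data in Postgres and ClickHouse
    ./lab.sh teardown     # remove local artifacts and run terraform destroy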
3 changes: 3 additions & 0 deletions solutions/clickhouse-cdc/mv.sql.template
@@ -1,3 +1,6 @@
-- ReplacingMergeTree
-- https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree

CREATE TABLE `role_name`.population (
`id` UInt64,
`ts` DateTime64,
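
The role_name placeholder in this template presumably gets substituted per ClickHouse role before the file is applied; lab_chmv is truncated above, so the exact rendering step is an assumption. One way it could be done, in the same sed style lab.sh uses for kcat.config.template:

    # Assumed rendering step; the output file name is illustrative.
    sed -e "s/role_name/shire/g" mv.sql.template > ch_mv.sql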
27 changes: 7 additions & 20 deletions solutions/clickhouse-cdc/terraform/cdc.tf
@@ -6,6 +6,7 @@ resource "aiven_pg" "pg" {
}

resource "aiven_pg_database" "pgdb" {
depends_on = [aiven_pg.pg]
project = var.aiven_project
service_name = "cdc-pg"
database_name = "middleearth"
@@ -30,23 +31,8 @@ resource "aiven_kafka" "kafka" {
}
}

# resource "aiven_kafka_topic" "source" {
# project = aiven_kafka.kafka.project
# service_name = aiven_kafka.kafka.service_name
# partitions = 2
# replication = 3
# topic_name = "source_topic"
# }

# resource "aiven_kafka_topic" "sink" {
# project = aiven_kafka.kafka.project
# service_name = aiven_kafka.kafka.service_name
# partitions = 2
# replication = 3
# topic_name = "sink_topic"
# }

resource "aiven_kafka_connector" "kafka-pg-source" {
depends_on = [aiven_pg_database.pgdb]
project = var.aiven_project
service_name = aiven_kafka.kafka.service_name
connector_name = "kafka-pg-source"
@@ -64,12 +50,12 @@ resource "aiven_kafka_connector" "kafka-pg-source" {
"database.ssl.mode" = "require"
"include.schema.changes" = true
"include.query" = true
# "plugin.name" = "wal2json"
"plugin.name" = "pgoutput"
"plugin.name" = "wal2json"
# tables need to be specified for the pgoutput plugin
# see details https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/debezium-source-connector-pg#
"publication.autocreate.mode" = "filtered"
"table.include.list" = "public.population,public.weather"
# "plugin.name" = "pgoutput"
# "publication.autocreate.mode" = "filtered"
# "table.include.list" = "public.population,public.weather"
"slot.name" = "dbz"
"decimal.handling.mode" = "double"
"_aiven.restart.on.failure" = "true"
@@ -86,6 +72,7 @@ resource "aiven_clickhouse" "clickhouse" {
}

resource "aiven_service_integration" "clickhouse_kafka_source" {
depends_on = [aiven_kafka_connector.kafka-pg-source]
project = var.aiven_project
integration_type = "clickhouse_kafka"
source_service_name = aiven_kafka.kafka.service_name
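
The added depends_on edges make the provisioning order explicit: the database waits for the Postgres service, the Debezium connector waits for the database, and the ClickHouse-Kafka integration waits for the connector. A hedged sketch of applying the stack, mirroring the cd terraform && terraform destroy call in lab_teardown (only aiven_project appears in this diff; any other variables or the API token are assumptions):

    # Illustrative apply flow; an Aiven API token is also required but not shown here.
    cd terraform
    terraform init
    terraform apply -var "aiven_project=${AIVEN_PROJECT}"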
