Hello all! This repository contains a set of instructions explaining how to work with a combination of Apache Kafka and ClickHouse. In particular, we'll look at how to send data from an Apache Kafka topic to a table in ClickHouse, how to transform the data with the help of ClickHouse materialized views, and how to run aggregation queries.
We'll take the educational system of Hogwarts as the scenario. Every time a student enters a classroom, an event is generated and sent to an Apache Kafka topic. The data accumulates over the years. We would like to send the raw data from this topic into ClickHouse for long-term storage, shape it, and run aggregation queries to efficiently analyse the past data.
To follow these steps you'll need running instances of Apache Kafka and ClickHouse. I'll be running both of them on my local machine.
You can find instructions on how to set up both instances in the quick start for ClickHouse and the quick start for Apache Kafka.
If you're using a Mac, you can also use the instructions below:
- Install Apache Kafka with brew. I used the kafka Homebrew formula.
- Start ZooKeeper by running
/usr/local/bin/zookeeper-server-start /usr/local/etc/zookeeper/zoo.cfg
- Start Apache Kafka by running
/usr/local/bin/kafka-server-start /usr/local/etc/kafka/server.properties
- Install kcat. We'll use this tool to send data into the topic.
- This repository contains a simple kcat.config, which will be enough if you're using Apache Kafka locally. Make sure that you're in the repository's directory when running kcat commands.
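For reference, a kcat config file is just a list of librdkafka properties. The repository's kcat.config isn't reproduced here, but for a local unauthenticated broker a minimal config would look something like this:
# point kcat at the local broker started above
bootstrap.servers=localhost:9092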
You're ready to create and populate topics.
- Pull the Docker image (read more on the Docker Hub reference page)
docker pull clickhouse/clickhouse-server
- Start a server instance
docker run -d --name some-clickhouse-server --ulimit nofile=262144:262144 clickhouse/clickhouse-server
- The most convenient way to run SQL queries is the ClickHouse native client. To connect to it, run
docker run -it --rm --link some-clickhouse-server:clickhouse-server clickhouse/clickhouse-client --host clickhouse-server
You're ready to send queries to the ClickHouse server; for example, try
SHOW TABLES
In this repository you can find two files containing data for our experiments: events_years_2_12.ndjson.zip and events_years_13_22.ndjson.zip. Unzip them to retrieve the ndjson files. NDJSON stands for Newline Delimited JSON and is used to store streaming structured data.
- events_years_2_12.ndjson contains data for years 2002 - 2012 (2 615 075 items from 1030867200000 till 1368453600000)
- events_years_13_22.ndjson contains data for years 2013 - 2022 (2 649 615 items from 1378022400000 till 1652450400000)
The data is based on the following assumptions and simplifications: there are 18 subjects and 3 classes per day; the educational year starts in September and finishes in May; each student spends 7 years at Hogwarts.
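Each line in those files is a single JSON event. Judging by the fields extracted in the materialized view later in this guide, a record looks roughly like the following (the values here are illustrative, not copied from the files):
{"timestamp": 1030867200000, "subject": "Potions", "teacher": "Severus Snape", "room": "Dungeon 5", "points": 5, "student": {"name": "Harry Potter", "house": "Gryffindor"}}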
- Create an Apache Kafka topic
kafka-topics --bootstrap-server localhost:9092 --topic entry-events --create
- Populate the topic with the content of the first file, events_years_2_12.ndjson, by running
kcat -F kcat.config -P -t entry-events < events_years_2_12.ndjson
This will add the first half of our data in bulk.
- Run send_data; this script will send messages from the second file one by one, imitating a data flow into the topic.
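If you want to sanity-check that the messages actually landed in the topic, you can read a few of them back with kcat in consumer mode (it uses the same kcat.config):
kcat -F kcat.config -C -t entry-events -c 5 -e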
We'll use the built-in ClickHouse engine for Apache Kafka and a [materialized view](https://clickhouse.com/docs/en/guides/developer/working-with-json/json-other-approaches#using-materialized-views).
- In the ClickHouse client, run a SQL statement to create a Kafka engine table. For the data format we use JSONAsString to have granular control over transforming every property. Alternatively, you can try JSONEachRow (a sketch of that variant is shown right after the statement below).
CREATE TABLE entry_events_queue
(
`message` String
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'host.docker.internal:9092',
kafka_topic_list = 'entry-events',
kafka_group_name = 'group1',
kafka_format = 'JSONAsString'
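For comparison, a JSONEachRow variant of the same queue table might look like the sketch below; it lets ClickHouse parse the fields directly instead of extracting them from a raw string. The table and group names here are made up for the sketch: the consumer group deliberately differs from group1 so it doesn't compete with entry_events_queue for messages, and parsing the nested student object into a named tuple may require enabling input_format_json_named_tuples_as_objects, depending on your ClickHouse version.
CREATE TABLE entry_events_queue_typed
(
    `timestamp` UInt64,  -- epoch milliseconds; convert with fromUnixTimestamp64Milli downstream
    `subject` String,
    `teacher` String,
    `room` String,
    `points` Int8,
    `student` Tuple(name String, house String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'host.docker.internal:9092',
    kafka_topic_list = 'entry-events',
    kafka_group_name = 'group2',
    kafka_format = 'JSONEachRow'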
- Create the destination table where the data should be stored
CREATE TABLE student_entry_events
(
`timestamp` DateTime,
`subject` String,
`teacher` String,
`room` String,
`points` Int8,
`student` Tuple(name String, house String)
)
ENGINE = MergeTree
ORDER BY timestamp
- Create a materialized view to establish the connection between the Kafka engine table and the destination table:
CREATE MATERIALIZED VIEW materialized_view TO student_entry_events
AS SELECT
fromUnixTimestamp64Milli(JSONExtractUInt(message, 'timestamp')) AS timestamp,
JSONExtractString(message, 'subject') AS subject,
JSONExtractString(message, 'teacher') AS teacher,
JSONExtractString(message, 'room') AS room,
toInt8(JSONExtractInt(message, 'points')) AS points,
JSONExtract(message, 'student', 'Tuple(String,String)') AS student
FROM entry_events_queue
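If you'd like to check the extraction logic before any messages arrive, you can run the same expressions against a hand-written sample message (the JSON literal below is illustrative and mirrors the record format described earlier):
SELECT
    fromUnixTimestamp64Milli(JSONExtractUInt(message, 'timestamp')) AS timestamp,
    JSONExtractString(message, 'subject') AS subject,
    toInt8(JSONExtractInt(message, 'points')) AS points,
    JSONExtract(message, 'student', 'Tuple(String,String)') AS student
FROM (SELECT '{"timestamp":1030867200000,"subject":"Potions","teacher":"Severus Snape","room":"Dungeon 5","points":5,"student":{"name":"Harry Potter","house":"Gryffindor"}}' AS message)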
- Test that you have the data:
SELECT count(*) FROM student_entry_events
SELECT student.house as house, sum(points)
FROM default.student_entry_events
GROUP BY student.house
In this step our goal is to transform and aggregate data coming from student_entry_events (the source table) and store the new information in the table class_attendance_granular (the destination table).
Because data is continuously flowing into the source table, we need to be careful not to miss any items when populating the destination table. To overcome this challenge, we'll select a timestamp in the future: the materialized view will take care of everything from that timestamp onwards, and the older items we'll copy over with an INSERT statement.
- Create a new destination table of type MergeTree
CREATE TABLE class_attendance_granular
(
`timestamp` DateTime,
`subject` String,
`studentCount` UInt16
)
ENGINE = MergeTree
ORDER BY timestamp
- Check the timestamp of the latest event in the source table
SELECT timestamp
FROM default.student_entry_events
ORDER BY timestamp DESC
LIMIT 1
- Select a timestamp a bit further in the future (1 or 2 days ahead is enough, our data is moving fast)
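If you'd rather compute the cutoff than pick it by hand, something like this works (the 2-day offset is an arbitrary choice):
SELECT max(timestamp) + INTERVAL 2 DAY AS cutoff
FROM default.student_entry_events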
- Create a materialized view
CREATE MATERIALIZED VIEW default.materialized_view_class_attendance_granular TO default.class_attendance_granular
AS SELECT
timestamp,
subject,
count(student) as studentCount
FROM default.student_entry_events
WHERE timestamp >= 'use-your-future-time-stamp-here'
GROUP BY (timestamp, subject)
ORDER BY timestamp;
- Wait until you cross that timestamp
- Verify that the data is flowing
SELECT count(*) FROM default.class_attendance_granular
You should see a small but growing number of rows as fresh data (starting from your selected timestamp) arrives in the destination table.
- Copy the old data from the source table with the help of an INSERT statement
INSERT INTO default.class_attendance_granular
SELECT
timestamp,
subject,
count(student) as studentCount
FROM default.student_entry_events
WHERE timestamp < 'use-your-future-time-stamp-here'
GROUP BY (timestamp, subject)
- Now you can see the total number of rows by running
SELECT count(*) FROM default.class_attendance_granular
SELECT * FROM default.class_attendance_granular LIMIT 20
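To double-check that the backfill and the materialized view together cover the whole time range without a gap around your cutoff timestamp, you can look at the boundaries:
SELECT min(timestamp), max(timestamp)
FROM default.class_attendance_granular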
Similar to the previous step, but now using a table whose columns store aggregate function states. We'll pre-aggregate the maximum/minimum/average number of students in a class.
- Create a destination table of type SummingMergeTree
CREATE TABLE class_attendance_daily
(
`day` DateTime,
`subject` String,
`max_intermediate_state` AggregateFunction(max, UInt16),
`min_intermediate_state` AggregateFunction(min, UInt16),
`avg_intermediate_state` AggregateFunction(avg, UInt16)
)
ENGINE = SummingMergeTree
PARTITION BY tuple()
ORDER BY (day, subject)
- Create a materialized view and use the -State combinators (maxState, minState, avgState)
CREATE MATERIALIZED VIEW class_attendance_daily_mv TO class_attendance_daily AS
SELECT
toStartOfDay(timestamp) AS day,
subject,
maxState(studentCount) AS max_intermediate_state,
minState(studentCount) AS min_intermediate_state,
avgState(studentCount) AS avg_intermediate_state
FROM default.class_attendance_granular
WHERE timestamp >= 'use-your-future-time-stamp-here'
GROUP BY
day,
subject
ORDER BY
day ASC,
subject ASC
- The materialized view will only process new records, so if you want to bring in the old records, run:
INSERT INTO class_attendance_daily
SELECT
toStartOfDay(timestamp) as day,
subject,
maxState(studentCount) AS max_intermediate_state,
minState(studentCount) AS min_intermediate_state,
avgState(studentCount) AS avg_intermediate_state
FROM default.class_attendance_granular
WHERE timestamp < 'use-your-future-time-stamp-here'
GROUP BY day, subject
ORDER BY day, subject
- maxState, minState and avgState calculate intermediate states, which by themselves don't bring any readable value. You can try retrieving the first 10 rows to see that there are no readable values in those columns.
SELECT * FROM default.class_attendance_daily LIMIT 10
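As a side note, if you just want to peek at what a single stored state holds, finalizeAggregation turns one intermediate state into a plain value; unlike the -Merge query below, it does not combine states across rows, so the numbers are per stored row rather than fully merged per day:
SELECT
    day,
    subject,
    finalizeAggregation(max_intermediate_state) AS max_partial
FROM default.class_attendance_daily
LIMIT 10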
To properly select the aggregated data we need to merge it back:
SELECT
day,
subject,
maxMerge(max_intermediate_state) AS max,
minMerge(min_intermediate_state) AS min,
avgMerge(avg_intermediate_state) AS avg
FROM class_attendance_daily
GROUP BY (day, subject)
ORDER BY (day, subject)
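The same pattern works for more targeted questions; for example, to follow a single subject over time (replace 'Potions' with any subject that actually appears in your data):
SELECT
    day,
    maxMerge(max_intermediate_state) AS max_students,
    avgMerge(avg_intermediate_state) AS avg_students
FROM class_attendance_daily
WHERE subject = 'Potions'
GROUP BY day
ORDER BY day
LIMIT 10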
- Official docs for Apache Kafka.
- Official docs for ClickHouse.
- Distinctive Features of ClickHouse.
- How to start working with Aiven for ClickHouse®.
- ClickHouse Kafka engine.
- Using Materialized Views.
- Approximate calculations.
- Array functions.
- Cloudflare experience: ClickHouse Capacity Estimation Framework.
- A variety of example data sets.
- Understanding ClickHouse Data Skipping Indexes.
This work is licensed under the Apache License, Version 2.0. Full license text is available in the LICENSE file and at http://www.apache.org/licenses/LICENSE-2.0.txt