(nsconfluent"Functions to start and stop ZooKeeper and Kafka. These functions require the Confluent Platform CLI which can be obtained from ``. WARNING: Quitting the REPL will not stop ZooKeeper and Kafka. Before exiting, you must invoke `confluent/stop`. Otherwise, run `confluent destroy` from the command line."
(:require [clojure.string :as str]
[ :refer [sh]]))
(defnnot-up"Takes `service` and returns true if the service is down"
(->> (:out (sh"confluent""status"))
(keep (fn [x] (re-find (re-pattern (str service " is.*")) x)))
(defnstop"Stops ZooKeeper and Kafka, then prints a status line for schema-registry, kafka and zookeeper."
;; NOTE(review): only the status messages are visible in this block; confirm
;; which command actually shuts the services down.
(println"schema-registry is down")
(println"kafka is down")
(println"zookeeper is down"))
(defnstart"Starts ZooKeeper, Kafka and the schema registry, in that order, printing a line as each one comes up."
;; Run `stop` first; with-out-str captures what it prints so it is not shown.
(with-out-str (stop))
(doseq [s ["zookeeper""kafka""schema-registry"]]
;; Keep issuing `confluent start <service>` for as long as not-up reports the
;; service as down.
(do (while (not-up s)
(sh"confluent""start" s)
(println s "is up"))))
(defnreset"Stops and starts ZooKeeper and Kafka."
Start confluent platform
zookeeper is up
kafka is up
schema-registry is up
Helpful helpers
(nsuser"your lovely home"
(:require [ :refer [sh]]
[jackdaw.client :as jc]
[jackdaw.client.log :as jcl]
[jackdaw.admin :as ja]
[jackdaw.serdes.edn :as jse]
[jackdaw.streams :as j]
(:import org.apache.kafka.common.serialization.Serdes))
;;; ------------------------------------------------------------;;;;;; Configure topics;;;
(defn topic-config
  "Takes a topic name and (optionally) key and value serdes and a
  partition count, and returns a topic configuration map, which may be
  used to create a topic or produce/consume records.

  Arities:
    [topic-name]                                  key and value use `(jse/serde)`
    [topic-name value-serde]                      key uses `(jse/serde)`
    [topic-name key-serde value-serde]            partition count is 1
    [topic-name partition-count key-serde value-serde]

  The returned map always has :replication-factor 1 and an empty
  :topic-config map."
  ([topic-name]
   (topic-config topic-name (jse/serde)))

  ([topic-name value-serde]
   (topic-config topic-name (jse/serde) value-serde))

  ([topic-name key-serde value-serde]
   (topic-config topic-name 1 key-serde value-serde))

  ([topic-name partition-count key-serde value-serde]
   {:topic-name topic-name
    :partition-count partition-count
    :replication-factor 1
    :topic-config {}
    :key-serde key-serde
    :value-serde value-serde}))
;;; ------------------------------------------------------------;;;;;; Create, delete and list topics;;;
(defn create-topics
  "Takes a list of topic configs and creates these topics using the
  names given.

  An admin client is opened for the duration of the call and closed
  again by `with-open`."
  [topic-config-list]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/create-topics! client topic-config-list)))
(defn re-delete-topics
  "Takes an instance of java.util.regex.Pattern and deletes any Kafka
  topics that match.

  Matching uses `re-find` against the :topic-name of each entry returned
  by `ja/list-topics`, so a partial match is enough."
  [re]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (let [topics-to-delete (->> (ja/list-topics client)
                                (filter #(re-find re (:topic-name %))))]
      (ja/delete-topics! client topics-to-delete))))
(defn create-topic
  "Takes a single topic config and creates a Kafka topic.

  Wraps the config in a vector and delegates to `create-topics`."
  [topic-config]
  (create-topics [topic-config]))
(defn list-topics
  "Returns a list of Kafka topics, as reported by `ja/list-topics`.

  An admin client is opened for the duration of the call and closed
  again by `with-open`."
  []
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/list-topics client)))
(defn topic-exists?
  "Takes a topic config (not a bare topic name) and returns the result
  of `ja/topic-exists?` for it, i.e. whether the topic exists."
  [topic-config]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/topic-exists? client topic-config)))
;;; ------------------------------------------------------------;;;;;; Produce and consume records;;;
{"bootstrap.servers""localhost:9092""" group-id
(defnpublish"Takes a topic config and record value, and (optionally) a key and partition number, and produces to a Kafka topic."
;; Every arity opens its own producer for the single send; with-open closes it
;; when the body finishes. The leading @ dereferences the value returned by
;; jc/produce!.
([topic-config value]
(with-open [client (jc/producer (kafka-producer-config) topic-config)]
@(jc/produce! client topic-config value))
([topic-config key value]
(with-open [client (jc/producer (kafka-producer-config) topic-config)]
@(jc/produce! client topic-config key value))
([topic-config partition key value]
(with-open [client (jc/producer (kafka-producer-config) topic-config)]
@(jc/produce! client topic-config partition key value))
(defnget-records"Takes a topic config, consumes from a Kafka topic, and returns a seq of maps."
;; The short arity delegates with a polling interval of 200.
(get-records topic-config 200))
([topic-config polling-interval-ms]
;; A fresh random UUID string is handed to kafka-consumer-config on every call
;; (presumably used as the consumer group id -- confirm against that function).
;; NOTE(review): polling-interval-ms is never referenced in this body; the
;; literal 100 is what gets passed to jcl/log. Confirm whether that is intended.
(let [client-config (kafka-consumer-config
(str (java.util.UUID/randomUUID)))]
(with-open [client (jc/subscribed-consumer client-config
(doall (jcl/log client 100 seq))))))
(defn get-keyvals
  "Takes a topic config, consumes from a Kafka topic, and returns a seq
  of key-value pairs.

  Each pair is `[(:key record) (:value record)]` for the records
  returned by `get-records`. When `polling-interval-ms` is omitted, 20
  is passed on to `get-records`."
  ([topic-config]
   (get-keyvals topic-config 20))

  ([topic-config polling-interval-ms]
   (map (juxt :key :value) (get-records topic-config polling-interval-ms))))
;;; ------------------------------------------------------------;;;;;; System;;;
(nspipe"This tutorial contains a simple stream processing application using Jackdaw and Kafka Streams. Pipe reads from a Kafka topic called `input`, logs the key and value, and writes these to a Kafka topic called `output`."
(:require [clojure.string :as str]
[ :refer [info]]
[jackdaw.streams :as j]
[jackdaw.serdes.edn :as jse])
(:import [org.apache.kafka.common.serialization Serdes]))
(defn topic-config
  "Takes a topic name and returns a topic configuration map, which may
  be used to create a topic or produce/consume records.

  The map uses one partition, a replication factor of 1, and
  `(jse/serde)` for both key and value."
  [topic-name]
  {:topic-name topic-name
   :partition-count 1
   :replication-factor 1
   :key-serde (jse/serde)
   :value-serde (jse/serde)})
(defnapp-config"Returns the application config."
(defn build-topology
  "Reads from a Kafka topic called `input`, logs the key and value, and
  writes these to a Kafka topic called `output`. Returns a topology
  builder.

  `builder` is the streams builder the topology is attached to; the same
  builder is returned so the caller can hand it to `j/kafka-streams`."
  [builder]
  (-> (j/kstream builder (topic-config "input"))
      (j/peek (fn [[k v]]
                (info (str {:key k :value v}))))
      (j/to (topic-config "output")))
  ;; Return the builder itself rather than the result of the threading form,
  ;; as the docstring promises and as start-app expects.
  builder)
(defn start-app
  "Starts the stream processing application.

  Takes the application config, builds the topology on a fresh streams
  builder, starts the resulting app, logs that it is up, and returns the
  app so that it can later be passed to `stop-app`."
  [app-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder)
        app (j/kafka-streams topology app-config)]
    (j/start app)
    (info "pipe is up")
    app))
(defn stop-app
  "Stops the stream processing application.

  Takes the app to stop, closes it with `j/close`, and logs that the
  pipe is down."
  [app]
  (j/close app)
  (info "pipe is down"))
[& _]
(start-app (app-config)))
Define topology start stop
(require '[pipe])
(defn stop-pipe
  "Stops the pipe app, if one is recorded under :pipe-app in `system`,
  and then resets that entry to nil.

  NOTE(review): an earlier docstring also promised deletion of topics
  and internal state; no such step is present in this body -- confirm
  whether one is expected here."
  []
  (when (and system (:pipe-app system))
    (pipe/stop-app (:pipe-app system)))
  (alter-var-root #'system merge {:pipe-app nil}))
(defn start-pipe
  "Creates topics, and starts the app.

  Creates the `input` and `output` topics from `pipe/topic-config`,
  starts the pipe app with `(pipe/app-config)`, and stores the started
  app under :pipe-app in `system`."
  []
  (create-topics (map pipe/topic-config ["input" "output"]))
  (alter-var-root #'system merge {:pipe-app (pipe/start-app (pipe/app-config))}))
Start/reset topology state
#object[org.apache.kafka.streams.KafkaStreams 0x2855d41f "org.apache.kafka.streams.KafkaStreams@2855d41f"]}
digraph {
node [shape=box];
e [label="Events"]
us [label="User Sources"]
evs [label="Events by Source"]
evus [label="Events by user and source"]
s [label="User stats"]
e -> evs
evs -> evus
us -> evus
evus -> s
Define topology
(:require [clojure.string :as str]
[ :refer [info]]
[jackdaw.streams :as j]
[jackdaw.serdes.edn :as jse])
(:import [org.apache.kafka.common.serialization Serdes]))
(defn topic-config
  "Takes a topic name and returns a topic configuration map, which may
  be used to create a topic or produce/consume records.

  The map uses one partition, a replication factor of 1, and
  `(jse/serde)` for both key and value."
  [topic-name]
  {:topic-name topic-name
   :partition-count 1
   :replication-factor 1
   :key-serde (jse/serde)
   :value-serde (jse/serde)})
(defnapp-config"Returns the application config."
(let [event-stream (j/kstream builder (topic-config"events"))
user-sources-table (j/ktable builder (topic-config"user-sources"))
events-by-source (-> event-stream
(j/map (fn [[_ v]]
[(:source-id v) v]))
(j/through (topic-config"events-by-source")))
events-by-user-and-source (-> events-by-source
(j/left-join user-sources-table
(fn [event user-source]
(merge event user-source))
(j/map (fn [[_ v]]
[[(:user-id v) (:source-id v)] v]))
(j/through (topic-config"events-by-user-and-source")))]
(-> events-by-user-and-source
(j/group-by-key (topic-config""))
(j/aggregate (constantly {:count0:sum0})
(fn [acc [k v]]
(-> acc
(update:count inc)
(update:sum #(+ % (:value v)))
(merge (select-keys v [:name:user-id]))))
(j/to (topic-config"user-stats")))
(defn start-app
  "Starts the stream processing application.

  Takes the application config, builds the topology on a fresh streams
  builder, starts the resulting app, logs that it is up, and returns the
  app so that it can later be passed to `stop-app`."
  [app-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder)
        app (j/kafka-streams topology app-config)]
    (j/start app)
    (info "flex is up")
    app))
(defn stop-app
  "Stops the stream processing application.

  Takes the app to stop, closes it with `j/close`, and logs that flex
  is down."
  [app]
  (j/close app)
  (info "flex is down"))
[& _]
(start-app (app-config)))
Define topology start stop
(require '[flex])
(defnstop-flex"Stops the app, and deletes topics and internal state."
(when (and system (:flex-app system))
(flex/stop-app (:flex-app system))
(.cleanUp (:flex-app system)) ;; clears internal state topics
(alter-var-root #'system merge {:flex-appnil}))
(defn start-flex
  "Creates topics, and starts the app.

  Creates the five topics the flex topology uses from
  `flex/topic-config`, starts the flex app with `(flex/app-config)`, and
  stores the started app under :flex-app in `system`."
  []
  (create-topics (map flex/topic-config ["events"
                                         "events-by-source"
                                         "events-by-user-and-source"
                                         "user-sources"
                                         "user-stats"]))
  (alter-var-root #'system merge {:flex-app (flex/start-app (flex/app-config))}))
Start/reset topology state
#object[org.apache.kafka.streams.KafkaStreams 0x12441536 "org.apache.kafka.streams.KafkaStreams@12441536"]}
