Jackdaw + Kafka Streams tutorial — REPL walkthrough (Confluent Platform helpers, "pipe" and "flex" topologies).
(ns confluent
  "Functions to start and stop ZooKeeper and Kafka. These functions require
  the Confluent Platform CLI which can be obtained from
  `https://www.confluent.io/download/`. WARNING: Quitting the REPL will not
  stop ZooKeeper and Kafka. Before exiting, you must invoke `confluent/stop`.
  Otherwise, run `confluent destroy` from the command line."
  (:require [clojure.string :as str]
            [clojure.java.shell :refer [sh]]))
(defn not-up
  "Takes `service` and returns true if `confluent status` reports the
  service as DOWN. Returns false if the service is up or not listed."
  [service]
  (->> (:out (sh "confluent" "status"))
       str/split-lines
       ;; Keep only the status line for this service, e.g. "kafka is [UP]".
       (keep #(re-find (re-pattern (str service " is.*")) %))
       first
       str ;; guard against nil when the service is not listed at all
       (re-find #"DOWN")
       boolean))
(defn stop
  "Stops ZooKeeper and Kafka by destroying all local Confluent state.
  (The original docstring said \"Starts\", which was wrong.)"
  []
  (sh "confluent" "destroy")
  (println "schema-registry is down")
  (println "kafka is down")
  (println "zookeeper is down"))
(defn start
  "Starts ZooKeeper and Kafka, retrying each service until `confluent status`
  reports it up."
  []
  ;; Destroy any stale state first; with-out-str suppresses stop's printing.
  (with-out-str (stop))
  (doseq [s ["zookeeper" "kafka" "schema-registry"]]
    (while (not-up s)
      (sh "confluent" "start" s)
      (Thread/sleep 1000))
    (println s "is up")))
(defn reset
  "Stops and starts ZooKeeper and Kafka."
  []
  (stop)
  (start))
Start confluent platform
(confluent/start)
zookeeper is up
kafka is up
schema-registry is up
nil
Helpful helpers
(ns user
  "your lovely home"
  (:require [clojure.java.shell :refer [sh]]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jcl]
            [jackdaw.admin :as ja]
            [jackdaw.serdes.edn :as jse]
            [jackdaw.streams :as j]
            [confluent])
  (:import org.apache.kafka.common.serialization.Serdes))
;;; ------------------------------------------------------------;;;;;; Configure topics;;;
(defn topic-config
  "Takes a topic name and (optionally) key and value serdes and a partition
  count, and returns a topic configuration map, which may be used to create
  a topic or produce/consume records. Defaults: EDN serdes, 1 partition,
  replication factor 1."
  ([topic-name]
   (topic-config topic-name (jse/serde)))
  ([topic-name value-serde]
   (topic-config topic-name (jse/serde) value-serde))
  ([topic-name key-serde value-serde]
   (topic-config topic-name 1 key-serde value-serde))
  ([topic-name partition-count key-serde value-serde]
   {:topic-name topic-name
    :partition-count partition-count
    :replication-factor 1
    :topic-config {}
    :key-serde key-serde
    :value-serde value-serde}))
;;; ------------------------------------------------------------;;;;;; Create, delete and list topics;;;
(defn kafka-admin-client-config
  "Returns the config map for an AdminClient pointed at the local broker."
  []
  {"bootstrap.servers" "localhost:9092"})
(defn create-topics
  "Takes a list of topic configs and creates these topics in Kafka."
  [topic-config-list]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/create-topics! client topic-config-list)))
(defn re-delete-topics
  "Takes an instance of java.util.regex.Pattern and deletes any Kafka topics
  whose name matches."
  [re]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (let [topics-to-delete (->> (ja/list-topics client)
                                (filter #(re-find re (:topic-name %))))]
      (ja/delete-topics! client topics-to-delete))))
(defn create-topic
  "Takes a single topic config and creates a Kafka topic."
  [topic-config]
  (create-topics [topic-config]))
(defn list-topics
  "Returns a list of Kafka topics."
  []
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/list-topics client)))
(defn topic-exists?
  "Takes a topic config and returns true if the topic exists.
  (Note: the argument is a topic config map, not a bare topic name.)"
  [topic-config]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/topic-exists? client topic-config)))
;;; ------------------------------------------------------------;;;;;; Produce and consume records;;;
(defn kafka-producer-config
  "Returns the config map for a Kafka producer pointed at the local broker."
  []
  {"bootstrap.servers" "localhost:9092"})
(defn kafka-consumer-config
  "Takes a consumer group id and returns a Kafka consumer config map that
  reads from the earliest offset with auto-commit disabled."
  [group-id]
  {"bootstrap.servers" "localhost:9092"
   "group.id" group-id
   "auto.offset.reset" "earliest"
   "enable.auto.commit" "false"})
(defn publish
  "Takes a topic config and record value, and (optionally) a key and partition
  number, and produces to a Kafka topic. Blocks on the produce acknowledgement
  and returns nil."
  ([topic-config value]
   (with-open [client (jc/producer (kafka-producer-config) topic-config)]
     @(jc/produce! client topic-config value))
   nil)
  ([topic-config key value]
   (with-open [client (jc/producer (kafka-producer-config) topic-config)]
     @(jc/produce! client topic-config key value))
   nil)
  ([topic-config partition key value]
   (with-open [client (jc/producer (kafka-producer-config) topic-config)]
     @(jc/produce! client topic-config partition key value))
   nil))
(defn get-records
  "Takes a topic config, consumes from a Kafka topic, and returns a seq of
  maps. Polls with `polling-interval-ms` (default 200)."
  ([topic-config]
   (get-records topic-config 200))
  ([topic-config polling-interval-ms]
   (let [client-config (kafka-consumer-config
                        (str (java.util.UUID/randomUUID)))]
     (with-open [client (jc/subscribed-consumer client-config
                                                [topic-config])]
       ;; BUG FIX: the original hard-coded 100 here, silently ignoring the
       ;; polling-interval-ms parameter.
       (doall (jcl/log client polling-interval-ms seq))))))
(defn get-keyvals
  "Takes a topic config, consumes from a Kafka topic, and returns a seq of
  key-value pairs."
  ([topic-config]
   ;; 200 ms matches get-records' default polling interval (was 20,
   ;; apparently a typo).
   (get-keyvals topic-config 200))
  ([topic-config polling-interval-ms]
   (map (juxt :key :value) (get-records topic-config polling-interval-ms))))
;;; ------------------------------------------------------------;;;;;; System;;;
;; Holds the running app(s), e.g. {:pipe-app <KafkaStreams>}; mutated via
;; alter-var-root by the start-*/stop-* helpers below.
(def system nil)
(ns pipe
  "This tutorial contains a simple stream processing application using
  Jackdaw and Kafka Streams. Pipe reads from a Kafka topic called `input`,
  logs the key and value, and writes these to a Kafka topic called `output`."
  (:gen-class)
  (:require [clojure.string :as str]
            [clojure.tools.logging :refer [info]]
            [jackdaw.streams :as j]
            [jackdaw.serdes.edn :as jse])
  (:import [org.apache.kafka.common.serialization Serdes]))
(defn topic-config
  "Takes a topic name and returns a topic configuration map, which may be
  used to create a topic or produce/consume records."
  [topic-name]
  {:topic-name topic-name
   :partition-count 1
   :replication-factor 1
   :key-serde (jse/serde)
   :value-serde (jse/serde)})
(defn app-config
  "Returns the application config. Caching is disabled so records flow
  through immediately, which makes the tutorial output easier to follow."
  []
  {"application.id" "foo"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})
(defn build-topology
  "Reads from a Kafka topic called `input`, logs the key and value, and
  writes these to a Kafka topic called `output`. Returns the topology
  builder."
  [builder]
  (-> (j/kstream builder (topic-config "input"))
      (j/peek (fn [[k v]]
                (info (str {:key k :value v}))))
      (j/to (topic-config "output")))
  builder)
(defn start-app
  "Starts the stream processing application and returns the KafkaStreams
  instance."
  [app-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder)
        app (j/kafka-streams topology app-config)]
    (j/start app)
    (info "pipe is up")
    app))
(defn stop-app
  "Stops the stream processing application."
  [app]
  (j/close app)
  (info "pipe is down"))
(defn -main
  "Entry point: starts the pipe app with the default config."
  [& _]
  (start-app (app-config)))
Define topology start stop
(require '[pipe])
(defn stop-pipe
  "Stops the app, and deletes topics and internal state."
  []
  (when (and system (:pipe-app system))
    (pipe/stop-app (:pipe-app system)))
  (re-delete-topics #"(input|output)")
  (alter-var-root #'system merge {:pipe-app nil}))
(defn start-pipe
  "Creates topics, and starts the app."
  []
  (create-topics (map pipe/topic-config ["input" "output"]))
  (alter-var-root #'system merge {:pipe-app (pipe/start-app (pipe/app-config))}))
Start/reset topology state
(stop-pipe)
(Thread/sleep1000)
(start-pipe)
{:pipe-app
#object[org.apache.kafka.streams.KafkaStreams 0x2855d41f "org.apache.kafka.streams.KafkaStreams@2855d41f"]}
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.admin.AdminClientConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
// Dataflow of the "flex" topology: events are re-keyed by source, joined
// with the user-sources table, re-keyed by [user source], then aggregated
// into per-user stats.
digraph {
splines=true;
node [shape=box];
e [label="Events"]
us [label="User Sources"]
evs [label="Events by Source"]
evus [label="Events by user and source"]
s [label="User stats"]
e -> evs
evs -> evus
us -> evus
evus -> s
}
Define topology
(ns flex
  "A stream processing application that re-keys events by source, joins them
  with a table of user sources, and aggregates per-user statistics."
  (:gen-class)
  (:require [clojure.string :as str]
            [clojure.tools.logging :refer [info]]
            [jackdaw.streams :as j]
            [jackdaw.serdes.edn :as jse])
  (:import [org.apache.kafka.common.serialization Serdes]))
(defn topic-config
  "Takes a topic name and returns a topic configuration map, which may be
  used to create a topic or produce/consume records."
  [topic-name]
  {:topic-name topic-name
   :partition-count 1
   :replication-factor 1
   :key-serde (jse/serde)
   :value-serde (jse/serde)})
(defn app-config
  "Returns the application config. Caching is disabled so records flow
  through immediately."
  []
  {"application.id" "flex-app"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})
(defn build-topology
  "Builds the flex topology on `builder` and returns it:
  - re-keys `events` by :source-id (through `events-by-source`),
  - left-joins them against the `user-sources` KTable,
  - re-keys by [:user-id :source-id] (through `events-by-user-and-source`),
  - aggregates a running {:count :sum ...} per key into `user-stats`."
  [builder]
  (let [event-stream (j/kstream builder (topic-config "events"))
        user-sources-table (j/ktable builder (topic-config "user-sources"))
        events-by-source (-> event-stream
                             (j/map (fn [[_ v]]
                                      [(:source-id v) v]))
                             (j/through (topic-config "events-by-source")))
        events-by-user-and-source (-> events-by-source
                                      (j/left-join user-sources-table
                                                   (fn [event user-source]
                                                     (merge event user-source))
                                                   (topic-config "")
                                                   (topic-config ""))
                                      (j/map (fn [[_ v]]
                                               [[(:user-id v) (:source-id v)] v]))
                                      (j/through (topic-config "events-by-user-and-source")))]
    (-> events-by-user-and-source
        (j/group-by-key (topic-config ""))
        (j/aggregate (constantly {:count 0 :sum 0})
                     (fn [acc [_k v]]
                       (-> acc
                           (update :count inc)
                           (update :sum #(+ % (:value v)))
                           ;; carry the latest name/user-id alongside the totals
                           (merge (select-keys v [:name :user-id]))))
                     (topic-config "user-stats"))
        (j/to-kstream)
        (j/to (topic-config "user-stats")))
    builder))
(defn start-app
  "Starts the stream processing application and returns the KafkaStreams
  instance."
  [app-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder)
        app (j/kafka-streams topology app-config)]
    (j/start app)
    (info "flex is up")
    app))
(defn stop-app
  "Stops the stream processing application."
  [app]
  (j/close app)
  (info "flex is down"))
(defn -main
  "Entry point: starts the flex app with the default config."
  [& _]
  (start-app (app-config)))
Define topology start stop
(require '[flex])
(defn stop-flex
  "Stops the app, and deletes topics and internal state."
  []
  (when (and system (:flex-app system))
    (flex/stop-app (:flex-app system))
    ;; clears the app's internal state-store/changelog topics
    (.cleanUp (:flex-app system)))
  (re-delete-topics #"(events|events-by-source|events-by-user-and-source|user-sources|user-stats)")
  (alter-var-root #'system merge {:flex-app nil}))
(defn start-flex
  "Creates topics, and starts the app."
  []
  (create-topics (map flex/topic-config
                      ["events" "events-by-source" "events-by-user-and-source"
                       "user-sources" "user-stats"]))
  (alter-var-root #'system merge {:flex-app (flex/start-app (flex/app-config))}))
Start/reset topology state
(stop-flex)
(Thread/sleep1000)
(start-flex)
{:flex-app
#object[org.apache.kafka.streams.KafkaStreams 0x12441536 "org.apache.kafka.streams.KafkaStreams@12441536"]}
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.admin.AdminClientConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.