Skip to content

Latest commit

 

History

History
925 lines (799 loc) · 30.8 KB

streams-basic.org

File metadata and controls

925 lines (799 loc) · 30.8 KB

Project deps

{:deps 
 {fundingcircle/jackdaw {:mvn/version "0.6.6"}
  org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.2.0"}
  ;;org.apache.kafka/kafka-streams {:mvn/version "2.1.0"}
  ;;org.apache.kafka/kafka-streams-test-utils {:mvn/version "2.1.0"}
  org.clojure/clojure {:mvn/version "1.10.0"}
  org.clojure/tools.logging {:mvn/version "0.4.1"}}

 :mvn/repos
 {"confluent" {:url "https://packages.confluent.io/maven/"}}

 :paths
 ["src" "test" "dev"]

 :aliases
 {:test {:extra-deps {com.cognitect/test-runner
                      {:git/url "https://github.com/cognitect-labs/test-runner.git"
                       :sha "209b64504cb3bd3b99ecfec7937b358a879f55c1"}}
         :main-opts ["-m" "cognitect.test-runner"]}}}

Confluent tools

Manage Confluent tools on the REPL

(ns confluent
  "Functions to start and stop ZooKeeper and Kafka.
  These functions require the Confluent Platform CLI which can be
  obtained from `https://www.confluent.io/download/`.
  WARNING: Quitting the REPL will not stop ZooKeeper and Kafka. Before
  exiting, you must invoke `confluent/stop`. Otherwise, run `confluent
  destroy` from the command line."
  (:require [clojure.string :as str]
            [clojure.java.shell :refer [sh]]))

(defn not-up
  "Takes `service` and returns true if the service is down"
  [service]
  (->> (:out (sh "confluent" "status"))
       str/split-lines
       (keep (fn [x] (re-find (re-pattern (str service " is.*")) x)))
       first
       (re-find #"DOWN")
       boolean))

(defn stop
  "Starts ZooKeeper and Kafka."
  []
  (sh "confluent" "destroy")
  (println "schema-registry is down")
  (println "kafka is down")
  (println "zookeeper is down"))

(defn start
  "Starts ZooKeeper and Kafka."
  []
  (with-out-str (stop))
  (doseq [s ["zookeeper" "kafka" "schema-registry"]]
    (do (while (not-up s)
          (sh "confluent" "start" s)
          (Thread/sleep 1000))
        (println s "is up"))))

(defn reset
  "Stops and starts ZooKeeper and Kafka."
  []
  (stop)
  (start))

Start confluent platform

(confluent/start)
zookeeper is up
kafka is up
schema-registry is up
nil

Helpful helpers

(ns user
  "your lovely home"
  (:require [clojure.java.shell :refer [sh]]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jcl]
            [jackdaw.admin :as ja]
            [jackdaw.serdes.edn :as jse]
            [jackdaw.streams :as j]
            [confluent])
  (:import org.apache.kafka.common.serialization.Serdes))

;;; ------------------------------------------------------------
;;;
;;; Configure topics
;;;
(defn topic-config
  "Takes a topic name and (optionally) key and value serdes and a
  partition count, and returns a topic configuration map, which may be
  used to create a topic or produce/consume records."
  ([topic-name]
   (topic-config topic-name (jse/serde)))

  ([topic-name value-serde]
   (topic-config topic-name (jse/serde) value-serde))

  ([topic-name key-serde value-serde]
   (topic-config topic-name 1 key-serde value-serde))

  ([topic-name partition-count key-serde value-serde]
   {:topic-name topic-name
    :partition-count partition-count
    :replication-factor 1
    :topic-config {}
    :key-serde key-serde
    :value-serde value-serde}))

;;; ------------------------------------------------------------
;;;
;;; Create, delete and list topics
;;;
(defn kafka-admin-client-config
  []
  {"bootstrap.servers" "localhost:9092"})

(defn create-topics
  "Takes a list of topics and creates these using the names given."
  [topic-config-list]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/create-topics! client topic-config-list)))

(defn re-delete-topics
  "Takes an instance of java.util.regex.Pattern and deletes any Kafka
  topics that match."
  [re]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (let [topics-to-delete (->> (ja/list-topics client)
                                (filter #(re-find re (:topic-name %))))]
      (ja/delete-topics! client topics-to-delete))))

(defn create-topic
  "Takes a single topic config and creates a Kafka topic."
  [topic-config]
  (create-topics [topic-config]))

(defn list-topics
  "Returns a list of Kafka topics."
  []
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/list-topics client)))

(defn topic-exists?
  "Takes a topic name and returns true if the topic exists."
  [topic-config]
  (with-open [client (ja/->AdminClient (kafka-admin-client-config))]
    (ja/topic-exists? client topic-config)))

;;; ------------------------------------------------------------
;;;
;;; Produce and consume records
;;;

(defn kafka-producer-config
  []
  {"bootstrap.servers" "localhost:9092"})

(defn kafka-consumer-config
  [group-id]
  {"bootstrap.servers" "localhost:9092"
   "group.id" group-id
   "auto.offset.reset" "earliest"
   "enable.auto.commit" "false"})

(defn publish
  "Takes a topic config and record value, and (optionally) a key and
  parition number, and produces to a Kafka topic."
  ([topic-config value]
   (with-open [client (jc/producer (kafka-producer-config) topic-config)]
     @(jc/produce! client topic-config value))
   nil)

  ([topic-config key value]
   (with-open [client (jc/producer (kafka-producer-config) topic-config)]
     @(jc/produce! client topic-config key value))
   nil)

  ([topic-config partition key value]
   (with-open [client (jc/producer (kafka-producer-config) topic-config)]
     @(jc/produce! client topic-config partition key value))
   nil))

(defn get-records
  "Takes a topic config, consumes from a Kafka topic, and returns a
  seq of maps."
  ([topic-config]
   (get-records topic-config 200))

  ([topic-config polling-interval-ms]
   (let [client-config (kafka-consumer-config
                        (str (java.util.UUID/randomUUID)))]
     (with-open [client (jc/subscribed-consumer client-config
                                                [topic-config])]
       (doall (jcl/log client 100 seq))))))

(defn get-keyvals
  "Takes a topic config, consumes from a Kafka topic, and returns a
  seq of key-value pairs."
  ([topic-config]
   (get-keyvals topic-config 20))

  ([topic-config polling-interval-ms]
   (map (juxt :key :value) (get-records topic-config polling-interval-ms))))

;;; ------------------------------------------------------------
;;;
;;; System
;;;

(def system nil)

Simple pipe topology example

Overview

digraph {
splines=true;
node [shape=box];

input -> output
}

streams-basic/pipe.png

Define topology

(ns pipe
  "This tutorial contains a simple stream processing application using
  Jackdaw and Kafka Streams.
  Pipe reads from a Kafka topic called `input`, logs the key and
  value, and writes these to a Kafka topic called `output`."
  (:gen-class)
  (:require [clojure.string :as str]
            [clojure.tools.logging :refer [info]]
            [jackdaw.streams :as j]
            [jackdaw.serdes.edn :as jse])
  (:import [org.apache.kafka.common.serialization Serdes]))

(defn topic-config
  "Takes a topic name and returns a topic configuration map, which may
  be used to create a topic or produce/consume records."
  [topic-name]
  {:topic-name topic-name
   :partition-count 1
   :replication-factor 1
   :key-serde (jse/serde)
   :value-serde (jse/serde)})

(defn app-config
  "Returns the application config."
  []
  {"application.id" "foo"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})

(defn build-topology
  "Reads from a Kafka topic called `input`, logs the key and value,
  and writes these to a Kafka topic called `output`. Returns a
  topology builder."
  [builder]
  (-> (j/kstream builder (topic-config "input"))
      (j/peek (fn [[k v]]
                (info (str {:key k :value v}))))
      (j/to (topic-config "output")))
  builder)

(defn start-app
  "Starts the stream processing application."
  [app-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder)
        app (j/kafka-streams topology app-config)]
    (j/start app)
    (info "pipe is up")
    app))

(defn stop-app
  "Stops the stream processing application."
  [app]
  (j/close app)
  (info "pipe is down"))

(defn -main
  [& _]
  (start-app (app-config)))

Define topology start stop

(require '[pipe])

(defn stop-pipe
  "Stops the app, and deletes topics and internal state."
  []
  (when (and system (:pipe-app system))
    (pipe/stop-app (:pipe-app system)))
  (re-delete-topics #"(input|output)")
  (alter-var-root #'system merge {:pipe-app nil}))

(defn start-pipe
  "Creates topics, and starts the app."
  []
  (create-topics (map pipe/topic-config ["input" "output"]))
  (alter-var-root #'system merge {:pipe-app (pipe/start-app (pipe/app-config))}))

Start/reset topology state

(stop-pipe)

(Thread/sleep 1000)

(start-pipe)
{:pipe-app
 #object[org.apache.kafka.streams.KafkaStreams 0x2855d41f "org.apache.kafka.streams.KafkaStreams@2855d41f"]}
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.admin.AdminClientConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

List topics

(list-topics)
({:topic-name "__confluent.support.metrics"}
 {:topic-name "_confluent-metrics"}
 {:topic-name "_schemas"}
 {:topic-name "input"}
 {:topic-name "output"})

List publish input

(publish (topic-config "input") {:foo "hola"})

Read from the output

(get-keyvals (topic-config "output"))
([nil "mundo"] [nil {:foo "hola"}])

The flex app

Overview

digraph   {
splines=true;
node [shape=box];

e [label="Events"]
us [label="User Sources"]
evs [label="Events by Source"]
evus [label="Events by user and source"]
s [label="User stats"]

e -> evs 
evs -> evus
us -> evus
evus -> s
}

streams-basic/flex.png

Define topology

(ns flex
  ""
  (:gen-class)
  (:require [clojure.string :as str]
            [clojure.tools.logging :refer [info]]
            [jackdaw.streams :as j]
            [jackdaw.serdes.edn :as jse])
  (:import [org.apache.kafka.common.serialization Serdes]))

(defn topic-config
  "Takes a topic name and returns a topic configuration map, which may
  be used to create a topic or produce/consume records."
  [topic-name]
  {:topic-name topic-name
   :partition-count 1
   :replication-factor 1
   :key-serde (jse/serde)
   :value-serde (jse/serde)})

(defn app-config
  "Returns the application config."
  []
  {"application.id" "flex-app"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})

(defn build-topology
  ""
  [builder]
  (let [event-stream (j/kstream builder (topic-config "events"))
        user-sources-table (j/ktable builder (topic-config "user-sources"))
        events-by-source (-> event-stream
                             (j/map (fn [[_ v]]
                                      [(:source-id v) v]))
                             (j/through (topic-config "events-by-source")))
        events-by-user-and-source (-> events-by-source
                                      (j/left-join user-sources-table
                                                   (fn [event user-source]
                                                     (merge event user-source))
                                                   (topic-config "")
                                                   (topic-config ""))
                                      (j/map (fn [[_ v]]
                                               [[(:user-id v) (:source-id v)] v]))
                                      (j/through (topic-config "events-by-user-and-source")))]
    (-> events-by-user-and-source
        (j/group-by-key (topic-config ""))
        (j/aggregate (constantly {:count 0 :sum 0})
                     (fn [acc [k v]]
                       (-> acc
                           (update :count inc)
                           (update :sum #(+ % (:value v)))
                           (merge (select-keys v [:name :user-id]))))
                     (topic-config "user-stats"))
        (j/to-kstream)
        (j/to (topic-config "user-stats")))
    builder))

(defn start-app
  "Starts the stream processing application."
  [app-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder)
        app (j/kafka-streams topology app-config)]
    (j/start app)
    (info "flex is up")
    app))

(defn stop-app
  "Stops the stream processing application."
  [app]
  (j/close app)
  (info "flex is down"))

(defn -main
  [& _]
  (start-app (app-config)))

Define topology start stop

(require '[flex])

(defn stop-flex
  "Stops the app, and deletes topics and internal state."
  []
  (when (and system (:flex-app system))
    (flex/stop-app (:flex-app system))
    (.cleanUp (:flex-app system)) ;; clears internal state topics
    )
  (re-delete-topics #"(events|events-by-source|events-by-user-and-source|user-sources|user-stats)")
  (alter-var-root #'system merge {:flex-app nil}))

(defn start-flex
  "Creates topics, and starts the app."
  []
  (create-topics (map flex/topic-config ["events" "events-by-source" "events-by-user-and-source" "user-sources" "user-stats"]))
  (alter-var-root #'system merge {:flex-app (flex/start-app (flex/app-config))}))

Start/reset topology state

(stop-flex)

(Thread/sleep 1000)

(start-flex)
{:flex-app
 #object[org.apache.kafka.streams.KafkaStreams 0x12441536 "org.apache.kafka.streams.KafkaStreams@12441536"]}
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.admin.AdminClientConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

List topics

(list-topics)
({:topic-name "__confluent.support.metrics"}
 {:topic-name "_confluent-metrics"}
 {:topic-name "_schemas"}
 {:topic-name "events"}
 {:topic-name "events-by-source"}
 {:topic-name "events-by-user-and-source"}
 {:topic-name "flex-app-user-sources-changelog"}
 {:topic-name "flex-app-user-stats-changelog"}
 {:topic-name "user-sources"}
 {:topic-name "user-stats"})

List publish input

(def user-1 (java.util.UUID/randomUUID))

(def source-1 (java.util.UUID/randomUUID))

(def source-2 (java.util.UUID/randomUUID))

(def user-2 (java.util.UUID/randomUUID))

(def source-3 (java.util.UUID/randomUUID))


(publish (topic-config "user-sources")
         source-1
         {:name "step counter"
          :user-id user-1})

(publish (topic-config "user-sources")
         source-2
         {:name "pushup counter"
          :user-id user-1})

(publish (topic-config "user-sources")
         source-3
         {:name "step counter"
          :user-id user-2})

(publish (topic-config "events")
         {:event-id (java.util.UUID/randomUUID)
          :source-id source-1
          :value 1
          :timestamp (System/currentTimeMillis)})

(publish (topic-config "events")
         {:event-id (java.util.UUID/randomUUID)
          :source-id source-2
          :value 2
          :timestamp (System/currentTimeMillis)})

(publish (topic-config "events")
         {:event-id (java.util.UUID/randomUUID)
          :source-id source-3
          :value 100
          :timestamp (System/currentTimeMillis)})

(publish (topic-config "events")
         {:event-id (java.util.UUID/randomUUID)
          :source-id source-2
          :value 100
          :timestamp (System/currentTimeMillis)})

Read from the output

(in-ns 'user)

(get-keyvals (topic-config "events"))

(get-keyvals (topic-config "user-sources"))

(get-keyvals (topic-config "events-by-source"))

(get-keyvals (topic-config "events-by-user-and-source"))

(get-keyvals (topic-config "user-stats"))

(get-keyvals (topic-config "flex-app-user-stats-changelog"))
()

Test Machine

Tests using mock-transport aka (TopologyTestDriver)

Lets start with super verbose tests

(ns flex-test-1
  (:require [clojure.test :refer :all]
            [jackdaw.streams.mock :as j.s.m]
            [jackdaw.test :as j.t]))

(defn mock-transport
  [builder topics]
  (let [mock-driver (-> (j.s.m/streams-builder)
                        (builder)
                        (j.s.m/streams-builder->test-driver))]
    (j.t/mock-transport {:driver mock-driver}
                        topics)))

(deftest topology-test
  (let [topology-topics {:events (flex/topic-config "events")
                         :user-sources (flex/topic-config "user-sources")
                         :events-by-source (flex/topic-config "events-by-source")
                         :events-by-user-and-source (flex/topic-config "events-by-user-and-source")
                         :user-stats (flex/topic-config "user-stats")}
        ;; Transport
        transport (mock-transport flex/build-topology topology-topics)

        user-1 (java.util.UUID/randomUUID)
        user-2 (java.util.UUID/randomUUID)
        source-1 (java.util.UUID/randomUUID)
        source-2 (java.util.UUID/randomUUID)
        source-3 (java.util.UUID/randomUUID)

        ;; User Sources
        user-1-step-counter {:id source-1
                             :name "step counter"
                             :user-id user-1}
        user-1-pushup-counter {:id source-2
                               :name "pushup counter"
                               :user-id user-1}
        user-2-step-counter {:id source-3
                             :name "step counter"
                             :user-id user-2}]

    (with-open [machine (j.t/test-machine transport)]
      (testing "user-1 takes 1 step"
        (let [commands [[:write! :user-sources user-1-step-counter]
                        [:write! :events {:id (java.util.UUID/randomUUID)
                                          :event-id (java.util.UUID/randomUUID)
                                          :source-id source-1
                                          :value 1
                                          :timestamp (System/currentTimeMillis)}]
                        [:watch (fn [journal]
                                  (->> (get-in journal [:topics :user-stats])
                                       (filter #(= [user-1 source-1] (:key %)))
                                       (count)
                                       (= 1)))]]
              {:keys [results journal]} (j.t/run-test machine commands)]

          ;; check status on every command
          (is (every? #(= :ok (:status %)) results))
          ;; gets one step entry with one step count
          (is (= 1 (-> journal
                       (get-in [:topics :user-stats])
                       (->>
                        (filter #(= [user-1 source-1] (:key %))))
                       last
                       :value
                       :count)))
          (is (= 1 (-> journal
                       (get-in [:topics :user-stats])
                       (->>
                        (filter #(= [user-1 source-1] (:key %))))
                       last
                       :value
                       :sum)))))

      (testing "user-2 steps gets 2 counts of 50 steps while user-1 does 3 pushups"
        (let [commands [[:write! :user-sources user-1-step-counter]
                        [:write! :user-sources user-1-pushup-counter]
                        [:write! :user-sources user-2-step-counter]
                        [:write! :events {:id (java.util.UUID/randomUUID)
                                          :event-id (java.util.UUID/randomUUID)
                                          :source-id source-3
                                          :value 50
                                          :timestamp (System/currentTimeMillis)}]
                        [:write! :events {:id (java.util.UUID/randomUUID)
                                          :event-id (java.util.UUID/randomUUID)
                                          :source-id source-2
                                          :value 3
                                          :timestamp (System/currentTimeMillis)}]
                        [:write! :events {:id (java.util.UUID/randomUUID)
                                          :event-id (java.util.UUID/randomUUID)
                                          :source-id source-3
                                          :value 50
                                          :timestamp (System/currentTimeMillis)}]
                        [:watch (fn [journal]
                                  (->> (get-in journal [:topics :user-stats])
                                       (filter #(= [user-2 source-3] (:key %)))
                                       (count)
                                       (= 2)))]]
              {:keys [results journal]} (j.t/run-test machine commands)]

          (is (every? #(= :ok (:status %)) results))
          ;; there where two step updates for user 3
          (is (= 2 (-> journal
                       (get-in [:topics :user-stats])
                       (->>
                        (filter #(= [user-2 source-3] (:key %))))
                       last
                       :value
                       :count)))
          ;; total steps for user 3 was 100
          (is (= 100 (-> journal
                         (get-in [:topics :user-stats])
                         (->>
                          (filter #(= [user-2 source-3] (:key %))))
                         last
                         :value
                         :sum)))
          ;; gets one pushup entry with 3 for user 1
          (is (= 1 (-> journal
                       (get-in [:topics :user-stats])
                       (->>
                        (filter #(= [user-1 source-2] (:key %))))
                       last
                       :value
                       :count)))
          (is (= 3 (-> journal
                       (get-in [:topics :user-stats])
                       (->>
                        (filter #(= [user-1 source-2] (:key %))))
                       last
                       :value
                       :sum))))))))

We run the tests

(run-tests)
Testing flex-test

Ran 1 tests containing 8 assertions.
0 failures, 0 errors.
{:test 1, :pass 8, :fail 0, :error 0, :type :summary}

Tests also using kafka-transport

First lets make some helpers

(ns test-helpers
  (:require [jackdaw.streams.mock :as j.s.m]
            [jackdaw.test :as j.t]
            [jackdaw.test.fixtures :as j.t.f]))

;; Transport helpers
(def ^:dynamic *use-kafka* false)

(defn test-transport
  [topics build-topology-fn]
  (if *use-kafka*
    (let [kafka-test-config {"bootstrap.servers" "localhost:9092"
                             "group.id" "ce-data-aggregator-test"}]
      (j.t/kafka-transport
       kafka-test-config
       topics))
    (let [mock-driver (-> (j.s.m/streams-builder)
                          (build-topology-fn)
                          (j.s.m/streams-builder->test-driver))]
      (j.t/mock-transport {:driver mock-driver}
                          topics))))

;; Run test helpers
(defn results-ok? [tm-results]
  (every? #(= :ok %) (map :status tm-results)))

(defn run-commands [topics build-topology-fn app-config commands]
  (j.t.f/with-fixtures [(j.t.f/integration-fixture
                         (fn [_]
                           build-topology-fn)
                         {:broker-config {"bootstrap.servers" "localhost:9092"}
                          :topic-metadata topics
                          :app-config (-> app-config
                                          (update "application.id" #(str % "-" (java.util.UUID/randomUUID)))
                                          (assoc "cache.max.bytes.buffering" "0"))
                          :enable? *use-kafka*})]
    (j.t/with-test-machine
      (test-transport topics build-topology-fn)
      (fn [machine]
        (j.t/run-test machine commands)))))

;; Journal helpers
(defn raw-messages
  [journal topic-name]
  (sort-by :offset (get-in journal [:topics topic-name])))

(defn messages
  [journal topic-name]
  (->> (raw-messages journal topic-name)))

(defn messages-by-kv-fn
  [journal topic-name ks pred]
  (->> (messages journal topic-name)
       (filter (fn [m]
                 (pred (get-in m ks))))))

(defn messages-by-kv
  [journal topic-name ks value]
  (messages-by-kv-fn journal topic-name ks #(= value %)))

(defn message-by-kv
  [journal topic-name ks value]
  (first (messages-by-kv-fn journal topic-name ks #(= value %))))

(defn by-key [topic-name ks id]
  (fn [journal]
    (last (messages-by-kv journal topic-name ks id))))

(defn by-keys [topic-name ks ids]
  (fn [journal]
    (messages-by-kv-fn journal topic-name ks (set ids))))

(defn by-id [topic id]
  (by-key topic [:id] id))

(defn by-message-key [topic key]
  (by-key topic [:key] key))

Lets refactor

(ns flex-test-2
  (:require [clojure.test :refer :all]
            [jackdaw.streams.mock :as j.s.m]
            [jackdaw.test :as j.t]
            [jackdaw.test.commands.watch :as watch]
            [test-helpers]))

(defn user-source
  [user source name]
  {:id source
   :name name
   :user-id user})

(defn event
  [source value]
  {:id (java.util.UUID/randomUUID)
   :event-id (java.util.UUID/randomUUID)
   :source-id source
   :value value
   :timestamp (System/currentTimeMillis)})

(deftest topology-test
  (let [topology-topics {:events (flex/topic-config "events")
                         :user-sources (flex/topic-config "user-sources")
                         :events-by-source (flex/topic-config "events-by-source")
                         :events-by-user-and-source (flex/topic-config "events-by-user-and-source")
                         :user-stats (flex/topic-config "user-stats")}

        user-1 (java.util.UUID/randomUUID)
        user-2 (java.util.UUID/randomUUID)

        source-1 (java.util.UUID/randomUUID)
        source-2 (java.util.UUID/randomUUID)
        source-3 (java.util.UUID/randomUUID)

        ;; User Sources
        user-1-step-counter (user-source user-1 source-1 "step counter")
        user-1-pushup-counter (user-source user-1 source-2 "pushup counter")
        user-2-step-counter (user-source user-2 source-3 "step counter")]

    (binding [test-helpers/*use-kafka* true
              watch/*default-watch-timeout* 1000]
      (testing "user-1 takes 1 step"
        (let [step (event source-1 1)
              commands [[:write! :user-sources user-1-step-counter]
                        [:write! :events step]
                        [:watch (test-helpers/by-message-key :user-stats [user-1 source-1])]]
              {:keys [results journal]} (test-helpers/run-commands
                                         topology-topics
                                         flex/build-topology
                                         (flex/app-config)
                                         commands)]
          ;; check status on every command
          (is (test-helpers/results-ok? results))
          ;; gets one step entry with one step count
          (is (= 1 (-> ((test-helpers/by-message-key :user-stats [user-1 source-1]) journal)
                       :value
                       :count)))
          (is (= 1 (-> ((test-helpers/by-message-key :user-stats [user-1 source-1]) journal)
                       :value
                       :sum)))))

      (testing "user-2 steps gets 2 counts of 50 steps while user-1 does 3 pushups"
        (let [commands [[:write! :user-sources user-1-step-counter]
                        [:write! :user-sources user-1-pushup-counter]
                        [:write! :user-sources user-2-step-counter]
                        [:write! :events (event source-3 50)]
                        [:write! :events (event source-2 3)]
                        [:write! :events (event source-3 50)]
                        [:watch (test-helpers/by-message-key :user-stats [user-2 source-3])]]
              {:keys [results journal]} (test-helpers/run-commands
                                         topology-topics
                                         flex/build-topology
                                         (flex/app-config)
                                         commands)]
          (is (every? #(= :ok (:status %)) results))
          ;; there where two step updates for user 3
          (is (= 2 (-> ((test-helpers/by-message-key :user-stats [user-2 source-3]) journal)
                       :value
                       :count)))
          ;; total steps for user 3 was 100
          (is (= 100 (-> ((test-helpers/by-message-key :user-stats [user-2 source-3]) journal)
                         :value
                         :sum)))
          ;; gets one pushup entry with 3 for user 1
          (is (= 1 (-> ((test-helpers/by-message-key :user-stats [user-1 source-2]) journal)
                       :value
                       :count)))
          (is (= 3 (-> ((test-helpers/by-message-key :user-stats [user-1 source-2]) journal)
                       :value
                       :sum))))))))

Run tests

(run-tests)
Testing flex-test-2

Ran 1 tests containing 8 assertions.
0 failures, 0 errors.
{:test 1, :pass 8, :fail 0, :error 0, :type :summary}