From 85d13545275678a1077b9600fce136ae10dcb809 Mon Sep 17 00:00:00 2001
From: Stefan van den Oord <stefan@medicinemen.eu>
Date: Fri, 14 Jun 2024 16:08:59 +0200
Subject: [PATCH] #3 Add optional batch name to queue jobs

---
 src/com/github/ivarref/yoltq.clj              | 23 +++++++++++++++++++
 src/com/github/ivarref/yoltq/impl.clj         |  5 +++-
 .../com/github/ivarref/yoltq/virtual_test.clj | 22 ++++++++++++++++++
 3 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/src/com/github/ivarref/yoltq.clj b/src/com/github/ivarref/yoltq.clj
index 379d701..1ba286e 100644
--- a/src/com/github/ivarref/yoltq.clj
+++ b/src/com/github/ivarref/yoltq.clj
@@ -230,6 +230,29 @@
          (sort-by (juxt :qname :status))
          (vec))))
 
+(defn batch-progress [queue-name batch-name]
+  (let [{:keys [conn]} @*config*
+        db (d/db conn)]
+    (->> (d/q '[:find ?e ?qname ?bname ?status
+                :keys :e :qname :bname :status
+                :in $ ?qname ?bname
+                :where
+                [?e :com.github.ivarref.yoltq/queue-name ?qname]
+                [?e :com.github.ivarref.yoltq/batch-name ?bname]
+                [?e :com.github.ivarref.yoltq/status ?status]]
+              db queue-name batch-name)
+         (mapv #(select-keys % [:qname :bname :status]))
+         (mapv (fn [qitem] {qitem 1}))
+         (reduce (partial merge-with +) {})
+         (mapv (fn [[{:keys [qname bname status]} v]]
+                 (array-map
+                  :qname qname
+                  :batch-name bname
+                  :status status
+                  :count v)))
+         (sort-by (juxt :qname :batch-name :status))
+         (vec))))
+
 (defn get-errors [qname]
   (let [{:keys [conn]} @*config*
         db (d/db conn)]
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index ac573d1..6d2aa3d 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -12,6 +12,7 @@
   [#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
    #:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
    #:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
+   #:db{:ident :com.github.ivarref.yoltq/batch-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
    #:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
    #:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
    #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
@@ -103,7 +104,9 @@
                            (pr-str-safe :depends-on [q ext-id]))
               (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
           (when-let [ext-id (:id opts)]
-            {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))))
+            {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})
+          (when-let [batch-name (:batch-name opts)]
+            {:com.github.ivarref.yoltq/batch-name batch-name}))))
     (do
       (log/error "Did not find registered handler for queue" queue-name)
       (throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index 2800c21..7621b13 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -450,3 +450,25 @@
     @(d/transact conn [(yq/put :q "asdf")])
     (tq/consume! :q)
     (is (= @got-work "asdf"))))
+
+(deftest batch-of-jobs-test
+  (let [conn (u/empty-conn)]
+    (yq/init! {:conn conn})
+    (yq/add-consumer! :q1 identity)
+    (yq/add-consumer! :q2 identity)
+    @(d/transact conn [(yq/put :q1 {:work 123} {:batch-name :b1})
+                       (yq/put :q1 {:work 456} {:batch-name :b2})
+                       (yq/put :q2 {:work 789} {:batch-name :b1})])
+    (is (= [{:qname :q1
+             :batch-name :b1
+             :status :init
+             :count 1}]
+           (yq/batch-progress :q1 :b1)))
+
+    (is (= {:work 123} (tq/consume! :q1)))
+
+    (is (= [{:qname :q1
+             :batch-name :b1
+             :status :done
+             :count 1}]
+           (yq/batch-progress :q1 :b1)))))