Skip to content

Commit

Permalink
ivarref#3 Add optional batch name to queue jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
svdo committed Jun 14, 2024
1 parent 55574ae commit 85d1354
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
23 changes: 23 additions & 0 deletions src/com/github/ivarref/yoltq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,29 @@
(sort-by (juxt :qname :status))
(vec))))

(defn batch-progress [queue-name batch-name]
(let [{:keys [conn]} @*config*
db (d/db conn)]
(->> (d/q '[:find ?e ?qname ?bname ?status
:keys :e :qname :bname :status
:in $ ?qname ?bname
:where
[?e :com.github.ivarref.yoltq/queue-name ?qname]
[?e :com.github.ivarref.yoltq/batch-name ?bname]
[?e :com.github.ivarref.yoltq/status ?status]]
db queue-name batch-name)
(mapv #(select-keys % [:qname :bname :status]))
(mapv (fn [qitem] {qitem 1}))
(reduce (partial merge-with +) {})
(mapv (fn [[{:keys [qname bname status]} v]]
(array-map
:qname qname
:batch-name bname
:status status
:count v)))
(sort-by (juxt :qname :batch-name :status))
(vec))))

(defn get-errors [qname]
(let [{:keys [conn]} @*config*
db (d/db conn)]
Expand Down
5 changes: 4 additions & 1 deletion src/com/github/ivarref/yoltq/impl.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
[#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
#:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
#:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/batch-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
Expand Down Expand Up @@ -103,7 +104,9 @@
(pr-str-safe :depends-on [q ext-id]))
(throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
(when-let [ext-id (:id opts)]
{:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))))
{:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})
(when-let [batch-name (:batch-name opts)]
{:com.github.ivarref.yoltq/batch-name batch-name}))))
(do
(log/error "Did not find registered handler for queue" queue-name)
(throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
Expand Down
22 changes: 22 additions & 0 deletions test/com/github/ivarref/yoltq/virtual_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,25 @@
@(d/transact conn [(yq/put :q "asdf")])
(tq/consume! :q)
(is (= @got-work "asdf"))))

(deftest batch-of-jobs-test
(let [conn (u/empty-conn)]
(yq/init! {:conn conn})
(yq/add-consumer! :q1 identity)
(yq/add-consumer! :q2 identity)
@(d/transact conn [(yq/put :q1 {:work 123} {:batch-name :b1})
(yq/put :q1 {:work 456} {:batch-name :b2})
(yq/put :q2 {:work 789} {:batch-name :b1})])
(is (= [{:qname :q1
:batch-name :b1
:status :init
:count 1}]
(yq/batch-progress :q1 :b1)))

(is (= {:work 123} (tq/consume! :q1)))

(is (= [{:qname :q1
:batch-name :b1
:status :done
:count 1}]
(yq/batch-progress :q1 :b1)))))

0 comments on commit 85d1354

Please sign in to comment.