diff --git a/README.md b/README.md
index 63b9ad3..a914cc7 100644
--- a/README.md
+++ b/README.md
@@ -131,7 +131,8 @@ Inspecting `(yq/put :q {:work 123})]` you will see something like this:
This is the queue job as it will be stored into the database.
You can see that the payload, i.e. the second argument of `yq/put`,
-is persisted into the database. Thus the payload must be `pr-str`-able.
+is persisted into the database. Thus the payload must be `pr-str`-able (unless you have specified
+custom `:encode` and `:decode` functions that override this).
A queue job will initially have status `:init`.
@@ -220,6 +221,47 @@ is shut down abruptly during processing of queue jobs.
A queue job will remain in status `:error` once `:max-retries` (default: 100) have been reached.
Ideally this will not happen. ¯\\\_(ツ)\_/¯
+### Custom encoding and decoding
+
+Yoltq will use `pr-str` and `clojure.edn/read-string` by default to encode and decode data.
+You may specify `:encode` and `:decode` either globally or per queue to override this behaviour.
+The `:encode` function must return a byte array or a string.
+
+For example if you want to use [nippy](https://github.com/ptaoussanis/nippy):
+```clojure
+(require '[taoensso.nippy :as nippy])
+
+; Globally for all queues:
+(yq/init!
+ {:conn conn
+ :encode nippy/freeze
+ :decode nippy/thaw})
+
+; Or per queue:
+(yq/add-consumer!
+ :q ; Queue to consume
+ (fn [payload] (println "got payload:" payload)) ; Queue consumer function
+ {:encode nippy/freeze
+ :decode nippy/thaw}) ; Queue options, here with :encode and :decode
+```
+
+### Partitions
+
+Yoltq supports specifying which [partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions)
+queue entities should belong to.
+The default function is:
+```clojure
+(defn default-partition-fn [_queue-name]
+ (keyword "yoltq" (str "queue_" (.getValue (java.time.Year/now)))))
+```
+This is to say that there will be a single partition per year for yoltq.
+Yoltq will take care of creating the partition if it does not exist.
+
+You may override this function, either globally or per queue, with the keyword `:partition-fn`.
+E.g.:
+```clojure
+(yq/init! {:conn conn :partition-fn (fn [_queue-name] :my-partition)})
+```
### All configuration options
@@ -376,8 +418,29 @@ For Redis there is [carmine](https://github.com/ptaoussanis/carmine).
Note: I have not tried these libraries myself.
+## Other stuff
+
+If you liked this library, you may also like:
+
+* [conformity](https://github.com/avescodes/conformity): A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, data, or otherwise.
+* [datomic-schema](https://github.com/ivarref/datomic-schema): Simplified writing of Datomic schemas (works with conformity).
+* [double-trouble](https://github.com/ivarref/double-trouble): Handle duplicate Datomic transactions with ease.
+* [gen-fn](https://github.com/ivarref/gen-fn): Generate Datomic function literals from regular Clojure namespaces.
+* [rewriting-history](https://github.com/ivarref/rewriting-history): A library to rewrite Datomic history.
+
## Change log
+#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63)
+Added custom `:encode` and `:decode` support.
+
+Added support for specifying `:partifion-fn` to specify which partition a queue item should belong to.
+It defaults to:
+```clojure
+(defn default-partition-fn [_queue-name]
+ (keyword "yoltq" (str "queue_" (.getValue (Year/now)))))
+```
+Yoltq takes care of creating the partition if it does not exist.
+
#### 2022-11-15 v0.2.62 [diff](https://github.com/ivarref/yoltq/compare/v0.2.61...v0.2.62)
Added function `processing-time-stats`:
diff --git a/deps.edn b/deps.edn
index 6923881..e36885e 100644
--- a/deps.edn
+++ b/deps.edn
@@ -1,5 +1,6 @@
-{:deps {org.clojure/tools.logging {:mvn/version "1.2.4"}
- org.clojure/clojure {:mvn/version "1.11.1"}}
+{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"}
+ org.clojure/tools.logging {:mvn/version "1.2.4"}
+ org.clojure/clojure {:mvn/version "1.11.1"}}
:paths ["src"]
@@ -11,6 +12,7 @@
clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}
org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
+ com.taoensso/nippy {:mvn/version "3.2.0"}
io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}}
:jvm-opts ["-DDISABLE_SPY=true"
"-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"]
diff --git a/pom.xml b/pom.xml
index 2c11984..463899d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
jar
com.github.ivarref
yoltq
- 0.2.62
+ 0.2.63
yoltq
@@ -12,6 +12,11 @@
clojure
1.11.1
+
+ com.github.ivarref
+ double-trouble
+ 0.1.102
+
org.clojure
tools.logging
@@ -30,7 +35,7 @@
scm:git:git://github.com/ivarref/yoltq.git
scm:git:ssh://git@github.com/ivarref/yoltq.git
- v0.2.62
+ v0.2.63
https://github.com/ivarref/yoltq
\ No newline at end of file
diff --git a/src/com/github/ivarref/yoltq/impl.clj b/src/com/github/ivarref/yoltq/impl.clj
index c37b0e6..ac573d1 100644
--- a/src/com/github/ivarref/yoltq/impl.clj
+++ b/src/com/github/ivarref/yoltq/impl.clj
@@ -1,11 +1,12 @@
(ns com.github.ivarref.yoltq.impl
- (:require [datomic.api :as d]
- [clojure.tools.logging :as log]
+ (:require [clojure.edn :as edn]
[clojure.string :as str]
- [com.github.ivarref.yoltq.utils :as u]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.double-trouble :as dt]
[com.github.ivarref.yoltq.ext-sys :as ext]
- [clojure.edn :as edn]))
-
+ [com.github.ivarref.yoltq.utils :as u]
+ [datomic.api :as d])
+ (:import (java.time Year)))
(def schema
[#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
@@ -13,6 +14,7 @@
#:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
+ #:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
#:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
@@ -41,13 +43,22 @@
(log/error "could not read-string" what ":" (ex-message e))
(throw e))))
+(defn default-partition-fn [_queue-keyword]
+ (keyword "yoltq" (str "queue_" (.getValue (Year/now)))))
-(defn put [{:keys [capture-bindings conn] :as config}
+(defn put [{:keys [capture-bindings conn encode partition-fn]
+ :or {partition-fn default-partition-fn
+ encode (partial pr-str-safe :payload)}
+ :as config}
queue-name
payload
opts]
(if-let [q-config (get-in config [:handlers queue-name])]
(let [id (u/squuid)
+ encode (get q-config :encode encode)
+ partition-fn (get q-config :partition-fn partition-fn)
+ partition (partition-fn queue-name)
+ _ (assert (keyword? partition) "Partition must be a keyword")
depends-on (get q-config :depends-on (fn [_] nil))
valid-payload? (get q-config :valid-payload? (fn [_] true))
opts (merge
@@ -58,32 +69,41 @@
(assoc o (symbol k) (deref k)))
{}
(or capture-bindings []))
- (pr-str-safe :capture-bindings))]
- (when-not (valid-payload? payload)
- (log/error "Payload was not valid. Payload was:" payload)
- (throw (ex-info (str "Payload was not valid: " payload) {:payload payload})))
+ (pr-str-safe :capture-bindings))
+ _ (when-not (valid-payload? payload)
+ (log/error "Payload was not valid. Payload was:" payload)
+ (throw (ex-info (str "Payload was not valid: " payload) {:payload payload})))
+ encoded (encode payload)
+ _ (when (not (or (bytes? encoded) (string? encoded)))
+ (log/error "Payload must be encoded to either a string or a byte array")
+ (throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))]
(log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
- (merge
- {:com.github.ivarref.yoltq/id id
- :com.github.ivarref.yoltq/queue-name queue-name
- :com.github.ivarref.yoltq/status u/status-init
- :com.github.ivarref.yoltq/payload (pr-str-safe :payload payload)
- :com.github.ivarref.yoltq/bindings str-bindings
- :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
- :com.github.ivarref.yoltq/lock (u/random-uuid)
- :com.github.ivarref.yoltq/tries 0
- :com.github.ivarref.yoltq/init-time (u/now-ms)
- :com.github.ivarref.yoltq/version "2"}
- (when-let [[q ext-id] (:depends-on opts)]
- (when-not (d/q '[:find ?e .
- :in $ ?ext-id
- :where
- [?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
- (d/db conn)
- (pr-str-safe :depends-on [q ext-id]))
- (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
- (when-let [ext-id (:id opts)]
- {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))
+ (do
+ (dt/ensure-partition! conn partition)
+ (merge
+ (if (bytes? encoded)
+ {:com.github.ivarref.yoltq/payload-bytes encoded}
+ {:com.github.ivarref.yoltq/payload encoded})
+ {:db/id (d/tempid partition)
+ :com.github.ivarref.yoltq/id id
+ :com.github.ivarref.yoltq/queue-name queue-name
+ :com.github.ivarref.yoltq/status u/status-init
+ :com.github.ivarref.yoltq/bindings str-bindings
+ :com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
+ :com.github.ivarref.yoltq/lock (u/random-uuid)
+ :com.github.ivarref.yoltq/tries 0
+ :com.github.ivarref.yoltq/init-time (u/now-ms)
+ :com.github.ivarref.yoltq/version "2"}
+ (when-let [[q ext-id] (:depends-on opts)]
+ (when-not (d/q '[:find ?e .
+ :in $ ?ext-id
+ :where
+ [?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
+ (d/db conn)
+ (pr-str-safe :depends-on [q ext-id]))
+ (throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
+ (when-let [ext-id (:id opts)]
+ {:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))))
(do
(log/error "Did not find registered handler for queue" queue-name)
(throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
@@ -169,20 +189,23 @@
"in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"]))
-(defn execute! [{:keys [handlers mark-status-fn! start-execute-time collect-spent-time!]
- :or {mark-status-fn! mark-status!}
+(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!]
+ :or {mark-status-fn! mark-status!
+ decode edn/read-string}
:as cfg}
- {:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}]
+ {:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}]
(when queue-item
(if (= :error status)
(assoc queue-item :failed? true)
(if-let [queue (get handlers queue-name)]
- (let [{:keys [f allow-cas-failure?]} queue]
+ (let [{:keys [f allow-cas-failure?]} queue
+ decode (get queue :decode decode)]
(log/debug "queue item" (str id) "for queue" queue-name "is now processing")
(let [{:keys [retval exception]}
(try
(swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name])
- (let [v (f payload)]
+ (let [payload (decode (or payload payload-bytes))
+ v (f payload)]
{:retval v})
(catch Throwable t
{:exception t})
diff --git a/src/com/github/ivarref/yoltq/utils.clj b/src/com/github/ivarref/yoltq/utils.clj
index 39572a9..7665b6d 100644
--- a/src/com/github/ivarref/yoltq/utils.clj
+++ b/src/com/github/ivarref/yoltq/utils.clj
@@ -57,7 +57,6 @@
(defn get-queue-item [db id]
(-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])
(dissoc :db/id)
- (update :com.github.ivarref.yoltq/payload edn/read-string)
(update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {})))
(update :com.github.ivarref.yoltq/bindings
(fn [s]
diff --git a/test/com/github/ivarref/yoltq/virtual_test.clj b/test/com/github/ivarref/yoltq/virtual_test.clj
index 996792e..2800c21 100644
--- a/test/com/github/ivarref/yoltq/virtual_test.clj
+++ b/test/com/github/ivarref/yoltq/virtual_test.clj
@@ -1,18 +1,21 @@
(ns com.github.ivarref.yoltq.virtual-test
- (:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
- [clojure.tools.logging :as log]
- [com.github.ivarref.yoltq :as yq]
- [com.github.ivarref.yoltq.error-poller :as error-poller]
- [com.github.ivarref.yoltq.ext-sys :as ext]
- [com.github.ivarref.yoltq.impl :as i]
- [com.github.ivarref.yoltq.migrate :as migrate]
- [com.github.ivarref.yoltq.test-queue :as tq]
- [com.github.ivarref.yoltq.test-utils :as u]
- [com.github.ivarref.yoltq.utils :as uu]
- [datomic-schema.core]
- [datomic.api :as d]
- [taoensso.timbre :as timbre])
- (:import (java.time Duration)))
+ (:require
+ [clojure.string :as str]
+ [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
+ [clojure.tools.logging :as log]
+ [com.github.ivarref.yoltq :as yq]
+ [com.github.ivarref.yoltq.error-poller :as error-poller]
+ [com.github.ivarref.yoltq.ext-sys :as ext]
+ [com.github.ivarref.yoltq.impl :as i]
+ [com.github.ivarref.yoltq.migrate :as migrate]
+ [com.github.ivarref.yoltq.test-queue :as tq]
+ [com.github.ivarref.yoltq.test-utils :as u]
+ [com.github.ivarref.yoltq.utils :as uu]
+ [datomic-schema.core]
+ [datomic.api :as d]
+ [taoensso.nippy :as nippy]
+ [taoensso.timbre :as timbre])
+ (:import (java.time Duration LocalDateTime)))
(use-fixtures :each tq/call-with-virtual-queue!)
@@ -380,3 +383,70 @@
(is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms))))
(is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms)))))
(is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms)))))))
+
+(deftest global-encode-decode
+ (let [conn (u/empty-conn)
+ ldt (LocalDateTime/now)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :encode nippy/freeze
+ :decode nippy/thaw})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q {:work ldt})])
+ (tq/consume! :q)
+ (is (= @got-work {:work ldt}))))
+
+(deftest queue-encode-decode
+ (let [conn (u/empty-conn)
+ ldt (LocalDateTime/now)
+ got-work (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work))
+ {:encode nippy/freeze
+ :decode nippy/thaw})
+ @(d/transact conn [(yq/put :q {:work ldt})])
+ (tq/consume! :q)
+ (is (= @got-work {:work ldt}))))
+
+(deftest global-partition
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :partition-fn (fn [_queue-name] :my-part)})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume! :q)
+ (is (some? (d/q '[:find ?e .
+ :in $ ?part
+ :where
+ [?e :db/ident ?part]]
+ (d/db conn)
+ :my-part)))
+ (is (= @got-work {:work 123}))))
+
+(deftest partition-per-queue
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work))
+ {:partition-fn (fn [_queue-name] :my-part)})
+ @(d/transact conn [(yq/put :q {:work 123})])
+ (tq/consume! :q)
+ (is (some? (d/q '[:find ?e .
+ :in $ ?part
+ :where
+ [?e :db/ident ?part]]
+ (d/db conn)
+ :my-part)))
+ (is (= @got-work {:work 123}))))
+
+(deftest string-encode-decode
+ (let [conn (u/empty-conn)
+ got-work (atom nil)]
+ (yq/init! {:conn conn
+ :encode (fn [x] (str/join (reverse x)))
+ :decode (fn [x] (str/join (reverse x)))})
+ (yq/add-consumer! :q (fn [work] (reset! got-work work)))
+ @(d/transact conn [(yq/put :q "asdf")])
+ (tq/consume! :q)
+ (is (= @got-work "asdf"))))