Skip to content

Commit

Permalink
Release 0.2.63: Add support for :encode and :decode function. Add :pa…
Browse files Browse the repository at this point in the history
…rtition-fn. Fixes ivarref#1
  • Loading branch information
ivarref committed Nov 18, 2022
1 parent f3fc5f7 commit 8f945d8
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 56 deletions.
65 changes: 64 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ Inspecting `(yq/put :q {:work 123})]` you will see something like this:

This is the queue job as it will be stored into the database.
You can see that the payload, i.e. the second argument of `yq/put`,
is persisted into the database. Thus the payload must be `pr-str`-able.
is persisted into the database. Thus the payload must be `pr-str`-able (unless you have specified
custom `:encode` and `:decode` functions that override this).


A queue job will initially have status `:init`.
Expand Down Expand Up @@ -220,6 +221,47 @@ is shut down abruptly during processing of queue jobs.
A queue job will remain in status `:error` once `:max-retries` (default: 100) have been reached.
Ideally this will not happen. ¯\\\_(ツ)\_

### Custom encoding and decoding

Yoltq will use `pr-str` and `clojure.edn/read-string` by default to encode and decode data.
You may specify `:encode` and `:decode` either globally or per queue to override this behaviour.
The `:encode` function must return a byte array or a string.

For example if you want to use [nippy](https://github.com/ptaoussanis/nippy):
```clojure
(require '[taoensso.nippy :as nippy])

; Globally for all queues:
(yq/init!
{:conn conn
:encode nippy/freeze
:decode nippy/thaw})

; Or per queue:
(yq/add-consumer!
:q ; Queue to consume
(fn [payload] (println "got payload:" payload)) ; Queue consumer function
{:encode nippy/freeze
:decode nippy/thaw}) ; Queue options, here with :encode and :decode
```

### Partitions

Yoltq supports specifying which [partition](https://docs.datomic.com/on-prem/schema/schema.html#partitions)
queue entities should belong to.
The default function is:
```clojure
(defn default-partition-fn [_queue-name]
(keyword "yoltq" (str "queue_" (.getValue (java.time.Year/now)))))
```
This is to say that there will be a single partition per year for yoltq.
Yoltq will take care of creating the partition if it does not exist.

You may override this function, either globally or per queue, with the keyword `:partition-fn`.
E.g.:
```clojure
(yq/init! {:conn conn :partition-fn (fn [_queue-name] :my-partition)})
```

### All configuration options

Expand Down Expand Up @@ -376,8 +418,29 @@ For Redis there is [carmine](https://github.com/ptaoussanis/carmine).

Note: I have not tried these libraries myself.

## Other stuff

If you liked this library, you may also like:

* [conformity](https://github.com/avescodes/conformity): A Clojure/Datomic library for idempotently transacting norms into your database – be they schema, data, or otherwise.
* [datomic-schema](https://github.com/ivarref/datomic-schema): Simplified writing of Datomic schemas (works with conformity).
* [double-trouble](https://github.com/ivarref/double-trouble): Handle duplicate Datomic transactions with ease.
* [gen-fn](https://github.com/ivarref/gen-fn): Generate Datomic function literals from regular Clojure namespaces.
* [rewriting-history](https://github.com/ivarref/rewriting-history): A library to rewrite Datomic history.

## Change log

#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63)
Added custom `:encode` and `:decode` support.

Added support for specifying `:partifion-fn` to specify which partition a queue item should belong to.
It defaults to:
```clojure
(defn default-partition-fn [_queue-name]
(keyword "yoltq" (str "queue_" (.getValue (Year/now)))))
```
Yoltq takes care of creating the partition if it does not exist.

#### 2022-11-15 v0.2.62 [diff](https://github.com/ivarref/yoltq/compare/v0.2.61...v0.2.62)
Added function `processing-time-stats`:

Expand Down
6 changes: 4 additions & 2 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{:deps {org.clojure/tools.logging {:mvn/version "1.2.4"}
org.clojure/clojure {:mvn/version "1.11.1"}}
{:deps {com.github.ivarref/double-trouble {:mvn/version "0.1.102"}
org.clojure/tools.logging {:mvn/version "1.2.4"}
org.clojure/clojure {:mvn/version "1.11.1"}}

:paths ["src"]

Expand All @@ -11,6 +12,7 @@
clojure-term-colors/clojure-term-colors {:mvn/version "0.1.0"}
com.datomic/datomic-pro {:mvn/version "1.0.6316" :exclusions [org.slf4j/slf4j-nop]}
org.postgresql/postgresql {:mvn/version "9.3-1102-jdbc41"}
com.taoensso/nippy {:mvn/version "3.2.0"}
io.github.cognitect-labs/test-runner {:git/tag "v0.5.0" :git/sha "b3fd0d2"}}
:jvm-opts ["-DDISABLE_SPY=true"
"-DTAOENSSO_TIMBRE_MIN_LEVEL_EDN=:error"]
Expand Down
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@
<packaging>jar</packaging>
<groupId>com.github.ivarref</groupId>
<artifactId>yoltq</artifactId>
<version>0.2.62</version>
<version>0.2.63</version>
<name>yoltq</name>
<dependencies>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>com.github.ivarref</groupId>
<artifactId>double-trouble</artifactId>
<version>0.1.102</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>tools.logging</artifactId>
Expand All @@ -30,7 +35,7 @@
<scm>
<connection>scm:git:git://github.com/ivarref/yoltq.git</connection>
<developerConnection>scm:git:ssh://[email protected]/ivarref/yoltq.git</developerConnection>
<tag>v0.2.62</tag>
<tag>v0.2.63</tag>
<url>https://github.com/ivarref/yoltq</url>
</scm>
</project>
95 changes: 59 additions & 36 deletions src/com/github/ivarref/yoltq/impl.clj
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
(ns com.github.ivarref.yoltq.impl
(:require [datomic.api :as d]
[clojure.tools.logging :as log]
(:require [clojure.edn :as edn]
[clojure.string :as str]
[com.github.ivarref.yoltq.utils :as u]
[clojure.tools.logging :as log]
[com.github.ivarref.double-trouble :as dt]
[com.github.ivarref.yoltq.ext-sys :as ext]
[clojure.edn :as edn]))

[com.github.ivarref.yoltq.utils :as u]
[datomic.api :as d])
(:import (java.time Year)))

(def schema
[#:db{:ident :com.github.ivarref.yoltq/id, :cardinality :db.cardinality/one, :valueType :db.type/uuid, :unique :db.unique/identity}
#:db{:ident :com.github.ivarref.yoltq/ext-id, :cardinality :db.cardinality/one, :valueType :db.type/string, :unique :db.unique/value}
#:db{:ident :com.github.ivarref.yoltq/queue-name, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/status, :cardinality :db.cardinality/one, :valueType :db.type/keyword, :index true}
#:db{:ident :com.github.ivarref.yoltq/payload, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/payload-bytes, :cardinality :db.cardinality/one, :valueType :db.type/bytes}
#:db{:ident :com.github.ivarref.yoltq/opts, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/bindings, :cardinality :db.cardinality/one, :valueType :db.type/string}
#:db{:ident :com.github.ivarref.yoltq/tries, :cardinality :db.cardinality/one, :valueType :db.type/long, :noHistory true}
Expand Down Expand Up @@ -41,13 +43,22 @@
(log/error "could not read-string" what ":" (ex-message e))
(throw e))))

(defn default-partition-fn [_queue-keyword]
(keyword "yoltq" (str "queue_" (.getValue (Year/now)))))

(defn put [{:keys [capture-bindings conn] :as config}
(defn put [{:keys [capture-bindings conn encode partition-fn]
:or {partition-fn default-partition-fn
encode (partial pr-str-safe :payload)}
:as config}
queue-name
payload
opts]
(if-let [q-config (get-in config [:handlers queue-name])]
(let [id (u/squuid)
encode (get q-config :encode encode)
partition-fn (get q-config :partition-fn partition-fn)
partition (partition-fn queue-name)
_ (assert (keyword? partition) "Partition must be a keyword")
depends-on (get q-config :depends-on (fn [_] nil))
valid-payload? (get q-config :valid-payload? (fn [_] true))
opts (merge
Expand All @@ -58,32 +69,41 @@
(assoc o (symbol k) (deref k)))
{}
(or capture-bindings []))
(pr-str-safe :capture-bindings))]
(when-not (valid-payload? payload)
(log/error "Payload was not valid. Payload was:" payload)
(throw (ex-info (str "Payload was not valid: " payload) {:payload payload})))
(pr-str-safe :capture-bindings))
_ (when-not (valid-payload? payload)
(log/error "Payload was not valid. Payload was:" payload)
(throw (ex-info (str "Payload was not valid: " payload) {:payload payload})))
encoded (encode payload)
_ (when (not (or (bytes? encoded) (string? encoded)))
(log/error "Payload must be encoded to either a string or a byte array")
(throw (ex-info (str "Payload must be encoded to a string or a byte array. Payload: " payload) {:payload payload})))]
(log/debug "queue item" (str id) "for queue" queue-name "is pending status" u/status-init)
(merge
{:com.github.ivarref.yoltq/id id
:com.github.ivarref.yoltq/queue-name queue-name
:com.github.ivarref.yoltq/status u/status-init
:com.github.ivarref.yoltq/payload (pr-str-safe :payload payload)
:com.github.ivarref.yoltq/bindings str-bindings
:com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
:com.github.ivarref.yoltq/lock (u/random-uuid)
:com.github.ivarref.yoltq/tries 0
:com.github.ivarref.yoltq/init-time (u/now-ms)
:com.github.ivarref.yoltq/version "2"}
(when-let [[q ext-id] (:depends-on opts)]
(when-not (d/q '[:find ?e .
:in $ ?ext-id
:where
[?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
(d/db conn)
(pr-str-safe :depends-on [q ext-id]))
(throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
(when-let [ext-id (:id opts)]
{:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])})))
(do
(dt/ensure-partition! conn partition)
(merge
(if (bytes? encoded)
{:com.github.ivarref.yoltq/payload-bytes encoded}
{:com.github.ivarref.yoltq/payload encoded})
{:db/id (d/tempid partition)
:com.github.ivarref.yoltq/id id
:com.github.ivarref.yoltq/queue-name queue-name
:com.github.ivarref.yoltq/status u/status-init
:com.github.ivarref.yoltq/bindings str-bindings
:com.github.ivarref.yoltq/opts (pr-str-safe :opts opts)
:com.github.ivarref.yoltq/lock (u/random-uuid)
:com.github.ivarref.yoltq/tries 0
:com.github.ivarref.yoltq/init-time (u/now-ms)
:com.github.ivarref.yoltq/version "2"}
(when-let [[q ext-id] (:depends-on opts)]
(when-not (d/q '[:find ?e .
:in $ ?ext-id
:where
[?e :com.github.ivarref.yoltq/ext-id ?ext-id]]
(d/db conn)
(pr-str-safe :depends-on [q ext-id]))
(throw (ex-info (str ":depends-on not found in database. Queue: " q ", id: " ext-id) opts))))
(when-let [ext-id (:id opts)]
{:com.github.ivarref.yoltq/ext-id (pr-str-safe :id [queue-name ext-id])}))))
(do
(log/error "Did not find registered handler for queue" queue-name)
(throw (ex-info (str "Did not find registered handler for queue: " queue-name) {:queue queue-name})))))
Expand Down Expand Up @@ -169,20 +189,23 @@
"in" (format "%.1f" (double (/ spent-ns 1e6))) "ms"]))


(defn execute! [{:keys [handlers mark-status-fn! start-execute-time collect-spent-time!]
:or {mark-status-fn! mark-status!}
(defn execute! [{:keys [decode handlers mark-status-fn! start-execute-time collect-spent-time!]
:or {mark-status-fn! mark-status!
decode edn/read-string}
:as cfg}
{:com.github.ivarref.yoltq/keys [status id queue-name payload] :as queue-item}]
{:com.github.ivarref.yoltq/keys [status id queue-name payload payload-bytes] :as queue-item}]
(when queue-item
(if (= :error status)
(assoc queue-item :failed? true)
(if-let [queue (get handlers queue-name)]
(let [{:keys [f allow-cas-failure?]} queue]
(let [{:keys [f allow-cas-failure?]} queue
decode (get queue :decode decode)]
(log/debug "queue item" (str id) "for queue" queue-name "is now processing")
(let [{:keys [retval exception]}
(try
(swap! start-execute-time assoc (Thread/currentThread) [(ext/now-ms) id queue-name])
(let [v (f payload)]
(let [payload (decode (or payload payload-bytes))
v (f payload)]
{:retval v})
(catch Throwable t
{:exception t})
Expand Down
1 change: 0 additions & 1 deletion src/com/github/ivarref/yoltq/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
(defn get-queue-item [db id]
(-> (d/pull db '[:*] [:com.github.ivarref.yoltq/id id])
(dissoc :db/id)
(update :com.github.ivarref.yoltq/payload edn/read-string)
(update :com.github.ivarref.yoltq/opts (fn [s] (or (when s (edn/read-string s)) {})))
(update :com.github.ivarref.yoltq/bindings
(fn [s]
Expand Down
98 changes: 84 additions & 14 deletions test/com/github/ivarref/yoltq/virtual_test.clj
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
(ns com.github.ivarref.yoltq.virtual-test
(:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
[clojure.tools.logging :as log]
[com.github.ivarref.yoltq :as yq]
[com.github.ivarref.yoltq.error-poller :as error-poller]
[com.github.ivarref.yoltq.ext-sys :as ext]
[com.github.ivarref.yoltq.impl :as i]
[com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.test-queue :as tq]
[com.github.ivarref.yoltq.test-utils :as u]
[com.github.ivarref.yoltq.utils :as uu]
[datomic-schema.core]
[datomic.api :as d]
[taoensso.timbre :as timbre])
(:import (java.time Duration)))
(:require
[clojure.string :as str]
[clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
[clojure.tools.logging :as log]
[com.github.ivarref.yoltq :as yq]
[com.github.ivarref.yoltq.error-poller :as error-poller]
[com.github.ivarref.yoltq.ext-sys :as ext]
[com.github.ivarref.yoltq.impl :as i]
[com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.test-queue :as tq]
[com.github.ivarref.yoltq.test-utils :as u]
[com.github.ivarref.yoltq.utils :as uu]
[datomic-schema.core]
[datomic.api :as d]
[taoensso.nippy :as nippy]
[taoensso.timbre :as timbre])
(:import (java.time Duration LocalDateTime)))


(use-fixtures :each tq/call-with-virtual-queue!)
Expand Down Expand Up @@ -380,3 +383,70 @@
(is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms))))
(is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms)))))
(is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms)))))))

(deftest global-encode-decode
(let [conn (u/empty-conn)
ldt (LocalDateTime/now)
got-work (atom nil)]
(yq/init! {:conn conn
:encode nippy/freeze
:decode nippy/thaw})
(yq/add-consumer! :q (fn [work] (reset! got-work work)))
@(d/transact conn [(yq/put :q {:work ldt})])
(tq/consume! :q)
(is (= @got-work {:work ldt}))))

(deftest queue-encode-decode
(let [conn (u/empty-conn)
ldt (LocalDateTime/now)
got-work (atom nil)]
(yq/init! {:conn conn})
(yq/add-consumer! :q (fn [work] (reset! got-work work))
{:encode nippy/freeze
:decode nippy/thaw})
@(d/transact conn [(yq/put :q {:work ldt})])
(tq/consume! :q)
(is (= @got-work {:work ldt}))))

(deftest global-partition
(let [conn (u/empty-conn)
got-work (atom nil)]
(yq/init! {:conn conn
:partition-fn (fn [_queue-name] :my-part)})
(yq/add-consumer! :q (fn [work] (reset! got-work work)))
@(d/transact conn [(yq/put :q {:work 123})])
(tq/consume! :q)
(is (some? (d/q '[:find ?e .
:in $ ?part
:where
[?e :db/ident ?part]]
(d/db conn)
:my-part)))
(is (= @got-work {:work 123}))))

(deftest partition-per-queue
(let [conn (u/empty-conn)
got-work (atom nil)]
(yq/init! {:conn conn})
(yq/add-consumer! :q (fn [work] (reset! got-work work))
{:partition-fn (fn [_queue-name] :my-part)})
@(d/transact conn [(yq/put :q {:work 123})])
(tq/consume! :q)
(is (some? (d/q '[:find ?e .
:in $ ?part
:where
[?e :db/ident ?part]]
(d/db conn)
:my-part)))
(is (= @got-work {:work 123}))))

(deftest string-encode-decode
(let [conn (u/empty-conn)
got-work (atom nil)]
(yq/init! {:conn conn
:encode (fn [x] (str/join (reverse x)))
:decode (fn [x] (str/join (reverse x)))})
(yq/add-consumer! :q (fn [work] (reset! got-work work)))
@(d/transact conn [(yq/put :q "asdf")])
(tq/consume! :q)
(is (= @got-work "asdf"))))

0 comments on commit 8f945d8

Please sign in to comment.