Skip to content

Commit

Permalink
Release 0.2.64: Allow for infinitive retries
Browse files Browse the repository at this point in the history
  • Loading branch information
ivarref committed Mar 20, 2023
1 parent 8f945d8 commit e848610
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ the payload. It can be added like this:
; consumer function to ensure idempotence.
:valid-payload? (fn [payload] (some? (:id payload))) ; Function that verifies payload. Should return truthy for valid payloads.
; The default function always returns true.
:max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 100
:max-retries 10}) ; Specify maximum number of times an item will be retried. Default: 10000.
; If :max-retries is given as 0, the job will ~always be retried, i.e.
; 9223372036854775807 times (Long/MAX_VALUE).
```

The `payload` will be deserialized from the database using `clojure.edn/read-string` before invocation, i.e.
Expand Down Expand Up @@ -218,8 +220,10 @@ is shut down abruptly during processing of queue jobs.

### Giving up

A queue job will remain in status `:error` once `:max-retries` (default: 100) have been reached.
Ideally this will not happen. ¯\\\_(ツ)\_
A queue job will remain in status `:error` once `:max-retries` (default: 10000) have been reached.
If `:max-retries` is given as `0`, the job will be retried 9223372036854775807 times before
giving up.
Ideally this should not happen. ¯\\\_(ツ)\_

### Custom encoding and decoding

Expand Down Expand Up @@ -430,6 +434,12 @@ If you liked this library, you may also like:

## Change log

#### 2023-03-20 v0.2.64 [diff](https://github.com/ivarref/yoltq/compare/v0.2.63...v0.2.64)
Added support for `max-retries` being `0`, meaning the job should be retried forever
(or at least 9223372036854775807 times).

Changed the default for `max-retries` from `100` to `10000`.

#### 2022-11-18 v0.2.63 [diff](https://github.com/ivarref/yoltq/compare/v0.2.62...v0.2.63)
Added custom `:encode` and `:decode` support.

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<packaging>jar</packaging>
<groupId>com.github.ivarref</groupId>
<artifactId>yoltq</artifactId>
<version>0.2.63</version>
<version>0.2.64</version>
<name>yoltq</name>
<dependencies>
<dependency>
Expand Down Expand Up @@ -35,7 +35,7 @@
<scm>
<connection>scm:git:git://github.com/ivarref/yoltq.git</connection>
<developerConnection>scm:git:ssh://[email protected]/ivarref/yoltq.git</developerConnection>
<tag>v0.2.63</tag>
<tag>v0.2.64</tag>
<url>https://github.com/ivarref/yoltq</url>
</scm>
</project>
7 changes: 5 additions & 2 deletions src/com/github/ivarref/yoltq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
(-> {; Default number of times a queue job will be retried before giving up
; Can be overridden on a per-consumer basis with
; (yq/add-consumer! :q (fn [payload] ...) {:max-retries 200})
:max-retries 100
; If you want no limit on the number of retries, specify
; the value `0`. That will set the effective retry limit to
; 9223372036854775807 times.
:max-retries 10000

; Minimum amount of time to wait before a failed queue job is retried
:error-backoff-time (Duration/ofSeconds 5)
Expand Down Expand Up @@ -244,7 +247,7 @@
(defn retry-one-error! [qname]
(let [{:keys [handlers] :as cfg} @*config*
_ (assert (contains? handlers qname) "Queue not found")
cfg (assoc-in cfg [:handlers qname :max-retries] Integer/MAX_VALUE)]
cfg (assoc-in cfg [:handlers qname :max-retries] Long/MAX_VALUE)]
(poller/poll-once! cfg qname :error)))

(defn retry-stats
Expand Down
17 changes: 11 additions & 6 deletions src/com/github/ivarref/yoltq/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,18 @@
(prepare-processing db id queue-name old-lock :init))
(log/debug "no new-items in :init status for queue" queue-name))))

(defn- get-max-retries [cfg queue-name]
(let [v (get-in cfg [:handlers queue-name :max-retries] (:max-retries cfg))]
(if (and (number? v) (pos-int? v))
v
Long/MAX_VALUE)))

(defn get-error [{:keys [conn db error-backoff-time max-retries] :as cfg} queue-name]
(defn get-error [{:keys [conn db error-backoff-time] :as cfg} queue-name]
(assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
(let [db (or db (d/db conn))
max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)]
max-retries (get-max-retries cfg queue-name)]
(when-let [ids (->> (d/q '[:find ?id ?lock
:in $ ?queue-name ?backoff ?max-tries ?current-version
:where
Expand All @@ -118,26 +123,26 @@
[?e :com.github.ivarref.yoltq/error-time ?time]
[(>= ?backoff ?time)]
[?e :com.github.ivarref.yoltq/tries ?tries]
[(> ?max-tries ?tries)]
[(>= ?max-tries ?tries)]
[?e :com.github.ivarref.yoltq/id ?id]
[?e :com.github.ivarref.yoltq/lock ?lock]
[?e :com.github.ivarref.yoltq/version ?current-version]]
db
queue-name
(- (now-ms) error-backoff-time)
(inc max-retries)
max-retries
current-version)
(not-empty))]
(let [[id old-lock] (rand-nth (into [] ids))]
(prepare-processing db id queue-name old-lock :error)))))


(defn get-hung [{:keys [conn db now hung-backoff-time max-retries] :as cfg} queue-name]
(defn get-hung [{:keys [conn db now hung-backoff-time] :as cfg} queue-name]
(assert (instance? Connection conn) (str "Expected conn to be of type datomic.Connection. Was: "
(str (if (nil? conn) "nil" conn))
"\nConfig was: " (str cfg)))
(let [now (or now (now-ms))
max-retries (get-in cfg [:handlers queue-name :max-retries] max-retries)
max-retries (get-max-retries cfg queue-name)
db (or db (d/db conn))]
(when-let [ids (->> (d/q '[:find ?id ?lock ?tries
:in $ ?qname ?backoff ?current-version
Expand Down

0 comments on commit e848610

Please sign in to comment.