Skip to content

Commit

Permalink
Release 0.2.60
Browse files Browse the repository at this point in the history
Warn about not setting connection/socket-timeout when using clj-http ivarref#2

Add :healthy-allowed-error-time configuration option, default is 15 minutes
  • Loading branch information
ivarref committed Aug 18, 2022
1 parent 7d4477c commit 812a07b
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 28 deletions.
28 changes: 25 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,25 @@ Imagine the following code:
```clojure
(defn post-handler [user-input]
(let [db-item (process user-input)
ext-ref (clj-http.client/post ext-service {...})] ; may throw exception
ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds
:socket-timeout 10000 ; timeout in milliseconds
...})] ; may throw exception
@(d/transact conn [(assoc db-item :some/ext-ref ext-ref)])))
```

What if the POST request fails? Should it be retried? For how long?
Should it be allowed to fail? How do you then process failures later?

PS: If you do not set connection/socket-timeout, there is a chance that
clj-http/client will wait for all eternity in the case of a dropped TCP connection.

The queue way to solve this would be:

```clojure
(defn get-ext-ref [{:keys [id]}]
(let [ext-ref (clj-http.client/post ext-service {...})] ; may throw exception
(let [ext-ref (clj-http.client/post ext-service {:connection-timeout 3000 ; timeout in milliseconds
:socket-timeout 10000 ; timeout in milliseconds
...})] ; may throw exception
@(d/transact conn [[:db/cas [:some/id id]
:some/ext-ref
nil
Expand All @@ -82,7 +89,7 @@ The queue way to solve this would be:
(yq/add-consumer! :get-ext-ref get-ext-ref {:allow-cas-failure? true})

(defn post-handler [user-input]
(let [{:some/keys [id] :as db-item} (process user-input)
(let [{:some/keys [id] :as db-item} (process user-input)]
@(d/transact conn [db-item
(yq/put :get-ext-ref {:id id})])))
```
Expand Down Expand Up @@ -371,6 +378,21 @@ Note: I have not tried these libraries myself.

## Change log

#### 2022-08-18 v0.2.60 [diff](https://github.com/ivarref/yoltq/compare/v0.2.59...v0.2.60)
Improved: Added config option `:healthy-allowed-error-time`:
```
; If you are dealing with a flaky downstream service, you may not want
; yoltq to mark itself as unhealthy on the first failure encounter with
; the downstream service. Change this setting to let yoltq mark itself
; as healthy even though a queue item has been failing for some time.
:healthy-allowed-error-time (Duration/ofMinutes 15)
```

#### 2022-08-15 v0.2.59 [diff](https://github.com/ivarref/yoltq/compare/v0.2.58...v0.2.59)
Fixed:
* Race condition that made the following possible: `stop!` would terminate the slow thread
watcher, and a stuck thread could keep `stop!` from completing!

#### 2022-06-30 v0.2.58 [diff](https://github.com/ivarref/yoltq/compare/v0.2.57...v0.2.58)
Slightly more safe EDN printing and parsing.
Recommended reading:
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<packaging>jar</packaging>
<groupId>com.github.ivarref</groupId>
<artifactId>yoltq</artifactId>
<version>0.2.59</version>
<version>0.2.60</version>
<name>yoltq</name>
<dependencies>
<dependency>
Expand All @@ -30,7 +30,7 @@
<scm>
<connection>scm:git:git://github.com/ivarref/yoltq.git</connection>
<developerConnection>scm:git:ssh://[email protected]/ivarref/yoltq.git</developerConnection>
<tag>v0.2.59</tag>
<tag>v0.2.60</tag>
<url>https://github.com/ivarref/yoltq</url>
</scm>
</project>
14 changes: 10 additions & 4 deletions src/com/github/ivarref/yoltq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
; otherwise occur if competing with the tx-report-queue listener.
:init-backoff-time (Duration/ofSeconds 60)

; If you are dealing with a flaky downstream service, you may not want
; yoltq to mark itself as unhealthy on the first failure encounter with
; the downstream service. Change this setting to let yoltq mark itself
; as healthy even though a queue item has been failing for some time.
:healthy-allowed-error-time (Duration/ofMinutes 15)

; How frequent polling for init, error and hung jobs should be done.
:poll-delay (Duration/ofSeconds 10)

Expand Down Expand Up @@ -259,10 +265,10 @@
(let [conn (d/connect uri)
started-consuming? (promise)
n 1]
(init! {:conn conn
:error-backoff-time (Duration/ofSeconds 1)
:poll-delay (Duration/ofSeconds 1)
:max-execute-time (Duration/ofSeconds 3)
(init! {:conn conn
:error-backoff-time (Duration/ofSeconds 1)
:poll-delay (Duration/ofSeconds 1)
:max-execute-time (Duration/ofSeconds 3)
:slow-thread-show-stacktrace? false})
(add-consumer! :q (fn [_]
(deliver started-consuming? true)
Expand Down
36 changes: 22 additions & 14 deletions src/com/github/ivarref/yoltq/error_poller.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
(ns com.github.ivarref.yoltq.error-poller
(:require [datomic.api :as d]
[com.github.ivarref.yoltq.utils :as u]
(:require [clojure.tools.logging :as log]
[com.github.ivarref.yoltq.ext-sys :as ext]
[clojure.tools.logging :as log]))
[com.github.ivarref.yoltq.utils :as u]
[datomic.api :as d]))


(defn get-state [v]
Expand Down Expand Up @@ -64,31 +64,39 @@
{:run-callback :recovery}))))))


(defn do-poll-errors [{:keys [conn system-error
(defn do-poll-errors [{:keys [conn
system-error
on-system-error
on-system-recovery
healthy?]
healthy?
healthy-allowed-error-time]
:or {on-system-error (fn []
(log/error "There are yoltq queues which have errors")
nil)
on-system-recovery (fn []
(log/info "Yoltq recovered"))}
:as config}]
:as config}
now-ms]
(assert (some? conn) "expected :conn to be present")
(assert (some? system-error) "expected :system-error to be present")
(let [error-count (or (d/q '[:find (count ?e) .
:in $ ?status
(assert (nat-int? healthy-allowed-error-time) "expected :healthy-allowed-error-time to be present")
(let [max-init-time (- now-ms healthy-allowed-error-time)
error-count (or (d/q '[:find (count ?e) .
:in $ ?status ?max-init-time
:where
[?e :com.github.ivarref.yoltq/status ?status]]
[?e :com.github.ivarref.yoltq/status ?status]
[?e :com.github.ivarref.yoltq/init-time ?init-time]
[(<= ?init-time ?max-init-time)]]
(d/db conn)
u/status-error)
u/status-error
max-init-time)
0)]
(if (pos-int? error-count)
(do
(log/debug "poll-errors found" error-count "errors in system")
(reset! healthy? false))
(reset! healthy? true))
(let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config (ext/now-ms) error-count)]
(let [{:keys [run-callback] :as new-state} (swap! system-error handle-error-count config now-ms error-count)]
(when run-callback
(cond (= run-callback :error)
(on-system-error)
Expand All @@ -99,18 +107,18 @@
:else
(log/error "unhandled callback-type" run-callback))
(log/debug "run-callback is" run-callback))
new-state)))
error-count)))


(defn poll-errors [running? config-atom]
(try
(when @running?
(do-poll-errors @config-atom))
(do-poll-errors @config-atom (ext/now-ms)))
(catch Throwable t
(log/error t "unexpected error in poll-errors:" (ex-message t))
nil)))


(comment
(do-poll-errors @com.github.ivarref.yoltq/*config*))
(do-poll-errors @com.github.ivarref.yoltq/*config* (ext/now-ms)))

8 changes: 4 additions & 4 deletions test/com/github/ivarref/yoltq/error_poller_test.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
(ns com.github.ivarref.yoltq.error-poller-test
(:require [clojure.test :refer [deftest is]]
[com.github.ivarref.yoltq.error-poller :as ep]
(:require [clojure.edn :as edn]
[clojure.test :refer [deftest is]]
[clojure.tools.logging :as log]
[com.github.ivarref.yoltq.log-init :as logconfig]
[clojure.edn :as edn]))
[com.github.ivarref.yoltq.error-poller :as ep]
[com.github.ivarref.yoltq.log-init :as logconfig]))


(deftest error-poller
Expand Down
15 changes: 14 additions & 1 deletion test/com/github/ivarref/yoltq/virtual_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
(:require [clojure.test :refer [deftest is use-fixtures] :refer-macros [thrown?]]
[clojure.tools.logging :as log]
[com.github.ivarref.yoltq :as yq]
[com.github.ivarref.yoltq.error-poller :as error-poller]
[com.github.ivarref.yoltq.ext-sys :as ext]
[com.github.ivarref.yoltq.impl :as i]
[com.github.ivarref.yoltq.migrate :as migrate]
[com.github.ivarref.yoltq.test-queue :as tq]
[com.github.ivarref.yoltq.test-utils :as u]
[com.github.ivarref.yoltq.utils :as uu]
[datomic-schema.core]
[datomic.api :as d]
[taoensso.timbre :as timbre]))
[taoensso.timbre :as timbre])
(:import (java.time Duration)))


(use-fixtures :each tq/call-with-virtual-queue!)
Expand Down Expand Up @@ -367,3 +370,13 @@
(is (= #{{:id "a"}} @received))
#_(timbre/with-level :fatal
(is (thrown? Exception @(d/transact conn [(yq/put :q {})]))))))

(deftest healthy-allowed-error-time-test
(let [conn (u/empty-conn)]
(yq/init! {:conn conn})
(yq/add-consumer! :q (fn [_] (throw (ex-info "" {}))))
@(d/transact conn [(yq/put :q {:work 123})])
(tq/consume-expect! :q :error)
(is (= 0 (error-poller/do-poll-errors @yq/*config* (ext/now-ms))))
(is (= 0 (error-poller/do-poll-errors @yq/*config* (+ (dec (.toMillis (Duration/ofMinutes 15))) (ext/now-ms)))))
(is (= 1 (error-poller/do-poll-errors @yq/*config* (+ (.toMillis (Duration/ofMinutes 15)) (ext/now-ms)))))))

0 comments on commit 812a07b

Please sign in to comment.