You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
thank you very, very much for this incredible project and the effort you put into it.
I am wondering whether I understood the execution model correctly. I am trying to build a work queue, were data is put irregularly onto a stream via manifold.stream/put!. My naïve idea was to manifold.stream/map the (blocking) worker function with multiple workers (a manifold.executor/fixed-thread-executor pool assigned via manifold.stream/onto) and consume the results with manifold.stream/consume. But this resulted in a serial processing of the input. I found #126, but am unsure if that is the proper way.
At Slack, @dm3 was super-helpful and patiently went through this with me, and came up with the following proposal:
(defnfork"Takes an `src` stream and returns `n` forked streams which will receive messages from `src` on the first-come, first-served basis. Once the `src` stream is exhausted, the forked streams will be closed as well. Takes a map of options as the third argument: * `pool` - the executor where the execution will happen. Uses the Manifold `execute-pool` by default. * `generator` - function which produces the forked streams. Unbuffered `stream` by default."
([src n] (fork src n {:pool (ex/execute-pool), :generator s/stream}))
([src n {:keys [pool generator]}]
(let [src' (s/stream)
dsts (take n (repeatedly generator))
^java.util.concurrent.BlockingQueue ready
(doto (java.util.concurrent.ArrayBlockingQueue. n)
(.addAll dsts))
free-up! #(.offer ready %)
next! #(.take ready)
send! #(-> (s/put! %1 %2)
(d/chain
(fn [result]
(if result
(free-up! %1)
(s/close! %1)))))]
;; in case anyone else wants to consume `src`
(s/connect src src')
(d/loop [dsts (.take ready)]
(-> (s/take! src' ::none)
(d/chain
(fn [result]
(if (= result ::none)
(doseq [d dsts]
(s/close! d))
(do (d/future-with pool
(send! dsts result))
(d/chain (next!)
#(d/recur %))))))))
(map s/source-only dsts))))
What is your valued opinion on realizing concurrent producer/consumer queues with manifold?
The text was updated successfully, but these errors were encountered:
Hello,
thank you very, very much for this incredible project and the effort you put into it.
I am wondering whether I understood the execution model correctly. I am trying to build a work queue, were data is put irregularly onto a stream via
manifold.stream/put!
. My naïve idea was tomanifold.stream/map
the (blocking) worker function with multiple workers (amanifold.executor/fixed-thread-executor
pool assigned viamanifold.stream/onto
) and consume the results withmanifold.stream/consume
. But this resulted in a serial processing of the input. I found #126, but am unsure if that is the proper way.At Slack, @dm3 was super-helpful and patiently went through this with me, and came up with the following proposal:
What is your valued opinion on realizing concurrent producer/consumer queues with
manifold
?The text was updated successfully, but these errors were encountered: