I have a use case for clojure.core.async/pipeline whereby the order of the outputs is unimportant and it is desirable to have the n^th^ output present on the to channel without having to wait for the first output to be ready.
I don't have a great deal of experience with clojure; I'm wondering if this is something that belongs in the core library or whether there's some idiomatic way of achieving an unordered pipeline without rewriting the pipeline function. In any case, I have made an attempt at rewriting pipeline, and learnt a thing or two in the process :)
Obviously, this code (below) cannot be merged as it completely breaks the existing behaviour (not to mention I've barely tested it), but I would be appreciative if you could provide some feedback and/or consider adding this functionality. Perhaps a new function, {{unordered-pipeline}}?
`
(defn- pipeline*
([n to xf from close? ex-handler type]
(assert (pos? n))
(let [ex-handler (or ex-handler (fn [ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil))
jobs (chan n)
results (chan n)
process (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1 xf ex-handler)]
(>!! res v)
(close! res)
(put! p res)
true)))
async (fn [[v p :as job]]
(if (nil? job)
(comment closing results here would be too early)
(let [res (chan 1)]
(xf v res)
(put! p res)
true)))]
(go-loop []
(let [v (<! from)]
(if (nil? v)
(close! jobs)
(do
(comment removed the indirection and pass results as part of the job)
(>! jobs [v results])
(recur)))))
(go-loop []
(let [res (<! results)]
(if (nil? res)
(when close? (close! to))
(do (loop []
(let [v (<! res)]
(when (and (not (nil? v)) (>! to v))
(recur))))
(recur)))))
(go
(comment ensure results is closed after all jobs have been processed)
(<! (async/merge
(map #(case %
:blocking (thread
(let [job (<!! jobs)]
(when (process job)
(recur))))
:compute (go-loop []
(let [job (<! jobs)]
(when (process job)
(recur))))
:async (go-loop []
(let [job (<! jobs)]
(when (async job)
(recur)))))
(repeat n type))))
(close! results)))))
`