Welcome! Please see the About page for a little more info on how this works.

0 votes
in core.async by

I have a use case for clojure.core.async/pipeline whereby the order of the outputs is unimportant and it is desirable to have the n^th^ output present on the to channel without having to wait for the first output to be ready.

I don't have a great deal of experience with clojure; I'm wondering if this is something that belongs in the core library or whether there's some idiomatic way of achieving an unordered pipeline without rewriting the pipeline function. In any case, I have made an attempt at rewriting pipeline, and learnt a thing or two in the process :)

Obviously, this code (below) cannot be merged as it completely breaks the existing behaviour (not to mention I've barely tested it), but I would be appreciative if you could provide some feedback and/or consider adding this functionality. Perhaps a new function, {{unordered-pipeline}}?

`
(defn- pipeline*
([n to xf from close? ex-handler type]

 (assert (pos? n))
 (let [ex-handler (or ex-handler (fn [ex]
                                   (-> (Thread/currentThread)
                                       .getUncaughtExceptionHandler
                                       (.uncaughtException (Thread/currentThread) ex))
                                   nil))
       jobs (chan n)
       results (chan n)
       process (fn [[v p :as job]]
                 (if (nil? job)
                   (comment closing results here would be too early)
                   (let [res (chan 1 xf ex-handler)]
                     (>!! res v)
                     (close! res)
                     (put! p res)
                     true)))
       async (fn [[v p :as job]]
               (if (nil? job)
                 (comment closing results here would be too early)
                 (let [res (chan 1)]
                   (xf v res)
                   (put! p res)
                   true)))]
   (go-loop []
            (let [v (<! from)]
              (if (nil? v)
                (close! jobs)
                (do
                  (comment removed the indirection and pass results as part of the job)
                  (>! jobs [v results])
                  (recur)))))
   (go-loop []
            (let [res (<! results)]
              (if (nil? res)
                (when close? (close! to))
                (do (loop []
                      (let [v (<! res)]
                        (when (and (not (nil? v)) (>! to v))
                          (recur))))
                    (recur)))))
   (go
     (comment ensure results is closed after all jobs have been processed)
     (<! (async/merge
           (map #(case %
                   :blocking (thread
                               (let [job (<!! jobs)]
                                 (when (process job)
                                   (recur))))
                   :compute (go-loop []
                                     (let [job (<! jobs)]
                                       (when (process job)
                                         (recur))))
                   :async (go-loop []
                                   (let [job (<! jobs)]
                                     (when (async job)
                                       (recur)))))
                (repeat n type))))
     (close! results)))))

`

3 Answers

0 votes
by

Comment made by: phiware

I've only tried the {{:compute}} scenario...

(snooze [] (Thread/sleep (+ 5 (rand-int 10)))) (defn slowly [f] (fn ([] (snooze) (let [out (f)] (snooze) out)) ([r] (snooze) (let [out (f r)] (snooze) out)) ([r i] (snooze) (let [out (f r i)] (snooze) out)))) (def cout (chan)) (pipeline 3 cout (comp slowly (map inc)) (async/to-chan (range 18))) (<!! (async/into [] cout)) -> [1 3 2 4 5 6 8 9 7 10 12 11 13 14 15 16 17 18]

Each group of 3 (n) is outputted in a random order :)

0 votes
by

Comment made by: lgs32a

I have had this scenario also. A simple way to do it is to use a channel with a dropping-buffer of size 0 as the to channel and fill the unordered target channel from within the op via put! yourself. In case that you want to get rid of all pipeline overhead its very easy to spawn n thread loops that take from the same channel a thunk (fn (link: ) (put! unordered-target-chan (heavy-calc...)) that puts the result on your desired target channel.

0 votes
by
Reference: https://clojure.atlassian.net/browse/ASYNC-150 (reported by phiware)
...