I've been toying with this idea for a bit and thought I'd share. It's basically a way to add things to an atom subject to a transducer, while also keeping a promise that tracks whether the result has been reduced, sort of in the way that you can give a core.async/chan a transducer.

(defn mkadder
  "Returns a function which, given a value, adds that value to the atom a
  with the reducing function rf. If the result is reduced, delivers the
  dereferenced result to promise p."
  [a p rf]
  (fn [x]
    (let [result (rf a x)]
      (if (reduced? result)
        (do (deliver p @@result) @result) ;; what to return?
        result)))) ;; not reduced: result is already the atom

(defn acc
  "Accumulates state in an atom subject to a transducer. Returns a map
  with the keys :add!, :a and :p. Use the :add! function to add
  state. :p is a promise which will be delivered the state in a once
  the transducer returns a reduced value."
  ([xf rf] (acc xf rf (rf)))
  ([xf rf init]
   (let [a (atom init)
         swapper (fn [acc x] (doto acc (swap! rf x)))
         rf (xf swapper)
         p (promise)]
     {:add! (mkadder a p rf) :a a :p p})))

(let [{:keys [add! a p]}
      (acc (comp (map inc) (filter even?) (take 5)) conj)]
  (doseq [i (range 20)]
    (add! i)
    (println "atom contains: " @a "promise realized?" (realized? p))
    (Thread/sleep 500)))
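
As a sanity check on what that loop should end up with, here is the eager equivalent of the same transducer. This is just a sketch for comparison, and `expected` is an illustrative name that isn't part of the code above. If I'm reading the loop right, the atom should settle on this value and the promise should become realized on the iteration where i is 9, since (inc 9) is the fifth even number to get through.

```clojure
;; Eager equivalent of the demo's transducer, for comparison only.
;; `expected` is a hypothetical name used just for this check.
(def expected
  (into [] (comp (map inc) (filter even?) (take 5)) (range 20)))

(println expected) ;; prints [2 4 6 8 10]
```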

I have a case where a set number of incoming messages from an outside source need to be run through filter and take. It's nicer to be able to leverage these clojure.core features than to have to manage the state myself. And core.async would be a bit overkill (I don't need CSP).
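
To make that use case a bit more concrete, here is a rough, self-contained sketch. It is a compact variant of the accumulator idea, not the exact code above. Everything named here is made up for illustration: `make-acc`, `handle-message!`, `demo`, and the `:type`/`:id` keys on the messages. A future stands in for the outside source, and the consumer blocks on the promise with a timeout instead of polling the atom.

```clojure
;; Compact variant of the accumulator idea. All names (make-acc,
;; handle-message!, demo, the :type/:id keys) are hypothetical.
(defn make-acc
  "Returns {:add! f :a atom :p promise}. f pushes one value through xf
  into the atom; p is delivered the atom's value once xf signals
  completion by returning a reduced value."
  [xf rf init]
  (let [a    (atom init)
        p    (promise)
        ;; reducing function whose accumulator is the atom itself
        step (xf (fn [acc-atom x] (swap! acc-atom rf x) acc-atom))]
    {:add! (fn [x]
             (when (reduced? (step a x))
               (deliver p @a)) ; deliver is a no-op after the first call
             nil)
     :a a
     :p p}))

(defn demo
  "Collects the ids of the first three :order messages."
  []
  (let [{:keys [add! p]} (make-acc (comp (filter #(= :order (:type %)))
                                         (map :id)
                                         (take 3))
                                   conj
                                   [])
        handle-message!  add!]
    ;; Simulate the outside source delivering messages on another thread.
    (future
      (doseq [m [{:type :ping}
                 {:type :order :id 1}
                 {:type :order :id 2}
                 {:type :ping}
                 {:type :order :id 3}
                 {:type :order :id 4}]]
        (handle-message! m)))
    ;; Wait up to one second for the promise to be delivered.
    (deref p 1000 :timed-out)))

(println (demo)) ;; prints [1 2 3]
```

One caveat with this sketch: stateful transducers like take keep their counter in a volatile, so the add function should only be called from one thread at a time unless you add your own locking around it.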