Welcome! Please see the About page for a little more info on how this works.

+4 votes
in Transducers by

In Rich Hickey's talks Transducers (2014) he speaks about transducer contexts that exist "today" — collections and core.async, and what might be "tomorrow" (observables).

Now that 5 years has gone, I wonder, what are other transducer contexts that are created and used in the wild?

4 Answers

+4 votes

There's several in the xforms lib.

+1 vote

I think that you end up creating your own when you need them - for example, we process gazillions of events from web sockets and write them to Kafka topics, where they get processed multiple times by different downstream consumers. Transducers are a good fit for these operations and we use them extensively.

0 votes

I've been toying with this idea for a bit and thought I'd share. It's basically a way to add things to an atom subject to a transducer, while also keeping a promise to track if the result is reduced?. Sort of in the way that you can give a core.async/chan a transducer

(defn mkadder
  "returns a function which, given a value, adds that value the the atom a with the reducing function rf. If the result is reduced, deliver the dereferenced result to promise p."
  [a p rf]
  (fn [x]
    (let [result (rf a x)]
      (if (reduced? result)
        (do (deliver p @@result) @result) ;;what to return?

(defn acc
  "accumulates state in an atom subject to a transducer. returns a map
  with the keys :adder, :a and :p. Use the :add! function to add
  state. :p is a promise which will be delivered the state in a when
  rf is realized"
  ([xf rf] (acc xf rf (rf)))
  ([xf rf init]
   (let [a       (atom init)
         swapper (fn [acc x] (doto acc (swap! rf x)))
         rf      (xf swapper)
         p       (promise)]
     {:add! (mkadder a p rf) :a a :p p})))

(let [{:keys [add! a p]}
      (acc (comp (map inc) (filter even?) (take 5)) conj)]
    (doseq [i (range 20)]
      (add! i)
      (println "atom contains: " @a "promise realized?" (realized? p))
      (Thread/sleep 500))))

I have a case where I have a set number of incoming messages from an outside source that need to be subjected to filter and take/take-while it's nice to be able to leverage these clojure.core features than to have to manage the state myself. And core.async would be a bit overkill (I don't need CSP).