I am trying to implement a simple stateful transducer in ClojureScript that counts the number of items it receives:
    (defn stateful-counter []
      (fn [xf]
        (let [counter (atom 0)]
          (fn
            ([] (xf))
            ([result]
             (xf (xf result @counter)))
            ([result _]
             (swap! counter inc)
             result)))))
When running this on a sequence I get the output:
    (into [] (stateful-counter) (range 5))
    ;; => [5]
This is exactly what I expect.
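To double-check the sequence case, the sketch below uses a variant of the counter that also records how often its completion arity runs. The names `counter-with-completion-log`, `completions`, and `seq-result` are hypothetical and exist only for this illustration; the rest mirrors the transducer above.

```clojure
;; Hypothetical variant of stateful-counter that also records, in the
;; `completions` atom, how many times the completion arity is invoked.
(defn counter-with-completion-log [completions]
  (fn [xf]
    (let [counter (atom 0)]
      (fn
        ([] (xf))
        ([result]
         (swap! completions inc)      ; one more completion call observed
         (xf (xf result @counter)))   ; emit the count, then complete downstream
        ([result _]
         (swap! counter inc)          ; count the item, emit nothing
         result)))))

(def completions (atom 0))

(def seq-result
  (into [] (counter-with-completion-log completions) (range 5)))

(println seq-result @completions)
;; prints: [5] 1
```

With `into`, the completion arity runs exactly once, which is why a single count comes out.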
When running this on a core.async channel I get an infinite sequence of 5s:
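    ;; Assumes core.async is required under the alias `async`,
    ;; with `go` and `<!` referred.
    (go (println
         (<! (let [c (async/chan 1 (stateful-counter))]
               (async/onto-chan! c (range 5))
               (async/into []
                           (async/take 10 c))))))
    ;; prints [5 5 5 5 5 5 5 5 5 5]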
If I don't use (async/take 10 _), there seems to be an infinite loop. The outcome I expected is [5].
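The repeated 5s can be reproduced without core.async by driving the reducing function by hand and calling its completion arity more than once. This is only a sketch: whether the channel actually invokes completion repeatedly is an assumption, not something established here. The definition of stateful-counter is repeated so the snippet is self-contained, and the names `rf`, `after-steps`, `once`, and `twice` are hypothetical.

```clojure
;; Same transducer as in the question, repeated for self-containment.
(defn stateful-counter []
  (fn [xf]
    (let [counter (atom 0)]
      (fn
        ([] (xf))
        ([result]
         (xf (xf result @counter)))
        ([result _]
         (swap! counter inc)
         result)))))

;; `conj` stands in for whatever downstream step consumes the output.
(def rf ((stateful-counter) conj))

;; Feed five items through the step arity; nothing is emitted yet.
(def after-steps (reduce rf (rf) (range 5)))

;; Each call to the completion arity appends the count again.
(def once  (rf after-steps))
(def twice (rf once))

(println after-steps once twice)
;; prints: [] [5] [5 5]
```

If something calls the completion arity on every take, this would explain an unbounded stream of 5s, but that remains to be confirmed against the channel implementation.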
I have also tried using net.cgrand.xforms/count from the xforms library and get the same unexpected result:
    (go (println
         (<! (let [c (async/chan 1 xforms/count)]
               (async/onto-chan! c (range 5))
               (async/into []
                           (async/take 10 c))))))
I had assumed that an implementation error in stateful-counter caused the loop, but I am confused that xforms/count produces the same (unexpected) result.
This is a ClojureScript snippet, but I have been able to produce the same unexpected result with Clojure.
Can somebody help me understand why applying the stateful transducer on a core.async channel results in an infinite sequence?