I am trying to implement a simple stateful transducer that counts the number of items (with ClojureScript):

(defn stateful-counter []
  (fn [xf]
    (let [counter (atom 0)]

      (fn
        ([] (xf))

        ([result]
         (xf (xf result @counter)))

        ([result _]
         (swap! counter inc)
         result)))))

When running this on a sequence I get the output:

(into [] (stateful-counter) (range 5))

[5]

This is exactly what I expect.

When running this on a core.async channel I get an infinite sequence of 5s:

(go (println
     (<! (let [c (async/chan 1 (stateful-counter))]

           (async/onto-chan! c (range 5))

           (async/into []
                       (async/take 10 c))))))

[5 5 5 5 5 5 5 5 5 5]

If I don't use (async/take 10 _) there seems to be an infinite loop. The expected outcome would be [5].

I have also tried using net.cgrand.xforms/count from the xforms library and get the same unexpected result:

(go (println
     (<! (let [c (async/chan 1 xforms/count)]

           (async/onto-chan! c (range 5))

           (async/into []
                       (async/take 10 c))))))

I assumed that I had made an implementation error in stateful-counter that caused the loop, but I am confused that xforms/count produces the same (unexpected) result.

This is a ClojureScript snippet, but I have been able to produce the same unexpected result with Clojure.

Can somebody help me understand why applying the stateful transducer on a core.async channel results in an infinite sequence?

EDIT: Same unexpected behavior seen with Clojure.

1 Answer


Have a look at partition-by:
https://github.com/clojure/clojure/blob/clojure-1.10.1/src/clj/clojure/core.clj#L7160
Especially the comment ";;clear first!", one of the two comments in Clojure core that has an exclamation point.

by
Thanks! Adding "(reset! counter nil)" (akin to "(.clear a)" in partition-by) makes it work:

    (defn stateful-counter []
      (fn [xf]
        (let [counter (atom 0)]
    
          (fn
            ([] (xf))
    
            ([result]
             (let [c @counter]
               (reset! counter nil)
               (xf (xf result c))))
    
            ([result _]
             (swap! counter inc)
             result)))))

I still don't understand why. Could you give me a hint on what is going on?

Also, does this mean there is a bug in the xforms library?
by
Well, the manual (https://clojure.org/reference/transducers) says, "Completion (arity 1) ... This arity must call the rf completion arity exactly once."  It could be made more sweeping or emphatic - "exactly once EVER".  At any rate, your program now complies with that rule, and works.  P.S. AFAIK, a documentation enhancement suggestion could be made by way of a GitHub issue on the website project, https://github.com/clojure/clojure-site
by
Wouldn't this call the rf completion arity exactly once?

```
 (xf (xf result @counter))
```
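One way to look at this without core.async is to drive the reducing function by hand. The sketch below assumes that the channel may invoke the transducer's own completion arity more than once, which the repeated 5s in the question suggest. `stateful-counter` is the original version from the question; `rf`, `stepped`, `once` and `twice` are names introduced here only for illustration.

```clojure
;; Original transducer from the question, unchanged.
(defn stateful-counter []
  (fn [xf]
    (let [counter (atom 0)]
      (fn
        ([] (xf))
        ([result] (xf (xf result @counter)))
        ([result _]
         (swap! counter inc)
         result)))))

;; Build the reducing function by hand on top of conj.
(def rf ((stateful-counter) conj))

;; Feed five items through the step arity; nothing is emitted yet.
(def stepped (reduce rf [] (range 5)))

;; The first completion call emits the count once.
(def once (rf stepped))   ;; => [5]

;; A second completion call emits the count again,
;; because the counter still holds 5.
(def twice (rf once))     ;; => [5 5]
```

Each individual completion call does invoke the downstream completion arity exactly once, but nothing stops the count from being emitted again if the completion arity itself is called a second time.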
by
edited by
Well, my understanding has always been that this requirement is transitive: you should call the downstream completing arity once, and your own completing arity must also be called only once.
To me, the clearing in `partition-all` is more about memory issues (not keeping undue references to potentially reclaimable objects) than about transducers.
(edited to correct autocorrection)
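If a caller cannot be trusted to invoke the completion arity only once, one possible defence is to make that arity idempotent. The following is a minimal sketch under that assumption, not a recommended fix for any particular library; `counting-once` and the `done?` flag are hypothetical names introduced here.

```clojure
(defn counting-once []
  (fn [xf]
    (let [counter (atom 0)
          done?   (atom false)]   ;; flips to true on the first completion
      (fn
        ([] (xf))
        ([result]
         ;; Emit the count and complete downstream only on the first call;
         ;; later completion calls return the result untouched.
         (if (compare-and-set! done? false true)
           (xf (unreduced (xf result @counter)))
           result))
        ([result _]
         (swap! counter inc)
         result)))))

(into [] (counting-once) (range 5))
;; => [5]
```

Unlike resetting the counter to nil, this version never passes nil downstream on a repeated completion call; it simply does nothing the second time.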
...