+1 vote
in core.async by
When a mult is tapped at around the same time as the source channel is closed, the tapped channel may not be closed.


(require '[clojure.core.async :refer [chan mult tap close!]]
         '[clojure.core.async.impl.protocols :as impl])
(let [s (chan)
      m (mult s)
      c (chan)]
  (tap m c)
  (close! s)
  (impl/closed? c))


The above code will sometimes return true, and sometimes return false.

*Cause:* This is caused by the following code in the {{mult}} function:


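(if (nil? val)
  ;; Source closed: close every tap that is registered at the moment of this deref.
  (doseq [[c close?] @cs]
    (when close? (close! c)))
  ...) ; otherwise distribute val to the taps and recur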


Any channels tapped after cs is dereferenced will not be closed.
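
For illustration, the window can be made deterministic. The sketch below is not from the ticket, and the helper name late-tap-closed? is made up here. It waits until the mult has closed an earlier tap, which implies cs has already been dereferenced, and only then taps a second channel. On versions containing the code above it should return false:

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl])

(defn late-tap-closed?
  "Taps `late` only after the mult has finished closing its existing taps,
  then reports whether `late` ended up closed. Hypothetical helper."
  []
  (let [s     (a/chan)
        m     (a/mult s)
        early (a/chan)
        late  (a/chan)]
    (a/tap m early)
    (a/close! s)
    ;; Blocks until the mult's go-loop has seen the close and closed `early`,
    ;; i.e. until after the set of taps was dereferenced.
    (a/<!! early)
    ;; This tap arrives too late to be part of that set.
    (a/tap m late)
    (impl/closed? late)))

(late-tap-closed?)
```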

*Approach:* One possible solution is to always close a channel that is tapped onto a mult whose source is already closed, for example:


(let [s (chan)
      m (mult s)
      c (chan)]
  (close! s)
  (tap m c))  ;; will always close c


This could be achieved by adding a flag to the cs atom to denote whether the mult is open or closed. If it's closed, any tapped channel is closed automatically.

7 Answers

0 votes
by

Comment made by: jreeves

For reference, below is the custom fix for mult I'm using:

(defn mult [ch]
  (let [;; state is [open? taps], where taps maps tapped channel -> close?
        state (atom [true {}])
        m (reify
            Mux
            (muxch* [_] ch)
            Mult
            (tap* [_ ch close?]
              ;; Register the tap only while the mult is open.
              (let [add-ch    (fn [[o? cs]] [o? (if o? (assoc cs ch close?) cs)])
                    [open? _] (swap! state add-ch)]
                ;; Tapping an already-closed mult closes the channel right away.
                (when-not open? (close! ch))
                nil))
            (untap* [_ ch]
              (swap! state (fn [[open? cs]] [open? (dissoc cs ch)]))
              nil)
            (untap-all* [_]
              (swap! state (fn [[open? _]] [open? {}]))))
        dchan (chan 1)
        dctr (atom nil)
        done (fn [_] (when (zero? (swap! dctr dec))
                       (put! dchan true)))]
    (go-loop []
      (let [val (<! ch)]
        (if (nil? val)
          ;; Flip the open flag and read the taps in a single swap!.
          (let [[_ cs] (swap! state (fn [[_ cs]] [false cs]))]
            (doseq [[c close?] cs]
              (when close? (close! c))))
          (let [chs (keys (second @state))]
            (reset! dctr (count chs))
            (doseq [c chs]
              (when-not (put! c val done)
                (swap! dctr dec)
                (untap* m c)))
            (when (seq chs)
              (<! dchan))
            (recur)))))
    m))

0 votes
by

Comment made by: dnolen

Is this also fixed in master? Thanks.

0 votes
by

Comment made by: gshayban

I understand the scenario, but honestly I'm not sure whether this is a bug in mult or in the usage. A channel shouldn't be expected to always yield a take. The consumer of the "late tap" can guard against it with alts or some other mechanism, and you can also enforce a no-late-taps policy on the "production" side of things.

@richhickey, can you weigh in?
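
A minimal sketch of the consumer-side guard mentioned above, assuming a blocking consumer. take-or-timeout!! is a hypothetical helper, not part of core.async. It uses alts!! with a timeout channel so that a take from a late tap cannot block forever:

```clojure
(require '[clojure.core.async :as a])

(defn take-or-timeout!!
  "Blocking take from ch that gives up after timeout-ms milliseconds.
  Returns the value taken, nil if ch is closed, or :timed-out.
  Hypothetical helper for illustration."
  [ch timeout-ms]
  (let [t        (a/timeout timeout-ms)
        [v port] (a/alts!! [ch t])]
    (if (= port t)
      :timed-out
      v)))
```

A consumer holding a tap that was never closed would get :timed-out here instead of hanging, at the cost of having to pick a timeout.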

0 votes
by

Comment made by: jreeves

The "tap" function currently has an explicit "close?" flag, and if a tapped channel isn't guaranteed to close when the source channel closes, that argument probably shouldn't exist. Also, if auto-closing taps is taken out, should we remove the "close?" argument on "sub" as well?

0 votes
by

Comment made by: gshayban

It's more than respecting the flag. Related to the close behavior, channels can tap and untap without receiving anything while the mult process happily distributes a value to another set of channels (like the ABA problem). Could also make it an error to tap after the close is distributed to the last deref'ed set of channels. That is different than the familiar permanent nil receive, but mults already differ from simple channels.

0 votes
by

Comment made by: stuart.sierra

I was recently working on a system which relied on the default behavior of {{mult}} and {{pipeline}} to automatically close downstream channels. But sometimes the initial "input" channel was closed very quickly, while the graph of channels was still being constructed. As a result, some output channels were left open and some go-loops continued running.

The fix in my case was to create the taps earlier, before any processing, but it made me think about what the default behavior should be.
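
As a rough sketch of that ordering (the function and channel names are made up for illustration): every tap is registered before anything is put on or closes the input, so the mult still sees all of them when the close arrives.

```clojure
(require '[clojure.core.async :as a])

(defn wire-then-run
  "Registers all taps first, then feeds one value and closes the input.
  Hypothetical example; returns what each output received before closing."
  []
  (let [in    (a/chan)
        m     (a/mult in)
        out-a (a/chan 1)
        out-b (a/chan 1)]
    ;; 1. Build the whole graph of taps.
    (a/tap m out-a)
    (a/tap m out-b)
    ;; 2. Only now start producing, even if the input closes right away.
    (a/>!! in :job)
    (a/close! in)
    ;; a/into only delivers once its source channel is closed, so these
    ;; takes would hang if either tap had been left open.
    {:a (a/<!! (a/into [] out-a))
     :b (a/<!! (a/into [] out-b))}))
```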

The behavior I expected is that when {{tap}} is called on a {{mult}} with the {{close?}} parameter true (the default), and the input channel of the mult is already closed, then the channel passed to {{tap}} is closed immediately.
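
Until something like that exists in mult itself, the behavior can be approximated from user code. This is only a sketch under my own assumptions: closing-mult and tap-closing! are hypothetical names, not core.async API, and unlike the close? parameter this version always closes the tapped channel. A sentinel tap records that the mult has seen its source close, and every tap made through the wrapper is closed once that has happened:

```clojure
(require '[clojure.core.async :as a])

(defn closing-mult
  "Hypothetical wrapper. Returns {:mult m :closed done}, where `done` is a
  channel that closes once the mult has observed its source closing."
  [src]
  (let [m        (a/mult src)
        ;; A dropping buffer means the sentinel never slows the mult down.
        sentinel (a/chan (a/dropping-buffer 1))
        done     (a/chan)]
    (a/tap m sentinel)
    ;; Drain the sentinel until the mult closes it, then close `done`.
    (a/go-loop []
      (if (some? (a/<! sentinel))
        (recur)
        (a/close! done)))
    {:mult m :closed done}))

(defn tap-closing!
  "Taps ch and also closes it once the source has closed, even if the tap
  arrived too late for the mult itself to close it. Returns ch."
  [{:keys [mult closed]} ch]
  (a/tap mult ch)
  (a/go
    (a/<! closed)
    (a/close! ch))
  ch)
```

One trade-off of this approach: each call to tap-closing! parks a go block until the source closes, so it suits graphs whose input is expected to close eventually.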

0 votes
by

Reference: https://clojure.atlassian.net/browse/ASYNC-64 (reported by alex+import)
...