
I have two (or more) listeners tapped onto a mult. I want all of them to consume messages, and then have one (or more) of them leave at will without blocking the other consumer(s) or the publisher. Everything works fine until one of them wants to stop listening. I assumed that a listener which drops out needs to be a good citizen and untap its channel from the mult (otherwise a deadlock is guaranteed). However, if messages are put into the mult before the leaving listener has had a chance to untap its channel, the main thread, which is still putting messages, deadlocks. I cannot find a way to guarantee that the channel is untapped in time to avoid this race condition.

Once I have reproduced the deadlock, the REPL stays frozen until I interrupt it with Ctrl-C.
I have also tried closing the tapped channel before untapping it, but the result was the same.

In the following snippet, the final (println "I'm done. You will never see this") is never reached. The publisher and the remaining consumer (consumer 1) are deadlocked even though consumer 2 was trying to leave on good terms.


(require '[clojure.core.async :refer (chan go <! >!! mult tap untap)])
(let [to-mult (chan 1)
      m (mult to-mult)]

  ;;consumer 1
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (println "1 Got! " v)
            (recur))
          (println "1 Exiting!"))))

  ;;consumer 2
  (let [c (chan 1)]
    (tap m c)
    (go (loop []
          (when-let [v (<! c)]
            (when (= v 42)  ;; exit when value is not 42
              (println "2 Got! " v)
              (recur)))
          (println "2 about to leave!")
          (Thread/sleep 5000) ;; wait a bit to exacerbate the race condition
          (untap m c) ;; before unsubscribing this reader
          (println "2 Exiting."))))

  (println "about to put a few messages that work")
  (doseq [a (range 10)]
    (>!! to-mult 42))
  (println "about to put a message that will force the exit of 2")
  (>!! to-mult 43)
  (println "about to put a few more messages before reader 2 is unsubscribed to show the deadlock")
  (doseq [a (range 10)]
    (println "putting msg" a)
    (>!! to-mult 42))
  (println "I'm done. You will never see this"))



about to put a few messages that work
2 Got!  42
1 Got!  42
2 Got!  42
1 Got!  42
1 Got!  42
2 Got!  42
1 Got!  42
1 Got!  42
2 Got!  42
2 Got!  42
2 Got!  42
2 Got!  1 Got!  42
422 Got!  42

1 Got!  42
1 Got!  42
2 Got!  42
1 Got!  42
about to put a message that will force the exit of 2
1 Got!  42
2 Got!  about to put a few more messages before reader 2 is unsubscribed to show the deadlock
42
putting msg 1 Got!  0
2 about to leave!
43
1 Got!  42
putting msg 1
putting msg 2
putting msg 3
1 Got!  42
2 Exiting.

3 Answers


Comment made by: gshayban

Mathieu, this is probably expected. It's important to note that to guarantee correct ordering/flow when using a mult, you should enforce it on the source/producer side of the mult, and not asynchronously on the tap side.

Mult will deref a stable set of taps just before distributing a value to them, and does not adjust that set dynamically during distribution, except when a tap has been closed (link: 1). If you would like to untap stably without closing the tap, you can/should let the 'producer' do it in an ordered fashion, in between values on the input channel.

Being able to tell that a put was made to a closed channel is new in release 0.1.278.
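
As a small illustration of that point (a sketch only, assuming a core.async version at or after that release; the var names here are made up for the example), the return value of a put tells the caller whether the channel was still open:

```clojure
(require '[clojure.core.async :refer [chan close! >!!]])

;; Illustrative names, not part of the original report.
(def open-ch   (chan 1))
(def closed-ch (doto (chan 1) close!))

;; A put onto an open channel with buffer space is accepted.
(def open-result (>!! open-ch :hello))

;; A put onto a closed channel is rejected and the value is dropped.
(def closed-result (>!! closed-ch :hello))

(println "open:" open-result "closed:" closed-result)
```

A producer can use this return value to stop writing to a consumer that has gone away.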

In general, walking away on the consuming side of a channel is tricky. Depending on the semantics of your processes, if the producer side of a channel isn't aware that a close! can happen from the consumer side, you might have to launch a draining operation.

(defn drain [c] (go (loop [] (when (some? (<! c)) (recur)))))
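
To make the close-then-drain idea concrete, here is a minimal sketch of how a tap might leave without untapping. It is not taken from the ticket: `leave!` and `run-demo` are hypothetical names, and the sketch assumes the default `tap` behaviour of closing taps when the source closes. The leaving consumer closes its own channel, so the mult drops it on the next failed put, and then drains whatever puts were already pending so the mult is not left waiting on them.

```clojure
(require '[clojure.core.async :refer [chan go <! <!! >!! mult tap close!]])

(defn drain
  "Takes and discards values from c until it is closed and empty."
  [c]
  (go (loop [] (when (some? (<! c)) (recur)))))

(defn leave!
  "Hypothetical helper: close our own tap channel, then drain any puts
  that were already buffered or pending. Returns the drain's channel."
  [c]
  (close! c)
  (drain c))

(defn run-demo
  "Consumer 2 leaves on the first value that is not 42; consumer 1 and
  the producer carry on. Returns everything consumer 1 received."
  []
  (let [src   (chan 1)
        m     (mult src)
        c1    (chan 1)
        c2    (chan 1)
        seen1 (atom [])]
    (tap m c1)
    (tap m c2)
    (let [consumer1 (go (loop []
                          (when-let [v (<! c1)]
                            (swap! seen1 conj v)
                            (recur))))
          consumer2 (go (loop []
                          (when-let [v (<! c2)]
                            (when (= v 42) (recur))))
                        ;; leave without calling untap
                        (<! (leave! c2)))]
      (doseq [v [42 42 43 42 42 42]]
        (>!! src v))
      (close! src)          ; closes c1 as well, so consumer 1 exits
      (<!! consumer1)
      (<!! consumer2)
      @seen1)))

(println (run-demo))
```

Closing alone is not enough in the original snippet because a put from the mult can already be parked on the tap channel; the drain is what releases it.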

FWIW, Go disallows closing a receive-only channel (link: 2).

Better documentation is probably warranted.

(link: 1) https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L680-L682
(link: 2) http://golang.org/ref/spec#Close


Comment made by: samumbach

See also ASYNC-66.

Reference: https://clojure.atlassian.net/browse/ASYNC-58 (reported by alex+import)
...