

We encountered a thorny problem with core.async:

In some situations the assertion "No more than 1024 pending takes are allowed on a single channel." is raised in an async-dispatch thread, even though there are never more than 1024 go blocks taking from the channel, and therefore at most 1024 pending takes.
That situation can be triggered quickly with the following code:

```
(require '[clojure.core.async :as async])

(dotimes [_ 5000]
  ;; - start 1024 go blocks ("threads") that all alt! on a common channel (ch)
  ;;   and on an individual channel (x)
  ;; - the common channel stays empty the whole time
  ;; - wake up every go block twice via its x channel, which makes it loop
  ;; - this quickly raises multiple 'No more than 1024 pending takes are
  ;;   allowed on a single channel.' assertions in an async-dispatch thread
  (let [ch      (async/chan)
        threads clojure.core.async.impl.protocols/MAX-QUEUE-SIZE
        wakeups 2
        x-chans (mapv (fn [_]
                        (let [x (async/chan)]
                          (async/go-loop []
                            (async/alt!
                              ch nil
                              x ([_] (recur))))
                          x))
                      (range threads))]
    (doseq [x x-chans]
      (dotimes [_ wakeups]
        (async/>!! x 42)))
    (async/close! ch)))
```
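For reference, the limit itself can be observed in isolation. The following is a minimal sketch, assuming that callback takes registered with `async/take!` on an unbuffered channel count toward the same pending-take limit as takes made from go blocks; `pending-takes-until-error` is a hypothetical helper name, not part of core.async.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as impl])

;; Hypothetical helper, for illustration only.
(defn pending-takes-until-error
  "Registers up to `limit` callback takes on a fresh unbuffered channel that
  never receives a value. Returns how many takes were registered before an
  AssertionError was thrown, or nil if none was thrown."
  [limit]
  (let [ch (async/chan)]
    (loop [n 0]
      (if (= n limit)
        nil
        (let [ok? (try
                    ;; nothing is ever put on ch, so this take stays pending
                    (async/take! ch (fn [_]))
                    true
                    (catch AssertionError _ false))]
          (if ok? (recur (inc n)) n))))))

(println "MAX-QUEUE-SIZE:" impl/MAX-QUEUE-SIZE)
(println "takes registered before the assertion:"
         (pending-takes-until-error 2000))
```

In this isolated case the error is thrown on the calling thread, whereas in the reproduction above it shows up on an async-dispatch thread.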

Environment:
macOS, JDK 8, Clojure 1.10.0, core.async 0.6.532

1 Answer


Dear Mr. Sperber,

I do not think I fully understand the problem, and to be honest I have not spent much time thinking about it :-)

Having said that, I just ran the following namespace, with the project.clj and JVM shown after it:

(ns asyncbug.core
  (:gen-class)
  (:require [clojure.core.async :as async]))


(defn -main
  "I don't do a whole lot ... yet."
  [& args]

  (dotimes [n 5000]
    ;; - start 1024 threads that all alt! on a common channel (ch) and
    ;;   on an individual channel (x)
    ;; - the common channel remains 'empty' all the time
    ;; - wake up all threads twice via the x channel, which makes them loop
    ;; - this quickly raises multiple
    ;;   'No more than 1024 pending takes are allowed on a single channel.'
    ;;   assertions in an async-dispatch thread
    (let [ch (async/chan)

          threads clojure.core.async.impl.protocols/MAX-QUEUE-SIZE
          wakeups 2
          x-chans
          (mapv (fn [n]
                  (let [x (async/chan)]
                    (async/go-loop []
                      (async/alt!
                        ch nil
                        x ([_] (recur))))
                    x))
                (range threads))]
      (doseq [x x-chans]
        (dotimes [n wakeups]
          (async/>!! x 42)))
      (async/close! ch))

    (println "iteration: " n))

  (println "Hello, World!"))

(defproject asyncbug "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
            :url "https://www.eclipse.org/legal/epl-2.0/"}
  :dependencies [[org.clojure/clojure "1.10.0"]
                 [org.clojure/core.async "0.7.559"]]
  :main ^:skip-aot asyncbug.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})

java -version
openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1, mixed mode, sharing)

That run eventually prints "Hello, World!".

Again, I am really not sure what is (supposed to be) going on here, so I may be doing this all wrong :-)

I also gave this a quick try with core.async 0.6.532, the version you mentioned, but aborted after 32 successful iterations. Up to that point no AssertionError had been raised.
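Since the reported assertion fires on an async-dispatch thread, the main thread can still finish its iterations when it occurs, so it may be worth counting such errors explicitly. A minimal sketch, assuming that errors escaping a dispatch thread reach the JVM default uncaught-exception handler (not verified here against core.async 0.6.532); `assertion-count` and `install-assertion-counter!` are hypothetical names, and a plain thread stands in for a dispatch thread:

```clojure
;; Hypothetical helper names; not part of core.async.
(def assertion-count (atom 0))

(defn install-assertion-counter!
  "Installs a JVM-wide default handler that counts AssertionErrors
  escaping from any thread and prints every uncaught error to stderr."
  []
  (Thread/setDefaultUncaughtExceptionHandler
   (reify Thread$UncaughtExceptionHandler
     (uncaughtException [_ thread ex]
       (when (instance? AssertionError ex)
         (swap! assertion-count inc))
       (binding [*out* *err*]
         (println "uncaught on" (.getName thread) "-" (.getMessage ex)))))))

(install-assertion-counter!)

;; Stand-in for a dispatch thread: a plain thread whose body fails an assert.
(doto (Thread. ^Runnable (fn [] (assert false "simulated failure")))
  (.start)
  (.join))

(println "assertion errors seen:" @assertion-count)
```

With the handler installed before the `dotimes` loop in `-main`, printing `@assertion-count` after the loop would show whether any assertion was raised off the main thread.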

I am sorry I could not be of more help.

Best Regards,
Florian Tauber

...