
Hi, I've been testing the new clojure.core.async 1.8.711-beta1. Since the default fixed thread pool was replaced with a cached thread pool, I suspected the system could now create a very large number of OS threads, on the order of hundreds or thousands depending on the size of the workload.

I am wondering if this was intended, or if there might be a better middle ground, such as still using a CachedThreadPool but accepting a maximum size for each pool type via system properties, with reasonable defaults.
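To make the idea concrete, here is a minimal sketch of a bounded pool for io work. It assumes the `clojure.core.async.executor-factory` system property described in the 1.8 release notes, which names a function that receives a workload keyword (`:io`, `:mixed`, `:compute`, `:core-async-dispatch`) and returns an ExecutorService, or nil to keep the default. The namespace `user.bounded-pools`, the property `user.async.max-io-threads`, and the default of 64 are all hypothetical names chosen for illustration, not part of core.async.

```clojure
(ns user.bounded-pools
  (:import (java.util.concurrent Executors ThreadFactory)
           (java.util.concurrent.atomic AtomicLong)))

(def max-io-threads
  "Hypothetical cap on io threads, read from a made-up system property."
  (Long/parseLong (System/getProperty "user.async.max-io-threads" "64")))

(defn- daemon-thread-factory
  "Creates named daemon threads so the pool does not keep the JVM alive."
  ^ThreadFactory [prefix]
  (let [counter (AtomicLong. 0)]
    (reify ThreadFactory
      (newThread [_ runnable]
        (doto (Thread. ^Runnable runnable)
          (.setName (str prefix "-" (.incrementAndGet counter)))
          (.setDaemon true))))))

(defn executor-factory
  "Returns a fixed-size pool for :io work and nil (meaning: use the
  core.async default) for every other workload keyword."
  [workload]
  (case workload
    :io (Executors/newFixedThreadPool (int max-io-threads)
                                      (daemon-thread-factory "bounded-io"))
    nil))
```

If the property works as assumed, starting the JVM with `-Dclojure.core.async.executor-factory=user.bounded-pools/executor-factory` would route io work through this pool. Note the trade-off: a fixed pool queues tasks beyond the cap instead of starting new threads, so a burst of blocking tasks takes longer to drain, and tasks that block waiting on each other could stall.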

Example code below:

(ns user)

(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.dispatch :as d])

(defn do-work
  "Simulate some async work which mixes parking and blocking operations."
  [task-count task-timeout]
  (a/go-loop [c (a/merge
                 (doall
                  (for [i (range task-count)]
                    (a/go
                      ;;; this simulates a non-blocking park inside the go block
                      (a/<! (a/timeout
                             (rand-int task-timeout)))
                      ;;; this simulates blocking IO inside an io-thread
                      (a/<! (a/io-thread
                             (Thread/sleep
                              ^int (rand-int task-timeout))))
                      i))))]
    (let [v (a/<! c)]
      (if (nil? v)
        {:pool-size (.getPoolSize ^java.util.concurrent.ThreadPoolExecutor (d/executor-for :core-async-dispatch))}
        (recur c)))))

(def active
  (atom false))

(defn run-sim
  []
  (reset! active true)
  (a/go-loop [iter 1]
    (let [start (System/currentTimeMillis)
          {:keys [pool-size]} (a/<! (do-work
                                     2000  ;;; simulated number of tasks
                                     100)) ;;; simulated max delay in ms
          stop (System/currentTimeMillis)
          elapsed (- stop start)]
      (println "Iteration:" iter "Pool Size:" pool-size "Elapsed:" elapsed)
      (when (and @active
                 (< iter 10))
        (recur (inc iter))))))

(defn stop-sim
  []
  (reset! active false))

(comment
  (run-sim)   ;;; run the sim
  (stop-sim)) ;;; stop the sim
Deps:
org.clojure/clojure {:mvn/version "1.12.0"}
org.clojure/core.async {:mvn/version "1.8.711-beta1"}

1 Answer


There's so much stuff in this example, it's hard to tell what actual use case you're trying to think through. Yes, mechanically it's possible, but I can't tell what's important here or if this is just poorly structured - are you trying to make a point about io-thread or the change in go or the use of them together?

You're eagerly spinning up task-count blocking tasks. Before io-thread, you would have used thread for that, which is no different. (In the future io-thread will be able to utilize JVM virtual threads when available, which will be ideal for this in resource usage.)

Thanks for the quick reply, Alex. The important part was the number of threads growing with the number of io-thread calls. The future state makes a lot of sense in that regard, since virtual threads will be very light.
...