

I'm trying to convert a channel into a seq. This is what I'm using currently:

(defn chan->seq [ch]
  (when-let [v (<!! ch)]
    (lazy-seq (cons v (chan->seq ch)))))

It works but it uses a blocking read, so the current thread will be blocked until the channel is closed, and you can't have many calls of this function active at the same time. Is it possible to have a non-blocking version of this function?

This example demonstrates this issue:

(doseq [_ (range 20)]
  (go (chan->seq (chan 10))))

(go (println :OK))

The last statement won't print anything, since the chan->seq calls are occupying every thread in the go thread pool. It works if 20 is changed to 5 (in a new REPL session).

1 Answer


Your implementation is good.

We wrote (and have used) something very similar:

https://github.com/techascent/tech.parallel/blob/cc1b9b0ef8a0893fe1278a700a4dbb0271f92156/src/tech/parallel.clj#L38-L53

"you can't have 1000 calls of this function active at the same time"

I am not so sure about this. Have you tried?

Under some resource constraints, the stack usage of the parked threads may cause memory pressure, but in general contemporary schedulers perform fine with large numbers of parked threads (I would be surprised if resuming a thread was not O(1), at least on the systems we use: OpenJDK on Linux on various EC2 machines on AWS).

Hope that helps.

by
It doesn't have to be 1000; 20 is fine. I've updated the question with an example.
by
Ah, in that case, `go` is the culprit. `go` blocks run on their own small, fixed-size thread pool (8 threads by default), so a blocking read inside a `go` block ties up one of those threads. But! Because the read is blocking, you don't need `go` here at all. See the implementation I linked, which works with lots of threads.
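As a rough illustration of that point (a sketch only, not the linked tech.parallel implementation), the blocking consumers can run on `clojure.core.async/thread` instead of `go`, so they never occupy the go pool. The names `chans`, `consumers`, `ok`, and `results` are made up for this example, and the values put on the channels are arbitrary.

```clojure
(require '[clojure.core.async :as a])

(defn chan->seq [ch]
  (when-let [v (a/<!! ch)]
    (lazy-seq (cons v (chan->seq ch)))))

;; 20 channels, each drained by a blocking consumer on its own thread.
;; `a/thread` runs its body on a separate thread pool and returns a
;; channel that will receive the body's result.
(def chans (vec (repeatedly 20 #(a/chan 10))))

(def consumers
  (mapv (fn [ch] (a/thread (doall (chan->seq ch)))) chans))

;; All 20 consumers are now blocked in <!!, yet go blocks still run,
;; because none of the go pool's threads are tied up.
(def ok (a/<!! (a/go :OK)))

;; Feed each channel two items and close it so the consumers can finish.
(doseq [ch chans]
  (a/>!! ch 1)
  (a/>!! ch 2)
  (a/close! ch))

;; Each consumer channel yields the fully realized seq for its channel.
(def results (mapv a/<!! consumers))
```

The `doall` inside `a/thread` forces the whole seq on the consumer thread; without it, the remaining blocking reads would happen on whichever thread later walks the lazy seq.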
by
What if I want to use this function inside a go block? Or in ClojureScript, which doesn't have blocking operations?
by
In that case, it's perhaps a bit more complicated. You'd probably have to know something else about the problem (like the pattern/timing/frequency of items showing up on the channel) and use that to engineer something non-blocking, perhaps restartable and with timeouts, or similar. In many contexts, JavaScript is still effectively single-threaded.
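One possible direction, offered here as an assumption rather than something proposed in this thread: if the consumer only needs the items once the channel has closed, `clojure.core.async/into` avoids a seq altogether. It returns a channel that yields a single collection, which a go block can take with the parking `<!`. The helper name `chan->vec-chan` and the sample data are hypothetical.

```clojure
(require '[clojure.core.async :as a])

(defn chan->vec-chan
  "Returns a channel that yields one vector holding every item from ch,
  delivered once ch is closed. Nothing blocks here."
  [ch]
  (a/into [] ch))

;; Sample input: three items, then close. The buffer of 10 keeps the
;; puts from blocking.
(def ch (a/chan 10))
(doseq [x [1 2 3]] (a/>!! ch x))
(a/close! ch)

;; Inside a go block, <! parks instead of blocking a thread.
(def result
  (a/go
    (let [xs (a/<! (chan->vec-chan ch))]
      (mapv inc xs))))

;; Blocking take, used here only to inspect the result at a JVM REPL.
(def final (a/<!! result))
```

This trades laziness for a single parked take: nothing is available until the source channel closes. The sample setup and the final `<!!` are JVM-only; in ClojureScript the puts and the final take would themselves have to happen inside go blocks (or via `put!` and `take!`).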
...