Welcome! Please see the About page for a little more info on how this works.

0 votes
in core.async by
The actual parallelism of {{pipeline-async}} is greater by 2 than what was passed. Here's the simplest reproduction case I could come up with:


(let [message-count 20
      parallelism 5
      parallel-count (atom 0)
      input (async/to-chan (range message-count))
      output (async/chan message-count)]
  (async/pipeline-async parallelism
                        output
                        (fn [_ result]
                          (async/go
                            (swap! parallel-count inc)
                            (async/<! (async/timeout 1000))
                            (async/>! result @parallel-count)
                            (swap! parallel-count dec)
                            (async/close! result)))
                        input)
  (async/<!! (async/into [] output)))


Results in something like this:


[7 4 7 5 3 7 2 7 7 7 5 3 1 3 6 4 3 1 2 6]

5 Answers

0 votes
by

Comment made by: hiredman

I suspect this is just a lack of clarity around what parallelism means
in the context of pipeline-async.

A parallelism of 'n' for pipeline-async means there are 'n' logical
threads spinning off async tasks, and because those tasks are async,
those logical threads spin off the task, and proceed to the next. The
other thing that 'n' effects is the size of the buffer for the
results channel internally in the pipeline. because the tasks are
asynchronous, the only throttle is effectively the size of the buffer
on the output channel.

If the only throttle is the buffer, which is of size 'n', why do we
seem to have n+2 logical threads running at once, well because there
are a few extra channels involved which combined with the logic for
copying from channel to channel, effectively adds another buffer of
size 2.

This behavior does seem arcane. But I would argue that the
behavior of pipeline-async is so different from pipeline and
pipeline-blocking that it really should be something distinct from
them. The other pipelines take transducers, and using the parallelism
'n' to spin up a number of logical threads for the other pipelines
actually does throttle the number of tasks run in parallel.

If you have not spent time evaluating which pipeline variant you
needed to use, and just grabbed pipeline-async because that is the
one that seemed intuitively the right choice (because it has -async
and this is core.async) you almost certainly should be using the
vanilla pipeline variant, its behavior is much more intuitive it is
much more likely to be what you want.

0 votes
by

Comment made by: dergutemoritz

Hi Kevin, thanks for your reply! After reading it, I think your point
regarding clarity around what parallelism means in the context of
{{pipeline-async}} is spot on :-) It seems to be quite a different
beast from the other pipelines, indeed.

We actually did evaluate the various options and from the
documentation it seemed to be the right tool in our case: We want to
launch a maximum number of parallel asynchronous HTTP requests. We
immediatley ruled out {{pipeline-blocking}} since we are not doing
blocking requests after all and don't want to hog up {{n}} threads
just for waiting for the requests to finish. We also figured that this
isn't really "computational parallelism" (as the documentation of
{{pipeline}} puts it), so {{pipeline-async}} seemed like the best
fit. Its API also looked well suited for this case: We just needed to
launch the HTTP request in {{af}} and then fill and close the
conveniently supplied result channel in the the request's completion
callback. This seemed to work fine -- until we actually hit the limit
of the connection pool, that is.

I'm not sure how we would use {{pipeline}} to the same effect. And
moreover, it's now rather unclear to me what {{pipeline-async}}'s
intended use case is. Can you perhaps enlighten me some more? That
would be much appreciated :-)

0 votes
by

Comment made by: hiredman

I dunno.

That does sort of sound like a thing for which you would use pipeline-async, but my experience with nio predates core.async, so I am not sure what the best way to stick the two together is. I am sort of skeptical about combining even "async" io libraries like http clients with core.async, since the async of the io system is different from the async of core.async, and when combining the two it is easy to screw up and use some blocking convenience function because async is hard.

depending on the nature of the http request and how you consume the results it would, of course, be way simpler to use a synchronous http library and pipeline-blocking. if you are bounding the number of http requests at a time to a certain number, I suspect you will run out of io capacity before the jvm has trouble with that number of threads.

if pipeline-async is working for you otherwise, besides the +2 thing, then, uh, maybe just subtract 2 :)

0 votes
by

Comment made by: lgs32a

Strongly disagree with the idea that parallelism N has a different meaning than "at most N ops are executed in parallel".

The current pipeline-async behavior puts N+2 operations in parallel flight all the time.
If pipeline-async has more async ops than N in parallel flight, it doesn't behave correctly.

Arguing that parallelism here means N async ops are /only launched in parallel/ is arguing that pipeline-async shouldn't await completion of async ops and just continue to launch them. That would be completely unbounded parallel process spawning. What would be the point of launching N async ops in parallel? Launching an async op returns immediately so I don't see any reasons why the /launching of async ops/ is parallelized or its parallelism would be relevant to the user. (Highly doubt that it makes anything faster or more parallel.)

Its a completely different case for pipeline and pipeline-blocking (also off by two). Their ops are executed /synchronously/ on the N internal dispatch-loops. Thus only at most N ops are guaranteed to be executed in parallel and the promise is held. (In their case, the off by two only means that if the to channel is blocking two more calculations are executed /after/ the first N calculations.)

0 votes
by
Reference: https://clojure.atlassian.net/browse/ASYNC-163 (reported by alex+import)
...