Welcome! Please see the About page for a little more info on how this works.

+6 votes
in core.async by
edited by


I've been working on a fix for the ASYNC-163 bug, and I think I've got a robust solution that doesn't require many changes to the pipeline* function.

I'm new to contributing to Clojure, so I'm a bit unsure about the process, though I've read about it on the clojure.org website. There's already an existing ticket for the problem, which I've linked above, however, it was closed as "won't fix", and I don't have a Jira account to comment there and send a patch, so I've decided to post here.

After studying the source code of the pipeline* function, I've understood the whole process of how jobs are passed between go-threads and channels and found a simple fix, that requires adding one more channel to synchronize job-producer and result-consumer threads.
I have a more in-depth explanation of the bug in my blog. It is rather long and convoluted, so I think it would be best if I will not repeat myself here, making this question unnecessarily long and much harder to understand. The explanation is available here.

I'm attaching a patch here, feel free to point me in the right direction if this place is not for this type of question. I've tested the in the REPL with various scenarios and in one of my projects, that uses regular pipeline-async, it works correctly. In addition, it passes tests in the core.async library, which I ran with the lein test command.

Patch: https://andreyorst.gitlab.io/0001-fix-ASYNC-163.patch

At the moment I have some problems with running CLJS tests with or without my patch for the async.cljs file, but once I figure out the problem I'll post an updated patch, for both JVM and CLJS runtimes

Edit: Nov 25 2022

Patch v2: https://andreyorst.gitlab.io/0001-fix-ASYNC-163-2.patch

This patch includes the same fix for async.cljs, though I wasn't able to run tests for some reason - I get the Uncaught TypeError: process.on is not a function error in the browser. Not sure what I'm missing, the project's readme doesn't mention any requirements besides building with lein and opening the HTML file. But the fix is essentially the same, so it should work fine.

Edit Dec 08 2022

I've put the fixed version of pipeline* into a library pipeline-extras. It also has unordered versions of all pipelines, which should have higher throughput because the conveyer doesn't stop if any of the tasks take longer to complete than the other ones.

1 Answer

0 votes

Hello, the "won't fix" there is to indicate that we don't think this is a problem and don't think it needs to be fixed.

edited by
Well, it is a problem when accessing a service that can serve a limited amount of connections.

In our product, we have a service that is configured to accept a maximum of 10 simultaneous connections.


(pipeline-async 10 out-chan request-fn in-chan)

creates 12 connections instead of 10, which leads to problems as the service begins refusing connections. It was an unexpected bug.

So in all code that uses the pipeline-async function, we have to do (- *max-requests* 2) to avoid creating more connections that the service accepts.

The patch I've attached ensures that the pipeline* function, which pipeline-async internally uses creates exactly n tasks, and not more, which fixes the problem I've faced at work.
It is a problem, though.  You could save the world from many headaches by revisiting this.  I can recollect quite a few first- and secondhand experiences were this created unexpected real world problems.  Unexpected and undocumented "off by 2" parallelism can really mess with your system in hard to diagnose ways.  
The original justification for wontfix was likely another oversight or misapprehension of the problem. It was then explained that `n` was supposed to designate the amount of parallel spawns, instead of running spawned processes. However, to spawns itself, parallelism has no utility at all, since spawning is always a constant time op.
> It was then explained that `n` was supposed to designate the number of parallel spawns, instead of running spawned processes

Interesting! However, neither `pipeline` nor `pipeline-blocking` share the off-by-two error and spawn exactly `n` parallel processes. This happens because the job consumer thread doesn't off-load awaiting the result to another thread, and does it by itself. My patch changes how async results are awaited, so the internal behavior is the same as blocking and compute variants.
Yes Andrey, I just wanted to point out to Alex that this case of wontfix is likely a misapprehension on their side.  Probably its too late to fix (as usercode working around this bug already accounts for off by two). But for instance deprecating the function with a docstring pointing out the issue, and providing a correct version could save future users from some nightmare debugging sessions.
As the author of the original JIRA issue, I feel obliged to chime in, as well :-) I wholeheartedly agree with lgrapenthin's last comment. Over the years I've encountered a few more occasions where people were puzzled by this behavior, too. Even if you accept the different meaning of the "parallelism" argument relative to the other pipeline functions (which is what the "wontfix" status implies), I don't see how it's useful to the caller in any way. Maybe we're all missing something here?

In any case, if this is the final word, the docstring should at least mention this somehow, e.g. "with parallelism n where n doesn't refer to the maximum number of parallel calls to af but X" (still not 100% what to fill in for X here so that it makes sense from the user's point of view). Arguably, the docstring of "pipeline" could be modified accordingly to make the difference between the two meanings explicit, lest readers might confuse them.

That being said, I also would prefer to deprecate this function instead and have Andrey's implementation as a successor (pipeline-async2?).