+2 votes
in core.async by
  • each deref will take a value from the channel
  • deref from a closed channel returns nil
  • deref with timeout must either time out or take a value from the channel (not both!). Might just want to call some variant of alt!! through the var
  • use internals efficiently
  • do NOT implement IPending/realized?; neither interpretation is that great

Patch: async-102-2.patch

Implementing deref and timed deref is conceptually straightforward (see http://dev.clojure.org/jira/browse/ASYNC-102?focusedCommentId=37211&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-37211), but hard to do in practice because of circular namespace dependencies. Suggestions welcome.

8 Answers

Comment made by: stu

The async-102.patch does not match the description "a channel becomes realized when it is closed" -- instead, a channel becomes realized when a value is currently available. Both of these interpretations seem problematic to me, and I think the definition of realized? needs clarification to support this new case. The other uses of realized? in Clojure all guarantee that subsequent derefs will be filled immediately, but that will not be true for channels if another consumer takes the value.

Once that is solved, I need better doc strings for the core.async implementation protocols before I can screen this.  What guarantees are made about use of threads?  Where does the work enqueue, and what happens if the queue is full?

Comment made by: alexmiller

My implementation of realized? is based on Rich's response to your question (from internal chat): "open+empty = false, else true" and I believe that is what's implemented and reflected in the tests. This also makes more sense to me than only being realized on close.

Your comment vs other cases of realized? seems accurate, so I agree that's a question.

Can you be more specific on which implementation protocol? I'm guessing you specifically mean Channel and Buffer. Any thread could be calling into the Channel. The M2MC protects its internal state with a mutex and will forward calls down to the Buffer. All calls into the buffer are protected by the channel mutex. M2MC enqueues pending puts and takes in the puts and takes lists. Those are bounded by the fixed limit clojure.core.async.impl.protocols/MAX-QUEUE-SIZE (1024), at which point an exception is thrown.
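
To make the queue bound concrete, here is a small sketch against the public API. It assumes core.async is on the classpath and that an unbuffered channel with no takers keeps every put! pending; the names c and overflow-result are only for illustration, and the exact exception type is deliberately not asserted.

```clojure
(require '[clojure.core.async :as a]
         '[clojure.core.async.impl.protocols :as impl])

;; Unbuffered channel with no consumers: every put! stays pending.
(def c (a/chan))

;; Fill the pending-puts queue up to the limit.
(dotimes [i impl/MAX-QUEUE-SIZE]
  (a/put! c i))

;; One more pending put should be rejected rather than queued.
(def overflow-result
  (try
    (a/put! c :one-too-many)
    :accepted
    (catch Throwable _
      :rejected)))
```

If the limit behaves as described above, overflow-result ends up as :rejected.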

Comment made by: alexmiller

The conceptual implementation of IDeref and IBlockingDeref on ManyToManyChannel using existing async constructs is relatively straightforward:

```clojure
(deftype ManyToManyChannel
  #_existing_code...
  IDeref
  (deref [this] (<!! this))

  IBlockingDeref
  (deref [this ms timeoutValue]
    (alt!!
      this ([val _] val)
      (timeout ms) timeoutValue)))
```

However, M2MC is defined in clojure.core.async.impl.channels. <!!, alt!!, and timeout are all defined in clojure.core.async, which depends on clojure.core.async.impl.channels, so there is a cyclic dependency problem here. <!! and timeout are pretty easy to deal with by looking up the var behind a delay, like this:

```clojure
(def ^:private <!!' (delay (find-var 'clojure.core.async/<!!)))

;; then:
IDeref
(deref [this] (@<!!' this))
```

However, I'm a little stumped on how to do the equivalent with alt!!, which is a macro around do-alt and alts!!.
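
For readers who want to try the delayed var lookup in isolation, here is a minimal sketch that uses clojure.string/upper-case as a stand-in for <!! so it runs without core.async. The names upper-case' and shout are hypothetical. The delay is defined before the target namespace is required, which mimics a namespace that cannot require its dependency up front.

```clojure
;; Nothing is resolved here; find-var only runs on the first deref.
(def ^:private upper-case' (delay (find-var 'clojure.string/upper-case)))

;; The target namespace gets loaded later, by someone else.
(require 'clojure.string)

(defn shout
  "Resolves the var on first use, then calls through it."
  [s]
  (@upper-case' s))

(shout "deref") ;; => "DEREF"
```

This works because a var holding a function can be invoked at runtime. A macro is expanded when the calling code is compiled, so a lookup deferred to runtime does not help in the same way.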

Comment made by: gshayban

Don't rely on resolving the circular dependency. Instead, make a shared alt-flag and enqueue the two handlers [1]. fret will deliver a promise; then deref the promise.

[1] https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L230-L231

Comment made by: alexmiller

New patch goes back to prior implementation of blocking deref (basically guts of <!!) and adds new alts-based version of timed deref.
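
The patch itself is not reproduced in this thread. Purely as an illustration of what an alts-based timed take can look like from outside the impl namespace, here is a sketch using the public alts!! function and timeout. The helper name timed-take!! is hypothetical, and this is not the code in the patch.

```clojure
(require '[clojure.core.async :as a])

(defn timed-take!!
  "Hypothetical helper: blocks for up to ms milliseconds taking from ch.
  Returns the taken value (nil if ch is closed) or timeout-val."
  [ch ms timeout-val]
  (let [t        (a/timeout ms)
        [v port] (a/alts!! [ch t])]
    ;; alts!! completes exactly one operation, so we either took from ch
    ;; or saw the timeout channel close, never both.
    (if (identical? port t)
      timeout-val
      v)))

(def ch (a/chan 1))
(a/>!! ch :hello)
(timed-take!! ch 50 :timed-out) ;; value available
(timed-take!! ch 50 :timed-out) ;; channel empty, so this waits and times out
(a/close! ch)
(timed-take!! ch 50 :timed-out) ;; channel closed
```

Under these assumptions the three calls return :hello, :timed-out, and nil, which lines up with the requirements listed in the question.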

Comment made by: fogus

"However, I'm a little stumped on how to do the equivalent with alt!!, which is a macro around do-alt and alts!!."

I wonder if the following will solve the alts!! problem:

```clojure
(def ^:private ^:macro alts!!' (deref (find-var 'clojure.core.async/alts!!)))
```

And then using it in the deref impl behind IBlockingDeref. As it stands it's difficult to grok the code as written given that it's using some core.async implementation details (indeed, the same code). Whereas your implementation of IBlockingDeref.deref is crystal clear, the implementation prompted by the cyclic dependency is... less so. Not to belabor the point (which I seem to be), but alts!! is a clear implementation for this, but using the guts muddies the water. If I had my druthers, I'd prefer clarity.

That said, being a human macro-expander I was able to reason through the implementation eventually.

Comment made by: alexmiller

That doesn't seem to work for me. If you have some formulation that would, that would be great though!

0 votes
by
...