Welcome! Please see the About page for a little more info on how this works.

0 votes
in core.async by
edited by

I am getting the following error

No more than 1024 pending takes are allowed on a single channel

However, I though that if i defined a channel with a specifed size , the puts would block.

(def event-chan (chan 10))

I am calling this in a loop

(>!! event-chan evt)

My expectation is that it would block is there were pending 10 items in the channel .. however does not seem to be happening. My code is reading a fifo queue in a database and proceesiing it. it seems to not block even if the items are pending in the channel

edit: updated to copy paste the full actual code.

(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
  
(def event-chan (chan 10))
  
(defn eat-now-or-later[evt]
    (if ( = (evt :code) :loadevt)
        (do (println "sync processing load event ")
            (process-edn evt))
        (>!! event-chan evt)
      ))
  
  (defn async-deque [shutdown?]
    (time! dq-call-processing-time
           (let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
                 fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
             (go (while true (process-edn (<! event-chan))))

             (loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
              
              (if (or (shutdown?)  (nil? evt)) 0
                  
                  (do                   
                   (eat-now-or-later evt)  
                   (recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
                  )
              )))
    (ptasks/print-stats))

The above is the function definition. I am calling this function from the daemon code below

(ns dqdaemon.core
  (:require [shutdown.core :as shutdown])
  (:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
 )

;; A crude approximation of your application's state.
(def state (atom {}))

(defn shutdown?[] (false? (:running @state)))

(defn init [args]
  (swap! state assoc :running true)
  (shutdown/add-hook! ::descriptive-name #(do
                                            (swap! state assoc :running false)         
                                            (println "It's about to go down!")))
  )

(defn start []
  (while (:running @state)
    (println "tick")
     (dqt/async-deque shutdown?)
    (Thread/sleep 2000)))


;; Enable command-line invocation
(defn -main [& args]
  (init args)
  (start))

Update:
I understand that this is not an issue with blocking puts. It is because my code is creating unlimited goloops.

This code is being called in a loop and the parent method of this loop , async-dequ, is being called in a forever loop.

(go (while true (process-edn (<! event-chan)))))

I am moving this code out of the loop-recur and into the main method to initialize when the daemon is started

1 Answer

0 votes
by
selected by
 
Best answer

It's not clear to me how you could get that error from that code. Could you post either the real code or a standalone example that reproduces the problem?

by
Hi @alexmiller -- thanks for the feedback. I have pasted the original code
by
The error message refers to takes - not puts - so it isn't related to >!!.  The message indicates there are 1000+ go blocks or threads all trying to take from the same channel.  The code still does not quite add up, but the problem could arise because `start` calls `async-deque` in a loop, and `async-deque` starts an endless go loop, so eventually tons of those go loops will be operating.
...