I am getting the following error
No more than 1024 pending takes are allowed on a single channel
However, I though that if i defined a channel with a specifed size , the puts would block.
(def event-chan (chan 10))
I am calling this in a loop
(>!! event-chan evt)
My expectation is that it would block is there were pending 10 items in the channel .. however does not seem to be happening. My code is reading a fifo queue in a database and proceesiing it. it seems to not block even if the items are pending in the channel
edit: updated to copy paste the full actual code.
(defn process-edn [{:keys [data code]}]
; lots db processing code here
)
(def event-chan (chan 10))
(defn eat-now-or-later[evt]
(if ( = (evt :code) :loadevt)
(do (println "sync processing load event ")
(process-edn evt))
(>!! event-chan evt)
))
(defn async-deque [shutdown?]
(time! dq-call-processing-time
(let [k-t (ftup/from commons/*JOB_FIFO_EVENT_QUEUE*)
fdb (cfdb/select-api-version cfdb/clj-fdb-api-version)]
(go (while true (process-edn (<! event-chan))))
(loop[evt (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))]
(if (or (shutdown?) (nil? evt)) 0
(do
(eat-now-or-later evt)
(recur (with-open [^Database db (cfdb/open fdb)] (fifo/dequeue db k-t))))
)
)))
(ptasks/print-stats))
The above is the function definition. I am calling this function from the daemon code below
(ns dqdaemon.core
(:require [shutdown.core :as shutdown])
(:require [com.videocloudmanager.petikaa.dequeue-tasks :as dqt])
)
;; A crude approximation of your application's state.
(def state (atom {}))
(defn shutdown?[] (false? (:running @state)))
(defn init [args]
(swap! state assoc :running true)
(shutdown/add-hook! ::descriptive-name #(do
(swap! state assoc :running false)
(println "It's about to go down!")))
)
(defn start []
(while (:running @state)
(println "tick")
(dqt/async-deque shutdown?)
(Thread/sleep 2000)))
;; Enable command-line invocation
(defn -main [& args]
(init args)
(start))
Update:
I understand that this is not an issue with blocking puts. It is because my code is creating unlimited goloops.
This code is being called in a loop and the parent method of this loop , async-dequ, is being called in a forever loop.
(go (while true (process-edn (<! event-chan)))))
I am moving this code out of the loop-recur and into the main method to initialize when the daemon is started