Welcome! Please see the About page for a little more info on how this works.

0 votes
ago in Clojure by

I'd like to write to a database as the final process in a core.async.flow flow. The db load can take several minutes. If I do this work directly in the :transform function's thread, flow-monitor can't ping that process for the duration of the load. But if I offload the work to a different thread and return from the :transform function, I can't as easily control back pressure from that process.

The relevant process currently looks something like:

(def db-loader
  (flow/map->step
   {:describe (fn [] {:workload :io
                      :ins {:in "Batched items"}})
    :init     (fn [state] state)
    :transition (fn [state status] state)
    :transform (fn [state _ batch]
                 (t/log! "DB Loader is loading a batch.")
                 (Thread/sleep 10000) ;; imagine this code writes to the db.
                 (t/log! "DB Loader loaded a batch.")
                 [state])}))

A more complete code example is at https://gist.github.com/tomconnors/245fb69ed757b34502c8d57637db8de2.

Is there a better option for this process?

Please log in or register to answer this question.

...