I'd like to write to a database as the final process in a core.async.flow flow. The DB load can take several minutes. If I do this work directly on the :transform function's thread, flow-monitor can't ping that process for the duration of the load. But if I offload the work to a different thread and return from the :transform function immediately, I can no longer easily apply back pressure from that process.
The relevant process currently looks something like:
(def db-loader
  (flow/map->step
    {:describe (fn [] {:workload :io
                       :ins {:in "Batched items"}})
     :init (fn [state] state)
     :transition (fn [state status] state)
     :transform (fn [state _ batch]
                  (t/log! "DB Loader is loading a batch.")
                  (Thread/sleep 10000) ;; imagine this code writes to the db.
                  (t/log! "DB Loader loaded a batch.")
                  [state])}))
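For comparison, here is a minimal sketch of the offloading variant described above. It assumes `flow` is an alias for clojure.core.async.flow; `write-batch!` and `db-loader-offloaded` are hypothetical names used only for illustration, and the sleep stands in for the real write.

```clojure
(require '[clojure.core.async.flow :as flow])

;; Hypothetical stand-in for the real database write.
(defn write-batch! [batch]
  (Thread/sleep 10000))

(def db-loader-offloaded
  (flow/map->step
    {:describe (fn [] {:workload :io
                       :ins {:in "Batched items"}})
     :init (fn [state] state)
     :transition (fn [state _transition] state)
     :transform (fn [state _in batch]
                  ;; Hand the write to another thread and return at once,
                  ;; so the process thread stays free between batches.
                  (future (write-batch! batch))
                  [state])}))
```

Because :transform returns right away here, nothing stops the upstream process from handing over the next batch while earlier writes are still running, which is the back pressure problem mentioned above.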
A more complete code example is at https://gist.github.com/tomconnors/245fb69ed757b34502c8d57637db8de2.
Is there a better way to structure this process, so that it stays responsive to flow-monitor while still applying back pressure?