
0 votes
in Sequences

I would have expected the code below to be parallelized across all available CPUs (in this case 12), but it only runs on one. What am I doing wrong?

(ns csv2summap.core
  (:require [clojure.data.csv :as csv]
            [clojure.java.io :as io]
            [clojure.core.reducers :as r]))

(with-open [writer (io/writer "numbers.csv")]
  (csv/write-csv
   writer
   (take 10000000
         (repeatedly #(vector (char (+ 65 (rand-int 26))) (rand-int 1000))))))

(defn sum-vals
  ([] {})
  ([m [k v]]
   (update m k (fnil + 0) (Integer/parseInt v))))

(defn merge-sums
  ([] {})
  ([& m] (apply merge-with + m)))

(time
 (with-open [reader (io/reader "numbers.csv")]
   (doall
    (r/fold
     (/ 10000000 12)
     merge-sums
     sum-vals
     (csv/read-csv reader)))))
(def n-cpu (.availableProcessors (Runtime/getRuntime)))
;; => 12

(defproject csv2summap "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
            :url "https://www.eclipse.org/legal/epl-2.0/"}
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [org.clojure/data.csv "1.0.0"]]
  :repl-options {:init-ns csv2summap.core})

2 Answers

+1 vote
 
Best answer

Thanks to Sean Corfield and Adrian Smith from the Clojurians Slack channel, I now understand that r/fold requires a foldable collection to run in parallel.
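
A minimal sketch of that fix, keeping the original `sum-vals`/`merge-sums`: realize the CSV rows into a vector (vectors are foldable) before calling `r/fold`. The vector has to be built while the reader is still open, and holding all ten million rows in memory is the trade-off.

;; Sketch: pour the lazy CSV seq into a vector so r/fold can split
;; it into chunks and reduce them in parallel.
(time
 (with-open [reader (io/reader "numbers.csv")]
   (let [rows (vec (csv/read-csv reader))]   ; vectors implement CollFold
     (r/fold
      (quot (count rows) 12)                 ; integer chunk size, not a ratio
      merge-sums
      sum-vals
      rows))))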

Specifically, you need something that implements CollFold. Out of the box, vectors and a few other types (ranges, the results of iterate, hash maps) implement it; everything else falls back to a serial reduce. So the simple option is to use vectors, and the slightly less simple option is to define your own foldable collection by implementing CollFold (via the reducers API or reify).
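
To illustrate the "less simple" route, here is a hypothetical sketch of a custom foldable. It reifies `CollFold` over an already-realized sequence, with `pmap` standing in (in simplified form) for the fork/join machinery the built-in vector implementation uses:

(require '[clojure.core.reducers :as r])

;; Hypothetical wrapper: reduce n-sized chunks in parallel, then
;; combine the partial results.
(defn foldable-rows [rows]
  (reify r/CollFold
    (coll-fold [_ n combinef reducef]
      (->> (partition-all n rows)
           (pmap #(reduce reducef (combinef) %))  ; per-chunk reduce in parallel
           (reduce combinef (combinef))))))       ; merge the partial results

;; usage: (r/fold 8192 merge-sums sum-vals (foldable-rows rows))
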
0 votes

Great question! Efficient operations on this kind of data are super-important.

To that end, we have developed some relevant libraries.

This gist shows some of how we've simplified working with CSV data, and also compares a naive solution to the summing/merging you were doing with a dataset/datatype-aware parallelized version (which, unsurprisingly, is quite a bit faster).

https://gist.github.com/harold/7335b78606f8e962f2b385f1ed79d15c

Hope this helps, and shows a different way to approach such a problem.

P.S. I learned about `fnil` from your question. Neat!
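
For anyone else who hadn't seen it, `fnil` wraps a function so that a nil first argument is replaced with a default, which is what lets `(fnil + 0)` update keys that aren't in the map yet:

((fnil + 0) nil 5)          ;; => 5, nil replaced by 0
(update {} :a (fnil + 0) 5) ;; => {:a 5}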

Thanks for your informative answer. Creating the dataset appears to take the bulk of the time (approx. 82 seconds for 100M lines), whereas the processing itself is very fast: 100M lines in about 10 seconds.

For sure, you're welcome. And yes, 100M is a lot of lines! Just parsing 200M numbers is bound to take a bit of time. Your question was about parallelizing the merge/sum, so my code separated that part of the work. If you want to optimize dataset saving and loading, `nippy` is typically much faster than CSV. Look into `tech.io/put-nippy!` and `tech.io/get-nippy`.
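
A hypothetical usage sketch, assuming the `tech.io` nippy helpers take a path plus a value (`ds` stands in for a dataset; the exact signatures are worth checking in the tech.io docs):

(require '[tech.io :as tio])

;; Assumed API shape: write a dataset to disk as nippy, read it back.
(tio/put-nippy! "./data.nippy" ds)         ; serialize
(def ds-2 (tio/get-nippy "./data.nippy"))  ; deserialize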

Thanks. Trying to produce 100M lines with `(ds/write-csv! (ds/->dataset source-data))` or `(io/put-nippy! "./data.nippy" (ds/->dataset source-data))` blows the heap on my machine. I produced the CSV (and manually added the header) with my original code.

Got it. That makes sense, and it's a cool example of the streaming possibilities of `clojure.data.csv`. This machine has 32GB of RAM, so I was able to amend my code to write out 100M-row CSV and nippy datasets. The CSV on disk is ~658MB and the nippy is ~537MB. The CSV dataset loads in ~47s and the nippy dataset in ~2.8s. (:
...