

We recently deployed core.cache in a web application and ran into a cache stampede under multi-threaded access. It is obvious in retrospect, but the references to stampedes in the docs led me to believe this case was handled.

With help from people on Slack, I put together a reproduction:

(require '[clojure.core.cache :as cache]
         '[clojure.core.cache.wrapped :as cache-wrapped])

(let [thread-count 20
      cache-atom (-> {}
                     (cache/ttl-cache-factory :ttl 120000)
                     (cache/lu-cache-factory :threshold 100)
                     (atom))
      latch (java.util.concurrent.CountDownLatch. thread-count)
      invocations-counter (atom 0)]
  (doseq [i (range thread-count)]
    (println "starting thread" i)
    (.start (Thread. (fn []
                       ;; Every thread misses on the same key at about the same time.
                       (cache-wrapped/lookup-or-miss cache-atom "my-key"
                                                     (fn [k]
                                                       (swap! invocations-counter inc)
                                                       (Thread/sleep 3000) ; simulate slow work
                                                       "some value"))
                       (.countDown latch)))))

  ;; Wait for all threads, then report how often the value function ran.
  (.await latch)
  (deref invocations-counter))

I would expect the invocations counter to be 1, but it is 20 (one invocation per thread).

by
Based on the suggestion to use core.memoize, this works as expected:

    (require '[clojure.core.cache :as cache]
             '[clojure.core.memoize :as memoize])

    (let [thread-count 20
          invocations-counter (atom 0)
          expensive-function (fn [k]
                               (swap! invocations-counter inc)
                               (Thread/sleep 3000) ; simulate slow work
                               (str "value-" k))
          cache (-> {}
                    (cache/ttl-cache-factory :ttl 120000)
                    (cache/lu-cache-factory :threshold 100))
          memoized-function (memoize/memoizer expensive-function cache)
          latch (java.util.concurrent.CountDownLatch. thread-count)]
      (doseq [i (range thread-count)]
        (println "starting thread" i)
        (.start (Thread. (fn []
                           (memoized-function "my-key")
                           (.countDown latch)))))

      ;; Wait for all threads; the expensive function should have run once.
      (.await latch)
      (assert (= 1 (deref invocations-counter))))

2 Answers

Thanks, Alex!

lookup-or-miss does indeed seem to have a concurrency bug. On lines 57 and 67, the deref'ed value of a delay is inserted into the cache via swap!. That delay can only be executed once, but each call creates its own delay, so when many threads miss on the same key at the same time, each thread derefs its own delay. As a result, the value function runs many times before a value lands in the cache.

https://github.com/clojure/core.cache/blob/master/src/main/clojure/clojure/core/cache/wrapped.clj#L57
https://github.com/clojure/core.cache/blob/master/src/main/clojure/clojure/core/cache/wrapped.clj#L67
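
The difference between forcing a delay inside swap! and storing the delay itself can be sketched without core.cache at all. This is a simplified model, not the library's code: a plain map in an atom stands in for the cache, and eager-miss!, delayed-miss! and count-invocations are hypothetical names made up for this illustration.

```clojure
;; Simplified model only: a plain map in an atom stands in for the cache.

(defn eager-miss!
  "Each caller builds its own delay and forces it inside swap!,
  so concurrent misses on the same key can each run value-fn."
  [cache-atom k value-fn]
  (let [d (delay (value-fn k))]
    (get (swap! cache-atom
                (fn [m] (if (contains? m k) m (assoc m k @d))))
         k)))

(defn delayed-miss!
  "Stores the unforced delay in the map and derefs it on retrieval,
  so only the delay that won the swap! is ever forced."
  [cache-atom k value-fn]
  (let [d (delay (value-fn k))]
    @(get (swap! cache-atom
                 (fn [m] (if (contains? m k) m (assoc m k d))))
          k)))

(defn count-invocations
  "Runs miss-fn from n-threads futures at once against a fresh atom
  and returns how many times the value function was invoked."
  [miss-fn n-threads]
  (let [cache-atom (atom {})
        calls      (atom 0)
        start      (java.util.concurrent.CountDownLatch. 1)
        value-fn   (fn [_]
                     (swap! calls inc)
                     (Thread/sleep 200) ; simulate slow work
                     "some value")
        futures    (doall
                    (repeatedly n-threads
                                (fn []
                                  (future
                                    (.await start)
                                    (miss-fn cache-atom "my-key" value-fn)))))]
    (.countDown start)   ; release all threads together
    (run! deref futures) ; wait for every thread to finish
    @calls))
```

Calling (count-invocations delayed-miss! 8) gives 1, because every thread ends up dereffing the single delay stored in the map. (count-invocations eager-miss! 8) will typically give more than 1, since each thread forces its own delay before its swap! can observe another thread's result.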

To fix this, the wrapped namespace would have to store the delay itself in the cache and deref it only on retrieval.

The workaround with the current version is to do this wrapping yourself:

;; clj -Sdeps '{:deps {org.clojure/core.cache {:mvn/version "1.0.225"}}}'

(require '[clojure.core.cache :as cache]
         '[clojure.core.cache.wrapped :as cache-wrapped])

(let [thread-count 20
      cache-atom (-> {}
                     (cache/ttl-cache-factory :ttl 120000)
                     (cache/lu-cache-factory :threshold 100)
                     (atom))
      latch (java.util.concurrent.CountDownLatch. thread-count)
      invocations-counter (atom 0)]
  (doseq [i (range thread-count)]
    (println "starting thread" i)
    (.start (Thread. (fn []
                       ;; Deref the outcome
                       @(cache-wrapped/lookup-or-miss cache-atom "my-key"
                                                      (fn [k]
                                                        ;; Put the action in a delay
                                                        (delay
                                                          (swap! invocations-counter inc)
                                                          (Thread/sleep 3000)
                                                          "some value")))
                       (.countDown latch)))))

  (.await latch)
  (deref invocations-counter))
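
If this workaround is needed in several places, it could be packaged in a small helper. The following is only a sketch: lookup-or-miss-once is a hypothetical name, not part of core.cache, and it assumes that every entry in the given cache atom was stored through this helper, so that every cached value is a delay.

```clojure
(require '[clojure.core.cache :as cache]
         '[clojure.core.cache.wrapped :as cache-wrapped])

(defn lookup-or-miss-once
  "Hypothetical helper, not part of core.cache. Caches a delay around
  (value-fn k) and derefs it on retrieval, so concurrent misses on the
  same key share one delay. Assumes every value in cache-atom is a delay."
  [cache-atom k value-fn]
  (let [d (cache-wrapped/lookup-or-miss cache-atom k
                                        (fn [e] (delay (value-fn e))))]
    ;; lookup-or-miss can return nil if the entry keeps expiring,
    ;; so only deref when a delay actually came back.
    (some-> d deref)))

;; Usage: same cache setup as in the reproduction, minus the threads.
(def example-cache
  (atom (cache/ttl-cache-factory {} :ttl 120000)))

(lookup-or-miss-once example-cache "my-key" (fn [k] (str "value-" k)))
```

Mixing this helper with direct calls to the wrapped API on the same atom would put plain values next to delays, which is the same compatibility problem a fix inside the library would have to deal with.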
by
I think this will be hard to fix without breaking existing code, since every path through the API would have to wrap or unwrap these new delays. That includes seed, which would need to wrap every value in the provided base, and the base could be an already composed cache data structure.
...