_Comment made by: jwhitlark_
;; I dug this out of some scratch code experimenting with Kafka Streams. All the reify forms were filled with Java 8 lambdas in the original.
;; I'll dig up another example that uses the interfaces from java.util.function.*
;; Some of this was lifted from a franzy example, I think.
;; Note that, for example,
;; https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/Predicate.html
;; is different from
;; https://docs.oracle.com/javase/8/docs/api/java/util/function/Predicate.html
;; Helper namespace: macros that wrap Kafka Streams' functional interfaces in
;; `reify`, so call sites read like small anonymous functions.
;; NOTE(review): the usage example requires this namespace as
;; `our-service.util`, while it is declared here as `utils` -- confirm which
;; name is intended and align the two.
;; The closing paren of the `ns` form was missing; without it the macros
;; below would be read as part of the `ns` form.
(ns utils
  (:import (org.apache.kafka.streams.kstream Reducer KeyValueMapper ValueMapper Predicate)))
(defmacro reducer
  "Expands to a `reify` of Reducer whose `apply` method runs `body`.

  `kv` is a sequence of binding forms; its first two elements become the two
  arguments of `apply` (after the implicit `this`).

  Example: (reducer [a b] (+ a b))"
  [kv & body]
  (let [left  (first kv)
        right (second kv)]
    `(reify Reducer
       (apply [_# ~left ~right]
         ~@body))))
;; public interface KeyValueMapper<K,V,R>
;; apply(K key, V value)
(defmacro kv-mapper
  "Expands to a `reify` of KeyValueMapper whose `apply` method runs `body`.

  `kv` is a sequence of binding forms; its first element is bound to the key
  argument and its second to the value argument of `apply`.

  Example: (kv-mapper [k v] (KeyValue/pair v v))"
  [kv & body]
  (let [key-form   (first kv)
        value-form (second kv)]
    `(reify KeyValueMapper
       (apply [_# ~key-form ~value-form]
         ~@body))))
;; public interface ValueMapper<V1,V2>
;; apply(V1 value)
(defmacro v-mapper
  "Expands to a `reify` of ValueMapper whose `apply` method runs `body`.

  `v` is a single binding form (a symbol or a destructuring form) for the one
  value argument of `apply`; unlike the two-argument helpers it is NOT wrapped
  in a vector.

  Example: (v-mapper value (str value))"
  [v & body]
  (let [method-spec `(apply [_# ~v] ~@body)]
    `(reify ValueMapper
       ~method-spec)))
(defmacro pred
  "Expands to a `reify` of Predicate whose `test` method runs `body`.

  `kv` is a sequence of binding forms; its first two elements become the two
  arguments of `test` (after the implicit `this`). The value of the last form
  in `body` is the method's result.

  Example: (pred [k v] (not= v \"the\"))"
  [kv & body]
  (let [key-form   (first kv)
        value-form (second kv)]
    `(reify Predicate
       (test [_# ~key-form ~value-form]
         ~@body))))
;; I used it something like this:
;; Usage example for the reify-wrapping macros, aliased here as `k`.
;; The `:require` clause was missing its closing paren, which nested `:import`
;; inside it and left the whole `ns` form unbalanced.
(ns our-service.kafka-streams
  (:require
   [our-service.util :as k]
   [clojure.string :as str])
  (:import
   (org.apache.kafka.streams StreamsConfig KafkaStreams KeyValue)
   (org.apache.kafka.streams.kstream KStreamBuilder ValueMapper)))
(defn create-word-count-topology
  "Builds a word-count topology on a fresh KStreamBuilder.

  Pipeline: read the \"streams-str-input\" topic, split each value on
  whitespace, re-key every word by itself, drop the word \"the\", group by key,
  count into the \"CountStore\" store, stringify the counts and write them to
  the \"wordcount-output\" topic. The `println` calls are debugging output.

  Returns a vector `[builder wc]`, where `wc` is whatever the final `.to`
  call returns.
  NOTE(review): `KStream.to(String)` is declared `void` in the 0.10.2 javadoc,
  so `wc` is presumably nil -- confirm before relying on it."
  []
  (let [builder     (KStreamBuilder.)
        init-stream (.stream builder (into-array ["streams-str-input"]))
        wc (-> init-stream
               ;; `(str value)` replaces destructuring the string into chars
               ;; and re-joining them; a nil value still splits to [""].
               (.flatMapValues (k/v-mapper value
                                 (str/split (str value) #"\s")))
               ;; Re-key each record by the word itself so it can be grouped.
               (.map (k/kv-mapper [k v]
                       (KeyValue/pair v v)))
               (.filter (k/pred [k v]
                          (println v)
                          (not= v "the")))
               (.groupByKey)
               (.count "CountStore")
               ;; A bare `show-item` step used to sit here. That symbol is not
               ;; defined or required anywhere in this snippet, and the
               ;; original comment said it needed to be this mapValues step.
               (.mapValues (k/v-mapper v
                             (println v)
                             (str v)))
               (.toStream)
               (.to "wordcount-output"))]
    [builder wc]))