MapReduce with MongoDB and Clojure
A few days ago, we decided to build a dashboard to better visualize statistics from our production systems. One important feature is a time-series plot of the average latency, so we can see how it trends over time. Since MongoDB implements MapReduce and we already store our logs in MongoDB, MapReduce seemed a natural fit for the log analysis.
One issue with MongoDB’s implementation of MapReduce is that, no matter which language your client is written in, you have to pass the map and reduce functions to MongoDB as JavaScript code in strings. Storing code written in another language as strings in a program is … inelegant, to say the least.
Fortunately, because Clojure is homoiconic (its code is represented as ordinary Clojure data), it is relatively easy to use Clojure itself to transform Clojure forms into code in other languages, all within the same program. In other words, we can embed JavaScript programs in a Clojure program without ever seeing any JavaScript syntax. A number of libraries, at different levels of maturity, already translate Clojure forms to JavaScript. I haven’t done an extensive survey, but ClojureJS is good enough for our purposes.
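To make the homoiconicity point concrete, here is a minimal sketch of such a translator written in plain Clojure. It is not ClojureJS’s implementation; the function form->js is hypothetical and handles only a handful of forms (single-expression fn, field access via keywords, infix arithmetic, method calls and plain calls). The point is that a quoted Clojure form is just a list we can walk with ordinary functions.

```clojure
(ns example.form-to-js
  (:require [clojure.string :as str]))

(declare form->js)

(def ^:private infix-ops
  "Clojure operators emitted as JavaScript infix operators."
  #{'+ '- '* '/})

(defn- emit-args
  "Translates each argument form and joins them with commas."
  [args]
  (str/join ", " (map form->js args)))

(defn form->js
  "Translates a small subset of Clojure forms into a JavaScript source string.
  Hypothetical and deliberately minimal; not how ClojureJS does it."
  [form]
  (cond
    (string? form) (pr-str form)   ; simple string literal -> quoted JS string
    (number? form) (str form)
    (symbol? form) (str form)
    (seq? form)
    (let [[op & args] form]
      (cond
        ;; (fn [a b] body) -> function(a, b) { return body; }  (one body form only)
        (= op 'fn)
        (let [[params body] args]
          (str "function(" (str/join ", " params) ") { return "
               (form->js body) "; }"))

        ;; (:field obj) -> obj.field
        (keyword? op)
        (str (form->js (first args)) "." (name op))

        ;; (+ a b c) -> (a + b + c)
        (contains? infix-ops op)
        (str "(" (str/join (str " " op " ") (map form->js args)) ")")

        ;; (.method obj x) -> obj.method(x)
        (and (symbol? op) (str/starts-with? (name op) "."))
        (str (form->js (first args)) (name op) "(" (emit-args (rest args)) ")")

        ;; (f x y) -> f(x, y)
        :else
        (str (form->js op) "(" (emit-args args) ")")))
    :else (throw (ex-info "Unsupported form" {:form form}))))

;; Usage: the form is quoted data, so no JavaScript appears in the Clojure source.
(form->js '(fn [doc] (emit (:time doc) (:latency_ms doc))))
;; => "function(doc) { return emit(doc.time, doc.latency_ms); }"
```

A real library also has to handle let, set!, object literals, statements versus expressions and so on, which is where the differences in maturity between libraries show up.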
The following is the mapper, which collects the latency numbers from the request log, keyed by date:
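(def req-latency-mapper
  (js (fn []
        ;; `this` is the log document being mapped
        (let [time (:time this)
              ;; truncate the timestamp in place so that every request from
              ;; the same (server-local) day gets the same key, 01:00:00.000
              date (do (.setHours time 1)
                       (.setMinutes time 0)
                       (.setSeconds time 0)
                       (.setMilliseconds time 0)
                       time)
              latency_ms (:latency_ms this)]
          ;; one request, contributing its latency to that day's sum
          (emit date {:num_requests 1, :sum_latency_ms latency_ms})))))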
js is a macro defined in ClojureJS that takes a Clojure form and returns JavaScript code as a string.
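To check what the generated mapper computes without a MongoDB instance, here is a sketch of the same logic in plain Clojure on the JVM. It assumes each log document arrives as a Clojure map with a java.util.Date under :time, and it truncates to midnight UTC, whereas the JavaScript version truncates in the server’s local time and uses 01:00 as the key. The names day-key and map-request are hypothetical.

```clojure
(ns example.mapper-sketch
  (:import (java.time.temporal ChronoUnit)
           (java.util Date)))

(defn day-key
  "Truncates a java.util.Date to midnight UTC, so all requests from the same
  UTC day share one key. Returns a new Date; the input is left untouched."
  ^Date [^Date t]
  (-> (.toInstant t)
      (.truncatedTo ChronoUnit/DAYS)
      (Date/from)))

(defn map-request
  "Returns the [key value] pair the JavaScript mapper would pass to emit
  for one log document (assumed to be a map with :time and :latency_ms)."
  [doc]
  [(day-key (:time doc)) {:num_requests 1, :sum_latency_ms (:latency_ms doc)}])

;; Usage with a made-up log document:
(map-request {:time #inst "2011-03-05T17:42:10.000-00:00", :latency_ms 120})
;; => [#inst "2011-03-05T00:00:00.000-00:00" {:num_requests 1, :sum_latency_ms 120}]
```

Unlike the JavaScript mapper, this version copies the timestamp instead of mutating it, which keeps day-key a pure function and easy to test.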
Next, we define a reducer that sums the latency and counts the requests for each day.
(def req-latency-day-reducer
  (js (fn [date vals]
        ;; accumulate into an object with the same shape the mapper emits,
        ;; since MongoDB may pass this result back into the reducer later
        (let [result {:num_requests 0, :sum_latency_ms 0}]
          (.forEach vals
                    (fn [val]
                      (set! result.num_requests
                            (+ result.num_requests val.num_requests))
                      (set! result.sum_latency_ms
                            (+ result.sum_latency_ms val.sum_latency_ms))))
          result))))
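MongoDB may call the reduce function more than once for the same key, feeding earlier results back in together with new values. That is why the reducer returns an object with the same fields the mapper emits, and why summing (which is associative and commutative) is safe here. The following plain-Clojure sketch of the reducer, using a hypothetical name reduce-day and made-up numbers, checks that re-reducing a partial result gives the same answer.

```clojure
(ns example.reducer-sketch)

(defn reduce-day
  "Sums partial {:num_requests n, :sum_latency_ms s} maps for one key.
  The key is ignored, as in the JavaScript reducer."
  [_date vals]
  (reduce (fn [acc v]
            (-> acc
                (update :num_requests + (:num_requests v))
                (update :sum_latency_ms + (:sum_latency_ms v))))
          {:num_requests 0, :sum_latency_ms 0}
          vals))

;; Three emitted values for the same day (made-up numbers).
(def emitted [{:num_requests 1, :sum_latency_ms 100}
              {:num_requests 1, :sum_latency_ms 50}
              {:num_requests 1, :sum_latency_ms 30}])

(reduce-day nil emitted)
;; => {:num_requests 3, :sum_latency_ms 180}

;; Re-reducing a partial result together with the remaining value gives the same answer.
(= (reduce-day nil emitted)
   (reduce-day nil [(reduce-day nil (take 2 emitted)) (last emitted)]))
;; => true
```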
We also need a finalizer that divides the total latency by the number of requests to compute each day’s average.
(def average-finalizer
  (js (fn [date val]
        (let [{:keys [num_requests sum_latency_ms]} val
              average (/ sum_latency_ms num_requests)]
          ;; store the average alongside the totals in the output document
          (set! val.average_latency_ms average)
          val))))
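The average belongs in the finalizer rather than the reducer because MongoDB runs the finalizer once per key, after all reducing is done, while the reducer may run several times on partial data. A plain-Clojure sketch of the finalizer (hypothetical name finalize-day, made-up numbers) shows the result and why averaging partial averages would go wrong.

```clojure
(ns example.finalizer-sketch)

(defn finalize-day
  "Adds :average_latency_ms to the fully reduced totals for one day.
  Coerces to double so the result is a decimal number, as in JavaScript,
  rather than a Clojure ratio."
  [_date {:keys [num_requests sum_latency_ms] :as val}]
  (assoc val :average_latency_ms (/ (double sum_latency_ms) num_requests)))

(finalize-day nil {:num_requests 3, :sum_latency_ms 180})
;; => {:num_requests 3, :sum_latency_ms 180, :average_latency_ms 60.0}

;; Averaging averages instead would be wrong: the mean of a 2-request
;; partial average (75.0) and a 1-request value (30) is 52.5, not 60.0.
(/ (+ (/ (+ 100 50) 2.0) 30) 2)
;; => 52.5
```

Every key reaches the finalizer with at least one request, so the division by num_requests never divides by zero.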
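Finally, we call map-reduce, a wrapper around the MongoDB Java driver’s MapReduce API that was recently added to CongoMongo. It reads from the request log collection and writes the daily averages to the output collection.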
(defn gen-daily-average-req-latency []
  (map-reduce :oc_request_log                ; input collection
              req-latency-mapper
              req-latency-day-reducer
              :oc_average_daily_req_latency  ; output collection
              :finalize average-finalizer))
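For testing the three functions away from the database, it can help to have a small in-memory stand-in for the whole flow. The sketch below is a hypothetical helper, run-map-reduce, not part of CongoMongo: it groups emitted pairs by key, reduces each group, finalizes it, and shapes each result like a MongoDB output document, {:_id key, :value v}. Like MongoDB, it skips the reducer for keys with a single value but still finalizes them. It expects the map function to return a seq of [key value] pairs in place of calling emit, and the sample documents carry a made-up precomputed :day string.

```clojure
(ns example.local-map-reduce)

(defn run-map-reduce
  "Runs map/reduce/finalize over an in-memory seq of documents, roughly
  mirroring MongoDB's flow. map-fn returns a seq of [key value] pairs
  (standing in for calls to emit). Results are sorted by key for readability."
  [docs map-fn reduce-fn finalize-fn]
  (->> docs
       (mapcat map-fn)                      ; every emitted [key value] pair
       (group-by first)                     ; key -> [[key value] ...]
       (map (fn [[k pairs]]
              (let [vals    (map second pairs)
                    reduced (if (= 1 (count vals))
                              (first vals)  ; single value: reduce is skipped
                              (reduce-fn k vals))]
                {:_id k, :value (finalize-fn k reduced)})))
       (sort-by :_id)))

(def sample-docs
  [{:day "2011-03-05", :latency_ms 100}
   {:day "2011-03-05", :latency_ms 50}
   {:day "2011-03-06", :latency_ms 30}])

(run-map-reduce
  sample-docs
  (fn [doc] [[(:day doc) {:num_requests 1, :sum_latency_ms (:latency_ms doc)}]])
  (fn [_day vs] {:num_requests   (reduce + (map :num_requests vs))
                 :sum_latency_ms (reduce + (map :sum_latency_ms vs))})
  (fn [_day v] (assoc v :average_latency_ms
                      (/ (double (:sum_latency_ms v)) (:num_requests v)))))
;; => ({:_id "2011-03-05", :value {:num_requests 2, :sum_latency_ms 150, :average_latency_ms 75.0}}
;;     {:_id "2011-03-06", :value {:num_requests 1, :sum_latency_ms 30, :average_latency_ms 30.0}})
```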
Now the job is done, with a nice and clean Clojure program.