Compass Labs is a heavy user of Clojure for analytics and data processing. We have been using Stuart Sierra's excellent clojure-hadoop package to run a wide variety of algorithms, drawn from dozens of different Java libraries, over our datasets on large Elastic MapReduce clusters.
The standard way to build a jar for MapReduce is to pack all the libraries for a given project into a single uberjar and upload it to EMR. Unfortunately, building uberjars for Hadoop is a mallet where only a small tap is needed.
We recently reached a point where the overhead of uploading large jars caused a noticeable slowdown in our development cycle, especially when launching jobs over connections with limited upload bandwidth and with the slower uberjar creation in lein 1.4.
There are (at least) two solutions to this:
- Build smaller jars by using separate projects whose dependencies are specific to a given job
- Cache the dependencies on the cluster, send over only your dependency list, and pull the set you need into the distributed cache at job configuration time.
To let us continue to package all our job steps in a single jar, source tree, and lein project, we opted for the latter solution, which I describe below.
Development Context
Our development environment for Clojure and Hadoop is simple:
- A single Leiningen project
- Stuart Sierra’s clojure-hadoop library
- Emacs+Slime/Swank for Clojure
- The AWS elastic-mapreduce command-line tool for launching jobs locally and remotely
Our typical process involves local iterations followed by larger runs on the cluster. We grab a sample of the target data onto local disk, develop map and reduce operators against in-memory data at the REPL, create clojure-hadoop “jobs”, and then run REPL-based mapreduce tests on the functions referenced by those job descriptions.
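As an illustration of that inner loop, here is a toy word-count mapper and reducer written as plain Clojure functions so they can be exercised on in-memory data at the REPL; the function names and the word-count example are illustrative, not taken from our actual jobs.

(require '[clojure.string :as str])

;; Illustrative only: mapper and reducer as plain functions that return
;; sequences of [key value] pairs, so they can be tested without Hadoop.
(defn count-words-map [key value]
  ;; emit a [word 1] pair for every word in the input line
  (for [word (str/split value #"\s+")]
    [word 1]))

(defn count-words-reduce [key counts]
  ;; sum the counts gathered for a single word
  [[key (apply + counts)]])

;; REPL checks against in-memory data:
;; (count-words-map nil "to be or not to be")
;; (count-words-reduce "be" [1 1])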
Until recently we built a ‘lein uberjar’ and submitted it to an AWS EMR cluster using a script wrapped around the elastic-mapreduce tool. What we would like to do instead is build a small, minimal jar containing only our Clojure sources, and have the startup process capture all the dependencies. The solution, worked out with help from Ron Bodkin (@rbodkin) and Daniel Eklund of Think Big Analytics, consists of two simple steps that are obvious in hindsight.
Using Maven in a Bootstrap Action
We now run ‘lein jar’ to package up our Clojure source and ‘lein pom’ to generate a Maven POM file that captures the project dependencies, and push both files to S3. We use a custom version of the job.clj file from clojure-hadoop as the :main entry point in the project file; the modifications to this file are described below.
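For reference, the relevant part of the project file looks roughly like the sketch below; the group, artifact, and version coordinates are illustrative, and only the :main arrangement reflects what we actually do.

;; project.clj sketch -- names and versions are illustrative
(defproject example/emr-jobs "0.1.0"
  :description "MapReduce job steps packaged as a small jar"
  :dependencies [[org.clojure/clojure "1.2.0"]
                 [clojure-hadoop "1.3.0"]]
  ;; our customized copy of clojure-hadoop's job.clj lives in this namespace
  :main example.hadoop.job)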
The solution proceeds in two steps:
- Get all the dependencies for the current job onto the HDFS volume of the new cluster, as well as onto the classpath of the jobtracker node (which needs, for example, the Clojure jar to run our main function).
- Load those dependencies into the distributed cache by modifying the JobConf object created by clojure-hadoop’s job methods.
We built a custom bootstrap action for Amazon's EMR that does the following:
- Grab a tar file from S3 and unpack it into the local Maven repository on the jobtracker node of a fresh EMR cluster.
- Use the Maven goal “dependency:copy-dependencies” to resolve the POM's dependencies and copy the jars into a dedicated directory, /home/hadoop/libcl.
- Copy these files to hdfs://localhost:9000/libcl using ‘hadoop fs’.
- The version of s3cmd on the default Hadoop 0.20 AMI didn't work with our S3 buckets, so we had to install the latest version to copy files from S3. There wasn't a .deb package available, so we wget the latest source distribution, unpack it in /home/hadoop, and call it by that fixed path directly.
- Write the .s3cfg file to give s3cmd the appropriate credentials.
(Beware that bootstrap actions run in a per-step working directory, so you need to cd to /home/hadoop to work in a canonical location. Also, if your job-launching code requires any special jars, for example the clojure-1.2 jar, then you need to copy them into the Hadoop distribution's lib directory or otherwise ensure they're on the classpath when the JVM starts up to run the job jar's main method.)
Modifying the Job Jar’s Startup Code
The final component of this strategy is shown below. We look in the local ./libcl directory for the list of jars, then construct HDFS paths pointing to the copies that the bootstrap action placed in HDFS:
;; Requires these imports in the ns declaration:
;;   (:import (java.io File)
;;            (org.apache.hadoop.fs Path)
;;            (org.apache.hadoop.filecache DistributedCache))
(defn configure-distributed-cache [config]
  (let [dir (new File "./libcl")]
    (when (.exists dir)
      ;; for every jar staged locally by the bootstrap action, add the
      ;; matching copy in HDFS to the job's classpath via the cache
      (dorun
       (map (fn [fname]
              (DistributedCache/addArchiveToClassPath
               (new Path (str "/libcl/" fname)) config))
            (.list dir)))))
  config)
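For orientation, the sketch below shows roughly where this hook sits in the modified startup code: the JobConf built by clojure-hadoop is passed through configure-distributed-cache before the job is submitted. The construction of the JobConf itself is elided here, and submit-job is just an illustrative name.

(import 'org.apache.hadoop.mapred.JobClient)

;; Sketch only: `conf` is the JobConf that clojure-hadoop's job machinery
;; has already populated from the job description.
(defn submit-job [conf]
  (configure-distributed-cache conf)
  (JobClient/runJob conf))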
The Hadoop framework will ensure that these jars are available to the child map and reduce JVM instances created on the task nodes during the execution of the job step.
In a later post I will discuss refinements to this scheme for permanent clusters. For example, I would like to package the pom.xml with the job jar and do most of the logic in the startup Java code, including invoking Maven, interacting with HDFS, and getting rid of the dependency on s3cmd.

Confront the difficult while it is still easy; accomplish the great task by a series of small acts.
Tao Te Ching