Steps and Flows: Higher-order Composition for Clojure-Hadoop

The clojure-hadoop library is an excellent facility for running Clojure functions within the Hadoop MapReduce framework. At Compass Labs we’ve been using its job abstraction for elements of our production flow and found a number of limitations that motivated us to implement extensions to the base library. We’ve promoted these extensions for integration into a new release of clojure-hadoop, which should be out shortly.

There are still some design and implementation warts in the current release which should be fixed by ourselves or other users in the coming weeks. 

Minor extensions to the job abstraction 

  • :param argument – Accepts a list of key-value pairs to be written into the raw job configuration. For example, ‘:param ["mapreduce.job.reuse.jvm.num.tasks" 5]’ configures the job to reuse each task JVM for up to five tasks.
  • :config argument – Accepts a function, a list of functions, or function names, which are called on the job object during job configuration but prior to job submission. These functions run on the main node only.
  • :batch argument – A new command line argument to use (.submit job) instead of (.waitForCompletion job) to submit the job 
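
As a rough illustration of what the :param list amounts to, the sketch below writes a flat key-value vector into a configuration object pairwise. It is not the library's implementation: java.util.Properties stands in for the Hadoop job configuration, and apply-params is a hypothetical helper.

```clojure
;; Sketch only: java.util.Properties stands in for the Hadoop job
;; configuration, and apply-params is a hypothetical helper that is
;; not part of clojure-hadoop.
(import 'java.util.Properties)

(defn apply-params
  "Writes a flat [k1 v1 k2 v2 ...] vector into props as string settings."
  [^Properties props params]
  (doseq [[k v] (partition 2 params)]
    (.setProperty props (str k) (str v)))
  props)

(def conf
  (apply-params (Properties.) ["mapreduce.job.reuse.jvm.num.tasks" 5]))

(.getProperty conf "mapreduce.job.reuse.jvm.num.tasks") ;; => "5"
```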

There is also a hack in the job internals that allows you to override some of the default behavior by which the Job object gets created, to work around some issues that came up in our environment. Write to me if you are curious about the with-job-customization macro at the bottom of job.clj.

    Complex Jobs: Steps and Flows

    While it’s possible to use a job scheduling language, such as Oozie, to compose a set of interrelated job steps, we already have Clojure close at hand and can use Clojure functions to manage more complex job flows as single invocations on Hadoop. Moreover, we find that we want to be able to build a jar and pass a wider variety of parameters than afforded by the default arguments to jobs.

    Jobs are basically function objects that return maps, so it should be easy to give them arguments and allow them to accept either parsed strings from a command line or Clojure values passed directly from calls at the REPL. A Step is basically a job object that accepts parameters plus some additional key-value pairs defining components. Components are fragments of a job definition. The motivation for components is that jobs are often given sets of parameters that are highly interdependent, such as the quartet of output writer, output format, output key type and output value type. Why not package these up in maps of their own, allow them to be parameterized, and then merge them together when assembling job definitions?
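
The idea of packaging interdependent settings into parameterized maps and merging them can be sketched in plain Clojure. This is only an illustration of the merge idea: the function names, keys and values below are assumptions, not the option names clojure-hadoop actually uses.

```clojure
;; Sketch only: plain maps stand in for components. All names, keys and
;; values here are illustrative assumptions, not the library's options.
(defn text-source
  "Hypothetical source component, parameterized by input path."
  [path]
  {:input path :input-format :text})

(defn text-sink
  "Hypothetical sink component, parameterized by output path."
  [path]
  {:output        path
   :output-format :text
   :output-key    'org.apache.hadoop.io.Text
   :output-value  'org.apache.hadoop.io.LongWritable})

(defn assemble-job
  "Merges component maps left to right, then the job's own settings, so
   explicit settings override anything a component supplied."
  [components settings]
  (apply merge (concat components [settings])))

(def job-def
  (assemble-job [(text-source "/data/in") (text-sink "/data/out")]
                {:name "user-stats" :output-format :seq}))
;; (:output-format job-def) => :seq, since the explicit setting wins
```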

    The clojure-hadoop.flow namespace provides useful functions and macros that enable higher level composition and abstraction for jobs and multi-step job flows.

    As mentioned, components are partial descriptions of jobs, broken into three logical pieces, that are used to simplify the definition of steps. Steps can be further composed into Flows, which are simply Clojure functions that wrap control code around individual steps and are executed on the cluster master.

    • Source. Components which contain a partial description of the input key-value pairs for a single job.

    https://gist.github.com/840745

    • Sink. Components which contain a partial description of the output formats and types for a single job. 

    https://gist.github.com/840750

    • Shuffle. A matched set of writers, readers and optionally map stage output types 

    https://gist.github.com/840748

    • A step is nearly identical to a defjob, a function that returns a configuration. However, it accepts the special keywords :source, :sink, and :shuffle and merges the descriptions into the configuration. It also substitutes any arguments at the time it is called to allow a job to be parameterized. 
    • A flow is a standard Clojure function which calls a series of job steps using the do-step function and can take arbitrary action based on what is returned by each job step. It can return a value to the caller, so flows can themselves be composed just like ordinary Clojure functions. Any uncaught exception results in a failure of the flow. A regular return value of ‘false’ is also considered a failure.

    For example, a little job we use to collect stats on users:

    https://gist.github.com/841219
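
The flow semantics described above (call steps in order, treat false as failure, compose flows like functions) can be sketched without Hadoop. In this sketch run-step is a hypothetical stand-in for do-step, and each "step" is a plain map holding a function under an assumed :run key.

```clojure
;; Sketch only: run-step is a hypothetical stand-in for do-step. It "runs"
;; a step by calling the function stored under :run with the given args.
(defn run-step [step & args]
  (apply (:run step) args))

(def extract-step {:name "extract" :run (fn [path] (boolean (seq path)))})
(def stats-step   {:name "stats"   :run (fn [path] true)})

(defn stats-flow
  "Runs extract, then stats only if extract succeeded. Returns true on
   success and false on failure, so callers can compose it with other flows."
  [path]
  (and (run-step extract-step path)
       (run-step stats-step path)))

(defn nightly-flow
  "Flows compose like ordinary functions: stops at the first failing path."
  [paths]
  (every? stats-flow paths))

(nightly-flow ["/data/a" "/data/b"]) ;; => true
(nightly-flow ["/data/a" ""])        ;; => false
```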

    Launching Steps and Flows on Hadoop

    To call a flow or a step with arguments, you need to use a different interface when using hadoop jar. If you make clojure-hadoop.flow your main class, its main method accepts arguments with the following signature: <type> <name> [arg]*. Type can be “-job”, “-step”, or “-flow”, and name identifies the job, step, or flow to run. Each arg is passed to the Clojure reader, so the named step or flow receives Clojure values. In the case of legacy support for jobs, you need to provide arguments in the original style (e.g. “-output <filename>”).
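
To make the reader behavior concrete, here is a small sketch of what read-string yields for a few command-line strings. The argument values are made up for illustration, and the sketch says nothing about how clojure-hadoop.flow dispatches on them.

```clojure
;; Sketch only: shows what the Clojure reader yields for some made-up
;; command-line strings. The argument values are hypothetical.
(def raw-args ["7" ":weekly" "\"/data/users\"" "[1 2 3]"])

(def parsed-args (mapv read-string raw-args))
;; parsed-args => [7 :weekly "/data/users" [1 2 3]]
```

Note that an argument only arrives as a Clojure string if it carries its own double quotes, which usually means escaping them in the shell.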

    You can run steps from the REPL using do-step. Flows are just functions that call do-step, so you can call them directly. The Hadoop setup on your classpath will determine how these are run. Currently the framework doesn’t support JobClient functionality for remote submission.

    Passing Values within Flows ???

    :post-hook is an optional argument to steps. It takes a function of a single argument, the Job object, which is run after the step completes within a call to do-step.

    Some ideas for various ways to return values to an enclosing flow: 

    1. A boolean indicating whether the task completed successfully (the default return value)
    2. The :post-hook function is called with the job object and can return any Clojure value
      1. Query Hadoop counters via (get-counter (.getCounters job) ...)
      2. Access out-of-band data in an external database 
      3. Access data on the DFS or local FS
      4. Interaction via shared state passed in an argument (e.g. an agent or atom)
      5. Interaction via dynamic binding 
      6. Return settings by setting the job configuration (discouraged)
    3. Throw an exception, which results in the exception being printed to stdout and the flow terminating
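
Two of these options, returning a value from the hook and publishing through shared state, can be sketched together. This is a sketch under stated assumptions: run-step is a hypothetical stand-in for do-step, the "job" is a plain map rather than a Hadoop Job object, and returning the hook's value from run-step is an assumption about how such a wrapper might behave.

```clojure
;; Sketch only: run-step is a hypothetical stand-in for do-step, and the
;; "job" is a plain map rather than a Hadoop Job object.
(defn run-step
  "Pretends to run a step, then calls its :post-hook (if any) with the job.
   Returns the hook's value when a hook is supplied, else the success flag."
  [step]
  (let [job {:successful? true :counters {"records" 42}}]
    (if-let [hook (:post-hook step)]
      (hook job)
      (:successful? job))))

;; Shared state owned by the enclosing flow.
(def seen-records (atom 0))

(def counting-step
  {:name "count"
   :post-hook (fn [job]
                ;; Publish a value through the atom and also return it.
                (reset! seen-records (get-in job [:counters "records"])))})

(run-step counting-step) ;; afterwards the flow can read @seen-records
```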

    Some Example Best Practice Patterns

    Guarded files

    One common pattern is providing guards around datasets so they are only recomputed after some time delay (e.g. weekly). We’ve implemented some simple wrappers around steps that run the step only when the time has expired.

    https://gist.github.com/840823
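
A time-based guard of this kind might look roughly like the following. This is a minimal sketch, not the wrapper from the gist: guard-expired? and run-if-expired are hypothetical names, timestamps are epoch milliseconds, and the caller is assumed to supply the time of the last run.

```clojure
;; Sketch only: guard-expired? and run-if-expired are hypothetical helpers.
;; Timestamps are epoch milliseconds supplied by the caller.
(def week-ms (* 7 24 60 60 1000))

(defn guard-expired?
  "True when the dataset has never been built or the interval has elapsed."
  [last-run-ms interval-ms now-ms]
  (or (nil? last-run-ms)
      (>= (- now-ms last-run-ms) interval-ms)))

(defn run-if-expired
  "Calls step-fn only when the guard has expired, else returns :skipped."
  [last-run-ms interval-ms now-ms step-fn]
  (if (guard-expired? last-run-ms interval-ms now-ms)
    (step-fn)
    :skipped))
```

Returning :skipped rather than false keeps a skipped step from being read as a failure by an enclosing flow.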

    Tracking ???

    Another great pattern is to use an external database or HDFS file to remember the state of flows so you can, for example, remember the last time you updated some data.
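
As a rough sketch of this pattern, the helpers below keep a map of dataset names to last-update times in an EDN file. A local file stands in for the HDFS file or external database, and load-state, save-state! and mark-updated! are hypothetical names.

```clojure
;; Sketch only: a local EDN file stands in for an HDFS file or database.
;; load-state, save-state! and mark-updated! are hypothetical helpers.
(require '[clojure.edn :as edn]
         '[clojure.java.io :as io])

(defn load-state
  "Reads the state map from path, or returns {} if missing or empty."
  [path]
  (let [f (io/file path)]
    (if (.exists f)
      (or (edn/read-string (slurp f)) {})
      {})))

(defn save-state!
  "Writes the state map to path as EDN and returns it."
  [path state]
  (spit path (pr-str state))
  state)

(defn mark-updated!
  "Records now-ms as the last update time for dataset."
  [path dataset now-ms]
  (save-state! path (assoc (load-state path) dataset now-ms)))
```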

     


    One comment

    1. Sunil Nandihalli

      I see the Gists .. but it would be helpful if you gave a single file and a way to run it. I have used clojure-hadoop quite extensively .. but just not flows. Looking forward to hearing from you. Thanks, Sunil.
