
Apache Crunch: A Java Library for Easier MapReduce Programming

Posted by Josh Wills on Dec 27, 2012

Apache Crunch (incubating) is a Java library for creating MapReduce pipelines that is based on Google's FlumeJava library. Like other high-level tools for creating MapReduce jobs, such as Apache Hive, Apache Pig, and Cascading, Crunch provides a library of patterns to implement common tasks like joining data, performing aggregations, and sorting records. Unlike those other tools, Crunch does not impose a single data type that all of its inputs must conform to. Instead, Crunch uses a customizable type system that is flexible enough to work directly with complex data such as time series, HDF5 files, Apache HBase tables, and serialized objects like protocol buffers or Avro records.

Crunch does not try to discourage developers from thinking in MapReduce, but it does try to make thinking in MapReduce easier to do. MapReduce, for all of its virtues, is the wrong level of abstraction for many problems: most interesting computations are made up of multiple MapReduce jobs, and it is often the case that we need to compose logically independent operations (e.g., data filtering, data projection, data transformation) into a single physical MapReduce job for performance reasons.

Essentially, Crunch is designed to be a thin veneer on top of MapReduce -- with the intention being not to diminish MapReduce's power (or the developer's access to the MapReduce APIs) but rather to make it easy to work at the right level of abstraction for the problem at hand.

Although Crunch is reminiscent of the venerable Cascading API, their respective data models are very different. A simple rule of thumb is that people who think about problems as data flows tend to prefer Crunch and Pig, while people who think in terms of SQL-style joins tend to prefer Cascading and Hive.

Crunch Concepts

Crunch's core abstractions are a PCollection<T>, which represents a distributed, immutable collection of objects, and a PTable<K, V>, which is a sub-interface of PCollection that contains additional methods for working with key-value pairs. These two core interfaces support four primitive operations:

  1. parallelDo: Apply a user-defined function to a given PCollection and return a new PCollection as a result.
  2. groupByKey: Sort and group the elements of a PTable by their keys (equivalent to the shuffle phase of a MapReduce job).
  3. combineValues: Perform an associative operation to aggregate the values from a groupByKey operation.
  4. union: Treat two or more PCollections as a single, virtual PCollection.
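To make the semantics of the four primitives concrete, here is a minimal in-memory sketch in plain Java. It does not use the Crunch API: the class PrimitivesSketch and all of its method signatures are hypothetical stand-ins that only mimic the behavior described in the list above, and it uses Java 8 lambdas for brevity.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

// Hypothetical in-memory stand-ins for the four primitives; this is not Crunch code.
class PrimitivesSketch {

  // parallelDo: apply a function that may emit zero or more outputs per input element.
  static <S, T> List<T> parallelDo(List<S> input, BiConsumer<S, Consumer<T>> fn) {
    List<T> out = new ArrayList<>();
    for (S element : input) {
      fn.accept(element, out::add);
    }
    return out;
  }

  // groupByKey: sort by key and gather each key's values (the "shuffle").
  static <K extends Comparable<K>, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> table) {
    Map<K, List<V>> grouped = new TreeMap<>();
    for (Map.Entry<K, V> entry : table) {
      grouped.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
    }
    return grouped;
  }

  // combineValues: fold each group's values with an associative operator.
  static <K, V> Map<K, V> combineValues(Map<K, List<V>> grouped, BinaryOperator<V> op) {
    Map<K, V> combined = new LinkedHashMap<>();
    grouped.forEach((key, values) -> combined.put(key, values.stream().reduce(op).get()));
    return combined;
  }

  // union: treat several collections as a single one.
  @SafeVarargs
  static <T> List<T> union(List<T>... collections) {
    List<T> all = new ArrayList<>();
    for (List<T> collection : collections) {
      all.addAll(collection);
    }
    return all;
  }

  // A word count over two inputs that exercises all four primitives.
  static Map<String, Long> wordCount(List<String> first, List<String> second) {
    List<String> lines = union(first, second);
    List<Map.Entry<String, Long>> ones = parallelDo(lines, (line, emit) -> {
      for (String word : line.split("\\s+")) {
        emit.accept(new AbstractMap.SimpleEntry<String, Long>(word, 1L));
      }
    });
    return combineValues(groupByKey(ones), Long::sum);
  }

  public static void main(String[] args) {
    // prints {a=2, b=2, c=1}
    System.out.println(wordCount(Arrays.asList("a b a"), Arrays.asList("b c")));
  }
}
```

In this sketch everything runs eagerly in one JVM; in Crunch the same four operations only describe a graph that is executed later.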

All of Crunch's higher-order operations (joins, cogroups, set operations, etc.) are implemented in terms of these primitives. The Crunch job planner takes in the graph of operations defined by the pipeline developer, breaks the operations up into a series of dependent MapReduce jobs, and then executes them on a Hadoop cluster. Crunch also supports an in-memory execution engine that can be used to test and debug pipelines on local data.
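As a rough illustration of how a higher-order operation can be assembled from simpler steps, the sketch below builds an inner join in plain Java by tagging each input, concatenating the tagged inputs, grouping by key, and emitting the matching pairs. This is an assumption about the general technique, not a copy of Crunch's join implementation; JoinSketch, Tagged, kv, and innerJoin are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a join expressed as tag + union + group + emit; not Crunch code.
class JoinSketch {

  // A value tagged with the side of the join it came from.
  static final class Tagged {
    final boolean fromLeft;
    final String value;

    Tagged(boolean fromLeft, String value) {
      this.fromLeft = fromLeft;
      this.value = value;
    }
  }

  // Small helper for building key-value pairs.
  static Map.Entry<String, String> kv(String key, String value) {
    return new AbstractMap.SimpleEntry<String, String>(key, value);
  }

  static List<String> innerJoin(List<Map.Entry<String, String>> left,
                                List<Map.Entry<String, String>> right) {
    // Element-wise step on each input (tag the values), then a union of both results.
    List<Map.Entry<String, Tagged>> tagged = new ArrayList<>();
    for (Map.Entry<String, String> e : left) {
      tagged.add(new AbstractMap.SimpleEntry<String, Tagged>(e.getKey(), new Tagged(true, e.getValue())));
    }
    for (Map.Entry<String, String> e : right) {
      tagged.add(new AbstractMap.SimpleEntry<String, Tagged>(e.getKey(), new Tagged(false, e.getValue())));
    }

    // Group by key: all values for a key, from both sides, end up together.
    Map<String, List<Tagged>> grouped = new TreeMap<>();
    for (Map.Entry<String, Tagged> e : tagged) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }

    // Element-wise step over the groups: emit every (left, right) pair that shares a key.
    List<String> joined = new ArrayList<>();
    for (Map.Entry<String, List<Tagged>> group : grouped.entrySet()) {
      for (Tagged l : group.getValue()) {
        if (!l.fromLeft) {
          continue;
        }
        for (Tagged r : group.getValue()) {
          if (!r.fromLeft) {
            joined.add(group.getKey() + ":" + l.value + "," + r.value);
          }
        }
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> users = Arrays.asList(kv("u1", "alice"), kv("u2", "bob"));
    List<Map.Entry<String, String>> events =
        Arrays.asList(kv("u1", "login"), kv("u1", "logout"), kv("u3", "login"));
    // prints [u1:alice,login, u1:alice,logout]
    System.out.println(innerJoin(users, events));
  }
}
```

Keys that appear on only one side ("u2" and "u3" here) produce no output, which is what makes this an inner join.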

Crunch was designed for problems that benefit from lots of user-defined functions operating on custom data types. User-defined functions in Crunch are designed to be lightweight while still providing complete access to the underlying MapReduce APIs for applications that require it. Crunch developers can also use the Crunch primitives to define APIs that provide clients with advanced ETL, machine learning, and scientific computing functionality that involves a series of complex MapReduce jobs.

Getting Started with Crunch

You can download the source or the binaries of the latest version of Crunch from the website, or you can use the dependencies that are published at Maven Central.
The source code ships with a number of example applications. Here is the source code for the WordCount application in Crunch:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;

public class WordCount {
  public static void main(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // Define a function that splits each line in a PCollection of Strings into a
    // PCollection made up of the individual words in the file.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()); // Indicates the serialization format

    // The count method applies a series of Crunch primitives and returns
    // a map of the top 20 unique words in the input PCollection to their counts.
    // We then read the results of the MapReduce jobs that performed the
    // computations into the client and write them to stdout.
    for (Pair<String, Long> wordCount : words.count().top(20).materialize()) {
      System.out.println(wordCount);
    }
  }
}

The last code block in this example shows the power of Crunch’s literate API: in a single line of Java code, we configured and executed two dependent MapReduce jobs (one to count the elements of a PCollection, and a second to find the top twenty elements by that count) and read the output of the second MapReduce job into the client via Crunch’s ability to materialize PCollections as Java Iterables.
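For readers who want to see what those two dependent stages compute, the following plain-Java sketch reproduces the logic on an in-memory list. It is only an analogue under stated assumptions: CountTopSketch, count, and top are hypothetical names, nothing here calls Crunch, and ties are broken by key order purely as a convenience of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory analogue of the count-then-top computation; not Crunch code.
class CountTopSketch {

  // Stage 1: count how often each element occurs.
  static Map<String, Long> count(List<String> words) {
    Map<String, Long> counts = new TreeMap<>();
    for (String word : words) {
      counts.merge(word, 1L, Long::sum);
    }
    return counts;
  }

  // Stage 2: keep the n entries with the largest counts. It depends on the
  // complete output of stage 1, which is why the article describes two jobs.
  static List<Map.Entry<String, Long>> top(Map<String, Long> counts, int n) {
    List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
    // The sort is stable, so equal counts stay in key order.
    entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());
    return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("b", "a", "b", "c", "a", "b");
    // prints [b=3, a=2]
    System.out.println(top(count(words), 2));
  }
}
```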

Crunch Optimization Plans

The objective of Crunch's optimizer is to run as few MapReduce jobs as possible. Most MapReduce jobs are IO-bound, so the fewer times we have to go over the data, the better. To be fair, every optimizer (Hive, Pig, Cascading, Crunch) works essentially the same way. But unlike the other frameworks, Crunch exposes its optimizer primitives to client developers, making it much easier to construct reusable, higher-level operations for tasks like constructing an ETL pipeline or building and evaluating an ensemble of random forest models.
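One way to picture the benefit of going over the data fewer times is to fuse consecutive element-wise steps into a single function, so that one pass does the work of several. The plain-Java sketch below illustrates that idea only; it is not Crunch's planner, and FusionSketch, runPass, fuse, and the three sample steps are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of fusing element-wise steps; not Crunch's optimizer.
class FusionSketch {

  // Three logically independent steps: transformation, filtering, transformation.
  static final Function<String, List<String>> SPLIT =
      line -> Arrays.asList(line.split("\\s+"));
  static final Function<String, List<String>> DROP_SHORT =
      word -> word.length() > 2 ? Collections.singletonList(word) : Collections.<String>emptyList();
  static final Function<String, List<String>> UPPER =
      word -> Collections.singletonList(word.toUpperCase());

  // One full pass over a data set, producing a new materialized data set.
  static <S, T> List<T> runPass(List<S> input, Function<S, List<T>> fn) {
    List<T> out = new ArrayList<>();
    for (S element : input) {
      out.addAll(fn.apply(element));
    }
    return out;
  }

  // Fuse two steps into one function so that both run within a single pass.
  static <S, T, U> Function<S, List<U>> fuse(Function<S, List<T>> first,
                                             Function<T, List<U>> second) {
    return element -> {
      List<U> out = new ArrayList<>();
      for (T intermediate : first.apply(element)) {
        out.addAll(second.apply(intermediate));
      }
      return out;
    };
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("to be or not", "that is the question");

    // Unfused: three passes and two intermediate data sets.
    List<String> unfused = runPass(runPass(runPass(lines, SPLIT), DROP_SHORT), UPPER);

    // Fused: a single pass over the input produces the same result.
    List<String> fused = runPass(lines, fuse(fuse(SPLIT, DROP_SHORT), UPPER));

    System.out.println(unfused.equals(fused)); // prints true
    System.out.println(fused); // prints [NOT, THAT, THE, QUESTION]
  }
}
```

On a cluster, each avoided pass would correspond to data that never has to be written out and read back between steps.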

Conclusion

Crunch is currently in incubation status with Apache, and we gladly welcome contributions from the community (see project page) to make the library even better. In particular we are seeking ideas for more efficient MapReduce compilation (including cost-based optimizations), new MapReduce design patterns, and support for more data sources and targets like HCatalog, Solr, and ElasticSearch. There are also a number of projects that bring Crunch to other JVM languages like Scala and Clojure, as well as tools that use Crunch to create MapReduce pipelines in R.

About the Author

Josh Wills is Cloudera's Director of Data Science, working with customers and engineers to develop Hadoop-based solutions across a wide range of industries. He earned his Bachelor's degree in Mathematics from Duke University and his Master's in Operations Research from The University of Texas - Austin.

InfoQ.com and all content copyright © 2006-2013 C4Media Inc.