
Exploring Hadoop OutputFormat

Posted by Jim Blomo on Dec 07, 2011

Hadoop is often used as part of a larger ecosystem of data processing. Hadoop's sweet spot, batch processing large amounts of data, can best be put to use by integrating it with other systems. At a high level, Hadoop ingests input files, streams the contents through custom transformations (the Map and Reduce steps), and writes output files back to disk. Last month InfoQ showed how to gain finer control over the first step, the ingestion of input files, via the InputFormat class. In this article, we'll discuss how to customize the final step: writing the output files. OutputFormats let you interoperate easily with other systems by writing the result of a MapReduce job in formats readable by other applications. To demonstrate the usefulness of OutputFormats, we'll discuss two examples: how to split the result of a job into different directories, and how to write files for a service providing fast key-value lookups.

What do OutputFormats do?

The OutputFormat interface determines where and how the results of your job are persisted. Hadoop comes with a collection of classes and interfaces for different types of formats, and customization is done by extending one of these. You are probably already familiar with the default OutputFormat: a line-separated, tab-delimited text file of key-value pairs. This is the TextOutputFormat. But for many types of data, most obviously numbers, text serialization wastes space, which translates into longer run times and more resources consumed. To avoid the drawbacks of text files, Hadoop provides the SequenceFileOutputFormat, which can write the binary representation of objects instead of converting them to text, and compress the result. Below is the class hierarchy Hadoop provides:

  • FileOutputFormat (implements interface OutputFormat) – base class for all OutputFormats
    • MapFileOutputFormat – a format for partially indexed keys
    • SequenceFileOutputFormat – compressed binary key-value data
      • SequenceFileAsBinaryOutputFormat – compressed raw binary data
    • TextOutputFormat – line-separated, tab-delimited text file of key-value pairs
    • MultipleOutputFormat – abstract class for writing files parameterized by the key-value pair
      • MultipleTextOutputFormat – write to multiple files in the standard line-separated, tab-delimited format
      • MultipleSequenceFileOutputFormat – write to multiple files in the compressed file format
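To make the space argument above concrete, here is a small, Hadoop-free illustration: a large long rendered as text occupies noticeably more bytes than its fixed 8-byte binary encoding, which is the kind of saving a binary format like SequenceFileOutputFormat buys you (the numbers below are for a 13-digit value; compression would widen the gap further).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Text vs. binary serialization of the same long value.
public class TextVsBinary {
    public static void main(String[] args) {
        long value = 1322265600000L; // e.g. a millisecond timestamp
        byte[] text = Long.toString(value).getBytes(StandardCharsets.UTF_8);
        byte[] binary = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        System.out.println("text bytes:   " + text.length);   // 13
        System.out.println("binary bytes: " + binary.length); // 8
    }
}
```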

OutputFormats specify how to serialize data by providing an implementation of RecordWriter. RecordWriter classes handle the job of taking an individual key-value pair and writing it to the location prepared by the OutputFormat. There are two main functions a RecordWriter implements: 'write' and 'close'. The 'write' function takes key-values from the MapReduce job and writes the bytes to disk. The default RecordWriter is LineRecordWriter, part of the TextOutputFormat mentioned earlier. It writes:

  • the key's bytes (returned by the getBytes() function)
  • a tab character delimiter
  • the value's bytes (again produced by getBytes())
  • a newline character

The 'close' function closes the Hadoop data stream to the output file.
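The write/close contract is simple enough to sketch without Hadoop at all. The class below is a hypothetical, plain-java.io stand-in for LineRecordWriter (the real class works on Hadoop Writable objects and its output stream), showing exactly the four items listed above:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Minimal sketch of the LineRecordWriter contract: key, tab, value, newline.
public class LineWriterSketch {
    private final DataOutputStream out;

    public LineWriterSketch(DataOutputStream out) { this.out = out; }

    // 'write': stream the key's bytes, a tab, the value's bytes, a newline.
    public void write(String key, String value) throws IOException {
        out.write(key.getBytes(StandardCharsets.UTF_8));
        out.write('\t');
        out.write(value.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
    }

    // 'close': close the underlying data stream.
    public void close() throws IOException { out.close(); }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        LineWriterSketch w = new LineWriterSketch(new DataOutputStream(buf));
        w.write("France", "soccer 5000");
        w.close();
        System.out.print(buf.toString("UTF-8")); // France<TAB>soccer 5000
    }
}
```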

We've talked about the format of output data, but where is it stored? Again, you've probably seen the output of a job stored in many 'part' files under the output directory like so:

|-- output-directory
|   |-- part-00000
|   |-- part-00001
|   |-- part-00002
|   |-- part-00003
|   |-- part-00004
|   '-- part-00005

By default, when data needs to be written, each process creates its own file in the output directory. Data is written by reducers at the end of a job (or mappers if there are no reducers). Even when we create customized output directories later in this article, we'll keep the "part" files so that multiple writers can write to the same directory without interfering with one another.

Customizing Your OutputFormat

We've seen that the main responsibilities of an OutputFormat class are determining where the data is stored and how the data is written. Why would you want to customize these behaviors? One reason for customizing the data location is to separate the output of a MapReduce job into different directories. For example, imagine wanting to process a log file of search requests from around the world to get the frequency of searches per country. You'd like to be able to look at results from a particular country independent of any other. Perhaps later in your data pipeline you have different processes that apply to different countries, or you'd like to replicate the results of a specific country to a datacenter in that country. With the default OutputFormat, all data is stored in one directory and the contents of the part files are unknown without scanning through them. With a custom OutputFormat, you can create a layout of one subdirectory per country like this:

|-- output-directory
|   |-- France
|   |   |-- part-00000
|   |   |-- part-00001
|   |   '-- part-00002
... |
|   '-- Zimbabwe
|       |-- part-00000
|       |-- part-00001
|       '-- part-00002

Each part file contains key-value pairs mapping search terms to their frequency. Reading data from only one country is now as easy as specifying the path with that country's data. We'll see below how we can subclass the MultipleTextOutputFormat class to get the desired behavior.

The other reason to customize, exemplified in a project called ElephantDB, is to store data in a format that is "native" to the consuming application. This project was created to allow the result of a MapReduce job to be queried as a distributed service. Instead of writing text files, ElephantDB uses a custom OutputFormat to write BerkeleyDB files, which are indexed by the keys of the job's output. The BerkeleyDB files can then be loaded by a service to provide low-latency lookups of arbitrary keys. Similar systems include HBase and Voldemort, which can store key-value data generated by Hadoop. ElephantDB's focus is on simplicity and tighter integration with Hadoop's batch-style updates.

Multiplexing Output

To solve the search logs problem we'll extend MultipleTextOutputFormat to choose an output directory based on the key-value being written. Our MapReduce job will produce keys of the country from which the search request originated, and a value of the search terms and frequency of that search. MultipleTextOutputFormat already knows how to write Text files, so we don't need to implement the serialization responsibility of the OutputFormat. Listing 1 implements the class:

1  package oddjob.hadoop;
2
3  import org.apache.hadoop.fs.Path;
4  import org.apache.hadoop.io.Text;
5  import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
6
7  public class MultipleTextOutputFormatByKey extends MultipleTextOutputFormat<Text, Text> {
8
9      /**
10      * Use the key as part of the path for the final output file.
11      */
12     @Override
13     protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
14         return new Path(key.toString(), leaf).toString();
15     }
16
17     /**
18      * When actually writing the data, discard the key since it is already in
19      * the file path.
20      */
21     @Override
22     protected Text generateActualKey(Text key, Text value) {
23         return null;
24     }
25 }

Listing 1: An example subclass of MultipleTextOutputFormat

In MultipleTextOutputFormatByKey we specify where the job output is stored in generateFileNameForKeyValue (line 13). For each output key-value pair produced by a MapReduce job, the class prepends the key to the path name of the output file. The 'leaf' argument is the 'part-00000' file we saw earlier, which is unique per reducer so that simultaneous writes to the same output directory do not interfere with one another. For example, a key of "France" and a value of "soccer 5000" produced by the first reducer would be written to the file 'output-directory/France/part-00000'.
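The path logic itself is tiny, so here is a hypothetical pure-Java equivalent of the key-to-path mapping, with no Hadoop dependency (the real code uses new Path(key, leaf), which joins the two components with a '/' the same way):

```java
// Sketch of the generateFileNameForKeyValue logic: the key becomes a
// subdirectory and the reducer's unique leaf name is preserved.
public class KeyPathSketch {
    static String generateFileName(String key, String leaf) {
        // Mirrors what new Path(key, leaf).toString() produces.
        return key + "/" + leaf;
    }

    public static void main(String[] args) {
        System.out.println(generateFileName("France", "part-00000"));
        // → France/part-00000
    }
}
```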

To use this class, ensure that Hadoop includes the jar with the custom class, and specify the full class name as the argument to '-outputformat':

hadoop jar hadoop-streaming.jar -libjars CustomOutputFormats.jar \
  -outputformat oddjob.hadoop.MultipleTextOutputFormatByKey \
  -input search-logs \
  -output search-frequency-by-country \
  -mapper parse-logs.py \
  -reducer count-searches.py 

Listing 1 is a Java implementation of one of the classes in oddjob, an open source library providing several MultipleTextOutputFormats. The library is geared toward Hadoop's streaming feature, but can be used with any other jobs that produce Text key-value outputs.

Preparing Output for a Service

Our next example must implement two interfaces to customize both the serialization of data and the placement of the resulting files into a directory structure that is loadable by the ElephantDB service. As discussed, serialization is handled by RecordWriter implementations. While the simple LineRecordWriter streams bytes to an output file, the ElephantRecordWriter contains specialized logic to select the files in which to write and uses a third-party library to format data on disk.

1   public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable> {
2
3       FileSystem _fs;
4       Args _args;
5       Map<Integer, LocalPersistence> _lps = new HashMap<Integer, LocalPersistence>();
6       Progressable _progressable;
7       LocalElephantManager _localManager;
8
9       int _numWritten = 0;
10      long _lastCheckpoint = System.currentTimeMillis();
11
12      public ElephantRecordWriter(Configuration conf, Args args, Progressable progressable) throws IOException {
13         _fs = Utils.getFS(args.outputDirHdfs, conf);
14         _args = args;
15         _progressable = progressable;
16         _localManager = new LocalElephantManager(_fs, args.spec, args.persistenceOptions, LocalElephantManager.getTmpDirs(conf));
17      }
18
19      private String remoteUpdateDirForShard(int shard) {
20          if(_args.updateDirHdfs==null) return null;
21          else return _args.updateDirHdfs + "/" + shard;
22      }
23
24      public void write(IntWritable shard, ElephantRecordWritable record) throws IOException {
25          LocalPersistence lp = null;
26          LocalPersistenceFactory fact = _args.spec.getLPFactory();
27          Map<String, Object> options = _args.persistenceOptions;
28          if(_lps.containsKey(shard.get())) {
29             lp = _lps.get(shard.get());
30          } else {
31             String updateDir = remoteUpdateDirForShard(shard.get());
32             String localShard = _localManager.downloadRemoteShard("" + shard.get(), updateDir);
33             lp = fact.openPersistenceForAppend(localShard, options);
34             _lps.put(shard.get(), lp);
35             progress();
36          }
37
38          _args.updater.updateElephant(lp, record.key, record.val);
39
40          _numWritten++;
41          if(_numWritten % 25000 == 0) {
42             long now = System.currentTimeMillis();
43             long delta = now - _lastCheckpoint;
44             _lastCheckpoint = now;
45             LOG.info("Wrote last 25000 records in " + delta + " ms");
46             _localManager.progress();
47          }
48      }
49
50      public void close(Reporter reporter) throws IOException {
51          for(Integer shard: _lps.keySet()) {
52             String lpDir = _localManager.localTmpDir("" + shard);
53             LOG.info("Closing LP for shard " + shard + " at " + lpDir);
54             _lps.get(shard).close();
55             LOG.info("Closed LP for shard " + shard + " at " + lpDir);
56             progress();
57             String remoteDir = _args.outputDirHdfs + "/" + shard;
58             if(_fs.exists(new Path(remoteDir))) {
59                 LOG.info("Deleting existing shard " + shard + " at " + remoteDir);
60                 _fs.delete(new Path(remoteDir), true);
61                 LOG.info("Deleted existing shard " + shard + " at " + remoteDir);
62             }
63             LOG.info("Copying " + lpDir + " to " + remoteDir);
64             _fs.copyFromLocalFile(new Path(lpDir), new Path(remoteDir));
65             LOG.info("Copied " + lpDir + " to " + remoteDir);
66             progress();
67          }
68          _localManager.cleanup();
69      }
70
71      private void progress() {
72           if(_progressable!=null) _progressable.progress();
73      }
74   }

Listing 2: A subclass of RecordWriter excerpted from ElephantDB

ElephantDB works by sharding (partitioning) data across several LocalPersistence objects (BerkeleyDB files). The write function in the ElephantRecordWriter takes the shard ID, checks to see if it already has the shard open (line 28), and if not opens or creates a new local file (line 33). The updateElephant call on line 38 then writes the key-value pair from the job output into the BerkeleyDB file.
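The heart of that write path is an open-on-first-use cache of per-shard handles. Stripped of the HDFS and BerkeleyDB details, the pattern looks like the sketch below, where Handle is a stand-in for ElephantDB's LocalPersistence and the names are illustrative, not ElephantDB's API:

```java
import java.util.HashMap;
import java.util.Map;

// Open-on-first-use caching of per-shard write handles, as in
// ElephantRecordWriter.write(): a shard's handle is created only when
// its first record arrives, then reused for every later record.
public class ShardCacheSketch {
    static class Handle {
        final int shard;
        int records = 0;
        Handle(int shard) { this.shard = shard; }
        void append(String key, String value) { records++; }
    }

    private final Map<Integer, Handle> open = new HashMap<>();

    void write(int shard, String key, String value) {
        // Equivalent to the containsKey/get-else-open branch in Listing 2.
        Handle h = open.computeIfAbsent(shard, Handle::new);
        h.append(key, value);
    }

    int openShards() { return open.size(); }
}
```

At close time, each cached handle would be closed and its local file copied to the remote store, which is exactly what the loop in Listing 2's close method does.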

When closing the ElephantRecordWriter, the class copies the local BerkeleyDB files into HDFS on line 64, optionally overwriting older files. The progress calls inform Hadoop that the RecordWriter is proceeding as planned, similar to status or counter updates in the actual MapReduce job.

The next step is to implement the OutputFormat which utilizes the ElephantRecordWriter. To understand this listing, it is important to know what a Hadoop JobConf object encapsulates. As the name suggests, the object contains all of the settings for a job, including the input and output directories, job name, and mapper and reducer classes. Listing 3 demonstrates how the two customized classes work together:

1 public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
2     public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);
3
4     public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fs, JobConf conf, String string, Progressable progressable) throws IOException {
5         return new ElephantRecordWriter(conf, (Args) Utils.getObject(conf, ARGS_CONF), progressable);
6     }
7
8     public void checkOutputSpecs(FileSystem fs, JobConf conf) throws IOException {
9         Args args = (Args) Utils.getObject(conf, ARGS_CONF);
10         fs = Utils.getFS(args.outputDirHdfs, conf);
11         if(conf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
12             throw new InvalidJobConfException("Speculative execution should be false");
13         }
14         if(fs.exists(new Path(args.outputDirHdfs))) {
15             throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
16         }
17         if(args.updateDirHdfs!=null && !fs.exists(new Path(args.updateDirHdfs))) {
18             throw new InvalidJobConfException("Shards to update does not exist " + args.updateDirHdfs);
19         }
20     }
21   }

Listing 3: An implementation of OutputFormat excerpted from ElephantDB

As shown earlier, the OutputFormat has two responsibilities: determining where and how data is written. ElephantOutputFormat determines where by examining the JobConf and checking to make sure it is a legitimate destination on lines 14 and 17. The 'how' is handled by the getRecordWriter function, which returns the ElephantRecordWriter in Listing 2.

From Hadoop's point of view, these classes come together over the lifetime of a job. When the job is submitted, Hadoop calls checkOutputSpecs with the job's configuration; if the function returns without throwing an exception, the job proceeds. As each reducer produces its stream of key-value pairs, Hadoop calls getRecordWriter, which returns an object that can write that stream of data. When all of the pairs have been written, Hadoop calls the close function on the writer, committing that data to HDFS and finishing the responsibility of that reducer.
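That lifecycle can be sketched in plain Java. The interfaces below are simplified stand-ins for Hadoop's real OutputFormat and RecordWriter (a Map replaces JobConf, and generics replace the Writable types), but the calling sequence is the one described above:

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for Hadoop's RecordWriter and OutputFormat.
interface SketchRecordWriter<K, V> {
    void write(K key, V value) throws IOException;
    void close() throws IOException;
}

interface SketchOutputFormat<K, V> {
    void checkOutputSpecs(Map<String, String> conf) throws IOException;
    SketchRecordWriter<K, V> getRecordWriter(Map<String, String> conf) throws IOException;
}

public class LifecycleSketch {
    // Drives a writer the way Hadoop drives one per reducer.
    static <K, V> void runReducerOutput(SketchOutputFormat<K, V> fmt,
                                        Map<String, String> conf,
                                        List<Map.Entry<K, V>> pairs) throws IOException {
        fmt.checkOutputSpecs(conf);                 // throws to abort the job early
        SketchRecordWriter<K, V> w = fmt.getRecordWriter(conf);
        for (Map.Entry<K, V> p : pairs) {
            w.write(p.getKey(), p.getValue());      // one call per key-value pair
        }
        w.close();                                  // commit this reducer's output
    }
}
```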

Conclusion

OutputFormats are an essential component of the Hadoop framework. They allow you to interoperate with other systems and services by producing output fit for their consumption. Customizing where you place the output of your jobs can simplify and speed up your data workflows. Customizing how you write your data can let you quickly put it to work in the rest of your environment. Implementing an OutputFormat can be as simple as overriding a few methods, but the abstraction is flexible enough to support entirely new on-disk formats.

About the Author

Jim Blomo (@jimblomo) is passionate about putting data to work by developing robust, elegant systems. At Yelp, he manages a growing data mining team that uses Hadoop, mrjob, and oddjob to process TBs of data. Before Yelp, he built infrastructure for startups and Amazon. He enjoys the food, culture, and outdoors of the San Francisco Bay Area with his wife.

 

 

great post by dharmi sarkar

Thank you, great post on output formats.
If I have to persist serialized Java (JAXB) objects into HDFS, what should the ideal output format be?

Re: great post by Jim Blomo

I'd recommend using something very similar to the ElephantDB outputformat: write out the Java objects to a file, and move the file into HDFS on close().
