# Uncovering mysteries of InputFormat: Providing better control for your Map Reduce execution.

As more companies adopt Hadoop, the variety of problems it is applied to keeps growing. As these scenarios multiply, it becomes critical to control how and where map tasks are executed. One key to such control is a custom InputFormat implementation.

The InputFormat class is one of the fundamental classes in the Hadoop Map Reduce framework. This class is responsible for defining two main things:

• Data splits
• Record reader

A data split is a fundamental concept in the Hadoop Map Reduce framework: it defines both the size of an individual map task and its potential execution server. The Record Reader is responsible for actually reading records from the input file and submitting them (as key/value pairs) to the mapper. There are quite a few publications on how to implement a custom Record Reader (see, for example, [1]), but the information on splits is very sketchy. Here we will explain what a split is and how to implement custom splits for specific purposes.

## An Anatomy of Split

Any split implementation extends Hadoop's abstract base class, InputSplit, and defines a split length and locations. The split length is the size of the split data (in bytes), while locations is the list of node names where the data for the split would be local. Split locations are a hint that lets the scheduler decide on which machine to execute the split. In very simplified terms[1], a job tracker works as follows:

• Receive a heartbeat from one of the task trackers, reporting map slot availability.
• Find a queued-up split for which the available node is "local".
• Submit the split to the task tracker for execution.
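
The three steps above can be modelled in a few lines of plain Java. This is only a toy sketch of the simplified description, not Hadoop's JobTracker code: the class ToyLocalityScheduler, its QueuedSplit type and the onHeartbeat() method are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Toy model of the simplified scheduling loop (hypothetical, not Hadoop code). */
final class ToyLocalityScheduler {

    /** Hypothetical stand-in for a queued split: an id plus its preferred hosts. */
    static final class QueuedSplit {
        final String id;
        final List<String> locations;

        QueuedSplit(String id, String... locations) {
            this.id = id;
            this.locations = Arrays.asList(locations);
        }
    }

    private final List<QueuedSplit> queue = new ArrayList<>();

    void enqueue(QueuedSplit split) {
        queue.add(split);
    }

    /**
     * Called when a node reports a free map slot. Returns the id of the first
     * queued split that lists the node as a location, or null if none is local.
     */
    String onHeartbeat(String node) {
        for (Iterator<QueuedSplit> it = queue.iterator(); it.hasNext();) {
            QueuedSplit split = it.next();
            if (split.locations.contains(node)) {
                it.remove();
                return split.id;
            }
        }
        return null; // a real scheduler would eventually hand out non-local work
    }

    public static void main(String[] args) {
        ToyLocalityScheduler scheduler = new ToyLocalityScheduler();
        scheduler.enqueue(new QueuedSplit("split-0", "node1", "node2"));
        scheduler.enqueue(new QueuedSplit("split-1", "node3"));
        System.out.println(scheduler.onHeartbeat("node3")); // prints split-1
        System.out.println(scheduler.onHeartbeat("node4")); // prints null
    }
}
```

The point of the sketch is that the locations array is the only input the split itself contributes to this decision, which is why rewriting it changes where a map task runs.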

Locality can mean different things depending on storage mechanisms and the overall execution strategy. In the case of HDFS, for example, a split typically corresponds to a physical data block size and locations is a set of machines (with the set size defined by a replication factor) where this block is physically located. This is how FileInputFormat calculates splits.

A different approach was taken by HBase implementers. There, a split corresponds to a set of table keys belonging to a table region and location is a machine where a region server is running.

## Compute-Intensive Applications

A special class of Map Reduce applications is compute-intensive applications. These are characterized by the fact that execution of the Mapper.map() function takes significantly longer than the data access, by at least an order of magnitude. Technically, such applications can still use a "standard" input format implementation. However, this overwhelms the data nodes where the data resides and leaves other nodes in the cluster underutilized (Figure 1).


Figure 1: Node utilization in the case of data locality.

Figure 1 shows that using "standard" data locality for compute-intensive applications leads to huge variations in node utilization: over-utilization of some nodes (red) and under-utilization of the others (yellow and light green). Based on this, it becomes apparent that for compute-intensive applications, the notion of "locality" has to be rethought. In this case, "locality" means even distribution of map execution between all available nodes, that is, maximum utilization of the compute capabilities of the cluster's machines.

## Changing "locality" using custom InputFormat

Assuming that the source data is available in the form of a sequence file, a simple ComputeIntensiveSequenceFileInputFormat class (Listing 1) implements the generation of splits which will be evenly distributed across all servers of the cluster.

    package com.navteq.fr.mapReduce.InputFormat;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.StringTokenizer;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.ClusterStatus;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class ComputeIntensiveSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {

        private static final double SPLIT_SLOP = 1.1; // 10% slop
        static final String NUM_INPUT_FILES = "mapreduce.input.num.files";

        /**
         * Generate the list of files and make them into FileSplits.
         */
        @Override
        public List<InputSplit> getSplits(JobContext job) throws IOException {
            long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
            long maxSize = getMaxSplitSize(job);

            // get servers in the cluster
            String[] servers = getActiveServersList(job);
            if (servers == null || servers.length == 0) {
                // no tracker information available: fall back to the default,
                // data-local splits instead of returning null
                return super.getSplits(job);
            }

            // generate splits
            List<InputSplit> splits = new ArrayList<InputSplit>();
            List<FileStatus> files = listStatus(job);
            int currentServer = 0;
            for (FileStatus file : files) {
                Path path = file.getPath();
                long length = file.getLen();
                if ((length != 0) && isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                    long bytesRemaining = length;
                    // full-size splits, each assigned to the next server in rotation
                    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                        splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                                new String[] {servers[currentServer]}));
                        currentServer = getNextServer(currentServer, servers.length);
                        bytesRemaining -= splitSize;
                    }
                    // the remaining tail of the file
                    if (bytesRemaining != 0) {
                        splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                                new String[] {servers[currentServer]}));
                        currentServer = getNextServer(currentServer, servers.length);
                    }
                } else if (length != 0) {
                    // non-splittable file: one split for the whole file
                    splits.add(new FileSplit(path, 0, length,
                            new String[] {servers[currentServer]}));
                    currentServer = getNextServer(currentServer, servers.length);
                } else {
                    // Create empty hosts array for zero length files
                    splits.add(new FileSplit(path, 0, length, new String[0]));
                }
            }
            // Save the number of input files in the job-conf
            job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
            return splits;
        }

        private String[] getActiveServersList(JobContext context) {
            String[] servers = null;
            try {
                // wrap the configuration rather than casting it to JobConf
                JobClient jc = new JobClient(new JobConf(context.getConfiguration()));
                ClusterStatus status = jc.getClusterStatus(true);
                Collection<String> atc = status.getActiveTrackerNames();
                servers = new String[atc.size()];
                int s = 0;
                for (String serverInfo : atc) {
                    // take the part before the first ':' and then
                    // the token after the first '_' as the server name
                    StringTokenizer st = new StringTokenizer(serverInfo, ":");
                    String trackerName = st.nextToken();
                    StringTokenizer st1 = new StringTokenizer(trackerName, "_");
                    st1.nextToken();
                    servers[s++] = st1.nextToken();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return servers;
        }

        private static int getNextServer(int current, int max) {
            current++;
            if (current >= max)
                current = 0;
            return current;
        }
    }

Listing 1: ComputeIntensiveSequenceFileInputFormat class

This class extends SequenceFileInputFormat and overrides the getSplits() method, calculating splits exactly the same way as FileInputFormat, but assigns each split's "locality" so as to leverage all available servers in the cluster. The method relies on two supporting methods:

• getActiveServersList(), which builds an array of the names of the servers currently active in the cluster.
• getNextServer(), which calculates the next server in the servers array, wrapping around when the array is exhausted.
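
The two helpers can be tried out without a cluster. The sketch below is a standalone illustration, not part of the article's code: the class ServerRotation and its method names are hypothetical, and it assumes tracker names of the form "tracker_<host>:<address>", which is what the parsing in Listing 1 implies. Unlike Listing 1, it keeps everything after the first underscore as the host name.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone illustration of the helper logic (hypothetical class, not Hadoop code). */
final class ServerRotation {

    /** Extracts the host from a name assumed to look like "tracker_<host>:<address>". */
    static String hostOf(String trackerName) {
        String beforeColon = trackerName.split(":", 2)[0];
        int underscore = beforeColon.indexOf('_');
        return beforeColon.substring(underscore + 1);
    }

    /** Same wrap-around rule as getNextServer() in Listing 1. */
    static int next(int current, int max) {
        return (current + 1) % max;
    }

    /** Deals splitCount splits to the servers in rotation; returns the host chosen per split. */
    static List<String> assign(int splitCount, String[] servers) {
        List<String> hosts = new ArrayList<>();
        int current = 0;
        for (int i = 0; i < splitCount; i++) {
            hosts.add(servers[current]);
            current = next(current, servers.length);
        }
        return hosts;
    }

    public static void main(String[] args) {
        String[] servers = {
            hostOf("tracker_node1:localhost/127.0.0.1:50060"),
            hostOf("tracker_node2:localhost/127.0.0.1:50060")
        };
        System.out.println(assign(5, servers)); // prints [node1, node2, node1, node2, node1]
    }
}
```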

Although this implementation (Listing 1) evenly distributes execution of maps between all servers of the cluster, it completely ignores the actual locality of the data. A slightly better implementation of the getSplits method (Listing 2) tries to combine both strategies by placing as many map tasks as possible local to their data and then balancing the rest across the cluster.[2]

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // get splits with data-local locations from the superclass
        List<InputSplit> originalSplits = super.getSplits(job);

        // Get active servers
        String[] servers = getActiveServersList(job);
        if (servers == null || servers.length == 0) {
            // no tracker information available: keep the data-local splits
            return originalSplits;
        }

        // reassign splits to active servers
        List<InputSplit> splits = new ArrayList<InputSplit>(originalSplits.size());
        int numSplits = originalSplits.size();
        int currentServer = 0;
        for (int i = 0; i < numSplits;
                i++, currentServer = getNextServer(currentServer, servers.length)) {
            String server = servers[currentServer]; // Current server
            boolean replaced = false;
            // For every remaining split
            for (InputSplit split : originalSplits) {
                FileSplit fs = (FileSplit) split;
                // For every split location
                for (String l : fs.getLocations()) {
                    // If this split is local to the server
                    if (l.equals(server)) {
                        // Fix split location
                        splits.add(new FileSplit(fs.getPath(), fs.getStart(),
                                fs.getLength(), new String[] {server}));
                        originalSplits.remove(split);
                        replaced = true;
                        break;
                    }
                }
                if (replaced)
                    break;
            }
            // If no local splits are found for this server
            if (!replaced) {
                // Assign the first remaining split to it; it must come from
                // originalSplits, not from the list of already assigned splits
                FileSplit fs = (FileSplit) originalSplits.get(0);
                splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(),
                        new String[] {server}));
                originalSplits.remove(0);
            }
        }
        return splits;
    }

Listing 2: Optimized getSplits method

In this implementation, we first use the superclass (getSplits() as inherited from FileInputFormat) to get splits with locations calculated to ensure data locality. We then calculate the list of available servers and, for every server in turn, try to assign a split whose data is local to this server. If no such split remains, the server receives the first split that is still unassigned.
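
To see the effect of this strategy on a small input, the same local-first rule can be run on plain data. This is a hedged sketch that mirrors the logic described for Listing 2 without any Hadoop classes; LocalFirstAssignment and its Split type are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the local-first reassignment (hypothetical, not Hadoop code). */
final class LocalFirstAssignment {

    /** Hypothetical stand-in for a FileSplit: a name plus the hosts holding its data. */
    static final class Split {
        final String name;
        final List<String> hosts;

        Split(String name, String... hosts) {
            this.name = name;
            this.hosts = Arrays.asList(hosts);
        }
    }

    /** Returns split name -> assigned server, visiting the servers in rotation. */
    static Map<String, String> assign(List<Split> input, String[] servers) {
        List<Split> remaining = new ArrayList<>(input);
        Map<String, String> result = new LinkedHashMap<>();
        int current = 0;
        while (!remaining.isEmpty()) {
            String server = servers[current];
            Split chosen = null;
            // prefer a split whose data is local to the current server
            for (Split s : remaining) {
                if (s.hosts.contains(server)) {
                    chosen = s;
                    break;
                }
            }
            // otherwise take the first split that is still unassigned
            if (chosen == null) {
                chosen = remaining.get(0);
            }
            remaining.remove(chosen);
            result.put(chosen.name, server);
            current = (current + 1) % servers.length;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Split> splits = Arrays.asList(
                new Split("s0", "A", "B"), new Split("s1", "A"), new Split("s2", "A"));
        System.out.println(assign(splits, new String[] {"A", "B", "C"}));
        // prints {s0=A, s1=B, s2=C}
    }
}
```

All three splits have data on server A, yet only one of them stays there; the other two are moved to B and C so that every server receives work.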

## Delayed fair scheduler

Although the code (Listing 1, Listing 2) calculates split locality correctly, when we tried to run it on our Hadoop cluster, we saw that it was not even close to producing an even distribution between servers. The problem that we observed is well described in [2], which also describes a solution for this problem: the delayed fair scheduler.

Assuming that the fair scheduler is already set up, the following block should be added to the mapred-site.xml file in order to enable the delayed scheduler[3]:

    <property>
      <name>mapred.fairscheduler.locality.delay</name>
      <value>360000000</value>
    </property>

With the delayed fair scheduler in place, execution of our job leverages the full cluster (Figure 2). Moreover, according to our experiments, execution time in this case is about 30% less than with the "data locality" approach.


Figure 2: Node utilization in the case of execution locality

The computational job used for testing ran with 96 splits and mapper tasks. The test cluster has 19 data nodes with 8 mapper slots per node, giving the cluster 152 available slots. When the job is running, it does not fully utilize all of the slots in the cluster.
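
The slot arithmetic can be checked in a few lines. This is an illustrative sketch only: the class SlotMath and its methods are hypothetical names, and the per-node figures assume that all 19 data nodes are reported as active trackers and that splits are dealt out in strict rotation as in Listing 1.

```java
/** Back-of-the-envelope slot arithmetic for the test cluster (hypothetical helper). */
final class SlotMath {

    static int totalSlots(int nodes, int slotsPerNode) {
        return nodes * slotsPerNode;
    }

    /** Fewest and most splits any single node receives under strict rotation. */
    static int[] perNodeRange(int splits, int nodes) {
        int min = splits / nodes;
        int max = min + (splits % nodes == 0 ? 0 : 1);
        return new int[] {min, max};
    }

    public static void main(String[] args) {
        int slots = totalSlots(19, 8);
        int[] range = perNodeRange(96, 19);
        System.out.println(slots);                       // prints 152
        System.out.println(96 * 100 / slots);            // prints 63 (percent of slots in use)
        System.out.println(range[0] + ".." + range[1]);  // prints 5..6
    }
}
```

Under these assumptions each node runs 5 or 6 map tasks out of its 8 slots, which is consistent with the cluster not being fully utilized.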

Both Ganglia screenshots are of our test cluster, where the first 3 nodes are our control nodes and the fourth node is our edge node used to launch the job. The graphs show CPU/machine load. In Figure 1 there are several nodes which are heavily utilized (shown in red) while the rest of the cluster is underutilized. In Figure 2, we have a more balanced distribution, yet the cluster is still not being fully utilized. The job used in testing also allows one to run multiple threads. This increases the load on the CPU while decreasing the overall time spent in each Mapper.map() iteration. As shown in Figure 3, by increasing the number of threads we are able to better utilize the cluster resources and further reduce the time it takes for the job to complete. By changing the locality of the jobs, we are able to better utilize the cluster without sacrificing performance due to remote job data.


Figure 3: Node utilization in the case of execution locality with multithreaded map jobs

Even under heavy machine CPU loads, it was still possible to allow other disk I/O-intensive jobs to run in the open slots, recognizing that we would have a slight degradation in performance.

## Custom Splits

The approach described in this article works really well for large files, but for small files there are not enough splits to leverage the many machines available in the cluster. One possible solution is to make the blocks smaller, but this puts more strain (memory requirements) on the cluster's name node. A better approach is to modify the code (Listing 1) to use a custom block size instead of the file block size. Such an approach allows one to produce the desired number of splits regardless of the actual file size.
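
One way to pick such a custom size is to derive it from the file length and the number of splits wanted. The sketch below is an assumption-laden illustration rather than the authors' code: CustomSplitSize, splitSizeFor() and countSplits() are hypothetical names, and countSplits() simply replays the split loop of Listing 1, including its 10% slop, on plain numbers.

```java
/** Derives a split size from a desired split count (hypothetical helper, not Hadoop code). */
final class CustomSplitSize {

    private static final double SPLIT_SLOP = 1.1; // same 10% slop as in Listing 1

    /** Split size that yields roughly desiredSplits pieces; never less than 1 byte. */
    static long splitSizeFor(long fileLength, int desiredSplits) {
        // ceiling division so that the last piece is never larger than the others
        return Math.max(1L, (fileLength + desiredSplits - 1) / desiredSplits);
    }

    /** Counts the splits the loop in Listing 1 would produce for a given split size. */
    static int countSplits(long length, long splitSize) {
        int count = 0;
        long remaining = length;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            count++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        long tenMb = 10L * 1024 * 1024;
        long hdfsBlock = 64L * 1024 * 1024;
        System.out.println(countSplits(tenMb, hdfsBlock));               // prints 1
        System.out.println(countSplits(tenMb, splitSizeFor(tenMb, 19))); // prints 19
    }
}
```

Because of the slop, a short tail can be folded into the preceding split, so the resulting count may be one lower than requested for some file lengths.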

## Conclusion

In this article we have shown how to leverage custom InputFormats to gain tighter control over how the map tasks in your Map Reduce jobs are distributed among available servers. Such control is especially important for a special class of applications, compute-intensive applications, which leverage Hadoop Map Reduce as a generic parallel execution framework.

## About the Authors

Boris Lublinsky is principal architect at NAVTEQ, where he works on defining the architecture vision for large data management and processing and for SOA, and on implementing various NAVTEQ projects. He is also an SOA editor for InfoQ and a participant in the SOA RA working group in OASIS. Boris is an author and frequent speaker; his most recent book is "Applied SOA".

Michael Segel has spent the past 20+ years working with customers identifying and solving their business problems. Michael has worked in multiple roles, in multiple industries. He is an independent consultant who is always looking to solve any challenging problems. Michael has a Software Engineering degree from the Ohio State University.

### References

1. Boris Lublinsky, Mike Segel. Custom XML Records Reader.

2. Matei Zaharia, Dhruba Borthakur, Joydeep Sen Sarma, Khaled Elmeleegy, Scott Shenker, Ion Stoica. Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling.

[1] This is a gross oversimplification intended to explain the basic mechanics. A real scheduler algorithm is significantly more complex, taking into consideration many more parameters than just split locations.

[2] While we present this as an option, if the time spent in the Mapper.map() method exceeds the time it takes to remotely access the data by an order of magnitude or more, there will be no performance improvement over the code presented in Listing 1. However, it might somewhat improve network utilization.

[3] Please note that the delay is in ms, and that after changing the value, you need to restart the Job Tracker.

• ##### Nice Article

by Paulo Suzart,

Nice article. It clears up a lot of important details. Another point I think is worth writing about is patterns and use cases for putting data into HDFS.

What is the best approach? Small files created directly in HDFS by the producer application? A job for extracting data from a database?

It would be great to have an article about this kind of situation.

Congratulations!

• ##### Is it really needed to override getSplits() to implement fairness?

by Sorin Ciolofan,

Hi!

Since you have used the fair scheduler and set the delay time for it, why is it necessary to override getSplits() as you did in Listing 1 and Listing 2? I understand that you did that to implement your own version of fairness in order to evenly distribute the jobs across all available servers, but isn't this fairness provided implicitly by the fair scheduler?

• ##### Great Article!!But am struck at a point.

by Swapnil Arsh,

I find the article very interesting and very relevant to what I am trying to achieve.
But while implementing the same, status.getActiveTrackerNames() is returning NULL, which implies that no task trackers are running. What could be the reason for this, and how could I cope with this problem?
Thanks.