
Unit Testing Hadoop MapReduce Jobs With MRUnit, Mockito, & PowerMock

Posted by Michael Spicuzza on Dec 03, 2012

Introduction

Hadoop MapReduce jobs have a unique code architecture that follows a specific template with specific constructs. This architecture raises interesting issues when doing test-driven development (TDD) and writing unit tests. This article walks through a real-world example using MRUnit, Mockito, and PowerMock. I will touch upon 1) using MRUnit to write JUnit tests for Hadoop MapReduce applications, 2) using PowerMock and Mockito to mock static methods, 3) mocking out business logic contained in another class, 4) verifying that mocked-out business logic was called (or not), 5) testing counters, 6) testing statements in a log4j conditional block, and 7) handling exceptions in tests. I’m assuming the reader is already familiar with JUnit 4.

With MRUnit, you can craft test input, push it through your mapper and/or reducer, and verify its output, all in a JUnit test. As with other JUnit tests, this allows you to debug your code using the JUnit test as a driver. A map/reduce pair can be tested using MRUnit’s MapReduceDriver. A combiner can be tested using MapReduceDriver as well. A PipelineMapReduceDriver allows you to test a workflow of map/reduce jobs. Currently, partitioners do not have a test driver under MRUnit. MRUnit allows you to do TDD and write lightweight unit tests that accommodate Hadoop’s specific architecture and constructs.

Example

In the following example, we’re processing road surface data used to create maps. The input contains both linear surfaces (describing a stretch of the road) and intersections (describing a road intersection). This mapper takes a collection of these mixed surfaces as input, discards anything that isn’t a linear road surface, i.e., intersections, and then processes each road surface and writes it out to HDFS. We want to keep count of, and eventually print out, how many non-road surfaces were input. For debugging purposes, we will additionally print out how many road surfaces were processed.

public class MergeAndSplineMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
	
	 private static final Logger LOG = Logger.getLogger(MergeAndSplineMapper.class);
	
	 enum SurfaceCounters {
	         ROADS, NONLINEARS, UNKNOWN
	 }
	        
	 @Override
	 public void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
	          // A list of mixed surface types
	          LinkSurfaceMap lsm = (LinkSurfaceMap) BytesConverter.bytesToObject(value.getBytes());
	        
	          List<RoadSurface> mixedSurfaces = lsm.toSurfaceList();
	        
	          for (RoadSurface surface : mixedSurfaces)  {
	                   Long surfaceId = surface.getNumericId();
	                   Enums.SurfaceType surfaceType = surface.getSurfaceType();
	            
	                   if ( surfaceType.equals(SurfaceType.INTERSECTION)  )  {
	                             // Ignore non-linear surfaces.
	                             context.getCounter(SurfaceCounters.NONLINEARS).increment(1);
	                             continue;
	                   }
	                   else if ( ! surfaceType.equals(SurfaceType.ROAD) ) {
	                            // Ignore anything that wasn’t an INTERSECTION or ROAD, i.e., any future additions.
	                            context.getCounter(SurfaceCounters.UNKNOWN).increment(1);
	                            continue;
	                   }
	            
	                   PopulatorPreprocessor.processLinearSurface(surface);
	            
	                   // Write out the processed linear surface.
	                   lsm.setSurface(surface);
	                   context.write(new LongWritable(surfaceId), new BytesWritable(BytesConverter.objectToBytes(lsm)));
	                   if (LOG.isDebugEnabled()) {
	                             context.getCounter(SurfaceCounters.ROADS).increment(1);
	                   }
	          }
	 }
}

We’ve written the following unit test for our class using MRUnit, Mockito, and PowerMock.

@RunWith(PowerMockRunner.class)
@PrepareForTest(PopulatorPreprocessor.class)
public class MergeAndSplineMapperTest {
	
	 private MapDriver<LongWritable, BytesWritable, LongWritable, BytesWritable> mapDriver;
	
	 @Before
	 public void setUp() {
	          MergeAndSplineMapper mapper = new MergeAndSplineMapper();
	          mapDriver = new MapDriver<LongWritable, BytesWritable, LongWritable, BytesWritable>();
	          mapDriver.setMapper(mapper);
	 }
	    
	 @Test
	 public void testMap_INTERSECTION() throws IOException {
	          LinkSurfaceMap lsm = new LinkSurfaceMap();
	          RoadSurface rs = new RoadSurface(Enums.RoadType.INTERSECTION);
	          byte[] lsmBytes = append(lsm, rs);
	        
	          PowerMockito.mockStatic(PopulatorPreprocessor.class);
	        
	          mapDriver.withInput(new LongWritable(1234567), new BytesWritable(lsmBytes));
	          mapDriver.runTest();
	
	          Assert.assertEquals("ROADS count incorrect.", 0,
	                                    mapDriver.getCounters().findCounter(SurfaceCounters.ROADS).getValue());
	          Assert.assertEquals("NONLINEARS count incorrect.", 1,
	                                    mapDriver.getCounters().findCounter(SurfaceCounters.NONLINEARS).getValue());
	          Assert.assertEquals("UNKNOWN count incorrect.", 0,
	                                    mapDriver.getCounters().findCounter(SurfaceCounters.UNKNOWN).getValue());
	        
	          PowerMockito.verifyStatic(Mockito.never());
	          PopulatorPreprocessor.processLinearSurface(rs);
	 }
		    

	 @Test
	 public void testMap_ROAD() throws IOException {
	          LinkSurfaceMap lsm = new LinkSurfaceMap();
	          RoadSurface rs = new RoadSurface(Enums.RoadType.ROAD);
	          byte[] lsmBytes = append(lsm, rs);
                 
	          // Save the logging level since we are modifying it.
	          Level originalLevel = Logger.getRootLogger().getLevel();
	          Logger.getRootLogger().setLevel(Level.DEBUG);
	          try {
	                   PowerMockito.mockStatic(PopulatorPreprocessor.class);
	
	                   mapDriver.withInput(new LongWritable(1234567), new BytesWritable(lsmBytes));
	                   mapDriver.withOutput(new LongWritable(1000000), new BytesWritable(lsmBytes));
	                   mapDriver.runTest();
	
	                   Assert.assertEquals("ROADS count incorrect.", 1,
	                                             mapDriver.getCounters().findCounter(SurfaceCounters.ROADS).getValue());
	                   Assert.assertEquals("NONLINEARS count incorrect.", 0,
	                                             mapDriver.getCounters().findCounter(SurfaceCounters.NONLINEARS).getValue());
	                   Assert.assertEquals("UNKNOWN count incorrect.", 0,
	                                             mapDriver.getCounters().findCounter(SurfaceCounters.UNKNOWN).getValue());
	
	                   PowerMockito.verifyStatic(Mockito.times(1));
	                   PopulatorPreprocessor.processLinearSurface(rs);
	          } finally {
	                   // Set the logging level back to its original state, even if an assertion fails,
	                   // so as not to affect other tests.
	                   Logger.getRootLogger().setLevel(originalLevel);
	          }
	 }
}

Breaking It Down

If you look back at our class under test, we are only inspecting the surface ID and surface type, discarding anything that is not a road surface, incrementing some counters, and processing road surfaces. Let’s take a look at the first test, testMap_INTERSECTION().
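The branching just described can be pictured without any Hadoop classes. The following is a minimal, library-free sketch, not the mapper from this article: the class name SurfaceFilterSketch, the RAMP surface type, and the debugEnabled flag are all assumptions made for illustration, and a plain EnumMap stands in for Hadoop's counters.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free model of the mapper's branching; all names are illustrative.
class SurfaceFilterSketch {

    // RAMP is an invented "future addition" used to exercise the UNKNOWN branch.
    enum SurfaceType { ROAD, INTERSECTION, RAMP }

    enum SurfaceCounters { ROADS, NONLINEARS, UNKNOWN }

    // Counts surfaces the way the mapper's loop does; an EnumMap stands in for Hadoop counters.
    static Map<SurfaceCounters, Integer> count(List<SurfaceType> surfaces, boolean debugEnabled) {
        Map<SurfaceCounters, Integer> counters = new EnumMap<>(SurfaceCounters.class);
        for (SurfaceCounters counter : SurfaceCounters.values()) {
            counters.put(counter, 0);
        }
        for (SurfaceType type : surfaces) {
            if (type == SurfaceType.INTERSECTION) {
                // Non-linear surface: count it and skip processing.
                counters.merge(SurfaceCounters.NONLINEARS, 1, Integer::sum);
                continue;
            }
            if (type != SurfaceType.ROAD) {
                // Neither INTERSECTION nor ROAD: count it as unknown and skip.
                counters.merge(SurfaceCounters.UNKNOWN, 1, Integer::sum);
                continue;
            }
            // A real mapper would process and write out the road surface here.
            if (debugEnabled) {
                // Roads are only counted when debug logging is on.
                counters.merge(SurfaceCounters.ROADS, 1, Integer::sum);
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        List<SurfaceType> input = Arrays.asList(
                SurfaceType.ROAD, SurfaceType.INTERSECTION, SurfaceType.RAMP, SurfaceType.ROAD);
        System.out.println(count(input, true));
    }
}
```

With debugEnabled set to false, the ROADS count stays at zero even though roads were handled, which is why testMap_ROAD has to raise the log level before running the mapper.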

testMap_INTERSECTION

Our objective is to verify:

  1. SurfaceCounters.NONLINEARS is incremented.
  2. The for-loop continues, i.e., PopulatorPreprocessor.processLinearSurface(surface) is never called.
  3. SurfaceCounters.ROADS and SurfaceCounters.UNKNOWN are not incremented.

Since this is a mapper, we start by defining and initializing a MapDriver. Note that the four type parameters defined for the MapDriver must match those of our class under test, i.e., MergeAndSplineMapper.

         private MapDriver<LongWritable, BytesWritable, LongWritable, BytesWritable> mapDriver;

         @Before
         public void setUp() {
                  MergeAndSplineMapper mapper = new MergeAndSplineMapper();
                  mapDriver = new MapDriver<LongWritable, BytesWritable, LongWritable, BytesWritable>();
                  mapDriver.setMapper(mapper);
         }

Throwing IOException on the unit test method signature

The mapper could throw an IOException. In JUnit tests you can handle exceptions thrown by the code under test either by catching them or by declaring them on the test method. Keep in mind that we are not specifically testing exceptions. I prefer not to catch the exception and instead have the unit test method throw it. If the unit test method encounters the exception, the test will fail, which is what we want. Catching exceptions in unit tests when you are not specifically testing exception handling leads to unnecessary clutter, logic, and maintenance, when you can simply throw the exception to fail the test.

	@Test
	public void testMap_INTERSECTION() throws IOException {

Initialize the test input to drive the test.  In order to hit the if-block we want to test, we have to ensure the surface type is of RoadType.INTERSECTION.

	          LinkSurfaceMap lsm = new LinkSurfaceMap();
	          RoadSurface rs = new RoadSurface(Enums.RoadType.INTERSECTION);
	          byte[] lsmBytes = append(lsm, rs);

We use PowerMock[3] to mock out a static call to the PopulatorPreprocessor class. PopulatorPreprocessor is a separate class containing business logic and is tested by its own JUnit test. At the class level, we set up PowerMock with the @RunWith annotation, and with @PrepareForTest we tell PowerMock which classes have static methods that we want to mock; in this case just one, PopulatorPreprocessor. PowerMock supports both EasyMock and Mockito. Since we’re using Mockito, you’ll see references to PowerMockito. We mock the class’s static methods by calling PowerMockito.mockStatic.

@RunWith(PowerMockRunner.class)
@PrepareForTest(PopulatorPreprocessor.class)
	
	          PowerMockito.mockStatic(PopulatorPreprocessor.class);

Set the previously created test input and run the mapper:

	          mapDriver.withInput(new LongWritable(1234567), new BytesWritable(lsmBytes));
	          mapDriver.runTest();

Verify the output. SurfaceCounters.NONLINEARS is incremented once, and SurfaceCounters.ROADS and SurfaceCounters.UNKNOWN are not incremented. A quick review: with JUnit’s assertEquals, the first parameter, an optional String, is the assertion error message. The second parameter is the expected value and the third parameter is the actual value. assertEquals prints out a nice error message of the form “expected:<x> but was:<y>”. So if the second assertion were to fail, for example, we would get the error message “java.lang.AssertionError: NONLINEARS count incorrect. expected:<1> but was:<0>”.

	          Assert.assertEquals("ROADS count incorrect.", 0,
	                                    mapDriver.getCounters().findCounter(SurfaceCounters.ROADS).getValue());
	          Assert.assertEquals("NONLINEARS count incorrect.", 1,
	                                    mapDriver.getCounters().findCounter(SurfaceCounters.NONLINEARS).getValue());
	          Assert.assertEquals("UNKNOWN count incorrect.", 0,
	                                    mapDriver.getCounters().findCounter(SurfaceCounters.UNKNOWN).getValue());

Verify that PopulatorPreprocessor.processLinearSurface(surface) has not been called, by using the following PowerMock/Mockito syntax.

	          PowerMockito.verifyStatic(Mockito.never());
	          PopulatorPreprocessor.processLinearSurface(rs);
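Conceptually, verifyStatic(Mockito.never()) and verifyStatic(Mockito.times(1)) assert how many times the mocked method was invoked with the given argument. The sketch below imitates that idea with a hand-written recording stub that is passed in as a parameter. It does not use PowerMock or Mockito, and CallRecorderSketch, RecordingProcessor, and handle are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of call-count verification using a hand-written stub.
class CallRecorderSketch {

    // Stand-in for a mock: records each argument instead of doing real work.
    static final class RecordingProcessor implements Consumer<String> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void accept(String surfaceType) {
            calls.add(surfaceType);
        }
    }

    // Mirrors the mapper's rule: only ROAD surfaces reach the processor.
    static void handle(String surfaceType, Consumer<String> processor) {
        if ("ROAD".equals(surfaceType)) {
            processor.accept(surfaceType);
        }
    }

    public static void main(String[] args) {
        RecordingProcessor processor = new RecordingProcessor();
        handle("INTERSECTION", processor);
        // Equivalent in spirit to never(): no call recorded yet.
        System.out.println("calls after INTERSECTION: " + processor.calls.size());
        handle("ROAD", processor);
        // Equivalent in spirit to times(1): exactly one call recorded.
        System.out.println("calls after ROAD: " + processor.calls.size());
    }
}
```

The trade-off is that this style requires the collaborator to be injectable, whereas PowerMock lets the test intercept a static call without changing the class under test.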

testMap_ROAD

In our second test, testMap_ROAD(), our objective is to verify:

  1. SurfaceCounters.ROADS is incremented.
  2. PopulatorPreprocessor.processLinearSurface(surface) is called.
  3. SurfaceCounters.NONLINEARS and SurfaceCounters.UNKNOWN are not incremented.

The setup is identical to the first test with a couple of exceptions.

       1. Specifying a Road type in our input data.

	           RoadSurface rs = new RoadSurface(Enums.RoadType.ROAD);

       2. Setting the log4j debug level.

Interestingly, in our source code we only count road surfaces when the debug level is enabled in the log4j logger. To test this, we first save the original logging level, then retrieve the root logger and set its level to DEBUG.

                   Level originalLevel = Logger.getRootLogger().getLevel();
                   Logger.getRootLogger().setLevel(Level.DEBUG);

At the end of the test, we revert to the original logging level so as not to affect other tests.

                   Logger.getRootLogger().setLevel(originalLevel);
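A try/finally block is one way to guarantee that the level is restored even when an assertion fails partway through a test. The sketch below shows the pattern with java.util.logging instead of log4j, purely so that it runs with no extra dependencies. LogLevelGuard, countIfDebug, and runWithLevel are hypothetical names, and Level.FINE plays the role of log4j's DEBUG.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper showing save/modify/restore of a log level with try/finally.
class LogLevelGuard {

    // Mirrors the mapper's debug-only counter: returns 1 only when FINE (debug) logging is on.
    static int countIfDebug(Logger log) {
        return log.isLoggable(Level.FINE) ? 1 : 0;
    }

    // Runs the debug-dependent code at a temporary level and always restores the old one.
    static int runWithLevel(Logger log, Level temporary) {
        Level original = log.getLevel(); // may be null, meaning "inherit from parent"
        log.setLevel(temporary);
        try {
            return countIfDebug(log);
        } finally {
            log.setLevel(original); // runs even if the body throws
        }
    }

    public static void main(String[] args) {
        Logger log = Logger.getLogger("example.surface");
        log.setLevel(Level.INFO);
        System.out.println(runWithLevel(log, Level.FINE));
        System.out.println(log.getLevel());
    }
}
```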

Once again, let’s verify the output. SurfaceCounters.ROADS is incremented once, and SurfaceCounters.NONLINEARS and SurfaceCounters.UNKNOWN are not incremented.

	           Assert.assertEquals("ROADS count incorrect.", 1,
                                             mapDriver.getCounters().findCounter(SurfaceCounters.ROADS).getValue());
	           Assert.assertEquals("NONLINEARS count incorrect.", 0,
	                                     mapDriver.getCounters().findCounter(SurfaceCounters.NONLINEARS).getValue());
	           Assert.assertEquals("UNKNOWN count incorrect.", 0,
	                                     mapDriver.getCounters().findCounter(SurfaceCounters.UNKNOWN).getValue());

Verify that PopulatorPreprocessor.processLinearSurface(surface) has been called once, by using the following PowerMock/Mockito syntax.

	           PowerMockito.verifyStatic(Mockito.times(1));
	           PopulatorPreprocessor.processLinearSurface(rs);

Testing A Reducer

The same principles apply as in testing a mapper. The difference is that we create a ReduceDriver and populate it with our reducer class under test, as shown below.

     private ReduceDriver<LongWritable, BytesWritable, LongWritable, BytesWritable> reduceDriver;
	
     @Before
     public void setUp() {
	      MyReducer reducer = new MyReducer();
	      reduceDriver = new ReduceDriver<LongWritable, BytesWritable, LongWritable, BytesWritable>();
	      reduceDriver.setReducer(reducer);
     }

Maven POM Dependencies

In addition to JUnit 4, you’ll have to include the following dependencies in your Maven pom.xml. On the PowerMock web page[3], take note of the supported versions of Mockito.

	            <dependency>
	                  <groupId>org.apache.mrunit</groupId>
	                  <artifactId>mrunit</artifactId>
	                  <version>0.8.0-incubating</version>
	                  <scope>test</scope>
	            </dependency>
	            <dependency>
	                  <groupId>org.mockito</groupId>
	                  <artifactId>mockito-all</artifactId>
	                  <version>1.9.0-rc1</version>
	                  <scope>test</scope>
	            </dependency>
	            <dependency>
	                  <groupId>org.powermock</groupId>
	                  <artifactId>powermock-module-junit4</artifactId>
	                  <version>1.4.12</version>
	                  <scope>test</scope>
	            </dependency>
	            <dependency>
	                  <groupId>org.powermock</groupId>
	                  <artifactId>powermock-api-mockito</artifactId>
	                  <version>1.4.12</version>
	                  <scope>test</scope>
	            </dependency>

Running In Eclipse

The test is run just as any other JUnit test would be run, for example from inside Eclipse.

[Figure: the test running inside Eclipse]

Summary

MRUnit provides a powerful and lightweight approach to test-driven development of MapReduce jobs. A nice side effect is that it helps move you to better code coverage than was previously possible.

Acknowledgements

I’d like to thank Boris Lublinsky for his perseverance in helping me complete this project, and Miao Li for adding copious amounts of MRUnit tests to our project.

References

     [1]  Apache MRUnit
     [2]  Mockito  
     [3]  MockitoUsage13
     [4]  Hadoop: The Definitive Guide (3rd Edition) by Tom White

About The Author

Michael Spicuzza holds an M.S. in Computer Science from DePaul University and has worked in the Insurance, Travel, and Telecommunications industries.
He specializes in off-shore Agile team leadership and is focused on automated code quality and TDD.

Sample data by Naresh Chintalcheru

Do you have few lines of sample data for the above ?
