Oozie by Example

Posted by Boris Lublinsky, Mike Segel on Jul 18, 2011

In our previous article [Introduction to Oozie] we described the Oozie workflow server and presented an example of a very simple workflow. We also described deployment and configuration of workflows for Oozie and the tools for starting, stopping and monitoring Oozie workflows.

In this article we will describe a more complex Oozie example, which will allow us to discuss more Oozie features and demonstrate how to use them.

Defining process

The workflow described here implements ingestion of vehicle GPS probe data. Probe data is delivered hourly to a specific HDFS directory[1] as a file containing all probes for that hour. Ingestion is done daily, over all 24 files for the day. If the number of files is 24, the ingestion process should start. Otherwise:

  • For the current day, do nothing.
  • For previous days (up to 7), send a reminder to the probes provider.
  • If the age of the directory is 7 days, ingest all available probe files.

The overall implementation of the process is presented in Figure 1.

Figure 1: Process diagram

Here the main process (ingestion process) first calculates the directory names for the current and 6 previous days and then starts (forks) 7 directory subprocesses (subflows). Once all subprocesses reach the end state, the join step transfers control to the end state.
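The fork and join behaviour described above can be pictured with plain Java futures. This is only an analogy under stated assumptions, not how Oozie executes workflows: processDirectory is a hypothetical stand-in for one directory subprocess, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Analogy only: plain Java futures standing in for Oozie's fork and join nodes.
class ForkJoinSketch {

    // Hypothetical stand-in for one directory subprocess.
    static String processDirectory(String dirName) {
        return "processed " + dirName;
    }

    // "Fork" one task per directory, then "join" by waiting for all of them.
    static List<String> runAll(List<String> dirNames) {
        List<CompletableFuture<String>> forked = new ArrayList<>();
        for (String dirName : dirNames) {
            forked.add(CompletableFuture.supplyAsync(() -> processDirectory(dirName)));
        }
        // Join: block until every forked task has completed.
        CompletableFuture.allOf(forked.toArray(new CompletableFuture[0])).join();

        // Collect the results in the same order the tasks were forked.
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> task : forked) {
            results.add(task.join());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("2011-07-18", "2011-07-17", "2011-07-16")));
    }
}
```

As in the workflow, control moves past the join only after every forked branch has finished.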

The subprocess starts by getting information about the directory: its age and the number of files it contains. Based on this information, it decides whether to ingest and archive the data, send a reminder email or do nothing.

Directory subprocess implementation

The workhorse of our implementation is the directory subprocess (Listing 1).

<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'>

       <start to='getDirInfo' />

       <!-- STEP ONE -->
       <action name='getDirInfo'>
               <!--writes 2 properties: dir.num-files: returns -1 if dir doesn't exist,
                   otherwise returns # of files in dir dir.age: returns -1 if dir doesn't exist,
                   otherwise returns age of dir in days -->
               <java>
                   <job-tracker>${jobTracker}</job-tracker>
                   <name-node>${nameNode}</name-node>
                   <main-class>com.navteq.oozie.GetDirInfo</main-class>
                   <arg>${inputDir}</arg>
                   <capture-output />
               </java>
               <ok to="makeIngestDecision" />
               <error to="fail" />
        </action>

        <!-- STEP TWO -->
        <decision name="makeIngestDecision">
               <switch>
                      <!-- empty or doesn't exist -->
                      <case to="end">
                            ${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
                            (wf:actionData('getDirInfo')['dir.age'] lt 1 and
                            wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
                      </case>
                      <!-- # of files >= 24 -->
                      <case to="ingest">
                            ${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
                            wf:actionData('getDirInfo')['dir.age'] gt 6}
                      </case>
                      <default to="sendEmail"/>
               </switch>
        </decision>

        <!--EMAIL-->
        <action name="sendEmail">
                <java>
                      <job-tracker>${jobTracker}</job-tracker>
                      <name-node>${nameNode}</name-node>
                      <main-class>com.navteq.oozie.StandaloneMailer</main-class>
                      <arg>probedata2@navteq.com</arg>
                      <arg>gregory.titievsky@navteq.com</arg>
                      <arg>${inputDir}</arg>
                      <arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
                      <arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
                </java>
                <ok to="end" />
                <error to="fail" />
        </action>

        <!--INGESTION -->
        <action name="ingest">
                <java>
                      <job-tracker>${jobTracker}</job-tracker>
                      <name-node>${nameNode}</name-node>
                      <prepare>
                             <delete path="${outputDir}" />
                      </prepare>
                      <configuration>
                             <property>
                                   <name>mapred.reduce.tasks</name>
                                   <value>300</value>
                             </property>
                      </configuration>
                      <main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
                      <arg>-conf</arg>
                      <arg>action.xml</arg>
                      <arg>${inputDir}</arg>
                      <arg>${outputDir}</arg>
                </java>
                <ok to="archive-data" />
                <error to="ingest-fail" />
        </action>

        <!-- Archive Data -->
        <action name="archive-data">
                <fs>
                      <move source='${inputDir}' target='/probe/backup/${dirName}' />
                      <delete path = '${inputDir}' />
                </fs>
                <ok to="end" />
                <error to="ingest-fail" />
        </action>

        <kill name="ingest-fail">
                <message>Ingestion failed, error
                     message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>

        <kill name="fail">
                <message>Java failed, error
                     message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name='end' />
</workflow-app>

Listing 1: Directory subprocess

The start node of this subprocess invokes a custom java node, which gets the directory information (Listing 2).

package com.navteq.oozie;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.GregorianCalendar;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetDirInfo {
         private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";

         public static void main(String[] args) throws Exception {
                  String dirPath = args[0];
                  String propKey0 = "dir.num-files";
                  String propVal0 = "-1";
                  String propKey1 = "dir.age";
                  String propVal1 = "-1";
                  System.out.println("Directory path: '"+dirPath+"'");

                  Configuration conf = new Configuration();
                  FileSystem fs = FileSystem.get(conf);
                  Path hadoopDir = new Path(dirPath);
                  if (fs.exists(hadoopDir)){
                             FileStatus[] files = FileSystem.get(conf).listStatus(hadoopDir);
                             int numFilesInDir = files.length;
                             propVal0 = Integer.toString(numFilesInDir);
                             long timePassed, daysPassedLong;
                             int daysPassed;
                             String dirName = hadoopDir.getName();
                             String[] dirNameArray = dirName.split("-");
                             if (dirNameArray.length == 3) {
                                      int year = Integer.valueOf(dirNameArray[0]);
                                      int month = Integer.valueOf(dirNameArray[1]) - 1; //months are 0 based
                                      int date = Integer.valueOf(dirNameArray[2]);
                                      GregorianCalendar dirCreationDate = new GregorianCalendar(year,
                                                      month, date);
                                      timePassed = (new GregorianCalendar()).getTimeInMillis()
                                                      - dirCreationDate.getTimeInMillis();
                                      daysPassed = (int) (timePassed / 1000 / 60 / 60 / 24);
                                      propVal1 = Integer.toString(daysPassed);
                             }
                  }
                  String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
                  if (oozieProp != null) {
                             File propFile = new File(oozieProp);
                             Properties props = new Properties();
                             props.setProperty(propKey0, propVal0);
                             props.setProperty(propKey1, propVal1);
                             OutputStream os = new FileOutputStream(propFile);
                             props.store(os, "");
                             os.close();
                  } else
                             throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
                                            + " System property not defined");
         }
}

Listing 2: Get directory information node

This class takes the directory name as an input parameter and first checks whether the directory exists. If it does not, -1 is returned for both the age and the number of files; otherwise, both the age and the number of files are returned to the subprocess.
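The age calculation can also be written with the java.time API, which was not available when this article was written. The sketch below is an alternative under stated assumptions, not the code from Listing 2: the class and method names are hypothetical, and the current date is passed in as a parameter so the result is repeatable.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: derives a directory's age from its yyyy-MM-dd name.
class DirAgeSketch {

    // Returns the age in whole days, or -1 if the name is not an ISO date.
    static long ageInDays(String dirName, LocalDate today) {
        try {
            // Expects a zero-padded ISO-8601 date such as 2011-07-18.
            LocalDate dirDate = LocalDate.parse(dirName);
            return ChronoUnit.DAYS.between(dirDate, today);
        } catch (DateTimeParseException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(ageInDays("2011-07-11", LocalDate.of(2011, 7, 18)));
    }
}
```

Unlike Listing 2, which splits the name on "-" and so accepts unpadded values, this sketch accepts only zero-padded names, which is the form Listing 5 produces.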

The next step in the subprocess is a switch (decision) statement, which decides how to process the directory. If the directory does not exist (number of files < 0), or it is current (directory age < 1) and contains fewer than 24 files (number of files < 24), the subprocess transitions directly to the end. If all the files are in the directory (number of files > 23) or the directory is at least 7 days old (directory age > 6), the following will occur:

  • Data is ingested using existing Map/reduce program[2]
  • Directory is backed up in the data archive and deleted
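The decision node of Listing 1 can be mirrored as a small Java function, which makes the order of the cases easy to check. This is a sketch for illustration only: the class, enum and method names are hypothetical, and in the workflow the logic lives in the EL expressions of the decision node.

```java
// Hypothetical mirror of the makeIngestDecision node from Listing 1.
class IngestDecisionSketch {

    enum Outcome { END, INGEST, SEND_EMAIL }

    // Cases are evaluated in order; the first one that matches wins.
    static Outcome decide(int numFiles, int ageDays) {
        // Directory missing, or current day with an incomplete set of files.
        if (numFiles < 0 || (ageDays < 1 && numFiles < 24)) {
            return Outcome.END;
        }
        // All 24 files present, or the directory is at least 7 days old.
        if (numFiles > 23 || ageDays > 6) {
            return Outcome.INGEST;
        }
        // Default transition: remind the probes provider.
        return Outcome.SEND_EMAIL;
    }

    public static void main(String[] args) {
        System.out.println(decide(10, 3));
    }
}
```

Because the cases are tested in order, a missing directory (both values -1) ends the subprocess before the ingestion case is considered.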

Additional configuration on action nodes

The ingestion action shows some additional Oozie configuration parameters, including:

  • Prepare - The prepare element, if present, indicates a list of paths to delete before starting the job. This should be used exclusively for directory cleanup. The delete operation will be performed in the fs.default.name filesystem.
  • Configuration - The configuration element, if present, contains JobConf properties for the Map/Reduce job. It can be used not only in a map/reduce action but also in a java action that starts a map/reduce job.

If neither of the above cases is true, the subprocess sends a reminder email and exits. The email sender is implemented as another java main class (Listing 3).

package com.navteq.oozie;

import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

public class StandaloneMailer {

         private static String _mServer = "imailchi.navtech.com";
         private static Properties _props = null;

         private StandaloneMailer(){}

         public static void init(String mServer){

                  _mServer = mServer;
                  _props = new Properties();
                  _props.setProperty("mail.smtp.host", _mServer);
         }

         public static void SendMail(String subject, String message, String from, String to) throws Exception {

          // create some properties and get the default Session
          Session session = Session.getDefaultInstance(_props, null);

          // create a message
          Message msg = new MimeMessage(session);

          // set the from and to address
          InternetAddress addressFrom = new InternetAddress(from);
          msg.setFrom(addressFrom);

          String [] recipients = new String[] {to};
          InternetAddress[] addressTo = new InternetAddress[recipients.length];
          for (int i = 0; i < recipients.length; i++){
            addressTo[i] = new InternetAddress(recipients[i]);
          }
          msg.setRecipients(Message.RecipientType.TO, addressTo);

          // Setting the Subject and Content Type
          msg.setSubject(subject);
          msg.setContent(message, "text/plain");
          Transport.send(msg);
         }

         public static void main (String[] args) throws Exception {
                  if (args.length ==5){
                            init(_mServer);
                            StringBuilder subject = new StringBuilder();
                            StringBuilder body = new StringBuilder();
                            subject.append("Directory ").append(args[2]).append(" contains ").append(args[3]).append(" files.");
                            body.append("Directory ").append(args[2]).append(" is ").append(args[4]).
                            append(" days old and contains only ").append(args[3]).append(" files instead of 24.");
                            SendMail(subject.toString(), body.toString(), args[0], args[1]);
                  }
                  else throw new Exception("Invalid number of parameters provided for email");
         }
}

Listing 3: Send reminder email

This is a straightforward implementation using javax.mail APIs to send emails.
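One possible refinement, sketched here under assumptions, is to separate message composition from mail transport so the text can be checked without an SMTP server. The class and method names below are hypothetical; the wording of the subject and body follows Listing 3.

```java
// Hypothetical helper: builds the reminder text without sending anything.
class ReminderMessageSketch {

    // Subject line, as composed in the main method of Listing 3.
    static String subject(String dir, String numFiles) {
        return "Directory " + dir + " contains " + numFiles + " files.";
    }

    // Message body, as composed in the main method of Listing 3.
    static String body(String dir, String numFiles, String ageDays) {
        return "Directory " + dir + " is " + ageDays
                + " days old and contains only " + numFiles + " files instead of 24.";
    }

    public static void main(String[] args) {
        System.out.println(subject("2011-07-15", "20"));
        System.out.println(body("2011-07-15", "20", "3"));
    }
}
```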

Main process implementation

With the subprocess implementation in place, implementation of the main process is fairly straightforward (Listing 4)[3]

<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'>

        <start to='getDirs2Process' />

        <!-- STEP ONE -->
        <action name='getDirs2Process'>
                 <!--writes 8 properties, dir0 through dir7: the names (yyyy-MM-dd) of the
                          directories for the current day and the 7 previous days -->
                 <java>
                          <job-tracker>${jobTracker}</job-tracker>
                          <name-node>${nameNode}</name-node>
                          <main-class>com.navteq.oozie.GenerateLookupDirs</main-class>
                          <capture-output />
                 </java>
                 <ok to="forkSubWorkflows" />
                 <error to="fail" />
        </action>

<fork name="forkSubWorkflows">
       <path start="processDir0"/>
       <path start="processDir1"/>
       <path start="processDir2"/>
       <path start="processDir3"/>
       <path start="processDir4"/>
       <path start="processDir5"/>
       <path start="processDir6"/>
       <path start="processDir7"/>
</fork>

<action name="processDir0">
 <sub-workflow>
         <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
         <configuration>
          <property>
              <name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value>
          </property>
          <property>
              <name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value>
          </property>
          <property>
              <name>jobTracker</name>
              <value>${jobTracker}</value>
          </property>
          <property>
              <name>nameNode</name>
              <value>${nameNode}</value>
          </property>
          <property>
              <name>activeDir</name>
              <value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
          </property>
          <property>
              <name>dirName</name>
              <value>${wf:actionData('getDirs2Process')['dir0']}</value>
          </property>
      </configuration>
  </sub-workflow>
  <ok to="joining"/>
  <error to="fail"/>
 </action>
….
 <action name="processDir7">
  <sub-workflow>
         <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
         <configuration>
           <property>
             <name>inputDir</name>
<value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value>

           </property>
           <property>
             <name>outputDir</name>
<value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value>
           </property>
           <property>
             <name>dirName</name>
             <value>${wf:actionData('getDirs2Process')['dir7']}</value>
           </property>
       </configuration>
  </sub-workflow>
  <ok to="joining"/>
  <error to="fail"/>
 </action>

 <join name="joining" to="end"/>

         <kill name="fail">
                  <message>Java failed, error
                         message[${wf:errorMessage(wf:lastErrorNode())}]</message>
         </kill>
         <end name='end' />
</workflow-app>

Listing 4: Main ingestion process

The process first invokes a java node that calculates the list of directories to process (Listing 5), and then spawns one subprocess per directory.

package com.navteq.oozie;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Properties;

public class GenerateLookupDirs {

          public static final long dayMillis = 1000 * 60 * 60 * 24;
          private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";

          public static void main(String[] args) throws Exception {
                    Calendar curDate = new GregorianCalendar();
                    int year, month, date;
                    String propKey, propVal;

                    String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
                    if (oozieProp != null) {
                              File propFile = new File(oozieProp);
                              Properties props = new Properties();

                              for (int i = 0; i<8; ++i)
                              {
                                         year = curDate.get(Calendar.YEAR);
                                         month = curDate.get(Calendar.MONTH) + 1;
                                         date = curDate.get(Calendar.DATE);
                                         propKey = "dir"+i;
                                         propVal = year + "-" +
                                                 (month < 10 ? "0" + month : month) + "-" +
                                                 (date < 10 ? "0" + date : date);
                                         props.setProperty(propKey, propVal);
                                         curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
                              }
                              OutputStream os = new FileOutputStream(propFile);
                              props.store(os, "");
                              os.close();
                    } else
                              throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
                                             + " System property not defined");
          }
}

Listing 5: Directories calculator
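The directory names can also be generated with calendar-day arithmetic from java.time, which postdates this article. Subtracting a fixed 24 hours in milliseconds, as Listing 5 does, can land on the wrong calendar day near a daylight-saving transition; stepping back by whole days avoids that. The sketch below is an alternative under stated assumptions: the class and method names are hypothetical, and writing the properties to the Oozie output file is left out.

```java
import java.time.LocalDate;
import java.util.Properties;

// Hypothetical alternative to the date loop in Listing 5.
class LookupDirsSketch {

    // Builds dir0..dir(days-1), where dir0 is today and each next key is one day earlier.
    static Properties lookupDirs(LocalDate today, int days) {
        Properties props = new Properties();
        for (int i = 0; i < days; i++) {
            // LocalDate.toString() yields the zero-padded ISO form, e.g. 2011-07-18.
            props.setProperty("dir" + i, today.minusDays(i).toString());
        }
        return props;
    }

    public static void main(String[] args) {
        lookupDirs(LocalDate.now(), 8).forEach((key, value) ->
                System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object can be stored to the file named by the oozie.action.output.properties system property in the same way as in Listing 5.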

Conclusion

In this article we have shown a more complex end-to-end workflow example, which allowed us to demonstrate additional Oozie features and their usage. In the next article we will discuss building a library of reusable Oozie components and extending Oozie with custom nodes.

Acknowledgements

The authors are thankful to our NAVTEQ colleague Gregory Titievsky for implementing the majority of the code.

About the Authors

Boris Lublinsky is principal architect at NAVTEQ, where he is working on defining architecture vision for large data management and processing and SOA and implementing various NAVTEQ projects. He is also an SOA editor for InfoQ and a participant of SOA RA working group in OASIS. Boris is an author and frequent speaker; his most recent book is "Applied SOA".

Michael Segel has spent the past 20+ years working with customers identifying and solving their business problems. Michael has worked in multiple roles, in multiple industries. He is an independent consultant who is always looking to solve any challenging problems. Michael has a Software Engineering degree from the Ohio State University.

References

1. Boris Lublinsky, Mike Segel. Introduction to Oozie.


[1] The name of the directory is the date for which this data is collected

[2] This is an existing program, description of which is not relevant for this article.

[3] Some of the repeating code here is omitted.

Java class to ingest data by KHATHUTSHELO PRINCE

Awesome stuff!!!

But i do not see the java class for

<main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>

may you please help me on that one.

OOZIE_ACTION_OUTPUT_PROPERTIES by A J

Hey,

What are different ways to set OOZIE_ACTION_OUTPUT_PROPERTIES property?

Specifically, when I am running Oozie as LocalOozie in my testing environment, different actions in my workflow are setting different values to OOZIE_ACTION_OUTPUT_PROPERTIES property. Looking at the path generated, I suspect it is due to different mappers get instantiated for different actions.

How to make all actions in my workflow using the same value for OOZIE_ACTION_OUTPUT_PROPERTIES property?

Thanks..!!
