Extending Oozie

In our previous articles [1, 2] we described the Oozie workflow server and presented several workflow examples. We also described deployment and configuration of workflows for Oozie, and tools for starting, stopping, and monitoring Oozie workflows.

In this article we will show how to leverage Oozie's extensibility to implement custom orchestration language extensions.

Why Custom Nodes?

As we explained in [1], out of the box, Oozie provides a “minimal” workflow language, which contains only a handful of control and action nodes. One of the action nodes is the java action node, which allows invoking any Java class with a main method from an Oozie workflow. However, this approach is not always optimal. One reason is that a java action is executed in the Hadoop cluster as a map-reduce job with a single Mapper task. On the one hand, this provides a lot of advantages:

  • The built-in scalability and failover support of the map/reduce framework remove the need to build these features into Oozie.
  • Externalizing execution makes the Oozie engine more lightweight, which allows it to support more concurrently running processes.

On the other hand, this approach has a few drawbacks:

  • Starting every java node as a mapper task incurs the overhead of starting a new JVM in the Hadoop cluster.
  • Externalizing Java class execution creates additional network traffic for synchronizing this execution with the Oozie server.
  • Passing parameters from Java code becomes a fairly expensive operation.

For relatively long-running (minutes, hours) map/reduce or Pig jobs, the advantages far outweigh the drawbacks. For simple java nodes (see [2]), however, the overhead of externalized execution is much harder to justify. So one reason to use custom action nodes is to support execution of lightweight Java classes[1] directly inside the Oozie execution context.

The other reason to use custom actions is to improve workflow semantics and readability. Oozie is a workflow engine for orchestrating Hadoop-based processing, so its language semantics are very Hadoop-execution-centric: Hadoop file system, map/reduce, Pig, etc. These semantics suit Hadoop developers well, but they say little about the functionality of a given action. It is possible to come up with business-related naming conventions for the actions themselves, but this only partially solves the problem. Action naming reflects only the semantics of a given process, not of the subject area as a whole. It also does not solve the issue of action parameters, which remain meaningful only to developers.

Fortunately, Oozie supports a very elegant extensibility mechanism, custom action nodes [3], which makes it easy to solve both problems. Custom action nodes extend Oozie's language with additional actions (verbs). Oozie action nodes can be synchronous or asynchronous:

  • Synchronous nodes are executed inline by Oozie, which waits for them to complete before proceeding. These nodes are meant for lightweight tasks such as custom calculations and FileSystem move, mkdir, and delete operations.
  • Asynchronous nodes are started by Oozie but executed outside the Oozie engine. The engine monitors them for completion, either through a callback from the action or by polling the action status.

Implementing Oozie custom action handler

In this example, we will convert the standalone mailer presented in [2] into a custom email action (Listing 1).

package com.navteq.assetmgmt.oozie.custom;

import java.util.Properties;

import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.ActionExecutorException.ErrorType;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;

public class EmailActionExecutor extends ActionExecutor {

    // Action name, used as the element name in workflow XML
    private static final String NODENAME = "eMail";

    // External status values reported by this executor
    private static final String SUCCEEDED = "OK";
    private static final String FAILED = "FAIL";
    private static final String KILLED = "KILLED";

    // Default SMTP server and the element names defined in the action schema (Listing 2)
    private static final String DEFAULT_MAIL_SERVER = "imailchi.navtech.com";
    private static final String EMAILSERVER = "emailServer";
    private static final String SUBJECT = "emailSubject";
    private static final String MESSAGE = "emailBody";
    private static final String FROM = "emailFrom";
    private static final String TO = "emailTo";

    public EmailActionExecutor() {
        super(NODENAME);
    }

    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
        // Should not be called for a synchronous action
        throw new UnsupportedOperationException();
    }

    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        // Map the external status set in start() to the workflow action status
        String externalStatus = action.getExternalStatus();
        WorkflowAction.Status status = SUCCEEDED.equals(externalStatus) ?
                WorkflowAction.Status.OK : WorkflowAction.Status.ERROR;
        context.setEndData(status, getActionSignal(status));
    }

    @Override
    public boolean isCompleted(String externalStatus) {
        // Synchronous action: always complete once start() has returned
        return true;
    }

    @Override
    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
        context.setExternalStatus(KILLED);
        context.setExecutionData(KILLED, null);
    }

    @Override
    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        try {
            // Get parameters from the node configuration
            Element actionXml = XmlUtils.parseXml(action.getConf());
            Namespace ns = Namespace.getNamespace("uri:custom:email-action:0.1");

            String server = actionXml.getChildTextTrim(EMAILSERVER, ns);
            String subject = actionXml.getChildTextTrim(SUBJECT, ns);
            String message = actionXml.getChildTextTrim(MESSAGE, ns);
            String from = actionXml.getChildTextTrim(FROM, ns);
            String to = actionXml.getChildTextTrim(TO, ns);

            // Check that all required parameters are present
            if (server == null)
                server = DEFAULT_MAIL_SERVER;
            if ((message == null) || (from == null) || (to == null))
                throw new ActionExecutorException(ErrorType.FAILED,
                        ErrorCode.E0000.toString(), "Not all parameters are defined");

            // Execute the action synchronously
            sendMail(server, subject, message, from, to);
            context.setExecutionData(SUCCEEDED, null);
        }
        catch (Exception e) {
            context.setExecutionData(FAILED, null);
            throw new ActionExecutorException(ErrorType.FAILED,
                    ErrorCode.E0000.toString(), e.getMessage());
        }
    }

    // Sending an email
    public void sendMail(String server, String subject, String message,
                         String from, String to) throws Exception {

        // Create a session for this server. Unlike getDefaultInstance, getInstance does not
        // return a JVM-wide session that may have been created earlier with another server.
        Properties props = new Properties();
        props.setProperty("mail.smtp.host", server);
        Session session = Session.getInstance(props, null);

        // Create a message
        Message msg = new MimeMessage(session);

        // Set the from and to addresses; "to" is a comma-separated list
        msg.setFrom(new InternetAddress(from));
        msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));

        // Set the subject and content type, then send
        msg.setSubject(subject);
        msg.setContent(message, "text/plain");
        Transport.send(msg);
    }
}

Listing 1: Email Custom Action

This implementation extends the ActionExecutor[2] class (provided by Oozie) and overrides the required methods. Because sending an email is a very quick operation, we decided to implement it as a synchronous action handler. This means that it is executed within the Oozie execution context.

Our implementation (Listing 1) follows the Oozie documentation and implements all required methods:

  • A no-argument constructor is required for any custom action handler. This constructor registers the action handler name (by invoking super with the action name) that will be used inside the workflow XML.
  • The initActionType[3] method can be used to register exceptions that may occur while executing the action, along with their type and error message. It can also perform any initialization of the executor itself.
  • The start method starts action execution. Because we implemented a synchronous action, the whole action is executed here. Oozie invokes this method with two parameters, Context and WorkflowAction. Context provides access to the Oozie workflow execution context. Among other things, it contains workflow variables and provides very simple APIs (set, get) for manipulating them[4]. WorkflowAction provides Oozie's definition of the current action.
  • The check method is used by Oozie to check the action's status. It should never be called for synchronous actions.
  • The kill method is used to kill the running job or action.
  • The end method is used for any cleanup or processing that may need to be done after the action completes. It also has to set the result of the execution.
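The order in which these methods are called depends on whether the action is synchronous or asynchronous. The following deliberately simplified, self-contained model makes that order concrete. The `SimpleAction` interface, the `drive` loop, and the status strings are hypothetical stand-ins for illustration. They are not Oozie's engine code or its `ActionExecutor` API, and a real engine would wait between polls or rely on callbacks instead of looping tightly.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    // Hypothetical, stripped-down stand-in for an action executor (NOT Oozie's ActionExecutor).
    interface SimpleAction {
        String start();                      // returns the external status after starting
        String check();                      // returns the current external status (async only)
        boolean isCompleted(String externalStatus);
        String end(String externalStatus);   // returns the transition to follow: "ok" or "error"
    }

    // Synchronous action: all work happens in start(), so check() must never run.
    static class SyncAction implements SimpleAction {
        final List<String> calls = new ArrayList<>();
        public String start() { calls.add("start"); return "OK"; }
        public String check() { throw new UnsupportedOperationException(); }
        public boolean isCompleted(String s) { return true; }
        public String end(String s) { calls.add("end"); return s.equals("OK") ? "ok" : "error"; }
    }

    // Asynchronous action: work runs elsewhere and finishes after a fixed number of checks.
    static class AsyncAction implements SimpleAction {
        final List<String> calls = new ArrayList<>();
        private int checksUntilDone;
        AsyncAction(int checksUntilDone) { this.checksUntilDone = checksUntilDone; }
        public String start() { calls.add("start"); return "RUNNING"; }
        public String check() {
            calls.add("check");
            return --checksUntilDone <= 0 ? "OK" : "RUNNING";
        }
        public boolean isCompleted(String s) { return !s.equals("RUNNING"); }
        public String end(String s) { calls.add("end"); return s.equals("OK") ? "ok" : "error"; }
    }

    // Hypothetical engine loop: call check() only while the action reports it is not completed.
    static String drive(SimpleAction action) {
        String status = action.start();
        while (!action.isCompleted(status)) {
            status = action.check();
        }
        return action.end(status);
    }

    public static void main(String[] args) {
        SyncAction sync = new SyncAction();
        System.out.println(drive(sync) + " " + sync.calls);   // ok [start, end]
        AsyncAction async = new AsyncAction(2);
        System.out.println(drive(async) + " " + async.calls); // ok [start, check, check, end]
    }
}
```

In the synchronous case, isCompleted returns true as soon as start returns, so check is never reached. This is why the check method in Listing 1 can simply throw.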

Deploying and using Oozie custom action handler

With the custom action executor implementation in place, we need to define the XML schema for our new email action[5] (Listing 2):

<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:email="uri:custom:email-action:0.1"
           elementFormDefault="qualified"
           targetNamespace="uri:custom:email-action:0.1">
    <xs:complexType name="EMAIL">
        <xs:sequence>
            <xs:element name="emailServer" type="xs:string" minOccurs="0" maxOccurs="1" />
            <xs:element name="emailSubject" type="xs:string" />
            <xs:element name="emailFrom" type="xs:string" />
            <xs:element name="emailTo" type="xs:string" />
            <xs:element name="emailBody" type="xs:string" />
        </xs:sequence>
    </xs:complexType>

    <xs:element name="eMail" type="email:EMAIL"></xs:element>
</xs:schema>

Listing 2: XML schema for email component
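Before packaging the schema, it can be worth checking locally that an action fragment actually validates against it. For example, an element named emailMessage instead of emailBody should be rejected. The sketch below uses only the JDK's standard javax.xml.validation API. The inline schema copies Listing 2, while the class name `EmailSchemaCheck` and the sample fragments are hypothetical. This is only a quick offline check and does not replace the validation Oozie performs against its registered schemas.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

public class EmailSchemaCheck {

    // Same content as Listing 2.
    static final String XSD =
          "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'"
        + " xmlns:email='uri:custom:email-action:0.1'"
        + " elementFormDefault='qualified'"
        + " targetNamespace='uri:custom:email-action:0.1'>"
        + "<xs:complexType name='EMAIL'><xs:sequence>"
        + "<xs:element name='emailServer' type='xs:string' minOccurs='0' maxOccurs='1'/>"
        + "<xs:element name='emailSubject' type='xs:string'/>"
        + "<xs:element name='emailFrom' type='xs:string'/>"
        + "<xs:element name='emailTo' type='xs:string'/>"
        + "<xs:element name='emailBody' type='xs:string'/>"
        + "</xs:sequence></xs:complexType>"
        + "<xs:element name='eMail' type='email:EMAIL'/>"
        + "</xs:schema>";

    static final Schema SCHEMA = compile();

    // Compiles the inline schema once; a broken schema is a programming error here.
    private static Schema compile() {
        try {
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            return factory.newSchema(new StreamSource(new StringReader(XSD)));
        } catch (SAXException e) {
            throw new IllegalStateException("schema does not compile", e);
        }
    }

    // Returns null if the fragment is valid, otherwise the validator's error message.
    static String validate(String fragment) {
        try {
            SCHEMA.newValidator().validate(new StreamSource(new StringReader(fragment)));
            return null;
        } catch (SAXException | IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        String ok = "<eMail xmlns='uri:custom:email-action:0.1'>"
                  + "<emailSubject>test</emailSubject><emailFrom>a@example.com</emailFrom>"
                  + "<emailTo>b@example.com</emailTo><emailBody>hello</emailBody></eMail>";
        // Same fragment with emailBody misspelled as emailMessage: should be rejected.
        String bad = ok.replace("emailBody", "emailMessage");
        System.out.println("valid fragment:   " + validate(ok));
        System.out.println("invalid fragment: " + validate(bad));
    }
}
```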

Both the custom action code and the XML schema have to be packaged in a single jar, for example, emailAction.jar. Oozie's oozie-setup.sh script can be used to add this (and any other) jar to Oozie's war file with the following command (Listing 3):

$ bin/oozie-setup.sh -jars emailAction.jar:mail.jar

Listing 3: Deployment command (see Adding Jars to Oozie)
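Because the executor class and the XSD have to end up in the same jar, a quick look at the jar's contents before deploying can save a Tomcat restart. This sketch uses java.util.jar from the JDK. The expected entry names are assumptions: they assume the package from Listing 1 and an emailAction.xsd placed at the jar root, so that the schema can be found under the plain name used in the oozie-site.xml configuration. Adjust them if your layout differs.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarContentsCheck {

    // Assumed entries: executor class from Listing 1 and the schema at the jar root.
    static final List<String> EXPECTED = Arrays.asList(
            "com/navteq/assetmgmt/oozie/custom/EmailActionExecutor.class",
            "emailAction.xsd");

    // Returns the expected entries that are absent from the given entry names.
    static List<String> missing(Collection<String> entryNames) {
        List<String> result = new ArrayList<>();
        for (String expected : EXPECTED) {
            if (!entryNames.contains(expected)) {
                result.add(expected);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "emailAction.jar";
        Set<String> names = new HashSet<>();
        try (JarFile jar = new JarFile(path)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                names.add(entries.nextElement().getName());
            }
        }
        List<String> absent = missing(names);
        System.out.println(absent.isEmpty() ? "jar looks complete" : "missing: " + absent);
    }
}
```

For example, `java JarContentsCheck emailAction.jar` prints the entries, if any, that would be missing at runtime.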

Adding Jars to Oozie

Be aware that the oozie-setup.sh command line recommended by Cloudera will rebuild your Oozie war file. If you are using the web page to monitor your jobs, you will lose the JavaScript extensions. In testing, we had difficulty including both the -extjs and the -jars options. For expediency, we copied our jar files into ${CATALINA_BASE}/webapps/oozie/WEB-INF/lib, where ${CATALINA_BASE} is /var/lib/oozie/oozie-server. Note that if someone else rebuilds the war file, these extensions will be lost and will have to be added again manually. For testing, we recommend copying in your jars; for production, we recommend adding the jars to the war file.

Next, we need to register the custom executor with the Oozie runtime. This is done by extending oozie-site.xml[6]. The custom action itself is registered by adding or changing the “oozie.service.ActionService.executor.ext.classes”[7] property in the Oozie configuration file oozie-site.xml (Listing 4):

...
<property>
    <name>oozie.service.ActionService.executor.ext.classes</name>
    <value>com.navteq.assetmgmt.oozie.custom.EmailActionExecutor</value>
</property>
...

Listing 4: Custom execution configuration

The XML schema for the new action (Listing 2) should be added to oozie-site.xml under the property “oozie.service.SchemaService.wf.ext.schemas”[8] (Listing 5). In older Oozie releases this property is named “oozie.service.WorkflowSchemaService.ext.schemas”.

...
<property>
    <name>oozie.service.SchemaService.wf.ext.schemas</name>
    <value>emailAction.xsd</value>
</property>
...

Listing 5: Custom schema configuration
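A stray space or typo in either property is easy to miss, and both values can be comma-separated lists. A small check of oozie-site.xml before restarting Tomcat can therefore help. The helper below is hypothetical, not an Oozie tool. It reads the Hadoop-style `<configuration><property><name>/<value>` layout with the JDK's DOM parser and splits the two property values on commas. It then flags entries with leading or trailing whitespace, as well as executor entries that are not well-formed Java class names. Flagging whitespace is deliberately conservative, since we have not verified whether every Oozie version trims these values. The schema property name is the one from Listing 5, so adjust the constant for releases that use the older name.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class OozieSiteCheck {

    static final String EXECUTORS = "oozie.service.ActionService.executor.ext.classes";
    static final String SCHEMAS = "oozie.service.SchemaService.wf.ext.schemas";

    // One or more dot-separated Java identifiers; any embedded whitespace makes the match fail.
    private static final Pattern CLASS_NAME =
            Pattern.compile("[A-Za-z_$][A-Za-z0-9_$]*(\\.[A-Za-z_$][A-Za-z0-9_$]*)*");

    // Collects name -> raw (untrimmed) value for each <property> in a Hadoop-style config file.
    static Map<String, String> readProperties(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new LinkedHashMap<>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element p = (Element) nodes.item(i);
                NodeList names = p.getElementsByTagName("name");
                NodeList values = p.getElementsByTagName("value");
                if (names.getLength() > 0 && values.getLength() > 0) {
                    props.put(names.item(0).getTextContent().trim(), values.item(0).getTextContent());
                }
            }
            return props;
        } catch (Exception e) {
            throw new IllegalArgumentException("cannot parse configuration", e);
        }
    }

    // Lists suspicious entries in the two extension properties; empty means nothing was flagged.
    static List<String> warnings(Map<String, String> props) {
        List<String> found = new ArrayList<>();
        for (String key : new String[] {EXECUTORS, SCHEMAS}) {
            String raw = props.get(key);
            if (raw == null) {
                found.add("missing property " + key);
                continue;
            }
            for (String item : raw.split(",")) {
                if (!item.equals(item.trim())) {
                    found.add(key + ": leading/trailing whitespace in '" + item + "'");
                }
                if (key.equals(EXECUTORS) && !CLASS_NAME.matcher(item.trim()).matches()) {
                    found.add(key + ": not a valid class name '" + item.trim() + "'");
                }
            }
        }
        return found;
    }

    public static void main(String[] args) throws Exception {
        String xml = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        List<String> found = warnings(readProperties(xml));
        System.out.println(found.isEmpty() ? "no problems found" : String.join("\n", found));
    }
}
```

A value such as "com.example.Foo. BarExecutor " would be flagged twice: once for the trailing space and once because the embedded space makes it an invalid class name.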

Finally, once Tomcat is restarted, the custom action node can be used in workflows.

To test our implementation, we created a simple workflow (Listing 6) that sends an email using our executor.

<!--
Copyright (c) 2011 NAVTEQ! Inc. All rights reserved.
Test email Oozie Script
-->
<workflow-app xmlns='uri:oozie:workflow:0.1' name='emailTester'>
 <start to='simpleEmail'/>
 <action name='simpleEmail'>
   <eMail xmlns="uri:custom:email-action:0.1">
     <emailSubject>test</emailSubject>
     <emailFrom>mike.segel@<mycompany>.com</emailFrom>
     <emailTo>boris.lublinsky@<mycompany>.com</emailTo>
     <emailBody>This is a test message, if you can see this, Mikey did something right! :)</emailBody>
   </eMail>
   <ok to="end"/>
   <error to="fail"/>
 </action>
 <kill name="fail">
   <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
 <end name='end'/>
</workflow-app>

Listing 6: Simple workflow using email custom action executor

Conclusion

In this article we have shown how to extend Oozie by creating custom action executors. Doing so makes it possible to define and implement a department- or enterprise-specific Oozie dialect (a domain-specific language) aligned with that department's or enterprise's functionality. Such a domain-specific language can simplify building processes for a given department or enterprise and improve their readability.

Granted, Oozie is still relatively immature. However, it provides a basic framework for handling processes that comprise multiple map/reduce jobs, along with the ability to add non-map/reduce jobs to the overall business process. As more users work with Oozie and provide feedback, we believe that it has the potential to become a powerful and integral part of the Hadoop environment.

There is significantly more to Oozie than we have covered in these three short articles. We hope that they will get you sufficiently interested in the Oozie workflow engine and serve as a good starting point for your further Oozie exploration.

References

  1. Boris Lublinsky, Mike Segel. Introduction to Oozie.
  2. Boris Lublinsky, Mike Segel. Oozie by Example.
  3. Oozie Custom Action Nodes.
  4. Oozie source code.

About the Authors

Boris Lublinsky is principal architect at NAVTEQ. There he works on defining the architecture vision for large data management and processing and SOA, and on implementing various NAVTEQ projects. He is also an SOA editor for InfoQ and a participant in the SOA RA working group at OASIS. Boris is an author and frequent speaker; his most recent book is "Applied SOA".

Michael Segel has spent the past 20+ years working with customers to identify and solve their business problems. Michael has worked in multiple roles, in multiple industries. He is an independent consultant who is always looking for challenging problems to solve. Michael has a Software Engineering degree from the Ohio State University.


[1] Examples of such classes include various counter operations, simple calculations, etc.

[2] All Oozie action executors that are part of the Oozie distribution are implemented by extending this class.

[3] Our implementation uses the default implementation, which is why the method does not appear in the code. See the Oozie source code [4] for how this method is implemented in the existing Oozie action handlers.

[4] There are two options for configuring a custom executor: workflow variables and/or action configuration. Our example shows the latter, but in practice it is virtually always a combination of the two.

[5] Make sure to define not only the complex type but also the element. That is what Oozie expects.

[6] Typically called oozie-default.xml in the Oozie distribution.

[7] For multiple executors, the class names should be separated by commas.

[8] For multiple executors, a comma-separated list of schemas can be used.
