Designing and Developing Cross-Cutting Features
Every developer has had to integrate with another system, API or component. This article provides strategies for handling change and for separating system boundaries.

Posted by Boris Lublinsky, Mike Segel on Aug 02, 2011
In our previous articles [1, 2] we described the Oozie workflow server and presented several workflow examples. We also described the deployment and configuration of workflows for Oozie, and the tools for starting, stopping and monitoring Oozie workflows.
In this article we will show how to leverage Oozie extensibility to implement custom orchestration language extensions.
As we explained in [1], out of the box Oozie provides a “minimal” workflow language, which contains only a handful of control and action nodes. Although one of the action nodes is the java action node, which allows invoking any Java class with a main method from an Oozie workflow, this approach is not always optimal. One reason is that a java action is executed in the Hadoop cluster as a map-reduce job with a single Mapper task. On the one hand, this provides a lot of advantages:
On the other hand, this approach has a few drawbacks:
Although in the case of relatively long-running (minutes, hours) map/reduce or Pig jobs the advantages by far outweigh the drawbacks, in the case of simple java nodes (see [2]) the overhead of externalized execution is much harder to justify. So one of the reasons to use custom action nodes is to support execution of lightweight Java classes[1] directly inside the Oozie execution context.
The other reason to use custom actions is to improve workflow semantics and readability. Because Oozie is a workflow engine for orchestrating Hadoop-based processing, its language semantics are very Hadoop-execution-centric: Hadoop file system, map/reduce, Pig, etc. These semantics suit Hadoop developers well, but say little about the functionality of a given action. It is possible to come up with business-related naming conventions for the actions themselves, but this only partially solves the problem: action names reflect only the semantics of a given process, not the subject area as a whole, and they do nothing for action parameters, which remain meaningful only to developers.
Fortunately, Oozie supports a very elegant extensibility mechanism, custom action nodes [3], which makes it easy to solve both problems. Custom action nodes extend Oozie's language with additional actions (verbs). Oozie action nodes can be synchronous or asynchronous.
In this example we will convert the standalone mailer presented in [2] into a custom email action (Listing 1).
package com.navteq.assetmgmt.oozie.custom;

import java.util.Properties;
import java.util.StringTokenizer;

import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.ActionExecutorException.ErrorType;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;

public class EmailActionExecutor extends ActionExecutor {

    private static final String NODENAME = "eMail";
    private static final String SUCCEEDED = "OK";
    private static final String FAILED = "FAIL";
    private static final String KILLED = "KILLED";

    private static final String DEFAULMAILSERVER = "imailchi.navtech.com";
    private static final String EMAILSERVER = "emailServer";
    private static final String SUBJECT = "emailSubject";
    private static final String MESSAGE = "emailBody";
    private static final String FROM = "emailFrom";
    private static final String TO = "emailTo";

    public EmailActionExecutor() {
        super(NODENAME);
    }

    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
        // Should not be called for synch operation
        throw new UnsupportedOperationException();
    }

    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        String externalStatus = action.getExternalStatus();
        // Constant-first comparison avoids a NullPointerException if no status was set
        WorkflowAction.Status status = SUCCEEDED.equals(externalStatus) ?
                WorkflowAction.Status.OK : WorkflowAction.Status.ERROR;
        context.setEndData(status, getActionSignal(status));
    }

    @Override
    public boolean isCompleted(String arg0) {
        return true;
    }

    @Override
    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
        context.setExternalStatus(KILLED);
        context.setExecutionData(KILLED, null);
    }

    @Override
    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        // Get parameters from Node configuration
        try {
            Element actionXml = XmlUtils.parseXml(action.getConf());
            Namespace ns = Namespace.getNamespace("uri:custom:email-action:0.1");

            String server = actionXml.getChildTextTrim(EMAILSERVER, ns);
            String subject = actionXml.getChildTextTrim(SUBJECT, ns);
            String message = actionXml.getChildTextTrim(MESSAGE, ns);
            String from = actionXml.getChildTextTrim(FROM, ns);
            String to = actionXml.getChildTextTrim(TO, ns);

            // Check if all parameters are there
            if (server == null)
                server = DEFAULMAILSERVER;
            if ((message == null) || (from == null) || (to == null))
                throw new ActionExecutorException(ErrorType.FAILED,
                        ErrorCode.E0000.toString(), "Not all parameters are defined");

            // Execute action synchronously
            SendMail(server, subject, message, from, to);
            context.setExecutionData(SUCCEEDED, null);
        }
        catch (Exception e) {
            context.setExecutionData(FAILED, null);
            throw new ActionExecutorException(ErrorType.FAILED,
                    ErrorCode.E0000.toString(), e.getMessage());
        }
    }

    // Sending an email
    public void SendMail(String server, String subject, String message,
            String from, String to) throws Exception {
        // create some properties and get the default Session
        Properties props = new Properties();
        props.setProperty("mail.smtp.host", server);
        Session session = Session.getDefaultInstance(props, null);

        // create a message
        Message msg = new MimeMessage(session);

        // set the from and to address
        InternetAddress addressFrom = new InternetAddress(from);
        msg.setFrom(addressFrom);

        // To is a comma separated list
        StringTokenizer st = new StringTokenizer(to, ",");
        String[] recipients = new String[st.countTokens()];
        int rc = 0;
        while (st.hasMoreTokens())
            recipients[rc++] = st.nextToken();
        InternetAddress[] addressTo = new InternetAddress[recipients.length];
        for (int i = 0; i < recipients.length; i++) {
            addressTo[i] = new InternetAddress(recipients[i]);
        }
        msg.setRecipients(Message.RecipientType.TO, addressTo);

        // Setting the Subject and Content Type
        msg.setSubject(subject);
        msg.setContent(message, "text/plain");
        Transport.send(msg);
    }
}
Listing 1: Email Custom Action
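The parameter handling in start() can be tried outside Oozie. What follows is a minimal sketch, assuming only the JDK's DOM parser instead of the XmlUtils and JDOM calls used in Listing 1. The class EmailActionParams, its parse method and the "localhost" default server are hypothetical names introduced for illustration; they are not part of Oozie or of the original mailer.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

// Hypothetical helper, not part of Oozie: mirrors the parameter checks of start() in Listing 1.
final class EmailActionParams {
    static final String NS = "uri:custom:email-action:0.1";
    // Assumed placeholder; Listing 1 falls back to a site-specific mail server instead.
    static final String DEFAULT_SERVER = "localhost";

    private EmailActionParams() {
    }

    // Returns the parameters keyed by element name. Throws IllegalArgumentException
    // when the XML cannot be parsed or a required element is missing.
    static Map<String, String> parse(String actionXml) {
        Element root;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for getNamespaceURI()/getLocalName()
            root = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(actionXml)))
                    .getDocumentElement();
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Cannot parse action configuration", e);
        }

        Map<String, String> params = new LinkedHashMap<>();
        for (String name : new String[] {
                "emailServer", "emailSubject", "emailFrom", "emailTo", "emailBody"}) {
            params.put(name, childText(root, name));
        }
        if (params.get("emailServer") == null) {
            params.put("emailServer", DEFAULT_SERVER);
        }
        // Same required set as Listing 1: body, sender and recipients.
        for (String required : new String[] {"emailBody", "emailFrom", "emailTo"}) {
            if (params.get(required) == null) {
                throw new IllegalArgumentException("Missing parameter: " + required);
            }
        }
        return params;
    }

    // Trimmed text of the first direct child with the given local name in NS, or null.
    private static String childText(Element parent, String localName) {
        NodeList children = parent.getChildNodes();
        for (int i = 0; i < children.getLength(); i++) {
            Node node = children.item(i);
            if (node.getNodeType() == Node.ELEMENT_NODE
                    && NS.equals(node.getNamespaceURI())
                    && localName.equals(node.getLocalName())) {
                return node.getTextContent().trim();
            }
        }
        return null;
    }
}
```

Because the lookup is namespace-qualified, an eMail element whose children are not in uri:custom:email-action:0.1, or whose body element has any name other than emailBody, is reported as missing a parameter.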
This implementation extends the ActionExecutor class[2] (provided by Oozie) and overrides the required methods. Because sending an email is a very quick operation, we decided to implement it as a synchronous action handler, which means that it is executed within the Oozie execution context.
Our implementation (Listing 1) follows the Oozie documentation and implements all required methods:
With the custom action executor implementation in place, it is necessary to define the XML schema for our new email action[5] (Listing 2).
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:email="uri:custom:email-action:0.1"
           elementFormDefault="qualified"
           targetNamespace="uri:custom:email-action:0.1">
    <xs:complexType name="EMAIL">
        <xs:sequence>
            <xs:element name="emailServer" type="xs:string" minOccurs="0" maxOccurs="1" />
            <xs:element name="emailSubject" type="xs:string" />
            <xs:element name="emailFrom" type="xs:string" />
            <xs:element name="emailTo" type="xs:string" />
            <xs:element name="emailBody" type="xs:string" />
        </xs:sequence>
    </xs:complexType>
    <xs:element name="eMail" type="email:EMAIL"></xs:element>
</xs:schema>
Listing 2: XML schema for email component
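Before deploying, it can help to check an action fragment against this schema locally. The sketch below assumes the standard javax.xml.validation API that ships with the JDK; the class EmailSchemaCheck and its isValid method are hypothetical names, and the schema from Listing 2 is embedded as a string (without the XML declaration) only to keep the example self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

// Hypothetical helper, not part of Oozie: validates an eMail fragment against Listing 2.
final class EmailSchemaCheck {
    // The schema from Listing 2, embedded for illustration.
    static final String XSD =
            "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'"
            + " xmlns:email='uri:custom:email-action:0.1'"
            + " elementFormDefault='qualified'"
            + " targetNamespace='uri:custom:email-action:0.1'>"
            + "<xs:complexType name='EMAIL'><xs:sequence>"
            + "<xs:element name='emailServer' type='xs:string' minOccurs='0' maxOccurs='1'/>"
            + "<xs:element name='emailSubject' type='xs:string'/>"
            + "<xs:element name='emailFrom' type='xs:string'/>"
            + "<xs:element name='emailTo' type='xs:string'/>"
            + "<xs:element name='emailBody' type='xs:string'/>"
            + "</xs:sequence></xs:complexType>"
            + "<xs:element name='eMail' type='email:EMAIL'/>"
            + "</xs:schema>";

    private EmailSchemaCheck() {
    }

    // Returns true when the fragment conforms to the schema, false on any validation error.
    static boolean isValid(String xml) {
        Schema schema;
        try {
            schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                    .newSchema(new StreamSource(new StringReader(XSD)));
        } catch (SAXException e) {
            // A broken schema is a programming error, not a validation result.
            throw new IllegalStateException("Schema itself is invalid", e);
        }
        try {
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the schema uses xs:sequence with elementFormDefault="qualified", the child elements must appear in the declared order and in the uri:custom:email-action:0.1 namespace. An element the schema does not declare, such as a hypothetical emailMessage in place of emailBody, fails validation.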
Both the custom action code and the XML schema have to be packaged in a single jar, for example emailAction.jar. Oozie's oozie-setup.sh script can be used to add this (and any other) jar to Oozie's war with the following command (Listing 3):
$ bin/oozie-setup.sh -jars emailAction.jar:mail.jar
(See Adding Jars to Oozie)
Listing 3: Deployment command
Adding Jars to Oozie
Be aware that the oozie-setup.sh command line recommended by Cloudera will rebuild your Oozie war file, and if you are using the web page to monitor your jobs, you will lose the JavaScript extensions. In testing we had difficulty including both the -extjs and the -jars options. For expediency, we copied our jar files into ${CATALINA_BASE}/webapps/oozie/WEB-INF/lib, where ${CATALINA_BASE} is /var/lib/oozie/oozie-server. Please note that if someone else rebuilds the war file, you will lose these extensions and they will have to be added again manually. For testing, we recommend copying in your jars; for production, we recommend adding the jars to the war file.
Now we need to register the custom executor with the Oozie runtime. This is done by extending oozie-site.xml[6]. The custom action itself is registered by adding or changing the “oozie.service.ActionService.executor.ext.classes”[7] property in oozie-site.xml (Listing 4).
……………………………………
<property>
    <name>oozie.service.ActionService.executor.ext.classes</name>
    <value>com.navteq.assetmgmt.oozie.custom.EmailActionExecutor</value>
</property>
……………………………………
Listing 4: Custom execution configuration
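A property like this is a comma-separated list of fully qualified class names that are loaded by name at startup, which is why the executor jar must be on the server's classpath and why stray whitespace inside a class name is worth avoiding. The following is only a sketch of that general mechanism using plain Java reflection; ExecutorClassList is a hypothetical class, and how Oozie itself parses the value is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of loading classes from a comma-separated property value.
// This is not Oozie's own loading code.
final class ExecutorClassList {

    private ExecutorClassList() {
    }

    // Splits the property value on commas, trimming whitespace and skipping empty entries.
    static List<String> classNames(String propertyValue) {
        List<String> names = new ArrayList<>();
        for (String part : propertyValue.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // Instantiates each listed class through its no-argument constructor.
    static List<Object> instantiate(String propertyValue) {
        List<Object> instances = new ArrayList<>();
        for (String name : classNames(propertyValue)) {
            try {
                instances.add(Class.forName(name).getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                // Typical causes: jar not on the classpath, or a typo in the class name.
                throw new IllegalStateException("Cannot load executor class " + name, e);
            }
        }
        return instances;
    }
}
```

In this sketch a misspelled or unavailable class name fails fast with an IllegalStateException that names the offending entry.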
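The XML schema for the new action (Listing 2) must also be registered in oozie-site.xml (Listing 5). The Oozie documentation names the property “oozie.service.WorkflowSchemaService.ext.schemas”[8], while Listing 5 uses “oozie.service.SchemaService.wf.ext.schemas”. The name appears to differ between Oozie releases, so check the oozie-default.xml shipped with your version to see which one applies.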
………………………………………
<property>
    <name>oozie.service.SchemaService.wf.ext.schemas</name>
    <value>emailAction.xsd</value>
</property>
…………………………………
Listing 5: Custom schema configuration
Finally, once Tomcat is restarted, the custom action node can be used in workflows.
To test our implementation we have created a simple workflow (Listing 6), which sends an email using our executor.
<!--
Copyright (c) 2011 NAVTEQ! Inc. All rights reserved.
Test email Oozie Script
-->
<workflow-app xmlns='uri:oozie:workflow:0.1' name='emailTester'>
    <start to='simpleEmail'/>
    <action name='simpleEmail'>
        <eMail xmlns="uri:custom:email-action:0.1">
            <emailSubject>test</emailSubject>
            <emailFrom>mike.segel@<mycompany>.com</emailFrom>
            <emailTo>boris.lublinsky@<mycompany>.com</emailTo>
            <emailBody>This is a test message, if you can see this, Mikey did something right! :)</emailBody>
        </eMail>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end'/>
</workflow-app>
Listing 6: Simple workflow using email custom action executor
In this article we have shown how to extend Oozie by creating custom action executors. Doing this makes it possible to define and implement a department- or enterprise-specific Oozie dialect (a domain-specific language) aligned with the department's or enterprise's functionality. Such a domain-specific language can simplify building processes for a given department or enterprise and improve their readability.
Although Oozie is still relatively immature, it provides a basic framework for handling processes that are composed of multiple map/reduce jobs, along with the ability to add non-map/reduce jobs to the overall business process. As more users work with Oozie and provide feedback, we believe that it has the potential to become a powerful and integral part of the Hadoop environment.
There is significantly more to Oozie than we have covered in these three small articles. We hope that they will get you sufficiently interested in the Oozie workflow engine and will serve as a good starting point for your further Oozie exploration.
Boris Lublinsky is principal architect at NAVTEQ, where he is working on defining the architecture vision for large data management and processing and SOA, and on implementing various NAVTEQ projects. He is also an SOA editor for InfoQ and a participant in the SOA RA working group in OASIS. Boris is an author and frequent speaker; his most recent book is "Applied SOA".
Michael Segel has spent the past 20+ years working with customers identifying and solving their business problems. Michael has worked in multiple roles, in multiple industries. He is an independent consultant who is always looking to solve any challenging problems. Michael has a Software Engineering degree from the Ohio State University.
[1] Examples of such classes include various counter operations, simple calculations, etc.
[2] All Oozie action executors that are part of the Oozie distribution are implemented by extending this class.
[3] In our implementation we are using the default implementation, which is why it does not appear in the code. Take a look at the Oozie source code [4] to see how this method is implemented in the existing Oozie action handlers.
[4] There are two options for configuring a custom executor: workflow variables and/or action configuration. In our example we have shown the latter, but in reality it is virtually always a combination of the two.
[5] Make sure to define not only the complex type definition, but also the element definition. That is what Oozie expects.
[6] Typically called oozie-default.xml in the Oozie distribution.
[7] For multiple executors, the class names should be separated by commas.
[8] For multiple executors, a comma-separated list of schemas can be used.
I get the following error on Oozie 3.0.0 release:
Error: E0701 : E0701: XML schema error, cvc-complex-type.2.4.a: Invalid content was found starting with element 'eMail'. One of '{"uri:oozie:workflow:0.1":map-reduce, "uri:oozie:workflow:0.1":pig, "uri:oozie:workflow:0.1":ssh, "uri:oozie:workflow:0.1":sub-workflow, "uri:oozie:workflow:0.1":fs, "uri:oozie:workflow:0.1":http, "uri:oozie:workflow:0.1":email, "uri:oozie:workflow:0.1":java, WC[##other:"uri:oozie:workflow:0.1"]}' is expected.
I configured both the oozie.service.SchemaService.wf.ext.schemas and oozie.service.WorkflowSchemaService.wf.ext.schemas properties as described in your article and the official Oozie documentation (which should I be using?).
Any advice?