
Practical MQTT with Paho

Posted by Dj Walker-Morgan on Nov 08, 2013

When faced with a problem such as "This application just needs to send a value to another server", there is always a temptation to reduce it to something as simple as opening a socket and sending a value. But that simple proposition soon falls apart in production. Apart from having to write the server end of the system, the developer has to cope with the fact that networks are not 100% reliable, that the wireless and mobile networks that surround us are unreliable by design, and that there will most likely need to be access control and encryption.

Writing code to cope with that winds up with more complex, hard-to-test routines which are difficult to proof against the edge cases they will encounter. Worse still, the increase in complexity hasn't increased the functionality or interoperability. Faced with all that, wouldn't it be better to start with an interoperable, full-featured protocol which already allows for all of those issues? This is where MQTT, MQ Telemetry Transport, comes in.

Why MQTT?

MQTT comes from the world of M2M (Machine to Machine) and the Internet of Things. There, devices can be as small as a sensor and controller connected over a wireless system. This environment drives the need for any protocol's implementation to be lightweight in terms of code footprint and system load, while taking care of that variable reliability connection problem.

MQTT was originally created by IBM's Andy Stanford-Clark and Arlen Nipper of Arcom (taken over later by Eurotech) as a complement to enterprise messaging systems so that a wealth of data outside the enterprise could be safely and easily brought inside the enterprise. MQTT is a publish/subscribe messaging system that allows clients to publish messages without concerning themselves about their eventual destination; messages are sent to an MQTT broker where they may be retained. The messages' payloads are just a sequence of bytes, up to 256MB, with no requirements placed on the format of those payloads and with the MQTT protocol usually adding a fixed header of two bytes to most messages.
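To see where both the two-byte header and the 256MB ceiling come from, here is a minimal sketch of the "remaining length" field as we understand it from the MQTT 3.1 specification: the first header byte carries the message type and flags, and the length that follows is written seven bits at a time in one to four bytes. The class and method names are our own and are not part of Paho.

```java
import java.io.ByteArrayOutputStream;

// Illustrative sketch only: RemainingLength is a hypothetical helper, not a Paho class.
final class RemainingLength {

    // Largest value that four 7-bit groups can carry: 256MB minus one byte.
    static final int MAX = 268_435_455;

    // Encodes a length into 1 to 4 bytes, least significant 7-bit group first.
    static byte[] encode(int length) {
        if (length < 0 || length > MAX) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(4);
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80; // top bit set means "another length byte follows"
            }
            out.write(digit);
        } while (length > 0);
        return out.toByteArray();
    }

    // Reverses encode(); assumes the input is a well-formed 1 to 4 byte sequence.
    static int decode(byte[] encoded) {
        int value = 0;
        int multiplier = 1;
        for (byte b : encoded) {
            value += (b & 0x7F) * multiplier;
            multiplier *= 128;
        }
        return value;
    }

    public static void main(String[] args) {
        for (int length : new int[] {0, 127, 128, 16_384, MAX}) {
            System.out.println(length + " bytes remaining -> "
                    + encode(length).length + " length byte(s)");
        }
    }
}
```

Anything under 128 bytes after the first header byte needs only a single length byte, which is why small messages carry just two bytes of fixed header.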

Other clients can subscribe to these messages and get updated by the broker when new messages arrive. To allow for the variety of possible situations where MQTT can be put to use, it lets clients and brokers set a "Quality of Service" on a per-message basis from "fire and forget" to "confirmed delivery". MQTT also has a very light API, with all of five protocol methods, making it easy to learn and recall, but there's also support for SSL-encrypted connections and username/password authentication for clients to brokers.

Since making its debut, MQTT has proved itself in production scenarios. As well as standalone MQTT brokers, it has also been integrated into other message queuing brokers such as ActiveMQ and RabbitMQ, providing a bridge into the enterprise network. The most recent version of the specification, MQTT 3.1, is being used as the basis for an OASIS standard for messaging telemetry, a standard that's not expected to vary much, if at all, from the MQTT specification in order to maintain compatibility.

Why Paho?

MQTT is a protocol and protocols need client implementations. The Eclipse Paho project is part of the Eclipse Foundation's M2M mission to provide high quality implementations of M2M libraries and tools. Under the Paho banner, open source client libraries for MQTT are being curated and developed; there are already MQTT C and Java libraries with Lua, Python, C++ and JavaScript at various stages of development. In this article we'll be showing how to use the Paho Java MQTT libraries to publish and subscribe.

Diving deeper into MQTT

To start thinking about MQTT in code, here's the simplest use of the MQTT API:

MqttClient client = new MqttClient("tcp://localhost:1883", "pahomqttpublish1");
client.connect();
MqttMessage message = new MqttMessage();
message.setPayload("A single message".getBytes());
client.publish("pahodemo/test", message);
client.disconnect();

In this snippet, we create a client connection to an MQTT broker running on the local host, over TCP to port 1883 (the default port for MQTT). Clients need to have an identifier that is unique for all clients connecting to the broker – in this case we give the client an id of pahomqttpublish1. We then tell the client to connect. Now we can create an MqttMessage and we set its payload to a simple string. Notice that we convert the string to bytes as setPayload only takes an array of bytes. We're relying on the default settings for MqttMessage to set the various other parameters. Next, we publish the message and it's here we need to introduce topics.

To avoid the obvious problem of every client getting every message published by every other client, MQTT messages are published with what are called topics. A topic is a structured string that defines a location in a namespace, with "/" used to delimit levels of that namespace's hierarchy. A topic could be, for example, "pumpmonitor/pumps/1/level" or "stockmarket/prices/FOO". It's up to the developer to come up with a structure for topics which is appropriate to the task they are handling. Clients publish to an absolute topic with no ambiguity, but they can subscribe to a topic using wildcards to aggregate messages. A "+" represents exactly one level of the implied hierarchy, while a "#" represents all of the tree from that point on. Given the previous examples, one could subscribe to "pumpmonitor/pumps/1/level" for pump 1's level, "pumpmonitor/pumps/+/level" for all pump levels or even "pumpmonitor/pumps/#" for all pump activity.

In our short snippet we've published the message to "pahodemo/test". Finally we disconnect from the broker and we've completed an MQTT session. But where can we publish the message to?
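The wildcard rules are easier to reason about with a few lines of code. The following is a rough sketch of how a topic filter could be compared with a topic, level by level. It is not how Mosquitto or Paho implement matching, the TopicFilter name is our own, and it ignores finer points such as filter validation and topics reserved for the broker.

```java
// Illustrative sketch only: TopicFilter is a hypothetical helper, not a Paho class.
final class TopicFilter {

    // Returns true if the subscription filter matches the published topic.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1); // -1 keeps empty levels such as a trailing "/"
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // "#" swallows the rest of the tree, but only as the last level of a filter
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter is deeper than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level that differs
            }
        }
        return f.length == t.length; // without "#", both must have the same depth
    }

    public static void main(String[] args) {
        String topic = "pumpmonitor/pumps/1/level";
        for (String filter : new String[] {
                "pumpmonitor/pumps/1/level",
                "pumpmonitor/pumps/+/level",
                "pumpmonitor/pumps/#",
                "stockmarket/prices/+"}) {
            System.out.println(filter + " matches " + topic + "? " + matches(filter, topic));
        }
    }
}
```

Note how "+" stands in for one level only: "pumpmonitor/pumps/+/level" would not match a deeper topic such as "pumpmonitor/pumps/1/sensors/level".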

Getting a Broker

A broker in MQTT handles receiving published messages and sending them on to any clients who have subscribed. In our brief example, we connect to a broker running on the local system. Although there are a number of brokers available, the Mosquitto broker is by far the easiest to configure and run for MQTT-only work. It's also open source, so you can download it and run it on your own system, be it Windows, Mac OS X, Linux or many other platforms. The Mosquitto broker code is also being contributed to Eclipse as part of a new project.

The Eclipse Foundation is no stranger to Mosquitto – it runs a public instance of Mosquitto as an MQTT sandbox on m2m.eclipse.org so if you cannot download and run your own Mosquitto server you can change the connection URI in the example to "tcp://m2m.eclipse.org:1883". Do remember this is a shared sandbox, so a message published to a topic used in this article may well be overwritten by someone else reading this article and running the examples.

Mosquitto's default configuration means it is set up to not use username/password authentication and accepts all connections on port 1883. It also comes with two clients, mosquitto_pub and mosquitto_sub, the latter of which will be useful when you are debugging your applications. Running:

  mosquitto_sub -t "#" -v

will dump all new messages arriving at the broker. Remember the quotes around the topic, especially with the "#" wildcard on Unix as, unquoted or unescaped, that marks the start of a comment and would see the rest of the command discarded. If you leave that command running and, in another window, run 'mosquitto_pub -t "mosquittodemo/test" -m "Hi"' then you should see the mosquitto_sub session list the message. We now have somewhere to publish to, so let's get that code running.

In the IDE

To get our snippet of code running, we're going to use the Eclipse Maven support to handle dependencies. Create a new Java project and then select Configure → Convert to Maven project. First, as the Paho MQTT code isn't in Maven Central (yet), we need to include its repository – open the pom.xml file and after </version> add

<repositories>
  <repository>
    <id>paho-mqtt-client</id>
    <name>Paho MQTT Client</name>
    <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
  </repository>
</repositories>

Then we need to add the dependency for the mqtt-client code. Still in the pom.xml file but this time, after </build>, add

<dependencies>
  <dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>mqtt-client</artifactId>
    <type>jar</type>
    <version>0.4.0</version>
  </dependency>
</dependencies>

Save pom.xml and create a new Java class, PahoDemo. It is essentially the earlier snippet wrapped in the Java code needed to run it, and should look like this:

package org.eclipse.pahodemo;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PahoDemo {

  MqttClient client;
  
  public PahoDemo() {}

  public static void main(String[] args) {
    new PahoDemo().doDemo();
  }

  public void doDemo() {
    try {
      client = new MqttClient("tcp://localhost:1883", "pahomqttpublish1");
      client.connect();
      MqttMessage message = new MqttMessage();
      message.setPayload("A single message".getBytes());
      client.publish("pahodemo/test", message);
      client.disconnect();
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
}

Now run this as a Java Application in Eclipse. If you still have mosquitto and mosquitto_sub running, you should see:

  pahodemo/test A single message

appear. We've now got a basic Paho MQTT publish client running and we can start exploring the various options available.

Message options

Each message in MQTT can have its quality of service and retain flag set. The quality of service advises the code if and how it should ensure the message arrives. There are three options: 0 (At Most Once), 1 (At Least Once) and 2 (Exactly Once). By default, a new message instance is set to "At Least Once", a Quality of Service (QoS) of 1, which means the sender will deliver the message at least once. If there's no acknowledgement of it, the sender will keep resending it with a duplicate flag set until an acknowledgement turns up, at which point the client removes the message from its persisted set of messages.

A QoS of 0, "At Most Once", is the fastest mode, where the client doesn't wait for an acknowledgement. This means, of course, that if there's a disconnection or server failure, a message may be lost. At the other end of the scale is a QoS of 2, "Exactly Once", which uses two pairs of exchanges, first to transfer the message and then to ensure only one copy has been received and is being processed. This makes Exactly Once the slowest but most reliable QoS setting.

The retain flag for an MqttMessage is set to false by default. This means that the broker will not hold onto the message, so any subscribers arriving after the message was sent will not see it. When the retain flag is set, the broker holds onto the message as the most recent retained value for its topic, so when late arrivers connect to the broker or clients create a new subscription, they get the retained messages for the topics that match.
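The effect of the retain flag can be modelled in a few lines. What follows is a toy, in-memory stand-in for a broker, written purely to illustrate the behaviour described above. ToyBroker and its methods are hypothetical, it matches topics exactly rather than with wildcards, and it has nothing to do with how Mosquitto is built.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a toy model of retained messages, not a real broker.
final class ToyBroker {

    // The most recent retained payload for each topic.
    private final Map<String, String> retained = new HashMap<>();

    // The inboxes of the subscribers to each topic.
    private final Map<String, List<List<String>>> subscribers = new HashMap<>();

    // Delivers to current subscribers and, if asked, remembers the payload for later ones.
    void publish(String topic, String payload, boolean retain) {
        List<List<String>> inboxes = subscribers.get(topic);
        if (inboxes != null) {
            for (List<String> inbox : inboxes) {
                inbox.add(payload);
            }
        }
        if (retain) {
            retained.put(topic, payload); // replaces any earlier retained value
        }
    }

    // Returns the new subscriber's inbox, pre-filled with the retained value if there is one.
    List<String> subscribe(String topic) {
        List<String> inbox = new ArrayList<>();
        String last = retained.get(topic);
        if (last != null) {
            inbox.add(last);
        }
        subscribers.computeIfAbsent(topic, key -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.publish("pahodemo/test", "not retained", false);
        System.out.println("late subscriber sees: " + broker.subscribe("pahodemo/test"));
        broker.publish("pahodemo/test", "retained", true);
        System.out.println("later subscriber sees: " + broker.subscribe("pahodemo/test"));
    }
}
```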

Connection options

When connecting to the broker, there are a number of options that can be set which are encapsulated in the MqttConnectOptions class. These include the keep-alive interval for maintaining the connection with the broker, the retry interval for delivering messages, the connection timeout period, the clean session flag, the connection's will and, for the Java side of the code, which SocketFactory to use.

If we modify our client so it reads:

  import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  ...
  MqttConnectOptions options;
  ...
    client = new MqttClient("tcp://localhost:1883", "pahomqttpublish2");
    options = new MqttConnectOptions();
    client.connect(options);

we can experiment with the connection options. For this example, the interesting options are the clean flag and the will. When messages are sent with a QoS above 0, steps need to be taken to ensure that when a client reconnects it doesn't repeat messages and resumes the previous session with the broker. But if you want to ensure that all that state information is discarded at connection and disconnection, you set the clean session flag to true. How does the broker identify clients, you may ask? Through the client id, which is also the reason why you need to ensure that client ids are different.
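As a way of picturing what the clean session flag controls, here is a small sketch that models the broker's per-client state as a map keyed by client id. SessionStore is a made-up name and real brokers keep more than subscriptions in a session, in-flight messages for instance, so treat this as a simplification.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: a toy model of session state held by a broker.
final class SessionStore {

    // Subscriptions remembered per client id.
    private final Map<String, Set<String>> sessions = new HashMap<>();

    // Returns the subscriptions in force for this client once it has connected.
    Set<String> connect(String clientId, boolean cleanSession) {
        if (cleanSession) {
            sessions.remove(clientId); // discard anything left from earlier connections
        }
        return sessions.computeIfAbsent(clientId, id -> new HashSet<>());
    }

    // With a clean session nothing survives the disconnection.
    void disconnect(String clientId, boolean cleanSession) {
        if (cleanSession) {
            sessions.remove(clientId);
        }
    }

    public static void main(String[] args) {
        SessionStore store = new SessionStore();
        store.connect("pahomqttpublish2", false).add("pahodemo/test");
        store.disconnect("pahomqttpublish2", false);
        System.out.println("same id, clean=false: " + store.connect("pahomqttpublish2", false));
        System.out.println("same id, clean=true:  " + store.connect("pahomqttpublish2", true));
    }
}
```

The lookup by client id is the important part: a client that reconnects under a different id finds an empty session, however the flag is set.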

The will option allows clients to prepare for the worst. Despite being called a will, it is more like a "letter left with a lawyer in case something suspicious happens to me". The will consists of a message which will be sent by the broker if the client disappears without cleanly closing the connection. Like a normal message, there's a topic, payload, QoS setting and retain flag. So, if we want to record clients failing by sending out an unretained but assured delivery message we can change the code to read:

options = new MqttConnectOptions();
options.setWill("pahodemo/clienterrors", "crashed".getBytes(), 2, false);
client.connect(options);

Run the code and you'll find no change. If you want to test this, insert a System.exit(1); before the client.disconnect() call to simulate an abnormal termination. We're now sending messages happily, but we don't know when they've been delivered and we haven't subscribed to a topic yet.

Delivery callbacks

The core of listening to MQTT activity in the Java API is the MqttCallback interface. It allows the API to call code we have specified when a message arrives, when delivery of a message is completed or when the connection is lost. If we add implements MqttCallback to our PahoDemo class declaration, the Eclipse IDE will assist us to add needed imports and offer to implement the required methods:

  import org.eclipse.paho.client.mqttv3.MqttCallback;
  import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

  public void deliveryComplete(IMqttDeliveryToken token) {}
  public void messageArrived(String topic, MqttMessage message)
      throws Exception {}
  public void connectionLost(Throwable cause) {}

Now all we need to do is tell the MqttClient that we have done this by adding client.setCallback(this); before using it to connect to the broker. With this in place, let’s look at when these methods will be called.

The deliveryComplete() callback gets called when a message has been completely delivered as per its quality of service setting. That means, for a QoS of 0, when the message has been written to the network, for a QoS of 1, when the message publication has been acknowledged and for a QoS of 2 when the message publication has not only been acknowledged but confirmed to have been the only copy of the message delivered.

As there is a callback, a developer may wonder if the publish method is asynchronous or blocking. The answer is that it can be either, as it is controlled by the MqttClient setting timeToWait. This sets how long, in milliseconds, any action by the client will wait before returning control to the rest of the application. By default, this is set to -1, which means never time out and block till complete. If the code called client.setTimeToWait(100); then any call would return control to the application as soon as it completed, if that took less than 100 milliseconds, or otherwise after 100 milliseconds, or when there was a disconnection or shutdown. Calling client.getPendingDeliveryTokens() will return an array of tokens which contain information about messages that are currently "in-flight". Whichever way the timeToWait is set though, the deliveryComplete() method will still be called when a delivery is made.

Subscriptions

The messageArrived() callback method is the method invoked whenever any subscribed-to topic has received a message. The MqttClient's subscribe() and unsubscribe() methods set which topic's messages we are interested in. The simplest version is client.subscribe("topicfilter") which sets the subscription's quality of service to 1 as a default. We can of course set the QoS – client.subscribe("topicfilter", qos) – or subscribe with an array of filters and an optional array of QoS values to go with them. The QoS setting is, by the way, a maximum so that if you have subscribed with a QoS of 1, messages published with a QoS of 0 or 1 will be delivered at that QoS and messages published with a QoS of 2 will be delivered at a QoS of 1.
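The "maximum" rule boils down to taking the lower of the two QoS values. A tiny sketch, with names of our own choosing, makes the cases explicit:

```java
// Illustrative sketch only: EffectiveQos is a hypothetical helper, not a Paho class.
final class EffectiveQos {

    // The QoS a subscriber receives a message at: the lower of the two settings.
    static int delivered(int publishedQos, int subscribedQos) {
        check(publishedQos);
        check(subscribedQos);
        return Math.min(publishedQos, subscribedQos);
    }

    private static void check(int qos) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2, not " + qos);
        }
    }

    public static void main(String[] args) {
        for (int published = 0; published <= 2; published++) {
            System.out.println("published at " + published
                    + ", subscribed at 1 -> delivered at " + delivered(published, 1));
        }
    }
}
```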

Once subscribed, messages will begin arriving at the messageArrived() callback method where the topic and MqttMessage are passed in as parameters. When in messageArrived(), newly arriving messages will be queued up and the acknowledgement for the message being processed will not be sent till the callback has cleanly completed. If you have complex processing of the message to do, copy and queue the data in some other mechanism to avoid blocking the messaging system.
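One way to keep the callback short is to copy what arrived onto a queue and let another thread do the slow work. The sketch below uses only the JDK so that it can be run on its own. The onMessage() method is a stand-in for the body of messageArrived(), taking the topic and the payload bytes, and MessageHandOff and Received are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Illustrative sketch only: hands received messages to a worker thread.
final class MessageHandOff {

    // An immutable copy of what the callback was given.
    static final class Received {
        final String topic;
        final byte[] payload;

        Received(String topic, byte[] payload) {
            this.topic = topic;
            this.payload = payload.clone(); // copy, so later changes to the array don't affect us
        }
    }

    private final BlockingQueue<Received> queue = new LinkedBlockingQueue<>();

    // Stand-in for the body of messageArrived(): copy, enqueue and return quickly.
    void onMessage(String topic, byte[] payload) {
        queue.add(new Received(topic, payload));
    }

    // Non-blocking read; returns null when nothing is waiting.
    Received poll() {
        return queue.poll();
    }

    // Starts a daemon thread that passes each queued message to the handler.
    Thread startWorker(Consumer<Received> handler) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(queue.take()); // blocks until a message is queued
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when interrupted
            }
        }, "mqtt-worker");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        MessageHandOff handOff = new MessageHandOff();
        Thread worker = handOff.startWorker(m ->
                System.out.println(m.topic + " " + new String(m.payload)));
        handOff.onMessage("pahodemo/test", "A single message".getBytes());
        Thread.sleep(200); // give the worker a moment before the JVM exits
        worker.interrupt();
    }
}
```

There is a trade-off to be aware of. Since the acknowledgement goes out once the callback returns, a message that is sitting in an in-memory queue when the application crashes will not be redelivered, so this suits work where that risk is acceptable.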

Subscriptions are affected by the clean session flag used when establishing a connection. If a session has the clean setting set to false, the system should persist the subscriptions between sessions and the client shouldn't need to resubscribe. With the clean flag set to true, the client will have to resubscribe when reconnecting. When a client does subscribe to a topic, it will receive all the retained values that match the topic it is requesting, even if the subscription's topic query is in part or in whole intersecting with a previous subscription.

One important point to note is that we have, for simplicity, only covered the synchronous version of the API, where every call to the MQTT API blocks and the only things that arrive on their own schedule are inbound messages from subscriptions. This version of the API, MqttClient, is a thin wrapper around the more powerful asynchronous version of the API, MqttAsyncClient, where no call blocks; results are delivered either by the application monitoring a token which is returned by the call or by the completed action calling back to a class that implements the IMqttActionListener interface. When you progress further into developing MQTT-based applications, it is worth considering whether using the synchronous API or the asynchronous API is more appropriate for your case.

Serving statistics via MQTT

To wrap up, we are going to show how little of the MQTT API you need to add functionality to a Java application. In this case, we'll use the FileServer.java example from the Jetty documentation. If we wanted to count the number of times the page handler handled requests we'd simply extend the ResourceHandler class, add the counting code and make the server use that enhanced handler instead of the default one. In this case we also want to publish that count, so we start and stop an MQTT client as well:

class CountingResourceHandler extends ResourceHandler {

  int req_count=0;
  MqttClient client;
  
  public CountingResourceHandler() {
    super();
  }
  
  @Override
  public void doStart() throws Exception {
    super.doStart();
    // Create the MqttClient connection to the broker
    client=new MqttClient("tcp://localhost:1883", MqttClient.generateClientId());
    client.connect();
  }
  
  @Override
  public void handle(String target, Request baseRequest,
      HttpServletRequest request, HttpServletResponse response)
      throws IOException, ServletException {
    super.handle(target, baseRequest, request, response);

    // Increment the count
    req_count++;
    try {
      // Publish to the broker with a QoS of 0 but retained
      client.publish("countingjetty/handlerequest", Integer.toString(req_count).getBytes(),0,true );
    } catch (MqttException e) {
      e.printStackTrace();
    } 
  }
  
  @Override
  public void doStop() throws Exception {
    super.doStop();
    // Cleanly stop the Mqtt client connection
    client.disconnect();
  }
}

This is not a scalable example as it has the MqttClient bound to the resource handler, but if you incorporate this into the Jetty example, then whenever a request is handled by the handler, it will publish that count to, in this case, a broker on localhost. The client id is generated here with MqttClient.generateClientId(), which will use the logged-in user name and time of day to try and ensure non-clashing client ids.

Remember though that the recovery of sessions depends on the client id being the same between connections and here, unless we recorded and reused it, the client id will be different for every run. By default, the MqttClient opens a "clean" session, so that does no harm here. But don't use generateClientId() with the clean session flag set to false; otherwise, every time the client starts up, debris from previous sessions will be left in the broker, which can't tidy up because no client with a matching client id ever reconnects.
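If you do want sessions to survive a restart, the client id has to be recorded somewhere and reused. Here is one possible sketch, using a small file to hold the id. The StableClientId class, the "paho-" prefix and the file location are all our own assumptions. The id is kept to 21 characters because MQTT 3.1 limits client ids to 23.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

// Illustrative sketch only: StableClientId is a hypothetical helper, not a Paho class.
final class StableClientId {

    // Returns the id saved in the file, creating and saving a new one on first use.
    static String loadOrCreate(Path file) {
        try {
            if (Files.exists(file)) {
                String saved = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
                if (!saved.isEmpty()) {
                    return saved;
                }
            }
            // "paho-" plus 16 hex characters gives 21 characters in total
            String id = "paho-" + UUID.randomUUID().toString().replace("-", "").substring(0, 16);
            Files.write(file, id.getBytes(StandardCharsets.UTF_8));
            return id;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical location; any path the application can write to would do.
        Path file = Paths.get(System.getProperty("user.home"), ".pahodemo-clientid");
        System.out.println("client id: " + loadOrCreate(file));
    }
}
```

The returned string would then be passed as the second argument to the MqttClient constructor in place of the generated id.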

Also notice we are publishing the statistics with a QoS of 0, because we aren't worried about the stats being delivered, but we are also setting the retain flag to true so that the broker will remember the most recently delivered value for any clients who subscribe to the statistics.

Wrapping up

So, MQTT and the Paho project give us a flexible, lightweight protocol with Java, C, Lua and other implementations which can be easily tuned to a range of use cases and doesn't place requirements on how we pass data across it. It's a powerful tool and we haven't even started looking at it in the environment it was designed for, the Internet of Things, connecting sensors to servers. We'll come to that in our next part of Practical MQTT with Paho.

About the Author

Dj Walker-Morgan has been writing code since the early 80s and writing about software since the 90s, developing in everything from 6502 to Java and working on projects from enterprise-level network management to embedded devices.

 Note*: This article was commissioned and paid for by the Eclipse Foundation.


Misleading sentence by Marc Cohen

The following sentence:
A "+" represents one or more levels of the implied hierarchy, while a "#" represents all the tree from that point on.
Is a little bit misleading. The "+" represents exactly one level in the topic tree hierarchy.

How to read MQTT messages from subscribed topics? by C. Roland

How can I read messages from a subscribed topic? For some reason there is no documentation out there about this...

InfoQ.com and all content copyright © 2006-2013 C4Media Inc. InfoQ.com hosted at Contegix, the best ISP we've ever worked with.