Getting started with AMQP and RabbitMQ

Getting Started

The Advanced Message Queuing Protocol (AMQP [1]) is an application layer protocol specification for asynchronous messaging. Because it is defined as a wire-level protocol rather than an API (such as JMS [2]), AMQP clients should be able to send and receive messages regardless of their respective vendors. A considerable number of server [3] and client [4] implementations are already available on multiple platforms.

While the original use case for AMQP was to provide an interoperable messaging protocol for the financial industry, the current standard aims to provide a universal construction kit for generic message queue architectures. In that sense, concepts of message-oriented middleware (MOM) systems such as publish/subscribe queues are not implemented directly as first-class citizens. Instead, users are given the means to establish such concepts by wiring relatively simple AMQ entities together. These entities are also part of the specification and form a layer on top of the wire-level protocol: the AMQP model. This model unifies messaging patterns such as the aforementioned publish/subscribe, queues, transactions and streaming, while adding extra capabilities such as easily extensible, content-based routing.

Publish/subscribe in this context means the decoupling of producers and consumers: producers do not need to know the criteria by which consumers will receive messages. Queues are the structures that hold messages in a first-in, first-out fashion. Routing encapsulates the decision as to which messages eventually turn up in which of the message queues present in an asynchronous messaging system.

In this introduction I will try to illustrate the concepts introduced by this model using an AMQP client implementation [5] written by Aman Gupta in Ruby [6]. It uses an event-driven architecture (based on EventMachine [7]) and may therefore look and work a bit unfamiliar. However, the design of the API shows the correspondence between the AMQ model entities in a very straightforward fashion and should be of value even for programmers not familiar with the Ruby language.

It should be noted that there are at least three more Ruby clients available [8], [9], [10]. One of those clients, Carrot, has been written explicitly to get back to non-event-driven, synchronous Ruby while preserving much of the clean style in which the event-driven Ruby API is written.

The AMQP server used in this article is the RabbitMQ server written in Erlang [11]. It is an implementation of release 0-8 of the AMQP specification [12], targeting 0-9-1 in the near future.

One last thing before getting started: asynchronous messaging is a very common and widespread technology, ranging from the various instant messenger protocols such as Skype or XMPP/Jabber to good old email. These services all share a couple of traits:

- They carry messages with some more or less arbitrary content (e.g. an email containing text and a PowerPoint presentation about office jokes) and some routing information (e.g. an email address) which is somewhat more formalized.

- They are asynchronous, which means they have to decouple producer and consumer and thus may have to queue messages (e.g. someone sends you a chat message while you are offline, or your mailbox receives an email).

- The producer and the consumer are distinct roles with different knowledge. I do not need to know your IMAP username and password in order to send you an email. In fact, I don’t even need to know whether your email address is an alias or a “real” address, let alone that you use IMAP to access your mail. This distinction also means that a producer does not control which content is actually read or subscribed to – exactly as my email application chooses to drop most of the unsolicited medical adverts that are sent hourly to my mailbox.

The fact that AMQP is an abstract protocol (in the sense that it does not address one specific use case) does not make things much more complicated. On the contrary: the Internet has made the metaphors and patterns of messaging ubiquitous. People are generally used to them, and asynchronous messaging solves many problems in a straightforward and scalable fashion. Once the initial learning roadblock has been overcome, modelling asynchronous messaging architectures in AMQ will not add any unnecessary complexity.

In order to get started with the examples you probably need to install some software. If you already have Ruby running on your system this should take less than 10 minutes. The RabbitMQ site also carries a lot of information [13] to get you started as soon as possible. You will need:

- The Erlang/OTP package. See http://erlang.org/download.html for download and http://www.erlang.org/doc/installation_guide/part_frame.html for installation instructions.

- The RabbitMQ broker. See http://www.rabbitmq.com/download.html for download and http://www.rabbitmq.com/install.html for installation instructions.

- A Ruby VM. If there is no Ruby interpreter installed on your platform of choice, you will probably want to download the Ruby MRI VM. See http://www.ruby-lang.org/en/downloads/ for download options and installation instructions.

- Two Ruby "gems" (packaged libraries). The gem utility should be distributed along with your Ruby installation.

    - Update the gem utility itself if you did a fresh install or are not sure whether it is current. Type gem update --system. On BSD/UNIX systems you might need to perform this (and the following operations) as superuser.
    - Tell gem to search for packages on the Github site: gem sources -a http://gems.github.com
    - Install the AMQP gem: gem install tmm1-amqp. This should also install the eventmachine gem.

Now all you need to do is start the RabbitMQ server [14].

The AMQ model

There are several entities described in the AMQ specification. One way to distinguish between them is whether they are configured by the server administrator or if they are declared on the fly by clients.

The configured entities are:

- The message broker server itself which listens to AMQ messages e.g. on a TCP/IP socket.

- The virtual host which partitions a message broker’s data into distinct sets, much like the virtual host concept in web servers such as the Apache http daemon.

- The user which connects to a virtual host using credentials.

require 'rubygems'
require 'mq'

event_loop = Thread.new do
  EM.run do
    EM.add_timer(1) do
      EM.stop
    end
  end
end

# connect to the rabbitmq demonstration broker server (http://www.rabbitmq.com/examples.html#demoserver)

AMQP.start :host => 'dev.rabbitmq.com', :port => 5672, :user => 'guest', :password => 'guest', :vhost => 'localhost'

event_loop.join

It is noteworthy that any access control beyond granting a specific user access to a specific virtual host has been deprecated in the specification and is consequently no longer supported by RabbitMQ. A vendor-specific solution [15] that addresses this gap is expected to arrive in the next major version of the server. The functionality is, however [16], already available via the default branch of the Mercurial source code repository [17] and is actively being used by several RabbitMQ users.

In order to operate on the broker, a client opens one or more connections to it. These connections are always bound to a combination of user and virtual host. The defaults assumed by the client implementation are the credentials guest/guest and the virtual host /. Both are part of any default installation of the RabbitMQ broker.

On a connection the client declares a channel. A channel serves as a logical connection inside the network connection to the message broker. Such a multiplexing mechanism is required due to the stateful nature of some operations in the protocol. Consequently, concurrent access to a broker over a single connection should be modelled using a pool of channels with serialized access or, in a threaded model of concurrency, thread-local channels. The Ruby API used in the examples hides the channel management details from the user.
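The thread-local variant can be pictured with a small plain-Ruby sketch. It does not talk to a broker and does not use the AMQP gem: ToyConnection and current_channel are hypothetical names invented for this illustration, and a channel is reduced to a number.

```ruby
# Stand-in for a network connection that hands out numbered channels.
# ToyConnection and current_channel are hypothetical names for this sketch.
class ToyConnection
  def initialize
    @lock = Mutex.new
    @last_channel_id = 0
  end

  # Opening a channel changes connection state, so access is serialized.
  def open_channel
    @lock.synchronize { @last_channel_id += 1 }
  end
end

# One channel per thread: created on first use, then reused by that thread.
def current_channel(connection)
  thread = Thread.current
  unless thread.thread_variable?(:toy_channel)
    thread.thread_variable_set(:toy_channel, connection.open_channel)
  end
  thread.thread_variable_get(:toy_channel)
end

connection = ToyConnection.new
threads = Array.new(3) do
  # Each thread asks twice and should see the same channel both times.
  Thread.new { [current_channel(connection), current_channel(connection)] }
end
channel_ids = threads.map(&:value)
p channel_ids
```

Each thread ends up with its own channel number, so stateful operations issued by different threads never share a channel.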

Operating on a channel, the client is now ready to declare AMQ components. A declaration serves as an assertion that the specified component exists on the broker – if it does not, it is created on the fly.

These components are:

- The exchange which is the entity to which messages are sent.

- The queue which is the entity which receives messages.

- The binding which connects exchanges and queues and encapsulates routing information.

All these components have different properties, but only the exchange and the queue are named. Knowing the name of an exchange grants a client the power to publish messages to it; knowing the name of a queue grants the capability to receive messages from it. Since the AMQ protocol offers no standard way to retrieve the names of all components, access to queues and exchanges can be facilitated or restricted by using well-known or secret names (see [18] for an interesting association of this principle with access control).

Bindings have no names and depend on the lifecycle of the exchange and queue they tie together. If either of the two gets deleted, the binding is void as well. This implies that knowing both the exchange name and the queue name is required to set up message routing.

A message is an opaque piece of data with properties. Among these properties are:

- Metadata such as the type of the content encoding or e.g. a field identifying the producing application.

- Flags regarding delivery and/or storage guarantees for the message.

- A special field called routing key.

2.1 Receiving and sending messages: exchange types

Sending a message is a straightforward process. The client declares the exchange it wants to send the message to and publishes the message to it.

The easiest way to receive a message is to set up a subscription. In order to do that, the client declares a queue and a binding between the previously declared exchange and that queue. On that binding a subscription can then be set up.

require 'rubygems'
require 'mq'

event_loop = Thread.new do
  EM.run do
    EM.add_timer(1) do
      EM.stop
    end
  end
end

def subscribe_to_queue

  exchange = MQ.fanout('my-fanout-exchange')
  queue = MQ.queue('my-fanout-queue')

  queue.bind(exchange).subscribe do |header, body|
    yield header, body
  end

end

def send_to_exchange(message)

  exchange = MQ.fanout('my-fanout-exchange')
  exchange.publish message

end

subscribe_to_queue do |header, body|
  p "I received a message: #{body}"
end

send_to_exchange 'Hello'
send_to_exchange 'World'

event_loop.join

Three things determine if a message is actually delivered to a queue:

  1. The type of the exchange. In this example the type is fanout.
  2. The properties of the message. In this example the message has no properties, it just contains a payload (first Hello, then World).
  3. The single, optional property of the given binding: its key. In our example the binding has no key.

The type of the exchange determines its interpretation of the binding. As our example already hinted, the fanout exchange does not interpret anything at all: it delivers messages to all the queues bound to it.

Without bindings the exchange would never deliver messages to a queue but would simply drop them. With a subscription on a queue in place, the subscriber consumes the message, removing it from the queue.
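To make the fanout behaviour concrete without a running broker, here is a minimal plain-Ruby model. It is only a sketch: ToyFanoutExchange is a hypothetical class invented for this illustration, queues are plain arrays, and nothing here is part of the AMQP gem.

```ruby
# Plain-Ruby model of fanout routing; no broker or AMQP library involved.
# ToyFanoutExchange is a hypothetical name used only for this illustration.
class ToyFanoutExchange
  def initialize
    @bound_queues = []
  end

  # A binding simply records that the queue is attached to this exchange.
  def bind(queue)
    @bound_queues << queue
  end

  # Copy the message into every bound queue and report how many received it.
  # With no bindings the message is dropped (the count is 0).
  def publish(message)
    @bound_queues.each { |queue| queue << message }
    @bound_queues.size
  end
end

exchange = ToyFanoutExchange.new
dropped = exchange.publish('Nobody is listening') # no bindings yet

first_queue = []
second_queue = []
exchange.bind(first_queue)
exchange.bind(second_queue)
delivered = exchange.publish('Hello')

p dropped
p first_queue
p second_queue
```

The first message reaches no queue at all, while the second is copied into both queues because a fanout exchange ignores routing keys entirely.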

The following exchange types are mentioned in the specification. I will describe them in order of increasing complexity:

- The direct exchange delivers messages when the routing-key property of the message is identical to the key property of the binding.

require 'rubygems'
require 'mq'

event_loop = Thread.new do
  EM.run do
    EM.add_timer(1) do
      EM.stop
    end
  end
end

def subscribe_to_queue(key)

  exchange = MQ.direct('my-direct-exchange')
  queue = MQ.queue('my-direct-queue')

  queue.bind(exchange, :key => key).subscribe do |header, body|
    yield header, body
  end

end

def send_to_exchange(message, key)

  exchange = MQ.direct('my-direct-exchange')
  exchange.publish message, :routing_key => key

end

subscribe_to_queue('hello_world') do |header, body|
  p "I received a message: #{body}"
end

send_to_exchange 'Hello', 'hello_world'
send_to_exchange 'Cruel', 'ignored'
send_to_exchange 'World', 'hello_world'

event_loop.join

- The topic exchange also takes the routing-key property of the message into account by performing a pattern match on it. It does this by splitting the character data of the routing key and the binding key into words. These words are strings separated by dots. It also recognizes two wildcard characters in the binding key: # matches zero or more words and * matches exactly one word. Example: the binding key *.stock.# matches the routing keys usd.stock and eur.stock.db but not stock.nasdaq.

require 'rubygems'
require 'mq'

event_loop = Thread.new do
  EM.run do
    EM.add_timer(1) do
      EM.stop
    end
  end
end

def subscribe_to_queue(key)

  exchange = MQ.topic('my-topic-exchange')
  queue = MQ.queue('my-topic-queue')

  queue.bind(exchange, :key => key).subscribe do |header, body|
    yield header, body
  end

end

def send_to_exchange(message, key)

  exchange = MQ.topic('my-topic-exchange')
  exchange.publish message, :routing_key => key

end

subscribe_to_queue('hello.*.message.#') do |header, body|
  p "I received a message: #{body}"
end

send_to_exchange 'Hello', 'hello.world.message.example.in.ruby'
send_to_exchange 'Cruel', 'cruel.world.message'
send_to_exchange 'World', 'hello.world.message'

event_loop.join
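The matching rules for topic exchanges can be tried out without a broker. The following is a sketch of the rules as described above, not RabbitMQ's actual implementation; topic_match? and match_words are hypothetical helper names.

```ruby
# Sketch of topic matching: '*' matches exactly one word, '#' matches zero
# or more words. topic_match? and match_words are hypothetical helpers.
def topic_match?(binding_key, routing_key)
  match_words(binding_key.split('.'), routing_key.split('.'))
end

def match_words(pattern, words)
  # An exhausted pattern matches only if no words are left over.
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # '#' may absorb zero, one, or many words: try every possible split.
    (0..words.length).any? { |skip| match_words(rest, words.drop(skip)) }
  when '*'
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

p topic_match?('*.stock.#', 'usd.stock')
p topic_match?('*.stock.#', 'eur.stock.db')
p topic_match?('*.stock.#', 'stock.nasdaq')
```

With the binding key hello.*.message.# from the listing, the same function accepts hello.world.message and hello.world.message.example.in.ruby but rejects cruel.world.message.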

- There are other exchange types mentioned in the specification, such as the headers exchange (which matches based on application-specific properties of a message that are marked as required or optional in the binding key) as well as the failover and system exchange types. However, none of those types are implemented in the current release of RabbitMQ.

In contrast to queues, exchanges have a type associated with them that dictates their routing behaviour (usually in collaboration with bindings). Since exchanges are named entities, an attempt to declare a previously existing exchange with a different type results in an error. Clients must delete such an exchange before declaring it again with the changed type.

Exchanges also have properties. These are:

- Durability: if set to true the exchange will survive a broker restart.

- Auto deletion: if set to true the exchange will get deleted after all queues bound to it have been deleted.

- Passivity: this will not declare the exchange but will cause an exception if no such exchange is present.
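The declaration rules for exchanges (re-declaring with the same type is harmless, a different type is an error, a passive declaration never creates anything) can be sketched in plain Ruby. ToyExchangeRegistry and its error class are hypothetical names; a real broker reports these conditions through protocol-level errors rather than Ruby exceptions like these.

```ruby
# Toy registry of declared exchanges; all names here are hypothetical.
class ToyExchangeRegistry
  class DeclarationError < StandardError; end

  def initialize
    @types = {} # exchange name => exchange type
  end

  def declare(name, type, passive: false)
    existing = @types[name]
    if passive
      # A passive declaration only checks for existence and never creates.
      raise DeclarationError, "no exchange named #{name}" unless existing
      return existing
    end
    if existing && existing != type
      raise DeclarationError, "#{name} already exists with type #{existing}"
    end
    @types[name] = type
  end

  def delete(name)
    @types.delete(name)
  end
end

registry = ToyExchangeRegistry.new
registry.declare('prices', :fanout)
registry.declare('prices', :fanout) # same type: a harmless assertion

begin
  registry.declare('prices', :topic) # different type: rejected
rescue ToyExchangeRegistry::DeclarationError => error
  conflict = error.message
end

registry.delete('prices')
redeclared = registry.declare('prices', :topic) # works after deletion
p conflict
p redeclared
```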

2.2 Default exchanges and bindings

Each AMQP broker declares one instance of each supported exchange type on its own (for every virtual host). These exchanges are named after their type with a prefix of amq., e.g. amq.fanout. The empty exchange name is an alias for amq.direct. For this default direct exchange (and only for that one) the broker also declares a binding for every queue in the system, with the binding key being identical to the queue name.

This behaviour implies that any queue on the system can be written to by publishing a message to the default direct exchange with its routing-key property being equal to the name of the queue.
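A small in-memory model shows what the implicit bindings amount to. This is a sketch only: ToyBroker and its methods are hypothetical and stand in for the broker's bookkeeping, they are not an API of RabbitMQ or the AMQP gem.

```ruby
# Toy model of the default direct exchange; ToyBroker is a hypothetical name.
class ToyBroker
  attr_reader :queues

  def initialize
    @queues = {}           # queue name => array of messages
    @default_bindings = {} # binding key => queue name
  end

  # Declaring a queue also creates the implicit binding on the default
  # exchange, with the binding key equal to the queue name.
  def declare_queue(name)
    @queues[name] ||= []
    @default_bindings[name] = name
    @queues[name]
  end

  # Direct routing: deliver only if the routing key equals a binding key.
  def publish_to_default_exchange(message, routing_key)
    queue_name = @default_bindings[routing_key]
    return false unless queue_name

    @queues[queue_name] << message
    true
  end
end

broker = ToyBroker.new
broker.declare_queue('orders')
routed   = broker.publish_to_default_exchange('new order', 'orders')
unrouted = broker.publish_to_default_exchange('lost', 'no-such-queue')
p broker.queues['orders']
```

Publishing with the routing key orders lands in the queue of that name, while a key that matches no queue name is simply not routed.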

2.3 Queue properties and multiple bindings

This default binding behaviour implies that multiple bindings can exist – from one or many queues to one or many exchanges. This enables the routing of messages sent to different exchanges with different routing keys (or other message properties) into a single queue.

require 'rubygems'
require 'mq'

event_loop = Thread.new do
  EM.run do
    EM.add_timer(1) do
      EM.stop
    end
  end
end

def subscribe_to_queue(*keys)

  exchange = MQ.direct('my-direct-exchange')
  queue = MQ.queue('my-direct-queue-with-multiple-bindings')

  bindings = keys.map do |key|
    queue.bind(exchange, :key => key)
  end

  bindings.last.subscribe do |header, body|
    yield header, body
  end

end

def send_to_exchange(message, key)

  exchange = MQ.direct('my-direct-exchange')
  exchange.publish message, :routing_key => key

end

subscribe_to_queue('foo', 'bar', 'wee') do |header, body|
  p "I received a message: #{body}"
end

send_to_exchange 'Hello', 'foo'
send_to_exchange 'You', 'gee'
send_to_exchange 'Cruel', 'bar'
send_to_exchange 'World', 'wee'

event_loop.join

Apart from being named, queues also do carry properties. These are basically the same properties exchanges carry:

- Durability: if set to true the queue will survive a broker restart.

- Auto deletion: if set to true the queue will get deleted after all consumers have ceased using it.

- Passivity: this will not declare the queue but will cause an exception if no such queue is present.

- Exclusivity: if set to true this queue can only be used by the declaring consumer.

These properties can be used to create, for example, transient and private queues which are exclusive and auto-deleted. Such queues are automatically removed by the broker if the connection of the declaring client breaks – they serve as short-lived connections to the broker and are useful e.g. for implementing RPC or synchronous communication over AMQ.

An exemplary RPC in AMQP looks like this: the RPC client declares a reply queue with a unique name (e.g. a UUID [19]) and the properties auto-deleted and exclusive. Then it sends a request to some exchange and includes the name of the previously declared reply queue in the reply-to property of the message. The RPC server answers such requests by sending messages to the default exchange, using the value of the reply-to field as the routing key (relying on the above-mentioned default bindings between the default exchange and all queues). Note that this is just a convention. Depending on the contract with the RPC server, it could interpret any property (or even the body) of the message to determine where to reply.
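The reply-to convention can be walked through with an in-memory stand-in for the default exchange. This is a sketch under stated assumptions: the QUEUES hash, the helper methods and the queue name rpc.upcase are all hypothetical, and the "server" merely upcases the request body.

```ruby
require 'securerandom'

# In-memory stand-in for the default exchange: queue name => messages.
# QUEUES, publish_default, rpc_request and rpc_serve_one are hypothetical.
QUEUES = Hash.new { |hash, name| hash[name] = [] }

def publish_default(routing_key, body, properties = {})
  QUEUES[routing_key] << { body: body, properties: properties }
end

# Client side: declare a uniquely named reply queue and send the request.
def rpc_request(server_queue, body)
  reply_queue = SecureRandom.uuid
  QUEUES[reply_queue] # "declares" the queue (exclusive and auto-deleted on a real broker)
  publish_default(server_queue, body, { reply_to: reply_queue })
  reply_queue
end

# Server side: take one request and answer via the reply-to property.
def rpc_serve_one(server_queue)
  request = QUEUES[server_queue].shift
  return unless request

  publish_default(request[:properties][:reply_to], request[:body].upcase)
end

reply_queue = rpc_request('rpc.upcase', 'hello')
rpc_serve_one('rpc.upcase')
reply = QUEUES[reply_queue].shift
p reply[:body]
```

The server never needs to know the client in advance: everything it needs to answer travels in the reply-to property of the request.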

Queues could also be persistent and shared, that is durable, non-auto-deleted and not exclusive. Multiple consumers on such a queue would not receive individual copies of the messages sent to it. Instead they would share the messages on the queue, removing them atomically while consuming.
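The sharing behaviour resembles several threads popping from one thread-safe queue. The sketch below uses Ruby's built-in Queue class as a stand-in for a shared broker queue; the consumer names are made up, and which consumer gets which message is not deterministic.

```ruby
# Ruby's core Queue class stands in for a shared broker queue.
shared_queue = Queue.new
%w[one two three four five six].each { |message| shared_queue << message }

received = Queue.new # thread-safe collection of [consumer, message] pairs

consumers = %w[consumer-a consumer-b].map do |name|
  Thread.new do
    loop do
      begin
        # Non-blocking pop: each message is removed atomically, so it is
        # handed to exactly one of the consumers.
        message = shared_queue.pop(true)
      rescue ThreadError
        break # the queue is empty
      end
      received << [name, message]
    end
  end
end
consumers.each(&:join)

deliveries = []
deliveries << received.pop until received.empty?
p deliveries.map(&:last).sort
```

Every message shows up exactly once in the combined result, no matter how the work was split between the two consumers.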

2.4 Message delivery guarantees

Consumers acknowledge the consumption of a message either implicitly or explicitly. With implicit acknowledgement, a message is considered consumed as soon as it is delivered to the consumer. Otherwise the client has to send an acknowledgement explicitly. Only once this has been sent is the message considered received and removed from the queue. If it is not sent, the broker will attempt to redeliver the message as soon as the channel [20] on which the message was originally sent has been closed.

require 'rubygems'
require 'mq'

event_loop = Thread.new do
  EM.run do
    EM.add_timer(1) do
      EM.stop
    end
  end
end

def subscribe_to_queue

  exchange = MQ.fanout('my-fanout-exchange-with-acks')
  queue = MQ.queue('my-fanout-queue-with-acks')

  queue.bind(exchange).subscribe(:ack => true) do |header, body|
    yield header, body
    header.ack unless body == 'Cruel'
  end

end

def send_to_exchange(message)

  exchange = MQ.fanout('my-fanout-exchange-with-acks')
  exchange.publish message

end

subscribe_to_queue do |header, body|
  p "I received a message: #{body}"
end

send_to_exchange 'Hello'
send_to_exchange 'Cruel'
send_to_exchange 'World'

event_loop.join

__END__

First run:

"I received a message: Hello"
"I received a message: Cruel"
"I received a message: World"

Second run:

"I received a message: Cruel"
"I received a message: Hello"
"I received a message: Cruel"
"I received a message: World"

... and so forth
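Why the unacknowledged message turns up again on the second run can be modelled with a few lines of plain Ruby. This is a simplified sketch of the bookkeeping, not the broker's implementation; ToyAckQueue and its methods are hypothetical names.

```ruby
# Toy model of explicit acknowledgements; ToyAckQueue is a hypothetical name.
class ToyAckQueue
  attr_reader :ready

  def initialize(messages)
    @ready = messages.dup # waiting for delivery
    @unacked = {}         # delivery tag => message, delivered but not yet acked
    @next_tag = 0
  end

  # Deliver the next message; it stays on the broker until acknowledged.
  def deliver
    return nil if @ready.empty?

    tag = (@next_tag += 1)
    @unacked[tag] = @ready.shift
    [tag, @unacked[tag]]
  end

  def ack(tag)
    @unacked.delete(tag)
  end

  # Closing the channel puts every unacknowledged message back in the queue.
  def close_channel
    @ready = @unacked.values + @ready
    @unacked.clear
  end
end

queue = ToyAckQueue.new(%w[Hello Cruel World])
until queue.ready.empty?
  tag, body = queue.deliver
  queue.ack(tag) unless body == 'Cruel' # same rule as in the listing above
end
queue.close_channel
p queue.ready
```

After the channel is closed only the message that was never acknowledged is waiting again, which is why it is the first one delivered on the next run.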

A message producer can choose to be notified if a message sent to an exchange cannot be routed to a queue (read: no bindings exist) and/or if there is no consumer on the queue to deal with the message right away. These delivery guarantees can be enforced by setting the message properties mandatory and/or immediate to true.
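The effect of the mandatory flag can be pictured with a toy direct exchange in plain Ruby. This is an illustration of the idea only: ToyDirectExchange is a hypothetical class, and a real broker hands unroutable messages back to the publisher over the protocol instead of storing them in a list.

```ruby
# Toy direct exchange with a "mandatory" flag; all names are hypothetical.
class ToyDirectExchange
  attr_reader :returned

  def initialize
    @bindings = Hash.new { |hash, key| hash[key] = [] } # binding key => queues
    @returned = []                                       # messages handed back
  end

  def bind(queue, key)
    @bindings[key] << queue
  end

  def publish(message, routing_key, mandatory: false)
    queues = @bindings.fetch(routing_key, [])
    queues.each { |queue| queue << message }
    # Unroutable: hand the message back if asked to, otherwise drop it silently.
    @returned << message if queues.empty? && mandatory
  end
end

exchange = ToyDirectExchange.new
inbox = []
exchange.bind(inbox, 'alice')

exchange.publish('hi', 'alice', mandatory: true)    # routed to the inbox
exchange.publish('anyone?', 'bob', mandatory: true) # unroutable: returned
exchange.publish('whatever', 'bob')                 # unroutable: dropped
p inbox
p exchange.returned
```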

Currently the Ruby AMQP API used for writing the examples in this article does not fully support these flags. There are, however, at least two patches on Github [21], [22] that demonstrate what such support could look like.

Additionally, a producer can set the persistent property of a message to true. The broker will then make a best effort to store such messages in non-volatile storage which should survive a broker crash. Naturally it does not make much sense to route persistent messages to non-durable queues.

2.5 Congestion control

In the examples shown so far, messages were always consumed via a subscription. What about congestion control? The specification offers a QoS feature [23] which limits the number of messages sent to a consumer on a specific channel. Unfortunately this feature is not yet available in a released version of RabbitMQ (it is planned for 1.6), but it is in principle supported by the AMQP API.

As an alternative, a client could opt to pull messages from the queue instead of receiving them via a subscription. When using this approach, congestion control can be implemented manually.

require 'rubygems'
require 'mq'

event_loop = Thread.new do
  EM.run do
    EM.add_timer(5) do
      EM.stop
    end
  end
end

def subscribe_to_queue

  exchange = MQ.fanout('my-fanout-exchange')
  queue = MQ.queue('my-fanout-queue')

  queue.bind(exchange).pop do |header, body|
    yield header, body
  end

  # pull at most one message every 250 ms instead of subscribing
  EM.add_periodic_timer(0.25) do
    queue.pop
  end

end

def send_to_exchange(message)

  exchange = MQ.fanout('my-fanout-exchange')
  exchange.publish message

end

subscribe_to_queue do |header, body|
  p "I received a message: #{body}"
end

send_to_exchange 'Hello'
send_to_exchange 'World'

event_loop.join

An example model

Imagine you want to create a trivial chat application. It should feature:

- Chat – two users should be able to send each other messages.

- A friendship system – users should be able to control who can send them messages.

We assume two types of consumers on the broker: friendship servers and chat clients.

3.1 Becoming friends

To become friends with a user Bob, user Alice sends a message to the fanout exchange friends. We assume that this exchange is somehow access restricted [24]: ordinary users cannot bind queues to it. In the message she expresses her wish to befriend Bob.

A bunch of chat servers is present on the broker, consuming messages from a single, durable queue bound to the friends exchange. The queue has an unguessable name, e.g. friends.298F2DBC-6865-4225-8A73-8FF6175D396D, which prevents chat clients from consuming such messages – remember: without knowledge of the name of a queue no subscription can be set up.

When one of the chat servers receives Alice’s message (only one of the processes will actually get the message since they all consume from the same queue) and decides that the request is valid, it forwards the message (perhaps changed or augmented) to the default exchange (which is direct and durable). To do that, it uses another unguessable routing key known only to the client Bob. When signing in for the first time, Bob (or one of the servers, for that matter) declared a queue with the same name as this unguessable routing key (remember that default bindings to the default exchange exist for every queue on a virtual host).

The user Bob is now asked by his chat client whether he wants to be friends with Alice. Alice included a special property called reply-to in her request message – this property contains the unguessable name of a durable and exclusive friendship queue that Alice has declared for her future communication with Bob. If Bob wants to be friends with Alice, he sends the default exchange a chat message using the name of the friendship queue as the routing key. He also declares an exclusive and durable friendship queue and puts its name into the reply-to property.

Example: The friendship between Alice and Bob is represented by the queue B5725C4A-6621-463E-AAF1-8222AA3AD601. This is the value of the routing-key property of Bob’s messages to Alice, the name of the queue in which Bob’s messages for Alice turn up, and the value of the reply-to property of Alice’s messages to Bob.

Since these friendship queues are durable, messages sent while the chat users are offline do not get lost. When logging in, users would consume all the messages waiting for them in their friendship queues while subscribing to new messages on them.

Should Bob ever decide that he does not like Alice anymore, he can simply delete the friendship queue he declared for Alice. Alice will notice this when she sends messages using the mandatory flag. This will cause the exchange to return her messages as not routable.
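The handshake and the "unfriending" step can be replayed with a toy broker in plain Ruby. This is a rough sketch under simplifying assumptions: ToyChatBroker is a hypothetical class that models only the default exchange, the friendship server is reduced to a single publish call, and queue names are random UUIDs.

```ruby
require 'securerandom'

# Toy default exchange: every declared queue is reachable by its own name.
# ToyChatBroker and its methods are hypothetical names for this sketch.
class ToyChatBroker
  def initialize
    @queues = {}
  end

  def declare_queue(name = SecureRandom.uuid)
    @queues[name] ||= []
    name
  end

  def delete_queue(name)
    @queues.delete(name)
  end

  # Returns :routed, or :returned when no queue carries that name
  # (what a publisher using the mandatory flag would be told).
  def publish(routing_key, body, reply_to: nil)
    queue = @queues[routing_key]
    return :returned unless queue

    queue << { body: body, reply_to: reply_to }
    :routed
  end

  def pop(name)
    @queues.fetch(name).shift
  end
end

broker = ToyChatBroker.new
bob_inbox = broker.declare_queue        # name known to Bob and the servers only
alice_friendship = broker.declare_queue # Alice reads Bob's messages here

# The server forwards Alice's request to Bob's private queue.
broker.publish(bob_inbox, 'Alice wants to be friends', reply_to: alice_friendship)
request = broker.pop(bob_inbox)

# Bob accepts: he declares his own friendship queue and answers Alice.
bob_friendship = broker.declare_queue
broker.publish(request[:reply_to], 'Hi Alice!', reply_to: bob_friendship)
answer = broker.pop(alice_friendship)

first_try = broker.publish(answer[:reply_to], 'Hi Bob!')
broker.delete_queue(bob_friendship)     # Bob ends the friendship
second_try = broker.publish(answer[:reply_to], 'Still there?')
p [first_try, second_try]
```

Alice's first message is routed, while the one sent after Bob deleted his friendship queue comes back as unroutable.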

Things left unmentioned

There are many areas left untouched by this introduction. Missing are, for example, transaction semantics, details about the re-routing of messages, the matching specifications of the headers exchange and details about the differences between the AMQP specifications – especially the model changes that lie ahead in version 1.0. For the sake of brevity a presence model for the chat example was also omitted.

The management perspective has been left out of the picture as it is not yet clear which direction AMQP in general and RabbitMQ in particular will take. Currently there is one topic exchange available in the reserved amq. namespace which receives all the log messages of the broker. However, the central tool for showing the currently declared components and connected users is the administrative rabbitmqctl command-line interface rather than an AMQ entity.

require 'rubygems'
require 'mq'

PATH_TO_RABBITMQCTL = '/usr/local/sbin/rabbitmqctl'

event_loop = Thread.new { EM.run }

def subscribe_to_logger

  random_name = (0...50).map{ ('a'..'z').to_a[rand(26)] }.join

  exchange = MQ.topic('amq.rabbitmq.log')
  queue = MQ.queue(random_name, :autodelete => true, :exclusive => true)
  binding = queue.bind(exchange, :key => '#')

  binding.subscribe do |header, body|
    body.split("\n").each do |message|
      yield header, message
    end
  end

end

def exchange_info(vhost = '/')
  info :exchange, vhost, %w(name type durable auto_delete arguments)
end

def queue_info(vhost = '/')
  info :queue, vhost, %w(name durable auto_delete arguments node messages_ready messages_unacknowledged
                         messages_uncommitted messages acks_uncommitted consumers transactions memory)
end

def binding_info(vhost = '/')
  info :binding, vhost
end

def connection_info
  # connections are listed without a virtual host argument
  info :connection, nil, %w(node address port peer_address peer_port state channels user vhost timeout
                            frame_max recv_oct recv_cnt send_oct send_cnt send_pend)
end

def info(about, vhost = nil, items = [])

  column_length = 20

  puts "#{about} info\n"

  cmd = "#{PATH_TO_RABBITMQCTL} list_#{about}s"
  cmd << " -p #{vhost}" if vhost
  cmd << " #{items.join(' ')} 2>&1"

  pipe = IO.popen(cmd)

  # pad or truncate every tab-separated column to a fixed width
  rows = pipe.readlines.map do |line|
    line.chomp.split("\t").map { |item| item.ljust(column_length)[0, column_length] }
  end

  # skip the first and the last line of the rabbitmqctl output
  rows.slice(1..-2).each do |row|
    print row.join(' ') + "\n"
  end

end

# the block is called with the header and one log line at a time
subscribe_to_logger do |header, message|
  p "RabbitMQ logger: #{message}"
end

%w(connection exchange queue binding).each do |method|
  self.send "#{method}_info".to_sym
end

event_loop.join

It should also be mentioned that there are already a few distribution frameworks available which use AMQP (or even RabbitMQ specifically). These frameworks (e.g. Nanite [25] or Lizzy [26]) introduce more or less abstract layers on top of AMQP in order to facilitate operations such as the distribution of work over Ruby clients in a cluster.

4.1 Where to go next?

Apart from toying around with a locally installed broker, the friendly and responsive mailing list [27], [28] should be the first stop for learning more about AMQP and RabbitMQ. Beyond that, you could have a look at the presentations and articles linked on the RabbitMQ homepage [29], chat with members of the community in the #rabbitmq channel on the freenode [30] IRC network, or read one of the various blogs [31], [32], [33] with RabbitMQ and/or AMQP related content, for starters the blog of LShift [34], one of the founders of RabbitMQ. There are also quite a few AMQP and/or RabbitMQ specific tweets available on Twitter [35] via the hashtags #rabbitmq and #amqp.

Have fun exploring and welcome to the world of asynchronous messaging!
