AMQP

This article describes version 0-9-1 of the AMQP protocol.

AMQP Concepts

AMQP (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

A message broker receives messages from publishers and routes them to consumers.

In AMQP, messages are published to exchanges, which distribute message copies to queues according to rules called bindings. The broker then delivers those messages to the consumers subscribed to the queues.

[Figure: AMQP example routing]

A publisher can specify various message attributes (message metadata) when publishing. Some of these attributes are used by the broker, while others are meaningful only to the consuming applications.

Because networks and applications can fail, AMQP provides a mechanism of message acknowledgements. When acknowledgements are in use, a consumer application notifies the broker once a message has been received and handled correctly. Only then does the broker remove the message from its queue.

AMQP is a programmable protocol, which means that routing schemes and rules are defined by the applications themselves, not by a message broker administrator. That gives applications more freedom but requires careful configuration to avoid conflicts.

Exchanges

Exchanges are the AMQP entities to which messages are sent. An exchange routes messages to queues using a routing algorithm that depends on the exchange type and on the binding rules. An AMQP message broker provides four exchange types:

Exchange type    Default pre-declared names
Direct           (Empty string) and amq.direct
Fanout           amq.fanout
Topic            amq.topic
Headers          amq.match (and amq.headers in RabbitMQ)

[Figure: AMQP exchange types]

Exchanges are declared with several attributes:

  • Name - exchange name
  • Durability - whether the exchange survives a broker restart (durable) or not (transient)
  • Auto-delete - the exchange is deleted once the last queue is unbound from it
  • Arguments - optional arguments for plugins and broker-specific features

Default Exchange

The default exchange is a direct exchange with no name (an empty string), pre-declared by the broker. Every queue that is created is automatically bound to it with a binding key equal to the queue name.

The default exchange makes it seem possible to publish messages directly to queues, but technically the exchange routing mechanism is still used under the hood.

For instance, if we declare a new queue called my-messages, the AMQP broker binds it to the default exchange using the “my-messages” binding key. Therefore, every message published to the default exchange with the “my-messages” routing key is routed to the my-messages queue.
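
The automatic binding described above can be illustrated with a small in-memory model. This is only a sketch, not a client API: the ToyBroker class and its method names are hypothetical, and a real broker does considerably more.

```python
class ToyBroker:
    """Toy in-memory broker that models only the default exchange."""

    def __init__(self):
        self.queues = {}            # queue name -> list of message bodies
        self.default_bindings = {}  # binding key -> queue name

    def declare_queue(self, name):
        # Declaring a queue implicitly binds it to the default exchange,
        # using the queue name as the binding key.
        self.queues.setdefault(name, [])
        self.default_bindings[name] = name

    def publish_default(self, routing_key, body):
        # The default exchange is a direct exchange: exact key match only.
        queue_name = self.default_bindings.get(routing_key)
        if queue_name is None:
            return False  # unroutable: no queue has that name
        self.queues[queue_name].append(body)
        return True


broker = ToyBroker()
broker.declare_queue("my-messages")
routed = broker.publish_default("my-messages", b"hello")
unrouted = broker.publish_default("my-message", b"key does not match")
```

In this model the second publish is not routed, because no queue is named my-message: the routing key has to equal the queue name exactly.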

Direct Exchange

A direct exchange routes a message to the queues whose binding key exactly matches the routing key of the message.

This type is ideal for unicast message routing. It is often used to distribute tasks between multiple workers in a round-robin manner; note that the round-robin distribution happens between the consumers of a queue, not between queues.

  • A queue binds to the exchange using a binding key K
  • When a new message with routing key R is sent to the direct exchange, it is routed to the queue if the keys match: K = R
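
The K = R rule can be sketched as a lookup from binding keys to queues. The ToyDirectExchange class and the queue and key names below are made up for illustration; they are not part of any AMQP client library.

```python
from collections import defaultdict


class ToyDirectExchange:
    """In-memory model of direct-exchange routing (illustrative only)."""

    def __init__(self):
        # binding key K -> names of the queues bound with that key
        self._bindings = defaultdict(set)

    def bind(self, queue_name, binding_key):
        self._bindings[binding_key].add(queue_name)

    def route(self, routing_key):
        # A message with routing key R goes to every queue where K == R.
        return sorted(self._bindings.get(routing_key, set()))


exchange = ToyDirectExchange()
exchange.bind("resize-tasks", "image.resize")
exchange.bind("audit-log", "image.resize")
exchange.bind("crop-tasks", "image.crop")

resize_targets = exchange.route("image.resize")  # both queues bound with that key
rotate_targets = exchange.route("image.rotate")  # no binding matches
```

Several queues may share a binding key, in which case each of them receives a copy of the message.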

[Figure: AMQP direct exchange]

Fanout Exchange

A fanout exchange routes each message to all the queues bound to it; the routing key is ignored.

[Figure: AMQP fanout exchange]

Topic Exchange

A topic exchange matches the routing key of a message against the routing pattern that was used to bind a queue to the exchange.

Topic exchanges are used to implement variations of the “pub/sub” pattern and multicast routing of messages. Whenever a problem involves multiple consumers that need to selectively receive messages, a topic exchange should be considered.

Routing keys and binding patterns are lists of words delimited by dots. A binding pattern may contain two wildcards:

  • *: can substitute for exactly one word
  • #: can substitute for zero or more words
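
The two wildcards can be sketched as a small recursive matcher over dot-separated words. The function name topic_matches and the sample keys are hypothetical; a real broker typically uses a more efficient data structure, but the matching rules are the same.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches the topic binding pattern."""
    pattern_words = pattern.split(".")
    key_words = routing_key.split(".")

    def match(i, j):
        # i indexes pattern_words, j indexes key_words
        if i == len(pattern_words):
            return j == len(key_words)
        if pattern_words[i] == "#":
            # '#' may absorb zero or more words: try every possible split.
            return any(match(i + 1, k) for k in range(j, len(key_words) + 1))
        if j == len(key_words):
            return False
        if pattern_words[i] == "*" or pattern_words[i] == key_words[j]:
            # '*' absorbs exactly one word; a literal word must be equal.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


one_word = topic_matches("stock.*.nyse", "stock.ibm.nyse")
too_many_words = topic_matches("stock.*", "stock.ibm.nyse")
zero_words = topic_matches("stock.#", "stock")
```

Here one_word and zero_words are True, while too_many_words is False because * cannot stand for two words.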

Example use cases:

  • Background task processing by multiple workers, each handling its own specific set of tasks
  • Orchestration of services of different kinds in the cloud
  • Data updates that involve categorization or tagging

Headers Exchange

A headers exchange uses message header attributes for routing and ignores the routing key. It is useful when a message needs to be routed based on multiple attributes rather than a single routing key. A message is routed to a queue if its header values match the values specified when the queue was bound to the exchange.

Because multiple headers can be used for matching, the AMQP broker needs to know whether a message should match any of the headers or all of them. This behavior is specified with the x-match binding argument, which accepts the values all, any, all-with-x, and any-with-x. In RabbitMQ, all and any skip headers whose names begin with x-, while the -with-x variants take them into account.
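
The x-match modes can be sketched as follows. The function headers_match is a hypothetical helper, not a broker API; treating a missing x-match as all, and skipping x- headers in the all and any modes, are assumptions modeled on RabbitMQ's behavior.

```python
_MISSING = object()  # sentinel for "header not present in the message"


def headers_match(binding_args, message_headers):
    """Decide whether a message matches a headers-exchange binding."""
    mode = binding_args.get("x-match", "all")  # assumed default: "all"
    if mode not in ("all", "any", "all-with-x", "any-with-x"):
        raise ValueError(f"unsupported x-match value: {mode!r}")

    use_x_headers = mode.endswith("-with-x")
    # Collect the headers the binding asks for; x-match itself is never
    # compared, and other "x-" headers only count in the -with-x modes.
    expected = {
        name: value
        for name, value in binding_args.items()
        if name != "x-match" and (use_x_headers or not name.startswith("x-"))
    }
    results = [
        message_headers.get(name, _MISSING) == value
        for name, value in expected.items()
    ]
    return any(results) if mode.startswith("any") else all(results)


binding_all = {"x-match": "all", "format": "pdf", "type": "report"}
binding_any = {"x-match": "any", "format": "pdf", "type": "report"}
message = {"format": "pdf", "type": "log"}

matches_all = headers_match(binding_all, message)  # "type" differs
matches_any = headers_match(binding_any, message)  # "format" is enough
```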

Queues

Messages placed onto a queue are stored until a consumer retrieves them. Once the consumer has processed a message successfully, it sends an ACK (acknowledgement) to the broker, and the broker then deletes the message from the queue.

Queues share some properties with exchanges and have a few additional ones:

  • Name - queue name
  • Durability - whether the queue survives a broker restart
  • Exclusive - the queue is used by only one connection and is deleted when that connection closes
  • Auto-delete - the queue is deleted when its last consumer unsubscribes
  • Arguments - optional arguments for plugins and broker-specific features

A queue has to be declared before it can be used. Declaring a queue that already exists with the same attributes has no effect; if the attributes differ, the declaration fails with 406 PRECONDITION_FAILED.
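
The declaration rule can be modeled with a small registry. ToyQueueRegistry and PreconditionFailed are hypothetical names used only for this sketch; a real client library would surface the 406 error in its own way.

```python
class PreconditionFailed(Exception):
    """Stands in for the broker's 406 PRECONDITION_FAILED error."""


class ToyQueueRegistry:
    """Tracks declared queues and the attributes they were declared with."""

    def __init__(self):
        self._queues = {}  # queue name -> (durable, exclusive, auto_delete)

    def declare(self, name, durable=False, exclusive=False, auto_delete=False):
        attributes = (durable, exclusive, auto_delete)
        existing = self._queues.get(name)
        if existing is None:
            self._queues[name] = attributes
            return "created"
        if existing == attributes:
            return "unchanged"  # redeclaring with equal attributes is a no-op
        raise PreconditionFailed(
            f"406 PRECONDITION_FAILED: queue {name!r} exists with other attributes"
        )


registry = ToyQueueRegistry()
first = registry.declare("orders", durable=True)
second = registry.declare("orders", durable=True)
# registry.declare("orders", durable=False) would raise PreconditionFailed
```

Because redeclaration is harmless when the attributes match, publishers and consumers can each declare the queues they depend on without coordinating who goes first.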

Queue Names

Queue names may be up to 255 bytes of UTF-8 characters.

Queue names starting with amq. are reserved for internal use by the broker (an attempt to declare such a queue fails with 403 ACCESS_REFUSED).
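
Both naming rules can be checked on the client side before a declaration is attempted. The helper check_queue_name is hypothetical, and the choice of Python exception types is arbitrary; the broker itself reports these conditions as protocol errors.

```python
MAX_QUEUE_NAME_BYTES = 255
RESERVED_PREFIX = "amq."


def check_queue_name(name):
    """Return name unchanged if it is acceptable, otherwise raise."""
    # The limit applies to the UTF-8 encoding, not to the character count.
    encoded_length = len(name.encode("utf-8"))
    if encoded_length > MAX_QUEUE_NAME_BYTES:
        raise ValueError(f"queue name is {encoded_length} bytes, limit is 255")
    if name.startswith(RESERVED_PREFIX):
        # A broker would answer with 403 ACCESS_REFUSED here.
        raise PermissionError("403 ACCESS_REFUSED: 'amq.' prefix is reserved")
    return name


ok_name = check_queue_name("my-messages")
long_ascii = check_queue_name("a" * 255)  # exactly 255 bytes, still allowed
```

Note that a name of 128 two-byte characters is rejected even though it is shorter than 255 characters, because its UTF-8 encoding takes 256 bytes.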

Queue Durability

If a queue is declared as durable, its metadata is stored on disk. For a transient queue, the metadata is stored in memory.

Queue durability alone does not make messages durable. If data durability is important for the application, durable queues should be used together with persistent messages.

Bindings

Bindings are the rules that exchanges use to route published messages to the appropriate queues. A binding may have an optional routing key attribute that acts as an additional instruction for some exchange types.

To instruct an exchange E to route messages to a queue Q, Q has to be bound to E.

This additional layer makes it possible to implement various routing scenarios between publishers and consumers. It also decouples the system: consumers know nothing about the publisher, and the publishing logic does not have to change when new consumers are added.

Consumers

The AMQP broker stores messages in queues until a subscriber application consumes them. There are two ways for a consumer to retrieve messages:

  • Push-based approach (recommended) - the consumer subscribes to a queue and the broker delivers messages to it
  • Pull-based approach (not recommended) - the consumer polls the broker and fetches messages itself

Messages

Message Acknowledgements

A consumer application may occasionally fail to process individual messages because of processing errors, a crash of the application itself, or network issues. To keep messages until they are successfully delivered and handled, AMQP provides two acknowledgement modes, which differ in when the broker considers a message delivered:

  • Automatic - as soon as the broker sends the message to a consumer (using the basic.deliver or basic.get-ok method)

  • Explicit - only after the consumer sends back an acknowledgement (using the basic.ack method). In the explicit model, the consumer application decides when to send the acknowledgement: right after receiving a message or once the message has been fully processed.

If a consumer dies without sending an acknowledgement, the message is redelivered to another available consumer. If there are none, it stays in the queue until a consumer becomes available.
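
Explicit acknowledgement and redelivery can be sketched with an in-memory queue that tracks unacknowledged deliveries. ToyAckQueue, its methods, and the worker names are hypothetical; the sketch ignores networking and models only the bookkeeping.

```python
from collections import deque


class ToyAckQueue:
    """Queue that keeps a message until its delivery is acknowledged."""

    def __init__(self):
        self.ready = deque()  # messages waiting to be delivered
        self.unacked = {}     # delivery tag -> (consumer, message)
        self._next_tag = 1

    def publish(self, message):
        self.ready.append(message)

    def deliver(self, consumer):
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, message)  # kept until acknowledged
        return tag, message

    def ack(self, tag):
        del self.unacked[tag]  # only now is the message gone for good

    def consumer_died(self, consumer):
        # Put the consumer's unacknowledged messages back at the front
        # of the queue, preserving their original order.
        lost = [(t, m) for t, (c, m) in self.unacked.items() if c == consumer]
        for tag, message in sorted(lost, reverse=True):
            del self.unacked[tag]
            self.ready.appendleft(message)


queue = ToyAckQueue()
queue.publish("a")
queue.publish("b")
tag_a, _ = queue.deliver("worker-1")
tag_b, _ = queue.deliver("worker-1")
queue.ack(tag_a)                 # "a" was handled successfully
queue.consumer_died("worker-1")  # "b" was never acknowledged
redelivered = queue.deliver("worker-2")
```

After worker-1 disappears, message "b" goes back to the queue and is delivered to worker-2 under a new delivery tag, while the acknowledged message "a" is not delivered again.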

Rejecting Messages

If message processing fails for whatever reason, the consumer can notify the broker by rejecting the message (using the basic.reject method). When rejecting a message, the consumer asks the broker either to discard it or to requeue it.

Prefetching Messages

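AMQP makes it possible to limit how many unacknowledged messages the broker may send to each consumer; once the limit is reached, the broker sends nothing more until the consumer acknowledges a message. This limit is called the prefetch count, and it acts as a simple form of load balancing when messages are consumed in batches.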
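
The effect of a prefetch limit can be sketched as a delivery loop that stops once a consumer holds the allowed number of unacknowledged messages. The function deliver_with_prefetch and its arguments are hypothetical and model a single consumer only.

```python
from collections import deque


def deliver_with_prefetch(ready, in_flight, prefetch_count):
    """Deliver messages until the consumer holds prefetch_count unacked ones.

    ready     -- deque of messages waiting in the queue
    in_flight -- list of messages delivered but not yet acknowledged
    """
    delivered = []
    while ready and len(in_flight) < prefetch_count:
        message = ready.popleft()
        in_flight.append(message)
        delivered.append(message)
    return delivered


ready = deque([1, 2, 3, 4, 5])
in_flight = []

first_batch = deliver_with_prefetch(ready, in_flight, prefetch_count=2)
stalled = deliver_with_prefetch(ready, in_flight, prefetch_count=2)
in_flight.remove(1)  # the consumer acknowledges message 1
after_ack = deliver_with_prefetch(ready, in_flight, prefetch_count=2)
```

With a prefetch count of 2, the first call delivers two messages, the second delivers nothing, and one more message is sent only after an acknowledgement frees a slot.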

Message Attributes and Payload

AMQP messages have attributes that are used by the AMQP broker to handle and route them. Here are some examples:

  • Content type
  • Content encoding
  • Routing key
  • Delivery mode
  • Message priority
  • Message publishing timestamp
  • Expiration period
  • Publisher application id

Some attributes are optional and known as headers.

Messages also have a payload, which is treated as a byte array. The AMQP broker does not inspect or modify the payload. The payload is optional: it is possible to publish a message without one.

Messages may be published with a persistent attribute that will instruct the broker to persist them to disk.

Connections

AMQP is an application-level protocol that runs over TCP. Connections are typically long-lived and can be protected using TLS. To disconnect, the application should gracefully close the AMQP connection rather than abruptly closing the underlying TCP connection.

Channels

A channel can be thought of as a lightweight connection that shares a single TCP connection with other channels. Channels let an application hold multiple logical connections to the broker over just one TCP connection, which reduces the use of system resources and simplifies firewall configuration.

Every protocol operation is performed on a channel, and communication on one channel is completely separate from communication on the others. That’s why every protocol method carries a channel ID, which identifies the channel the method belongs to.

A channel exists only in the context of a connection: once the connection is closed, all of its channels are closed as well.
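
On the wire, the channel ID travels in the header of every frame. An AMQP 0-9-1 frame consists of a one-byte type, a two-byte channel number, a four-byte payload size, the payload, and a closing 0xCE byte. The helpers below are a minimal sketch of that layout; build_frame and parse_frame are hypothetical names, and the payload is treated as opaque bytes.

```python
import struct

FRAME_METHOD = 1   # frame type used for protocol methods
FRAME_END = 0xCE   # every frame ends with this byte
HEADER = struct.Struct("!BHI")  # type, channel, payload size (big-endian)


def build_frame(frame_type, channel, payload):
    """Wrap payload in a frame addressed to the given channel."""
    return HEADER.pack(frame_type, channel, len(payload)) + payload + bytes([FRAME_END])


def parse_frame(data):
    """Split one frame into (frame_type, channel, payload)."""
    frame_type, channel, size = HEADER.unpack(data[:HEADER.size])
    payload = data[HEADER.size:HEADER.size + size]
    if data[HEADER.size + size] != FRAME_END:
        raise ValueError("missing frame-end byte")
    return frame_type, channel, payload


# Two frames for different channels can share one TCP byte stream.
frame_on_channel_1 = build_frame(FRAME_METHOD, 1, b"abc")
frame_on_channel_5 = build_frame(FRAME_METHOD, 5, b"abc")
parsed = parse_frame(frame_on_channel_5)
```

Because the receiver reads the channel number from each frame header, frames belonging to different channels can be interleaved on the same TCP connection.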

Virtual Hosts

AMQP includes the concept of virtual hosts that allows the broker to host multiple isolated environments with their own users, exchanges, queues, and other AMQP entities.

AMQP 0-9-1 structure

AMQP is structured as a number of methods. Protocol methods are grouped into classes.

For example, the exchange class includes the following methods:

  • exchange.declare
  • exchange.declare-ok
  • exchange.delete
  • exchange.delete-ok

These operations are requests (sent by clients) and responses (sent by brokers).

To declare a new exchange, the client sends a request to the broker using the exchange.declare method. Its parameters allow the client to specify the exchange properties.

[Figure: AMQP exchange.declare method]

If the operation succeeds, the broker responds with the exchange.declare-ok method:

[Figure: AMQP exchange.declare-ok method]
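
In the protocol, each class and method is identified by a number: the exchange class has ID 40, and its four methods listed earlier have IDs 10, 11, 20, and 21. The sketch below pairs each request with its response and builds the four bytes that open a method payload (class ID followed by method ID). The helper names are hypothetical, and encoding of the method arguments is left out.

```python
import struct

EXCHANGE_CLASS_ID = 40
EXCHANGE_METHOD_IDS = {
    "exchange.declare": 10,
    "exchange.declare-ok": 11,
    "exchange.delete": 20,
    "exchange.delete-ok": 21,
}


def method_prefix(method_name):
    """Return the class ID and method ID as two big-endian 16-bit integers."""
    return struct.pack("!HH", EXCHANGE_CLASS_ID, EXCHANGE_METHOD_IDS[method_name])


def response_for(request_name):
    """Name of the response the broker sends when the request succeeds."""
    response_name = request_name + "-ok"
    if response_name not in EXCHANGE_METHOD_IDS:
        raise ValueError(f"{request_name!r} is not a known request")
    return response_name


declare_prefix = method_prefix("exchange.declare")
declare_reply = response_for("exchange.declare")
delete_reply = response_for("exchange.delete")
```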

AMQP Extensions

AMQP allows various extensions to be implemented, such as:

  • Custom exchange types - developing new routing schemas, for example, geo-based routing
  • Additional broker features - for example, per-queue message TTL
  • Additional plugins - for instance, a broker management UI
  • Custom AMQP 0-9-1 methods - extend default AMQP API with additional logic
