Overview

Streams are a new persistent and replicated data structure in RabbitMQ 3.9 that models an append-only log with non-destructive consumer semantics. They can be used through a RabbitMQ client library as if they were queues, or through a dedicated binary protocol plugin and its associated clients. The latter option is recommended because it provides access to all stream-specific features and offers the best possible throughput.

This page covers the concepts of streams, their usage, and their administration and maintenance operations. Please visit the Stream plugin page to learn more about the usage of streams with the binary RabbitMQ Stream protocol.

Use Cases

Streams were initially developed to cover four messaging use cases that existing queue types either cannot provide or provide with downsides:

  1. Large fan-outs

    To deliver the same message to multiple subscribers, users currently have to bind a dedicated queue for each consumer. If the number of consumers is large, this becomes potentially inefficient, especially when persistence and/or replication is wanted. Streams will allow any number of consumers to consume the same messages from the same queue in a non-destructive manner, removing the need to bind multiple queues. Stream consumers will also be able to read from replicas, allowing read load to be spread across the cluster.

  2. Replay / Time-travelling

    As all current RabbitMQ queue types have destructive consume behaviour, i.e. messages are deleted from the queue when a consumer is finished with them, it is not possible to re-read messages that have been consumed. Streams will allow consumers to attach at any point in the log and read from there.

  3. Throughput Performance

    No persistent queue types are able to deliver throughput that can compete with any of the existing log based messaging systems. Streams have been designed with performance as a major goal.

  4. Large logs

    Most RabbitMQ queues are designed to converge towards the empty state and are optimised for it, so they can perform worse when there are millions of messages in a given queue. Streams are designed to store larger amounts of data efficiently, with minimal in-memory overhead.

Usage

An AMQP 0.9.1 client library that can specify optional queue and consumer arguments will be able to use streams as regular AMQP 0.9.1 queues.

Just like queues, streams have to be declared first.

Declaring

To declare a stream, set the x-queue-type queue argument to stream (the default is classic). This argument must be provided by a client at declaration time; it cannot be set or changed using a policy, because a policy definition or the applicable policy can change dynamically but the queue type cannot.

The following snippet shows how to create a stream with the AMQP 0.9.1 Java client:

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
  "my-stream",
  true,         // durable
  false, false, // not exclusive, not auto-delete
  Collections.singletonMap("x-queue-type", "stream")
);

Declaring a queue with an x-queue-type argument set to stream will create a stream with a replica on each configured RabbitMQ node. Streams are quorum systems, so an odd number of cluster nodes is strongly recommended.

A stream remains an AMQP 0.9.1 queue, so it can be bound to any exchange after its creation, just as any other RabbitMQ queue.

When declaring a stream using the management UI, the stream type must be selected from the queue type drop-down menu.

Streams support three additional queue arguments that are best configured using a policy:

  • x-max-length-bytes

Sets the maximum size of the stream in bytes. See retention. Default: not set.

  • x-max-age

Sets the maximum age of the stream. See retention. Default: not set.

  • x-stream-max-segment-size-bytes

Sets the size of the segment files, in bytes. A stream is divided into fixed-size segment files on disk. Default: 500000000 bytes.

The following snippet shows how to set the maximum size of a stream to 20 GB, with segment files of 100 MB:

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
arguments.put("x-max-length-bytes", 20_000_000_000L); // maximum stream size: 20 GB (long literal, too large for an int)
arguments.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
channel.queueDeclare(
  "my-stream",
  true,         // durable
  false, false, // not exclusive, not auto-delete
  arguments
);

Client Operations

Consuming

As consuming from a stream never removes messages, any consumer can start reading from any point in the log. This is controlled by the x-stream-offset consumer argument. If it is unspecified, the consumer will start reading from the next offset written to the log after the consumer starts. The following values are supported:

  • first - start from the first available message in the log
  • last - this starts reading from the last written "chunk" of messages (a chunk is the storage and transportation unit used in streams, put simply it is a batch of messages made of several to a few thousands of messages, depending on the ingress)
  • next - same as not specifying any offset
  • Offset - a numerical value specifying an exact offset to attach to the log at. If this offset does not exist, it is clamped to the start or the end of the log respectively.
  • Timestamp - a timestamp value specifying the point in time to attach to the log at. It is clamped to the closest offset; if the timestamp is out of range for the stream, it is clamped to the start or the end of the log respectively. With AMQP 0.9.1, the timestamp is POSIX time with an accuracy of one second, that is, the number of seconds since 00:00:00 UTC, 1970-01-01.

The following snippet shows how to use the first offset specification:

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
  (consumerTag, message) -> {
    // message processing
    // ...
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });

The following snippet shows how to specify a specific offset to consume from:

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-offset", 5000), // offset value
  (consumerTag, message) -> {
    // message processing
    // ...
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });

The following snippet shows how to specify a specific timestamp to consume from:

// an hour ago
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000);
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-offset", timestamp), // timestamp offset
  (consumerTag, message) -> {
    // message processing
    // ...
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });

Other

The following operations can be used in a similar way to classic and quorum queues, but some have queue type specific behaviour.

Feature Comparison with Regular Queues

Streams are not really queues in the traditional sense and thus do not align very closely with AMQP 0.9.1 queue semantics. Many features that other queue types support are not supported by streams and never will be, due to their non-destructive read semantics.

An AMQP 0.9.1 client library that can use regular mirrored queues will be able to use streams as long as it uses consumer acknowledgements.

Feature Matrix

Feature                    Classic        Stream
Non-durable queues         yes            no
Exclusivity                yes            no
Per message persistence    per message    always
Membership changes         automatic      manual
TTL                        yes            no (but see Retention)
Queue length limits        yes            no (but see Retention)
Lazy behaviour             yes            inherent
Message priority           yes            no
Consumer priority          yes            no
Dead letter exchanges      yes            no
Adheres to policies        yes            (see Retention)
Reacts to memory alarms    yes            no (uses minimal RAM)
Poison message handling    no             no
Global QoS Prefetch        yes            no

Non-durable Queues

Streams are always durable per their assumed use cases; they cannot be non-durable like regular queues.

Exclusivity

Per their assumed use cases, streams cannot be exclusive like regular queues. They are not meant to be used as temporary queues.

Lazy Mode

Streams store all data directly on disk. After a message has been written, it does not use any memory until it is read. Streams are inherently lazy, so to speak.

Global QoS

Streams do not support global QoS prefetch, where a channel sets a single prefetch limit for all consumers using that channel. If an attempt is made to consume from a stream on a channel with global QoS enabled, a channel error will be returned.

Use per-consumer QoS prefetch, which is the default in several popular clients.

Data Retention

Streams are implemented as an immutable append-only disk log. This means that the log will grow indefinitely until the disk runs out of space. To avoid this undesirable scenario, it is possible to set a retention configuration per stream, which will discard the oldest data in the log based on total log data size and/or age.

Two parameters control the retention of a stream, and they can be combined. They are set either at declaration time using a queue argument or through a policy, which can be updated dynamically.

  • max-age:

    valid units: Y, M, D, h, m, s

    e.g. 7D for a week

  • max-length-bytes:

    the max total size in bytes

NB: retention is evaluated on a per-segment basis, so one more parameter comes into play: the segment size of the stream. The stream will always leave at least one segment in place as long as the segment contains at least one message. When using broker-provided offset tracking, offsets for each consumer are persisted in the stream itself as non-message data.
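
To make the per-segment evaluation more concrete, here is a minimal Java simulation. It is a sketch under stated assumptions and not the broker's actual algorithm: the class RetentionSketch, the Segment type and the retain method are hypothetical names, and the way the two limits are combined (a segment is dropped when the total size is over the limit or the segment is too old) is an assumption. What it does take from the text above is that whole segments are discarded from the oldest end and that at least one segment always remains.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Illustrative simulation only; none of these names exist in RabbitMQ.
final class RetentionSketch {

    // One segment file: its size on disk and the timestamp of its newest entry.
    static final class Segment {
        final long sizeBytes;
        final long newestTimestampMs;

        Segment(long sizeBytes, long newestTimestampMs) {
            this.sizeBytes = sizeBytes;
            this.newestTimestampMs = newestTimestampMs;
        }
    }

    // Returns the segments that survive retention. Input is ordered oldest first.
    // Assumption: the oldest segment is dropped while the total size exceeds
    // maxLengthBytes OR that segment is older than maxAgeMs.
    // The last remaining segment is always kept.
    static List<Segment> retain(List<Segment> oldestFirst, long maxLengthBytes,
                                long maxAgeMs, long nowMs) {
        Deque<Segment> kept = new ArrayDeque<>(oldestFirst);
        long totalBytes = 0;
        for (Segment s : kept) {
            totalBytes += s.sizeBytes;
        }
        while (kept.size() > 1) {
            Segment oldest = kept.peekFirst();
            boolean overSize = totalBytes > maxLengthBytes;
            boolean overAge = nowMs - oldest.newestTimestampMs > maxAgeMs;
            if (!overSize && !overAge) {
                break;
            }
            kept.removeFirst();
            totalBytes -= oldest.sizeBytes;
        }
        return new ArrayList<>(kept);
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
                new Segment(100, 0), new Segment(100, 1_000),
                new Segment(100, 2_000), new Segment(100, 3_000));
        // 400 bytes stored with a 250 byte limit: the two oldest segments are dropped.
        System.out.println(retain(segments, 250, 10_000, 3_500).size());
    }
}
```

Because data is only ever discarded a whole segment at a time, a smaller segment size lets the retained data follow the configured limits more closely, at the cost of more files on disk.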

Performance Characteristics

As streams persist all data to disk before doing anything else, it is recommended to use the fastest disks possible.

Due to the disk I/O-heavy nature of streams, their throughput decreases as message sizes increase.

Just like quorum queues, streams are also affected by cluster sizes. The more replicas a stream has, the lower its throughput generally will be since more work has to be done to replicate data and achieve consensus.

Controlling the Initial Replication Factor

The x-initial-cluster-size queue argument controls how many RabbitMQ nodes the initial stream cluster should span.
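
As a sketch, the argument can be supplied together with x-queue-type at declaration time. The helper below only builds the argument map with the standard library; the class name StreamArguments, the method name and the value 3 are illustrative, and passing the size as an integer is an assumption. The resulting map would be passed as the last parameter of channel.queueDeclare, in the same way as in the declaration snippets earlier on this page.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of the RabbitMQ Java client.
final class StreamArguments {

    // Builds queue arguments for a stream that initially spans the given number of nodes.
    static Map<String, Object> withInitialClusterSize(int nodes) {
        if (nodes < 1) {
            throw new IllegalArgumentException("nodes must be >= 1");
        }
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-queue-type", "stream");
        arguments.put("x-initial-cluster-size", nodes); // assumption: integer value
        return arguments;
    }

    public static void main(String[] args) {
        // An odd size such as 3 fits the quorum recommendation made earlier.
        System.out.println(withInitialClusterSize(3));
    }
}
```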

Managing Stream Replicas

Replicas of a stream are explicitly managed by the operator. When a new node is added to the cluster, it will host no stream replicas unless the operator explicitly adds it to a replica set of a stream.

When a node has to be decommissioned (permanently removed from the cluster), it must be explicitly removed from the replica list of all streams it currently hosts replicas for.

Two CLI commands are provided to perform the above operations, rabbitmq-streams add_replica and rabbitmq-streams delete_replica:

rabbitmq-streams add_replica [-p <vhost>] <stream-name> <node>
rabbitmq-streams delete_replica [-p <vhost>] <stream-name> <node>

To successfully add and remove replicas the stream coordinator must be available in the cluster.

Care needs to be taken not to accidentally make a stream unavailable by losing the quorum whilst performing maintenance operations that involve membership changes.

Because the stream membership isn't embedded in the stream itself, adding a replica cannot be made entirely safe at the current time. Hence, if there is an out-of-sync replica at any time, another replica cannot be added and an error will be returned.

When replacing a cluster node, it is safer to first add a new node, wait for it to become in-sync, and then decommission the node it replaces.

The replication status of a stream can be queried using the following command:

rabbitmq-streams stream_status [-p <vhost>] <stream-name>

Behaviour

Every stream has a primary writer (the leader) and zero or more replicas.

A leader is elected when the cluster is first formed and later if the leader becomes unavailable.

Leader Election and Failure Handling

A stream requires a quorum of the declared nodes to be available to function. When a RabbitMQ node hosting a stream's leader fails or is stopped, another node hosting one of that stream's replicas will be elected leader and resume operations.

Failed and rejoining replicas will re-synchronise ("catch up") with the leader. In contrast to classic mirrored queues, a temporary replica failure does not require a full re-synchronization from the currently elected leader. Only the delta will be transferred if a re-joining replica is behind the leader. This "catching up" process does not affect leader availability.

Replicas must be explicitly added. When a new replica is added, it will synchronise the entire stream state from the leader, similarly to classic mirrored queues.

Fault Tolerance and Minimum Number of Replicas Online

Consensus systems can provide certain guarantees with regard to data safety. These guarantees only apply when certain conditions are met: at least three cluster nodes are required to provide fault tolerance, and more than half of the members must be available for the system to work at all.

Failure tolerance characteristics of clusters of various size can be described in a table:

Cluster node count    Tolerated number of node failures    Tolerant to a network partition
1                     0                                    not applicable
2                     0                                    no
3                     1                                    yes
4                     1                                    yes if a majority exists on one side
5                     2                                    yes
6                     2                                    yes if a majority exists on one side
7                     3                                    yes
8                     3                                    yes if a majority exists on one side
9                     4                                    yes
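
The failure counts in the table follow from majority arithmetic: with n members, more than half (n / 2 + 1 using integer division) must stay available, so the remaining members may fail. A minimal Java sketch of that calculation follows; the class and method names are illustrative and not part of any RabbitMQ API.

```java
// Illustrative helper; QuorumMath is not part of any RabbitMQ library.
final class QuorumMath {

    // Smallest number of members that forms a majority of the given member count.
    static int quorum(int members) {
        if (members < 1) {
            throw new IllegalArgumentException("members must be >= 1");
        }
        return members / 2 + 1;
    }

    // Number of members that can fail while a majority remains available.
    static int toleratedFailures(int members) {
        return members - quorum(members);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 9; n++) {
            System.out.printf("%d nodes: quorum %d, tolerates %d failure(s)%n",
                    n, quorum(n), toleratedFailures(n));
        }
    }
}
```

Note that going from an odd size to the next even size (for example from 3 to 4) raises the quorum without raising the number of tolerated failures, which is why odd cluster sizes are preferred.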

Data Safety

Streams replicate data across multiple nodes and publisher confirms are only issued once the data has been replicated to a quorum of stream replicas.

Streams always store data on disk. However, they do not explicitly flush (fsync) the data from the operating system page cache to the underlying storage medium; instead they rely on the operating system to do so as and when required. This means that an uncontrolled shutdown of a server could result in data loss for replicas hosted on that node. Although in theory this opens up the possibility of losing confirmed data, the chance of this happening during normal operation is very small, and data lost on a single node would typically just be re-replicated from the other nodes in the system.

If more data safety is required then consider using quorum queues instead as no publisher confirms are issued until at least a quorum of nodes have both written and flushed the data to disk.

No guarantees are provided for messages that have not been confirmed using the publisher confirm mechanism. Such messages could be lost "mid-way", in an operating system buffer or otherwise fail to reach the stream leader.

Availability

A stream should be able to tolerate a minority of stream replicas becoming unavailable with no or little effect on availability.

Note that depending on the partition handling strategy used RabbitMQ may restart itself during recovery and reset the node but as long as that does not happen, this availability guarantee should hold true.

For example, a stream with three replicas can tolerate one node failure without losing availability. A stream with five replicas can tolerate two, and so on.

If a quorum of nodes cannot be recovered (say, if 2 out of 3 RabbitMQ nodes are permanently lost), the stream is permanently unavailable and will most likely need operator involvement to be recovered.

Configuration

For stream protocol port, TLS and other configuration, see the Stream plugin guide.

Resource Use

Streams are typically more light-weight than mirrored and quorum queues.

All data is stored on disk with only unwritten data stored in memory.

Offset Tracking

When using the broker-provided offset tracking features (currently only available when using the Stream plugin), offsets are persisted in the stream itself as non-message data. This means that each offset persistence request makes the stream grow on disk by some small amount.

Limitations

Streams internally store their messages as AMQP 1.0 encoded data. This means when publishing using AMQP 0.9.1 a conversion takes place. Although the AMQP 1.0 data model is mostly capable of containing all of AMQP 0.9.1's data model there are some limitations. If an AMQP 0.9.1 message contains header entries with complex values such as arrays or tables these headers will not be converted. That is because headers are stored as application properties inside the AMQP 1.0 message and these can only contain values of simple types, such as strings and numbers.
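
One way to notice affected messages before publishing is to scan the headers for complex values. The sketch below is a hypothetical pre-publish check, not part of the RabbitMQ Java client; it assumes that tables appear as java.util.Map and arrays as java.util.List in the headers map, and it treats everything else as a simple value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-publish check; not part of the RabbitMQ Java client.
final class HeaderCheck {

    // Returns the names of headers whose values are tables (Map) or arrays (List).
    // Assumption: these are the only "complex" representations to look for.
    static List<String> complexHeaders(Map<String, Object> headers) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Map || value instanceof List) {
                names.add(entry.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("tenant", "acme");                 // simple value
        headers.put("retries", 3);                     // simple value
        headers.put("tags", Arrays.asList("a", "b"));  // array value
        System.out.println("Headers that would not be converted: " + complexHeaders(headers));
    }
}
```

A publisher could log or flatten the reported headers (for example by joining a list into a single string) before sending the message to a stream.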
