Streams are a new persistent and replicated data structure in RabbitMQ 3.9 which models an append-only log with non-destructive consumer semantics. They can be used as a regular AMQP 0.9.1 queue or through a dedicated binary protocol plugin and associated client(s).
This page covers the Stream plugin, which allows to interact with streams using this new binary protocol. For an overview of the concepts and the ways to operate streams, please see the guide on RabbitMQ streams.
Client libraries for the stream protocol are available on several platforms: Java, Go, .NET, Python (rbly, rstream), Erlang, Elixir, Rust.
The Stream plugin is included in the RabbitMQ distribution. Before clients can successfully connect, it must be enabled using rabbitmq-plugins:
rabbitmq-plugins enable rabbitmq_stream
When no configuration is specified the Stream Adapter will listen on all interfaces on port 5552 and have a default user login/passcode of
The port stream listener will listen on can be changed via
Below is a minimalistic configuration file which changes the listener port to 12345:
stream.listeners.tcp.1 = 12345
while one which changes the listener to listen only on localhost (for both IPv4 and IPv6) would look like:
stream.listeners.tcp.1 = 127.0.0.1:5552 stream.listeners.tcp.2 = ::1:5552
The plugin supports TCP listener option configuration.
The settings use a common prefix,
stream.tcp_listen_options, and control things such as TCP buffer sizes, inbound TCP connection queue length, whether TCP keepalives are enabled and so on. See the Networking guide for details.
stream.listeners.tcp.1 = 127.0.0.1:5552 stream.listeners.tcp.2 = ::1:5552 stream.tcp_listen_options.backlog = 4096 stream.tcp_listen_options.recbuf = 131072 stream.tcp_listen_options.sndbuf = 131072 stream.tcp_listen_options.keepalive = true stream.tcp_listen_options.nodelay = true stream.tcp_listen_options.exit_on_close = true stream.tcp_listen_options.send_timeout = 120
It is possible to set the maximum size of frames (default is 1 MiB) and the heartbeat (default is 60 seconds), if needed:
stream.frame_max = 2097152 # in bytes stream.heartbeat = 120 # in seconds
Fast publishers can overwhelm the broker if it cannot keep up writing and replicating inbound messages. So each connection has a maximum number of outstanding unconfirmed messages allowed before being blocked (
initial_credits, defaults to 50,000). The connection is unblocked when a given number of messages is confirmed (
credits_required_for_unblocking, defaults to 12,500). You can change those values according to your workload:
stream.initial_credits = 100000 stream.credits_required_for_unblocking = 25000
High values for these settings can improve publishing throughput at the cost of higher memory consumption (which can finally make the broker crash). Low values can help to cope with a lot of moderately fast-publishing connections.
The stream protocol allows to discover the topology of streams, that is where the leader and replicas for a given set of streams are located in the cluster. This way the client can choose to connect to the appropriate node to interact with the streams: the leader node to publish, a replica to consume. By default, nodes return their hostname and listener port, which may be fine for most situations, but not always (proxy sitting between the cluster nodes and the clients, cluster nodes and/or clients running in containers, etc).
advertised_port keys allow to specify which information a broker node returns when asked the topology of streams. One can set those settings according to their infrastructure, so that clients can connect to cluster nodes:
stream.advertised_host = rabbitmq-1 stream.advertised_port = 12345
The Connecting to Streams blog post covers why the
advertised_port settings are necessary in some deployments.
To use TLS for stream connections, TLS must be configured in the broker. To enable TLS-enabled stream connections, add a TLS listener for streams using the
stream.listeners.ssl.* configuration keys.
The plugin will use core RabbitMQ server certificates and key (just like AMQP 0-9-1 and AMQP 1.0 listeners do):
ssl_options.cacertfile = /path/to/tls/ca_certificate.pem ssl_options.certfile = /path/to/tls/server_certificate.pem ssl_options.keyfile = /path/to/tls/server_key.pem ssl_options.verify = verify_peer ssl_options.fail_if_no_peer_cert = true stream.listeners.tcp.1 = 5552 # default TLS-enabled port for stream connections stream.listeners.ssl.1 = 5551
This configuration creates a standard TCP listener on port 5552 and a TLS listener on port 5551.
When a TLS listener is set up it may be desired to disable all non-TLS ones. This can be configured like so:
stream.listeners.tcp = none stream.listeners.ssl.1 = 5551
Just like for plain connections, it is possible to configure advertised TLS host and port. When TLS is used, the plugin returns the following metadata:
advertised_host, or the hostname if
advertised_hostis not set
It is possible to override this behavior by setting together or individually the
advertised_tls_port configuration entries:
stream.advertised_host = private-rabbitmq-1 stream.advertised_port = 12345 stream.advertised_tls_host = public-rabbitmq-1 stream.advertised_tls_port = 12344