RabbitMQ supports MQTT 3.1.1 via a plugin that ships in the core distribution.
As of 3.8, the plugin requires a quorum of cluster nodes to be present. This means two nodes out of three, three out of five and so on.
The plugin can also be used on a single node but does not support clusters of two nodes.
In case the majority of cluster nodes is down, remaining cluster nodes would not be able to accept new MQTT client connections.
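The majority rule described above is simple arithmetic. The sketch below is only an illustration: the function names are hypothetical and are not part of RabbitMQ or its tooling.

```python
def majority(cluster_size: int) -> int:
    """Smallest number of nodes that forms a majority (quorum)."""
    if cluster_size < 1:
        raise ValueError("cluster_size must be positive")
    return cluster_size // 2 + 1


def tolerated_failures(cluster_size: int) -> int:
    """Number of nodes that can be down while a majority stays online."""
    return cluster_size - majority(cluster_size)


if __name__ == "__main__":
    for size in (1, 2, 3, 5):
        print(f"{size} node(s): quorum={majority(size)}, "
              f"tolerated failures={tolerated_failures(size)}")
```

A two-node cluster needs both nodes for a majority, so it tolerates no failures at all, which is consistent with two-node clusters not being supported.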
There are other documented limitations.
The plugin must be enabled on all cluster nodes.
MQTT clients can interoperate with other protocols. All the functionality in the management UI and several other plugins can be used with MQTT, although there may be some limitations or the need to tweak the defaults.
The MQTT plugin is included in the RabbitMQ distribution. Before clients can successfully connect, it must be enabled using rabbitmq-plugins:
rabbitmq-plugins enable rabbitmq_mqtt
Now that the plugin is enabled, MQTT clients will be able to connect provided that they have a set of credentials for an existing user with the appropriate permissions.
For an MQTT connection to succeed, it must successfully authenticate and the user must have the appropriate permissions to the virtual host used by the plugin (see below).
MQTT clients can (and usually do) specify a set of credentials when they connect.
The plugin supports anonymous authentication, but its use is highly discouraged and it is subject to certain limitations (listed below) that are enforced by default for a reasonable level of security.
Users and their permissions can be managed using rabbitmqctl, management UI or HTTP API.
For example, the following commands create a new user for MQTT connections with full access to the default virtual host used by this plugin:
# username and password are both "mqtt-test"
rabbitmqctl add_user mqtt-test mqtt-test
rabbitmqctl set_permissions -p / mqtt-test ".*" ".*" ".*"
rabbitmqctl set_user_tags mqtt-test management
Note that colons may not appear in usernames.
RabbitMQ MQTT plugin targets MQTT 3.1.1 and supports a broad range of MQTT clients. It also makes it possible for MQTT clients to interoperate with AMQP 0-9-1, AMQP 1.0, and STOMP clients. There is also support for multi-tenancy.
The plugin builds on top of RabbitMQ core protocol's entities: exchanges and queues. Messages published to MQTT topics use a topic exchange (amq.topic by default) internally. Subscribers consume from RabbitMQ queues bound to the topic exchange. This both enables interoperability with other protocols and makes it possible to use the Management plugin to inspect queue sizes, message rates, and so on.
Note that MQTT uses slashes ("/") for topic segment separators and AMQP 0-9-1 uses dots. This plugin translates patterns under the hood to bridge the two. For example, cities/london becomes cities.london and vice versa. This has one important limitation: MQTT topics that have dots in them won't work as expected and are to be avoided. The same goes for AMQP 0-9-1 routing keys that contain slashes.
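The separator translation and its limitation can be illustrated with a small sketch. It is not the plugin's implementation: the function names are hypothetical, and it only covers the slash/dot substitution described above.

```python
def mqtt_topic_to_routing_key(topic: str) -> str:
    """Replace MQTT segment separators ("/") with AMQP 0-9-1 ones (".")."""
    return topic.replace("/", ".")


def routing_key_to_mqtt_topic(routing_key: str) -> str:
    """Replace AMQP 0-9-1 segment separators (".") with MQTT ones ("/")."""
    return routing_key.replace(".", "/")


def survives_round_trip(topic: str) -> bool:
    """True if translating to a routing key and back yields the same topic."""
    return routing_key_to_mqtt_topic(mqtt_topic_to_routing_key(topic)) == topic


if __name__ == "__main__":
    print(mqtt_topic_to_routing_key("cities/london"))
    print(survives_round_trip("cities/london"))
    # A dot inside a topic segment is indistinguishable from a separator
    # once translated, so the original topic cannot be recovered.
    print(survives_round_trip("sensors/v1.2/temperature"))
```

The last call returns False because the dot in `v1.2` turns into a segment separator on the way back, which is why topics containing dots are best avoided.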
When an MQTT client provides no login credentials, the plugin uses the guest account by default, which does not allow non-localhost connections. For remote clients that connect without credentials to succeed, set default_user and default_pass via MQTT plugin configuration to a non-guest user who has the appropriate permissions. Alternatively, have remote clients connect with the credentials of a dedicated user, such as the one created above.
MQTT supports optional authentication (clients may provide no credentials) but RabbitMQ does not. Therefore a default set of credentials is used for anonymous connections.
The mqtt.default_user and mqtt.default_pass configuration keys are used to specify the credentials:
mqtt.default_user = some-user
mqtt.default_pass = s3kRe7
It is possible to disable anonymous connections:
mqtt.allow_anonymous = false
If the mqtt.allow_anonymous key is set to false, then clients must provide credentials.
The use of anonymous connections is highly discouraged and it is subject to certain limitations (see above) that are enforced by default for a reasonable level of security.
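MQTT 3.1 assumes two primary usage scenarios: transient clients that use transient (QoS0) subscriptions and messages, and stateful clients that use durable (QoS1) subscriptions.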
This section briefly covers how these scenarios map to RabbitMQ queue durability and persistence features.
Transient (QoS0) subscriptions use non-durable, auto-delete queues that will be deleted when the client disconnects.
Durable (QoS1) subscriptions use durable queues. Whether the queues are auto-deleted is controlled by the client's clean session flag. Clients with clean sessions use auto-deleted queues, others use non-auto-deleted ones.
For transient (QoS0) publishes, the plugin will publish messages as transient (non-persistent). Naturally, for durable (QoS1) publishes, persistent messages will be used internally.
Queues created for MQTT subscribers will have names starting with mqtt-subscription-, one per subscription QoS level. The queues will have a queue TTL that depends on MQTT plugin configuration, 24 hours by default.
RabbitMQ does not support QoS 2 subscriptions. It automatically downgrades QoS 2 publishes and subscribes to QoS 1.
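The mapping from subscription QoS and the clean session flag to queue properties can be summarised in a short sketch. The class and function names are hypothetical, and the code assumes that QoS 2 is downgraded to QoS 1; it models the rules in this section and is not plugin code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueProperties:
    durable: bool
    auto_delete: bool


def effective_qos(requested_qos: int) -> int:
    """QoS level actually used: QoS 2 is assumed to be downgraded to QoS 1."""
    if requested_qos not in (0, 1, 2):
        raise ValueError(f"invalid MQTT QoS level: {requested_qos}")
    return min(requested_qos, 1)


def subscription_queue_properties(requested_qos: int,
                                  clean_session: bool) -> QueueProperties:
    """Queue properties for a subscription, following the rules above."""
    if effective_qos(requested_qos) == 0:
        # Transient subscription: non-durable queue removed on disconnect.
        return QueueProperties(durable=False, auto_delete=True)
    # Durable subscription: auto-delete follows the clean session flag.
    return QueueProperties(durable=True, auto_delete=clean_session)


if __name__ == "__main__":
    print(subscription_queue_properties(0, clean_session=True))
    print(subscription_queue_properties(1, clean_session=False))
    print(subscription_queue_properties(2, clean_session=True))
```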
Starting with RabbitMQ 3.10, it is possible to opt in to use quorum queues for durable subscriptions using the mqtt.durable_queue_type option.
This value must only be enabled for new clusters, before any clients declare durable subscriptions. Because queue type cannot be changed after declaration, if the value of this setting is changed for an existing cluster, clients with existing durable state would run into a queue type mismatch error and fail to subscribe.
Below is a rabbitmq.conf example that opts in to use quorum queues for durable subscriptions:
# must ONLY be enabled for new clusters before any clients declare durable
# subscriptions
mqtt.durable_queue_type = quorum
While quorum queues are designed for data safety and predictable efficient recovery from replica failures, they also have downsides. A quorum queue by definition requires at least three replicas in the cluster. Therefore quorum queues take longer to declare and delete, and are not a good fit for environments with high client connection churn.
Quorum queues are a great fit for longer lived clients that actually care a great deal about the durability of their state.
As of RabbitMQ 3.8, this plugin requires a quorum (majority) of nodes to be online. This is because client ID tracking now uses a consensus protocol which requires a quorum of nodes to be online in order to make progress.
If a quorum of nodes is down or lost, the plugin won't be able to accept new client connections until the quorum is restored.
This also means that two node clusters are not supported since the loss of just one node out of two means the loss of a quorum of online nodes.
RabbitMQ's Raft implementation keeps a portion of the operation log in memory as well as on disk. In environments where the MQTT plugin is the only Raft-based feature used (namely where quorum queues are not used), reducing the portion of the log stored in memory will reduce memory footprint of the plugin in case of high connection churn.
The configuration key of interest is raft.wal_max_size_bytes:
# if quorum queues are not used, configure a lower max WAL segment
# limit compared to the default of 512 MiB, e.g. 64 MiB
raft.wal_max_size_bytes = 67108864
If quorum queues are adopted at a later point, this setting should be revisited to be closer to the default one.
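The byte values used for this setting are plain MiB-to-bytes conversions. The helper below is a hypothetical convenience for computing them, not something RabbitMQ provides.

```python
MIB = 1024 * 1024  # bytes in one mebibyte


def mib_to_bytes(mebibytes: int) -> int:
    """Convert a size in MiB to bytes for use in rabbitmq.conf."""
    if mebibytes <= 0:
        raise ValueError("size must be positive")
    return mebibytes * MIB


if __name__ == "__main__":
    # 64 MiB, the reduced limit used in the example above
    print(f"raft.wal_max_size_bytes = {mib_to_bytes(64)}")
    # 512 MiB, the default mentioned above
    print(f"default = {mib_to_bytes(512)}")
```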
Here is a sample configuration that demonstrates a number of MQTT plugin settings:
mqtt.listeners.tcp.default = 1883
## Default MQTT with TLS port is 8883
# mqtt.listeners.ssl.default = 8883
# anonymous connections, if allowed, will use the default
# credentials specified here
mqtt.allow_anonymous = true
mqtt.default_user = guest
mqtt.default_pass = guest
mqtt.vhost = /
mqtt.exchange = amq.topic
# 24 hours by default
mqtt.subscription_ttl = 86400000
mqtt.prefetch = 10
When no configuration is specified, the MQTT plugin will listen on all interfaces on port 1883 and use a default login/passcode of guest/guest.
To change the listener port, edit the configuration file to contain an mqtt.listeners.tcp.* key (the tcp_listeners variable of the rabbitmq_mqtt application).
For example, a minimalistic configuration file which changes the listener port to 12345 would look like:
mqtt.listeners.tcp.1 = 12345
while one which changes the listener to listen only on localhost (for both IPv4 and IPv6) would look like:
mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883
The plugin supports TCP listener option configuration.
The settings use a common prefix, mqtt.tcp_listen_options, and control things such as TCP buffer sizes, inbound TCP connection queue length, whether TCP keepalives are enabled and so on. See the Networking guide for details.
mqtt.listeners.tcp.1 = 127.0.0.1:1883
mqtt.listeners.tcp.2 = ::1:1883
mqtt.tcp_listen_options.backlog = 4096
mqtt.tcp_listen_options.recbuf = 131072
mqtt.tcp_listen_options.sndbuf = 131072
mqtt.tcp_listen_options.keepalive = true
mqtt.tcp_listen_options.nodelay = true
mqtt.tcp_listen_options.exit_on_close = true
mqtt.tcp_listen_options.send_timeout = 120
To use TLS for MQTT connections, TLS must be configured in the broker. To accept MQTT connections over TLS, add a TLS listener for MQTT using the mqtt.listeners.ssl.* configuration keys.
The plugin will use core RabbitMQ server certificates and key (just like AMQP 0-9-1 and AMQP 1.0 listeners do):
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# default TLS-enabled port for MQTT connections
mqtt.listeners.ssl.default = 8883
mqtt.listeners.tcp.default = 1883
Note that RabbitMQ rejects SSLv3 connections by default because that protocol is known to be compromised.
See the TLS configuration guide for details.
RabbitMQ is a multi-tenant system at the core and every connection belongs to a virtual host. Some messaging protocols have the concept of vhosts, others don't. MQTT falls into the latter category. Therefore the MQTT plugin needs to provide a way to map connections to vhosts.
The vhost option controls which RabbitMQ vhost the adapter connects to by default. The vhost configuration is only consulted if no vhost is provided during connection establishment. There are several (optional) ways to specify the vhost the client will connect to.
The first way is to map MQTT plugin (TCP or TLS) listener ports to vhosts. The mapping is specified using the mqtt_port_to_vhost_mapping global runtime parameter. Consider the following plugin configuration:
mqtt.listeners.tcp.1 = 1883
mqtt.listeners.tcp.2 = 1884
mqtt.listeners.ssl.1 = 8883
mqtt.listeners.ssl.2 = 8884
# (other TLS settings are omitted for brevity)
mqtt.vhost = /
Note the plugin listens on ports 1883, 1884, 8883, and 8884. Imagine you want clients connecting to ports 1883 and 8883 to connect to the vhost1 virtual host, and clients connecting to ports 1884 and 8884 to connect to the vhost2 virtual host. You can specify port-to-vhost mapping by setting the mqtt_port_to_vhost_mapping global parameter with rabbitmqctl:
rabbitmqctl set_global_parameter mqtt_port_to_vhost_mapping \
    '{"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}'
with rabbitmqctl.bat on Windows:
rabbitmqctl.bat set_global_parameter mqtt_port_to_vhost_mapping ^
    "{""1883"":""vhost1"", ""8883"":""vhost1"", ""1884"":""vhost2"", ""8884"":""vhost2""}"
and with the HTTP API:
PUT /api/global-parameters/mqtt_port_to_vhost_mapping
# => {"value": {"1883":"vhost1", "8883":"vhost1", "1884":"vhost2", "8884":"vhost2"}}
If there's no mapping for a given port (because the port cannot be found in the mqtt_port_to_vhost_mapping global parameter JSON document or if the global parameter isn't set at all), the plugin will try to extract the virtual host from the username (see below) and will ultimately use the vhost plugin config option.
The broker queries the mqtt_port_to_vhost_mapping global parameter value at connection time. If the value changes, connected clients are not notified or disconnected. They need to reconnect to switch to a new virtual host.
Another and more specific way to specify a vhost while connecting is to prepend the vhost to the username, separated by a colon.
For example, connecting with /:guest is equivalent to the default vhost and username. mqtt-vhost:mqtt-username means connecting to the vhost mqtt-vhost with username mqtt-username.
Specifying the virtual host in the username takes precedence over the port-to-vhost mapping specified with the mqtt_port_to_vhost_mapping global parameter.
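The precedence between the username prefix, the port-to-vhost mapping and the default vhost can be modelled as follows. This is a sketch under assumptions: the function names are hypothetical, the mapping is passed in as a plain dictionary with string keys (as in the JSON document), and splitting on the first colon is an assumption rather than documented behaviour.

```python
from typing import Mapping, Optional, Tuple


def split_username(username: str) -> Tuple[Optional[str], str]:
    """Split "vhost:user" into (vhost, user); (None, username) without a colon.

    Assumption: the split happens on the first colon.
    """
    if ":" in username:
        vhost, user = username.split(":", 1)
        return vhost, user
    return None, username


def resolve_vhost(username: str,
                  listener_port: int,
                  port_to_vhost: Mapping[str, str],
                  default_vhost: str = "/") -> Tuple[str, str]:
    """Return (vhost, user) using the precedence described above."""
    vhost, user = split_username(username)
    if vhost is not None:
        return vhost, user              # 1. vhost given in the username
    mapped = port_to_vhost.get(str(listener_port))
    if mapped is not None:
        return mapped, user             # 2. port-to-vhost mapping
    return default_vhost, user          # 3. the vhost plugin config option


if __name__ == "__main__":
    mapping = {"1883": "vhost1", "8883": "vhost1",
               "1884": "vhost2", "8884": "vhost2"}
    print(resolve_vhost("mqtt-test", 1884, mapping))
    print(resolve_vhost("mqtt-vhost:mqtt-username", 1884, mapping))
    print(resolve_vhost("/:guest", 1999, mapping))
```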
The plugin can authenticate TLS-enabled connections by extracting a name from the client's TLS (x509) certificate, without using a password.
For safety the server must be configured with the TLS options fail_if_no_peer_cert set to true and verify set to verify_peer, to force all TLS clients to have a verifiable client certificate.
To switch this feature on, set ssl_cert_login to true for the rabbitmq_mqtt application. For example:
mqtt.ssl_cert_login = true
By default this will set the username to an RFC4514-ish string form of the certificate's subject's Distinguished Name, similar to that produced by OpenSSL's "-nameopt RFC2253" option.
To use the Common Name instead, add:
ssl_cert_login_from = common_name
to your configuration.
You can optionally specify a virtual host for a client certificate by using the mqtt_default_vhosts global runtime parameter. The value of this global parameter must contain a JSON document that maps certificates' subject's Distinguished Name to their target virtual host. Let's see how to map 2 certificates, O=client,CN=guest and O=client,CN=rabbit, to the vhost1 and vhost2 virtual hosts, respectively.
Global parameters can be set up with rabbitmqctl:
rabbitmqctl set_global_parameter mqtt_default_vhosts \
    '{"O=client,CN=guest": "vhost1", "O=client,CN=rabbit": "vhost2"}'
With rabbitmqctl.bat on Windows:
rabbitmqctl.bat set_global_parameter mqtt_default_vhosts ^
    "{""O=client,CN=guest"": ""vhost1"", ""O=client,CN=rabbit"": ""vhost2""}"
And with the HTTP API:
PUT /api/global-parameters/mqtt_default_vhosts
# => {"value": {"O=client,CN=guest": "vhost1", "O=client,CN=rabbit": "vhost2"}}
Note that:
If there's no mapping for a given certificate (because the certificate's subject Distinguished Name cannot be found in the mqtt_default_vhosts global parameter JSON document or if the global parameter isn't set at all), the virtual host specified by the vhost plugin config option will be used.
The broker queries the mqtt_default_vhosts global parameter value at connection time. If the value changes, connected clients are not notified or disconnected. They need to reconnect to switch to a new virtual host.
The certificate-to-vhost mapping with the mqtt_default_vhosts global parameter is considered more specific than the port-to-vhost mapping with the mqtt_port_to_vhost_mapping global parameter and so takes precedence over it.
The subscription_ttl option controls the lifetime of non-clean sessions. This option is interpreted in the same way as the queue TTL parameter, so the value 86400000 means 24 hours. To disable the TTL feature, set subscription_ttl to undefined in the configuration file:
listeners.tcp.default = 5672
mqtt.default_user = guest
mqtt.default_pass = guest
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.subscription_ttl = undefined
mqtt.prefetch = 10
...
Note that disabling queue TTL carries a risk: short-lived clients that don't use clean sessions can leave queues and messages behind, which will consume resources and require manual cleanup.
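Because subscription_ttl is expressed in milliseconds, a small conversion helper can make configuration values easier to check. The helper name is hypothetical; the sketch only performs the unit conversion.

```python
from datetime import timedelta


def ttl_milliseconds(ttl: timedelta) -> int:
    """Express a duration as whole milliseconds, as used by subscription_ttl."""
    if ttl <= timedelta(0):
        raise ValueError("TTL must be positive")
    return ttl // timedelta(milliseconds=1)


if __name__ == "__main__":
    # 24 hours, the default mentioned above
    print(f"mqtt.subscription_ttl = {ttl_milliseconds(timedelta(hours=24))}")
    # 30 minutes
    print(f"mqtt.subscription_ttl = {ttl_milliseconds(timedelta(minutes=30))}")
```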
The prefetch option controls the maximum number of unacknowledged messages that will be delivered. This option is interpreted in the same way as the AMQP 0-9-1 prefetch-count field, so a value of 0 means "no limit".
The exchange option determines which exchange messages from MQTT clients are published to. If a non-default exchange is chosen then it must be created before clients publish any messages. The exchange is expected to be a topic exchange.
The MQTT plugin supports the proxy protocol. This feature is disabled by default. To enable it for MQTT clients:
mqtt.proxy_protocol = true
See the Networking Guide for more information about the proxy protocol.
Sparkplug is a specification that provides guidance for the design of an MQTT system. In Sparkplug, MQTT topics must start with spAvM.N or spBvM.N, where M and N are integers. This unfortunately conflicts with the way the RabbitMQ MQTT plugin translates MQTT topics into AMQP routing keys.
To solve this, the sparkplug configuration entry can be set to true:
mqtt.sparkplug = true
When the Sparkplug support is enabled, the MQTT plugin will not translate the spAvM.N/spBvM.N part of the names of topics.
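The conflict is easy to reproduce with a naive separator swap. The sketch below only detects Sparkplug-style topics and shows how the version dot is lost; the function names and the regular expression are illustrative assumptions, and the code does not show how the plugin resolves the conflict internally.

```python
import re

# "spAvM.N" or "spBvM.N" (M and N integers) as the first topic segment
SPARKPLUG_PREFIX = re.compile(r"^sp[AB]v\d+\.\d+(/|$)")


def is_sparkplug_topic(topic: str) -> bool:
    """True if the topic starts with a Sparkplug namespace segment."""
    return SPARKPLUG_PREFIX.match(topic) is not None


def naive_round_trip(topic: str) -> str:
    """MQTT topic -> routing key -> MQTT topic, swapping separators only."""
    routing_key = topic.replace("/", ".")
    return routing_key.replace(".", "/")


if __name__ == "__main__":
    topic = "spBv1.0/group/NBIRTH/node"
    print(is_sparkplug_topic(topic))
    # The dot in "spBv1.0" comes back as a slash, changing the topic.
    print(naive_round_trip(topic))
```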
See Consensus Features.
Overlapping subscriptions from the same client (e.g. /sports/football/epl/# and /sports/football/#) can result in duplicate messages being delivered. Applications need to account for this.
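To see why overlapping subscriptions lead to duplicates, it helps to check which topic filters match a given topic. The matcher below is a simplified sketch of MQTT wildcard matching (it ignores special cases such as topics starting with "$"); the function name is hypothetical.

```python
from typing import List


def filter_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT topic filter matching with "+" and "#" wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard matches everything below
        if index >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False  # literal level differs
    return len(filter_levels) == len(topic_levels)


def matching_filters(filters: List[str], topic: str) -> List[str]:
    """All subscriptions of one client that match the published topic."""
    return [f for f in filters if filter_matches(f, topic)]


if __name__ == "__main__":
    subscriptions = ["/sports/football/epl/#", "/sports/football/#"]
    matched = matching_filters(subscriptions, "/sports/football/epl/results")
    # Both filters match, so the client may receive the message twice.
    print(matched)
```

A client that must process each message once can deduplicate on an application-level message identifier, or avoid subscribing to filters that overlap.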
See Retained Messages below. Different retained message stores have different benefits, trade-offs, and limitations.
Before the plugin is disabled on a node, or a node removed from the cluster, it must be decommissioned using rabbitmqctl:
rabbitmqctl decommission_mqtt_node <node>
The plugin supports retained messages. The message store implementation is pluggable and the plugin ships with two implementations out of the box: a RAM-based (ETS) one, implemented in the rabbit_mqtt_retained_msg_store_ets module, and a disk-based (DETS) one, implemented in the rabbit_mqtt_retained_msg_store_dets module.
Both implementations have limitations and trade-offs. With the first one, maximum number of messages that can be retained is limited by RAM. With the second one, there is a limit of 2 GB per vhost. Both are node-local (messages retained on one broker node are not replicated to other nodes in the cluster).
To configure the store, use the mqtt.retained_message_store configuration key:
mqtt.default_user = guest
mqtt.default_pass = guest
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.subscription_ttl = 1800000
mqtt.prefetch = 10
## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
## only used by DETS store
mqtt.retained_message_store_dets_sync_interval = 2000
mqtt.listeners.ssl = none
mqtt.listeners.tcp.default = 1883
The value must be a module that implements the store:
rabbit_mqtt_retained_msg_store_ets for RAM-based
rabbit_mqtt_retained_msg_store_dets for disk-based
These implementations are suitable for development but may not meet production needs. The MQTT 3.1 specification does not define consistency or replication requirements for retained message stores, therefore RabbitMQ allows for custom ones to meet the consistency and availability needs of a particular environment. For example, stores based on Riak and Cassandra would be suitable for most production environments as those data stores provide tunable consistency.
Message stores must implement the rabbit_mqtt_retained_msg_store behaviour.