VMware Tanzu RabbitMQ supports continuous schema definition and message replication to a remote cluster, which makes it easy to run a standby cluster for disaster recovery.

This feature is not available in the open source RabbitMQ distribution.

To set up standby replication, two plugins must be enabled and configured separately:

  1. Continuous Schema Replication (rabbitmq_schema_definition_sync)
  2. Standby Message Replication (rabbitmq_standby_replication)

Continuous Schema Replication

This guide covers WAN-friendly replication of schema between RabbitMQ clusters. This feature is typically used for setting up one or more standby (passive) disaster recovery clusters.

The plugin's replication model has a number of features and limitations:

  • It transfers definitions (schema) but not enqueued messages
  • Syncing happens periodically, so with volatile topologies followers will always be trailing behind the leader. With a sync interval of thirty seconds the lag will usually be within one minute.
  • The schema (virtual hosts, users, queues, and so on) on the follower side is replaced with that on the leader side
  • All communication between the sides is completely asynchronous and avoids introducing cluster co-dependencies
  • Except for the initial import, definitions are transferred and imported incrementally
  • Definitions are transferred in a compressed binary format to reduce bandwidth usage
  • Links to other clusters are easy to configure and reason about, in particular during a disaster recovery event

In case of a disaster event the recovery process involves several steps:

  • A standby cluster will be promoted to active by the service operator
  • Applications will be redeployed or reconfigured to connect to the newly promoted cluster
  • Other standby clusters have to be reconfigured to follow the newly promoted cluster

As explained later in this guide, promotion and reconfiguration happen on the fly, and do not involve RabbitMQ node restarts or redeployment.

Enabling the Plugin

Like any other RabbitMQ plugin, rabbitmq_schema_definition_sync must be enabled before it can be used. Usually this plugin should be pre-configured, or enabled with --offline before the node starts:

rabbitmq-plugins enable rabbitmq_schema_definition_sync --offline

How the Syncing Works

Upstreams and Downstreams

The plugin has two sides on a schema replication link (connection):

  • A source cluster, a.k.a. "origin", a.k.a. upstream (borrowing a term from the Federation plugin)
  • A destination cluster, a.k.a. follower, a.k.a. downstream

There can be multiple downstreams for an upstream; this document primarily discusses a single upstream, single downstream scenario for the sake of simplicity.

Downstreams connect to their upstream and periodically initiate sync operations. These operations synchronise the schema on the downstream side with that of the upstream, with some safety mechanisms (covered later in this guide).

A node running in the downstream mode (a follower) can be converted to an upstream (leader) on the fly. This will make it disconnect from its original source, therefore stopping all syncing. The node will then continue operating as a member of an independent cluster, no longer associated with its original upstream.

Such conversion is called a promotion and should be performed in case of a disaster recovery event.

Sync Operations

A sync operation is a request/response sequence that involves:

  • A sync request sent by the downstream
  • A sync response sent back by the upstream

A sync request carries a payload that allows the upstream to compute the delta between the two schemas. A sync response carries the delta plus all the definitions that are only present on the upstream side or that conflict with those on the downstream. The downstream uses this information to apply the definitions, and any entities only present on the downstream are deleted. This makes sure that downstreams follow their upstream's schema as closely as possible, within some practical limits (discussed further in this guide).
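To make the reconciliation rule concrete, here is a minimal sketch of what a downstream ends up with after a successful sync. It is a simplified model, not the plugin's implementation. The plugin sends a compact payload rather than full definitions, and it honours the downstream exclusions described later in this guide. The data model (entity keys mapped to definition dicts) and the function names compute_delta and apply_delta are hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical model: an entity key such as ("queue", "vhost1", "orders")
# maps to that entity's definition (arguments, flags and so on).
Key = Tuple[str, ...]
Schema = Dict[Key, dict]


def compute_delta(upstream: Schema, downstream: Schema) -> Tuple[Schema, List[Key]]:
    """Return (definitions to apply, keys to delete) for the downstream."""
    # Missing on the downstream, or present on both sides but different
    to_apply = {
        key: definition
        for key, definition in upstream.items()
        if downstream.get(key) != definition
    }
    # Present only on the downstream: deleted so it follows the upstream
    to_delete = sorted(key for key in downstream if key not in upstream)
    return to_apply, to_delete


def apply_delta(downstream: Schema, to_apply: Schema, to_delete: List[Key]) -> Schema:
    """Return the downstream schema after a successful sync operation."""
    result = dict(downstream)
    result.update(to_apply)
    for key in to_delete:
        result.pop(key, None)
    return result


upstream = {
    ("vhost", "vhost1"): {},
    ("queue", "vhost1", "orders"): {"durable": True},
    ("queue", "vhost1", "audit"): {"durable": True},
}
downstream = {
    ("vhost", "vhost1"): {},
    ("queue", "vhost1", "orders"): {"durable": False},
    ("queue", "vhost1", "stale"): {"durable": True},
}
to_apply, to_delete = compute_delta(upstream, downstream)
print(sorted(to_apply), to_delete)
```

After apply_delta runs, the downstream matches the upstream exactly. The conflicting "orders" queue definition is replaced and the downstream-only "stale" queue is removed.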

Downstreams connect to their upstream using AMQP 1.0. This has a few benefits:

  • All communication is asynchronous and there is no coupling: a standby can run a different (e.g. newer) version of RabbitMQ
  • Authentication is identical to that of applications
  • No additional ports need to be open

Loose Coupling

The upstream and its followers (downstreams) are loosely connected. If one end of a schema replication connection fails, the delta between the clusters' schemas will grow, but neither cluster will be affected in any other way. If an upstream is under too much load to serve definition requests, or the sync plugin is unintentionally disabled, the downstream simply won't receive responses to its sync requests for a period of time.

If a downstream fails to apply definitions, neither the upstream nor the other downstreams are affected. Therefore the availability of each side of a synchronisation link does not depend on the availability of the other end.

When multiple downstreams are syncing from a shared upstream, they do not interfere or coordinate with each other. Both sides have to do a little bit more work. On the upstream side, this load is shared between all cluster nodes. On the downstream side, the load should be minimal in practice, assuming that sync operations are applied successfully, so the delta does not accumulate.

Configuration

Upstream

A node participating in schema definition syncing must be provided with two pieces of configuration:

  • What mode it operates in, upstream (leader) or downstream (passive follower)
  • Upstream connection endpoints

This is true for both upstreams and downstreams.

The mode must be provided in the config file. Supported values are upstream and downstream:

# source cluster
cluster_name = eu.1

# this node will run as an upstream (source) for
# schema replication
schema_definition_sync.operating_mode = upstream

Upstream nodes also need to have a list of connection endpoints. An upstream node will connect to the first reachable node. Providing a list makes schema replication more resilient to node failures on the upstream side.

Connection endpoints and credentials are configured using runtime parameters. This makes it possible to reconfigure them without a node restart:

# This virtual host will be used for schema replication
rabbitmqctl add_vhost rabbitmq_schema_definition_sync

# Create a user and grant it permissions to the virtual host that will be
# used for schema replication.
# This command is similar to 'rabbitmqctl add_user' but also grants full permissions
# to the virtual host used for definition sync.
rabbitmqctl add_schema_replication_user "schema-replicator" "s3kRe7"

# specify local (upstream cluster) nodes and credentials to be used
# for schema replication
rabbitmqctl set_schema_replication_upstream_endpoints '{"endpoints": ["a.rabbitmq.eu-1.local:5672","b.rabbitmq.eu-1.local:5672","c.rabbitmq.eu-1.local:5672"], "username": "schema-replicator", "password": "s3kRe7"}'
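Hand-quoting the JSON argument is error-prone, especially when passwords contain quotes. The sketch below generates it instead. The helper endpoints_payload is hypothetical and not part of the plugin. The JSON shape ("endpoints", "username", "password") and port 5672 are taken from the command above.

```python
import json
import shlex
from typing import List


def endpoints_payload(hosts: List[str], username: str, password: str, port: int = 5672) -> str:
    """Build the JSON document passed to the *_upstream_endpoints commands."""
    return json.dumps(
        {
            "endpoints": [f"{host}:{port}" for host in hosts],
            "username": username,
            "password": password,
        }
    )


payload = endpoints_payload(
    ["a.rabbitmq.eu-1.local", "b.rabbitmq.eu-1.local", "c.rabbitmq.eu-1.local"],
    username="schema-replicator",
    password="s3kRe7",
)
argv = ["rabbitmqctl", "set_schema_replication_upstream_endpoints", payload]

# Print a shell-safe command line; subprocess.run(argv, check=True) would run it
print(shlex.join(argv))
```

Standby replication takes the same JSON shape with set_standby_replication_upstream_endpoints. In that case, pass port=5552 for the stream protocol.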

To verify replication status of a running node, use rabbitmqctl schema_replication_status:

rabbitmqctl schema_replication_status

Downstreams

Downstream configuration is generally similar to that of the upstream side:

# follower cluster
cluster_name = eu.2

# this node will run as a downstream (follower) for
# schema replication
schema_definition_sync.operating_mode = downstream

Just like for upstream nodes, a list of upstream hosts and connection credentials must be provided:

# specify upstream cluster nodes and credentials to be used
# for schema replication
rabbitmqctl set_schema_replication_upstream_endpoints '{"endpoints": ["a.rabbitmq.eu-1.local:5672","b.rabbitmq.eu-1.local:5672","c.rabbitmq.eu-1.local:5672"], "username": "schema-replicator", "password": "s3kRe7"}'

For downstreams, there is one more setting that can be configured: sync operation interval. The interval is in seconds and controls how often this downstream will initiate sync operations:

# follower cluster
cluster_name = eu.2

# this node will run as a downstream (follower) for
# schema replication
schema_definition_sync.operating_mode = downstream

# initiate sync operations every 30 seconds
schema_definition_sync.downstream.minimum_sync_interval = 30

Depending on the accumulated delta between the two sides, a sync operation can take some time to complete.

If the actual amount of time taken exceeds the configured minimum, the greater of the two values is used. This makes sure that, when there is a large volume of data to import, sync operations are not consistently initiated more frequently than they can be completed, conserving resources on both sides.
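The interval rule can be illustrated with a toy model. It assumes the interval is measured from the start of one sync operation to the start of the next. The plugin's actual scheduler may differ in detail, and both function names are hypothetical.

```python
from typing import List


def effective_sync_interval(minimum_sync_interval: float, last_sync_duration: float) -> float:
    """The greater of the configured minimum and the time the last sync took."""
    return max(minimum_sync_interval, last_sync_duration)


def sync_start_times(minimum_sync_interval: float, durations: List[float]) -> List[float]:
    """Start time (seconds after the first sync) of each sync operation."""
    starts: List[float] = []
    t = 0.0
    for duration in durations:
        starts.append(t)
        t += effective_sync_interval(minimum_sync_interval, duration)
    return starts


# With minimum_sync_interval = 30, a sync that takes 75 seconds pushes the
# next one back instead of overlapping with it
print(sync_start_times(30, [2, 3, 75, 4]))  # [0.0, 30.0, 60.0, 135.0]
```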

Downstream status can be inspected using the same command, rabbitmqctl schema_replication_status:

rabbitmqctl schema_replication_status

The downstream can be configured to exclude certain entities (queues, exchanges, users, etc.) from the synchronisation process. This is useful for entities that must exist on the downstream even though they do not exist on the upstream, for example a local administrator user such as local-admin. Keep in mind that any entity not present on the upstream will otherwise be deleted by the downstream synchronisation process. Each setting below takes a regular expression:

# to filter users using regex
schema_definition_sync.downstream.locals.users = ^my-user$

# to filter vhosts using regex
schema_definition_sync.downstream.locals.vhosts = ^vhost-test.*

# to filter policies using regex
schema_definition_sync.downstream.locals.policies = ^example$

# to filter global parameters
schema_definition_sync.downstream.locals.global_parameters = ^some-param

# to filter parameters
schema_definition_sync.downstream.locals.parameters = example$

# to filter queues
schema_definition_sync.downstream.locals.queues = ^leave-this-q$

# to filter exchanges
schema_definition_sync.downstream.locals.exchanges = ^important-exchange-[a-z]+
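The effect of these settings can be sketched as follows: downstream-only entities are deleted unless their name matches the pattern for their entity type. This sketch assumes the patterns are matched against entity names with unanchored search semantics, so the ^ and $ in the examples do the anchoring. The LOCALS dict and the entities_to_delete function are hypothetical illustrations, not plugin APIs.

```python
import re
from typing import Dict, Iterable, List

# Hypothetical excerpt of the locals settings above, keyed by entity type
LOCALS: Dict[str, str] = {
    "users": r"^my-user$",
    "vhosts": r"^vhost-test.*",
    "queues": r"^leave-this-q$",
}


def entities_to_delete(entity_type: str, downstream: Iterable[str], upstream: Iterable[str]) -> List[str]:
    """Downstream-only names that no locals pattern protects from deletion."""
    pattern = LOCALS.get(entity_type)
    only_downstream = set(downstream) - set(upstream)
    return sorted(
        name
        for name in only_downstream
        if pattern is None or re.search(pattern, name) is None
    )


print(entities_to_delete("users", ["admin", "my-user", "temp"], ["admin"]))  # ['temp']
print(entities_to_delete("vhosts", ["vhost-test-1", "legacy"], []))  # ['legacy']
```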

Stopping and Resuming Replication

Replication can be stopped on either end by invoking rabbitmqctl disable_schema_replication:

rabbitmqctl disable_schema_replication

This will make the node disconnect from the upstream and stop initiating (if it is a downstream) or serving (if it is an upstream) sync operation requests.

To re-enable synchronisation, use rabbitmqctl enable_schema_replication:

rabbitmqctl enable_schema_replication

To restart schema replication, e.g. after an upstream endpoint or credential change, use rabbitmqctl restart_schema_replication:

rabbitmqctl restart_schema_replication

This is identical to disabling and immediately re-enabling replication using the aforementioned commands.

Secondary Cluster (Standby) Promotion

Having a standby cluster with synchronised virtual hosts, users, permissions, topologies and so on is only useful if it can be turned into a new primary cluster in case of a disaster event. In this guide we will refer to such an event as a downstream promotion.

A promoted downstream becomes a "regular" cluster that can, if needed, itself serve as an upstream. It does not sync from its original upstream but can serve sync operation requests.

A downstream promotion involves a few steps on the downstream side:

  • Replication is stopped
  • An upstream setup is performed
  • Node mode is switched to upstream
  • Replication is resumed

All these steps are performed using CLI tools and do not require a node restart:

# stop replication
rabbitmqctl disable_schema_replication

# this upstream setup has to be performed just once, not for every upstream cluster node
rabbitmqctl add_vhost rabbitmq_schema_definition_sync
# similar to 'rabbitmqctl add_user' but also grants full permissions
# to the virtual host used for definition sync
rabbitmqctl add_schema_replication_user "schema-replicator" "s3kRe7"

# connect to the local (promoted) nodes
rabbitmqctl set_schema_replication_upstream_endpoints '{"endpoints": ["a.rabbitmq.eu-2.local:5672","b.rabbitmq.eu-2.local:5672","c.rabbitmq.eu-2.local:5672"], "username": "schema-replicator", "password": "s3kRe7"}'

# act as an upstream
rabbitmqctl set_schema_replication_mode upstream

# resume replication
rabbitmqctl enable_schema_replication

The promoted cluster then can be used by applications and as an upstream for other clusters. It no longer has any connection to its original upstream.

Post-Promotion

Note that if the promoted cluster is to be restarted, its operating mode must be updated in the configuration file as well; otherwise it will revert to its originally configured mode, downstream.

The plugin does not make any assumptions about what happens to the original cluster that has experienced a disaster event. It can be gone permanently, brought back as a standby for the newly promoted one or be eventually promoted back.


Standby Replication

This guide covers WAN-friendly replication of quorum queues and streams between RabbitMQ clusters. This feature is typically used for setting up one or more standby (passive) disaster recovery clusters.

WAN quorum queue and stream replication is a Tanzu RabbitMQ-specific feature provided by the rabbitmq_standby_replication plugin.

The plugin's replication model has a number of features and limitations:

  • It assumes that schema definitions are replicated between clusters, e.g. using rabbitmq_schema_definition_sync, a Tanzu RabbitMQ-specific plugin
  • It is enabled using policies, and therefore on a per-virtual-host basis, and can replicate only a subset of queues
  • Replicated data is retained for a configurable period of time. This makes the recovery procedure and its limitations easy to explain and reason about
  • It has low overhead for message collection (duplication): all messages in offsite-replicated queues are stored in a local stream, and a special version of the stream client is used to transfer the stream state
  • Messages in a data center can be replicated to more than one data center
  • There is one connection per replicated virtual host, allowing for some connection parallelism
  • Connections are authenticated the same way clients would be
  • Connections support TLS

In case of a disaster event the recovery process involves several steps:

  • A standby cluster will be promoted to active by the service operator
  • Applications will be redeployed or reconfigured to connect to the newly promoted cluster
  • Other standby clusters have to be reconfigured to follow the newly promoted cluster

As explained later in this guide, promotion and reconfiguration happen on the fly, and do not involve RabbitMQ node restarts or redeployment.

Enabling the Plugin

Like any other RabbitMQ plugin, rabbitmq_standby_replication must be enabled before it can be used. Usually this plugin should be pre-configured, or enabled with --offline before the node starts:

rabbitmq-plugins enable rabbitmq_standby_replication --offline

How the Syncing Works

Upstreams and Downstreams

The plugin assumes there are two sides to Warm Standby Replication (WSR):

  • A source cluster, a.k.a. "origin", a.k.a. upstream (borrowing a term from the Federation plugin)
  • A destination cluster, a.k.a. follower, a.k.a. downstream

There can be multiple downstreams for an upstream; this document primarily discusses a single upstream, single downstream scenario for the sake of simplicity.

A node can run as an upstream (in the upstream mode) or a downstream. All nodes in a cluster must run in the same mode.

A node running in the downstream mode (a follower) can be converted to an upstream (leader) on the fly. This will make it disconnect from its original source, therefore stopping all syncing. The node will then continue operating as a member of an independent cluster, no longer associated with its original upstream.

Such conversion is called a promotion and should be performed in case of a disaster recovery event.

Message Collection

On the upstream side, the plugin enables message collection to a local outgoing stream. This stream collects all messages that should be transferred to one or more downstreams (passive clusters).

Collected messages are retained for a configurable period of time. As they are transferred to downstreams, the log is truncated.
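Time- and size-based retention can be pictured with the simplified model below. It corresponds to the retention settings described later in this guide. Real streams truncate whole segment files rather than individual messages, so actual retention is coarser, and truncation after transfer to downstreams is not modelled. The Entry type and apply_retention function are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Entry:
    timestamp_ms: int  # when the message was collected
    size_bytes: int


def apply_retention(log: List[Entry], now_ms: int, max_age_ms: int, max_bytes: int) -> List[Entry]:
    """Drop the oldest entries until both the age and the size limits hold.

    Assumes the log is ordered oldest first.
    """
    # Time-based retention: keep entries no older than max_age_ms
    kept = [e for e in log if now_ms - e.timestamp_ms <= max_age_ms]
    # Size-based retention: drop from the head (oldest first)
    total = sum(e.size_bytes for e in kept)
    while kept and total > max_bytes:
        total -= kept.pop(0).size_bytes
    return kept


log = [Entry(0, 100), Entry(1_000, 100), Entry(2_000, 100)]
print(apply_retention(log, now_ms=2_500, max_age_ms=2_000, max_bytes=150))
# [Entry(timestamp_ms=2000, size_bytes=100)]
```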

The outgoing message stream can be replicated. Whether enabling outgoing stream replication for Warm Standby Replication makes sense depends on the data safety requirements of the system.

Remote Cluster Transfer

Downstream clusters connect to their configured upstream and begin transferring the messages collected upstream. Every virtual host with WSR enabled gets a connection from downstream to upstream. Similarly to the Federation plugin, those connections are often referred to as links.

In clusters with multiple nodes, each virtual host is owned by a single node, which is responsible for its data. For example, in a cluster of three nodes A, B and C with ten virtual hosts, the nodes will own, transfer and recover data for four, three and three virtual hosts, respectively. This spreads the load on the downstream cluster in terms of bandwidth, disk space, disk I/O and CPU resources, in particular at the time of promotion (recovery).

As nodes are added and removed from the downstream clusters, transfer links will be stopped and started again to keep their distribution reasonably even across all nodes.
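The ownership example above (ten virtual hosts over three nodes) can be reproduced with a simple round-robin assignment. Round-robin is an assumption made for illustration; the plugin's actual placement strategy is not documented here. The function assign_vhosts is hypothetical. The second part shows why membership changes move some virtual hosts to a new owner, restarting their links and possibly leaving orphaned data behind.

```python
from collections import Counter
from typing import Dict, List


def assign_vhosts(nodes: List[str], vhosts: List[str]) -> Dict[str, str]:
    """Map every virtual host to exactly one owning node, round-robin."""
    ordered_nodes = sorted(nodes)
    return {
        vhost: ordered_nodes[i % len(ordered_nodes)]
        for i, vhost in enumerate(sorted(vhosts))
    }


vhosts = [f"vhost{i}" for i in range(1, 11)]
owners = assign_vhosts(["rabbit@A", "rabbit@B", "rabbit@C"], vhosts)
print(sorted(Counter(owners.values()).items()))
# [('rabbit@A', 4), ('rabbit@B', 3), ('rabbit@C', 3)]

# Adding a fourth node changes the owner of some virtual hosts, whose links
# would then be restarted on their new owner
owners_after = assign_vhosts(["rabbit@A", "rabbit@B", "rabbit@C", "rabbit@D"], vhosts)
moved = [v for v in vhosts if owners[v] != owners_after[v]]
```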

A link uses a RabbitMQ stream binary protocol client operating in a special "raw" mode. In this mode, all stream events are transferred with little encoding and decoding involved.

Links acting as stream protocol clients have a few benefits:

  • All communication is asynchronous and there is no coupling: a standby can run a different version of RabbitMQ
  • Authentication is identical to that of applications
  • No additional ports need to be open

WSR links authenticate the same way any RabbitMQ stream client would. Most environments should use a separate user that has adequate permissions for the virtual hosts involved in offsite replication. WSR links can and typically will use TLS.

When multiple nodes have the WSR plugin enabled, each virtual host will be "owned" by only one downstream node. This means that for a given virtual host, only one downstream node will start a link and only one node will perform message recovery at promotion time (covered below). Both outgoing and incoming message streams can be replicated to any number of nodes.

Once the stream of collected messages is transferred, it is stored on the downstream cluster waiting for it to be promoted (a recovery event).

A downstream can be temporarily disconnected and reconnected to its upstream:

# disconnects downstream nodes from the upstream
rabbitmqctl disconnect_standby_replication_downstream

# some time passes...

# reconnects
rabbitmqctl connect_standby_replication_downstream

Configuration

Upstreams

Every node in a Standby Replication setup must be provided with some information that will determine what it does:

  • What mode it operates in, upstream (active or source) or downstream (passive or standby)
  • For downstreams, a set of upstream connection endpoints
  • Using a policy, what queues in a particular virtual host should be replicated offsite

The mode must be provided in rabbitmq.conf. Supported values are upstream and downstream:

# active (source) cluster name
cluster_name = eu.1

# this node will run as an upstream (active, source) for
# warm standby queue and stream replication (WSR)
standby.replication.operating_mode = upstream

# total message stream size limit in bytes
# 5 GB - adjust if needed
standby.replication.retention.size_limit.messages = 5000000000

Downstreams

Downstream nodes need to be configured to use the downstream operating mode:

# passive (standby) cluster name
cluster_name = eu.2

# this node will run as a downstream (passive) for
# warm standby queue and stream replication (WSR)
standby.replication.operating_mode = downstream

# total message stream size limit in bytes
# 5 GB - adjust if needed
standby.replication.retention.size_limit.messages = 5000000000

# this is IMPORTANT, otherwise the schema sync plugin
# can interfere with the local state of standby replication
schema_definition_sync.downstream.locals.global_parameters = ^standby

Downstream nodes need to have a list of connection endpoints. Connection endpoints and credentials are configured using runtime parameters. This makes it possible to reconfigure them without a node restart:

# Specify upstream cluster nodes and credentials to be used
# for WSR.
#
# Note that the target port is that of the RabbitMQ Stream protocol, *not* AMQP 1.0.
rabbitmqctl set_standby_replication_upstream_endpoints '{"endpoints": ["a.rabbitmq.eu-1.local:5552","b.rabbitmq.eu-1.local:5552","c.rabbitmq.eu-1.local:5552"], "username": "offsite-replicator", "password": "s3kRe7"}'

If a list of endpoints is provided, a downstream node will connect to the first reachable node. Providing a list makes standby replication more resilient to node failures on the upstream side.

Message Collection

The collection feature is enabled for specific queues in a virtual host using a policy. Virtual hosts that enable offsite replication should be tagged with standby_replication:

rabbitmqctl add_vhost "vhost1" --tags "standby_replication"

Virtual hosts tagged this way on the upstream cluster will have an outgoing stream declared by the WSR plugin. Tagging also makes it easy for operators to spot what virtual hosts have WSR enabled: this is a feature that can carry a substantial bandwidth and disk space footprint.

After declaring a new virtual host or tagging an existing one, a policy has to be added to match the queues that will be replicated to a Warm Standby:

rabbitmqctl set_policy --vhost "vhost1" osr "^.*" '{"remote-dc-replicate": true}' --apply-to "queues"

As with other policy-driven features, only one policy applies to a queue at a time, therefore Warm Standby Replication keys may have to be merged with other policy keys.
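Suppose the same queues are already matched by a policy that sets another key. The sketch below uses max-length, a standard RabbitMQ policy key, as that example. The WSR key must then be merged into a single policy definition. The helper merge_policy_definitions is hypothetical. The rabbitmqctl set_policy arguments mirror the command shown above.

```python
import json
import shlex
from typing import Any, Dict


def merge_policy_definitions(*definitions: Dict[str, Any]) -> Dict[str, Any]:
    """Combine several policy definitions into one; later keys win on conflict."""
    merged: Dict[str, Any] = {}
    for definition in definitions:
        merged.update(definition)
    return merged


existing = {"max-length": 100000}    # keys another policy would otherwise set
wsr = {"remote-dc-replicate": True}  # the WSR key from the example above
definition = merge_policy_definitions(existing, wsr)

argv = [
    "rabbitmqctl", "set_policy", "--vhost", "vhost1",
    "osr", "^.*", json.dumps(definition), "--apply-to", "queues",
]
print(shlex.join(argv))
```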

Configuration Settings

Standby replication streams can be configured to adjust the retention of the data in the local streams. The following configuration keys can be set in rabbitmq.conf to configure various standby replication settings:

# Accepted values are upstream and downstream
standby.replication.operating_mode = upstream

# The location of stream data files can be customised
# For example, to use replicated storage
standby.replication.downstream.data_dir = /some/location

# The maximum amount of data to keep inside a stream in bytes
# This limit is applied per vhost. In other words, each virtual host's stream can use up to this many bytes
standby.replication.retention.size_limit.messages = 10000000000

# Retention based on time
standby.replication.retention.time_limit.messages = 12h

# TLS options for downstream worker connection to the upstream
# Set these options if upstream is configured to use TLS
# These accept the same ssl_options as documented at https://rabbitmq.com/ssl.html#enabling-tls
standby.replication.downstream.ssl_options.*
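Because the size limit applies per virtual host, the worst-case disk footprint grows with the number of tagged virtual hosts. This back-of-the-envelope sketch makes that explicit. It assumes each stream replica stores a full copy of the data, and it ignores the time-based limit, which can only make actual usage smaller. The function name and the example counts (4 virtual hosts, 3 replicas) are hypothetical.

```python
def worst_case_footprint_bytes(size_limit_per_vhost: int, wsr_vhosts: int, stream_replicas: int = 1) -> int:
    """Upper bound on cluster-wide disk used by WSR streams under the size limit."""
    return size_limit_per_vhost * wsr_vhosts * stream_replicas


# 10 GB per virtual host (the example above), 4 tagged vhosts, 3 stream replicas
total = worst_case_footprint_bytes(10_000_000_000, 4, 3)
print(f"{total / 1e9:.0f} GB")  # 120 GB
```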

Verifying Replication Status on a Node

Upstream

To observe information regarding messages collected for replication in the upstream, use:

rabbitmq-diagnostics inspect_standby_upstream_metrics
# Inspecting standby upstream metrics related to recovery...
# queue   timestamp       vhost
# myqueue 1638363684843   vhost1

In the upstream, you should also observe a stream protocol connection from the downstream.

Downstream

To observe information regarding messages transferred to the downstream for recovery, use:

rabbitmq-diagnostics inspect_standby_downstream_metrics
# Inspecting standby downstream metrics related to recovery...
# queue   timestamp       vhost
# myqueue 1638363684843   vhost1

rabbitmqctl display_disk_space_used_by_standby_replication_data
# Listing disk space (in gb) used by multi-DC replication
# node    size    unit    vhost
# rabbit@eac27b7ff55c     0.0012  gb      vhost1

Relevant metrics will also be exposed via the Prometheus metrics endpoint.

Downstream (Passive, Warm Standby) Promotion

Having a standby cluster with synchronised schema and messages is only useful if it can be turned into a new primary cluster in case of a disaster event. In this guide we will refer to such an event as a downstream promotion.

A promoted downstream cluster is detached from its upstream to operate independently. Promotion is never performed by the plugin itself; the decision is made by a human operator, and promotion is triggered via a CLI command.

A promoted downstream becomes a "regular" cluster that can, if needed, itself serve as an upstream. It does not sync from its original upstream but can be configured to collect messages for offsite replication to another datacenter.

When a cluster is promoted, a few things happen:

  • All upstream links are closed
  • For every virtual host, all messages in the incoming message stream are re-published to their original destination queues

The promotion process takes time. The amount of time it takes will be proportional to the retention period used. This operation will be CPU and disk I/O intensive.

Every downstream node will be responsible for recovering the virtual hosts it "owns". This helps distribute the load between cluster members.

To list virtual hosts available for promotion, i.e. those that have local data to recover:

rabbitmqctl list_vhosts_available_for_standby_replication_recovery

To initiate a recovery procedure:

rabbitmqctl promote_standby_replication_downstream [--start-from-scratch] [--all-available] [--exclude-virtual-hosts "<vhost1>,<vhost2>,<...>"]

The flag --start-from-scratch recovers the messages that were reported as consumed and acknowledged by the upstream, even if information about the last recovery is available.

The flag --all-available forces recovery of all available messages if neither the last cutoff nor the last recovery information is available.

Virtual hosts can be excluded from promotion with the --exclude-virtual-hosts flag.

To display the promotion summary (if a promotion was attempted):

rabbitmqctl display_standby_promotion_summary

The recovery process stores a summary on disk indicating the last recovered timestamp. This makes recovery idempotent: the same set of messages is not recovered twice.
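The idempotency rule can be sketched as follows. Only messages newer than the stored timestamp are re-published to their original destination queues, and the summary is then advanced. This is a simplified model, not the plugin's implementation. It assumes a single timestamp per virtual host and ignores the --start-from-scratch and --all-available flags. StoredMessage and recover are hypothetical names.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class StoredMessage:
    timestamp_ms: int  # when the message was collected upstream
    queue: str         # original destination queue
    body: bytes


def recover(
    messages: List[StoredMessage], last_recovered_ms: Optional[int]
) -> Tuple[Dict[str, List[bytes]], Optional[int]]:
    """Group not-yet-recovered messages by destination queue.

    Returns the messages to re-publish and the timestamp to store in the
    summary, so that a repeated run skips everything already recovered.
    """
    pending = [
        m for m in messages
        if last_recovered_ms is None or m.timestamp_ms > last_recovered_ms
    ]
    to_publish: Dict[str, List[bytes]] = defaultdict(list)
    for m in pending:
        to_publish[m.queue].append(m.body)
    new_summary = max((m.timestamp_ms for m in pending), default=last_recovered_ms)
    return dict(to_publish), new_summary


stored = [
    StoredMessage(1, "orders", b"a"),
    StoredMessage(2, "audit", b"b"),
    StoredMessage(3, "orders", b"c"),
]
first, summary = recover(stored, last_recovered_ms=None)
second, summary = recover(stored, last_recovered_ms=summary)
print(first)   # {'orders': [b'a', b'c'], 'audit': [b'b']}
print(second)  # {}
```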

After the recovery process completes, the cluster can be used as usual.

Additional Commands

If the cluster size changes, the virtual hosts "owned" by every node might change. To delete the data for the virtual hosts that nodes no longer own, use the following command:

rabbitmqctl delete_orphaned_data_on_standby_replication_downstream_cluster

To inspect the size of the data replicated:

rabbitmqctl display_disk_space_used_by_standby_replication_data

To disconnect the downstream, effectively stopping message replication, run:

rabbitmqctl disconnect_standby_replication_downstream

To (re)connect the downstream, effectively starting/resuming message replication, run:

rabbitmqctl connect_standby_replication_downstream

Post-Promotion

Note that if the promoted cluster is to be restarted, its operating mode must be updated in the configuration file as well; otherwise it will revert to its originally configured mode, downstream.

The plugin does not make any assumptions about what happens to the original cluster that has experienced a disaster event. It can be gone permanently, brought back as a standby for the newly promoted one or be eventually promoted back.

After promotion, the replicated data on the old downstream can be erased from disk with:

rabbitmqctl delete_all_data_on_standby_replication_cluster

If this command is used on an active downstream, it will delete all data transferred up to that point, but it might also stop replication. To ensure replication continues, disconnect the downstream and connect it again using the commands listed above.

To delete the internal streams on the upstream:

rabbitmqctl delete_internal_streams_on_standby_replication_upstream_cluster

Diagnostics

To inspect the number of messages replicated for each virtual host, exchange and routing key:

rabbitmq-diagnostics inspect_local_data_available_for_standby_replication_recovery

This is a very expensive operation because it reads and parses all data on disk, so it should be used with care. It can take a long time to run, even for medium data sizes.
