This guide focuses on statically configured shovels. It assumes familiarity with the key concepts behind the Shovel plugin.
Unlike with dynamic shovels, static shovels are configured using the advanced configuration file. They are started on node boot and are primarily useful for permanently running workloads. Any changes to static shovel configuration would require a node restart, which makes them highly inflexible.
Most users should prefer dynamic shovels to static ones for their flexibility and ease of automation. Generating a dynamic shovel definition (a JSON document) is usually easier compared to a static shovel definition (which uses Erlang terms).
The configuration for the Shovel plugin must be defined in the advanced configuration file.
It consists of a single shovels
clause that lists the shovels that should be started on node boot:
{rabbit, [ %% ... ]}, {rabbitmq_shovel, [ {shovels, [ {shovel_one, [ %% shovel_one properties ... ]}, %% ... ]} ]}
A (deliberately verbose) example configuration can be found below.
Each element of the shovels
clause is a named static shovel. The names in the list must be distinct.
A shovel definition looks like this at the top level:
{shovel_name, [ {source, [ ...protocol specific config... ]}, {destination, [ ...protocol specific config... ]}, {ack_mode, a_mode}, {reconnect_delay, reconn_delay} ]}
where shovel_name
is the name of the shovel (an Erlang atom). The name should be enclosed in single quotes ('
) if they do not begin with a lower-case letter or if they contain other characters than alphanumeric characters, underscore (_
), or @
.
A shovel transfers messages from a source to a destination.
The source
and destination
keys are mandatory and contain nested protocol-specific keys. Currently AMQP 0.9.1 and AMQP 1.0 are two supported protocols. Source and destination do not have to use the same protocol. All the other properties are optional.
source
is a mandatory key and has different keys properties for different protocols. Two properties are common across all protocols: protocol
and uris
. protocol
supports two values: amqp091
and amqp10
, for AMQP 0-9-1 and AMQP 1.0, respectively:
%% for AMQP 0-9-1 {protocol, amqp091}
uris
is a list of AMQP connection URIs:
{uris, [ "amqp://fred:secret@host1.domain/my_vhost", "amqp://john:secret@host2.domain/my_vhost" ]}
The URI syntax is extended to include a query part to permit the configuration of additional connection parameters. See the query parameter reference which are available to static shovels, such as TLS certificate and private key.
Some keys are supported by both AMQP 0-9-1 and AMQP 1.0 sources. They are described in the table below.
Key | Description |
reconnect-delay | The duration (in seconds) to wait before reconnecting to the brokers after being disconnected at either end. Default is 1. {reconnect_delay, 5} |
AMQP 0-9-1-specific source keys are covered in a separate table:
<tr>
<td>queue</td>
<td>
<p>
The name of the source queue as an Erlang binary value. This property is mandatory:
{queue, <<"queue.1">>}
</p>
<p>
<code>queue.1</code> is the name of the queue
to shovel messages from, as a binary string.
</p>
<p>
This queue must exist. Use the resource <code>declarations</code>
covered above to declare the queue or ensure it exists. If
the value is <code><<>></code> (the empty binary string) then the
<em>most recently declared queue</em> in <code>declarations</code> is used.
This allows anonymous queues to be declared and used.
</p>
</td>
</tr>
<tr>
<td>prefetch-count</td>
<td>
The maximum number of unacknowledged messages copied over a shovel at
any one time. Default is <code>1000</code>:
{prefetch_count, 1000}
</td>
</tr>
Key | Description |
declarations | An optional list of AMQP 0-9-1 operations to be executed by the Shovel before it starts transferring messages. They are typically used to set up the topology. {declarations, declaration_list} The declarations follow method and property names used by the RabbitMQ Erlang Client. A minimalistic declaration example: {declarations, [ 'queue.declare', {'queue.bind', [ {exchange, <<"my_exchange">>}, {queue, <<>>} ]} ]} will first declare an anonymous queue, and then bind it to the exchange called Each element of the declaration list is either an AMQP 0-9-1 method given as single quoted atom such as If just the method name is used all the parameters take their defaults (as illustrated with If a tuple and property-list is supplied, then the properties in the list specify some or all of the parameters explicitly. Here is another example: {'exchange.declare', [ {exchange, <<"my_exchange">>}, {type, <<"direct">>}, durable ]} will declare a durable, direct exchange called " |
AMQP 1.0 source settings are different from those of AMQP 0-9-1 sources.
Key | Description |
source_address | This represents the source address of the AMQP 1.0 link. This key is mandatory: {source_address, <<"my-address">>} {prefetch_count, 10} |
destination
is a mandatory key and has different keys properties for different protocols. Two properties are common across all protocols: protocol
and uris
. protocol
supports two values: amqp091
and amqp10
, for AMQP 0-9-1 and AMQP 1.0, respectively:
%% for AMQP 0-9-1 {protocol, amqp091}
uris
is a list of AMQP connection URIs:
{uris, [ "amqp://fred:secret@host1.domain/my_vhost", "amqp://john:secret@host2.domain/my_vhost" ]}
The URI syntax is extended to include a query part to permit the configuration of additional connection parameters. See the query parameter reference which are available to static shovels, such as TLS certificate and private key.
Key | Description |
reconnect-delay | The duration (in seconds) to wait before reconnecting to the brokers after being disconnected at either end. Default is 1. {reconnect_delay, 5} |
Key | Description |
publish_properties | This optional key controls message properties set or overridden by the shovel. It takes the following form {publish_properties, [ {delivery_mode, 2} ]} where the properties in the list are set on the basic properties of each message before it is re-published. This specific example would mark all re-published messages as persistent: {publish_properties, [ {delivery_mode, 2} ]} By default the original properties of the message are preserved, but this clause can be used to change or set any known property: {publish_fields, [ {exchange, <<"my_exchange">>}, {routing_key, <<"from_shovel">>} ]} {publish_fields, [ {exchange, <<"my_exchange">>}, {routing_key, <<"from_shovel">>} ]} {add_timestamp_header, true} {add_forward_headers, true} |
Key | Description |
target_address | This represents the target address of the sending AMQP 1.0 link: {target_address, <<"some-address">>} {properties, [ {content_typle, <<"application/json">>} ]} {application_properties, [ {<<"application-key-1">>, <<"value-1">>}, {<<"application-key-2">>, <<"value-2">>} ]} {add_timestamp_header, true} {add_forward_headers, true} |
A reasonably complete static shovel configuration between AMQP 0.9.1 endpoints might look like this:
{rabbitmq_shovel, [ {shovels, [ {my_first_shovel, [ {source, [ {protocol, amqp091}, {uris, [ "amqp://fred:secret@host1.domain/my_vhost", "amqp://john:secret@host2.domain/my_vhost" ]}, {declarations, [ {'exchange.declare', [ {exchange, <<"my_fanout">>}, {type, <<"fanout">>}, durable ]}, {'queue.declare', [{arguments, [{<<"x-message-ttl">>, long, 60000}]}]}, {'queue.bind', [ {exchange, <<"my_fanout">>}, {queue, <<>>} ]} ]}, {queue, <<>>}, {prefetch_count, 10} ]}, {destination, [ {protocol, amqp091}, {uris, ["amqp://"]}, {declarations, [ {'exchange.declare', [ {exchange, <<"my_direct">>}, {type, <<"direct">>}, durable ]} ]}, {publish_properties, [ {delivery_mode, 2} ]}, {add_forward_headers, true}, {publish_fields, [ {exchange, <<"my_direct">>}, {routing_key, <<"from_shovel">>} ]} ]}, {ack_mode, on_confirm}, {reconnect_delay, 5} ]} ]} ]}
The configuration above defines a single shovel called 'my_first_shovel'
.
'my_first_shovel'
will connect to a broker on either host1
or host2
(as source), and directly to the local broker as destination. It will reconnect to the other source broker on failure, after a delay of 5 seconds.
When connected to the source it will declare a direct, fanout exchange called "my_fanout"
, an anonymous queue with a per-queue message ttl, and bind the queue to the exchange.
When connected to the destination (the local broker) it will declare a durable, direct exchange called "my_direct"
.
This shovel will re-publish messages sent to the anonymous queue on the source to the local exchange with the fixed routing key "from_shovel"
. The messages will be persistent and only acknowledged after receiving a publish confirm from the local broker.
The shovel consumer will not get more deliveries if there are at least ten unacknowledged messages at any moment in time.
A reasonably complete shovel configuration between an AMQP 1.0 source and an AMQP 0.9.1 destination might look like this:
{rabbitmq_shovel, [ {shovels, [ {my_first_shovel, [ {source, [ {protocol, amqp10, uris, [ "amqp://fred:secret@host1.domain/my_vhost", ]}, {source_address, <<"my-source">>}, {prefetch_count, 10} ]}, {destination, [ {protocol, amqp091}, {uris, ["amqp://"]}, {declarations, [ {'exchange.declare', [ {exchange, <<"my_direct">>}, {type, <<"direct">>}, durable ]} ]}, {publish_properties, [ {delivery_mode, 2} ]}, {add_forward_headers, true}, {publish_fields, [ {exchange, <<"my_direct">>}, {routing_key, <<"from_shovel">>} ]} ]}, {ack_mode, on_confirm}, {reconnect_delay, 5} ]} ]} ]}
A more extensive shovel configuration between an AMQP 0.9.1 Source and an AMQP 1.0 destination might look like this:
{rabbitmq_shovel, [{shovels, [{my_first_shovel, {source, [{protocol, amqp091}, {uris, ["amqp://fred:secret@host1.domain/my_vhost", "amqp://john:secret@host2.domain/my_vhost"]}, {declarations, [{'exchange.declare', [{exchange, <<"my_fanout">>}, {type, <<"fanout">>}, durable]}, {'queue.declare', [{arguments, [{<<"x-message-ttl">>, long, 60000}]}]}, {'queue.bind', [{exchange, <<"my_fanout">>}, {queue, <<>>} ]} ]}, {queue, <<>>}, {prefetch_count, 10} ]}, {destination, [{protocol, amqp10}, %% Note: for plain text SASL authentication, use % {uris, ["amqp://user:pass@host:5672?sasl=plain"]}, %% Note: this relies on default user credentials %% which has remote access restrictions, see %% https://www.rabbitmq.com/access-control.html to learn more {uris, ["amqp://host:5672"]}, {properties, [{user_id, <<"my-user">>}]}, {application_properties, [{<<"my-prop">>, <<"my-prop-value">>}]}, {add_forward_headers, true}, {target_address, <<"destination-queue">>} ]}, {ack_mode, on_confirm}, {reconnect_delay, 5} }]} ]} }