In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.
But what if we need to run a function on a remote computer and wait for the result? Well, that's a different story. This pattern is commonly known as Remote Procedure Call or RPC.
In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.
To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call, which sends an RPC request and blocks until the answer is received:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);
A note on RPC
Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or a slow RPC. Confusion like that results in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
Bearing that in mind, consider the following advice:
- Make sure it's obvious which function call is local and which is remote.
- Document your system. Make the dependencies between components clear.
- Handle error cases. How should the client react when the RPC server is down for a long time?
When in doubt, avoid RPC. If you can, use an asynchronous pipeline: instead of RPC-like blocking, results are asynchronously pushed to the next computation stage.
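One way to handle the "server is down for a long time" case from the advice above is a client-side timeout. The following is a minimal sketch using only java.util.concurrent (Java 9 or later for orTimeout). It is not part of the tutorial's client; the class RpcTimeoutSketch and the helper awaitReply are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RpcTimeoutSketch {
    // Hypothetical helper: waits for a pending reply, but gives up after timeoutMillis.
    // Returns the reply body, or throws IllegalStateException if none arrived in time.
    static String awaitReply(CompletableFuture<String> pendingReply, long timeoutMillis) {
        try {
            // orTimeout completes the future with a TimeoutException once the deadline passes.
            return pendingReply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                throw new IllegalStateException(
                        "RPC server did not answer within " + timeoutMillis + " ms", e);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        // A reply that has already arrived is returned immediately.
        System.out.println(awaitReply(CompletableFuture.completedFuture("3"), 100));

        // A reply that never arrives surfaces as an explicit error instead of blocking forever.
        try {
            awaitReply(new CompletableFuture<>(), 100);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether to retry, fail, or fall back after the timeout is an application decision that this sketch leaves open.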
In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response we need to send a 'callback' queue address with the request. We can use the default queue (which is exclusive in the Java client). Let's try it:
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
        .Builder()
        .replyTo(callbackQueueName)
        .build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
We need this new import:
import com.rabbitmq.client.AMQP.BasicProperties;
Message properties
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
- deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
- contentType: Used to describe the mime-type of the encoding. For example, for the often used JSON encoding it is good practice to set this property to application/json.
- replyTo: Commonly used to name a callback queue.
- correlationId: Useful to correlate RPC responses with requests.
In the method presented above we suggest creating a callback queue for every RPC request. That's pretty inefficient, but fortunately there is a better way - let's create a single callback queue per client.
That raises a new issue: having received a response in that queue, it's not clear to which request the response belongs. That's when the correlationId property is used. We're going to set it to a unique value for every request. Later, when we receive a message in the callback queue we'll look at this property, and based on that we'll be able to match a response with a request. If we see an unknown correlationId value, we may safely discard the message - it doesn't belong to our requests.
You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It's due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.
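The matching rule described above can be sketched without a broker. This is a minimal illustration, not the tutorial's client: PendingReplies, register and onReply are hypothetical names, and the map stands in for whatever bookkeeping a real client keeps per outstanding request.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class PendingReplies {
    // correlationId -> future that will hold the matching response body
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before publishing a request; the returned future completes when the reply arrives.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    // Call for every message seen on the callback queue.
    // Returns true if the message matched an outstanding request, false if it was discarded.
    boolean onReply(String correlationId, String body) {
        if (correlationId == null) {
            return false; // no correlationId at all: not one of ours
        }
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false; // unknown id or duplicate response: discard quietly
        }
        reply.complete(body);
        return true;
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> reply = replies.register(id);

        System.out.println(replies.onReply("some-other-id", "99")); // false
        System.out.println(replies.onReply(id, "3"));               // true
        System.out.println(replies.onReply(id, "3"));               // false (duplicate)
        System.out.println(reply.join());                           // 3
    }
}
```

Removing the entry on the first match is what makes a duplicate response, such as one produced by a restarted server, harmless.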
Our RPC will work like this:
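- The client sends a request message with two properties: replyTo, which is set to an anonymous exclusive queue created just for the request, and correlationId, which is set to a unique value for every request.
- The request is sent to the rpc_queue queue.
- The server waits for requests on that queue. When a request appears, it does the job and sends a message with the result back to the client, using the queue from the replyTo field.
- The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application.
The Fibonacci task: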
private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}
We declare our fibonacci function. It assumes only valid non-negative integer input; a negative argument never reaches a base case. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible.)
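The recursive version takes exponential time, and its int result overflows from n = 47 onwards. If that matters for your experiments, an iterative variant is one alternative. The sketch below is not the tutorial's code: FastFib is a hypothetical class name, and the use of BigInteger and the input check are assumptions about what a more robust service might want.

```java
import java.math.BigInteger;

class FastFib {
    // Iterative Fibonacci: runs in time linear in n, and BigInteger avoids int overflow.
    static BigInteger fib(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative, got " + n);
        }
        BigInteger a = BigInteger.ZERO; // fib(0)
        BigInteger b = BigInteger.ONE;  // fib(1)
        for (int i = 0; i < n; i++) {
            BigInteger next = a.add(b);
            a = b;
            b = next;
        }
        return a; // after n steps, a holds fib(n)
    }

    public static void main(String[] args) {
        System.out.println("fib(30) is " + fib(30));   // fib(30) is 832040
        System.out.println("fib(100) is " + fib(100)); // fib(100) is 354224848179261915075
    }
}
```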
The code for our RPC server can be found here: RPCServer.java.
The server code is rather straightforward:
- We set the prefetchCount setting in channel.basicQos, which limits how many unacknowledged messages a server holds at once.
- We use basicConsume to access the queue, where we provide a callback in the form of an object (DeliverCallback) that will do the work and send the response back.
The code for our RPC client can be found here: RPCClient.java.
The client code is slightly more involved:
- The call method makes the actual RPC request.
- We first generate a unique correlationId number and save it - our consumer callback will use this value to match the appropriate response.
- We publish the request message with two properties: replyTo and correlationId.
- We need something to suspend the main thread before the response arrives. Usage of CompletableFuture is one possible solution to do so.
- For every response message, the consumer checks whether the correlationId is the one we're looking for. If so, it completes the CompletableFuture.
- At the same time, the main thread is waiting for the CompletableFuture to complete.
Now is a good time to take a look at our full example source code (which includes basic exception handling) for RPCClient.java and RPCServer.java.
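The request and reply flow can also be tried without a broker. The sketch below replaces RabbitMQ with named in-memory queues, so it only illustrates the roles of replyTo, correlationId and CompletableFuture; it is not RPCClient.java or RPCServer.java. The names InMemoryRpcSketch, Message, queue, serveOne and startServer are all hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class InMemoryRpcSketch {
    // A message carrying the two properties the flow relies on.
    static final class Message {
        final String replyTo, correlationId, body;
        Message(String replyTo, String correlationId, String body) {
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Stand-in for the broker: queues looked up by name (an assumption of this sketch).
    static final Map<String, BlockingQueue<Message>> queues = new ConcurrentHashMap<>();

    static BlockingQueue<Message> queue(String name) {
        return queues.computeIfAbsent(name, k -> new LinkedBlockingQueue<>());
    }

    static int fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // Server side: take one request from rpc_queue and reply to the queue named in replyTo.
    static void serveOne() throws InterruptedException {
        Message request = queue("rpc_queue").take();
        String result = Integer.toString(fib(Integer.parseInt(request.body)));
        queue(request.replyTo).put(new Message(null, request.correlationId, result));
    }

    static Thread startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) serveOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
        return server;
    }

    // Client side: publish a request, then block until the matching reply arrives.
    static String call(String n) {
        String replyQueue = "reply-" + UUID.randomUUID();
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> response = new CompletableFuture<>();

        // Consumer thread: completes the future only for the matching correlationId.
        Thread consumer = new Thread(() -> {
            try {
                while (!response.isDone()) {
                    Message reply = queue(replyQueue).take();
                    if (correlationId.equals(reply.correlationId)) {
                        response.complete(reply.body);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        queue("rpc_queue").add(new Message(replyQueue, correlationId, n));
        String result = response.join(); // the calling thread waits here
        queues.remove(replyQueue);       // drop the per-call reply queue
        return result;
    }

    public static void main(String[] args) {
        startServer();
        System.out.println("fib(10) is " + call("10")); // fib(10) is 55
    }
}
```

Starting a second server thread with startServer() shows the same scaling idea as running a second server process: both take requests from the shared rpc_queue.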
Compile and set up the classpath as usual (see tutorial one):
javac -cp $CP RPCClient.java RPCServer.java
Our RPC service is now ready. We can start the server:
java -cp $CP RPCServer
# => [x] Awaiting RPC requests
To request a fibonacci number run the client:
java -cp $CP RPCClient
# => [x] Requesting fib(30)
The design presented here is not the only possible implementation of an RPC service, but it has some important advantages:
- If the RPC server is too slow, you can scale up by running another one. Try running a second RPCServer in a new console.
- On the client side, no synchronous calls like queueDeclare are required. As a result the RPC client needs only one network round trip for a single RPC request.
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
If you want to experiment, you may find the management UI useful for viewing the queues.