Simple distributed load-balancing
“AMQP proxies” are a simple way of integrating AMQP with Akka to distribute jobs across a network of computing nodes. You still write “local” code, have very little to configure, and end up with a distributed, elastic, fault-tolerant grid whose computing nodes can be written in nearly any programming language.
The full source code for the demo is available at https://github.com/sstone/akka-amqp-proxies.
Suppose that you are using Akka to implement a Calculator actor:
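A minimal version of such an actor could look like this (the `Add` and `Result` message types are illustrative, not taken from the repository):

```scala
import akka.actor.Actor

// Illustrative request/response messages.
case class Add(x: Int, y: Int)
case class Result(value: Int)

class Calculator extends Actor {
  def compute(x: Int, y: Int): Int = x + y

  def receive = {
    // Reply to the sender with the computed result.
    case Add(x, y) => sender ! Result(compute(x, y))
  }
}
```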
To make the most of your multicore machine you could use a router and several calculator actors:
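For example, with a smallest-mailbox router in front of a pool of calculators (a sketch using the Akka 2.0-era routing API; system and actor names are illustrative):

```scala
import akka.actor.{ActorSystem, Props}
import akka.routing.SmallestMailboxRouter

val system = ActorSystem("calc")

// One Calculator instance per core, behind a single router ActorRef.
val calc = system.actorOf(
  Props[Calculator].withRouter(SmallestMailboxRouter(nrOfInstances = 8)),
  name = "calculator")

calc ! Add(1, 2) // routed to the least busy Calculator instance
```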
Now suppose that you have many computers with many cores and want to reuse the same pattern to distribute computing jobs. That’s what AMQP proxies are about: you write the same compute() method, but your computations are spread across your machines, with resource-based load-balancing, error handling and automated retries, very little configuration, and the ability to add or remove computing nodes at will.
How does it work?
On the server side, we start an RPC server proxy for the Calculator actor which:
- consumes messages from a queue
- deserializes them
- passes them to the Calculator actor
- serializes the response
- sends it back to the caller’s private reply queue
On the client side, we start an actor proxy which, every time it receives a message:
- serializes it
- makes a RPC call
- waits for an answer
- deserializes it
- passes the deserialized answer to the original sender
To use these proxies, your code becomes:
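On the client side this looks roughly as follows (a sketch only; the `ProxyClient` constructor arguments are elided here, so check the repository for the exact signature):

```scala
import akka.actor.Props

// Create a client-side proxy: it serializes each message, performs the
// AMQP RPC call, and forwards the deserialized response to the sender.
val proxy = system.actorOf(Props(
  new AmqpProxy.ProxyClient(/* connection, exchange, routing key, serializer */)))

// Same call site as with a local Calculator actor:
proxy ! Add(1, 2)
```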
Please note that we are still using the same compute() method, but now we pass a proxy instead of the actual Calculator actor!
On the server side, you need to instantiate a proxy for the actual Calculator actor:
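Roughly like this (again a sketch; the `ProxyServer` constructor arguments are elided, see the repository for the exact API):

```scala
import akka.actor.Props

// The real Calculator actor, running locally on the server node.
val calc = system.actorOf(Props[Calculator])

// The server proxy consumes requests from the shared queue, hands them
// to the Calculator, and publishes each response to the caller's
// private reply queue.
val server = system.actorOf(Props(
  new AmqpProxy.ProxyServer(calc /*, connection, queue, serializer */)))
```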
There is almost nothing to configure: all that clients and servers need to know is the broker’s host/IP address.
To create a “distributed computing grid” you will need:
- a serialization language to describe messages
- a “registry/router” for declaring computing units which can all process the same type of computing jobs
- a “router/bookkeeper” for keeping track of who is doing what, so that jobs can be sent to the first available computing node
- a “router/tunnel” for routing results back to the original caller
You might also wish for a few extra features:
- automatic “discovery/registry”: no configuration hassles, no need to declare routes/ip addresses everywhere
- elastic load-balancing: you can add/remove computing nodes at will
- automated error handling: if a computing node fails while processing a job, that job should be sent to another equivalent computing node
There are several options available, but AMQP/RabbitMQ is one of the simplest yet most powerful. It provides all the features we need, efficiently and reliably, and since AMQP is a binary protocol, you can write computing nodes in almost any programming language.
We need a serialization format that is generic enough to be understood by most programming languages. We use JSON here, but there are many other good options (Google Protocol Buffers being, imho, one of the best if you’re dealing mostly with binary data).
Obviously, request/response messages must be serializable with JSON (or Protocol Buffers, or whatever format you choose), which means they should be simple: no closures, no parameterized types, no inheritance (use composition instead), and as few relationships with other message types as possible. In practice, when working on large computing grids, this is not really a limitation but rather a good thing.
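Serialization-friendly messages are typically flat case classes. The hand-rolled `toJson` below is for illustration only; a real project would use a JSON library:

```scala
// Flat case classes: no closures, no type parameters, no inheritance.
case class Add(x: Int, y: Int)
case class Result(value: Int)

// Hand-rolled JSON rendering, for illustration only.
def toJson(m: Add): String = s"""{"x":${m.x},"y":${m.y}}"""
```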
A common issue in distributed systems is: how do nodes tell each other what type of messages they are sending?
The basic options are:
- use self-describing messages. This is arguably the best option, but not all serialization formats will let you implement this gracefully. It also means that you always need to parse messages to identify them.
- use “content type” tags provided by your messaging system to identify message contents. This creates a dependency on your messaging system (though most of them allow custom message properties), but not having to parse messages to tell what they are can be very useful.
We will use the second option: custom message properties (the AMQP ‘contentType’ property in our case) identify message contents.
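With the RabbitMQ Java client, tagging a message looks like this (a sketch assuming a broker on localhost; the exchange, routing key and type names are illustrative):

```scala
import com.rabbitmq.client.{AMQP, ConnectionFactory}

val channel = new ConnectionFactory().newConnection().createChannel()

// Tag the payload with its type so consumers can dispatch on
// 'contentType' without parsing the body first.
val props = new AMQP.BasicProperties.Builder()
  .contentType("calculator.Add") // illustrative type name
  .build()

channel.basicPublish("amqp_proxy", "calculator", props,
  """{"x":1,"y":2}""".getBytes("UTF-8"))
```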
AMQP here refers to AMQP 0.9.1. RabbitMQ implements 0.9.1, but any other broker that implements the 0.9.1 protocol could be used instead.
AMQP routing is quite simple and there are good tutorials online (the official RabbitMQ tutorials, for instance), but in a nutshell:
- Basic entities are exchanges and queues
- You publish messages to exchanges using routing keys
- You consume messages from queues
- You create routes by binding queues to exchanges with routing keys
- Messages are routed to all queues bound to the exchange and routing key they were published to
- Queues live inside the broker (this is a centralized system)
- Queues are the ultimate destination for a message. If you want several consumers to receive the same message you must create one queue for each consumer, and bind them to the right keys.
- When several consumers consume from the same queue, the broker will dispatch messages to them in a round-robin fashion.
- Simplifying a bit: messages transition from ‘ready’ (sitting inside the broker) to ‘unacknowledged’ (sent to a consumer but not yet acknowledged by it) to ‘acknowledged’. A consumer can specify the maximum number of unacknowledged messages it is willing to receive with the ‘prefetch’ property. Setting ‘prefetch’ to 1 is a common way of implementing resource-based load-balancing.
- When a consumer fails (i.e. its connection to the broker is lost), the broker re-routes all of its unacknowledged messages
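The prefetch setting mentioned above is a single call on the channel (a sketch assuming an open `channel` and a `consumer` callback; the queue name is illustrative):

```scala
// With prefetch = 1 the broker sends a consumer at most one
// unacknowledged message at a time: fast nodes ack sooner and thus
// receive more work, giving resource-based load-balancing for free.
channel.basicQos(1)
channel.basicConsume("calculator", false /* manual acks */, consumer)
```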
Load-balancing and RPC are common AMQP patterns, implemented with shared queues, private reply queues and ‘replyTo’ properties (see the RabbitMQ tutorials for details).
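The client side of that RPC pattern, with the raw RabbitMQ Java client, is roughly (a sketch assuming an open `channel` and a `requestBytes` payload; the queue name is illustrative):

```scala
// Private, broker-named reply queue (exclusive, auto-delete).
val replyQueue = channel.queueDeclare().getQueue
val corrId = java.util.UUID.randomUUID.toString

val props = new AMQP.BasicProperties.Builder()
  .replyTo(replyQueue)   // where the worker should send the response
  .correlationId(corrId) // lets the client match responses to requests
  .build()

// Publish to the shared queue via the default exchange.
channel.basicPublish("", "calculator", props, requestBytes)
```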
Performance issues and potential improvements
The actual workflow for one request/response call is:
client —request—> broker —request—> worker —response—> broker —response—> client
So each request/response travels twice through the broker. On a very large grid with several shared queues and dozens of consumers behind each queue, the “cost” of one RPC call is typically less than 50 ms, but that was measured on a high-end setup (blade servers; a broker with 96 GB of RAM and 24 cores). You will want to monitor broker traffic and resources, and there are a few options for improving performance if needed:
- use a more efficient serialization format than JSON
- compress messages (snappy can be very useful here).
- or even use several brokers
The AMQP RPC client and server rely on acknowledgments and redelivery tags to implement a commonly used error-handling pattern:
- when a server throws an exception while processing a message for the first time, the message is rejected; the broker will deliver it again, to an equivalent node, with its “redelivered” flag set to true
- when a server throws an exception while processing a message that has already been delivered before, a custom error is returned to the client and the message is acknowledged.
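Inside the server’s delivery callback, that policy can be sketched as follows (assuming `envelope`, `props`, `replyProps` and `channel` from the RabbitMQ Java client, plus hypothetical `handle` and `errorBody` helpers):

```scala
try {
  val response = handle(body) // delegate to the Calculator actor
  channel.basicPublish("", props.getReplyTo, replyProps, response)
  channel.basicAck(envelope.getDeliveryTag, false)
} catch {
  case e: Exception if !envelope.isRedeliver =>
    // First failure: reject with requeue = true, so the broker
    // redelivers the message (redelivered flag set) to another node.
    channel.basicReject(envelope.getDeliveryTag, true)
  case e: Exception =>
    // Already redelivered once: give up, return an error to the
    // caller and acknowledge the message.
    channel.basicPublish("", props.getReplyTo, replyProps, errorBody(e))
    channel.basicAck(envelope.getDeliveryTag, false)
}
```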
About the Author
Fabrice Drouin is a freelance consultant. He has a background in large scale distributed systems and computing grids, and is currently working for a “Big Data” startup in Paris (MFG Labs), building scalable systems with Scala, Akka, AMQP and lots of other smart technologies.