Monday, September 27, 2010

Repeater and Idempotent Receiver implementation in Akka

Update to this post: I have upgraded the examples to Akka 1.0-RC1. Check out the updated code on GitHub here.

After a couple of examples to get started with Akka, I thought it would be nice to show a more involved example.
In this post I am going to show how you can build your own simple reliable messaging (Guaranteed Delivery) on top of Akka Remote Actors. It is an implementation of an Idempotent Receiver and a Repeater. I'm currently using the master branch of Akka (1.0-SNAPSHOT). I have put this example up on GitHub here, along with my other Akka examples from previous posts, so you can skip the lengthy explanation below and go straight to the source: check out the eip.idempotent package :)
Rather than pygmentizing all my code into this post, I'll just link to GitHub.

Update: You might ask yourself why I am implementing this so high in the stack (as some very smart people did :). At the moment I need this to work on an unmodified version of Akka, and I needed it quickly. Building it in at a lower level, for instance TCP, would probably have required modifying the Akka source itself and the way it builds on top of JBoss Netty. I'm leaving that for another day, stay tuned :)

I also thought it would be nice to show how to use Akka's LifeCycleEvent features, how Akka inherits all the Netty goodness, and how it is a bit easier to work with Actors than with hand-rolled threading.

Anyway, back to the example.

What I wanted to achieve with the code:
  • Recovery from error conditions on the client or the server side.
  • Guaranteed delivery of messages, even if connection loss occurs (reconnects, errors, shutdown/startup).
  • A simple lightweight protocol. 
  • It should work with Google Protobuf serialization, since I need a very compact wire protocol. (It shouldn't be too hard to add others.)
  • It should be a drop-in solution and stay out of my way. I should just be able to bang (!) on an ActorRef on the client side, and receive on an Actor on the server side.
  • I don't need the order of messages maintained.
  • It only has to work for one-way messaging (for now), not for synchronous request-response. (It shouldn't be too hard to add.)
  • It should work on top of Akka's Remote Actors. 
  • It should work in a network load-balanced infrastructure.
  • The goal is to improve the reliability of a very lean protocol, not to implement a full-featured message queuing protocol or to compete with existing ones (there are plenty of options available already). Keepin' it simple.
  • Show that Akka takes away a lot of the complexities in building this type of stuff.
Idempotent Receiver
An Idempotent Receiver is an Enterprise Integration Pattern that is used to safely receive duplicate messages. The main reason to use this pattern is to solve common problems in communicating over a network:

  • Network components between servers might duplicate messages
  • Connection errors might only be known on one side of the communication link

For instance, a client sends a message, receives no error on the connection and assumes the message has been received by the server. Some network component between the client and the server routes the message to the server, where a connection error occurs. The client is unaware of this and the server never receives the message.
Even when the client uses request-response to communicate with the server, it cannot be sure whether the request has been handled if it does not receive a reply. The server might have handled the request and failed to send back a response. Resending the message when no reply arrives could result in the operation being applied twice.

The Idempotent Receiver pattern allows the client to repeat messages, because the receiver can identify duplicates. I've implemented the Idempotent Receiver as a Remote Actor. It passes messages on to the actual Actor that is supposed to receive them, unless a message is a duplicate, of course. The Idempotent Receiver checks every incoming message against an Envelopes implementation. I've only included some in-memory implementations; it is quite easy to add a persistent one, for instance using the Akka Persistence module.
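To make the duplicate check concrete, here is a minimal Scala sketch of an in-memory Envelopes store, assuming an envelope is identified by its (Frame ID, Envelope ID) pair. The names EnvelopeKey, markReceived and IdempotentReceiver below are hypothetical illustrations rather than the classes in the repository, and the sketch leaves actors out entirely.

```scala
import scala.collection.mutable

// Hypothetical identity of an envelope: its frame plus its ID within that frame.
final case class EnvelopeKey(frameId: Long, envelopeId: Long)

// Hypothetical storage interface, loosely modelled on the post's Envelopes idea.
trait Envelopes {
  /** Records the key; returns true only the first time it is seen. */
  def markReceived(key: EnvelopeKey): Boolean
}

final class InMemoryEnvelopes extends Envelopes {
  private val seen = mutable.Set.empty[EnvelopeKey]
  // mutable.Set#add returns false if the element was already present.
  def markReceived(key: EnvelopeKey): Boolean = synchronized { seen.add(key) }
}

/** Passes a payload on to the target handler unless it is a duplicate. */
final class IdempotentReceiver[A](envelopes: Envelopes, deliver: A => Unit) {
  def receive(key: EnvelopeKey, payload: A): Boolean =
    if (envelopes.markReceived(key)) { deliver(payload); true }
    else false // duplicate: silently dropped
}
```

A persistent variant would implement the same hypothetical trait on top of durable storage; the receiver logic would stay unchanged.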

On the client side I've implemented an Actor, the Repeater, that communicates with the Idempotent Receiver on the server side. The Repeater keeps track of all messages that are sent to the server in a RepeatBuffer and keeps a copy of each message, so that it can resend it to the server if needed. The user of the Repeater can just get an ActorRef to it and send messages as you normally would. The Repeater wraps every message in an Envelope and sends this Envelope to the Idempotent Receiver using the Remote Actor Protocol.
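The RepeatBuffer idea can be sketched like this, again in plain Scala without Akka. The generic Envelope case class, the sequential Envelope IDs starting at 1, and the wrap/toRepeat/complete methods are assumptions made for illustration; the real Envelope is a protobuf message.

```scala
import scala.collection.mutable

// Hypothetical envelope shape; the post's real Envelope is a protobuf message.
final case class Envelope[A](frameId: Long, envelopeId: Long, payload: A)

/** Keeps a copy of every envelope sent for one frame so it can be resent. */
final class RepeatBuffer[A](val frameId: Long) {
  private var nextId = 0L
  // LinkedHashMap keeps insertion order, so repeats go out in send order.
  private val pending = mutable.LinkedHashMap.empty[Long, Envelope[A]]

  /** Wraps a message in an envelope, buffers a copy, and returns it for sending. */
  def wrap(payload: A): Envelope[A] = {
    nextId += 1
    val env = Envelope(frameId, nextId, payload)
    pending(env.envelopeId) = env
    env
  }

  /** Envelopes that would be resent after a connection problem. */
  def toRepeat: List[Envelope[A]] = pending.values.toList

  /** Drops all copies once the receiver reports the frame as complete. */
  def complete(): Unit = pending.clear()
}
```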

I've cheated a bit (in Dutch we say "Beter goed gestolen dan slecht bedacht", roughly "better well stolen than badly invented" :) and looked at how the Remote Actor Protocol works, borrowed a bit from TCP and other messaging standards, and came up with the following.
Every message is sent inside an Envelope. An Envelope consists of a Frame ID, an Envelope ID, and a Payload part that can contain any message. I've created a protobuf message definition for this here.
Because the Repeater and Idempotent Receiver keep messages in memory or on disk, you need a way to tell which messages can be removed. That is where the Frame comes in. Envelopes are always part of a Frame, which is like a 'sticky session' for a number of Envelopes. The Frame consists of a source address, a destination address, an ID and a size (the number of Envelopes).
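As a rough illustration of this structure, the two messages could be mirrored in Scala as below. The field names, the string addresses, and the assumption that Envelope IDs run from 1 to the Frame size are hypothetical; the actual definitions live in the protobuf file mentioned above.

```scala
// Hypothetical Scala mirror of the Frame message described in the post.
final case class Frame(source: String, destination: String, id: Long, size: Int) {
  require(size > 0, "a frame must hold at least one envelope")

  /** Assumes Envelope IDs are numbered 1..size within a frame. */
  def contains(env: Envelope): Boolean =
    env.frameId == id && env.envelopeId >= 1 && env.envelopeId <= size
}

// Hypothetical Scala mirror of the Envelope message. Note that Arrays compare
// by reference, so this case class's equals is not content-based for payload.
final case class Envelope(frameId: Long, envelopeId: Long, payload: Array[Byte])
```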

The Repeater first requests a Frame from the Idempotent Receiver and then starts sending messages for that Frame. The Idempotent Receiver checks if it has received every (unique) message for a Frame based on the Frame size. When a Frame is complete, the Idempotent Receiver sends a CompleteFrameRequest out of band to the Repeater and removes the envelopes from storage. The Repeater receives the request and also removes the envelopes from storage.
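Here is a minimal sketch of the completion check on the receiving side, assuming a Frame is opened before its Envelopes arrive and that Envelope IDs are well-formed and unique within a Frame. FrameTracker, openFrame, record and the sendToRepeater callback are hypothetical names; in the post's design the CompleteFrameRequest travels out of band to the Repeater.

```scala
import scala.collection.mutable

// Hypothetical out-of-band message telling the Repeater a frame is complete.
final case class CompleteFrameRequest(frameId: Long)

/** Receiver-side sketch: counts unique envelope IDs per frame and reports
  * completion once the count reaches the frame size. */
final class FrameTracker(sendToRepeater: CompleteFrameRequest => Unit) {
  private val sizes = mutable.Map.empty[Long, Int]
  private val received = mutable.Map.empty[Long, mutable.Set[Long]]

  /** Called when the Repeater has requested a frame of `size` envelopes. */
  def openFrame(frameId: Long, size: Int): Unit = {
    sizes(frameId) = size
    received(frameId) = mutable.Set.empty[Long]
  }

  /** Returns true if the envelope is new and should be delivered. */
  def record(frameId: Long, envelopeId: Long): Boolean =
    received.get(frameId) match {
      case None => false // unknown frame, or one that already completed
      case Some(ids) =>
        val isNew = ids.add(envelopeId)
        if (isNew && ids.size == sizes(frameId)) {
          // All envelopes are in: notify the Repeater and drop stored state.
          sendToRepeater(CompleteFrameRequest(frameId))
          sizes -= frameId
          received -= frameId
        }
        isNew
    }
}
```

Treating late Envelopes for an already completed Frame as duplicates is a simplification, consistent with both sides removing a completed Frame's Envelopes from storage.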

As of Akka 0.10 you can register Lifecycle event listeners on both the RemoteClient and the RemoteServer (by the way, check out the awesome Akka User List; the guys there are extremely helpful).

I use the Lifecycle event listeners to be notified of connection errors and successful reconnects. When the connection has been restored after a disconnect or error, either the client side repeats the Frame, or the server side sends the Repeater a repeat request for the Frame that selectively acknowledges the Envelope IDs it has already received.
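The reconnect handling can be sketched as a small state machine on the Repeater side. ConnectionError and Reconnected are hypothetical stand-ins for Akka's RemoteClient/RemoteServer lifecycle events (the real event classes are not reproduced here), and RepeatRequest models the selective acknowledgement as the set of Envelope IDs the receiver already holds.

```scala
// Hypothetical stand-ins for the RemoteClient/RemoteServer lifecycle events.
sealed trait ConnectionEvent
case object ConnectionError extends ConnectionEvent
case object Reconnected extends ConnectionEvent

// Selective acknowledgement: the Envelope IDs the receiver already holds.
final case class RepeatRequest(frameId: Long, acked: Set[Long])

/** Repeater-side sketch: after an error, repeat the whole frame on reconnect;
  * on a RepeatRequest, resend only the envelopes that were not acknowledged. */
final class RepeaterLink(frameId: Long,
                         buffered: () => Seq[Long],
                         resend: Long => Unit) {
  private var needsRepeat = false

  def onEvent(event: ConnectionEvent): Unit = event match {
    case ConnectionError => needsRepeat = true
    case Reconnected if needsRepeat =>
      needsRepeat = false
      buffered().foreach(resend) // repeat every buffered envelope of the frame
    case Reconnected => () // clean (re)connect, nothing to repeat
  }

  def onRepeatRequest(req: RepeatRequest): Unit =
    if (req.frameId == frameId)
      buffered().filterNot(id => req.acked.contains(id)).foreach(resend)
}
```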

Since the Envelope message is just another protobuf-encoded message, Akka handles it as usual in the RemoteClient and RemoteServer using the Akka Remote Protocol. The Idempotent Receiver receives the Envelope message and deserializes both the Envelope itself and its payload, which becomes the protobuf message that the target Actor is expecting. Right now this only works for protobuf messages because that's all I use, but the EnvelopeSerializer can easily be changed to handle all types of messages. (If that code looks familiar, it's because it's basically a copy of the MessageSerializer class in Akka :)
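The EnvelopeSerializer in the repository is based on Akka's MessageSerializer, and the sketch below is not that code. It shows, under stated assumptions, one way to make payload (de)serialization pluggable: each message type registers a hypothetical Codec keyed by class name, and a toy UTF-8 String codec stands in for a protobuf one so the example runs without protobuf on the classpath.

```scala
import java.nio.charset.StandardCharsets

/** Hypothetical codec for one message type; for protobuf this would wrap the
  * generated toByteArray and parseFrom methods. */
trait Codec {
  def typeName: String
  def encode(msg: Any): Array[Byte]
  def decode(bytes: Array[Byte]): Any
}

// Hypothetical wire form of a payload: a type tag plus the encoded bytes.
final case class Payload(typeName: String, bytes: Array[Byte])

final class PayloadSerializer(codecs: Seq[Codec]) {
  private val byName: Map[String, Codec] = codecs.map(c => c.typeName -> c).toMap

  private def codecFor(name: String): Codec =
    byName.getOrElse(name, throw new IllegalArgumentException(s"no codec registered for $name"))

  /** Tags the payload with the message's runtime class name. */
  def serialize(msg: AnyRef): Payload = {
    val name = msg.getClass.getName
    Payload(name, codecFor(name).encode(msg))
  }

  /** Picks the decoder from the type tag, like choosing the expected message class. */
  def deserialize(p: Payload): Any = codecFor(p.typeName).decode(p.bytes)
}

/** Toy codec so the sketch runs without protobuf on the classpath. */
object Utf8StringCodec extends Codec {
  val typeName: String = classOf[String].getName
  def encode(msg: Any): Array[Byte] = msg.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)
  def decode(bytes: Array[Byte]): Any = new String(bytes, StandardCharsets.UTF_8)
}
```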

Scale out
A problem with the Idempotent Receiver is that it is not easy to scale out. If you have RemoteActors distributed over many servers behind a network load balancer, you can't just put an Idempotent Receiver on each server, since they would not know about each other. Sharing state across distributed nodes is not really what you want; it gets complicated very quickly. Instead of distributing the Envelope and Frame state over several servers, I have implemented a JGroups-based Envelopes implementation where every node in the cluster 'owns' the Frames that it has created. If a node receives a message for a Frame that it doesn't own, it sends the message into the cluster so that the Idempotent Receiver that owns the Frame can handle it. You can specify a range for the Frame IDs in the Envelopes so that different servers never create the same Frame.
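The ownership rule can be sketched without JGroups. In this hypothetical version each node is configured with a disjoint FrameIdRange, creates Frames only from that range, handles Envelopes for Frames in its range, and otherwise hands them to a sendToCluster callback that stands in for the JGroups channel. Every node checks ownership on forwarded messages, so only the owner acts on them.

```scala
// Hypothetical disjoint range of frame IDs configured per node.
final case class FrameIdRange(start: Long, end: Long) {
  require(start <= end, "empty frame id range")
  def contains(id: Long): Boolean = id >= start && id <= end
}

/** Sketch of a node that owns the frames it creates. */
final class FrameOwner[A](range: FrameIdRange,
                          handle: (Long, A) => Unit,
                          sendToCluster: (Long, A) => Unit) {
  private var next = range.start

  /** New frames always get an ID from this node's own range. */
  def newFrameId(): Long = synchronized {
    require(next <= range.end, "frame id range exhausted")
    val id = next
    next += 1
    id
  }

  /** Envelope arrived from a client (for example via the load balancer). */
  def onEnvelope(frameId: Long, payload: A): Unit =
    if (range.contains(frameId)) handle(frameId, payload)
    else sendToCluster(frameId, payload)

  /** Envelope forwarded by another node: only the owner handles it. */
  def onClusterMessage(frameId: Long, payload: A): Unit =
    if (range.contains(frameId)) handle(frameId, payload)
}
```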

You can check out the unit tests here. To make sure that everything works, I have written a very simple and crude Network Proxy (blocking I/O) for testing, which the unit tests place between the Repeater and the Idempotent Receiver. The Network Proxy can be started and stopped, and you can cause connection errors by registering a function in the loop that reads from the client and passes the data to the RemoteServer, or in the loop that reads responses from the RemoteServer and passes them back to the client.
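The fault-injection hook of such a proxy can be sketched as one direction of a blocking relay loop. RelayLoop and onChunk are hypothetical names: the registered function sees each chunk before it is forwarded and can throw an IOException to simulate a broken connection. A full proxy would run two of these loops on separate threads, one per direction, over the client and server sockets.

```scala
import java.io.{InputStream, OutputStream}

/** One direction of a blocking-I/O test proxy: copies bytes from `in` to `out`,
  * calling a registered hook on every chunk. The hook can throw (for example an
  * IOException) to simulate a connection error mid-stream. */
final class RelayLoop(in: InputStream, out: OutputStream) {
  @volatile var onChunk: Array[Byte] => Unit = _ => ()

  /** Runs until end of stream; returns the number of bytes relayed. */
  def run(): Long = {
    val buf = new Array[Byte](4096)
    var total = 0L
    var n = in.read(buf)
    while (n != -1) {
      val chunk = buf.take(n)
      onChunk(chunk) // may throw to break the "connection"
      out.write(chunk)
      out.flush()
      total += n
      n = in.read(buf)
    }
    total
  }
}
```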

In the repository you can also find this unit test that tests if all LifeCycle events (both RemoteClient and RemoteServer) are triggered by the Akka framework, using this simple proxy.

I've written this unit test here to test the Repeater and Idempotent Receiver. The JGroups version is tested in this unit test here. Let me know what you think!