Monday, September 27, 2010

Repeater and Idempotent Receiver implementation in Akka

Update to this post: I have upgraded the examples to Akka 1.0-RC1. Check the updated code at github here.


After a couple of examples to get started with Akka, I thought it would be nice to show a more involved example.
In this post I am going to show how you can build your own simple reliable messaging (Guaranteed Delivery) on top of Akka Remote Actors. It is an implementation of an Idempotent Receiver and a Repeater. I'm currently using the master branch of Akka (1.0-SNAPSHOT). I have put this example up on github here along with my other Akka examples from previous posts, so you can skip the lengthy explanation below completely and go straight to the source and check out the eip.idempotent package :)
Instead of pygmentizing all my code into this post, I'll just link to github instead.

Update: You might ask yourself why I am implementing this so high in the stack (which some very smart people did :) At the moment I need this to work on an unmodified version of Akka and I needed it quick. Building it in for instance TCP would have probably made it necessary to modify the Akka source itself and how it builds on top of JBoss Netty. I'm leaving that for another day, stay tuned :)

I also thought it would be nice to show the use of the LifeCycle event features of Akka, how it inherits all the Netty goodness, and how much easier it is to work with Actors than with hand-rolled threading.

Anyway, back to the example.

What I wanted to achieve with the code:
  • Recovery from error conditions on the client or the server side.
  • Guaranteed delivery of messages, even if connection loss occurs (reconnects, errors, shutdown/startup).
  • A simple lightweight protocol. 
  • It should work with Google Protobuf serialization, since I need a very compact wire protocol. (It shouldn't be too hard to add others)
  • It should be a drop-in solution and stay out of my way. I should just be able to bang (!) on an ActorRef on the client side, and receive on an Actor on the server side.
  • I don't need the order of messages maintained.
  • It only has to work for one way messaging (for now), not for synchronous request-response. (It shouldn't be too hard to add).
  • It should work on top of Akka's Remote Actors. 
  • It should work in a network balanced infrastructure.
  • The goal is to improve the reliability of a very lean protocol, not to implement a full-featured message queuing protocol or to compete with existing protocols (because there are plenty of options already available). Keeping it simple.
  • Show that Akka takes away a lot of the complexities in building this type of stuff.
Idempotent Receiver
An Idempotent Receiver is an Enterprise Integration Pattern that is used to safely receive duplicate messages. This pattern solves common problems in communicating over a network:

  • Network components between servers might duplicate messages
  • Connection errors might only be known on one side of the communication link

For instance, a client sends a message, receives no error on the connection and assumes the message is received by the server. Some network component between the client and the server routes the message to the server, where a connection error occurs. The client is unaware of this and the server never receives the message.
Even when the client uses request-response to communicate with the server, a missing reply doesn't tell the client whether the request was handled: the server might have processed the request and then failed to send back a response. Resending the message on a missing reply could result in the operation being applied twice.

The Idempotent Receiver pattern allows the client to repeat messages, because the receiver can identify duplicates. I've implemented the Idempotent Receiver as a Remote Actor. It passes messages on to the actual Actor that is supposed to receive them, unless the message is a duplicate of course. The Idempotent Receiver checks every incoming message against an Envelopes implementation. I've only included some in-memory implementations; it is quite easy to add a persistent one, for instance using the Akka Persistence module.
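To make the duplicate check concrete, here is a minimal sketch of an in-memory Envelopes-style duplicate detector. The names (Envelope, InMemoryEnvelopes, accept) are illustrative assumptions, not the actual eip.idempotent API:

```scala
import scala.collection.mutable

// Illustrative envelope: a frame id, a unique envelope id, and an opaque payload.
case class Envelope(frameId: Long, envelopeId: Long, payload: Array[Byte])

// In-memory duplicate detection, keyed by frame.
class InMemoryEnvelopes {
  private val seen = mutable.Map[Long, mutable.Set[Long]]()

  // Returns true if the envelope is new (and records it), false for a duplicate.
  def accept(e: Envelope): Boolean =
    seen.getOrElseUpdate(e.frameId, mutable.Set[Long]()).add(e.envelopeId)
}

val envelopes = new InMemoryEnvelopes
val msg = Envelope(1L, 42L, "payload".getBytes)
val firstDelivery = envelopes.accept(msg) // true: pass on to the target actor
val repeated      = envelopes.accept(msg) // false: duplicate, silently drop
```

A persistent implementation would only have to swap the mutable map for durable storage behind the same check.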

Repeater
On the client side I've implemented an Actor that communicates with the Idempotent Receiver on the server side. The Repeater keeps a copy of every message sent to the server in a RepeatBuffer, so that it can repeat them to the server if needed. The user of the Repeater just gets an ActorRef to it and sends messages as usual. The Repeater wraps every message in an Envelope, and sends this Envelope to the Idempotent Receiver using the Remote Actor Protocol.
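In outline, the RepeatBuffer does something like the sketch below; the names and method signatures here are assumptions for illustration, not the actual code:

```scala
import scala.collection.mutable

case class Envelope(frameId: Long, envelopeId: Long, payload: Array[Byte])

// Keeps copies of sent envelopes until their frame completes, so they can be repeated.
class RepeatBuffer {
  // LinkedHashMap preserves send order when repeating.
  private val buffered = mutable.LinkedHashMap[Long, Envelope]()

  def store(e: Envelope): Unit = buffered(e.envelopeId) = e

  // Called when a CompleteFrameRequest arrives: the frame is done, drop its copies.
  def removeFrame(frameId: Long): Unit = {
    val ids = buffered.collect { case (id, e) if e.frameId == frameId => id }.toList
    buffered --= ids
  }

  // Envelopes to resend after a reconnect, minus any the receiver already acknowledged.
  def repeat(ackedIds: Set[Long]): Seq[Envelope] =
    buffered.values.filterNot(e => ackedIds(e.envelopeId)).toSeq
}

val buf = new RepeatBuffer
Seq(Envelope(1L, 1L, Array[Byte]()),
    Envelope(1L, 2L, Array[Byte]()),
    Envelope(2L, 3L, Array[Byte]())).foreach(buf.store)
val toResend = buf.repeat(ackedIds = Set(1L)) // envelopes 2 and 3 remain unacknowledged
buf.removeFrame(1L)                           // frame 1 complete: drop its copies
```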

Protocol
I've cheated a bit (in Dutch we say "Beter goed gestolen dan slecht bedacht", roughly "better stolen well than invented badly" :) and looked at how the Remote Actor Protocol works, borrowed a bit from TCP and other message standards, and came up with the following.
Every message will be sent inside an Envelope. An Envelope consists of a Frame ID, an Envelope ID, and a Payload part that can contain any message. I've created a protobuf message definition for this here.
Because the Repeater and Idempotent Receiver keep messages in memory or on disk, you need a way to tell which messages can be removed. That is where the Frame comes in. Envelopes are always part of a Frame, which is like a 'sticky session' for a number of Envelopes. A Frame consists of a source address, a destination address, an ID and a size (the number of envelopes).


The Repeater first requests a Frame from the Idempotent Receiver and then starts sending messages for that Frame. The Idempotent Receiver checks if it has received every (unique) message for a Frame based on the Frame size. When a Frame is complete, the Idempotent Receiver sends a CompleteFrameRequest out of band to the Repeater and removes the envelopes from storage. The Repeater receives the request and also removes the envelopes from storage.
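The completeness check boils down to counting unique envelope ids per frame. A sketch, with illustrative names (FrameTracker is not a class from the actual code):

```scala
import scala.collection.mutable

// A frame announces up front how many envelopes it will carry.
case class Frame(id: Long, size: Int)

class FrameTracker(frame: Frame) {
  private val received = mutable.Set[Long]()

  // Record an envelope id; duplicates don't advance the count.
  def record(envelopeId: Long): Unit = received += envelopeId

  // Complete once every unique envelope in the frame has arrived; at that
  // point the CompleteFrameRequest is sent and storage can be freed.
  def isComplete: Boolean = received.size == frame.size
}

val tracker = new FrameTracker(Frame(1L, 3))
Seq(10L, 11L, 10L).foreach(tracker.record) // a duplicate of 10 arrives
val afterDup = tracker.isComplete          // false: only 2 unique ids so far
tracker.record(12L)
val done = tracker.isComplete              // true: all 3 unique envelopes seen
```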

As of Akka 0.10 you can register Lifecycle event listeners on both the RemoteClient and the RemoteServer (by the way, check out the awesome Akka User List, the guys there are extremely helpful).

I use the Lifecycle event listeners to be notified of connection errors and successful reconnects. When the connection has been restored after a disconnect or error, a repeat of the Frame is triggered on the client side, or the server side sends the Repeater a repeat request for the Frame with selective acknowledgement of the Envelope IDs it has already received.
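The listener itself is essentially an actor pattern-matching on the lifecycle events. The sketch below uses stand-in event types for illustration; the real events (RemoteClientConnected, RemoteClientDisconnected, RemoteClientError and friends) live in Akka's remote package and carry more information:

```scala
// Stand-in lifecycle events, not the real Akka event classes.
sealed trait LifeCycleEvent
case object ClientConnected extends LifeCycleEvent
case object ClientDisconnected extends LifeCycleEvent
case class ClientError(cause: String) extends LifeCycleEvent

// What the Repeater-side listener does with each event.
def handle(event: LifeCycleEvent, repeatFrame: () => Unit): Unit = event match {
  case ClientConnected    => repeatFrame() // connection restored: resend buffered envelopes
  case ClientDisconnected => ()            // nothing to do but wait for the reconnect
  case ClientError(cause) => println(s"connection error: $cause")
}

var repeats = 0
handle(ClientDisconnected, () => repeats += 1) // no repeat yet
handle(ClientConnected, () => repeats += 1)    // triggers a repeat of the frame
```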

Since the Envelope message is just another protobuf-encoded message, Akka handles it as usual in the RemoteClient and RemoteServer using the Akka Remote Protocol. The Idempotent Receiver receives the Envelope message and deserializes both the Envelope itself and the payload into the correct protobuf message that the Actor is expecting. Right now it only works for protobuf messages because that's all I use, but the EnvelopeSerializer can easily be changed to handle other types of messages. (If that code looks familiar, it's because it's basically a copy of the MessageSerializer class in Akka :)

Scale out
A problem with the Idempotent Receiver is that it is not easy to scale out. If you have Remote Actors distributed over many servers behind a network load balancer, you can't just put an Idempotent Receiver on each of the servers, since they would not know about each other. Sharing state over distributed nodes is not really what you want; it gets complicated very quickly. Instead of distributing the Envelope and Frame state over several servers, I have implemented a JGroups-based Envelopes implementation where every node in the cluster 'owns' the Frames that it has created. If a node receives a message for a Frame that it doesn't own, it sends the message into the cluster, so that the Idempotent Receiver that owns the Frame can handle it. You can specify a range for the Frame IDs in the Envelopes so that different servers will never create the same Frame.
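Because each node creates frame ids only in its own disjoint range, ownership can be decided locally from the frame id alone. A sketch of that rule (the range-splitting scheme is from the post; the names are illustrative):

```scala
// Each node hands out frame ids only from its own disjoint range,
// so no two nodes ever create the same Frame.
class FrameIdRange(start: Long, end: Long) {
  private var next = start
  def nextFrameId(): Long = { val id = next; next += 1; id }

  // A node handles an envelope itself only if it owns the frame;
  // otherwise it forwards the envelope into the JGroups cluster.
  def owns(frameId: Long): Boolean = frameId >= start && frameId <= end
}

val nodeA = new FrameIdRange(0L, 999L)
val nodeB = new FrameIdRange(1000L, 1999L)
val frameId = nodeB.nextFrameId() // 1000: created and therefore owned by nodeB
```

With this scheme no acknowledgement between servers is needed to create a frame, which is the whole point of the ownership shortcut.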

Testing
You can check out the unit tests here. To make sure that everything works, I have written a very simple and crude Network Proxy (blocking I/O) for testing, which is used in the unit tests between the Repeater and the Idempotent Receiver. The Network Proxy can be started and stopped, and connection errors can be induced by registering a function in the loop that receives from the client and passes messages on to the RemoteServer, and in the loop that receives responses from the RemoteServer and passes them back to the client.

In the repository you can also find this unit test that tests if all LifeCycle events (both RemoteClient and RemoteServer) are triggered by the Akka framework, using this simple proxy.

I've written this unit test here to test the Repeater and Idempotent Receiver. The JGroups version is tested in this unit test here. Let me know what you think!

7 comments:

  1. Nice example, some edge cases that might be interesting (accepting I may have misinterpreted, mis-read or failed to absorb some subtle details in your code and explanation):

    (1) What happens if the owner of an in-flight/partially full Envelope goes offline permanently? You'll never receive a confirmation of a full envelope.

    (2) What happens if the envelope complete protocol does not close out due to permanent death of a client prior? There's a chunk of storage you cannot free.

    (3) Flow-control: A client can continue to create messages and queue them up for transmission creating a substantial backlog of messages. How many do you want to queue up, how much resource do you want to dedicate to that queue? What happens when it's exhausted?

    And some stuff I was kinda curious about:

    "since I need a very compact wire protocol."

    Why is compactness important to you?

    "A problem with the Idempotent Receiver is that it is not easy to scale out."

    And by scale out you mean having many actors any one of which can process a message?

    Seemingly the core of your scale out problem is related to the stateful nature of your protocol (a tight tie between envelope owner and client). What kind of timeliness requirements have you for message delivery? Are you doing something stateful when processing these messages after reception (like shoving them in a shared database)?

    ReplyDelete
  2. [Groan think I used Envelope when I meant Frame] - try again:

    (1) What happens if the owner of a frame (an Idempotent Receiver) dies before the frame is complete?

    (2) What happens if the client filling the frame dies, leaving it partially filled?

    And:

    A key challenge of scaling out your implementation is related to the fact that some Idempotent Receiver "owns" the frame.

    ReplyDelete
  3. Hi Dan,

    Thanks for your feedback.

    With the scale out problem I meant to say that the Idempotent Receiver pattern in itself has a tight coupling with state. Because you want to check if you have received a message before, you need to keep track of the messages you received. Once you scale out receivers over servers, you are basically forced to distribute that state as well, so that you can ask the distributed state if the message was already handled by someone, and you would need transactions between the servers. That would be ideal from a 'no message lost' point of view, I would expect throughput to be impacted though since all servers would have to agree all the time before sending through.

    I have only started the distributed part, and there I took the shortcut on the ownership of Frames, since then you never have to wait for acknowledgement of another server and don't need 'distributed locks' of any kind (well, that's the idea anyway :) and it was good enough for my use case.

    What I haven't added yet is taking over ownership of Frames, which could be added to the JGroupEnvelopes. When a member in the cluster fails in JGroups every surviving server gets a message (view change with list of servers in group), another server could take over the Frame. I do need distributed state for that, but only of ownership of frames. You could get some duplication of envelopes at that point, if the crashed receiver never did a partial acknowledgement of sent through envelopes. Adding the envelopeIds of the owned frame in the distributed state (and getting the receiver to send this out) could be good enough to solve most of that.

    1) In the load balanced version, the Frame will be sent to another receiver and this receiver will not be able to handle it. The Frame will be forwarded to the cluster group (with no receiver in it to handle the Frame) and that will continue until the receiver that went offline is replaced (on the same address), at which point the Frame could be handled, if the state of the Envelopes was restored on that new receiver (you would need to use a persistent version of the Envelopes trait). That implies backup of envelope storage for every server, and restore from that backup. No replacement of the server means that the client will keep on repeating...

    2) You are correct. Right now it will not free that chunk of storage.

    3) Right now it fills up memory in the in memory implementation, and fills up the disk in a persistent implementation. So it crashes at some point.

    2) and 3) are related in that aspect. Adding a timeout and mark-as-stale, or a max buffer, could solve that problem, at which point you would have to alert someone who can decide what to do with it (back it up, copy to another server), or remove and lose the message. That could be changed in the Envelopes and RepeatBuffer implementations. So you do need to monitor that in the servers. A way of configuring how much of your resources you want to dedicate to that would be handy.

    The compactness is important for me because of the restrictions I have in certain projects (low bandwidth).

    ReplyDelete
  4. "With the scale out problem I meant to say that the Idempotent Receiver pattern in itself has a tight coupling with state. Because you want to check if you have received a message before, you need to keep track of the messages you received. Once you scale out receivers over servers, you are basically forced to distribute that state as well, so that you can ask the distributed state if the message was already handled by someone, and you would need transactions between the servers. That would be ideal from a 'no message lost' point of view, I would expect throughput to be impacted though since all servers would have to agree all the time before sending through."

    That's what I thought you would be doing - so you could partition somewhat and have a number of little clusters of servers (say two or three) running the receivers. A client would pick a small cluster to work with and thus each cluster only has to talk within a couple of servers to agree state.

    Of course, once you start doing this sort of thing you're getting quite close to implementing the core of a message queue :)

    "Adding a timeout and mark as stale, or a max buffer could solve that problem, at which point you would have to alert someone that can decide what to do with it"

    Yeah, that's another thing I was expecting - guaranteed delivery can only be so guaranteed :)

    ReplyDelete
  5. And one more:

    "The compactness is important for me because of the restrictions I have in certain projects (low bandwidth)."

    I don't know how limited your bandwidth is but lots of discrete little messages will mean lots of headers and tails taking up valuable bytes so you might want to consider some batching.

    ReplyDelete
  6. Hi Ray

    Great article

    I have git cloned your AkkaExample but are not able to sbt update due to missing libs from the akka 1.0 snapshot.

    If you have any idea how to fix this I would very much appreciate your effort

    Kind regards
    Steffen

    ReplyDelete
  7. Hi Steffen,

    Thanks for the interest!

    That is because I used a very specific version of Akka; there was no official release at the time which had the fixes I needed, so I built my own version from source. The 1.0 version of Akka is coming soon (Milestone 1 has been released very recently, although I'm not sure if it is available in the Maven repos). When the official release is out I will update the example to use the 1.0 version. What you can do now is git clone the latest Akka source, build it locally, publish it locally as a 1.0 snapshot to your local Ivy repo and then try the sbt update again. Let me know if you need more help getting this to work.

    ReplyDelete