Thursday, September 1, 2011

Testing oneway actors with the stackable trait pattern

Just a quick post on something quite simple our team came up with recently for testing a particular type of Actor.
It comes in handy when you want to test an Actor that receives one-way messages and does not send out any messages, or reply to messages. Maybe after processing you want to assert some state somewhere.

Actors that respond in request/response style are very easy to test. You don't have to use any Barriers or Latches or even the (very handy) TestKit: you just ask the Actor for a reply using !!, assert on the reply, and fail on timeout.

Would it not be nice to test one-way Actors in the same way? What we would like is for the Actor to just send the exact same message back that was sent to it after it is done processing the message. Of course only in a test scenario. That way you would be able to test one-way messages in the same way as request/response messages.

The idea was to dynamically add some code to wrap around the Actor that you would like to test, in some way delegate to the Actor, and always reply to the sender with the original message after the Actor is done processing that message. That way the test becomes very easy: you just send an 'asking' !! to the Actor, the Actor does its normal thing as if it received a 'telling' !, after which the extra bit of code kicks in and sends the original message back to the unit test, so you know it's done its job.

Akka has a nice way of handling the difference between one-way and request/response. You can use the reply_? method from within an Actor, which replies back to the sender if the sender asked for a response, or does nothing in the event the sender 'told' the Actor something in one-way style.

The easiest way we found to dynamically add a sort of template method was to use a stackable trait (which is described on the artima site here). The below ReplyAfterProcessing trait overrides the receive of the Actor, first applies the Actor's receive (which is a PartialFunction) and chains it with an anonymous partial function that always replies with the message.
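The mechanics of such a stackable trait can be sketched in plain Scala (no Akka dependency; all names below are illustrative, and the replies list stands in for the self.reply_?(msg) call described above):

```scala
// Plain-Scala sketch of the stackable trait pattern (illustrative names only).
trait Receiver {
  def receive: PartialFunction[Any, Unit]
}

class CountingReceiver extends Receiver {
  var count = 0
  def receive = { case _ => count += 1 }
}

// The stackable modification: first apply the wrapped receive, then "reply"
// with the original message. In the real trait this last step is self.reply_?(msg).
trait ReplyAfterProcessing extends Receiver {
  var replies: List[Any] = Nil // stands in for replying to the sender
  abstract override def receive = {
    case msg if super.receive.isDefinedAt(msg) =>
      super.receive(msg)       // the normal processing
      replies = msg :: replies // then echo the original message back
  }
}
```

Mixing it in with `new CountingReceiver with ReplyAfterProcessing` chains the two receives: the concrete receive runs first, then the trait "replies" with the original message.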


That way you can test a one-way Actor like this. The "with ReplyAfterProcessing" mixes in the trait and replaces the standard receive processing "dynamically".

Happy hAkking!

Saturday, February 12, 2011

Unit testing Akka Actors with the TestKit

Just a quick post to show some examples of how you can use akka.util.TestKit to unit test Actors. I'm not sure when it was added to Akka, but it is really handy. Instead of using CyclicBarriers and CountDownLatches, you just mix this very handy trait into your spec.

I added some examples of test scenarios that pop up frequently when working with Actors:
  • You send something to an Actor and you would like to know if it received the message
  • You send something to an Actor and that in turn does some processing and on success sends something to another Actor and you would like to know if that Actor received a specific message
  • You send some messages and you are not interested in every message the Actor receives and would like to ignore them until some message is received
  • You would like to know for sure that an Actor does not send through certain messages to others
  • You would like to know what an Actor sends back as a response based on a certain message
Testing this by hand with the correct concurrent latches and barriers can be brittle and error prone, not to mention quite complex.

So how does it work? The TestKit sort of swaps out the senderOption of an Actor through an implicit value scoping trick, which means that when your Actor sends back a message (to "the sender", for instance with a reply), it automatically sends it back to a testActor in TestKit, which queues up messages for you so you can inspect stuff. You can also use that same testActor when you have an Actor that sends messages to some ActorRef. You just switch out the ActorRef you would normally pass to your Actor with the testActor Ref.
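In plain Scala, that implicit scoping trick looks roughly like this (an illustrative sketch, not the actual TestKit internals): code that replies uses whatever implicit sender is in scope, and the test trait scopes in a sender that queues messages so the test can inspect them.

```scala
import scala.collection.mutable

// Illustrative model of the sender swap (not the real TestKit code):
// the test trait provides an implicit "sender" backed by a queue.
trait TestInbox {
  val received = mutable.Queue.empty[Any]
  implicit val testSender: Any => Unit = msg => received.enqueue(msg)
}

class EchoUnderTest {
  // "replies" go to the implicit sender, just like self.reply goes to senderOption
  def handle(msg: Any)(implicit sender: Any => Unit): Unit = sender(msg)
}
```

With the trait mixed into a spec, every reply lands in `received`, where the test can assert on it.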

I added some very simple Actors for the above scenarios, which touch a lot of typical interactions:
  • EchoActor - an Actor that just echoes any message you send to it
  • ForwardingActor - an Actor that forwards a message to another Actor(Ref)
  • FilteringActor - an Actor that forwards only certain messages (of type String)
  • SequencingActor - an Actor that forwards some random amount of uninteresting messages, an interesting message, and then some random amount of uninteresting messages
As you can see in the code below, you use a within block to indicate that some piece of code must complete within a certain duration. Every within block gets handled sequentially to keep things simple. Some nice methods and implicits in akka.util.duration make it possible to just write 'within(100 millis)' to indicate a duration of 100 milliseconds, or 'within(50 millis, 100 millis)' to indicate a min and max for the duration.
Then you just bang out some messages on an ActorRef. After that you can use expectMsg to assert that a specific message has been received, or expectNoMsg to assert that no message should have been received. 'ignoreMsg' takes a partial function that ignores all messages for which the partial function returns true (funny, the Scala code explains it better than that sentence just did). In the test with the SequencingActor, I ignore all messages that are not "something", then assert the "something" message, and then ignore everything again that is not "something", making sure there are no messages after that. Make sure to stop all actors afterwards.
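The ignoreMsg semantics can be pinned down in a few lines of plain Scala (an illustrative helper, not the TestKit API): every message for which the partial function is defined and returns true gets dropped.

```scala
// Illustrative model of ignoreMsg: drop every message for which the
// predicate partial function is defined and returns true, keep the rest.
def visibleMessages(msgs: List[Any], ignore: PartialFunction[Any, Boolean]): List[Any] =
  msgs.filterNot(m => ignore.isDefinedAt(m) && ignore(m))
```

For example, `visibleMessages(List("noise", "something", "junk"), { case s: String => s != "something" })` leaves only `List("something")`, which matches the SequencingActor scenario above.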
Anyway, enough rambling, check out this gist :)


Wednesday, February 9, 2011

Starting with Akka 1.0

Since version 1.0 of Akka is coming out very soon, I thought it might be handy to write a quick new post on getting started with Akka again since a couple of things have gotten quite a bit easier.

Development setup
I am using the following tools:
Install these tools by following the links. I am using sbt-idea as a sbt processor (see sbt-idea link). idea-sbt-plugin is a plugin that you can use to run sbt from within IDEA. Add the Scala plugin and idea-sbt-plugin (look for SBT) in IDEA by using the plugin manager (Settings-> Plugins).
After you have installed these tools you first need to create an sbt project, so let's do that first. ($ means I'm typing in the console, > means I'm in sbt, scala> means I'm in the Scala interpreter)


Minimal setup
$ mkdir akkastart
$ cd akkastart
$ sbt
Press Y to start a new project, fill in some details like name, organization, version, Scala version (2.8.1) and sbt version (0.7.4), and wait for sbt to download the Scala and sbt dependencies.
This should create a lib, project, src and target directory. Create a project/build/ directory, add Project.scala to it with the following content:
import sbt._
import sbt_akka_bivy._

class MyProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject{
}

Create a project/plugins directory and add a Plugins.scala to it with the following content:
import sbt._

class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
  val bumRepo = "Bum Networks Release Repository" at "http://repo.bumnetworks.com/releases"
  val AkkaRepo = "Akka Repository" at "http://akka.io/repository"
  val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "1.0-RC6"
  val sbtAkkaBivy = "net.evilmonkeylabs" % "sbt-akka-bivy" % "0.2.0"
}

The akka plugin makes getting all the dependencies and modules for akka extremely easy. The bivy plugin is just really handy for deployment.
Then run sbt update. This will add a lib_managed directory with the minimal dependencies for Akka. Add Akka modules to the Project.scala file if you expect to use them, for instance akka-camel:

import sbt._
import sbt_akka_bivy._

class MyProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject{
  val akkaCamel = akkaModule("camel")
}

Run sbt update after adding Akka modules.
Now we can generate an idea project from the sbt project by running sbt idea: (if you followed installation instructions for sbt-idea as processor from the sbt-idea page)

$ sbt idea

That creates a .iml file in your directory and a .idea directory with all the stuff IDEA needs. (It also creates a .iml for building the sbt sources like Project.scala.)

Now open the .iml file with IDEA. Everything should just work. (Don't you just love it when that happens? :)
Making your module (Build->Make Module akkastart) builds in the same target directories as sbt would.
Add an akka.conf to src/main/resources (so it gets on your classpath) if you don't like the defaults; check here on akka.io for more information. At first you probably don't need an akka.conf; when Akka finds no config it sets up defaults.

That's all there is to it, now you can start adding your Actors in src/main/scala as you see fit.
Of course you can also just do something like this to get that quick scala console fix (output in italics):

$ sbt
> console

scala> import akka.actor.Actor
import akka.actor.Actor
scala> class PrintActor extends Actor {
     |  def receive = {
     |   case s:String => {
     |     println("received:"+s)
     |   }
     |  }
     | }
defined class PrintActor
scala> val actorRef = Actor.actorOf(new PrintActor)
23:28:45.830 [run-main] WARN  akka.config.Config$ - 
Can't load 'akka.conf'.
One of the three ways of locating the 'akka.conf' file needs to be defined:
1. Define the '-Dakka.config=...' system property option.
2. Put the 'akka.conf' file on the classpath.
3. Define 'AKKA_HOME' environment variable pointing to the root of the Akka distribution.
I have no way of finding the 'akka.conf' configuration file.
Using default values everywhere.
actorRef: akka.actor.ActorRef = Actor[PrintActor:f55f3eb0-349b-11e0-ae19-0019d2b39ec9]
scala> actorRef.start   
23:28:59.710 [run-main] DEBUG a.d.Dispatchers$globalExecutorBasedEventDrivenDispatcher$ - Starting up Dispatchers$globalExecutorBasedEventDrivenDispatcher$[akka:event-driven:dispatcher:global]
with throughput [5]
res1: akka.actor.ActorRef = Actor[PrintActor:f55f3eb0-349b-11e0-ae19-0019d2b39ec9]
scala> actorRef ! "test"
23:29:03.412 [run-main] INFO  a.d.LazyExecutorServiceWrapper - Lazily initializing ExecutorService for 

scala> 23:29:03.428 [akka:event-driven:dispatcher:global-1] DEBUG akka.dispatch.MonitorableThread - Created thread akka:event-driven:dispatcher:global-1
received:test

For some more examples you can check these here on github, I've updated them to 1.0-RC6.


Happy hAkking!

Monday, September 27, 2010

Repeater and Idempotent Receiver implementation in Akka

Update to this post: I have upgraded the examples to Akka 1.0-RC1. Check the updated code at github here.


After a couple of examples to get started with Akka, I thought it would be nice to show a more involved example.
In this post I am going to show how you can build your own simple reliable messaging (Guaranteed Delivery) on top of Akka Remote Actors. It is an implementation of an Idempotent Receiver and a Repeater. I'm currently using the master branch of Akka (1.0-SNAPSHOT). I have put this example up on github here along with my other Akka examples from previous posts, so you can skip the lengthy explanation below completely and go straight to the source and check out the eip.idempotent package :)
Instead of pygmentizing all my code into this post, I'll just link to github instead.

Update: You might ask yourself why I am implementing this so high in the stack (which some very smart people did :) At the moment I need this to work on an unmodified version of Akka and I needed it quick. Building it in for instance TCP would have probably made it necessary to modify the Akka source itself and how it builds on top of JBoss Netty. I'm leaving that for another day, stay tuned :)

And I thought it would be nice to show the use of the LifeCycleEvent features of Akka, how it inherits all the Netty goodness and how it is a bit easier to work with Actors instead of hand-rolled threading.

Anyway, back to the example.

What I wanted to achieve with the code:
  • Recovery from error conditions on the client or the server side.
  • Guaranteed delivery of messages, even if connection loss occurs (reconnects, errors, shutdown/startup).
  • A simple lightweight protocol. 
  • It should work with Google Protobuf serialization, since I need a very compact wire protocol. (It shouldn't be too hard to add others.)
  • It should be a drop-in solution and stay out of my way. I should just be able to bang (!) on an ActorRef on the client side, and receive on an Actor on the server side.
  • I don't need the order of messages maintained.
  • It only has to work for one way messaging (for now), not for synchronous request-response. (It shouldn't be too hard to add).
  • It should work on top of Akka's Remote Actors. 
  • It should work in a network balanced infrastructure.
  • The goal is to improve the reliability of a very lean protocol, not to implement a full-featured message queuing protocol or to compete with existing protocols (because there are plenty of options already available). Keeping it simple.
  • Show that Akka takes away a lot of the complexities in building this type of stuff.
Idempotent Receiver
An Idempotent Receiver is an Enterprise Integration Pattern that is used to safely receive duplicate messages. The main reason for the use of this pattern is for solving common problems in communicating over a network:

  • Network components between servers might duplicate messages
  • Connection errors might only be known on one side of the communication link

For instance, a client sends a message, receives no error on the connection and assumes the message is received by the server. Some network component between the client and the server routes the message to the server, where a connection error occurs. The client is unaware of this and the server never receives the message.
Even in the case where the client uses request-response to communicate with the server, the client cannot be sure that the request has been handled or not, when the client does not receive a reply. The server might have handled the request and failed to send back a response. Resending a message to the server on no reply could result in the operation being applied twice.

The Idempotent Receiver pattern allows the client to repeat messages, because it can identify duplicate messages. I've implemented the Idempotent Receiver as a Remote Actor. It passes messages on to the actual Actor that is supposed to receive the messages, unless the message is a duplicate of course. The Idempotent Receiver checks every incoming message on an Envelopes implementation. I've just included some in-memory implementations; it is quite easy to add a persistent one, for instance using the Akka Persistence module.
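The duplicate check itself can be sketched as a minimal in-memory structure (illustrative names; the real Envelopes implementations live in the eip.idempotent package):

```scala
import scala.collection.mutable

// Illustrative in-memory duplicate detector: the receiver only passes a
// message on the first time its (frame id, envelope id) pair is seen.
class SeenEnvelopes {
  private val seen = mutable.Set.empty[(Long, Long)]
  /** true the first time this envelope is offered, false for duplicates */
  def firstTime(frameId: Long, envelopeId: Long): Boolean =
    seen.add((frameId, envelopeId))
}
```

A persistent variant would just back `seen` with durable storage instead of a set.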

Repeater
On the client side I've implemented an Actor that communicates with the Idempotent Receiver on the server side. The Repeater keeps track of all messages that are sent to the server in a RepeatBuffer and keeps a copy of the messages, which it can repeat if needed to the server. The user of the repeater can just get an ActorRef to it and send the messages as you normally would. The Repeater wraps every message in an Envelope, and sends this Envelope to the Idempotent Receiver using the Remote Actor Protocol.

Protocol
I've cheated a bit (in Dutch we say "Beter goed gestolen dan slecht bedacht" :) and looked at how the Remote Actor Protocol works, and borrowed a bit from TCP and other message standards and came up with the following.
Every message will be sent inside an Envelope. An Envelope consists of a Frame ID, an Envelope ID, and a Payload part that can contain any message. I've created a protobuf message definition for this here.
Because the Repeater and Idempotent Receiver are keeping messages in memory or on disk, you need a way to tell which messages can be removed. That is where the Frame comes in. Envelopes are always part of a Frame, which is like a 'sticky session' for a number of Envelopes. The Frame consists of a source address, a destination address, an ID and a size (the number of envelopes).
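Rendered as Scala, the wire model amounts to roughly this (illustrative case classes; the post defines the actual messages in protobuf):

```scala
// Illustrative Scala rendering of the protobuf messages described above.
// A Frame groups a fixed number of Envelopes between a source and destination;
// each Envelope carries an opaque payload plus the ids needed for dedup.
case class Frame(id: Long, size: Int, source: String, destination: String)
case class Envelope(frameId: Long, envelopeId: Long, payload: Array[Byte])
```

Once `size` distinct envelope ids have arrived for a Frame, both sides can drop their copies.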


The Repeater first requests a Frame from the Idempotent Receiver and then starts sending messages for that Frame. The Idempotent Receiver checks if it has received every (unique) message for a Frame based on the Frame size. When a Frame is complete, the Idempotent Receiver sends a CompleteFrameRequest out of band to the Repeater and removes the envelopes from storage. The Repeater receives the request and also removes the envelopes from storage.

As from Akka 0.10 you can register Lifecycle event listeners on both the RemoteClient and RemoteServer (By the way, check out the awesome Akka User List, the guys there are extremely helpful).

I use the Lifecycle event listeners to be notified of connection errors and successful reconnects. When the connection has been restored after a disconnect or error, a repeat of the Frame is triggered on the client side, or a repeat request for the Frame is sent from the server side to the Repeater with some selective acknowledgement of the already received Envelope IDs.
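The selective acknowledgement boils down to resending only what the receiver has not acknowledged; a sketch (illustrative, not the actual RepeatBuffer API; String payloads for brevity):

```scala
// Illustrative selective repeat: given the envelope ids the receiver already
// acknowledged, the Repeater resends only the missing envelopes of the frame.
def envelopesToRepeat(buffered: Map[Long, String], ackedIds: Set[Long]): Map[Long, String] =
  buffered -- ackedIds
```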

Since the Envelope message is just another protobuf encoded message, Akka handles it as usual in the RemoteClient and RemoteServer using the Akka Remote Protocol. The Idempotent Receiver receives the Envelope message and deserializes the payload to the correct protobuf message that the Actor is expecting, and the Envelope itself. Right now it only works for protobuf messages because that's all I use, but the EnvelopeSerializer can be easily changed to handle all types of messages (If that code looks familiar it's because it's basically a copy of the MessageSerializer class in Akka :)

Scale out
A problem with the Idempotent Receiver is that it is not easy to scale out. If you would have RemoteActors distributed over many servers and use a network load balancer, you can't just put an Idempotent Receiver on all of the servers, since they would not know about each other. Sharing state over distributed nodes is not really what you want, it gets complicated very quickly. Instead of distributing the Envelope and Frame state over several servers, I have implemented a JGroups based Envelopes implementation where every node in the cluster 'owns' the Frames that it has created. If it receives a message for a Frame that it doesn't own, it sends it into the cluster, so that the Idempotent Receiver that owns the Frame can handle it. You can specify a range for the Frame IDs in the Envelopes so that different servers will never create the same Frame.
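Giving each node its own Frame-ID range can be sketched like this (range values and names are assumptions for illustration, not from the actual implementation):

```scala
// Illustrative per-node Frame-ID range: disjoint ranges guarantee that two
// nodes in the cluster never create a Frame with the same id.
class FrameIdRange(start: Long, end: Long) {
  private var next = start
  def nextId(): Long = {
    require(next <= end, "frame id range exhausted")
    val id = next
    next += 1
    id
  }
  // whether this node owns (i.e. created) a given frame id
  def owns(frameId: Long): Boolean = frameId >= start && frameId <= end
}
```

A node that receives an Envelope for a frame id it doesn't own would forward it into the cluster, as described above.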

Testing
You can check out the unit tests here. To make sure that everything works, I have written a very simple and crude Network Proxy (Blocking I/O) for testing, that will be used in the unit tests between the Repeater and Idempotent Receiver. The Network Proxy can be started, stopped, and connection errors can be caused by registering a function in the loop that receives from the client and passes this to the RemoteServer, and in the loop that receives responses from the RemoteServer and passes these back to the client.

In the repository you can also find this unit test that tests if all LifeCycle events (both RemoteClient and RemoteServer) are triggered by the Akka framework, using this simple proxy.

I've written this unit test here to test the Repeater and Idempotent Receiver. The JGroups version is tested in this unit test here. Let me know what you think!

Tuesday, July 27, 2010

Upgrading examples to Akka master (0.10) and Scala 2.8.0 Final

Update on this post: I have upgraded the AkkaExamples git repo to version 1.0-RC1. Camel references are upgraded to 2.5.0. I have also included a reference to the scalablesolutions Akka maven repository, so that the akka-sbt-plugin is automatically downloaded at sbt update. AkkaExamples on github

In this post I am going to write about upgrading from Akka 0.8.1 to Akka 0.10. (At the moment 0.10 is not ready yet, but for simplicity I will refer to 0.10. To be exact, it's git commit c737e2edf4b9ad980d6922f169ca4fb19f8c8284 :)
Also, to make things a bit simpler, I have pushed my previous Akka examples here to github, so you can just clone from there and check it out. (You will need to build Akka as shown below first until 0.10 comes out, and be sure to run 'sbt update' before you try to build.)

Building Akka
At the time of writing there is no 0.10 release yet that you can just download, so I built Akka from master and published it to my local ivy repository. The sbt build has really improved a lot; the update is quicker and so is the build:

$ git clone git://github.com/jboner/akka.git
$ cd akka
$ sbt update
$ sbt dist
$ sbt publish-local
$ cd akka-sbt-plugin
$ sbt publish-local

I am also building and publishing the akka-sbt-plugin to local, so I can use it later.
Akka now uses Google Protobuf 2.3.0. If you are going to use Google Protobuf for your own messages, you will need to install the 2.3.0 protoc compiler, get it here and follow instructions in the readme file.
In case you are going to use Apache Camel, you need to upgrade the Camel dependencies to 2.4.0 (Akka 0.8.1 used Camel 2.2.0).

SBT Project file plugins
Akka 0.10 comes with a very nice addition for SBT, the AkkaProject sbt plugin. This makes it a lot more succinct and easy to start off with an Akka project, as you can see below:

import sbt._
import Process._
import sbt_akka_bivy._
class Project(info: ProjectInfo) extends DefaultProject(info) with AkkaProject with AkkaKernelDeployment{
  val akkaCamel = akkaModule("camel")
  val akkaKernel = akkaModule("kernel")
  val junit = "junit" % "junit" % "4.8.1" % "test->default"
  val camelFtp= "org.apache.camel" % "camel-ftp" % "2.4.0" % "compile"
  val camelMina= "org.apache.camel" % "camel-mina" % "2.4.0" % "compile"
  val camelJetty= "org.apache.camel" % "camel-jetty" % "2.4.0" % "compile"

  val scalatest = "org.scalatest" % "scalatest" % "1.2-for-scala-2.8.0.final-SNAPSHOT" % "test->default"
  override def repositories = Set( "scala-tools-snapshots" at 
      "http://scala-tools.org/repo-snapshots/"
  )
}

As you can see I have added some Camel components to play with, and I've added the scalatest and junit dependencies for unit testing. The Project file now uses the AkkaProject plugin and the Bivy Sack plugin (sbt-akka-bivy) for microkernel deployments (check here, really cool), which I am going to use in a later post for quick and easy deployment when I actually have something to deploy; the project only contains unit tests for now. Before you can use this Project file, you need to add a Plugins.scala file to the project/plugins directory, which describes the akka-sbt-plugin that we built, and also pulls in the sbt-akka-bivy plugin for deployment:

import sbt._

class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
  val bumRepo = "Bum Networks Release Repository" at "http://repo.bumnetworks.com/releases"
  val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "0.10"
  val sbtAkkaBivy = "net.evilmonkeylabs" % "sbt-akka-bivy" % "0.2.0"
}


Run 'sbt update' to pull the dependencies in. After that you can set up a new project in IDEA. (I'm hoping an sbt IDEA plugin will get really good soon, because this always means some fiddling with the classpath.)


Intellij IDEA
I am using the Intellij IDEA Community EAP (95.390 to be precise) which you can get from here.
Once you have installed it, update the Scala plugin in the plugins section if you had a previous version, otherwise install from the available list. (File-Settings-Plugins)

Choose a new project, from existing source, make sure you select the compile and test jars (in lib_managed) and the scala 2.8.0 jars, leave out the 2.7.7 ones from sbt. You can add a Scala Facet yourself, or IDEA will autodetect it and you can choose that it will create it for you, just make sure to use the 2.8.0 final ones. The same goes for the src 'IDEA module', you can add it in the new project wizard or in the Project Structure settings (set the src/main/scala as source and src/test/scala as test). Make sure you set IDEA to use the 2.8.0 final compiler and library in the Scala Facet settings, to the jars found in project/boot/scala-2.8.0/lib. Put the scala 2.8.0 jars right after your JDK in your module dependencies.


Project Structure - src module dependencies

Project Structure - src module sources


Project Structure - scala 2.8.0 project library

I have updated the examples from previous posts about Akka in the project on github. The biggest difference from Akka 0.8 is that the identity and the instance of an Actor are separated respectively into an ActorRef trait and an Actor trait, as described here. There is an Actor object as well which is used as a factory module. The Actor.actorOf method makes sure that you cannot access the Actor instance methods directly to prevent threading problems if you would do such a thing. In the examples I import all the Actor object methods. You can only send messages to the Actor implementation through the API exposed through the ActorRef class. Within the Actor instance, this is done through the self reference.

Below is the updated code for 0.10 of the previous simple example in 0.8, showing the usage of actorOf in the specs and the self reference in the Worker Actor.

import org.scalatest.Spec
import org.scalatest.matchers.MustMatchers

import se.scalablesolutions.akka.actor.{Actor}
import se.scalablesolutions.akka.actor.Actor._

/**
 * A Spec for an example Worker Actor.
 */
class WorkerSpecs extends Spec with MustMatchers {
  describe("A Worker") {

    describe("(when it queues commands)") {
      it("should have the correct number of commands queued") {

        val command = new Command("test", "data")
        val actorRef = actorOf(new Worker())
        actorRef.start
        var reply: CountResponse = (actorRef !! new CountCommandsQueued()).getOrElse(fail()).asInstanceOf[CountResponse]
        reply.count must be === 0

        actorRef ! command
        actorRef ! command
        actorRef ! command
        reply = (actorRef !! new CountCommandsQueued()).getOrElse(fail()).asInstanceOf[CountResponse]
        reply.count must be === 3
        actorRef.stop
      }
    }
    describe("(when it executes all queued commands)") {
      it("should have no commands queued after executing") {
        val command = new Command("test", "data")
        val actorRef = actorOf(new Worker())
        actorRef.start
        actorRef ! command
        actorRef ! command
        var reply: CountResponse = (actorRef !! new CountCommandsQueued()).getOrElse(fail()).asInstanceOf[CountResponse]
        reply.count must be === 2
        actorRef ! new Execute()
        reply = (actorRef !! new CountCommandsQueued()).getOrElse(fail()).asInstanceOf[CountResponse]
        reply.count must be === 0
        actorRef.stop
      }
    }
  }
}

/**A command message */
case class Command(name: String, data: String)
/** A message to get the amount of commands queued*/
case class CountCommandsQueued()
/** A message to execute all commands queued */
case class Execute()
/** A message reply on the CountCommandsQueued message */
case class CountResponse(count: Int)

/**
 * A Worker actor that receives Commands, queues the commands, and executes commands on the Execute message,
 * as example for some simple Actor testing
 */
class Worker extends Actor {
  var commands: List[Command] = Nil

  def receive = {
    case msg: Command => {
      commands = msg :: commands
    }
    case msg: Execute => {
      // execute the queued commands here (omitted in this example), then clear the queue
      commands = Nil
    }
    case msg: CountCommandsQueued => {
      self.reply(new CountResponse(commands.size))
    }
  }
}


Self reference
In the simple example above I updated the code that uses the Actor API from within the Actor by prefixing the calls with self. What you can also do is import the 'self methods' in the Actor class, in case you want to save yourself typing 'self' all the time. I've done this in the 0.10 update of my previous example for joining messages in 0.8 (the FirstMessageHandler Actor):

import org.scalatest.matchers.MustMatchers
import org.scalatest.Spec
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActorRef, ActorRegistry, Actor}
/**
 * A Spec for the Aggregator
 */
class AggregatorSpecs extends Spec with MustMatchers {
  var aggregateMessageReceived: Boolean = false
  var aggregateMessage: AggregateMessage = null

  describe("An Aggregator") {

    describe("(when it receives FirstMessage and then SecondMessage)") {
      val firstRef = actorOf(new FirstMessageHandler())
      val secondRef = actorOf(new SecondMessageHandler())
      val receiveTestRef = actor {
        case msg: AggregateMessage => {
          aggregateMessageReceived = true
          aggregateMessage = msg

        }
      }
      val aggregator = actorOf(new Aggregator(receiveTestRef))
      firstRef.start
      secondRef.start
      aggregator.start

      it("should send an AggregateMessage containing data of FirstMessage and SecondMessage to the passed in actor") {
        firstRef ! new FirstMessage("id-1", "name-1")
        Thread.sleep(200)
        secondRef ! new SecondMessage("data-1")
        Thread.sleep(1000)
        aggregateMessageReceived must be === true
        aggregateMessage.id must be === "id-1"
        aggregateMessage.name must be === "name-1"
        aggregateMessage.data must be === "data-1"
        firstRef.stop
        secondRef.stop
        aggregator.stop
      }
    }
  }
}

/** A message that is expected to arrive first*/
case class FirstMessage(id: String, name: String)
/** A message that is expected to arrive second*/
case class SecondMessage(data: String)
/** An aggregated message, from first and second */
case class AggregateMessage(id: String, name: String, data: String)
/** A command to get the last message*/
case class GiveMeLastMessage()

/**
 * An Aggregator actor that aggregates a first and second message type
 */
class Aggregator(pass: ActorRef) extends Actor {
  def receive = {
    case msg: SecondMessage => {
      println("Aggregator, my data is " + msg.data)
      val firstMessageHandler: ActorRef = ActorRegistry.actorsFor(classOf[FirstMessageHandler]).head
      var reply: Option[Any] = firstMessageHandler !! new GiveMeLastMessage
      if (reply.isDefined) {
        val first: FirstMessage = reply.get.asInstanceOf[FirstMessage]
        println("Aggregator, my first message is " + first.id)
        val ag = new AggregateMessage(first.id, first.name, msg.data)
        pass ! ag
      }
    }
  }
}

/**
 * A Message Handler for the SecondMessage type
 */
class SecondMessageHandler extends Actor {
  def receive = {
    case msg: SecondMessage => {
      // do some processing
      println("Secondmessage, my data is " + msg.data)
      // then call the aggregator
      val aggregator: ActorRef = ActorRegistry.actorsFor(classOf[Aggregator]).head
      aggregator ! msg
    }
  }
}

/**
 * A Message Handler for the FirstMessage type
 */
class FirstMessageHandler extends Actor {
  import self._
  var lastMessage: Option[FirstMessage] = None
  var lastRequestor: Option[Any] = None

  def receive = {
    case msg: FirstMessage => {
      // do some processing
      println("Firstmessage, my name is " + msg.name)

      lastMessage = Some(msg)
      // if a GiveMeLastMessage arrived first, its sender is waiting on a future:
      // complete that future with this message instead of casting the Option itself
      lastRequestor match {
        case Some(future: se.scalablesolutions.akka.dispatch.CompletableFuture[Any]) => {
          future.completeWithResult(msg)
          lastMessage = None
          lastRequestor = None
        }
        case _ => ()
      }
    }
    case msg: GiveMeLastMessage => {
      if (!lastMessage.isDefined) {
        lastRequestor = senderFuture
      } else {
        reply(lastMessage.get)
      }
    }
  }
}

Protobuf Serialization
The Serializable.Protobuf trait has been removed in 0.10, and Akka 0.10 now uses protobuf 2.3.0. Below is the updated code for 0.10 of the previous simple protobuf serialization example in 0.8:

package unit.akka

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.{BeforeAndAfterAll, Spec}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
import org.scalatest.matchers.{MustMatchers, ShouldMatchers}
import unit.test.proto.Commands
import com.google.protobuf.Message
import unit.test.proto.Commands.WorkerCommand
import unit.akka.CommandBuilder._

/**
 * Test to check if communicating with protobuf serialized messages works.
 */
@RunWith(classOf[JUnitRunner])
class ProtobufSpecs extends Spec with BeforeAndAfterAll with MustMatchers {
  var server: RemoteServer = new RemoteServer()

  override def beforeAll(configMap: Map[String, Any]) {
    server.start("127.0.0.1", 8091)
  }

  override def afterAll(configMap: Map[String, Any]) {
    RemoteClient.shutdownAll
    server.shutdown
  }

  describe("Send using Protobuf protocol") {

    it("should receive local protobuf pojo and reply") {
      // send a local msg, check if everything is ok
      val actor = actorOf(new TestProtobufWorker())
      actor.start
      val msg = Worker(1, "my-name-1", "my-data-1")
      val result: Option[Any] = actor !! msg
      result match {
        case Some(reply: Commands.WorkerCommand) => {
          // test actor changes name to uppercase
          reply.getName must be === "MY-NAME-1"
        }
        case None => fail("no response")
      }
      actor.stop
    }

    it("should receive remote protobuf pojo and reply") {
      //start a remote server
      val actor = actorOf(new TestProtobufWorker())
      //register the actor that can be remotely accessed
      server.register("protobuftest", actor)
      val msg = Worker(2, "my-name-2", "my-data-2")
      val result: Option[Any] = RemoteClient.actorFor("protobuftest", "127.0.0.1", 8091) !! msg

      result match {
        case Some(reply: Commands.WorkerCommand) => {
          // test actor changes name to uppercase
          reply.getName must be === "MY-NAME-2"
        }
        case None => fail("no response")
      }
      actor.stop
    }
  }
}


/**
 * Actor that sends back the message uppercased.
 */
class TestProtobufWorker extends Actor {
  def receive = {
    case msg: Commands.WorkerCommand => {
      log.info("received protobuf command pojo:" + msg.getId)
      val r = Worker(2, msg.getName.toUpperCase, msg.getData.toUpperCase)
      log.info("sending back a renamed pojo:" + r.getName)
      self.reply(r)
    }
  }
}

/**
 * Shortcuts for creating WorkerCommands
 */
object CommandBuilder {
  def Worker(id: Int, name: String, data: String): WorkerCommand = {
    Commands.WorkerCommand.newBuilder.setId(id).setName(name).setData(data).build
  }

  def Worker(command: WorkerCommand): WorkerCommand = {
    Commands.WorkerCommand.newBuilder.setId(command.getId).setName(command.getName).setData(command.getData).build
  }
}

You now just work directly with the protobuf messages, instead of creating a case class that extends the Serializable.Protobuf trait and that serializes to and from the protobuf message. In the test I added the CommandBuilder object just for convenience.

If you want to check out the source code go to my github repo here.