Thursday, May 27, 2010

Testing Akka Remote Actor using Serializable.Protobuf

Just a short post to show how you can test a remote actor where the messages are serialized using Google protobuf. Protobuf is used automatically for Akka's internal protocols (which is what makes remote actors possible in the first place), but the messages you define yourself are of course not automatically encoded with protobuf. Making that work automagically would require quite a bit of reflection, bundling the protoc compiler in the framework, and some translation or mapping layer; that's a path down the rabbit hole I'm glad the Akka team didn't take.

By default, Java serialization is used. There is a serialization setting in akka.conf, but that only applies to the cluster protocol. I'm using Akka 0.8, by the way.

Let's get back to the task at hand.

First you need to install protobuf 2.2.0 (check here). Version 2.3.0 gave me some issues: the generated Java classes did not compile. Akka 0.8 also ships with the 2.2.0 jar, so you might be asking for trouble with 2.3.0 anyway.

Create a .proto file that describes the message (in my case I named the file ProtobufCommandPojo.proto):

package unit.test.proto;

message workercommand {
  required uint64 id = 1;
  required string name = 2;
  required string data = 3;
}

Then generate the Java code for the protobuf message (in my case into a java directory):


$ protoc ProtobufCommandPojo.proto --java_out ../../java

This generates one outer Java class, ProtobufCommandPojo, with the message class and its builder nested inside.
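To get a feel for what was generated, you can use the builder API directly. A quick sketch (the class and setter names follow from the .proto definition above):

import unit.test.proto.ProtobufCommandPojo

// build a message directly with the generated builder;
// the setters follow the field names in the .proto file
val cmd = ProtobufCommandPojo.workercommand.newBuilder()
  .setId(1L)
  .setName("my-name")
  .setData("my-data")
  .build()

println(cmd.getSerializedSize) // standard protobuf Message method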

Create a message case class that mixes in the Serializable.Protobuf trait:
package unit

import se.scalablesolutions.akka.serialization.Serializable
import test.proto.ProtobufCommandPojo

/**
 * message class that uses Serializable.Protobuf trait.
 * uses generated protobuf classes.
 */
case class ProtobufMixedIn(id: Long, name: String, data: String) extends Serializable.Protobuf[ProtobufMixedIn] {

  def getMessage: com.google.protobuf.Message = {
    ProtobufCommandPojo.workercommand.getDefaultInstance().toBuilder()
      .setId(id)
      .setData(data)
      .setName(name)
      .build()
  }
}

As you can see, you use the generated class to do the actual protobuf message building. This is basically the mapping between the protobuf message and the case class.
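Going the other way, from the generated protobuf type back to the case class, is not something the trait gives you, but a small hypothetical helper using the generated getters would look like this:

// hypothetical helper (not part of Serializable.Protobuf): map the
// generated protobuf message back to the case class using its getters
def fromProto(p: ProtobufCommandPojo.workercommand): ProtobufMixedIn =
  ProtobufMixedIn(p.getId, p.getName, p.getData)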
Note: the Serializable.Protobuf trait has a default implementation of fromBytes, which assumes a standard mapping between your class and the protobuf message definition:
def fromBytes(bytes: Array[Byte]): T = getMessage.toBuilder.mergeFrom(bytes).asInstanceOf[T]

I expect that if you build a protobuf message in a different way than the getMessage method 'expects', you might not get the result you want. I'm guessing there is some reflection or a naming convention at work under the hood to make this default implementation work; I'll leave that investigation to you if you want to know exactly how it works.
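If you want to convince yourself that your mapping is correct without relying on that default implementation, you can round-trip the message at the protobuf level, using only the standard generated API (toByteArray and parseFrom):

// serialize via getMessage and parse back with the generated parseFrom
val original = ProtobufMixedIn(1, "my-name-1", "my-data-1")
val bytes = original.getMessage.toByteArray
val parsed = ProtobufCommandPojo.workercommand.parseFrom(bytes)

assert(parsed.getId == original.id)
assert(parsed.getName == original.name)
assert(parsed.getData == original.data)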

And then the unit test. You simply work with the ProtobufMixedIn case class as you would with a normal message; Akka does the rest.

package unit

import se.scalablesolutions.akka.actor.Actor
import org.scalatest.Spec
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
import org.scalatest.matchers.MustMatchers

/**
 * Test to check if communicating with protobuf serialized messages works.
 */
@RunWith(classOf[JUnitRunner])
class ProtobufTest extends Spec with MustMatchers {
  describe("Send using Protobuf protocol") {

    it("should receive local protobuf pojo and reply") {
      // send a local msg, check if everything is ok
      val actor = new TestProtobufWorker()
      actor.start
      val msg = new ProtobufMixedIn(1, "my-name-1", "my-data-1")
      val result: Option[ProtobufMixedIn] = actor !! msg
      result match {
        case Some(reply: ProtobufMixedIn) => {
          // test actor changes name to uppercase
          reply.name must be === "MY-NAME-1"
        }
        case None => fail("no response")
      }
      actor.stop
    }

    it("should receive remote protobuf pojo and reply") {
      //start a remote server
      val server = new RemoteServer()
      val actor = new TestProtobufWorker()
      server.start("127.0.0.1", 8091)
      //register the actor that can be remotely accessed
      server.register("protobuftest", actor)
      val msg = new ProtobufMixedIn(2, "my-name-2", "my-data-2")

      val result: Option[ProtobufMixedIn] = RemoteClient.actorFor("protobuftest", "127.0.0.1", 8091) !! msg

      result match {
        case Some(reply: ProtobufMixedIn) => {
          // test actor changes name to uppercase
          reply.name must be === "MY-NAME-2"
        }
        case None => fail("no response")
      }
      //shutdown server and client processes
      server.shutdown
      RemoteClient.shutdownAll
    }
  }
}

/**
 * Actor that sends back the message uppercased.
 */
class TestProtobufWorker extends Actor {
  def receive = {
    case msg: ProtobufMixedIn => {
      log.info("received protobuf command pojo:" + msg.id)
      val r = new ProtobufMixedIn(msg.id, msg.name.toUpperCase, msg.data.toUpperCase)
      log.info("sending back a renamed pojo:" + r.name)
      reply(r)
    }
  }
}

Thursday, May 6, 2010

Join messages with Akka

Update to this post: I have upgraded the examples to Akka 1.0-RC1. Check the updated code at github here.

Akka raises the abstraction level for writing concurrent systems. In this post I want to give a simple example of this.

Let's say that you have two types of messages coming into your system (say of type FirstMessage and SecondMessage), and you would like to:
1. First do some processing on them separately and independently, and after that:
2. 'Join' or correlate these two messages when they come in one after the other as a pair, based on the time they are received by the system, and send out a new message.
3. Do all processing without blocking.
4. Only aggregate messages that come in as [FirstMessage, SecondMessage], ignoring consecutive FirstMessages.

In this example, the only thing you know is that one type of message always comes in a little later than the other; when that happens, you want to combine the information from the two messages into one outgoing message. And this is not the only thing happening in the system: you also want to handle these messages asynchronously and do some processing before aggregation.

Something like this (apologies for the crappy graphics, got no time to waste on it :)

I tried to more or less follow the Enterprise Integration Patterns diagram style. On the left side are the two incoming message types (FirstMessage and SecondMessage), on the right side the result (AggregatedMessage). I was so lazy making this diagram that I left out the two processes that work on FirstMessage and SecondMessage just before t=1 and t=2, but I guess you get the drift.

Without something like Akka, I would probably dive into the java.util.concurrent package... (what follows is a braindump, add your own pinch of salt :)

Anyway, grab my favourites (ExecutorService, LinkedBlockingQueue and friends), create a Runnable class, run a while loop, poll (or take) from the queue, and, based on the type of message taken from the queue, store it in some state in the class implementing Runnable; let's call that the Aggregator.

(This would have to be thread safe, so I'd need to think about locking, or use a concurrent map with the message type as key and, as value, a list of wrappers holding each message plus the time it was received.) Then, if an incoming message is of type FirstMessage, put it in the list in the map; if it is of type SecondMessage, take the last FirstMessage out of the map, combine the two into an AggregatedMessage, and remove the handled messages from the data structure. Then I would need to send that AggregatedMessage somewhere, which means calling some remoting API, web services or whatever you like, getting a proxy to it inside the Aggregator, and wiring that up in some kind of way. And of course I would need to get the Aggregator up and running in an ExecutorService, make it callable from the outside in a reasonable way, and think about how to start and stop it, monitor its health, restart it if it fails, stuff like that.
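Just to make the point, here is a minimal sketch of where that approach leads. It reuses the FirstMessage/SecondMessage/AggregateMessage case classes defined in the Akka version further down, and skips the web service, supervision and shutdown concerns entirely:

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

class HandRolledAggregator(queue: LinkedBlockingQueue[AnyRef],
                           send: AggregateMessage => Unit) extends Runnable {
  @volatile var stopped = false
  // only touched from this Runnable's thread, so no locking needed for it
  private var lastFirst: Option[FirstMessage] = None

  def run(): Unit = {
    while (!stopped) {
      // poll with a timeout so the stop flag is re-checked regularly
      queue.poll(100, TimeUnit.MILLISECONDS) match {
        case first: FirstMessage => lastFirst = Some(first)
        case second: SecondMessage =>
          // join: combine the last FirstMessage with this SecondMessage
          lastFirst.foreach(f => send(AggregateMessage(f.id, f.name, second.data)))
          lastFirst = None
        case _ => // poll timed out (returned null), loop around
      }
    }
  }
}

// and then the wiring, which is yet more ceremony:
// val queue = new LinkedBlockingQueue[AnyRef]()
// val executor = Executors.newSingleThreadExecutor()
// executor.submit(new HandRolledAggregator(queue, println))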

And then I would need to unit test this for thread safety, calling the code from two threads to simulate the two streams of message types to make sure it really works, mock out the web service on both ends, oh yeah, write some WSDL, configure JAX-WS (oh crap, that's synchronous by nature), probably finding out along the way that sometimes SecondMessage comes in before FirstMessage, rounding the whole thing out...


And maybe I would use a ConcurrentLinkedQueue instead of the BlockingQueue, just to find out that my thread is spinning tightly in my while(!stop) loop when it's in production... (yes, that does happen... time to add a Thread.sleep?)

And that's only the aggregator; I haven't even covered the two processes that need to run before it (the ones I didn't draw in the diagram...)


I'm actually already getting tired writing about it, let alone writing the actual code!

Anyway, this post is about Akka, so how would I do something like this using actors?

Well, a first try at it would be something like this:




import org.scalatest.matchers.MustMatchers
import org.scalatest.Spec
import se.scalablesolutions.akka.actor.{ActorRegistry, Actor}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.dispatch.CompletableFuture

case class FirstMessage(id: String, name: String)
case class SecondMessage(data: String)
case class AggregateMessage(id: String, name: String, data: String)
case class GiveMeLastMessage()


class Aggregator(pass: Actor) extends Actor {
  def receive = {
    case msg: SecondMessage => {
      println("Aggregator, my data is " + msg.data)
      val firstMessageHandler: Actor = ActorRegistry.actorsFor(classOf[FirstMessageHandler]).head
      val reply: Option[FirstMessage] = firstMessageHandler !! new GiveMeLastMessage
      if (reply.isDefined) {
        val first: FirstMessage = reply.get
        println("Aggregator, my first message is " + first.id)
        val ag = new AggregateMessage(first.id, first.name, msg.data)
        pass ! ag
      }
    }
  }
}

class SecondMessageHandler extends Actor {
  def receive = {
    case msg: SecondMessage => {
      // do some processing
      println("Secondmessage, my data is " + msg.data)
      // then call the aggregator
      val aggregator: Actor = ActorRegistry.actorsFor(classOf[Aggregator]).head
      aggregator ! msg
    }
  }
}

class FirstMessageHandler extends Actor {
  var lastMessage: Option[FirstMessage] = None
  // holds the senderFuture of an Aggregator whose GiveMeLastMessage
  // arrived before any FirstMessage did
  var lastRequestor: Option[CompletableFuture] = None

  def receive = {
    case msg: FirstMessage => {
      // do some processing
      println("Firstmessage, my name is " + msg.name)

      lastMessage = Some(msg)
      lastRequestor.foreach { future =>
        // an Aggregator is blocked on !! waiting for this message: complete its
        // future directly (assuming akka 0.8's CompletableFuture and its
        // completeWithResult method here)
        future.completeWithResult(msg)
        lastRequestor = None
        lastMessage = None
      }
    }
    case msg: GiveMeLastMessage => {
      if (!lastMessage.isDefined) {
        lastRequestor = senderFuture
      } else {
        reply(lastMessage.get)
      }
    }
  }
}


class AggregatorSpec extends Spec with MustMatchers {
  var aggregateMessageReceived: Boolean = false
  var aggregateMessage: AggregateMessage = null

  describe("An Aggregator") {

    describe("(when it receives FirstMessage and then SecondMessage)") {
      val first = new FirstMessageHandler()
      val second = new SecondMessageHandler()
      val receiveTest = actor {
        case msg: AggregateMessage => {
          aggregateMessageReceived = true
          aggregateMessage = msg

        }
      }
      val aggregator = new Aggregator(receiveTest)
      first.start
      second.start
      aggregator.start

      it("should send an AggregateMessage containing data of FirstMessage and SecondMessage to the passed in actor") {
        first ! new FirstMessage("id-1", "name-1")
        Thread.sleep(200)
        second ! new SecondMessage("data-1")
        Thread.sleep(1000)
        aggregateMessageReceived must be === true
        aggregateMessage.id must be === "id-1"
        aggregateMessage.name must be === "name-1"
        aggregateMessage.data must be === "data-1"
        first.stop
        second.stop
        aggregator.stop
      }
    }
  }
}


What I really like about it is that I can try something like this out in very few lines of code, and the solution is quite simple: when the Aggregator receives a SecondMessage, it asks the FirstMessageHandler for the last message it received, and inside that actor the senderFuture is used to respond back to the Aggregator. This is of course a very rough first try, and the test should be expanded to run from many threads and so on, but it hardly took me any time to write.

Tuesday, May 4, 2010

Starting with Akka part 2, Intellij IDEA, Test Driven Development

Update to this post: I have upgraded the examples to Akka 1.0-RC1. This newer post might be more helpful, or check directly at github here.

In this post I'm going to expand on the simple project from the previous post, add the use of an IDE, and do some Test Driven Development with Akka (using scalatest and junit).

First, check that your Project.scala file has the dependencies for scalatest and junit:


import sbt._

class Project(info: ProjectInfo) extends DefaultWebProject(info) {
  override def repositories = Set(
    "Java.Net" at "http://download.java.net/maven/2",
    "jBoss" at "http://repository.jboss.org/maven2",
    "service mix" at "http://svn.apache.org/repos/asf/servicemix/m2-repo/",
    "Apache Camel" at "https://repository.apache.org/content/repositories/releases/",
    "Akka Maven Repository" at "http://scalablesolutions.se/akka/repository",
    "Multiverse Releases" at "http://multiverse.googlecode.com/svn/maven-repository/releases/",
    "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/",
    "DataBinder" at "http://databinder.net/repo",
    "Configgy" at "http://www.lag.net/repo",
    ScalaToolsSnapshots)

  override def libraryDependencies = Set(
    /* servlet implementation */
    "org.eclipse.jetty" % "jetty-server" % "7.0.1.v20091125" % "test",
    "org.eclipse.jetty" % "jetty-webapp" % "7.0.1.v20091125" % "test",
    /* test dependencies */
    "org.scalatest" % "scalatest" % "1.0.1-for-scala-2.8.0.Beta1-with-test-interfaces-0.3-SNAPSHOT" % "test->default",
    /* camel */
    "org.apache.camel" % "camel-ftp" % "2.2.0" % "compile",
    /* akka dependencies */
    "se.scalablesolutions.akka" % "akka-kernel_2.8.0.Beta1"  % "0.8.1" % "compile",
    "se.scalablesolutions.akka" % "akka-core_2.8.0.Beta1"    % "0.8.1" % "compile",
    "se.scalablesolutions.akka" % "akka-servlet_2.8.0.Beta1" % "0.8.1" % "compile",
    "se.scalablesolutions.akka" % "akka-rest_2.8.0.Beta1"    % "0.8.1" % "compile")

  override def jettyPort = 9012

  val junit = "junit" % "junit" % "4.5" % "test"
}


As you can see, I have also added a dependency on camel-ftp, the Camel FTP component, which I will be using in my next post.

At the moment I am using IntelliJ IDEA Community Edition EAP with the Scala plugin for 2.8.0.Beta1. The EAP edition does come with a warning from JetBrains: "Please note that the quality of EAP versions may at times be way below even usual beta standards".

The Eclipse plugin is not usable at the moment, at least it didn't work for me. I did notice that IDEA was storing an 800MB+ index file somewhere in my home directory, which is kind of freaky, and compilation is a bit slow. I'm going to investigate later what I would lose by using a simpler editor and doing everything through sbt scripting, but for now I'm happy to use IDEA.

(Here is a good tip to add scala highlighting to gedit.)

First download the IDEA Community Edition EAP and install:


$ wget http://download.jetbrains.com/idea/ideaIC-95.54.tar.gz
$ sudo mkdir /usr/share/idea
$ sudo cp ideaIC-95.54.tar.gz /usr/share/idea
$ cd /usr/share/idea
$ sudo tar xvf ideaIC-95.54.tar.gz
$ cd /usr/local/bin
$ sudo ln -s /usr/share/idea/idea-IC-95.54/bin/idea.sh


Then run IDEA once so that it creates a .IdeaIC90 directory in your home directory, and add a plugins directory to its config directory:


$ cd ~/.IdeaIC90/config
$ mkdir plugins


Then download and install the latest scala plugin (at the moment scala-intellij-bin-0.3.1156.zip):


$ wget 'http://plugins.intellij.net/plugin/?action=download&id=8152' -O scala-plugin.zip
$ cp scala-plugin.zip ~/.IdeaIC90/config/plugins
$ cd ~/.IdeaIC90/config/plugins/
$ unzip scala-plugin.zip


Now run IDEA; you should see the Scala plugin in the Plugins section. Create a new project from scratch, and create it in the directory where your project lives (in my case ~/akka_start):


Earlier I had some issues where IDEA would pick up the wrong Scala facet and version; if that happens to you, check here:
http://www.jetbrains.net/devnet/thread/284354

Also, if the facet is not automatically included in your project, or you didn't tick the Scala checkbox in the project wizard, open the .iml file in an external editor (not in IDEA, or it will be overwritten) and add the facets so you get something like this (add the FacetManager part):


<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
  <component name="FacetManager">
    <facet type="Scala" name="Scala">
      <configuration>
        <option name="takeFromSettings" value="true" />
        <option name="myScalaCompilerJarPaths">
          <array>
            <option value="$MODULE_DIR$/lib/scala-compiler.jar" />
          </array>
        </option>
        <option name="myScalaSdkJarPaths">
          <array>
            <option value="$MODULE_DIR$/lib/scala-library.jar" />
          </array>
        </option>
      </configuration>
    </facet>
  </component>
  <component name="NewModuleRootManager" inherit-compiler-output="true">
    <exclude-output />
    <content url="file://$MODULE_DIR$">
      <sourceFolder url="file://$MODULE_DIR$/src/main/scala" isTestSource="false" />
      <sourceFolder url="file://$MODULE_DIR$/src/test/scala" isTestSource="true" />
    </content>
    <orderEntry type="inheritedJdk" />
    <orderEntry type="sourceFolder" forTests="false" />
    <orderEntry type="library" name="scala-2.8.0.Beta1" level="application" />
    <orderEntry type="library" name="akka" level="project" />
  </component>
</module>


Now add the source and test folders in the project structure dialog:


And add the dependencies on the jars in lib_managed/scala_2.8.0.Beta1/compile.
If you followed the previous post, Rebuild Project should now complete without errors, code completion should work, and you are ready to go :)

As I said before, the IDEA Scala compiler is a bit slow, so at the moment I don't compile after every change in IDEA. Instead, I keep this cool sbt feature running in a terminal:



$ sbt
> ~test-quick


The ~ prefix runs test-quick as a triggered action: whenever a source file is modified, sbt recompiles and runs all tests that failed or that depend on the changed code.

I wrote a small test just to check that everything is working. (You would of course normally never put the test code and the production code together in one file, but it's simpler to read here.)

The test starts a "Worker" actor, queues a few Commands, then checks the number of commands queued in the worker and checks that all commands are executed. I'm using MustMatchers for the nice syntax (the 'must be' matcher). The red-green-refactor flow is very nice with ~test-quick running.



import org.scalatest.Spec
import org.scalatest.matchers.MustMatchers

import se.scalablesolutions.akka.actor.Actor


case class Command(name: String, data: String)

case class CountCommandsQueued()

case class Execute()

case class CountResponse(count: Int)


class Worker extends Actor {
  var commands: List[Command] = Nil

  def receive = {
    case msg: Command => {
      commands = msg :: commands
    }
    case msg: Execute => {
      // "execute" every queued command, then clear the queue
      for (command <- commands) {
        // the actual work for each command would happen here
      }
      commands = Nil
    }
    case msg: CountCommandsQueued => {
      reply(new CountResponse(commands.size))
    }
  }
}

class WorkerSpec extends Spec with MustMatchers {
  describe("A Worker") {

    describe("(when it queues commands)") {
      it("should have the correct number of commands queued") {

        val command = new Command("test", "data")
        val actor = new Worker()
        actor.start
        var reply: CountResponse = (actor !! new CountCommandsQueued()).getOrElse(fail())
        reply.count must be === 0

        actor ! command
        actor ! command
        actor ! command
        reply = (actor !! new CountCommandsQueued()).getOrElse(fail())
        reply.count must be === 3
        actor.stop
      }
    }
    describe("(when it executes all queued commands)") {
      it("should have no commands queued after executing") {
        val command = new Command("test", "data")
        val actor = new Worker()
        actor.start
        actor ! command
        actor ! command
        var reply: CountResponse = (actor !! new CountCommandsQueued()).getOrElse(fail())
        reply.count must be === 2
        actor ! new Execute()
        reply = (actor !! new CountCommandsQueued()).getOrElse(fail())
        reply.count must be === 0
        actor.stop
      }
    }
  }
}