Tuesday, July 27, 2010

Upgrading examples to Akka master (0.10) and Scala 2.8.0 Final

Update on this post: I have upgraded the AkkaExamples git repo to version 1.0-RC1. Camel references are upgraded to 2.5.0. I have also included a reference to the scalablesolutions Akka maven repository, so that the akka-sbt-plugin is automatically downloaded on sbt update. AkkaExamples on github

In this post I am going to write about upgrading from Akka 0.8.1 to Akka 0.10. (At the moment 0.10 is not released yet, but for simplicity I will refer to it as 0.10; to be exact it is git commit c737e2edf4b9ad980d6922f169ca4fb19f8c8284.)
Also, to make things a bit simpler, I have pushed my previous Akka examples here to github, so you can just clone from there and check it out. (Until 0.10 comes out you will need to build Akka first as shown below, and be sure to run 'sbt update' before you try to build.)

Building Akka
At the time of writing there is no 0.10 release yet that you can just download, so I built Akka from master and published it to my local ivy repository. The sbt build has really improved a lot; the update is quicker and so is the build:

$ git clone git://github.com/jboner/akka.git
$ cd akka
$ sbt update
$ sbt dist
$ sbt publish-local
$ cd akka-sbt-plugin
$ sbt publish-local

I also build and publish the akka-sbt-plugin locally, so I can use it later.
Akka now uses Google Protobuf 2.3.0. If you are going to use Google Protobuf for your own messages, you will need to install the 2.3.0 protoc compiler; get it here and follow the instructions in the readme file (a typical invocation is shown below).
In case you are going to use Apache Camel, you need to upgrade the Camel dependencies to 2.4.0 (Akka 0.8.1 used Camel 2.2.0).
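
For reference, generating the Java classes from a .proto file with the 2.3.0 compiler typically looks something like the line below; the file name and directories are just placeholders, not paths taken from the example project:

$ protoc -I=src/main/protobuf --java_out=src/main/java src/main/protobuf/commands.proto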

SBT Project file plugins
Akka 0.10 comes with a very nice addition for SBT, the AkkaProject sbt plugin. This makes it a lot more succinct and easy to start an Akka project, as you can see below:

import sbt._
import Process._
import sbt_akka_bivy._
class Project(info: ProjectInfo) extends DefaultProject(info) with AkkaProject with AkkaKernelDeployment{
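  // akka modules pulled in through the AkkaProject sbt plugin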
  val akkaCamel = akkaModule("camel")
  val akkaKernel = akkaModule("kernel")
  val junit = "junit" % "junit" % "4.8.1" % "test->default"
  val camelFtp = "org.apache.camel" % "camel-ftp" % "2.4.0" % "compile"
  val camelMina = "org.apache.camel" % "camel-mina" % "2.4.0" % "compile"
  val camelJetty = "org.apache.camel" % "camel-jetty" % "2.4.0" % "compile"

  val scalatest = "org.scalatest" % "scalatest" % "1.2-for-scala-2.8.0.final-SNAPSHOT" % "test->default"
  override def repositories = Set(
    "scala-tools-snapshots" at "http://scala-tools.org/repo-snapshots/"
  )
}

As you can see I have added some Camel components to play with, and I've added the scalatest and junit dependencies for unit testing. The Project file now uses the AkkaProject plugin and the Bivy Sack plugin (sbt-akka-bivy) for microkernel deployments (check here, really cool). I am going to use that in a later post for quick and easy deployment when I actually have something to deploy; the project only contains unit tests for now. Before you can use this Project file, you need to add a Plugins.scala file to the project/plugins directory, which describes the akka-sbt-plugin that we built and also pulls in the sbt-akka-bivy plugin for deployment:

import sbt._

class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
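  // the Bum Networks repository hosts the sbt-akka-bivy plugin; the akka-sbt-plugin comes from the local publish we did earlier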
  val bumRepo = "Bum Networks Release Repository" at "http://repo.bumnetworks.com/releases"
  val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "0.10"
  val sbtAkkaBivy = "net.evilmonkeylabs" % "sbt-akka-bivy" % "0.2.0"
}


Run 'sbt update' to pull in the dependencies. After that you can set up a new project in IDEA. (I'm hoping an sbt IDEA plugin will get really good soon, because this always means some fiddling with the classpath.)


IntelliJ IDEA
I am using the IntelliJ IDEA Community EAP (95.390 to be precise), which you can get from here.
Once you have installed it, update the Scala plugin in the plugins section (File - Settings - Plugins) if you had a previous version; otherwise install it from the available list.

Choose a new project from existing source. Make sure you select the compile and test jars (in lib_managed) and the scala 2.8.0 jars, and leave out the 2.7.7 ones from sbt. You can add a Scala Facet yourself, or IDEA will autodetect it and offer to create it for you; just make sure to use the 2.8.0 final jars. The same goes for the src 'IDEA module': you can add it in the new project wizard or in the Project Structure settings (set src/main/scala as source and src/test/scala as test). Make sure you set IDEA to use the 2.8.0 final compiler and library in the Scala Facet settings, pointing to the jars found in project/boot/scala-2.8.0/lib, and put the scala 2.8.0 jars right after your JDK in your module dependencies.


Project Structure - src module dependencies

Project Structure - src module sources


Project Structure - scala 2.8.0 project library

I have updated the examples from previous posts about Akka in the project on github. The biggest difference from Akka 0.8 is that the identity and the instance of an Actor are now separated into an ActorRef trait and an Actor trait respectively, as described here. There is also an Actor object which is used as a factory module. The Actor.actorOf method makes sure that you cannot access the Actor instance methods directly, to prevent threading problems. In the examples I import all the Actor object methods. You can only send messages to the Actor implementation through the API exposed by the ActorRef class; within the Actor instance, this is done through the self reference.
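
In short, the pattern looks something like this minimal sketch (the Echo actor is just a made-up illustration, not part of the examples):

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._

// A made-up actor, only to illustrate the ActorRef/Actor split.
class Echo extends Actor {
  def receive = {
    case msg: String => self.reply(msg) // inside the actor, talk to your own ActorRef through 'self'
  }
}

object EchoExample {
  def main(args: Array[String]) {
    val echoRef = actorOf(new Echo)              // actorOf returns an ActorRef, not the Echo instance
    echoRef.start                                // lifecycle methods live on the ActorRef
    val answer: Option[Any] = echoRef !! "hello" // sending messages also goes through the ActorRef
    println(answer)
    echoRef.stop
  }
}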

Below is the updated code for 0.10 of the simple example from the previous 0.8 post, showing the usage of actorOf in the specs and the self reference in the Worker Actor.

import org.scalatest.Spec
import org.scalatest.matchers.MustMatchers

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._

/**
 * A Spec for an example Worker Actor.
 */
class WorkerSpecs extends Spec with MustMatchers {
  describe("A Worker") {

    describe("(when it queues commands)") {
      it("should have the correct number of commands queued") {

        val command = new Command("test", "data")
        val actorRef = actorOf(new Worker())
        actorRef.start
        var reply: CountResponse = (actorRef !! new CountCommandsQueued()).getOrElse(fail()).asInstanceOf[CountResponse]
        reply.count must be === 0

        actorRef ! command
        actorRef ! command
        actorRef ! command
        reply = (actorRef !! new CountCommandsQueued()).getOrElse(fail()).asInstanceOf[CountResponse]
        reply.count must be === 3
        actorRef.stop
      }
    }
    describe("(when it executes all queued commands)") {
      it("should have no commands queued after executing") {
        val command = new Command("test", "data")
        val actorRef = actorOf(new Worker())
        actorRef.start
        actorRef ! command
        actorRef ! command
        var reply: CountResponse = (actorRef !! new CountCommandsQueued()).getOrElse(fail()).asInstanceOf[CountResponse]
        reply.count must be === 2
        actorRef ! new Execute()
        reply = (actorRef !! new CountCommandsQueued()).getOrElse(fail()).asInstanceOf[CountResponse]
        reply.count must be === 0
        actorRef.stop
      }
    }
  }
}

/**A command message */
case class Command(name: String, data: String)
/** A message to get the amount of commands queued*/
case class CountCommandsQueued()
/** A message to execute all commands queued */
case class Execute()
/** A message reply on the CountCommandsQueued message */
case class CountResponse(count: Int)

/**
 * A Worker actor that receives Commands, queues the commands, and executes commands on the Execute message,
 * as example for some simple Actor testing
 */
class Worker extends Actor {
  var commands: List[Command] = Nil

  def receive = {
    case msg: Command => {
      commands = msg :: commands
    }
    case msg: Execute => {
      // 'execute' all queued commands (a no-op in this example), then clear the queue
      for (command <- commands) {
        // a real worker would execute the command here
      }
      commands = Nil
    }
    case msg: CountCommandsQueued => {
      self.reply(new CountResponse(commands.size))
    }
  }
}


Self reference
In the simple example above I updated the code that uses the Actor API from within the Actor by prefixing the calls with self. What you can also do is import the 'self methods' into the Actor class, in case you want to save yourself typing 'self' all the time. I've done this in the 0.10 update of my previous example about joining messages in 0.8 (the FirstMessageHandler Actor); the full example follows after the small sketch below.
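
The idiom itself is tiny; Replier is just a made-up actor to show the import, not part of the examples:

import se.scalablesolutions.akka.actor.Actor

// A made-up actor, only to illustrate importing the 'self' methods.
class Replier extends Actor {
  import self._ // brings reply, senderFuture and friends into scope

  def receive = {
    case msg: String => reply(msg.toUpperCase) // no 'self.' prefix needed anymore
  }
}

And here is the updated message-joining example itself: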

import org.scalatest.matchers.MustMatchers
import org.scalatest.Spec
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{ActorRef, ActorRegistry, Actor}
/**
 * A Spec for the Aggregator
 */
class AggregatorSpecs extends Spec with MustMatchers {
  var aggregateMessageReceived: Boolean = false
  var aggregateMessage: AggregateMessage = null

  describe("An Aggregator") {

    describe("(when it receives FirstMessage and then SecondMessage)") {
      val firstRef = actorOf(new FirstMessageHandler())
      val secondRef = actorOf(new SecondMessageHandler())
      val receiveTestRef = actor {
        case msg: AggregateMessage => {
          aggregateMessageReceived = true
          aggregateMessage = msg

        }
      }
      val aggregator = actorOf(new Aggregator(receiveTestRef))
      firstRef.start
      secondRef.start
      aggregator.start

      it("should send an AggregateMessage containing data of FirstMessage and SecondMessage to the passed in actor") {
        firstRef ! new FirstMessage("id-1", "name-1")
        Thread.sleep(200)
        secondRef ! new SecondMessage("data-1")
        Thread.sleep(1000)
        aggregateMessageReceived must be === true
        aggregateMessage.id must be === "id-1"
        aggregateMessage.name must be === "name-1"
        aggregateMessage.data must be === "data-1"
        firstRef.stop
        secondRef.stop
        aggregator.stop
      }
    }
  }
}

/** A message that is expected to arrive first*/
case class FirstMessage(id: String, name: String)
/** A message that is expected to arrive second*/
case class SecondMessage(data: String)
/** An aggregated message, from first and second */
case class AggregateMessage(id: String, name: String, data: String)
/** A command to get the last message*/
case class GiveMeLastMessage()

/**
 * An Aggregator actor that aggregates a first and second message type
 */
class Aggregator(pass: ActorRef) extends Actor {
  def receive = {
    case msg: SecondMessage => {
      println("Aggregator, my data is " + msg.data)
      val firstMessageHandler: ActorRef = ActorRegistry.actorsFor(classOf[FirstMessageHandler]).head
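      // '!!' sends the message and waits for a reply (or times out), so it returns an Option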
      val reply: Option[Any] = firstMessageHandler !! new GiveMeLastMessage
      if (reply.isDefined) {
        val first: FirstMessage = reply.get.asInstanceOf[FirstMessage]
        println("Aggregator, my first message is " + first.id)
        val ag = new AggregateMessage(first.id, first.name, msg.data)
        pass ! ag
      }
    }
  }
}

/**
 * A Message Handler for the SecondMessage type
 */
class SecondMessageHandler extends Actor {
  def receive = {
    case msg: SecondMessage => {
      // do some processing
      println("Secondmessage, my data is " + msg.data)
      // then call the aggregator
      val aggregator: ActorRef = ActorRegistry.actorsFor(classOf[Aggregator]).head
      aggregator ! msg
    }
  }
}

/**
 * A Message Handler for the FirstMessage type
 */
class FirstMessageHandler extends Actor {
  import self._
  import se.scalablesolutions.akka.dispatch.CompletableFuture

  var lastMessage: Option[FirstMessage] = None
  // holds the future of a GiveMeLastMessage request that arrived before any FirstMessage
  var lastRequestor: Option[Any] = None

  def receive = {
    case msg: FirstMessage => {
      // do some processing
      println("Firstmessage, my name is " + msg.name)

      lastMessage = Some(msg)
      // if a GiveMeLastMessage request is already waiting, complete its future now
      lastRequestor match {
        case Some(future: CompletableFuture[Any]) => {
          future.completeWithResult(msg)
          lastRequestor = None
          lastMessage = None
        }
        case _ =>
      }
    }
    case msg: GiveMeLastMessage => {
      if (!lastMessage.isDefined) {
        // no FirstMessage yet, hold on to the future of the !! request and answer it later
        lastRequestor = senderFuture
      } else {
        reply(lastMessage.get)
      }
    }
  }
}

Protobuf Serialization
The Serializable.Protobuf trait has been removed in 0.10, and Akka 0.10 now uses protobuf 2.3.0. Below is the updated code for 0.10 of the simple protobuf serialization example from the previous 0.8 post:

package unit.akka

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import org.scalatest.{BeforeAndAfterAll, Spec}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
import org.scalatest.matchers.MustMatchers
import unit.test.proto.Commands
import unit.test.proto.Commands.WorkerCommand
import unit.akka.CommandBuilder._

/**
 * Test to check if communicating with protobuf serialized messages works.
 */
@RunWith(classOf[JUnitRunner])
class ProtobufSpecs extends Spec with BeforeAndAfterAll with MustMatchers {
  var server: RemoteServer = new RemoteServer()

  override def beforeAll(configMap: Map[String, Any]) {
    server.start("127.0.0.1", 8091)
  }

  override def afterAll(configMap: Map[String, Any]) {
    RemoteClient.shutdownAll
    server.shutdown
  }

  describe("Send using Protobuf protocol") {

    it("should receive local protobuf pojo and reply") {
      // send a local msg, check if everything is ok
      val actor = actorOf(new TestProtobufWorker())
      actor.start
      val msg = Worker(1, "my-name-1", "my-data-1")
      val result: Option[Any] = actor !! msg
      result match {
        case Some(reply: Commands.WorkerCommand) => {
          // test actor changes name to uppercase
          reply.getName must be === "MY-NAME-1"
        }
        case None => fail("no response")
      }
      actor.stop
    }

    it("should receive remote protobuf pojo and reply") {
      //start a remote server
      val actor = actorOf(new TestProtobufWorker())
      //register the actor that can be remotely accessed
      server.register("protobuftest", actor)
      val msg = Worker(2, "my-name-2", "my-data-2")
      val result: Option[Any] = RemoteClient.actorFor("protobuftest", "127.0.0.1", 8091) !! msg

      result match {
        case Some(reply: Commands.WorkerCommand) => {
          // test actor changes name to uppercase
          reply.getName must be === "MY-NAME-2"
        }
        case None => fail("no response")
      }
      actor.stop
    }
  }
}


/**
 * Actor that sends back the message uppercased.
 */
class TestProtobufWorker extends Actor {
  def receive = {
    case msg: Commands.WorkerCommand => {
      log.info("received protobuf command pojo:" + msg.getId)
      val r = Worker(2, msg.getName.toUpperCase, msg.getData.toUpperCase)
      log.info("sending back a renamed pojo:" + r.getName)
      self.reply(r)
    }
  }
}

/**
 * Shortcuts for creating WorkerCommands
 */
object CommandBuilder {
  def Worker(id: Int, name: String, data: String): WorkerCommand = {
    Commands.WorkerCommand.newBuilder.setId(id).setName(name).setData(data).build
  }

  def Worker(command: WorkerCommand): WorkerCommand = {
    Commands.WorkerCommand.newBuilder.setId(command.getId).setName(command.getName).setData(command.getData).build
  }
}

You now just work directly with the protobuf messages, instead of creating a case class that extends the Serializable.Protobuf trait and serializes to and from the protobuf message. In the test I added the CommandBuilder object just for convenience.
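
The .proto definition itself is not shown in this post. Based on the fields used in the test (id, name and data) and the generated unit.test.proto.Commands.WorkerCommand class, it presumably looks roughly like the sketch below; the file name and exact field rules are assumptions:

// commands.proto (assumed layout)
package unit.test.proto;

option java_outer_classname = "Commands";

message WorkerCommand {
  required int32 id = 1;
  required string name = 2;
  required string data = 3;
}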

If you want to check out the source code go to my github repo here.

5 comments:

  1. Great post! Very helpful. For camel-jetty, there's already a version 2.4.0.1 in Akka's embedded-repo that fixes a concurrency bug (will be officially released with Camel 2.5.0). When you're using camel-jetty for implementing consumer endpoints, it is highly recommended to use the patched version.

  2. Hi,

    import sbt_akka_bivy._
    When I run the compile on sbt it is complaining about not finding AkkaKernelDeployment. The plugin is in my local ivy repository. Should sbt find it automatically?

    MixMasta

  3. ignore last post - it is compiling now.

    Getting this error on update -
    module not found: se.scalablesolutions.akka#akka-kernel_2.8.0;0.9.1

    Is this why you downloaded akka and published locally? because it is not released yet in the remote repository? I see in the repository it is built against akka-kernel_2.8.0.RC3?

    Laters
    MixMasta

  4. @MixMasta yes exactly, that is why I published locally.
