Thursday, May 6, 2010

Join messages with Akka

Update to this post: I have upgraded the examples to Akka 1.0-RC1. The updated code is on GitHub.

Akka raises the abstraction level for writing concurrent systems. In this post I want to give a simple example of this.

Let's say you have two types of messages coming into your system (say, of type FirstMessage and SecondMessage), and you would like to:
1. first do some processing on each of them separately/independently, and after that
2. 'join' or correlate the two messages when they come in one after the other as a pair, based on the time they are received by the system, and send out a new message;
3. keep all processing non-blocking;
4. only aggregate messages that come in as [FirstMessage, SecondMessage], ignoring consecutive FirstMessages.
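Before any concurrency comes into it, the joining rules in the list above can be pinned down as a plain function over messages in arrival order. This is only a sketch: the `Msg` parent trait and `JoinRules.join` are hypothetical names, and it assumes (as the actor code further down does) that the most recent FirstMessage wins and that a SecondMessage with no FirstMessage before it is simply dropped.

```scala
// Stand-ins for the message types, re-declared with a common parent (an assumption)
sealed trait Msg
case class FirstMessage(id: String, name: String) extends Msg
case class SecondMessage(data: String) extends Msg
case class AggregateMessage(id: String, name: String, data: String)

object JoinRules {
  // Pairs each SecondMessage with the most recent unpaired FirstMessage,
  // processing messages in arrival order with no threads involved.
  def join(arrivals: List[Msg]): List[AggregateMessage] = {
    val start = (Option.empty[FirstMessage], List.empty[AggregateMessage])
    val (_, joined) = arrivals.foldLeft(start) {
      // rule 4: a newer FirstMessage replaces an older unpaired one
      case ((_, acc), f: FirstMessage) => (Some(f), acc)
      // rule 2: a FirstMessage followed by a SecondMessage gives an aggregate
      case ((Some(f), acc), s: SecondMessage) =>
        (None, AggregateMessage(f.id, f.name, s.data) :: acc)
      // assumption: a SecondMessage with no FirstMessage before it is dropped
      case ((None, acc), _: SecondMessage) => (None, acc)
    }
    joined.reverse
  }
}
```

Having the rules as a pure function makes them easy to test on their own; whatever concurrent machinery ends up around them only has to feed messages in and pass aggregates on.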

In this example, the only thing you know is that one type of message always comes in a little bit later than the other, and when this happens, you want to combine the information in these two messages into one outgoing message. And this is not the only thing happening in the system: you also want to handle these messages asynchronously and do some processing on them before aggregation.

Something like this (apologies for the crappy graphics, got no time to waste on it :)

I tried to sort of follow the Enterprise Integration Patterns diagram style. On the left side are the two types of messages that come in (FirstMessage and SecondMessage), on the right side the result (AggregatedMessage). I was so lazy making this diagram that I left out the two processes that do something with FirstMessage and SecondMessage just before t=1 and t=2, but I guess you get the drift.

Without something like Akka, I would probably dive into the java.util.concurrent package... (what follows is a braindump, so take it with your own pinch of salt :)

Anyway, I'd grab my favourites, ExecutorService, LinkedBlockingQueue and friends, and create a Runnable class that runs a while loop, polls (or takes) from the queue and, based on the type of message it gets, stores that message in some state inside the Runnable; let's call that class the Aggregator.

(That state would have to be thread safe, so I'd need to think about locking, or use a concurrent map with key = type of message and value = a list of some wrapper that holds the message plus the time it was received.) Then, if the incoming message is of type FirstMessage, put it in the list in the map; if it is of type SecondMessage, get the most recently added FirstMessage out of the map, combine the two into an AggregatedMessage, and remove the handled messages from the data structure. Then I would have to send that AggregatedMessage on to something else, for which I'd need some remoting API, web service or whatever you like, get a proxy to it from inside the Aggregator, and wire all of that up somehow. And of course I would need to find a way to get the Aggregator up and running in an ExecutorService and to call it from the outside in a reasonable way. I would also need to think about how to start and stop the Aggregator, monitor its health, restart it if it fails, stuff like that.
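Stripped of the remoting, supervision and restart concerns, the core of that hand-rolled Aggregator might look roughly like the sketch below. It is not a faithful copy of the design described above: it swaps the concurrent map for state that only the single consumer thread ever touches, so the queue is the only thread-safe part needed. `QueueAggregator`, the `Stop` poison pill and the `out` callback are hypothetical names.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

case class FirstMessage(id: String, name: String)
case class SecondMessage(data: String)
case class AggregateMessage(id: String, name: String, data: String)
case object Stop // hypothetical poison pill that ends the loop

// One consumer thread owns the pairing state; producers only touch the queue.
class QueueAggregator(out: AggregateMessage => Unit) extends Runnable {
  private val inbox = new LinkedBlockingQueue[Any]()

  // Safe to call from any producer thread.
  def send(msg: Any): Unit = inbox.put(msg)

  def run(): Unit = {
    var pending: Option[FirstMessage] = None // only read and written by this thread
    var running = true
    while (running) {
      inbox.take() match { // take() blocks while the queue is empty, so no spinning
        case f: FirstMessage => pending = Some(f) // newest FirstMessage wins
        case s: SecondMessage =>
          pending.foreach(f => out(AggregateMessage(f.id, f.name, s.data)))
          pending = None
        case Stop => running = false
        case _    => () // ignore anything else
      }
    }
  }
}
```

Starting it is a matter of handing it to an executor, e.g. `Executors.newSingleThreadExecutor().execute(aggregator)`, and stopping it means sending `Stop`. Everything else on the list above (health monitoring, restarts, talking to the outside world) is still missing.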

And then I'd need to unit test this for thread safety, calling the code from two threads to simulate the two streams of message types, making sure it really works, mocking out the web services in and out, oh yeah, writing some WSDL, doing some JAX-WS config (oh crap, that's synchronous by nature), and probably finding out along the way that sometimes SecondMessage comes before FirstMessage, rounding the whole thing out...


And maybe I'd use a ConcurrentLinkedQueue instead of the LinkedBlockingQueue, only to find out in production that my thread is spinning tightly in its while(!stop) loop, because ConcurrentLinkedQueue has no blocking take() and its poll() returns null straight away when the queue is empty (yes, that does happen... time to add a Thread.sleep?)
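The difference is easy to see side by side. The sketch below contrasts the spinning loop with one alternative to sprinkling in Thread.sleep: a timed `poll` on a `LinkedBlockingQueue`, which parks the thread until a message arrives or the timeout passes while still letting the loop check its stop flag. The `ConsumerLoops` object, the `AtomicBoolean` stop flag and the 100 ms timeout are illustrative choices, not part of the original design.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ConsumerLoops {
  // The problem: ConcurrentLinkedQueue.poll() never waits, so while the queue
  // is empty this loop keeps a core busy doing nothing useful.
  def spinningLoop(q: ConcurrentLinkedQueue[String], stop: AtomicBoolean)(handle: String => Unit): Unit =
    while (!stop.get()) {
      val msg = q.poll() // null immediately if the queue is empty
      if (msg != null) handle(msg)
    }

  // One alternative: a timed poll on a BlockingQueue parks the thread until a
  // message arrives or 100 ms pass, and the stop flag is checked after every poll.
  def parkingLoop(q: LinkedBlockingQueue[String], stop: AtomicBoolean)(handle: String => Unit): Unit =
    while (!stop.get()) {
      val msg = q.poll(100, TimeUnit.MILLISECONDS) // null if the timeout passes
      if (msg != null) handle(msg)
    }
}
```

The timeout bounds how long a stop request can go unnoticed; a plain `take()` blocks indefinitely and would need an interrupt or a poison-pill message to shut the loop down instead.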

And that's only the Aggregator; I haven't even covered the two processes that need to take place before it (the ones I didn't draw in the diagram...).


I'm actually already getting tired of writing about it, let alone writing the actual code!

Anyway, this post is about Akka, so how would I do something like this using actors?

Well, a first try at it would be something like this:




import java.util.Date
import org.scalatest.matchers.MustMatchers
import org.scalatest.Spec
import se.scalablesolutions.akka.actor.{SupervisorFactory, ActorRegistry, Actor}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.ScalaConfig._

case class FirstMessage(id: String, name: String)
case class SecondMessage(data: String)
case class AggregateMessage(id: String, name: String, data: String)
case class GiveMeLastMessage()


class Aggregator(pass: Actor) extends Actor {
  def receive = {
    case msg: SecondMessage => {
      println("Aggregator, my data is " + msg.data)
      val firstMessageHandler: Actor = ActorRegistry.actorsFor(classOf[FirstMessageHandler]).head
      // !! waits for the reply (up to its timeout), so this line blocks the Aggregator
      val lastFirst: Option[FirstMessage] = firstMessageHandler !! new GiveMeLastMessage
      if (lastFirst.isDefined) {
        val first: FirstMessage = lastFirst.get
        println("Aggregator, my first message is " + first.id)
        val ag = new AggregateMessage(first.id, first.name, msg.data)
        pass ! ag
      }
    }
  }
}

class SecondMessageHandler extends Actor {
  def receive = {
    case msg: SecondMessage => {
      // do some processing
      println("Secondmessage, my data is " + msg.data)
      // then call the aggregator
      val aggregator: Actor = ActorRegistry.actorsFor(classOf[Aggregator]).head
      aggregator ! msg
    }
  }
}

class FirstMessageHandler extends Actor {
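  var lastMessage: Option[FirstMessage] = None
  // Future of a GiveMeLastMessage request that arrived before any FirstMessage.
  // Initialised from senderFuture (None at construction) so it gets the same type.
  var lastRequestor = senderFuture

  def receive = {
    case msg: FirstMessage => {
      // do some processing
      println("Firstmessage, my name is " + msg.name)

      if (lastRequestor.isDefined) {
        // senderFuture holds a future, not an Actor, so complete it instead of casting it
        // (assumption: completeWithResult, as on Akka's CompletableFuture of this era)
        lastRequestor.get.completeWithResult(msg)
        lastRequestor = None
      } else {
        lastMessage = Some(msg)
      }
    }
    case msg: GiveMeLastMessage => {
      if (!lastMessage.isDefined) {
        lastRequestor = senderFuture
      } else {
        reply(lastMessage.get)
        // forget it, so the same FirstMessage is not joined twice
        lastMessage = None
      }
    }
  }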
}


class AggregatorSpec extends Spec with MustMatchers {
  // written by the receiveTest actor's thread and read by the test thread
  @volatile var aggregateMessageReceived: Boolean = false
  @volatile var aggregateMessage: AggregateMessage = null

  describe("An Aggregator") {

    describe("(when it receives FirstMessage and then SecondMessage)") {
      val first = new FirstMessageHandler()
      val second = new SecondMessageHandler()
      val receiveTest = actor {
        case msg: AggregateMessage => {
          aggregateMessageReceived = true
          aggregateMessage = msg

        }
      }
      val aggregator = new Aggregator(receiveTest)
      first.start
      second.start
      aggregator.start

      it("should send an AggregateMessage containing data of FirstMessage and SecondMessage to the passed in actor") {
        first ! new FirstMessage("id-1", "name-1")
        Thread.sleep(200)
        second ! new SecondMessage("data-1")
        Thread.sleep(1000)
        aggregateMessageReceived must be === true
        aggregateMessage.id must be === "id-1"
        aggregateMessage.name must be === "name-1"
        aggregateMessage.data must be === "data-1"
        first.stop
        second.stop
        aggregator.stop
      }
    }
  }
}


What I really like about this is that I can try something like it out in very few lines of code, and the solution is quite simple. When the Aggregator receives a SecondMessage, it asks the FirstMessageHandler for the last FirstMessage it received, and inside that actor you use the senderFuture to respond back to the Aggregator! If no FirstMessage has arrived yet, the handler holds on to the senderFuture and answers as soon as one does (provided the Aggregator's !! has not timed out by then). This is of course a very rough first try, and I should expand the test to run from many threads etc., but it hardly took me any time to write.
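The "answer now, or hold on to the request and answer later" trick does not depend on Akka itself. The sketch below shows the same idea with the standard library's `Promise` standing in for the stored senderFuture, and with the aggregator mapping over the resulting `Future` instead of blocking on `!!`, which is closer to the non-blocking requirement. `LastFirstMessage` and `AggregatorSketch` are hypothetical names, and `synchronized` stands in for the actor's one-message-at-a-time processing.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

case class FirstMessage(id: String, name: String)
case class SecondMessage(data: String)
case class AggregateMessage(id: String, name: String, data: String)

// Plays the role of FirstMessageHandler; the Promise plays the role of senderFuture.
class LastFirstMessage {
  private var lastMessage: Option[FirstMessage] = None
  // Only one early request is kept, as in the actor version.
  private var waiting: Option[Promise[FirstMessage]] = None

  def onFirst(msg: FirstMessage): Unit = synchronized {
    waiting match {
      case Some(p) => p.success(msg); waiting = None // answer the early request
      case None    => lastMessage = Some(msg)       // newest FirstMessage wins
    }
  }

  // Like GiveMeLastMessage: answers right away if possible, otherwise later.
  def giveMeLastMessage(): Future[FirstMessage] = synchronized {
    lastMessage match {
      case Some(m) => lastMessage = None; Future.successful(m)
      case None =>
        val p = Promise[FirstMessage]()
        waiting = Some(p)
        p.future
    }
  }
}

object AggregatorSketch {
  // Composes on the future instead of blocking for the reply.
  def aggregate(handler: LastFirstMessage, second: SecondMessage)(implicit ec: ExecutionContext): Future[AggregateMessage] =
    handler.giveMeLastMessage().map(f => AggregateMessage(f.id, f.name, second.data))
}
```

Because nothing waits on the future, a SecondMessage that arrives early simply produces an aggregate later, once the matching FirstMessage shows up, instead of tying up the aggregator until a timeout.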
