Wednesday, June 23, 2010

Scala 2.8.0, Akka 0.8.1 and cassandra 0.6.2

In this post I'm going to install Cassandra 0.6.2 on Ubuntu Karmic Koala. I'm going to use the Akka persistence module to access it, and write a unit test to check that everything is working.

I've used the installation instructions for Cassandra as shown here, with the following changes:

3. Add the following lines in /etc/apt/sources.list instead:

deb unstable main
deb-src unstable main

That's pointing to version 0.6.2 at the moment.

9. Instead, change the JMX port in /etc/init.d/cassandra; the port setting elsewhere doesn't seem to be used. On the default JMX port, Cassandra kept bumping into my local Tomcat server.
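If you want to check up front whether a port is already taken before pointing Cassandra's JMX agent at it, a small helper like the following can tell you (this is just an illustrative utility I'm sketching here, not part of the Cassandra or Akka setup):

```scala
import java.io.IOException
import java.net.ServerSocket

// Try to bind the port locally: if the bind succeeds, nothing
// (Tomcat, another JMX agent, ...) is currently listening on it.
def portIsFree(port: Int): Boolean =
  try {
    val socket = new ServerSocket(port)
    socket.close()
    true
  } catch {
    case _: IOException => false
  }
```

Calling `portIsFree(8080)` while Tomcat is running would have told me about the conflict right away.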
The storage-conf.xml is in /etc/cassandra. The storage-conf.xml file that comes with Akka does not seem to work and needs a small change; I have copied the Keyspace definition below into the /etc/cassandra/storage-conf.xml file:
<Keyspace Name="akka">
        <ColumnFamily CompareWith="UTF8Type" Name="map"/>
        <ColumnFamily CompareWith="UTF8Type" Name="vector"/>
        <ColumnFamily CompareWith="UTF8Type" Name="ref"/>

        <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>

        <!-- Number of replicas of the data -->
        <ReplicationFactor>1</ReplicationFactor>

        <!-- EndPointSnitch: Setting this to the class that implements
             AbstractEndpointSnitch, which lets Cassandra know enough
             about your network topology to route requests efficiently.
             Out of the box, Cassandra provides org.apache.cassandra.locator.EndPointSnitch,
             and PropertyFileEndPointSnitch is available in contrib/. -->
        <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>
Cassandra complains if you don't add the ReplicaPlacementStrategy. I'm not sure yet what these settings mean exactly, but I didn't let that stop me :)

You should now be able to start Cassandra and use it from Akka (sudo /etc/init.d/cassandra start). You can check that the akka keyspace is present with the 'show keyspaces' command in cassandra-cli.

Be sure to check /var/log/cassandra/*.log to see whether everything works. You can also start jconsole, connect to Cassandra through JMX (localhost, port 10036 in my case), and check out some MBeans.

And then a simple unit test. What is important is that you use a lazy declaration for the storage and wrap access in atomic blocks. You can also use the persistent map straight from an Actor, but I like to do it this way because you can swap out the implementation (for instance from Cassandra to Redis) by passing the PersistentMap trait below into, for instance, an Actor constructor.
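To make that swapping concrete, here is a minimal sketch of the idea with an in-memory stand-in (the `InMemoryPersistentMap` and `SessionStore` names are mine, purely for illustration, not from Akka):

```scala
import scala.collection.mutable

// The same narrow storage abstraction as in the code below.
trait PersistentMap {
  def put(key: String, value: String): Unit
  def get(key: String): Option[String]
}

// A trivial in-memory implementation, handy as a test double.
class InMemoryPersistentMap extends PersistentMap {
  private val backing = mutable.Map[String, String]()
  def put(key: String, value: String): Unit = backing(key) = value
  def get(key: String): Option[String] = backing.get(key)
}

// A component that only depends on the trait, so the backing store
// (Cassandra, Redis, in-memory, ...) can be chosen at construction time.
class SessionStore(storage: PersistentMap) {
  def save(id: String, data: String): Unit = storage.put(id, data)
  def load(id: String): Option[String] = storage.get(id)
}
```

In production you would construct `new SessionStore(new CassandraPersistentMap("sessions"))`; in a unit test, `new SessionStore(new InMemoryPersistentMap)` works without a running Cassandra.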

You need to convert keys and values to byte arrays, so it is quite a low-level interface for storing data. This unit test is very simple on purpose; in practice I would probably use the Akka Serializable traits on the case classes that I want to store.
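For the byte-array conversion, one plain-JDK option is Java serialization of a `Serializable` case class; this is just a sketch of the round trip (the `User` class and helper names are made up for the example, and Akka's own Serializable traits are the nicer route):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// A throwaway case class; case classes pick up equals/hashCode,
// so the round trip can be checked with ==.
case class User(name: String, age: Int) extends Serializable

// Serialize any object graph to the Array[Byte] the storage map expects.
def toBytes(obj: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  bos.toByteArray
}

// Read the object back; the caller states the expected type.
def fromBytes[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  ois.readObject().asInstanceOf[T]
}
```

With helpers like these, `map.put(key.getBytes, toBytes(user))` stores a whole case class instead of a raw string.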

package unittest

import se.scalablesolutions.akka.stm.Transaction.Local._
import se.scalablesolutions.akka.util.Logging
import org.scalatest.{Spec, BeforeAndAfterAll}
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import org.junit.Assert._
import org.apache.cassandra.service.NotFoundException

@RunWith(classOf[JUnitRunner])
class AkkaCassandraTest extends Spec with ShouldMatchers with BeforeAndAfterAll with Logging {
  describe("Store stuff in Cassandra using Akka persistence abstraction") {
    it("should be able to store some stuff and find it again") {
      val map = new CassandraPersistentMap("test-map")
      map.put("test-key", "test-value")

      val someValue = map.get("test-key")
      if (someValue.isDefined) {
        val value = someValue.get
        log.info("found " + value)
        value should equal("test-value")
      } else {
        fail("test-key not found")
      }

      val anotherValue = map.get("no-key")
      assertFalse(anotherValue.isDefined)
    }
  }
}

/**
 * Simple map trait
 */
trait PersistentMap {
  def put(key: String, value: String)

  def get(key: String): Option[String]
}

/**
 * Simple Map that uses Cassandra
 */
class CassandraPersistentMap(storageKey: String) extends PersistentMap {
  // Using atomic we define the transaction span.
  private lazy val storage = atomic { CassandraStorage.getMap(storageKey) }

  def put(key: String, value: String): Unit = {
    atomic {
      storage.put(key.getBytes, value.getBytes)
    }
  }

  def get(key: String): Option[String] = {
    atomic {
      val result = storage.get(key.getBytes)
      result match {
        case Some(bytes) => Some(new String(bytes))
        case None => None
      }
    }
  }
}