Thursday, June 19, 2014

Journal Migration

The last two posts were about testing recovery of the application state of an akka-persistence based application from a journal or a snapshot. This post concludes the series of posts on testing akka-persistence based application with some thoughts on testing migration of journals.

Journal Migration

The migration problem has already been addressed in the first post of this series. When the application evolves, the objects that are written to the journal evolve and thus their serialized representation has to change, too. To ensure that even if the evolved objects have a new serialized representation the deserialization is still able to read and process the old representation, custom akka-serializers are applied. So the idea is to maintain compatibility to existing journals and to restore the changed command-objects from the old serialized representation. Alternatively one could write a migration-tool that reads an existing journal and writes the content in the new serialization format. While this post focuses on the first approach the testing principles can be applied to both.

Snapshot Migration

Before we get into details, one word about migration of snapshots. In principle we have the same problems here as in case of journals. As the application evolves, the state of processors evolve and so does their serialized representation, and one has to make sure that the evolved application is still able to read existing snapshots. Like for the journal akka-persistence uses akka-serialization when it comes to writing the snapshot to the storage and that is why it makes sense to employ custom serializers for maintaining compatibility between versions. However, there is an important difference between the journal and snapshots. Snapshots can be considered a pure performance optimization and are not important from a functional point of view for a successful recovery of application state. That is why it might be a valid alternative to do without custom serializers and backwards compatibility in case of snapshots. Incompatible snapshots could simply be deleted in case of an upgrade. Of course this results in a longer recovery time for the first restart after an upgrade.

Because of this and because of the fact that maintaining backwards compatibility for snapshots is a very similar challenge (for implementation and test) as in case of journals, this post only considers migration of journals.

General Idea

Once again the idea is to reuse as much of the existing tests as possible. Basically we want to know if the test for recovery from a journal still works even when an old journal is used. So instead of writing the journal and recover from it in a single test-run, we rather save the journals produced by the tests when development has reached a state compatibility has to be maintained to (e.g. right after a release) and make the test read from this journal when testing recovery instead of reading from a journal produced in the same test.

Let's have a quick look at one of the recover-tests:

 1         val created = startApplication(persistDir) { application =>
 2           application.itemServiceTestExtension.createNewItem()
 3         }
 4         restartApplication(persistDir) { application =>
 5           application.itemServiceTestExtension.findItem(created.id) should be (
 6               Some(created))
 7         }

In lines 1-3 the journal is produced and the relevant state is kept locally in created. For our migration-test we do not need to produce a journal as this has been saved before. We rather just need to initialize created with a value that corresponds to the saved journal.

In lines 4-7 the application is recovered. For our migration-test we need to pass in the folder that contains the saved journal.

Saving old Journals

So the first challenge is to write a tool that saves the journals produced by the tests to a dedicated place. You may have noticed that the test-code above differs slightly from the initial-version shown in the post about testing recovery. The withApplication used before for both creating the journal and testing recovery are replaced by startApplication (line 1) and restartApplication (line 4) respectively. This allows us to inject different actions in either case.

For our use-case to save the journal to a dedicated place we simply copy the journal from the temporary folder to the dedicated place after the first application shutdown. For this we create a trait WithItemApplicationWithSaveJournal extending the well known WithItemApplication and overwriting startApplication like this:

 1   override def startApplication[A](persistDir: File)(block: TestApplication => A): A = {
 2     val result = super.startApplication(persistDir)(block)
 3     saveData(persistDir, result)
 4     result
 5   }
 6 
 7   def saveData[A](persistDir: File, result: A): Unit = {
 8     val destinationDir = migrationDataDir
 9     if(destinationDir.isDirectory)
10       FileUtils.deleteDirectoryContents(destinationDir)
11     FileUtils.copyDirectoryContents(persistDir, destinationDir)
12     Files.write(result.toString, new File(destinationDir, "expected.txt"),
13       Charsets.UTF_8)
14   }

When startApplication (line 2) returns, the application has not only been started, but the test-code (block) has been executed and the application is shutdown. Before it simply returns the result (line 4), it copies the temporary folder containing the journal (persistDir) to a dedicated place (line 3). In addition to this it writes a text file (expected.txt) to this folder with a string-representation of the result (line 12-13). We will see below how this helps us to prepare the relevant state for verification.

The dedicated place is migrationDataDir (line 8) which computes to the folder src/test/saved-journals/<id>/<test-class-name>/<test-name>/. The variables in this path are basically provided by a dummy-test that derives from the original ItemApplicationRecoverSpec and mixes in the trait WithItemApplicationWithSaveJournal like follows:

1 class SaveItemApplicationJournal
2     (protected val runId: String)
3     (implicit val tag: ClassTag[ItemApplicationRecoverSpec])
4     extends ItemApplicationRecoverSpec with JournalMigrationRecoverSource
5     with WithItemApplicationWithSaveJournal[ItemApplicationRecoverSpec]

<id> becomes runId (line 2), <test-class-name> is determined by tag (line 3) and <test-name> stands for the names of the individual tests in ItemApplicationRecoverSpec. This dummy test is run by a dedicated scala-application:

1 object SaveJournalApp extends App {
2   val runId = args.headOption.getOrElse("generic")
3 
4   run(new SaveItemApplicationJournal(runId))
5 }

This application simply instantiates the dummy-test with a certain runId and runs it using a scala-test test-runner (line 4). It could for example be used after each release of the application and actually also be included in a release-process. In that case the runId could be the version of the application. In our case it produces three folders, one for each test in ItemApplicationRecoverSpec, containing the journal created by the respective test and the file expected.txt containing the string-representation of the value returned by startApplication. So for our three tests these expected.txt files contain the created, updated or deleted item.

The files produced by SaveJournalApp should be added to the source-control as resources required to run tests as we cannot easily reproduce those files once the application evolves.

Running tests against old journals

The next step is to execute the test of ItemApplicationRecoverSpec while making sure that it reads the saved journals when restarting the application. Once again we basically just need to provide another variation of the WithItemApplication trait for this:

 1   protected def expectedValueFor(test: TestData): Any
 2 
 3   override def startApplication[A](persistDir: File)(block: (TestApplication) => A): A =
 4     expectedValueFor(currentTest).asInstanceOf[A]
 5 
 6   override def restartApplication[A](persistDir: File)(block: (TestApplication) => A): A =
 7     super.restartApplication(migrationDataDir)(block)

This overwrites startApplication and restartApplication. In case of startApplication it simply returns prepared values without executing the test-code (block) (line 4). These values have to be provided by the test-implementation in form of the method expectedValueFor (line 1).

restartApplication executes the block as usual, however instead of providing the temporary folder persistDir it passes the folder with the previously saved journal migrationDataDir (line 7).

Having this, a migration test basically just needs to provide the expected values and the metadata required to find the correct folder containing the previously saved journal:

 1   protected def runId = "0.1"
 2 
 3   protected def tag = classTag[ItemApplicationRecoverSpec]
 4 
 5   protected val Id = ItemId(1)
 6   protected def description(d: Long) = s"$d - description"
 7 
 8   protected val expectedValues: Iterator[Any] = Iterator(
 9     Item(Id, description(1)),
10     Item(Id, description(3)),
11     Item(Id, description(4)))
12 
13   override protected def expectedValueFor(test: TestData): Any =
14     expectedValues.next()

runId (line 1) and tag (line 3) represent this metadata for the folder-name. As you can see, the method expectedValueFor gets a scala-test TestData instance (line 13) which allows it to pick the right expected value. For simplicity reasons the implementation here ignores it and relies on the order in which the tests are executed (line 14). The actual values (line 9-11) can be derived from the expected.txt files that were generated along with the saved journals.

Test in Action

So just like in case of the snapshot-tests we are able to provide migration tests while fully reusing the existing test-logic. Now lets see this test in action. After the journals have been saved by running SaveJournalApp, we modify the domain model by adding the optional field rank: Option[Int] = None defaulting to None to Item and ItemDescription. Thanks to the play-json macro -based serialization that we used for the custom serializers they basically adapt to these changes automagically. Of course we expect that the Items recovered from an old journal have the field initialized to None and as this is also the default we do not even have to change the migration test and can simply rerun it without any further changes and indeed it runs through just fine.

However if we break compatibility of the macro-based serializer for example by adding a field rank: Int = 0, the test fails accordingly:

akka.persistence.RecoveryException:
  Recovery failure by journal (processor id = [/user/ItemActor])
...
Caused by: play.api.libs.json.JsResultException:
  JsResultException(errors:List((/template/rank,
    List(ValidationError(error.path.missing,WrappedArray())))))

So for this kind of change we would have to provide a json-serializer that initializes missing rank-fields with 0.

Conclusion

There is one thing to keep in mind when it comes to reusing a recovery test for this kind of migration-test. Once the original recovery test evolves, it might no longer be a suitable basis for the saved journals. Even just adding test-cases is not a good idea as there is no old journal for these new test-cases. So the migration-test and the original recovery-test might diverge when the development continues and you have to find a suitable strategy to avoid as much redundancy as possible.

This post concludes the series of posts about akka-persistence and testing. It showed that it is possible to re-use the same test for testing three different kind of recovery scenarios: recovery from a journal, a snapshot and an old journal of the previous release.

Thursday, May 29, 2014

Testing Recovery through Snapshots

In the last post I explained how to test if the state of an akka-persistence based application is properly recovered by replaying the journals after a system restart. This post is about testing the second way of recovering application state supported by akka-persistence: by using snapshots.

Testing Recovery through Snapshots

In addition to recover application state by replaying the entire journal akka-persistence supports taking snapshots of application state and recover from them. This typically decreases recovery time significantly as the state is not reconstructed command-message by command-message but rather all at once and only those commands that arrived after the snapshot was taken need to be replayed message by message.

Enabling Snapshots

To enable the ItemActor to take snapshots we only need a few lines of code:

 1     case SaveSnapshot => saveSnapshot(ItemActorSnapshot(itemById, idCounter))
 2     case SaveSnapshotSuccess(metadata) =>
 3     case SaveSnapshotFailure(metadata, cause) =>
 4 
 5     case SnapshotOffer(_, ItemActorSnapshot(itemMap, lastId)) =>
 6       this.itemById = itemMap
 7       this.idCounter = lastId

The actor reacts on the custom message SaveSnapshot by providing the actor's current state to saveSnapshot. The entire state of the actor is simply modelled as the case class ItemActorSnapshot containing the item-map and the id-counter. saveSnapshot basically responds with SaveSnapshotSuccess or SaveSnapshotFailure depending on the success of the operation and the actor can react accordingly. We do not want to go into the details of handling these properly here, that is why they are simply ignored.

When it comes to recovery of an actor (e.g. after a restart of the application) and akka-persistence finds a snapshot for a Processor it offers that (SnapshotOffer) before replaying messages from the journal that arrived after the snapshot was taken.

Testing Recovery

Testing recovery from snapshots is in principle pretty similar to testing recovery from a journal. We have almost the same steps:

  • start the application
  • modify the application's state by sending corresponding commands
  • take a snapshot
  • stop the application
  • restart the application (and recover from the snapshot)
  • verify the application's state by queries

As these steps are almost identical we should try to reuse as much as possible from the previous test. In fact we can reuse the entire test and just have to take care that a snapshot is taken before the application is stopped. Let's one more time have a look at the central method that starts/stops an application in the tests:

 1   def withApplication[A](persistDir: File)(block: TestApplication => A) = {
 2     val tmpDirPersistenceConfig = ConfigFactory.parseMap(
 3       Map(JournalDirConfig -> new File(persistDir, "journal").getPath,
 4         NativeLevelDbConfig -> false.toString,
 5         SnapshotDirConfig -> new File(persistDir, "snapshots").getPath)
 6         .asJava)
 7     val application = newItemApplication(tmpDirPersistenceConfig)
 8     ultimately(application.shutdown())(block(application))
 9   }
10 
11   def newItemApplication(config: Config) =
12     new ItemApplication(config) with ItemApplicationTestExtensions

The application is shutdown after the test by invoking the corresponding method of the application (line 8). If we are able to inject taking a snapshot here we are basically all set as in this case akka-persistence will find the snapshot after the next restart and recover from that instead of the journal. We can actually easily do this as we already amend the application with some test-extensions and we just have to modify this a bit for taking a snapshot at shutdown.

First we overwrite the newItemApplication method in the trait WithItemApplicationWithSnapshot that extends the original one WithItemApplication to amend the application with a snapshot-specific extension:

1 trait WithItemApplicationWithSnapshot extends WithItemApplication {
2   override def newItemApplication(config: Config) =
3     new ItemApplication(config) with ItemApplicationWithSnapshot
4 }

This extension overwrites the shutdown and also amends the ItemActor by overwriting itemActorProps:

 1 trait ItemApplicationWithSnapshot extends ItemApplicationTestExtensions
 2     with TestUtil { this: ItemApplication =>
 3 
 4   abstract override def itemActorProps: Props =
 5     Props(new ItemActor with RespondToSnapshotRequest)
 6 
 7   abstract override def shutdown() = {
 8     resultOf(itemActor ? SaveSnapshot) match {
 9       case _: SaveSnapshotSuccess =>
10       case SaveSnapshotFailure(_, cause) =>
11         sys.error(s"Saving snapshot failed with: $cause")
12     }
13     super.shutdown()
14   }
15 }

shutdown simply sends a SaveSnapshot message to ItemActor and waits for a response (line 8) before it actually shuts down the application (line 13). However to make ItemActor actually respond to this message it has to be modified slightly and that is why itemActorProps is overwritten as well (line 4) and the returned actor is extended with the trait RespondToSnapshotRequest:

 1 trait RespondToSnapshotRequest extends Actor {
 2 
 3   private var lastSnapshotSender: Option[ActorRef] = None
 4 
 5   abstract override def receive: Receive = respondToSnapshotReceive.orElse(super.receive)
 6 
 7   def respondToSnapshotReceive: Receive = {
 8     case SaveSnapshot =>
 9       lastSnapshotSender = Some(sender())
10       super.receive(SaveSnapshot)
11 
12     case message: SaveSnapshotSuccess =>
13       super.receive(message)
14       respondToSnapshotRequester(message)
15 
16     case message: SaveSnapshotFailure =>
17       super.receive(message)
18       respondToSnapshotRequester(message)
19   }
20 
21   private def respondToSnapshotRequester(response: AnyRef) = {
22     lastSnapshotSender.foreach(_ ! response)
23     lastSnapshotSender = None
24   }
25 }

The receive-method of this trait intercepts SaveSnapshot messages and keeps the sender of the message (line 9) before it continues with normal processing (line 10). The saved sender reference is used to forward the SaveSnapshotSuccess (line 12) or SaveSnapshotFailure (line 16) messages to it.

Armed with this the test for testing successful recovery from snapshots can simply extend the one for testing recovery from the journal but mix-in the WithItemApplicationWithSnapshot-trait:

1 class ItemApplicationRecoverFromSnapshotSpec extends ItemApplicationRecoverSpec
2     with WithItemApplicationWithSnapshot {

So there is no need to redundantly formulate the individual tests for recovery after create, update and delete. To verify how those tests are working we can break the implementation of taking snapshots. We could for example forget to include the id-counter in a snapshot, so stripping down the ItemActorSnapshot to case class ItemActorSnapshot(itemById: Map[ItemId, Item]). Running the tests immediately shows ...oops... that all are running fine. So the bug created by this change is not discovered by the tests. It seems we need an additional one. The difference between recovery from the journal and recovery from a snapshot is that when the state is recovered from the journal the same application logic is triggered as in case of normal operation. So if tests have proven that during normal operation the application's state is not being messed up, its hard to mess it up during recovery (as long as all command-messages really end up in the journal). However in case of snapshots this is different as the state is handled independently from the processed messages and that is why we have to add more tests. The bug we just introduced resets the id-counter to 0 after each restart. To verify that this does not occur we need to create a new item before and after the restart and check if both can be retrieved afterwards. The test looks like this:

 1         val existingItem = withApplication(persistDir) { application =>
 2           application.itemServiceTestExtension.createNewItem()
 3         }
 4         withApplication(persistDir) { application =>
 5           val service = application.itemServiceTestExtension
 6 
 7           val created = service.createNewItem()
 8 
 9           service.findItem(created.id) should be (Some(created))
10           service.findItem(existingItem.id) should be (Some(existingItem))
11         }

First an item is created before the restart (line 2) and another one after the restart (line 7). Then the test asserts that both can be retrieved (line 9-10). With the bug introduced above this test fails (in line 9) as the newly created item gets the same id as the old one and thus overwrites it. Fixing the bug makes the test run fine again.

This blog post showed how one can reuse the tests for recovery from the journal for testing recovery from snapshots. The next one shows if the same ideas can even be applied to testing successful recovery from an old journal.

Friday, May 23, 2014

Testing Recovery

In the last post I introduced the problem area of what and how to test applications that use akka-persistence for persisting their state. I demonstrated the first important test verifying that the custom serializers for the messages to be persisted are actually used. This second part is about testing the successful recovery of the application state after a restart.

Testing Recovery

To test recovery we basically have to execute the following steps:

  • start the application (with an empty journal)
  • modify the application's state by sending corresponding commands
  • stop the application
  • restart the application
  • verify the application's state by queries

While it sounds a little bit odd to start, stop and restart an application in a unit test, with the ItemApplicationFixture that we have seen in the last post it is actually not a big deal. Let's have another quick look at the central method: withApplication

 1   def withApplication[A](persistDir: File)(block: TestApplication => A) = {
 2     val tmpDirPersistenceConfig = ConfigFactory.parseMap(
 3       Map(JournalDirConfig -> new File(persistDir, "journal").getPath,
 4         NativeLevelDbConfig -> false.toString,
 5         SnapshotDirConfig -> new File(persistDir, "snapshots").getPath)
 6         .asJava)
 7     val application = newItemApplication(tmpDirPersistenceConfig)
 8     ultimately(application.shutdown())(block(application))
 9   }
10 
11   def newItemApplication(config: Config) =
12     new ItemApplication(config) with ItemApplicationTestExtensions

As you can see it takes a temporary folder (where journals and snapshots are stored) as one argument (persistDir) and test-code (block) as second argument. The application is started (line 7), the test is executed and in any case (failure or success) the application is shut down (line 8).

Armed with this we can pretty easily start, stop and restart the application in a test, like follows:

 1         val created = withApplication(persistDir) { application =>
 2           application.itemServiceTestExtension.createNewItem()
 3         }
 4         withApplication(persistDir) { application =>
 5           application.itemServiceTestExtension.findItem(created.id) should be (
 6               Some(created))
 7         }

This test starts the application (line 1), creates a new item (line 2) and shuts the application down by ending the block. Immediately after that it is restarted (line 4). As the directory for storing the journal is the same as before (persistDir) this should recover the state of the application from the previously written journal such that the following findItem successfully returns the item created before (line 5-6).

The application that is passed to the block is extended with some convenience functions (like itemServiceTestExtension) and also with a wrapper for the ItemService that eases invoking item-commands by waiting for the returned Futures. Let's have a quick look at createNewItem:

1   def createNewItem(template: ItemTemplate = newItemTemplate()): Item =
2     successOf(service.create(template))

And the implementation of successOf looks like follows:

1   def resultOf[A](future: Future[A]): A = Await.result(future, timeoutDuration)
2 
3   def successOf[A](future: Future[Try[A]]): A = successOf(resultOf(future))
4 
5   def successOf[A](result: Try[A]): A = result.get

Factoring out this kind of code required to handle the Future or Try avoids polluting the test details that do not contribute to the documentation aspect of the test, as the test does not test the creation of new items, but rather just the proper recovery.

Similar tests exist for verifying that an update is recovered successfully:

 1         val updated = withApplication(persistDir) { application =>
 2           val service = application.itemServiceTestExtension
 3 
 4           service.updateItem(service.createNewItem())
 5         }
 6         withApplication(persistDir) { application =>
 7           application.itemServiceTestExtension.findItem(updated.id) should be (
 8               Some(updated))
 9         }

or a delete:

 1         val deleted = withApplication(persistDir) { application =>
 2           val service = application.itemServiceTestExtension
 3 
 4           service.deleteItem(service.createNewItem().id)
 5         }
 6         withApplication(persistDir) { application =>
 7           application.itemServiceTestExtension.findItem(deleted.id) should be (
 8               None)
 9         }

We can easily verify that these tests are actually significant by introducing a bug in our code. For example, if we forget to wrap the UpdateItem command in a Persistent when we send it in ItemService:

1   def update(item: Item): Future[Try[Item]] =
2     (itemActor ? UpdateItem(item)).mapTo[Try[Item]]

as well as when we receive it in ItemActor:

1     case UpdateItem(item: Item) =>

the corresponding test fails with:

Some(Item(1,2 - description)) was not equal to Some(Item(1,3 - description))

This completes the tests for application-state recovery from the journal after a restart. What we still do not know is, if the application state can also be recovered from a snapshot. We will have a closer look into this in the next blog-post.

Sunday, April 6, 2014

Akka-Persistence and Testing

In my first scala project I had the pleasure not only to learn scala, akka and play but also to work with eventsourced. Eventsourced is an akka-based open-source library for implementing the concepts of event sourcing. The basic idea is that instead of persisting the current state of the application - like you usually do when working with a relational database -, you rather persist the individual messages that cause the state-changes. More details on this concept can be found for example in the documentation for eventsourced, its successor akka-persistence or an article by Martin Fowler. This blog-post focuses on a specific (typical) usage scenario and in particular on testing the various aspects of it that are important for real life projects.

Akka-Persistence and Testing

The example used in this blog-post is supposed to represent a complex Akka-based application (or parts of it) keeping its state in memory and using akka-persistence to persist this state. The tests are integration style tests accessing the application from outside. To keep things simple the application is just modelled as a simple CRUD service for generic items. The service basically delegates all operations to a stateful actor that persists its state through akka-persistence by using it as a write-ahead log (for command-sourcing). So the service stands for the entire interface of the application (respectively the part under test) and the actor for the arbitrarily complex logic behind this interface. In this case the interface is based on regular method calls, but the same ideas apply for a message based interface where actors are addressed directly.

When it comes to testing the integration with akka-persistence, I see 4 important aspects that should be covered:

  1. Are custom serializers used for serializing PersistentMessages into the journal?
  2. Does the application recover its state correctly after a restart when replaying the journal?
  3. Does the application recover its state correctly after a restart when reading state from a snapshot?
  4. Does the application recover its state correctly from an old journal of a previous release?

In this blog-post I just cover the first point and will address the other issues in following posts.

The Demo Application

Let's first have a closer look at the application. As already stated the actor is pretty simplistic:

 1 class ItemActor extends Processor {
 2 
 3   import ItemActor._
 4 
 5   private var itemById: Map[ItemId, Item] = Map.empty
 6   private var idCounter: Long = 0
 7 
 8   def receive = {
 9     case Persistent(CreateItem(template: ItemTemplate), _) =>
10       idCounter += 1
11       sender ! addItem(Item(ItemId(idCounter), template.description))
12 
13     case Persistent(UpdateItem(item: Item), _) =>
14       sender ! (if(itemById.isDefinedAt(item.id))
15         addItem(item)
16       else
17         Failure(new NonExistingItemCannotBeUpdatedException(item)))
18 
19     case Persistent(DeleteItem(itemId: ItemId), _) =>
20       val deletedItem = itemById.get(itemId)
21       deletedItem.foreach(itemById -= _.id)
22       sender ! deletedItem.fold[Try[Item]](
23         Failure(new NonExistingItemCannotBeDeletedException(itemId)))(
24         Success.apply)
25 
26     case GetItem(itemId: ItemId) =>
27       sender ! itemById.get(itemId)
28 
29     // SNAPSHOT BEGIN
30     case SaveSnapshot => saveSnapshot(ItemActorSnapshot(itemById, idCounter))
31     case SaveSnapshotSuccess(metadata) =>
32     case SaveSnapshotFailure(metadata, cause) =>
33 
34     case SnapshotOffer(_, ItemActorSnapshot(itemMap, lastId)) =>
35       this.itemById = itemMap
36       this.idCounter = lastId
37     // SNAPSHOT END
38   }
39 
40   private def addItem(item: Item): Try[Item] = {
41     itemById += (item.id -> item)
42     Success(item)
43   }
44 }

It extends Processor so akka-persistence can take care of persisting messages sent to it. The accepted messages are

  • CreateItem
  • UpdateItem
  • DeleteItem
  • GetItem

(The snapshot related messages can be ignored for the moment.) They return the created, updated, deleted or requested Item if the operation was successful and a failure if it was not. The first three are those that modify the actor's state and thus have to be logged into the journal by akka-persistence. That is why they are wrapped in a Persistent, while the GetItem can be received plainly. The state is basically a Map containing all Items by their ids (itemById in line 5).

The corresponding domain classes Item and ItemTemplate are likewise simplistic:

 1 class ItemId(val id: Long) extends AnyVal with Serializable
 2 object ItemId {
 3   def apply(id: Long): ItemId = new ItemId(id)
 4 }
 5 case class ItemTemplate(description: String) extends CommonItem
 6 case class Item(id: ItemId, description: String) extends CommonItem {
 7   def asTemplate: ItemTemplate = ItemTemplate(description)
 8   def withTemplate(template: ItemTemplate): Item =
 9     copy(description = template.description)
10 }

An Item contains an id and as representative for a more complex structure a single field called description. The ItemTemplate is used for creating new Items when the id is not yet known.

As described above the interface to our application is a service that wraps the ItemActor:

 1 class ActorService(itemActor: ActorRef)(implicit timeout: Timeout) extends ItemService {
 2   def find(itemId: ItemId): Future[Option[Item]] =
 3     (itemActor ? GetItem(itemId)).mapTo[Option[Item]]
 4 
 5   def create(template: ItemTemplate): Future[Try[Item]] =
 6     (itemActor ? Persistent(CreateItem(template))).mapTo[Try[Item]]
 7 
 8   def delete(itemId: ItemId): Future[Try[Item]] =
 9     (itemActor ? Persistent(DeleteItem(itemId))).mapTo[Try[Item]]
10 
11   def update(item: Item): Future[Try[Item]] =
12     (itemActor ? Persistent(UpdateItem(item))).mapTo[Try[Item]]
13 }

For each of the four commands we have seen above it offers corresponding methods. In each case the implementation creates the corresponding command, sends it to the actor (with ask) and returns the result wrapped in a Future to the caller. As we have already seen on the receiving side, those commands that modify the application's state are wrapped in a Persistent.

An ItemApplication rounds off the implementation and takes care of the initialization and dependency injection as well as the proper configuration of the akka-system.

 1 class ItemApplication(overwriteConfig: Config = ConfigFactory.empty()) {
 2 
 3   val akkaSystemConfig: Config = overwriteConfig.withFallback(
 4     akkaSerializerConfig[Command, CommandSerializer])
 5 
 6   private val config = akkaSystemConfig.withFallback(ConfigFactory.load())
 7   val system = ActorSystem(classOf[ItemApplication].getSimpleName, config)
 8   protected val itemActor =
 9     system.actorOf(ItemActor.props(), classOf[ItemActor].getSimpleName)
10   val itemService = new ActorService(itemActor)(Timeout(5.seconds))
11 
12   private def akkaSerializerConfig[M : ClassTag, S <: Serializer : ClassTag]
13       : Config = {
14     val messageClassName = classTag[M].runtimeClass.getName
15     val serializerClassName = classTag[S].runtimeClass.getName
16     ConfigFactory.parseString(s"""
17         |akka.actor {
18         |  serializers {
19         |    "$messageClassName" = "$serializerClassName"
20         |  }
21         |  serialization-bindings {
22         |    "$messageClassName" = "$messageClassName"
23         |  }
24         |}
25         |""".stripMargin)
26   }
27 
28   def shutdown(): Unit = {
29     system.shutdown()
30     system.awaitTermination()
31   }
32 }

The interesting part here is the configuration of the serializers for the akka-system. It is created in a hard-coded manner through the method akkaSerializerConfig, but what is it good for?

Customer Serializers

When akka-persistence writes the persistent messages received by a Processor to a journal it makes use of standard akka-serialization. By default this uses java-serialization. That means all messages are first converted to an Array[Byte] through java-serialization and afterwards this array is written to the journal. When the journal is replayed, the data is read from disk as Array[Byte] and this is converted back to messages again through java-(de)serialization. If you just go with the default you can quickly run into compatibility problems when trying to read an existing journal with a new version of the application as plain java-serialization by default does not ensure compatibility of the serialized data to evolved classes. To avoid this kind of problems you can either

  • migrate the journal files to the new format before starting the new application. However when using java-serialization this can be a challenging task by itself, since one application must be able to read an old serialized form and write the new serialized form of instances of the same class (in different versions) or
  • cautiously ensure that your messages stay downwards compatible to their serialized representation (e.g. by declaring serialVersionUID explicitly and things like implementing readObject, writeObject methods) or
  • you can make akka use custom serializers for your messages.

The latter gives you very good control over maintaining compatibility of the serialized representation of your message-instances with the implementation of the corresponding classes. Based on my experience I can recommend using custom serializers. The serializers should of course use a serial representation that can easily be kept downwards compatible. JSON is a valid alternative (even though certainly not one that performs best) as

  • it is very flexible when it comes to migrating structures
  • it might be used already in a program that offers a REST interface where the resources are represented in form of JSON documents

In addition to this it was very convenient to use it for this blog-post as the play-json-lib with macro-inception allows to write JSON (de-)serializers of case-classes in basically one line of code. The corresponding akka-serializer is implemented in the class CommandSerializers for all messages that derive from Command (see line 3-4) which are all messages to be written into the journal in our case. akka allows to configure custom serializers through the configuration object passed to the akka-system. Typically the corresponding configuration is read from a configuration file like application.conf or reference.conf. As the application needs to ensure backwards compatibility of the serialized representation of the message in the journal I do not consider the custom serializers for these classes as part of the configuration that could be modified by an administrator of the application. That is why this configuration is provided in a hard-coded manner in ItemApplication (line 12-26).

The test

The tricky part about this configuration is, that akka will ignore it silently if it contains errors. Thus we we will not notice any problems during runtime, if the configuration is wrong. In that case akka will silently use the default java-serialization, a fact that might remain unnoticed until the first migration problem appears. Even if the code-based configuration ensures that we cannot introduce typos in class-names there are still other things that might go wrong as we will see soon. That is why it is important to test that the configured serializers are actually used by akka.

For this we have the following test:

 1           in { application =>
 2         val serializer = SerializationExtension(application.system)
 3 
 4         Seq(CreateItem(newItemTemplate()),
 5             UpdateItem(newItem()),
 6             DeleteItem(newId)).foreach { expected =>
 7 
 8           val bytes = successOf(serializer.serialize(expected))
 9           val actual = new CommandSerializer()
10               .fromBinary(bytes, Some(expected.getClass))
11 
12           actual should be (expected)
13         }
14       }

At first this test retrieves the SerializationExtension from the akka-system of the application. Then it uses it to serialize each of the possible command-objects and afterwards de-serializes them with the CommandSerializer that is also expected to be used by the SerializationExtension. If the de-serialized command equals the original one we can be sure that the serialization config was correct.

The first test-run discloses that there is indeed an error in the configuration:

[WARN] [...] [...] [akka.serialization.Serialization (akka://ItemApplication)]
 Multiple serializers found for class
 de.blogspot.volkersyadb.akkapersistence.ItemActor$CreateItem,
 choosing first: Vector(
  (interface java.io.Serializable,akka.serialization.JavaSerializer@3545fe3b),
  (interface de...ItemActor$Command,de...CommandSerializer@635eed0))

Unexpected character ('�' (code 65533 / 0xfffd)): expected a valid value
 (number, String, array, object, 'true', 'false' or 'null')

As you can see bytes does not contain valid JSON and the parser complains about an unexpected character. However the more interesting output is the line above that. akka warns about the fact that there are multiple serializers for the command-object and it chooses an arbitrary one which happens to be the java-serializer in this case. If there are multiple alternative serializers for a class the akka documentation states:

You only need to specify the name of an interface or abstract base class of the messages. In case of ambiguity, i.e. the message implements several of the configured classes, the most specific configured class will be used, i.e. the one of which all other candidates are superclasses. If this condition cannot be met, because e.g. java.io.Serializable and MyOwnSerializable both apply and neither is a subtype of the other, a warning will be issued.

That means our interface (Command) must be more specific than Serializable. We can easily achieve this if we make Command extend Serializable. As soon as we have this the test runs through just fine.

Note that for this test to be relevant, it is important that it uses the same configuration as is used in production and not a configuration that is specifically made up for the test. In our case this is ensured by the ItemApplicationFixture that prepares and cleans the ItemApplication up for each test. Let's have a look at the central method:

 1   def withApplication[A](persistDir: File)(block: TestApplication => A) = {
 2     val tmpDirPersistenceConfig = ConfigFactory.parseMap(
 3       Map(JournalDirConfig -> new File(persistDir, "journal").getPath,
 4         NativeLevelDbConfig -> false.toString,
 5         SnapshotDirConfig -> new File(persistDir, "snapshots").getPath)
 6         .asJava)
 7     val application = newItemApplication(tmpDirPersistenceConfig)
 8     ultimately(application.shutdown())(block(application))
 9   }
10 
11   def newItemApplication(config: Config) =
12     new ItemApplication(config) with ItemApplicationTestExtensions

As you can see it uses the real ItemApplication (line 12) that is also used for production. It does provide test-specific extensions (ItemApplicationTestExtensions) and config (line 2-6) to ItemApplication but just to ease writing the tests and to ensure that the journals and snapshots for the tests are written to a temporary folder that is cleaned up after the test and that the java-leveldb implementation is used instead of the native one which is perfectly fine for testing and typically comes with less problems when running the tests in different environments.

This concludes the first blog-post about akka-persistence and testing. I will cover the other important topics about recovery, snapshots and migration in following posts so stay tuned.