Akka 2.0.2 released!

Ladies and gentlemen,

We recently released Akka 2.0.2, another incremental release in the 2.0 series. It features several bug fixes, performance enhancements, documentation improvements, and deprecations in preparation for the upcoming 2.1 release.

Read more here

Cheers,

Watch the Routees

I answered a question on the Akka mailing list with example code that might be interesting if you need to do something similar. The use case is to perform a job consisting of a bunch of tasks: delegate the tasks to worker actors behind a router, aggregate the results of the workers, and reply with one answer. Additionally, a worker should retry failed messages a few times and then stop. If one worker terminates due to failure, the whole job should be aborted.

Here is the example code:

The retry is implemented by forwarding the failing message to itself in preRestart. This places the message last in the mailbox, so ordering of messages is assumed to be unimportant for the processing of the tasks. Note that forward is necessary to retain the master as sender.

Workers are restarted in case of failure, and stopped after 2 restarts. This is handled by the supervisor of the worker routees, i.e. the router. A supervisorStrategy is specified when creating the router.

To be able to abort the whole job when one worker is stopped, the master subscribes to termination (death watch) of the workers. References to the workers are obtained by sending a CurrentRoutees message to the router, which replies with RouterRoutees. context.watch(actorRef) initiates the death watch subscription. Terminated is delivered when a worker has been stopped.

When the master stops itself the children are also stopped, i.e. the router and its worker routees are stopped together with the master.

Aggregation is implemented by correlating the replies from the workers with a message id.
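The moving parts described above (retry by re-queuing the failing message last, stop after 2 restarts, abort on termination, aggregate by message id) can be sketched as a small single-threaded model. This is plain Python, not Akka code and not the example from the mailing list; the names Worker, run_job and MAX_RESTARTS are made up for illustration, and the restart counter is assumed to be per worker.

```python
from collections import deque

MAX_RESTARTS = 2  # mirrors "stopped after 2 restarts" in the text


class Worker:
    """Toy stand-in for a worker routee; not an Akka actor."""

    def __init__(self, name, handler):
        self.name = name
        self.handler = handler      # function(payload) -> result, may raise
        self.mailbox = deque()
        self.restarts = 0
        self.stopped = False

    def step(self, replies):
        """Process one message; on failure, retry it or stop."""
        msg_id, payload = self.mailbox.popleft()
        try:
            replies.append((msg_id, self.handler(payload)))
        except Exception:
            if self.restarts >= MAX_RESTARTS:
                self.stopped = True     # the supervisor gives up on this worker
            else:
                self.restarts += 1
                # Retry: the failing message goes to the END of the mailbox,
                # so the ordering of tasks is not preserved.
                self.mailbox.append((msg_id, payload))


def run_job(tasks, workers):
    """Return the results in task order, or None if the job was aborted."""
    for msg_id, payload in enumerate(tasks):            # round-robin routing
        workers[msg_id % len(workers)].mailbox.append((msg_id, payload))

    results = {}                                        # message id -> result
    replies = []
    while len(results) < len(tasks):
        for w in workers:
            if w.mailbox:
                w.step(replies)
            if w.stopped:                               # plays the role of Terminated
                return None                             # abort the whole job
        for msg_id, value in replies:                   # correlate replies by id
            results[msg_id] = value
        replies.clear()
    return [results[i] for i in range(len(tasks))]
```

A handler that fails once and then succeeds still produces a complete result, while a handler that always fails exhausts the restarts and makes run_job return None.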

50 million messages per second - on a single machine

50 million messages per second on a single machine is mind blowing!

We have measured this for a micro benchmark of Akka 2.0.

As promised in Scalability of Fork Join Pool I will here describe one of the tuning settings that can be used to achieve even higher throughput than the amazing numbers presented previously. Using the same benchmark as in Scalability of Fork Join Pool and only changing the configuration we go from 20 to 50 million messages per second.

The micro benchmark uses pairs of actors sending messages to each other, classical ping-pong, all sharing the same fork join dispatcher.

Hardware and configuration:

  • Processor: 48 core AMD Opteron (4 dual-socket with 6 core AMD® Opteron™ 6172 2.1 GHz Processors)
  • Memory: 128 GB ECC DDR3 1333 MHz memory, 16 DIMMs
  • OS: Ubuntu 11.10
  • JVM: OpenJDK 7, version “1.7.0_147-icedtea”, (IcedTea7 2.0) (7~b147-2.0-0ubuntu0.11.10.1)
  • JVM settings: -server -XX:+UseNUMA -XX:+UseCondCardMark -XX:-UseBiasedLocking -Xms1024M -Xmx2048M -Xss1M -XX:MaxPermSize=128m -XX:+UseParallelGC
  • Akka version: 2.0
  • Dispatcher configuration other than default: 
    parallelism 48 of fork-join-executor
    throughput as described  

Here is the result of using different values for the throughput setting of the dispatcher. 5 is the default value. The test was run with 96 actors and each test result was based on at least 15 seconds of execution time (960 million messages), long warmup excluded.

As you can see, the number of processed messages per second increases dramatically as the throughput setting is raised, up to a value of 20.

With even higher throughput values the curve flattens, but it reaches a maximum above 50 million messages per second.

What is the magic behind the throughput setting?

It configures how many messages an actor should process in a batch. For example throughput=20 means that once the dispatcher schedules a thread for the actor it will continue to process 20 messages, if the mailbox isn’t empty, before returning the thread to the pool.

The trade-off is that other actors that use the same dispatcher might have to wait longer before they get a chance to run, i.e. you trade higher throughput for increased latency. It is the classic tradeoff of throughput vs fairness. The optimal value depends on your use case, e.g. how long the message processing time is.

A related configuration setting is throughput-deadline-time, which defines how long the actor is allowed to continue processing messages from the mailbox before the thread is returned to the pool.
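To make the batching and the fairness trade-off concrete, here is a minimal single-threaded model of a dispatcher loop. It is a sketch in plain Python, not Akka's implementation; run_dispatcher and its return values are invented for this illustration, and the deadline setting is left out.

```python
from collections import deque


def run_dispatcher(mailboxes, throughput):
    """Drain all mailboxes on one simulated thread.

    Returns (turns, order): how many times an actor was scheduled, and the
    order in which (actor, message) pairs were processed.
    """
    ready = deque(name for name, box in mailboxes.items() if box)
    turns, order = 0, []
    while ready:
        name = ready.popleft()
        box = mailboxes[name]
        turns += 1
        # Process up to `throughput` messages, stopping early if the
        # mailbox runs empty.
        for _ in range(throughput):
            if not box:
                break
            order.append((name, box.popleft()))
        if box:
            ready.append(name)      # still has work: back of the line
    return turns, order
```

With 10 messages queued for actor a and 2 for actor b, a throughput of 1 needs 12 scheduling turns and b's first message is processed second. A throughput of 20 needs only 2 turns, but b has to wait until all 10 of a's messages are done.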

Finally, let’s take a look at how the message throughput (msg/s) scales with number of actors when using throughput configuration value 200.

As you can see, we now get more than 50 million messages per second. Not bad at all. Download Akka 2.0 yourself and give her a spin. 

Happy hAkking. 

Why no mailboxSize in Akka 2?

Akka 1.x exposed a method to query the mailbox size, which was available both from inside an actor and from outside. We removed this method in 2.0 for a number of reasons, and since this topic has already come up on the mailing list multiple times, here is my attempt at making the rationale accessible to a broader audience and taking some load off the akka-user group.

What are the problems with querying the mailbox size?

  • it takes O(n) time to get some size answer from a concurrent queue, i.e. querying hurts when you will feel the pain the most (it might even take several seconds in case of durable mailboxes at the “wrong moment”)
  • the answer is not correct, i.e. it does not need to match the real size at either the beginning or the end of processing this request
  • making it “more correct” involves book-keeping which severely limits scalability
  • and even then the number might have changed completely by the time you receive it in your code (e.g. your thread is scheduled out for 100 ms and you get 100,000 new messages during that time).

So, no mailboxSize in the default implementation. (There are even more reasons as soon as you consider remote actor references and the asynchronous nature of everything within Akka, just to name a few.)

Why would you want to use it anyway?

Assuming there would be a method to query the mailbox size from within an actor (doing it from without is really impossible in a distributed setting), and assuming that the actor detects that messages are piling up faster than it can process them, what would its plan of action be? Slowing down the senders—by way of a bounded queue—is basically the only thing it can do itself. Other than that only the supervisor can do something, but your application is likely not prepared to handle that, since all the supervisor can do is terminate the poor guy, invalidating all the references the senders have.

But I want to check periodically and interrupt my long-running task when new messages arrive …

That is not a good idea, because this way you block the processing of internal messages (system messages) which are used in the implementation of supervision, actor selections and other things. It is a much better approach to break up long-running tasks into smaller packages and have the actor send these to itself continuously until the job is done; that way it stays fully reactive and does not hog resources uncontrollably. Or you hand off the big pieces of work to Futures, compose those with the awesome Future API and feed the result back to the target actor using pipeTo (or callbacks as per onSuccess, etc.).
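The "send the rest of the job to yourself" approach can be sketched with a toy actor that sums a list in chunks. This is a plain Python model rather than Akka code; ChunkedSummer and its message shapes are hypothetical.

```python
from collections import deque


class ChunkedSummer:
    """Toy actor that sums a large list in small chunks (not Akka code)."""

    def __init__(self, chunk_size):
        self.chunk_size = chunk_size
        self.mailbox = deque()
        self.log = []               # what was handled, in order

    def send(self, msg):
        self.mailbox.append(msg)

    def receive(self, msg):
        kind = msg[0]
        if kind == "sum":                       # ("sum", data): start a job
            _, data = msg
            self.send(("continue", data, 0, 0))
        elif kind == "continue":                # ("continue", data, pos, acc)
            _, data, pos, acc = msg
            end = min(pos + self.chunk_size, len(data))
            acc += sum(data[pos:end])
            if end < len(data):
                # Queue the rest of the job behind whatever arrived meanwhile.
                self.send(("continue", data, end, acc))
            else:
                self.log.append(("done", acc))
        else:                                   # any other message
            self.log.append(("handled", kind))

    def run(self):
        while self.mailbox:
            self.receive(self.mailbox.popleft())
```

Because every chunk is a separate message, a "ping" that arrives right after the job request is handled before the sum completes, instead of waiting behind one long-running call.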

So, what is the recommended way?

When designing an application, you will immediately spot (most of) the hot spots and create these actors using Routers. And if you forget one, don’t worry, it is quite painless to insert “.withRouterConfig(RoundRobinRouter(10))” when testing reveals a bottleneck. This gets even better when using Resizers, which add elasticity to the scaling.

Okay, I considered all this and I still want mailbox metrics.

In case you cannot do without, it is quite easy to write your own mailbox implementation, building on the traits in the akka.dispatch package and inserting book-keeping code into enqueue() and dequeue(). Then you could either use down-casting (evil) or keep track of your mailboxes in an akka.actor.Extension (recommended) to access the stats from within your actor and do whatever is necessary.
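The idea of book-keeping in enqueue() and dequeue() plus a central place to look up the stats can be modeled as follows. This is a Python sketch of the concept only: CountingMailbox, MAILBOXES and mailbox_for are hypothetical names, and none of this is the akka.dispatch or akka.actor.Extension API.

```python
import threading
from collections import deque


class CountingMailbox:
    """Queue wrapper that keeps its own statistics (hypothetical, not Akka's API)."""

    def __init__(self):
        self._queue = deque()
        self._lock = threading.Lock()   # the book-keeping cost the text warns about
        self.enqueued = 0
        self.dequeued = 0

    def enqueue(self, msg):
        with self._lock:
            self._queue.append(msg)
            self.enqueued += 1

    def dequeue(self):
        with self._lock:
            if not self._queue:
                return None
            self.dequeued += 1
            return self._queue.popleft()

    def size(self):
        # A snapshot only: it may be stale by the time the caller looks at it.
        with self._lock:
            return self.enqueued - self.dequeued


# Registry standing in for the role the text assigns to an extension:
# one place to look up the mailbox of a named actor.
MAILBOXES = {}


def mailbox_for(actor_name):
    return MAILBOXES.setdefault(actor_name, CountingMailbox())
```

Note that the lock serializes every enqueue and dequeue, which is exactly the kind of scalability limit mentioned earlier.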

But wait: did I mention that it might even be easier to tag latency-critical (but not too high frequency) messages with timestamps and react on the age of a message when processing it?
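A minimal sketch of the timestamp approach, again as a plain Python model: the sender stamps the message, and the receiver checks its age before doing the work. The names stamp and handle, the message layout and the reaction to an old message are assumptions for illustration.

```python
import time


def stamp(payload, clock=time.monotonic):
    """Wrap a payload with the time at which it was sent."""
    return {"sent_at": clock(), "payload": payload}


def handle(message, max_age_seconds, clock=time.monotonic):
    """Process a message, or shed it if it waited in the mailbox for too long."""
    age = clock() - message["sent_at"]
    if age > max_age_seconds:
        # React to the overload here, e.g. reply with an error or raise an alert.
        return ("overloaded", age)
    return ("processed", message["payload"])
```

The clock is a parameter so that the behaviour can be exercised with a fake clock in tests. The message age reflects how far the actor is lagging behind without querying the mailbox at all.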

So, in summary: while there still is a way to get the mailbox size, you will probably never actually need it.

PS: In case you need mailbox metrics for monitoring and in general operating a deployed system, you might want to have a look at the Typesafe Console.

Scalability of Fork Join Pool

Akka 2.0 message passing throughput scales way better on multi-core hardware than in previous versions, thanks to the new fork join executor developed by Doug Lea. One micro benchmark illustrates a 1100% increase in throughput!

The new 48 core server had arrived and we were excited to run the benchmarks on the new hardware, but it was sad to see the initial results. It didn’t scale! What was wrong?

The purpose of the micro benchmark is to see how the throughput of message send and receive is affected by an increasing number of concurrent, active actors sharing the same dispatcher. Pairs of actors send messages to each other, classical ping-pong. Load is increased by adding more pairs of actors that are processing messages in parallel with other actors.

Full source code of the benchmark: TellThroughputPerformanceSpec.scala

Hardware and configuration:

  • Processor: 48 core AMD Opteron (4 dual-socket with 6 core AMD® Opteron™ 6172 2.1 GHz Processors)
  • Memory: 128 GB ECC DDR3 1333 MHz memory, 16 DIMMs
  • OS: Ubuntu 11.10
  • JVM: OpenJDK 7, version “1.7.0_147-icedtea”, (IcedTea7 2.0) (7~b147-2.0-0ubuntu0.11.10.1)
  • JVM settings: -server -XX:+UseNUMA -XX:+UseCondCardMark -Xms1024M -Xmx2048M -Xss1M -XX:MaxPermSize=128m -XX:+UseParallelGC
  • Akka version: 2.0-M4
  • Dispatcher configuration other than default:
    core-pool-size:48 of thread-pool-executor
    parallelism:48 of fork-join-executor

When using the thread pool executor (java.util.concurrent.ThreadPoolExecutor) the benchmark didn’t scale beyond 12 parallel actors. Throughput was stuck at 1.4 million messages per second and didn’t increase with added load, even though the 48 core box at first glance was not heavily loaded: less than 10% of total CPU capacity was used.

First we thought the BIOS configuration was wrong, since the machine was new, but after going through all settings and turning off all power savings options the result was still as bad. We also bought more memory, to use 4 DIMMs per CPU for maximum memory bandwidth.

We noticed that the number of context switches was abnormal, above 70000 per second.

That must be the problem, but what is causing it? Viktor came up with the educated guess that it must be the task queue of the thread pool executor, since that is shared and the locks in the LinkedBlockingQueue could potentially generate the context switches when there is contention.

Discussions with Doug Lea resulted in an improved implementation of the fork join pool, which we have embedded in akka-actor and which is used by default. The task queue is striped using randomized queuing and stealing. Read more about it here.
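The difference from a single shared task queue can be illustrated with a toy model of striped queues with stealing. It is single-threaded Python and greatly simplified compared to the real fork join pool; StripedPool and its methods are hypothetical names, and the choice of which end the owner and a thief take from is an assumption of this sketch.

```python
import random
from collections import deque


class StripedPool:
    """Toy model of per-worker task queues with stealing; single-threaded."""

    def __init__(self, workers, seed=0):
        self.queues = [deque() for _ in range(workers)]
        self.rng = random.Random(seed)
        self.steals = 0

    def submit(self, task):
        # Randomized queuing: spread submissions over the stripes instead of
        # funnelling them through one shared, contended queue.
        self.rng.choice(self.queues).append(task)

    def next_task(self, worker):
        own = self.queues[worker]
        if own:
            return own.pop()                  # the owner takes from one end
        victims = [q for q in self.queues if q]
        if not victims:
            return None                       # nothing left anywhere
        self.steals += 1
        return self.rng.choice(victims).popleft()   # a thief takes from the other
```

A worker only touches another worker's queue when its own queue is empty, so under load most operations stay on a queue that no other thread is competing for.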

When running the same benchmark with the fork-join-executor the context switches were normal, around 1300 per second.

The results of the benchmark illustrate that the throughput scales with the number of actors up to the number of cores (48) and saturates at around 20 million messages per second.

There are several things that can be tuned to achieve even higher throughput, which we will describe in follow-up blog posts.