Distributed (in-memory) graph processing with Akka

Graphs have always been an interesting structure to study in both mathematics and computer science (among other fields), and have become even more interesting in the context of online social networks such as Facebook and Twitter, whose underlying network structures are nicely represented by graphs.

These graphs are typically “big”, even when sub-graphed by things such as location or school. With “big” graphs comes the desire to extract meaningful information from them. In the age of multi-core CPUs and distributed computing, concurrent processing of graphs proves to be an important topic.

Luckily, many graph analysis algorithms are trivially parallelizable. One example that comes to mind is all-pairs shortest path. In the case of an undirected, unweighted graph, we can consider each vertex individually, and do a full BFS from each vertex.

In this post I detail a general framework for distributed graph processing. I do not use any particular graph library, so my graph class will simply be called Graph. Popular graph libraries for Scala can be found in Twitter’s Cassovary project or the Graph for Scala project.

I will also make use of Derek Wyatt’s submission to the Akka Summer of Blog, “Balancing Workloads Across Nodes with Akka 2”, which provides a nice and simple implementation of a BalancingDispatcher in the context of distributed processing.

The goal is for the client to only need to specify an algorithm and an output filename. These two will be passed into a System, which will spawn a Master and a ResultHandler actor. Then on each machine, we will start a microkernel which will read our graph off the disk, spawn some workers, and begin the computation.

As a simple example, let’s implement a degree algorithm for a graph. The degree of a vertex is defined as the number of edges incident to that vertex. All we want in the end is a file with two columns, one for the ID of the vertex and another for the corresponding degree. Assume all vertex IDs are Ints.

First, let’s implement the algorithm interface.

The Algorithm

We want a function that takes something as input and returns something as output. Thankfully, for many graph algorithms, certain types of inputs and outputs are commonly used. For input, we clearly need a Graph, along with an input vertex or a pair of vertices. For output, for each input vertex you typically get either a numeric result, a collection of numeric results, or a map whose key is another input vertex and whose value is either a numeric result or a collection of numeric results.

We can define a trait AbstractAlgorithm with an abstract execute method that takes in a Graph and a subclass of a trait AbstractInput and outputs a subclass of a trait AbstractResult. To be safe, it should return an Option[AbstractResult] on the off chance that input we did not expect is fed in.
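A minimal sketch of what these traits might look like (here Graph stands in for whichever graph class your library provides):

trait AbstractInput
trait AbstractResult

trait AbstractAlgorithm {
  // Returns None if the algorithm is fed an input type it does not expect.
  def execute(graph: Graph, input: AbstractInput): Option[AbstractResult]
}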

In the case of our degree algorithm, we create a case class SingleVertexInput that extends AbstractInput and another case class LongResult that extends AbstractResult. Let’s also override the toString method so that later on when printing the results we can simply do something like

println(input + " " + result)

We can then implement an object DegreeAlgorithm that extends AbstractAlgorithm, looks for a SingleVertexInput, and returns the result. Most graph libraries have an easy way to get the degree of a vertex, so the implementation is trivial.
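A sketch of the degree algorithm pieces; the graph.degree call is an assumption and will vary by graph library:

case class SingleVertexInput(vertex: Int) extends AbstractInput {
  override def toString = vertex.toString
}

case class LongResult(value: Long) extends AbstractResult {
  override def toString = value.toString
}

object DegreeAlgorithm extends AbstractAlgorithm {
  def execute(graph: Graph, input: AbstractInput): Option[AbstractResult] =
    input match {
      case SingleVertexInput(v) => Some(LongResult(graph.degree(v))) // degree lookup depends on your library
      case _                    => None                              // unexpected input, ignore
    }
}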

The System

In Derek Wyatt’s post, the work done by the Workers is typed as Any; since we know what kind of work we’ll be feeding them, we can type it as AbstractInput.

With that information, we can create our System. Our System will take as input the algorithm, a collection of work, and an output filename. From these it will create an ActorSystem, a ResultHandler actor (we pass the output filename to its constructor), and a Master (we pass the algorithm and a reference to our ResultHandler to its constructor). It will then send all the work to the Master, followed by an AllWorkSent message.

We send the ResultHandler reference to the Master because, as we will soon see, instead of tying each piece of work to the System (as Derek Wyatt’s implementation does), we tie it to the ResultHandler.
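A sketch of the System, assuming the Master and ResultHandler actors described below; AllWorkSent is just a case object in our own protocol:

import akka.actor.{ActorSystem, Props}

case object AllWorkSent

class System(algorithm: AbstractAlgorithm,
             work: Iterable[AbstractInput],
             outputFilename: String) {

  val system        = ActorSystem("graphProcessing")
  val resultHandler = system.actorOf(Props(new ResultHandler(outputFilename)), "resultHandler")
  val master        = system.actorOf(Props(new Master(algorithm, resultHandler)), "master")

  // Send every piece of work, then signal that nothing more is coming.
  work.foreach(master ! _)
  master ! AllWorkSent
}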

The ResultHandler

The ResultHandler is easy enough to implement: it simply waits for results, writes them to a file, and, once it gets a message notifying it that computation is done, shuts down the system.

Note the ResultHandler expects a Result, which is a case class that encapsulates an AbstractInput and an AbstractResult.
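A sketch of the ResultHandler; Result and the “computation is done” message (here called AllResultsReceived) are our own protocol messages:

import java.io.PrintWriter
import akka.actor.Actor

case class Result(input: AbstractInput, result: AbstractResult)
case object AllResultsReceived

class ResultHandler(outputFilename: String) extends Actor {
  val writer = new PrintWriter(outputFilename)

  def receive = {
    case Result(input, result) =>
      // One line per result: "<input> <result>", relying on the toString overrides.
      writer.println(input + " " + result)
    case AllResultsReceived =>
      writer.close()
      context.system.shutdown() // terminate() on newer Akka versions
  }
}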

The Master

The Master remains mostly the same as Derek Wyatt’s, save for a few modifications. First, as stated before, the constructor now takes as input an AbstractAlgorithm and an ActorRef.

Second, everywhere there is “work” we type it as AbstractInput instead of Any (remember to do the same in the MasterWorkerProtocol).

Third, we add a var allWorkReceived: Boolean = false field and a def checkIfAllWorkIsDone method. The former starts out false and becomes true once the Master receives the AllWorkSent message from the System. The latter checks whether allWorkReceived is true and whether all work delegated to Workers is complete - if so, it notifies the resultHandler.

Fourth, when the Master is notified of a Worker's existence, the algorithm is sent to the Worker (the DoAlgorithm case class will have to be added to the MasterWorkerProtocol).

Finally, since all work is tied to the resultHandler, instead of storing Tuple2s in the data structures we can just store the AbstractInput.
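A sketch of the Master’s additions; the WorkerCreated message comes from the MasterWorkerProtocol in Derek Wyatt’s post, and the work-dispatch cases elided here follow his implementation, just typed to AbstractInput:

import scala.collection.mutable
import akka.actor.{Actor, ActorRef}

// New protocol message: the Master hands the algorithm to each Worker.
case class DoAlgorithm(algorithm: AbstractAlgorithm)

class Master(algorithm: AbstractAlgorithm, resultHandler: ActorRef) extends Actor {
  // Same bookkeeping as the original Master, but holding AbstractInput
  // instead of (sender, work) tuples.
  val workers = mutable.Map.empty[ActorRef, Option[AbstractInput]]
  val workQ   = mutable.Queue.empty[AbstractInput]

  var allWorkReceived: Boolean = false

  // Done when the System has sent everything, nothing is queued,
  // and no Worker is still busy.
  def checkIfAllWorkIsDone(): Unit =
    if (allWorkReceived && workQ.isEmpty && workers.values.forall(_.isEmpty))
      resultHandler ! AllResultsReceived

  def receive = {
    case AllWorkSent =>
      allWorkReceived = true
      checkIfAllWorkIsDone()
    case WorkerCreated(worker) =>
      worker ! DoAlgorithm(algorithm) // hand the new Worker the algorithm
      workers += (worker -> None)
    // ... remaining cases (work requests, WorkIsDone, etc.) as in the
    // original Master, with checkIfAllWorkIsDone() called after WorkIsDone.
  }
}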

The Worker

The worker also remains mostly the same, with the following differences. First, the constructor will also take a Graph as input, which the worker will feed into the algorithm’s execute method.

Second, it will have a var algorithm: Option[AbstractAlgorithm] = None field that will contain the algorithm once it is received.

Third, all work is typed to AbstractInput as before.

Implementing the actual Worker is now easy, and we could in fact throw away the abstract Worker, but to mirror Derek Wyatt’s implementation let’s just implement the Worker interface. The actual doWork method defers the work to the algorithm’s execute method - if a valid result is returned we forward it to the ResultHandler in the form of a Result message (described above), otherwise we ignore it.
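A sketch of the concrete Worker (GraphWorker is a made-up name), assuming the abstract Worker from Derek Wyatt’s post with doWork retyped to AbstractInput, and the WorkComplete message from his Worker:

import akka.actor.{ActorPath, ActorRef}

class GraphWorker(masterLocation: ActorPath, graph: Graph)
    extends Worker(masterLocation) {

  // Filled in once the Master sends us a DoAlgorithm message, e.g. via an
  // extra case added to the Worker's receive:
  //   case DoAlgorithm(algo) => algorithm = Some(algo)
  var algorithm: Option[AbstractAlgorithm] = None

  def doWork(workSender: ActorRef, work: AbstractInput): Unit = {
    // Defer to the algorithm; forward valid results to the ResultHandler
    // (the workSender here), silently ignore anything it could not handle.
    for {
      algo   <- algorithm
      result <- algo.execute(graph, work)
    } workSender ! Result(work, result)

    // Tell ourselves the work is finished, as in the original Worker.
    self ! WorkComplete(work)
  }
}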

The Microkernel

The Akka Microkernel makes it easy to distribute code across several machines, and it can handle the things we need, such as reading in the graph off the disk and spawning the workers. Since Bootable classes cannot take any constructor parameters, we can put the info we need in a default config file that holds the graph information as well as the location of the Master.
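A sketch of such a Bootable; the config keys and the Graph.fromFile loader are made up for illustration:

import akka.actor.{ActorPath, ActorSystem, Props}
import akka.kernel.Bootable
import com.typesafe.config.ConfigFactory

class WorkerKernel extends Bootable {
  val system = ActorSystem("workerSystem")
  val config = ConfigFactory.load()

  def startup() = {
    // Read the graph off the disk and locate the remote Master.
    val graph      = Graph.fromFile(config.getString("graph.path"))
    val masterPath = ActorPath.fromString(config.getString("master.path"))

    // Spawn some local Workers pointed at the Master.
    (1 to config.getInt("workers.count")) foreach { i =>
      system.actorOf(Props(new GraphWorker(masterPath, graph)), "worker" + i)
    }
  }

  def shutdown() = system.shutdown()
}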

Conclusion

At this point we are more or less done. Note that this post only provided the bare essentials; more details may need to be implemented depending on what you want to do. In this case, the client will send the DegreeAlgorithm object along with the String it wants to use as the output filename to the System. Once the Master is up, the user can start spawning Workers on other machines and off we go.

For other algorithms, the client need only create new AbstractInput classes if needed, new AbstractResult classes if needed, and a new AbstractAlgorithm object, and they are pretty much done.

Room for Improvement

As it stands, the number of messages being sent around the network is very high: |V| for algorithms that take a single vertex as input and |V|^2 for algorithms that take pairs. If the results are large (e.g. a Vector[Long] or a Map[Int, Vector[Long]]) this can congest the network. Depending on the algorithm, instead of considering individual vertices or pairs at a time, one could partition the graph vertices into n near-equal pieces, feed each remote machine a slice, and send the results back only once all the work for a particular slice is done. One downside, however, is that this may not be as flexible or as easy to adapt to other algorithms that require more information than just a vertex or a pair of vertices.

Error handling could also be improved - in my implementation erroneous input is more or less ignored, similar to swallowing an exception. This means that aside from the logging of the Actors, there is not really any indication of bad behavior - this could perhaps be rectified with strategically placed exceptions or with better type safety than passing around extensions of vague traits.

About the Author

Adelbert Chang is a 3rd year B.S./M.S. Computer Science student at the University of California at Santa Barbara where he also works as a researcher in an on-campus research lab studying social graph modeling and analysis.
