Graphs have always been an interesting structure to study in both mathematics and computer science (among other fields), and have become even more interesting in the context of online social networks such as Facebook and Twitter, whose underlying network structures are nicely represented by graphs.
These graphs are typically “big”, even when sub-graphed by things such as location or school. With “big” graphs comes the desire to extract meaningful information from them. In the age of multi-core CPUs and distributed computing, concurrent processing of graphs is an important topic.
Luckily, many graph analysis algorithms are trivially parallelizable. One example that comes to mind is all-pairs shortest paths. In the case of an undirected, unweighted graph, we can consider each vertex individually and do a full BFS from each one; since no BFS depends on the result of another, the searches can run concurrently.
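To make the idea concrete, here is a minimal sketch of BFS-based all-pairs shortest paths. It assumes a hypothetical adjacency-map representation (vertex ID to neighbor IDs) rather than any particular graph library, and the names AllPairsBfs, bfs, and allPairs are made up for illustration.

```scala
import scala.collection.mutable

object AllPairsBfs {
  // Hypothetical representation: vertex ID -> neighbor IDs (undirected, unweighted).
  type Adjacency = Map[Int, Seq[Int]]

  // Hop distances from `source` to every vertex reachable from it.
  def bfs(graph: Adjacency, source: Int): Map[Int, Int] = {
    val dist = mutable.Map(source -> 0)
    val queue = mutable.Queue(source)
    while (queue.nonEmpty) {
      val v = queue.dequeue()
      // Visit each neighbor that has not been assigned a distance yet.
      for (w <- graph.getOrElse(v, Seq.empty) if !dist.contains(w)) {
        dist(w) = dist(v) + 1
        queue.enqueue(w)
      }
    }
    dist.toMap
  }

  // Every call to bfs is independent of the others, so the set of sources
  // could be split across cores or machines without any coordination.
  def allPairs(graph: Adjacency): Map[Int, Map[Int, Int]] =
    graph.keys.map(s => s -> bfs(graph, s)).toMap
}
```

Here allPairs runs the searches sequentially; the point is only that each source vertex is a self-contained unit of work.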
In this post I detail a general framework for distributed graph processing. I do not use any particular graph library, so my graph class will simply be called Graph. Popular graph libraries for Scala can be found in Twitter’s Cassovary project or the Graph for Scala project.
I will also make use of Derek Wyatt’s submission to the Akka Summer of Blog, “Balancing Workloads Across Nodes with Akka 2”, which provides a nice and simple implementation of a BalancingDispatcher in the context of distributed processing.
The goal is for the client to need to specify only an algorithm and the output filename. These two will be passed into a System, which will spawn a ResultHandler actor. Then on each machine, we will start a microkernel which will read our graph off the disk, spawn some workers, and begin the computation.
As a simple example, let’s implement a degree algorithm for a graph. The degree of a vertex is defined as the number of edges incident to that vertex. All we want in the end is a file with two columns, one for the ID of the vertex and another for the corresponding degree. Assume all vertex IDs are
First, let’s implement the algorithm interface.
We want a function that takes something as input and returns something as output. Thankfully, many graph algorithms share a small set of input and output types. For input, we clearly need a Graph, along with an input vertex or a pair of vertices. For output, each input typically yields either a numeric result, a collection of numeric results, or a map whose keys are other vertices and whose values are either a numeric result or a collection of numeric results.
We can define a trait AbstractAlgorithm with an abstract execute method that takes in a Graph and a subclass of a trait AbstractInput and outputs a subclass of a trait AbstractResult. To be safe, it should return an Option[AbstractResult] on the off chance that input we did not expect is fed in.
In the case of our degree algorithm, we create a case class SingleVertexInput that extends AbstractInput and another case class LongResult that extends AbstractResult. Let’s also override the toString method so that later on when printing the results we can simply do something like
println(input + " " + result)
We can then implement an object DegreeAlgorithm that extends AbstractAlgorithm, looks for a SingleVertexInput, and returns the result. Most graph libraries have an easy way to get the degree of a vertex, so the implementation is trivial.
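The pieces described above can be sketched roughly as follows. The Graph class here is a hypothetical stand-in (an adjacency map with a degree method for a simple graph), not the API of any real library, and the sketch assumes vertex IDs are Ints.

```scala
// Hypothetical stand-in for a real graph library's class (assumption).
final case class Graph(adjacency: Map[Int, Set[Int]]) {
  // Degree of a vertex in a simple graph, or None if the vertex is unknown.
  def degree(vertex: Int): Option[Int] = adjacency.get(vertex).map(_.size)
}

trait AbstractInput
trait AbstractResult

trait AbstractAlgorithm {
  // Returns None when the input is of a kind this algorithm does not handle.
  def execute(graph: Graph, input: AbstractInput): Option[AbstractResult]
}

case class SingleVertexInput(vertex: Int) extends AbstractInput {
  override def toString: String = vertex.toString
}

case class LongResult(value: Long) extends AbstractResult {
  override def toString: String = value.toString
}

object DegreeAlgorithm extends AbstractAlgorithm {
  def execute(graph: Graph, input: AbstractInput): Option[AbstractResult] =
    input match {
      case SingleVertexInput(v) => graph.degree(v).map(d => LongResult(d.toLong))
      case _                    => None // unexpected input kind
    }
}
```

With the toString overrides in place, concatenating an input and its result with a space between them gives one row of the two-column output file.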
In Derek Wyatt’s post, the work done by the Workers is typed as Any. Since we know what kind of work we’ll be feeding them, we can type it as
With that information, we can create our
System will take as input the algorithm, a collection of work, and an output filename. From these it will create a ResultHandler actor (we pass the output filename to the constructor of this actor) and a Master (we pass the algorithm and the reference of our ResultHandler to the constructor), send all the work to the Master, and then send an AllWorkSent message to the
We send the ResultHandler reference to the Master because, as we will soon see, instead of tying each piece of work we send to the System (as Derek Wyatt’s implementation does), we tie it to the
The ResultHandler is easy enough to implement: it simply waits for results, writes them to a file, and, once it gets a message notifying it that the computation is done, shuts down the system.
ResultHandler expects a Result, which is a case class that encapsulates an AbstractInput and an
The master remains mostly the same as Derek Wyatt’s, save for a few modifications. First, as stated before, the constructor now takes as input an AbstractAlgorithm and an
Second, everywhere there is “work” we type it as AbstractInput instead of Any (remember to do the same in the
Third, we add a var allWorkReceived: Boolean = false field and a def checkIfAllWorkIsDone method. The former starts out as false and becomes true once the Master receives the AllWorkSent message from the System. The latter checks to see if
true and if all work delegated to Workers is complete; if so, it tells the
Fourth, when the Master is notified of a Worker's existence, the algorithm is sent to the
DoAlgorithm case class will have to be added to the
Finally, since all work is tied to the resultHandler, instead of storing Tuple2s in the data structures we can just store the
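The bookkeeping behind these Master changes can be modeled in plain Scala, without Akka, roughly as below. This is only an illustrative sketch under several assumptions: workers are identified by Strings instead of ActorRefs, the class name MasterState and its helper methods are hypothetical, and checkIfAllWorkIsDone simply returns a Boolean where the real Master would send a message.

```scala
import scala.collection.mutable

trait AbstractInput
case class SingleVertexInput(vertex: Int) extends AbstractInput

// Plain-Scala model of the Master's state (hypothetical, for illustration only).
class MasterState {
  // Work that has been received but not yet handed to a worker.
  val workQueue = mutable.Queue.empty[AbstractInput]
  // None means the worker is idle; Some(work) means it is busy with `work`.
  val workers = mutable.Map.empty[String, Option[AbstractInput]]
  // Flipped to true when the AllWorkSent message arrives.
  var allWorkReceived: Boolean = false

  def enqueue(work: AbstractInput): Unit = { workQueue.enqueue(work) }

  def workerCreated(id: String): Unit = { workers(id) = None }

  // Hands the next queued item to worker `id` if it is idle and work is available.
  def assign(id: String): Option[AbstractInput] =
    workers.get(id) match {
      case Some(None) if workQueue.nonEmpty =>
        val work = workQueue.dequeue()
        workers(id) = Some(work)
        Some(work)
      case _ => None
    }

  def workIsDone(id: String): Unit = {
    if (workers.contains(id)) workers(id) = None
  }

  // Done only when no more work will arrive, nothing is queued,
  // and every known worker is idle.
  def checkIfAllWorkIsDone: Boolean =
    allWorkReceived && workQueue.isEmpty && workers.values.forall(_.isEmpty)
}
```

Because the work items are stored directly, rather than paired with a sender, the queue and the worker map only need to hold AbstractInput values.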
The worker also remains mostly the same, with the following differences. First, its constructor also takes a Graph as input, which it feeds into the algorithm’s
Second, it has a var algorithm: Option[AbstractAlgorithm] = None field that will contain the algorithm once it is received.
Third, all work is typed as AbstractInput, as before.
Implementing the actual Worker is now easy, and we could in fact throw away the abstract Worker, but to mirror Derek Wyatt’s implementation let’s just implement the Worker interface. The actual doWork method defers the work to the algorithm’s execute method; if a valid result is returned we forward it to the ResultHandler in the form of a Result message (described above), otherwise we ignore it.
The Akka Microkernel makes it easy to distribute code across several machines, and it can handle things we need such as reading the graph off the disk and spawning the workers. Since the Bootable classes cannot take any constructor parameters, we can delegate the information we need to a default config file that holds the graph information, as well as the location of the
At this point we are more or less done. Note that this post only provides the bare essentials; more details may need to be implemented depending on what you want to do. In this case, the client will send the DegreeAlgorithm object, along with the String it wants to use as the output filename, to the System. Once the Master is up, the user can start spawning Workers on other machines and off we go.
For other algorithms, the client need only create new AbstractInput classes if needed, new AbstractResult classes if needed, and a new AbstractAlgorithm object, and they are pretty much done.
Room for Improvement
As it stands, the number of messages being sent around the network is very high: |V| for algorithms that take a single vertex as input and |V|^2 for algorithms that take pairs. If the results are large (e.g. a Vector[Long] or a Map[Int, Vector[Long]]) this can congest the network. Depending on the algorithm, instead of considering individual vertices or pairs one at a time, one could partition the graph vertices into n near-equal pieces, feed each remote machine a slice, and send the results back only once all the work for a particular slice is done. One downside, however, is that this may not be as flexible or as easy to adapt to other algorithms that require more information than just a vertex or a pair of vertices.
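One possible way to cut the vertex set into n near-equal slices is sketched below. The object and method names are hypothetical, and the sketch assumes vertex IDs are Ints held in a Vector; it covers only the partitioning step, not the batching of results.

```scala
object VertexSlices {
  // Splits `vertices` into `n` contiguous slices whose sizes differ by at most one.
  def partition(vertices: Vector[Int], n: Int): Vector[Vector[Int]] = {
    require(n > 0, "n must be positive")
    val base = vertices.size / n
    val extra = vertices.size % n
    // The first `extra` slices each take one additional vertex.
    val sizes = Vector.tabulate(n)(i => if (i < extra) base + 1 else base)
    // Running totals give the start and end offset of every slice.
    val offsets = sizes.scanLeft(0)(_ + _)
    sizes.indices.toVector.map(i => vertices.slice(offsets(i), offsets(i + 1)))
  }
}
```

Each slice could then be sent as a single message, with one combined result message coming back per slice instead of one per vertex.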
Error handling could also be improved. In my implementation erroneous input is more or less ignored, similar to swallowing an exception. This means that aside from the logging of the Actors, there is not really any indication of bad behavior. This could perhaps be rectified with strategically placed exceptions or with better type safety than passing around extensions of vague traits.
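As one illustration of what better type safety might look like, the sketch below parameterizes the algorithm trait on its input and result types and reports failures through Either instead of dropping them. This is not part of the implementation described in this post: TypedAlgorithm, TypedDegreeAlgorithm, and the adjacency-map Graph are all hypothetical names chosen for the example.

```scala
// Hypothetical stand-in for a real graph library's class (assumption).
final case class Graph(adjacency: Map[Int, Set[Int]])

// The input type I and result type R are fixed at compile time, so feeding
// the wrong kind of input is a compile error rather than a silent None.
trait TypedAlgorithm[I, R] {
  // Left carries a description of what went wrong; Right carries the result.
  def execute(graph: Graph, input: I): Either[String, R]
}

object TypedDegreeAlgorithm extends TypedAlgorithm[Int, Long] {
  def execute(graph: Graph, input: Int): Either[String, Long] =
    graph.adjacency.get(input) match {
      case Some(neighbors) => Right(neighbors.size.toLong)
      case None            => Left(s"unknown vertex: $input")
    }
}
```

A result handler receiving an Either could then write Right values to the output file and log or collect Left values, so bad input leaves a trace.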
About the Author
Adelbert Chang is a 3rd year B.S./M.S. Computer Science student at the University of California at Santa Barbara where he also works as a researcher in an on-campus research lab studying social graph modeling and analysis.