Chapter III: Development of the Proposed Solution
3.4 Design
3.4.1 Design Patterns
Topologies in Storm are analogous to jobs in Hadoop - they define the path data takes through your system and the operations applied along the way. Topologies are compiled locally and then submitted to a Storm cluster where they run indefinitely until stopped.
94 | Chapter 8: Intro to Storm+Trident
You define your topology and Storm handles all the hard parts: fault tolerance, retrying, and distributing your code across the cluster, among other things.
For your first Storm+Trident topology, we’re going to handle a typical streaming use case: accept a high-rate event stream, process the events to power a real-time dashboard, and then store the records for later analysis. Specifically, we’re going to analyze the Github public timeline and monitor the number of commits per language.
A basic logical diagram of the topology looks like this:
[Figure: logical diagram of the topology (89-intro-to-storm-topo.png)]
Each node in the diagram above represents a specific operation on the data flow. Initially, JSON records are retrieved from Github and injected into the topology by the Github Spout; they are then transformed by a series of operations and eventually persisted to an external data store. Trident spouts are sources of streams of data; common use cases include pulling from a Kafka queue, a Redis queue, or some other external data source. Streams are in turn made up of tuples, which are just lists of values with a name attached to each field.
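To make the idea of a tuple concrete, here is a minimal plain-Java sketch of "a list of values with a name attached to each field". The class NamedTuple and its methods are hypothetical stand-ins written for illustration; they are not the Trident tuple class, and the field names "language" and "commits" are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a tuple: an ordered list of values plus a
// parallel list of field names. This is NOT the Storm/Trident API.
final class NamedTuple {
    private final List<String> fields;
    private final List<Object> values;

    NamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("each value needs exactly one field name");
        }
        this.fields = new ArrayList<String>(fields);
        this.values = new ArrayList<Object>(values);
    }

    // Positional access, like reading the first value of a tuple.
    Object getValue(int index) {
        return values.get(index);
    }

    // Access by field name.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        NamedTuple t = new NamedTuple(
                Arrays.asList("language", "commits"),
                Arrays.<Object>asList("JavaScript", 5L));
        System.out.println(t.getValueByField("language") + " -> " + t.getValue(1));
    }
}
```

The same value can be read either by position or by name, which is why later operations can refer to a field such as parsed-json without caring where it sits in the list.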
The meat of the Java code that constructs this topology is as follows:
IBlobStore bs = new FileBlobStore("~/dev/github-data/test-data");
OpaqueTransactio‐
The first two lines are responsible for constructing the spout. Instead of pulling directly from Github, we’ll be using a directory of downloaded JSON files so as not to a) unnecessarily burden Github and b) unnecessarily complicate the code. You don’t need to worry about the specifics, but the OpaqueTransactionalBlobSpout reads each JSON file and feeds it line by line into the topology.
After creating the spout we construct the topology by calling new TridentTopology(). We then create the topology’s first (and only) stream by calling newStream and passing in the spout we instantiated earlier along with a name, "github-activities". We can then chain a series of method calls off newStream() to tack on our logic after the spout.
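The chaining style described here can be imitated in a few lines of plain Java. The sketch below is only an in-memory analogy: MiniStream, its newStream and each methods, and the two helper operations are hypothetical names chosen to mirror the shape of the calls, and none of it is the Trident API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical, in-memory stand-in for a stream with chainable operations.
// It is NOT the Trident Stream class; it only mimics the chaining style.
final class MiniStream<T> {
    private final List<T> items;

    private MiniStream(List<T> items) {
        this.items = items;
    }

    // Plays the role of newStream(name, spout); the name is kept only to
    // mirror the call shape and is not used by this sketch.
    static <T> MiniStream<T> newStream(String name, List<T> source) {
        return new MiniStream<T>(new ArrayList<T>(source));
    }

    // Apply an operation to every item; each input may yield zero or more outputs.
    <R> MiniStream<R> each(Function<T, List<R>> operation) {
        List<R> out = new ArrayList<R>();
        for (T item : items) {
            out.addAll(operation.apply(item));
        }
        return new MiniStream<R>(out);
    }

    List<T> collect() {
        return new ArrayList<T>(items);
    }

    // Example operation: drop empty strings, upper-case the rest.
    static List<String> upperIfPresent(String s) {
        if (s.isEmpty()) {
            return Collections.<String>emptyList();
        }
        return Collections.singletonList(s.toUpperCase());
    }

    // Example operation: replace each string with its length.
    static List<Integer> length(String s) {
        return Collections.singletonList(s.length());
    }

    public static void main(String[] args) {
        List<Integer> lengths = MiniStream
                .newStream("github-activities", Arrays.asList("push", "", "fork"))
                .each(MiniStream::upperIfPresent)
                .each(MiniStream::length)
                .collect();
        System.out.println(lengths);
    }
}
```

Each call returns a new stream object, so operations read top to bottom in the order the data flows through them.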
The each method call, appropriately, applies an operation to each tuple in the stream.
The important parameter in the each calls is the second one, which is a class that defines
Your First Topology | 95
the operation to be applied to each tuple. The first each uses the JsonParse class which parses the JSON coming off the spout and turns it into an object representation that we can work with more easily. Our second each uses ExtractLanguageCommits.class to pull the statistics we’re interested in from the parsed JSON objects, namely the language and number of commits. ExtractLanguageCommits.class is fairly straightforward, and it is instructive to digest it a bit:
public static class ExtractLanguageCommits extends BaseFunction {
    private static final Logger LOG = LoggerFactory.getLogger(ExtractLanguageCommits.class);

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // The single incoming field holds the parsed JSON document.
        JsonNode node = (JsonNode) tuple.getValue(0);
        // Only push events are of interest; emit nothing for other event types.
        if (!node.get("type").toString().equals("\"PushEvent\"")) return;
        // Grab the language and the push size (the number of commits).
        List<Object> values = new ArrayList<Object>(2);
        values.add(node.get("repository").get("language").asText());
        values.add(node.get("payload").get("size").asLong());
        collector.emit(values);
    }
}
There is only one method, execute, that accepts a tuple and a collector. The tuples coming into ExtractLanguageCommits have only one field, parsed-json, which contains a JsonNode, so the first thing we do is cast it. We then use the get method to access the various pieces of information we need.
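The "function plus collector" pattern can be tried without a Storm cluster. The following is a rough plain-Java sketch under stated assumptions: PushEventExtractor, its nested Collector interface, and the event helper are hypothetical names, and nested Maps stand in for the parsed JsonNode. It only illustrates that a function may emit zero or one output per input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the "function plus collector" pattern.
// Collector and PushEventExtractor are hypothetical names, not Storm classes;
// nested Maps stand in for the parsed JsonNode.
final class PushEventExtractor {

    // Receives whatever the function decides to emit.
    interface Collector {
        void emit(List<Object> values);
    }

    // Emits (language, size) for push events and nothing for any other type.
    @SuppressWarnings("unchecked")
    static void execute(Map<String, Object> event, Collector collector) {
        if (!"PushEvent".equals(event.get("type"))) {
            return; // acts as a filter: no emit, so the input is dropped
        }
        Map<String, Object> repository = (Map<String, Object>) event.get("repository");
        Map<String, Object> payload = (Map<String, Object>) event.get("payload");
        List<Object> values = new ArrayList<Object>(2);
        values.add(repository.get("language"));
        values.add(payload.get("size"));
        collector.emit(values);
    }

    // Builds a fake event containing only the fields read above.
    static Map<String, Object> event(String type, String language, long size) {
        Map<String, Object> repository = new HashMap<String, Object>();
        repository.put("language", language);
        Map<String, Object> payload = new HashMap<String, Object>();
        payload.put("size", size);
        Map<String, Object> e = new HashMap<String, Object>();
        e.put("type", type);
        e.put("repository", repository);
        e.put("payload", payload);
        return e;
    }

    public static void main(String[] args) {
        List<List<Object>> emitted = new ArrayList<List<Object>>();
        Collector collector = emitted::add;
        execute(event("PushEvent", "PHP", 2L), collector);
        execute(event("WatchEvent", "C", 0L), collector); // filtered out
        System.out.println(emitted);
    }
}
```

Because the function talks to a collector instead of returning a value, filtering needs no special case: an event that should be dropped simply never reaches emit.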
At the time of writing, the full schema for Github’s public stream is available here, but here are the important bits:
{ “type”: “PushEvent”, // can be one of .. finish JSON… }
… finish this section …
At this point the tuples in our stream might look something like this:
("C", 2), ("JavaScript", 5), ("CoffeeScript", 1), ("PHP", 1), ("JavaScript", 1), ("PHP", 2)
We then group on the language and sum the counts, giving our final tuple stream, which could look like this:
("C", 2), ("JavaScript", 6), ("CoffeeScript", 1), ("PHP", 3)
The group by is exactly what you think it is - it ensures that every tuple with the same language is grouped together and passed through the same thread of execution, allowing you to perform the sum operation across all tuples in each group. After summing the commits, the final counts are stored in a database. Feel free to go ahead and try it out yourself.
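One way to picture why grouping lets the sum work is to route every tuple to a partition chosen from its key, so that a given language always lands in the same place. The sketch below does this in plain Java using the sample tuples from above. GroupAndSum and its methods are hypothetical, and hashing the key is an assumption made for illustration rather than a description of how Storm partitions internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: route each (language, commits) pair to a partition
// chosen by hashing the key, then sum within each partition.
// This is an illustration, not Storm's implementation.
final class GroupAndSum {

    // The same key always maps to the same partition index.
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    static Map<String, Long> sumByLanguage(String[] languages, long[] commits, int partitions) {
        // One running-total table per partition (think: one per thread).
        List<Map<String, Long>> tables = new ArrayList<Map<String, Long>>();
        for (int i = 0; i < partitions; i++) {
            tables.add(new LinkedHashMap<String, Long>());
        }
        for (int i = 0; i < languages.length; i++) {
            Map<String, Long> table = tables.get(partitionFor(languages[i], partitions));
            table.merge(languages[i], commits[i], Long::sum);
        }
        // A key never spans partitions, so combining tables cannot double count.
        Map<String, Long> result = new LinkedHashMap<String, Long>();
        for (Map<String, Long> table : tables) {
            result.putAll(table);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] languages = {"C", "JavaScript", "CoffeeScript", "PHP", "JavaScript", "PHP"};
        long[] commits = {2, 5, 1, 1, 1, 2};
        System.out.println(sumByLanguage(languages, commits, 3));
    }
}
```

With the sample input the totals come out as C 2, JavaScript 6, CoffeeScript 1, and PHP 3, matching the final tuple stream shown earlier; the order in which they print depends on the partition count.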
So What?
You might be thinking to yourself “So what, I can do that in Hadoop in 3 lines…” and you’d be right, almost. It’s important to internalize the difference in focus between Hadoop and Storm+Trident: when using Hadoop you must have all your data sitting in front of you before you can start, and Hadoop won’t provide any results until processing all of the data is complete. The Storm+Trident topology you just built allows you to update your results as you receive your stream of data in real time, which opens up a whole set of applications you could only dream about with Hadoop.
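The difference can be seen in miniature with a counter that is updated once per event and can be read at any moment. This is a simplified plain-Java sketch, not Storm code; RunningCommitCounts and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of incremental aggregation: totals are updated per
// event and can be read at any moment, without waiting for the input to end.
final class RunningCommitCounts {
    private final Map<String, Long> totals = new HashMap<String, Long>();

    // Called once per incoming (language, commits) event.
    void onEvent(String language, long commits) {
        totals.merge(language, commits, Long::sum);
    }

    // What a dashboard would read; 0 if the language has not been seen yet.
    long current(String language) {
        Long value = totals.get(language);
        return value == null ? 0L : value;
    }

    public static void main(String[] args) {
        RunningCommitCounts counts = new RunningCommitCounts();
        counts.onEvent("JavaScript", 5);
        System.out.println(counts.current("JavaScript")); // readable mid-stream
        counts.onEvent("JavaScript", 1);
        System.out.println(counts.current("JavaScript")); // already updated
    }
}
```

A batch job would only report the final total after reading every record, whereas here the intermediate value is available as soon as the first event arrives.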