A first step in tuning and debugging Spark is to have a deeper understanding of the system’s internal design. In previous chapters you saw the “logical” representation of RDDs and their partitions. When executing, Spark translates this logical representation into a physical execution plan by merging multiple operations into tasks. Understanding every aspect of Spark’s execution is beyond the scope of this book, but an appreciation for the steps involved along with the relevant terminology can be helpful when tuning and debugging jobs.
To demonstrate Spark’s phases of execution, we’ll walk through an example application and see how user code compiles down to a lower-level execution plan. The application we’ll consider is a simple bit of log analysis in the Spark shell. For input data, we’ll use a text file that consists of log messages of varying degrees of severity, along with some blank lines interspersed (Example 8-6).
Example 8-6. input.txt, the source file for our example
## input.txt ##
INFO This is a message with content
INFO This is some other content
(empty line)
INFO Here are more messages
WARN This is a warning
(empty line)
ERROR Something bad happened
WARN More details on the bad thing
INFO back to normal messages
We want to open this file in the Spark shell and compute how many log messages appear at each level of severity. First let’s create a few RDDs that will help us answer this question, as shown in Example 8-7.
Example 8-7. Processing text data in the Scala Spark shell
// Read input file
scala> val input = sc.textFile("input.txt")
// Split into words and remove empty lines
scala> val tokenized = input.
     |   map(line => line.split(" ")).
     |   filter(words => words.size > 0 && words(0).nonEmpty)
// Extract the first word from each line (the log level) and do a count
scala> val counts = tokenized.
     |   map(words => (words(0), 1)).
     |   reduceByKey{ (a, b) => a + b }
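To make these transformations concrete without a cluster, the following sketch replays the same split, filter, and per-key count on an in-memory copy of input.txt using ordinary Scala collections. The object name LogLevelCounts and the hard-coded lines are illustrative assumptions, and groupBy stands in for reduceByKey, so this shows what the pipeline computes, not how Spark distributes it. It also checks that the first word is non-empty: in Scala, "".split(" ") returns Array(""), whose size is 1, so a size test alone would keep blank lines.

```scala
// Illustrative sketch only: plain Scala collections, no Spark.
// The hard-coded lines are an assumed in-memory copy of input.txt.
object LogLevelCounts {
  val lines: Seq[String] = Seq(
    "INFO This is a message with content",
    "INFO This is some other content",
    "",
    "INFO Here are more messages",
    "WARN This is a warning",
    "",
    "ERROR Something bad happened",
    "WARN More details on the bad thing",
    "INFO back to normal messages"
  )

  // Split into words and drop blank lines; "".split(" ") is Array(""),
  // so the first word must also be checked for emptiness.
  val tokenized: Seq[Array[String]] =
    lines
      .map(line => line.split(" "))
      .filter(words => words.size > 0 && words(0).nonEmpty)

  // Stand-in for map(words => (words(0), 1)).reduceByKey(_ + _):
  // group the pairs by log level and sum the 1s in each group.
  val counts: Map[String, Int] =
    tokenized
      .map(words => (words(0), 1))
      .groupBy { case (level, _) => level }
      .map { case (level, pairs) => level -> pairs.map(_._2).sum }
}
```

Evaluating LogLevelCounts.counts gives a map with ERROR -> 1, INFO -> 4, and WARN -> 2.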
This sequence of commands results in an RDD, counts, that will contain the number of log entries at each level of severity. After executing these lines in the shell, the program has not performed any actions. Instead, it has implicitly defined a directed acyclic graph (DAG) of RDD objects that will be used later once an action occurs. Each RDD maintains a pointer to one or more parents along with metadata about what type of relationship they have. For instance, when you call val b = a.map() on an RDD, the RDD b keeps a reference to its parent a. These pointers allow an RDD to be traced to all of its ancestors.
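As a rough illustration of these parent pointers, this sketch models each RDD as a node that holds references to its parents and recovers all ancestors by following them recursively. The Node class and the node names are hypothetical stand-ins for the RDDs built in Example 8-7; Spark's real RDD classes track dependencies differently and carry much more metadata.

```scala
// Hypothetical toy model of lineage; not Spark's RDD classes.
object LineageSketch {
  // Each node keeps references to its parent nodes, like an RDD's parent pointers.
  final case class Node(name: String, parents: Seq[Node] = Nil) {
    // Follow parent pointers recursively, nearest ancestors first.
    def ancestors: Seq[Node] =
      parents.flatMap(p => p +: p.ancestors).distinct
  }

  // Assumed names mirroring the RDDs built in Example 8-7.
  val hadoop   = Node("HadoopRDD")
  val input    = Node("MappedRDD (textFile)", Seq(hadoop))
  val split    = Node("MappedRDD (split)", Seq(input))
  val filtered = Node("FilteredRDD", Seq(split))
  val pairs    = Node("MappedRDD (level, 1)", Seq(filtered))
  val counts   = Node("ShuffledRDD", Seq(pairs))
}
```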
To display the lineage of an RDD, Spark provides a toDebugString() method. In Example 8-8, we’ll look at some of the RDDs we created in the preceding example.
Example 8-8. Visualizing RDDs with toDebugString() in Scala
scala> input.toDebugString
res85: String =
(2) input.text MappedRDD[292] at textFile at <console>:13
 |  input.text HadoopRDD[291] at textFile at <console>:13
scala> counts.toDebugString
res84: String =
(2) ShuffledRDD[296] at reduceByKey at <console>:17
 +-(2) MappedRDD[295] at map at <console>:17
    |  FilteredRDD[294] at filter at <console>:15
    |  MappedRDD[293] at map at <console>:15
    |  input.text MappedRDD[292] at textFile at <console>:13
    |  input.text HadoopRDD[291] at textFile at <console>:13
The first visualization shows the input RDD. We created this RDD by calling sc.textFile(). The lineage gives us some clues as to what sc.textFile() does since it reveals which RDDs were created in the textFile() function. We can see that it creates a HadoopRDD and then performs a map on it to create the returned RDD. The lineage of counts is more complicated. That RDD has several ancestors, since there are other operations that were performed on top of the input RDD, such as additional maps, filtering, and reduction. The lineage of counts shown here is also displayed graphically on the left side of Figure 8-1.
Before we perform an action, these RDDs simply store metadata that will help us compute them later. To trigger computation, let’s call an action on the counts RDD and collect() it to the driver, as shown in Example 8-9.
Example 8-9. Collecting an RDD
scala> counts.collect()
res86: Array[(String, Int)] = Array((ERROR,1), (INFO,4), (WARN,2))
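A loose analogy for this lazy behavior exists in plain Scala: a collection view, like an RDD, records transformations and runs them only when a result is demanded. In the hypothetical sketch below, a counter shows that defining the pipeline evaluates nothing, that the toList call (playing the role of collect()) does the work, and that a second call recomputes everything, much as an uncached RDD is recomputed for each action. The object and its names are assumptions for illustration, not Spark code.

```scala
// Loose analogy in plain Scala, not Spark: a view records transformations
// and runs them only when a result is demanded.
object LazinessSketch {
  // Counts how many times the map function has actually run.
  var evaluated = 0

  val lines = Vector("INFO a", "WARN b", "INFO c")

  // Defining the pipeline runs nothing yet (like building an RDD).
  val levels = lines.view.map { line =>
    evaluated += 1
    line.split(" ")(0)
  }

  // Forcing the view plays the role of an action such as collect().
  def collect(): List[String] = levels.toList
}
```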
Spark’s scheduler creates a physical execution plan to compute the RDDs needed for performing the action. Here when we call collect() on the RDD, every partition of the RDD must be materialized and then transferred to the driver program. Spark’s scheduler starts at the final RDD being computed (in this case, counts) and works backward to find what it must compute. It visits that RDD’s parents, its parents’ parents, and so on, recursively to develop a physical plan necessary to compute all ancestor RDDs. In the simplest case, the scheduler outputs a computation stage for each RDD in this graph where the stage has tasks for each partition in that RDD. Those stages are then executed in reverse order to compute the final required RDD.
In more complex cases, the physical set of stages will not be an exact 1:1 correspondence to the RDD graph. This can occur when the scheduler performs pipelining, or collapsing of multiple RDDs into a single stage. Pipelining occurs when RDDs can be computed from their parents without data movement. The lineage output shown in Example 8-8 uses indentation levels to show where RDDs are going to be pipelined together into physical stages. RDDs that exist at the same level of indentation as their parents will be pipelined during physical execution. For instance, when we are computing counts, even though there are a large number of parent RDDs, there are only two levels of indentation shown. This indicates that the physical execution will require only two stages. Pipelining is possible in this case because the map and filter operations run in sequence and need no data movement; only reduceByKey() requires a shuffle, which starts the second stage. The right half of Figure 8-1 shows the two stages of execution that are required to compute the counts RDD.
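These stage boundaries can be sketched with a simplified model, assuming each RDD has at most one parent and each dependency is either narrow (pipelinable) or a shuffle. Walking backward from the final RDD, the hypothetical stages function keeps adding RDDs to the current stage across narrow dependencies and starts a new stage at every shuffle. It reproduces the two stages for counts, but it is not Spark's DAGScheduler, which also handles multiple parents, cached data, and reusable shuffle output.

```scala
// Simplified, hypothetical model of cutting a lineage into stages.
// Assumes every RDD has at most one parent; not Spark's DAGScheduler.
object StageSketch {
  sealed trait Dep
  case object Narrow  extends Dep // parent partitions feed the child directly: pipelinable
  case object Shuffle extends Dep // data must move between partitions: stage boundary

  final case class Rdd(name: String, parent: Option[(Rdd, Dep)])

  // Walk backward from the final RDD; returns stages in execution order,
  // each listing its RDDs from oldest to newest.
  def stages(last: Rdd): List[List[String]] = {
    @annotation.tailrec
    def loop(cur: Rdd, stage: List[String], later: List[List[String]]): List[List[String]] =
      cur.parent match {
        case None               => (cur.name :: stage) :: later
        case Some((p, Narrow))  => loop(p, cur.name :: stage, later)
        case Some((p, Shuffle)) => loop(p, Nil, (cur.name :: stage) :: later)
      }
    loop(last, Nil, Nil)
  }

  // The lineage of counts as printed in Example 8-8.
  val hadoop   = Rdd("HadoopRDD[291]", None)
  val input    = Rdd("MappedRDD[292]", Some((hadoop, Narrow)))
  val split    = Rdd("MappedRDD[293]", Some((input, Narrow)))
  val filtered = Rdd("FilteredRDD[294]", Some((split, Narrow)))
  val pairs    = Rdd("MappedRDD[295]", Some((filtered, Narrow)))
  val counts   = Rdd("ShuffledRDD[296]", Some((pairs, Shuffle)))
}
```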
Figure 8-1. RDD transformations pipelined into physical stages
If you visit the application’s web UI, you will see that two stages occur in order to fulfill the collect() action. The Spark UI can be found at http://localhost:4040 if you are running this example on your own machine. The UI is discussed in more detail later in this chapter, but you can use it here to quickly see what stages are executing during this program.
In addition to pipelining, Spark’s internal scheduler may truncate the lineage of the RDD graph if an existing RDD has already been persisted in cluster memory or on disk. Spark can “short-circuit” in this case and just begin computing based on the persisted RDD. A second case in which this truncation can happen is when an RDD is already materialized as a side effect of an earlier shuffle, even if it was not explicitly persist()ed. This is an under-the-hood optimization that takes advantage of the fact that Spark shuffle outputs are written to disk, and of the fact that portions of the RDD graph are often recomputed.
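A minimal sketch of this short-circuiting, under the assumption of a linear lineage with a persisted flag on each RDD: the walk back from the target stops at the first persisted ancestor, so only the RDDs after it need to be computed. The Rdd class, the flag, and the names are hypothetical and only model the idea.

```scala
// Hypothetical sketch of lineage truncation; names and the persisted flag
// are illustrative assumptions, not Spark APIs. Assumes a linear lineage.
object TruncationSketch {
  final case class Rdd(name: String, parent: Option[Rdd], persisted: Boolean = false)

  // RDDs that must be (re)computed for `target`, oldest first.
  def toCompute(target: Rdd): List[String] = {
    @annotation.tailrec
    def loop(cur: Option[Rdd], acc: List[String]): List[String] = cur match {
      case None                   => acc // no more parents to visit
      case Some(r) if r.persisted => acc // short-circuit: reuse stored data
      case Some(r)                => loop(r.parent, r.name :: acc)
    }
    loop(Some(target), Nil)
  }

  val hadoop           = Rdd("HadoopRDD", None)
  val input            = Rdd("input", Some(hadoop))
  val filtered         = Rdd("filtered", Some(input))
  val filteredCached   = filtered.copy(persisted = true)
  val pairsFromScratch = Rdd("pairs", Some(filtered))
  val pairsFromCache   = Rdd("pairs", Some(filteredCached))
}
```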
To see the effects of caching on physical execution, let’s cache the counts RDD and see how that truncates the execution graph for future actions (Example 8-10). If you revisit the UI, you should see that caching reduces the number of stages required when executing future computations. Calling collect() a few more times will reveal only one stage executing to perform the action.
Example 8-10. Computing an already cached RDD
// Cache the RDD
scala> counts.cache()
// The first subsequent execution will again require 2 stages
scala> counts.collect()
res87: Array[(String, Int)] = Array((ERROR,1), (INFO,4), (WARN,2))
// This execution will only require a single stage
scala> counts.collect()
res88: Array[(String, Int)] = Array((ERROR,1), (INFO,4), (WARN,2))
The set of stages produced for a particular action is termed a job. Each time we invoke an action such as count() or collect(), we create a job composed of one or more stages.
Once the stage graph is defined, tasks are created and dispatched to an internal scheduler, which varies depending on the deployment mode being used. Stages in the physical plan can depend on each other, based on the RDD lineage, so they will be executed in a specific order. For instance, a stage that outputs shuffle data must occur before one that relies on that data being present.
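Stage ordering can be sketched as a depth-first traversal that emits a stage only after all of the stages it depends on. The stage ids and the dependency map below are made up (stage 2 imitates something like a join that reads two shuffles); the sketch assumes the stage graph is acyclic, and Spark's real scheduler may also run independent stages concurrently rather than in one fixed list.

```scala
// Hypothetical sketch: order stages so each runs after the stages it depends on.
// Stage ids and dependencies are made up; assumes the stage graph is acyclic.
object StageOrderSketch {
  // stage id -> ids of the stages whose (shuffle) output it reads
  val deps: Map[Int, List[Int]] = Map(
    0 -> Nil,
    1 -> Nil,
    2 -> List(0, 1), // e.g. a join reading two shuffles
    3 -> List(2)
  )

  // Depth-first post-order: a stage is emitted only after all of its parents.
  def executionOrder(target: Int): List[Int] = {
    def visit(stage: Int, done: List[Int]): List[Int] =
      if (done.contains(stage)) done
      else deps(stage).foldLeft(done)((acc, parent) => visit(parent, acc)) :+ stage
    visit(target, Nil)
  }
}
```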
A physical stage will launch tasks that each do the same thing but on specific partitions of data. Each task internally performs the same steps:
1. Fetching its input, either from data storage (if the RDD is an input RDD), an existing RDD (if the stage is based on already cached data), or shuffle outputs.
2. Performing the operation necessary to compute RDD(s) that it represents. For instance, executing filter() or map() functions on the input data, or performing grouping or reduction.
3. Writing output to a shuffle, to external storage, or back to the driver (if it is the final RDD of an action such as count()).
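The three steps can be sketched for a single task of the map-side stage in the log example. In this hypothetical runTask function, fetching is reading an in-memory partition, computing is the map, filter, and map chain fused into one lazy pass over an Iterator (which is what pipelining buys), and writing is bucketing the (level, 1) pairs by the reduce partition that will receive them. The function name, the in-memory input, and the hash-based bucketing are assumptions for illustration, not Spark's task or shuffle implementation.

```scala
// Hypothetical sketch of one map-side task in the log example; not Spark's
// Task or shuffle implementation. Input is an assumed in-memory partition.
object TaskSketch {
  def runTask(partition: Seq[String], numReducers: Int): Map[Int, List[(String, Int)]] = {
    // 1. Fetch input (stands in for reading storage, cached data, or shuffle files).
    val records: Iterator[String] = partition.iterator

    // 2. Compute: map, filter, and map are fused into a single lazy pass.
    val pairs: Iterator[(String, Int)] = records
      .map(line => line.split(" "))
      .filter(words => words.size > 0 && words(0).nonEmpty)
      .map(words => (words(0), 1))

    // 3. Write shuffle output: bucket each pair by the reduce partition
    //    that will receive it (hash of the key, made non-negative).
    pairs.toList.groupBy { case (level, _) => math.abs(level.hashCode % numReducers) }
  }
}
```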
Most logging and instrumentation in Spark is expressed in terms of stages, tasks, and shuffles. Understanding how user code compiles down into the bits of physical execution is an advanced concept, but one that will help you immensely in tuning and debugging applications.
To summarize, the following phases occur during Spark execution:
User code defines a DAG (directed acyclic graph) of RDDs
Operations on RDDs create new RDDs that refer back to their parents, thereby creating a graph.
Actions force translation of the DAG to an execution plan
When you call an action on an RDD it must be computed. This requires computing its parent RDDs as well. Spark’s scheduler submits a job to compute all needed RDDs. That job will have one or more stages, which are parallel waves of computation composed of tasks. Each stage will correspond to one or more RDDs in the DAG. A single stage can correspond to multiple RDDs due to pipelining.
Tasks are scheduled and executed on a cluster
Stages are processed in order, with individual tasks launching to compute segments of the RDD. Once the final stage is finished in a job, the action is complete.
In a given Spark application, this entire sequence of steps may occur many times in a continuous fashion as new RDDs are created.