Each of the policies described in the model above is first considered in isolation, in order to understand how it works individually. Section 6.5 then considers how these policies interact when configured and combined.
6.3.1 Compilation Policy
When a user submits their partial specification and the plan search returns its results, the compilation policy can begin recommending plans for compilation. This is a three-stage process. First, the policy enumerates all possible plans for this specification, and all rooted subgraphs (subgraphs which start at a data source and perform zero or more analytical steps on that data) within each plan, accumulating the frequency with which each subgraph appears in the set of possible plans. Second, these subgraphs are sorted so that those with the highest frequency of occurrence appear first in the output. Subgraphs with equal frequency are sorted in descending order of the number of components they contain; longer subgraphs are preferred, as they are more likely to result in an (at least partially) successful match when the user finalises their design. This sorted set of subgraphs will naturally contain many plans which are covered by (are strict subsets of) others in the enumeration. The final stage of the compilation policy therefore discards any such covered plans which are sorted later in the output, so as to minimise redundant compilation effort.
6.3.2 Parameter Generation Policies
Each of the plans generated above passes through a parameter generation policy before being added to a compilation queue. This research examines three different parameter generation policies, each of which is limited to producing a fixed number of parameter sets for each plan; this number is governed by a system-level parameter. If a parameter has not yet been observed by the Speculative Plugin, a default value is generated or retrieved from that
6. Speculative Execution of Analytic Workflows
component’s specification where possible. If no default value can be generated or retrieved, speculative compilation of this candidate plan is cancelled; the next time a plan with this parameter is deployed by a user, there will then be an observation to inform future parameter generation.
The first of these policies, Random, simply selects combinations of parameters at random from those which have been deployed before. A refinement of this, Frequency Weighted, selects parameters randomly with a weight derived from the frequency with which a given parameter set has been observed before. Finally, the Top Frequency policy generates a list of parameters based purely on the frequency with which they have been deployed, without any stochastic component.
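The following sketch shows the three parameter generation policies together with the default-value fallback and the per-plan cap. It rests on several hypothetical simplifications that do not come from the source. Observations are kept as a Counter of deployed values per parameter name. Values are drawn independently for each parameter, rather than from whole observed parameter sets. Top Frequency ranks combinations by the product of their values' observation counts. The policy strings and function names are illustrative only.

```python
import itertools
import math
import random
from collections import Counter
from typing import Any, Optional

ParamSet = tuple[tuple[str, Any], ...]  # ((name, value), ...) in plan order


def candidate_values(params, observed, defaults) -> Optional[dict[str, Counter]]:
    """Observed values per parameter, or the specification default if none seen."""
    values = {}
    for name in params:
        if observed.get(name):
            values[name] = observed[name]
        elif name in defaults:
            values[name] = Counter({defaults[name]: 1})
        else:
            return None  # nothing observed and no default: cancel this plan
    return values


def generate(policy: str, params: list[str], observed: dict[str, Counter],
             defaults: dict[str, Any], limit: int,
             rng: Optional[random.Random] = None) -> Optional[list[ParamSet]]:
    """Up to `limit` parameter sets for one plan, or None if compilation is cancelled."""
    values = candidate_values(params, observed, defaults)
    if values is None:
        return None
    if policy == "top_frequency":
        # Deterministic: rank every combination by the product of its values'
        # observation counts (an independence assumption), keep the first `limit`.
        combos = itertools.product(*(values[p].items() for p in params))
        ranked = sorted(combos, key=lambda c: -math.prod(n for _, n in c))
        return [tuple(zip(params, (v for v, _ in c))) for c in ranked[:limit]]
    rng = rng or random.Random()
    results: list[ParamSet] = []
    for _ in range(limit):  # at most `limit` draws; duplicate sets are dropped
        chosen = []
        for p in params:
            vals, counts = zip(*values[p].items())
            if policy == "random":
                chosen.append(rng.choice(vals))  # uniform over observed values
            elif policy == "frequency_weighted":
                chosen.append(rng.choices(vals, weights=counts)[0])
            else:
                raise ValueError(f"unknown policy: {policy}")
        candidate = tuple(zip(params, chosen))
        if candidate not in results:
            results.append(candidate)
    return results
```

Returning None rather than an empty list keeps "cancelled for lack of a default" distinct from a policy that simply produced nothing.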
6.3.3 Deployment Policies
Deployment decisions are taken on an on-line basis, as the plans for a given search event are generated. This research tests four deployment configurations. The first two simply Disable all speculative deployment, or Deploy Everything that is compiled; these provide baseline figures for a naïve approach. A third policy deploys a Random selection of compiled jobs, with a system-level tuneable parameter for the probability of deploying a given plan. Finally, the Top N policy uses the ordering of plan scores created by the compilation policy to deploy only the top-rated plans, up to a number defined by a system-level parameter.
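Each of the four deployment configurations reduces to an on-line yes/no decision per compiled plan, as the following sketch shows. It assumes, hypothetically, that each plan arrives with its zero-based rank in the compilation order for the current search event. The mode strings and the default values of probability and top_n stand in for the system-level parameters and are illustrative only.

```python
import random
from dataclasses import dataclass, field


@dataclass
class DeploymentPolicy:
    mode: str                  # "disable", "everything", "random" or "top_n" (illustrative)
    probability: float = 0.5   # tuneable deployment probability for "random"
    top_n: int = 3             # tuneable limit for "top_n"
    rng: random.Random = field(default_factory=random.Random)

    def should_deploy(self, rank: int) -> bool:
        """Decide on-line for a compiled plan at position `rank` (0 = best)."""
        if self.mode == "disable":
            return False
        if self.mode == "everything":
            return True
        if self.mode == "random":
            return self.rng.random() < self.probability
        if self.mode == "top_n":
            return rank < self.top_n
        raise ValueError(f"unknown deployment mode: {self.mode}")
```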
6.3.4 Termination Policy
In order to ensure that speculatively executed jobs do not overwhelm available cluster resources, a termination policy is used. This employs the concept of server ticks as a proxy for time which takes into account the level of activity in the Speculative Plugin: a server tick occurs every time a job is compiled or deployed by the Speculative Plugin. A job is described as having been “used” if either a user has requested its deployment (and not yet requested its termination), or another job has used data which it publishes. If a job has not been “used” within the last T server ticks, it is therefore terminated immediately and marked as unavailable for reuse. This approach is similar to a Least Recently Used (LRU) cache eviction policy.
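The tick-based termination rule can be sketched as a small bookkeeping class. This is a minimal sketch under stated assumptions. Jobs are identified by strings. A user's termination request counts as the job's most recent use. "Not used within the last T ticks" is read as strictly more than T ticks since the last use. The class and method names are hypothetical, and platform-specific cancellation of tasks is not modelled.

```python
from dataclasses import dataclass, field


@dataclass
class TerminationPolicy:
    timeout: int                                          # T, in server ticks
    tick: int = 0                                         # current server tick
    last_used: dict[str, int] = field(default_factory=dict)
    user_held: set[str] = field(default_factory=set)      # deployed by a user, not yet terminated
    unavailable: set[str] = field(default_factory=set)    # terminated; never reused

    def server_tick(self, job_id: str) -> list[str]:
        """Record that the plugin compiled or deployed `job_id`; return jobs terminated."""
        self.tick += 1
        if job_id not in self.unavailable:
            self.last_used.setdefault(job_id, self.tick)
        return self._sweep()

    def mark_used(self, job_id: str) -> None:
        """Another job consumed data published by `job_id`."""
        self.last_used[job_id] = self.tick

    def user_deploy(self, job_id: str) -> None:
        self.user_held.add(job_id)
        self.mark_used(job_id)

    def user_terminate(self, job_id: str) -> None:
        self.user_held.discard(job_id)
        self.mark_used(job_id)  # the user's request counts as its most recent use

    def _sweep(self) -> list[str]:
        # User-held jobs are continuously "used"; everything else expires after T ticks.
        expired = [job for job, last in self.last_used.items()
                   if job not in self.user_held and self.tick - last > self.timeout]
        for job in expired:
            del self.last_used[job]
            self.unavailable.add(job)  # a real system would also cancel the job's tasks here
        return expired
```

As in an LRU cache, the sweep evicts entries by recency of use. The difference is that the "clock" advances only with plugin activity, so an idle plugin does not terminate jobs merely because wall-clock time has passed.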
The behaviour of termination varies depending on the platform to which the given job is deployed. In most environments, termination should cancel any running tasks associated with the job (e.g., streaming processors, MapReduce tasks, YARN containers). An extension to the termination policy may also be used to “clean up” unused data stored on shared filesystems (such as the output files from an Apache Pig job, or debug logging from a streaming analytic). The appropriate number of server ticks for these timeouts depends on the size and capacity of the cluster to which jobs are submitted, so it is set by a system-level tuning parameter.
6.3.5 Sub-Flow Identification & Sharing
In order to facilitate job sharing and sub-flow reuse, a stable scheme is needed for identifying a subgraph within a flow. This research achieves this by traversing the graph in topological order, accumulating a textual description of the generated code and input/output connections of each component. Where the topological ordering contains ties, the traversal breaks them using a stable tie-breaking rule. For each intermediate output in the plan’s flow, the accumulated description is hashed using SHA-256, and the resulting digest is used as the identifier for the sub-flow that produces that output.
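The identification scheme can be sketched with Python's hashlib and heapq modules. This sketch makes three assumptions that the source does not specify. First, the description for each output covers only that output and its ancestors, so that unrelated branches elsewhere in a plan do not change the digest. Second, ties in the topological order are broken by (code text, node ID). Third, inputs are recorded by traversal position rather than node ID, and edges are assumed to be listed in input-port order. The function name and graph encoding are illustrative.

```python
import hashlib
import heapq


def subflow_ids(code: dict[str, str], edges: list[tuple[str, str]]) -> dict[str, str]:
    """Map every node (intermediate output) to a SHA-256 digest of the sub-flow producing it.

    `code` maps node IDs to generated code; `edges` are (upstream, downstream)
    pairs, assumed to be listed in input-port order.
    """
    parents = {n: [] for n in code}
    children = {n: [] for n in code}
    for src, dst in edges:
        parents[dst].append(src)
        children[src].append(dst)

    ids = {}
    for output in code:
        # The rooted sub-flow for this output: the node plus all of its ancestors.
        sub, stack = set(), [output]
        while stack:
            node = stack.pop()
            if node not in sub:
                sub.add(node)
                stack.extend(parents[node])
        # Kahn's topological sort; ties broken by (code text, node ID) for stability.
        indegree = {n: len(parents[n]) for n in sub}
        ready = [(code[n], n) for n in sub if indegree[n] == 0]
        heapq.heapify(ready)
        position, lines = {}, []
        while ready:
            _, node = heapq.heappop(ready)
            position[node] = len(position)
            # Describe inputs by traversal position, not node ID, so the same
            # sub-flow hashes identically in different plans.
            inputs = [position[p] for p in parents[node]]
            lines.append(f"{position[node]}:{code[node]}<-{inputs}")
            for child in children[node]:
                if child in sub:
                    indegree[child] -= 1
                    if indegree[child] == 0:
                        heapq.heappush(ready, (code[child], child))
        ids[output] = hashlib.sha256("\n".join(lines).encode("utf-8")).hexdigest()
    return ids
```

Consider two plans that both start with the same source and filter but end in different analytics. Each of their shared intermediate outputs receives the same digest in both plans, even when the node IDs differ. This is the property that sub-flow sharing relies on.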
Once a job is generated by the Compilation Policy, an Export Selection Policy adds nodes to the flow for platform-specific export implementations. These exports indicate that results from this portion of the analytic should be made available to future subscribers. The specific implementation of this loosely framed requirement depends on the runtime model of the target framework.
For example, in IBM InfoSphere Streams, an Export processing element is
and the cost of this export operation is near-zero. In Apache Pig, this export operation must persist the full result set of this portion of the analytic to the underlying filesystem (typically HDFS). This is not without cost, but it has the advantage of making the full set of partial results available for future reuse, not just the results for data analysed while both the “publisher” and “subscriber” jobs are running.
When a plan is passed to the deployment process, the process seeks the longest rooted subgraph whose results are already available or, failing that, the longest which has already been compiled. These existing nodes are then cut from the new plan and replaced with a matching Import node, so that the new plan consumes the results of the speculatively prepared flow. When deployment of such a cut flow is requested, each of its transitive dependencies must first be deployed if its results are not already available.
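The cut-and-import step and the dependency-first deployment order can be sketched as two small functions. As in the compilation sketch, this hypothetically models a plan as a linear chain whose rooted subgraphs are its prefixes. The "import(...)" string merely stands in for a platform-specific Import node. The dependency map between flows is an assumed input. None of these names come from the source.

```python
from typing import Optional

Plan = tuple[str, ...]  # hypothetical: linear chain, data source first


def cut_plan(plan: Plan, has_results: set[Plan],
             compiled: set[Plan]) -> tuple[Plan, Optional[Plan]]:
    """Replace the longest reusable rooted prefix of `plan` with an Import node."""
    prefixes = [plan[:i] for i in range(len(plan), 0, -1)]  # longest first
    reused = next((p for p in prefixes if p in has_results), None)
    if reused is None:  # no results yet: fall back to the longest compiled sub-flow
        reused = next((p for p in prefixes if p in compiled), None)
    if reused is None:
        return plan, None
    import_node = f"import({'/'.join(reused)})"  # placeholder for a real Import node
    return (import_node,) + plan[len(reused):], reused


def deployment_order(flow: str, depends_on: dict[str, list[str]],
                     has_results: set[str]) -> list[str]:
    """Flows to deploy, transitive dependencies first; skip any whose results exist."""
    order: list[str] = []

    def visit(f: str) -> None:
        for dep in depends_on.get(f, []):
            if dep not in has_results and dep not in order:
                visit(dep)
        if f not in order:
            order.append(f)

    visit(flow)
    return order
```

A dependency whose results already exist is neither deployed nor explored further, because its own upstream flows have evidently already produced what it needs.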