In the last set of experiments, we investigated the impact of failures on throughput. The setup for this experiment is similar to the slice distribution example shown in Figure 6.2. However, we used three nodes instead of four, with only two primary and two secondary slices running on each node. Each group of four graphs in Figures 6.10 and 6.11 represents one physical node. Within a group, the two graphs in the left column depict the throughput over time of the primary slices, while the two graphs in the right column depict that of the secondary slices.
Figure 6.10 illustrates the evolution of throughput under moderate load. At around 26 seconds, the second node becomes permanently unavailable due to a hardware or software fault; hence, slices c, d, b' and e' are no longer functional. The manager component detects the unavailability of the crashed node, and the former backup slices c' and d' (running on nodes 1 and 3, respectively) are transparently promoted to primary slices. Note, however, that this promotion does not impose any additional load on the peer nodes, as sufficient CPU cycles were already available for processing both primary and secondary slices prior to the crash.
Figure 6.10: Node failure under moderate load. (Throughput in kEvents/s over time in s per slice. Node #1: primary a, b; secondary c', f'. Node #2: primary c, d; secondary b', e'; crash marked. Node #3: primary e, f; secondary a', d'.)
Figure 6.11: Node failure under high load. (Same panel layout as Figure 6.10, with secondary/primary annotations marking the promotion of c' and d'.)
In the final experiment, shown in Figure 6.11, the system runs constantly under high load. Therefore, the secondary slices a', b', c', d', e' and f' do not process any events until the second node fails after 26 seconds. At this point, as secondary slices c' and d' become primary, the promotion imposes additional load on their nodes. This additional load lowers the throughput of the primary slices a, b and e, f on nodes 1 and 3, respectively, as seen in the figure.
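The promotion step can be illustrated with a minimal sketch. It assumes a simplified model in which the manager component keeps a map from each node to the primary and secondary slices it hosts, and it does not model failure detection itself; `PLACEMENT`, `handle_crash` and `unprotected` are hypothetical names chosen for illustration, not STREAMMINE3G interfaces. The placement mirrors the three-node setup of Figures 6.10 and 6.11.

```python
import copy

# Slice placement of the three-node experiment (Figures 6.10 and 6.11).
# Secondary (backup) slices are listed by the name of the primary they mirror,
# so "c" under node 1's "secondary" stands for slice c'.
PLACEMENT = {
    1: {"primary": ["a", "b"], "secondary": ["c", "f"]},
    2: {"primary": ["c", "d"], "secondary": ["b", "e"]},
    3: {"primary": ["e", "f"], "secondary": ["a", "d"]},
}


def handle_crash(placement, crashed_node):
    """Drop `crashed_node` and promote the surviving backups of its primaries.

    Returns the new placement and a dict mapping each promoted slice to the
    node on which it now runs as a primary.
    """
    new = copy.deepcopy(placement)  # leave the caller's placement untouched
    lost = new.pop(crashed_node)
    promoted = {}
    for slice_id in lost["primary"]:
        for node, slices in new.items():
            if slice_id in slices["secondary"]:
                slices["secondary"].remove(slice_id)
                slices["primary"].append(slice_id)
                promoted[slice_id] = node
                break
    return new, promoted


def unprotected(placement):
    """Primary slices that no longer have a backup on any node."""
    backups = {s for slices in placement.values() for s in slices["secondary"]}
    return sorted(s for slices in placement.values()
                  for s in slices["primary"] if s not in backups)
```

Calling `handle_crash(PLACEMENT, 2)` promotes c' on node 1 and d' on node 3, as in the experiment. In this sketch, slices b, c, d and e are left without a backup afterwards, since b' and e' were lost with node 2 and c' and d' became primaries.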
6.4 Related Work
The idea of interleaved partitioning of operator state in online processing systems was first used in Flux [SSC+02]. Flux also considers moving pieces of state to different nodes in order to handle long-term load imbalances, and handles short-term load imbalances by buffering events. However, Flux considers database-like operations and focuses on operators with group-by clauses. In contrast, we treat state partitioning as a feature of the programming model: the user is required to follow the MapReduce programming model, which considerably improves the potential for scalability and fault tolerance. We also consider a highly parallel cloud execution environment, which enables a different load-management goal: when some partitions are overloaded, Flux tries to let the non-overloaded partitions proceed, whereas we focus on temporarily freeing more resources for the overloaded partitions.
Because active replication provides virtually instantaneous recovery (i.e., a replica actively produces redundant outputs that are used when a primary fails), it has been commonly used in ESP systems (e.g., in Flux [SHB04] and Borealis [HBR+05]). Nevertheless, state is, again, a problem: because operator replicas must be consistent, operations need to be deterministic. Some works, such as the one by Brito et al. [BFF09b] for ESP systems and the one by Jiménez-Peris et al. [JPPnMA00] for general distributed systems, address a scenario where multi-threading is used but the state is not statically partitioned. In this case, the scalability of an operator is limited to a single node. The MapReduce approach, in contrast, requires the operator state to be partitionable but enables a single operator to scale to a large number of nodes.
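To make the notion of partitionable state concrete, the following is a generic sketch of key-based state partitioning; it is neither Flux's interleaved scheme nor STREAMMINE3G's code. A hypothetical word-count operator splits its state into disjoint per-slice maps, and each key is routed to exactly one slice with a deterministic hash (CRC32 from Python's `zlib` module, chosen here as an assumption because the built-in `hash()` of strings is randomized per process). `PartitionedWordCount` and `slice_of` are illustrative names.

```python
import zlib
from collections import Counter


def slice_of(key, num_slices):
    """Map a key to a slice with a deterministic hash (CRC32), so every
    upstream node routes the same key to the same slice."""
    return zlib.crc32(key.encode("utf-8")) % num_slices


class PartitionedWordCount:
    """Word-count operator whose state is split into disjoint per-slice maps.

    Each slice owns only the keys that hash to it, so slices could be placed
    on different nodes and the operator is not confined to a single node.
    """

    def __init__(self, num_slices):
        self.slices = [Counter() for _ in range(num_slices)]

    def process(self, word):
        self.slices[slice_of(word, len(self.slices))][word] += 1

    def count(self, word):
        return self.slices[slice_of(word, len(self.slices))][word]
```

Because no key is shared between slices, each slice can be processed, replicated or moved independently, which is the property the MapReduce model relies on.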
An approach related, but somewhat opposed, to ours is proposed by Zhang et al. [ZGY+10]. In their work, passive replication is used when the system is in a steady state. The system then switches to active replication if signs of a failure are detected (i.e., the replica is speculatively activated). Their goal is to reduce recovery time when using passive replication while still avoiding the use of active replication at all times. In contrast, our goal is to exploit already available cycles to implement active replication, and to switch to passive replication when the system is under a temporary load peak that consumes the normally available extra cycles.
6.5 Conclusion
In this chapter, we presented an approach that transparently switches between two fault tolerance schemes, active replication and passive standby, based on the availability of resources. The objective of the approach is to provide high availability without incurring additional costs, by temporarily utilizing resources that were originally reserved to accommodate sudden load spikes for which other load-balancing techniques, such as slice migration, are not applicable due to their high latency. The transition between the two schemes is performed transparently by a priority scheduler, which ensures that the previously reserved resources are used for fault tolerance, i.e., replicated processing, only if sufficient CPU cycles are available. When the system is overloaded for longer periods of time, passive standby ensures that secondary operator slices are periodically refreshed through the state synchronization mechanism employed in STREAMMINE3G, which furthermore prevents memory exhaustion and decreases the overall recovery time.
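The interplay between the priority scheduler and state synchronization can be sketched in a simplified, single-threaded form. The sketch assumes that a node's CPU capacity can be expressed as a fixed number of events per scheduling tick and that events carry increasing sequence numbers; `PriorityScheduler`, `tick` and `sync_state` are hypothetical names, not STREAMMINE3G's interfaces.

```python
from collections import deque


class PriorityScheduler:
    """Per-node scheduler: primary events always run first, and secondary
    (replica) events only consume the capacity that is left over."""

    def __init__(self, capacity_per_tick):
        self.capacity = capacity_per_tick  # events the node can process per tick
        self.primary = deque()
        self.secondary = deque()

    def submit(self, seq, is_primary):
        """Queue the event with sequence number `seq` for a primary or secondary slice."""
        (self.primary if is_primary else self.secondary).append(seq)

    def tick(self):
        """Process up to `capacity` events; return (primary_done, secondary_done)."""
        budget = self.capacity
        done_primary = done_secondary = 0
        while budget > 0 and self.primary:
            self.primary.popleft()
            done_primary += 1
            budget -= 1
        while budget > 0 and self.secondary:
            self.secondary.popleft()
            done_secondary += 1
            budget -= 1
        return done_primary, done_secondary

    def sync_state(self, upto_seq):
        """Passive standby: the secondary received a state snapshot covering all
        events up to `upto_seq`, so the queued replica events it covers are
        dropped. This bounds the backlog, and thus memory, during long overloads."""
        self.secondary = deque(s for s in self.secondary if s > upto_seq)
```

With spare capacity, replica events are processed in the same tick (active replication); under overload, `tick` spends the whole budget on primary events and the secondary queue only shrinks when `sync_state` is called, which corresponds to passive standby.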
Our evaluation shows that fully utilizing the system through redundant processing increases energy consumption only marginally, and that our system scales well with an increasing number of nodes and cores. Moreover, the figures show that our system transitions between the two fault tolerance schemes, accommodating load spikes by immediately pausing the processing of events at secondary slices. If load spikes occur too frequently or the system is overloaded for a longer period of time, the mechanisms provided by passive standby enable the system to recover in the event of a crash.
ance for Cloud Environments
While the objective of the approach presented in the previous chapter was to improve system availability by utilizing spare but already paid-for cloud resources through a transition between active replication and passive standby at runtime, the approach presented in this chapter takes a slightly different direction, with the goal of reducing the overall resource consumption for fault tolerance: instead of switching between available fault tolerance schemes based on the system's resource availability and utilization, the system now selects the fault tolerance scheme that consumes the fewest resources while still ensuring that the system recovers within the user-provided recovery time threshold and with the user-provided recovery semantics.
7.1 Motivation
Inspired by the simplicity of MapReduce, a number of new open-source ESP systems, such as Apache Samza [Sam15] (LinkedIn), Storm [Sto15] (Twitter) and S4 [NRNK10] (Yahoo!), have emerged and gained traction over the past three years. Although all of these systems share a simple MapReduce-like interface, they offer different guarantees when it comes to fault tolerance. For example, Apache S4 can recover from faults by restarting an operator on a new node and loading a previous checkpoint of the operator's state. However, in-flight events that were not included in the checkpoint are simply lost; hence, only gap recovery is provided. In contrast, Apache Storm guarantees that no events are lost through its transactional topologies, but lacks appropriate mechanisms for state persistence. Although these ESP systems offer fault tolerance to their users per se, the schemes they provide are often suitable only for certain types of applications: while Apache S4 is a good choice for applications with stateful operators, where state cannot be recreated by simply reprocessing events, applications sensitive to event loss require schemes such as those offered by Apache Storm.
Since most ESP systems employ only a single fault tolerance scheme, users have to choose the ESP system, rather than the scheme, that best matches the requirements of the application at hand. In fact, a wide variety of fault tolerance schemes for ESP systems is known in the literature, ranging from active replication [SHB04, HCZ07] and active or passive standby [MFB11, HBR+05] to passive replication, where a combination of checkpointing and logging (i.e., upstream backup [HBR+05]) is used.
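The difference between gap recovery (restoring a checkpoint only, as described for S4 above) and precise recovery (checkpoint plus upstream backup) can be illustrated with a simplified sketch. It assumes a single upstream node, a deterministic operator, and events identified by increasing sequence numbers; `Upstream`, `SumOperator`, `recover` and `run_until_crash` are hypothetical names, not the API of any of the systems cited.

```python
from dataclasses import dataclass, field


@dataclass
class Upstream:
    """Upstream node that keeps an in-memory log of the events it sent."""
    log: list = field(default_factory=list)  # (seq, value) pairs

    def send(self, seq, value):
        self.log.append((seq, value))

    def trim(self, upto_seq):
        # The downstream checkpoint covers everything up to upto_seq,
        # so those log entries are no longer needed for recovery.
        self.log = [(s, v) for s, v in self.log if s > upto_seq]


@dataclass
class SumOperator:
    """Deterministic stateful operator: a running sum of event values."""
    total: int = 0
    last_seq: int = 0

    def process(self, seq, value):
        self.total += value
        self.last_seq = seq

    def checkpoint(self):
        return (self.total, self.last_seq)


def recover(checkpoint, upstream, precise):
    """Restart from the checkpoint; replay the upstream log only for precise recovery."""
    op = SumOperator(*checkpoint)
    if precise:
        for seq, value in upstream.log:
            if seq > op.last_seq:
                op.process(seq, value)
    return op


def run_until_crash():
    """Process events 1..5, checkpointing after event 3, then 'crash'."""
    up, op = Upstream(), SumOperator()
    cp = op.checkpoint()
    for seq in range(1, 6):
        up.send(seq, seq)  # upstream logs the event before delivering it
        op.process(seq, seq)
        if seq == 3:
            cp = op.checkpoint()
            up.trim(cp[1])
    return cp, up, op.total
```

In this scenario the live operator has a sum of 15 when it crashes. Restoring only the checkpoint yields 6, losing events 4 and 5 (gap recovery), whereas replaying the trimmed upstream log restores 15 (precise recovery).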
Choosing the right fault tolerance scheme is often not a trivial task, as each scheme imposes a trade-off between recovery time and resource overhead. For example, with active replication an operator can recover almost instantaneously, but at the cost of consuming twice the resources (CPU, memory and network). In contrast, passive replication consumes only a small amount of additional resources, for state persistence (disk) and for the in-memory log at the upstream node. However, it comes at the price of a long recovery time, which comprises the time it takes to load the most recent checkpoint from disk and to replay events from the upstream node's in-memory log.
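The two components of passive replication's recovery time named above can be written as a back-of-the-envelope estimate: recovery time is roughly the checkpoint size divided by the disk read bandwidth, plus the number of logged events divided by the replay rate. The sketch below is a simplified model under stated assumptions; it ignores failure detection and redeployment time, and it assumes the worst case in which the crash happens just before the next checkpoint. The function names and the example numbers are hypothetical.

```python
def passive_recovery_time(checkpoint_bytes, disk_bytes_per_s,
                          logged_events, replay_events_per_s):
    """Estimated recovery time (s) under passive replication: load the most
    recent checkpoint from disk, then replay the upstream in-memory log."""
    load_s = checkpoint_bytes / disk_bytes_per_s
    replay_s = logged_events / replay_events_per_s
    return load_s + replay_s


def worst_case_logged_events(input_rate_eps, checkpoint_interval_s):
    """Assumption: the crash happens just before the next checkpoint, so the
    upstream log holds a full checkpoint interval worth of events."""
    return input_rate_eps * checkpoint_interval_s
```

For example, a 200 MB checkpoint read at 100 MB/s takes 2 s, and 10,000 events/s with a 5 s checkpoint interval leaves up to 50,000 logged events, which take another 2 s to replay at 25,000 events/s, for an estimate of 4 s. Both terms grow with throughput, which is why the same scheme can meet a recovery time threshold at low load and miss it at high load.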
Fault tolerance schemes such as active or passive standby can be considered intermediate or hybrid alternatives, as they trade recovery time for resource consumption: they recover faster than passive replication, but at a much lower resource cost than full active replication.
For applications with very tight constraints, such as those found in the financial trading sector, the choice of active replication is clear, as those applications do not tolerate downtimes of even a few seconds. However, there exists a wide variety of less critical applications where blocking for a few seconds is acceptable. Consider, for example, a recommendation system: during a recovery, an e-commerce site may not be able to serve its visitors dynamically updated recommendations while they are shopping. However, this degraded service will not necessarily lead to high financial losses, as opposed to financial trading or fraud detection applications. Hence, a wide variety of applications has a large potential to save resources while still tolerating faults.
On the other hand, from the development perspective, application developers and data analysts often lack comprehensive knowledge of fault tolerance concepts and their implications for recovery time and resource footprint. Even so, users have clear constraints, such as (i) the maximum amount of time an application may stay unresponsive due to recovery and (ii) whether events may be lost or not.
Considering those constraints, choosing an appropriate scheme seems straightforward. However, ESP systems are highly dynamic: the natural fluctuation in throughput originating from online data sources can strongly influence the time an operator needs to recover. Consider, for example, an application that processes tweets using a time-based sliding window. If the user opted for passive replication, the application may recover quite quickly while the throughput is low, as the state it keeps is relatively small. With increased throughput, however, more tuples accumulate per window, increasing the size of the state and, consequently, checkpoint sizes and recovery times. If recovery time is a priority, this example is a good use case for adaptation: while passive replication may be sufficient to satisfy the user's recovery time threshold in times of low system load, schemes providing faster recovery, such as active standby, must be used in times of high system load.
In this chapter, we present six different fault tolerance schemes, employed in STREAMMINE3G, that a user can choose from, including passive and active replication as well as intermediate alternatives such as active and passive standby. To free the user from the burden of choosing the right scheme for the application at hand, we propose a self-adaptive fault tolerance controller that transitions between the employed schemes at runtime based on the evolution of the workload and the user-provided constraints (acceptable recovery time and recovery semantics, i.e., gap or precise recovery). Our evaluation shows that, using our adaptive scheme with a recovery time threshold of 3 seconds, the overall resource footprint for fault tolerance can already be reduced by 50% compared to a conservative use of active replication.
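The selection rule of such a controller can be sketched as follows: among the schemes whose recovery semantics satisfy the user, pick the cheapest one whose estimated recovery time stays within the threshold, and fall back to the fastest allowed scheme if none qualifies. This is a hypothetical illustration, not STREAMMINE3G's controller or cost model. It uses the four schemes named in the text plus a checkpoint-only gap-recovery variant like the S4 behavior described earlier. All costs, recovery estimates and the disk bandwidth are placeholder numbers chosen only to make the trade-off visible, and the state size follows the sliding-window reasoning above (input rate times window length times tuple size).

```python
from dataclasses import dataclass
from typing import Callable

DISK_BYTES_PER_S = 100e6  # hypothetical checkpoint load bandwidth


@dataclass(frozen=True)
class Scheme:
    name: str
    cost: float                           # hypothetical relative resource overhead
    precise: bool                         # True if no events are lost on recovery
    recovery_s: Callable[[float], float]  # estimated recovery time for a state size in bytes


# Placeholder estimates, not measurements: only the checkpoint-based schemes
# depend on the state size.
SCHEMES = [
    Scheme("checkpoint only (gap)", 0.05, False, lambda state: state / DISK_BYTES_PER_S),
    Scheme("passive replication", 0.1, True, lambda state: state / DISK_BYTES_PER_S + 0.5),
    Scheme("passive standby", 0.5, True, lambda state: 1.0),
    Scheme("active standby", 0.8, True, lambda state: 0.5),
    Scheme("active replication", 1.0, True, lambda state: 0.0),
]


def window_state_bytes(rate_eps, window_s, tuple_bytes):
    """A time-based sliding window holds roughly rate * window tuples."""
    return rate_eps * window_s * tuple_bytes


def choose_scheme(rate_eps, window_s, tuple_bytes, threshold_s, need_precise):
    """Cheapest scheme that meets the recovery time threshold and the
    requested semantics; falls back to the fastest allowed scheme."""
    state = window_state_bytes(rate_eps, window_s, tuple_bytes)
    allowed = [s for s in SCHEMES if s.precise or not need_precise]
    feasible = [s for s in allowed if s.recovery_s(state) <= threshold_s]
    if not feasible:  # no scheme meets the threshold: pick the fastest allowed one
        return min(allowed, key=lambda s: s.recovery_s(state)).name
    return min(feasible, key=lambda s: s.cost).name
```

With a 60 s window of 100-byte tuples and a 3 s threshold, this sketch selects passive replication at 1,000 events/s and passive standby at 100,000 events/s, while tightening the threshold to 0.8 s at the higher rate selects active standby. Re-evaluating the choice periodically as the input rate changes is what turns the rule into an adaptive controller.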