Fault Tolerance Mechanism

Fault Tolerance and Resilience in Cloud Computing Environments

Ravi Jhawar, Vincenzo Piuri, in Computer and Information Security Handbook (Third Edition), 2017

Multiple Choice

1. What measures the strength of the fault tolerance mechanism in terms of the granularity at which it can handle errors and failures in the system?
   A. Resource consumption
   B. Performance
   C. Fault tolerance model
   D. Multiple machines within the same cluster
   E. All of the above

2. What factor deals with the impact of the fault tolerance procedure on the end-to-end QoS during both failure and failure-free periods?
   A. Resource consumption
   B. Fault tolerance model
   C. Performance
   D. Multiple machines within the same cluster
   E. All of the above

3. How many replicas of an application can be placed on hosts that are connected by a ToR switch (within a LAN)?
   A. One
   B. Three
   C. Five
   D. Four
   E. Two

4. How many replicas of an application can be placed on hosts belonging to different clusters in the same data center (on hosts that are connected via a ToR switch and AggS)?
   A. One
   B. Three
   C. Five
   D. Four
   E. Two

5. How many replicas of an application can be placed on hosts belonging to different data centers (connected via a ToR switch, AggS, and AccR)?
   A. Two
   B. Four
   C. One
   D. Three
   E. Five

URL: https://www.sciencedirect.com/science/article/pii/B9780128038437000090

Advances in Computers

Vidar Slåtten, ... Frank Alexander Kraemer, in Advances in Computers, 2013

8 Conclusion

To improve the reliability of a system, we can add fault-tolerance mechanisms, so as to tolerate faults that cannot be removed at design time. However, the resulting rise in complexity increases the probability of software faults being introduced. Hence, unless the process is handled carefully, adding fault tolerance may even lead to a less reliable system. Many research groups are trying to develop practical approaches for incorporating fault-tolerance mechanisms with the functional parts of systems. As a way to deal with the inherently high level of complexity of such systems, some have turned to the paradigm of model-driven engineering. This results in a research field that crosscuts the established fields of software engineering, system verification, fault tolerance, and distributed systems. Many works are presented in the context of one of these traditional fields, making it difficult to get a good overview of what is presently offered. We have therefore surveyed 10 approaches for model-driven engineering of reliable fault-tolerant systems and decided on 13 characteristics that we claim classify the approaches in a manner useful for both users and developers of such approaches.

We have found it natural to group the approaches into those that allow developers to create their own fault-tolerance mechanisms and those that are tied to a fixed set of mechanisms provided by an existing middleware. The first group allows developers more fine-grained control, but makes the integration of the fault-tolerance mechanisms more difficult. These approaches aid developers with various forms of verification methods, so as to ensure that the system is built correctly. The latter group abstracts away from the system behavior and focuses on fault-tolerance mechanisms that do not need to be tightly integrated with the functional behavior. They also offer domain-specific languages to easily configure the fault-tolerance mechanisms as desired.

Most approaches are from research groups at universities, and they focus on demonstrating a few new capabilities, rather than consolidating all existing ones into their approach. As a result, none of the approaches can be said to be a superset of any other one, and choosing any one approach will likely result in a compromise of some sort. For anyone with intentions of using or developing an approach for model-driven engineering of reliable fault-tolerant systems, this survey should therefore provide a good starting point.

URL: https://www.sciencedirect.com/science/article/pii/B9780124080898000045

Dependable and Secure Systems Engineering

Yves Crouzet, Karama Kanoun, in Advances in Computers, 2012

2.3 Performance-Related Measures

Classical performance measures include system response time and system throughput. In the context of dependability benchmarking, performance evaluation addresses the characterization of system behavior in the presence of faults or with respect to the additional fault-tolerance mechanisms. For example, some fault-tolerance mechanisms may have a very high coverage factor but a large time overhead in normal operation. It is interesting to evaluate such time overhead. Concerning the system behavior in the presence of faults, following fault occurrence or fault activation, either the system fails or a correct response is provided (correct value, delivered on time). Indeed, a correct value delivered too late with respect to the system specification is to be considered a failure, particularly for hard real-time systems.

In the presence of errors, a system may still provide a correct response, though with degraded performance. Hence, the response time and the throughput (which are originally pure performance measures) become dependability- and performance-related measures characterizing the system performance in the presence of faults.
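To make this concrete, the following Python sketch (entirely illustrative: the service, the injected fault rate, and the retry-based fault-tolerance wrapper are assumptions, not part of the chapter) measures mean response time and a success ratio with and without a simple fault-tolerance mechanism, and in the presence of injected faults.

```python
import random
import time

FAULT_RATE = 0.2   # assumed probability that a call fails (injected faults)
RETRIES = 3        # assumed fault-tolerance mechanism: simple retry

def service(x, inject_faults=False):
    """Stand-in for the benchmarked service; sleeps briefly to emulate work."""
    time.sleep(0.001)
    if inject_faults and random.random() < FAULT_RATE:
        raise RuntimeError("injected fault")
    return x * x

def with_retry(x, inject_faults):
    """Fault-tolerance wrapper whose time overhead we want to measure."""
    for _ in range(RETRIES):
        try:
            return service(x, inject_faults)
        except RuntimeError:
            continue
    return None  # request failed despite the retries

def benchmark(fn, inject_faults, n=1000):
    ok, start = 0, time.perf_counter()
    for i in range(n):
        if fn(i, inject_faults) is not None:
            ok += 1
    elapsed = time.perf_counter() - start
    return elapsed / n, ok / n   # mean response time, success ratio

print("fault-free, no FT:", benchmark(lambda x, f: service(x, f), False))
print("fault-free, FT   :", benchmark(with_retry, False))  # overhead of the mechanism alone
print("faulty,     FT   :", benchmark(with_retry, True))   # degraded performance under faults
```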

URL: https://www.sciencedirect.com/science/article/pii/B9780123965257000046

Software Fault Tolerance

Israel Koren, C. Mani Krishna, in Fault-Tolerant Systems (Second Edition), 2021

5.1 Acceptance Tests

As with hardware systems, an important step in any attempt to tolerate faults is to detect them. A common way to detect software defects is through acceptance tests. These are used in wrappers and in recovery blocks, both of which are important software fault-tolerance mechanisms; these will be discussed later.

If your thermometer were to read −40°C on a sweltering midsummer day, you would suspect it was malfunctioning. This is an example of an acceptance test. An acceptance test is essentially a check of reasonableness. Most acceptance tests fall into one of the following categories:

Timing checks: One of the simplest checks is timing. If we have a rough idea of how long the code should run, a watchdog timer can be set appropriately. When the timer goes off, the system can assume that a failure has occurred (either a hardware failure or something in the software that caused the node to "hang"). The timing check can be used in parallel with other acceptance tests.
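As a rough illustration of a timing check, the following Python sketch (an assumed setup, not code from the book) runs a task under a watchdog timer and declares a suspected failure if the task does not finish within its time budget.

```python
import threading
import time

def run_with_watchdog(task, timeout_s):
    """Run task(); declare a suspected failure if it exceeds its time budget."""
    result = {}

    def worker():
        result["value"] = task()

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    t.join(timeout_s)              # wait at most timeout_s seconds
    if t.is_alive():               # watchdog expired: the node may have "hung"
        return None, "timing check failed"
    return result.get("value"), "ok"

def quick_task():                  # assumed workload, finishes well within budget
    time.sleep(0.1)
    return 42

print(run_with_watchdog(quick_task, timeout_s=0.5))               # (42, 'ok')
print(run_with_watchdog(lambda: time.sleep(2.0), timeout_s=0.5))  # (None, 'timing check failed')
```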

Verification of output: In some cases, the acceptance test is suggested naturally from the problem itself. That is, the nature of the problem is such that although the problem itself is difficult to solve, it is much easier to check that the answer is correct, and it is also less likely that the check itself will be incorrect. To take a human analogy, solving a jigsaw puzzle can take a long time; checking to see that the puzzle has been correctly put together is trivial and takes just a glance.

Examples of such problems are calculating the square root (square the result to check if you get the original number back), the factorization of large numbers (multiply the factors together), the solution of equations (substitute the supposed solution into the original equations), and sorting. Note that in sorting, it is not enough merely to check that the numbers are sorted: we have also to verify that all the numbers at the input are included in the output.
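These checks are simple enough to sketch directly. The illustrative Python snippets below verify a square-root result, a factorization, and a sort; note that the sort check also confirms that the output is a permutation of the input, as required above.

```python
from collections import Counter
import math

def check_sqrt(x, root, tol=1e-9):
    """Square the result and compare with the original number."""
    return abs(root * root - x) <= tol * max(1.0, abs(x))

def check_factorization(n, factors):
    """Multiply the factors together and compare with n."""
    return math.prod(factors) == n

def check_sorted(inp, out):
    """Output must be nondecreasing AND a permutation of the input."""
    in_order = all(a <= b for a, b in zip(out, out[1:]))
    same_multiset = Counter(inp) == Counter(out)
    return in_order and same_multiset

print(check_sqrt(2.0, 1.41421356237))        # True
print(check_factorization(91, [7, 13]))      # True
print(check_sorted([3, 1, 2, 1], [1, 1, 2, 3]))  # True
```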

Sometimes, to save time, we will restrict ourselves to probabilistic checks. These do not guarantee that all erroneous outputs will be caught even if the checks are executed perfectly, but have the advantage of requiring less time. One example of such a check for the correctness of matrix multiplication is as follows:

Suppose we multiply two n × n integer matrices A and B to produce C. To check the result without repeating the matrix multiplication, we may select at random an n × 1 vector of integers R, and carry out the operations M1 = A × (B × R) and M2 = C × R. If M1 ≠ M2, then we know that an error has occurred. If M1 = M2, that still does not prove that the original result C was correct; however, it is very unlikely that the random vector R was selected such that M1 = M2, even if A × B ≠ C. To further reduce this probability, we may select another n × 1 vector and repeat the check.
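A hedged NumPy rendering of this probabilistic check (commonly known as Freivalds' algorithm) might look as follows; the matrix sizes, the range of the random entries, and the number of repetitions are arbitrary choices for illustration.

```python
import numpy as np

def probable_product_check(A, B, C, repetitions=2):
    """Return False if C is certainly not A x B; True if no discrepancy was found."""
    n = A.shape[0]
    for _ in range(repetitions):
        R = np.random.randint(0, 10, size=(n, 1))   # random n x 1 integer vector
        M1 = A @ (B @ R)                            # two matrix-vector products
        M2 = C @ R                                  # one matrix-vector product
        if not np.array_equal(M1, M2):
            return False                            # an error has definitely occurred
    return True                                     # very likely (not guaranteed) correct

n = 100
A = np.random.randint(-5, 5, size=(n, n))
B = np.random.randint(-5, 5, size=(n, n))
C = A @ B
print(probable_product_check(A, B, C))              # expected: True
C[3, 7] += 1                                        # corrupt a single entry
print(probable_product_check(A, B, C))              # almost certainly False
```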

Range checks: In other cases, we do not have such convenient and obvious approaches to checking the correctness of the output. In such situations, range checks can be used. That is, we use our knowledge of the application to set acceptable bounds for the output: if it falls outside these bounds, it is declared to be erroneous. Such bounds may be either preset, or some simple function of the inputs. If the latter, the function has to be simple enough to implement, so that the probability of the acceptance test software itself being faulty is sufficiently low.

For example, consider a remote-sensing satellite that takes thermal imagery of the earth. We could obviously set bounds on the temperature range and regard any output outside these bounds as indicating an error. Furthermore, we could use spatial correlations, which means looking for excessive differences between the temperatures in adjacent areas, and flagging an error if the differences cannot be explained by physical features (such as volcanoes).
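A minimal sketch of such range and spatial-correlation checks, assuming made-up temperature bounds and a made-up threshold for differences between adjacent pixels, is shown below.

```python
import numpy as np

TEMP_MIN, TEMP_MAX = -90.0, 60.0   # assumed plausible surface-temperature bounds (deg C)
MAX_ADJACENT_DIFF = 25.0           # assumed largest physically explainable local jump

def range_check(image):
    """Flag pixels whose value falls outside the preset acceptance bounds."""
    return (image < TEMP_MIN) | (image > TEMP_MAX)

def spatial_check(image):
    """Flag pixels that differ too much from their right/bottom neighbours."""
    flags = np.zeros_like(image, dtype=bool)
    flags[:, :-1] |= np.abs(np.diff(image, axis=1)) > MAX_ADJACENT_DIFF
    flags[:-1, :] |= np.abs(np.diff(image, axis=0)) > MAX_ADJACENT_DIFF
    return flags

image = np.full((4, 4), 21.0)
image[2, 2] = 300.0                # obviously erroneous reading
suspect = range_check(image) | spatial_check(image)
print(np.argwhere(suspect))        # coordinates flagged by the acceptance test
```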

When setting the bounds on acceptance tests, we have to balance two parameters: sensitivity and false alarm rate. We have encountered these quantities before in Chapter 2: recall that sensitivity is the probability that the acceptance test catches an erroneous output. To be more precise, it is the conditional probability that the test declares an error, given the output is erroneous. The false alarm rate is the conditional probability that the test has declared an error, given that the tested entity is actually good.

An increase in sensitivity can be achieved by narrowing the acceptance range bounds. Unfortunately, this would at the same time increase the false alarm rate. In an absurdly extreme case, we could narrow the acceptance range to zero, so that every output flags an error. In such a case, the sensitivity would be 100%, but then every correct output would also be declared erroneous.
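The tradeoff can be made concrete with a small, entirely illustrative simulation (the distributions of correct and erroneous outputs are assumptions): as the acceptance bound is narrowed, sensitivity and the false alarm rate rise together.

```python
import random

random.seed(0)
TRUE_VALUE = 100.0
correct = [TRUE_VALUE + random.gauss(0, 2) for _ in range(10000)]     # good outputs
erroneous = [TRUE_VALUE + random.gauss(0, 30) for _ in range(10000)]  # faulty outputs

def rates(bound):
    """Acceptance test: declare an error if |output - TRUE_VALUE| > bound."""
    sensitivity = sum(abs(x - TRUE_VALUE) > bound for x in erroneous) / len(erroneous)
    false_alarm = sum(abs(x - TRUE_VALUE) > bound for x in correct) / len(correct)
    return sensitivity, false_alarm

for bound in (1.0, 3.0, 6.0, 12.0):
    s, f = rates(bound)
    print(f"bound={bound:5.1f}  sensitivity={s:.2f}  false alarm rate={f:.2f}")
```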

URL: https://www.sciencedirect.com/science/article/pii/B9780128181058000152

Introduction

Daniel Aarno, Jakob Engblom, in Software and System Development using Virtual Platforms, 2015

Platform Development

Platform development refers to the development of the fundamental software that makes hardware work and that provides a platform for application development. As discussed before, this includes the development of firmware, boot loaders, and BIOS, as well as operating system kernels and BSPs. In addition to such hardware-interface code, it also usually involves integrating various forms of middleware software on top of the operating system. The middleware provides the crucial domain-specific specialization of the generic operating system platform, such as distributed communications systems, fault-tolerance mechanisms, load balancing, databases, and virtual machines for Java, C#, and other languages. The complete software stack can be developed and run on Simics.

Debugging low-level code in Simics is a much nicer experience than using hardware, especially compared to early unstable prototype hardware. As discussed in depth in Chapter 3, Simics enables the debugging of firmware and boot code from the first instruction after power on, and makes it easy to debug device drivers and interrupt handlers. When drivers and the operating system are up, Simics can be used to integrate middleware and services on top of the operating system, taking the setup all the way to a complete running platform, ready for application developers (Tian, 2013).

In larger organizations, there is usually a dedicated platform team that is responsible for developing and delivering ready-to-use integrated platforms for application developers. Virtual platforms can be used to efficiently deliver the platform to application developers, containing both hardware and software, booted, configured, and ready to go. With a virtual platform, a nightly build can become a nightly boot, using checkpoints as discussed in Chapter 3 to deliver a ready-to-use platform to the application development teams.

URL: https://www.sciencedirect.com/science/article/pii/B9780128007259000019

Resource Management in Big Data Processing Systems

S. Tang, ... B.-S. Lee, in Big Data, 2016

7.1 Introduction

In many application domains, such as social networks and bioinformatics, data is being gathered at unprecedented scale. Efficient processing for Big Data analysis poses new challenges for almost all aspects of state-of-the-art data processing and management systems. For example, a few of the challenges are as follows: (i) the data can have arbitrarily complex structures (eg, graph data) and cannot be stored efficiently in a relational database; (ii) the data accesses of large-scale data processing are frequent and complex, resulting in inefficient disk I/O or network communication; and (iii) last but not least, a variety of unpredictable failure problems must be tackled in the distributed environment, so the data processing system must have a fault tolerance mechanism to recover the task computation automatically.

Cloud computing has emerged as an appealing paradigm for Big Data processing over the Internet due to its cost effectiveness and powerful computational capacity. Current infrastructure-as-a-service (IaaS) clouds allow tenants to acquire and release resources in the form of virtual machines (VMs) on a pay-as-you-use basis. Most IaaS cloud providers, such as Amazon EC2, offer a number of VM types (such as small, medium, large, and extra-large) with fixed amounts of CPU, main memory, and disk. Tenants can only purchase fixed-size VMs and increase/decrease the number of VMs when the resource demands change. This is known as the T-shirt and scale-out model [1]. However, the T-shirt model leads to inefficient allocation of cloud resources, which translates to higher capital expenses and operating costs for cloud providers, as well as increased monetary cost for tenants. First, the granularity of resource acquisition/release is coarse, in the sense that fixed-size VMs are not closely tailored to cloud applications with dynamic demands. As a result, tenants need to overprovision resources (costly) or risk performance penalties and service level agreement (SLA) violations. Second, elastic resource scaling in clouds [2], also known as the scale-out model, is also costly due to the latencies involved in VM instantiation [3] and software runtime overhead [4]. These costs are ultimately borne by tenants in terms of monetary cost or performance penalty.

Resource sharing is a classic and effective approach to resource efficiency. As more and more Big Data applications with diverse and heterogeneous resource requirements are deployed in the cloud [5,6], there are vast opportunities for resource sharing [1,7]. Recent work has shown that fine-grained and dynamic resource allocation techniques (eg, resource multiplexing or overcommitting [1,8–11]) can significantly improve resource utilization compared to the T-shirt model [1]. As adding/removing resources is performed directly on the existing VMs, fine-grained resource allocation is also known as the scale-up model, and its cost tends to be much smaller than that of the scale-out model. Unfortunately, current IaaS clouds do not offer resource sharing among VMs, even if those VMs belong to the same tenant. Resource sharing models are needed for better cost efficiency for tenants and higher resource utilization for cloud providers.

Researchers have been actively proposing many innovative solutions to address the new challenges of large-scale data processing and resource management. In particular, a notable number of large-scale data processing and resource management systems have recently been proposed. The aims of this chapter are (i) to introduce canonical examples of large-scale data processing, (ii) to give an overview of existing data processing and resource management systems/platforms, and, more importantly, (iii) to study economic fairness for large-scale resource management in the cloud, which bridges large-scale resource management and cloud-based platforms. In particular, we present some desirable properties, including sharing incentive, truthfulness, resource-as-you-pay fairness, and Pareto efficiency, to guide the design of fair policies for the cloud environment.

The chapter is organized as follows: We first present several types of resource management for Big Data processing in Section 7.2. In Section 7.3, we list some representative Big Data processing platforms. Section 7.4 presents single-resource management in the cloud, followed by Section 7.5, which introduces multiresource management in the cloud. For completeness of the discussion, Section 7.6 gives an introduction to existing work on resource management. A discussion of open problems is provided in Section 7.7. We conclude the chapter in Section 7.8.

URL: https://www.sciencedirect.com/science/article/pii/B9780128053942000076

Routers as Distributed Systems

George Varghese, in Network Algorithmics, 2005

15.1.2 Rescuing Reliability

The protocol sketched in the last subsection uses limited receiver SRAM buffers very efficiently but is not robust to failures. Before understanding how to make the more elaborate flow control protocol robust against failures, it is wiser to start with the simpler credit protocol portrayed in Figure 15.2.

Intuitively, the protocol in Figure 15.2 is like transferring money between two banks: The "banks" are the sender and the receiver, and both credits and cells count as "money." It is easy to see that in the absence of errors the total "money" in the system is conserved. More formally, let CR be the credit register, M the number of cells in transit from sender to receiver, C the number of credits in transit in the other direction, and Q the number of cell buffers that are occupied at the receiver.

Then it is easy to see that, assuming proper initialization and that no cells or credits are lost on the link, the protocol maintains the following property at any instant: CR + M + Q + C = B, where B is the total buffer space at the receiver. The relation is called an invariant because it holds at all times when the protocol works correctly. It is the job of protocol initialization to establish the invariant and the job of fault tolerance mechanisms to maintain the invariant.

If this invariant is maintained at all times, then the system will never drop cells, because the number of cells in transit plus the number of stored cells is never more than the number of buffers allocated.
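A toy simulation of the credit protocol, under simplifying assumptions (a lossless link modeled as two FIFO queues, one cell sent per credit), makes the invariant concrete: at every step it asserts CR + M + Q + C = B and, as a consequence, that the receiver queue never exceeds B.

```python
from collections import deque
import random

B = 8                 # total buffer space at the receiver
CR = B                # credit register at the sender
cells = deque()       # M: cells in transit, sender -> receiver
credits = deque()     # C: credits in transit, receiver -> sender
queue = deque()       # Q: occupied cell buffers at the receiver

random.seed(1)
for step in range(100000):
    action = random.choice(["send", "arrive", "consume", "credit_back"])
    if action == "send" and CR > 0:            # sender spends one credit per cell
        CR -= 1
        cells.append("cell")
    elif action == "arrive" and cells:         # cell reaches the receiver buffer
        queue.append(cells.popleft())
    elif action == "consume" and queue:        # receiver frees a buffer, emits a credit
        queue.popleft()
        credits.append("credit")
    elif action == "credit_back" and credits:  # credit reaches the sender
        credits.popleft()
        CR += 1
    # Invariant from the text: CR + M + Q + C = B at every instant.
    assert CR + len(cells) + len(queue) + len(credits) == B
    assert len(queue) <= B                     # hence no cell is ever dropped

print("invariant held for all steps")
```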

There are two potential problems with a simple hop-by-hop flow control scheme. First, if initialization is not done correctly, then the sender can have too many credits, which can lead to cells being dropped. Second, credits or cells for a class can be lost due to link errors. Even chip-to-chip links are not immune to infrequent bit errors; at high link speeds, such errors can occur several times an hour. This second problem can lead to slowdown or deadlock.

Many implementors can be incorrectly persuaded that these problems can be fixed by simple mechanisms. One immediate response is to argue that these cases won't happen or will happen rarely. Second, one can attempt to fix the second problem by using a timer to detect possible deadlock. Unfortunately, it is difficult to distinguish deadlock from the receiver's removing cells very slowly. Worse, the entire link can slow down to a crawl, causing router performance to fall; the result will be hard to debug.

The problems can probably be cured by a router reset, but this is a Draconian solution. Instead, consider the following resynchronization scheme. For clarity, the scheme is presented using a series of refinements depicted in Figure 15.3.

Figure 15.3. Three steps to a marker algorithm.

In the simplest synchronization scheme (Scheme 1, Figure 15.3), assume that the protocol periodically sends a specially marked cell called a marker. Until the marker returns, the sender stops sending data cells. At the receiver, the marker flows through the buffer before being sent back to the upstream node. It is easy to see that after the marker returns, it has "flushed" the pipe of all cells and credits. Thus at the point the marker returns, the protocol can set the credit register (CR) to the maximum value (B). Scheme 1 is simple but requires the sender to be idled periodically in order to do resynchronization.

So Scheme 2 (Figure 15.3) augments Scheme 1 by allowing the sender to send cells after the marker has been sent; however, the sender keeps track of the cells sent since the marker was launched in a register, say, CSM (for "cells sent since marker"). When the marker returns, the sender adjusts the correction to take into account the cells sent since the marker was launched and so sets CR = B − CSM.

The major flaw in Scheme 2 is the inability to bound the delay that it takes the marker to go through the queue at the receiver. This causes two problems. First, it makes it hard to bound how long the scheme takes to correct itself. Second, in order to make the marker scheme itself reliable, the sender must periodically retransmit the marker. Without a bound on the marker round-trip delay, the sender could retransmit too early, making it hard to match a marker response to a marker request without additional complexity in terms of sequence numbers.

To bound the marker round-trip delay, Scheme 3 (Figure 15.3) lets the marker bypass the receiver queue and "reflect back" immediately. However, this requires the marker to return with the number of free cell buffers F in the receiver at the instant the marker was received. Then when the marker returns, the sender sets the credit register CR = F − CSM.
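A schematic rendering of Scheme 3 in Python (a sketch only, with the link and the receiver abstracted away) shows the bookkeeping on the sender side: cells sent since the marker are counted in CSM, the receiver reflects the marker back carrying its current number of free buffers F, and the sender resynchronizes with CR = F − CSM.

```python
class Sender:
    def __init__(self, B):
        self.CR = B           # credit register
        self.CSM = 0          # cells sent since the marker was launched
        self.marker_out = False

    def send_cell(self):
        if self.CR > 0:
            self.CR -= 1
            if self.marker_out:
                self.CSM += 1     # track cells sent after the marker
            return True
        return False

    def launch_marker(self):
        self.marker_out = True
        self.CSM = 0

    def marker_returned(self, F):
        """F = free buffers at the receiver when it reflected the marker."""
        self.CR = F - self.CSM    # Scheme 3 correction
        self.marker_out = False

# Illustrative run: pretend errors have corrupted CR, then resynchronize.
s = Sender(B=8)
s.CR = 3                          # corrupted credit state (e.g., lost credits)
s.launch_marker()
s.send_cell(); s.send_cell()      # two cells sent while the marker is in flight
free_buffers_at_receiver = 6      # assumed value reflected back by the receiver
s.marker_returned(free_buffers_at_receiver)
print(s.CR)                       # 6 - 2 = 4
```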

The marker scheme is a special instance of a classical distributed systems technique called a snapshot. Informally, a snapshot is a distributed audit that produces a consistent state of a distributed system. Our marker-based snapshot is slightly different from the classical snapshot described in Chandy and Lamport [CL85]. The important point, however, is that snapshots can be used to detect incorrect states of any distributed algorithm and can be efficiently implemented in a two-node subsystem to make any such protocol robust. In particular, the same technique can be used [OSV94] to make the fancier flow control of Section 15.1.1 equally robust.

In particular, the marker protocol makes the credit-update protocol self-stabilizing; i.e., it can recover from arbitrary errors, including link errors, and also hardware errors that corrupt registers. This is an extreme form of fault tolerance that can greatly improve the reliability of subsystems without sacrificing performance.

In summary, the general technique for a two-node system is to write down the protocol invariants and then to design a periodic snapshot to verify and, if necessary, correct the invariants. Further techniques for protocols that work on more than two nodes are described in Awerbuch et al. [APV91]; they are based on decomposing, when possible, multinode protocols into two-node subsystems and repeating the snapshot idea.

An alternative technique for making a two-node credit protocol fault tolerant is the FCVC idea of Kung et al. [KCB94], which is explored in the exercises. The main idea is to use absolute packet numbers instead of incremental updates; with this modification the protocol can be made robust by the technique of periodically resending control state on the two links without the use of a snapshot.

URL: https://www.sciencedirect.com/science/article/pii/B9780120884773500187

Large-Scale Network and Service Management with WANTS

Federico Bergenti, ... Danilo Gotta, in Industrial Agents, 2015

13.3.1 A Brief Recall on WADE

WADE is essentially the main evolution of JADE (Java Agent DEvelopment framework) (Bellifemine et al., 2001, 2007; Bergenti et al., 2001, 2002). JADE is a popular framework that facilitates the development of interoperable multi-agent systems. It is open source and can be downloaded from its official web site at http://jade.tilab.com.

Since its initial development back in 1998, JADE has been used in many research and industrial projects on an international scale. Just to cite a known and appreciated use of JADE, which is closely related to the network management scenario that WANTS addresses, British Telecommunications uses JADE as the core platform for mPower (Lee et al., 2007), a multi-agent system used by engineers to support cooperation among mobile workers and team-based job management.

WADE mainly adds to JADE the support for the execution of tasks defined according to the workflow metaphor, as shown in Figure 13.3. It also provides a number of mechanisms that help manage the inherent complexity of a distributed system, both in terms of administration and fault tolerance.

Figure 13.3. Main elements of the WADE architecture.

A WADE container is nothing but a JADE container equipped with a special CA (container agent) that provides, together with a local boot daemon, the functionality needed to implement a robust and scalable container life cycle. A boot daemon process runs in each host of the platform and is in charge of the activation of agent containers for its host. WADE sets a CA for each container in the platform and each CA is responsible for supervising the activities in its container and for all fault-tolerance mechanisms.

The main container of the platform also hosts a CFA (ConFiguration Agent) that centralizes the management of the configuration parameters for the platform. The CFA is responsible for interacting with boot daemons and for controlling the life cycle of applications.

This brief recall on WADE is mainly focused on the aspects related to workflow-based development because they are considered the most characterizing feature of WADE in its current form. This is the reason why Figure 13.3 uses the acronym WEA (Workflow Engine Agent) to exemplify WADE agents. Instead of providing a single powerful workflow engine, as is standard practice in traditional workflow management systems, WADE gives each JADE agent the possibility of executing workflows. The WEAs are the particular type of agent capable of downloading and executing workflows. Each WEA embeds a micro-workflow engine, and a complex process is carried out by a set of cooperating agents, each of which executes a part of the process.

It is worth noting that WADE can be used as an everyday development platform and, in principle, developers can use WADE with little or no adoption of workflows. WADE can be used as an extended JADE that provides transparent functionality to support fault tolerance and load balancing.

One of the main advantages of the workflow metaphor is the possibility of representing processes with friendly visual notations. WADE provides both (i) the expressiveness of the visual representation of workflows, and (ii) the power of visual programming languages.

WADE comes with a development environment called WOLF (WOrkflow LiFe cycle management environment) described in (Caire et al., 2008). WOLF facilitates the creation of WADE workflow-based agents: It provides users with a visual notation and an advanced editor integrated with the Eclipse IDE, and little or no programming skills are needed to implement WADE workflows. As its name suggests, WOLF is not only a tool to graphically create workflows for WADE, it is also a complete environment to manage the life cycle of workflows from early prototypes to the final deployment.

Another characterizing feature of WADE is that it does not force a privileged textual or visual notation to express workflows; rather, it expects workflows in terms of sets of Java classes. This design choice makes workflows immediately executable and no interpretation is needed. This approach eases the graceful scaling from a high-level view of workflows to a lower-level view of Java code. WOLF is an essential tool in this picture because it provides a convenient visual view of workflow classes and smoothly integrates workflow editing with Java editing.

Even though no intermediate formalism is used and a workflow is nothing but a set of Java classes, the internal structure of such classes and their relations are largely inspired by the XPDL (XML Process Definition Language) (see http://www.wfmc.org/standards/xpdl.htm) metamodel. The XPDL metamodel was chosen because XPDL was designed as an interchange formalism, and the early adoption of such a metamodel facilitates the import and export of XPDL files. At the time of writing, WOLF did not yet support the import or export of XPDL files, but this feature is a planned improvement for the tool.

Currently, the metamodel of the Java classes that represent a WADE workflow, or the WADE metamodel of workflows for short, supports all the elements specified in XPDL version 1.0 and some elements, mainly related to the events, introduced in XPDL version 2.0.

In the WADE metamodel of workflows, a process is represented as a workflow consisting of one or more activities. Activities are tasks to be performed by WADE agents or by other actors. In a workflow, the execution entry point is always defined and it specifies the first activity to be performed. Moreover, a workflow must have one or more termination activities that are to be performed before marking the workflow as terminated.

In the WADE metamodel of workflows, the execution flow is defined by means of transitions. A transition is an oriented connection between two activities and can be associated with a condition. With the exception of termination activities, each activity can have one or more outbound transitions. When the execution of an activity is terminated, the conditions associated with its outbound transitions are evaluated. As soon as a condition holds, the corresponding transition is activated and the execution flow advances toward the destination activity of the selected transition.

Normally, a process uses internal data, for instance, to pass intermediate results among activities and/or for the evaluation of conditional expressions. In the WADE metamodel of workflows, internal data is modeled as data fields.

A process can have input data that must be provided before execution can start, and it can have output data that becomes available just after termination. Input and output data are formalized in the WADE metamodel of workflows by means of so-called formal parameters.
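To make the metamodel concrete, the sketch below gives a deliberately simplified rendering, in Python, of the concepts just described (activities, conditional transitions, data fields, and formal parameters). It is not the WADE API, whose workflows are Java classes; every name in it is hypothetical.

```python
class Activity:
    def __init__(self, name, action, terminal=False):
        self.name, self.action, self.terminal = name, action, terminal
        self.transitions = []            # outbound (condition, target) pairs

    def add_transition(self, condition, target):
        self.transitions.append((condition, target))

class Workflow:
    def __init__(self, entry, formal_parameters):
        self.entry = entry               # execution entry point
        self.params = formal_parameters  # input/output data of the process

    def execute(self, **inputs):
        data = dict(inputs)              # data fields: internal process data
        current = self.entry
        while True:
            current.action(data)         # perform the activity
            if current.terminal:         # termination activity reached
                return {p: data.get(p) for p in self.params}
            for condition, target in current.transitions:
                if condition(data):      # first transition whose condition holds
                    current = target
                    break

# Hypothetical two-activity process: double a value, then classify it.
finish = Activity("finish",
                  lambda d: d.update(label="big" if d["x"] > 10 else "small"),
                  terminal=True)
start = Activity("start", lambda d: d.update(x=d["x"] * 2))
start.add_transition(lambda d: True, finish)

wf = Workflow(entry=start, formal_parameters=["x", "label"])
print(wf.execute(x=7))                   # {'x': 14, 'label': 'big'}
```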

Finally, the graphical representation of workflows that WOLF provides is largely inspired by BPMN (Business Process Model and Notation) (see http://www.omg.org/bpmn), an accepted standard that smoothly integrates with the XPDL metamodel, and therefore with the WADE metamodel of workflows.

URL: https://www.sciencedirect.com/science/article/pii/B9780128003411000139

Fundamentals of big data in radio astronomy

Jiale Lei, Linghe Kong, in Big Data in Astronomy, 2020

5.1 Horizontal scaling platforms

Some of the popular horizontal scale-out platforms include peer-to-peer networks and Apache Hadoop. Nowadays, researchers have already started to develop the next generation of horizontal scale-out tools to overcome the weaknesses of existing platforms. These platforms are examined in more detail below.

Peer-to-peer networks involve millions of machines connected in a network. This network architecture is designed to be decentralized and distributed, and it may be one of the oldest existing distributed computing platforms. Each node in the system is called a peer, and peers communicate and exchange messages using the message passing interface (MPI) scheme. Each node can store a certain amount of data, and scale-out is practically unlimited, meaning that millions of nodes or even more can be involved. However, the major bottleneck in peer-to-peer networks arises in the communication among different nodes. Broadcasting messages in such systems is low cost, but the aggregation of data or analytic results is much more expensive. Moreover, messages are sent over the network in the form of a spanning tree with an arbitrary node as the root, where the broadcasting is initiated.

MPI is the standard software communication paradigm that is used in this network. It has been in use for many years and is well established and thoroughly debugged. One of the main features of MPI is the state preserving process. This means processes can be kept alive as long as the system operates and there is no need to load the same data again and again as other frameworks do, such as MapReduce. All the parameters can be saved locally. Therefore, MPI is suitable for iterative processing. Another important feature of MPI is its hierarchical master/slave paradigm. Under such a paradigm, a slave node can become the master for other processes, which is extremely useful and flexible for dynamic resource allocation, especially when the slave node has large amounts of data to process.

Actually, MPI is available for many programming languages, including C++ and Java. It provides methods to send and receive messages and data among processes. One of the most important methods in MPI is "broadcast," which broadcasts the data or messages over all the nodes. Another method called "barrier" is also frequently used to put up a barrier so that all the processes can synchronize and reach a certain state before working further.
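As a minimal, hedged example of these two methods (written against mpi4py, one Python binding of MPI; the deployment details of a real peer-to-peer network are of course far more involved), a root process broadcasts a parameter set to all peers, each peer does its share of the work, and all peers synchronize at a barrier before the next iteration.

```python
# Run with, e.g.: mpiexec -n 4 python broadcast_demo.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

for iteration in range(3):                 # iterative processing: state stays alive locally
    params = {"step": iteration, "lr": 0.1} if rank == 0 else None
    params = comm.bcast(params, root=0)    # "broadcast": root sends data to all peers
    local_result = rank * params["lr"]     # placeholder for each peer's share of the work
    comm.Barrier()                         # "barrier": all peers reach this point first
    if rank == 0:
        print("iteration", iteration, "done")
```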

Although MPI may seem well suited to developing algorithms for big data analysis, it has some drawbacks. For example, MPI does not have any mechanism to deal with faults. Because peer-to-peer networks are built on completely unreliable hardware, when MPI is used on top of such architectures a single node failure can lead to the breakdown of the entire system. Therefore, users must implement some fault tolerance mechanism within the program to avoid such failures. Other frameworks, such as Hadoop, provide robust fault tolerance and are becoming increasingly popular, meaning that MPI is not as widely used as before.

Apache Hadoop is an open source framework for storing and processing massive datasets using clusters of commodity hardware. Hadoop is designed to scale up to hundreds and even thousands of nodes with high fault tolerance. The components of a Hadoop stack are illustrated in Fig. 2.4. As the figure shows, the lowest layer of Hadoop is the Hadoop distributed file system (HDFS), a distributed file system widely used to store data across clusters of commodity machines; it supports high availability and excellent fault tolerance. Hadoop YARN is the layer responsible for resource management and job scheduling across the cluster. MapReduce is the programming model used in Hadoop. As mentioned above, MapReduce divides the entire task into two functions, map and reduce. The map functions load the data from HDFS, process it, and produce intermediate results that are sent to the reduce functions. The reduce functions aggregate the received intermediate results to generate the final output and write it back to HDFS. Typically, a Hadoop job involves running several map and reduce functions across different nodes in the cluster.

Fig. 2.4. Hadoop stack.
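As a hedged illustration of the map and reduce functions described above, the pair of scripts below follows the Hadoop Streaming convention, in which mappers and reducers read from standard input and write tab-separated key/value lines to standard output; the word-count task and the file names are illustrative choices only.

```python
# mapper.py -- emits one "<word>\t1" line per word read from stdin
import sys

for line in sys.stdin:
    for word in line.split():
        print(f"{word}\t1")
```

```python
# reducer.py -- stdin arrives sorted by key, so counts for a word are contiguous
import sys

current_word, count = None, 0
for line in sys.stdin:
    word, value = line.rstrip("\n").split("\t")
    if word != current_word:
        if current_word is not None:
            print(f"{current_word}\t{count}")   # final output (written to HDFS by the framework)
        current_word, count = word, 0
    count += int(value)
if current_word is not None:
    print(f"{current_word}\t{count}")
```

In a Hadoop Streaming job these two scripts would typically be supplied as the mapper and reducer; they can also be tested locally with a shell pipeline such as `cat input.txt | python mapper.py | sort | python reducer.py`.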

Other platforms based on MapReduce have also been developed; these are called MapReduce wrappers. MapReduce wrappers give better control over the MapReduce program and aid in source code development. Two popular wrappers are Apache Pig and Hive. Both make code development easier by sparing programmers the complexities of MapReduce coding. In addition, programming environments such as DryadLINQ have emerged that allow end users more flexibility over MapReduce, because they have more control over the coding. DryadLINQ was developed by Microsoft, using LINQ (a parallel programming language) and a cluster execution environment named Dryad. Programmers can simply use Visual Studio for debugging and development, and can even interoperate with other standard .NET languages.

Spark is regarded as the representative of a next-generation paradigm for big data processing. It is designed to overcome the disk I/O limitations of Hadoop and to improve the performance of earlier systems. What makes Spark unique and efficient is that it supports in-memory computation. Compared to Hadoop, which requires loading data from disk, Spark allows data to be cached in memory so it can be processed more efficiently. At present, Spark has become a general framework for massive dataset processing. It also supports many mainstream programming languages, including Java, Python, and Scala. Spark can run up to 100 times faster than Hadoop MapReduce for certain tasks when the data can be cached in memory. Even when data resides on disk, Spark operates up to 10 times as rapidly as Hadoop does. Spark is also compatible with Hadoop frameworks, as it can run on the Hadoop YARN manager and load data from HDFS, making it simple to run Spark on different systems.
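A minimal PySpark word count, given here purely as an illustration of the in-memory style described above (the HDFS path is a placeholder), caches an intermediate RDD so that later actions reuse it without recomputation.

```python
from pyspark import SparkContext

sc = SparkContext("local[*]", "wordcount-sketch")

lines = sc.textFile("hdfs:///data/sample.txt")        # placeholder input path
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b)
               .cache())                              # keep the RDD in memory for reuse

print(counts.take(10))                                # first action materializes the RDD
print(counts.count())                                 # reuses the cached data, no recomputation
sc.stop()
```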

Berkeley data analytics stack (BDAS): BDAS was developed by researchers at the University of California at Berkeley. The BDAS stack is shown in Fig. 2.5. As shown, there is a layer called Tachyon on top of HDFS. This component is based on HDFS, and it achieves higher performance than HDFS by utilizing memory more efficiently. Tachyon works by caching frequently read files in memory so as to access the disk as little as possible; these cached files can then be accessed at memory speed. Tachyon is also compatible with Hadoop MapReduce; in other words, MapReduce programs can run over Tachyon just as they run directly on HDFS. In addition, Tachyon supports operations on raw tables. With Tachyon, users can load tables with hundreds of columns, and Tachyon helps identify the columns that are frequently used and caches them in memory for further use.

Fig. 2.5. Berkeley data analytics stack.

Another crucial layer in BDAS is Apache Mesos, shown in Fig. 2.5 above Tachyon. Mesos is a cluster manager that provides effective resource isolation and sharing across distributed applications or frameworks. It allows Hadoop, Spark, and other applications to share a dynamically shared pool of resources. Supported by Mesos, it is possible for tens of thousands of nodes to work together. Mesos provides APIs in programming languages such as Java, Python, and C++ to allow users to develop new parallel applications. In addition, Mesos includes the capability for multiresource scheduling, making it a powerful resource manager.

The layer above Mesos is Spark, which replaces Hadoop MapReduce in the BDAS architecture. On top of the stack are various Spark wrappers. For example, Spark Streaming is used for large-scale real-time stream processing. BlinkDB can provide queries with bounded errors and bounded response times, even when the datasets are huge. GraphX is a distributed graph system based on Spark, and MLBase provides a distributed machine learning library based on Spark.

URL: https://www.sciencedirect.com/science/article/pii/B9780128190845000109