Hadoop Architecture, HA, Failover

MRv1 daemons
• Namenode
• Secondary namenode
• Jobtracker
• Datanode
• Tasktracker

In MRv1, the jobtracker daemon handled both resource management and job scheduling/monitoring. These two responsibilities were tightly coupled within it, and it managed the tasks and all related operations by interacting with the tasktracker daemons. This load became overwhelming for the jobtracker as clusters grew toward the 4000-node mark, a scalability issue that needed to be fixed. In addition, the investment in Hadoop was hard to justify because MapReduce was the only way to process data on HDFS; other tools were unable to process this data. YARN was built to address these issues and is part of Hadoop version 2.x. With the introduction of YARN, MapReduce is now just one of the clients that run on the YARN framework.

YARN addresses these issues by splitting the following two jobtracker responsibilities:
• Resource management
• Job scheduling/monitoring

The jobtracker daemon has been removed and the following two new daemons have been introduced in YARN:
• ResourceManager
• NodeManager


The ResourceManager daemon is a global master daemon that is responsible for managing the resources for the applications in the cluster. The ResourceManager daemon consists of the following two components:

• ApplicationsManager
• Scheduler

The ApplicationsManager performs the following operations:

• Accepts jobs from a client.
• Creates the first container on one of the worker nodes to host the ApplicationMaster. A container, in simple terms, is an allocation of resources (such as memory) on a single worker node in the cluster.
• Restarts the container hosting the ApplicationMaster on failure.

The Scheduler is responsible for allocating system resources to the various applications in the cluster. It is a pure scheduler: it does not monitor or track the status of the applications.

Each application in YARN has its own ApplicationMaster, which is responsible for negotiating resources with the Scheduler and for setting up and monitoring the application's resource containers. An application is either a single job or a DAG of jobs.

The NodeManager daemon runs on the worker nodes and is responsible for monitoring the containers within the node and the node's system resources such as CPU, memory, and disk. It sends this monitoring information back to the ResourceManager daemon. Each worker node has exactly one NodeManager daemon running.

Job submission in YARN

The following is the sequence of steps involved when a job is submitted to a YARN cluster:

1. When a job is submitted to the cluster, the client first receives an application ID from the ResourceManager.

2. Next, the client copies the job resources to a location in HDFS.

3. The ResourceManager then starts the first container under the NodeManager's management to bring up the ApplicationMaster. For example, if a MapReduce job is submitted, the ResourceManager will bring up the MapReduce ApplicationMaster.

4. The ApplicationMaster, based on the job to be executed, requests resources from the ResourceManager.

5. Once the ResourceManager schedules a container with the requested resource, the ApplicationMaster contacts the NodeManager to start the container and execute the task. In case of a MapReduce job, that task would be a map or reduce task.

6. The client checks with the ApplicationMaster for status updates on the submitted job.

The following diagram shows the interactions of the client and the different daemons in a YARN environment:

RM Automatic failover

ResourceManager is the central authority that manages resources and schedules applications running on YARN. Hence, it is potentially a single point of failure in an Apache YARN cluster. ResourceManager HA is realized through an Active/Standby architecture: at any point in time, one of the RMs is Active, and one or more RMs are in Standby mode, waiting to take over should anything happen to the Active. The trigger to transition to Active comes either from the admin (through the CLI) or from the integrated failover controller when automatic failover is enabled.

The RMs have an option to embed the ZooKeeper-based ActiveStandbyElector to decide which RM should be the Active. When the Active goes down or becomes unresponsive, another RM is automatically elected to be the Active and takes over. Note that there is no need to run a separate ZKFC daemon, as is the case for HDFS, because the ActiveStandbyElector embedded in the RMs acts as both failure detector and leader elector.
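
A minimal sketch of what an RM HA setup with the embedded elector might look like in yarn-site.xml. The cluster ID, RM IDs, and all hostnames are placeholders invented for illustration, and the name of the ZooKeeper address property depends on the Hadoop release (hadoop.zk.address in recent releases, yarn.resourcemanager.zk-address in older 2.x releases), so check the documentation for your version.

```xml
<!-- yarn-site.xml: hypothetical two-RM setup; IDs and hostnames are placeholders -->
<configuration>
  <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
  </property>
  <property>
    <!-- Identifies this cluster so an RM never becomes Active for another one -->
    <name>yarn.resourcemanager.cluster-id</name>
    <value>cluster1</value>
  </property>
  <property>
    <!-- Logical IDs of the RMs taking part in HA -->
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>master1.example.com</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>master2.example.com</value>
  </property>
  <property>
    <!-- ZooKeeper ensemble used by the embedded ActiveStandbyElector -->
    <name>hadoop.zk.address</name>
    <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
  </property>
</configuration>
```

With a setup along these lines, the current role of an RM can be checked with yarn rmadmin -getServiceState rm1.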

NameNode HA

HDFS HA uses the Quorum Journal Manager (QJM) to share edit logs between the Active and Standby NameNodes.
In order for the Standby node to keep its state synchronized with the Active node, both nodes communicate with a group of separate daemons called "JournalNodes" (JNs).
It is vital for the correct operation of an HA cluster that only one of the NameNodes be Active at a time. Otherwise, the namespace state would quickly diverge between the two, risking data loss or other incorrect results. In order to ensure this property and prevent the so-called "split-brain scenario," the JournalNodes will only ever allow a single NameNode to be a writer at a time. During a failover, the NameNode which is to become active will simply take over the role of writing to the JournalNodes, which will effectively prevent the other NameNode from continuing in the Active state, allowing the new Active to safely proceed with failover.
There must be at least 3 JournalNode daemons, since edit log modifications must be written to a majority of JNs. This allows the system to tolerate the failure of a single machine. You may also run more than 3 JournalNodes, but in order to actually increase the number of failures the system can tolerate, you should run an odd number of JNs (i.e. 3, 5, 7, etc.). Note that when running with N JournalNodes, the system can tolerate at most (N - 1) / 2 failures and continue to function normally.
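
As a worked example of the majority rule above: a write must reach a majority of the N JournalNodes, so the number of failures that can be tolerated is N minus the size of that majority, which is (N - 1) / 2 rounded down. The symbol f is introduced here only for illustration.

```latex
\[
  f(N) \;=\; N - \left( \left\lfloor \frac{N}{2} \right\rfloor + 1 \right)
       \;=\; \left\lfloor \frac{N-1}{2} \right\rfloor ,
  \qquad
  f(3) = 1,\quad f(4) = 1,\quad f(5) = 2,\quad f(6) = 2,\quad f(7) = 3
\]
```

Going from 3 to 4 JNs (or from 5 to 6) adds a machine without adding any tolerated failure, which is why odd counts are suggested.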

If you are using Hadoop 2.x with HA, you do not need a Secondary NameNode in production, because the Standby NameNode performs the same checkpointing tasks. Edit log management is handled by the Quorum Journal Manager (QJM).

With QJM, the Active NameNode communicates with a group of daemons called JournalNodes (JNs), which durably record every edit it makes. The Standby NameNode constantly reads these edits from the JNs and applies them, keeping its own state up to date.

hdfs-site.xml configuration file

dfs.nameservices - the logical name for this new nameservice
Note: If you are also using HDFS Federation, this configuration setting should also include the list of other nameservices, HA or otherwise, as a comma-separated list.
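
For illustration, this setting could look as follows. The nameservice ID "mycluster" is a placeholder; any logical name can be used as long as the other HA properties refer to the same ID.

```xml
<!-- hdfs-site.xml: "mycluster" is a placeholder nameservice ID -->
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
```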


dfs.ha.namenodes.[nameservice ID] - unique identifiers for each NameNode in the nameservice

  <value>nn1,nn2,nn3</value>

Note: The minimum number of NameNodes for HA is two, but you can configure more. It is suggested not to exceed 5 (3 NameNodes are recommended) because of communication overheads.

dfs.namenode.shared.edits.dir - the URI which identifies the group of JNs where the NameNodes will write/read edits
For example, if the JournalNodes for this cluster were running on the machines “node1.example.com”, “node2.example.com”, and “node3.example.com” and the nameservice ID were “mycluster”, you would use a qjournal:// URI that lists all three JournalNodes, followed by the nameservice ID, as the value for this setting.
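
A sketch of that value for the three example hosts. The port 8485 is an assumption (it is the usual default JournalNode RPC port); adjust it if your JournalNodes listen elsewhere.

```xml
<!-- hdfs-site.xml: hosts separated by semicolons, journal ID after the slash -->
<!-- Port 8485 is assumed here; it is the usual JournalNode default -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://node1.example.com:8485;node2.example.com:8485;node3.example.com:8485/mycluster</value>
</property>
```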


fs.defaultFS - the default path prefix used by the Hadoop FS client when none is given (this property is set in core-site.xml rather than hdfs-site.xml)
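
With HA enabled, this prefix would typically point at the logical nameservice rather than at a single NameNode host. A sketch, reusing the placeholder nameservice ID "mycluster":

```xml
<!-- core-site.xml: clients resolve "mycluster" to whichever NameNode is Active -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://mycluster</value>
</property>
```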


dfs.journalnode.edits.dir - the path where the JournalNode daemon will store its local state
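
A sketch of this setting. The directory shown is a hypothetical placeholder; it should be an absolute path on each JournalNode machine.

```xml
<!-- hdfs-site.xml on the JournalNode machines; the path is a placeholder -->
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/path/to/journal/node/local/data</value>
</property>
```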


NOTE: Cloudera Manager exposes properties that allow you to insert custom configuration text into XML configuration, property, and text files, or into an environment. The naming convention for these properties is: XXX Advanced Configuration Snippet (Safety Valve) for YYY or XXX YYY Advanced Configuration Snippet (Safety Valve), where XXX is a service or role and YYY is the target.

Automatic Failover
Apache ZooKeeper is a highly available service for maintaining small amounts of coordination data, notifying clients of changes in that data, and monitoring clients for failures. The implementation of automatic HDFS failover relies on ZooKeeper for the following things:
• Failure detection - each of the NameNode machines in the cluster maintains a persistent session in ZooKeeper. If the machine crashes, the ZooKeeper session will expire, notifying the other NameNode(s) that a failover should be triggered.
• Active NameNode election - ZooKeeper provides a simple mechanism to exclusively elect a node as active. If the current active NameNode crashes, another node may take a special exclusive lock in ZooKeeper indicating that it should become the next active.

In a typical deployment, ZooKeeper daemons are configured to run on three or five nodes. Since ZooKeeper itself has light resource requirements, it is acceptable to collocate the ZooKeeper nodes on the same hardware as the HDFS NameNode and Standby Node. Many operators choose to deploy the third ZooKeeper process on the same node as the YARN ResourceManager. It is advisable to configure the ZooKeeper nodes to store their data on separate disk drives from the HDFS metadata for best performance and isolation.
The setup of ZooKeeper is out of scope for this document. We will assume that you have set up a ZooKeeper cluster running on three or more nodes, and have verified its correct operation by connecting using the ZK CLI.
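
Given such a ZooKeeper cluster, enabling automatic HDFS failover comes down to two settings, sketched below. The ZooKeeper hostnames are placeholders, and 2181 is assumed as the client port (ZooKeeper's usual default).

```xml
<!-- hdfs-site.xml: turn on automatic failover for the NameNodes -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>

<!-- core-site.xml: ZooKeeper ensemble to use; hostnames are placeholders -->
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
</property>
```

After these are in place, the HA state in ZooKeeper is normally initialized once with hdfs zkfc -formatZK, run from one of the NameNode hosts.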

A Few Other Important Concepts in Hadoop Architecture:

Decommission Nodes
YARN nodes can be decommissioned in one of two ways: NORMAL or GRACEFUL.
Normal decommission of YARN nodes means an immediate shutdown.
Graceful decommission of YARN nodes is the mechanism for decommissioning NMs while minimizing the impact on running applications. Once a node is in the DECOMMISSIONING state, the RM won’t schedule new containers on it and will wait for running containers and applications to complete (or until the decommissioning timeout is exceeded) before transitioning the node to DECOMMISSIONED.
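
A sketch of the yarn-site.xml settings that usually sit behind this mechanism. The exclude-file location is a hypothetical path and the timeout value is only an example; verify both property names against yarn-default.xml for your release.

```xml
<!-- yarn-site.xml: the file path is a placeholder -->
<property>
  <!-- File listing the NodeManager hosts to be decommissioned -->
  <name>yarn.resourcemanager.nodes.exclude-path</name>
  <value>/etc/hadoop/conf/yarn.exclude</value>
</property>
<property>
  <!-- How long the RM waits for running containers before forcing DECOMMISSIONED -->
  <name>yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs</name>
  <value>3600</value>
</property>
```

After adding a host to the exclude file, a graceful decommission is typically started with yarn rmadmin -refreshNodes -g, while a plain yarn rmadmin -refreshNodes performs the normal (immediate) variant.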

YARN Federation
The scalability of YARN is determined by the ResourceManager and is proportional to the number of nodes, active applications, and active containers, and to the frequency of heartbeats (from both nodes and applications). Lowering the heartbeat frequency can increase scalability but is detrimental to utilization. Another option is a federation-based approach that scales a single YARN cluster to tens of thousands of nodes by federating multiple YARN sub-clusters. The proposed approach is to divide a large (10-100k nodes) cluster into smaller units called sub-clusters, each with its own YARN RM and compute nodes.
YARN applications are submitted to one of the Routers, which applies a routing policy (obtained from the Policy Store), queries the State Store for the sub-cluster URL, and redirects the application submission request to the appropriate sub-cluster RM.

HDFS Short-Circuit Local Reads (dfs.client.read.shortcircuit=true)
In HDFS, reads normally go through the DataNode. Thus, when the client asks the DataNode to read a file, the DataNode reads that file off of the disk and sends the data to the client over a TCP socket. So-called “short-circuit” reads bypass the DataNode, allowing the client to read the file directly. Obviously, this is only possible in cases where the client is co-located with the data. Short-circuit reads provide a substantial performance boost to many applications.
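
A sketch of the configuration involved. Short-circuit reads rely on a UNIX domain socket shared by the DataNode and the client, so both sides need the same settings (and the native libhadoop library). The socket path below is a placeholder.

```xml
<!-- hdfs-site.xml, on both the DataNode and the client -->
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <!-- UNIX domain socket used between DataNode and local clients; placeholder path -->
  <name>dfs.domain.socket.path</name>
  <value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>
```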


