H-Quincy: Fair Scheduling for Hadoop Clusters2167 2013-09-14 16:49
H-Quincy implements the paper Quincy: Fair Scheduling for Distributed Computing Clusters on Hadoop, which improves the mapreduce scheduler by replacing the default queue-based one with a flow-based one. A min cost flow is calculated and updated to assign map tasks among the cluster, according to the size of the data split and the communication overhead in the cluster’s hierarchy.
git clone https://github.com/puncsky/H-Quincy.git
You can either build from source code or user the JAR directly.
- Build from Source Code. Replace your
$HADOOP_HOME/src/mapred/org/apache/hadoop/mapredwith files in
$HADOOP_HOMEand build with
- utilize the JAR directly. Replace your
The Quincy paper is rather theoretical organized and involves a large number of mathematical details, which reasonably makes itself hard to understand. The following sections explains our implementation.
Figure 1 displays an outline of our architecture. There exist three kinds of nodes and accordingly three levels of hierarchy in the cluster. Computing nodes are connected via a rack switch, and placed in the same rack. Rack switches are connected via a core switch. Core switches and rack switches do not undertake computing works but can still be presented in the Hadoop system as nodes.
Figure 1: A sample architecture with simplified preferred lists.
As we know, Hadoop takes a master/slave architecture, which includes a single master and multiple worker nodes. The master node in general consists of a JobTracker and a NameNode. The slave worker consist of a TaskTracker and a DataNode. Nevertheless, the master can also play the role of the slave at the same time. JobTracker assigns tasks to the TaskTrackers for execution, and then collects the result. NameNode manages the index for blocks of data storing in DataNodes among the Hadoop Distributed File System.
In our cluster, each computing node is both a TaskTracker and a DataNode, which means they are all slaves. And we select one of them as a master, which is both a JobTracker and a DataNode simultaneously. The master maintains the relationship with slaves through heartbeat, whose overhead is ignorable when compared to the data transfer or the execution of tasks.
There may be many jobs sharing the cluster resources at the same time. When a job comes into the JobTracker’s job queue, the JobTracker resorts the queue by the priority and the start time of these jobs. A job is composed of the map tasks and the reduce tasks. When the job comes out of the queue and starts to run, the JobTracker will analyze the input data’s distribution over those TaskTrackers and initialize a list of preferred tasks for each TaskTracker, rack switch, and core switch, as shown in figure 1. A task will occur on the preferred list of some node if its data splits are stored in that node or in any of its child nodes. Then the JobTracker’s scheduler matches tasks to the TaskTrackers and launch them to run the tasks on their own newly-assigned lists. At the same time, the JobTracker keeps collecting status information from TaskTrackers until all the tasks finish. If failure happens in the TaskTracker, the JobTracker will restart the task from that TaskTracker or enable a new TaskTracker to execute the task. The scheduler can kill a task on a TaskTracker with preemption if there is a more suitable arrangement.
After the initialization of preferred lists, the JobTracker assign a series of actions, including the tasks waiting for execution, into the response to heartbeat from the matched TaskTracker. The matched TaskTracker receives the heartbeat response and adds the actions to the task queue.
In the task assignment, the JobTracker’s task scheduler first calculates the max workload for every job, and leaves certain padding slots for speculative tasks. The speculative task backs up some running task, in case that the running task is too slow and impede the job’s progress. Map tasks are assigned before reduce tasks. Data-local and rack-local tasks are assigned before non-local tasks.
The default setup for Hadoop is a queue-based greedy scheduler. The preferred task lists of every node can be deemed as queues. Each computing node has a queue of tasks which can be executed without pulling data from other places. Each computing node in a rack shares a rack queue in case that computing node can execute tasks with pulling data splits from other nodes in the same rack, when the local list has already been finished. Since racks are connected with a core switch, racks also share a global queue. For every assignment, node will compute the previously failed tasks first, then the non-running local and non-local tasks, and finally the speculative tasks.
When only one job is running on the cluster, that job will use the entire cluster. What will happen if multiple jobs are submitted? The default’s sorting by priority and start time only ensures that more significant and earlier submitted jobs are dequeued first. However, the Hadoop fair scheduler arranges jobs into pools. Computing resources are divided fairly between these pools. A pool can be occupied by only one user (by default) or shared by a user group. Within the pool, jobs can be scheduled evenly or first-in-first-serve.
Assume we have computers and running jobs, and job has tasks in total. Each job will be allocated task slots. If there are still resting slots, divide them evenly. After finishing the running tasks, a TaskTracker will follow the new allocation . However, if the process is not preemptive, the running tasks will definitely not be affected when new jobs are submitted irrespective of the changed allocation . If the fairness is ensured with preemption, the running tasks will be killed while a new quota shows up.
Hadoop default scheduler sets up a wait time to enable a portion of the cluster wait to get better locality, preventing a job’s tasks from becoming sticky to a TaskTracker.
[Quincy](Quincy: Fair Scheduling for Distributed Computing Clusters) introduces a new framework transforming the scheduling into a global min cost max flow problem. Running a particular task on some machine incurs a data calculation cost and, potentially, a data transfer cost. Killing a running task also incurs a wasted time cost. If different kinds of costs can be expressed in the same unit, then we can investigate an algorithm to minimize the total cost of the scheduling.
In a flow network, a directed graph has and . For each edge , there are , and .
The problem is to calculate the min cost flow
Edmonds-Karp algorithm is used to calculate the min cost flow with in our implementation.
Figure 2 shows the graph along with the same topology in Figure 1. Since supplies are from a variety of sources – task nodes and unscheduled nodes, the graph is a multi-source single-sink flow. Our implementation adds a virtual source to transform the flow into a single-source one.
Figure 2: Min-Cost Max Flow Graph
Each task node has a supply of 1, so . The unscheduled is used to control the fairness. Tasks flowing to the unscheduled will not be assigned to computing nodes at this time. Each job must have and only has one unscheduled node with $$capacity(source, unscheduled) = F_j - N_j$$ where is the max number of running tasks job j may has. is the number of TaskTrackers assigned to this job.
From each task node, there are edges to the core switch, preferred rack, and preferred computing nodes. By default, every split of data has three replicas, so the number of preferred computing nodes is usually 3. So we can yield $$capacity(task, core switch) = 1$$ $$capacity(task, preferredRackSwitch) = 1$$ $$capacity(task, preferredComputingNode) = 1$$
From the unscheduled, there is only one edge to sink with , where is the min number of running tasks job j may have.
From the core switch, there are edges to every rack with capacities of $$capacity(coreSwitch, rackSwitch) = numberOfTaskTrackersInThatRack$$
From each rack switch, there are edges to every computing node with capacity of .
From each computing node, there is only one edge with . The number of task slots is 2 by Hadoop’s default for map tasks. The value is 1 for reduce tasks.
The cost of scheduling a task job with tasks onto a computing node is , where is the cost to transfer one GB across a rack switch, is the cost to transfer one GB across the core switch. is the upper bounds of the transferred data size across a rack switch and across a core switch.
The cost of scheduling a task onto a preferred rack is .
The cost of scheduling a task onto a preferred computer is . However, if the computer is now executing the same task, the cost should be , where is the number of seconds for which the task has been running.
The cost of scheduling a task onto the unscheduled is , where is a wait-time factor and is the total number of seconds that task in job has spent unscheduled.
In our current version for testing, the wait-time factor , per GB, per GB. and can be set larger to achieve a better locality.
After initialization, the min cost flow matrix will be recalculated every time before a new task is assigned to a TaskTracker. When the job is running on the cluster, the capacity matrix and cost matrix will be updated if a task is finished. An edge from the finished task to the sink will be set with a capacity 1 and cost .
There exist four versions of quincy.
- Quincy without Preemption and without Fairness(Q).
- Quincy with Preemption and without Fairness(QP).
- Quincy without Preemption and with Fairness(QF).
- Quincy with Preemption and with Fairness(QPF).
Limited to the time, our current implementation does not include preemption and fairness. Preemption is easy to achieve but there are more classes and source codes to modify for fairness control.