H-Quincy: Fair Scheduling for Hadoop Clusters

2013-09-14 16:49

H-Quincy implements the paper Quincy: Fair Scheduling for Distributed Computing Clusters on Hadoop. It improves the MapReduce scheduler by replacing the default queue-based scheduler with a flow-based one. A min cost flow is calculated and updated to assign map tasks across the cluster, according to the size of each data split and the communication overhead in the cluster’s hierarchy.


git clone https://github.com/puncsky/H-Quincy.git

You can either build from source code or use the JAR directly.

  • Build from source code. Replace the files in your $HADOOP_HOME/src/mapred/org/apache/hadoop/mapred with the files in src/. Enter $HADOOP_HOME and build with ant.
  • Use the JAR directly. Replace your $HADOOP_HOME/hadoop-core-{version}.jar with hadoop-core-1.0.4-SNAPSHOT.jar.

The Quincy paper is organized rather theoretically and involves a large number of mathematical details, which makes it hard to understand. The following sections explain our implementation.

1. Architecture

Figure 1 outlines our architecture. There are three kinds of nodes and, accordingly, three levels of hierarchy in the cluster. Computing nodes in the same rack are connected via a rack switch. Rack switches are connected via a core switch. Core switches and rack switches do not perform any computation but can still be represented in the Hadoop system as nodes.


Figure 1: A sample architecture with simplified preferred lists.

Hadoop uses a master/slave architecture with a single master and multiple worker nodes. The master node generally consists of a JobTracker and a NameNode. Each slave worker consists of a TaskTracker and a DataNode. The master can also play the role of a slave at the same time. The JobTracker assigns tasks to the TaskTrackers for execution and then collects the results. The NameNode manages the index of the data blocks stored in DataNodes across the Hadoop Distributed File System.

In our cluster, each computing node is both a TaskTracker and a DataNode, which means they are all slaves. We select one of them as the master, which is both a JobTracker and a DataNode simultaneously. The master maintains its relationship with the slaves through heartbeats, whose overhead is negligible compared to data transfer or task execution.

Many jobs may share the cluster resources at the same time. When a job enters the JobTracker’s job queue, the JobTracker re-sorts the queue by the priority and the start time of the jobs. A job is composed of map tasks and reduce tasks. When the job leaves the queue and starts to run, the JobTracker analyzes the distribution of the input data over the TaskTrackers and initializes a list of preferred tasks for each TaskTracker, rack switch, and core switch, as shown in Figure 1. A task appears on the preferred list of a node if its data splits are stored on that node or on any of its child nodes. The JobTracker’s scheduler then matches tasks to TaskTrackers and launches them to run the tasks on their newly assigned lists. Meanwhile, the JobTracker keeps collecting status information from the TaskTrackers until all tasks finish. If a task fails on a TaskTracker, the JobTracker restarts it on that TaskTracker or has another TaskTracker execute it. With preemption, the scheduler can kill a task on a TaskTracker if there is a more suitable arrangement.
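The construction of the preferred lists can be sketched as follows. This is a minimal Python illustration, not the Hadoop Java code; the function name `build_preferred_lists`, the dictionary layout, and the node names are assumptions made for the example.

```python
from collections import defaultdict

def build_preferred_lists(split_locations, rack_of, core="core"):
    """Return {node: [task, ...]} for computing nodes, rack switches and the core switch.

    split_locations: {task: [computing nodes holding a replica of its split]}
    rack_of:         {computing node: rack switch it is attached to}
    (Both structures are hypothetical; Hadoop keeps this information elsewhere.)
    """
    preferred = defaultdict(list)
    for task, hosts in split_locations.items():
        nodes = set(hosts)                       # nodes storing the split
        nodes.update(rack_of[h] for h in hosts)  # their parent rack switches
        nodes.add(core)                          # the core switch is everyone's ancestor
        for node in nodes:
            preferred[node].append(task)
    return dict(preferred)

rack_of = {"c1": "r1", "c2": "r1", "c3": "r2", "c4": "r2"}
split_locations = {"t1": ["c1", "c3"], "t2": ["c2"], "t3": ["c3", "c4"]}
lists = build_preferred_lists(split_locations, rack_of)
```

Here `t1` ends up on the lists of `c1`, `c3`, both rack switches, and the core switch, because a node's list contains every task whose split is stored on the node itself or below it.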

2. Hadoop Default Scheduler

After the preferred lists are initialized, the JobTracker puts a series of actions, including the tasks waiting for execution, into its response to the heartbeat from the matched TaskTracker. The matched TaskTracker receives the heartbeat response and adds the actions to its task queue.

2.1 Queue-based Greedy Scheduler without Fairness

During task assignment, the JobTracker’s task scheduler first calculates the maximum workload for every job and leaves some padding slots for speculative tasks. A speculative task backs up a running task in case the running task is too slow and impedes the job’s progress. Map tasks are assigned before reduce tasks. Data-local and rack-local tasks are assigned before non-local tasks.

The default setup for Hadoop is a queue-based greedy scheduler. The preferred task lists of the nodes can be regarded as queues. Each computing node has a queue of tasks that can be executed without pulling data from elsewhere. The computing nodes in a rack share a rack queue, so that a node whose local queue is exhausted can execute tasks by pulling data splits from other nodes in the same rack. Since racks are connected by a core switch, the racks also share a global queue. For every assignment, a node runs previously failed tasks first, then non-running local and non-local tasks, and finally speculative tasks.
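The lookup order across the three queue levels can be sketched in Python as below. This is only an illustration of the greedy local, rack, global order; it ignores failed and speculative tasks, and `pick_task` and the queue layout are hypothetical names rather than Hadoop APIs.

```python
def pick_task(node, rack_of, queues, core="core"):
    """Pop the next task for `node`: local queue first, then rack queue, then global queue."""
    for level in (node, rack_of[node], core):
        queue = queues.get(level, [])
        if queue:
            task = queue.pop(0)
            # The same task can sit in several queues; remove it everywhere once assigned.
            for other in queues.values():
                if task in other:
                    other.remove(task)
            return task, level
    return None, None

rack_of = {"c1": "r1", "c2": "r1"}
queues = {
    "c1": ["t1"], "c2": [],
    "r1": ["t1", "t2"], "r2": ["t3"],
    "core": ["t1", "t2", "t3"],
}
first = pick_task("c1", rack_of, queues)   # data-local task
second = pick_task("c2", rack_of, queues)  # rack-local task
third = pick_task("c2", rack_of, queues)   # non-local task from the global queue
fourth = pick_task("c2", rack_of, queues)  # nothing left
```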

2.2 Queue-based Greedy and Fair Scheduler

When only one job is running on the cluster, that job uses the entire cluster. What happens if multiple jobs are submitted? The default scheduler’s sorting by priority and start time only ensures that higher-priority and earlier-submitted jobs are dequeued first. The Hadoop fair scheduler, in contrast, arranges jobs into pools. Computing resources are divided fairly between these pools. A pool can be occupied by a single user (the default) or shared by a user group. Within a pool, jobs can be scheduled evenly or first come, first served.

Assume we have $$M$$ computers and $$K$$ running jobs, and job $$j$$ has $$N_j$$ tasks in total. Each job $$j$$ is allocated $$A_j = \min(\lfloor M/K \rfloor, N_j)$$ task slots. If there are still remaining slots, they are divided evenly. After finishing its running tasks, a TaskTracker follows the new allocation $$A_j$$. If the process is not preemptive, running tasks are not affected when new jobs are submitted, irrespective of the changed allocation $$A_j$$. If fairness is enforced with preemption, running tasks are killed when a new quota $$A_j$$ takes effect.
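As a worked example of the allocation rule, the following Python sketch computes $$A_j$$ and then hands out the remaining slots one at a time to jobs that still have unallocated tasks. The round-robin handling of the remainder is an assumption; the text only says the remaining slots are divided evenly.

```python
def fair_allocation(num_slots, tasks_per_job):
    """A_j = min(floor(M / K), N_j), then distribute leftover slots evenly."""
    k = len(tasks_per_job)
    if k == 0:
        return {}
    base = num_slots // k
    alloc = {j: min(base, n) for j, n in tasks_per_job.items()}
    leftover = num_slots - sum(alloc.values())
    while leftover > 0:
        # Jobs that could still use another slot.
        hungry = [j for j, n in tasks_per_job.items() if alloc[j] < n]
        if not hungry:
            break
        for j in hungry:
            if leftover == 0:
                break
            alloc[j] += 1
            leftover -= 1
    return alloc

# M = 10 slots, K = 3 jobs with N = 2, 10 and 10 tasks.
allocation = fair_allocation(10, {"a": 2, "b": 10, "c": 10})
```

With $$M = 10$$ and $$K = 3$$, every job first gets $$\min(3, N_j)$$ slots, that is 2, 3 and 3. The two slots that remain go to jobs `b` and `c`, giving 2, 4 and 4.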

The Hadoop default scheduler sets a wait time so that a portion of the cluster waits to obtain better locality, which prevents a job’s tasks from becoming sticky to a TaskTracker.

3. Quincy Scheduler

Quincy (Quincy: Fair Scheduling for Distributed Computing Clusters) introduces a new framework that transforms scheduling into a global min cost max flow problem. Running a particular task on some machine incurs a data calculation cost and, potentially, a data transfer cost. Killing a running task also incurs a wasted-time cost. If the different kinds of costs can be expressed in the same unit, we can use an algorithm to minimize the total cost of the schedule.

3.1 Min Cost Max Flow

In a flow network, a directed graph $$G = (V, E)$$ has $$source \in V$$ and $$sink \in V$$. For each edge $$(u,v) \in E$$, there are $$capacity(u,v) \in \mathbb{N}$$, $$flow(u,v) \in \mathbb{N}$$ and $$cost(u,v) \in \mathbb{R}$$.

The problem is to calculate the min cost flow

$$\min \sum_{(u,v) \in E} flow(u,v) \cdot cost(u,v)$$

The Edmonds-Karp algorithm is used to calculate the min cost flow in our implementation, with a running time of $$O(V \cdot E^2)$$.
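To make the min cost max flow problem concrete, here is a small self-contained Python sketch. Note that it uses successive shortest augmenting paths found with Bellman-Ford, a standard textbook technique, and is not taken from the H-Quincy source; the function name and the edge-list format are assumptions. It also assumes the input graph has no negative-cost cycles.

```python
def min_cost_max_flow(n, edges, source, sink):
    """edges: list of (u, v, capacity, cost) over nodes 0..n-1. Returns (flow, total_cost)."""
    graph = [[] for _ in range(n)]  # graph[u] holds indices into to/cap/cost
    to, cap, cost = [], [], []
    for u, v, c, w in edges:
        graph[u].append(len(to)); to.append(v); cap.append(c); cost.append(w)
        graph[v].append(len(to)); to.append(u); cap.append(0); cost.append(-w)  # residual edge

    inf = float("inf")
    flow = total = 0
    while True:
        # Bellman-Ford: cheapest source-to-sink path in the residual graph.
        dist = [inf] * n
        prev_edge = [-1] * n
        dist[source] = 0
        for _ in range(n - 1):
            changed = False
            for u in range(n):
                if dist[u] == inf:
                    continue
                for e in graph[u]:
                    if cap[e] > 0 and dist[u] + cost[e] < dist[to[e]]:
                        dist[to[e]] = dist[u] + cost[e]
                        prev_edge[to[e]] = e
                        changed = True
            if not changed:
                break
        if dist[sink] == inf:
            return flow, total  # no augmenting path left
        # Bottleneck capacity along the path (edge e and e ^ 1 are a forward/reverse pair).
        push, v = inf, sink
        while v != source:
            e = prev_edge[v]
            push = min(push, cap[e])
            v = to[e ^ 1]
        # Push the flow and update residual capacities.
        v = sink
        while v != source:
            e = prev_edge[v]
            cap[e] -= push
            cap[e ^ 1] += push
            v = to[e ^ 1]
        flow += push
        total += push * dist[sink]

# 0 = source, 1-2 = tasks, 3-4 = machines, 5 = sink
edges = [(0, 1, 1, 0), (0, 2, 1, 0),
         (1, 3, 1, 1), (1, 4, 1, 4), (2, 3, 1, 2), (2, 4, 1, 6),
         (3, 5, 1, 0), (4, 5, 1, 0)]
result = min_cost_max_flow(6, edges, 0, 5)
```

In this toy graph the greedy choice (task 1 on machine 3 at cost 1) would force task 2 onto machine 4 at cost 6, for a total of 7. The flow solver reroutes through the residual edge and finds the assignment with total cost 6.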

3.2 Initialization of the Matrix

Figure 2 shows the flow graph for the same topology as Figure 1. Since supply comes from several kinds of sources (task nodes and unscheduled nodes), the graph is a multi-source, single-sink flow network. Our implementation adds a virtual source to transform it into a single-source one.


Figure 2: Min-Cost Max Flow Graph

3.2.1 Capacity

Each task node has a supply of 1, so $$capacity(source, task) = 1$$. The unscheduled node is used to control fairness. Tasks flowing to the unscheduled node are not assigned to computing nodes at this time. Each job has exactly one unscheduled node with $$capacity(source, unscheduled) = F_j - N_j$$, where $$F_j$$ is the maximum number of running tasks job $$j$$ may have and $$N_j$$ is the number of TaskTrackers assigned to this job.

From each task node, there are edges to the core switch, the preferred racks, and the preferred computing nodes. By default, every data split has three replicas, so the number of preferred computing nodes is usually 3. This gives $$capacity(task, coreSwitch) = 1$$ $$capacity(task, preferredRackSwitch) = 1$$ $$capacity(task, preferredComputingNode) = 1$$

From the unscheduled node, there is only one edge, to the sink, with $$capacity(unscheduled, sink) = F_j - E_j$$, where $$E_j$$ is the minimum number of running tasks job $$j$$ may have.

From the core switch, there are edges to every rack with capacities of $$capacity(coreSwitch, rackSwitch) = numberOfTaskTrackersInThatRack$$

From each rack switch, there are edges to every computing node in the rack with a capacity of $$capacity(rackSwitch, computingNode) = 1$$.

From each computing node, there is only one edge, with $$capacity(computingNode, sink) = numberOfTaskSlots$$. By Hadoop’s default, the number of task slots is 2 for map tasks and 1 for reduce tasks.
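The capacity rules above can be collected into one small Python sketch for a single job. The function `build_capacities`, the node labels, and the dictionary layout are assumptions for illustration; the edge from a task to the unscheduled node is left out because its capacity is not stated in this section.

```python
def build_capacities(tasks, racks, slots_per_node, F, N, E):
    """Return {(u, v): capacity} for one job.

    tasks: {task: {"racks": [preferred rack switches], "nodes": [preferred computing nodes]}}
    racks: {rack switch: [computing nodes in that rack]}
    F, N, E: the job's F_j, N_j and E_j from the text
    """
    cap = {}
    for task, pref in tasks.items():
        cap[("source", task)] = 1          # each task supplies one unit of flow
        cap[(task, "coreSwitch")] = 1
        for rack in pref["racks"]:
            cap[(task, rack)] = 1
        for node in pref["nodes"]:
            cap[(task, node)] = 1
    cap[("source", "unscheduled")] = F - N
    cap[("unscheduled", "sink")] = F - E
    for rack, nodes in racks.items():
        cap[("coreSwitch", rack)] = len(nodes)  # number of TaskTrackers in the rack
        for node in nodes:
            cap[(rack, node)] = 1
            cap[(node, "sink")] = slots_per_node
    return cap

tasks = {"t1": {"racks": ["r1"], "nodes": ["c1"]}}
racks = {"r1": ["c1", "c2"]}
cap = build_capacities(tasks, racks, slots_per_node=2, F=2, N=1, E=0)
```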

3.2.2 Cost

The cost of scheduling task $$t_n^j$$ (task $$n$$ of job $$j$$) onto a computing node is $$\alpha_n^j = \psi R^X(t_n^j) + \xi X^X(t_n^j)$$, where $$\psi$$ is the cost to transfer one GB across a rack switch and $$\xi$$ is the cost to transfer one GB across the core switch. $$R^X(t_n^j)$$ and $$X^X(t_n^j)$$ are upper bounds on the size of the data transferred across a rack switch and across the core switch, respectively.

The cost of scheduling a task onto a preferred rack is $$\rho^j_{n,l} = \psi R^R_l(t_n^j) + \xi X^R_l(t_n^j)$$.

The cost of scheduling a task onto a preferred computer is $$\gamma^j_{n,m} = \psi R^C_m(t_n^j) + \xi X^C_m(t_n^j)$$. However, if the computer is already executing the same task, the cost is $$\gamma^j_{n,m} = \psi R^C_m(t_n^j) + \xi X^C_m(t_n^j) - \theta^j_n$$, where $$\theta^j_n$$ is the number of seconds for which the task has been running.

The cost of scheduling a task onto the unscheduled node is $$\upsilon^j_n = \omega \nu^j_n$$, where $$\omega$$ is a wait-time factor and $$\nu^j_n$$ is the total number of seconds that task $$n$$ in job $$j$$ has spent unscheduled.

In our current version for testing, the wait-time factor is $$\omega = 0.5$$, with $$\psi = 1$$ per GB and $$\xi = 2$$ per GB. $$\psi$$ and $$\xi$$ can be set larger to achieve better locality.
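The cost formulas share the form $$\psi R + \xi X$$ and can be sketched in Python with the test values above. The function names and the example data sizes are assumptions; how the transfer sizes in GB are obtained is outside this sketch.

```python
OMEGA = 0.5  # wait-time factor
PSI = 1.0    # cost per GB transferred across a rack switch
XI = 2.0     # cost per GB transferred across the core switch

def transfer_cost(rack_gb, core_gb, psi=PSI, xi=XI):
    """psi * R + xi * X, the common form of the alpha, rho and gamma costs."""
    return psi * rack_gb + xi * core_gb

def computer_cost(rack_gb, core_gb, running_seconds=0.0):
    """gamma: transfer cost, reduced by theta if the task already runs on this computer."""
    return transfer_cost(rack_gb, core_gb) - running_seconds

def unscheduled_cost(unscheduled_seconds, omega=OMEGA):
    """upsilon = omega * nu."""
    return omega * unscheduled_seconds

remote = transfer_cost(rack_gb=0.5, core_gb=0.25)                   # 1*0.5 + 2*0.25
local_running = computer_cost(0.0, 0.0, running_seconds=30.0)       # already running for 30 s
waiting = unscheduled_cost(10.0)                                    # unscheduled for 10 s
```

A task that has been running on a computer for 30 seconds gets a cost of -30 there, which makes moving or killing it unattractive, while a task that has waited 10 seconds costs 5 to leave unscheduled.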

3.3 Assignment and Update

After initialization, the min cost flow is recalculated every time before a new task is assigned to a TaskTracker. While the job is running on the cluster, the capacity matrix and the cost matrix are updated whenever a task finishes: an edge from the finished task to the sink is set with capacity 1 and cost $$-1000 - \nu^j_n$$.

3.4 Preemption and Fairness

There are four versions of Quincy.

  • Quincy without Preemption and without Fairness (Q).
  • Quincy with Preemption and without Fairness (QP).
  • Quincy without Preemption and with Fairness (QF).
  • Quincy with Preemption and with Fairness (QPF).

Due to time constraints, our current implementation includes neither preemption nor fairness. Preemption is easy to achieve, but fairness control requires modifying more classes and source code.

