在这一个章节中,我们来追踪一下,用户提交了一个作业之后,如何从Mesos之中获取资源?
当然我们还是接着我上一中我们追踪到,FrameworkScheduler的代码,resourceOffers()函数
public void resourceOffers(SchedulerDriver d, List<Offer> offers) { try { synchronized(jobTracker) {
int numOffers = (int) offers.size();
double[] cpus = new double[numOffers];
double[] mem = new double[numOffers];
// Count up the amount of free CPUs and memory
on each node
for (int i = 0; i < numOffers; i++) {
Offer offer = offers.get(i);
LOG.info("Got resource offer " + offer.getId());
cpus[i] = getResource(offer, "cpus");
mem[i] = getResource(offer, "mem");
}
// Assign tasks to the nodes in a round-robin
manner, and stop when we
// are unable to assign a task to any node.
// We do this by keeping a linked list of indices
of nodes for which
// we are still considering assigning tasks. Whenever
we can't find a
// new task for a node, we remove it from the
list. When the list is
// empty, no further assignments can be made.
This algorithm was chosen
// because it minimizing the amount of scanning
we need to do if we
// get a large set of offered nodes.
List<Integer> indices = new LinkedList<Integer>();
List<List<TaskInfo>> replies =
new ArrayList<List<TaskInfo>>(numOffers);
for (int i = 0; i < numOffers; i++) {
indices.add(i);
replies.add(new ArrayList<TaskInfo>());
}
while (indices.size() > 0) {
for (Iterator<Integer> it = indices.iterator();
it.hasNext();) {
int i = it.next();
Offer offer = offers.get(i);
TaskInfo task = findTask(
offer.getSlaveId(), offer.getHostname(), cpus[i],
mem[i]);
if (task != null) {
cpus[i] -= getResource(task, "cpus");
mem[i] -= getResource(task, "mem");
replies.get(i).add(task);
} else {
it.remove();
}
}
}
for (int i = 0; i < numOffers; i++) {
OfferID offerId = offers.get(i).getId();
Status status = d.launchTasks(offerId, replies.get(i));
if (status != Status.DRIVER_RUNNING) {
LOG.warn("SchedulerDriver returned irregular
status: " + status);
}
}
}
} catch(Exception e) {
LOG.error("Error in resourceOffer",
e);
}
} |
这里代码的长度也不是很长,我们一步一步来分析,首先SchedulerProcess调用了本函数,将携带资源的offer传递了过来。
(一)创建了两个数组cpus和mem,这两个数组下标跟offers list的下标是一致的,用他们保存了list中对应下标offer的cpu和mem的数量。
(二)为这个offers list创建了一个索引数组indices,其中保存的是offers
list中的下标。
(三)创建了List<List<TaskInfo>>replies,其中存放的是分配给每个offer的所有任务信息的列表。
(四)按照indices中的顺序遍历,整个offers list,调用了findTask(
offer.getSlaveId(), offer.getHostname(),
cpus[i], mem[i])
我们重点看下这个函数的工作方式:
// Find a single task for a given node. Assumes JobTracker is locked. private TaskInfo findTask( SlaveID slaveId, String host, double cpus, double mem) { if (cpus < cpusPerTask || mem < memPerTask) { return null; // Too few resources are left on the node }
TaskTrackerInfo ttInfo = getTaskTrackerInfo(host,
slaveId);
// Pick whether to launch a map or a reduce
based on available tasks
String taskType = null;
boolean haveMaps = canLaunchMap(host);
boolean haveReduces = canLaunchReduce(host);
//LOG.info("Looking at " + host + ":
haveMaps=" + haveMaps +
// ", haveReduces=" + haveReduces);
if (!haveMaps && !haveReduces) {
return null;
} else if (haveMaps && !haveReduces) {
taskType = "map";
} else if (haveReduces && !haveMaps) {
taskType = "reduce";
} else {
float mapToReduceRatio = 1;
if (ttInfo.reduces.size() < ttInfo.maps.size()
/ mapToReduceRatio)
taskType = "reduce";
else
taskType = "map";
}
//LOG.info("Task type chosen: " + taskType);
// Get a Mesos task ID for the new task
TaskID mesosId = newMesosTaskId();
// Remember that it is launched
boolean isMap = taskType.equals("map");
if (isMap) {
unassignedMaps++;
} else {
unassignedReduces++;
}
MesosTask nt = new MesosTask(isMap, mesosId, host);
mesosIdToMesosTask.put(mesosId, nt);
ttInfo.add(nt);
LOG.info("Launching Mesos task " +
mesosId.getValue() +
" as " + taskType + " on "
+ host);
// Create a task description to pass back to
Mesos.
return TaskInfo.newBuilder()
.setTaskId(mesosId)
.setSlaveId(slaveId)
.setName("task " + mesosId.getValue()
+ " (" + taskType + ")")
.addResources(makeResource("cpus", cpusPerTask))
.addResources(makeResource("mem", memPerTask))
.setExecutor(getExecutorInfo())
.build();
} |
(1)判断该节点剩余的资源是否够分配一个任务
(2)获取该节点主机名对应的TaskTrackerInfo信息,这里TaskTracker仅仅是mesos中用来管理逻辑分配的一个数据结构,跟Hadoop没有关系
(3)那么我们如何获取到hadoop的job的task信息呢?我们来分析另外两个非常重要的函数
private boolean canLaunchMap(String host) { // Check whether the TT is saturated on maps TaskTrackerInfo ttInfo = ttInfos.get(host); if (ttInfo == null) { throw new RuntimeException("Expecting TaskTrackerInfo for host " + host); }
if (ttInfo.maps.size() >= ttInfo.maxMaps)
{
return false;
}
// Compute the total demand for maps to make
sure we don't exceed it
Collection<JobInProgress> jobs = jobTracker.jobs.values();
int neededMaps = 0;
for (JobInProgress job : jobs) {
if (job.getStatus().getRunState() == JobStatus.RUNNING)
{
neededMaps += job.pendingMaps();
}
}
// TODO (!!!): Count speculatable tasks and add
them to neededMaps
// For now, we just add 1
if (jobs.size() > 0)
neededMaps += 1;
if (unassignedMaps < neededMaps) {
// 0. check for a failed map task to place. These
tasks are not included
// in the "normal" lists of tasks in
the JobInProgress object.
for (JobInProgress job: jobs) {
int state = job.getStatus().getRunState();
if (job.failedMaps != null && state ==
JobStatus.RUNNING) {
for (TaskInProgress tip : job.failedMaps) {
if (!tip.hasFailedOnMachine(host)) {
return true;
}
}
}
}
int maxLevel = Integer.MAX_VALUE;
// Look for a map with the required level
for (JobInProgress job: jobs) {
int state = job.getStatus().getRunState();
if (state == JobStatus.RUNNING) {
int availLevel = availableMapLevel(job, host,
maxLevel);
if (availLevel != -1) {
lastMapWasLocal = (availLevel == 0);
return true;
}
}
}
} |
1、从FrameworkSheduler保存的表:Map<String, TaskTrackerInfo>
ttInfos中获取对应于该slave节点的TaskTrackerInfo,TaskTrackInfo中记载了什么东西呢?
private static class TaskTrackerInfo { SlaveID mesosSlaveId; List<MesosTask> maps = new LinkedList<MesosTask>(); List<MesosTask> reduces = new LinkedList<MesosTask>(); int maxMaps = 1; int maxReduces = 1;
public TaskTrackerInfo(SlaveID mesosSlaveId)
{
this.mesosSlaveId = mesosSlaveId;
}
void add(MesosTask nt) {
if (nt.isMap)
maps.add(nt);
else
reduces.add(nt);
}
public void remove(MesosTask nt) {
if (nt.isMap)
maps.remove(nt);
else
reduces.remove(nt);
} |
这是TaskTrackInfo的定义,它是FrameworkScheduler的一个内部类,记录它所属的Slave节点的Slaveid
在该slave节点上属于该计算框架的map和reduce任务的列表,这些任务的标识是mesos内部产生的,对于计算框架是透明的。
里面定义了最大map和reduce队列的长度杀掉超时的任务的方法。
2、获取了jobtracker上已经初始化的jobs的列表(这里非常的重要,就是在这个地方,mesos从hadoop中把用户提交的jobs拉了过来),遍列列表中的没有个JobInprogress,计算所有的job需要的map任务的总和neededmaps。
3、遍历jobs的列表中的每一个job,优先处理job的failmaps列表中任务,failmaps列表中的每一个任务的执行失败的主机列表,如果没有该salve节点,就返回true表明可以lauch一个map任务。
我们分析canLaunchMap这函数的目的是找到mesos跟hadoop交互的那个点,对于具体的任务查找方式,大家可以忽略,因为这些东西都是可以修改的。我们只需要记住就是在findTask->canLaunchMap之中,我们实现的基于mesos的调度器,去jobtracker上获取jobs的信息。
我们回到findTask函数之中,对于canLauchReduce的处理,大家可以自行解读,其实跟canLaunchMap大同小异。
(4)我们知道在该节点上是应该启动一个map和reduce任务之后,就产生一个新的TaskID为这个新任务。更新unassignedMaps或者unassignedReduces两个计数器。创建一个新的MesosTask对象:
MesosTask nt = new MesosTask(isMap, mesosId, host);将其加入到Map<TaskID,
MesosTask> mesosIdToMesosTask这张表之中,便于查找。将该MesosTask加入到该salve节点对应TaskTrackInfo之中。同时返回一个TaskInfo对象,其中记录了任务的mesosid,slaveid,executorinfo等信息
ok,现在我们回到再上一层的resourceOffers函数:
(五)我们通过findtask()函数获取到了一个taskInfo,如果该TaskInfo不为空,我们继续寻找,为空说明资源已经使用完毕或者hadoop任务task都已经分配完毕,应该从indices之中将对应于该offer的下标擦除。
整个循环执行完毕之后,replies之中就记录了对应于每个offer的需要启动的任务信息的一个列表,但是这些任务信息都现在为止还仅仅是一个标志(我应该在哪个slave节点上,启动一个什么框架的什么任务(map或者reduce),并没有具体任务的信息)
(六)接下来就是遍历replies,去启动这些任务,调用了SchedulerDriver.launchTasks()方法。调用我们之前sched.cpp之中launchTasks()方法,该方法会产生一条LanuchTasksMessage消息中包含了frameworkid、offerId、所有在该slave节点上的task的信息,一起发送给了Mesos
master
(七)master接受到了LauchTasksMessage之后会调用这个消息的处理函数launchTasks。
void Master::launchTasks(const FrameworkID& frameworkId, const OfferID& offerId, const vector<TaskInfo>& tasks, const Filters& filters) { Framework* framework = getFramework(frameworkId); if (framework != NULL) { // TODO(benh): Support offer "hoarding" and allow multiple offers // *from the same slave* to be used to launch tasks. This can be // accomplished rather easily by collecting and merging all offers // into a mega-offer and passing that offer to // Master::processTasks. Offer* offer = getOffer(offerId); if (offer != NULL) { CHECK(offer->framework_id() == frameworkId); Slave* slave = getSlave(offer->slave_id()); CHECK(slave != NULL) << "An offer should not outlive a slave!"; processTasks(offer, framework, slave, tasks, filters); } else { // The offer is gone (possibly rescinded, lost slave, re-reply // to same offer, etc). Report all tasks in it as failed. // TODO: Consider adding a new task state TASK_INVALID for // situations like these. LOG(WARNING) << "Offer " << offerId << " is no longer valid"; foreach (const TaskInfo& task, tasks) { StatusUpdateMessage message; StatusUpdate* update = message.mutable_update(); update->mutable_framework_id()->MergeFrom(frameworkId); TaskStatus* status = update->mutable_status(); status->mutable_task_id()->MergeFrom(task.task_id()); status->set_state(TASK_LOST); status->set_message("Task launched with invalid offer"); update->set_timestamp(Clock::now()); update->set_uuid(UUID::random().toBytes()); send(framework->pid, message); } } } } |
【1】根据frameworkId去master的hashmap<FrameworkID, Framework*>frameworks中取出对应的framework对象,通过offerId获取了该offer的所有信息,调用了processTasks函数。
【2】该函数主要作用统计offer之中的资源使用情况,将其报告给allocator更新资源,调用了launchTask()我们下一节分析任务加载的过程。
总结一下:我们可以看到,其实在整个分配资源的过程之中,mesos用一些特定的数据结构,对于任务、资源、计算框架进行了逻辑上的标记、然后等逻辑上划分好了之后呢?然后再去通知真正的干事的程序去执行,这些逻辑上的数据结构跟实际的数据结构之间的对应关系设计的十分的巧妙,使得整个的调度变得十分的灵巧,减少了通讯的开销! |