Flink 1.1 – ResourceManager
The role of the Flink resource manager is shown in the following figure.
FlinkResourceManager
/**
 *
 * <h1>Worker allocation steps</h1>
 *
 * <ol>
 * <li>The resource manager decides to request more workers. This can happen in order
 * to fill the initial pool, or as a result of the JobManager requesting more workers.</li>
 *
 * <li>The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
 * for more containers. After that, the {@link #getNumWorkerRequestsPending()}
 * should reflect the pending requests.</li>
 *
 * <li>The concrete framework may acquire containers and then trigger to start TaskManagers
 * in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li>
 *
 * <li>At some point, the TaskManager processes will have started and send a registration
 * message to the JobManager. The JobManager will perform
 * a lookup with the ResourceManager to check if it really started this TaskManager.
 * The method {@link #workerStarted(ResourceID)} will be called
 * to inform about a registered worker.</li>
 * </ol>
 *
 */
public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor {

    /** The service to find the right leader JobManager (to support high availability) */
    private final LeaderRetrievalService leaderRetriever; // used to discover the leader JobManager and to get notified when the leader changes

    /** Map which contains the workers from which we know that they have been successfully started
     * in a container. This notification is sent by the JM when a TM tries to register at it. */
    private final Map<ResourceID, WorkerType> startedWorkers; // workers that have started successfully; the JM sends this notification when a TM registers at it

    /** The JobManager that the framework master manages resources for */
    private ActorRef jobManager;

    /** Our JobManager's leader session */
    private UUID leaderSessionID;

    /** The size of the worker pool that the resource master strives to maintain */
    private int designatedPoolSize; // size of the resource pool
The Javadoc above lays out the worker allocation steps quite clearly.
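To make those steps concrete, here is a minimal, hypothetical sketch of how a container-framework integration might track the two pending counters and answer the workerStarted lookup. This class does not exist in Flink; all names and the framework interaction are illustrative only, and only the counters that checkWorkersPool (shown further below) compares against the designated pool size are modeled.

// Hypothetical sketch only: NOT a real Flink class. It mirrors the four hooks named in the
// Javadoc above; the container-framework calls are stand-ins.
import java.util.HashSet;
import java.util.Set;

class SketchContainerResourceManager {

    // step 2: containers requested from the framework but not yet acquired
    private int numPendingContainerRequests;

    // step 3: containers acquired and TaskManagers launched, but not yet registered at the JM
    private final Set<String> workersPendingRegistration = new HashSet<>();

    /** Step 2: ask the underlying framework for more containers. */
    public void requestNewWorkers(int numWorkers) {
        numPendingContainerRequests += numWorkers;
        // framework-specific request would go here, e.g. ask the cluster manager for containers
    }

    public int getNumWorkerRequestsPending() {
        return numPendingContainerRequests;
    }

    /** Step 3: called when a container was acquired and a TaskManager was launched in it. */
    public void containerLaunched(String resourceId) {
        numPendingContainerRequests--;
        workersPendingRegistration.add(resourceId);
    }

    public int getNumWorkersPendingRegistration() {
        return workersPendingRegistration.size();
    }

    /** Step 4: the JobManager asks whether this resource manager really started the TaskManager. */
    public String workerStarted(String resourceId) {
        // return a non-null worker handle only if we actually launched this container
        return workersPendingRegistration.remove(resourceId) ? resourceId : null;
    }
}

The point of the sketch is that the framework integration only has to keep these counters consistent; the base class drives the allocation logic from them.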
As an actor, the ResourceManager mainly handles messages:
@Override
protected void handleMessage(Object message) {
    try {
        // --- messages about worker allocation and pool sizes

        if (message instanceof CheckAndAllocateContainers) {
            checkWorkersPool();
        }
        else if (message instanceof SetWorkerPoolSize) {
            SetWorkerPoolSize msg = (SetWorkerPoolSize) message;
            adjustDesignatedNumberOfWorkers(msg.numberOfWorkers());
        }
        else if (message instanceof RemoveResource) {
            RemoveResource msg = (RemoveResource) message;
            removeRegisteredResource(msg.resourceId());
        }

        // --- lookup of registered resources

        else if (message instanceof NotifyResourceStarted) {
            NotifyResourceStarted msg = (NotifyResourceStarted) message;
            handleResourceStarted(sender(), msg.getResourceID());
        }

        // --- messages about JobManager leader status and registration

        else if (message instanceof NewLeaderAvailable) {
            NewLeaderAvailable msg = (NewLeaderAvailable) message;
            newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());
        }
        else if (message instanceof TriggerRegistrationAtJobManager) {
            TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager) message;
            triggerConnectingToJobManager(msg.jobManagerAddress());
        }
        else if (message instanceof RegisterResourceManagerSuccessful) {
            RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful) message;
            jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers());
        }
The key handlers are the following.
checkWorkersPool
/**
 * This method causes the resource framework master to <b>synchronously</b> re-examine
 * the set of available and pending workers containers, and allocate containers
 * if needed.
 *
 * This method does not automatically release workers, because it is not visible to
 * this resource master which workers can be released. Instead, the JobManager must
 * explicitly release individual workers.
 */
private void checkWorkersPool() {
    int numWorkersPending = getNumWorkerRequestsPending();
    int numWorkersPendingRegistration = getNumWorkersPendingRegistration();

    // see how many workers we want, and whether we have enough
    int allAvailableAndPending = startedWorkers.size() +
        numWorkersPending + numWorkersPendingRegistration;

    int missing = designatedPoolSize - allAvailableAndPending;

    if (missing > 0) {
        requestNewWorkers(missing); // if the existing workers are not enough, request new ones
    }
}
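A quick worked example: with designatedPoolSize = 10, 4 started workers, 3 pending container requests, and 1 worker pending registration, allAvailableAndPending is 4 + 3 + 1 = 8, so checkWorkersPool requests 2 additional workers.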
After the JobManager receives a TaskManager's registration message, it notifies the ResourceManager, which ends up calling handleResourceStarted:
/**
 * Tells the ResourceManager that a TaskManager had been started in a container with the given
 * resource id.
 *
 * @param jobManager The sender (JobManager) of the message
 * @param resourceID The resource id of the started TaskManager
 */
private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {
    if (resourceID != null) {
        // check if resourceID is already registered (TaskManager may send duplicate register messages)
        WorkerType oldWorker = startedWorkers.get(resourceID);
        if (oldWorker != null) { // this worker is already known
            LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);
        } else {
            WorkerType newWorker = workerStarted(resourceID); // obtain the worker handle

            if (newWorker != null) {
                startedWorkers.put(resourceID, newWorker); // record the new worker
                LOG.info("TaskManager {} has started.", resourceID);
            } else {
                LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);
            }
        }
    }

    // Acknowledge the resource registration
    jobManager.tell(decorateMessage(Acknowledge.get()), self()); // tell the JobManager that the lookup is done
}
Now for the job's resource allocation process. In submitJob, the ExecutionGraph is built, and scheduling is eventually kicked off by:
executionGraph.scheduleForExecution(scheduler)
Then, in ExecutionGraph:
public void scheduleForExecution(SlotProvider slotProvider) throws JobException {

    // simply take the vertices without inputs.
    for (ExecutionJobVertex ejv : this.tasks.values()) {
        if (ejv.getJobVertex().isInputVertex()) {
            ejv.scheduleAll(slotProvider, allowQueuedScheduling);
        }
    }
Next, in ExecutionJobVertex:
public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    ExecutionVertex[] vertices = this.taskVertices;

    // kick off the tasks
    for (ExecutionVertex ev : vertices) {
        ev.scheduleForExecution(slotProvider, queued);
    }
}
Then, in ExecutionVertex:
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    return this.currentExecution.scheduleForExecution(slotProvider, queued);
}
And finally, in Execution:
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    if (transitionState(CREATED, SCHEDULED)) {

        ScheduledUnit toSchedule = locationConstraint == null ?
            new ScheduledUnit(this, sharingGroup) :
            new ScheduledUnit(this, sharingGroup, locationConstraint);

        // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
        // in all cases where the deployment failed. we use many try {} finally {} clauses to assure that

        final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // request the slot asynchronously

        // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
        // that we directly deploy the tasks if the slot allocation future is completed. This is
        // necessary for immediate deployment.
        final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
            @Override
            public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                if (simpleSlot != null) {
                    try {
                        deployToSlot(simpleSlot); // a slot was allocated, deploy to it
                    } catch (Throwable t) {
                        try {
                            simpleSlot.releaseSlot();
                        } finally {
                            markFailed(t);
                        }
                    }
                }
                else {
                    markFailed(throwable);
                }
                return null;
            }
        });

        return true;
    }
This calls slotProvider.allocateSlot, where the slotProvider is the Scheduler:
@Override
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
        throws NoResourceAvailableException {

    final Object ret = scheduleTask(task, allowQueued);
    if (ret instanceof SimpleSlot) {
        return FlinkCompletableFuture.completed((SimpleSlot) ret); // a SimpleSlot means allocation already succeeded, so return a completed future
    }
    else if (ret instanceof Future) {
        return (Future) ret; // a Future means there is no free resource yet and the request is still pending asynchronously
    }
    else {
        throw new RuntimeException();
    }
}
scheduleTask
/**
 * Returns either a {@link SimpleSlot}, or a {@link Future}.
 */
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {

    final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

    final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
    final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
            preferredLocations != null && preferredLocations.iterator().hasNext();

    synchronized (globalLock) { // global lock

        SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

        if (sharingUnit != null) { // the shared-slot case

            // 1) === If the task has a slot sharing group, schedule with shared slots ===

            final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
            final CoLocationConstraint constraint = task.getLocationConstraint();

            // get a slot from the group, if the group has one for us (and can fulfill the constraint)
            final SimpleSlot slotFromGroup;
            if (constraint == null) {
                slotFromGroup = assignment.getSlotForTask(vertex); // try to find a suitable slot among the existing shared slots
            }
            else {
                slotFromGroup = assignment.getSlotForTask(vertex, constraint);
            }

            SimpleSlot newSlot = null;
            SimpleSlot toUse = null;

            // the following needs to make sure any allocated slot is released in case of an error
            try {

                // check whether the slot from the group is already what we want.
                // any slot that is local, or where the assignment was unconstrained is good!
                if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { // a suitable slot was found
                    updateLocalityCounters(slotFromGroup, vertex);
                    return slotFromGroup; // return the suitable slot
                }

                // the group did not have a local slot for us. see if we can one (or a better one)
                newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); // try to allocate a new slot

                if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                    // if there is no slot from the group, or the new slot is local,
                    // then we use the new slot
                    if (slotFromGroup != null) {
                        slotFromGroup.releaseSlot();
                    }
                    toUse = newSlot;
                }
                else {
                    // both are available and usable. neither is local. in that case, we may
                    // as well use the slot from the sharing group, to minimize the number of
                    // instances that the job occupies
                    newSlot.releaseSlot();
                    toUse = slotFromGroup;
                }

                // if this is the first slot for the co-location constraint, we lock
                // the location, because we are going to use that slot
                if (constraint != null && !constraint.isAssigned()) {
                    constraint.lockLocation();
                }

                updateLocalityCounters(toUse, vertex);
            }

            return toUse; // return the allocated slot
        }
        else { // not a shared slot; the simpler case

            // 2) === schedule without hints and sharing ===

            SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); // allocate a slot directly
            if (slot != null) {
                updateLocalityCounters(slot, vertex);
                return slot; // if allocation succeeded, return the slot
            }
            else {
                // no resource available now, so queue the request
                if (queueIfNoResource) { // queueing is allowed
                    CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
                    this.taskQueue.add(new QueuedTask(task, future)); // queue the task and return the future, i.e. the request stays pending asynchronously
                    return future;
                }
            }
        }
    }
}
Let's go straight to the non-shared-slot case, which calls getFreeSlotForTask:
/**
 * Gets a suitable instance to schedule the vertex execution to.
 * <p>
 * NOTE: This method does is not thread-safe, it needs to be synchronized by the caller.
 *
 * @param vertex The task to run.
 * @return The instance to run the vertex on, it {@code null}, if no instance is available.
 */
protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
                                        Iterable<TaskManagerLocation> requestedLocations,
                                        boolean localOnly) {
    // we need potentially to loop multiple times, because there may be false positives
    // in the set-with-available-instances
    while (true) {
        Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); // find an instance that has free slots and satisfies the locality constraint

        if (instanceLocalityPair == null){
            return null; // no suitable instance, allocation failed
        }

        Instance instanceToUse = instanceLocalityPair.getLeft();
        Locality locality = instanceLocalityPair.getRight();

        try {
            SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId()); // allocate a slot from the instance

            // if the instance has further available slots, re-add it to the set of available resources.
            if (instanceToUse.hasResourcesAvailable()) { // if the instance still has resources, put it back into instancesWithAvailableResources for later allocations
                this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
            }

            if (slot != null) {
                slot.setLocality(locality);
                return slot; // success, return the slot
            }
        }
        catch (InstanceDiedException e) {
            // the instance died it has not yet been propagated to this scheduler
            // remove the instance from the set of available instances
            removeInstance(instanceToUse);
        }

        // if we failed to get a slot, fall through the loop
    }
}
findInstance
/**
 * Tries to find a requested instance. If no such instance is available it will return a non-
 * local instance. If no such instance exists (all slots occupied), then return null.
 *
 * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p>
 *
 * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that
 *                           no locality preference exists.
 * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.
 */
private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {

    // drain the queue of newly available instances
    while (this.newlyAvailableInstances.size() > 0) { // move newlyAvailableInstances into instancesWithAvailableResources
        Instance queuedInstance = this.newlyAvailableInstances.poll();
        if (queuedInstance != null) {
            this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
        }
    }

    // if nothing is available at all, return null
    if (this.instancesWithAvailableResources.isEmpty()) { // no instance with available resources, fail right away
        return null;
    }

    Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();

    if (locations != null && locations.hasNext()) { // walk through the locality preferences and look for a matching instance
        // we have a locality preference

        while (locations.hasNext()) {
            TaskManagerLocation location = locations.next();
            if (location != null) {
                Instance instance = instancesWithAvailableResources.remove(location.getResourceID());
                if (instance != null) {
                    return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
                }
            }
        }

        // no local instance available
        if (localOnly) {
            return null;
        }
        else {
            // take the first instance from the instances with resources
            Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
            Instance instanceToUse = instances.next();
            instances.remove();

            return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL);
        }
    }
    else {
        // no location preference, so use some instance
        Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
        Instance instanceToUse = instances.next();
        instances.remove();

        return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED);
    }
}
So where does newlyAvailableInstances come from?
@Override
public void newInstanceAvailable(Instance instance) {

    // synchronize globally for instance changes
    synchronized (this.globalLock) {

        try {
            // make sure we get notifications about slots becoming available
            instance.setSlotAvailabilityListener(this); // register the Scheduler as this instance's SlotAvailabilityListener

            // store the instance in the by-host-lookup
            String instanceHostName = instance.getTaskManagerLocation().getHostname();
            Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
            if (instanceSet == null) {
                instanceSet = new HashSet<Instance>();
                allInstancesByHost.put(instanceHostName, instanceSet);
            }
            instanceSet.add(instance);

            // add it to the available resources and let potential waiters know
            this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // add to instancesWithAvailableResources

            // add all slots as available
            for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
                newSlotAvailable(instance);
            }
        }
    }
}
@Override
public void newSlotAvailable(final Instance instance) {

    // WARNING: The asynchrony here is necessary, because we cannot guarantee the order
    // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:
    //
    // -> The scheduler needs to grab them (1) global scheduler lock
    //                                     (2) slot/instance lock
    // -> The slot releasing grabs (1) slot/instance (for releasing) and
    //                             (2) scheduler (to check whether to take a new task item
    //
    // that leads with a high probability to deadlocks, when scheduling fast

    this.newlyAvailableInstances.add(instance); // add to newlyAvailableInstances

    Futures.future(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            handleNewSlot(); // asynchronously serve queued tasks once a new slot is available
            return null;
        }
    }, executionContext);
}
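handleNewSlot itself is not quoted above. Roughly, it pairs a newly available instance with the oldest queued request and completes that request's future. The following is a simplified sketch of that logic, not the verbatim Flink source; the QueuedTask accessors (getTask(), getFuture()) are assumed from the new QueuedTask(task, future) construction shown in scheduleTask above.

// Simplified sketch (not the verbatim Flink source) of what handleNewSlot does:
// take a newly available instance and try to serve the head of the task queue with it.
private void handleNewSlot() {
    synchronized (globalLock) {
        Instance instance = this.newlyAvailableInstances.poll();
        if (instance == null || !instance.hasResourcesAvailable()) {
            // the instance was already drained by findInstance(), or has no free slot any more
            return;
        }

        QueuedTask queued = this.taskQueue.peek();
        if (queued == null) {
            // nobody is waiting: simply make the instance available for later scheduling
            this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
            return;
        }

        try {
            // assumed accessors, see the lead-in above
            ExecutionVertex vertex = queued.getTask().getTaskToExecute().getVertex();
            SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
            if (newSlot != null) {
                // hand the slot to the waiting request and complete its future
                this.taskQueue.poll();
                queued.getFuture().complete(newSlot);
            }
        } catch (InstanceDiedException e) {
            removeInstance(instance);
        }
    }
}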
newInstanceAvailable, in turn, is called from the InstanceManager:
private void notifyNewInstance(Instance instance) {
    synchronized (this.instanceListeners) {
        for (InstanceListener listener : this.instanceListeners) {
            try {
                listener.newInstanceAvailable(instance);
            }
            catch (Throwable t) {
                LOG.error("Notification of new instance availability failed.", t);
            }
        }
    }
}
notifyNewInstance is called from registerTaskManager, and registerTaskManager is invoked in the JobManager when a TaskManager registers:
case msg @ RegisterTaskManager(
      resourceId,
      connectionInfo,
      hardwareInformation,
      numberOfSlots) =>

  val taskManager = sender()

  currentResourceManager match {
    case Some(rm) => // a ResourceManager is connected
      val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout) // notify the ResourceManager that this resource has started successfully
  }

  // ResourceManager is told about the resource, now let's try to register TaskManager
  if (instanceManager.isRegistered(resourceId)) { // already registered
    val instanceID = instanceManager.getRegisteredInstance(resourceId).getId

    taskManager ! decorateMessage(
      AlreadyRegistered(
        instanceID,
        libraryCacheManager.getBlobServerPort))
  } else { // a new resource
    try {
      val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull)
      val taskManagerGateway = new ActorTaskManagerGateway(actorGateway)

      val instanceID = instanceManager.registerTaskManager( // register this TaskManager at the InstanceManager
        taskManagerGateway,
        connectionInfo,
        hardwareInformation,
        numberOfSlots)

      taskManagerMap.put(taskManager, instanceID) // record this TaskManager inside the JobManager

      taskManager ! decorateMessage(
        AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)) // tell the TaskManager that registration completed

      // to be notified when the taskManager is no longer reachable
      context.watch(taskManager)
    }
  }
This version does not implement the architecture shown in the figure above: the TaskManager still registers with the JobManager, and the JobManager then notifies the ResourceManager.
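Putting the registration path together, based on the handlers shown above, the message flow in this version is roughly:

1. TaskManager → JobManager: RegisterTaskManager(resourceId, connectionInfo, hardwareInformation, numberOfSlots)
2. JobManager → ResourceManager: NotifyResourceStarted(resourceId)
3. ResourceManager: looks up the worker via workerStarted(resourceId), records it in startedWorkers, and replies Acknowledge to the JobManager
4. JobManager: registers the TaskManager at the InstanceManager (which triggers notifyNewInstance and Scheduler.newInstanceAvailable) and replies AcknowledgeRegistration (or AlreadyRegistered) to the TaskManager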
For now the ResourceManager only keeps records of started resources; it has not been split out of the JobManager.
The architecture is therefore still the old one, as shown below.