Flink – InstanceManager
InstanceManager manages the TaskManager and slot resources that the JobManager has acquired.
/**
 * Simple manager that keeps track of which TaskManagers are available and alive.
 */
public class InstanceManager {

    // ------------------------------------------------------------------------
    // Fields
    // ------------------------------------------------------------------------

    // Instances are indexed both by InstanceID and by ResourceID.

    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
    private final Map<InstanceID, Instance> registeredHostsById;

    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ResourceID). */
    private final Map<ResourceID, Instance> registeredHostsByResource;

    /** Set of hosts that were present once and have died */
    private final Set<ResourceID> deadHosts;

    /** Listeners that want to be notified about availability and disappearance of instances */
    // These are the parties to notify when instance resources change, e.g. the Scheduler.
    private final List<InstanceListener> instanceListeners = new ArrayList<>();

    /** The total number of task slots that the system has */
    private int totalNumberOfAliveTaskSlots;

    // ... (excerpt: remaining fields and methods not shown)
}
The key operation:
registerTaskManager
/**
 * Registers a task manager. Registration of a task manager makes it available to be used
 * for the job execution.
 *
 * @param taskManagerGateway gateway to the task manager
 * @param taskManagerLocation Location info of the TaskManager
 * @param resources Hardware description of the TaskManager
 * @param numberOfSlots Number of available slots on the TaskManager
 * @return The assigned InstanceID of the registered task manager
 */
public InstanceID registerTaskManager(
        TaskManagerGateway taskManagerGateway,
        TaskManagerLocation taskManagerLocation,
        HardwareDescription resources,
        int numberOfSlots) {

    synchronized (this.lock) {
        InstanceID instanceID = new InstanceID();

        // create the new Instance
        Instance host = new Instance(
            taskManagerGateway,
            taskManagerLocation,
            instanceID,
            resources,
            numberOfSlots);

        // register it under both indexes
        registeredHostsById.put(instanceID, host);
        registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);

        totalNumberOfAliveTaskSlots += numberOfSlots;

        host.reportHeartBeat();

        // notify all listeners (for example the scheduler)
        notifyNewInstance(host);

        return instanceID;
    }
}
Here, notifyNewInstance is:
private void notifyNewInstance(Instance instance) {
    synchronized (this.instanceListeners) {
        for (InstanceListener listener : this.instanceListeners) {
            try {
                // invoke the listener's newInstanceAvailable callback
                listener.newInstanceAvailable(instance);
            }
            catch (Throwable t) {
                LOG.error("Notification of new instance availability failed.", t);
            }
        }
    }
}
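The registration and notification flow above can be sketched in isolation. This is a minimal, simplified model and not Flink's actual code: `SketchInstanceManager`, `SketchInstanceListener`, and the string-based IDs are hypothetical stand-ins, and an instance is reduced to its slot count. It shows the two points that matter: the slot total is updated under the lock, and a listener that throws does not prevent the remaining listeners from being notified.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for InstanceListener; only the "new instance" callback is modeled. */
interface SketchInstanceListener {
    void newInstanceAvailable(String instanceId);
}

/** Simplified registry: IDs are plain strings and an instance is just its slot count. */
class SketchInstanceManager {
    private final Object lock = new Object();
    private final Map<String, Integer> slotsById = new HashMap<>();
    private final List<SketchInstanceListener> listeners = new ArrayList<>();
    private int totalSlots;

    void addListener(SketchInstanceListener listener) {
        synchronized (listeners) {
            listeners.add(listener);
        }
    }

    /** Registers an instance, updates the slot total, then notifies all listeners. */
    String register(String resourceId, int numberOfSlots) {
        synchronized (lock) {
            String instanceId = "instance-" + resourceId; // assumption: ID derived from the resource ID
            slotsById.put(instanceId, numberOfSlots);
            totalSlots += numberOfSlots;
            notifyNewInstance(instanceId);
            return instanceId;
        }
    }

    private void notifyNewInstance(String instanceId) {
        synchronized (listeners) {
            for (SketchInstanceListener listener : listeners) {
                try {
                    listener.newInstanceAvailable(instanceId);
                } catch (Throwable t) {
                    // A failing listener must not block the others or fail the registration.
                    System.err.println("Notification of new instance failed: " + t);
                }
            }
        }
    }

    int getTotalSlots() {
        synchronized (lock) {
            return totalSlots;
        }
    }
}

class SketchRegistrationDemo {
    public static void main(String[] args) {
        SketchInstanceManager manager = new SketchInstanceManager();
        manager.addListener(x -> System.out.println("scheduler sees " + x));
        manager.register("tm-1", 4);
        System.out.println("total slots: " + manager.getTotalSlots());
    }
}
```

Catching `Throwable` per listener mirrors the original method: one misbehaving listener is logged and skipped instead of aborting the registration.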
Instance
As its comment says, an Instance is an abstraction that describes a TaskManager registered with the JobManager and ready to accept work.
/**
 * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
 * registered at a JobManager and ready to receive work.
 */
public class Instance implements SlotOwner {

    /** The instance gateway to communicate with the instance */
    private final TaskManagerGateway taskManagerGateway;

    /** The instance connection information for the data transfer. */
    private final TaskManagerLocation location;

    /** A description of the resources of the task manager */
    private final HardwareDescription resources;

    /** The ID identifying the taskManager. */
    private final InstanceID instanceId;

    /** The number of task slots available on the node */
    private final int numberOfSlots;

    /** A list of available slot positions */
    // Note: this holds slot positions, not Slot objects; a Slot is only created when it is allocated.
    private final Queue<Integer> availableSlots;

    /** Allocated slots on this taskManager */
    private final Set<Slot> allocatedSlots = new HashSet<Slot>();

    /** A listener to be notified upon new slot availability */
    // Notified when slot availability changes.
    private SlotAvailabilityListener slotAvailabilityListener;

    /** Time when the last heartbeat has been received from the task manager running on this taskManager. */
    private volatile long lastReceivedHeartBeat = System.currentTimeMillis();

    // ... (excerpt: remaining fields and methods not shown)
}
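The `lastReceivedHeartBeat` field suggests how liveness can be tracked. The sketch below is an assumption about how such a timestamp is typically used, not something the excerpt shows: `SketchHeartbeat`, its `isStillAlive` method, and the timeout parameter are hypothetical, and the clock is passed in explicitly so the behavior is deterministic.

```java
/** Hypothetical heartbeat tracker; the timeout policy is an assumption, not taken from the excerpt. */
class SketchHeartbeat {
    // volatile so that a monitoring thread sees the latest timestamp without locking
    private volatile long lastReceivedHeartBeat;

    SketchHeartbeat(long nowMillis) {
        this.lastReceivedHeartBeat = nowMillis;
    }

    /** Records that a heartbeat arrived at the given time. */
    void reportHeartBeat(long nowMillis) {
        this.lastReceivedHeartBeat = nowMillis;
    }

    /** True while the last heartbeat is more recent than the timeout. */
    boolean isStillAlive(long nowMillis, long timeoutMillis) {
        return lastReceivedHeartBeat + timeoutMillis > nowMillis;
    }
}

class SketchHeartbeatDemo {
    public static void main(String[] args) {
        SketchHeartbeat heartbeat = new SketchHeartbeat(0L);
        System.out.println(heartbeat.isStillAlive(5_000L, 10_000L));
        System.out.println(heartbeat.isStillAlive(15_000L, 10_000L));
    }
}
```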
The core operations:
Allocating a slot
/**
 * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot
 * is available at the moment.
 *
 * @param jobID The ID of the job that the slot is allocated for.
 *
 * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
 *         TaskManager instance has no more slots available.
 *
 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the
 *                               slot is allocated.
 */
public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {

    synchronized (instanceLock) {
        // check whether a free slot position is available
        Integer nextSlot = availableSlots.poll();
        if (nextSlot == null) {
            return null;
        }
        else {
            SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);
            allocatedSlots.add(slot);
            return slot;
        }
    }
}
Returning a slot
/**
 * Returns a slot that has been allocated from this instance. The slot needs to have been canceled
 * prior to calling this method.
 *
 * <p>The method will transition the slot to the "released" state. If the slot is already in state
 * "released", this method will do nothing.</p>
 *
 * @param slot The slot to return.
 * @return True, if the slot was returned, false if not.
 */
@Override
public boolean returnAllocatedSlot(Slot slot) {

    if (slot.markReleased()) {
        LOG.debug("Return allocated slot {}.", slot);
        synchronized (instanceLock) {

            if (this.allocatedSlots.remove(slot)) {
                this.availableSlots.add(slot.getSlotNumber());

                if (this.slotAvailabilityListener != null) {
                    // signal that a slot is available again
                    this.slotAvailabilityListener.newSlotAvailable(this);
                }

                return true;
            }
        }
    }

    // Not returned: the slot was already released, or it is not tracked in this abridged excerpt.
    return false;
}
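The allocate and return cycle can be modeled with a queue of free positions and a set of positions in use. This is a minimal sketch under simplifying assumptions, not Flink's implementation: `SketchSlotPool` is a hypothetical class, slots are represented by their integer position only, and a plain `Runnable` stands in for SlotAvailabilityListener.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Hypothetical slot pool that tracks positions only, not Slot objects. */
class SketchSlotPool {
    private final Object lock = new Object();
    private final Queue<Integer> availableSlots = new ArrayDeque<>();
    private final Set<Integer> allocatedSlots = new HashSet<>();
    private Runnable slotAvailabilityListener; // stand-in for SlotAvailabilityListener

    SketchSlotPool(int numberOfSlots) {
        for (int i = 0; i < numberOfSlots; i++) {
            availableSlots.add(i);
        }
    }

    void setSlotAvailabilityListener(Runnable listener) {
        synchronized (lock) {
            this.slotAvailabilityListener = listener;
        }
    }

    /** Returns the next free position, or null if every slot is in use. */
    Integer allocate() {
        synchronized (lock) {
            Integer next = availableSlots.poll();
            if (next != null) {
                allocatedSlots.add(next);
            }
            return next;
        }
    }

    /** Puts a position back into the pool; false if it was not allocated from here. */
    boolean release(int position) {
        synchronized (lock) {
            if (!allocatedSlots.remove(Integer.valueOf(position))) {
                return false;
            }
            availableSlots.add(position);
            if (slotAvailabilityListener != null) {
                slotAvailabilityListener.run(); // tell the listener a slot is free again
            }
            return true;
        }
    }
}

class SketchSlotPoolDemo {
    public static void main(String[] args) {
        SketchSlotPool pool = new SketchSlotPool(2);
        pool.setSlotAvailabilityListener(() -> System.out.println("slot available"));
        Integer first = pool.allocate();
        pool.allocate();
        System.out.println("third allocation: " + pool.allocate());
        pool.release(first);
    }
}
```

Tracking positions instead of pre-built slot objects keeps the pool cheap: a slot object only needs to exist while a job actually holds it.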