Flink – window operator
References:
http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/
http://wuchong.me/blog/2016/06/06/flink-internals-session-window/
WindowOperator
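The window operator implements its logic through a WindowAssigner and a Trigger.
When an element arrives, it is first assigned a key by the KeySelector and then assigned to zero or more windows by the WindowAssigner, so the element may be placed into several panes.
A pane holds all elements that share the same key and the same window.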
/**
 * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
 * {@link Trigger}.
 *
 * <p>
 * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
 * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
 * is put into panes. A pane is the bucket of elements that have the same key and same
 * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
 * {@code WindowAssigner}.
 *
 * <p>
 * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
 * the contents of the pane should be processed to emit results. When a trigger fires,
 * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
 * the pane to which the {@code Trigger} belongs.
 *
 * @param <K> The type of key returned by the {@code KeySelector}.
 * @param <IN> The type of the incoming elements.
 * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {

    // ------------------------------------------------------------------------
    // Configuration values and user functions
    // ------------------------------------------------------------------------

    protected final WindowAssigner<? super IN, W> windowAssigner;

    protected final KeySelector<IN, K> keySelector;

    protected final Trigger<? super IN, ? super W> trigger;

    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;

    /**
     * The allowed lateness for elements. This is used for:
     * <ul>
     * <li>Deciding if an element should be dropped from a window due to lateness.
     * <li>Clearing the state of a window if the system time passes the
     * {@code window.maxTimestamp + allowedLateness} landmark.
     * </ul>
     */
    protected final long allowedLateness; // how late an element may be, i.e. after the watermark has already fired the window

    /**
     * To keep track of the current watermark so that we can immediately fire if a trigger
     * registers an event time callback for a timestamp that lies in the past.
     */
    protected transient long currentWatermark = Long.MIN_VALUE;

    protected transient Context context = new Context(null, null); // Trigger context

    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; // only used to obtain getCurrentProcessingTime

    // ------------------------------------------------------------------------
    // State that needs to be checkpointed
    // ------------------------------------------------------------------------

    /**
     * Processing time timers that are currently in-flight.
     */
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; // a Timer stores timestamp, key, and window; the queue is ordered by time

    /**
     * Current waiting watermark callbacks.
     */
    protected transient Set<Timer<K, W>> watermarkTimers;
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;

    protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey; // records the mapping between windows and their state window after merging

    // ...
}
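To make the pane idea concrete, here is a minimal standalone sketch of "one bucket per (key, window)". It is not Flink code: the class `PaneSketch`, its methods, and the choice to identify a window by its start timestamp alone are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of panes, not Flink code: one bucket of elements per (key, window) pair.
// Assumption: windows have a fixed size, so the start timestamp identifies a window.
final class PaneSketch {
    private final Map<String, Map<Long, List<String>>> panes = new HashMap<>();

    // Puts the element into one pane per assigned window.
    void add(String key, String element, List<Long> windowStarts) {
        for (long start : windowStarts) {
            panes.computeIfAbsent(key, k -> new HashMap<>())
                 .computeIfAbsent(start, s -> new ArrayList<>())
                 .add(element);
        }
    }

    // Returns the elements buffered for the pane, or an empty list if there is none.
    List<String> pane(String key, long windowStart) {
        return panes.getOrDefault(key, Map.of()).getOrDefault(windowStart, List.of());
    }

    public static void main(String[] args) {
        PaneSketch sketch = new PaneSketch();
        sketch.add("user-1", "a", List.of(0L, 5L)); // assigned to two windows, so two panes
        sketch.add("user-1", "b", List.of(5L));
        sketch.add("user-2", "c", List.of(5L));     // same window, different key: separate pane
        System.out.println(sketch.pane("user-1", 5L)); // prints [a, b]
    }
}
```

In the real operator the pane contents live in keyed state in the state backend rather than in an in-memory map; the sketch only shows how elements are grouped.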
For the window operator, the most important pieces are the WindowAssigner and the Trigger.
WindowAssigner
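A WindowAssigner decides which windows a tuple should be assigned to.
A figure borrowed from the references shows how many kinds of WindowAssigner there are.
The key method of WindowAssigner is assignWindows,
which assigns a set of windows, a Collection<W>, to an element.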
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    // ...
}
Now let's look at some concrete WindowAssigner implementations.
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size)); // very simple: assigns exactly one TimeWindow
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create(); // the default is a ProcessingTimeTrigger, as the name suggests
    }

    // ...
}
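The start computation `now - (now % size)` simply rounds the current time down to a multiple of the window size. A small worked example, with `windowStart` as a hypothetical helper that exists only in this sketch:

```java
// Standalone arithmetic sketch; windowStart is a hypothetical helper, not a Flink method.
final class TumblingWindowSketch {
    // Start of the tumbling window of the given size that contains `now`.
    // Assumes a non-negative timestamp, as in the excerpt above.
    static long windowStart(long now, long size) {
        return now - (now % size);
    }

    public static void main(String[] args) {
        long size = 5_000L;                      // 5-second windows, in milliseconds
        long start = windowStart(12_345L, size); // 12_345 % 5_000 = 2_345
        System.out.println("[" + start + ", " + (start + size) + ")"); // prints [10000, 15000)
    }
}
```

Every timestamp from 10000 through 14999 maps to the same window, which is why a tumbling assigner returns exactly one window per element.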
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;
    private final long slide;

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = timestamp - timestamp % slide;
            for (long start = lastStart;
                    start > timestamp - size;
                    start -= slide) {
                windows.add(new TimeWindow(start, start + size)); // several TimeWindows are assigned here, because the window slides
            }
            return windows;
        } else {
            // ...
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    // ...
}
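The loop in the sliding assigner walks backwards from the latest window start that is not after the timestamp, one slide at a time, until the window would no longer contain the element. The sketch below reproduces only that arithmetic; `windowStarts` is a hypothetical helper and windows are represented by their start timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the sliding-window arithmetic; not Flink code.
final class SlidingWindowSketch {
    // Returns the start timestamps of every sliding window [start, start + size)
    // that contains `timestamp`, latest window first.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>((int) (size / slide));
        long lastStart = timestamp - timestamp % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size = 10, slide = 5: timestamp 12 falls into [10, 20) and [5, 15).
        System.out.println(windowStarts(12L, 10L, 5L)); // prints [10, 5]
    }
}
```

With size 10 and slide 5 each element lands in size / slide = 2 windows, which matches the initial capacity passed to the ArrayList.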
Trigger, Evictor
Next, let's look at the three main entry points, which invoke onElement, onEventTime, and onProcessingTime respectively.
processElement
Handles the arrival of an element and invokes onElement.
public void processElement(StreamRecord<IN> element) throws Exception {
    Collection<W> elementWindows = windowAssigner.assignWindows( // the WindowAssigner assigns a set of windows to the element
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    final K key = (K) getStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
        //.......
    } else { // ordinary windows
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isLate(window)) { // late data is dropped by default
                continue;
            }

            AppendingState<IN, ACC> windowState = getPartitionedState( // fetch this window's state from the backend, i.e. the buffered elements
                window, windowSerializer, windowStateDescriptor);
            windowState.add(element.getValue()); // add the current element to the buffer state

            context.key = key;
            context.window = window; // the design of context is rather tricky and obscure

            TriggerResult triggerResult = context.onElement(element); // invoke onElement and get the TriggerResult

            if (triggerResult.isFire()) { // act on the TriggerResult
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                fire(window, contents); // on fire, actually compute over the elements in the window
            }

            if (triggerResult.isPurge()) {
                cleanup(window, windowState, null); // purge, i.e. clean up the elements
            } else {
                registerCleanupTimer(window);
            }
        }
    }
}
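The fire/purge branching above can be read as a small decision table. The following standalone sketch models it with a local enum that mirrors the four combinations of "fire" and "purge"; the class `TriggerResultSketch` and the method `handle`, which returns action names instead of performing them, are assumptions for illustration and not the operator's API.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of how a trigger result drives the per-window actions; not Flink code.
final class TriggerResultSketch {
    enum Result {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        private final boolean fire;
        private final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() { return fire; }
        boolean isPurge() { return purge; }
    }

    // Returns the actions taken for one window, in order, following the excerpt's branches.
    static List<String> handle(Result result, boolean hasContents) {
        List<String> actions = new ArrayList<>();
        if (result.isFire()) {
            if (!hasContents) {
                return actions; // mirrors the `continue`: nothing else happens for this window
            }
            actions.add("fire");
        }
        actions.add(result.isPurge() ? "cleanup" : "registerCleanupTimer");
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(handle(Result.FIRE, true));           // prints [fire, registerCleanupTimer]
        System.out.println(handle(Result.FIRE_AND_PURGE, true)); // prints [fire, cleanup]
    }
}
```

Note that a window whose trigger returns CONTINUE still gets a cleanup timer registered, so its state is eventually released even if it never fires.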
The logic that decides whether data is late:
protected boolean isLate(W window) {
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
}

private long cleanupTime(W window) {
    long cleanupTime = window.maxTimestamp() + allowedLateness; // add the allowed lateness
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
}
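The comparison `cleanupTime >= window.maxTimestamp()` in the excerpt guards against overflow: if adding the allowed lateness wraps around, the sum becomes smaller than the original timestamp and the method falls back to Long.MAX_VALUE. A standalone sketch with plain long parameters (the class name and signatures are hypothetical, and the event-time check is left out):

```java
// Standalone sketch of the lateness check with plain longs; not Flink code.
final class LatenessSketch {
    // Time at which a window's state may be cleaned up, saturating at Long.MAX_VALUE.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        // If the addition overflowed, the wrapped sum is smaller than windowMaxTimestamp.
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    // A window is late once the watermark has reached its cleanup time.
    static boolean isLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }

    public static void main(String[] args) {
        System.out.println(cleanupTime(999L, 100L));    // prints 1099
        System.out.println(isLate(999L, 100L, 1_098L)); // prints false
        System.out.println(isLate(999L, 100L, 1_099L)); // prints true
    }
}
```

With an allowed lateness of 0, a window becomes late as soon as the watermark reaches its maxTimestamp.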
The fire logic:
private void fire(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    userFunction.apply(context.key, context.window, contents, timestampedCollector);
}
processWatermark
Handles a watermark and invokes onEventTime.
@Override
public void processWatermark(Watermark mark) throws Exception {
    boolean fire;
    do {
        Timer<K, W> timer = watermarkTimersQueue.peek(); // the name watermarkTimersQueue is a bit ambiguous; eventTimerQueue would be easier to understand
        if (timer != null && timer.timestamp <= mark.getTimestamp()) {
            fire = true;

            watermarkTimers.remove(timer);
            watermarkTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key); // stateBackend.setCurrentKey(key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else { // ordinary windows
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); // get the window's state
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onEventTime(timer.timestamp); // invoke onEventTime
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire); // while fire is true, keep checking whether the next watermark timer needs to fire

    output.emitWatermark(mark); // forward the watermark downstream

    this.currentWatermark = mark.getTimestamp(); // update currentWatermark
}
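Stripped of state handling, the loop in processWatermark is a drain of a time-ordered priority queue: keep popping timers while the head's timestamp is not after the watermark. A minimal standalone sketch of that pattern follows; the simplified `Timer` (timestamp and key only), `register`, and `advanceWatermark` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Standalone sketch of draining event-time timers on a watermark; not Flink code.
final class TimerQueueSketch {
    // Hypothetical stand-in for the operator's Timer<K, W>: a timestamp and a key only.
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private long currentWatermark = Long.MIN_VALUE;

    void register(long timestamp, String key) {
        queue.add(new Timer(timestamp, key));
    }

    // Pops every timer with timestamp <= watermark, earliest first, and returns the fired keys.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            fired.add(queue.poll().key);
        }
        currentWatermark = watermark; // updated after the timers have been handled
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        TimerQueueSketch timers = new TimerQueueSketch();
        timers.register(30L, "c");
        timers.register(10L, "a");
        timers.register(20L, "b");
        System.out.println(timers.advanceWatermark(20L)); // prints [a, b]
        System.out.println(timers.advanceWatermark(30L)); // prints [c]
    }
}
```

The same drain pattern applies to the processing-time queue; only the clock that drives it differs.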
trigger
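First of all, the naming of this function is questionable: why does it not match the process... methods above?
It is used to invoke onProcessingTime and relies on a system-time timer to be called. The logic is essentially the same as processWatermark; only the triggering condition differs.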
@Override
public void trigger(long time) throws Exception {
    boolean fire;

    //Remove information about the triggering task
    processingTimeTimerFutures.remove(time);
    processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));

    do {
        Timer<K, W> timer = processingTimeTimersQueue.peek();
        if (timer != null && timer.timestamp <= time) {
            fire = true;

            processingTimeTimers.remove(timer);
            processingTimeTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) {
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else {
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire);
}
EvictingWindowOperator
Compared with WindowOperator, EvictingWindowOperator simply adds an Evictor.
private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); // run the evictor

    FluentIterable<IN> projectedContents = FluentIterable
        .from(contents)
        .skip(toEvict)
        .transform(new Function<StreamRecord<IN>, IN>() {
            @Override
            public IN apply(StreamRecord<IN> input) {
                return input.getValue();
            }
        });
    userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
}
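In this fire method the evictor only returns a number, and the operator then skips that many elements from the front of the buffer before calling the user function. The sketch below illustrates the mechanism with an assumed count-based policy that keeps at most `maxCount` of the newest elements; `toEvict` and `applyEviction` are hypothetical helpers, and the policy is an illustration rather than a specific Flink evictor.

```java
import java.util.List;
import java.util.stream.Collectors;

// Standalone sketch of "evict N from the front, then process the rest"; not Flink code.
final class EvictSketch {
    // Assumed count-based policy: number of leading elements to drop so at most maxCount remain.
    static int toEvict(int size, int maxCount) {
        return size > maxCount ? size - maxCount : 0;
    }

    // Applies the eviction count by skipping that many elements from the front.
    static <T> List<T> applyEviction(List<T> contents, int maxCount) {
        int evictCount = toEvict(contents.size(), maxCount);
        return contents.stream().skip(evictCount).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(applyEviction(List.of(1, 2, 3, 4, 5), 3)); // prints [3, 4, 5]
    }
}
```

Because eviction needs the individual elements, the evicting operator buffers the raw records (an Iterable of StreamRecord in the excerpt) instead of a pre-aggregated accumulator.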