商城首页欢迎来到中国正版软件门户

您的位置:首页 >Flink滚动窗口详解与应用技巧

Flink滚动窗口详解与应用技巧

  发布于2025-08-16 阅读(0)

扫一扫,手机访问

本文主要研究一下flink的Tumbling Window

聊聊flink的Tumbling Window
WindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java

代码语言:javascript代码运行次数:0运行复制
@PublicEvolvingpublic abstract class WindowAssigner implements Serializable {    private static final long serialVersionUID = 1L;​    /**     * Returns a {@code Collection} of windows that should be assigned to the element.     *     * @param element The element to which windows should be assigned.     * @param timestamp The timestamp of the element.     * @param context The {@link WindowAssignerContext} in which the assigner operates.     */    public abstract Collection assignWindows(T element, long timestamp, WindowAssignerContext context);​    /**     * Returns the default trigger associated with this {@code WindowAssigner}.     */    public abstract Trigger getDefaultTrigger(StreamExecutionEnvironment env);​    /**     * Returns a {@link TypeSerializer} for serializing windows that are assigned by     * this {@code WindowAssigner}.     */    public abstract TypeSerializer getWindowSerializer(ExecutionConfig executionConfig);​    /**     * Returns {@code true} if elements are assigned to windows based on event time,     * {@code false} otherwise.     */    public abstract boolean isEventTime();​    /**     * A context provided to the {@link WindowAssigner} that allows it to query the     * current processing time.     *     * 

This is provided to the assigner by its containing * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}, * which, in turn, gets it from the containing * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. */ public abstract static class WindowAssignerContext {​ /** * Returns the current processing time. */ public abstract long getCurrentProcessingTime();​ }}

WindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法,同时定义了抽象静态类WindowAssignerContext;它有两个泛型,其中T为元素类型,而W为窗口类型Window

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java

代码语言:javascript代码运行次数:0运行复制
@PublicEvolvingpublic abstract class Window {​    /**     * Gets the largest timestamp that still belongs to this window.     *     * @return The largest timestamp that still belongs to this window.     */    public abstract long maxTimestamp();}
Window对象代表把无限流数据划分为有限buckets的集合,它有一个maxTimestamp,代表该窗口数据在该时间点内到达;它有两个子类,一个是GlobalWindow,一个是TimeWindowTimeWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java

代码语言:javascript代码运行次数:0运行复制
@PublicEvolvingpublic class TimeWindow extends Window {​    private final long start;    private final long end;​    public TimeWindow(long start, long end) {        this.start = start;        this.end = end;    }​    /**     * Gets the starting timestamp of the window. This is the first timestamp that belongs     * to this window.     *     * @return The starting timestamp of this window.     */    public long getStart() {        return start;    }​    /**     * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it     * is the first timestamp that does not belong to this window any more.     *     * @return The exclusive end timestamp of this window.     */    public long getEnd() {        return end;    }​    /**     * Gets the largest timestamp that still belongs to this window.     *     * 

This timestamp is identical to {@code getEnd() - 1}. * * @return The largest timestamp that still belongs to this window. * * @see #getEnd() */ @Override public long maxTimestamp() { return end - 1; }​ @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; }​ TimeWindow window = (TimeWindow) o;​ return end == window.end && start == window.start; }​ @Override public int hashCode() { return MathUtils.longToIntWithBitMixing(start + end); }​ @Override public String toString() { return "TimeWindow{" + "start=" + start + ", end=" + end + '}'; }​ /** * Returns {@code true} if this window intersects the given window. */ public boolean intersects(TimeWindow other) { return this.start <= other.end && this.end >= other.start; }​ /** * Returns the minimal window covers both this window and the given window. */ public TimeWindow cover(TimeWindow other) { return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end)); }​ // ------------------------------------------------------------------------ // Serializer // ------------------------------------------------------------------------​ //......​ // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------​ /** * Merge overlapping {@link TimeWindow}s. For use by merging * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}. */ public static void mergeWindows(Collection windows, MergingWindowAssigner.MergeCallback c) {​ // sort the windows by the start time and then merge overlapping windows​ List sortedWindows = new ArrayList<>(windows);​ Collections.sort(sortedWindows, new Comparator() { @Override public int compare(TimeWindow o1, TimeWindow o2) { return Long.compare(o1.getStart(), o2.getStart()); } });​ List>> merged = new ArrayList<>(); Tuple2> currentMerge = null;​ for (TimeWindow candidate: sortedWindows) { if (currentMerge == null) { currentMerge = new Tuple2<>(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet<>(); currentMerge.f1.add(candidate); } else if (currentMerge.f0.intersects(candidate)) { currentMerge.f0 = currentMerge.f0.cover(candidate); currentMerge.f1.add(candidate); } else { merged.add(currentMerge); currentMerge = new Tuple2<>(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet<>(); currentMerge.f1.add(candidate); } }​ if (currentMerge != null) { merged.add(currentMerge); }​ for (Tuple2> m: merged) { if (m.f1.size() > 1) { c.merge(m.f1, m.f0); } } }​ /** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window start. * @param offset The offset which window start would be shifted by. * @param windowSize The size of the generated windows. * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }}

TimeWindow有start及end属性,其中start为inclusive,而end为exclusive,所以maxTimestamp返回的是end-1;这里重写了equals及hashcode方法TimeWindow提供了intersects方法用于表示本窗口与指定窗口是否有交叉;而cover方法用于返回本窗口与指定窗口的重叠窗口TimeWindow还提供了mergeWindows及getWindowStartWithOffset静态方法;前者用于合并重叠的时间窗口,后者用于获取指定timestamp、offset、windowSize的window startTumblingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java

代码语言:javascript代码运行次数:0运行复制
@PublicEvolvingpublic class TumblingEventTimeWindows extends WindowAssigner {    private static final long serialVersionUID = 1L;​    private final long size;​    private final long offset;​    protected TumblingEventTimeWindows(long size, long offset) {        if (offset < 0 || offset >= size) {            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");        }​        this.size = size;        this.offset = offset;    }​    @Override    public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {        if (timestamp > Long.MIN_VALUE) {            // Long.MIN_VALUE is currently assigned when no timestamp is present            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);            return Collections.singletonList(new TimeWindow(start, start + size));        } else {            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +                    "'DataStream.assignTimestampsAndWatermarks(...)'?");        }    }​    @Override    public Trigger getDefaultTrigger(StreamExecutionEnvironment env) {        return EventTimeTrigger.create();    }​    @Override    public String toString() {        return "TumblingEventTimeWindows(" + size + ")";    }​    public static TumblingEventTimeWindows of(Time size) {        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);    }​    public static TumblingEventTimeWindows of(Time size, Time offset) {        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());    }​    @Override    public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {        return new TimeWindow.Serializer();    }​    @Override    public boolean isEventTime() {        return true;    }}
TumblingEventTimeWindows继承了Window,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetassignWindows方法获取的窗口为start及start+size,而start=TimeWindow.getWindowStartWithOffset(timestamp, offset, size);getDefaultTrigger方法返回的是EventTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回trueTumblingEventTimeWindows提供了of静态工厂方法,可以指定size及offset参数TumblingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java

代码语言:javascript代码运行次数:0运行复制
public class TumblingProcessingTimeWindows extends WindowAssigner {    private static final long serialVersionUID = 1L;​    private final long size;​    private final long offset;​    private TumblingProcessingTimeWindows(long size, long offset) {        if (offset < 0 || offset >= size) {            throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0 <= offset < size");        }​        this.size = size;        this.offset = offset;    }​    @Override    public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {        final long now = context.getCurrentProcessingTime();        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);        return Collections.singletonList(new TimeWindow(start, start + size));    }​    public long getSize() {        return size;    }​    @Override    public Trigger getDefaultTrigger(StreamExecutionEnvironment env) {        return ProcessingTimeTrigger.create();    }​    @Override    public String toString() {        return "TumblingProcessingTimeWindows(" + size + ")";    }​    public static TumblingProcessingTimeWindows of(Time size) {        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);    }​    public static TumblingProcessingTimeWindows of(Time size, Time offset) {        return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());    }​    @Override    public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {        return new TimeWindow.Serializer();    }​    @Override    public boolean isEventTime() {        return false;    }}
TumblingProcessingTimeWindows继承了WindowAssigner,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetassignWindows方法获取的窗口为start及start+size,而start=TimeWindow.getWindowStartWithOffset(now, offset, size),而now值则为context.getCurrentProcessingTime(),则是与TumblingEventTimeWindows的不同之处,TumblingProcessingTimeWindows不使用timestamp参数来计算,它使用now值替代;getDefaultTrigger方法返回的是ProcessingTimeTrigger,而isEventTime方法返回的为falseTumblingProcessingTimeWindows也提供了of静态工厂方法,可以指定size及offset参数小结flink的Tumbling Window分为TumblingEventTimeWindows及TumblingProcessingTimeWindows,它们都继承了WindowAssigner,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetWindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法,同时定义了抽象静态类WindowAssignerContext;它有两个泛型,其中T为元素类型,而W为窗口类型;TumblingEventTimeWindows及TumblingProcessingTimeWindows的窗口类型为TimeWindow,它有start及end属性,其中start为inclusive,而end为exclusive,maxTimestamp返回的是end-1,它还提供了mergeWindows及getWindowStartWithOffset静态方法;前者用于合并重叠的时间窗口,后者用于获取指定timestamp、offset、windowSize的window startTumblingEventTimeWindows及TumblingProcessingTimeWindows的不同在于assignWindows、getDefaultTrigger、isEventTime方法;前者assignWindows使用的是参数中的timestamp,而后者使用的是now值;前者的getDefaultTrigger返回的是EventTimeTrigger,而后者返回的是ProcessingTimeTrigger;前者isEventTime方法返回的为true,而后者返回的为falsedocTumbling Windows
本文转载于:https://cloud.tencent.com/developer/article/1380590 如有侵犯,请联系zhengruancom@outlook.com删除。
免责声明:正软商城发布此文仅为传递信息,不代表正软商城认同其观点或证实其描述。

热门关注