背景
最近有局點客戶有這么一個場景:利用Flink CDC讀取MySql數(shù)據(jù)binlog日志,然后使用窗口進行聚合統(tǒng)計,遇到的問題就是Flink現(xiàn)有窗口的觸發(fā)機制(定時或者定量)、不滿足他們的實際需求(定時和定量)。溫故而知新,可以為師矣。本文基于官網(wǎng)和源碼梳理Flink現(xiàn)有窗口的類型、觸發(fā)機制等內(nèi)容。最后,基于自定義觸發(fā)器實現(xiàn)定時定量觸發(fā)機制解決該局點客戶的實際場景問題。
Flink窗口分類
窗口是處理無限實時流的核心,將無界數(shù)據(jù)流切分為邏輯概念的桶、進行實際計算邏輯。
分類方法一
從窗口是否分組來說,F(xiàn)link窗口分為兩類:分組窗口(Keyed Windows)、非分組窗口(Non-Keyed Windows),單從概念上看,區(qū)別就是前者用于keyed streams(即keyby操作后的DataStream)、后者直接用于DataStream。
實際區(qū)別很大:keyed streams因為按照指定key(keyselector)進行分組、相同key的元素發(fā)送到相同subtask,所以keyed windows允許在多個subtask中并行化計算;而non-keyed streams不進行分組,所有non-keyed windows計算邏輯只能在單個stask中處理,即并行度只能為1。
分類方法二
從窗口屬性來說,F(xiàn)link窗口分為兩類:基于時間的窗口(滾動窗口-Tumbling Window、滑動窗口-Sliding Window、會話窗口-Session Window)、基于計數(shù)的窗口(全局窗口-GlobalWindow)。很明顯時間窗口與時間相關(guān)、每個窗口都有開始時間和結(jié)束時間;計數(shù)窗口與數(shù)據(jù)條數(shù)相關(guān)、與時間無關(guān)。內(nèi)置的這幾種窗口的詳細(xì)介紹,后續(xù)單獨發(fā)文描述。
兩種分類相關(guān)性
兩種分類方式的關(guān)系是什么呢?
從窗口是否分組的圖中我們可以知道Keyed Windows使用window方法、Non-Keyed Windows使用windowAll方法,我們可以看下源碼中這兩個方法的使用情況。
這兩個方法的入?yún)⒍紴閃indowAssigner抽象類。
上圖紅線部分可以看出,window方法和windowAll方法都可以使用時間窗口和計數(shù)窗口;
上圖標(biāo)黃部分可以看出,window方法和windowAll方法可以使用相同的WindowAssigner(即TumblingWindow、SlidingWindow、GlobalWindow);
細(xì)心讀者可能會發(fā)現(xiàn),為何沒有SessionWindows的蹤影?
我們繼續(xù)看下WindowAssigner抽象類的子類,一目了然、豁然開朗:
客戶需求
客戶原本是需要數(shù)據(jù)條數(shù)達到時觸發(fā)后續(xù)操作,但是發(fā)現(xiàn)某些時間段(如非高峰期)數(shù)據(jù)條數(shù)長時間達不到以至于不觸發(fā)后續(xù)操作。所以,需要本文開頭所說的定量觸發(fā)基礎(chǔ)上加上定時觸發(fā)。很顯然,從上面介紹的Flink窗口分類來看,內(nèi)置的這幾類窗口類型并不滿足(時間窗口屬于定時觸發(fā)、計數(shù)窗口屬于定量觸發(fā))。
自定義觸發(fā)器
那么,到底是選擇計數(shù)窗口+自定義時間觸發(fā)器還是時間窗口+自定義計數(shù)觸發(fā)器?
根據(jù)現(xiàn)有Flink API架構(gòu)上說,時間窗口+自定義計數(shù)觸發(fā)器是唯一選擇。
Trigger觸發(fā)器決定了窗口function何時對窗口進行運算,自定義Trigger觸發(fā)器需要繼承實現(xiàn)Trigger抽象類。
該類包括抽象方法如下:
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
TriggerResult onEventTime(long time, W window, TriggerContext ctx)
boolean canMerge()
void onMerge(W window, OnMergeContext ctx)
void clear(W window, TriggerContext ctx)
該類有多種類型的實現(xiàn)子類,感興趣可以自行閱讀,方便自己實現(xiàn)自定義觸發(fā)器:
案例實踐:計數(shù)窗口
完整代碼見github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/CountWindowDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
數(shù)據(jù)源:nc –l 4444
首先輸入5個1(注意別忘回車)和1個2,過一會再輸入4個2。
從上圖可以看出,每5條數(shù)據(jù)觸發(fā)一次窗口計算,效果與實際代碼預(yù)期相符。
實際代碼詳見countWindowAll內(nèi)部使用的CountTrigger觸發(fā)器
public AllWindowedStream countWindowAll(long size) {return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}
案例實踐:時間窗口
完整代碼見github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
數(shù)據(jù)源:nc –l 4444
首先輸入任意字符如felixzh,然后分別5秒內(nèi)和5秒后輸入。
從上圖可以看出,每5秒觸發(fā)一次窗口計算,效果與實際代碼預(yù)期相符。
實際代碼詳見TumblingProcessingTimeWindows內(nèi)部使用的ProcessingTimeTrigger觸發(fā)器。
案例實踐:時間窗口+CountTrigger
樂于思考的朋友很容易想到,既然有時間窗口也有CountTrigger觸發(fā)器,直接組合不就解決背景所述的定時定量觸發(fā)了嗎?不需要自定義計數(shù)觸發(fā)器了吧?
該思路下的完整代碼見github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddCountTriggerDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);CountTrigger countTrigger = CountTrigger.of(5);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(countTrigger)//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
數(shù)據(jù)源:nc –l 4444
輸入任意6行字符
從上圖可以看出,即使等待30秒之后,實際效果并非預(yù)期。
預(yù)期的定時定量觸發(fā):即5條數(shù)據(jù)達到觸發(fā)條件,就要觸發(fā)計算,這點沒毛病。
而30秒達到觸發(fā)條件,并沒有觸發(fā)計算。
究其原因:ProcessingTimeTrigger觸發(fā)器onProcessingTime方法返回TriggerResult.FIRE;而CountTrigger觸發(fā)器onProcessingTime方法返回TriggerResult.CONTINUE。
看完TriggerResult枚舉類,相信你會一目了然:
/** No action is taken on the window. */CONTINUE(false, false),/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */FIRE_AND_PURGE(true, true),/*** On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/FIRE(true, false),/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/PURGE(false, true);
簡而言之一句話:FIRE會觸發(fā)計算,CONTINUE不會觸發(fā)計算。
案例實踐:時間窗口+自定義計數(shù)觸發(fā)器
經(jīng)過上述描述,我們還是需要實現(xiàn)自定義的計數(shù)觸發(fā)器,需要區(qū)分事件時間和處理時間。
當(dāng)然,思路還是借鑒CountTrigger觸發(fā)器的已有內(nèi)容。
定義MyCountTrigger自定義觸發(fā)器繼承Trigger,完整代碼見github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/MyCountTrigger.java
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddMyCountTriggerDemo.java
案例實踐代碼如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(MyCountTrigger.of(5)).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
數(shù)據(jù)源:nc –l 4444
輸入任意6行字符
從上圖可以看出,每5條數(shù)據(jù)觸發(fā)一次窗口計算,每30秒觸發(fā)一次窗口計算,效果與實際代碼預(yù)期相符。
結(jié)論
以上,借鑒Flink原生CountTrigger和ProcessingTimeTrigger,實現(xiàn)自定義Trigger觸發(fā)器,解決客戶現(xiàn)場定時定量觸發(fā)窗口聚合計算的效果。