背景
最近有局點(diǎn)客戶有這么一個(gè)場景:利用Flink CDC讀取MySql數(shù)據(jù)binlog日志,然后使用窗口進(jìn)行聚合統(tǒng)計(jì),遇到的問題就是Flink現(xiàn)有窗口的觸發(fā)機(jī)制(定時(shí)或者定量)、不滿足他們的實(shí)際需求(定時(shí)和定量)。溫故而知新,可以為師矣。本文基于官網(wǎng)和源碼梳理Flink現(xiàn)有窗口的類型、觸發(fā)機(jī)制等內(nèi)容。最后,基于自定義觸發(fā)器實(shí)現(xiàn)定時(shí)定量觸發(fā)機(jī)制解決該局點(diǎn)客戶的實(shí)際場景問題。
Flink窗口分類
窗口是處理無限實(shí)時(shí)流的核心,將無界數(shù)據(jù)流切分為邏輯概念的桶、進(jìn)行實(shí)際計(jì)算邏輯。
分類方法一
從窗口是否分組來說,F(xiàn)link窗口分為兩類:分組窗口(Keyed Windows)、非分組窗口(Non-Keyed Windows),單從概念上看,區(qū)別就是前者用于keyed streams(即keyby操作后的DataStream)、后者直接用于DataStream。
實(shí)際區(qū)別很大:keyed streams因?yàn)榘凑罩付╧ey(keyselector)進(jìn)行分組、相同key的元素發(fā)送到相同subtask,所以keyed windows允許在多個(gè)subtask中并行化計(jì)算;而non-keyed streams不進(jìn)行分組,所有non-keyed windows計(jì)算邏輯只能在單個(gè)stask中處理,即并行度只能為1。
分類方法二
從窗口屬性來說,F(xiàn)link窗口分為兩類:基于時(shí)間的窗口(滾動(dòng)窗口-Tumbling Window、滑動(dòng)窗口-Sliding Window、會(huì)話窗口-Session Window)、基于計(jì)數(shù)的窗口(全局窗口-GlobalWindow)。很明顯時(shí)間窗口與時(shí)間相關(guān)、每個(gè)窗口都有開始時(shí)間和結(jié)束時(shí)間;計(jì)數(shù)窗口與數(shù)據(jù)條數(shù)相關(guān)、與時(shí)間無關(guān)。內(nèi)置的這幾種窗口的詳細(xì)介紹,后續(xù)單獨(dú)發(fā)文描述。
兩種分類相關(guān)性
兩種分類方式的關(guān)系是什么呢?
從窗口是否分組的圖中我們可以知道Keyed Windows使用window方法、Non-Keyed Windows使用windowAll方法,我們可以看下源碼中這兩個(gè)方法的使用情況。
這兩個(gè)方法的入?yún)⒍紴閃indowAssigner抽象類。
上圖紅線部分可以看出,window方法和windowAll方法都可以使用時(shí)間窗口和計(jì)數(shù)窗口;
上圖標(biāo)黃部分可以看出,window方法和windowAll方法可以使用相同的WindowAssigner(即TumblingWindow、SlidingWindow、GlobalWindow);
細(xì)心讀者可能會(huì)發(fā)現(xiàn),為何沒有SessionWindows的蹤影?
我們繼續(xù)看下WindowAssigner抽象類的子類,一目了然、豁然開朗:
客戶需求
客戶原本是需要數(shù)據(jù)條數(shù)達(dá)到時(shí)觸發(fā)后續(xù)操作,但是發(fā)現(xiàn)某些時(shí)間段(如非高峰期)數(shù)據(jù)條數(shù)長時(shí)間達(dá)不到以至于不觸發(fā)后續(xù)操作。所以,需要本文開頭所說的定量觸發(fā)基礎(chǔ)上加上定時(shí)觸發(fā)。很顯然,從上面介紹的Flink窗口分類來看,內(nèi)置的這幾類窗口類型并不滿足(時(shí)間窗口屬于定時(shí)觸發(fā)、計(jì)數(shù)窗口屬于定量觸發(fā))。
自定義觸發(fā)器
那么,到底是選擇計(jì)數(shù)窗口+自定義時(shí)間觸發(fā)器還是時(shí)間窗口+自定義計(jì)數(shù)觸發(fā)器?
根據(jù)現(xiàn)有Flink API架構(gòu)上說,時(shí)間窗口+自定義計(jì)數(shù)觸發(fā)器是唯一選擇。
Trigger觸發(fā)器決定了窗口function何時(shí)對(duì)窗口進(jìn)行運(yùn)算,自定義Trigger觸發(fā)器需要繼承實(shí)現(xiàn)Trigger抽象類。
該類包括抽象方法如下:
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
TriggerResult onEventTime(long time, W window, TriggerContext ctx)
boolean canMerge()
void onMerge(W window, OnMergeContext ctx)
void clear(W window, TriggerContext ctx)
該類有多種類型的實(shí)現(xiàn)子類,感興趣可以自行閱讀,方便自己實(shí)現(xiàn)自定義觸發(fā)器:
案例實(shí)踐:計(jì)數(shù)窗口
完整代碼見github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/CountWindowDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
數(shù)據(jù)源:nc –l 4444
首先輸入5個(gè)1(注意別忘回車)和1個(gè)2,過一會(huì)再輸入4個(gè)2。
從上圖可以看出,每5條數(shù)據(jù)觸發(fā)一次窗口計(jì)算,效果與實(shí)際代碼預(yù)期相符。
實(shí)際代碼詳見countWindowAll內(nèi)部使用的CountTrigger觸發(fā)器
public AllWindowedStream countWindowAll(long size) {return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}
案例實(shí)踐:時(shí)間窗口
完整代碼見github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
數(shù)據(jù)源:nc –l 4444
首先輸入任意字符如felixzh,然后分別5秒內(nèi)和5秒后輸入。
從上圖可以看出,每5秒觸發(fā)一次窗口計(jì)算,效果與實(shí)際代碼預(yù)期相符。
實(shí)際代碼詳見TumblingProcessingTimeWindows內(nèi)部使用的ProcessingTimeTrigger觸發(fā)器。
案例實(shí)踐:時(shí)間窗口+CountTrigger
樂于思考的朋友很容易想到,既然有時(shí)間窗口也有CountTrigger觸發(fā)器,直接組合不就解決背景所述的定時(shí)定量觸發(fā)了嗎?不需要自定義計(jì)數(shù)觸發(fā)器了吧?
該思路下的完整代碼見github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddCountTriggerDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);CountTrigger countTrigger = CountTrigger.of(5);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(countTrigger)//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
數(shù)據(jù)源:nc –l 4444
輸入任意6行字符
從上圖可以看出,即使等待30秒之后,實(shí)際效果并非預(yù)期。
預(yù)期的定時(shí)定量觸發(fā):即5條數(shù)據(jù)達(dá)到觸發(fā)條件,就要觸發(fā)計(jì)算,這點(diǎn)沒毛病。
而30秒達(dá)到觸發(fā)條件,并沒有觸發(fā)計(jì)算。
究其原因:ProcessingTimeTrigger觸發(fā)器onProcessingTime方法返回TriggerResult.FIRE;而CountTrigger觸發(fā)器onProcessingTime方法返回TriggerResult.CONTINUE。
看完TriggerResult枚舉類,相信你會(huì)一目了然:
/** No action is taken on the window. */CONTINUE(false, false),/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */FIRE_AND_PURGE(true, true),/*** On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/FIRE(true, false),/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/PURGE(false, true);
簡而言之一句話:FIRE會(huì)觸發(fā)計(jì)算,CONTINUE不會(huì)觸發(fā)計(jì)算。
案例實(shí)踐:時(shí)間窗口+自定義計(jì)數(shù)觸發(fā)器
經(jīng)過上述描述,我們還是需要實(shí)現(xiàn)自定義的計(jì)數(shù)觸發(fā)器,需要區(qū)分事件時(shí)間和處理時(shí)間。
當(dāng)然,思路還是借鑒CountTrigger觸發(fā)器的已有內(nèi)容。
定義MyCountTrigger自定義觸發(fā)器繼承Trigger,完整代碼見github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/MyCountTrigger.java
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddMyCountTriggerDemo.java
案例實(shí)踐代碼如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(MyCountTrigger.of(5)).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
數(shù)據(jù)源:nc –l 4444
輸入任意6行字符
從上圖可以看出,每5條數(shù)據(jù)觸發(fā)一次窗口計(jì)算,每30秒觸發(fā)一次窗口計(jì)算,效果與實(shí)際代碼預(yù)期相符。
結(jié)論
以上,借鑒Flink原生CountTrigger和ProcessingTimeTrigger,實(shí)現(xiàn)自定義Trigger觸發(fā)器,解決客戶現(xiàn)場定時(shí)定量觸發(fā)窗口聚合計(jì)算的效果。