免费爱碰视频在线观看,九九精品国产屋,欧美亚洲尤物久久精品,1024在线观看视频亚洲

      廣東醫(yī)院項(xiàng)目Flink開發(fā)需求:定時(shí)定量窗口觸發(fā)器(從入門到精通)

      廣東醫(yī)院項(xiàng)目Flink開發(fā)需求:定時(shí)定量窗口觸發(fā)器(從入門到精通)

      背景

      最近有局點(diǎn)客戶有這么一個(gè)場景:利用Flink CDC讀取MySql數(shù)據(jù)binlog日志,然后使用窗口進(jìn)行聚合統(tǒng)計(jì),遇到的問題就是Flink現(xiàn)有窗口的觸發(fā)機(jī)制(定時(shí)或者定量)、不滿足他們的實(shí)際需求(定時(shí)和定量)。溫故而知新,可以為師矣。本文基于官網(wǎng)和源碼梳理Flink現(xiàn)有窗口的類型、觸發(fā)機(jī)制等內(nèi)容。最后,基于自定義觸發(fā)器實(shí)現(xiàn)定時(shí)定量觸發(fā)機(jī)制解決該局點(diǎn)客戶的實(shí)際場景問題。

      Flink窗口分類

      窗口是處理無限實(shí)時(shí)流的核心,將無界數(shù)據(jù)流切分為邏輯概念的桶、進(jìn)行實(shí)際計(jì)算邏輯。

      分類方法

      從窗口是否分組來說,F(xiàn)link窗口分為兩類:分組窗口(Keyed Windows)、非分組窗口(Non-Keyed Windows),單從概念上看,區(qū)別就是前者用于keyed streams(即keyby操作后的DataStream)、后者直接用于DataStream。

      實(shí)際區(qū)別很大:keyed streams因?yàn)榘凑罩付╧ey(keyselector)進(jìn)行分組、相同key的元素發(fā)送到相同subtask,所以keyed windows允許在多個(gè)subtask中并行化計(jì)算;而non-keyed streams不進(jìn)行分組,所有non-keyed windows計(jì)算邏輯只能在單個(gè)stask中處理,即并行度只能為1。

      分類方法二

      從窗口屬性來說,F(xiàn)link窗口分為兩類:基于時(shí)間的窗口(滾動(dòng)窗口-Tumbling Window、滑動(dòng)窗口-Sliding Window、會(huì)話窗口-Session Window)、基于計(jì)數(shù)的窗口(全局窗口-GlobalWindow)。很明顯時(shí)間窗口與時(shí)間相關(guān)、每個(gè)窗口都有開始時(shí)間和結(jié)束時(shí)間;計(jì)數(shù)窗口與數(shù)據(jù)條數(shù)相關(guān)、與時(shí)間無關(guān)。內(nèi)置的這幾種窗口的詳細(xì)介紹,后續(xù)單獨(dú)發(fā)文描述。

      兩種分類相關(guān)性

      兩種分類方式的關(guān)系是什么呢?

      從窗口是否分組的圖中我們可以知道Keyed Windows使用window方法、Non-Keyed Windows使用windowAll方法,我們可以看下源碼中這兩個(gè)方法的使用情況。

      這兩個(gè)方法的入?yún)⒍紴閃indowAssigner抽象類。

      上圖紅線部分可以看出,window方法和windowAll方法都可以使用時(shí)間窗口和計(jì)數(shù)窗口;

      上圖標(biāo)黃部分可以看出,window方法和windowAll方法可以使用相同的WindowAssigner(即TumblingWindow、SlidingWindow、GlobalWindow);

      細(xì)心讀者可能會(huì)發(fā)現(xiàn),為何沒有SessionWindows的蹤影?

      我們繼續(xù)看下WindowAssigner抽象類的子類,一目了然、豁然開朗:

      客戶需求

      客戶原本是需要數(shù)據(jù)條數(shù)達(dá)到時(shí)觸發(fā)后續(xù)操作,但是發(fā)現(xiàn)某些時(shí)間段(如非高峰期)數(shù)據(jù)條數(shù)長時(shí)間達(dá)不到以至于不觸發(fā)后續(xù)操作。所以,需要本文開頭所說的定量觸發(fā)基礎(chǔ)上加上定時(shí)觸發(fā)。很顯然,從上面介紹的Flink窗口分類來看,內(nèi)置的這幾類窗口類型并不滿足(時(shí)間窗口屬于定時(shí)觸發(fā)、計(jì)數(shù)窗口屬于定量觸發(fā))。

      自定義觸發(fā)器

      那么,到底是選擇計(jì)數(shù)窗口+自定義時(shí)間觸發(fā)器還是時(shí)間窗口+自定義計(jì)數(shù)觸發(fā)器?

      根據(jù)現(xiàn)有Flink API架構(gòu)上說,時(shí)間窗口+自定義計(jì)數(shù)觸發(fā)器是唯一選擇。

      Trigger觸發(fā)器決定了窗口function何時(shí)對(duì)窗口進(jìn)行運(yùn)算,自定義Trigger觸發(fā)器需要繼承實(shí)現(xiàn)Trigger抽象類。

      該類包括抽象方法如下:

      TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)

      TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)

      TriggerResult onEventTime(long time, W window, TriggerContext ctx)

      boolean canMerge()

      void onMerge(W window, OnMergeContext ctx)

      void clear(W window, TriggerContext ctx)

      該類有多種類型的實(shí)現(xiàn)子類,感興趣可以自行閱讀,方便自己實(shí)現(xiàn)自定義觸發(fā)器:

      案例實(shí)踐:計(jì)數(shù)窗口

      完整代碼見github:

      https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/CountWindowDemo.java

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

      數(shù)據(jù)源:nc –l 4444

      首先輸入5個(gè)1(注意別忘回車)和1個(gè)2,過一會(huì)再輸入4個(gè)2。

      從上圖可以看出,每5條數(shù)據(jù)觸發(fā)一次窗口計(jì)算,效果與實(shí)際代碼預(yù)期相符。

      實(shí)際代碼詳見countWindowAll內(nèi)部使用的CountTrigger觸發(fā)器

      public AllWindowedStream countWindowAll(long size) {return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}

      案例實(shí)踐:時(shí)間窗口

      完整代碼見github:

      https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowDemo.java

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

      數(shù)據(jù)源:nc –l 4444

      首先輸入任意字符如felixzh,然后分別5秒內(nèi)和5秒后輸入。

      從上圖可以看出,每5秒觸發(fā)一次窗口計(jì)算,效果與實(shí)際代碼預(yù)期相符。

      實(shí)際代碼詳見TumblingProcessingTimeWindows內(nèi)部使用的ProcessingTimeTrigger觸發(fā)器。

      案例實(shí)踐:時(shí)間窗口+CountTrigger

      樂于思考的朋友很容易想到,既然有時(shí)間窗口也有CountTrigger觸發(fā)器,直接組合不就解決背景所述的定時(shí)定量觸發(fā)了嗎?不需要自定義計(jì)數(shù)觸發(fā)器了吧?

      該思路下的完整代碼見github:

      https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddCountTriggerDemo.java

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);CountTrigger countTrigger = CountTrigger.of(5);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(countTrigger)//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

      數(shù)據(jù)源:nc –l 4444

      輸入任意6行字符

      從上圖可以看出,即使等待30秒之后,實(shí)際效果并非預(yù)期。

      預(yù)期的定時(shí)定量觸發(fā):即5條數(shù)據(jù)達(dá)到觸發(fā)條件,就要觸發(fā)計(jì)算,這點(diǎn)沒毛病。

      而30秒達(dá)到觸發(fā)條件,并沒有觸發(fā)計(jì)算。

      究其原因:ProcessingTimeTrigger觸發(fā)器onProcessingTime方法返回TriggerResult.FIRE;而CountTrigger觸發(fā)器onProcessingTime方法返回TriggerResult.CONTINUE。

      看完TriggerResult枚舉類,相信你會(huì)一目了然:

      /** No action is taken on the window. */CONTINUE(false, false),/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */FIRE_AND_PURGE(true, true),/*** On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/FIRE(true, false),/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/PURGE(false, true);

      簡而言之一句話:FIRE會(huì)觸發(fā)計(jì)算,CONTINUE不會(huì)觸發(fā)計(jì)算。

      案例實(shí)踐:時(shí)間窗口+自定義計(jì)數(shù)觸發(fā)器

      經(jīng)過上述描述,我們還是需要實(shí)現(xiàn)自定義的計(jì)數(shù)觸發(fā)器,需要區(qū)分事件時(shí)間和處理時(shí)間。

      當(dāng)然,思路還是借鑒CountTrigger觸發(fā)器的已有內(nèi)容。

      定義MyCountTrigger自定義觸發(fā)器繼承Trigger,完整代碼見github:

      https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/MyCountTrigger.java

      https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddMyCountTriggerDemo.java

      案例實(shí)踐代碼如下:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(MyCountTrigger.of(5)).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

      數(shù)據(jù)源:nc –l 4444

      輸入任意6行字符

      從上圖可以看出,每5條數(shù)據(jù)觸發(fā)一次窗口計(jì)算,每30秒觸發(fā)一次窗口計(jì)算,效果與實(shí)際代碼預(yù)期相符。

      結(jié)論

      以上,借鑒Flink原生CountTrigger和ProcessingTimeTrigger,實(shí)現(xiàn)自定義Trigger觸發(fā)器,解決客戶現(xiàn)場定時(shí)定量觸發(fā)窗口聚合計(jì)算的效果。

      鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
      (0)
      用戶投稿
      上一篇 2022年6月20日 23:18
      下一篇 2022年6月20日 23:19

      相關(guān)推薦

      • 在從廣西到廣東打工的人看來,你心中的廣西人是什么樣的?

        我最早接廣西人是在來賓,那是08年,在一個(gè)項(xiàng)目部做技術(shù)指導(dǎo)及投入運(yùn)營后的服務(wù),剛下火車就把我嚇一跳,車站廣場的高音喇叭循環(huán)播放要注意傳銷,別誤入傳銷組織,來接我的司機(jī)師傅告訴我,這…

        2022年4月28日
      • 潤芯微科技再獲上汽定點(diǎn),行業(yè)合作持續(xù)深入

        6月16日,潤芯微科技(江蘇)有限公司(以下簡稱“潤芯微科技”)獲得了上汽的量產(chǎn)項(xiàng)目定點(diǎn),潤芯微科技將作為榮威下一代A級(jí)主力車型AP31智能座艙供應(yīng)商。這意味著,潤芯微科技與上汽在…

        2022年6月19日
      • 自我管理的好習(xí)慣

        1、凡事提前10分鐘   凡事提前10分鐘,會(huì)讓你有充裕的時(shí)間應(yīng)對(duì)可能的突發(fā)事件,更加從容。試著把起床鬧鐘提前10分鐘。你就會(huì)發(fā)現(xiàn)你出門不必急匆匆,早飯也可以慢慢享用,一整天的狀態(tài)…

        2022年4月29日
      • 新專利曝光,魅族 19 熱帖發(fā)布!魅族不斷布局出行科技圈

        如今,手機(jī)行業(yè)和汽車行業(yè)跨界互聯(lián)已成大勢(shì),有汽車企業(yè)選擇造手機(jī),比如蔚來;也有手機(jī)廠商選擇開始造車,比如小米;還有的手機(jī)廠商選擇跨界合作、強(qiáng)強(qiáng)聯(lián)手,比如魅族。相比于從零開始邁入新圈…

        2022年8月14日
      • 緊急提醒:坐過這兩趟列車,請(qǐng)立刻報(bào)備

        來源:深圳晚報(bào) 目前正值暑期旅游旺季 往西藏自治區(qū)方向出游游客較多 在此,廣東疾控呼吁 重要提醒 有乘坐8月4日C924次列車和8月5日C894次列車的在粵市民盡快向社區(qū)報(bào)備,做好…

        2022年8月12日
      • IT設(shè)備行業(yè)借助CRM數(shù)字化轉(zhuǎn)型,重塑企業(yè)新增長

        “疫情時(shí)代”企業(yè)傳統(tǒng)線下營銷模式屢屢受挫,以往得心應(yīng)手的市場或內(nèi)部管理手段,在這個(gè)特殊的時(shí)期,舉步維艱。越來越多的企業(yè)嘗試開辟新的路徑,實(shí)現(xiàn)企業(yè)在內(nèi)外部之間的交流。 隨著互聯(lián)網(wǎng)人口…

        2022年6月16日
      • "亞洲飛人"蘇炳添:與妻子相濡以沫20年,我虧欠她太多了

        01前言 一年前,在東京奧運(yùn)會(huì)男子百米半決賽中,一位選手以9.83秒的優(yōu)異成績沖向終點(diǎn)。 他的成績不僅打破亞洲紀(jì)錄,進(jìn)入決賽;而且這還是第一位進(jìn)入奧運(yùn)會(huì)男子百米決賽的中國運(yùn)動(dòng)員,也…

        2022年7月17日
      • 6月19日區(qū)塊鏈資訊匯總(五)

        【16:56】【數(shù)據(jù):以太坊2.0合約質(zhì)押數(shù)量達(dá)1291萬ETH】6月19日消息,Watcher.Guru發(fā)推表示,當(dāng)前以太坊2.0合約質(zhì)押數(shù)量達(dá)12,916,565ETH,創(chuàng)歷史…

        2022年6月22日
      • 我思?我在?

        生活紛雜,諸事擾心。常常寄心于各種繁雜場合,戴上一副違心的面具,說著話術(shù)般的話語,回首而望,只有落寞。為什么明明一天安排的滿滿的,卻沒有充實(shí)感?一個(gè)重要的問題是“我在干什么”? 思…

        2022年8月20日
      • 100+經(jīng)典Java面試題及答案解析

        面向?qū)ο缶幊蹋∣OP) Java是一個(gè)支持并發(fā)、基于類和面向?qū)ο蟮挠?jì)算機(jī)編程語言。下面列出了面向?qū)ο筌浖_發(fā)的優(yōu)點(diǎn): 代碼開發(fā)模塊化,更易維護(hù)和修改。 代碼復(fù)用。 增強(qiáng)代碼的可靠性…

        2022年8月5日

      聯(lián)系我們

      聯(lián)系郵箱:admin#wlmqw.com
      工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息