在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    廣東醫(yī)院項目Flink開發(fā)需求:定時定量窗口觸發(fā)器(從入門到精通)

    廣東醫(yī)院項目Flink開發(fā)需求:定時定量窗口觸發(fā)器(從入門到精通)

    背景

    最近有局點客戶有這么一個場景:利用Flink CDC讀取MySql數(shù)據(jù)binlog日志,然后使用窗口進行聚合統(tǒng)計,遇到的問題就是Flink現(xiàn)有窗口的觸發(fā)機制(定時或者定量)、不滿足他們的實際需求(定時和定量)。溫故而知新,可以為師矣。本文基于官網(wǎng)和源碼梳理Flink現(xiàn)有窗口的類型、觸發(fā)機制等內(nèi)容。最后,基于自定義觸發(fā)器實現(xiàn)定時定量觸發(fā)機制解決該局點客戶的實際場景問題。

    Flink窗口分類

    窗口是處理無限實時流的核心,將無界數(shù)據(jù)流切分為邏輯概念的桶、進行實際計算邏輯。

    分類方法

    從窗口是否分組來說,F(xiàn)link窗口分為兩類:分組窗口(Keyed Windows)、非分組窗口(Non-Keyed Windows),單從概念上看,區(qū)別就是前者用于keyed streams(即keyby操作后的DataStream)、后者直接用于DataStream。

    實際區(qū)別很大:keyed streams因為按照指定key(keyselector)進行分組、相同key的元素發(fā)送到相同subtask,所以keyed windows允許在多個subtask中并行化計算;而non-keyed streams不進行分組,所有non-keyed windows計算邏輯只能在單個stask中處理,即并行度只能為1。

    分類方法二

    從窗口屬性來說,F(xiàn)link窗口分為兩類:基于時間的窗口(滾動窗口-Tumbling Window、滑動窗口-Sliding Window、會話窗口-Session Window)、基于計數(shù)的窗口(全局窗口-GlobalWindow)。很明顯時間窗口與時間相關(guān)、每個窗口都有開始時間和結(jié)束時間;計數(shù)窗口與數(shù)據(jù)條數(shù)相關(guān)、與時間無關(guān)。內(nèi)置的這幾種窗口的詳細(xì)介紹,后續(xù)單獨發(fā)文描述。

    兩種分類相關(guān)性

    兩種分類方式的關(guān)系是什么呢?

    從窗口是否分組的圖中我們可以知道Keyed Windows使用window方法、Non-Keyed Windows使用windowAll方法,我們可以看下源碼中這兩個方法的使用情況。

    這兩個方法的入?yún)⒍紴閃indowAssigner抽象類。

    上圖紅線部分可以看出,window方法和windowAll方法都可以使用時間窗口和計數(shù)窗口;

    上圖標(biāo)黃部分可以看出,window方法和windowAll方法可以使用相同的WindowAssigner(即TumblingWindow、SlidingWindow、GlobalWindow);

    細(xì)心讀者可能會發(fā)現(xiàn),為何沒有SessionWindows的蹤影?

    我們繼續(xù)看下WindowAssigner抽象類的子類,一目了然、豁然開朗:

    客戶需求

    客戶原本是需要數(shù)據(jù)條數(shù)達到時觸發(fā)后續(xù)操作,但是發(fā)現(xiàn)某些時間段(如非高峰期)數(shù)據(jù)條數(shù)長時間達不到以至于不觸發(fā)后續(xù)操作。所以,需要本文開頭所說的定量觸發(fā)基礎(chǔ)上加上定時觸發(fā)。很顯然,從上面介紹的Flink窗口分類來看,內(nèi)置的這幾類窗口類型并不滿足(時間窗口屬于定時觸發(fā)、計數(shù)窗口屬于定量觸發(fā))。

    自定義觸發(fā)器

    那么,到底是選擇計數(shù)窗口+自定義時間觸發(fā)器還是時間窗口+自定義計數(shù)觸發(fā)器?

    根據(jù)現(xiàn)有Flink API架構(gòu)上說,時間窗口+自定義計數(shù)觸發(fā)器是唯一選擇。

    Trigger觸發(fā)器決定了窗口function何時對窗口進行運算,自定義Trigger觸發(fā)器需要繼承實現(xiàn)Trigger抽象類。

    該類包括抽象方法如下:

    TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)

    TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)

    TriggerResult onEventTime(long time, W window, TriggerContext ctx)

    boolean canMerge()

    void onMerge(W window, OnMergeContext ctx)

    void clear(W window, TriggerContext ctx)

    該類有多種類型的實現(xiàn)子類,感興趣可以自行閱讀,方便自己實現(xiàn)自定義觸發(fā)器:

    案例實踐:計數(shù)窗口

    完整代碼見github:

    https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/CountWindowDemo.java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

    數(shù)據(jù)源:nc –l 4444

    首先輸入5個1(注意別忘回車)和1個2,過一會再輸入4個2。

    從上圖可以看出,每5條數(shù)據(jù)觸發(fā)一次窗口計算,效果與實際代碼預(yù)期相符。

    實際代碼詳見countWindowAll內(nèi)部使用的CountTrigger觸發(fā)器

    public AllWindowedStream countWindowAll(long size) {return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}

    案例實踐:時間窗口

    完整代碼見github:

    https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowDemo.java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

    數(shù)據(jù)源:nc –l 4444

    首先輸入任意字符如felixzh,然后分別5秒內(nèi)和5秒后輸入。

    從上圖可以看出,每5秒觸發(fā)一次窗口計算,效果與實際代碼預(yù)期相符。

    實際代碼詳見TumblingProcessingTimeWindows內(nèi)部使用的ProcessingTimeTrigger觸發(fā)器。

    案例實踐:時間窗口+CountTrigger

    樂于思考的朋友很容易想到,既然有時間窗口也有CountTrigger觸發(fā)器,直接組合不就解決背景所述的定時定量觸發(fā)了嗎?不需要自定義計數(shù)觸發(fā)器了吧?

    該思路下的完整代碼見github:

    https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddCountTriggerDemo.java

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);CountTrigger countTrigger = CountTrigger.of(5);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(countTrigger)//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

    數(shù)據(jù)源:nc –l 4444

    輸入任意6行字符

    從上圖可以看出,即使等待30秒之后,實際效果并非預(yù)期。

    預(yù)期的定時定量觸發(fā):即5條數(shù)據(jù)達到觸發(fā)條件,就要觸發(fā)計算,這點沒毛病。

    而30秒達到觸發(fā)條件,并沒有觸發(fā)計算。

    究其原因:ProcessingTimeTrigger觸發(fā)器onProcessingTime方法返回TriggerResult.FIRE;而CountTrigger觸發(fā)器onProcessingTime方法返回TriggerResult.CONTINUE。

    看完TriggerResult枚舉類,相信你會一目了然:

    /** No action is taken on the window. */CONTINUE(false, false),/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */FIRE_AND_PURGE(true, true),/*** On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/FIRE(true, false),/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/PURGE(false, true);

    簡而言之一句話:FIRE會觸發(fā)計算,CONTINUE不會觸發(fā)計算。

    案例實踐:時間窗口+自定義計數(shù)觸發(fā)器

    經(jīng)過上述描述,我們還是需要實現(xiàn)自定義的計數(shù)觸發(fā)器,需要區(qū)分事件時間和處理時間。

    當(dāng)然,思路還是借鑒CountTrigger觸發(fā)器的已有內(nèi)容。

    定義MyCountTrigger自定義觸發(fā)器繼承Trigger,完整代碼見github:

    https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/MyCountTrigger.java

    https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddMyCountTriggerDemo.java

    案例實踐代碼如下:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(MyCountTrigger.of(5)).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

    數(shù)據(jù)源:nc –l 4444

    輸入任意6行字符

    從上圖可以看出,每5條數(shù)據(jù)觸發(fā)一次窗口計算,每30秒觸發(fā)一次窗口計算,效果與實際代碼預(yù)期相符。

    結(jié)論

    以上,借鑒Flink原生CountTrigger和ProcessingTimeTrigger,實現(xiàn)自定義Trigger觸發(fā)器,解決客戶現(xiàn)場定時定量觸發(fā)窗口聚合計算的效果。

    鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
    用戶投稿
    上一篇 2022年6月20日 23:18
    下一篇 2022年6月20日 23:19

    相關(guān)推薦

    • 分享4條發(fā)微商朋友圈的方法(微商朋友圈應(yīng)該怎么發(fā))

      對于微商朋友來說,朋友圈的重要性不言而喻了。 那么微商的朋友圈到底該怎么發(fā)呢? 為什么同樣是經(jīng)營一個朋友圈,有的微商看起來逼格滿滿,實際效果也不錯;而有的卻動都不動就被屏蔽甚至拉黑…

      2022年11月27日
    • 廣東佛山瓷磚十大品牌有哪些(瓷磚十大名牌排行榜)

      關(guān)于瓷磚品牌的選擇,經(jīng)常有朋友問到這樣的問題:中國的十大瓷磚品牌有哪些?近幾年備受追捧的大角鹿瓷磚排名第幾?今天小編就來為大家解答一下。 先來說第一個問題:中國十大瓷磚品牌有哪些?…

      2022年11月27日
    • 30個無加盟費的項目(茶顏悅色奶茶店加盟費多少)

      茶顏悅色又爆了,8月18日,茶顏悅色南京門店正式開業(yè),開張不到半小時,門店就人滿為患,消費者的購買熱情十分高漲,而由于人流量過大造成擁堵,茶顏悅色也不得不暫停營業(yè)。 當(dāng)然,這里面排…

      2022年11月27日
    • 凈利潤率越高越好嗎(凈利潤率多少合適)

      一、持續(xù)增收不增利,平均凈利潤率首次跌入個位數(shù) 2021年,增收不增利依舊是行業(yè)主流。具體來看,大部分企業(yè)營業(yè)收入呈增長態(tài)勢,E50企業(yè)平均同比增速達到17.3%,但是利潤增速則明…

      2022年11月26日
    • 抖音帶貨怎么做入門(抖音帶貨怎么做入門教學(xué))

      相信很多小伙伴都有注意到,現(xiàn)在抖音已經(jīng)成為大家最常光顧的一個平臺了,作為一個日活破億的流量池,如今抖音上的用戶數(shù)量極大。因此,現(xiàn)在在抖音上帶貨、賣貨的人也是越來越多了,那么想在抖音…

      2022年11月25日
    • 《寶可夢朱紫》夢特性怎么獲得?隱藏特性獲取方法推薦

      寶可夢朱紫里有很多寶可夢都是擁有夢特性會變強的寶可夢,很多玩家不知道夢特性怎么獲得,下面就給大家?guī)韺毧蓧糁熳想[藏特性獲取方法推薦,感興趣的小伙伴一起來看看吧,希望能幫助到大家。 …

      2022年11月25日
    • 《寶可夢朱紫》奇魯莉安怎么進化?奇魯莉安進化方法分享

      寶可夢朱紫中的奇魯莉安要怎么進化呢?很多玩家都不知道,下面就給大家?guī)韺毧蓧糁熳掀骠斃虬策M化方法分享,感興趣的小伙伴一起來看看吧,希望能幫助到大家。 奇魯莉安進化方法分享 奇魯莉安…

      2022年11月25日
    • 5+3疫情防控從哪天開始算(遼寧疫情防控最新政策)

      最近有關(guān)國內(nèi)各地的疫情大家也都有在持續(xù)關(guān)注,目前國內(nèi)各地疫情隔離時間也根據(jù)二十條防控措施有了新的調(diào)整。那么,5+3疫情防控從哪天開始算?對于密接的5+3隔離時間計算大家還是比較關(guān)心…

      2022年11月25日
    • 藍碼怎么變綠碼需要幾天(藍碼怎么變綠碼需要幾天)

      大家都知道健康碼的顏色有紅碼、綠碼、黃碼,近日湖南健康碼上線“藍碼”,不少小伙伴發(fā)現(xiàn)自己健康碼變藍了,都想趕緊恢復(fù)綠碼,那么藍碼怎么變綠碼需要幾天?下面小編為大家?guī)硭{碼變綠碼需要…

      2022年11月25日
    • 規(guī)范透明促PPP高質(zhì)量發(fā)展——16萬億元大市場迎來新規(guī)

      近日,財政部印發(fā)《關(guān)于進一步推動政府和社會資本合作(PPP)規(guī)范發(fā)展、陽光運行的通知》,從做好項目前期論證、推動項目規(guī)范運作、嚴(yán)防隱性債務(wù)風(fēng)險、保障項目陽光運行四個方面進一步規(guī)范P…

      2022年11月25日

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時間:周一至周五,10:30-18:30,節(jié)假日休息