在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    flink流式增量查詢hudi表流程分析

    flink流式增量查詢hudi表流程分析

    環(huán)境

    • flink 1.13.6
    • hudi 0.11.0
    • merge on read 表

    代碼示例

    tEnv.executeSql(“CREATE TABLE tb_person_hudi ( id BIGINT, age INT, name STRING,create_time TIMESTAMP ( 3 ), time_stamp TIMESTAMP(3),PRIMARY KEY ( id ) NOT ENFORCED ) WITH (” + “‘connector’ = ‘hudi’,” + “‘table.type’ = ‘MERGE_ON_READ’,” + “‘path’ = ‘file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi’,” + “‘read.start-commit’ = ‘20220722103000’,” + // “‘read.end-commit’ = ‘20220722104000’,” + “‘read.task’ = ‘1’,” + “‘read.streaming.enabled’ = ‘true’,” + “‘read.streaming.check-interval’ = ’30’ ” + “)”);Table table = tEnv.sqlQuery(“select * from tb_person_hudi “);tEnv.toChangelogStream(table).print().setParallelism(1);env.execute(“test”);

    流程分析

    hudi源入口(HoodieTableSource)

    HoodieTableSource實現(xiàn)ScanTableSource,SupportsPartitionPushDown,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown接口,后4個接口主要是支持對查詢計劃的優(yōu)化。ScanTableSource則提供了讀取hudi表的具體實現(xiàn),核心方法為org.apache.hudi.table.HoodieTableSource#getScanRuntimeProvider:

    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { //開啟了流式讀(read.streaming.enabled) StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); InputFormat inputFormat = getInputFormat(true); OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName(“split_monitor”)) .setParallelism(1) .transform(“split_reader”, typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource(source);}

    上面代碼在流環(huán)境中創(chuàng)建了一個SourceFunction(StreamReadMonitoringFunction)和一個自定義的轉(zhuǎn)換(StreamReadOperator)

    • StreamReadMonitoringFunction: 監(jiān)控hudi表元數(shù)據(jù)目錄(.hoodie)獲取需要被讀取的文件分片(MergeOnReadInputSplit,一個base parquet文件和一組log文件),然后把分片遞給下游的轉(zhuǎn)換算子StreamReadOperator進行文件讀??;固定一個線程去監(jiān)控,名稱為split_monitorxxxxx.
    • StreamReadOperator:將按timeline升序收到的MergeOnReadInputSplit一個一個地讀取分片數(shù)據(jù);算子名稱為split_reader->xxxxx,可以通過設(shè)置read.tasks進行設(shè)置并行度

    定時監(jiān)控元數(shù)據(jù)獲得增量分片(StreamReadMonitoringFunction)

    StreamReadMonitoringFunction負責定時(read.streaming.check-interval)掃描hudi表的元數(shù)據(jù)目錄.hoodie,如果發(fā)現(xiàn)在active timeline上有新增的instant[action=commit,deltacommit,compaction,replace && active=completed],從這些instant信息中可以知道數(shù)據(jù)變更寫到了哪些文件(parquet,log),然后構(gòu)建成分片對象(MergeOnReadInputSplit)。

    • 核心屬性:issuedInstant,這個是增量查詢的依據(jù),記錄著當前已經(jīng)消費的數(shù)據(jù)的最新instant,類似于kafka的offset,但是hudi是基于timeline.該值是有狀態(tài)的,維護在ListState中,所以flink job重啟依然可以做到增量。
    • 核心方法:StreamReadMonitoringFunction#monitorDirAndForwardSplits,很簡單,就做了兩件事,調(diào)用IncrementalInputSplits#inputSplits獲取到增量分片(有序),然后傳遞給下游的算子(StreamReadOperator)

    public void monitorDirAndForwardSplits(SourceContext context) { HoodieTableMetaClient metaClient = getOrCreateMetaClient(); IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant); for (MergeOnReadInputSplit split : result.getInputSplits()) { context.collect(split); }}

    獲取增量分片(IncrementalInputSplits)

    主要邏輯在方法IncrementalInputSplits#inputSplits(metaClient, hadoopConf, issuedInstant),需要先了解hudi關(guān)于timeline和instant的一些基本概念,詳細的流程如下圖所示:

    如果flink job首次運行指定了read.start-commit和read.end-commit,但是該范圍是比較久以前,instant已經(jīng)被歸檔,那么流作業(yè)將永遠不能消費到數(shù)據(jù)

    https://github.com/apache/hudi/issues/6167

    讀取數(shù)據(jù)文件(StreamReadOperator)

    StreamReadOperator算子接收分片后會緩存在隊列Queue splits,然后不停從隊列中poll分片放到線程池中執(zhí)行

    private void processSplits() throws IOException { format.open(split); consumeAsMiniBatch(split); enqueueProcessSplits(); }

    主要有三個步驟

  • 從隊列中peek分片,調(diào)用MergeOnReadInputFormat.open構(gòu)建迭代器,迭代器是用來進行文件的數(shù)據(jù)讀取,一個迭代器對應(yīng)一個分片(多個物理文件,base+log),對應(yīng)不同讀取的場景,有幾種迭代器:BaseFileOnlyFilteringIterator,BaseFileOnlyIterator,LogFileOnlyIterator,MergeIterator,SkipMergeIterator
  • 微批量消費,每批只讀2048記錄,將把記錄傳遞給下游的算子消費同時標記消費的總數(shù),如果該分片讀到了尾,則將該分片從隊列中彈出,并關(guān)閉MergeOnReadInputFormat
  • 繼續(xù)處理隊列中的分片,回到步驟1,如果上一次的分片沒消費完,那么本次循環(huán)將繼續(xù)消費,只不過是由另一個線程處理。
  • 鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
    上一篇 2022年7月24日 18:43
    下一篇 2022年7月24日 18:43

    相關(guān)推薦

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時間:周一至周五,10:30-18:30,節(jié)假日休息