Environment
- flink 1.13.6
- hudi 0.11.0
- merge on read table
Code example
tEnv.executeSql("CREATE TABLE tb_person_hudi (" +
    "  id BIGINT," +
    "  age INT," +
    "  name STRING," +
    "  create_time TIMESTAMP(3)," +
    "  time_stamp TIMESTAMP(3)," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'hudi'," +
    "  'table.type' = 'MERGE_ON_READ'," +
    "  'path' = 'file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi'," +
    "  'read.start-commit' = '20220722103000'," +
    // "  'read.end-commit' = '20220722104000'," +
    "  'read.tasks' = '1'," +
    "  'read.streaming.enabled' = 'true'," +
    "  'read.streaming.check-interval' = '30'" +
    ")");
Table table = tEnv.sqlQuery("select * from tb_person_hudi");
tEnv.toChangelogStream(table).print().setParallelism(1);
env.execute("test");
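The snippet assumes a StreamExecutionEnvironment (env) and a StreamTableEnvironment (tEnv) have already been created. A minimal sketch of that setup (the class name and checkpoint interval below are illustrative choices, not part of the original job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiStreamReadDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing lets the source's issuedInstant state (discussed below) survive failures.
    env.enableCheckpointing(10_000L);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    // ... the CREATE TABLE / sqlQuery / toChangelogStream / execute code above goes here ...
  }
}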
Flow analysis
Hudi source entry point (HoodieTableSource)
HoodieTableSource implements the ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown, and SupportsFilterPushDown interfaces. The latter four mainly let the planner optimize the query plan (partition pruning, column pruning, limit and filter push-down), while ScanTableSource provides the actual implementation for reading the Hudi table. The core method is org.apache.hudi.table.HoodieTableSource#getScanRuntimeProvider:
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { // streaming read is enabled (read.streaming.enabled)
  StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
      conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
  InputFormat<RowData, ?> inputFormat = getInputFormat(true);
  OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
      StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
  SingleOutputStreamOperator<RowData> source = execEnv
      .addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
      .setParallelism(1)
      .transform("split_reader", typeInfo, factory)
      .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
  return new DataStreamSource<>(source);
}
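Before moving on, here is what the push-down interfaces mentioned above look like in practice. This is a minimal sketch against Flink 1.13's connector APIs with a hypothetical class (PushDownAwareSource), not the actual HoodieTableSource code:

import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;

// Declared abstract so the sketch can focus on the push-down callbacks only.
public abstract class PushDownAwareSource
    implements ScanTableSource, SupportsLimitPushDown, SupportsProjectionPushDown {

  private long limit = -1L;
  private int[][] projectedFields;

  @Override
  public void applyLimit(long limit) {
    // The planner pushes LIMIT into the source so it can stop reading early.
    this.limit = limit;
  }

  @Override
  public boolean supportsNestedProjection() {
    return false;
  }

  @Override
  public void applyProjection(int[][] projectedFields) {
    // The planner pushes column pruning; only these fields need to be materialized by the source.
    this.projectedFields = projectedFields;
  }
}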
Inside getScanRuntimeProvider, the streaming branch wires two pieces into the streaming execution environment: a SourceFunction (StreamReadMonitoringFunction) and a custom transformation (StreamReadOperator):
- StreamReadMonitoringFunction: monitors the table's metadata directory (.hoodie) to discover the file splits that need to be read (MergeOnReadInputSplit: one base parquet file plus a set of log files), then hands each split to the downstream operator StreamReadOperator, which does the actual file reading. Monitoring always runs as a single task, named split_monitorxxxxx.
- StreamReadOperator: reads the data of the received MergeOnReadInputSplits one by one, in ascending timeline order. The operator is named split_reader->xxxxx and its parallelism can be configured via read.tasks (a compact sketch of this two-operator layout follows the list).
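As referenced above, the pipeline is "a single-parallelism monitor that emits splits, followed by a parallel reader". The sketch below re-creates only that topology with hypothetical classes and String stand-ins for MergeOnReadInputSplit/RowData; it is not the Hudi implementation:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MonitorReaderTopology {

  // Plays the role of split_monitor: periodically "discovers" splits and emits them in order.
  static class SplitMonitor implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      int n = 0;
      while (running) {
        ctx.collect("split-" + n++);   // stand-in for a MergeOnReadInputSplit
        Thread.sleep(30_000L);         // stand-in for read.streaming.check-interval = 30s
      }
    }

    @Override
    public void cancel() {
      running = false;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new SplitMonitor()).setParallelism(1)   // like split_monitor, always parallelism 1
        .map(new MapFunction<String, String>() {          // plays the role of split_reader
          @Override
          public String map(String split) {
            return "records of " + split;
          }
        })
        .setParallelism(4)                                // analogous to read.tasks
        .print();
    env.execute("monitor-reader-topology");
  }
}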
Periodically monitoring metadata to obtain incremental splits (StreamReadMonitoringFunction)
StreamReadMonitoringFunction periodically (every read.streaming.check-interval seconds) scans the table's .hoodie metadata directory. When it finds newly added instants on the active timeline (action in {commit, deltacommit, compaction, replace} and state = completed), it can tell from those instants which files (parquet and log) the data changes were written to, and builds split objects (MergeOnReadInputSplit) for them.
- Core attribute: issuedInstant. This is the basis for incremental reads; it records the latest instant whose data has already been consumed, much like a Kafka offset, except that Hudi's marker is based on the timeline. The value is stateful and kept in a ListState, so the Flink job still reads incrementally after a restart.
- Core method: StreamReadMonitoringFunction#monitorDirAndForwardSplits. It is very simple and does just two things: calls IncrementalInputSplits#inputSplits to obtain the incremental splits (in order), then forwards them to the downstream operator (StreamReadOperator):
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
  HoodieTableMetaClient metaClient = getOrCreateMetaClient();
  IncrementalInputSplits.Result result =
      incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
  for (MergeOnReadInputSplit split : result.getInputSplits()) {
    context.collect(split);
  }
}
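The issuedInstant bookkeeping described above follows Flink's standard operator-state pattern: implement CheckpointedFunction and keep a single marker string in a ListState. The sketch below shows that generic pattern with a hypothetical class and a placeholder instant value; it illustrates the idea and is not the Hudi source code:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class OffsetTrackingSource extends RichSourceFunction<String> implements CheckpointedFunction {
  private volatile boolean running = true;
  private transient ListState<String> offsetState; // persisted with checkpoints
  private String issuedInstant;                    // latest instant already handed downstream

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    offsetState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("issued-instant", String.class));
    for (String restored : offsetState.get()) {    // restore the marker after a restart
      issuedInstant = restored;
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    offsetState.clear();
    if (issuedInstant != null) {
      offsetState.add(issuedInstant);              // persist the current "offset"
    }
  }

  @Override
  public void run(SourceContext<String> ctx) throws Exception {
    while (running) {
      // ... discover instants newer than issuedInstant and emit their splits here ...
      synchronized (ctx.getCheckpointLock()) {
        issuedInstant = "20220722103000";          // placeholder: advance the marker after emitting
      }
      Thread.sleep(30_000L);
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}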
Obtaining incremental splits (IncrementalInputSplits)
The main logic lives in IncrementalInputSplits#inputSplits(metaClient, hadoopConf, issuedInstant). It helps to first understand Hudi's basic concepts of the timeline and instants; the detailed flow is shown in the figure below.
Note: if the first run of the Flink job specifies read.start-commit and read.end-commit, but that range lies far enough in the past that its instants have already been archived, the streaming job will never consume any data; see:
https://github.com/apache/hudi/issues/6167
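To make the incremental selection concrete: Hudi instant times are yyyyMMddHHmmss strings, so lexicographic order matches chronological order, and "new data" boils down to completed instants strictly later than issuedInstant. The sketch below is purely conceptual, with hard-coded sample values; it is not the IncrementalInputSplits code:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IncrementalFilterSketch {
  public static void main(String[] args) {
    String issuedInstant = "20220722103000";
    List<String> completedInstants = Arrays.asList("20220722102000", "20220722104500", "20220722110000");

    List<String> newInstants = completedInstants.stream()
        .filter(t -> t.compareTo(issuedInstant) > 0) // strictly newer than what was already consumed
        .sorted()                                    // keep ascending timeline order for downstream reads
        .collect(Collectors.toList());

    System.out.println(newInstants); // [20220722104500, 20220722110000]
  }
}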
Reading data files (StreamReadOperator)
After receiving splits, the StreamReadOperator operator first buffers them in a queue (Queue<MergeOnReadInputSplit> splits), then keeps polling splits off that queue and handing them to a thread pool for execution (a generic sketch of this buffer-and-drain pattern is shown below, followed by the operator's per-split hook, processSplits).
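A minimal, illustrative sketch of the "buffer splits, then drain them one at a time on an executor" idea; the class and field names are hypothetical and this is not the actual StreamReadOperator implementation:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitDrainSketch {
  private final Queue<String> splits = new ArrayDeque<>();      // stand-in for MergeOnReadInputSplit
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  // Called whenever a new split arrives from the upstream split_monitor.
  public synchronized void onSplit(String split) {
    splits.add(split);
    executor.submit(this::processOneSplit);                     // schedule asynchronous reading
  }

  private synchronized void processOneSplit() {
    String split = splits.poll();
    if (split != null) {
      System.out.println("reading records of " + split);        // stand-in for opening and iterating the split
    }
  }
}

The real operator's per-split work goes through processSplits (simplified below):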
private void processSplits() throws IOException {
  format.open(split);
  consumeAsMiniBatch(split);
  enqueueProcessSplits();
}
processSplits has three main steps: