導(dǎo)讀:這是一篇非常完整全面的應(yīng)用技術(shù)干貨,手把手教你如何使用 Doris+Iceberg+Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢分析架構(gòu)。按照本文中步驟一步步完成,完整體驗(yàn)搭建操作的完整過程。
作者 Apache Doris PMC 成員 張家鋒
1.概覽
這篇教程將展示如何使用 Doris+Iceberg+Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,同時(shí)本教程整個(gè)環(huán)境是都基于偽分布式環(huán)境搭建,大家按照步驟可以一步步完成。完整體驗(yàn)整個(gè)搭建操作的過程。
1.1 軟件環(huán)境
本教程的演示環(huán)境如下:
wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gzwget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gzwget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgzwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
1.2 系統(tǒng)架構(gòu)
我們整理架構(gòu)圖如下
Doris湖倉(cāng)一體的聯(lián)邦查詢架構(gòu)如下:
2.環(huán)境安裝部署
2.1 安裝Hadoop、Hive
tar zxvf hadoop-3.3.3.tar.gztar zxvf apache-hive-3.1.3-bin.tar.gz
配置系統(tǒng)環(huán)境變量
export HADOOP_HOME=/data/hadoop-3.3.3export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopexport HADOOP_HDFS_HOME=$HADOOP_HOMEexport HIVE_HOME=/data/hive-3.1.3export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf
2.2 配置hdfs
2.2.1 core-site.xml
vi etc/hadoop/core-site.xml
fs.defaultFS hdfs://localhost:9000
2.2.2 hdfs-site.xml
vi etc/hadoop/hdfs-site.xml
dfs.replication 1 dfs.namenode.name.dir /data/hdfs/namenode dfs.datanode.data.dir /data/hdfs/datanode
2.2.3 修改Hadoop啟動(dòng)腳本
sbin/start-dfs.sh
sbin/stop-dfs.sh
在文件開始加上下面的內(nèi)容
HDFS_DATANODE_USER=rootHADOOP_SECURE_DN_USER=hdfsHDFS_NAMENODE_USER=rootHDFS_SECONDARYNAMENODE_USER=root
sbin/start-yarn.sh
sbin/stop-yarn.sh
在文件開始加上下面的內(nèi)容
YARN_RESOURCEMANAGER_USER=rootHADOOP_SECURE_DN_USER=yarnYARN_NODEMANAGER_USER=root
2.3 配置yarn
這里我改變了Yarn的一些端口,因?yàn)槲沂菃螜C(jī)環(huán)境和Doris 的一些端口沖突。你可以不啟動(dòng)yarn
vi etc/hadoop/yarn-site.xml
yarn.resourcemanager.address jiafeng-test:50056 yarn.resourcemanager.scheduler.address jiafeng-test:50057 yarn.resourcemanager.resource-tracker.address jiafeng-test:50058 yarn.resourcemanager.admin.address jiafeng-test:50059 yarn.resourcemanager.webapp.address jiafeng-test:9090 yarn.nodemanager.localizer.address 0.0.0.0:50060 yarn.nodemanager.webapp.address 0.0.0.0:50062
vi etc/hadoop/mapred-site.xm
mapreduce.jobhistory.address 0.0.0.0:10020 mapreduce.jobhistory.webapp.address 0.0.0.0:19888 mapreduce.shuffle.port 50061
2.2.4 啟動(dòng)hadoop
sbin/start-all.sh
2.4 配置Hive
2.4.1 創(chuàng)建hdfs目錄
hdfs dfs -mkdir -p /user/hive/warehousehdfs dfs -mkdir /tmphdfs dfs -chmod g+w /user/hive/warehousehdfs dfs -chmod g+w /tmp
2.4.2 配置hive-site.xml
javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName root javax.jdo.option.ConnectionPassword MyNewPass4! hive.metastore.warehouse.dir /user/hive/warehouse location of default database for the warehouse hive.metastore.uris Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. javax.jdo.PersistenceManagerFactoryClass org.datanucleus.api.jdo.JDOPersistenceManagerFactory hive.metastore.schema.verification false datanucleus.schema.autoCreateAll true
2.4.3 配置 hive-env.sh
加入以下內(nèi)容
HADOOP_HOME=/data/hadoop-3.3.3
2.4.4 hive元數(shù)據(jù)初始化
schematool -initSchema -dbType mysql
2.4.5 啟動(dòng)hive metaservice
后臺(tái)運(yùn)行
nohup bin/hive –service metaservice 1>/dev/null 2>&1 &
驗(yàn)證
lsof -i:9083COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAMEjava 20700 root 567u IPv6 54605348 0t0 TCP *:emc-pp-mgmtsvc (LISTEN)
2.5 安裝MySQL
具體請(qǐng)參照這里:
使用 Flink CDC 實(shí)現(xiàn) MySQL 數(shù)據(jù)實(shí)時(shí)入 Apache Doris
2.5.1 創(chuàng)建MySQL數(shù)據(jù)庫(kù)表并初始化數(shù)據(jù)
CREATE DATABASE demo;USE demo;CREATE TABLE userinfo ( id int NOT NULL AUTO_INCREMENT, name VARCHAR(255) NOT NULL DEFAULT ‘flink’, address VARCHAR(1024), phone_number VARCHAR(512), email VARCHAR(255), PRIMARY KEY (`id`))ENGINE=InnoDB ;INSERT INTO userinfo VALUES (10001,’user_110′,’Shanghai’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10002,’user_111′,’xian’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10003,’user_112′,’beijing’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10004,’user_113′,’shenzheng’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10005,’user_114′,’hangzhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10006,’user_115′,’guizhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10007,’user_116′,’chengdu’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10008,’user_117′,’guangzhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10009,’user_118′,’xian’,’13347420870′, NULL);
2.6 安裝 Flink
tar zxvf flink-1.14.4-bin-scala_2.12.tgz
然后需要將下面的依賴拷貝到Flink安裝目錄下的lib目錄下,具體的依賴的lib文件如下:
下面將幾個(gè)Hadoop和Flink里沒有的依賴下載地址放在下面
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jarwget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jarwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
其他的:
hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jarhadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jarhadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jarhadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jaradoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jarhadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jarhive-3.1.3/lib/hive-exec-3.1.3.jarhive-3.1.3/lib/hive-metastore-3.1.3.jarhive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar
2.6.1 啟動(dòng)Flink
bin/start-cluster.sh
啟動(dòng)后的界面如下:
2.6.2 進(jìn)入 Flink SQL Client
bin/sql-client.sh embedded
開啟 checkpoint,每隔3秒做一次 checkpoint
Checkpoint 默認(rèn)是不開啟的,我們需要開啟 Checkpoint 來讓 Iceberg 可以提交事務(wù)。 并且,mysql-cdc 在 binlog 讀取階段開始前,需要等待一個(gè)完整的 checkpoint 來避免 binlog 記錄亂序的情況。
注意:
這里是演示環(huán)境,checkpoint的間隔設(shè)置比較短,線上使用,建議設(shè)置為3-5分鐘一次checkpoint。
Flink SQL> SET execution.checkpointing.interval = 3s;[INFO] Session property has been set.
2.6.3 創(chuàng)建Iceberg Catalog
CREATE CATALOG hive_catalog WITH ( ‘type’=’iceberg’, ‘catalog-type’=’hive’, ‘uri’=’thrift://localhost:9083’, ‘clients’=’5’, ‘property-version’=’1’, ‘warehouse’=’hdfs://localhost:8020/user/hive/warehouse’);
查看catalog
Flink SQL> show catalogs;+—————–+| catalog name |+—————–+| default_catalog || hive_catalog |+—————–+2 rows in set
2.6.4 創(chuàng)建 Mysql CDC 表
CREATE TABLE user_source ( database_name STRING METADATA VIRTUAL, table_name STRING METADATA VIRTUAL, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( ‘connector’ = ‘mysql-cdc’, ‘hostname’ = ‘localhost’, ‘port’ = ‘3306’, ‘username’ = ‘root’, ‘password’ = ‘MyNewPass4!’, ‘database-name’ = ‘demo’, ‘table-name’ = ‘userinfo’ );
查詢CDC表:
select * from user_source;
2.6.5 創(chuàng)建Iceberg表
—查看catalogshow catalogs;—使用cataloguse catalog hive_catalog;–創(chuàng)建數(shù)據(jù)庫(kù)CREATE DATABASE iceberg_hive; –使用數(shù)據(jù)庫(kù)use iceberg_hive;
2.6.5.1 創(chuàng)建表
CREATE TABLE all_users_info ( database_name STRING, table_name STRING, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED ) WITH ( ‘catalog-type’=’hive’ );
從CDC表里插入數(shù)據(jù)到Iceberg表里
use catalog default_catalog; insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;
在web界面可以看到任務(wù)的運(yùn)行情況
然后停掉任務(wù),我們?nèi)ゲ樵僫ceberg表
select * from hive_catalog.iceberg_hive.all_users_info
可以看到下面的結(jié)果
我們?nèi)dfs上可以看到hive目錄下的數(shù)據(jù)及對(duì)應(yīng)的元數(shù)據(jù)
我們也可以通過Hive建好Iceberg表,然后通過Flink將數(shù)據(jù)插入到表里
下載Iceberg Hive運(yùn)行依賴
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar
在hive shell下執(zhí)行:
SET engine.hive.enabled=true; SET iceberg.engine.hive.enabled=true; SET iceberg.mr.catalog=hive; add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;
創(chuàng)建表
CREATE EXTERNAL TABLE iceberg_hive( `id` int, `name` string)STORED BY ‘org.apache.iceberg.mr.hive.HiveIcebergStorageHandler’ LOCATION ‘hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive’TBLPROPERTIES ( ‘iceberg.mr.catalog’=’hadoop’, ‘iceberg.mr.catalog.hadoop.warehouse.location’=’hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive’ );
然后再Flink SQL Client下執(zhí)行下面語句將數(shù)據(jù)插入到Iceber表里
INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, ‘c’);INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, ‘zhangfeng’);
查詢這個(gè)表
select * from hive_catalog.iceberg_hive.iceberg_hive
可以看到下面的結(jié)果
3. Doris 查詢 Iceberg
Apache Doris 提供了 Doris 直接訪問 Iceberg 外部表的能力,外部表省去了繁瑣的數(shù)據(jù)導(dǎo)入工作,并借助 Doris 本身的 OLAP 的能力來解決 Iceberg 表的數(shù)據(jù)分析問題:
3.1安裝Doris
這里我們不在詳細(xì)講解Doris的安裝,如果你不知道怎么安裝Doris請(qǐng)參照官方文檔:快速入門
3.2 創(chuàng)建Iceberg外表
CREATE TABLE `all_users_info` ENGINE = ICEBERGPROPERTIES (“iceberg.database” = “iceberg_hive”,”iceberg.table” = “all_users_info”,”iceberg.hive.metastore.uris” = “thrift://localhost:9083″,”iceberg.catalog.type” = “HIVE_CATALOG”);
參數(shù)說明:
- ENGINE 需要指定為 ICEBERG
- PROPERTIES 屬性:
- iceberg.hive.metastore.uris:Hive Metastore 服務(wù)地址
- iceberg.database:掛載 Iceberg 對(duì)應(yīng)的數(shù)據(jù)庫(kù)名
- iceberg.table:掛載 Iceberg 對(duì)應(yīng)的表名,掛載 Iceberg database 時(shí)無需指定。
- iceberg.catalog.type:Iceberg 中使用的 catalog 方式,默認(rèn)為 HIVE_CATALOG,當(dāng)前僅支持該方式,后續(xù)會(huì)支持更多的 Iceberg catalog 接入方式。
mysql> CREATE TABLE `all_users_info` -> ENGINE = ICEBERG -> PROPERTIES ( -> “iceberg.database” = “iceberg_hive”, -> “iceberg.table” = “all_users_info”, -> “iceberg.hive.metastore.uris” = “thrift://localhost:9083”, -> “iceberg.catalog.type” = “HIVE_CATALOG” -> );Query OK, 0 rows affected (0.23 sec) mysql> select * from all_users_info;+—————+————+——-+———-+———–+————–+——-+| database_name | table_name | id | name | address | phone_number | email |+—————+————+——-+———-+———–+————–+——-+| demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL || demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL || demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL || demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL || demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL || demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL || demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL || demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL || demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |+—————+————+——-+———-+———–+————–+——-+9 rows in set (0.18 sec)
3.3 同步掛載
當(dāng) Iceberg 表 Schema 發(fā)生變更時(shí),可以通過 REFRESH 命令手動(dòng)同步,該命令會(huì)將 Doris 中的 Iceberg 外表刪除重建。
— 同步 Iceberg 表REFRESH TABLE t_iceberg; — 同步 Iceberg 數(shù)據(jù)庫(kù)REFRESH DATABASE iceberg_test_db;
3.4 Doris 和 Iceberg 數(shù)據(jù)類型對(duì)應(yīng)關(guān)系
支持的 Iceberg 列類型與 Doris 對(duì)應(yīng)關(guān)系如下表:
ICEBERG | DORIS | 描述 |
BOOLEAN | BOOLEAN | |
INTEGER | INT | |
LONG | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DATE | DATE | |
TIMESTAMP | DATETIME | Timestamp 轉(zhuǎn)成 Datetime 會(huì)損失精度 |
STRING | STRING | |
UUID | VARCHAR | 使用 VARCHAR 來代替 |
DECIMAL | DECIMAL | |
TIME | – | 不支持 |
FIXED | – | 不支持 |
BINARY | – | 不支持 |
STRUCT | – | 不支持 |
LIST | – | 不支持 |
MAP | – | 不支持 |
3.5 注意事項(xiàng)
- Iceberg 表 Schema 變更不會(huì)自動(dòng)同步,需要在 Doris 中通過 REFRESH 命令同步 Iceberg 外表或數(shù)據(jù)庫(kù)。
- 當(dāng)前默認(rèn)支持的 Iceberg 版本為 0.12.0,0.13.x,未在其他版本進(jìn)行測(cè)試。后續(xù)后支持更多版本。
3.6 Doris FE 配置
下面幾個(gè)配置屬于 Iceberg 外表系統(tǒng)級(jí)別的配置,可以通過修改 fe.conf 來配置,也可以通過 ADMIN SET CONFIG 來配置。
- iceberg_table_creation_strict_mode
- 創(chuàng)建 Iceberg 表默認(rèn)開啟 strict mode。 strict mode 是指對(duì) Iceberg 表的列類型進(jìn)行嚴(yán)格過濾,如果有 Doris 目前不支持的數(shù)據(jù)類型,則創(chuàng)建外表失敗。
- iceberg_table_creation_interval_second
- 自動(dòng)創(chuàng)建 Iceberg 表的后臺(tái)任務(wù)執(zhí)行間隔,默認(rèn)為 10s。
- max_iceberg_table_creation_record_size
- Iceberg 表創(chuàng)建記錄保留的最大值,默認(rèn)為 2000. 僅針對(duì)創(chuàng)建 Iceberg 數(shù)據(jù)庫(kù)記錄。
4. 總結(jié)
這里Doris On Iceberg我們只演示了Iceberg單表的查詢,你還可以聯(lián)合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等進(jìn)行聯(lián)合查詢分析,通過Doris對(duì)外提供統(tǒng)一的查詢分析入口。
自此我們完整從搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介紹完了,Doris朝著數(shù)據(jù)倉(cāng)庫(kù)和數(shù)據(jù)融合的架構(gòu)演進(jìn),支持湖倉(cāng)一體的聯(lián)邦查詢,給我們的開發(fā)帶來更多的便利,更高效的開發(fā),省去了很多數(shù)據(jù)同步的繁瑣工作,快快來體驗(yàn)吧。