在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    Apache Doris 整合 Iceberg + Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢

    Apache Doris 整合 Iceberg + Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢

    導(dǎo)讀:這是一篇非常完整全面的應(yīng)用技術(shù)干貨,手把手教你如何使用 Doris+Iceberg+Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢分析架構(gòu)。按照本文中步驟一步步完成,完整體驗(yàn)搭建操作的完整過程。

    作者 Apache Doris PMC 成員 張家鋒

    1.概覽

    這篇教程將展示如何使用 Doris+Iceberg+Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,同時(shí)本教程整個(gè)環(huán)境是都基于偽分布式環(huán)境搭建,大家按照步驟可以一步步完成。完整體驗(yàn)整個(gè)搭建操作的過程。

    1.1 軟件環(huán)境

    本教程的演示環(huán)境如下:

  • Centos7
  • Apahce doris 1.1
  • Hadoop 3.3.3
  • hive 3.1.3
  • Fink 1.14.4
  • flink-sql-connector-mysql-cdc-2.2.1
  • Apache Iceberg 0.13.2
  • JDK 1.8.0_311
  • MySQL 8.0.29
  • wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gzwget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gzwget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgzwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

    1.2 系統(tǒng)架構(gòu)

    我們整理架構(gòu)圖如下

  • 首先我們從Mysql數(shù)據(jù)中使用Flink 通過 Binlog完成數(shù)據(jù)的實(shí)時(shí)采集
  • 然后再Flink 中創(chuàng)建 Iceberg 表,Iceberg的元數(shù)據(jù)保存在hive里
  • 最后我們?cè)贒oris中創(chuàng)建Iceberg外表
  • 在通過Doris 統(tǒng)一查詢?nèi)肟谕瓿蓪?duì)Iceberg里的數(shù)據(jù)進(jìn)行查詢分析,供前端應(yīng)用調(diào)用,這里iceberg外表的數(shù)據(jù)可以和Doris內(nèi)部數(shù)據(jù)或者Doris其他外部數(shù)據(jù)源的數(shù)據(jù)進(jìn)行關(guān)聯(lián)查詢分析
  • Doris湖倉(cāng)一體的聯(lián)邦查詢架構(gòu)如下:

  • Doris 通過 ODBC 方式支持:MySQL,Postgresql,Oracle ,SQLServer
  • 同時(shí)支持 Elasticsearch 外表
  • 1.0版本支持Hive外表
  • 1.1版本支持Iceberg外表
  • 1.2版本支持Hudi 外表
  • 2.環(huán)境安裝部署

    2.1 安裝Hadoop、Hive

    tar zxvf hadoop-3.3.3.tar.gztar zxvf apache-hive-3.1.3-bin.tar.gz

    配置系統(tǒng)環(huán)境變量

    export HADOOP_HOME=/data/hadoop-3.3.3export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopexport HADOOP_HDFS_HOME=$HADOOP_HOMEexport HIVE_HOME=/data/hive-3.1.3export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf

    2.2 配置hdfs

    2.2.1 core-site.xml

    vi etc/hadoop/core-site.xml

    fs.defaultFS hdfs://localhost:9000

    2.2.2 hdfs-site.xml

    vi etc/hadoop/hdfs-site.xml

    dfs.replication 1 dfs.namenode.name.dir /data/hdfs/namenode dfs.datanode.data.dir /data/hdfs/datanode

    2.2.3 修改Hadoop啟動(dòng)腳本

    sbin/start-dfs.sh

    sbin/stop-dfs.sh

    在文件開始加上下面的內(nèi)容

    HDFS_DATANODE_USER=rootHADOOP_SECURE_DN_USER=hdfsHDFS_NAMENODE_USER=rootHDFS_SECONDARYNAMENODE_USER=root

    sbin/start-yarn.sh

    sbin/stop-yarn.sh

    在文件開始加上下面的內(nèi)容

    YARN_RESOURCEMANAGER_USER=rootHADOOP_SECURE_DN_USER=yarnYARN_NODEMANAGER_USER=root

    2.3 配置yarn

    這里我改變了Yarn的一些端口,因?yàn)槲沂菃螜C(jī)環(huán)境和Doris 的一些端口沖突。你可以不啟動(dòng)yarn

    vi etc/hadoop/yarn-site.xml

    yarn.resourcemanager.address jiafeng-test:50056 yarn.resourcemanager.scheduler.address jiafeng-test:50057 yarn.resourcemanager.resource-tracker.address jiafeng-test:50058 yarn.resourcemanager.admin.address jiafeng-test:50059 yarn.resourcemanager.webapp.address jiafeng-test:9090 yarn.nodemanager.localizer.address 0.0.0.0:50060 yarn.nodemanager.webapp.address 0.0.0.0:50062

    vi etc/hadoop/mapred-site.xm

    mapreduce.jobhistory.address 0.0.0.0:10020 mapreduce.jobhistory.webapp.address 0.0.0.0:19888 mapreduce.shuffle.port 50061

    2.2.4 啟動(dòng)hadoop

    sbin/start-all.sh

    2.4 配置Hive

    2.4.1 創(chuàng)建hdfs目錄

    hdfs dfs -mkdir -p /user/hive/warehousehdfs dfs -mkdir /tmphdfs dfs -chmod g+w /user/hive/warehousehdfs dfs -chmod g+w /tmp

    2.4.2 配置hive-site.xml

    javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName root javax.jdo.option.ConnectionPassword MyNewPass4! hive.metastore.warehouse.dir /user/hive/warehouse location of default database for the warehouse hive.metastore.uris Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. javax.jdo.PersistenceManagerFactoryClass org.datanucleus.api.jdo.JDOPersistenceManagerFactory hive.metastore.schema.verification false datanucleus.schema.autoCreateAll true

    2.4.3 配置 hive-env.sh

    加入以下內(nèi)容

    HADOOP_HOME=/data/hadoop-3.3.3

    2.4.4 hive元數(shù)據(jù)初始化

    schematool -initSchema -dbType mysql

    2.4.5 啟動(dòng)hive metaservice

    后臺(tái)運(yùn)行

    nohup bin/hive –service metaservice 1>/dev/null 2>&1 &

    驗(yàn)證

    lsof -i:9083COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAMEjava 20700 root 567u IPv6 54605348 0t0 TCP *:emc-pp-mgmtsvc (LISTEN)

    2.5 安裝MySQL

    具體請(qǐng)參照這里:

    使用 Flink CDC 實(shí)現(xiàn) MySQL 數(shù)據(jù)實(shí)時(shí)入 Apache Doris

    2.5.1 創(chuàng)建MySQL數(shù)據(jù)庫(kù)表并初始化數(shù)據(jù)

    CREATE DATABASE demo;USE demo;CREATE TABLE userinfo ( id int NOT NULL AUTO_INCREMENT, name VARCHAR(255) NOT NULL DEFAULT ‘flink’, address VARCHAR(1024), phone_number VARCHAR(512), email VARCHAR(255), PRIMARY KEY (`id`))ENGINE=InnoDB ;INSERT INTO userinfo VALUES (10001,’user_110′,’Shanghai’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10002,’user_111′,’xian’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10003,’user_112′,’beijing’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10004,’user_113′,’shenzheng’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10005,’user_114′,’hangzhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10006,’user_115′,’guizhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10007,’user_116′,’chengdu’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10008,’user_117′,’guangzhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10009,’user_118′,’xian’,’13347420870′, NULL);

    2.6 安裝 Flink

    tar zxvf flink-1.14.4-bin-scala_2.12.tgz

    然后需要將下面的依賴拷貝到Flink安裝目錄下的lib目錄下,具體的依賴的lib文件如下:

    下面將幾個(gè)Hadoop和Flink里沒有的依賴下載地址放在下面

    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jarwget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jarwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

    其他的:

    hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jarhadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jarhadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jarhadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jaradoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jarhadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jarhive-3.1.3/lib/hive-exec-3.1.3.jarhive-3.1.3/lib/hive-metastore-3.1.3.jarhive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar

    2.6.1 啟動(dòng)Flink

    bin/start-cluster.sh

    啟動(dòng)后的界面如下:

    2.6.2 進(jìn)入 Flink SQL Client

    bin/sql-client.sh embedded

    開啟 checkpoint,每隔3秒做一次 checkpoint

    Checkpoint 默認(rèn)是不開啟的,我們需要開啟 Checkpoint 來讓 Iceberg 可以提交事務(wù)。 并且,mysql-cdc 在 binlog 讀取階段開始前,需要等待一個(gè)完整的 checkpoint 來避免 binlog 記錄亂序的情況。

    注意:

    這里是演示環(huán)境,checkpoint的間隔設(shè)置比較短,線上使用,建議設(shè)置為3-5分鐘一次checkpoint。

    Flink SQL> SET execution.checkpointing.interval = 3s;[INFO] Session property has been set.

    2.6.3 創(chuàng)建Iceberg Catalog

    CREATE CATALOG hive_catalog WITH ( ‘type’=’iceberg’, ‘catalog-type’=’hive’, ‘uri’=’thrift://localhost:9083’, ‘clients’=’5’, ‘property-version’=’1’, ‘warehouse’=’hdfs://localhost:8020/user/hive/warehouse’);

    查看catalog

    Flink SQL> show catalogs;+—————–+| catalog name |+—————–+| default_catalog || hive_catalog |+—————–+2 rows in set

    2.6.4 創(chuàng)建 Mysql CDC 表

    CREATE TABLE user_source ( database_name STRING METADATA VIRTUAL, table_name STRING METADATA VIRTUAL, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( ‘connector’ = ‘mysql-cdc’, ‘hostname’ = ‘localhost’, ‘port’ = ‘3306’, ‘username’ = ‘root’, ‘password’ = ‘MyNewPass4!’, ‘database-name’ = ‘demo’, ‘table-name’ = ‘userinfo’ );

    查詢CDC表:

    select * from user_source;

    2.6.5 創(chuàng)建Iceberg表

    —查看catalogshow catalogs;—使用cataloguse catalog hive_catalog;–創(chuàng)建數(shù)據(jù)庫(kù)CREATE DATABASE iceberg_hive; –使用數(shù)據(jù)庫(kù)use iceberg_hive;

    2.6.5.1 創(chuàng)建表

    CREATE TABLE all_users_info ( database_name STRING, table_name STRING, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED ) WITH ( ‘catalog-type’=’hive’ );

    從CDC表里插入數(shù)據(jù)到Iceberg表里

    use catalog default_catalog; insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

    在web界面可以看到任務(wù)的運(yùn)行情況

    然后停掉任務(wù),我們?nèi)ゲ樵僫ceberg表

    select * from hive_catalog.iceberg_hive.all_users_info

    可以看到下面的結(jié)果

    我們?nèi)dfs上可以看到hive目錄下的數(shù)據(jù)及對(duì)應(yīng)的元數(shù)據(jù)

    我們也可以通過Hive建好Iceberg表,然后通過Flink將數(shù)據(jù)插入到表里

    下載Iceberg Hive運(yùn)行依賴

    wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar

    在hive shell下執(zhí)行:

    SET engine.hive.enabled=true; SET iceberg.engine.hive.enabled=true; SET iceberg.mr.catalog=hive; add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;

    創(chuàng)建表

    CREATE EXTERNAL TABLE iceberg_hive( `id` int, `name` string)STORED BY ‘org.apache.iceberg.mr.hive.HiveIcebergStorageHandler’ LOCATION ‘hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive’TBLPROPERTIES ( ‘iceberg.mr.catalog’=’hadoop’, ‘iceberg.mr.catalog.hadoop.warehouse.location’=’hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive’ );

    然后再Flink SQL Client下執(zhí)行下面語句將數(shù)據(jù)插入到Iceber表里

    INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, ‘c’);INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, ‘zhangfeng’);

    查詢這個(gè)表

    select * from hive_catalog.iceberg_hive.iceberg_hive

    可以看到下面的結(jié)果

    3. Doris 查詢 Iceberg

    Apache Doris 提供了 Doris 直接訪問 Iceberg 外部表的能力,外部表省去了繁瑣的數(shù)據(jù)導(dǎo)入工作,并借助 Doris 本身的 OLAP 的能力來解決 Iceberg 表的數(shù)據(jù)分析問題:

  • 支持 Iceberg 數(shù)據(jù)源接入Doris
  • 支持 Doris 與 Iceberg 數(shù)據(jù)源中的表聯(lián)合查詢,進(jìn)行更加復(fù)雜的分析操作
  • 3.1安裝Doris

    這里我們不在詳細(xì)講解Doris的安裝,如果你不知道怎么安裝Doris請(qǐng)參照官方文檔:快速入門

    3.2 創(chuàng)建Iceberg外表

    CREATE TABLE `all_users_info` ENGINE = ICEBERGPROPERTIES (“iceberg.database” = “iceberg_hive”,”iceberg.table” = “all_users_info”,”iceberg.hive.metastore.uris” = “thrift://localhost:9083″,”iceberg.catalog.type” = “HIVE_CATALOG”);

    參數(shù)說明:

    • ENGINE 需要指定為 ICEBERG
    • PROPERTIES 屬性:
      • iceberg.hive.metastore.uris:Hive Metastore 服務(wù)地址
      • iceberg.database:掛載 Iceberg 對(duì)應(yīng)的數(shù)據(jù)庫(kù)名
      • iceberg.table:掛載 Iceberg 對(duì)應(yīng)的表名,掛載 Iceberg database 時(shí)無需指定。
      • iceberg.catalog.type:Iceberg 中使用的 catalog 方式,默認(rèn)為 HIVE_CATALOG,當(dāng)前僅支持該方式,后續(xù)會(huì)支持更多的 Iceberg catalog 接入方式。

    mysql> CREATE TABLE `all_users_info` -> ENGINE = ICEBERG -> PROPERTIES ( -> “iceberg.database” = “iceberg_hive”, -> “iceberg.table” = “all_users_info”, -> “iceberg.hive.metastore.uris” = “thrift://localhost:9083”, -> “iceberg.catalog.type” = “HIVE_CATALOG” -> );Query OK, 0 rows affected (0.23 sec) mysql> select * from all_users_info;+—————+————+——-+———-+———–+————–+——-+| database_name | table_name | id | name | address | phone_number | email |+—————+————+——-+———-+———–+————–+——-+| demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL || demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL || demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL || demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL || demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL || demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL || demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL || demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL || demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |+—————+————+——-+———-+———–+————–+——-+9 rows in set (0.18 sec)

    3.3 同步掛載

    當(dāng) Iceberg 表 Schema 發(fā)生變更時(shí),可以通過 REFRESH 命令手動(dòng)同步,該命令會(huì)將 Doris 中的 Iceberg 外表刪除重建。

    — 同步 Iceberg 表REFRESH TABLE t_iceberg; — 同步 Iceberg 數(shù)據(jù)庫(kù)REFRESH DATABASE iceberg_test_db;

    3.4 Doris 和 Iceberg 數(shù)據(jù)類型對(duì)應(yīng)關(guān)系

    支持的 Iceberg 列類型與 Doris 對(duì)應(yīng)關(guān)系如下表:

    ICEBERG

    DORIS

    描述

    BOOLEAN

    BOOLEAN

    INTEGER

    INT

    LONG

    BIGINT

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DATE

    DATE

    TIMESTAMP

    DATETIME

    Timestamp 轉(zhuǎn)成 Datetime 會(huì)損失精度

    STRING

    STRING

    UUID

    VARCHAR

    使用 VARCHAR 來代替

    DECIMAL

    DECIMAL

    TIME

    不支持

    FIXED

    不支持

    BINARY

    不支持

    STRUCT

    不支持

    LIST

    不支持

    MAP

    不支持

    3.5 注意事項(xiàng)

    • Iceberg 表 Schema 變更不會(huì)自動(dòng)同步,需要在 Doris 中通過 REFRESH 命令同步 Iceberg 外表或數(shù)據(jù)庫(kù)。
    • 當(dāng)前默認(rèn)支持的 Iceberg 版本為 0.12.0,0.13.x,未在其他版本進(jìn)行測(cè)試。后續(xù)后支持更多版本。

    3.6 Doris FE 配置

    下面幾個(gè)配置屬于 Iceberg 外表系統(tǒng)級(jí)別的配置,可以通過修改 fe.conf 來配置,也可以通過 ADMIN SET CONFIG 來配置。

    • iceberg_table_creation_strict_mode
    • 創(chuàng)建 Iceberg 表默認(rèn)開啟 strict mode。 strict mode 是指對(duì) Iceberg 表的列類型進(jìn)行嚴(yán)格過濾,如果有 Doris 目前不支持的數(shù)據(jù)類型,則創(chuàng)建外表失敗。
    • iceberg_table_creation_interval_second
    • 自動(dòng)創(chuàng)建 Iceberg 表的后臺(tái)任務(wù)執(zhí)行間隔,默認(rèn)為 10s。
    • max_iceberg_table_creation_record_size
    • Iceberg 表創(chuàng)建記錄保留的最大值,默認(rèn)為 2000. 僅針對(duì)創(chuàng)建 Iceberg 數(shù)據(jù)庫(kù)記錄。

    4. 總結(jié)

    這里Doris On Iceberg我們只演示了Iceberg單表的查詢,你還可以聯(lián)合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等進(jìn)行聯(lián)合查詢分析,通過Doris對(duì)外提供統(tǒng)一的查詢分析入口。

    自此我們完整從搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介紹完了,Doris朝著數(shù)據(jù)倉(cāng)庫(kù)和數(shù)據(jù)融合的架構(gòu)演進(jìn),支持湖倉(cāng)一體的聯(lián)邦查詢,給我們的開發(fā)帶來更多的便利,更高效的開發(fā),省去了很多數(shù)據(jù)同步的繁瑣工作,快快來體驗(yàn)吧。

    鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場(chǎng),版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
    上一篇 2022年6月24日 09:12
    下一篇 2022年6月24日 09:13

    相關(guān)推薦

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息