免费爱碰视频在线观看,九九精品国产屋,欧美亚洲尤物久久精品,1024在线观看视频亚洲

      Apache Doris 整合 Iceberg + Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢(xún)

      Apache Doris 整合 Iceberg + Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢(xún)

      導(dǎo)讀:這是一篇非常完整全面的應(yīng)用技術(shù)干貨,手把手教你如何使用 Doris+Iceberg+Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢(xún)分析架構(gòu)。按照本文中步驟一步步完成,完整體驗(yàn)搭建操作的完整過(guò)程。

      作者 Apache Doris PMC 成員 張家鋒

      1.概覽

      這篇教程將展示如何使用 Doris+Iceberg+Flink CDC 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢(xún)分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,同時(shí)本教程整個(gè)環(huán)境是都基于偽分布式環(huán)境搭建,大家按照步驟可以一步步完成。完整體驗(yàn)整個(gè)搭建操作的過(guò)程。

      1.1 軟件環(huán)境

      本教程的演示環(huán)境如下:

    1. Centos7
    2. Apahce doris 1.1
    3. Hadoop 3.3.3
    4. hive 3.1.3
    5. Fink 1.14.4
    6. flink-sql-connector-mysql-cdc-2.2.1
    7. Apache Iceberg 0.13.2
    8. JDK 1.8.0_311
    9. MySQL 8.0.29
    10. wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gzwget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gzwget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgzwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

      1.2 系統(tǒng)架構(gòu)

      我們整理架構(gòu)圖如下

    11. 首先我們從Mysql數(shù)據(jù)中使用Flink 通過(guò) Binlog完成數(shù)據(jù)的實(shí)時(shí)采集
    12. 然后再Flink 中創(chuàng)建 Iceberg 表,Iceberg的元數(shù)據(jù)保存在hive里
    13. 最后我們?cè)贒oris中創(chuàng)建Iceberg外表
    14. 在通過(guò)Doris 統(tǒng)一查詢(xún)?nèi)肟谕瓿蓪?duì)Iceberg里的數(shù)據(jù)進(jìn)行查詢(xún)分析,供前端應(yīng)用調(diào)用,這里iceberg外表的數(shù)據(jù)可以和Doris內(nèi)部數(shù)據(jù)或者Doris其他外部數(shù)據(jù)源的數(shù)據(jù)進(jìn)行關(guān)聯(lián)查詢(xún)分析
    15. Doris湖倉(cāng)一體的聯(lián)邦查詢(xún)架構(gòu)如下:

    16. Doris 通過(guò) ODBC 方式支持:MySQL,Postgresql,Oracle ,SQLServer
    17. 同時(shí)支持 Elasticsearch 外表
    18. 1.0版本支持Hive外表
    19. 1.1版本支持Iceberg外表
    20. 1.2版本支持Hudi 外表
    21. 2.環(huán)境安裝部署

      2.1 安裝Hadoop、Hive

      tar zxvf hadoop-3.3.3.tar.gztar zxvf apache-hive-3.1.3-bin.tar.gz

      配置系統(tǒng)環(huán)境變量

      export HADOOP_HOME=/data/hadoop-3.3.3export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopexport HADOOP_HDFS_HOME=$HADOOP_HOMEexport HIVE_HOME=/data/hive-3.1.3export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf

      2.2 配置hdfs

      2.2.1 core-site.xml

      vi etc/hadoop/core-site.xml

      fs.defaultFS hdfs://localhost:9000

      2.2.2 hdfs-site.xml

      vi etc/hadoop/hdfs-site.xml

      dfs.replication 1 dfs.namenode.name.dir /data/hdfs/namenode dfs.datanode.data.dir /data/hdfs/datanode

      2.2.3 修改Hadoop啟動(dòng)腳本

      sbin/start-dfs.sh

      sbin/stop-dfs.sh

      在文件開(kāi)始加上下面的內(nèi)容

      HDFS_DATANODE_USER=rootHADOOP_SECURE_DN_USER=hdfsHDFS_NAMENODE_USER=rootHDFS_SECONDARYNAMENODE_USER=root

      sbin/start-yarn.sh

      sbin/stop-yarn.sh

      在文件開(kāi)始加上下面的內(nèi)容

      YARN_RESOURCEMANAGER_USER=rootHADOOP_SECURE_DN_USER=yarnYARN_NODEMANAGER_USER=root

      2.3 配置yarn

      這里我改變了Yarn的一些端口,因?yàn)槲沂菃螜C(jī)環(huán)境和Doris 的一些端口沖突。你可以不啟動(dòng)yarn

      vi etc/hadoop/yarn-site.xml

      yarn.resourcemanager.address jiafeng-test:50056 yarn.resourcemanager.scheduler.address jiafeng-test:50057 yarn.resourcemanager.resource-tracker.address jiafeng-test:50058 yarn.resourcemanager.admin.address jiafeng-test:50059 yarn.resourcemanager.webapp.address jiafeng-test:9090 yarn.nodemanager.localizer.address 0.0.0.0:50060 yarn.nodemanager.webapp.address 0.0.0.0:50062

      vi etc/hadoop/mapred-site.xm

      mapreduce.jobhistory.address 0.0.0.0:10020 mapreduce.jobhistory.webapp.address 0.0.0.0:19888 mapreduce.shuffle.port 50061

      2.2.4 啟動(dòng)hadoop

      sbin/start-all.sh

      2.4 配置Hive

      2.4.1 創(chuàng)建hdfs目錄

      hdfs dfs -mkdir -p /user/hive/warehousehdfs dfs -mkdir /tmphdfs dfs -chmod g+w /user/hive/warehousehdfs dfs -chmod g+w /tmp

      2.4.2 配置hive-site.xml

      javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver javax.jdo.option.ConnectionUserName root javax.jdo.option.ConnectionPassword MyNewPass4! hive.metastore.warehouse.dir /user/hive/warehouse location of default database for the warehouse hive.metastore.uris Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore. javax.jdo.PersistenceManagerFactoryClass org.datanucleus.api.jdo.JDOPersistenceManagerFactory hive.metastore.schema.verification false datanucleus.schema.autoCreateAll true

      2.4.3 配置 hive-env.sh

      加入以下內(nèi)容

      HADOOP_HOME=/data/hadoop-3.3.3

      2.4.4 hive元數(shù)據(jù)初始化

      schematool -initSchema -dbType mysql

      2.4.5 啟動(dòng)hive metaservice

      后臺(tái)運(yùn)行

      nohup bin/hive –service metaservice 1>/dev/null 2>&1 &

      驗(yàn)證

      lsof -i:9083COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAMEjava 20700 root 567u IPv6 54605348 0t0 TCP *:emc-pp-mgmtsvc (LISTEN)

      2.5 安裝MySQL

      具體請(qǐng)參照這里:

      使用 Flink CDC 實(shí)現(xiàn) MySQL 數(shù)據(jù)實(shí)時(shí)入 Apache Doris

      2.5.1 創(chuàng)建MySQL數(shù)據(jù)庫(kù)表并初始化數(shù)據(jù)

      CREATE DATABASE demo;USE demo;CREATE TABLE userinfo ( id int NOT NULL AUTO_INCREMENT, name VARCHAR(255) NOT NULL DEFAULT ‘flink’, address VARCHAR(1024), phone_number VARCHAR(512), email VARCHAR(255), PRIMARY KEY (`id`))ENGINE=InnoDB ;INSERT INTO userinfo VALUES (10001,’user_110′,’Shanghai’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10002,’user_111′,’xian’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10003,’user_112′,’beijing’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10004,’user_113′,’shenzheng’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10005,’user_114′,’hangzhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10006,’user_115′,’guizhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10007,’user_116′,’chengdu’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10008,’user_117′,’guangzhou’,’13347420870′, NULL);INSERT INTO userinfo VALUES (10009,’user_118′,’xian’,’13347420870′, NULL);

      2.6 安裝 Flink

      tar zxvf flink-1.14.4-bin-scala_2.12.tgz

      然后需要將下面的依賴(lài)拷貝到Flink安裝目錄下的lib目錄下,具體的依賴(lài)的lib文件如下:

      下面將幾個(gè)Hadoop和Flink里沒(méi)有的依賴(lài)下載地址放在下面

      wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jarwget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jarwget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jarwget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

      其他的:

      hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jarhadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jarhadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jarhadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jarhadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jaradoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jarhadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jarhive-3.1.3/lib/hive-exec-3.1.3.jarhive-3.1.3/lib/hive-metastore-3.1.3.jarhive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar

      2.6.1 啟動(dòng)Flink

      bin/start-cluster.sh

      啟動(dòng)后的界面如下:

      2.6.2 進(jìn)入 Flink SQL Client

      bin/sql-client.sh embedded

      開(kāi)啟 checkpoint,每隔3秒做一次 checkpoint

      Checkpoint 默認(rèn)是不開(kāi)啟的,我們需要開(kāi)啟 Checkpoint 來(lái)讓 Iceberg 可以提交事務(wù)。 并且,mysql-cdc 在 binlog 讀取階段開(kāi)始前,需要等待一個(gè)完整的 checkpoint 來(lái)避免 binlog 記錄亂序的情況。

      注意:

      這里是演示環(huán)境,checkpoint的間隔設(shè)置比較短,線上使用,建議設(shè)置為3-5分鐘一次checkpoint。

      Flink SQL> SET execution.checkpointing.interval = 3s;[INFO] Session property has been set.

      2.6.3 創(chuàng)建Iceberg Catalog

      CREATE CATALOG hive_catalog WITH ( ‘type’=’iceberg’, ‘catalog-type’=’hive’, ‘uri’=’thrift://localhost:9083’, ‘clients’=’5’, ‘property-version’=’1’, ‘warehouse’=’hdfs://localhost:8020/user/hive/warehouse’);

      查看catalog

      Flink SQL> show catalogs;+—————–+| catalog name |+—————–+| default_catalog || hive_catalog |+—————–+2 rows in set

      2.6.4 創(chuàng)建 Mysql CDC 表

      CREATE TABLE user_source ( database_name STRING METADATA VIRTUAL, table_name STRING METADATA VIRTUAL, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( ‘connector’ = ‘mysql-cdc’, ‘hostname’ = ‘localhost’, ‘port’ = ‘3306’, ‘username’ = ‘root’, ‘password’ = ‘MyNewPass4!’, ‘database-name’ = ‘demo’, ‘table-name’ = ‘userinfo’ );

      查詢(xún)CDC表:

      select * from user_source;

      2.6.5 創(chuàng)建Iceberg表

      —查看catalogshow catalogs;—使用cataloguse catalog hive_catalog;–創(chuàng)建數(shù)據(jù)庫(kù)CREATE DATABASE iceberg_hive; –使用數(shù)據(jù)庫(kù)use iceberg_hive;

      2.6.5.1 創(chuàng)建表

      CREATE TABLE all_users_info ( database_name STRING, table_name STRING, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED ) WITH ( ‘catalog-type’=’hive’ );

      從CDC表里插入數(shù)據(jù)到Iceberg表里

      use catalog default_catalog; insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

      在web界面可以看到任務(wù)的運(yùn)行情況

      然后停掉任務(wù),我們?nèi)ゲ樵?xún)iceberg表

      select * from hive_catalog.iceberg_hive.all_users_info

      可以看到下面的結(jié)果

      我們?nèi)dfs上可以看到hive目錄下的數(shù)據(jù)及對(duì)應(yīng)的元數(shù)據(jù)

      我們也可以通過(guò)Hive建好Iceberg表,然后通過(guò)Flink將數(shù)據(jù)插入到表里

      下載Iceberg Hive運(yùn)行依賴(lài)

      wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar

      在hive shell下執(zhí)行:

      SET engine.hive.enabled=true; SET iceberg.engine.hive.enabled=true; SET iceberg.mr.catalog=hive; add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;

      創(chuàng)建表

      CREATE EXTERNAL TABLE iceberg_hive( `id` int, `name` string)STORED BY ‘org.apache.iceberg.mr.hive.HiveIcebergStorageHandler’ LOCATION ‘hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive’TBLPROPERTIES ( ‘iceberg.mr.catalog’=’hadoop’, ‘iceberg.mr.catalog.hadoop.warehouse.location’=’hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive’ );

      然后再Flink SQL Client下執(zhí)行下面語(yǔ)句將數(shù)據(jù)插入到Iceber表里

      INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, ‘c’);INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, ‘zhangfeng’);

      查詢(xún)這個(gè)表

      select * from hive_catalog.iceberg_hive.iceberg_hive

      可以看到下面的結(jié)果

      3. Doris 查詢(xún) Iceberg

      Apache Doris 提供了 Doris 直接訪問(wèn) Iceberg 外部表的能力,外部表省去了繁瑣的數(shù)據(jù)導(dǎo)入工作,并借助 Doris 本身的 OLAP 的能力來(lái)解決 Iceberg 表的數(shù)據(jù)分析問(wèn)題:

    22. 支持 Iceberg 數(shù)據(jù)源接入Doris
    23. 支持 Doris 與 Iceberg 數(shù)據(jù)源中的表聯(lián)合查詢(xún),進(jìn)行更加復(fù)雜的分析操作
    24. 3.1安裝Doris

      這里我們不在詳細(xì)講解Doris的安裝,如果你不知道怎么安裝Doris請(qǐng)參照官方文檔:快速入門(mén)

      3.2 創(chuàng)建Iceberg外表

      CREATE TABLE `all_users_info` ENGINE = ICEBERGPROPERTIES (“iceberg.database” = “iceberg_hive”,”iceberg.table” = “all_users_info”,”iceberg.hive.metastore.uris” = “thrift://localhost:9083″,”iceberg.catalog.type” = “HIVE_CATALOG”);

      參數(shù)說(shuō)明:

      • ENGINE 需要指定為 ICEBERG
      • PROPERTIES 屬性:
        • iceberg.hive.metastore.uris:Hive Metastore 服務(wù)地址
        • iceberg.database:掛載 Iceberg 對(duì)應(yīng)的數(shù)據(jù)庫(kù)名
        • iceberg.table:掛載 Iceberg 對(duì)應(yīng)的表名,掛載 Iceberg database 時(shí)無(wú)需指定。
        • iceberg.catalog.type:Iceberg 中使用的 catalog 方式,默認(rèn)為 HIVE_CATALOG,當(dāng)前僅支持該方式,后續(xù)會(huì)支持更多的 Iceberg catalog 接入方式。

      mysql> CREATE TABLE `all_users_info` -> ENGINE = ICEBERG -> PROPERTIES ( -> “iceberg.database” = “iceberg_hive”, -> “iceberg.table” = “all_users_info”, -> “iceberg.hive.metastore.uris” = “thrift://localhost:9083”, -> “iceberg.catalog.type” = “HIVE_CATALOG” -> );Query OK, 0 rows affected (0.23 sec) mysql> select * from all_users_info;+—————+————+——-+———-+———–+————–+——-+| database_name | table_name | id | name | address | phone_number | email |+—————+————+——-+———-+———–+————–+——-+| demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL || demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL || demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL || demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL || demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL || demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL || demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL || demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL || demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |+—————+————+——-+———-+———–+————–+——-+9 rows in set (0.18 sec)

      3.3 同步掛載

      當(dāng) Iceberg 表 Schema 發(fā)生變更時(shí),可以通過(guò) REFRESH 命令手動(dòng)同步,該命令會(huì)將 Doris 中的 Iceberg 外表刪除重建。

      — 同步 Iceberg 表REFRESH TABLE t_iceberg; — 同步 Iceberg 數(shù)據(jù)庫(kù)REFRESH DATABASE iceberg_test_db;

      3.4 Doris 和 Iceberg 數(shù)據(jù)類(lèi)型對(duì)應(yīng)關(guān)系

      支持的 Iceberg 列類(lèi)型與 Doris 對(duì)應(yīng)關(guān)系如下表:

      ICEBERG

      DORIS

      描述

      BOOLEAN

      BOOLEAN

      INTEGER

      INT

      LONG

      BIGINT

      FLOAT

      FLOAT

      DOUBLE

      DOUBLE

      DATE

      DATE

      TIMESTAMP

      DATETIME

      Timestamp 轉(zhuǎn)成 Datetime 會(huì)損失精度

      STRING

      STRING

      UUID

      VARCHAR

      使用 VARCHAR 來(lái)代替

      DECIMAL

      DECIMAL

      TIME

      不支持

      FIXED

      不支持

      BINARY

      不支持

      STRUCT

      不支持

      LIST

      不支持

      MAP

      不支持

      3.5 注意事項(xiàng)

      • Iceberg 表 Schema 變更不會(huì)自動(dòng)同步,需要在 Doris 中通過(guò) REFRESH 命令同步 Iceberg 外表或數(shù)據(jù)庫(kù)。
      • 當(dāng)前默認(rèn)支持的 Iceberg 版本為 0.12.0,0.13.x,未在其他版本進(jìn)行測(cè)試。后續(xù)后支持更多版本。

      3.6 Doris FE 配置

      下面幾個(gè)配置屬于 Iceberg 外表系統(tǒng)級(jí)別的配置,可以通過(guò)修改 fe.conf 來(lái)配置,也可以通過(guò) ADMIN SET CONFIG 來(lái)配置。

      • iceberg_table_creation_strict_mode
      • 創(chuàng)建 Iceberg 表默認(rèn)開(kāi)啟 strict mode。 strict mode 是指對(duì) Iceberg 表的列類(lèi)型進(jìn)行嚴(yán)格過(guò)濾,如果有 Doris 目前不支持的數(shù)據(jù)類(lèi)型,則創(chuàng)建外表失敗。
      • iceberg_table_creation_interval_second
      • 自動(dòng)創(chuàng)建 Iceberg 表的后臺(tái)任務(wù)執(zhí)行間隔,默認(rèn)為 10s。
      • max_iceberg_table_creation_record_size
      • Iceberg 表創(chuàng)建記錄保留的最大值,默認(rèn)為 2000. 僅針對(duì)創(chuàng)建 Iceberg 數(shù)據(jù)庫(kù)記錄。

      4. 總結(jié)

      這里Doris On Iceberg我們只演示了Iceberg單表的查詢(xún),你還可以聯(lián)合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等進(jìn)行聯(lián)合查詢(xún)分析,通過(guò)Doris對(duì)外提供統(tǒng)一的查詢(xún)分析入口。

      自此我們完整從搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介紹完了,Doris朝著數(shù)據(jù)倉(cāng)庫(kù)和數(shù)據(jù)融合的架構(gòu)演進(jìn),支持湖倉(cāng)一體的聯(lián)邦查詢(xún),給我們的開(kāi)發(fā)帶來(lái)更多的便利,更高效的開(kāi)發(fā),省去了很多數(shù)據(jù)同步的繁瑣工作,快快來(lái)體驗(yàn)吧。

      鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場(chǎng),版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
      用戶(hù)投稿
      上一篇 2022年6月24日 09:12
      下一篇 2022年6月24日 09:13

      相關(guān)推薦

      聯(lián)系我們

      聯(lián)系郵箱:admin#wlmqw.com
      工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息