作者 | 張軍
策劃 | 蔡芳芳
過去幾年,數(shù)據(jù)倉庫和數(shù)據(jù)湖方案在快速演進和彌補自身缺陷的同時,二者之間的邊界也逐漸淡化。云原生的新一代數(shù)據(jù)架構不再遵循數(shù)據(jù)湖或數(shù)據(jù)倉庫的單一經(jīng)典架構,而是在一定程度上結合二者的優(yōu)勢重新構建。在云廠商和開源技術方案的共同推動之下,2021 年我們將會看到更多“湖倉一體”的實際落地案例。InfoQ 希望通過選題的方式對數(shù)據(jù)湖和數(shù)倉融合架構在不同企業(yè)的落地情況、實踐過程、改進優(yōu)化方案等內容進行呈現(xiàn)。本文將分享同程藝龍將 Flink 與 Iceberg 深度集成的落地經(jīng)驗和思考。
背景及痛點
業(yè)務背景
同程藝龍是一個提供機票、住宿、交通等服務的在線旅游服務平臺,目前我所在的部門屬于公司的研發(fā)部門,主要職責是為公司內其他業(yè)務部門提供一些基礎服務,我們的大數(shù)據(jù)系統(tǒng)主要承接的業(yè)務是部門內的一些大數(shù)據(jù)相關的數(shù)據(jù)統(tǒng)計、分析工作等。數(shù)據(jù)來源有網(wǎng)關日志數(shù)據(jù)、服務器監(jiān)控數(shù)據(jù)、K8s 容器的相關日志數(shù)據(jù),App 的打點日志, MySQL 的 binlog 日志等。我們主要的大數(shù)據(jù)任務是基于上述日志構建實時報表,提供基于 Presto 的報表展示和即時查詢服務,同時也會基于 Flink 開發(fā)一些實時、批處理任務,為業(yè)務方提供準確及時的數(shù)據(jù)支撐。
原架構方案
由于我們所有的原始數(shù)據(jù)都是存儲在 Kafka 的,所以原來的技術架構就是首先是 Flink 任務消費 Kafka 的數(shù)據(jù),經(jīng)過 Flink SQL 或者 Flink jar 的各種處理之后實時寫入 Hive,其中絕大部分任務都是 Flink SQL 任務,因為我認為 SQL 開發(fā)相對代碼要簡單的多,并且維護方便、好理解,所以能用 SQL 寫的都盡量用 SQL 來寫。
提交 Flink 的平臺使用的是 Zeppelin,其中提交 Flink SQL 任務是 Zeppelin 自帶的功能,提交 jar 包任務是我自己基于 Application 模式開發(fā)的 Zeppelin 插件。
對于落地到 Hive 的數(shù)據(jù),使用開源的報表系統(tǒng) metabase (底層使用 Presto) 提供實時報表展示、定時發(fā)送郵件報表,以及自定義 SQL 查詢服務。由于業(yè)務對數(shù)據(jù)的實時性要求比較高,希望數(shù)據(jù)能盡快的展示出來,所以我們很多的 Flink 流式任務的 checkpoint 設置為 1 分鐘,數(shù)據(jù)格式采用的是 orc 格式。
痛點
由于采用的是列式存儲格式 ORC,無法像行式存儲格式那樣進行追加操作,所以不可避免的產(chǎn)生了一個大數(shù)據(jù)領域非常常見且非常棘手的問題,即 HDFS 小文件問題。
開始的時候我們的小文件解決方案是自己寫的一個小文件壓縮工具,定期去合并,我們的 Hive 分區(qū)一般都是天級別的,所以這個工具的原理就是每天凌晨啟動一個定時任務去壓縮昨天的數(shù)據(jù),首先把昨天的數(shù)據(jù)寫入一個臨時文件夾,壓縮完,和原來的數(shù)據(jù)進行記錄數(shù)的比對檢驗,數(shù)據(jù)條數(shù)一致之后,用壓縮后的數(shù)據(jù)覆蓋原來的數(shù)據(jù),但是由于無法保證事務,所以出現(xiàn)了很多問題:
壓縮的同時由于延遲數(shù)據(jù)的到來導致昨天的 Hive 分區(qū)又有數(shù)據(jù)寫入了,檢驗就會失敗,導致合并小文件失敗。
替換舊數(shù)據(jù)的操作是沒有事務保證的,如果替換的過程中舊分區(qū)有新的數(shù)據(jù)寫入,就會覆蓋新寫入的數(shù)據(jù),造成數(shù)據(jù)丟失。
沒有事務的支持,無法實時合并當前分區(qū)的數(shù)據(jù),只能合并壓縮前一個分區(qū)的,最新的分區(qū)數(shù)據(jù)仍然有小文件的問題,導致最新數(shù)據(jù)查詢性能提高不了。
Flink+Iceberg 的落地
Iceberg 技術調研
所以基于以上的 HDFS 小文件、查詢慢等問題,結合我們的現(xiàn)狀,我調研了目前市面上的數(shù)據(jù)湖技術:Delta、Apache Iceberg 和 Apache Hudi,考慮了目前數(shù)據(jù)湖框架支持的功能和以后的社區(qū)規(guī)劃,最終我們是選擇了 Iceberg,其中考慮的原因有以下幾方面:
Iceberg 深度集成 Flink
前面講到,我們的絕大部分任務都是 Flink 任務,包括批處理任務和流處理任務,目前這三個數(shù)據(jù)湖框架,Iceberg 是集成 Flink 做的最完善的,如果采用 Iceberg 替代 Hive 之后,遷移的成本非常小,對用戶幾乎是無感知的,
比如我們原來的 SQL 是這樣的:
遷移到 Iceberg 以后,只需要修改 catalog 就行。
Presto 查詢也是和這個類似,只需要修改 catalog 就行了。
Iceberg 的設計架構使得查詢更快
在 Iceberg 的設計架構中,manifest 文件存儲了分區(qū)相關信息、data files 的相關統(tǒng)計信息(max/min)等,去查詢一些大的分區(qū)的數(shù)據(jù),就可以直接定位到所要的數(shù)據(jù),而不是像 Hive 一樣去 list 整個 HDFS 文件夾,時間復雜度從 O(n) 降到了 O(1),使得一些大的查詢速度有了明顯的提升,在 Iceberg PMC Chair Ryan Blue 的演講中,我們看到命中 filter 的任務執(zhí)行時間從 61.5 小時降到了 22 分鐘。
使用 Flink SQL 將 CDC 數(shù)據(jù)寫入 Iceberg
Flink CDC 提供了直接讀取 MySQL binlog 的方式,相對以前需要使用 canal 讀取 binlog 寫入 Iceberg,然后再去消費 Iceberg 數(shù)據(jù)。少了兩個組件的維護,鏈路減少了,節(jié)省了維護的成本和出錯的概率。并且可以實現(xiàn)導入全量數(shù)據(jù)和增量數(shù)據(jù)的完美對接,所以使用 Flink SQL 將 MySQL binlog 數(shù)據(jù)導入 Iceberg 來做 MySQL->Iceberg 的導入將會是一件非常有意義的事情。
此外對于我們最初的壓縮小文件的需求,雖然 Iceberg 目前還無法實現(xiàn)自動壓縮,但是它提供了一個批處理任務,已經(jīng)能滿足我們的需求。
Hive 表遷移 Iceberg 表
遷移準備工作
目前我們的所有數(shù)據(jù)都是存儲在 Hive 表的,在驗證完 Iceberg 之后,我們決定將 Hive 的數(shù)據(jù)遷移到 Iceberg,所以我寫了一個工具,可以使用 Hive 的數(shù)據(jù),然后新建一個 Iceberg 表,為其建立相應的元數(shù)據(jù),但是測試的時候發(fā)現(xiàn),如果采用這種方式,需要把寫入 Hive 的程序停止,因為如果 Iceberg 和 Hive 使用同一個數(shù)據(jù)文件,而壓縮程序會不斷地壓縮 Iceberg 表的小文件,壓縮完之后,不會馬上刪除舊數(shù)據(jù),所以 Hive 表就會查到雙份的數(shù)據(jù),故我們采用雙寫的策略,原來寫入 Hive 的程序不動,新啟動一套程序寫入 Iceberg,這樣能對 Iceberg 表觀察一段時間。還能和原來 Hive 中的數(shù)據(jù)進行比對,來驗證程序的正確性。
經(jīng)過一段時間觀察,每天將近幾十億條數(shù)據(jù)、壓縮后幾個 T 大小的 Hive 表和 Iceberg 表,一條數(shù)據(jù)也不差。所以在最終對比數(shù)據(jù)沒有問題之后,把 Hive 表停止寫入,使用新的 Iceberg 表。
遷移工具
我將這個 Hive 表遷移 Iceberg 表的工具做成了一個基于 Flink batch job 的 Iceberg Action,提交了社區(qū),不過目前還沒合并:https://github.com/apache/iceberg/pull/2217。這個功能的思路是使用 Hive 原始的數(shù)據(jù)不動,然后新建一個 Iceberg table,再為這個新的 Iceberg table 生成對應的元數(shù)據(jù),大家有需要的話可以先看看。
此外,Iceberg 社區(qū),還有一個把現(xiàn)有的數(shù)據(jù)遷移到已存在的 Iceberg table 的工具,類似 Hive 的 LOAD DATA INPATH ... INTO TABLE ,是用 Spark 的存儲過程做的,大家也可以關注下:https://github.com/apache/iceberg/pull/2210
Iceberg 優(yōu)化實踐
壓縮小文件
目前壓縮小文件是采用的一個額外批任務來進行的,Iceberg 提供了一個 Spark 版本的 action,我在做功能測試的時候發(fā)現(xiàn)了一些問題,此外我對 Spark 也不是非常熟悉,擔心出了問題不好排查,所以參照 Spark 版本的自己實現(xiàn)了一個 Flink 版本,并修復了一些 bug,進行了一些功能的優(yōu)化。
由于我們的 Iceberg 的元數(shù)據(jù)都是存儲在 Hive 中的,也就是我們使用了 HiveCatalog,所以壓縮程序的邏輯是把 Hive 中所有的 Iceberg 表全部都查出來,依次壓縮。壓縮沒有過濾條件,不管是分區(qū)表還是非分區(qū)表,都進行全表的壓縮,這樣做是為了處理某些使用 eventtime 的 Flink 任務。如果有延遲的數(shù)據(jù)的到來,就會把數(shù)據(jù)寫入以前的分區(qū),如果不是全表壓縮只壓縮當天分區(qū)的話,新寫入的其他天的數(shù)據(jù)就不會被壓縮。
之所以沒有開啟定時任務來壓縮,是因為比如定時五分鐘壓縮一個表,如果五分鐘之內這個壓縮任務沒完成,沒有提交新的 snapshot,下一個定時任務又開啟了,就會把上一個沒有完成的壓縮任務中的數(shù)據(jù)重新壓縮一次,所以每個表依次壓縮的策略可以保證某一時刻一個表只有一個任務在壓縮。
代碼示例參考:
目前系統(tǒng)運行穩(wěn)定,已經(jīng)完成了幾萬次任務的壓縮。
注意:
不過目前對于新發(fā)布的 Iceberg 0.11 來說,還有一個已知的 bug,即當壓縮前的文件大小大于要壓縮的大?。╰argetSizeInBytes)時,會造成數(shù)據(jù)丟失,其實這個問題我在最開始測試小文件壓縮的時候就發(fā)現(xiàn)了,并且提了一個 pr,我的策略是大于目標文件的數(shù)據(jù)文件不參與壓縮,不過這個 pr 沒有合并到 0.11 版本中,后來社區(qū)另外一個兄弟也發(fā)現(xiàn)了相同的問題,提交了一個 pr( https://github.com/apache/iceberg/pull/2196 ) ,策略是將這個大文件拆分到目標文件大小,目前已經(jīng)合并到 master,會在下一個 bug fix 版本 0.11.1 中發(fā)布。
查詢優(yōu)化
批處理定時任務
目前對于定時調度中的批處理任務,F(xiàn)link 的 SQL 客戶端還沒 Hive 那樣做的很完善,比如執(zhí)行 hive-f 來執(zhí)行一個文件。而且不同的任務需要不同的資源,并行度等。
所以我自己封裝了一個 Flink 程序,通過調用這個程序來進行處理,讀取一個指定文件里面的 SQL,來提交批任務。在命令行控制任務的資源和并行度等。
優(yōu)化
批任務的查詢這塊,我做了一些優(yōu)化工作,比如 limit 下推,filter 下推,查詢并行度推斷等,可以大大提高查詢的速度,這些優(yōu)化都已經(jīng)推回給社區(qū),并且在 Iceberg 0.11 版本中發(fā)布。
運維管理
清理 orphan 文件
1. 定時任務刪除
在使用 Iceberg 的過程中,有時候會有這樣的情況,我提交了一個 Flink 任務,由于各種原因,把它停了,這個時候 Iceberg 還沒提交相應的快照。此外由于一些異常導致程序失敗,會產(chǎn)生一些不在 Iceberg 元數(shù)據(jù)里面的孤立的數(shù)據(jù)文件,這些文件對 Iceberg 來說是不可達的,也是沒用的。所以我們需要像 jvm 的垃圾回收一樣來清理這些文件。
目前 Iceberg 提供了一個 Spark 版本的 action 來處理這些沒用的文件,我們采取的策略和壓縮小文件一樣,獲取 Hive 中的所有的 Iceberg 表。每隔一個小時執(zhí)行一次定時任務來刪除這些沒用的文件。
2. 踩坑
我們在程序運行過程中出現(xiàn)了正常的數(shù)據(jù)文件被刪除的問題,經(jīng)過調研,由于快照保留設置是一小時,這個清理程序清理時間也是設置一個小時,通過日志發(fā)現(xiàn)是這個清理程序刪除了正常的數(shù)據(jù)。查了查代碼,應該是設置了一樣的時間,在清理孤立文件的時候,有其他程序正在讀取要 expired 的 snapshot,導致刪除了正常的數(shù)據(jù)。最后把這個清理程序的清理時間改成默認的三天,沒有再出現(xiàn)刪除數(shù)據(jù)文件的問題。
當然,為了保險起見,我們可以覆蓋原來的刪除文件的方法,改成將文件到一個備份文件夾,檢查沒有問題之后,手工刪除。
快照過期處理
我們的快照過期策略,是和壓縮小文件的批處理任務寫在一起的,壓縮完小文件之后,進行表的快照過期處理,目前保留的時間是一個小時。這是因為對于有一些比較大的表,分區(qū)比較多,而且 checkpoint 比較短,如果保留的快照過長的話,還是會保留過多小文件,我們暫時沒有查詢歷史快照的需求,所以我將快照的保留時間設置了一個小時。
數(shù)據(jù)管理
寫入了數(shù)據(jù)之后,當想查看相應的快照有多少數(shù)據(jù)文件時,直接查詢 Spark 無法知道哪個是有用的,哪個是沒用的。所以需要有對應的管理工具。目前 Flink 這塊還不太成熟,我們可以使用 Spark3 提供的工具來查看。
1. DDL
目前 create table 這些操作我們是通過 Flink SQL Client 來做的。其他相關的 DDL 的操作可以使用 Spark 來做:
https://iceberg.apache.org/spark/#ddl-commands
2. DML
一些相關的數(shù)據(jù)的操作,比如刪除數(shù)據(jù)等可以通過 MySQL 來實現(xiàn),Presto 目前只支持分區(qū)級別的刪除功能。
3. show partitions & show create table
在我們操作 Hive 的時候,有一些很常用的操作,比如 show partitions、 show create table 等,這些目前 Flink 還沒有支持,所以在操作 Iceberg 的時候就很不方便,我們自己基于 Flink 1.12 做 了修改,不過目前還沒有完全提交到社區(qū),后續(xù)有時間會提交到 Flink 和 Iceberg 社區(qū)。
后續(xù)工作
Flink SQL 接入 CDC 數(shù)據(jù)到 Iceberg
目前在我們內部的版本中,我已經(jīng)測試通過可以使用 Flink SQL 將 CDC 數(shù)據(jù)(比如 MySQL binlog)寫入 Iceberg,社區(qū)的版本中實現(xiàn)該功能還需要做一些工作,我也提交了一些相關的 PR 來推進這個工作。
使用 SQL 進行刪除和更新
使用 Flink SQL 進行 streaming read
在工作中會有一些這樣的場景,由于數(shù)據(jù)比較大,Iceberg 的數(shù)據(jù)只存了較短的時間,如果很不幸因為程序寫錯了等原因,想從更早的時間來消費就無能為力了。
當引入了 Iceberg 的 streaming read 之后,這些問題就可以解決了,因為 Iceberg 存儲了所有的數(shù)據(jù),當然這里有一個前提就是對于數(shù)據(jù)沒有要求特別精確,比如達到秒級別,因為目前 Flink 寫入 Iceberg 的事務提交是基于 Flink Checkpoint 間隔的。
收益及總結
經(jīng)過對 Iceberg 大概一個季度的調研,測試,優(yōu)化和 bug 修復,我們將現(xiàn)有的 Hive 表都遷移到了 Iceberg,完美解決了原來的所有的痛點問題,目前系統(tǒng)穩(wěn)定運行,而且相對 Hive 得到了很多的收益:
Flink 寫入的資源減少
舉一個例子,默認配置下,原來一個 flink 讀取 kafka 寫入 hive 的任務,需要60個并行度才不會讓 Kafka 產(chǎn)生積壓。改成寫入 iceberg 之后,只需要20個并行度就夠了.
查詢速度變快
前面我們講到 Iceberg 查詢的時候不會像 Hive 一樣去 list 整個文件夾來獲取分區(qū)數(shù)據(jù),而是先從 manifest 文件中獲取相關數(shù)據(jù),查詢的性能得到了顯著的提升,一些大的報表的查詢速度從 50 秒提高到 30 秒。
并發(fā)讀寫
由于 Iceberg 的事務支持,我們可以實現(xiàn)對一個表進行并發(fā)讀寫,F(xiàn)link 流式數(shù)據(jù)實時入湖,壓縮程序同時壓縮小文件,清理過期文件和快照的程序同時清理無用的文件,這樣就能更及時的提供數(shù)據(jù),做到分鐘級的延遲,查詢最新分區(qū)數(shù)據(jù)的速度大大加快了,并且由于 Iceberg 的 ACID 特性可以保證數(shù)據(jù)的準確性。
time travel
可以回溯查詢以前某一時刻的數(shù)據(jù)。
總結一下,我們目前可以實現(xiàn)使用 Flink SQL 對 Iceberg 進行批、流的讀寫,并可以對小文件進行實時的壓縮,使用 Spark SQL 做一些 delete 和 update 工作以及一些 DDL 操作,后續(xù)可以使用 Flink SQL 將 CDC 的數(shù)據(jù)寫入 Iceberg。目前對 Iceberg 的所有的優(yōu)化和 bug fix,我已經(jīng)貢獻給社區(qū)。由于筆者水平有限,有時候也難免有錯誤,還請大家不吝賜教。
作者介紹:
張軍,同程藝龍大數(shù)據(jù)開發(fā)工程師