蘇寧易購IT總部 大數據平臺高級技術經理 陳豐

筆者將整份報告按四大模塊梳理為:

第一部分,流計算平臺的發展歷程——從2014年到現在,4年多的發展歷程中,蘇寧經歷storm->spark streaming->flink的轉變,目前還在轉變中。形成storm(4000~虛機節點),flink&spark streaming(200+物理節點,on yarn模式)的規模,同時介紹了各引擎發展過程中的問題以及解決路徑;

第二部分,storm及spark streaming的缺點,從兼顧吞吐量和延時、高效的狀態管理、Exactly-Once的保證及Event-Time等要點闡述了蘇寧選擇flink的理由;

第三部分,蘇寧基于flink框架所做的具體工作。(1)平臺層功能豐富:sql語法豐富(distinct,流表join),算子自動擴縮容,connector(mysql, hbase,kafka1.0)以及sink降速(2)工具層:統一日志收集及展示、平臺層和業務層的統一監控管理平臺(3)服務層:Dlink 一站式開發平臺;

第四部分是在數據集成、機器學習和CEP等方面,談談蘇寧對未來的展望。

目前,陳豐主要負責蘇寧易購集團大數據流計算平臺建設,包括Storm、SparkStreaming、Flink等組件,經歷了流計算從組件化到平臺服務化到智能化的發展過程。對于大數據開源框架有較為豐富的經驗,在分布式計算架構設計和系統優化方面有自己的思考和領悟:

既然說到前世今生,首先介紹一下流計算平臺在蘇寧的整個發展歷程,怎么從Storm到目前很火的Flink,以及它的現狀,談談整體的架構以及它的整體集群規模。2018年上半年,蘇寧把主要精力都投向了Flink。

首先看一下平臺的發展歷程。

最早2014年蘇寧上線了第一個Storm的大屏展示任務,同年Storm整體的孵化平臺上線。到了2015年因為對于SQL開發的需求蘇寧還是比較多的,蘇寧自研了一套基于安踏做SQL的平臺。2016年基于吞吐量的上線,有了spark streaming,同年考慮到性能和流計算的痛點,把目光投向了Flink。到了2018年,Flink是蘇寧流計算基礎平臺重要的目標項目,將業務推到Flink上做,比如說Flink的開發平臺、管理平臺等等一系列配套的業務上線。

再看流計算在蘇寧的配套。Storm2014年就用了,整體規模和占比比較多50%,物理機1000多,虛擬機4000+,任務數1500+。蘇寧做Flink起步較晚,但調研時間比較長,目前占比占到15%,計劃未來1-2年都會把流計算底層平臺所有的都投入到Flink上。

為什么選擇Flink?從蘇寧業務層面來看,首先Storm和2.0的spark streaming都使用的是processing time,它處理的時間遠晚于數據產生的時間,產生大量的數據再1或2小時堆積后,數據是錯誤的,沒辦法接受的。第二個就是容錯能力,Storm只能做到 Exacly once。第三個就是中間狀態的維護,Storm維護不提供state的東西,做中間狀態的維護只能依靠第三方來做,那么業務開發的時候成本相對高一些,會寫很多的代碼,效果也不是很好,因為它用第三方組件的時候,有可能出現一致性問題,或重啟后計算結果不準確等等。從蘇寧的平臺來看,兩者都沒有辦法兼顧高吞吐、低延時,兩個性能互補,但不能兼顧。

調研階段,對Flink的各個優勢做過簡單的列表,Flink是一個設計的比較優雅的流計算框架,它能兼顧到低延時和高吞吐,同時支持Exacly once。

談談在功能擴展、服務平臺開發以及運行時管理系統方面的經驗分享。

首先說一下功能擴展。Flink sql從它出來就比較火,為什么,因為很簡單,SQL對于程序員來說非常熟悉,開發成本非常低,同時由于SQL是一個統一的標準,它的遷移成本非常低的,如果今天用了subeg SQL,明天出的新的組件,可以非常輕松的遷移到其它的組件上,它是通用的語法。所以蘇寧FlinkSQL上做了一系列的語法擴展。另外Connectors,可以打通不同組件的聯系。

最后結合業務痛點,聊一下在運行時的它的算子動態擴容縮容,以及Checkpoint動態調整,我們怎么實現怎么把它做出來的。

(1)首先看一下語法擴展。因為我們從StormSQL開始就做了純SQL的開發,純SQL開發起碼要支持DDL和DML,但是Flink社區明確的講述它不會做DDL的事情,這件事由我們自己做出來。然后是DML語言,對于電商領域來說很典型的事情就是統計UV,對于這種聚合也做了大量支持,支持on? group by,over? window,group by window。同時Flink版本有它的局限性,它的流數據和靜態數據是沒有辦法去做互相操作的,然后最后說一個batch window,后面會具體的說說。

count distinct,這個當時基于0.003做的,當時社區沒有提供,我個人認為是由于整體代碼的抽象上的問題,它沒有去做指導,只能1.7去實現了,方式和現代社區幾乎是基本上一致的。

多介紹一下Approx count distinct,它的目的其實和count distinct語法是一樣,也是去重復的結構,但是它的目的是用較小的計算精度的誤差換取巨大的計算資源的節省,比如說內存。同時這個語法符合Calcite標準的,也就是說是通用的語法,我們可以遷移到其它的引擎上。

這邊只能粗略的講,看它怎么工作的。首先一條SQL進來進入Calcite做語法解析、變換。然后轉到Data program的時候,我們做定制化的基數和函數,基數和函數不擴展講了,因為其實涉及的算法不少,我們實現了一系列的基數的函數,讓用戶選擇相對應的精度,然后對應它的資源消耗,讓用戶自己去做選擇。然后回到我們Data program這一層,進而轉化成用戶選擇的基礎方程。到了下一層,每條數據進來的時候將進行累加,輸出時可以把數據向下一層的sink進行觸發計算,可以提供相對完美的容錯能力。

(2)另外一個SQL Batch window,這個是蘇寧特色的一個名詞,我們看一下它業務需求的case,它需要統計每日PV、UV,我們在線計算要求延時盡可能低,不可能等到每天結束的時候零點再看到結果,這個不能接受的。業務的需求是每秒都能實時的檢測到PV、UV的變化,這個從開始到第一秒第二秒第三秒都能看到結果,這個是業務能夠接受的case,這個是輸出的頻率,這個頻率是可以定制化的。直到這個窗口的結束,我們的結果會被reset,重新開始被計算,這個是蘇寧常用的Batch window。

怎么用SQL實現Batch window?這點不難,但怎么體現到SQL語法,又不能破壞標準的SQL語法呢?滑動窗口它是可以做到很短的輸出,但是不能固定窗口,窗口滑動到下一秒。第三條就是定制trigger。第四條就是Cascading window,它的窗口是固定的,10點到11點,數據沒有超出的時候是不會被滑動的,同時做到及早的輸出,但是它的問題是每條數據都輸出,不能控制輸出頻率的。第二點如果TPS非常高,一秒一萬,一秒十萬DB吃不下,會造成瓶頸。

所以我們怎么實現蘇寧的Batch window?我們用最后講的這個,加上DDL,定義輸出的間隔,然后再使用我們自己實現的Periodical sink,它主要的目的是把每一條進來的數據都進行緩存,并且能夠根據輸出的頻率和數量的閥值進行定量的輸出,整個鏈路進行的數據都會觸發計算,每個數據出來之后進行緩存,舊值被新值覆蓋,直到task輸出的時候,首先滿足定性定量的輸出,第二個不會對于下層造成太多的壓力,因為定點定時輸出,TBS只有2000左右。這個時間我覺得還是有進一步擴展或者是優化的空間,比如說其實這個的話只在sink層面解決了需求,如果我們在Batch window里面不把數據進行一條條處理,而是進行批處理,我覺得計算的效果效率會真正提升,這個可能我們后面會去做這件事情。

剛才非常簡單的舉了幾個例子,說了一下SQL的擴展。說一下Connectors。這個我不會多說,因為內容不多,我會舉例子來說一說。

HBase Sink實現兩種模式,主要是考慮它的容錯性,現在不會只滿足于端到端正的容錯,我們還希望它能做到Flink和組件之間的容錯。于是我們針對不同的業務場景做了冪的插入模式,一種是mini? wbatch,容錯,有可能會Failover,要求Failover后業務重發的數據與Fail前完全一致,同時我要求table是單版本的,這么一個sink。同時考慮到效率和實時性,我們也做了兩種寫入模式,一個是one by one的同步寫入,效率比較高。還有mini batch,異步寫入的,它的演時比較高,但是可以做到定時和定量。

剛才講的是冪的插入模式,現在講非冪等插入模式。Failover后寫HBase結果與fail之前不同使用的WAL機制。我們用Checkpoint時,將mini batch寫入外部文件系統。Checkpoint完成,將mini batch寫HBase。

下面來說一說業務上也經常面對的這么一個問題,就是擴容縮容的問題。我們看一下流程的分析,首先業務開發兩種模式,一種是寫SQL,還有一種是寫Flink SQL,還有一種是用Datastream API而進行開發。上線之后發現并行不夠,需要擴容,擴容的話對于SQL來說,我能做到的是什么,我可以用原生的Flink提供的去進行工作,把鏈路上的節點都進行擴容或者縮容,同時對于重新打包發布,然后再去重新上線的那些,這兩種開發模式都有問題,SQL的開發面對雙十一零點的大促,我們需要改代碼,并且還需要有高的延遲,業務才能上線,這個我們不能接受??傮w對于SQL開發它的擴容是任務級別的,而對于Datastream成本太高了。

我們做了Operator,我們一開始考慮是從wrong time考慮這個事情的。如果說我們要從這一層做的話,首先對元碼改動比較多,第二個任務相對比較復雜,我們需要重新生成不同的job,同時還要有我們自己運行時的管理服務系統,我們會把某個需要去RESCALE的 job拿過來進行修改,再把提交新的JOB graph,做真正擴縮容的事情。這邊著重說一下這個DO RESCALE會再領任務,資源不會釋放的,資源部釋放意味著響應的時間非???,我們也做過實驗,基本上到達秒級別甚至百毫秒級別做到擴容縮容,這個就是我們的解決方案。

剛才介紹了基礎的組建的擴展或者優化,現在來聊聊平臺服務化。首先看一下流計算平臺的架構,從左往右看,這邊是數據元,底層進來之后有Storm,然后是Flink streaming和spark streaming,上面有我們運行時管理系統,主要的作用是對業務進行監控、運維、報警一系列的事情。再往上一層是自己開發的一個開發者平臺或者工具層,對于Storm來說有Storm SQL LIBRO,還有Stream SQL 還有,可視化流程開發,Datastream。再網上就是支持我們的業務,體育、易購、風控、物流、BI等業務層。

我是做平臺的,主要介紹一下平臺層,也是工具層,下面運維的這么一個系統。

平臺服務首先是Stream SQL開發平臺,還有就是這個可視化流程開發平臺。

我們的Stream SQL是元數據處理,通過拖拉拽動態的生成我們的語句,可以支持整個的流程開發,從編寫到測試到業務上線,都可以這個平臺去做,業務完全不用寫代碼,直接寫SQL,在上面做就行了。

第二個可視化流程開發,把功能拽上來,建立它們之間的關系就可以了,同樣可以做到流程生命周期的事情,都能涵蓋。

最后任務提交,我們對于這個Flink底層的元碼也做了修改,也是覺得它很多的關于Checkpoint很多的行為要通過代碼體現的,我們覺得這個非常不靈活,所以我們對于底層做了相應的修改,只需要在提交的時候對于進行配置,就能做到動態的去設置和修改它的相對應的行為,只要一鍵提交就可以了。

下面看一下運行時管理。運行時管理主要解決了一下這些事情。解決了Flink運行時以及歷史日志的問題,我們做平臺的時候,Flink的運行時日志可以通過原生的UI看的,但是在使用過程中去做歷史日志就相當有問題了,它往往要通過YARN日志查看,所以業務用的時候非常頭痛。針對這一點我們提供了統一的日志解決方案,同時還有一些子代的Metric的查詢,我們也弄出來做了統計和展示。同時我們也把一些比較重要的事件也從我們的APP里截出來,比如說交互啟停的動作做了展示和通集。其次就是剛才描述的運行時的運行調整,比如說調整Operator并行度,還有在線調整。最后還有告警。

日志查看,通過任務名查,也可以通過關鍵字搜索。Metrics監控也是類似的,可以卡時間范圍,也可以不同維度查詢,并且做了一系列的聚合,為用戶提供相對有效的信息,提供給用戶比較有用的信息。

對于事件的接觸我們也做了相對的統計,左邊可以看到備壓等等一系列事件的統計,我們可以統計Checkpoint成功率,以及Checkpoint它的打下分布等等一些事情。動態修改Checkpoint并行度。

最后簡單的展望一下未來,2019工作計劃。首先我們可能會考慮一下做機器學習,據官方所稱,對于迭代計算, Flink應該是比spark還要快的,看有沒有辦法實現流處理的機器學習的算法模型。第二點就是去做通用的數據集成,因為Flink首先實時計算,同時它也提供了很多sink或souser,把組件連接起來。第三個就是智能動態擴容,現在的擴容都是手動的,如果有可能的話可以用STM做一些算法。最后一個是CEP的事情。

未經允許不得轉載:存儲在線-存儲專業媒體 » 流計算在蘇寧的前世今生
分享到

zhangnn

相關推薦

精品国产午夜肉伦伦影院,双性老师灌满浓jing上课h,天天做天天爱夜夜爽,攵女乱h边做边走