<video id="71low"></video>

            ITPub博客

            首頁 > IT職業 > IT生活 > Flink Kafka Connector 與 Exactly Once 剖析

            Flink Kafka Connector 與 Exactly Once 剖析

            IT生活 作者:吐泡泡ooO 時間:2019-10-23 17:15:59 0 刪除 編輯

            Flink Kafka Connector 是 Flink 內置的 Kafka 連接器,它包含了從 Kafka Topic 讀入數據的 Flink Kafka Consumer 以及向 Kafka Topic 寫出數據的 Flink Kafka Producer,除此之外 Flink Kafa Connector 基于 Flink Checkpoint 機制提供了完善的容錯能力。本文從 Flink Kafka Connector 的基本使用到 Kafka 在 Flink 中端到端的容錯原理展開討論。

            1.Flink Kafka 的使用

            在 Flink 中使用 Kafka Connector 時需要依賴 Kafka 的版本,Flink 針對不同的 Kafka 版本提供了對應的 Connector 實現。

            1.1 版本依賴

            既然 Flink 對不同版本的 Kafka 有不同實現,在使用時需要注意區分,根據使用環境引入正確的依賴關系。

            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>${flink_kafka_connector_version}</artifactId>
              <version>${flink_version}</version>
            </dependency>

            在上面的依賴配置中 ${flink_version} 指使用 Flink 的版本,${flink_connector_kafka_version} 指依賴的 Kafka connector 版本對應的 artifactId。下表描述了截止目前為止 Kafka 服務版本與 Flink Connector 之間的對應關系。

            Flink 官網內容 Apache Kafka Connector 中也有詳細的說明。

            鏈接: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html

            從 Flink 1.7 版本開始為 Kafka 1.0.0 及以上版本提供了全新的 Kafka Connector 支持,如果使用的 Kafka 版本在 1.0.0 及以上可以忽略因 Kafka 版本差異帶來的依賴變化。

            1.2 基本使用

            明確了使用的 Kafka 版本后就可以編寫一個基于 Flink Kafka 讀/寫的應用程序「本文討論內容全部基于 Flink 1.7 版本和 Kafka 1.1.0 版本」。根據上面描述的對應關系在工程中添加 Kafka Connector 依賴。

            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-connector-kafka_2.11</artifactId>
              <version>1.7.0</version>
            </dependency>

            下面的代碼片段是從 Kafka Topic「flink_kafka_poc_input」中消費數據,再寫入 Kafka Topic「flink_kafka_poc_output」的簡單示例。示例中除了讀/寫 Kafka Topic 外,沒有做其他的邏輯處理。

            public static void main(String[] args) {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              /** 初始化 Consumer 配置 */
              Properties consumerConfig = new Properties();
              consumerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
              consumerConfig.setProperty("group.id", "flink_poc_k110_consumer");
              /** 初始化 Kafka Consumer */
              FlinkKafkaConsumer<String> flinkKafkaConsumer = 
                new FlinkKafkaConsumer<String>(
                  "flink_kafka_poc_input", 
                  new SimpleStringSchema(), 
                  consumerConfig
                );
              /** 將 Kafka Consumer 加入到流處理 */
              DataStream<String> stream = env.addSource(flinkKafkaConsumer);
              /** 初始化 Producer 配置 */
              Properties producerConfig = new Properties();
              producerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
              /** 初始化 Kafka Producer */
              FlinkKafkaProducer<String> myProducer = 
                new FlinkKafkaProducer<String>(
                  "flink_kafka_poc_output", 
                  new MapSerialization(), 
                  producerConfig
                );
              /** 將 Kafka Producer 加入到流處理 */
              stream.addSink(myProducer);
              /** 執行 */
              env.execute();
            }
            class MapSerialization implements SerializationSchema<String> {
              public byte[] serialize(String element) {
                return element.getBytes();
              }
            }

            Flink API 使用起來確實非常簡單,調用 addSource 方法和 addSink 方法就可以將初始化好的 FlinkKafkaConsumer 和 FlinkKafkaProducer 加入到流處理中。execute 執行后,KafkaConsumer 和 KafkaProducer 就可以開始正常工作了。

            2.Flink Kafka 的容錯

            眾所周知,Flink 支持 Exactly-once semantics。什么意思呢?翻譯過來就是「恰好一次語義」。流處理系統中,數據源源不斷的流入到系統、被處理、最后輸出結果。我們都不希望系統因人為或外部因素產生任何意想不到的結果。對于 Exactly-once 語義達到的目的是指即使系統被人為停止、因故障 shutdown、無故關機等任何因素停止運行狀態時,對于系統中的每條數據不會被重復處理也不會少處理。

            2.1 Flink Exactly-once

            Flink 宣稱支持 Exactly-once 其針對的是 Flink 應用內部的數據流處理。但 Flink 應用內部要想處理數據首先要有數據流入到 Flink 應用,其次 Flink 應用對數據處理完畢后也理應對數據做后續的輸出。在 Flink 中數據的流入稱為 Source,數據的后續輸出稱為 Sink,對于 Source 和 Sink 完全依靠外部系統支撐(比如 Kafka)。

            Flink 自身是無法保證外部系統的 Exactly-once 語義。但這樣一來其實并不能稱為完整的 Exactly-once,或者說 Flink 并不能保證端到端 Exactly-once。而對于數據精準性要求極高的系統必須要保證端到端的 Exactly-once,所謂端到端是指 Flink 應用從 Source 一端開始到 Sink 一端結束,數據必經的起始和結束兩個端點。

            那么如何實現端到端的 Exactly-once 呢?Flink 應用所依賴的外部系統需要提供 Exactly-once 支撐,并結合 Flink 提供的 Checkpoint 機制和 Two Phase Commit 才能實現 Flink 端到端的 Exactly-once。對于 Source 和 Sink 的容錯保障,Flink 官方給出了具體說明:

            Fault Tolerance Guarantees of Data Sources and Sinks: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/guarantees.html

            2.2 Flink Checkpoint

            在討論基于 Kafka 端到端的 Exactly-once 之前先簡單了解一下 Flink Checkpoint。Flink Checkpoint 是 Flink 用來實現應用一致性快照的核心機制,當 Flink 因故障或其他原因重啟后可以通過最后一次成功的 Checkpoint 將應用恢復到當時的狀態。如果在應用中啟用了 Checkpoint,會由 JobManager 按指定時間間隔觸發 Checkpoint,Flink 應用內所有帶狀態的 Operator 會處理每一輪 Checkpoint 生命周期內的幾個狀態。

            • initializeState

            由 CheckpointedFunction 接口定義。Task 啟動時獲取應用中所有實現了CheckpointedFunction 的 Operator,并觸發執行 initializeState 方法。在方法的實現中一般都是從狀態后端將快照狀態恢復。

            • snapshotState

            由 CheckpointedFunction 接口定義。JobManager 會定期發起 Checkpoint,Task 接收到 Checkpoint 后獲取應用中所有實現了 CheckpointedFunction 的 Operator 并觸發執行對應的 snapshotState 方法。

            JobManager 每發起一輪 Checkpoint 都會攜帶一個自增的 checkpointId,這個 checkpointId 代表了快照的輪次。

            public interface CheckpointedFunction {
              void snapshotState(FunctionSnapshotContext context) throws Exception;
              void initializeState(FunctionInitializationContext context) throws Exception;
            }
            • notifyCheckpointComplete

            由 CheckpointListener 接口定義。當基于同一個輪次(checkpointId 相同)的Checkpoint 快照全部處理成功后獲取應用中所有實現了 CheckpointListener 的 Operator 并觸發執行 notifyCheckpointComplete 方法。觸發 notifyCheckpointComplete 方法時攜帶的 checkpointId 參數用來告訴 Operator 哪一輪 Checkpoint 已經完成。

            public interface CheckpointListener {
              void notifyCheckpointComplete(long checkpointId) throws Exception;
            }

            3. Flink Kafka 端到端 Exactly-once

            Kafka 是非常受歡迎的分布式消息系統,在 Flink 中它可以作為 Source,同時也可以作為 Sink。Kafka 0.11.0 及以上版本提供了對事務的支持,這讓 Flink 應用搭載 Kafka 實現端到端的 exactly-once 成為了可能。下面我們就來深入了解提供了事務支持的 Kafka 是如何與 Flink 結合實現端到端 exactly-once 的。

            本文忽略了 Barrier 機制,所以示例和圖中都以單線程為例。Barrier 在《Flink Checkpoint 原理》有較多討論。

            3.1 Flink Kafka Consumer

            Kafka 自身提供了可重復消費消息的能力,Flink 結合 Kafka 的這個特性以及自身 Checkpoint 機制,得以實現 Flink Kafka Consumer 的容錯。

            Flink Kafka Consumer 是 Flink 應用從 Kafka 獲取數據流消息的一個實現。除了數據流獲取、數據發送下游算子這些基本功能外它還提供了完善的容錯機制。這些特性依賴了其內部的一些組件以及內置的數據結構協同處理完成。這里,我們先簡單了解這些組件和內置數據結構的職責,再結合 Flink 運行時 和 故障恢復時 兩個不同的處理時機來看一看它們之間是如何協同工作的。

            Kafka Topic 元數據

            從 Kafka 消費數據的前提是需要知道消費哪個 topic,這個 topic 有多少個 partition。組件 AbstractPartitionDiscoverer 負責獲得指定 topic 的元數據信息,并將獲取到的 topic 元數據信息封裝成 KafkaTopicPartition 集合。

            • KafkaTopicPartition

            KafkaTopicPartition 結構用于記錄 topic 與 partition 的對應關系,內部定義了 String topic 和 int partition 兩個主要屬性。假設 topic A 有 2 個分區,通過組件 AbstractPartitionDiscoverer 處理后將得到由兩個 KafkaTopicPartition 對象組成的集合:KafkaTopicPartition(topic:A, partition:0) 和 KafkaTopicPartition(topic:A, partition:1)

            • Kafka 數據消費

            作為 Flink Source,Flink Kafka Consumer 最主要的職責就是能從 Kafka 中獲取數據,交給下游處理。在 Kafka Consumer 中 AbstractFetcher 組件負責完成這部分功能。除此之外 Fetcher 還負責 offset 的提交、KafkaTopicPartitionState 結構的數據維護。

            • KafkaTopicPartitionState

            KafkaTopicPartitionState 是一個非常核心的數據結構,基于內部的 4 個基本屬性,Flink Kafka Consumer 維護了 topic、partition、已消費 offset、待提交 offset 的關聯關系。Flink Kafka Consumer 的容錯機制依賴了這些數據。

            除了這 4 個基本屬性外 KafkaTopicPartitionState 還有兩個子類,一個是支持 PunctuatedWatermark 的實現,另一個是支持 PeriodicWatermark 的實現,這兩個子類在原有基礎上擴展了對水印的支持,我們這里不做過多討論。

            • 狀態持久化

            Flink Kafka Consumer 的容錯性依靠的是狀態持久化,也可以稱為狀態快照。對于Flink Kafka Consumer 來說,這個狀態持久化具體是對 topic、partition、已消費 offset 的對應關系做持久化。

            在實現中,使用 ListState> 定義了狀態存儲結構,在這里 Long 表示的是 offset 類型,所以實際上就是使用 KafkaTopicPartition 和 offset 組成了一個對兒,再添加到狀態后端集合。

            • 狀態恢復

            當狀態成功持久化后,一旦應用出現故障,就可以用最近持久化成功的快照恢復應用狀態。在實現中,狀態恢復時會將快照恢復到一個 TreeMap 結構中,其中 key 是 KafkaTopicPartition,value 是對應已消費的 offset。恢復成功后,應用恢復到故障前 Flink Kafka Consumer 消費的 offset,并繼續執行任務,就好像什么都沒發生一樣。

            3.1.1 運行時

            我們假設 Flink 應用正常運行,Flink Kafka Consumer 消費 topic 為 Topic-A,Topic-A 只有一個 partition。在運行期間,主要做了這么幾件事:

            • Kafka 數據消費

            KafkaFetcher 不斷的從 Kafka 消費數據,消費的數據會發送到下游算子并在內部記錄已消費過的 offset。下圖描述的是 Flink Kafka Consumer 從消費 Kafka 消息到將消息發送到下游算子的一個處理過程。

            接下來我們再結合消息真正開始處理后,KafkaTopicPartitionState 結構中的數據變化。

            可以看到,隨著應用的運行,KafkaTopicPartitionState 中的 offset 屬性值發生了變化,它記錄了已經發送到下游算子消息在 Kafka 中的 offset。在這里由于消息 P0-C 已經發送到下游算子,所以 KafkaTopicPartitionState.offset 變更為 2。

            • 狀態快照處理

            如果 Flink 應用開啟了 Checkpoint,JobManager 會定期觸發 Checkpoint。FlinkKafkaConsumer 實現了 CheckpointedFunction,所以它具備快照狀態(snapshotState)的能力。在實現中,snapshotState 具體干了這么兩件事。

            下圖描述當一輪 Checkpoint 開始時 FlinkKafkaConsumer 的處理過程。在例子中,FlinkKafkaConsumer 已經將 offset=3 的 P0-D 消息發送到下游,當checkpoint 觸發時將 topic=Topic-A;partition=0;offset=3 作為最后的狀態持久化到外部存儲。

            • 將當前快照輪次(CheckpointId)與 topic、partition、offset 寫入到一個待提交 offset 的 Map 集合,其中 key 是 CheckpointId。
            • 將 FlinkKafkaConsumer 當前運行狀態持久化,即將 topic、partition、offset 持久化。一旦出現故障,就可以根據最新持久化的快照進行恢復。

            下圖描述當一輪 Checkpoint 開始時 FlinkKafkaConsumer 的處理過程。在例子中,FlinkKafkaConsumer 已經將 offset=3 的 P0-D 消息發送到下游,當 checkpoint 觸發時將 topic=Topic-A;partition=0;offset=3 作為最后的狀態持久化到外部存儲。

            • 快照結束處理

            當所有算子基于同一輪次快照處理結束后,會調用 CheckpointListener.notifyCheckpointComplete(checkpointId) 通知算子 Checkpoint 完成,參數 checkpointId 指明了本次通知是基于哪一輪 Checkpoint。在 FlinkKafkaConsumer 的實現中,接到 Checkpoint 完成通知后會變更 KafkaTopicPartitionState.commitedOffset 屬性值。最后再將變更后的 commitedOffset 提交到 Kafka brokers 或 Zookeeper。

            在這個例子中,commitedOffset 變更為 4,因為在快照階段,將 topic=Topic-A;partition=0;offset=3 的狀態做了快照,在真正提交 offset 時是將快照的 offset + 1 作為結果提交的。「源代碼 KafkaFetcher.java 207 行 doCommitInternalOffsetsToKafka 方法」。

            3.1.2 故障恢復

            Flink 應用崩潰后,開始進入恢復模式。假設 Flink Kafka Consumer 最后一次成功的快照狀態是 topic=Topic-A;partition=0;offset=3,在恢復期間按照下面的先后順序執行處理。

            • 狀態初始化

            狀態初始化階段嘗試從狀態后端加載出可以用來恢復的狀態。它由 CheckpointedFunction.initializeState 接口定義。在 FlinkKafkaConsumer 的實現中,從狀態后端獲得快照并寫入到內部存儲結構 TreeMap,其中 key 是由 KafkaTopicPartition 表示的 topic 與 partition,value 為 offset。下圖描述的是故障恢復的第一個階段,從狀態后端獲得快照,并恢復到內部存儲。

            • function 初始化

            function 初始化階段除了初始化 OffsetCommitMode 和 partitionDiscoverer 外,還會初始化一個 Map 結構,該結構用來存儲應用待消費信息。如果應用需要從快照恢復狀態,則從待恢復狀態中初始化這個 Map 結構。下圖是該階段從快照恢復的處理過程。

            function 初始化階段兼容了正常啟動和狀態恢復時 offset 的初始化。對于正常啟動過程,StartupMode 的設置決定待消費信息中的結果。該模式共有 5 種,默認為 StartupMode.GROUP_OFFSETS。

            • 開始執行

            在該階段中,將 KafkaFetcher 初始化、初始化內部消費狀態、啟動消費線程等等,其目的是為了將 FlinkKafkaConsumer 運行起來,下圖描述了這個階段的處理流程。

            這里對圖中兩個步驟做個描述:

            • 步驟 3,使用狀態后端的快照結果 topic=Topic-A;partition=0;offset=3 初始化 Flink Kafka Consumer 內部維護的 Kafka 處理狀態。因為是恢復流程,所以這個內部維護的處理狀態也應該隨著快照恢復。
            • 步驟 4,在真正消費 Kafka 數據前(指調用 KafkaConsumer.poll 方法),使用Kafka 提供的 seek 方法將 offset 重置到指定位置,而這個 offset 具體算法就是狀態后端 offset + 1。在例子中,消費 Kafka 數據前將 offset 重置為 4,所以狀態恢復后 KafkaConsumer 是從 offset=4 位置開始消費。「源代碼 KafkaConsumerThread.java 428 行」

            3.1.3 總結

            上述的 3 個步驟是恢復期間主要的處理流程,一旦恢復邏輯執行成功,后續處理流程與正常運行期間一致。最后對 FlinkKafkaConsumer 用一句話做個總結。

            「將 offset 提交權交給 FlinkKafkaConsumer,其內部維護 Kafka 消費及提交的狀態。基于 Kafka 可重復消費能力并配合 Checkpoint 機制和狀態后端存儲能力,就能實現 FlinkKafkaConsumer 容錯性,即 Source 端的 Exactly-once 語義」。

            3.2 Flink Kafka Producer

            Flink Kafka Producer 是 Flink 應用向 Kafka 寫出數據的一個實現。在 Kafka 0.11.0 及以上版本中提供了事務支持,這讓 Flink 搭載 Kafka 的事務特性可以輕松實現 Sink 端的 Exactly-once 語義。關于 Kafka 事務特性在《Kafka 冪等與事務》中做了詳細討論。

            在 Flink Kafka Producer 中,有一個非常重要的組件 FlinkKafkaInternalProducer,這個組件代理了 Kafka 客戶端 org.apache.kafka.clients.producer.KafkaProducer,它為 Flink Kafka Producer 操作 Kafka 提供了強有力的支撐。在這個組件內部,除了代理方法外,還提供了一些關鍵操作。個人認為,Flink Kafka Sink 能夠實現 Exactly-once 語義除了需要 Kafka 支持事務特性外,同時也離不開

            FlinkKafkaInternalProducer 組件提供的支持,尤其是下面這些關鍵操作:

            • 事務重置 FlinkKafkaInternalProducer 組件中最關鍵的處理當屬事務重置,事務重置由 resumeTransaction 方法實現「源代碼 FlinkKafkaInternalProducer.java 144 行」。由于 Kafka 客戶端未暴露針對事務操作的 API,所以在這個方法內部,大量的使用了反射。方法中使用反射獲得 KafkaProducer 依賴的 transactionManager 對象,并將狀態后端快照的屬性值恢復到 transactionManager 對象中,這樣以達到讓 Flink Kafka Producer 應用恢復到重啟前的狀態。

            下面我們結合 Flink 運行時 和 故障恢復 兩個不同的處理時機來了解 Flink Kafka Producer 內部如何工作。

            3.2.1 運行時

            我們假設 Flink 應用正常運行,Flink Kafka Producer 正常接收上游數據并寫到 Topic-B 的 Topic 中,Topic-B 只有一個 partition。在運行期間,主要做以下幾件事:

            • 數據發送到 Kafka

            上游算子不斷的將數據 Sink 到 FlinkKafkaProducer,FlinkKafkaProducer 接到數據后封裝 ProducerRecord 對象并調用 Kafka 客戶端 KafkaProducer.send 方法將 ProducerRecord 對象寫入緩沖「源代碼 FlinkKafkaProducer.java 616 行」。下圖是該階段的描述:

            • 狀態快照處理

            Flink 1.7 及以上版本使用 FlinkKafkaProducer 作為 Kafka Sink,它繼承抽象類 TwoPhaseCommitSinkFunction,根據名字就能知道,這個抽象類主要實現兩階段提交。為了集成 Flink Checkpoint 機制,抽象類實現了 CheckpointedFunction 和 CheckpointListener,因此它具備快照狀態(snapshotState)能力。狀態快照處理具體做了下面三件事:

            調用 KafkaProducer 客戶端 flush 方法,將緩沖區內全部記錄發送到 Kafka,但不提交。這些記錄寫入到 Topic-B,此時這些數據的事務隔離級別為 UNCOMMITTED,也就是說如果有個服務消費 Topic-B,并且設置的 isolation.level=read_committed,那么此時這個消費端還無法 poll 到 flush 的數據,因為這些數據尚未 commit。什么時候 commit 呢?在快照結束處理階段進行 commit,后面會提到。
            將快照輪次與當前事務記錄到一個 Map 表示的待提交事務集合中,key 是當前快照輪次的 CheckpointId,value 是由 TransactionHolder 表示的事務對象。TransactionHolder 對象內部記錄了 transactionalId、producerId、epoch 以及 Kafka 客戶端 kafkaProducer 的引用。
            持久化當前事務處理狀態,也就是將當前處理的事務詳情存入狀態后端,供應用恢復時使用。

            下圖是狀態快照處理階段處理過程。

            • 快照結束處理

            TwoPhaseCommitSinkFunction 實現了 CheckpointListener,應用中所有算子的快照處理成功后會收到基于某輪 Checkpoint 完成的通知。當 FlinkKafkaProducer 收到通知后,主要任務就是提交上一階段產生的事務,而具體要提交哪些事務是從上一階段生成的待提交事務集合中獲取的。

            圖中第 4 步執行成功后,flush 到 Kafka 的數據從 UNCOMMITTED 變更為 COMMITTED,這意味著此時消費端可以 poll 到這批數據了。

            2PC(兩階段提交)理論的兩個階段分別對應了 FlinkKafkaProducer 的狀態快照處理階段和快照結束處理階段,前者是通過 Kafka 的事務初始化、事務開啟、flush 等操作預提交事務,后者是通過 Kafka 的 commit 操作真正執行事務提交。

            3.2.2 故障恢復

            Flink 應用崩潰后,FlinkKafkaProducer 開始進入恢復模式。下圖為應用崩潰前的狀態描述:

            在恢復期間主要的處理在狀態初始化階段。當 Flink 任務重啟時會觸發狀態初始化,此時應用與 Kafka 已經斷開了連接。但在運行期間可能存在數據 flush 尚未提交的情況。

            如果想重新提交這些數據需要從狀態后端恢復當時 KafkaProducer 持有的事務對象,具體一點就是恢復當時事務的 transactionalId、producerId、epoch。這個時候就用到了 FlinkKafkaInternalProducer 組件中的事務重置,在狀態初始化時從狀態后端獲得這些事務信息,并重置到當前 KafkaProducer 中,再執行 commit 操作。這樣就可以恢復任務重啟前的狀態,Topic-B 的消費端依然可以 poll 到應用恢復后提交的數據。

            需要注意的是:如果這個重置并提交的動作失敗了,可能會造成數據丟失。下圖描述的是狀態初始化階段的處理流程:

            3.2.3 總結

            FlinkKafkaProducer 故障恢復期間,狀態初始化是比較重要的處理階段。這個階段在 Kafka 事務特性的強有力支撐下,實現了事務狀態的恢復,并且使得狀態存儲占用空間最小。依賴 Flink 提供的 TwoPhaseCommitSinkFunction 實現類,我們自己也可以對 Sink 做更多的擴展。


            本文作者:史天舒

            原文鏈接

            本文為云棲社區原創內容,未經允許不得轉載。


            來自 “ ITPUB博客 ” ,鏈接:http://www.ep4tq.com/69915408/viewspace-2661165/,如需轉載,請注明出處,否則將追究法律責任。

            請登錄后發表評論 登錄
            全部評論

            注冊時間:2019-04-03

            • 博文量
              480
            • 訪問量
              274820
            妹子图每日分享