
名詞介紹
- Producer:訊息生產者
- Broker:傳遞訊息的中介者
- Consumer:訊息消費者
- Topic:訊息的主題
- Partition:主題內的分區
- ConsumerGroup:消費者群組
Producer
只要是發送訊息出去的都是這個角色,定位上是往 Kafka push queue 的就是。
Broker
保存並且接受訊息的實體,同時也負起訊息的備份跟 index 等等的角色。多個 broker 內可以有多個 Topic 進行備份或是調整空間。有可能 A Topic 同時在第一個 Broker 跟第三個 Broker 內。

Consumer
消化訊息者,只要是從 Kafka 拉訊息去處理邏輯的都是。
注意:Consumer 可能同時成為 Producer,完全看你的邏輯處理如何 handle。
Topic
設定主題,讓你的訊息往這裡走,讓 producer 跟 consumer 有同一個聚焦點。

Partition
一個 topic 內可以有多個 partition,使資料達到分散又可以有序,partition 的數量跟你可以做到多大的分散是有絕對的關係。
範例:一個 topic(前提 partition key 分散的很均勻)
如果有 5 個 partition → 最多可以讓 5 個 consumer 同步消化工作
如果有 100 個 partition → 最多可以讓 100 個 consumer 同步消化工作
⚠️ 注意:
- 如果有需要順序性處理的 queue,必須用 key 強制鎖在同一個 partition 中,讓他一個一個消化
- 如果不需要順序性處理,可以把 key 設定 null 讓他自動隨機,達到多 consumer 的效益
- 同時要注意你 consumer 程式的內容,如果兩隻同時拿到一模一樣內容的 queue 有沒有關係

ConsumerGroup
讓一份資料給不同的 consumer 各自帶不同的 ConsumerGroupId 來拿走一樣的資料。
- 同一個 group 的就會按照順序一個一個給(0, 1, 2, 3, 4, 5)
- 增加一個 consumer 且同 groupId,他也只是從 6 開始
- 但如果增加一個不同 groupId 的 consumer,他就會從 0 開始
⚠️ 如果資料量很多,會根據 Kafka 設定的保存時間或大小來自動丟棄最前面的,所以有可能會變成從 2 開始。
實戰案例:工程版

假設原有一支 JOB 每 10 分鐘迴圈檢查 DB 某張表(A表),然後這個 JOB 要再查 API + 部分邏輯 + 查第二個表(B表) 後組合成為新資料塞回資料庫的另一張表(C表)。
問題:當 DB.A 或 DB.B 的量開始增加時,這個 JOB 所需要的時間就開始難以掌握。
透過 Kafka 改造成三隻程式
- 純 Producer:睡眠 10 分鐘,並且打入 Kafka 的 topic A(定時發動一件事情)
- Consumer + Producer:接收 topic A 的 queue,檢查 DB.A 並且打入 topic B
- 純 Consumer:接收 topic B 的 queue,查 API + 部分邏輯 + 查 DB.B,確定要塞入的才寫入
✅ 如此 DB.A 的量大時,只要 consumer 增大就可,消化時間也可控(只要確定每一隻 consumer 正常的消化速度,幾個 queue/s)
Producer 基本參數
bootstrap.servers
key.serializer
value.serializer
linger.ms
batch.size
buffer.memory
compression.type
max.in.flight.requests.per.connection
request.timeout.ms
delivery.timeout.ms
acks
retries
重要參數說明
- acks = 1:Producer 發送訊息時是否要等待訊息被確認收到
0:不等待確認即當作發送成功,可能會缺漏訊息1:broker leader 收到後直接回應確認,不等其他 replicaall(-1):等到所有 replica 都保存好訊息後才回覆確認
- retries = 3:當訊息發送失敗時重試的次數
- max.in.flight.requests.per.connection = 1:未被確認收到的訊息最多可以有幾筆準備被發送。當 retries 不為 0 時建議設定為 1,避免訊息重複被發送
- request.timeout.ms = 30000:請求超時時間
- delivery.timeout.ms = 6000000:整體交付超時,通常要比 request + linger + retry 總和還長
- batch.size = 16384:當 batch 量到達多少會進行發送
- buffer.memory = 33554432:Producer 可用的 memory 大小
- compression.type = lz4:訊息壓縮編碼
Consumer 基本參數
bootstrap.servers
group.id
key.serializer
value.serializer
max.poll.records
enable.auto.commit
max.poll.interval.ms
重要參數說明
- max.poll.records:最大一次取得的量。如果取得的量過大,可能會造成整體時間過長,進而因為 max.poll.interval.ms 導致 Kafka 認為這個 consumer 已經沒有下文,自動進行 rebalance
- enable.auto.commit:強烈建議 false,然後程式內確定做完才壓 commit(手動 commit)。如果設定 true,取過的 queue 就會自動 commit,下一次就無法取得
⚠️ Consumer 有一種模式可以指定 partition,但缺點眾多,後續擴充或增減機器時 Kafka 要做 rebalance 相對麻煩,不建議使用。
Topic 監控(Kowl)




Kafka 的坑
- 消費者組的命名不當:命名要有意義,以便後續調試和維護。選擇不好的命名方式可能會導致消息重複處理或消息丟失
- 錯誤的分區策略:可能會導致消息不均衡分配,或導致某些分區中的消息過度堆積
- 錯誤的副本數量:可能會導致過度的網絡流量或性能下降
- 消息序列化和反序列化的性能問題:使用不當的序列化庫可能會導致性能下降
- 錯誤的存儲設置:需要注意存儲設置的大小和清理策略
- 資料傳輸安全性:需要使用安全性功能(SSL/TLS、SASL、Kerberos 等)
Consumer Group 命名範例
data-processing-consumer-group:處理數據user-activity-tracking-consumer-group:追踪用戶活動order-fulfillment-consumer-group:履行訂單anomaly-detection-consumer-group:檢測異常事件
Partition 策略
正確的策略:
- 輪詢策略:將消息均勻分配給所有分區
- 密集型策略:將消息分配給可用資源最多的分區
- 負載平衡策略:將消息分配給負載最輕的分區
錯誤的策略:
- 固定策略:將消息固定分配到某個特定分區,會導致不均衡
- 隨機策略:將消息隨機分配,可能導致某些分區過度堆積
最終結果
這張是 Kafka 的 logo 也就是 Kafka 的精神:中間的大圈圈就是 Kafka 的各種 Topic,周圍就是各種 consumer 跟 producer。


發佈留言