
名詞介紹
- Producer:訊息生產者
- Broker:傳遞訊息的中介者
- Consumer:訊息消費者
- Topic:訊息的主題
- Partition:主題內的分區
- ComsumerGroup:消費者群組
Producer
只要是發送訊息出去的都是這一個腳色,定位上是往kafka push queue的就是。
Broker

Comsumer
消化訊息者,只要是從kafka拉訊息去處理邏輯的都是。
注意:Comsumer 可能同時成為Producer,完全看你的邏輯處理如何handle
Topic

Partition
一個topic內可以有多個partition,使資料達到分散又可以有序,partition的數量跟你可以做到多大的分散是有絕對的關係。
INFO: 一個topic (前提 partition key 分散的很均勻)
如果有5個partition
那最多可以讓5個comsumer同步消化工作
如果有100個partition
最多可以讓100個comsumer同步消化工作
WARNING: 注意
如果有需要順序性處理的queue,那就必須要用key 強制鎖在同一個partition中,讓他一個一個消化。
反之 如果不需要順序性處理的就可以把key 設定null 讓他自動隨機,達到多comsumer的效益。
同時要注意你comsumer程式的內容,如果兩隻同時拿到一模一樣內容的queue有沒有關係,如果有關係,請設定key 讓他走同一個partition。

ComsumerGroup
這個相對還好,只是讓一份資料給不同的comsumer各自帶不同的ComsumerGroupid來拿走一樣的資料。
觀念像是同一個group的就會按照順序一個一個給 從0 1 2 3 4 5
增加一個comsumer且同groupId他也只是從6開始
但是如果增加一個不同groupId的comsumer 他就會從0開始
WARNING: 如果資料量很多,會根據kafka設定的保存時間或是大小來自動丟棄最前面的
所以有可能會變成從2開始
舉個例子
工程版

假設原有一支JOB 每10分鐘,迴圈檢查DB某張表(A表),然後這個JOB要再查API+部分邏輯+查第二個表(B表)後組合成為新資料塞回資料庫的另一張表(C表)
graph TD
start("check DB.A") --> handler("打API")
handler --> logic("部分邏輯")
logic --> query("check DB.B")
query --yes --> insert("insert DB.C")
query --no--> exit("sleep 10 mins")
insert --> exit("sleep 10 mins")
exit("sleep 10 mins") -->start("check DB.A")
INFO: 當 DB.A的量或是DB.B的量開始增加的時候
這個JOB所需要的時間就開始難以掌握
因此,這個JOB,如果透過kafka可以這樣改
三隻程式
1. 睡眠10分鐘,並且打入Kafka的topic A
2. 接收Kafka topic A 的queue,檢查DB.A 並且打入Kafka的topic B
3. 接收Kafka topic B 的queue,查API+部分邏輯+查第二個表(DB.B表) 確定要塞入的才寫入
SUCCESS: 第一隻的定位是純producer
他的角色就是定時去發動一件事情
第二隻的定位是comsumer+producer
他是接續第一隻的消化者同時去檢查資料庫有多少下一階段工作要做的給第三隻的producer
第三隻的定位就是純comsumer
他的工作就是把邏輯處理乾淨
如此DB.A的量大時
只要comsumer增大就可 消化時間也可控
(只要確定每一隻comsumer正常的消化速度 幾個queue/s)
一般版

第一步驟
客戶根據各家既定的”規格格式”,選擇他的問題,寄出信件
這就是客戶選擇topic跟內容的範本,如同兩個廠家有不同的內容範本一樣,兩個信件收件地址(Topic),也可以有不同的內容範本去處理
第二步驟
這封信寄出了以後
如果是熱門的議題,後面可能會有三四個客服小姐準備回信,冷門的議題可能就少一點,在議題中,如果有明確指定說這個VIP客戶的都要給某位聲音甜美的客服小姐姐處理,那就要有一個欄位去走這條特殊走道。
這就是Topic底下的partition數量(預估會報量的就要多一點,預估不怎樣的就少一點),然後如果有特別指名要給甜美小姐姐處理的就帶上partition key 讓他永遠往同一個partition去。
第三步驟
接下來就是各位客服開始看信收信解決事情,有可能要回信,有可能要跟官方溝通開PR單,有可能要轉寄出去
這就是Comsumer吃partition後處理的動作,也就是真正的Worker在處理事情,有可能塞DB,有可能往下一個topic丟,有可能打API,完全看這個信件的內容決定怎麼處理(logic)
基本所需參數
producer
bootstrap.servers
key.serializer
value.serializer
linger.ms
batch.size
buffer.memory
compression.type
max.in.flight.requests.per.connection
request.timeout.ms
delivery.timeout.ms
acks
retries
除此之外
topic ,partition key , value 也是重要producer必須
INFO: acks = 1
Producer發送訊息時是否要等待訊息被確認收到,分為0, 1, all(-1)三種
0:producer不等待borker leader回應確認即當作發送成功,不斷發送訊息,因不做確認而有可能會缺漏訊息,設定為0則retries參數等於失效
1:broker leader收到後會直接回應確認,不等其他replica也保存好訊息
all=-1:broker收到後會等到所有replica都保存好訊息後才回覆確認
retries = 3
當訊息發送失敗時重試的次數
max.in.flight.requests.per.connection = 1
未被確認收到的訊息最多可以有幾筆準備被發送,當retries不為0時建議設定該值為1,代表在消息未被確認收到前不會再發送其他訊息,避免訊息重複被發送,但會降低效能
request.timeout.ms=30000
因acks模式為1,他必須明確知道怎樣的條件下事timeout
delivery.timeout.ms=6000000
不管任何模式下,因為KAFKA要到一定內存量後才會打入,因此後面還有一個linger.ms讓batch可以批次作業,沒有設定的話 linger.ms 預設是0,此參數通常要比整個request+ linger+retry總和還要長,如果acks=-1的話 request會比較長。
batch.size = 16384
Kafka producer的send是async方式,訊息會先被暫存到batch裡面然後再一次發送,該參數設定當batch量到達多少會進行發送,設定太少會讓發送次數過於頻繁導致效能下降,設定太多會消耗過多的memory在暫存batch
buffer.memory = 33554432
Producer可以用的memory大小,包含buffer訊息、進行壓縮所需使用的memory、等待被retry的訊息…等等
compression.type = lz4
訊息壓縮編碼 其他方法可參考最底下
replica.lag.time.max.ms
如果 acks=-1 這個參數就會配合在整個流程裡面進而造成一個限制,因為整個request變成必須要做完replica
comsumer
bootstrap.servers
group.id
key.serializer
value.serializer
max.poll.records
enable.auto.commit
max.poll.interval.ms
除此之外
topic 也是重要comsumer必須
WARNING: max.poll.records<最大一次取得的量
如果取得的量過大,可能會造成整體時間過長進而因為 max.poll.interval.ms
導致Kafka 認為這個comsumer已經沒有下文了 他會自動進行rebalance
把現在的partition-comsumer之間重新安排
另外enable.auto.commit 這個參數 如果設定ture,取過的queue就會自動commit 下一次就會無法取得,因此這一個參數強烈建議false 然後程式內確定做完才壓commit,也就是”手動”commit,但是如果是搞丟的queue也不影響的,這種就可以設定自動commit。
DANGER: comsumer 有一種模式 可以指定partition ,也就是說 可以指定要partition 1 這條裡面的queue ,但是這樣缺點眾多,後續擴充或是增減機器,kafka要做rebalance的時候也相對麻煩,因此不建議
詳細動作
flowchart LR
subgraph 0
direction LR
kafka製造者Case1 --acks=0-->topicCase1
end
subgraph 1
direction LR
topicCase2 --acks=1-->kafka製造者Case2
kafka製造者Case2 --acks=1-->topicCase2
end
acks=all(-1)
sequenceDiagram
participant kafka製造者
participant batch
participant topic
participant replica
Note left of kafka製造者: delivery.timeout.ms
loop retries
Note left of batch: request.timeout.ms<br><br> <br>max.in.flight.<br>requests.per.connection
Note left of topic: batch.size<br>compression.type
kafka製造者->>batch: buffer.memory
batch->>topic: linger.ms
topic->>replica: replica.lag.time.max.ms
replica-->replica: replication-factor
replica->>topic:
topic->>batch:
batch->>kafka製造者:
end
Topic監控(Kowl)
此topic有三個comsumer




最終結果
這張是kafka的logo 也就是kafka的精神
中間的大圈圈 就是Kafka的各種Topic ,周圍就是各種comsumer跟producer


Kafka的坑
Apache Kafka 是一個流行的分散式消息系統,能夠處理高容量的實時數據流。在使用 Kafka 時,以下是一些坑要避免的:
-
消費者組的命名不當:消費者組是一個重要的概念,並且命名要有意義,以便後續調試和維護。選擇不好的命名方式可能會導致消費者組中出現奇怪的行為,例如消息重複處理或消息丟失。
-
錯誤的分區策略:分區策略是指如何將消息分配給不同的分區。錯誤的分區策略可能會導致消息不均衡分配,或者導致某些分區中的消息過度堆積而且沒有處理。
-
錯誤的副本數量:在設置 Kafka 主題時,需要決定副本數量。如果設置錯誤,可能會導致過度的網絡流量或性能下降。
-
消息序列化和反序列化的性能問題:Kafka 中的消息序列化和反序列化可能是性能瓶頸之一。如果使用不當的序列化庫或錯誤的序列化格式,可能會導致性能下降。
-
錯誤的存儲設置:Kafka 存儲配置錯誤可能會導致性能下降或存儲故障。需要注意存儲設置的大小和清理策略,以便保證高效和可靠的存儲。
-
資料傳輸安全性:Kafka 資料傳輸可能存在安全性風險。需要使用安全性功能(例如 SSL/TLS、SASL、Kerberos 等)以保護傳輸資料的安全性。
總之,在使用 Kafka 時,需要仔細設置和監控,以保證高效、可靠和安全的數據流傳輸。
consumer-group命名
消費者組的命名應該要能夠清楚地表達出該消費者組的用途或目的。以下是一些消費者組命名的範例:
data-processing-consumer-group:這個消費者組的目的是處理數據,因此將其命名為 data-processing-consumer-group 是很有意義的。
user-activity-tracking-consumer-group:這個消費者組的目的是追踪用戶活動,因此將其命名為 user-activity-tracking-consumer-group 是很有意義的。
order-fulfillment-consumer-group:這個消費者組的目的是履行訂單,因此將其命名為 order-fulfillment-consumer-group 是很有意義的。
anomaly-detection-consumer-group:這個消費者組的目的是檢測異常事件,因此將其命名為 anomaly-detection-consumer-group 是很有意義的。
總之,消費者組的命名應該要具有意義並清晰表達出其目的,以便在後續調試和維護時更容易理解和管理。
partition 策略
分區策略是一個重要的概念,用於將消息分配到不同的分區中。以下是一些正確和錯誤的分區策略的範例:
正確的分區策略範例:
輪詢策略:將消息均勻地分配給所有分區。這是一種簡單但有效的策略,特別適用於消費者組中的消費者數量比分區數量少的情況。
密集型策略:將消息分配給可用資源最多的分區,這些資源可以是 CPU、記憶體或網路帶寬等。這種策略可以最大化使用資源,提高整個系統的效能。
負載平衡策略:將消息分配給負載最輕的分區,以平衡整個系統的負載。這種策略可以避免某些分區的負載過重,導致系統性能下降。
錯誤的分區策略範例:
固定策略:將消息固定分配到某個特定的分區,這種策略可能會導致消息不均衡分配,某些分區的消息堆積過多。
隨機策略:將消息隨機分配到任意一個分區,這種策略可能會導致某些分區的消息過度堆積而沒有處理。
總之,在選擇分區策略時,需要考慮消費者組的消費者數量、分區數量、資源利用率和負載平衡等因素,以保證整個系統的效能和穩定性。
Kafka串接的常用參數
bootstrap.servers:這是一個必須設置的參數,它指定Kafka broker的地址,客戶端可以通過這些地址來連接Kafka集群。
group.id:指定消費者組的ID,相同ID的消費者屬於同一個消費者組,可以實現負載均衡和高可用。
auto.offset.reset:當消費者第一次加入消費者組或者它所屬的消費者組沒有之前的offset時,Kafka將如何處理消息的offset。可以設置為“latest”或“earliest”,分別代表最新的offset和最早的offset。
enable.auto.commit:設置為true時,消費者將自動提交offset,設置為false時,需要手動提交offset。
compression.type:指定消息壓縮算法,可以是“none”、“gzip”、“snappy”或“lz4”。
acks:指定消息的確認模式,可以是“0”、“1”或“all”。0表示生產者不等待任何確認,1表示只要leader確認消息已經寫入就可以,all表示所有follower都確認後才算完成。
max.request.size:設置生產者發送的最大消息大小,默認值為1048576字節。
request.timeout.ms:設置Kafka broker等待生產者或消費者的請求超時時間,默認值為30000毫秒。
max.poll.records:設置消費者一次拉取的最大消息數量,默認值為500。
partition.assignment.strategy:設置消費者如何分配分區,可以是“range”或“roundrobin”。range表示按照分區的範圍進行分配,roundrobin表示輪詢分配分區。
這些參數只是Kafka配置中的一部分,實際使用中還需要根據具體情況進行調整。
(選擇)實務坑
https://hackmd.io/@2GrxaqznSJKLrhmXwCaeug/BkCFYgYa9
發佈留言