問題現象
每 30 分鐘一次的監控信發現:
- 某個 Consumer 的 Lag 從 30 萬飆到 500 萬
- Kafka 機器的磁碟空間不足
- 系統即將崩潰
問題分析
上下游程式都出問題
- 下游:有 Error 導致工作做不了
- 上游:持續產生工作,完全不會停止(自走砲設計)
- 結果:每小時灌入 10 萬筆 Queue
緊急處理步驟
步驟一:快速消化 Queue
部署一個「什麼都不做」的 Consumer:
@KafkaListener(topics = "problem-topic")
public void consume(String message) {
// 直接 ack,不做任何處理
// 目的是快速清空 Queue
}
開 5 台以上同時消化。
步驟二:設定 Topic 過期策略
使用 CMAK 調整 Topic 設定:
| 設定 | 值 | 說明 |
|---|---|---|
| retention.ms | 600000 | 10 分鐘後刪除 |
| cleanup.policy | delete | 直接刪除舊資料 |
步驟三:多機器 Rebalance
500 萬筆分配到 32 個 Partition,平均每個 17 萬筆。
技巧:
- 開 5 台快速消化的機器
- 快速機器清空 26 個 Partition
- 加減機器觸發 Rebalance
- 把剩餘的塞住 Partition 分配給快速機器
根本解決方案
1. 上游加入熔斷機制
// 檢查下游 Lag
if (getConsumerLag() > THRESHOLD) {
// 暫停產生新工作
logger.warn("下游積壓過多,暫停發送");
return;
}
sendToKafka(message);
2. 下游錯誤處理
// 失敗時發送告警
@KafkaListener
public void consume(String message) {
try {
process(message);
} catch (Exception e) {
alertService.send("處理失敗: " + e.getMessage());
// 決定是重試還是丟棄
}
}
3. 監控告警
- 設定 Queue Lag 閾值告警
- 監控磁碟使用量
- 每次上線後觀察 Queue 狀況
關鍵學習
每一次程式上線後的一天,都要觀察 Queue 的狀況。自走砲式的設計尤其危險。
發佈留言