前言:為什麼需要事件驅動?
想像一個場景:使用者在後台點擊「同步蝦皮訂單」。
- 蝦皮 API 回應慢 → 使用者等待 30 秒以上
- API 超時 → 整個請求失敗
- 大量請求 → 伺服器資源耗盡
解決方案:非同步事件驅動
│ │
▼ │
┌─────────┐ 發送訊息 ┌─────────┐
│ Web API │ ──────────────► │ Kafka │
└─────────┘ └────┬────┘
│ │
▼ ▼
回應成功 ┌───────────────┐
(立即返回) │ Consumer Job │
│ 慢慢處理… │
└───────────────┘
架構設計
Topic 設計:每個通路獨立
| Topic 名稱 | 用途 | Consumer |
|---|---|---|
| oms-action-shopee | 蝦皮相關動作 | Shopee Consumer |
| oms-action-momo | Momo 相關動作 | Momo Consumer |
| oms-action-yahoo | Yahoo 相關動作 | Yahoo Consumer |
| oms-action-pchome | PChome 相關動作 | PChome Consumer |
- 蝦皮 API 壞了,不影響 Momo 訂單處理
- 可以針對不同平台調整 Consumer 數量
- 方便監控各平台的處理狀況
Producer:發送訊息
public class ActionProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
/**
* 發送動作到對應的通路 Topic
*/
public void sendAction(ChannelType channel, ActionMessage message) {
String topic = “oms-action-“ + channel.getCode();
String payload = JsonUtil.toJson(message);
kafkaTemplate.send(topic, message.getMerchantId(), payload)
.addCallback(
result -> log.info(“發送成功: {}”, topic),
error -> log.error(“發送失敗: {}”, error.getMessage())
);
}
}
訊息格式設計
“header”: {
“messageId”: “uuid-xxxx-xxxx”,
“timestamp”: “2024-03-18T10:30:00Z”,
“traceId”: “trace-xxxx”
},
“body”: {
“merchantId”: “M001”,
“actionType”: “SYNC_ORDERS”,
“parameters”: {
“startDate”: “2024-03-17”,
“endDate”: “2024-03-18”
}
}
}
Consumer:處理訊息
public class ActionConsumer {
@Autowired
private ChannelFactory channelFactory;
@KafkaListener(topics = “oms-action-shopee”)
public void consumeShopee(String message) {
processAction(ChannelType.SHOPEE, message);
}
@KafkaListener(topics = “oms-action-momo”)
public void consumeMomo(String message) {
processAction(ChannelType.MOMO, message);
}
private void processAction(ChannelType channel, String message) {
try {
// 1. 解析訊息
ActionMessage action = JsonUtil.fromJson(message);
// 2. 取得對應的通路處理器
ChannelAction handler = channelFactory.getAction(channel);
// 3. 執行動作
ActionResult result = handler.execute(action);
// 4. 回寫結果
saveResult(action, result);
} catch (Exception e) {
// 5. 錯誤處理
handleError(message, e);
}
}
}
錯誤處理策略
| 錯誤類型 | 處理方式 | 範例 |
|---|---|---|
| 暫時性錯誤 | 重試 3 次 | API 超時、網路問題 |
| 永久性錯誤 | 記錄並跳過 | 資料格式錯誤 |
| 未知錯誤 | 進入 Dead Letter Queue | 系統異常 |
public DefaultErrorHandler errorHandler() {
// 設定重試策略
BackOff backOff = new ExponentialBackOff(1000L, 2.0);
backOff.setMaxElapsedTime(30000L); // 最多重試 30 秒
return new DefaultErrorHandler(
(record, exception) -> {
// 重試失敗後,送到 Dead Letter Queue
sendToDeadLetterQueue(record, exception);
},
backOff
);
}
監控與告警
| 監控指標 | 正常值 | 告警條件 |
|---|---|---|
| Consumer Lag | < 1000 | > 5000 持續 5 分鐘 |
| 處理時間 | < 5 秒 | > 30 秒 |
| 錯誤率 | < 1% | > 5% |
| Dead Letter 數量 | 0 | > 10 |
效能調校
spring:
kafka:
consumer:
# 每次拉取的最大筆數
max-poll-records: 100
# 拉取間隔
fetch-min-size: 1
fetch-max-wait: 500ms
producer:
# 批次發送設定
batch-size: 16384
buffer-memory: 33554432
# 壓縮
compression-type: lz4
總結
| 設計 | 效果 |
|---|---|
| 非同步處理 | 使用者不用等待 API 回應 |
| Topic 分離 | 通路故障隔離 |
| 重試機制 | 暫時性錯誤自動恢復 |
| Dead Letter Queue | 問題訊息不遺失 |
| 監控告警 | 問題即時發現 |
為什麼不用其他方案?
| 方案 | 優點 | 缺點 | 結論 |
|---|---|---|---|
| 同步處理 | 簡單、好除錯 | 使用者要等、效能差 | 小流量可用 |
| Redis Queue | 輕量、快速 | 持久化弱、無法分區 | 簡單場景可用 |
| RabbitMQ | 功能豐富、可靠 | 吞吐量不如 Kafka | 適合複雜路由 |
| Kafka | 高吞吐、持久化、分區 | 學習曲線、維運成本 | 大流量首選 |
實戰踩坑
雙 11 當天 Consumer Lag 飆到 50,000+,訂單處理延遲 2 小時。原因:單一 Consumer 處理太慢。解法:增加 Consumer 數量到 Partition 數量,同時優化處理邏輯(批次處理)。
Consumer 處理到一半掛掉,重啟後同一筆訂單被處理兩次,導致重複出貨。解法:加入冪等性檢查(用訂單 ID 去重)。
最初所有平台共用一個 Topic,蝦皮 API 壞了堵住整條 Queue,Momo 訂單也跟著延遲。後來拆成每個平台獨立 Topic,故障隔離。
系列導航
| ◀ 上一篇 工廠模式 |
📚 返回目錄 | 下一篇 ▶ 多租戶認證 |
發佈留言