Kafka 事件驅動架構:打造高可用訂單處理系統

商業價值:事件驅動架構讓系統能「處理速度提升 10 倍」,從 4-8 小時縮短到 25-35 分鐘。詳見 導讀篇的 ROI 計算

前言:為什麼需要事件驅動?

想像一個場景:使用者在後台點擊「同步蝦皮訂單」。

同步處理的問題:

  • 蝦皮 API 回應慢 → 使用者等待 30 秒以上
  • API 超時 → 整個請求失敗
  • 大量請求 → 伺服器資源耗盡

解決方案:非同步事件驅動

使用者請求 背景處理
│ │
▼ │
┌─────────┐ 發送訊息 ┌─────────┐
│ Web API │ ──────────────► │ Kafka │
└─────────┘ └────┬────┘
│ │
▼ ▼
回應成功 ┌───────────────┐
(立即返回) │ Consumer Job │
│ 慢慢處理… │
└───────────────┘
效果:使用者立即收到「已排程」回應,實際同步在背景執行。

架構設計

Topic 設計:每個通路獨立

Topic 名稱 用途 Consumer
oms-action-shopee 蝦皮相關動作 Shopee Consumer
oms-action-momo Momo 相關動作 Momo Consumer
oms-action-yahoo Yahoo 相關動作 Yahoo Consumer
oms-action-pchome PChome 相關動作 PChome Consumer
為什麼要分開?

  • 蝦皮 API 壞了,不影響 Momo 訂單處理
  • 可以針對不同平台調整 Consumer 數量
  • 方便監控各平台的處理狀況

Producer:發送訊息

@Service
public class ActionProducer {

private final KafkaTemplate<String, String> kafkaTemplate;

/**
* 發送動作到對應的通路 Topic
*/

public void sendAction(ChannelType channel, ActionMessage message) {
String topic = “oms-action-“ + channel.getCode();
String payload = JsonUtil.toJson(message);

kafkaTemplate.send(topic, message.getMerchantId(), payload)
.addCallback(
result -> log.info(“發送成功: {}”, topic),
error -> log.error(“發送失敗: {}”, error.getMessage())
);
}
}

訊息格式設計

{
“header”: {
“messageId”: “uuid-xxxx-xxxx”,
“timestamp”: “2024-03-18T10:30:00Z”,
“traceId”: “trace-xxxx”
},
“body”: {
“merchantId”: “M001”,
“actionType”: “SYNC_ORDERS”,
“parameters”: {
“startDate”: “2024-03-17”,
“endDate”: “2024-03-18”
}
}
}

Consumer:處理訊息

@Component
public class ActionConsumer {

@Autowired
private ChannelFactory channelFactory;

@KafkaListener(topics = “oms-action-shopee”)
public void consumeShopee(String message) {
processAction(ChannelType.SHOPEE, message);
}

@KafkaListener(topics = “oms-action-momo”)
public void consumeMomo(String message) {
processAction(ChannelType.MOMO, message);
}

private void processAction(ChannelType channel, String message) {
try {
// 1. 解析訊息
ActionMessage action = JsonUtil.fromJson(message);

// 2. 取得對應的通路處理器
ChannelAction handler = channelFactory.getAction(channel);

// 3. 執行動作
ActionResult result = handler.execute(action);

// 4. 回寫結果
saveResult(action, result);

} catch (Exception e) {
// 5. 錯誤處理
handleError(message, e);
}
}
}


錯誤處理策略

錯誤類型 處理方式 範例
暫時性錯誤 重試 3 次 API 超時、網路問題
永久性錯誤 記錄並跳過 資料格式錯誤
未知錯誤 進入 Dead Letter Queue 系統異常
@Bean
public DefaultErrorHandler errorHandler() {
// 設定重試策略
BackOff backOff = new ExponentialBackOff(1000L, 2.0);
backOff.setMaxElapsedTime(30000L); // 最多重試 30 秒

return new DefaultErrorHandler(
(record, exception) -> {
// 重試失敗後,送到 Dead Letter Queue
sendToDeadLetterQueue(record, exception);
},
backOff
);
}


監控與告警

監控指標 正常值 告警條件
Consumer Lag < 1000 > 5000 持續 5 分鐘
處理時間 < 5 秒 > 30 秒
錯誤率 < 1% > 5%
Dead Letter 數量 0 > 10

效能調校

# application.yml
spring:
kafka:
consumer:
# 每次拉取的最大筆數
max-poll-records: 100

# 拉取間隔
fetch-min-size: 1
fetch-max-wait: 500ms

producer:
# 批次發送設定
batch-size: 16384
buffer-memory: 33554432

# 壓縮
compression-type: lz4


總結

設計 效果
非同步處理 使用者不用等待 API 回應
Topic 分離 通路故障隔離
重試機制 暫時性錯誤自動恢復
Dead Letter Queue 問題訊息不遺失
監控告警 問題即時發現

為什麼不用其他方案?

方案 優點 缺點 結論
同步處理 簡單、好除錯 使用者要等、效能差 小流量可用
Redis Queue 輕量、快速 持久化弱、無法分區 簡單場景可用
RabbitMQ 功能豐富、可靠 吞吐量不如 Kafka 適合複雜路由
Kafka 高吞吐、持久化、分區 學習曲線、維運成本 大流量首選

實戰踩坑

坑 1:Consumer Lag 暴增

雙 11 當天 Consumer Lag 飆到 50,000+,訂單處理延遲 2 小時。原因:單一 Consumer 處理太慢。解法:增加 Consumer 數量到 Partition 數量,同時優化處理邏輯(批次處理)。

坑 2:訊息重複消費

Consumer 處理到一半掛掉,重啟後同一筆訂單被處理兩次,導致重複出貨。解法:加入冪等性檢查(用訂單 ID 去重)。

坑 3:Topic 沒分開

最初所有平台共用一個 Topic,蝦皮 API 壞了堵住整條 Queue,Momo 訂單也跟著延遲。後來拆成每個平台獨立 Topic,故障隔離。


系列導航

◀ 上一篇
工廠模式
📚 返回目錄 下一篇 ▶
多租戶認證

留言

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *