標籤: Kafka

  • Spring Boot OMS Code Review 實戰:20 個 Bug 與事件驅動架構的一課

    重點摘要

    • 三輪 Code Review 共找出 20+ 個問題,從 NPE 連鎖到種子資料欄位名稱全錯
    • Kafka Seeder 寫了三個版本,每次重寫都是對「事件驅動架構正確入口」理解的加深
    • 能走事件流就走事件流:API → Kafka → Consumer → DB,每一層都有可追蹤、可重試的意義
    • 21 個 Java 容器沒有 JVM 記憶體限制,用 JAVA_TOOL_OPTIONS 一行解決,不需改 Dockerfile

    這是 多通路電商 OMS 系統開發過程中的一天工作紀錄。系統整合了 Momo、Shopee、Yahoo 等電商平台,透過 Kafka 事件流處理訂單同步、退貨與統計。今天的目標:完成 feature/stats-pipeline 分支上的所有待辦修復,讓系統能順利 docker compose up,並驗證端對端資料流。

    三輪 Code Review:每一輪都有新發現

    第一輪:已知清單上的 7 個問題

    進入狀態之前就有一份清單,分為 Critical、Warning、Info 三個等級:

    等級 問題 修復方式
    CriticalOrderUpsertConsumer .get() NPE 連鎖.path() + 加 orderDataJson null 守衛
    Criticaldaily_statistics.id 缺 NOT NULL加約束 + DEFAULT partition
    CriticalReturnUpsertConsumer 未寫 stats dirty marker新增 Redis ZSET 寫入
    WarningDailyStatisticsService early return 留舊資料改成刪除過時的 stats 列
    Warningenum 預設值小寫 'pending'改大寫 'PENDING',與 JPA EnumType.STRING 對齊
    WarningRetryJobConsumer MissingNode cast.isObject() 判斷再 cast

    其中 enum 大小寫這個問題值得特別說明。Java 的 @Enumerated(EnumType.STRING) 在讀取時呼叫 Enum.valueOf(),這個方法是 case-sensitive 的。資料庫預設值寫 'pending',但 enum 常數叫 PENDING,啟動時不會出錯,但一讀到有 DEFAULT 值的列就會拋 IllegalArgumentException

    第二輪:種子資料是另一個地雷區

    Schema 修完了,以為大功告成,結果種子資料(02-seed-data.sql)是第二個地雷區:

    1. BCrypt hash 是假的$2a$10$dummyhashfordevonly... 根本不是有效的 BCrypt hash,Spring Security 的 passwordEncoder.matches() 永遠回傳 false,登入 100% 失敗。
    2. 訂單狀態小寫'completed''shipped' — 和上面一樣的 case-sensitive 問題,這次在資料列而不是 DEFAULT 值。
    3. daily_statistics 欄位名稱全錯:用了 order_counttotal_amount 這些不存在的欄位名,docker compose up 的 DB 初始化階段會直接 fail。

    這些問題的共同特徵是:compile time 抓不到,schema validate 也抓不到。Hibernate 的 ddl-auto: validate 只單向檢查「entity 中有 mapping 的欄位是否存在於 DB」,不會反向驗證 SQL 腳本的正確性。唯一的防護是跑起來測試。

    第三輪:21 個容器,一個 JVM 記憶體問題

    系統在開發機上跑 21 個 Java 容器(Spring Boot services),沒有任何 JVM heap 限制。JVM ergonomic sizing 預設使用系統 RAM 的 25%,7.4GB 可用 RAM 很快就會不夠。

    解法是在 docker-compose.yml 每個服務加 JAVA_TOOL_OPTIONS

    environment:
      JAVA_TOOL_OPTIONS: "-Xmx256m -XX:+ExitOnOutOfMemoryError"

    JAVA_TOOL_OPTIONS 是 JDK 標準環境變數,JVM 啟動時自動讀取,不需要修改 Dockerfile 的 ENTRYPOINT-XX:+ExitOnOutOfMemoryError 讓容器在 OOM 時立刻崩潰(而不是卡死),對 Docker 的 restart: unless-stopped 友好,等於有了自動恢復機制。

    Seeder 的三次重寫:對事件驅動架構的理解之旅

    今天最有收穫的插曲。目標是「準備一個 Docker 服務,打假訂單資料,確認整體資料流順暢」。這個任務看起來很簡單,結果寫了三個版本。

    第一版:直接打 Kafka(被打槍)

    第一直覺:用 kafka-python 直接連 kafka:9092,組好 ORDER_UPSERT 訊息送到 order.process topic。快速、直接。

    問題:系統對外只有 API,直接操作 Kafka 是繞過了系統設計的邊界。內部基礎設施不應該是外部系統的接入點。

    第二版:打 POST /api/orders(沒走事件流)

    改用 REST API。先 login 拿 JWT,再 POST /api/orders

    問題:OrderController.createOrder() 是直接寫資料庫,跳過了整個 Kafka pipeline。Stats dirty marker 不會被寫入,DailyStatisticsService 不會被觸發,daily_statistics 表不會更新。雖然訂單進了 DB,但「整體資料流」沒有跑通。

    第三版:新增正確的 API 端點(走完整事件流)

    UserOrderController 新增 POST /api/user/orders,接收訂單資料後發布 ORDER_UPSERT 到 Kafka,回傳 202 Accepted:

    POST /api/user/orders  (帶 JWT)
      → 查 Channel → Platform(取得 platformId)
      → 組 ORDER_UPSERT 訊息(header + body + hash)
      → kafkaTemplate.send("order.process", ...)
      → 回傳 202 Accepted
    
    接著:
      Kafka order.process
        → OrderUpsertConsumer(Redis 去重 → INSERT/UPDATE)
            → stats dirty marker 寫入 Redis ZSET
                → StatsRecalcHandler(定時掃)
                    → DailyStatisticsService.recalculate()
                        → daily_statistics 更新

    端到端,一條不少。

    為什麼「能走事件流就走事件流」不只是口號

    三次重寫讓這個原則從抽象變得具體。走事件流的好處不只是「解耦」這個詞能涵蓋的:

    層面 直接寫 DB 走 Kafka 事件流
    可追蹤性只有 DB recordKafka UI 可看完整訊息歷史,帶 traceId
    錯誤處理拋 exception,呼叫方看到 500失敗走 task.failed → retry → task.dlt
    去重需要自己實作Consumer 有 Redis + DB 兩層去重
    統計觸發需要額外呼叫Consumer 自動寫 dirty marker,批次計算
    一致性邏輯分散在多處無論來源(channel job / API),走同一套邏輯

    最後一點是最重要的:一致性。不管訂單是從 Shopee channel job 來的,還是透過 API 手動新增的,都走同一個 OrderUpsertConsumer,同一套去重邏輯,同一套 stats pipeline。系統裡沒有「繞過」的快捷路徑。

    今日修改摘要

    檔案 類型 說明
    01-schema.sqlBug FixNOT NULL、DEFAULT partition、enum 大小寫
    02-seed-data.sqlBug FixBCrypt hash、訂單狀態大小寫、daily_statistics 欄位名稱
    OrderUpsertConsumerBug Fix.get().path(),移除 unused import
    ReturnUpsertConsumerBug Fix加 stats dirty marker、移除 unused import
    DailyStatisticsServiceBug Fixearly return 時刪除過時 stats 列
    OrderServiceBug FixNOT NULL 欄位的 null 守衛
    docker-compose.ymlInfra所有 21 個 Java 容器加 JAVA_TOOL_OPTIONS
    UserOrderControllerFeature新增 POST /api/user/orders → Kafka pipeline
    docker/test-data-generator/FeaturePython seeder,透過 API 打假訂單

    結語:追蹤路徑比結果更重要

    今天花最多時間的不是寫 code,而是「把對的事情弄清楚」。Seeder 寫了三個版本,不是因為技術難,而是因為對系統的理解在逐漸深化。

    一個好的事件驅動系統,它的「正確入口」只有一個。找到那個入口,比快速把功能做出來更重要。這條原則同樣適用於大系統的任何角落:追蹤路徑比結果更重要,因為你下次出問題的時候,你需要知道訊息從哪裡來、往哪裡去。

    能走事件流就走事件流。能用快取盡量快取。這不是教條,是讓大系統在出問題時還能被追蹤、被診斷、被修復的保險。

  • 分佈式健康檢查:自定義 Spring Boot Actuator

    商業價值:健康檢查讓系統「自動發現問題、自動恢復」,直接支撐 導讀篇提到的 99% 庫存準確率——系統不穩定就不可能有準確的庫存。

    前言:為什麼需要健康檢查?

    在微服務架構中,一個服務可能依賴多個外部元件:

    元件 用途 掛掉的影響
    PostgreSQL 主資料庫 無法讀寫訂單
    Redis 快取 效能下降
    Kafka 訊息佇列 無法非同步處理
    Solr 搜尋引擎 無法搜尋訂單
    問題:Kubernetes 預設只檢查 HTTP 回應,無法知道資料庫是否正常。

    Spring Boot Actuator 健康檢查

    基本設定

    # application.yml
    management:
    endpoints:
    web:
    base-path: /
    exposure:
    include: health, info, metrics

    endpoint:
    health:
    show-details: always
    show-components: always

    health:
    # 啟用各元件的健康檢查
    db:
    enabled: true
    redis:
    enabled: true

    健康檢查端點

    端點 用途 使用場景
    /health 完整健康狀態 監控系統
    /health/liveness 存活檢查 K8s liveness probe
    /health/readiness 就緒檢查 K8s readiness probe

    自定義健康檢查指標

    Kafka 健康檢查

    @Component
    public class KafkaHealthIndicator implements HealthIndicator {

    @Value(“${kafka.bootstrap-servers}”)
    private String bootstrapServers;

    private AtomicReference<Health> cachedHealth =
    new AtomicReference<>(Health.unknown().build());

    @Override
    public Health health() {
    return cachedHealth.get();
    }

    /**
    * 背景執行緒定期檢查,避免阻塞健康檢查端點
    */

    @Scheduled(fixedRate = 30000) // 每 30 秒檢查一次
    public void checkHealth() {
    try {
    Properties props = new Properties();
    props.put(“bootstrap.servers”, bootstrapServers);
    props.put(“request.timeout.ms”, “5000”);

    try (AdminClient admin = AdminClient.create(props)) {
    admin.listTopics().names().get(5, TimeUnit.SECONDS);
    }

    cachedHealth.set(Health.up()
    .withDetail(“servers”, bootstrapServers)
    .build());

    } catch (Exception e) {
    cachedHealth.set(Health.down()
    .withDetail(“error”, e.getMessage())
    .build());
    }
    }
    }

    Solr 健康檢查

    @Component
    public class SolrHealthIndicator implements HealthIndicator {

    @Autowired
    private SolrClient solrClient;

    private AtomicReference<Health> cachedHealth =
    new AtomicReference<>(Health.unknown().build());

    @Override
    public Health health() {
    return cachedHealth.get();
    }

    @Scheduled(fixedRate = 30000)
    public void checkHealth() {
    try {
    SolrPingResponse response = solrClient.ping();
    int status = response.getStatus();

    if (status == 0) {
    cachedHealth.set(Health.up()
    .withDetail(“responseTime”, response.getQTime())
    .build());
    } else {
    cachedHealth.set(Health.down()
    .withDetail(“status”, status)
    .build());
    }

    } catch (Exception e) {
    cachedHealth.set(Health.down()
    .withDetail(“error”, e.getMessage())
    .build());
    }
    }
    }


    健康檢查回應範例

    {
    “status”: “UP”,
    “components”: {
    “db”: {
    “status”: “UP”,
    “details”: {
    “database”: “PostgreSQL”,
    “validationQuery”: “isValid()”
    }
    },
    “kafka”: {
    “status”: “UP”,
    “details”: {
    “servers”: “kafka:9092”
    }
    },
    “redis”: {
    “status”: “UP”,
    “details”: {
    “version”: “7.0.0”
    }
    },
    “solr”: {
    “status”: “UP”,
    “details”: {
    “responseTime”: 5
    }
    }
    }
    }

    Kubernetes 整合

    # deployment.yaml
    spec:
    containers:
    – name: oms-service
    # 存活檢查:程式是否還活著
    livenessProbe:
    httpGet:
    path: /health/liveness
    port: 8080
    initialDelaySeconds: 30
    periodSeconds: 10
    timeoutSeconds: 5
    failureThreshold: 3

    # 就緒檢查:是否可以接受流量
    readinessProbe:
    httpGet:
    path: /health/readiness
    port: 8080
    initialDelaySeconds: 20
    periodSeconds: 5
    timeoutSeconds: 3
    failureThreshold: 3

    Probe 類型 失敗後行為 使用場景
    liveness 重啟 Pod 程式死當、無回應
    readiness 從 Service 移除 暫時無法服務(如 DB 斷線)

    設計考量

    為什麼用背景執行緒 + 快取?

    • 健康檢查端點需要快速回應(< 1秒)
    • 外部元件檢查可能很慢(網路延遲)
    • Kubernetes 頻繁呼叫(每 5-10 秒)
    設計 說明
    背景檢查 每 30 秒執行一次,不阻塞端點
    結果快取 AtomicReference 儲存最新狀態
    逾時設定 檢查逾時 5 秒,避免卡住
    狀態詳情 包含時間、錯誤訊息等資訊

    監控整合

    將健康狀態匯出到 Prometheus:

    # 健康狀態指標
    health_check_status{component=”kafka”} 1
    health_check_status{component=”solr”} 1
    health_check_status{component=”redis”} 1
    health_check_status{component=”db”} 1

    # 檢查執行時間
    health_check_duration_seconds{component=”kafka”} 0.023
    health_check_duration_seconds{component=”solr”} 0.005


    總結

    設計 效果
    自定義 HealthIndicator 檢查所有依賴元件
    背景執行 + 快取 端點回應快速
    K8s Probe 整合 自動重啟/移除故障 Pod
    Prometheus 匯出 歷史趨勢監控

    為什麼不用其他方案?

    方案 優點 缺點 結論
    只靠 K8s 預設檢查 零設定 只檢查 HTTP 回應,不知道 DB 狀態 不夠
    外部監控工具打 API 不侵入程式碼 只知道 API 回應,不知道內部狀態 補充用
    自己寫健康檢查 API 完全控制 要自己處理快取、超時 重複造輪子
    Actuator + 自訂 整合好、可擴展 要學 Spring 生態 Spring 專案首選

    實戰踩坑

    坑 1:健康檢查太慢導致 Pod 被殺

    最初健康檢查直接連 Kafka,網路慢時要 10 秒才回應。K8s 以為 Pod 死了,不斷重啟。解法:改成背景執行緒定期檢查,健康端點只回傳快取結果。

    坑 2:Liveness 和 Readiness 混用

    最初兩個 Probe 用同一個端點。結果 Kafka 斷線時,所有 Pod 都被重啟(Liveness 失敗)。正確做法:Liveness 只檢查「程式還活著」,Readiness 檢查「能不能接流量」。Kafka 斷線應該是 Readiness 失敗(從 Service 移除),不是 Liveness 失敗(重啟)。

    坑 3:忘記設定 initialDelaySeconds

    應用程式啟動要 30 秒,但健康檢查 10 秒就開始。結果 Pod 永遠起不來,一直被重啟。


    系列導航

    ◀ 上一篇
    多租戶認證
    📚 返回目錄 下一篇 ▶
    DTO 設計
  • Kafka 事件驅動架構:打造高可用訂單處理系統

    商業價值:事件驅動架構讓系統能「處理速度提升 10 倍」,從 4-8 小時縮短到 25-35 分鐘。詳見 導讀篇的 ROI 計算

    前言:為什麼需要事件驅動?

    想像一個場景:使用者在後台點擊「同步蝦皮訂單」。

    同步處理的問題:

    • 蝦皮 API 回應慢 → 使用者等待 30 秒以上
    • API 超時 → 整個請求失敗
    • 大量請求 → 伺服器資源耗盡

    解決方案:非同步事件驅動

    使用者請求 背景處理
    │ │
    ▼ │
    ┌─────────┐ 發送訊息 ┌─────────┐
    │ Web API │ ──────────────► │ Kafka │
    └─────────┘ └────┬────┘
    │ │
    ▼ ▼
    回應成功 ┌───────────────┐
    (立即返回) │ Consumer Job │
    │ 慢慢處理… │
    └───────────────┘
    效果:使用者立即收到「已排程」回應,實際同步在背景執行。

    架構設計

    Topic 設計:每個通路獨立

    Topic 名稱 用途 Consumer
    oms-action-shopee 蝦皮相關動作 Shopee Consumer
    oms-action-momo Momo 相關動作 Momo Consumer
    oms-action-yahoo Yahoo 相關動作 Yahoo Consumer
    oms-action-pchome PChome 相關動作 PChome Consumer
    為什麼要分開?

    • 蝦皮 API 壞了,不影響 Momo 訂單處理
    • 可以針對不同平台調整 Consumer 數量
    • 方便監控各平台的處理狀況

    Producer:發送訊息

    @Service
    public class ActionProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
    * 發送動作到對應的通路 Topic
    */

    public void sendAction(ChannelType channel, ActionMessage message) {
    String topic = “oms-action-“ + channel.getCode();
    String payload = JsonUtil.toJson(message);

    kafkaTemplate.send(topic, message.getMerchantId(), payload)
    .addCallback(
    result -> log.info(“發送成功: {}”, topic),
    error -> log.error(“發送失敗: {}”, error.getMessage())
    );
    }
    }

    訊息格式設計

    {
    “header”: {
    “messageId”: “uuid-xxxx-xxxx”,
    “timestamp”: “2024-03-18T10:30:00Z”,
    “traceId”: “trace-xxxx”
    },
    “body”: {
    “merchantId”: “M001”,
    “actionType”: “SYNC_ORDERS”,
    “parameters”: {
    “startDate”: “2024-03-17”,
    “endDate”: “2024-03-18”
    }
    }
    }

    Consumer:處理訊息

    @Component
    public class ActionConsumer {

    @Autowired
    private ChannelFactory channelFactory;

    @KafkaListener(topics = “oms-action-shopee”)
    public void consumeShopee(String message) {
    processAction(ChannelType.SHOPEE, message);
    }

    @KafkaListener(topics = “oms-action-momo”)
    public void consumeMomo(String message) {
    processAction(ChannelType.MOMO, message);
    }

    private void processAction(ChannelType channel, String message) {
    try {
    // 1. 解析訊息
    ActionMessage action = JsonUtil.fromJson(message);

    // 2. 取得對應的通路處理器
    ChannelAction handler = channelFactory.getAction(channel);

    // 3. 執行動作
    ActionResult result = handler.execute(action);

    // 4. 回寫結果
    saveResult(action, result);

    } catch (Exception e) {
    // 5. 錯誤處理
    handleError(message, e);
    }
    }
    }


    錯誤處理策略

    錯誤類型 處理方式 範例
    暫時性錯誤 重試 3 次 API 超時、網路問題
    永久性錯誤 記錄並跳過 資料格式錯誤
    未知錯誤 進入 Dead Letter Queue 系統異常
    @Bean
    public DefaultErrorHandler errorHandler() {
    // 設定重試策略
    BackOff backOff = new ExponentialBackOff(1000L, 2.0);
    backOff.setMaxElapsedTime(30000L); // 最多重試 30 秒

    return new DefaultErrorHandler(
    (record, exception) -> {
    // 重試失敗後,送到 Dead Letter Queue
    sendToDeadLetterQueue(record, exception);
    },
    backOff
    );
    }


    監控與告警

    監控指標 正常值 告警條件
    Consumer Lag < 1000 > 5000 持續 5 分鐘
    處理時間 < 5 秒 > 30 秒
    錯誤率 < 1% > 5%
    Dead Letter 數量 0 > 10

    效能調校

    # application.yml
    spring:
    kafka:
    consumer:
    # 每次拉取的最大筆數
    max-poll-records: 100

    # 拉取間隔
    fetch-min-size: 1
    fetch-max-wait: 500ms

    producer:
    # 批次發送設定
    batch-size: 16384
    buffer-memory: 33554432

    # 壓縮
    compression-type: lz4


    總結

    設計 效果
    非同步處理 使用者不用等待 API 回應
    Topic 分離 通路故障隔離
    重試機制 暫時性錯誤自動恢復
    Dead Letter Queue 問題訊息不遺失
    監控告警 問題即時發現

    為什麼不用其他方案?

    方案 優點 缺點 結論
    同步處理 簡單、好除錯 使用者要等、效能差 小流量可用
    Redis Queue 輕量、快速 持久化弱、無法分區 簡單場景可用
    RabbitMQ 功能豐富、可靠 吞吐量不如 Kafka 適合複雜路由
    Kafka 高吞吐、持久化、分區 學習曲線、維運成本 大流量首選

    實戰踩坑

    坑 1:Consumer Lag 暴增

    雙 11 當天 Consumer Lag 飆到 50,000+,訂單處理延遲 2 小時。原因:單一 Consumer 處理太慢。解法:增加 Consumer 數量到 Partition 數量,同時優化處理邏輯(批次處理)。

    坑 2:訊息重複消費

    Consumer 處理到一半掛掉,重啟後同一筆訂單被處理兩次,導致重複出貨。解法:加入冪等性檢查(用訂單 ID 去重)。

    坑 3:Topic 沒分開

    最初所有平台共用一個 Topic,蝦皮 API 壞了堵住整條 Queue,Momo 訂單也跟著延遲。後來拆成每個平台獨立 Topic,故障隔離。


    系列導航

    ◀ 上一篇
    工廠模式
    📚 返回目錄 下一篇 ▶
    多租戶認證
  • 分散式資料庫訊息

    重要觀念

    分散式資料庫鎖

    https://www.gushiciku.cn/pl/gkKm/zh-tw

    (閱讀全文…)

  • 成本預估故事

    Cost

    所有服務

    服務 VM VCPU Memory size(GB) Hard disk size(GB)
    K8s(Run 69 Pods) 3 16 128 200
    Solr 2 16 16 200
    PostgreSQL 2 16 32 600
    Kafka 3 4 4 80
    ZooKeeper(zk01) 1 16 8 20
    ZooKeeper(zk02,zk03) 2 4 4 20
    Infinispan 2 2 4 20
    HAProxy 2 4 4 20
    Nginx 2 2 4 50
    GitLab 1 8 8 100
    Jenkins 1 2 4 50
    Harbor Registry (IMG Hub) 1 2 2 100
    Elasticsearch 1 8 8 750
    Logstash 1 4 4 20
    Kibana 1 4 8 100
    DNS 1 2 2 16
    MAIL Server 1 4 4 20
    Object Storage (Ceph) 3 4 4 150

    故事

    2021

    5月 我加入精誠,非Oneec身分,但是閒暇時會與Ethan進行相關的討論,並且不時會看SHOPEE跟東森的API文件思考架構
    8月 infra加入精誠,非Oneec身分,但是Ethan已經準備好了技術選型並且請這位Infra整理機器,清理空間
    9月 PM加入精誠 ,Oneec身分,Ethan請她進行思考
    10月 最強的全端RD入場,Oneec身分,Ethan請他跟Infra準備K8S環境底下的高可用環境程式
    12月 還在討論Topic,12月中 全端RD回報,準備好了,開工

    (閱讀全文…)

  • kafka 綜合筆記

    名詞介紹

    1. Producer:訊息生產者
    2. Broker:傳遞訊息的中介者
    3. Consumer:訊息消費者
    4. Topic:訊息的主題
    5. Partition:主題內的分區
    6. ComsumerGroup:消費者群組

    Producer

    只要是發送訊息出去的都是這一個腳色,定位上是往kafka push queue的就是。

    (閱讀全文…)