分類: 技術-資料庫

資料庫技術,包含 Kafka、PostgreSQL、SOLR、Elasticsearch 等

  • 分佈式健康檢查:自定義 Spring Boot Actuator

    商業價值:健康檢查讓系統「自動發現問題、自動恢復」,直接支撐 導讀篇提到的 99% 庫存準確率——系統不穩定就不可能有準確的庫存。

    前言:為什麼需要健康檢查?

    在微服務架構中,一個服務可能依賴多個外部元件:

    元件 用途 掛掉的影響
    PostgreSQL 主資料庫 無法讀寫訂單
    Redis 快取 效能下降
    Kafka 訊息佇列 無法非同步處理
    Solr 搜尋引擎 無法搜尋訂單
    問題:Kubernetes 預設只檢查 HTTP 回應,無法知道資料庫是否正常。

    Spring Boot Actuator 健康檢查

    基本設定

    # application.yml
    management:
    endpoints:
    web:
    base-path: /
    exposure:
    include: health, info, metrics

    endpoint:
    health:
    show-details: always
    show-components: always

    health:
    # 啟用各元件的健康檢查
    db:
    enabled: true
    redis:
    enabled: true

    健康檢查端點

    端點 用途 使用場景
    /health 完整健康狀態 監控系統
    /health/liveness 存活檢查 K8s liveness probe
    /health/readiness 就緒檢查 K8s readiness probe

    自定義健康檢查指標

    Kafka 健康檢查

    @Component
    public class KafkaHealthIndicator implements HealthIndicator {

    @Value(“${kafka.bootstrap-servers}”)
    private String bootstrapServers;

    private AtomicReference<Health> cachedHealth =
    new AtomicReference<>(Health.unknown().build());

    @Override
    public Health health() {
    return cachedHealth.get();
    }

    /**
    * 背景執行緒定期檢查,避免阻塞健康檢查端點
    */

    @Scheduled(fixedRate = 30000) // 每 30 秒檢查一次
    public void checkHealth() {
    try {
    Properties props = new Properties();
    props.put(“bootstrap.servers”, bootstrapServers);
    props.put(“request.timeout.ms”, “5000”);

    try (AdminClient admin = AdminClient.create(props)) {
    admin.listTopics().names().get(5, TimeUnit.SECONDS);
    }

    cachedHealth.set(Health.up()
    .withDetail(“servers”, bootstrapServers)
    .build());

    } catch (Exception e) {
    cachedHealth.set(Health.down()
    .withDetail(“error”, e.getMessage())
    .build());
    }
    }
    }

    Solr 健康檢查

    @Component
    public class SolrHealthIndicator implements HealthIndicator {

    @Autowired
    private SolrClient solrClient;

    private AtomicReference<Health> cachedHealth =
    new AtomicReference<>(Health.unknown().build());

    @Override
    public Health health() {
    return cachedHealth.get();
    }

    @Scheduled(fixedRate = 30000)
    public void checkHealth() {
    try {
    SolrPingResponse response = solrClient.ping();
    int status = response.getStatus();

    if (status == 0) {
    cachedHealth.set(Health.up()
    .withDetail(“responseTime”, response.getQTime())
    .build());
    } else {
    cachedHealth.set(Health.down()
    .withDetail(“status”, status)
    .build());
    }

    } catch (Exception e) {
    cachedHealth.set(Health.down()
    .withDetail(“error”, e.getMessage())
    .build());
    }
    }
    }


    健康檢查回應範例

    {
    “status”: “UP”,
    “components”: {
    “db”: {
    “status”: “UP”,
    “details”: {
    “database”: “PostgreSQL”,
    “validationQuery”: “isValid()”
    }
    },
    “kafka”: {
    “status”: “UP”,
    “details”: {
    “servers”: “kafka:9092”
    }
    },
    “redis”: {
    “status”: “UP”,
    “details”: {
    “version”: “7.0.0”
    }
    },
    “solr”: {
    “status”: “UP”,
    “details”: {
    “responseTime”: 5
    }
    }
    }
    }

    Kubernetes 整合

    # deployment.yaml
    spec:
    containers:
    – name: oms-service
    # 存活檢查:程式是否還活著
    livenessProbe:
    httpGet:
    path: /health/liveness
    port: 8080
    initialDelaySeconds: 30
    periodSeconds: 10
    timeoutSeconds: 5
    failureThreshold: 3

    # 就緒檢查:是否可以接受流量
    readinessProbe:
    httpGet:
    path: /health/readiness
    port: 8080
    initialDelaySeconds: 20
    periodSeconds: 5
    timeoutSeconds: 3
    failureThreshold: 3

    Probe 類型 失敗後行為 使用場景
    liveness 重啟 Pod 程式死當、無回應
    readiness 從 Service 移除 暫時無法服務(如 DB 斷線)

    設計考量

    為什麼用背景執行緒 + 快取?

    • 健康檢查端點需要快速回應(< 1秒)
    • 外部元件檢查可能很慢(網路延遲)
    • Kubernetes 頻繁呼叫(每 5-10 秒)
    設計 說明
    背景檢查 每 30 秒執行一次,不阻塞端點
    結果快取 AtomicReference 儲存最新狀態
    逾時設定 檢查逾時 5 秒,避免卡住
    狀態詳情 包含時間、錯誤訊息等資訊

    監控整合

    將健康狀態匯出到 Prometheus:

    # 健康狀態指標
    health_check_status{component=”kafka”} 1
    health_check_status{component=”solr”} 1
    health_check_status{component=”redis”} 1
    health_check_status{component=”db”} 1

    # 檢查執行時間
    health_check_duration_seconds{component=”kafka”} 0.023
    health_check_duration_seconds{component=”solr”} 0.005


    總結

    設計 效果
    自定義 HealthIndicator 檢查所有依賴元件
    背景執行 + 快取 端點回應快速
    K8s Probe 整合 自動重啟/移除故障 Pod
    Prometheus 匯出 歷史趨勢監控

    為什麼不用其他方案?

    方案 優點 缺點 結論
    只靠 K8s 預設檢查 零設定 只檢查 HTTP 回應,不知道 DB 狀態 不夠
    外部監控工具打 API 不侵入程式碼 只知道 API 回應,不知道內部狀態 補充用
    自己寫健康檢查 API 完全控制 要自己處理快取、超時 重複造輪子
    Actuator + 自訂 整合好、可擴展 要學 Spring 生態 Spring 專案首選

    實戰踩坑

    坑 1:健康檢查太慢導致 Pod 被殺

    最初健康檢查直接連 Kafka,網路慢時要 10 秒才回應。K8s 以為 Pod 死了,不斷重啟。解法:改成背景執行緒定期檢查,健康端點只回傳快取結果。

    坑 2:Liveness 和 Readiness 混用

    最初兩個 Probe 用同一個端點。結果 Kafka 斷線時,所有 Pod 都被重啟(Liveness 失敗)。正確做法:Liveness 只檢查「程式還活著」,Readiness 檢查「能不能接流量」。Kafka 斷線應該是 Readiness 失敗(從 Service 移除),不是 Liveness 失敗(重啟)。

    坑 3:忘記設定 initialDelaySeconds

    應用程式啟動要 30 秒,但健康檢查 10 秒就開始。結果 Pod 永遠起不來,一直被重啟。


    系列導航

    ◀ 上一篇
    多租戶認證
    📚 返回目錄 下一篇 ▶
    DTO 設計
  • Kafka 事件驅動架構:打造高可用訂單處理系統

    商業價值:事件驅動架構讓系統能「處理速度提升 10 倍」,從 4-8 小時縮短到 25-35 分鐘。詳見 導讀篇的 ROI 計算

    前言:為什麼需要事件驅動?

    想像一個場景:使用者在後台點擊「同步蝦皮訂單」。

    同步處理的問題:

    • 蝦皮 API 回應慢 → 使用者等待 30 秒以上
    • API 超時 → 整個請求失敗
    • 大量請求 → 伺服器資源耗盡

    解決方案:非同步事件驅動

    使用者請求 背景處理
    │ │
    ▼ │
    ┌─────────┐ 發送訊息 ┌─────────┐
    │ Web API │ ──────────────► │ Kafka │
    └─────────┘ └────┬────┘
    │ │
    ▼ ▼
    回應成功 ┌───────────────┐
    (立即返回) │ Consumer Job │
    │ 慢慢處理… │
    └───────────────┘
    效果:使用者立即收到「已排程」回應,實際同步在背景執行。

    架構設計

    Topic 設計:每個通路獨立

    Topic 名稱 用途 Consumer
    oms-action-shopee 蝦皮相關動作 Shopee Consumer
    oms-action-momo Momo 相關動作 Momo Consumer
    oms-action-yahoo Yahoo 相關動作 Yahoo Consumer
    oms-action-pchome PChome 相關動作 PChome Consumer
    為什麼要分開?

    • 蝦皮 API 壞了,不影響 Momo 訂單處理
    • 可以針對不同平台調整 Consumer 數量
    • 方便監控各平台的處理狀況

    Producer:發送訊息

    @Service
    public class ActionProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
    * 發送動作到對應的通路 Topic
    */

    public void sendAction(ChannelType channel, ActionMessage message) {
    String topic = “oms-action-“ + channel.getCode();
    String payload = JsonUtil.toJson(message);

    kafkaTemplate.send(topic, message.getMerchantId(), payload)
    .addCallback(
    result -> log.info(“發送成功: {}”, topic),
    error -> log.error(“發送失敗: {}”, error.getMessage())
    );
    }
    }

    訊息格式設計

    {
    “header”: {
    “messageId”: “uuid-xxxx-xxxx”,
    “timestamp”: “2024-03-18T10:30:00Z”,
    “traceId”: “trace-xxxx”
    },
    “body”: {
    “merchantId”: “M001”,
    “actionType”: “SYNC_ORDERS”,
    “parameters”: {
    “startDate”: “2024-03-17”,
    “endDate”: “2024-03-18”
    }
    }
    }

    Consumer:處理訊息

    @Component
    public class ActionConsumer {

    @Autowired
    private ChannelFactory channelFactory;

    @KafkaListener(topics = “oms-action-shopee”)
    public void consumeShopee(String message) {
    processAction(ChannelType.SHOPEE, message);
    }

    @KafkaListener(topics = “oms-action-momo”)
    public void consumeMomo(String message) {
    processAction(ChannelType.MOMO, message);
    }

    private void processAction(ChannelType channel, String message) {
    try {
    // 1. 解析訊息
    ActionMessage action = JsonUtil.fromJson(message);

    // 2. 取得對應的通路處理器
    ChannelAction handler = channelFactory.getAction(channel);

    // 3. 執行動作
    ActionResult result = handler.execute(action);

    // 4. 回寫結果
    saveResult(action, result);

    } catch (Exception e) {
    // 5. 錯誤處理
    handleError(message, e);
    }
    }
    }


    錯誤處理策略

    錯誤類型 處理方式 範例
    暫時性錯誤 重試 3 次 API 超時、網路問題
    永久性錯誤 記錄並跳過 資料格式錯誤
    未知錯誤 進入 Dead Letter Queue 系統異常
    @Bean
    public DefaultErrorHandler errorHandler() {
    // 設定重試策略
    BackOff backOff = new ExponentialBackOff(1000L, 2.0);
    backOff.setMaxElapsedTime(30000L); // 最多重試 30 秒

    return new DefaultErrorHandler(
    (record, exception) -> {
    // 重試失敗後,送到 Dead Letter Queue
    sendToDeadLetterQueue(record, exception);
    },
    backOff
    );
    }


    監控與告警

    監控指標 正常值 告警條件
    Consumer Lag < 1000 > 5000 持續 5 分鐘
    處理時間 < 5 秒 > 30 秒
    錯誤率 < 1% > 5%
    Dead Letter 數量 0 > 10

    效能調校

    # application.yml
    spring:
    kafka:
    consumer:
    # 每次拉取的最大筆數
    max-poll-records: 100

    # 拉取間隔
    fetch-min-size: 1
    fetch-max-wait: 500ms

    producer:
    # 批次發送設定
    batch-size: 16384
    buffer-memory: 33554432

    # 壓縮
    compression-type: lz4


    總結

    設計 效果
    非同步處理 使用者不用等待 API 回應
    Topic 分離 通路故障隔離
    重試機制 暫時性錯誤自動恢復
    Dead Letter Queue 問題訊息不遺失
    監控告警 問題即時發現

    為什麼不用其他方案?

    方案 優點 缺點 結論
    同步處理 簡單、好除錯 使用者要等、效能差 小流量可用
    Redis Queue 輕量、快速 持久化弱、無法分區 簡單場景可用
    RabbitMQ 功能豐富、可靠 吞吐量不如 Kafka 適合複雜路由
    Kafka 高吞吐、持久化、分區 學習曲線、維運成本 大流量首選

    實戰踩坑

    坑 1:Consumer Lag 暴增

    雙 11 當天 Consumer Lag 飆到 50,000+,訂單處理延遲 2 小時。原因:單一 Consumer 處理太慢。解法:增加 Consumer 數量到 Partition 數量,同時優化處理邏輯(批次處理)。

    坑 2:訊息重複消費

    Consumer 處理到一半掛掉,重啟後同一筆訂單被處理兩次,導致重複出貨。解法:加入冪等性檢查(用訂單 ID 去重)。

    坑 3:Topic 沒分開

    最初所有平台共用一個 Topic,蝦皮 API 壞了堵住整條 Queue,Momo 訂單也跟著延遲。後來拆成每個平台獨立 Topic,故障隔離。


    系列導航

    ◀ 上一篇
    工廠模式
    📚 返回目錄 下一篇 ▶
    多租戶認證
  • Solr 實戰完全筆記:從基礎語法到效能調校與評分機制

    基本參數介紹

    • q:查詢的關鍵字,此參數最為重要,例如 q=id:1,默認為 q=*:*
    • fl:指定返回哪些字段,用逗號或空格分隔,注意字段區分大小寫,例如 fl=id,title,sort
    • start:返回結果的第幾條記錄開始,一般分頁用,默認 0 開始
    • rows:指定返回結果最多有多少條記錄,默認值為 10,配合 start 實現分頁
    • sort:排序方式,例如 id desc 表示按照 id 降序,多個字段:score desc, price asc
    • wt:(writer type) 指定輸出格式,有 xml, json, php 等
    • fq:(filter query) 過濾查詢,提供一個可選的篩選器查詢,例如:q=id:1&fq=sort:[1 TO 5]
    • df:默認的查詢字段,一般默認指定
    • qt:(query type) 指定哪個類型來處理查詢請求,一般不用指定,默認是 standard
    • indent:返回的結果是否縮進,默認關閉,用 indent=true 開啟

    查詢語法

    • : 指定字段查指定值,如返回所有值 *:*
    • ? 表示單個任意字符的通配
    • * 表示多個任意字符的通配(不能在檢索的項開始使用)
    • ~ 表示模糊檢索,如 roam~ 將找到 foam 和 roams;roam~0.8 檢索返回相似度在 0.8 以上的記錄
    • AND|| 布爾操作符
    • OR&& 布爾操作符
    • NOT!- 排除操作符
    • + 存在操作符,要求符號後的項必須在文檔中存在
    • ( ) 用於構成子查詢
    • [] 包含範圍檢索,如 date:[201507 TO 201510] 包含頭尾
    • {} 不包含範圍檢索,如 date:{201507 TO 201510} 不包含頭尾

    Solr 本質

    Solr 本質上還是搜尋引擎,因此優先還是 index 其後才是 store。
    Partial update 也是先把資料拉回來重新 index 後 store。
    順序:index 先,然後 store

    (閱讀全文…)

  • Kafka 實務坑筆記(九):Topic 分流設計的藝術

    問題背景

    我們的 Kafka Topic 設計是依照平台分類

    (閱讀全文…)