Apache Kafka 完全指南

Apache Kafka 是一個分散式串流平台,用來處理大量即時資料流,提供訊息佇列、資料串流和持久化儲存功能。


目錄


什麼是 Kafka?

Apache Kafka 是一個分散式串流平台(Distributed Streaming Platform),用來處理大量即時資料流。

核心功能

  1. 訊息佇列(Message Queue):生產者發送訊息,消費者接收訊息
  2. 資料串流(Data Streaming):即時處理持續流入的資料
  3. 資料儲存(Data Storage):持久化儲存訊息,可重複讀取

為什麼需要 Kafka?

傳統問題

服務 A → 直接呼叫 → 服務 B
                   → 服務 C
                   → 服務 D
  • ❌ 服務之間緊密耦合
  • ❌ 一個服務掛掉,全部受影響
  • ❌ 難以擴展

使用 Kafka 後

服務 A → Kafka ← 服務 B
              ← 服務 C
              ← 服務 D
  • ✅ 服務解耦
  • ✅ 非同步處理
  • ✅ 易於擴展

Kafka 核心概念

基本架構

Producer(生產者)→ Kafka Cluster(Kafka 叢集)→ Consumer(消費者)
                   [Topic 1]
                   [Topic 2]
                   [Topic 3]

1. Producer(生產者)

作用:產生並發送訊息到 Kafka

範例

  • Web 應用程式記錄使用者行為
  • IoT 設備發送感測器資料
  • 日誌收集系統
// Java Producer 範例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "Hello Kafka"));
producer.close();

2. Consumer(消費者)

作用:從 Kafka 讀取並處理訊息

特性

  • 可以有多個消費者同時讀取
  • 每個消費者獨立追蹤自己的讀取進度(offset)
// Java Consumer 範例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                          record.offset(), record.key(), record.value());
    }
}

3. Topic(主題)

作用:訊息的分類單位,類似資料庫的「表」

特性

  • 一個 Topic 可以有多個 Producer 和 Consumer
  • Topic 中的訊息是有序的
  • 訊息保留時間可設定(預設 7 天)

範例

Topic: user-events
Topic: order-created
Topic: payment-processed

4. Partition(分區)

作用:Topic 的物理分割,用來提高並行處理能力

結構

Topic: user-events
├─ Partition 0: [msg1, msg2, msg3]
├─ Partition 1: [msg4, msg5, msg6]
└─ Partition 2: [msg7, msg8, msg9]

特性

  • 每個 Partition 內部訊息有序
  • 不同 Partition 之間訊息無法保證順序
  • 相同 key 的訊息會進入同一個 Partition

為什麼需要 Partition?

  • 平行處理:多個 Consumer 可以同時讀取不同 Partition
  • 水平擴展:增加 Partition 數量提升吞吐量
  • 容錯能力:每個 Partition 有多個副本

5. Offset(偏移量)

作用:訊息在 Partition 中的唯一序號

運作方式

Partition 0:
offset: 0    1    2    3    4    5
msg:  [msg1][msg2][msg3][msg4][msg5][msg6]
  Consumer 已讀到這裡

特性

  • Consumer 自己記錄讀到哪裡(offset)
  • 可以重設 offset,重新讀取歷史訊息
  • 每個 Consumer Group 維護自己的 offset

6. Consumer Group(消費者群組)

作用:多個 Consumer 組成一個群組,共同消費一個 Topic

運作模式

模式 1:負載平衡(一個群組)

Topic (3 Partitions)
├─ P0 → Consumer A ┐
├─ P1 → Consumer B ├─ Consumer Group 1
└─ P2 → Consumer C ┘

每個 Partition 只被一個 Consumer 讀取
效果:負載平衡,提高處理速度

模式 2:廣播(多個群組)

Topic (3 Partitions)
├─ P0 → Consumer A (Group 1)
│     → Consumer D (Group 2)
├─ P1 → Consumer B (Group 1)
│     → Consumer E (Group 2)
└─ P2 → Consumer C (Group 1)
      → Consumer F (Group 2)

每個群組都能讀到完整訊息
效果:廣播給多個應用

規則

  • 同一個 Consumer Group 內,每個 Partition 只能被一個 Consumer 讀取
  • 不同 Consumer Group 可以獨立消費相同的訊息

7. Broker(代理伺服器)

作用:Kafka 叢集中的伺服器節點,儲存和處理訊息

架構

Kafka Cluster
├─ Broker 1 (Leader for P0)
├─ Broker 2 (Leader for P1)
└─ Broker 3 (Leader for P2)

特性

  • 一個叢集通常有多個 Broker
  • 每個 Broker 儲存部分 Partition
  • Broker 之間互相複製資料(Replication)

8. Replication(副本)

作用:資料備份,提高可靠性

運作方式

Partition 0 (Replication Factor = 3)
├─ Broker 1: Leader(主副本,讀寫)
├─ Broker 2: Follower(從副本,備份)
└─ Broker 3: Follower(從副本,備份)

如果 Broker 1 掛掉,Broker 2 或 3 會成為新的 Leader

設定

# 建立 Topic 時設定副本數
kafka-topics.sh --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 3 \
  --bootstrap-server localhost:9092

9. ZooKeeper(協調器)

作用:管理 Kafka 叢集的元資料和協調

負責

  • Broker 註冊和健康檢查
  • Topic 和 Partition 資訊
  • Leader 選舉
  • 配置管理

注意:Kafka 3.x+ 開始可以不依賴 ZooKeeper(KRaft 模式)


Kafka 架構圖

完整架構

┌─────────────────────────────────────────────────┐
│              ZooKeeper Cluster                  │
│         (協調 Kafka Cluster)                     │
└────────────────┬────────────────────────────────┘
┌────────────────┴────────────────────────────────┐
│            Kafka Cluster                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │ Broker 1 │  │ Broker 2 │  │ Broker 3 │       │
│  │          │  │          │  │          │       │
│  │ Topic A  │  │ Topic A  │  │ Topic B  │       │
│  │  - P0(L) │  │  - P0(F) │  │  - P1(L) │       │
│  │  - P1(F) │  │  - P1(F) │  │  - P2(L) │       │
│  └──────────┘  └──────────┘  └──────────┘       │
└─────────────────────────────────────────────────┘
         ↑                            ↓
    [Producer]                   [Consumer]

說明:

  • L = Leader(主副本)
  • F = Follower(從副本)

Kafka 工作原理

1. 訊息發送流程

1. Producer 產生訊息
2. 根據 key 或 round-robin 選擇 Partition
3. 發送到該 Partition 的 Leader Broker
4. Leader 寫入本地磁碟
5. Follower 從 Leader 同步
6. 確認寫入成功(根據 acks 設定)
7. 回傳成功給 Producer

acks 設定

  • acks=0:不等待確認(最快,可能丟失)
  • acks=1:等待 Leader 確認(預設)
  • acks=all:等待所有副本確認(最安全,最慢)

2. 訊息消費流程

1. Consumer 加入 Consumer Group
2. Kafka 協調器分配 Partition 給 Consumer
3. Consumer 從分配的 Partition 讀取
4. 從上次的 offset 開始讀取
5. 處理訊息
6. 提交 offset(自動或手動)

3. Partition 分配策略

策略 1:Range(範圍分配)

Topic: 10 Partitions, 3 Consumers

Consumer A: P0, P1, P2, P3
Consumer B: P4, P5, P6
Consumer C: P7, P8, P9

策略 2:Round-Robin(輪詢分配)

Topic: 10 Partitions, 3 Consumers

Consumer A: P0, P3, P6, P9
Consumer B: P1, P4, P7
Consumer C: P2, P5, P8

策略 3:Sticky(黏性分配)

  • 盡量保持原有分配
  • 當 Consumer 加入/離開時才重新平衡

Kafka 使用情境

1. 日誌收集(Log Aggregation)

場景:收集多台伺服器的日誌

Web Server 1 ─┐
Web Server 2 ─┼→ Kafka (Topic: logs) → ELK Stack
Web Server 3 ─┘                        (Elasticsearch)

優點

  • 集中管理日誌
  • 即時監控
  • 歷史日誌可查

2. 訊息佇列(Message Queue)

場景:訂單處理系統

訂單服務 → Kafka (Topic: orders) → 庫存服務
                                  → 支付服務
                                  → 通知服務

優點

  • 服務解耦
  • 非同步處理
  • 容錯能力強

3. 活動追蹤(Activity Tracking)

場景:追蹤使用者行為

Web App → Kafka (Topic: user-events) → 資料分析系統
                                      → 推薦系統
                                      → 即時儀表板

資料範例

  • 使用者點擊
  • 頁面瀏覽
  • 購物車操作

4. 串流處理(Stream Processing)

場景:即時詐騙偵測

交易事件 → Kafka → Kafka Streams → 詐騙偵測 → 警報系統
                  (即時分析)

應用

  • 即時推薦
  • 異常偵測
  • 即時統計

5. Event Sourcing(事件溯源)

場景:銀行帳戶系統

所有操作都記錄為事件:
- 開戶事件
- 存款事件
- 提款事件

Kafka 儲存所有事件 → 可重建任何時間點的狀態

6. CDC(Change Data Capture)

場景:資料庫同步

MySQL → Debezium → Kafka → 其他資料庫/搜尋引擎
        (監聽變更)         (Elasticsearch, Redis)

7. 微服務通訊

場景:電商系統

訂單服務 → Kafka (order-created) → 庫存服務
                                  → 發票服務
                                  → Email 服務

優點

  • 解耦合
  • 容易擴展
  • 易於維護

Kafka vs 其他技術

Kafka vs RabbitMQ

特性 Kafka RabbitMQ
類型 分散式日誌 傳統訊息佇列
吞吐量 極高(百萬/秒) 中等(萬/秒)
訊息順序 Partition 內保證 佇列內保證
訊息保留 可長期儲存 消費後刪除
重複消費 支援 不支援
使用情境 大數據、串流處理 一般應用、任務佇列

Kafka vs Redis Pub/Sub

特性 Kafka Redis Pub/Sub
持久化 否(在記憶體)
可靠性 高(副本機制)
回溯 可以 不可以
複雜度
使用情境 大量資料、需持久化 簡單即時通知

Kafka 部署與操作

1. 使用 Docker 快速啟動

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# 啟動
docker-compose up -d

2. 常用管理命令

建立 Topic

kafka-topics.sh --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server localhost:9092

列出所有 Topic

kafka-topics.sh --list \
  --bootstrap-server localhost:9092

查看 Topic 詳情

kafka-topics.sh --describe \
  --topic my-topic \
  --bootstrap-server localhost:9092

刪除 Topic

kafka-topics.sh --delete \
  --topic my-topic \
  --bootstrap-server localhost:9092

3. 生產和消費測試

使用命令列 Producer

kafka-console-producer.sh \
  --topic my-topic \
  --bootstrap-server localhost:9092

# 然後輸入訊息
> Hello Kafka
> This is a test message

使用命令列 Consumer

kafka-console-consumer.sh \
  --topic my-topic \
  --from-beginning \
  --bootstrap-server localhost:9092

4. 查看 Consumer Group

# 列出所有 Consumer Group
kafka-consumer-groups.sh --list \
  --bootstrap-server localhost:9092

# 查看 Consumer Group 詳情(offset lag)
kafka-consumer-groups.sh --describe \
  --group my-group \
  --bootstrap-server localhost:9092

輸出範例:

GROUP           TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
my-group        my-topic   0          100             150             50
my-group        my-topic   1          200             200             0
my-group        my-topic   2          150             180             30
  • LAG:落後的訊息數(消費速度慢)

5. 重設 Consumer Offset

# 重設到最早
kafka-consumer-groups.sh --reset-offsets \
  --group my-group \
  --topic my-topic \
  --to-earliest \
  --execute \
  --bootstrap-server localhost:9092

# 重設到特定時間
kafka-consumer-groups.sh --reset-offsets \
  --group my-group \
  --topic my-topic \
  --to-datetime 2024-01-01T00:00:00.000 \
  --execute \
  --bootstrap-server localhost:9092

# 重設到特定 offset
kafka-consumer-groups.sh --reset-offsets \
  --group my-group \
  --topic my-topic:0 \
  --to-offset 100 \
  --execute \
  --bootstrap-server localhost:9092

實戰範例

範例 1:Node.js Producer

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

async function sendMessage() {
  await producer.connect();

  await producer.send({
    topic: 'user-events',
    messages: [
      {
        key: 'user-123',
        value: JSON.stringify({
          userId: 123,
          action: 'login',
          timestamp: Date.now()
        })
      }
    ]
  });

  await producer.disconnect();
}

sendMessage();

範例 2:Node.js Consumer

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

async function consumeMessages() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'user-events', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        key: message.key.toString(),
        value: message.value.toString()
      });

      // 處理訊息
      const event = JSON.parse(message.value.toString());
      console.log('User action:', event.action);
    }
  });
}

consumeMessages();

範例 3:Python Producer

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 發送訊息
producer.send('user-events', {
    'userId': 123,
    'action': 'login',
    'timestamp': 1234567890
})

producer.flush()
producer.close()

範例 4:Python Consumer

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'
)

for message in consumer:
    event = message.value
    print(f"User {event['userId']} action: {event['action']}")

最佳實踐

1. Topic 與 Partition 設計

Partition 數量設定

# ✅ 推薦:根據預期吞吐量和 Consumer 數量
Partition 數量 = max(預期吞吐量 / 單個 Partition 吞吐量, Consumer 數量)

# 範例
預期吞吐量:10 MB/s
單個 Partition:1 MB/s
Consumer 數量:5

Partition 數量 = max(10, 5) = 10
# ✅ 推薦:合理的 Partition 數量
kafka-topics.sh --create \
  --topic user-events \
  --partitions 10 \
  --replication-factor 3

# ❌ 不推薦:Partition 太少或太多
--partitions 1    # 無法並行處理
--partitions 1000 # 管理開銷過大

注意事項

  • Partition 太少:無法充分利用並行,吞吐量受限
  • Partition 太多:增加管理開銷,影響 Leader 選舉效能
  • Partition 數量一旦設定難以修改,建議預留成長空間

2. 可靠性配置

Replication Factor 設定

# ✅ 推薦:生產環境使用多副本
--replication-factor 3  # 一般應用
--replication-factor 5  # 關鍵應用

# ❌ 不推薦:生產環境只用單副本
--replication-factor 1  # 僅適合測試環境

Producer 可靠性配置

// ✅ 推薦:高可靠性配置
props.put("acks", "all");           // 等待所有副本確認
props.put("retries", 3);             // 失敗重試 3 次
props.put("max.in.flight.requests.per.connection", 1); // 保證順序

// ❌ 不推薦:可能丟失資料
props.put("acks", "0");  // 不等待確認,可能丟失

Consumer 可靠性配置

// ✅ 推薦:手動提交 offset,確保處理完才提交
props.put("enable.auto.commit", "false");
consumer.commitSync();  // 處理完訊息後手動提交

// ❌ 不推薦:自動提交可能導致訊息丟失
props.put("enable.auto.commit", "true");

3. 訊息 Key 設計

保證訊息順序

// ✅ 推薦:使用業務 key 保證同一實體的訊息順序
producer.send(new ProducerRecord<>(
    "user-events",
    userId.toString(),  // key = userId,相同使用者訊息進入同一 Partition
    eventJson
));

// ✅ 推薦:訂單系統使用訂單 ID 作為 key
producer.send(new ProducerRecord<>(
    "order-events",
    orderId.toString(),
    orderData
));

// ❌ 不推薦:需要順序但沒有 key
producer.send(new ProducerRecord<>(
    "user-events",
    null,  // 無 key,訊息隨機分配,無法保證順序
    eventJson
));

均勻分布

// ✅ 推薦:key 分布均勻,避免資料傾斜
// 好的 key 選擇:userId, sessionId, deviceId

// ❌ 不推薦:key 分布不均,造成熱點 Partition
// 壞的 key 選擇:日期(所有當天資料進同一 Partition)、常數

4. 錯誤處理與重試

Consumer 錯誤處理

// ✅ 推薦:完整的錯誤處理機制
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const maxRetries = 3;
    let retryCount = 0;

    while (retryCount < maxRetries) {
      try {
        // 處理訊息
        await processMessage(message);
        return; // 成功,退出
      } catch (error) {
        retryCount++;
        console.error(`處理失敗 (${retryCount}/${maxRetries}):`, error);

        if (retryCount >= maxRetries) {
          // 超過重試次數,寫入 Dead Letter Queue
          await producer.send({
            topic: 'dlq-topic',
            messages: [{
              key: message.key,
              value: message.value,
              headers: {
                'original-topic': topic,
                'error': error.message,
                'retry-count': retryCount.toString()
              }
            }]
          });
        } else {
          // 等待後重試
          await sleep(1000 * retryCount); // 指數退避
        }
      }
    }
  }
});

// ❌ 不推薦:忽略錯誤或無限重試
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message);
    } catch (error) {
      // 忽略錯誤,可能導致訊息丟失
    }
  }
});

冪等性處理

# ✅ 推薦:使用唯一 ID 實現冪等性
def process_message(message):
    message_id = message['id']

    # 檢查是否已處理
    if redis.exists(f"processed:{message_id}"):
        return  # 跳過重複訊息

    # 處理訊息
    do_something(message)

    # 標記為已處理(設定過期時間)
    redis.set(f"processed:{message_id}", 1, ex=86400)

# ❌ 不推薦:沒有去重機制,可能重複處理
def process_message(message):
    do_something(message)  # 重複消費會重複執行

5. 效能優化

批次處理

// ✅ 推薦:批次發送提高吞吐量
props.put("batch.size", 16384);        // 批次大小 16KB
props.put("linger.ms", 10);            // 等待 10ms 累積訊息

// ❌ 不推薦:每次只發送一則訊息
// 效能低下,無法充分利用網路頻寬

壓縮設定

// ✅ 推薦:啟用壓縮節省頻寬和儲存
props.put("compression.type", "snappy"); // 或 lz4, gzip

// ❌ 不推薦:不壓縮(頻寬和儲存浪費)
props.put("compression.type", "none");

Consumer 並行處理

// ✅ 推薦:充分利用 Partition 並行能力
// Consumer 數量 = Partition 數量(最佳)
const consumers = [];
for (let i = 0; i < partitionCount; i++) {
  consumers.push(createConsumer());
}

// ❌ 不推薦:Consumer 數量不當
// Consumer 太少:無法充分並行
// Consumer 太多:超過 Partition 數量,浪費資源

6. 監控與維護

重要監控指標

# ✅ 推薦:定期監控 Consumer Lag
kafka-consumer-groups.sh --describe --group my-group \
  --bootstrap-server localhost:9092

# 關鍵指標
# - LAG:消費延遲(應該趨近 0)
# - CURRENT-OFFSET:當前消費位置
# - LOG-END-OFFSET:最新訊息位置

告警設定

# ✅ 推薦:設定監控告警
alerts:
  - name: HighConsumerLag
    condition: lag > 10000
    action: notify_team

  - name: BrokerDown
    condition: broker_unavailable
    action: critical_alert

  - name: DiskUsageHigh
    condition: disk_usage > 80%
    action: warning

定期維護

# ✅ 推薦:定期清理過期 Topic
kafka-topics.sh --delete --topic old-topic

# ✅ 推薦:定期檢查 Partition 分布
kafka-topics.sh --describe --topic my-topic

# ✅ 推薦:監控磁碟使用率
df -h /var/lib/kafka/data

7. 安全性考量

啟用認證與加密

# ✅ 推薦:生產環境啟用 SASL/SSL
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/path/to/truststore.jks

# ❌ 不推薦:生產環境使用 PLAINTEXT
security.protocol=PLAINTEXT

ACL 權限控制

# ✅ 推薦:設定細粒度權限
kafka-acls.sh --add \
  --allow-principal User:app1 \
  --operation Read \
  --topic user-events

# ❌ 不推薦:所有應用共用超級使用者

8. 資源規劃

硬體建議

✅ 推薦配置
CPU:16+ cores
記憶體:64GB+
磁碟:SSD,1TB+ per broker
網路:10Gbps

❌ 不推薦
CPU 太少:< 8 cores
使用 HDD:效能差
網路頻寬不足:< 1Gbps

Broker 數量規劃

# ✅ 推薦:根據資料量和可用性需求
# 資料量:每天 1TB,保留 7 天
# 副本數:3
# 總儲存需求:1TB × 7 × 3 = 21TB
# Broker 數量:至少 3 台(每台 8TB 磁碟)

# ❌ 不推薦:單台 Broker(無容錯能力)

常見問題

Q1: Kafka 如何保證訊息不丟失?

A: 三個層面保證

  1. Producer 層面
props.put("acks", "all");  // 等待所有副本確認
props.put("retries", 3);   // 失敗重試
  1. Broker 層面
--replication-factor 3  # 多個副本
  1. Consumer 層面
// 手動提交 offset(處理完才提交)
consumer.commitSync();

Q2: Kafka 如何保證訊息順序?

A: 在 Partition 內保證順序

// 相同 key 的訊息會進入同一 Partition
producer.send(new ProducerRecord<>(
    "my-topic",
    userId,  // key
    message
));

// Partition 內訊息是有序的
// 不同 Partition 之間無法保證順序

注意:如果需要全域順序,只能用 1 個 Partition(但失去並行能力)


Q3: Consumer 掛掉後,訊息會丟失嗎?

A: 不會,因為:

  1. Kafka 會儲存訊息(預設 7 天)
  2. Offset 被記錄,Consumer 重啟後從上次位置繼續
  3. Consumer Group 會重新平衡,由其他 Consumer 接手

Q4: 如何避免重複消費?

A: 設計**冪等性(Idempotent)**處理邏輯

# 範例:使用唯一 ID 檢查
def process_message(message):
    message_id = message['id']

    # 檢查是否已處理
    if redis.exists(f"processed:{message_id}"):
        return  # 跳過重複訊息

    # 處理訊息
    do_something(message)

    # 標記為已處理
    redis.set(f"processed:{message_id}", 1, ex=86400)

Q5: Kafka 適合什麼樣的系統規模?

A:

規模 適用性 原因
小型(< 1000 msg/s) ⚠️ 可能過度設計 RabbitMQ 或 Redis 更簡單
中型(1000-10萬 msg/s) ✅ 非常適合 Kafka 的甜蜜點
大型(> 10萬 msg/s) ✅ 最佳選擇 Kafka 專為高吞吐量設計

總結

核心要點

Apache Kafka = 分散式日誌 + 訊息佇列 + 串流處理平台

關鍵特性:
  ├─ 高吞吐量:百萬訊息/秒
  ├─ 持久化:訊息可長期儲存
  ├─ 可擴展:水平擴展 Partition 和 Broker
  └─ 容錯性:多副本機制保證可靠性

核心概念速記

概念 說明 比喻
Topic 訊息分類 資料夾
Partition Topic 的分割 資料夾裡的檔案
Producer 發送訊息 寫入檔案的人
Consumer 讀取訊息 讀取檔案的人
Broker 儲存訊息的伺服器 硬碟
Offset 訊息編號 檔案行號
Consumer Group 消費者群組 讀取團隊

常用命令速查

# Topic 管理
kafka-topics.sh --create --topic my-topic --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

# 生產消費
kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092

# Consumer Group
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092

# Offset 重設
kafka-consumer-groups.sh --reset-offsets --group my-group --topic my-topic --to-earliest --execute --bootstrap-server localhost:9092

關鍵配置速查

Producer 配置:

  • acks: all(最安全), 1(預設), 0(最快)
  • retries: 重試次數(建議 3)
  • compression.type: 壓縮類型(snappy, lz4, gzip
  • batch.size: 批次大小(預設 16KB)

Consumer 配置:

  • group.id: Consumer Group ID(必須)
  • auto.offset.reset: earliest(從最早), latest(從最新)
  • enable.auto.commit: 自動提交 offset(建議 false)
  • max.poll.records: 每次拉取訊息數(預設 500)

Topic 配置:

  • partitions: Partition 數量(根據吞吐量設定)
  • replication-factor: 副本數(生產環境建議 3)
  • log.retention.hours: 訊息保留時間(預設 168 小時 = 7 天)

使用情境決策指南

需求 是否使用 Kafka 理由
大量日誌收集 ✅ 非常適合 高吞吐量、持久化儲存
即時串流處理 ✅ 最佳選擇 原生支援串流處理
訂單/支付(需要順序) ✅ 適合 使用 key 保證 Partition 內順序
Event Sourcing ✅ 非常適合 可重複消費歷史事件
微服務通訊 ✅ 適合 服務解耦、非同步處理
簡單任務佇列 ⚠️ 可能過度設計 RabbitMQ 更輕量
即時推播通知 ⚠️ 可能過度設計 Redis Pub/Sub 延遲更低
小型應用(< 1000 msg/s) ❌ 不建議 維護成本高,收益低

快速決策流程

需要處理大量資料?
├─ 是 → 需要持久化儲存?
│        ├─ 是 → 需要重複消費?
│        │      ├─ 是 → ✅ Kafka(最佳選擇)
│        │      └─ 否 → ✅ Kafka 或 RabbitMQ
│        └─ 否 → Redis Pub/Sub
└─ 否 → 訊息量 < 1000/s?
         ├─ 是 → RabbitMQ 或 Redis
         └─ 否 → ✅ Kafka

學習路徑

1. 理解核心概念 ✓
   - Topic, Partition, Producer, Consumer
   - Offset, Consumer Group, Broker

2. 快速部署實驗 ✓
   - Docker 快速啟動
   - 建立 Topic 和測試收發

3. 實作程式碼 ✓
   - Producer 實作(Java/Node.js/Python)
   - Consumer 實作與錯誤處理

4. 最佳實踐應用 ✓
   - 可靠性配置
   - 效能優化
   - 監控與維護

5. 生產環境部署
   - 叢集架構設計
   - 安全性設定
   - 容災與備份

參考資源


建立日期:2025-01-15 最後更新:2025-01-15

🔗相關文章