目錄
什麼是 Kafka?
Apache Kafka 是一個分散式串流平台(Distributed Streaming Platform),用來處理大量即時資料流。
核心功能
- 訊息佇列(Message Queue):生產者發送訊息,消費者接收訊息
- 資料串流(Data Streaming):即時處理持續流入的資料
- 資料儲存(Data Storage):持久化儲存訊息,可重複讀取
為什麼需要 Kafka?
傳統問題:
服務 A → 直接呼叫 → 服務 B
→ 服務 C
→ 服務 D
- ❌ 服務之間緊密耦合
- ❌ 一個服務掛掉,全部受影響
- ❌ 難以擴展
使用 Kafka 後:
服務 A → Kafka ← 服務 B
← 服務 C
← 服務 D
- ✅ 服務解耦
- ✅ 非同步處理
- ✅ 易於擴展
Kafka 核心概念
基本架構
Producer(生產者)→ Kafka Cluster(Kafka 叢集)→ Consumer(消費者)
↓
[Topic 1]
[Topic 2]
[Topic 3]
1. Producer(生產者)
作用:產生並發送訊息到 Kafka
範例:
- Web 應用程式記錄使用者行為
- IoT 設備發送感測器資料
- 日誌收集系統
// Java Producer 範例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "Hello Kafka"));
producer.close();
2. Consumer(消費者)
作用:從 Kafka 讀取並處理訊息
特性:
- 可以有多個消費者同時讀取
- 每個消費者獨立追蹤自己的讀取進度(offset)
// Java Consumer 範例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
3. Topic(主題)
作用:訊息的分類單位,類似資料庫的「表」
特性:
- 一個 Topic 可以有多個 Producer 和 Consumer
- Topic 中的訊息是有序的
- 訊息保留時間可設定(預設 7 天)
範例:
Topic: user-events
Topic: order-created
Topic: payment-processed
4. Partition(分區)
作用:Topic 的物理分割,用來提高並行處理能力
結構:
Topic: user-events
├─ Partition 0: [msg1, msg2, msg3]
├─ Partition 1: [msg4, msg5, msg6]
└─ Partition 2: [msg7, msg8, msg9]
特性:
- 每個 Partition 內部訊息有序
- 不同 Partition 之間訊息無法保證順序
- 相同 key 的訊息會進入同一個 Partition
為什麼需要 Partition?
- ✅ 平行處理:多個 Consumer 可以同時讀取不同 Partition
- ✅ 水平擴展:增加 Partition 數量提升吞吐量
- ✅ 容錯能力:每個 Partition 有多個副本
5. Offset(偏移量)
作用:訊息在 Partition 中的唯一序號
運作方式:
Partition 0:
offset: 0 1 2 3 4 5
msg: [msg1][msg2][msg3][msg4][msg5][msg6]
↑
Consumer 已讀到這裡
特性:
- Consumer 自己記錄讀到哪裡(offset)
- 可以重設 offset,重新讀取歷史訊息
- 每個 Consumer Group 維護自己的 offset
6. Consumer Group(消費者群組)
作用:多個 Consumer 組成一個群組,共同消費一個 Topic
運作模式:
模式 1:負載平衡(一個群組)
Topic (3 Partitions)
├─ P0 → Consumer A ┐
├─ P1 → Consumer B ├─ Consumer Group 1
└─ P2 → Consumer C ┘
每個 Partition 只被一個 Consumer 讀取
效果:負載平衡,提高處理速度
模式 2:廣播(多個群組)
Topic (3 Partitions)
├─ P0 → Consumer A (Group 1)
│ → Consumer D (Group 2)
├─ P1 → Consumer B (Group 1)
│ → Consumer E (Group 2)
└─ P2 → Consumer C (Group 1)
→ Consumer F (Group 2)
每個群組都能讀到完整訊息
效果:廣播給多個應用
規則:
- 同一個 Consumer Group 內,每個 Partition 只能被一個 Consumer 讀取
- 不同 Consumer Group 可以獨立消費相同的訊息
7. Broker(代理伺服器)
作用:Kafka 叢集中的伺服器節點,儲存和處理訊息
架構:
Kafka Cluster
├─ Broker 1 (Leader for P0)
├─ Broker 2 (Leader for P1)
└─ Broker 3 (Leader for P2)
特性:
- 一個叢集通常有多個 Broker
- 每個 Broker 儲存部分 Partition
- Broker 之間互相複製資料(Replication)
8. Replication(副本)
作用:資料備份,提高可靠性
運作方式:
Partition 0 (Replication Factor = 3)
├─ Broker 1: Leader(主副本,讀寫)
├─ Broker 2: Follower(從副本,備份)
└─ Broker 3: Follower(從副本,備份)
如果 Broker 1 掛掉,Broker 2 或 3 會成為新的 Leader
設定:
# 建立 Topic 時設定副本數
kafka-topics.sh --create \
--topic my-topic \
--partitions 3 \
--replication-factor 3 \
--bootstrap-server localhost:9092
9. ZooKeeper(協調器)
作用:管理 Kafka 叢集的元資料和協調
負責:
- Broker 註冊和健康檢查
- Topic 和 Partition 資訊
- Leader 選舉
- 配置管理
注意:Kafka 3.x+ 開始可以不依賴 ZooKeeper(KRaft 模式)
Kafka 架構圖
完整架構
┌─────────────────────────────────────────────────┐
│ ZooKeeper Cluster │
│ (協調 Kafka Cluster) │
└────────────────┬────────────────────────────────┘
│
┌────────────────┴────────────────────────────────┐
│ Kafka Cluster │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ │ │ │ │ │ │
│ │ Topic A │ │ Topic A │ │ Topic B │ │
│ │ - P0(L) │ │ - P0(F) │ │ - P1(L) │ │
│ │ - P1(F) │ │ - P1(F) │ │ - P2(L) │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────┘
↑ ↓
[Producer] [Consumer]
說明:
- L = Leader(主副本)
- F = Follower(從副本)
Kafka 工作原理
1. 訊息發送流程
1. Producer 產生訊息
↓
2. 根據 key 或 round-robin 選擇 Partition
↓
3. 發送到該 Partition 的 Leader Broker
↓
4. Leader 寫入本地磁碟
↓
5. Follower 從 Leader 同步
↓
6. 確認寫入成功(根據 acks 設定)
↓
7. 回傳成功給 Producer
acks 設定:
acks=0:不等待確認(最快,可能丟失)acks=1:等待 Leader 確認(預設)acks=all:等待所有副本確認(最安全,最慢)
2. 訊息消費流程
1. Consumer 加入 Consumer Group
↓
2. Kafka 協調器分配 Partition 給 Consumer
↓
3. Consumer 從分配的 Partition 讀取
↓
4. 從上次的 offset 開始讀取
↓
5. 處理訊息
↓
6. 提交 offset(自動或手動)
3. Partition 分配策略
策略 1:Range(範圍分配)
Topic: 10 Partitions, 3 Consumers
Consumer A: P0, P1, P2, P3
Consumer B: P4, P5, P6
Consumer C: P7, P8, P9
策略 2:Round-Robin(輪詢分配)
Topic: 10 Partitions, 3 Consumers
Consumer A: P0, P3, P6, P9
Consumer B: P1, P4, P7
Consumer C: P2, P5, P8
策略 3:Sticky(黏性分配)
- 盡量保持原有分配
- 當 Consumer 加入/離開時才重新平衡
Kafka 使用情境
1. 日誌收集(Log Aggregation)
場景:收集多台伺服器的日誌
Web Server 1 ─┐
Web Server 2 ─┼→ Kafka (Topic: logs) → ELK Stack
Web Server 3 ─┘ (Elasticsearch)
優點:
- 集中管理日誌
- 即時監控
- 歷史日誌可查
2. 訊息佇列(Message Queue)
場景:訂單處理系統
訂單服務 → Kafka (Topic: orders) → 庫存服務
→ 支付服務
→ 通知服務
優點:
- 服務解耦
- 非同步處理
- 容錯能力強
3. 活動追蹤(Activity Tracking)
場景:追蹤使用者行為
Web App → Kafka (Topic: user-events) → 資料分析系統
→ 推薦系統
→ 即時儀表板
資料範例:
- 使用者點擊
- 頁面瀏覽
- 購物車操作
4. 串流處理(Stream Processing)
場景:即時詐騙偵測
交易事件 → Kafka → Kafka Streams → 詐騙偵測 → 警報系統
(即時分析)
應用:
- 即時推薦
- 異常偵測
- 即時統計
5. Event Sourcing(事件溯源)
場景:銀行帳戶系統
所有操作都記錄為事件:
- 開戶事件
- 存款事件
- 提款事件
Kafka 儲存所有事件 → 可重建任何時間點的狀態
6. CDC(Change Data Capture)
場景:資料庫同步
MySQL → Debezium → Kafka → 其他資料庫/搜尋引擎
(監聽變更) (Elasticsearch, Redis)
7. 微服務通訊
場景:電商系統
訂單服務 → Kafka (order-created) → 庫存服務
→ 發票服務
→ Email 服務
優點:
- 解耦合
- 容易擴展
- 易於維護
Kafka vs 其他技術
Kafka vs RabbitMQ
| 特性 | Kafka | RabbitMQ |
|---|---|---|
| 類型 | 分散式日誌 | 傳統訊息佇列 |
| 吞吐量 | 極高(百萬/秒) | 中等(萬/秒) |
| 訊息順序 | Partition 內保證 | 佇列內保證 |
| 訊息保留 | 可長期儲存 | 消費後刪除 |
| 重複消費 | 支援 | 不支援 |
| 使用情境 | 大數據、串流處理 | 一般應用、任務佇列 |
Kafka vs Redis Pub/Sub
| 特性 | Kafka | Redis Pub/Sub |
|---|---|---|
| 持久化 | 是 | 否(在記憶體) |
| 可靠性 | 高(副本機制) | 中 |
| 回溯 | 可以 | 不可以 |
| 複雜度 | 高 | 低 |
| 使用情境 | 大量資料、需持久化 | 簡單即時通知 |
Kafka 部署與操作
1. 使用 Docker 快速啟動
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# 啟動
docker-compose up -d
2. 常用管理命令
建立 Topic
kafka-topics.sh --create \
--topic my-topic \
--partitions 3 \
--replication-factor 1 \
--bootstrap-server localhost:9092
列出所有 Topic
kafka-topics.sh --list \
--bootstrap-server localhost:9092
查看 Topic 詳情
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
刪除 Topic
kafka-topics.sh --delete \
--topic my-topic \
--bootstrap-server localhost:9092
3. 生產和消費測試
使用命令列 Producer
kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092
# 然後輸入訊息
> Hello Kafka
> This is a test message
使用命令列 Consumer
kafka-console-consumer.sh \
--topic my-topic \
--from-beginning \
--bootstrap-server localhost:9092
4. 查看 Consumer Group
# 列出所有 Consumer Group
kafka-consumer-groups.sh --list \
--bootstrap-server localhost:9092
# 查看 Consumer Group 詳情(offset lag)
kafka-consumer-groups.sh --describe \
--group my-group \
--bootstrap-server localhost:9092
輸出範例:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
my-group my-topic 0 100 150 50
my-group my-topic 1 200 200 0
my-group my-topic 2 150 180 30
- LAG:落後的訊息數(消費速度慢)
5. 重設 Consumer Offset
# 重設到最早
kafka-consumer-groups.sh --reset-offsets \
--group my-group \
--topic my-topic \
--to-earliest \
--execute \
--bootstrap-server localhost:9092
# 重設到特定時間
kafka-consumer-groups.sh --reset-offsets \
--group my-group \
--topic my-topic \
--to-datetime 2024-01-01T00:00:00.000 \
--execute \
--bootstrap-server localhost:9092
# 重設到特定 offset
kafka-consumer-groups.sh --reset-offsets \
--group my-group \
--topic my-topic:0 \
--to-offset 100 \
--execute \
--bootstrap-server localhost:9092
實戰範例
範例 1:Node.js Producer
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
async function sendMessage() {
await producer.connect();
await producer.send({
topic: 'user-events',
messages: [
{
key: 'user-123',
value: JSON.stringify({
userId: 123,
action: 'login',
timestamp: Date.now()
})
}
]
});
await producer.disconnect();
}
sendMessage();
範例 2:Node.js Consumer
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const consumer = kafka.consumer({ groupId: 'my-group' });
async function consumeMessages() {
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
key: message.key.toString(),
value: message.value.toString()
});
// 處理訊息
const event = JSON.parse(message.value.toString());
console.log('User action:', event.action);
}
});
}
consumeMessages();
範例 3:Python Producer
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 發送訊息
producer.send('user-events', {
'userId': 123,
'action': 'login',
'timestamp': 1234567890
})
producer.flush()
producer.close()
範例 4:Python Consumer
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'user-events',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest'
)
for message in consumer:
event = message.value
print(f"User {event['userId']} action: {event['action']}")
最佳實踐
1. Topic 與 Partition 設計
Partition 數量設定:
# ✅ 推薦:根據預期吞吐量和 Consumer 數量
Partition 數量 = max(預期吞吐量 / 單個 Partition 吞吐量, Consumer 數量)
# 範例
預期吞吐量:10 MB/s
單個 Partition:1 MB/s
Consumer 數量:5
Partition 數量 = max(10, 5) = 10
# ✅ 推薦:合理的 Partition 數量
kafka-topics.sh --create \
--topic user-events \
--partitions 10 \
--replication-factor 3
# ❌ 不推薦:Partition 太少或太多
--partitions 1 # 無法並行處理
--partitions 1000 # 管理開銷過大
注意事項:
- Partition 太少:無法充分利用並行,吞吐量受限
- Partition 太多:增加管理開銷,影響 Leader 選舉效能
- Partition 數量一旦設定難以修改,建議預留成長空間
2. 可靠性配置
Replication Factor 設定:
# ✅ 推薦:生產環境使用多副本
--replication-factor 3 # 一般應用
--replication-factor 5 # 關鍵應用
# ❌ 不推薦:生產環境只用單副本
--replication-factor 1 # 僅適合測試環境
Producer 可靠性配置:
// ✅ 推薦:高可靠性配置
props.put("acks", "all"); // 等待所有副本確認
props.put("retries", 3); // 失敗重試 3 次
props.put("max.in.flight.requests.per.connection", 1); // 保證順序
// ❌ 不推薦:可能丟失資料
props.put("acks", "0"); // 不等待確認,可能丟失
Consumer 可靠性配置:
// ✅ 推薦:手動提交 offset,確保處理完才提交
props.put("enable.auto.commit", "false");
consumer.commitSync(); // 處理完訊息後手動提交
// ❌ 不推薦:自動提交可能導致訊息丟失
props.put("enable.auto.commit", "true");
3. 訊息 Key 設計
保證訊息順序:
// ✅ 推薦:使用業務 key 保證同一實體的訊息順序
producer.send(new ProducerRecord<>(
"user-events",
userId.toString(), // key = userId,相同使用者訊息進入同一 Partition
eventJson
));
// ✅ 推薦:訂單系統使用訂單 ID 作為 key
producer.send(new ProducerRecord<>(
"order-events",
orderId.toString(),
orderData
));
// ❌ 不推薦:需要順序但沒有 key
producer.send(new ProducerRecord<>(
"user-events",
null, // 無 key,訊息隨機分配,無法保證順序
eventJson
));
均勻分布:
// ✅ 推薦:key 分布均勻,避免資料傾斜
// 好的 key 選擇:userId, sessionId, deviceId
// ❌ 不推薦:key 分布不均,造成熱點 Partition
// 壞的 key 選擇:日期(所有當天資料進同一 Partition)、常數
4. 錯誤處理與重試
Consumer 錯誤處理:
// ✅ 推薦:完整的錯誤處理機制
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const maxRetries = 3;
let retryCount = 0;
while (retryCount < maxRetries) {
try {
// 處理訊息
await processMessage(message);
return; // 成功,退出
} catch (error) {
retryCount++;
console.error(`處理失敗 (${retryCount}/${maxRetries}):`, error);
if (retryCount >= maxRetries) {
// 超過重試次數,寫入 Dead Letter Queue
await producer.send({
topic: 'dlq-topic',
messages: [{
key: message.key,
value: message.value,
headers: {
'original-topic': topic,
'error': error.message,
'retry-count': retryCount.toString()
}
}]
});
} else {
// 等待後重試
await sleep(1000 * retryCount); // 指數退避
}
}
}
}
});
// ❌ 不推薦:忽略錯誤或無限重試
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
await processMessage(message);
} catch (error) {
// 忽略錯誤,可能導致訊息丟失
}
}
});
冪等性處理:
# ✅ 推薦:使用唯一 ID 實現冪等性
def process_message(message):
message_id = message['id']
# 檢查是否已處理
if redis.exists(f"processed:{message_id}"):
return # 跳過重複訊息
# 處理訊息
do_something(message)
# 標記為已處理(設定過期時間)
redis.set(f"processed:{message_id}", 1, ex=86400)
# ❌ 不推薦:沒有去重機制,可能重複處理
def process_message(message):
do_something(message) # 重複消費會重複執行
5. 效能優化
批次處理:
// ✅ 推薦:批次發送提高吞吐量
props.put("batch.size", 16384); // 批次大小 16KB
props.put("linger.ms", 10); // 等待 10ms 累積訊息
// ❌ 不推薦:每次只發送一則訊息
// 效能低下,無法充分利用網路頻寬
壓縮設定:
// ✅ 推薦:啟用壓縮節省頻寬和儲存
props.put("compression.type", "snappy"); // 或 lz4, gzip
// ❌ 不推薦:不壓縮(頻寬和儲存浪費)
props.put("compression.type", "none");
Consumer 並行處理:
// ✅ 推薦:充分利用 Partition 並行能力
// Consumer 數量 = Partition 數量(最佳)
const consumers = [];
for (let i = 0; i < partitionCount; i++) {
consumers.push(createConsumer());
}
// ❌ 不推薦:Consumer 數量不當
// Consumer 太少:無法充分並行
// Consumer 太多:超過 Partition 數量,浪費資源
6. 監控與維護
重要監控指標:
# ✅ 推薦:定期監控 Consumer Lag
kafka-consumer-groups.sh --describe --group my-group \
--bootstrap-server localhost:9092
# 關鍵指標
# - LAG:消費延遲(應該趨近 0)
# - CURRENT-OFFSET:當前消費位置
# - LOG-END-OFFSET:最新訊息位置
告警設定:
# ✅ 推薦:設定監控告警
alerts:
- name: HighConsumerLag
condition: lag > 10000
action: notify_team
- name: BrokerDown
condition: broker_unavailable
action: critical_alert
- name: DiskUsageHigh
condition: disk_usage > 80%
action: warning
定期維護:
# ✅ 推薦:定期清理過期 Topic
kafka-topics.sh --delete --topic old-topic
# ✅ 推薦:定期檢查 Partition 分布
kafka-topics.sh --describe --topic my-topic
# ✅ 推薦:監控磁碟使用率
df -h /var/lib/kafka/data
7. 安全性考量
啟用認證與加密:
# ✅ 推薦:生產環境啟用 SASL/SSL
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/path/to/truststore.jks
# ❌ 不推薦:生產環境使用 PLAINTEXT
security.protocol=PLAINTEXT
ACL 權限控制:
# ✅ 推薦:設定細粒度權限
kafka-acls.sh --add \
--allow-principal User:app1 \
--operation Read \
--topic user-events
# ❌ 不推薦:所有應用共用超級使用者
8. 資源規劃
硬體建議:
✅ 推薦配置
CPU:16+ cores
記憶體:64GB+
磁碟:SSD,1TB+ per broker
網路:10Gbps
❌ 不推薦
CPU 太少:< 8 cores
使用 HDD:效能差
網路頻寬不足:< 1Gbps
Broker 數量規劃:
# ✅ 推薦:根據資料量和可用性需求
# 資料量:每天 1TB,保留 7 天
# 副本數:3
# 總儲存需求:1TB × 7 × 3 = 21TB
# Broker 數量:至少 3 台(每台 8TB 磁碟)
# ❌ 不推薦:單台 Broker(無容錯能力)
常見問題
Q1: Kafka 如何保證訊息不丟失?
A: 三個層面保證
- Producer 層面
props.put("acks", "all"); // 等待所有副本確認
props.put("retries", 3); // 失敗重試
- Broker 層面
--replication-factor 3 # 多個副本
- Consumer 層面
// 手動提交 offset(處理完才提交)
consumer.commitSync();
Q2: Kafka 如何保證訊息順序?
A: 在 Partition 內保證順序
// 相同 key 的訊息會進入同一 Partition
producer.send(new ProducerRecord<>(
"my-topic",
userId, // key
message
));
// Partition 內訊息是有序的
// 不同 Partition 之間無法保證順序
注意:如果需要全域順序,只能用 1 個 Partition(但失去並行能力)
Q3: Consumer 掛掉後,訊息會丟失嗎?
A: 不會,因為:
- Kafka 會儲存訊息(預設 7 天)
- Offset 被記錄,Consumer 重啟後從上次位置繼續
- Consumer Group 會重新平衡,由其他 Consumer 接手
Q4: 如何避免重複消費?
A: 設計**冪等性(Idempotent)**處理邏輯
# 範例:使用唯一 ID 檢查
def process_message(message):
message_id = message['id']
# 檢查是否已處理
if redis.exists(f"processed:{message_id}"):
return # 跳過重複訊息
# 處理訊息
do_something(message)
# 標記為已處理
redis.set(f"processed:{message_id}", 1, ex=86400)
Q5: Kafka 適合什麼樣的系統規模?
A:
| 規模 | 適用性 | 原因 |
|---|---|---|
| 小型(< 1000 msg/s) | ⚠️ 可能過度設計 | RabbitMQ 或 Redis 更簡單 |
| 中型(1000-10萬 msg/s) | ✅ 非常適合 | Kafka 的甜蜜點 |
| 大型(> 10萬 msg/s) | ✅ 最佳選擇 | Kafka 專為高吞吐量設計 |
總結
核心要點
Apache Kafka = 分散式日誌 + 訊息佇列 + 串流處理平台
關鍵特性:
├─ 高吞吐量:百萬訊息/秒
├─ 持久化:訊息可長期儲存
├─ 可擴展:水平擴展 Partition 和 Broker
└─ 容錯性:多副本機制保證可靠性
核心概念速記:
| 概念 | 說明 | 比喻 |
|---|---|---|
| Topic | 訊息分類 | 資料夾 |
| Partition | Topic 的分割 | 資料夾裡的檔案 |
| Producer | 發送訊息 | 寫入檔案的人 |
| Consumer | 讀取訊息 | 讀取檔案的人 |
| Broker | 儲存訊息的伺服器 | 硬碟 |
| Offset | 訊息編號 | 檔案行號 |
| Consumer Group | 消費者群組 | 讀取團隊 |
常用命令速查
# Topic 管理
kafka-topics.sh --create --topic my-topic --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092
kafka-topics.sh --list --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
# 生產消費
kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
# Consumer Group
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
# Offset 重設
kafka-consumer-groups.sh --reset-offsets --group my-group --topic my-topic --to-earliest --execute --bootstrap-server localhost:9092
關鍵配置速查
Producer 配置:
acks:all(最安全),1(預設),0(最快)retries: 重試次數(建議 3)compression.type: 壓縮類型(snappy,lz4,gzip)batch.size: 批次大小(預設 16KB)
Consumer 配置:
group.id: Consumer Group ID(必須)auto.offset.reset:earliest(從最早),latest(從最新)enable.auto.commit: 自動提交 offset(建議 false)max.poll.records: 每次拉取訊息數(預設 500)
Topic 配置:
partitions: Partition 數量(根據吞吐量設定)replication-factor: 副本數(生產環境建議 3)log.retention.hours: 訊息保留時間(預設 168 小時 = 7 天)
使用情境決策指南
| 需求 | 是否使用 Kafka | 理由 |
|---|---|---|
| 大量日誌收集 | ✅ 非常適合 | 高吞吐量、持久化儲存 |
| 即時串流處理 | ✅ 最佳選擇 | 原生支援串流處理 |
| 訂單/支付(需要順序) | ✅ 適合 | 使用 key 保證 Partition 內順序 |
| Event Sourcing | ✅ 非常適合 | 可重複消費歷史事件 |
| 微服務通訊 | ✅ 適合 | 服務解耦、非同步處理 |
| 簡單任務佇列 | ⚠️ 可能過度設計 | RabbitMQ 更輕量 |
| 即時推播通知 | ⚠️ 可能過度設計 | Redis Pub/Sub 延遲更低 |
| 小型應用(< 1000 msg/s) | ❌ 不建議 | 維護成本高,收益低 |
快速決策流程
需要處理大量資料?
├─ 是 → 需要持久化儲存?
│ ├─ 是 → 需要重複消費?
│ │ ├─ 是 → ✅ Kafka(最佳選擇)
│ │ └─ 否 → ✅ Kafka 或 RabbitMQ
│ └─ 否 → Redis Pub/Sub
└─ 否 → 訊息量 < 1000/s?
├─ 是 → RabbitMQ 或 Redis
└─ 否 → ✅ Kafka
學習路徑
1. 理解核心概念 ✓
- Topic, Partition, Producer, Consumer
- Offset, Consumer Group, Broker
2. 快速部署實驗 ✓
- Docker 快速啟動
- 建立 Topic 和測試收發
3. 實作程式碼 ✓
- Producer 實作(Java/Node.js/Python)
- Consumer 實作與錯誤處理
4. 最佳實踐應用 ✓
- 可靠性配置
- 效能優化
- 監控與維護
5. 生產環境部署
- 叢集架構設計
- 安全性設定
- 容災與備份
參考資源
- 官方網站:https://kafka.apache.org
- 官方文件:https://kafka.apache.org/documentation/
- GitHub:https://github.com/apache/kafka
- Confluent 文件:https://docs.confluent.io
- KafkaJS(Node.js):https://kafka.js.org
- kafka-python:https://kafka-python.readthedocs.io
建立日期:2025-01-15 最後更新:2025-01-15