目錄
什麼是 Pub/Sub?
Pub/Sub(Publish/Subscribe,發布/訂閱) 是一種訊息傳遞模式,發送者(Publisher)發送訊息到頻道(Channel),訂閱者(Subscriber)接收頻道的訊息。
核心概念
Publisher(發布者)→ Channel(頻道)→ Subscriber(訂閱者)
↓
[news]
[chat]
[alerts]
特點:
- ✅ 發布者和訂閱者解耦(不需要知道對方存在)
- ✅ 即時推送(訂閱者立即收到訊息)
- ✅ 一對多(一個訊息可以被多個訂閱者接收)
- ❌ 不持久化(訊息發送後就消失,訂閱者離線就收不到)
Redis Pub/Sub vs Kafka
| 特性 | Redis Pub/Sub | Kafka |
|---|---|---|
| 持久化 | ❌ 否(在記憶體) | ✅ 是(寫入磁碟) |
| 歷史訊息 | ❌ 無法回溯 | ✅ 可以重新消費 |
| 可靠性 | 低(訂閱者離線會丟失) | 高(訊息保留) |
| 速度 | 極快 ⚡⚡⚡ | 快 ⚡⚡ |
| 複雜度 | 簡單 | 複雜 |
| 使用情境 | 即時通知、聊天 | 日誌、事件溯源 |
選擇原則:
- Redis Pub/Sub:簡單、即時、不在乎丟失(如聊天室、即時通知)
- Kafka:需要持久化、可靠性、大量資料(如訂單系統、日誌收集)
基本指令
1. 發布訊息(PUBLISH)
# 語法
PUBLISH channel message
# 範例
PUBLISH news "Breaking news: Redis is awesome!"
PUBLISH chat:room1 "Hello everyone!"
回傳值:接收到訊息的訂閱者數量
redis> PUBLISH news "Hello"
(integer) 3 # 有 3 個訂閱者收到這則訊息
2. 訂閱頻道(SUBSCRIBE)
# 語法
SUBSCRIBE channel [channel ...]
# 範例:訂閱單一頻道
SUBSCRIBE news
# 範例:訂閱多個頻道
SUBSCRIBE news chat weather
執行後:
- 進入訂閱模式(blocking mode)
- 只能執行 SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE、QUIT
- 無法執行其他 Redis 命令
輸出範例:
redis> SUBSCRIBE news
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 訂閱成功
2) "news" # 頻道名稱
3) (integer) 1 # 目前訂閱的頻道數
# 收到訊息時
1) "message" # 訊息類型
2) "news" # 來源頻道
3) "Hello World!" # 訊息內容
3. 取消訂閱(UNSUBSCRIBE)
# 語法
UNSUBSCRIBE [channel [channel ...]]
# 範例:取消特定頻道
UNSUBSCRIBE news
# 範例:取消所有頻道
UNSUBSCRIBE
4. 模式訂閱(PSUBSCRIBE)
使用萬用字元訂閱多個頻道:
# 語法
PSUBSCRIBE pattern [pattern ...]
# 範例:訂閱所有 news 開頭的頻道
PSUBSCRIBE news:*
# 範例:訂閱所有頻道
PSUBSCRIBE *
# 範例:訂閱特定格式
PSUBSCRIBE chat:room:*
PSUBSCRIBE user:*:notifications
萬用字元規則:
*:匹配任意字元?:匹配單一字元[abc]:匹配 a、b 或 c
輸出範例:
redis> PSUBSCRIBE news:*
Reading messages...
1) "psubscribe"
2) "news:*"
3) (integer) 1
# 收到訊息時
1) "pmessage" # 模式訊息
2) "news:*" # 匹配的模式
3) "news:sports" # 實際頻道
4) "Team wins!" # 訊息內容
5. 取消模式訂閱(PUNSUBSCRIBE)
# 語法
PUNSUBSCRIBE [pattern [pattern ...]]
# 範例
PUNSUBSCRIBE news:*
6. 查看頻道(PUBSUB)
查看活躍頻道
# 列出至少有 1 個訂閱者的頻道
PUBSUB CHANNELS
# 列出符合模式的頻道
PUBSUB CHANNELS news:*
查看訂閱者數量
# 查看特定頻道的訂閱者數量
PUBSUB NUMSUB channel1 channel2
# 範例
redis> PUBSUB NUMSUB news chat
1) "news"
2) (integer) 3 # news 有 3 個訂閱者
3) "chat"
4) (integer) 1 # chat 有 1 個訂閱者
查看模式訂閱數量
# 查看活躍的模式訂閱數量
PUBSUB NUMPAT
redis> PUBSUB NUMPAT
(integer) 2 # 有 2 個客戶端使用模式訂閱
實戰範例
範例 1:命令列測試
終端機 1(訂閱者):
redis-cli
> SUBSCRIBE news
# 等待訊息...
終端機 2(發布者):
redis-cli
> PUBLISH news "Hello from publisher!"
(integer) 1
> PUBLISH news "Second message"
(integer) 1
終端機 1 會收到:
1) "message"
2) "news"
3) "Hello from publisher!"
1) "message"
2) "news"
3) "Second message"
範例 2:Node.js 實作
安裝
npm install redis
訂閱者(Subscriber)
const redis = require('redis');
async function subscriber() {
const client = redis.createClient();
await client.connect();
// 訂閱頻道
await client.subscribe('news', (message, channel) => {
console.log(`收到來自 ${channel} 的訊息: ${message}`);
});
console.log('已訂閱 news 頻道');
}
subscriber();
發布者(Publisher)
const redis = require('redis');
async function publisher() {
const client = redis.createClient();
await client.connect();
// 發布訊息
const subscriberCount = await client.publish('news', 'Breaking news!');
console.log(`訊息已發送給 ${subscriberCount} 個訂閱者`);
await client.quit();
}
publisher();
範例 3:Python 實作
安裝
pip install redis
訂閱者
import redis
# 連接 Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 建立 PubSub 物件
pubsub = r.pubsub()
# 訂閱頻道
pubsub.subscribe('news')
print('已訂閱 news 頻道,等待訊息...')
# 接收訊息
for message in pubsub.listen():
if message['type'] == 'message':
print(f"收到訊息: {message['data']}")
發布者
import redis
# 連接 Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 發布訊息
subscriber_count = r.publish('news', 'Breaking news from Python!')
print(f'訊息已發送給 {subscriber_count} 個訂閱者')
範例 4:Go 實作
安裝
go get github.com/go-redis/redis/v8
訂閱者
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 連接 Redis
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 訂閱頻道
pubsub := rdb.Subscribe(ctx, "news")
defer pubsub.Close()
// 接收訊息
ch := pubsub.Channel()
fmt.Println("已訂閱 news 頻道")
for msg := range ch {
fmt.Printf("收到訊息: %s\n", msg.Payload)
}
}
發布者
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 連接 Redis
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
// 發布訊息
count, err := rdb.Publish(ctx, "news", "Breaking news from Go!").Result()
if err != nil {
panic(err)
}
fmt.Printf("訊息已發送給 %d 個訂閱者\n", count)
}
應用情境
情境 1:即時聊天室
// 加入聊天室
await client.subscribe('chat:room1', (message) => {
const data = JSON.parse(message);
displayMessage(data.user, data.text);
});
// 發送訊息
await client.publish('chat:room1', JSON.stringify({
user: 'Alice',
text: 'Hello everyone!',
timestamp: Date.now()
}));
情境 2:即時通知系統
# 訂閱使用者的通知頻道
pubsub.subscribe(f'user:{user_id}:notifications')
# 發送通知給特定使用者
r.publish(f'user:{user_id}:notifications', json.dumps({
'type': 'friend_request',
'from': 'Bob',
'message': 'Bob 想加你為好友'
}))
情境 3:即時儀表板更新
// 前端訂閱儀表板更新
await client.subscribe('dashboard:metrics', (message) => {
const metrics = JSON.parse(message);
updateDashboard(metrics);
});
// 後端定期發布更新
setInterval(async () => {
const metrics = await getSystemMetrics();
await client.publish('dashboard:metrics', JSON.stringify(metrics));
}, 1000); // 每秒更新
情境 4:多伺服器之間的協調
# 伺服器 A 通知其他伺服器清除快取
r.publish('cache:invalidate', json.dumps({
'key': 'user:123',
'action': 'delete'
}))
# 其他伺服器監聽並處理
pubsub.subscribe('cache:invalidate')
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
cache.delete(data['key'])
情境 5:遊戲伺服器廣播
// 廣播遊戲狀態更新
rdb.Publish(ctx, "game:room1:updates", json.Marshal(GameState{
Players: players,
Score: score,
Timestamp: time.Now(),
}))
// 所有玩家接收更新
pubsub := rdb.Subscribe(ctx, "game:room1:updates")
for msg := range pubsub.Channel() {
var state GameState
json.Unmarshal([]byte(msg.Payload), &state)
updateGameUI(state)
}
進階模式
模式 1:分層頻道
# 按層級組織頻道
user:123:notifications
user:123:messages
user:123:alerts
order:456:status
order:456:updates
# 訂閱特定使用者的所有頻道
PSUBSCRIBE user:123:*
# 訂閱所有使用者的通知
PSUBSCRIBE user:*:notifications
模式 2:廣播 + 私人訊息
// 廣播給所有人
await client.publish('broadcast', '系統維護通知');
// 私人訊息給特定使用者
await client.publish(`user:${userId}:private`, '你有新訊息');
// 使用者訂閱兩個頻道
await client.subscribe(['broadcast', `user:${userId}:private`], handler);
模式 3:多語言頻道
# 根據使用者語言訂閱
lang = user.language # 'en', 'zh', 'ja'
pubsub.subscribe(f'notifications:{lang}')
# 發送多語言通知
for lang in ['en', 'zh', 'ja']:
message = translate(notification, lang)
r.publish(f'notifications:{lang}', message)
常見問題
問題 1:訊息不持久化的影響
問題:訂閱者離線期間的訊息會丟失
# 發布訊息時沒有訂閱者
PUBLISH news "Important message"
(integer) 0 # 沒有人收到
# 之後才訂閱
SUBSCRIBE news # 收不到之前的訊息
解決方案:
- 使用 Redis Streams(持久化的 Pub/Sub)
- 搭配其他儲存機制(資料庫、Kafka)
- 重要訊息不應該只用 Pub/Sub
問題 2:無法確認送達
Redis Pub/Sub 是「fire-and-forget」(發送後忘記):
- ❌ 無法知道訊息是否被處理
- ❌ 無法確認訂閱者是否在線
- ❌ 無法重試失敗的訊息
適用情境:
- ✅ 即時通知(丟失無所謂)
- ✅ 即時監控(下一次更新會補上)
- ❌ 交易訊息(需要保證送達)
- ❌ 關鍵業務邏輯
問題 3:訂閱模式會阻塞連線
# ❌ 訂閱後無法執行其他 Redis 命令
pubsub.subscribe('news')
r.get('key') # 這行會失敗
# ✅ 使用兩個連線
r1 = redis.Redis() # 用於訂閱
r2 = redis.Redis() # 用於其他操作
pubsub = r1.pubsub()
pubsub.subscribe('news')
r2.get('key') # 這樣可以
問題 4:效能考量
記憶體使用:
- 每個訂閱者的輸出緩衝區都會佔用記憶體
- 大量訂閱者 + 高頻訊息 = 高記憶體使用
建議:
# 設定客戶端輸出緩衝區限制
config set client-output-buffer-limit "pubsub 32mb 8mb 60"
# 類型 硬限制 軟限制 軟限制時間
Redis Pub/Sub vs Redis Streams
Redis Streams(Redis 5.0+)
優點:
- ✅ 訊息持久化
- ✅ 可以回溯歷史訊息
- ✅ Consumer Group 支援
- ✅ 訊息確認機制
何時用 Streams?
- 需要訊息持久化
- 需要處理確認
- 需要回溯歷史
- 類似 Kafka 的使用場景
何時用 Pub/Sub?
- 即時性要求極高
- 不在乎訊息丟失
- 簡單的廣播場景
- 低延遲要求
監控與除錯
查看當前訂閱情況
# 查看所有活躍頻道
PUBSUB CHANNELS
# 查看特定頻道訂閱數
PUBSUB NUMSUB news chat
# 查看模式訂閱數
PUBSUB NUMPAT
使用 MONITOR 除錯
# 監控所有 Redis 命令(包含 Pub/Sub)
redis-cli MONITOR
# 輸出範例
1699999999.123456 [0 127.0.0.1:12345] "PUBLISH" "news" "Hello"
1699999999.234567 [0 127.0.0.1:12346] "SUBSCRIBE" "news"
客戶端輸出緩衝區監控
# 查看客戶端資訊
CLIENT LIST
# 輸出包含緩衝區使用情況
id=123 addr=127.0.0.1:54321 ... qbuf=0 qbuf-free=0 obl=16384 oll=0 omem=0 ...
最佳實踐
1. 使用有意義的頻道命名
# ✅ 好的命名
user:123:notifications
order:456:status
game:room1:updates
# ❌ 不好的命名
ch1
temp
test
2. 使用 JSON 格式傳遞結構化資料
// ✅ 使用 JSON
await client.publish('events', JSON.stringify({
type: 'user_login',
userId: 123,
timestamp: Date.now(),
metadata: { ip: '1.2.3.4' }
}));
// ❌ 使用純文字(難以解析)
await client.publish('events', 'user 123 logged in at 1699999999');
3. 錯誤處理
// 訂閱者端
client.on('error', (err) => {
console.error('Redis 錯誤:', err);
// 實作重連邏輯
});
// 發布者端
try {
const count = await client.publish('news', message);
if (count === 0) {
console.warn('沒有訂閱者收到訊息');
}
} catch (err) {
console.error('發布失敗:', err);
}
4. 優雅關閉
// 清理資源
process.on('SIGINT', async () => {
await client.unsubscribe();
await client.quit();
process.exit(0);
});
5. 測試時使用不同的 Redis DB
// 開發環境
const devClient = redis.createClient({ db: 0 });
// 測試環境
const testClient = redis.createClient({ db: 1 });
總結
快速決策指南
| 需求 | Redis Pub/Sub | 替代方案 |
|---|---|---|
| 即時聊天室 | ✅ 很適合 | WebSocket |
| 即時通知 | ✅ 很適合 | Server-Sent Events |
| 儀表板即時更新 | ✅ 很適合 | WebSocket |
| 訂單處理 | ❌ 不適合 | Kafka, RabbitMQ |
| 日誌收集 | ❌ 不適合 | Kafka, Fluentd |
| 需要訊息持久化 | ❌ 不適合 | Redis Streams, Kafka |
| 需要訊息確認 | ❌ 不適合 | RabbitMQ, Kafka |
核心指令
# 發布
PUBLISH channel message
# 訂閱
SUBSCRIBE channel [channel ...]
# 模式訂閱
PSUBSCRIBE pattern [pattern ...]
# 取消訂閱
UNSUBSCRIBE [channel ...]
PUNSUBSCRIBE [pattern ...]
# 查詢
PUBSUB CHANNELS [pattern]
PUBSUB NUMSUB [channel ...]
PUBSUB NUMPAT
特性對照
| 特性 | Redis Pub/Sub |
|---|---|
| 持久化 | ❌ 否 |
| 速度 | ⚡⚡⚡ 極快 |
| 複雜度 | 簡單 |
| 訊息順序 | 保證 |
| 訊息確認 | ❌ 無 |
| 回溯歷史 | ❌ 無 |
| 多播能力 | ✅ 有 |
| 適用場景 | 即時、輕量、可丟失 |
核心要點
-
Redis Pub/Sub 特性:
- 極快的即時推送
- 發布者與訂閱者解耦
- 不持久化(訂閱者離線會丟失)
-
適合場景:
- ✅ 即時通知、聊天室、監控
- ✅ 訊息可以丟失的場景
- ❌ 需要持久化和確認的場景
-
與其他方案對比:
- vs Kafka:更快但不持久化
- vs WebSocket:Redis 處理訊息路由
- vs Redis Streams:Streams 提供持久化
-
最佳實踐:
- 使用 JSON 格式傳遞資料
- 合理命名頻道(user:123:notifications)
- 使用兩個 Redis 連線(訂閱+操作)
- 做好錯誤處理和重連邏輯
建立日期:2025-10-28 最後更新:2025-11-18