Redis Pub/Sub 完全指南

深入理解 Redis Pub/Sub 發布訂閱模式的原理、指令用法與實戰應用


目錄

  1. 什麼是 Pub/Sub
  2. 基本指令
  3. 實戰範例
  4. 應用情境
  5. 最佳實踐
  6. 常見問題
  7. 總結

什麼是 Pub/Sub?

Pub/Sub(Publish/Subscribe,發布/訂閱) 是一種訊息傳遞模式,發送者(Publisher)發送訊息到頻道(Channel),訂閱者(Subscriber)接收頻道的訊息。

核心概念

Publisher(發布者)→ Channel(頻道)→ Subscriber(訂閱者)
                  [news]
                  [chat]
                  [alerts]

特點

  • ✅ 發布者和訂閱者解耦(不需要知道對方存在)
  • 即時推送(訂閱者立即收到訊息)
  • 一對多(一個訊息可以被多個訂閱者接收)
  • 不持久化(訊息發送後就消失,訂閱者離線就收不到)

Redis Pub/Sub vs Kafka

特性 Redis Pub/Sub Kafka
持久化 ❌ 否(在記憶體) ✅ 是(寫入磁碟)
歷史訊息 ❌ 無法回溯 ✅ 可以重新消費
可靠性 低(訂閱者離線會丟失) 高(訊息保留)
速度 極快 ⚡⚡⚡ 快 ⚡⚡
複雜度 簡單 複雜
使用情境 即時通知、聊天 日誌、事件溯源

選擇原則

  • Redis Pub/Sub:簡單、即時、不在乎丟失(如聊天室、即時通知)
  • Kafka:需要持久化、可靠性、大量資料(如訂單系統、日誌收集)

基本指令

1. 發布訊息(PUBLISH)

# 語法
PUBLISH channel message

# 範例
PUBLISH news "Breaking news: Redis is awesome!"
PUBLISH chat:room1 "Hello everyone!"

回傳值:接收到訊息的訂閱者數量

redis> PUBLISH news "Hello"
(integer) 3  # 有 3 個訂閱者收到這則訊息

2. 訂閱頻道(SUBSCRIBE)

# 語法
SUBSCRIBE channel [channel ...]

# 範例:訂閱單一頻道
SUBSCRIBE news

# 範例:訂閱多個頻道
SUBSCRIBE news chat weather

執行後

  • 進入訂閱模式(blocking mode)
  • 只能執行 SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE、QUIT
  • 無法執行其他 Redis 命令

輸出範例

redis> SUBSCRIBE news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"      # 訂閱成功
2) "news"           # 頻道名稱
3) (integer) 1      # 目前訂閱的頻道數

# 收到訊息時
1) "message"        # 訊息類型
2) "news"           # 來源頻道
3) "Hello World!"   # 訊息內容

3. 取消訂閱(UNSUBSCRIBE)

# 語法
UNSUBSCRIBE [channel [channel ...]]

# 範例:取消特定頻道
UNSUBSCRIBE news

# 範例:取消所有頻道
UNSUBSCRIBE

4. 模式訂閱(PSUBSCRIBE)

使用萬用字元訂閱多個頻道:

# 語法
PSUBSCRIBE pattern [pattern ...]

# 範例:訂閱所有 news 開頭的頻道
PSUBSCRIBE news:*

# 範例:訂閱所有頻道
PSUBSCRIBE *

# 範例:訂閱特定格式
PSUBSCRIBE chat:room:*
PSUBSCRIBE user:*:notifications

萬用字元規則

  • *:匹配任意字元
  • ?:匹配單一字元
  • [abc]:匹配 a、b 或 c

輸出範例

redis> PSUBSCRIBE news:*
Reading messages...
1) "psubscribe"
2) "news:*"
3) (integer) 1

# 收到訊息時
1) "pmessage"           # 模式訊息
2) "news:*"             # 匹配的模式
3) "news:sports"        # 實際頻道
4) "Team wins!"         # 訊息內容

5. 取消模式訂閱(PUNSUBSCRIBE)

# 語法
PUNSUBSCRIBE [pattern [pattern ...]]

# 範例
PUNSUBSCRIBE news:*

6. 查看頻道(PUBSUB)

查看活躍頻道

# 列出至少有 1 個訂閱者的頻道
PUBSUB CHANNELS

# 列出符合模式的頻道
PUBSUB CHANNELS news:*

查看訂閱者數量

# 查看特定頻道的訂閱者數量
PUBSUB NUMSUB channel1 channel2

# 範例
redis> PUBSUB NUMSUB news chat
1) "news"
2) (integer) 3    # news 有 3 個訂閱者
3) "chat"
4) (integer) 1    # chat 有 1 個訂閱者

查看模式訂閱數量

# 查看活躍的模式訂閱數量
PUBSUB NUMPAT

redis> PUBSUB NUMPAT
(integer) 2  # 有 2 個客戶端使用模式訂閱

實戰範例

範例 1:命令列測試

終端機 1(訂閱者)

redis-cli
> SUBSCRIBE news

# 等待訊息...

終端機 2(發布者)

redis-cli
> PUBLISH news "Hello from publisher!"
(integer) 1

> PUBLISH news "Second message"
(integer) 1

終端機 1 會收到

1) "message"
2) "news"
3) "Hello from publisher!"

1) "message"
2) "news"
3) "Second message"

範例 2:Node.js 實作

安裝

npm install redis

訂閱者(Subscriber)

const redis = require('redis');

async function subscriber() {
  const client = redis.createClient();
  await client.connect();

  // 訂閱頻道
  await client.subscribe('news', (message, channel) => {
    console.log(`收到來自 ${channel} 的訊息: ${message}`);
  });

  console.log('已訂閱 news 頻道');
}

subscriber();

發布者(Publisher)

const redis = require('redis');

async function publisher() {
  const client = redis.createClient();
  await client.connect();

  // 發布訊息
  const subscriberCount = await client.publish('news', 'Breaking news!');
  console.log(`訊息已發送給 ${subscriberCount} 個訂閱者`);

  await client.quit();
}

publisher();

範例 3:Python 實作

安裝

pip install redis

訂閱者

import redis

# 連接 Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 建立 PubSub 物件
pubsub = r.pubsub()

# 訂閱頻道
pubsub.subscribe('news')

print('已訂閱 news 頻道,等待訊息...')

# 接收訊息
for message in pubsub.listen():
    if message['type'] == 'message':
        print(f"收到訊息: {message['data']}")

發布者

import redis

# 連接 Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 發布訊息
subscriber_count = r.publish('news', 'Breaking news from Python!')
print(f'訊息已發送給 {subscriber_count} 個訂閱者')

範例 4:Go 實作

安裝

go get github.com/go-redis/redis/v8

訂閱者

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()

    // 連接 Redis
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    // 訂閱頻道
    pubsub := rdb.Subscribe(ctx, "news")
    defer pubsub.Close()

    // 接收訊息
    ch := pubsub.Channel()
    fmt.Println("已訂閱 news 頻道")

    for msg := range ch {
        fmt.Printf("收到訊息: %s\n", msg.Payload)
    }
}

發布者

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()

    // 連接 Redis
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    // 發布訊息
    count, err := rdb.Publish(ctx, "news", "Breaking news from Go!").Result()
    if err != nil {
        panic(err)
    }

    fmt.Printf("訊息已發送給 %d 個訂閱者\n", count)
}

應用情境

情境 1:即時聊天室

// 加入聊天室
await client.subscribe('chat:room1', (message) => {
  const data = JSON.parse(message);
  displayMessage(data.user, data.text);
});

// 發送訊息
await client.publish('chat:room1', JSON.stringify({
  user: 'Alice',
  text: 'Hello everyone!',
  timestamp: Date.now()
}));

情境 2:即時通知系統

# 訂閱使用者的通知頻道
pubsub.subscribe(f'user:{user_id}:notifications')

# 發送通知給特定使用者
r.publish(f'user:{user_id}:notifications', json.dumps({
    'type': 'friend_request',
    'from': 'Bob',
    'message': 'Bob 想加你為好友'
}))

情境 3:即時儀表板更新

// 前端訂閱儀表板更新
await client.subscribe('dashboard:metrics', (message) => {
  const metrics = JSON.parse(message);
  updateDashboard(metrics);
});

// 後端定期發布更新
setInterval(async () => {
  const metrics = await getSystemMetrics();
  await client.publish('dashboard:metrics', JSON.stringify(metrics));
}, 1000);  // 每秒更新

情境 4:多伺服器之間的協調

# 伺服器 A 通知其他伺服器清除快取
r.publish('cache:invalidate', json.dumps({
    'key': 'user:123',
    'action': 'delete'
}))

# 其他伺服器監聽並處理
pubsub.subscribe('cache:invalidate')
for message in pubsub.listen():
    if message['type'] == 'message':
        data = json.loads(message['data'])
        cache.delete(data['key'])

情境 5:遊戲伺服器廣播

// 廣播遊戲狀態更新
rdb.Publish(ctx, "game:room1:updates", json.Marshal(GameState{
    Players: players,
    Score: score,
    Timestamp: time.Now(),
}))

// 所有玩家接收更新
pubsub := rdb.Subscribe(ctx, "game:room1:updates")
for msg := range pubsub.Channel() {
    var state GameState
    json.Unmarshal([]byte(msg.Payload), &state)
    updateGameUI(state)
}

進階模式

模式 1:分層頻道

# 按層級組織頻道
user:123:notifications
user:123:messages
user:123:alerts

order:456:status
order:456:updates

# 訂閱特定使用者的所有頻道
PSUBSCRIBE user:123:*

# 訂閱所有使用者的通知
PSUBSCRIBE user:*:notifications

模式 2:廣播 + 私人訊息

// 廣播給所有人
await client.publish('broadcast', '系統維護通知');

// 私人訊息給特定使用者
await client.publish(`user:${userId}:private`, '你有新訊息');

// 使用者訂閱兩個頻道
await client.subscribe(['broadcast', `user:${userId}:private`], handler);

模式 3:多語言頻道

# 根據使用者語言訂閱
lang = user.language  # 'en', 'zh', 'ja'
pubsub.subscribe(f'notifications:{lang}')

# 發送多語言通知
for lang in ['en', 'zh', 'ja']:
    message = translate(notification, lang)
    r.publish(f'notifications:{lang}', message)


常見問題

問題 1:訊息不持久化的影響

問題:訂閱者離線期間的訊息會丟失

# 發布訊息時沒有訂閱者
PUBLISH news "Important message"
(integer) 0  # 沒有人收到

# 之後才訂閱
SUBSCRIBE news  # 收不到之前的訊息

解決方案

  • 使用 Redis Streams(持久化的 Pub/Sub)
  • 搭配其他儲存機制(資料庫、Kafka)
  • 重要訊息不應該只用 Pub/Sub

問題 2:無法確認送達

Redis Pub/Sub 是「fire-and-forget」(發送後忘記):

  • ❌ 無法知道訊息是否被處理
  • ❌ 無法確認訂閱者是否在線
  • ❌ 無法重試失敗的訊息

適用情境

  • ✅ 即時通知(丟失無所謂)
  • ✅ 即時監控(下一次更新會補上)
  • ❌ 交易訊息(需要保證送達)
  • ❌ 關鍵業務邏輯

問題 3:訂閱模式會阻塞連線

# ❌ 訂閱後無法執行其他 Redis 命令
pubsub.subscribe('news')
r.get('key')  # 這行會失敗

# ✅ 使用兩個連線
r1 = redis.Redis()  # 用於訂閱
r2 = redis.Redis()  # 用於其他操作

pubsub = r1.pubsub()
pubsub.subscribe('news')

r2.get('key')  # 這樣可以

問題 4:效能考量

記憶體使用

  • 每個訂閱者的輸出緩衝區都會佔用記憶體
  • 大量訂閱者 + 高頻訊息 = 高記憶體使用

建議

# 設定客戶端輸出緩衝區限制
config set client-output-buffer-limit "pubsub 32mb 8mb 60"
#                                      類型  硬限制 軟限制 軟限制時間

Redis Pub/Sub vs Redis Streams

Redis Streams(Redis 5.0+)

優點

  • ✅ 訊息持久化
  • ✅ 可以回溯歷史訊息
  • ✅ Consumer Group 支援
  • ✅ 訊息確認機制

何時用 Streams?

  • 需要訊息持久化
  • 需要處理確認
  • 需要回溯歷史
  • 類似 Kafka 的使用場景

何時用 Pub/Sub?

  • 即時性要求極高
  • 不在乎訊息丟失
  • 簡單的廣播場景
  • 低延遲要求

監控與除錯

查看當前訂閱情況

# 查看所有活躍頻道
PUBSUB CHANNELS

# 查看特定頻道訂閱數
PUBSUB NUMSUB news chat

# 查看模式訂閱數
PUBSUB NUMPAT

使用 MONITOR 除錯

# 監控所有 Redis 命令(包含 Pub/Sub)
redis-cli MONITOR

# 輸出範例
1699999999.123456 [0 127.0.0.1:12345] "PUBLISH" "news" "Hello"
1699999999.234567 [0 127.0.0.1:12346] "SUBSCRIBE" "news"

客戶端輸出緩衝區監控

# 查看客戶端資訊
CLIENT LIST

# 輸出包含緩衝區使用情況
id=123 addr=127.0.0.1:54321 ... qbuf=0 qbuf-free=0 obl=16384 oll=0 omem=0 ...

最佳實踐

1. 使用有意義的頻道命名

# ✅ 好的命名
user:123:notifications
order:456:status
game:room1:updates

# ❌ 不好的命名
ch1
temp
test

2. 使用 JSON 格式傳遞結構化資料

// ✅ 使用 JSON
await client.publish('events', JSON.stringify({
  type: 'user_login',
  userId: 123,
  timestamp: Date.now(),
  metadata: { ip: '1.2.3.4' }
}));

// ❌ 使用純文字(難以解析)
await client.publish('events', 'user 123 logged in at 1699999999');

3. 錯誤處理

// 訂閱者端
client.on('error', (err) => {
  console.error('Redis 錯誤:', err);
  // 實作重連邏輯
});

// 發布者端
try {
  const count = await client.publish('news', message);
  if (count === 0) {
    console.warn('沒有訂閱者收到訊息');
  }
} catch (err) {
  console.error('發布失敗:', err);
}

4. 優雅關閉

// 清理資源
process.on('SIGINT', async () => {
  await client.unsubscribe();
  await client.quit();
  process.exit(0);
});

5. 測試時使用不同的 Redis DB

// 開發環境
const devClient = redis.createClient({ db: 0 });

// 測試環境
const testClient = redis.createClient({ db: 1 });


總結

快速決策指南

需求 Redis Pub/Sub 替代方案
即時聊天室 ✅ 很適合 WebSocket
即時通知 ✅ 很適合 Server-Sent Events
儀表板即時更新 ✅ 很適合 WebSocket
訂單處理 ❌ 不適合 Kafka, RabbitMQ
日誌收集 ❌ 不適合 Kafka, Fluentd
需要訊息持久化 ❌ 不適合 Redis Streams, Kafka
需要訊息確認 ❌ 不適合 RabbitMQ, Kafka

核心指令

# 發布
PUBLISH channel message

# 訂閱
SUBSCRIBE channel [channel ...]

# 模式訂閱
PSUBSCRIBE pattern [pattern ...]

# 取消訂閱
UNSUBSCRIBE [channel ...]
PUNSUBSCRIBE [pattern ...]

# 查詢
PUBSUB CHANNELS [pattern]
PUBSUB NUMSUB [channel ...]
PUBSUB NUMPAT

特性對照

特性 Redis Pub/Sub
持久化 ❌ 否
速度 ⚡⚡⚡ 極快
複雜度 簡單
訊息順序 保證
訊息確認 ❌ 無
回溯歷史 ❌ 無
多播能力 ✅ 有
適用場景 即時、輕量、可丟失

核心要點

  1. Redis Pub/Sub 特性

    • 極快的即時推送
    • 發布者與訂閱者解耦
    • 不持久化(訂閱者離線會丟失)
  2. 適合場景

    • ✅ 即時通知、聊天室、監控
    • ✅ 訊息可以丟失的場景
    • ❌ 需要持久化和確認的場景
  3. 與其他方案對比

    • vs Kafka:更快但不持久化
    • vs WebSocket:Redis 處理訊息路由
    • vs Redis Streams:Streams 提供持久化
  4. 最佳實踐

    • 使用 JSON 格式傳遞資料
    • 合理命名頻道(user:123:notifications)
    • 使用兩個 Redis 連線(訂閱+操作)
    • 做好錯誤處理和重連邏輯

建立日期:2025-10-28 最後更新:2025-11-18

🔗相關文章