Fluentd 完全指南

開源統一日誌層,用於收集、處理和轉發日誌數據,是 CNCF 畢業專案。


目錄


什麼是 Fluentd?

Fluentd 是一個開源的資料收集器,用於建立統一的日誌層(Unified Logging Layer),將各種來源的日誌收集、處理後轉發到不同的目的地。

核心特點

  • 🎯 統一日誌層:將分散的日誌來源統一為結構化 JSON 格式
  • 插件生態豐富:500+ 社群插件,支援各種輸入輸出
  • 🔧 可靠性高:支援緩衝、重試、故障轉移機制
  • 📦 輕量靈活:核心小巧,按需載入插件
  • 🎨 CNCF 畢業專案:雲原生生態系統的標準組件

為什麼選擇 Fluentd?

傳統日誌管理的問題

  • 日誌格式不統一,難以分析
  • 各系統日誌分散,無法集中管理
  • 擴展性差,新增來源需要大量配置

使用 Fluentd 的優勢

  • 統一日誌格式為 JSON,方便分析
  • 解耦日誌生產者和消費者
  • 插件系統支援任意來源和目的地
  • 原生支援 Kubernetes 和雲原生環境

典型架構

┌───────────────────────────────────────────────────────────┐
│                      應用程式層                             │
├─────────┬─────────┬─────────┬─────────┬─────────┬─────────┤
│  App 1  │  App 2  │  App 3  │ Nginx   │ MySQL   │ System  │
└────┬────┴────┬────┴────┬────┴────┬────┴────┬────┴────┬────┘
     │         │         │         │         │         │
     └─────────┴─────────┴────┬────┴─────────┴─────────┘
                    ┌─────────────────┐
                    │    Fluentd      │
                    │   (收集/處理)    │
                    └────────┬────────┘
         ┌────────────────────┼────────────────────┐
         │                    │                    │
         ▼                    ▼                    ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Elasticsearch   │ │    S3/GCS       │ │    Kafka        │
│   (分析)         │ │   (歸檔)        │ │   (串流)         │
└─────────────────┘ └─────────────────┘ └─────────────────┘

核心概念

事件(Event)

Fluentd 處理的基本單位是事件,每個事件包含三個部分:

event = {
    tag: "myapp.access",           # 標籤:用於路由
    time: 1701388800,              # 時間戳
    record: {                      # 記錄:實際日誌內容
        "host": "192.168.1.1",
        "method": "GET",
        "path": "/api/users"
    }
}

標籤(Tag)

標籤用於路由事件到不同的處理流程:

# 標籤格式:用點號分隔的層級結構
myapp.access          # 應用程式存取日誌
myapp.error           # 應用程式錯誤日誌
kubernetes.var.log.*  # Kubernetes 日誌

# 標籤匹配模式
*          # 匹配單一層級
**         # 匹配任意層級
{a,b}      # 匹配 a 或 b

插件類型

插件類型 功能 範例
Input 資料輸入來源 tail、http、forward、syslog
Output 資料輸出目的地 elasticsearch、s3、kafka、file
Filter 資料處理轉換 record_transformer、grep、parser
Parser 解析日誌格式 json、regexp、apache、nginx
Formatter 輸出格式化 json、csv、single_value
Buffer 緩衝機制 memory、file

資料流程

Input → Parser → Filter → Buffer → Output
  │        │        │        │        │
  │        │        │        │        └─ 輸出到目的地
  │        │        │        └─ 緩衝提高可靠性
  │        │        └─ 轉換、過濾、修改
  │        └─ 解析日誌格式
  └─ 接收日誌資料

快速開始

安裝方式

# macOS
brew install fluentd

# Ubuntu/Debian(使用 td-agent,Fluentd 的穩定發行版)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-jammy-fluent-package5-lts.sh | sh

# CentOS/RHEL
curl -fsSL https://toolbelt.treasuredata.com/sh/install-redhat-fluent-package5-lts.sh | sh

# Docker
docker pull fluent/fluentd:v1.16-debian

# Ruby gem(開發環境)
gem install fluentd
fluentd --setup ./fluent

驗證安裝

# 檢查版本
fluentd --version
# 或 td-agent
td-agent --version

# 檢查配置語法
fluentd --dry-run -c /etc/fluent/fluent.conf

# 啟動 Fluentd
fluentd -c /etc/fluent/fluent.conf

# td-agent 服務管理
sudo systemctl start td-agent
sudo systemctl status td-agent

第一個配置

# fluent.conf

# 輸入:監聽 HTTP 請求
<source>
  @type http
  port 8888
  bind 0.0.0.0
</source>

# 輸入:監聽檔案
<source>
  @type tail
  path /var/log/myapp/*.log
  pos_file /var/log/fluentd/myapp.pos
  tag myapp.log
  <parse>
    @type json
  </parse>
</source>

# 過濾:添加主機名
<filter myapp.**>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
  </record>
</filter>

# 輸出:打印到標準輸出(用於測試)
<match **>
  @type stdout
</match>

測試配置

# 啟動 Fluentd
fluentd -c fluent.conf

# 發送測試事件
curl -X POST -d 'json={"message":"hello"}' http://localhost:8888/myapp.test

# 輸出
# myapp.test: {"message":"hello","hostname":"myserver"}

配置詳解

配置檔案結構

# 全域設定
<system>
  log_level info
  workers 4
</system>

# 輸入源
<source>
  @type forward
  port 24224
</source>

# 過濾器(按順序執行)
<filter pattern>
  @type xxx
</filter>

# 輸出(路由匹配)
<match pattern>
  @type xxx
</match>

# 標籤重寫
<match pattern>
  @type relabel
  @label @PROCESSED
</match>

<label @PROCESSED>
  <match **>
    @type elasticsearch
  </match>
</label>

Source 輸入配置

tail(監控檔案)

<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd/nginx-access.pos  # 記錄讀取位置
  tag nginx.access
  read_from_head true                          # 從頭開始讀取

  <parse>
    @type nginx
  </parse>
</source>

forward(接收其他 Fluentd)

<source>
  @type forward
  port 24224
  bind 0.0.0.0

  # 安全設定
  <security>
    self_hostname server.example.com
    shared_key my_secret_key
  </security>
</source>

http(HTTP 端點)

<source>
  @type http
  port 9880
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10s
</source>

syslog(系統日誌)

<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag system

  <parse>
    message_format auto
  </parse>
</source>

Filter 過濾配置

record_transformer(修改記錄)

<filter myapp.**>
  @type record_transformer
  enable_ruby true

  <record>
    # 新增欄位
    hostname "#{Socket.gethostname}"
    timestamp ${time.strftime('%Y-%m-%d %H:%M:%S')}

    # 修改欄位
    level ${record["level"].upcase}

    # 條件新增
    environment ${ENV["RAILS_ENV"] || "development"}
  </record>

  # 移除欄位
  remove_keys debug_info, internal_id
</filter>

grep(過濾事件)

# 只保留符合條件的事件
<filter myapp.**>
  @type grep

  <regexp>
    key level
    pattern /^(error|warn)$/
  </regexp>

  <exclude>
    key message
    pattern /health_check/
  </exclude>
</filter>

parser(重新解析)

<filter raw.**>
  @type parser
  key_name message
  reserve_data true

  <parse>
    @type json
  </parse>
</filter>

Match 輸出配置

elasticsearch

<match myapp.**>
  @type elasticsearch
  host elasticsearch.example.com
  port 9200
  index_name fluentd
  type_name _doc

  # 日期索引
  logstash_format true
  logstash_prefix myapp
  logstash_dateformat %Y.%m.%d

  # 認證
  user elastic
  password ${ELASTIC_PASSWORD}

  # 緩衝設定
  <buffer>
    @type file
    path /var/log/fluentd/buffer/elasticsearch
    flush_interval 5s
    chunk_limit_size 8MB
    queue_limit_length 512
    retry_max_interval 30s
    retry_forever true
  </buffer>
</match>

file(本地檔案)

<match backup.**>
  @type file
  path /var/log/fluentd/backup/${tag}/%Y/%m/%d/
  append true

  <buffer tag, time>
    @type file
    path /var/log/fluentd/buffer/file
    timekey 1h
    timekey_wait 10m
    flush_at_shutdown true
  </buffer>

  <format>
    @type json
  </format>
</match>

kafka

<match stream.**>
  @type kafka2
  brokers kafka1:9092,kafka2:9092,kafka3:9092
  topic_key tag
  default_topic fluentd

  <format>
    @type json
  </format>

  <buffer tag>
    @type file
    path /var/log/fluentd/buffer/kafka
    flush_interval 3s
  </buffer>
</match>

forward(轉發到其他 Fluentd)

<match **>
  @type forward

  <server>
    host aggregator1.example.com
    port 24224
    weight 60
  </server>

  <server>
    host aggregator2.example.com
    port 24224
    weight 40
    standby true  # 備援伺服器
  </server>

  <buffer>
    @type file
    path /var/log/fluentd/buffer/forward
    flush_interval 1s
  </buffer>
</match>

copy(複製到多個目的地)

<match myapp.**>
  @type copy

  <store>
    @type elasticsearch
    host es.example.com
    # ... elasticsearch 配置
  </store>

  <store>
    @type s3
    s3_bucket myapp-logs
    # ... s3 配置
  </store>

  <store>
    @type kafka2
    brokers kafka:9092
    # ... kafka 配置
  </store>
</match>

常用插件

Input 插件

插件 說明 使用場景
tail 監控檔案 應用程式日誌檔
forward 接收 Fluentd 轉發 聚合器接收
http HTTP 端點 應用程式直接發送
syslog Syslog 協定 系統日誌、網路設備
tcp/udp TCP/UDP socket 自定義協定
exec 執行外部命令 定期收集資料
monitor_agent 監控 Fluentd 自身 運維監控

Output 插件

插件 說明 使用場景
elasticsearch Elasticsearch 日誌搜尋分析
s3 AWS S3 日誌歸檔
kafka/kafka2 Apache Kafka 串流處理
forward 轉發 Fluentd 多層架構
file 本地檔案 備份、除錯
stdout 標準輸出 開發測試
cloudwatch_logs AWS CloudWatch AWS 監控
bigquery Google BigQuery 大數據分析

Filter 插件

插件 說明 使用場景
record_transformer 修改記錄 新增/刪除/修改欄位
grep 過濾事件 按條件篩選
parser 重新解析 解析巢狀 JSON
geoip IP 地理位置 使用者分佈分析
throttle 限流 防止日誌洪水

Buffer 緩衝機制

Buffer 是 Fluentd 可靠性的核心,用於暫存事件直到成功輸出。

Buffer 類型

類型 說明 適用場景
memory 記憶體緩衝 開發測試、低延遲需求
file 檔案緩衝 生產環境、高可靠性

Buffer 配置

<match **>
  @type elasticsearch

  <buffer tag, time>
    @type file
    path /var/log/fluentd/buffer/es

    # Chunk 設定
    chunk_limit_size 8MB          # 單個 chunk 大小上限
    total_limit_size 2GB          # 總緩衝大小上限

    # 刷新設定
    flush_mode interval           # lazy, interval, immediate
    flush_interval 5s             # 刷新間隔
    flush_thread_count 2          # 刷新執行緒數
    flush_at_shutdown true        # 關閉時刷新

    # 重試設定
    retry_type exponential_backoff
    retry_wait 1s                 # 初始重試等待
    retry_max_interval 60s        # 最大重試間隔
    retry_timeout 72h             # 重試超時
    retry_forever false           # 是否永遠重試

    # 溢出處理
    overflow_action block         # throw_exception, block, drop_oldest_chunk

    # 時間分片
    timekey 1h                    # 按時間分片
    timekey_wait 10m              # 等待延遲資料
  </buffer>
</match>

Buffer 工作流程

事件 → Chunk (staging) → Chunk (queued) → Output
           │                   │
           │ chunk_limit_size  │ flush_interval
           │ 或 timekey 到期   │ 或 queue 達到閾值
           │                   │
           └───────────────────┘

關鍵參數說明

參數 說明 建議值
chunk_limit_size 單 chunk 大小 8-16MB
total_limit_size 總緩衝大小 根據磁碟空間
flush_interval 刷新間隔 5-30s
retry_max_interval 最大重試間隔 30-60s
overflow_action 溢出處理 block 或 drop_oldest_chunk

Kubernetes 整合

DaemonSet 部署

# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: logging
  labels:
    app: fluentd
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccountName: fluentd
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch.logging.svc.cluster.local"
            - name: FLUENT_ELASTICSEARCH_PORT
              value: "9200"
            - name: FLUENT_ELASTICSEARCH_SCHEME
              value: "http"
          resources:
            limits:
              memory: 512Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: dockercontainerlogdirectory
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: config
              mountPath: /fluentd/etc/fluent.conf
              subPath: fluent.conf
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: dockercontainerlogdirectory
          hostPath:
            path: /var/lib/docker/containers
        - name: config
          configMap:
            name: fluentd-config

ConfigMap 配置

# fluentd-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: logging
data:
  fluent.conf: |
    # 收集容器日誌
    <source>
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    # 解析 Kubernetes 元資料
    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
    </filter>

    # 輸出到 Elasticsearch
    <match kubernetes.**>
      @type elasticsearch
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME']}"
      logstash_format true
      logstash_prefix kubernetes
      include_tag_key true

      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.buffer
        flush_mode interval
        flush_interval 5s
        retry_type exponential_backoff
        retry_forever true
      </buffer>
    </match>

RBAC 配置

# fluentd-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: logging

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluentd
rules:
  - apiGroups: [""]
    resources: ["pods", "namespaces"]
    verbs: ["get", "list", "watch"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluentd
roleRef:
  kind: ClusterRole
  name: fluentd
  apiGroup: rbac.authorization.k8s.io
subjects:
  - kind: ServiceAccount
    name: fluentd
    namespace: logging

Fluent Bit vs Fluentd

比較表

特性 Fluent Bit Fluentd
語言 C Ruby + C
記憶體使用 ~450KB ~40MB
插件數量 ~80 ~500+
定位 輕量級收集器 功能完整的處理器
適用場景 邊緣節點、IoT、容器 聚合器、複雜處理
處理能力 基本處理 複雜轉換、路由

架構建議

┌─────────────────────────────────────────────────────────────┐
│                     Kubernetes Nodes                         │
├─────────┬─────────┬─────────┬─────────┬─────────┬─────────┤
│ Node 1  │ Node 2  │ Node 3  │ Node 4  │ Node 5  │ Node 6  │
│Fluent   │Fluent   │Fluent   │Fluent   │Fluent   │Fluent   │
│  Bit    │  Bit    │  Bit    │  Bit    │  Bit    │  Bit    │
└────┬────┴────┬────┴────┬────┴────┬────┴────┬────┴────┬────┘
     │         │         │         │         │         │
     └─────────┴─────────┴────┬────┴─────────┴─────────┘
                              │ forward
                    ┌─────────────────┐
                    │    Fluentd      │
                    │   (聚合/處理)    │
                    └────────┬────────┘
         ┌────────────────────┼────────────────────┐
         ▼                    ▼                    ▼
   Elasticsearch            S3                  Kafka

Fluent Bit 配置範例

# fluent-bit.conf
[SERVICE]
    Flush         1
    Log_Level     info
    Daemon        off
    Parsers_File  parsers.conf

[INPUT]
    Name              tail
    Path              /var/log/containers/*.log
    Parser            docker
    Tag               kube.*
    Refresh_Interval  5
    Mem_Buf_Limit     5MB

[FILTER]
    Name                kubernetes
    Match               kube.*
    Kube_URL            https://kubernetes.default.svc:443
    Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token

[OUTPUT]
    Name          forward
    Match         *
    Host          fluentd-aggregator
    Port          24224

實戰範例

範例 1:Web 應用日誌收集

# 收集 Nginx 存取日誌和應用程式日誌

<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd/nginx-access.pos
  tag nginx.access

  <parse>
    @type nginx
  </parse>
</source>

<source>
  @type tail
  path /var/log/myapp/*.log
  pos_file /var/log/fluentd/myapp.pos
  tag myapp.log

  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S.%L%z
  </parse>
</source>

# 添加通用欄位
<filter **>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    environment "#{ENV['RAILS_ENV'] || 'production'}"
  </record>
</filter>

# 分離錯誤日誌
<match myapp.log>
  @type rewrite_tag_filter

  <rule>
    key level
    pattern /^(error|fatal)$/
    tag myapp.error
  </rule>

  <rule>
    key level
    pattern /.*/
    tag myapp.info
  </rule>
</match>

# 錯誤日誌發送告警
<match myapp.error>
  @type copy

  <store>
    @type elasticsearch
    host es.example.com
    index_name myapp-errors
  </store>

  <store>
    @type slack
    webhook_url https://hooks.slack.com/services/xxx
    channel alerts
    username fluentd
  </store>
</match>

# 一般日誌存儲
<match {nginx.**,myapp.info}>
  @type elasticsearch
  host es.example.com
  logstash_format true
  logstash_prefix logs

  <buffer>
    @type file
    path /var/log/fluentd/buffer/es
    flush_interval 10s
  </buffer>
</match>

範例 2:多租戶日誌隔離

# 根據 Kubernetes namespace 路由到不同索引

<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd/containers.pos
  tag kubernetes.*

  <parse>
    @type json
  </parse>
</source>

<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

# 根據 namespace 重寫 tag
<match kubernetes.**>
  @type rewrite_tag_filter

  <rule>
    key $.kubernetes.namespace_name
    pattern /^(.+)$/
    tag tenant.$1
  </rule>
</match>

# 為每個租戶創建獨立索引
<match tenant.**>
  @type elasticsearch
  host es.example.com

  # 動態索引名稱
  index_name ${tag_parts[1]}-logs

  <buffer tag, time>
    @type file
    path /var/log/fluentd/buffer/${tag}
    timekey 1h
    flush_interval 30s
  </buffer>
</match>

範例 3:日誌歸檔到 S3

# 即時分析 + 長期歸檔

<match myapp.**>
  @type copy

  # 即時:發送到 Elasticsearch
  <store>
    @type elasticsearch
    host es.example.com
    logstash_format true

    <buffer>
      @type file
      path /var/log/fluentd/buffer/es
      flush_interval 5s
    </buffer>
  </store>

  # 歸檔:發送到 S3
  <store>
    @type s3

    aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
    aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
    s3_bucket myapp-logs-archive
    s3_region ap-northeast-1

    path logs/%Y/%m/%d/
    s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}

    <buffer time>
      @type file
      path /var/log/fluentd/buffer/s3
      timekey 1h           # 每小時一個檔案
      timekey_wait 10m     # 等待延遲資料
      chunk_limit_size 256m
    </buffer>

    <format>
      @type json
    </format>
  </store>
</match>

效能調優

多 Worker 模式

<system>
  workers 4                    # 啟用 4 個 worker 程序
  root_dir /var/log/fluentd   # 工作目錄
</system>

# Worker 特定配置
<worker 0>
  <source>
    @type tail
    path /var/log/app1/*.log
    tag app1
  </source>
</worker>

<worker 1>
  <source>
    @type tail
    path /var/log/app2/*.log
    tag app2
  </source>
</worker>

效能參數調優

<system>
  # 日誌設定
  log_level warn              # 減少日誌輸出
  suppress_repeated_stacktrace true

  # 效能設定
  workers 4

  # 記憶體限制
  <root_agent>
    emit_error_log_interval 30s
  </root_agent>
</system>

<source>
  @type tail
  path /var/log/app/*.log

  # 效能調優
  read_lines_limit 1000       # 每次讀取行數
  read_bytes_limit_per_second 10485760  # 10MB/s 限速

  <parse>
    @type json
  </parse>
</source>

<match **>
  @type elasticsearch

  <buffer>
    @type file
    path /var/log/fluentd/buffer

    # 緩衝效能
    chunk_limit_size 16MB
    queue_limit_length 256
    flush_thread_count 4      # 多執行緒刷新
    flush_interval 5s

    # 壓縮
    compress gzip
  </buffer>
</match>

資源需求估算

日誌量 CPU 記憶體 建議配置
<1GB/天 0.5 core 512MB 單 worker
1-10GB/天 1 core 1GB 2 workers
10-50GB/天 2 cores 2GB 4 workers
>50GB/天 4+ cores 4GB+ 多節點聚合

最佳實踐

1. 可靠性設計

# ✅ 推薦:使用檔案緩衝 + 重試
<match **>
  @type elasticsearch

  <buffer>
    @type file                    # 使用檔案緩衝
    path /var/log/fluentd/buffer
    flush_at_shutdown true        # 關閉時刷新
    retry_forever true            # 永遠重試
    retry_type exponential_backoff
  </buffer>
</match>

# ❌ 不推薦:記憶體緩衝無重試
<match **>
  @type elasticsearch

  <buffer>
    @type memory
    # 程序崩潰會丟失資料
  </buffer>
</match>

2. 標籤設計

# ✅ 推薦:層級化標籤
# 格式:{環境}.{應用}.{類型}
production.myapp.access
production.myapp.error
staging.myapp.access

# 方便路由
<match production.**>       # 所有生產環境
<match **.error>            # 所有錯誤日誌
<match production.myapp.*>  # 生產環境的 myapp

# ❌ 不推薦:扁平標籤
myapp_production_access
myapp_production_error

3. 避免日誌洪水

# 限制日誌速率
<filter myapp.**>
  @type throttle
  group_key kubernetes.pod_name
  group_bucket_period_s 60
  group_bucket_limit 1000
  group_reset_rate_s 100
</filter>

# 取樣高頻日誌
<filter debug.**>
  @type sampling
  sample_rate 10              # 只保留 10% 的日誌
</filter>

4. 安全配置

# 加密傳輸
<source>
  @type forward
  port 24224

  <transport tls>
    cert_path /etc/fluentd/certs/server.crt
    private_key_path /etc/fluentd/certs/server.key
    ca_path /etc/fluentd/certs/ca.crt
  </transport>

  <security>
    self_hostname aggregator.example.com
    shared_key ${FLUENTD_SHARED_KEY}
  </security>
</source>

# 敏感資料遮罩
<filter **>
  @type record_transformer
  enable_ruby true

  <record>
    message ${record["message"].gsub(/password=\S+/, 'password=***')}
    message ${record["message"].gsub(/\b\d{16}\b/, '****-****-****-****')}
  </record>
</filter>

常見問題

問題 1:日誌丟失

症狀:部分日誌沒有到達目的地

原因

  • 緩衝溢出
  • 程序異常終止
  • 網路問題

解決方案

<buffer>
  @type file                      # 使用檔案緩衝
  path /var/log/fluentd/buffer
  total_limit_size 5GB            # 增加緩衝大小
  overflow_action block           # 溢出時阻塞而非丟棄
  flush_at_shutdown true          # 關閉時刷新
  retry_forever true              # 永遠重試
</buffer>

問題 2:記憶體使用過高

症狀:Fluentd 消耗大量記憶體

原因

  • 使用記憶體緩衝
  • chunk 太大
  • 積壓太多事件

解決方案

<buffer>
  @type file                      # 改用檔案緩衝
  chunk_limit_size 8MB            # 限制 chunk 大小
  queue_limit_length 128          # 限制佇列長度
  compress gzip                   # 壓縮緩衝資料
</buffer>

問題 3:日誌延遲

症狀:日誌到達目的地有延遲

原因

  • flush_interval 太長
  • 緩衝積壓
  • 下游處理慢

解決方案

<buffer>
  flush_mode immediate            # 立即刷新
  # 或
  flush_interval 1s               # 縮短刷新間隔
  flush_thread_count 4            # 增加刷新執行緒
</buffer>

問題 4:pos_file 錯誤

症狀:重啟後重複讀取或跳過日誌

原因

  • pos_file 路徑無權限
  • pos_file 被刪除
  • 多個 source 共用 pos_file

解決方案

<source>
  @type tail
  path /var/log/app/*.log

  # 每個 source 獨立的 pos_file
  pos_file /var/log/fluentd/app.log.pos

  # 確保目錄存在且有權限
  # mkdir -p /var/log/fluentd && chown fluentd:fluentd /var/log/fluentd
</source>

問題 5:Kubernetes 元資料缺失

症狀:日誌沒有 Pod、Namespace 等資訊

原因

  • kubernetes_metadata 插件未安裝
  • RBAC 權限不足
  • API Server 連接問題

解決方案

# 安裝插件
fluent-gem install fluent-plugin-kubernetes_metadata_filter

# 檢查 RBAC
kubectl auth can-i get pods --as=system:serviceaccount:logging:fluentd

# 配置
<filter kubernetes.**>
  @type kubernetes_metadata
  @log_level debug                # 開啟除錯日誌
  skip_labels false
  skip_container_metadata false
</filter>

總結

核心要點

Fluentd = Input + Filter + Buffer + Output

關鍵優勢

  • ✅ 統一日誌格式,解耦生產者和消費者
  • ✅ 豐富的插件生態系統
  • ✅ 可靠的緩衝和重試機制
  • ✅ 原生支援 Kubernetes 和雲原生環境

架構選擇指南

場景 推薦架構
單機應用 Fluentd 直接收集
Kubernetes Fluent Bit DaemonSet + Fluentd 聚合器
大規模叢集 Fluent Bit → Kafka → Fluentd → 儲存
邊緣/IoT Fluent Bit 輕量收集

配置速查

# 基本結構
<source>          # 輸入
  @type tail
  path /var/log/*.log
  tag myapp
</source>

<filter myapp>    # 過濾
  @type record_transformer
</filter>

<match myapp>     # 輸出
  @type elasticsearch
  <buffer>
    @type file
  </buffer>
</match>

常用插件速查

功能 Input Output Filter
檔案 tail file -
轉發 forward forward -
ES - elasticsearch -
Kafka kafka kafka2 -
修改 - - record_transformer
過濾 - - grep

參考資源


建立日期:2025-12-01 最後更新:2025-12-01

🔗相關文章