目錄
- 什麼是 Fluentd?
- 核心概念
- 快速開始
- 配置詳解
- 常用插件
- Buffer 緩衝機制
- Kubernetes 整合
- Fluent Bit vs Fluentd
- 實戰範例
- 效能調優
- 最佳實踐
- 常見問題
- 總結
什麼是 Fluentd?
Fluentd 是一個開源的資料收集器,用於建立統一的日誌層(Unified Logging Layer),將各種來源的日誌收集、處理後轉發到不同的目的地。
核心特點
- 🎯 統一日誌層:將分散的日誌來源統一為結構化 JSON 格式
- ⚡ 插件生態豐富:500+ 社群插件,支援各種輸入輸出
- 🔧 可靠性高:支援緩衝、重試、故障轉移機制
- 📦 輕量靈活:核心小巧,按需載入插件
- 🎨 CNCF 畢業專案:雲原生生態系統的標準組件
為什麼選擇 Fluentd?
傳統日誌管理的問題:
- 日誌格式不統一,難以分析
- 各系統日誌分散,無法集中管理
- 擴展性差,新增來源需要大量配置
使用 Fluentd 的優勢:
- 統一日誌格式為 JSON,方便分析
- 解耦日誌生產者和消費者
- 插件系統支援任意來源和目的地
- 原生支援 Kubernetes 和雲原生環境
典型架構
┌───────────────────────────────────────────────────────────┐
│ 應用程式層 │
├─────────┬─────────┬─────────┬─────────┬─────────┬─────────┤
│ App 1 │ App 2 │ App 3 │ Nginx │ MySQL │ System │
└────┬────┴────┬────┴────┬────┴────┬────┴────┬────┴────┬────┘
│ │ │ │ │ │
└─────────┴─────────┴────┬────┴─────────┴─────────┘
│
▼
┌─────────────────┐
│ Fluentd │
│ (收集/處理) │
└────────┬────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Elasticsearch │ │ S3/GCS │ │ Kafka │
│ (分析) │ │ (歸檔) │ │ (串流) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
核心概念
事件(Event)
Fluentd 處理的基本單位是事件,每個事件包含三個部分:
event = {
tag: "myapp.access", # 標籤:用於路由
time: 1701388800, # 時間戳
record: { # 記錄:實際日誌內容
"host": "192.168.1.1",
"method": "GET",
"path": "/api/users"
}
}
標籤(Tag)
標籤用於路由事件到不同的處理流程:
# 標籤格式:用點號分隔的層級結構
myapp.access # 應用程式存取日誌
myapp.error # 應用程式錯誤日誌
kubernetes.var.log.* # Kubernetes 日誌
# 標籤匹配模式
* # 匹配單一層級
** # 匹配任意層級
{a,b} # 匹配 a 或 b
插件類型
| 插件類型 | 功能 | 範例 |
|---|---|---|
| Input | 資料輸入來源 | tail、http、forward、syslog |
| Output | 資料輸出目的地 | elasticsearch、s3、kafka、file |
| Filter | 資料處理轉換 | record_transformer、grep、parser |
| Parser | 解析日誌格式 | json、regexp、apache、nginx |
| Formatter | 輸出格式化 | json、csv、single_value |
| Buffer | 緩衝機制 | memory、file |
資料流程
Input → Parser → Filter → Buffer → Output
│ │ │ │ │
│ │ │ │ └─ 輸出到目的地
│ │ │ └─ 緩衝提高可靠性
│ │ └─ 轉換、過濾、修改
│ └─ 解析日誌格式
└─ 接收日誌資料
快速開始
安裝方式
# macOS
brew install fluentd
# Ubuntu/Debian(使用 td-agent,Fluentd 的穩定發行版)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-jammy-fluent-package5-lts.sh | sh
# CentOS/RHEL
curl -fsSL https://toolbelt.treasuredata.com/sh/install-redhat-fluent-package5-lts.sh | sh
# Docker
docker pull fluent/fluentd:v1.16-debian
# Ruby gem(開發環境)
gem install fluentd
fluentd --setup ./fluent
驗證安裝
# 檢查版本
fluentd --version
# 或 td-agent
td-agent --version
# 檢查配置語法
fluentd --dry-run -c /etc/fluent/fluent.conf
# 啟動 Fluentd
fluentd -c /etc/fluent/fluent.conf
# td-agent 服務管理
sudo systemctl start td-agent
sudo systemctl status td-agent
第一個配置
# fluent.conf
# 輸入:監聽 HTTP 請求
<source>
@type http
port 8888
bind 0.0.0.0
</source>
# 輸入:監聽檔案
<source>
@type tail
path /var/log/myapp/*.log
pos_file /var/log/fluentd/myapp.pos
tag myapp.log
<parse>
@type json
</parse>
</source>
# 過濾:添加主機名
<filter myapp.**>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
</record>
</filter>
# 輸出:打印到標準輸出(用於測試)
<match **>
@type stdout
</match>
測試配置:
# 啟動 Fluentd
fluentd -c fluent.conf
# 發送測試事件
curl -X POST -d 'json={"message":"hello"}' http://localhost:8888/myapp.test
# 輸出
# myapp.test: {"message":"hello","hostname":"myserver"}
配置詳解
配置檔案結構
# 全域設定
<system>
log_level info
workers 4
</system>
# 輸入源
<source>
@type forward
port 24224
</source>
# 過濾器(按順序執行)
<filter pattern>
@type xxx
</filter>
# 輸出(路由匹配)
<match pattern>
@type xxx
</match>
# 標籤重寫
<match pattern>
@type relabel
@label @PROCESSED
</match>
<label @PROCESSED>
<match **>
@type elasticsearch
</match>
</label>
Source 輸入配置
tail(監控檔案)
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/fluentd/nginx-access.pos # 記錄讀取位置
tag nginx.access
read_from_head true # 從頭開始讀取
<parse>
@type nginx
</parse>
</source>
forward(接收其他 Fluentd)
<source>
@type forward
port 24224
bind 0.0.0.0
# 安全設定
<security>
self_hostname server.example.com
shared_key my_secret_key
</security>
</source>
http(HTTP 端點)
<source>
@type http
port 9880
bind 0.0.0.0
body_size_limit 32m
keepalive_timeout 10s
</source>
syslog(系統日誌)
<source>
@type syslog
port 5140
bind 0.0.0.0
tag system
<parse>
message_format auto
</parse>
</source>
Filter 過濾配置
record_transformer(修改記錄)
<filter myapp.**>
@type record_transformer
enable_ruby true
<record>
# 新增欄位
hostname "#{Socket.gethostname}"
timestamp ${time.strftime('%Y-%m-%d %H:%M:%S')}
# 修改欄位
level ${record["level"].upcase}
# 條件新增
environment ${ENV["RAILS_ENV"] || "development"}
</record>
# 移除欄位
remove_keys debug_info, internal_id
</filter>
grep(過濾事件)
# 只保留符合條件的事件
<filter myapp.**>
@type grep
<regexp>
key level
pattern /^(error|warn)$/
</regexp>
<exclude>
key message
pattern /health_check/
</exclude>
</filter>
parser(重新解析)
<filter raw.**>
@type parser
key_name message
reserve_data true
<parse>
@type json
</parse>
</filter>
Match 輸出配置
elasticsearch
<match myapp.**>
@type elasticsearch
host elasticsearch.example.com
port 9200
index_name fluentd
type_name _doc
# 日期索引
logstash_format true
logstash_prefix myapp
logstash_dateformat %Y.%m.%d
# 認證
user elastic
password ${ELASTIC_PASSWORD}
# 緩衝設定
<buffer>
@type file
path /var/log/fluentd/buffer/elasticsearch
flush_interval 5s
chunk_limit_size 8MB
queue_limit_length 512
retry_max_interval 30s
retry_forever true
</buffer>
</match>
file(本地檔案)
<match backup.**>
@type file
path /var/log/fluentd/backup/${tag}/%Y/%m/%d/
append true
<buffer tag, time>
@type file
path /var/log/fluentd/buffer/file
timekey 1h
timekey_wait 10m
flush_at_shutdown true
</buffer>
<format>
@type json
</format>
</match>
kafka
<match stream.**>
@type kafka2
brokers kafka1:9092,kafka2:9092,kafka3:9092
topic_key tag
default_topic fluentd
<format>
@type json
</format>
<buffer tag>
@type file
path /var/log/fluentd/buffer/kafka
flush_interval 3s
</buffer>
</match>
forward(轉發到其他 Fluentd)
<match **>
@type forward
<server>
host aggregator1.example.com
port 24224
weight 60
</server>
<server>
host aggregator2.example.com
port 24224
weight 40
standby true # 備援伺服器
</server>
<buffer>
@type file
path /var/log/fluentd/buffer/forward
flush_interval 1s
</buffer>
</match>
copy(複製到多個目的地)
<match myapp.**>
@type copy
<store>
@type elasticsearch
host es.example.com
# ... elasticsearch 配置
</store>
<store>
@type s3
s3_bucket myapp-logs
# ... s3 配置
</store>
<store>
@type kafka2
brokers kafka:9092
# ... kafka 配置
</store>
</match>
常用插件
Input 插件
| 插件 | 說明 | 使用場景 |
|---|---|---|
tail |
監控檔案 | 應用程式日誌檔 |
forward |
接收 Fluentd 轉發 | 聚合器接收 |
http |
HTTP 端點 | 應用程式直接發送 |
syslog |
Syslog 協定 | 系統日誌、網路設備 |
tcp/udp |
TCP/UDP socket | 自定義協定 |
exec |
執行外部命令 | 定期收集資料 |
monitor_agent |
監控 Fluentd 自身 | 運維監控 |
Output 插件
| 插件 | 說明 | 使用場景 |
|---|---|---|
elasticsearch |
Elasticsearch | 日誌搜尋分析 |
s3 |
AWS S3 | 日誌歸檔 |
kafka/kafka2 |
Apache Kafka | 串流處理 |
forward |
轉發 Fluentd | 多層架構 |
file |
本地檔案 | 備份、除錯 |
stdout |
標準輸出 | 開發測試 |
cloudwatch_logs |
AWS CloudWatch | AWS 監控 |
bigquery |
Google BigQuery | 大數據分析 |
Filter 插件
| 插件 | 說明 | 使用場景 |
|---|---|---|
record_transformer |
修改記錄 | 新增/刪除/修改欄位 |
grep |
過濾事件 | 按條件篩選 |
parser |
重新解析 | 解析巢狀 JSON |
geoip |
IP 地理位置 | 使用者分佈分析 |
throttle |
限流 | 防止日誌洪水 |
Buffer 緩衝機制
Buffer 是 Fluentd 可靠性的核心,用於暫存事件直到成功輸出。
Buffer 類型
| 類型 | 說明 | 適用場景 |
|---|---|---|
memory |
記憶體緩衝 | 開發測試、低延遲需求 |
file |
檔案緩衝 | 生產環境、高可靠性 |
Buffer 配置
<match **>
@type elasticsearch
<buffer tag, time>
@type file
path /var/log/fluentd/buffer/es
# Chunk 設定
chunk_limit_size 8MB # 單個 chunk 大小上限
total_limit_size 2GB # 總緩衝大小上限
# 刷新設定
flush_mode interval # lazy, interval, immediate
flush_interval 5s # 刷新間隔
flush_thread_count 2 # 刷新執行緒數
flush_at_shutdown true # 關閉時刷新
# 重試設定
retry_type exponential_backoff
retry_wait 1s # 初始重試等待
retry_max_interval 60s # 最大重試間隔
retry_timeout 72h # 重試超時
retry_forever false # 是否永遠重試
# 溢出處理
overflow_action block # throw_exception, block, drop_oldest_chunk
# 時間分片
timekey 1h # 按時間分片
timekey_wait 10m # 等待延遲資料
</buffer>
</match>
Buffer 工作流程
事件 → Chunk (staging) → Chunk (queued) → Output
│ │
│ chunk_limit_size │ flush_interval
│ 或 timekey 到期 │ 或 queue 達到閾值
│ │
└───────────────────┘
關鍵參數說明
| 參數 | 說明 | 建議值 |
|---|---|---|
chunk_limit_size |
單 chunk 大小 | 8-16MB |
total_limit_size |
總緩衝大小 | 根據磁碟空間 |
flush_interval |
刷新間隔 | 5-30s |
retry_max_interval |
最大重試間隔 | 30-60s |
overflow_action |
溢出處理 | block 或 drop_oldest_chunk |
Kubernetes 整合
DaemonSet 部署
# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: logging
labels:
app: fluentd
spec:
selector:
matchLabels:
app: fluentd
template:
metadata:
labels:
app: fluentd
spec:
serviceAccountName: fluentd
tolerations:
- key: node-role.kubernetes.io/master
effect: NoSchedule
containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8
env:
- name: FLUENT_ELASTICSEARCH_HOST
value: "elasticsearch.logging.svc.cluster.local"
- name: FLUENT_ELASTICSEARCH_PORT
value: "9200"
- name: FLUENT_ELASTICSEARCH_SCHEME
value: "http"
resources:
limits:
memory: 512Mi
requests:
cpu: 100m
memory: 200Mi
volumeMounts:
- name: varlog
mountPath: /var/log
- name: dockercontainerlogdirectory
mountPath: /var/lib/docker/containers
readOnly: true
- name: config
mountPath: /fluentd/etc/fluent.conf
subPath: fluent.conf
volumes:
- name: varlog
hostPath:
path: /var/log
- name: dockercontainerlogdirectory
hostPath:
path: /var/lib/docker/containers
- name: config
configMap:
name: fluentd-config
ConfigMap 配置
# fluentd-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: logging
data:
fluent.conf: |
# 收集容器日誌
<source>
@type tail
path /var/log/containers/*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
read_from_head true
<parse>
@type json
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
# 解析 Kubernetes 元資料
<filter kubernetes.**>
@type kubernetes_metadata
@id filter_kube_metadata
</filter>
# 輸出到 Elasticsearch
<match kubernetes.**>
@type elasticsearch
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME']}"
logstash_format true
logstash_prefix kubernetes
include_tag_key true
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.buffer
flush_mode interval
flush_interval 5s
retry_type exponential_backoff
retry_forever true
</buffer>
</match>
RBAC 配置
# fluentd-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: fluentd
namespace: logging
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: fluentd
rules:
- apiGroups: [""]
resources: ["pods", "namespaces"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: fluentd
roleRef:
kind: ClusterRole
name: fluentd
apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
name: fluentd
namespace: logging
Fluent Bit vs Fluentd
比較表
| 特性 | Fluent Bit | Fluentd |
|---|---|---|
| 語言 | C | Ruby + C |
| 記憶體使用 | ~450KB | ~40MB |
| 插件數量 | ~80 | ~500+ |
| 定位 | 輕量級收集器 | 功能完整的處理器 |
| 適用場景 | 邊緣節點、IoT、容器 | 聚合器、複雜處理 |
| 處理能力 | 基本處理 | 複雜轉換、路由 |
架構建議
┌─────────────────────────────────────────────────────────────┐
│ Kubernetes Nodes │
├─────────┬─────────┬─────────┬─────────┬─────────┬─────────┤
│ Node 1 │ Node 2 │ Node 3 │ Node 4 │ Node 5 │ Node 6 │
│Fluent │Fluent │Fluent │Fluent │Fluent │Fluent │
│ Bit │ Bit │ Bit │ Bit │ Bit │ Bit │
└────┬────┴────┬────┴────┬────┴────┬────┴────┬────┴────┬────┘
│ │ │ │ │ │
└─────────┴─────────┴────┬────┴─────────┴─────────┘
│ forward
▼
┌─────────────────┐
│ Fluentd │
│ (聚合/處理) │
└────────┬────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
Elasticsearch S3 Kafka
Fluent Bit 配置範例
# fluent-bit.conf
[SERVICE]
Flush 1
Log_Level info
Daemon off
Parsers_File parsers.conf
[INPUT]
Name tail
Path /var/log/containers/*.log
Parser docker
Tag kube.*
Refresh_Interval 5
Mem_Buf_Limit 5MB
[FILTER]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
[OUTPUT]
Name forward
Match *
Host fluentd-aggregator
Port 24224
實戰範例
範例 1:Web 應用日誌收集
# 收集 Nginx 存取日誌和應用程式日誌
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/fluentd/nginx-access.pos
tag nginx.access
<parse>
@type nginx
</parse>
</source>
<source>
@type tail
path /var/log/myapp/*.log
pos_file /var/log/fluentd/myapp.pos
tag myapp.log
<parse>
@type json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%L%z
</parse>
</source>
# 添加通用欄位
<filter **>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
environment "#{ENV['RAILS_ENV'] || 'production'}"
</record>
</filter>
# 分離錯誤日誌
<match myapp.log>
@type rewrite_tag_filter
<rule>
key level
pattern /^(error|fatal)$/
tag myapp.error
</rule>
<rule>
key level
pattern /.*/
tag myapp.info
</rule>
</match>
# 錯誤日誌發送告警
<match myapp.error>
@type copy
<store>
@type elasticsearch
host es.example.com
index_name myapp-errors
</store>
<store>
@type slack
webhook_url https://hooks.slack.com/services/xxx
channel alerts
username fluentd
</store>
</match>
# 一般日誌存儲
<match {nginx.**,myapp.info}>
@type elasticsearch
host es.example.com
logstash_format true
logstash_prefix logs
<buffer>
@type file
path /var/log/fluentd/buffer/es
flush_interval 10s
</buffer>
</match>
範例 2:多租戶日誌隔離
# 根據 Kubernetes namespace 路由到不同索引
<source>
@type tail
path /var/log/containers/*.log
pos_file /var/log/fluentd/containers.pos
tag kubernetes.*
<parse>
@type json
</parse>
</source>
<filter kubernetes.**>
@type kubernetes_metadata
</filter>
# 根據 namespace 重寫 tag
<match kubernetes.**>
@type rewrite_tag_filter
<rule>
key $.kubernetes.namespace_name
pattern /^(.+)$/
tag tenant.$1
</rule>
</match>
# 為每個租戶創建獨立索引
<match tenant.**>
@type elasticsearch
host es.example.com
# 動態索引名稱
index_name ${tag_parts[1]}-logs
<buffer tag, time>
@type file
path /var/log/fluentd/buffer/${tag}
timekey 1h
flush_interval 30s
</buffer>
</match>
範例 3:日誌歸檔到 S3
# 即時分析 + 長期歸檔
<match myapp.**>
@type copy
# 即時:發送到 Elasticsearch
<store>
@type elasticsearch
host es.example.com
logstash_format true
<buffer>
@type file
path /var/log/fluentd/buffer/es
flush_interval 5s
</buffer>
</store>
# 歸檔:發送到 S3
<store>
@type s3
aws_key_id "#{ENV['AWS_ACCESS_KEY_ID']}"
aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
s3_bucket myapp-logs-archive
s3_region ap-northeast-1
path logs/%Y/%m/%d/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
<buffer time>
@type file
path /var/log/fluentd/buffer/s3
timekey 1h # 每小時一個檔案
timekey_wait 10m # 等待延遲資料
chunk_limit_size 256m
</buffer>
<format>
@type json
</format>
</store>
</match>
效能調優
多 Worker 模式
<system>
workers 4 # 啟用 4 個 worker 程序
root_dir /var/log/fluentd # 工作目錄
</system>
# Worker 特定配置
<worker 0>
<source>
@type tail
path /var/log/app1/*.log
tag app1
</source>
</worker>
<worker 1>
<source>
@type tail
path /var/log/app2/*.log
tag app2
</source>
</worker>
效能參數調優
<system>
# 日誌設定
log_level warn # 減少日誌輸出
suppress_repeated_stacktrace true
# 效能設定
workers 4
# 記憶體限制
<root_agent>
emit_error_log_interval 30s
</root_agent>
</system>
<source>
@type tail
path /var/log/app/*.log
# 效能調優
read_lines_limit 1000 # 每次讀取行數
read_bytes_limit_per_second 10485760 # 10MB/s 限速
<parse>
@type json
</parse>
</source>
<match **>
@type elasticsearch
<buffer>
@type file
path /var/log/fluentd/buffer
# 緩衝效能
chunk_limit_size 16MB
queue_limit_length 256
flush_thread_count 4 # 多執行緒刷新
flush_interval 5s
# 壓縮
compress gzip
</buffer>
</match>
資源需求估算
| 日誌量 | CPU | 記憶體 | 建議配置 |
|---|---|---|---|
| <1GB/天 | 0.5 core | 512MB | 單 worker |
| 1-10GB/天 | 1 core | 1GB | 2 workers |
| 10-50GB/天 | 2 cores | 2GB | 4 workers |
| >50GB/天 | 4+ cores | 4GB+ | 多節點聚合 |
最佳實踐
1. 可靠性設計
# ✅ 推薦:使用檔案緩衝 + 重試
<match **>
@type elasticsearch
<buffer>
@type file # 使用檔案緩衝
path /var/log/fluentd/buffer
flush_at_shutdown true # 關閉時刷新
retry_forever true # 永遠重試
retry_type exponential_backoff
</buffer>
</match>
# ❌ 不推薦:記憶體緩衝無重試
<match **>
@type elasticsearch
<buffer>
@type memory
# 程序崩潰會丟失資料
</buffer>
</match>
2. 標籤設計
# ✅ 推薦:層級化標籤
# 格式:{環境}.{應用}.{類型}
production.myapp.access
production.myapp.error
staging.myapp.access
# 方便路由
<match production.**> # 所有生產環境
<match **.error> # 所有錯誤日誌
<match production.myapp.*> # 生產環境的 myapp
# ❌ 不推薦:扁平標籤
myapp_production_access
myapp_production_error
3. 避免日誌洪水
# 限制日誌速率
<filter myapp.**>
@type throttle
group_key kubernetes.pod_name
group_bucket_period_s 60
group_bucket_limit 1000
group_reset_rate_s 100
</filter>
# 取樣高頻日誌
<filter debug.**>
@type sampling
sample_rate 10 # 只保留 10% 的日誌
</filter>
4. 安全配置
# 加密傳輸
<source>
@type forward
port 24224
<transport tls>
cert_path /etc/fluentd/certs/server.crt
private_key_path /etc/fluentd/certs/server.key
ca_path /etc/fluentd/certs/ca.crt
</transport>
<security>
self_hostname aggregator.example.com
shared_key ${FLUENTD_SHARED_KEY}
</security>
</source>
# 敏感資料遮罩
<filter **>
@type record_transformer
enable_ruby true
<record>
message ${record["message"].gsub(/password=\S+/, 'password=***')}
message ${record["message"].gsub(/\b\d{16}\b/, '****-****-****-****')}
</record>
</filter>
常見問題
問題 1:日誌丟失
症狀:部分日誌沒有到達目的地
原因:
- 緩衝溢出
- 程序異常終止
- 網路問題
解決方案:
<buffer>
@type file # 使用檔案緩衝
path /var/log/fluentd/buffer
total_limit_size 5GB # 增加緩衝大小
overflow_action block # 溢出時阻塞而非丟棄
flush_at_shutdown true # 關閉時刷新
retry_forever true # 永遠重試
</buffer>
問題 2:記憶體使用過高
症狀:Fluentd 消耗大量記憶體
原因:
- 使用記憶體緩衝
- chunk 太大
- 積壓太多事件
解決方案:
<buffer>
@type file # 改用檔案緩衝
chunk_limit_size 8MB # 限制 chunk 大小
queue_limit_length 128 # 限制佇列長度
compress gzip # 壓縮緩衝資料
</buffer>
問題 3:日誌延遲
症狀:日誌到達目的地有延遲
原因:
- flush_interval 太長
- 緩衝積壓
- 下游處理慢
解決方案:
<buffer>
flush_mode immediate # 立即刷新
# 或
flush_interval 1s # 縮短刷新間隔
flush_thread_count 4 # 增加刷新執行緒
</buffer>
問題 4:pos_file 錯誤
症狀:重啟後重複讀取或跳過日誌
原因:
- pos_file 路徑無權限
- pos_file 被刪除
- 多個 source 共用 pos_file
解決方案:
<source>
@type tail
path /var/log/app/*.log
# 每個 source 獨立的 pos_file
pos_file /var/log/fluentd/app.log.pos
# 確保目錄存在且有權限
# mkdir -p /var/log/fluentd && chown fluentd:fluentd /var/log/fluentd
</source>
問題 5:Kubernetes 元資料缺失
症狀:日誌沒有 Pod、Namespace 等資訊
原因:
- kubernetes_metadata 插件未安裝
- RBAC 權限不足
- API Server 連接問題
解決方案:
# 安裝插件
fluent-gem install fluent-plugin-kubernetes_metadata_filter
# 檢查 RBAC
kubectl auth can-i get pods --as=system:serviceaccount:logging:fluentd
# 配置
<filter kubernetes.**>
@type kubernetes_metadata
@log_level debug # 開啟除錯日誌
skip_labels false
skip_container_metadata false
</filter>
總結
核心要點
Fluentd = Input + Filter + Buffer + Output
關鍵優勢:
- ✅ 統一日誌格式,解耦生產者和消費者
- ✅ 豐富的插件生態系統
- ✅ 可靠的緩衝和重試機制
- ✅ 原生支援 Kubernetes 和雲原生環境
架構選擇指南
| 場景 | 推薦架構 |
|---|---|
| 單機應用 | Fluentd 直接收集 |
| Kubernetes | Fluent Bit DaemonSet + Fluentd 聚合器 |
| 大規模叢集 | Fluent Bit → Kafka → Fluentd → 儲存 |
| 邊緣/IoT | Fluent Bit 輕量收集 |
配置速查
# 基本結構
<source> # 輸入
@type tail
path /var/log/*.log
tag myapp
</source>
<filter myapp> # 過濾
@type record_transformer
</filter>
<match myapp> # 輸出
@type elasticsearch
<buffer>
@type file
</buffer>
</match>
常用插件速查
| 功能 | Input | Output | Filter |
|---|---|---|---|
| 檔案 | tail | file | - |
| 轉發 | forward | forward | - |
| ES | - | elasticsearch | - |
| Kafka | kafka | kafka2 | - |
| 修改 | - | - | record_transformer |
| 過濾 | - | - | grep |
參考資源
- 官方文件:https://docs.fluentd.org/
- GitHub:https://github.com/fluent/fluentd
- 插件列表:https://www.fluentd.org/plugins
- Fluent Bit:https://fluentbit.io/
- Kubernetes 整合:https://github.com/fluent/fluentd-kubernetes-daemonset
建立日期:2025-12-01 最後更新:2025-12-01