Java 多執行緒完全指南

深入理解 Java 多執行緒的核心概念、Thread 使用方式、同步機制與併發程式設計實踐


多執行緒是什麼?

多執行緒(Multithreading) 是讓程式同時執行多個任務的技術。

核心概念

單執行緒:
任務 A → 任務 B → 任務 C
(依序執行,必須等待)

多執行緒:
任務 A ↘
任務 B → 同時執行
任務 C ↗

優點:
✅ 提升效能(充分利用 CPU)
✅ 改善響應速度
✅ 處理並發請求

Process vs Thread

Process(程序):
├── 獨立的記憶體空間
├── 獨立的資源
├── 程序間通訊較複雜
└── 建立成本高

Thread(執行緒):
├── 共享同一個程序的記憶體
├── 共享資源(變數、物件)
├── 執行緒間通訊容易
└── 建立成本低

一個 Process 可以有多個 Thread

Java Thread 基礎

建立執行緒的方式

方法 1:繼承 Thread 類別

// 定義執行緒類別
class MyThread extends Thread {
    @Override
    public void run() {
        // 執行緒要執行的程式碼
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + ": " + i);
            try {
                Thread.sleep(1000);  // 休眠 1 秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 使用執行緒
public class Main {
    public static void main(String[] args) {
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();

        t1.start();  // 啟動執行緒(不是 run())
        t2.start();
    }
}

// 輸出(順序可能不同):
// Thread-0: 0
// Thread-1: 0
// Thread-0: 1
// Thread-1: 1
// ...

方法 2:實作 Runnable 介面(推薦)

// 定義 Runnable
class MyTask implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + ": " + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 使用
public class Main {
    public static void main(String[] args) {
        MyTask task = new MyTask();

        Thread t1 = new Thread(task, "Worker-1");
        Thread t2 = new Thread(task, "Worker-2");

        t1.start();
        t2.start();
    }
}

// 為什麼推薦 Runnable?
// ✅ Java 是單一繼承,用 Runnable 不佔用繼承位置
// ✅ 更符合物件導向設計(分離任務和執行)
// ✅ 可以重複使用同一個 Runnable

方法 3:使用 Lambda(Java 8+)

public class Main {
    public static void main(String[] args) {
        // 直接用 Lambda 建立 Runnable
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + ": " + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t1.start();
    }
}

// Lambda 最簡潔!

方法 4:使用 Callable 和 Future

import java.util.concurrent.*;

// Callable 可以返回結果
class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            sum += i;
            Thread.sleep(10);
        }
        return sum;
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 提交 Callable 任務
        Future<Integer> future = executor.submit(new MyCallable());

        // 做其他事情...
        System.out.println("等待結果...");

        // 取得結果(會阻塞直到完成)
        Integer result = future.get();
        System.out.println("結果: " + result);  // 5050

        executor.shutdown();
    }
}

// Callable vs Runnable
// Callable: 可以返回結果,可以拋出異常
// Runnable: 無返回值,不能拋出 checked exception

Thread 生命週期

執行緒狀態

NEW (新建)
  ↓ start()
RUNNABLE (可執行)
  ↓ 獲得 CPU
RUNNING (執行中)
  ├→ sleep()/wait() → BLOCKED/WAITING
  ├→ I/O 操作      → BLOCKED
  ├→ join()        → WAITING
  └→ 完成          → TERMINATED

狀態轉換圖

┌─────────┐
│   NEW   │ 新建執行緒
└────┬────┘
     │ start()
┌─────────┐
│RUNNABLE │ 等待 CPU 排程
└────┬────┘
     │ CPU 排程
┌─────────┐
│ RUNNING │ 執行中
└────┬────┘
     ├→ sleep(ms)    ┌──────────┐
     ├→ wait()      →│  WAITING │ 等待狀態
     ├→ join()       └──────────┘
     │                     ↓ notify()/notifyAll()
     ├→ I/O         ┌──────────┐
     ├→ synchronized→│  BLOCKED │ 阻塞狀態
     │               └──────────┘
     │                     ↓ 獲得鎖/I/O 完成
     └→ 完成        ┌──────────┐
                   →│TERMINATED│ 終止
                    └──────────┘

執行緒狀態範例

public class ThreadStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                System.out.println("Thread 開始執行");
                Thread.sleep(2000);
                System.out.println("Thread 結束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("NEW: " + t.getState());        // NEW

        t.start();
        System.out.println("RUNNABLE: " + t.getState());   // RUNNABLE

        Thread.sleep(100);
        System.out.println("TIMED_WAITING: " + t.getState()); // TIMED_WAITING

        t.join();  // 等待執行緒結束
        System.out.println("TERMINATED: " + t.getState()); // TERMINATED
    }
}

Thread 常用方法

基本方法

// 啟動執行緒
thread.start();

// 取得執行緒名稱
String name = Thread.currentThread().getName();

// 設定執行緒名稱
thread.setName("MyThread");

// 休眠(毫秒)
Thread.sleep(1000);  // 休眠 1 秒

// 讓出 CPU(給其他執行緒機會)
Thread.yield();

// 等待執行緒結束
thread.join();       // 無限等待
thread.join(5000);   // 最多等 5 秒

// 中斷執行緒
thread.interrupt();

// 檢查是否被中斷
boolean isInterrupted = thread.isInterrupted();

// 檢查是否還活著
boolean isAlive = thread.isAlive();

// 設定為 Daemon 執行緒(背景執行緒)
thread.setDaemon(true);

sleep() vs wait()

// sleep()
// - Thread 的靜態方法
// - 不釋放鎖
// - 時間到自動醒來
Thread.sleep(1000);

// wait()
// - Object 的方法
// - 釋放鎖
// - 需要 notify()/notifyAll() 喚醒
synchronized (obj) {
    obj.wait();
}

// 範例:sleep 不釋放鎖
class SleepDemo {
    public static void main(String[] args) {
        Object lock = new Object();

        new Thread(() -> {
            synchronized (lock) {
                System.out.println("T1: 獲得鎖");
                try {
                    Thread.sleep(3000);  // 不釋放鎖!
                } catch (InterruptedException e) {}
                System.out.println("T1: 釋放鎖");
            }
        }).start();

        new Thread(() -> {
            synchronized (lock) {
                System.out.println("T2: 獲得鎖");
            }
        }).start();
    }
}
// 輸出:
// T1: 獲得鎖
// (等待 3 秒)
// T1: 釋放鎖
// T2: 獲得鎖

join() 方法

public class JoinDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            System.out.println("T1 開始");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {}
            System.out.println("T1 結束");
        });

        Thread t2 = new Thread(() -> {
            System.out.println("T2 開始");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {}
            System.out.println("T2 結束");
        });

        t1.start();
        t2.start();

        t1.join();  // 等待 t1 結束
        t2.join();  // 等待 t2 結束

        System.out.println("主執行緒結束");
    }
}

// 輸出:
// T1 開始
// T2 開始
// T2 結束
// T1 結束
// 主執行緒結束

執行緒同步(Synchronization)

為什麼需要同步?

// 問題:多執行緒存取共享變數
class Counter {
    private int count = 0;

    public void increment() {
        count++;  // 不是原子操作!
    }

    public int getCount() {
        return count;
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();

        // 建立 1000 個執行緒,每個都 +1
        Thread[] threads = new Thread[1000];
        for (int i = 0; i < 1000; i++) {
            threads[i] = new Thread(() -> counter.increment());
            threads[i].start();
        }

        // 等待所有執行緒結束
        for (Thread t : threads) {
            t.join();
        }

        System.out.println("Count: " + counter.getCount());
        // 預期:1000
        // 實際:可能是 987, 992, 995... (小於 1000)
    }
}

// 原因:count++ 不是原子操作
// 實際上是:
// 1. 讀取 count
// 2. +1
// 3. 寫回 count
// 多執行緒可能同時讀到同一個值!

synchronized 關鍵字

1. 同步方法

class Counter {
    private int count = 0;

    // 方法加上 synchronized
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

// synchronized 方法等於:
public void increment() {
    synchronized (this) {
        count++;
    }
}

2. 同步區塊

class Counter {
    private int count = 0;
    private Object lock = new Object();

    public void increment() {
        // 只同步關鍵區域
        synchronized (lock) {
            count++;
        }
    }
}

// 優點:
// ✅ 可以選擇不同的鎖物件
// ✅ 同步範圍更小,效能更好

3. 靜態同步方法

class Counter {
    private static int count = 0;

    // 鎖的是 Class 物件
    public static synchronized void increment() {
        count++;
    }
}

// 等於:
public static void increment() {
    synchronized (Counter.class) {
        count++;
    }
}

synchronized 範例

// 銀行帳戶範例
class BankAccount {
    private int balance = 1000;

    public synchronized void withdraw(int amount) {
        if (balance >= amount) {
            System.out.println(Thread.currentThread().getName() +
                             " 準備提款: " + amount);

            // 模擬處理時間
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {}

            balance -= amount;
            System.out.println(Thread.currentThread().getName() +
                             " 完成提款,餘額: " + balance);
        } else {
            System.out.println(Thread.currentThread().getName() +
                             " 餘額不足");
        }
    }

    public synchronized int getBalance() {
        return balance;
    }
}

public class Main {
    public static void main(String[] args) {
        BankAccount account = new BankAccount();

        // 兩人同時提款
        Thread t1 = new Thread(() -> account.withdraw(600), "小明");
        Thread t2 = new Thread(() -> account.withdraw(600), "小華");

        t1.start();
        t2.start();
    }
}

// 有 synchronized:
// 小明 準備提款: 600
// 小明 完成提款,餘額: 400
// 小華 餘額不足

// 沒有 synchronized:
// 小明 準備提款: 600
// 小華 準備提款: 600
// 小明 完成提款,餘額: 400
// 小華 完成提款,餘額: -200  ← 錯誤!

死鎖(Deadlock)

什麼是死鎖?

死鎖:兩個或多個執行緒互相等待對方釋放資源,導致永久阻塞

情境:
執行緒 A:持有鎖 1,等待鎖 2
執行緒 B:持有鎖 2,等待鎖 1
→ 雙方都在等待,永遠無法繼續

死鎖範例

public class DeadlockDemo {
    private static Object lock1 = new Object();
    private static Object lock2 = new Object();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock1) {
                System.out.println("T1: 獲得 lock1");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {}

                System.out.println("T1: 等待 lock2");
                synchronized (lock2) {
                    System.out.println("T1: 獲得 lock2");
                }
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock2) {
                System.out.println("T2: 獲得 lock2");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {}

                System.out.println("T2: 等待 lock1");
                synchronized (lock1) {
                    System.out.println("T2: 獲得 lock1");
                }
            }
        });

        t1.start();
        t2.start();
    }
}

// 輸出:
// T1: 獲得 lock1
// T2: 獲得 lock2
// T1: 等待 lock2
// T2: 等待 lock1
// (程式卡住,永遠不會結束)

避免死鎖

// 方法 1:總是以相同順序獲取鎖
public class NoDeadlock {
    private static Object lock1 = new Object();
    private static Object lock2 = new Object();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock1) {  // 先 lock1
                synchronized (lock2) {  // 再 lock2
                    System.out.println("T1 完成");
                }
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock1) {  // 先 lock1(順序相同)
                synchronized (lock2) {  // 再 lock2
                    System.out.println("T2 完成");
                }
            }
        });

        t1.start();
        t2.start();
    }
}

// 方法 2:使用 tryLock(Lock 介面)
Lock lock1 = new ReentrantLock();
Lock lock2 = new ReentrantLock();

if (lock1.tryLock()) {
    try {
        if (lock2.tryLock()) {
            try {
                // 同時獲得兩個鎖
            } finally {
                lock2.unlock();
            }
        }
    } finally {
        lock1.unlock();
    }
}

// 方法 3:設定超時
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
    // ...
}

wait() 和 notify()

生產者-消費者模式

import java.util.LinkedList;
import java.util.Queue;

class SharedBuffer {
    private Queue<Integer> buffer = new LinkedList<>();
    private int capacity = 5;

    // 生產
    public synchronized void produce(int value) throws InterruptedException {
        // 緩衝區滿了,等待
        while (buffer.size() == capacity) {
            System.out.println("緩衝區滿,生產者等待");
            wait();  // 釋放鎖,等待消費者通知
        }

        buffer.add(value);
        System.out.println("生產: " + value + " (目前: " + buffer.size() + ")");

        notifyAll();  // 通知消費者
    }

    // 消費
    public synchronized int consume() throws InterruptedException {
        // 緩衝區空了,等待
        while (buffer.isEmpty()) {
            System.out.println("緩衝區空,消費者等待");
            wait();  // 釋放鎖,等待生產者通知
        }

        int value = buffer.poll();
        System.out.println("消費: " + value + " (目前: " + buffer.size() + ")");

        notifyAll();  // 通知生產者
        return value;
    }
}

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        SharedBuffer buffer = new SharedBuffer();

        // 生產者執行緒
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    buffer.produce(i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 消費者執行緒
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    buffer.consume();
                    Thread.sleep(1000);  // 消費慢一點
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

// 輸出範例:
// 生產: 1 (目前: 1)
// 生產: 2 (目前: 2)
// 消費: 1 (目前: 1)
// 生產: 3 (目前: 2)
// 生產: 4 (目前: 3)
// 消費: 2 (目前: 2)
// ...

wait() vs notify() vs notifyAll()

wait()
- 釋放鎖
- 進入等待狀態
- 需要在 synchronized 區塊內呼叫

notify()
- 隨機喚醒一個等待的執行緒
- 不釋放鎖(要等 synchronized 區塊結束)

notifyAll()
- 喚醒所有等待的執行緒
- 推薦使用(避免訊號遺失)

Lock 介面(java.util.concurrent.locks)

ReentrantLock

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Counter {
    private int count = 0;
    private Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();  // 獲取鎖
        try {
            count++;
        } finally {
            lock.unlock();  // 一定要在 finally 釋放鎖!
        }
    }

    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

ReentrantLock vs synchronized

// synchronized 優點:
// ✅ 語法簡單
// ✅ 自動釋放鎖
// ✅ JVM 優化好

// ReentrantLock 優點:
// ✅ tryLock()(嘗試獲取鎖)
// ✅ lockInterruptibly()(可中斷)
// ✅ 公平鎖(fairness)
// ✅ 多個 Condition

// tryLock 範例
Lock lock = new ReentrantLock();

if (lock.tryLock()) {
    try {
        // 獲得鎖,執行
    } finally {
        lock.unlock();
    }
} else {
    // 沒獲得鎖,做其他事
    System.out.println("無法獲得鎖,稍後重試");
}

// tryLock 帶超時
if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        // 獲得鎖
    } finally {
        lock.unlock();
    }
} else {
    // 1 秒內沒獲得鎖
}

// 公平鎖(先等待的先獲得)
Lock fairLock = new ReentrantLock(true);

ReadWriteLock

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Cache {
    private Map<String, String> data = new HashMap<>();
    private ReadWriteLock rwLock = new ReentrantReadWriteLock();

    // 讀取(允許多執行緒同時讀)
    public String get(String key) {
        rwLock.readLock().lock();
        try {
            return data.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // 寫入(獨佔)
    public void put(String key, String value) {
        rwLock.writeLock().lock();
        try {
            data.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

// 特點:
// ✅ 多個執行緒可以同時讀
// ✅ 寫入時獨佔(其他讀寫都阻塞)
// ✅ 適合讀多寫少的情境

執行緒池(Thread Pool)

為什麼需要執行緒池?

問題:
每次建立執行緒 → 成本高
頻繁建立/銷毀 → 浪費資源

解決:
執行緒池 → 重複使用執行緒
管理執行緒數量 → 避免過多執行緒

ExecutorService

import java.util.concurrent.*;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 建立固定大小的執行緒池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 提交 10 個任務
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " 執行於 " +
                                 Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {}
            });
        }

        // 關閉執行緒池
        executor.shutdown();  // 不接受新任務,等待已提交的完成

        try {
            // 等待所有任務完成
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();  // 強制關閉
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}

// 輸出:
// Task 1 執行於 pool-1-thread-1
// Task 2 執行於 pool-1-thread-2
// Task 3 執行於 pool-1-thread-3
// Task 4 執行於 pool-1-thread-4
// Task 5 執行於 pool-1-thread-5
// Task 6 執行於 pool-1-thread-1  ← 重複使用
// ...

執行緒池類型

// 1. Fixed Thread Pool(固定大小)
ExecutorService fixed = Executors.newFixedThreadPool(10);
// - 固定 10 個執行緒
// - 適合負載穩定的場景

// 2. Cached Thread Pool(快取)
ExecutorService cached = Executors.newCachedThreadPool();
// - 根據需要建立執行緒
// - 閒置 60 秒自動回收
// - 適合大量短期任務

// 3. Single Thread Executor(單一執行緒)
ExecutorService single = Executors.newSingleThreadExecutor();
// - 只有一個執行緒
// - 保證任務依序執行
// - 適合需要順序執行的場景

// 4. Scheduled Thread Pool(排程)
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);
// - 可以延遲執行
// - 可以定期執行

ScheduledExecutorService 範例

import java.util.concurrent.*;

public class ScheduledDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // 延遲 3 秒執行
        scheduler.schedule(() -> {
            System.out.println("3 秒後執行");
        }, 3, TimeUnit.SECONDS);

        // 延遲 1 秒後,每 2 秒執行一次
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("定期執行: " + System.currentTimeMillis());
        }, 1, 2, TimeUnit.SECONDS);

        // 讓程式執行 10 秒後關閉
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {}

        scheduler.shutdown();
    }
}

// 輸出:
// 3 秒後執行
// 定期執行: 1699876543210
// 定期執行: 1699876545210
// 定期執行: 1699876547210
// ...

自訂 ThreadPoolExecutor

import java.util.concurrent.*;

public class CustomThreadPool {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,                      // corePoolSize:核心執行緒數
            10,                     // maximumPoolSize:最大執行緒數
            60,                     // keepAliveTime:閒置存活時間
            TimeUnit.SECONDS,       // 時間單位
            new ArrayBlockingQueue<>(100),  // 工作佇列
            new ThreadPoolExecutor.CallerRunsPolicy()  // 拒絕策略
        );

        // 使用執行緒池
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {}
            });
        }

        executor.shutdown();
    }
}

// 參數說明:
// corePoolSize:常駐執行緒數
// maximumPoolSize:最大執行緒數(負載高時擴展)
// keepAliveTime:超過 core 的執行緒閒置多久後回收
// workQueue:任務佇列
// rejectedExecutionHandler:任務拒絕策略

// 拒絕策略:
// AbortPolicy:拋出異常(預設)
// CallerRunsPolicy:由呼叫者執行
// DiscardPolicy:靜默丟棄
// DiscardOldestPolicy:丟棄最舊的任務

原子類別(Atomic Classes)

AtomicInteger

import java.util.concurrent.atomic.AtomicInteger;

class Counter {
    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
        count.incrementAndGet();  // 原子操作
    }

    public int getCount() {
        return count.get();
    }
}

public class AtomicDemo {
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();

        // 1000 個執行緒,每個 +1
        Thread[] threads = new Thread[1000];
        for (int i = 0; i < 1000; i++) {
            threads[i] = new Thread(() -> counter.increment());
            threads[i].start();
        }

        for (Thread t : threads) {
            t.join();
        }

        System.out.println("Count: " + counter.getCount());  // 一定是 1000!
    }
}

// Atomic 類別:
// AtomicInteger
// AtomicLong
// AtomicBoolean
// AtomicReference

// 常用方法:
int get()                 // 取得值
void set(int newValue)    // 設定值
int getAndIncrement()     // get 然後 +1
int incrementAndGet()     // +1 然後 get
int getAndSet(int newValue)  // 取得舊值並設定新值
boolean compareAndSet(int expect, int update)  // CAS 操作

CAS(Compare-And-Swap)

import java.util.concurrent.atomic.AtomicInteger;

public class CASDemo {
    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(10);

        // CAS: 如果值是 10,就改成 20
        boolean success = value.compareAndSet(10, 20);
        System.out.println("成功: " + success);  // true
        System.out.println("值: " + value.get());  // 20

        // 再次 CAS: 如果值是 10,就改成 30
        success = value.compareAndSet(10, 30);
        System.out.println("成功: " + success);  // false(值已經不是 10 了)
        System.out.println("值: " + value.get());  // 20
    }
}

// CAS 是無鎖(Lock-Free)演算法
// 原理:
// 1. 讀取記憶體值 A
// 2. 準備寫入新值 B
// 3. 寫入前檢查記憶體值是否還是 A
// 4. 如果是,寫入 B;否則重試

並發集合(Concurrent Collections)

ConcurrentHashMap

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentMapDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // 多執行緒安全地操作
        map.put("key1", 100);
        map.putIfAbsent("key2", 200);  // 不存在才 put

        // 原子操作
        map.compute("key1", (k, v) -> v + 1);  // 100 → 101
        map.merge("key2", 50, Integer::sum);   // 200 + 50 = 250

        System.out.println(map);
    }
}

// ConcurrentHashMap vs Hashtable vs synchronizedMap
// Hashtable:整個方法加鎖(效能差)
// synchronizedMap:整個 Map 加鎖(效能差)
// ConcurrentHashMap:分段鎖(效能好)

CopyOnWriteArrayList

import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

        list.add("A");
        list.add("B");

        // 迭代時不會拋出 ConcurrentModificationException
        for (String item : list) {
            System.out.println(item);
            list.add("C");  // 可以在迭代中修改
        }

        System.out.println(list);
    }
}

// CopyOnWriteArrayList 特點:
// ✅ 讀取不加鎖(效能好)
// ✅ 寫入時複製整個陣列
// ❌ 寫入效能差(適合讀多寫少)
// ❌ 記憶體佔用大

BlockingQueue

import java.util.concurrent.*;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // 生產者
        new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i);  // 滿了會阻塞
                    System.out.println("生產: " + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {}
        }).start();

        // 消費者
        new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take();  // 空了會阻塞
                    System.out.println("消費: " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {}
        }).start();
    }
}

// BlockingQueue 實作:
// ArrayBlockingQueue:固定大小
// LinkedBlockingQueue:可選大小
// PriorityBlockingQueue:優先權佇列
// DelayQueue:延遲佇列
// SynchronousQueue:無容量(直接交接)

CompletableFuture(非同步編程)

基本用法

import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // 非同步執行
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("非同步任務執行於: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {}
            return "Hello";
        });

        System.out.println("主執行緒繼續執行...");

        // 取得結果(阻塞)
        String result = future.get();
        System.out.println("結果: " + result);
    }
}

鏈式操作

import java.util.concurrent.CompletableFuture;

public class CompletableFutureChain {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("步驟 1");
                return "Hello";
            })
            .thenApply(s -> {
                System.out.println("步驟 2");
                return s + " World";
            })
            .thenApply(s -> {
                System.out.println("步驟 3");
                return s.toUpperCase();
            });

        System.out.println(future.get());  // HELLO WORLD
    }
}

// 常用方法:
// thenApply:轉換結果
// thenAccept:消費結果(無返回值)
// thenRun:執行動作(不關心結果)
// thenCompose:組合 Future
// thenCombine:合併兩個 Future

組合多個 Future

import java.util.concurrent.CompletableFuture;

public class CombineFuture {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return 10;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            sleep(2000);
            return 20;
        });

        // 合併結果
        CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> {
            return a + b;
        });

        System.out.println("結果: " + combined.get());  // 30

        // 等待所有完成
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
        allOf.get();  // 阻塞直到全部完成

        // 任一完成
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
        System.out.println("最快完成: " + anyOf.get());
    }

    private static void sleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {}
    }
}

異常處理

import java.util.concurrent.CompletableFuture;

public class ExceptionHandling {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("錯誤");
                }
                return 100;
            })
            .exceptionally(ex -> {
                System.out.println("發生異常: " + ex.getMessage());
                return 0;  // 預設值
            })
            .thenApply(result -> result * 2);

        System.out.println("結果: " + future.get());

        // 或使用 handle(同時處理成功和失敗)
        CompletableFuture<Integer> future2 = CompletableFuture
            .supplyAsync(() -> 100 / 0)  // 會拋異常
            .handle((result, ex) -> {
                if (ex != null) {
                    System.out.println("異常: " + ex.getMessage());
                    return 0;
                }
                return result;
            });

        System.out.println("結果 2: " + future2.get());
    }
}

ThreadLocal

執行緒區域變數

public class ThreadLocalDemo {
    private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        // 執行緒 1
        new Thread(() -> {
            threadLocal.set(100);
            System.out.println("T1: " + threadLocal.get());  // 100
        }).start();

        // 執行緒 2
        new Thread(() -> {
            threadLocal.set(200);
            System.out.println("T2: " + threadLocal.get());  // 200
        }).start();

        // 主執行緒
        System.out.println("Main: " + threadLocal.get());  // 0(預設值)
    }
}

// ThreadLocal 特點:
// ✅ 每個執行緒有自己的副本
// ✅ 執行緒隔離
// ✅ 不需要同步
// ❌ 記憶體洩漏風險(要記得 remove())

實際應用:使用者會話

public class UserContext {
    private static ThreadLocal<User> userThreadLocal = new ThreadLocal<>();

    public static void setUser(User user) {
        userThreadLocal.set(user);
    }

    public static User getUser() {
        return userThreadLocal.get();
    }

    public static void clear() {
        userThreadLocal.remove();  // 防止記憶體洩漏
    }
}

// 在 Web 應用中使用
class RequestFilter {
    public void doFilter(HttpRequest request) {
        // 請求開始
        User user = authenticate(request);
        UserContext.setUser(user);

        try {
            // 處理請求...
            // 任何地方都可以用 UserContext.getUser() 取得使用者
        } finally {
            // 請求結束
            UserContext.clear();  // 清理,避免記憶體洩漏
        }
    }
}

多執行緒與 CPU 的關係

CPU 基礎概念

1. CPU 核心架構

現代 CPU 架構:
┌─────────────────────────────────────┐
│            CPU 晶片                  │
│  ┌──────────┐  ┌──────────┐        │
│  │  Core 0  │  │  Core 1  │        │
│  │ ┌──────┐ │  │ ┌──────┐ │        │
│  │ │Thread│ │  │ │Thread│ │        │
│  │ │  0   │ │  │ │  2   │ │        │
│  │ ├──────┤ │  │ ├──────┤ │        │
│  │ │Thread│ │  │ │Thread│ │        │
│  │ │  1   │ │  │ │  3   │ │        │
│  │ └──────┘ │  │ └──────┘ │        │
│  └──────────┘  └──────────┘        │
│                                     │
│  ┌──────────┐  ┌──────────┐        │
│  │  Core 2  │  │  Core 3  │        │
│  │ ┌──────┐ │  │ ┌──────┐ │        │
│  │ │Thread│ │  │ │Thread│ │        │
│  │ │  4   │ │  │ │  6   │ │        │
│  │ ├──────┤ │  │ ├──────┤ │        │
│  │ │Thread│ │  │ │Thread│ │        │
│  │ │  5   │ │  │ │  7   │ │        │
│  │ └──────┘ │  │ └──────┘ │        │
│  └──────────┘  └──────────┘        │
└─────────────────────────────────────┘

術語說明:
- Physical Core(實體核心):4 個
- Logical Core(邏輯核心/超執行緒):8 個
- Thread(硬體執行緒):每個核心可以有 1-2 個

2. 查看 CPU 資訊

// Java 中查看 CPU 核心數
public class CPUInfo {
    public static void main(String[] args) {
        // 可用的處理器數量(邏輯核心數)
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("可用處理器數量: " + processors);

        // 取得作業系統資訊
        OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
        System.out.println("作業系統: " + osBean.getName());
        System.out.println("架構: " + osBean.getArch());
        System.out.println("可用處理器: " + osBean.getAvailableProcessors());

        // 如果是 com.sun.management.OperatingSystemMXBean
        if (osBean instanceof com.sun.management.OperatingSystemMXBean) {
            com.sun.management.OperatingSystemMXBean sunOsBean =
                (com.sun.management.OperatingSystemMXBean) osBean;

            System.out.println("系統 CPU 負載: " +
                             sunOsBean.getSystemCpuLoad() * 100 + "%");
            System.out.println("程序 CPU 負載: " +
                             sunOsBean.getProcessCpuLoad() * 100 + "%");
        }
    }
}

// 輸出範例:
// 可用處理器數量: 8
// 作業系統: Mac OS X
// 架構: x86_64
// 可用處理器: 8
// 系統 CPU 負載: 35.2%
// 程序 CPU 負載: 12.5%
# Linux/Mac:查看 CPU 資訊
lscpu                    # 詳細 CPU 資訊
nproc                    # 邏輯核心數
grep -c ^processor /proc/cpuinfo  # 邏輯核心數(Linux)

# Mac 特定
sysctl hw.physicalcpu    # 實體核心數
sysctl hw.logicalcpu     # 邏輯核心數

# 輸出範例:
# hw.physicalcpu: 4     ← 4 個實體核心
# hw.logicalcpu: 8      ← 8 個邏輯核心(支援超執行緒)

CPU 密集 vs I/O 密集

任務類型分類

CPU 密集型(CPU-Bound):
- 特徵:大量運算,很少等待
- 範例:
  • 數學運算(加密、解密)
  • 圖像處理
  • 視頻編碼
  • 資料壓縮
  • 機器學習訓練
- CPU 使用率:高(接近 100%)
- 瓶頸:CPU 運算能力

I/O 密集型(I/O-Bound):
- 特徵:大量等待,運算較少
- 範例:
  • 資料庫查詢
  • 網路請求(HTTP API)
  • 檔案讀寫
  • 訊息佇列操作
- CPU 使用率:低(大部分時間在等待)
- 瓶頸:I/O 速度(網路、磁碟)

判斷任務類型

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class TaskTypeAnalyzer {
    public static void main(String[] args) throws Exception {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

        // 測試 CPU 密集任務
        Thread cpuTask = new Thread(() -> {
            long result = 0;
            for (long i = 0; i < 10_000_000_000L; i++) {
                result += i;
            }
        });

        long startCpu = threadBean.getCurrentThreadCpuTime();
        long startTime = System.nanoTime();

        cpuTask.start();
        cpuTask.join();

        long endCpu = threadBean.getCurrentThreadCpuTime();
        long endTime = System.nanoTime();

        long cpuTime = endCpu - startCpu;
        long wallTime = endTime - startTime;

        double cpuUsage = (double) cpuTime / wallTime * 100;
        System.out.println("CPU 使用率: " + cpuUsage + "%");

        // CPU 密集:接近 100%
        // I/O 密集:遠低於 100%(可能 5-20%)
    }
}

執行緒池大小設定原則

基本公式

CPU 密集型任務:
執行緒數 = CPU 核心數 + 1

原因:
- CPU 已經滿載,更多執行緒會增加上下文切換開銷
- +1 是為了當某個執行緒暫停時(page fault),另一個可以補上

範例(8 核心):
Thread Pool Size = 8 + 1 = 9

---

I/O 密集型任務:
執行緒數 = CPU 核心數 × (1 + I/O 時間 / CPU 時間)

或更簡化:
執行緒數 = CPU 核心數 × 2  (保守估計)
執行緒數 = CPU 核心數 × (2 到 50) (根據 I/O 等待比例)

範例(8 核心,I/O 時間佔 80%):
Thread Pool Size = 8 × (1 + 0.8 / 0.2) = 8 × 5 = 40

---

Little's Law(排隊理論):
執行緒數 = 每秒請求數 × 平均響應時間

範例(每秒 100 個請求,平均 0.5 秒完成):
Thread Pool Size = 100 × 0.5 = 50

實際設定建議

public class ThreadPoolSizeCalculator {
    public static void main(String[] args) {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // 1. CPU 密集型
        int cpuIntensivePoolSize = cpuCores + 1;
        System.out.println("CPU 密集型建議執行緒數: " + cpuIntensivePoolSize);

        // 2. I/O 密集型(保守)
        int ioIntensivePoolSize = cpuCores * 2;
        System.out.println("I/O 密集型建議執行緒數(保守): " + ioIntensivePoolSize);

        // 3. I/O 密集型(根據 I/O 比例)
        double ioWaitRatio = 0.8;  // 80% 時間在等待 I/O
        int ioIntensivePoolSize2 = (int) (cpuCores * (1 + ioWaitRatio / (1 - ioWaitRatio)));
        System.out.println("I/O 密集型建議執行緒數(計算): " + ioIntensivePoolSize2);

        // 4. 混合型(Web 應用常見)
        int mixedPoolSize = cpuCores * 4;  // 經驗值
        System.out.println("混合型建議執行緒數: " + mixedPoolSize);
    }
}

// 輸出範例(8 核心):
// CPU 密集型建議執行緒數: 9
// I/O 密集型建議執行緒數(保守): 16
// I/O 密集型建議執行緒數(計算): 40
// 混合型建議執行緒數: 32

執行緒池參數詳細設定

ThreadPoolExecutor 完整參數

import java.util.concurrent.*;

public class ThreadPoolConfiguration {
    public static void main(String[] args) {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // 情境 1:CPU 密集型(圖像處理、加密)
        ThreadPoolExecutor cpuBoundPool = new ThreadPoolExecutor(
            cpuCores,                    // corePoolSize:核心執行緒數
            cpuCores + 1,                // maximumPoolSize:最大執行緒數
            60L,                         // keepAliveTime:閒置存活時間
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),  // 有界佇列
            new ThreadPoolExecutor.CallerRunsPolicy()  // 拒絕策略
        );

        // 情境 2:I/O 密集型(資料庫、網路)
        ThreadPoolExecutor ioBoundPool = new ThreadPoolExecutor(
            cpuCores * 2,                // corePoolSize
            cpuCores * 4,                // maximumPoolSize
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),  // 較大佇列
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 情境 3:Web 應用(混合型)
        ThreadPoolExecutor webAppPool = new ThreadPoolExecutor(
            10,                          // 最少 10 個執行緒(應對突發)
            cpuCores * 2,                // 最多不超過 CPU * 2
            120L,                        // 較長的閒置時間
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(500),  // 固定大小佇列
            new ThreadFactory() {
                private int count = 0;
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "WebApp-Worker-" + count++);
                    t.setDaemon(false);
                    return t;
                }
            },
            new ThreadPoolExecutor.AbortPolicy()  // 超過負載直接拒絕
        );

        // 情境 4:定時任務
        ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(
            5,  // 5 個核心執行緒足夠
            new ThreadFactory() {
                private int count = 0;
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "Scheduled-Worker-" + count++);
                    t.setDaemon(true);  // Daemon 執行緒
                    return t;
                }
            }
        );
    }
}

參數設定原則

corePoolSize(核心執行緒數):
✅ 設定為最小並發需求
✅ 這些執行緒會一直存活
✅ 即使閒置也不會回收(除非設定 allowCoreThreadTimeOut)

建議:
- CPU 密集:CPU 核心數
- I/O 密集:CPU 核心數 × 2
- Web 應用:10 - 50(根據實際負載)

---

maximumPoolSize(最大執行緒數):
✅ 負載高峰時的上限
✅ 超過 core 的執行緒會在閒置後回收

建議:
- CPU 密集:corePoolSize + 1(不需要太多)
- I/O 密集:corePoolSize × 2 或更多
- Web 應用:根據壓測結果調整

---

keepAliveTime(閒置存活時間):
✅ 超過 core 的執行緒閒置多久後回收
✅ 避免資源浪費

建議:
- 穩定負載:60 秒
- 波動負載:120-300 秒(保留執行緒應對突發)

---

workQueue(工作佇列):
✅ 任務在執行緒都忙碌時排隊

類型選擇:
- ArrayBlockingQueue:有界(防止記憶體溢出)推薦
- LinkedBlockingQueue:可選界限
- SynchronousQueue:無容量(直接交接)
- PriorityBlockingQueue:優先權佇列

建議:
- 使用有界佇列(防止 OOM)
- 大小設定為預期並發數的 2-10 倍

---

RejectedExecutionHandler(拒絕策略):
✅ 佇列滿且無法建立新執行緒時的處理

選項:
1. AbortPolicy:拋出異常(預設)
2. CallerRunsPolicy:由呼叫者執行(推薦)
3. DiscardPolicy:靜默丟棄
4. DiscardOldestPolicy:丟棄最舊的任務

建議:
- 重要任務:CallerRunsPolicy(降低提交速度)
- 可丟棄任務:DiscardPolicy
- 需要監控:AbortPolicy + 異常處理

動態調整執行緒池大小

監控與自動調整

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class DynamicThreadPool {
    private ThreadPoolExecutor executor;
    private ScheduledExecutorService monitor;

    public DynamicThreadPool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // 建立執行緒池
        executor = new ThreadPoolExecutor(
            cpuCores,
            cpuCores * 4,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 啟動監控執行緒
        monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(this::adjustPoolSize, 10, 10, TimeUnit.SECONDS);
    }

    private void adjustPoolSize() {
        int activeThreads = executor.getActiveCount();
        int poolSize = executor.getPoolSize();
        int queueSize = executor.getQueue().size();
        int maxPoolSize = executor.getMaximumPoolSize();

        System.out.println("=== 執行緒池狀態 ===");
        System.out.println("活躍執行緒: " + activeThreads);
        System.out.println("執行緒池大小: " + poolSize);
        System.out.println("佇列大小: " + queueSize);
        System.out.println("最大池大小: " + maxPoolSize);

        // 調整策略
        double utilization = (double) activeThreads / poolSize;

        if (utilization > 0.9 && poolSize < maxPoolSize) {
            // 使用率 > 90%,增加核心執行緒數
            int newCoreSize = Math.min(poolSize + 2, maxPoolSize);
            executor.setCorePoolSize(newCoreSize);
            System.out.println("→ 增加核心執行緒數至: " + newCoreSize);
        } else if (utilization < 0.3 && poolSize > executor.getCorePoolSize()) {
            // 使用率 < 30%,減少核心執行緒數
            int newCoreSize = Math.max(poolSize - 2, Runtime.getRuntime().availableProcessors());
            executor.setCorePoolSize(newCoreSize);
            System.out.println("→ 減少核心執行緒數至: " + newCoreSize);
        }

        // 佇列積壓警告
        if (queueSize > 400) {
            System.out.println("⚠️ 警告:佇列積壓嚴重,考慮增加執行緒數或優化任務");
        }
    }

    public void submit(Runnable task) {
        executor.submit(task);
    }

    public void shutdown() {
        monitor.shutdown();
        executor.shutdown();
    }
}

壓力測試與調優

public class LoadTesting {
    public static void main(String[] args) throws Exception {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // 測試不同執行緒池大小
        int[] poolSizes = {cpuCores, cpuCores * 2, cpuCores * 4, cpuCores * 8};

        for (int size : poolSizes) {
            System.out.println("\n=== 測試執行緒池大小: " + size + " ===");
            testThreadPool(size, 1000);  // 1000 個任務
        }
    }

    private static void testThreadPool(int poolSize, int taskCount) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            poolSize,
            poolSize,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(taskCount),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        long startTime = System.currentTimeMillis();

        // 提交任務
        CountDownLatch latch = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> {
                try {
                    // 模擬 I/O 密集任務(80% 時間在等待)
                    Thread.sleep(100);  // I/O 等待
                    // 模擬少量運算
                    long result = 0;
                    for (int j = 0; j < 100000; j++) {
                        result += j;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }

        // 等待所有任務完成
        latch.await();
        long endTime = System.currentTimeMillis();

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);

        long duration = endTime - startTime;
        double throughput = (double) taskCount / duration * 1000;

        System.out.println("完成時間: " + duration + " ms");
        System.out.println("吞吐量: " + String.format("%.2f", throughput) + " 任務/秒");
        System.out.println("平均每個任務: " + (duration / taskCount) + " ms");
    }
}

// 輸出範例:
// === 測試執行緒池大小: 8 ===
// 完成時間: 12500 ms
// 吞吐量: 80.00 任務/秒
// 平均每個任務: 12 ms
//
// === 測試執行緒池大小: 16 ===
// 完成時間: 6250 ms
// 吞吐量: 160.00 任務/秒
// 平均每個任務: 6 ms
//
// === 測試執行緒池大小: 32 ===
// 完成時間: 3200 ms
// 吞吐量: 312.50 任務/秒
// 平均每個任務: 3 ms
//
// → 結論:I/O 密集任務,執行緒數越多越好(直到某個點)

CPU 親和性(CPU Affinity)

概念說明

CPU 親和性:
將執行緒綁定到特定的 CPU 核心

優點:
✅ 減少上下文切換
✅ 提升 CPU 快取命中率
✅ 更好的效能可預測性

缺點:
❌ 降低靈活性
❌ 可能造成負載不均
❌ 需要手動管理

適用場景:
- 即時系統
- 高頻交易
- 低延遲應用

Linux 上設定 CPU 親和性

// Java 本身不支援 CPU 親和性
// 需要透過 JNI 或第三方套件

// 方法 1:使用 taskset(啟動時)
// taskset -c 0-3 java -jar myapp.jar
// 將 JVM 綁定到 CPU 0-3

// 方法 2:使用 JNA 套件
import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.Platform;

public interface CLibrary extends Library {
    CLibrary INSTANCE = Native.load("c", CLibrary.class);

    int sched_setaffinity(int pid, int cpusetsize, long[] cpuset);
    int sched_getaffinity(int pid, int cpusetsize, long[] cpuset);
}

public class CPUAffinity {
    public static void setCPUAffinity(int cpuCore) {
        if (!Platform.isLinux()) {
            System.out.println("僅支援 Linux");
            return;
        }

        long[] cpuset = new long[1];
        cpuset[0] = 1L << cpuCore;  // 綁定到特定核心

        int pid = 0;  // 0 = 當前執行緒
        int result = CLibrary.INSTANCE.sched_setaffinity(
            pid,
            Long.SIZE / 8,
            cpuset
        );

        if (result == 0) {
            System.out.println("成功綁定到 CPU " + cpuCore);
        } else {
            System.out.println("綁定失敗");
        }
    }
}

// 注意:大部分情況下不需要設定 CPU 親和性
// 讓作業系統的排程器處理通常更好

效能監控與診斷

JMX 監控執行緒池

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.concurrent.*;

public class ThreadPoolMonitor {
    public static void main(String[] args) throws Exception {
        // 建立執行緒池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100)
        );

        // 註冊到 JMX
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("com.example:type=ThreadPool,name=MyPool");
        ThreadPoolMXBean mxBean = new ThreadPoolMXBean(executor);
        mbs.registerMBean(mxBean, name);

        // 定期輸出統計
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("\n=== 執行緒池統計 ===");
            System.out.println("核心執行緒數: " + executor.getCorePoolSize());
            System.out.println("當前執行緒數: " + executor.getPoolSize());
            System.out.println("活躍執行緒數: " + executor.getActiveCount());
            System.out.println("最大執行緒數: " + executor.getMaximumPoolSize());
            System.out.println("曾經最大執行緒數: " + executor.getLargestPoolSize());
            System.out.println("任務總數: " + executor.getTaskCount());
            System.out.println("已完成任務數: " + executor.getCompletedTaskCount());
            System.out.println("佇列大小: " + executor.getQueue().size());

            // 計算使用率
            double utilization = (double) executor.getActiveCount() / executor.getPoolSize();
            System.out.println("執行緒使用率: " + String.format("%.2f%%", utilization * 100));

            // 計算完成率
            double completionRate = (double) executor.getCompletedTaskCount() / executor.getTaskCount();
            System.out.println("任務完成率: " + String.format("%.2f%%", completionRate * 100));
        }, 5, 5, TimeUnit.SECONDS);

        // 提交一些任務進行測試
        for (int i = 0; i < 50; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("Task " + taskId + " 執行中");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            Thread.sleep(100);
        }

        // 運行一段時間後關閉
        Thread.sleep(30000);
        monitor.shutdown();
        executor.shutdown();
    }
}

使用 VisualVM 或 JConsole

# 啟動 JVM 時加入 JMX 參數
java -Dcom.sun.management.jmxremote \
     -Dcom.sun.management.jmxremote.port=9999 \
     -Dcom.sun.management.jmxremote.authenticate=false \
     -Dcom.sun.management.jmxremote.ssl=false \
     -jar myapp.jar

# 然後用 VisualVM 或 JConsole 連接
jvisualvm
jconsole localhost:9999

# 可以看到:
# - 執行緒數量
# - CPU 使用率
# - 記憶體使用
# - 執行緒狀態
# - 執行緒 Dump

實戰範例:最佳化 Web 應用執行緒池

import java.util.concurrent.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ThreadPoolConfig {

    @Bean(name = "apiRequestPool")
    public Executor apiRequestPool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // I/O 密集(API 請求)
        executor.setCorePoolSize(cpuCores * 2);     // 16(假設 8 核)
        executor.setMaxPoolSize(cpuCores * 4);      // 32
        executor.setQueueCapacity(500);             // 佇列容量
        executor.setKeepAliveSeconds(120);          // 閒置時間
        executor.setThreadNamePrefix("API-");       // 執行緒名稱

        // 拒絕策略:由呼叫者執行(降速)
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 等待任務完成後才關閉
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);

        executor.initialize();
        return executor;
    }

    @Bean(name = "backgroundTaskPool")
    public Executor backgroundTaskPool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 背景任務(CPU + I/O 混合)
        executor.setCorePoolSize(cpuCores);         // 8
        executor.setMaxPoolSize(cpuCores * 2);      // 16
        executor.setQueueCapacity(1000);            // 較大佇列
        executor.setKeepAliveSeconds(300);          // 較長閒置時間
        executor.setThreadNamePrefix("Background-");

        // 拒絕策略:丟棄(非關鍵任務)
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.DiscardPolicy()
        );

        executor.initialize();
        return executor;
    }

    @Bean(name = "cpuIntensivePool")
    public Executor cpuIntensivePool() {
        int cpuCores = Runtime.getRuntime().availableProcessors();

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // CPU 密集(圖像處理、加密)
        executor.setCorePoolSize(cpuCores);         // 8
        executor.setMaxPoolSize(cpuCores + 1);      // 9
        executor.setQueueCapacity(50);              // 小佇列
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("CPU-");

        // 拒絕策略:阻塞(避免過載)
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        executor.initialize();
        return executor;
    }
}

// 使用
@Service
public class MyService {

    @Autowired
    @Qualifier("apiRequestPool")
    private Executor apiRequestPool;

    @Autowired
    @Qualifier("backgroundTaskPool")
    private Executor backgroundTaskPool;

    public void handleRequest() {
        // API 請求
        apiRequestPool.execute(() -> {
            // 呼叫外部 API
            callExternalAPI();
        });

        // 背景任務
        backgroundTaskPool.execute(() -> {
            // 發送 Email
            sendEmail();
        });
    }
}

決策流程圖

開始設定執行緒池
┌─────────────────────────────┐
│ 1. 判斷任務類型              │
└──────────┬──────────────────┘
    ┌──────┴───────┐
    │              │
CPU 密集         I/O 密集
    │              │
    ↓              ↓
核心數 + 1    核心數 × (2~50)
    │              │
    └──────┬───────┘
┌─────────────────────────────┐
│ 2. 測量實際 I/O 比例         │
│    - 使用率監控              │
│    - 壓力測試                │
└──────────┬──────────────────┘
┌─────────────────────────────┐
│ 3. 設定執行緒池參數          │
│    - corePoolSize            │
│    - maximumPoolSize         │
│    - workQueue               │
│    - RejectedHandler         │
└──────────┬──────────────────┘
┌─────────────────────────────┐
│ 4. 壓力測試驗證              │
│    - 吞吐量測試              │
│    - 延遲測試                │
│    - 穩定性測試              │
└──────────┬──────────────────┘
┌─────────────────────────────┐
│ 5. 生產環境監控與調整        │
│    - CPU 使用率              │
│    - 執行緒使用率            │
│    - 佇列積壓                │
│    - 響應時間                │
└──────────┬──────────────────┘
    調優完成 ✓

常見問題 FAQ

Q1: 執行緒池設太大會怎樣?

問題:
❌ 上下文切換開銷增加
❌ 記憶體消耗增加(每個執行緒約 1MB stack)
❌ CPU 快取效率下降
❌ 可能導致 OutOfMemoryError

範例:
1000 個執行緒 × 1MB = 1GB 記憶體
頻繁切換 → CPU 時間浪費在切換而非工作

建議:
從保守值開始(CPU 核心數 × 2)
根據監控逐步調整

Q2: 執行緒池設太小會怎樣?

問題:
❌ CPU 利用率低
❌ 吞吐量不足
❌ 請求佇列積壓
❌ 響應時間變長

範例:
8 核心 CPU,只設定 2 個執行緒
→ CPU 大部分時間閒置
→ 無法發揮硬體效能

建議:
至少 = CPU 核心數
I/O 密集任務應該更多

Q3: 如何知道當前設定是否合適?

監控指標:
✅ CPU 使用率應該在 70-80%(CPU 密集)
✅ 執行緒使用率應該在 80-90%
✅ 佇列積壓應該接近 0
✅ 響應時間穩定且符合 SLA
✅ 沒有執行緒飢餓現象

警告訊號:
⚠️ CPU 使用率一直很低 → 執行緒太少
⚠️ 佇列持續積壓 → 執行緒太少或任務太慢
⚠️ 頻繁拒絕任務 → 池太小或佇列太小
⚠️ 大量上下文切換 → 執行緒太多

Q4: 生產環境如何調整?

階段式調整:
1. 從保守值開始
   - CPU 密集:核心數 + 1
   - I/O 密集:核心數 × 2

2. 壓力測試
   - 模擬生產流量
   - 逐步增加執行緒數
   - 找到甜蜜點(Sweet Spot)

3. 灰度發佈
   - 小範圍測試新配置
   - 監控關鍵指標
   - 逐步推廣

4. 持續監控
   - 設定告警閾值
   - 定期檢視報表
   - 根據業務變化調整

執行緒安全的設計模式

單例模式(雙重檢查鎖定)

public class Singleton {
    // volatile 防止指令重排序
    private static volatile Singleton instance;

    private Singleton() {}

    public static Singleton getInstance() {
        if (instance == null) {  // 第一次檢查
            synchronized (Singleton.class) {
                if (instance == null) {  // 第二次檢查
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

// 或使用靜態內部類(更推薦)
public class Singleton {
    private Singleton() {}

    private static class Holder {
        private static final Singleton INSTANCE = new Singleton();
    }

    public static Singleton getInstance() {
        return Holder.INSTANCE;
    }
}

不可變物件(Immutable)

// 執行緒安全的不可變類別
public final class ImmutablePerson {
    private final String name;
    private final int age;

    public ImmutablePerson(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    // 修改時返回新物件
    public ImmutablePerson withAge(int newAge) {
        return new ImmutablePerson(this.name, newAge);
    }
}

// 不可變物件的特點:
// ✅ 天生執行緒安全
// ✅ 可以安全地共享
// ✅ 適合作為 Map 的 key
// ❌ 修改時需要建立新物件

最佳實踐

1. 選擇適當的同步機制

簡單情境:
✅ synchronized

需要進階功能(tryLock, 公平鎖):
✅ ReentrantLock

讀多寫少:
✅ ReadWriteLock

無競爭:
✅ Atomic 類別

高併發:
✅ ConcurrentHashMap, CopyOnWriteArrayList

2. 避免過度同步

// ❌ 不好:整個方法同步
public synchronized void process() {
    // 長時間運算
    heavyComputation();

    // 修改共享變數
    count++;
}

// ✅ 好:只同步關鍵區域
public void process() {
    // 長時間運算(不需要同步)
    heavyComputation();

    // 只同步修改共享變數的部分
    synchronized (this) {
        count++;
    }
}

3. 使用執行緒池

// ❌ 不好:每次建立新執行緒
for (int i = 0; i < 1000; i++) {
    new Thread(() -> task()).start();
}

// ✅ 好:使用執行緒池
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
    executor.submit(() -> task());
}
executor.shutdown();

4. 正確關閉資源

ExecutorService executor = Executors.newFixedThreadPool(10);
try {
    // 使用執行緒池
    executor.submit(() -> task());
} finally {
    // 關閉執行緒池
    executor.shutdown();
    try {
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

5. 處理 InterruptedException

// ✅ 正確處理中斷
public void task() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // 恢復中斷狀態
        Thread.currentThread().interrupt();
        // 清理資源
        cleanup();
        // 返回
        return;
    }
}

常見陷阱

1. 忘記 volatile

// ❌ 錯誤:沒有 volatile
class StopThread {
    private boolean stop = false;

    public void run() {
        while (!stop) {
            // 可能永遠不會停止(JVM 優化)
        }
    }

    public void stop() {
        stop = true;
    }
}

// ✅ 正確:加上 volatile
class StopThread {
    private volatile boolean stop = false;
    // volatile 保證可見性
}

2. 雙重檢查鎖定的陷阱

// ❌ 錯誤:沒有 volatile
private static Singleton instance;

public static Singleton getInstance() {
    if (instance == null) {
        synchronized (Singleton.class) {
            if (instance == null) {
                instance = new Singleton();  // 可能指令重排序
            }
        }
    }
    return instance;  // 可能返回未完全初始化的物件
}

// ✅ 正確:加上 volatile
private static volatile Singleton instance;

3. 共享可變狀態

// ❌ 錯誤:共享可變 List
class SharedList {
    private List<String> list = new ArrayList<>();  // 不安全

    public void add(String item) {
        list.add(item);  // 多執行緒會出問題
    }
}

// ✅ 方案 1:同步
class SharedList {
    private List<String> list = new ArrayList<>();

    public synchronized void add(String item) {
        list.add(item);
    }
}

// ✅ 方案 2:使用執行緒安全的集合
class SharedList {
    private List<String> list = new CopyOnWriteArrayList<>();

    public void add(String item) {
        list.add(item);  // 安全
    }
}

效能考量

1. 鎖的粒度

粗粒度鎖(Coarse-grained):
- 鎖的範圍大
- 競爭激烈
- 效能差

細粒度鎖(Fine-grained):
- 鎖的範圍小
- 競爭少
- 效能好

2. 鎖消除(Lock Elimination)

// JVM 會自動優化
public String concat(String s1, String s2) {
    StringBuffer sb = new StringBuffer();  // 區域變數
    sb.append(s1);  // synchronized 方法
    sb.append(s2);
    return sb.toString();
}

// JVM 發現 sb 是區域變數,不會被其他執行緒存取
// 自動消除 synchronized 鎖

3. 鎖粗化(Lock Coarsening)

// 原本:頻繁加鎖/解鎖
for (int i = 0; i < 1000; i++) {
    synchronized (lock) {
        count++;
    }
}

// JVM 優化後:合併為一次加鎖
synchronized (lock) {
    for (int i = 0; i < 1000; i++) {
        count++;
    }
}

除錯技巧

1. 檢測死鎖

# 使用 jstack 檢測死鎖
jstack <pid>

# 或在程式中
ThreadMXBean tmx = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = tmx.findDeadlockedThreads();
if (deadlockedThreads != null) {
    System.out.println("發現死鎖!");
}

2. 執行緒 Dump

// 取得所有執行緒的堆疊追蹤
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
for (Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
    Thread thread = entry.getKey();
    StackTraceElement[] stack = entry.getValue();

    System.out.println("Thread: " + thread.getName());
    for (StackTraceElement element : stack) {
        System.out.println("  " + element);
    }
}

3. 日誌記錄

import java.util.logging.Logger;

public class ThreadSafeLogger {
    private static final Logger logger = Logger.getLogger("ThreadLog");

    public void log(String message) {
        logger.info("[" + Thread.currentThread().getName() + "] " + message);
    }
}

參考資源

官方文件

推薦書籍

  • Java Concurrency in Practice (Brian Goetz)
  • Effective Java (Joshua Bloch)

總結

核心概念速記

執行緒 = 輕量級程序(共享記憶體)
同步 = 防止競態條件(synchronized, Lock)
死鎖 = 互相等待資源(避免:統一順序加鎖)
執行緒池 = 重複使用執行緒(避免頻繁建立)

常用類別對照

用途 類別 說明
建立執行緒 Thread, Runnable, Callable 基本執行緒
同步 synchronized, ReentrantLock 互斥鎖
執行緒池 ExecutorService 管理執行緒
原子操作 AtomicInteger 無鎖並發
並發集合 ConcurrentHashMap 執行緒安全
非同步 CompletableFuture 非同步編程

建立日期:2025-11-07 最後更新:2025-11-18

🔗相關文章