多執行緒是什麼?
多執行緒(Multithreading) 是讓程式同時執行多個任務的技術。
核心概念
單執行緒:
任務 A → 任務 B → 任務 C
(依序執行,必須等待)
多執行緒:
任務 A ↘
任務 B → 同時執行
任務 C ↗
優點:
✅ 提升效能(充分利用 CPU)
✅ 改善響應速度
✅ 處理並發請求
Process vs Thread
Process(程序):
├── 獨立的記憶體空間
├── 獨立的資源
├── 程序間通訊較複雜
└── 建立成本高
Thread(執行緒):
├── 共享同一個程序的記憶體
├── 共享資源(變數、物件)
├── 執行緒間通訊容易
└── 建立成本低
一個 Process 可以有多個 Thread
Java Thread 基礎
建立執行緒的方式
方法 1:繼承 Thread 類別
// 定義執行緒類別
class MyThread extends Thread {
@Override
public void run() {
// 執行緒要執行的程式碼
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
try {
Thread.sleep(1000); // 休眠 1 秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 使用執行緒
public class Main {
public static void main(String[] args) {
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
t1.start(); // 啟動執行緒(不是 run())
t2.start();
}
}
// 輸出(順序可能不同):
// Thread-0: 0
// Thread-1: 0
// Thread-0: 1
// Thread-1: 1
// ...
方法 2:實作 Runnable 介面(推薦)
// 定義 Runnable
class MyTask implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 使用
public class Main {
public static void main(String[] args) {
MyTask task = new MyTask();
Thread t1 = new Thread(task, "Worker-1");
Thread t2 = new Thread(task, "Worker-2");
t1.start();
t2.start();
}
}
// 為什麼推薦 Runnable?
// ✅ Java 是單一繼承,用 Runnable 不佔用繼承位置
// ✅ 更符合物件導向設計(分離任務和執行)
// ✅ 可以重複使用同一個 Runnable
方法 3:使用 Lambda(Java 8+)
public class Main {
public static void main(String[] args) {
// 直接用 Lambda 建立 Runnable
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
}
}
// Lambda 最簡潔!
方法 4:使用 Callable 和 Future
import java.util.concurrent.*;
// Callable 可以返回結果
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
Thread.sleep(10);
}
return sum;
}
}
public class Main {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交 Callable 任務
Future<Integer> future = executor.submit(new MyCallable());
// 做其他事情...
System.out.println("等待結果...");
// 取得結果(會阻塞直到完成)
Integer result = future.get();
System.out.println("結果: " + result); // 5050
executor.shutdown();
}
}
// Callable vs Runnable
// Callable: 可以返回結果,可以拋出異常
// Runnable: 無返回值,不能拋出 checked exception
Thread 生命週期
執行緒狀態
NEW (新建)
↓ start()
RUNNABLE (可執行)
↓ 獲得 CPU
RUNNING (執行中)
↓
├→ sleep()/wait() → BLOCKED/WAITING
├→ I/O 操作 → BLOCKED
├→ join() → WAITING
└→ 完成 → TERMINATED
狀態轉換圖
┌─────────┐
│ NEW │ 新建執行緒
└────┬────┘
│ start()
↓
┌─────────┐
│RUNNABLE │ 等待 CPU 排程
└────┬────┘
│ CPU 排程
↓
┌─────────┐
│ RUNNING │ 執行中
└────┬────┘
│
├→ sleep(ms) ┌──────────┐
├→ wait() →│ WAITING │ 等待狀態
├→ join() └──────────┘
│ ↓ notify()/notifyAll()
├→ I/O ┌──────────┐
├→ synchronized→│ BLOCKED │ 阻塞狀態
│ └──────────┘
│ ↓ 獲得鎖/I/O 完成
└→ 完成 ┌──────────┐
→│TERMINATED│ 終止
└──────────┘
執行緒狀態範例
public class ThreadStateDemo {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
try {
System.out.println("Thread 開始執行");
Thread.sleep(2000);
System.out.println("Thread 結束");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("NEW: " + t.getState()); // NEW
t.start();
System.out.println("RUNNABLE: " + t.getState()); // RUNNABLE
Thread.sleep(100);
System.out.println("TIMED_WAITING: " + t.getState()); // TIMED_WAITING
t.join(); // 等待執行緒結束
System.out.println("TERMINATED: " + t.getState()); // TERMINATED
}
}
Thread 常用方法
基本方法
// 啟動執行緒
thread.start();
// 取得執行緒名稱
String name = Thread.currentThread().getName();
// 設定執行緒名稱
thread.setName("MyThread");
// 休眠(毫秒)
Thread.sleep(1000); // 休眠 1 秒
// 讓出 CPU(給其他執行緒機會)
Thread.yield();
// 等待執行緒結束
thread.join(); // 無限等待
thread.join(5000); // 最多等 5 秒
// 中斷執行緒
thread.interrupt();
// 檢查是否被中斷
boolean isInterrupted = thread.isInterrupted();
// 檢查是否還活著
boolean isAlive = thread.isAlive();
// 設定為 Daemon 執行緒(背景執行緒)
thread.setDaemon(true);
sleep() vs wait()
// sleep()
// - Thread 的靜態方法
// - 不釋放鎖
// - 時間到自動醒來
Thread.sleep(1000);
// wait()
// - Object 的方法
// - 釋放鎖
// - 需要 notify()/notifyAll() 喚醒
synchronized (obj) {
obj.wait();
}
// 範例:sleep 不釋放鎖
class SleepDemo {
public static void main(String[] args) {
Object lock = new Object();
new Thread(() -> {
synchronized (lock) {
System.out.println("T1: 獲得鎖");
try {
Thread.sleep(3000); // 不釋放鎖!
} catch (InterruptedException e) {}
System.out.println("T1: 釋放鎖");
}
}).start();
new Thread(() -> {
synchronized (lock) {
System.out.println("T2: 獲得鎖");
}
}).start();
}
}
// 輸出:
// T1: 獲得鎖
// (等待 3 秒)
// T1: 釋放鎖
// T2: 獲得鎖
join() 方法
public class JoinDemo {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
System.out.println("T1 開始");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
System.out.println("T1 結束");
});
Thread t2 = new Thread(() -> {
System.out.println("T2 開始");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
System.out.println("T2 結束");
});
t1.start();
t2.start();
t1.join(); // 等待 t1 結束
t2.join(); // 等待 t2 結束
System.out.println("主執行緒結束");
}
}
// 輸出:
// T1 開始
// T2 開始
// T2 結束
// T1 結束
// 主執行緒結束
執行緒同步(Synchronization)
為什麼需要同步?
// 問題:多執行緒存取共享變數
class Counter {
private int count = 0;
public void increment() {
count++; // 不是原子操作!
}
public int getCount() {
return count;
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
// 建立 1000 個執行緒,每個都 +1
Thread[] threads = new Thread[1000];
for (int i = 0; i < 1000; i++) {
threads[i] = new Thread(() -> counter.increment());
threads[i].start();
}
// 等待所有執行緒結束
for (Thread t : threads) {
t.join();
}
System.out.println("Count: " + counter.getCount());
// 預期:1000
// 實際:可能是 987, 992, 995... (小於 1000)
}
}
// 原因:count++ 不是原子操作
// 實際上是:
// 1. 讀取 count
// 2. +1
// 3. 寫回 count
// 多執行緒可能同時讀到同一個值!
synchronized 關鍵字
1. 同步方法
class Counter {
private int count = 0;
// 方法加上 synchronized
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
// synchronized 方法等於:
public void increment() {
synchronized (this) {
count++;
}
}
2. 同步區塊
class Counter {
private int count = 0;
private Object lock = new Object();
public void increment() {
// 只同步關鍵區域
synchronized (lock) {
count++;
}
}
}
// 優點:
// ✅ 可以選擇不同的鎖物件
// ✅ 同步範圍更小,效能更好
3. 靜態同步方法
class Counter {
private static int count = 0;
// 鎖的是 Class 物件
public static synchronized void increment() {
count++;
}
}
// 等於:
public static void increment() {
synchronized (Counter.class) {
count++;
}
}
synchronized 範例
// 銀行帳戶範例
class BankAccount {
private int balance = 1000;
public synchronized void withdraw(int amount) {
if (balance >= amount) {
System.out.println(Thread.currentThread().getName() +
" 準備提款: " + amount);
// 模擬處理時間
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
balance -= amount;
System.out.println(Thread.currentThread().getName() +
" 完成提款,餘額: " + balance);
} else {
System.out.println(Thread.currentThread().getName() +
" 餘額不足");
}
}
public synchronized int getBalance() {
return balance;
}
}
public class Main {
public static void main(String[] args) {
BankAccount account = new BankAccount();
// 兩人同時提款
Thread t1 = new Thread(() -> account.withdraw(600), "小明");
Thread t2 = new Thread(() -> account.withdraw(600), "小華");
t1.start();
t2.start();
}
}
// 有 synchronized:
// 小明 準備提款: 600
// 小明 完成提款,餘額: 400
// 小華 餘額不足
// 沒有 synchronized:
// 小明 準備提款: 600
// 小華 準備提款: 600
// 小明 完成提款,餘額: 400
// 小華 完成提款,餘額: -200 ← 錯誤!
死鎖(Deadlock)
什麼是死鎖?
死鎖:兩個或多個執行緒互相等待對方釋放資源,導致永久阻塞
情境:
執行緒 A:持有鎖 1,等待鎖 2
執行緒 B:持有鎖 2,等待鎖 1
→ 雙方都在等待,永遠無法繼續
死鎖範例
public class DeadlockDemo {
private static Object lock1 = new Object();
private static Object lock2 = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("T1: 獲得 lock1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
System.out.println("T1: 等待 lock2");
synchronized (lock2) {
System.out.println("T1: 獲得 lock2");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("T2: 獲得 lock2");
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
System.out.println("T2: 等待 lock1");
synchronized (lock1) {
System.out.println("T2: 獲得 lock1");
}
}
});
t1.start();
t2.start();
}
}
// 輸出:
// T1: 獲得 lock1
// T2: 獲得 lock2
// T1: 等待 lock2
// T2: 等待 lock1
// (程式卡住,永遠不會結束)
避免死鎖
// 方法 1:總是以相同順序獲取鎖
public class NoDeadlock {
private static Object lock1 = new Object();
private static Object lock2 = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock1) { // 先 lock1
synchronized (lock2) { // 再 lock2
System.out.println("T1 完成");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock1) { // 先 lock1(順序相同)
synchronized (lock2) { // 再 lock2
System.out.println("T2 完成");
}
}
});
t1.start();
t2.start();
}
}
// 方法 2:使用 tryLock(Lock 介面)
Lock lock1 = new ReentrantLock();
Lock lock2 = new ReentrantLock();
if (lock1.tryLock()) {
try {
if (lock2.tryLock()) {
try {
// 同時獲得兩個鎖
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
// 方法 3:設定超時
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
// ...
}
wait() 和 notify()
生產者-消費者模式
import java.util.LinkedList;
import java.util.Queue;
class SharedBuffer {
private Queue<Integer> buffer = new LinkedList<>();
private int capacity = 5;
// 生產
public synchronized void produce(int value) throws InterruptedException {
// 緩衝區滿了,等待
while (buffer.size() == capacity) {
System.out.println("緩衝區滿,生產者等待");
wait(); // 釋放鎖,等待消費者通知
}
buffer.add(value);
System.out.println("生產: " + value + " (目前: " + buffer.size() + ")");
notifyAll(); // 通知消費者
}
// 消費
public synchronized int consume() throws InterruptedException {
// 緩衝區空了,等待
while (buffer.isEmpty()) {
System.out.println("緩衝區空,消費者等待");
wait(); // 釋放鎖,等待生產者通知
}
int value = buffer.poll();
System.out.println("消費: " + value + " (目前: " + buffer.size() + ")");
notifyAll(); // 通知生產者
return value;
}
}
public class ProducerConsumerDemo {
public static void main(String[] args) {
SharedBuffer buffer = new SharedBuffer();
// 生產者執行緒
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
buffer.produce(i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消費者執行緒
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
buffer.consume();
Thread.sleep(1000); // 消費慢一點
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
// 輸出範例:
// 生產: 1 (目前: 1)
// 生產: 2 (目前: 2)
// 消費: 1 (目前: 1)
// 生產: 3 (目前: 2)
// 生產: 4 (目前: 3)
// 消費: 2 (目前: 2)
// ...
wait() vs notify() vs notifyAll()
wait()
- 釋放鎖
- 進入等待狀態
- 需要在 synchronized 區塊內呼叫
notify()
- 隨機喚醒一個等待的執行緒
- 不釋放鎖(要等 synchronized 區塊結束)
notifyAll()
- 喚醒所有等待的執行緒
- 推薦使用(避免訊號遺失)
Lock 介面(java.util.concurrent.locks)
ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Counter {
private int count = 0;
private Lock lock = new ReentrantLock();
public void increment() {
lock.lock(); // 獲取鎖
try {
count++;
} finally {
lock.unlock(); // 一定要在 finally 釋放鎖!
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
ReentrantLock vs synchronized
// synchronized 優點:
// ✅ 語法簡單
// ✅ 自動釋放鎖
// ✅ JVM 優化好
// ReentrantLock 優點:
// ✅ tryLock()(嘗試獲取鎖)
// ✅ lockInterruptibly()(可中斷)
// ✅ 公平鎖(fairness)
// ✅ 多個 Condition
// tryLock 範例
Lock lock = new ReentrantLock();
if (lock.tryLock()) {
try {
// 獲得鎖,執行
} finally {
lock.unlock();
}
} else {
// 沒獲得鎖,做其他事
System.out.println("無法獲得鎖,稍後重試");
}
// tryLock 帶超時
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
// 獲得鎖
} finally {
lock.unlock();
}
} else {
// 1 秒內沒獲得鎖
}
// 公平鎖(先等待的先獲得)
Lock fairLock = new ReentrantLock(true);
ReadWriteLock
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class Cache {
private Map<String, String> data = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 讀取(允許多執行緒同時讀)
public String get(String key) {
rwLock.readLock().lock();
try {
return data.get(key);
} finally {
rwLock.readLock().unlock();
}
}
// 寫入(獨佔)
public void put(String key, String value) {
rwLock.writeLock().lock();
try {
data.put(key, value);
} finally {
rwLock.writeLock().unlock();
}
}
}
// 特點:
// ✅ 多個執行緒可以同時讀
// ✅ 寫入時獨佔(其他讀寫都阻塞)
// ✅ 適合讀多寫少的情境
執行緒池(Thread Pool)
為什麼需要執行緒池?
問題:
每次建立執行緒 → 成本高
頻繁建立/銷毀 → 浪費資源
解決:
執行緒池 → 重複使用執行緒
管理執行緒數量 → 避免過多執行緒
ExecutorService
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) {
// 建立固定大小的執行緒池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交 10 個任務
for (int i = 1; i <= 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " 執行於 " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
});
}
// 關閉執行緒池
executor.shutdown(); // 不接受新任務,等待已提交的完成
try {
// 等待所有任務完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 強制關閉
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
}
// 輸出:
// Task 1 執行於 pool-1-thread-1
// Task 2 執行於 pool-1-thread-2
// Task 3 執行於 pool-1-thread-3
// Task 4 執行於 pool-1-thread-4
// Task 5 執行於 pool-1-thread-5
// Task 6 執行於 pool-1-thread-1 ← 重複使用
// ...
執行緒池類型
// 1. Fixed Thread Pool(固定大小)
ExecutorService fixed = Executors.newFixedThreadPool(10);
// - 固定 10 個執行緒
// - 適合負載穩定的場景
// 2. Cached Thread Pool(快取)
ExecutorService cached = Executors.newCachedThreadPool();
// - 根據需要建立執行緒
// - 閒置 60 秒自動回收
// - 適合大量短期任務
// 3. Single Thread Executor(單一執行緒)
ExecutorService single = Executors.newSingleThreadExecutor();
// - 只有一個執行緒
// - 保證任務依序執行
// - 適合需要順序執行的場景
// 4. Scheduled Thread Pool(排程)
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);
// - 可以延遲執行
// - 可以定期執行
ScheduledExecutorService 範例
import java.util.concurrent.*;
public class ScheduledDemo {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 延遲 3 秒執行
scheduler.schedule(() -> {
System.out.println("3 秒後執行");
}, 3, TimeUnit.SECONDS);
// 延遲 1 秒後,每 2 秒執行一次
scheduler.scheduleAtFixedRate(() -> {
System.out.println("定期執行: " + System.currentTimeMillis());
}, 1, 2, TimeUnit.SECONDS);
// 讓程式執行 10 秒後關閉
try {
Thread.sleep(10000);
} catch (InterruptedException e) {}
scheduler.shutdown();
}
}
// 輸出:
// 3 秒後執行
// 定期執行: 1699876543210
// 定期執行: 1699876545210
// 定期執行: 1699876547210
// ...
自訂 ThreadPoolExecutor
import java.util.concurrent.*;
public class CustomThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize:核心執行緒數
10, // maximumPoolSize:最大執行緒數
60, // keepAliveTime:閒置存活時間
TimeUnit.SECONDS, // 時間單位
new ArrayBlockingQueue<>(100), // 工作佇列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
// 使用執行緒池
for (int i = 1; i <= 20; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
});
}
executor.shutdown();
}
}
// 參數說明:
// corePoolSize:常駐執行緒數
// maximumPoolSize:最大執行緒數(負載高時擴展)
// keepAliveTime:超過 core 的執行緒閒置多久後回收
// workQueue:任務佇列
// rejectedExecutionHandler:任務拒絕策略
// 拒絕策略:
// AbortPolicy:拋出異常(預設)
// CallerRunsPolicy:由呼叫者執行
// DiscardPolicy:靜默丟棄
// DiscardOldestPolicy:丟棄最舊的任務
原子類別(Atomic Classes)
AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;
class Counter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子操作
}
public int getCount() {
return count.get();
}
}
public class AtomicDemo {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
// 1000 個執行緒,每個 +1
Thread[] threads = new Thread[1000];
for (int i = 0; i < 1000; i++) {
threads[i] = new Thread(() -> counter.increment());
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Count: " + counter.getCount()); // 一定是 1000!
}
}
// Atomic 類別:
// AtomicInteger
// AtomicLong
// AtomicBoolean
// AtomicReference
// 常用方法:
int get() // 取得值
void set(int newValue) // 設定值
int getAndIncrement() // get 然後 +1
int incrementAndGet() // +1 然後 get
int getAndSet(int newValue) // 取得舊值並設定新值
boolean compareAndSet(int expect, int update) // CAS 操作
CAS(Compare-And-Swap)
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
public static void main(String[] args) {
AtomicInteger value = new AtomicInteger(10);
// CAS: 如果值是 10,就改成 20
boolean success = value.compareAndSet(10, 20);
System.out.println("成功: " + success); // true
System.out.println("值: " + value.get()); // 20
// 再次 CAS: 如果值是 10,就改成 30
success = value.compareAndSet(10, 30);
System.out.println("成功: " + success); // false(值已經不是 10 了)
System.out.println("值: " + value.get()); // 20
}
}
// CAS 是無鎖(Lock-Free)演算法
// 原理:
// 1. 讀取記憶體值 A
// 2. 準備寫入新值 B
// 3. 寫入前檢查記憶體值是否還是 A
// 4. 如果是,寫入 B;否則重試
並發集合(Concurrent Collections)
ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentMapDemo {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 多執行緒安全地操作
map.put("key1", 100);
map.putIfAbsent("key2", 200); // 不存在才 put
// 原子操作
map.compute("key1", (k, v) -> v + 1); // 100 → 101
map.merge("key2", 50, Integer::sum); // 200 + 50 = 250
System.out.println(map);
}
}
// ConcurrentHashMap vs Hashtable vs synchronizedMap
// Hashtable:整個方法加鎖(效能差)
// synchronizedMap:整個 Map 加鎖(效能差)
// ConcurrentHashMap:分段鎖(效能好)
CopyOnWriteArrayList
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("A");
list.add("B");
// 迭代時不會拋出 ConcurrentModificationException
for (String item : list) {
System.out.println(item);
list.add("C"); // 可以在迭代中修改
}
System.out.println(list);
}
}
// CopyOnWriteArrayList 特點:
// ✅ 讀取不加鎖(效能好)
// ✅ 寫入時複製整個陣列
// ❌ 寫入效能差(適合讀多寫少)
// ❌ 記憶體佔用大
BlockingQueue
import java.util.concurrent.*;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 生產者
new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.put(i); // 滿了會阻塞
System.out.println("生產: " + i);
Thread.sleep(500);
}
} catch (InterruptedException e) {}
}).start();
// 消費者
new Thread(() -> {
try {
while (true) {
Integer value = queue.take(); // 空了會阻塞
System.out.println("消費: " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {}
}).start();
}
}
// BlockingQueue 實作:
// ArrayBlockingQueue:固定大小
// LinkedBlockingQueue:可選大小
// PriorityBlockingQueue:優先權佇列
// DelayQueue:延遲佇列
// SynchronousQueue:無容量(直接交接)
CompletableFuture(非同步編程)
基本用法
import java.util.concurrent.CompletableFuture;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 非同步執行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("非同步任務執行於: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
return "Hello";
});
System.out.println("主執行緒繼續執行...");
// 取得結果(阻塞)
String result = future.get();
System.out.println("結果: " + result);
}
}
鏈式操作
import java.util.concurrent.CompletableFuture;
public class CompletableFutureChain {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("步驟 1");
return "Hello";
})
.thenApply(s -> {
System.out.println("步驟 2");
return s + " World";
})
.thenApply(s -> {
System.out.println("步驟 3");
return s.toUpperCase();
});
System.out.println(future.get()); // HELLO WORLD
}
}
// 常用方法:
// thenApply:轉換結果
// thenAccept:消費結果(無返回值)
// thenRun:執行動作(不關心結果)
// thenCompose:組合 Future
// thenCombine:合併兩個 Future
組合多個 Future
import java.util.concurrent.CompletableFuture;
public class CombineFuture {
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return 10;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return 20;
});
// 合併結果
CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> {
return a + b;
});
System.out.println("結果: " + combined.get()); // 30
// 等待所有完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
allOf.get(); // 阻塞直到全部完成
// 任一完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
System.out.println("最快完成: " + anyOf.get());
}
private static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {}
}
}
異常處理
import java.util.concurrent.CompletableFuture;
public class ExceptionHandling {
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("錯誤");
}
return 100;
})
.exceptionally(ex -> {
System.out.println("發生異常: " + ex.getMessage());
return 0; // 預設值
})
.thenApply(result -> result * 2);
System.out.println("結果: " + future.get());
// 或使用 handle(同時處理成功和失敗)
CompletableFuture<Integer> future2 = CompletableFuture
.supplyAsync(() -> 100 / 0) // 會拋異常
.handle((result, ex) -> {
if (ex != null) {
System.out.println("異常: " + ex.getMessage());
return 0;
}
return result;
});
System.out.println("結果 2: " + future2.get());
}
}
ThreadLocal
執行緒區域變數
public class ThreadLocalDemo {
private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
// 執行緒 1
new Thread(() -> {
threadLocal.set(100);
System.out.println("T1: " + threadLocal.get()); // 100
}).start();
// 執行緒 2
new Thread(() -> {
threadLocal.set(200);
System.out.println("T2: " + threadLocal.get()); // 200
}).start();
// 主執行緒
System.out.println("Main: " + threadLocal.get()); // 0(預設值)
}
}
// ThreadLocal 特點:
// ✅ 每個執行緒有自己的副本
// ✅ 執行緒隔離
// ✅ 不需要同步
// ❌ 記憶體洩漏風險(要記得 remove())
實際應用:使用者會話
public class UserContext {
private static ThreadLocal<User> userThreadLocal = new ThreadLocal<>();
public static void setUser(User user) {
userThreadLocal.set(user);
}
public static User getUser() {
return userThreadLocal.get();
}
public static void clear() {
userThreadLocal.remove(); // 防止記憶體洩漏
}
}
// 在 Web 應用中使用
class RequestFilter {
public void doFilter(HttpRequest request) {
// 請求開始
User user = authenticate(request);
UserContext.setUser(user);
try {
// 處理請求...
// 任何地方都可以用 UserContext.getUser() 取得使用者
} finally {
// 請求結束
UserContext.clear(); // 清理,避免記憶體洩漏
}
}
}
多執行緒與 CPU 的關係
CPU 基礎概念
1. CPU 核心架構
現代 CPU 架構:
┌─────────────────────────────────────┐
│ CPU 晶片 │
│ ┌──────────┐ ┌──────────┐ │
│ │ Core 0 │ │ Core 1 │ │
│ │ ┌──────┐ │ │ ┌──────┐ │ │
│ │ │Thread│ │ │ │Thread│ │ │
│ │ │ 0 │ │ │ │ 2 │ │ │
│ │ ├──────┤ │ │ ├──────┤ │ │
│ │ │Thread│ │ │ │Thread│ │ │
│ │ │ 1 │ │ │ │ 3 │ │ │
│ │ └──────┘ │ │ └──────┘ │ │
│ └──────────┘ └──────────┘ │
│ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Core 2 │ │ Core 3 │ │
│ │ ┌──────┐ │ │ ┌──────┐ │ │
│ │ │Thread│ │ │ │Thread│ │ │
│ │ │ 4 │ │ │ │ 6 │ │ │
│ │ ├──────┤ │ │ ├──────┤ │ │
│ │ │Thread│ │ │ │Thread│ │ │
│ │ │ 5 │ │ │ │ 7 │ │ │
│ │ └──────┘ │ │ └──────┘ │ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────┘
術語說明:
- Physical Core(實體核心):4 個
- Logical Core(邏輯核心/超執行緒):8 個
- Thread(硬體執行緒):每個核心可以有 1-2 個
2. 查看 CPU 資訊
// Java 中查看 CPU 核心數
public class CPUInfo {
public static void main(String[] args) {
// 可用的處理器數量(邏輯核心數)
int processors = Runtime.getRuntime().availableProcessors();
System.out.println("可用處理器數量: " + processors);
// 取得作業系統資訊
OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
System.out.println("作業系統: " + osBean.getName());
System.out.println("架構: " + osBean.getArch());
System.out.println("可用處理器: " + osBean.getAvailableProcessors());
// 如果是 com.sun.management.OperatingSystemMXBean
if (osBean instanceof com.sun.management.OperatingSystemMXBean) {
com.sun.management.OperatingSystemMXBean sunOsBean =
(com.sun.management.OperatingSystemMXBean) osBean;
System.out.println("系統 CPU 負載: " +
sunOsBean.getSystemCpuLoad() * 100 + "%");
System.out.println("程序 CPU 負載: " +
sunOsBean.getProcessCpuLoad() * 100 + "%");
}
}
}
// 輸出範例:
// 可用處理器數量: 8
// 作業系統: Mac OS X
// 架構: x86_64
// 可用處理器: 8
// 系統 CPU 負載: 35.2%
// 程序 CPU 負載: 12.5%
# Linux/Mac:查看 CPU 資訊
lscpu # 詳細 CPU 資訊
nproc # 邏輯核心數
grep -c ^processor /proc/cpuinfo # 邏輯核心數(Linux)
# Mac 特定
sysctl hw.physicalcpu # 實體核心數
sysctl hw.logicalcpu # 邏輯核心數
# 輸出範例:
# hw.physicalcpu: 4 ← 4 個實體核心
# hw.logicalcpu: 8 ← 8 個邏輯核心(支援超執行緒)
CPU 密集 vs I/O 密集
任務類型分類
CPU 密集型(CPU-Bound):
- 特徵:大量運算,很少等待
- 範例:
• 數學運算(加密、解密)
• 圖像處理
• 視頻編碼
• 資料壓縮
• 機器學習訓練
- CPU 使用率:高(接近 100%)
- 瓶頸:CPU 運算能力
I/O 密集型(I/O-Bound):
- 特徵:大量等待,運算較少
- 範例:
• 資料庫查詢
• 網路請求(HTTP API)
• 檔案讀寫
• 訊息佇列操作
- CPU 使用率:低(大部分時間在等待)
- 瓶頸:I/O 速度(網路、磁碟)
判斷任務類型
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
public class TaskTypeAnalyzer {
public static void main(String[] args) throws Exception {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
// 測試 CPU 密集任務
Thread cpuTask = new Thread(() -> {
long result = 0;
for (long i = 0; i < 10_000_000_000L; i++) {
result += i;
}
});
long startCpu = threadBean.getCurrentThreadCpuTime();
long startTime = System.nanoTime();
cpuTask.start();
cpuTask.join();
long endCpu = threadBean.getCurrentThreadCpuTime();
long endTime = System.nanoTime();
long cpuTime = endCpu - startCpu;
long wallTime = endTime - startTime;
double cpuUsage = (double) cpuTime / wallTime * 100;
System.out.println("CPU 使用率: " + cpuUsage + "%");
// CPU 密集:接近 100%
// I/O 密集:遠低於 100%(可能 5-20%)
}
}
執行緒池大小設定原則
基本公式
CPU 密集型任務:
執行緒數 = CPU 核心數 + 1
原因:
- CPU 已經滿載,更多執行緒會增加上下文切換開銷
- +1 是為了當某個執行緒暫停時(page fault),另一個可以補上
範例(8 核心):
Thread Pool Size = 8 + 1 = 9
---
I/O 密集型任務:
執行緒數 = CPU 核心數 × (1 + I/O 時間 / CPU 時間)
或更簡化:
執行緒數 = CPU 核心數 × 2 (保守估計)
執行緒數 = CPU 核心數 × (2 到 50) (根據 I/O 等待比例)
範例(8 核心,I/O 時間佔 80%):
Thread Pool Size = 8 × (1 + 0.8 / 0.2) = 8 × 5 = 40
---
Little's Law(排隊理論):
執行緒數 = 每秒請求數 × 平均響應時間
範例(每秒 100 個請求,平均 0.5 秒完成):
Thread Pool Size = 100 × 0.5 = 50
實際設定建議
public class ThreadPoolSizeCalculator {
public static void main(String[] args) {
int cpuCores = Runtime.getRuntime().availableProcessors();
// 1. CPU 密集型
int cpuIntensivePoolSize = cpuCores + 1;
System.out.println("CPU 密集型建議執行緒數: " + cpuIntensivePoolSize);
// 2. I/O 密集型(保守)
int ioIntensivePoolSize = cpuCores * 2;
System.out.println("I/O 密集型建議執行緒數(保守): " + ioIntensivePoolSize);
// 3. I/O 密集型(根據 I/O 比例)
double ioWaitRatio = 0.8; // 80% 時間在等待 I/O
int ioIntensivePoolSize2 = (int) (cpuCores * (1 + ioWaitRatio / (1 - ioWaitRatio)));
System.out.println("I/O 密集型建議執行緒數(計算): " + ioIntensivePoolSize2);
// 4. 混合型(Web 應用常見)
int mixedPoolSize = cpuCores * 4; // 經驗值
System.out.println("混合型建議執行緒數: " + mixedPoolSize);
}
}
// 輸出範例(8 核心):
// CPU 密集型建議執行緒數: 9
// I/O 密集型建議執行緒數(保守): 16
// I/O 密集型建議執行緒數(計算): 40
// 混合型建議執行緒數: 32
執行緒池參數詳細設定
ThreadPoolExecutor 完整參數
import java.util.concurrent.*;
public class ThreadPoolConfiguration {
public static void main(String[] args) {
int cpuCores = Runtime.getRuntime().availableProcessors();
// 情境 1:CPU 密集型(圖像處理、加密)
ThreadPoolExecutor cpuBoundPool = new ThreadPoolExecutor(
cpuCores, // corePoolSize:核心執行緒數
cpuCores + 1, // maximumPoolSize:最大執行緒數
60L, // keepAliveTime:閒置存活時間
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // 有界佇列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
// 情境 2:I/O 密集型(資料庫、網路)
ThreadPoolExecutor ioBoundPool = new ThreadPoolExecutor(
cpuCores * 2, // corePoolSize
cpuCores * 4, // maximumPoolSize
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 較大佇列
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 情境 3:Web 應用(混合型)
ThreadPoolExecutor webAppPool = new ThreadPoolExecutor(
10, // 最少 10 個執行緒(應對突發)
cpuCores * 2, // 最多不超過 CPU * 2
120L, // 較長的閒置時間
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500), // 固定大小佇列
new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "WebApp-Worker-" + count++);
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.AbortPolicy() // 超過負載直接拒絕
);
// 情境 4:定時任務
ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(
5, // 5 個核心執行緒足夠
new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "Scheduled-Worker-" + count++);
t.setDaemon(true); // Daemon 執行緒
return t;
}
}
);
}
}
參數設定原則
corePoolSize(核心執行緒數):
✅ 設定為最小並發需求
✅ 這些執行緒會一直存活
✅ 即使閒置也不會回收(除非設定 allowCoreThreadTimeOut)
建議:
- CPU 密集:CPU 核心數
- I/O 密集:CPU 核心數 × 2
- Web 應用:10 - 50(根據實際負載)
---
maximumPoolSize(最大執行緒數):
✅ 負載高峰時的上限
✅ 超過 core 的執行緒會在閒置後回收
建議:
- CPU 密集:corePoolSize + 1(不需要太多)
- I/O 密集:corePoolSize × 2 或更多
- Web 應用:根據壓測結果調整
---
keepAliveTime(閒置存活時間):
✅ 超過 core 的執行緒閒置多久後回收
✅ 避免資源浪費
建議:
- 穩定負載:60 秒
- 波動負載:120-300 秒(保留執行緒應對突發)
---
workQueue(工作佇列):
✅ 任務在執行緒都忙碌時排隊
類型選擇:
- ArrayBlockingQueue:有界(防止記憶體溢出)推薦
- LinkedBlockingQueue:可選界限
- SynchronousQueue:無容量(直接交接)
- PriorityBlockingQueue:優先權佇列
建議:
- 使用有界佇列(防止 OOM)
- 大小設定為預期並發數的 2-10 倍
---
RejectedExecutionHandler(拒絕策略):
✅ 佇列滿且無法建立新執行緒時的處理
選項:
1. AbortPolicy:拋出異常(預設)
2. CallerRunsPolicy:由呼叫者執行(推薦)
3. DiscardPolicy:靜默丟棄
4. DiscardOldestPolicy:丟棄最舊的任務
建議:
- 重要任務:CallerRunsPolicy(降低提交速度)
- 可丟棄任務:DiscardPolicy
- 需要監控:AbortPolicy + 異常處理
動態調整執行緒池大小
監控與自動調整
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class DynamicThreadPool {
private ThreadPoolExecutor executor;
private ScheduledExecutorService monitor;
public DynamicThreadPool() {
int cpuCores = Runtime.getRuntime().availableProcessors();
// 建立執行緒池
executor = new ThreadPoolExecutor(
cpuCores,
cpuCores * 4,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 啟動監控執行緒
monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(this::adjustPoolSize, 10, 10, TimeUnit.SECONDS);
}
private void adjustPoolSize() {
int activeThreads = executor.getActiveCount();
int poolSize = executor.getPoolSize();
int queueSize = executor.getQueue().size();
int maxPoolSize = executor.getMaximumPoolSize();
System.out.println("=== 執行緒池狀態 ===");
System.out.println("活躍執行緒: " + activeThreads);
System.out.println("執行緒池大小: " + poolSize);
System.out.println("佇列大小: " + queueSize);
System.out.println("最大池大小: " + maxPoolSize);
// 調整策略
double utilization = (double) activeThreads / poolSize;
if (utilization > 0.9 && poolSize < maxPoolSize) {
// 使用率 > 90%,增加核心執行緒數
int newCoreSize = Math.min(poolSize + 2, maxPoolSize);
executor.setCorePoolSize(newCoreSize);
System.out.println("→ 增加核心執行緒數至: " + newCoreSize);
} else if (utilization < 0.3 && poolSize > executor.getCorePoolSize()) {
// 使用率 < 30%,減少核心執行緒數
int newCoreSize = Math.max(poolSize - 2, Runtime.getRuntime().availableProcessors());
executor.setCorePoolSize(newCoreSize);
System.out.println("→ 減少核心執行緒數至: " + newCoreSize);
}
// 佇列積壓警告
if (queueSize > 400) {
System.out.println("⚠️ 警告:佇列積壓嚴重,考慮增加執行緒數或優化任務");
}
}
public void submit(Runnable task) {
executor.submit(task);
}
public void shutdown() {
monitor.shutdown();
executor.shutdown();
}
}
壓力測試與調優
public class LoadTesting {
public static void main(String[] args) throws Exception {
int cpuCores = Runtime.getRuntime().availableProcessors();
// 測試不同執行緒池大小
int[] poolSizes = {cpuCores, cpuCores * 2, cpuCores * 4, cpuCores * 8};
for (int size : poolSizes) {
System.out.println("\n=== 測試執行緒池大小: " + size + " ===");
testThreadPool(size, 1000); // 1000 個任務
}
}
private static void testThreadPool(int poolSize, int taskCount) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
poolSize,
poolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(taskCount),
new ThreadPoolExecutor.CallerRunsPolicy()
);
long startTime = System.currentTimeMillis();
// 提交任務
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
executor.submit(() -> {
try {
// 模擬 I/O 密集任務(80% 時間在等待)
Thread.sleep(100); // I/O 等待
// 模擬少量運算
long result = 0;
for (int j = 0; j < 100000; j++) {
result += j;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
// 等待所有任務完成
latch.await();
long endTime = System.currentTimeMillis();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
long duration = endTime - startTime;
double throughput = (double) taskCount / duration * 1000;
System.out.println("完成時間: " + duration + " ms");
System.out.println("吞吐量: " + String.format("%.2f", throughput) + " 任務/秒");
System.out.println("平均每個任務: " + (duration / taskCount) + " ms");
}
}
// 輸出範例:
// === 測試執行緒池大小: 8 ===
// 完成時間: 12500 ms
// 吞吐量: 80.00 任務/秒
// 平均每個任務: 12 ms
//
// === 測試執行緒池大小: 16 ===
// 完成時間: 6250 ms
// 吞吐量: 160.00 任務/秒
// 平均每個任務: 6 ms
//
// === 測試執行緒池大小: 32 ===
// 完成時間: 3200 ms
// 吞吐量: 312.50 任務/秒
// 平均每個任務: 3 ms
//
// → 結論:I/O 密集任務,執行緒數越多越好(直到某個點)
CPU 親和性(CPU Affinity)
概念說明
CPU 親和性:
將執行緒綁定到特定的 CPU 核心
優點:
✅ 減少上下文切換
✅ 提升 CPU 快取命中率
✅ 更好的效能可預測性
缺點:
❌ 降低靈活性
❌ 可能造成負載不均
❌ 需要手動管理
適用場景:
- 即時系統
- 高頻交易
- 低延遲應用
Linux 上設定 CPU 親和性
// Java 本身不支援 CPU 親和性
// 需要透過 JNI 或第三方套件
// 方法 1:使用 taskset(啟動時)
// taskset -c 0-3 java -jar myapp.jar
// 將 JVM 綁定到 CPU 0-3
// 方法 2:使用 JNA 套件
import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.Platform;
public interface CLibrary extends Library {
CLibrary INSTANCE = Native.load("c", CLibrary.class);
int sched_setaffinity(int pid, int cpusetsize, long[] cpuset);
int sched_getaffinity(int pid, int cpusetsize, long[] cpuset);
}
public class CPUAffinity {
public static void setCPUAffinity(int cpuCore) {
if (!Platform.isLinux()) {
System.out.println("僅支援 Linux");
return;
}
long[] cpuset = new long[1];
cpuset[0] = 1L << cpuCore; // 綁定到特定核心
int pid = 0; // 0 = 當前執行緒
int result = CLibrary.INSTANCE.sched_setaffinity(
pid,
Long.SIZE / 8,
cpuset
);
if (result == 0) {
System.out.println("成功綁定到 CPU " + cpuCore);
} else {
System.out.println("綁定失敗");
}
}
}
// 注意:大部分情況下不需要設定 CPU 親和性
// 讓作業系統的排程器處理通常更好
效能監控與診斷
JMX 監控執行緒池
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.concurrent.*;
public class ThreadPoolMonitor {
public static void main(String[] args) throws Exception {
// 建立執行緒池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
// 註冊到 JMX
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("com.example:type=ThreadPool,name=MyPool");
ThreadPoolMXBean mxBean = new ThreadPoolMXBean(executor);
mbs.registerMBean(mxBean, name);
// 定期輸出統計
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
System.out.println("\n=== 執行緒池統計 ===");
System.out.println("核心執行緒數: " + executor.getCorePoolSize());
System.out.println("當前執行緒數: " + executor.getPoolSize());
System.out.println("活躍執行緒數: " + executor.getActiveCount());
System.out.println("最大執行緒數: " + executor.getMaximumPoolSize());
System.out.println("曾經最大執行緒數: " + executor.getLargestPoolSize());
System.out.println("任務總數: " + executor.getTaskCount());
System.out.println("已完成任務數: " + executor.getCompletedTaskCount());
System.out.println("佇列大小: " + executor.getQueue().size());
// 計算使用率
double utilization = (double) executor.getActiveCount() / executor.getPoolSize();
System.out.println("執行緒使用率: " + String.format("%.2f%%", utilization * 100));
// 計算完成率
double completionRate = (double) executor.getCompletedTaskCount() / executor.getTaskCount();
System.out.println("任務完成率: " + String.format("%.2f%%", completionRate * 100));
}, 5, 5, TimeUnit.SECONDS);
// 提交一些任務進行測試
for (int i = 0; i < 50; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("Task " + taskId + " 執行中");
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread.sleep(100);
}
// 運行一段時間後關閉
Thread.sleep(30000);
monitor.shutdown();
executor.shutdown();
}
}
使用 VisualVM 或 JConsole
# 啟動 JVM 時加入 JMX 參數
java -Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-jar myapp.jar
# 然後用 VisualVM 或 JConsole 連接
jvisualvm
jconsole localhost:9999
# 可以看到:
# - 執行緒數量
# - CPU 使用率
# - 記憶體使用
# - 執行緒狀態
# - 執行緒 Dump
實戰範例:最佳化 Web 應用執行緒池
import java.util.concurrent.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ThreadPoolConfig {
@Bean(name = "apiRequestPool")
public Executor apiRequestPool() {
int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// I/O 密集(API 請求)
executor.setCorePoolSize(cpuCores * 2); // 16(假設 8 核)
executor.setMaxPoolSize(cpuCores * 4); // 32
executor.setQueueCapacity(500); // 佇列容量
executor.setKeepAliveSeconds(120); // 閒置時間
executor.setThreadNamePrefix("API-"); // 執行緒名稱
// 拒絕策略:由呼叫者執行(降速)
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 等待任務完成後才關閉
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
@Bean(name = "backgroundTaskPool")
public Executor backgroundTaskPool() {
int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 背景任務(CPU + I/O 混合)
executor.setCorePoolSize(cpuCores); // 8
executor.setMaxPoolSize(cpuCores * 2); // 16
executor.setQueueCapacity(1000); // 較大佇列
executor.setKeepAliveSeconds(300); // 較長閒置時間
executor.setThreadNamePrefix("Background-");
// 拒絕策略:丟棄(非關鍵任務)
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.DiscardPolicy()
);
executor.initialize();
return executor;
}
@Bean(name = "cpuIntensivePool")
public Executor cpuIntensivePool() {
int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// CPU 密集(圖像處理、加密)
executor.setCorePoolSize(cpuCores); // 8
executor.setMaxPoolSize(cpuCores + 1); // 9
executor.setQueueCapacity(50); // 小佇列
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("CPU-");
// 拒絕策略:阻塞(避免過載)
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.initialize();
return executor;
}
}
// 使用
@Service
public class MyService {
@Autowired
@Qualifier("apiRequestPool")
private Executor apiRequestPool;
@Autowired
@Qualifier("backgroundTaskPool")
private Executor backgroundTaskPool;
public void handleRequest() {
// API 請求
apiRequestPool.execute(() -> {
// 呼叫外部 API
callExternalAPI();
});
// 背景任務
backgroundTaskPool.execute(() -> {
// 發送 Email
sendEmail();
});
}
}
決策流程圖
開始設定執行緒池
↓
┌─────────────────────────────┐
│ 1. 判斷任務類型 │
└──────────┬──────────────────┘
↓
┌──────┴───────┐
│ │
CPU 密集 I/O 密集
│ │
↓ ↓
核心數 + 1 核心數 × (2~50)
│ │
└──────┬───────┘
↓
┌─────────────────────────────┐
│ 2. 測量實際 I/O 比例 │
│ - 使用率監控 │
│ - 壓力測試 │
└──────────┬──────────────────┘
↓
┌─────────────────────────────┐
│ 3. 設定執行緒池參數 │
│ - corePoolSize │
│ - maximumPoolSize │
│ - workQueue │
│ - RejectedHandler │
└──────────┬──────────────────┘
↓
┌─────────────────────────────┐
│ 4. 壓力測試驗證 │
│ - 吞吐量測試 │
│ - 延遲測試 │
│ - 穩定性測試 │
└──────────┬──────────────────┘
↓
┌─────────────────────────────┐
│ 5. 生產環境監控與調整 │
│ - CPU 使用率 │
│ - 執行緒使用率 │
│ - 佇列積壓 │
│ - 響應時間 │
└──────────┬──────────────────┘
↓
調優完成 ✓
常見問題 FAQ
Q1: 執行緒池設太大會怎樣?
問題:
❌ 上下文切換開銷增加
❌ 記憶體消耗增加(每個執行緒約 1MB stack)
❌ CPU 快取效率下降
❌ 可能導致 OutOfMemoryError
範例:
1000 個執行緒 × 1MB = 1GB 記憶體
頻繁切換 → CPU 時間浪費在切換而非工作
建議:
從保守值開始(CPU 核心數 × 2)
根據監控逐步調整
Q2: 執行緒池設太小會怎樣?
問題:
❌ CPU 利用率低
❌ 吞吐量不足
❌ 請求佇列積壓
❌ 響應時間變長
範例:
8 核心 CPU,只設定 2 個執行緒
→ CPU 大部分時間閒置
→ 無法發揮硬體效能
建議:
至少 = CPU 核心數
I/O 密集任務應該更多
Q3: 如何知道當前設定是否合適?
監控指標:
✅ CPU 使用率應該在 70-80%(CPU 密集)
✅ 執行緒使用率應該在 80-90%
✅ 佇列積壓應該接近 0
✅ 響應時間穩定且符合 SLA
✅ 沒有執行緒飢餓現象
警告訊號:
⚠️ CPU 使用率一直很低 → 執行緒太少
⚠️ 佇列持續積壓 → 執行緒太少或任務太慢
⚠️ 頻繁拒絕任務 → 池太小或佇列太小
⚠️ 大量上下文切換 → 執行緒太多
Q4: 生產環境如何調整?
階段式調整:
1. 從保守值開始
- CPU 密集:核心數 + 1
- I/O 密集:核心數 × 2
2. 壓力測試
- 模擬生產流量
- 逐步增加執行緒數
- 找到甜蜜點(Sweet Spot)
3. 灰度發佈
- 小範圍測試新配置
- 監控關鍵指標
- 逐步推廣
4. 持續監控
- 設定告警閾值
- 定期檢視報表
- 根據業務變化調整
執行緒安全的設計模式
單例模式(雙重檢查鎖定)
public class Singleton {
// volatile 防止指令重排序
private static volatile Singleton instance;
private Singleton() {}
public static Singleton getInstance() {
if (instance == null) { // 第一次檢查
synchronized (Singleton.class) {
if (instance == null) { // 第二次檢查
instance = new Singleton();
}
}
}
return instance;
}
}
// 或使用靜態內部類(更推薦)
public class Singleton {
private Singleton() {}
private static class Holder {
private static final Singleton INSTANCE = new Singleton();
}
public static Singleton getInstance() {
return Holder.INSTANCE;
}
}
不可變物件(Immutable)
// 執行緒安全的不可變類別
public final class ImmutablePerson {
private final String name;
private final int age;
public ImmutablePerson(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
// 修改時返回新物件
public ImmutablePerson withAge(int newAge) {
return new ImmutablePerson(this.name, newAge);
}
}
// 不可變物件的特點:
// ✅ 天生執行緒安全
// ✅ 可以安全地共享
// ✅ 適合作為 Map 的 key
// ❌ 修改時需要建立新物件
最佳實踐
1. 選擇適當的同步機制
簡單情境:
✅ synchronized
需要進階功能(tryLock, 公平鎖):
✅ ReentrantLock
讀多寫少:
✅ ReadWriteLock
無競爭:
✅ Atomic 類別
高併發:
✅ ConcurrentHashMap, CopyOnWriteArrayList
2. 避免過度同步
// ❌ 不好:整個方法同步
public synchronized void process() {
// 長時間運算
heavyComputation();
// 修改共享變數
count++;
}
// ✅ 好:只同步關鍵區域
public void process() {
// 長時間運算(不需要同步)
heavyComputation();
// 只同步修改共享變數的部分
synchronized (this) {
count++;
}
}
3. 使用執行緒池
// ❌ 不好:每次建立新執行緒
for (int i = 0; i < 1000; i++) {
new Thread(() -> task()).start();
}
// ✅ 好:使用執行緒池
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> task());
}
executor.shutdown();
4. 正確關閉資源
ExecutorService executor = Executors.newFixedThreadPool(10);
try {
// 使用執行緒池
executor.submit(() -> task());
} finally {
// 關閉執行緒池
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
5. 處理 InterruptedException
// ✅ 正確處理中斷
public void task() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// 恢復中斷狀態
Thread.currentThread().interrupt();
// 清理資源
cleanup();
// 返回
return;
}
}
常見陷阱
1. 忘記 volatile
// ❌ 錯誤:沒有 volatile
class StopThread {
private boolean stop = false;
public void run() {
while (!stop) {
// 可能永遠不會停止(JVM 優化)
}
}
public void stop() {
stop = true;
}
}
// ✅ 正確:加上 volatile
class StopThread {
private volatile boolean stop = false;
// volatile 保證可見性
}
2. 雙重檢查鎖定的陷阱
// ❌ 錯誤:沒有 volatile
private static Singleton instance;
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton(); // 可能指令重排序
}
}
}
return instance; // 可能返回未完全初始化的物件
}
// ✅ 正確:加上 volatile
private static volatile Singleton instance;
3. 共享可變狀態
// ❌ 錯誤:共享可變 List
class SharedList {
private List<String> list = new ArrayList<>(); // 不安全
public void add(String item) {
list.add(item); // 多執行緒會出問題
}
}
// ✅ 方案 1:同步
class SharedList {
private List<String> list = new ArrayList<>();
public synchronized void add(String item) {
list.add(item);
}
}
// ✅ 方案 2:使用執行緒安全的集合
class SharedList {
private List<String> list = new CopyOnWriteArrayList<>();
public void add(String item) {
list.add(item); // 安全
}
}
效能考量
1. 鎖的粒度
粗粒度鎖(Coarse-grained):
- 鎖的範圍大
- 競爭激烈
- 效能差
細粒度鎖(Fine-grained):
- 鎖的範圍小
- 競爭少
- 效能好
2. 鎖消除(Lock Elimination)
// JVM 會自動優化
public String concat(String s1, String s2) {
StringBuffer sb = new StringBuffer(); // 區域變數
sb.append(s1); // synchronized 方法
sb.append(s2);
return sb.toString();
}
// JVM 發現 sb 是區域變數,不會被其他執行緒存取
// 自動消除 synchronized 鎖
3. 鎖粗化(Lock Coarsening)
// 原本:頻繁加鎖/解鎖
for (int i = 0; i < 1000; i++) {
synchronized (lock) {
count++;
}
}
// JVM 優化後:合併為一次加鎖
synchronized (lock) {
for (int i = 0; i < 1000; i++) {
count++;
}
}
除錯技巧
1. 檢測死鎖
# 使用 jstack 檢測死鎖
jstack <pid>
# 或在程式中
ThreadMXBean tmx = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = tmx.findDeadlockedThreads();
if (deadlockedThreads != null) {
System.out.println("發現死鎖!");
}
2. 執行緒 Dump
// 取得所有執行緒的堆疊追蹤
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
for (Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
Thread thread = entry.getKey();
StackTraceElement[] stack = entry.getValue();
System.out.println("Thread: " + thread.getName());
for (StackTraceElement element : stack) {
System.out.println(" " + element);
}
}
3. 日誌記錄
import java.util.logging.Logger;
public class ThreadSafeLogger {
private static final Logger logger = Logger.getLogger("ThreadLog");
public void log(String message) {
logger.info("[" + Thread.currentThread().getName() + "] " + message);
}
}
參考資源
官方文件:
- Java Concurrency Tutorial: https://docs.oracle.com/javase/tutorial/essential/concurrency/
- java.util.concurrent API: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html
推薦書籍:
- Java Concurrency in Practice (Brian Goetz)
- Effective Java (Joshua Bloch)
總結
核心概念速記
執行緒 = 輕量級程序(共享記憶體)
同步 = 防止競態條件(synchronized, Lock)
死鎖 = 互相等待資源(避免:統一順序加鎖)
執行緒池 = 重複使用執行緒(避免頻繁建立)
常用類別對照
| 用途 | 類別 | 說明 |
|---|---|---|
| 建立執行緒 | Thread, Runnable, Callable | 基本執行緒 |
| 同步 | synchronized, ReentrantLock | 互斥鎖 |
| 執行緒池 | ExecutorService | 管理執行緒 |
| 原子操作 | AtomicInteger | 無鎖並發 |
| 並發集合 | ConcurrentHashMap | 執行緒安全 |
| 非同步 | CompletableFuture | 非同步編程 |
建立日期:2025-11-07 最後更新:2025-11-18