Java 并发编程全面指南
目录
- 线程基础
- synchronized 关键字
- volatile 关键字
- Lock 与 synchronized 对比
- 线程池(ThreadPoolExecutor)
- CompletableFuture
- 并发集合(ConcurrentHashMap)
- 总结
1. 线程基础
1.1 进程与线程
- 进程:操作系统分配资源的基本单位,拥有独立的地址空间。
- 线程:CPU 调度的基本单位,共享进程的堆和方法区,拥有独立的程序计数器、虚拟机栈和本地方法栈。
1.2 创建线程的三种方式
方式一:继承 Thread 类
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread 方式运行: " + Thread.currentThread().getName());
}
}
// 使用
new MyThread().start();
方式二:实现 Runnable 接口
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable 方式运行: " + Thread.currentThread().getName());
}
}
// 使用
new Thread(new MyRunnable()).start();
方式三:实现 Callable 接口(带返回值)
class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "Callable 返回值: " + Thread.currentThread().getName();
}
}
// 使用
FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
new Thread(futureTask).start();
String result = futureTask.get(); // 获取返回值(可能阻塞)
System.out.println(result);
1.3 线程生命周期
Java 线程有六种状态(Thread.State 枚举):
| 状态 | 说明 |
|---|---|
NEW |
新建,尚未调用 start() |
RUNNABLE |
可运行,可能在运行或等待 CPU 时间片 |
BLOCKED |
阻塞,等待监视器锁 |
WAITING |
等待,需其他线程显式唤醒 |
TIMED_WAITING |
超时等待,到时间自动唤醒 |
TERMINATED |
终止,执行完毕 |

1.4 线程常用方法
Thread t = new Thread(() -> {
try {
Thread.sleep(1000); // 休眠 1 秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
}
});
t.start();
t.join(); // 等待 t 执行完毕
t.setDaemon(true); // 设为守护线程
t.interrupt(); // 中断线程
2. synchronized 关键字
2.1 核心原理
synchronized 是 Java 内置的互斥锁,基于 Monitor 对象实现。在 JVM 层面,通过 monitorenter 和 monitorexit 指令完成加锁与解锁。
2.2 三种使用方式
public class SynchronizedDemo {
// 1. 同步实例方法 — 锁是当前实例对象
public synchronized void instanceMethod() {
System.out.println("实例方法锁: this");
}
// 2. 同步静态方法 — 锁是当前 Class 对象
public static synchronized void staticMethod() {
System.out.println("静态方法锁: Demo.class");
}
// 3. 同步代码块 — 可指定任意对象作为锁
private final Object lock = new Object();
public void blockMethod() {
synchronized (lock) {
System.out.println("代码块锁: 自定义对象");
}
}
}
2.3 锁升级过程(JDK 1.6+)
Java 1.6 对 synchronized 进行了大幅优化,引入锁升级机制,从低到高依次为:
无锁 → 偏向锁 → 轻量级锁 → 重量级锁
| 锁状态 | 适用场景 | 说明 |
|---|---|---|
| 偏向锁 | 只有一个线程访问同步块 | 在对象头 Mark Word 中存储线程 ID |
| 轻量级锁 | 多线程交替执行,无竞争 | CAS 自旋获取锁,避免阻塞 |
| 重量级锁 | 多线程同时竞争 | 依赖操作系统互斥量,线程阻塞 |
// JVM 参数查看锁信息
// -XX:+PrintBiasedLockingStatistics 打印偏向锁统计
// -XX:BiasedLockingStartupDelay=0 关闭偏向锁延迟
2.4 可重入性
synchronized 是可重入锁,同一个线程可以多次获取同一把锁:
public class ReentrantDemo {
public synchronized void outer() {
System.out.println("outer");
inner(); // 再次获取同一把锁
}
public synchronized void inner() {
System.out.println("inner");
}
public static void main(String[] args) {
new ReentrantDemo().outer(); // 正常运行,不会死锁
}
}
3. volatile 关键字
3.1 两大语义
- 保证可见性:写操作立即刷新到主内存,读操作从主内存重新读取。
- 禁止指令重排序:通过插入内存屏障实现,防止编译器和 CPU 对指令进行乱序优化。
3.2 典型使用场景
public class VolatileDemo {
// 使用 volatile 保证 flag 的可见性
private static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (flag) {
// 循环等待
}
System.out.println("线程检测到 flag 变化,退出");
}).start();
Thread.sleep(1000);
flag = false; // 主线程修改,工作线程立即可见
System.out.println("主线程将 flag 设为 false");
}
}
如果不加
volatile,工作线程可能永远看不到主线程对flag的修改,导致死循环。
3.3 volatile 与 synchronized 的区别
| 特性 | volatile | synchronized |
|---|---|---|
| 可见性 | ✅ | ✅ |
| 原子性 | ❌(仅单次读/写) | ✅ |
| 有序性 | ✅(禁止重排序) | ✅ |
| 是否阻塞 | ❌ | ✅ |
| 适用场景 | 状态标记、单次读写 | 复合操作、需要原子性 |
3.4 注意:volatile 不能保证原子性
public class Counter {
private static volatile int count = 0;
public static void increment() {
count++; // 这不是原子操作!包含:读、加、写三步
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
increment();
}
}).start();
}
Thread.sleep(2000);
System.out.println("最终结果: " + count); // 通常小于 10000
}
}
要解决此问题,应使用 synchronized、AtomicInteger 或 Lock。
4. Lock 与 synchronized 对比
4.1 Lock 接口体系
// 核心接口
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
主要实现类:
- ReentrantLock:可重入、可中断、可公平
- ReentrantReadWriteLock:读写分离,读读不互斥
4.2 ReentrantLock 使用示例
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
private final ReentrantLock lock = new ReentrantLock(true); // fair=true 公平锁
private int count = 0;
public void increment() {
lock.lock(); // 加锁
try {
count++;
} finally {
lock.unlock(); // 务必在 finally 中释放
}
}
// tryLock 避免阻塞
public boolean tryIncrement() {
if (lock.tryLock()) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false; // 获取锁失败,不阻塞
}
}
4.3 公平锁与非公平锁
// 公平锁:按线程等待时间顺序获取锁
ReentrantLock fairLock = new ReentrantLock(true);
// 非公平锁(默认):插队,可能产生饥饿
ReentrantLock unfairLock = new ReentrantLock(false);
4.4 Condition 条件等待
public class ConditionDemo {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Queue<String> queue = new LinkedList<>();
public void produce(String item) {
lock.lock();
try {
queue.add(item);
notEmpty.signal(); // 唤醒消费者
} finally {
lock.unlock();
}
}
public String consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待生产者
}
return queue.poll();
} finally {
lock.unlock();
}
}
}
4.5 对比总结
| 维度 | synchronized | Lock |
|---|---|---|
| 关键字 vs 接口 | Java 关键字 | Java 接口 |
| 锁释放 | 自动(代码块结束) | 手动(finally 中 unlock) |
| 中断响应 | 不支持 | 支持 lockInterruptibly() |
| 超时获取 | 不支持 | 支持 tryLock(timeout) |
| 公平性 | 非公平 | 支持公平/非公平 |
| 条件变量 | wait()/notify() |
Condition.await()/signal() |
| 性能 (低竞争) | 较好(偏向锁) | 较轻量级锁相当 |
| 性能 (高竞争) | 较差(升级为重量级) | 较好(自旋 + CAS) |
选型建议:
- 简单同步场景首选
synchronized——代码简洁,不易出错。 - 需要超时、可中断、公平锁或读写分离时,使用
Lock。
5. 线程池(ThreadPoolExecutor)
5.1 为什么使用线程池
- 降低资源消耗:复用已创建的线程,减少创建/销毁开销。
- 提高响应速度:任务到达时无需等待线程创建。
- 提高可管理性:统一分配、调优和监控。
5.2 ThreadPoolExecutor 核心构造参数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 存活时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂(可选)
RejectedExecutionHandler handler // 拒绝策略(可选)
)
5.3 任务提交处理流程
提交任务 →
① 工作线程数 < corePoolSize? → 创建核心线程执行
② 工作线程数 ≥ corePoolSize? → 尝试放入 workQueue
③ workQueue 已满? → 创建非核心线程执行
④ 线程数 ≥ maximumPoolSize? → 执行拒绝策略
5.4 四种拒绝策略
| 策略 | 说明 |
|---|---|
AbortPolicy(默认) |
抛出 RejectedExecutionException |
CallerRunsPolicy |
由提交任务的线程自己执行 |
DiscardPolicy |
直接丢弃任务,不抛异常 |
DiscardOldestPolicy |
丢弃队列中最旧的任务,重试提交 |
5.5 完整示例
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolExample {
public static void main(String[] args) {
// 自定义线程工厂(命名线程,方便排查)
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "work-thread-" + counter.getAndIncrement());
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
};
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
5, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10), // 有界队列
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
for (int i = 0; i < 20; i++) {
int taskId = i;
executor.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 执行任务 #" + taskId);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 优雅关闭
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
System.out.println("所有任务执行完毕");
}
}
5.6 预定义线程池(不推荐直接使用)
// FixedThreadPool — 固定大小,无界队列(可能 OOM)
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// CachedThreadPool — 可缓存,最大 Integer.MAX_VALUE(可能 OOM)
ExecutorService cachedPool = Executors.newCachedThreadPool();
// SingleThreadExecutor — 单线程,无界队列(可能 OOM)
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// ScheduledThreadPool — 定时/周期任务
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
阿里巴巴 Java 开发手册强制规定:不允许使用
Executors创建线程池,必须使用ThreadPoolExecutor明确参数。
5.7 如何合理配置线程池
- CPU 密集型任务:
corePoolSize = CPU核数 + 1 - IO 密集型任务:
corePoolSize = CPU核数 * 2(或更多,取决于 IO 等待时间占比) - 混合型任务:拆分 CPU 密集和 IO 密集部分
int cpuCores = Runtime.getRuntime().availableProcessors();
// CPU 密集型
int corePoolSize = cpuCores + 1;
// IO 密集型(估算等待时间占比 80%)
int corePoolSize = cpuCores * 2;
// 更精确:corePoolSize = CPU核数 * (1 + 等待时间/计算时间)
6. CompletableFuture
6.1 概述
CompletableFuture 是 Java 8 引入的异步编程工具,实现了 Future 和 CompletionStage,支持函数式编程风格的异步编排。
6.2 基础用法
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 运行异步任务(无返回值)
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("异步任务执行中...");
});
// 2. 运行异步任务(有返回值)
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "任务结果";
});
System.out.println(future2.get()); // 输出: 任务结果
// 3. 链式处理
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> 10)
.thenApply(r -> r * 2) // 同步转换
.thenApply(r -> r + 5); // 继续转换
System.out.println(future3.get()); // 输出: 25
// 4. 消费结果(不返回值)
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(System.out::println); // 输出: Hello
// 5. 不处理结果,只执行 Runnable
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("任务完成"));
Thread.sleep(1000); // 等待异步完成
}
}
6.3 异常处理
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("出错了!");
}
return "成功";
})
.exceptionally(ex -> {
System.out.println("捕获异常: " + ex.getMessage());
return "默认值";
})
.thenAccept(System.out::println);
6.4 多个 Future 编排
// 1. 两个任务都完成 thenCombine
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> combined = f1.thenCombine(f2, (a, b) -> a + " + " + b);
System.out.println(combined.get()); // 输出: A + B
// 2. 任意一个完成 thenAcceptEither
f1.acceptEither(f2, result -> System.out.println("最快结果: " + result));
// 3. 等待全部完成 allOf
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.join(); // 等待所有
// 4. 等待任意一个完成 anyOf
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2);
System.out.println("最先完成的: " + any.get());
6.5 真实业务场景:并行调用聚合
public class OrderService {
public OrderInfo getOrderDetail(Long orderId) {
// 三个查询并行执行,全部完成后聚合
CompletableFuture<OrderBase> baseFuture =
CompletableFuture.supplyAsync(() -> queryOrderBase(orderId));
CompletableFuture<List<OrderItem>> itemsFuture =
CompletableFuture.supplyAsync(() -> queryOrderItems(orderId));
CompletableFuture<OrderLogistics> logisticsFuture =
CompletableFuture.supplyAsync(() -> queryLogistics(orderId));
return CompletableFuture.allOf(baseFuture, itemsFuture, logisticsFuture)
.thenApply(v -> {
OrderInfo info = new OrderInfo();
info.setBase(baseFuture.join());
info.setItems(itemsFuture.join());
info.setLogistics(logisticsFuture.join());
return info;
})
.join(); // 阻塞直到全部完成
}
}
7. 并发集合(ConcurrentHashMap)
7.1 HashMap 的线程安全问题
HashMap 在多线程环境下:
- JDK 1.7:头插法导致环形链表,引起
get()死循环。 - JDK 1.8:尾插法解决死循环,但
put()仍可能覆盖数据。
// 多线程下 HashMap 的问题演示
Map<String, Integer> map = new HashMap<>();
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
int finalI = i;
pool.submit(() -> map.put("key" + finalI, finalI));
}
pool.shutdown();
System.out.println(map.size()); // 可能小于 1000(数据丢失)
7.2 ConcurrentHashMap 的演进
JDK 1.7:分段锁(Segment)
ConcurrentHashMap
├── Segment 0 (继承 ReentrantLock)
├── Segment 1
├── ...
└── Segment 15 (默认 16 个)
- 将数据分成多个 Segment,每个 Segment 独立加锁。
- 理论并发度为 16(Segment 数量)。
- 查询无锁(volatile 读取),修改锁住对应 Segment。
JDK 1.8+:CAS + synchronized
- 摒弃分段锁,采用数组 + 链表 + 红黑树结构。
- 使用
CAS操作数组节点,synchronized锁住链表/红黑树头节点。 - 并发升级为桶粒度,远高于 1.7 的分段锁。
7.3 ConcurrentHashMap 核心方法源码分析(JDK 1.8)
put 方法流程
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 1. 计算 hash
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
// 2. 首次 put,初始化数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 3. 当前桶为空,CAS 插入(无锁)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
// 4. 正在扩容,帮助迁移
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 5. 桶非空,synchronized 锁住头节点
else {
synchronized (f) {
// ... 遍历链表/红黑树插入
}
}
}
// 6. 检查是否需要扩容
addCount(1L, binCount);
return null;
}
7.4 常用并发集合一览
| 集合类 | 说明 |
|---|---|
ConcurrentHashMap |
线程安全 HashMap |
ConcurrentLinkedQueue |
基于 CAS 的无界非阻塞队列 |
ConcurrentSkipListMap |
有序的并发 Map(跳表实现) |
ConcurrentSkipListSet |
有序的并发 Set |
CopyOnWriteArrayList |
读多写少场景,写时复制 |
CopyOnWriteArraySet |
基于 CopyOnWriteArrayList 的 Set |
BlockingQueue 系列 |
阻塞队列(ArrayBlockingQueue / LinkedBlockingQueue / DelayQueue 等) |
7.5 使用建议
// ✅ 推荐:并发场景使用 ConcurrentHashMap
Map<String, Object> concurrentMap = new ConcurrentHashMap<>();
// ❌ 不推荐:手动加锁包装 HashMap(效率低且易出错)
Map<String, Object> bad = Collections.synchronizedMap(new HashMap<>());
// 注意:ConcurrentHashMap 不允许 key 或 value 为 null
// concurrentMap.put(null, "value"); // 抛 NullPointerException
// 常用原子方法
concurrentMap.putIfAbsent("key", "value"); // 不存在才 put
concurrentMap.computeIfAbsent("key", k -> "v"); // 不存在则计算
concurrentMap.merge("key", "v1", (old, v) -> old + v); // 合并
8. 总结
本文从底层到上层系统性地梳理了 Java 并发编程的核心知识:
| 主题 | 核心要点 |
|---|---|
| 线程基础 | 三种创建方式、六种状态、常用 API |
| synchronized | 内置锁、锁升级、可重入、三种用法 |
| volatile | 可见性 + 禁止重排序、不保证原子性 |
| Lock | 可中断、超时、公平、Condition 灵活控制 |
| 线程池 | 七大参数、四种拒绝策略、合理配置 |
| CompletableFuture | 异步编排、异常处理、多 Future 组合 |
| ConcurrentHashMap | 分段锁→CAS+synchronized、原子组合方法 |
学习路线建议
- 入门:线程创建 → synchronized → volatile
- 进阶:Lock → AQS 原理 → 并发容器
- 高级:线程池源码 → CompletableFuture → ForkJoinPool
- 实战:结合项目进行压测和调优
并发编程是 Java 开发者的核心能力之一,理解其原理而不只是会用,才能写出真正高效且正确的并发程序。
本文发布于 Java 学习博客,欢迎交流讨论。