菜单

Administrator
发布于 2026-05-18 / 0 阅读
0
0

Java 并发编程

Java 并发编程全面指南

目录

  1. 线程基础
  2. synchronized 关键字
  3. volatile 关键字
  4. Lock 与 synchronized 对比
  5. 线程池(ThreadPoolExecutor)
  6. CompletableFuture
  7. 并发集合(ConcurrentHashMap)
  8. 总结

1. 线程基础

1.1 进程与线程

  • 进程:操作系统分配资源的基本单位,拥有独立的地址空间。
  • 线程:CPU 调度的基本单位,共享进程的堆和方法区,拥有独立的程序计数器、虚拟机栈和本地方法栈。

1.2 创建线程的三种方式

方式一:继承 Thread 类

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread 方式运行: " + Thread.currentThread().getName());
    }
}

// 使用
new MyThread().start();

方式二:实现 Runnable 接口

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable 方式运行: " + Thread.currentThread().getName());
    }
}

// 使用
new Thread(new MyRunnable()).start();

方式三:实现 Callable 接口(带返回值)

class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Callable 返回值: " + Thread.currentThread().getName();
    }
}

// 使用
FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
new Thread(futureTask).start();
String result = futureTask.get(); // 获取返回值(可能阻塞)
System.out.println(result);

1.3 线程生命周期

Java 线程有六种状态(Thread.State 枚举):

状态 说明
NEW 新建,尚未调用 start()
RUNNABLE 可运行,可能在运行或等待 CPU 时间片
BLOCKED 阻塞,等待监视器锁
WAITING 等待,需其他线程显式唤醒
TIMED_WAITING 超时等待,到时间自动唤醒
TERMINATED 终止,执行完毕

线程状态转换图

1.4 线程常用方法

Thread t = new Thread(() -> {
    try {
        Thread.sleep(1000);    // 休眠 1 秒
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // 恢复中断状态
    }
});

t.start();
t.join();       // 等待 t 执行完毕
t.setDaemon(true); // 设为守护线程
t.interrupt();  // 中断线程

2. synchronized 关键字

2.1 核心原理

synchronized 是 Java 内置的互斥锁,基于 Monitor 对象实现。在 JVM 层面,通过 monitorenter 和 monitorexit 指令完成加锁与解锁。

2.2 三种使用方式

public class SynchronizedDemo {

    // 1. 同步实例方法 — 锁是当前实例对象
    public synchronized void instanceMethod() {
        System.out.println("实例方法锁: this");
    }

    // 2. 同步静态方法 — 锁是当前 Class 对象
    public static synchronized void staticMethod() {
        System.out.println("静态方法锁: Demo.class");
    }

    // 3. 同步代码块 — 可指定任意对象作为锁
    private final Object lock = new Object();

    public void blockMethod() {
        synchronized (lock) {
            System.out.println("代码块锁: 自定义对象");
        }
    }
}

2.3 锁升级过程(JDK 1.6+)

Java 1.6 对 synchronized 进行了大幅优化,引入锁升级机制,从低到高依次为:

无锁 → 偏向锁 → 轻量级锁 → 重量级锁
锁状态 适用场景 说明
偏向锁 只有一个线程访问同步块 在对象头 Mark Word 中存储线程 ID
轻量级锁 多线程交替执行,无竞争 CAS 自旋获取锁,避免阻塞
重量级锁 多线程同时竞争 依赖操作系统互斥量,线程阻塞
// JVM 参数查看锁信息
// -XX:+PrintBiasedLockingStatistics  打印偏向锁统计
// -XX:BiasedLockingStartupDelay=0    关闭偏向锁延迟

2.4 可重入性

synchronized可重入锁,同一个线程可以多次获取同一把锁:

public class ReentrantDemo {
    public synchronized void outer() {
        System.out.println("outer");
        inner(); // 再次获取同一把锁
    }

    public synchronized void inner() {
        System.out.println("inner");
    }

    public static void main(String[] args) {
        new ReentrantDemo().outer(); // 正常运行,不会死锁
    }
}

3. volatile 关键字

3.1 两大语义

  1. 保证可见性:写操作立即刷新到主内存,读操作从主内存重新读取。
  2. 禁止指令重排序:通过插入内存屏障实现,防止编译器和 CPU 对指令进行乱序优化。

3.2 典型使用场景

public class VolatileDemo {
    // 使用 volatile 保证 flag 的可见性
    private static volatile boolean flag = true;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (flag) {
                // 循环等待
            }
            System.out.println("线程检测到 flag 变化,退出");
        }).start();

        Thread.sleep(1000);
        flag = false; // 主线程修改,工作线程立即可见
        System.out.println("主线程将 flag 设为 false");
    }
}

如果不加 volatile,工作线程可能永远看不到主线程对 flag 的修改,导致死循环。

3.3 volatile 与 synchronized 的区别

特性 volatile synchronized
可见性
原子性 ❌(仅单次读/写)
有序性 ✅(禁止重排序)
是否阻塞
适用场景 状态标记、单次读写 复合操作、需要原子性

3.4 注意:volatile 不能保证原子性

public class Counter {
    private static volatile int count = 0;

    public static void increment() {
        count++; // 这不是原子操作!包含:读、加、写三步
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    increment();
                }
            }).start();
        }
        Thread.sleep(2000);
        System.out.println("最终结果: " + count); // 通常小于 10000
    }
}

要解决此问题,应使用 synchronizedAtomicIntegerLock


4. Lock 与 synchronized 对比

4.1 Lock 接口体系

// 核心接口
public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

主要实现类:

  • ReentrantLock:可重入、可中断、可公平
  • ReentrantReadWriteLock:读写分离,读读不互斥

4.2 ReentrantLock 使用示例

import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    private final ReentrantLock lock = new ReentrantLock(true); // fair=true 公平锁
    private int count = 0;

    public void increment() {
        lock.lock(); // 加锁
        try {
            count++;
        } finally {
            lock.unlock(); // 务必在 finally 中释放
        }
    }

    // tryLock 避免阻塞
    public boolean tryIncrement() {
        if (lock.tryLock()) {
            try {
                count++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false; // 获取锁失败,不阻塞
    }
}

4.3 公平锁与非公平锁

// 公平锁:按线程等待时间顺序获取锁
ReentrantLock fairLock = new ReentrantLock(true);

// 非公平锁(默认):插队,可能产生饥饿
ReentrantLock unfairLock = new ReentrantLock(false);

4.4 Condition 条件等待

public class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<String> queue = new LinkedList<>();

    public void produce(String item) {
        lock.lock();
        try {
            queue.add(item);
            notEmpty.signal(); // 唤醒消费者
        } finally {
            lock.unlock();
        }
    }

    public String consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // 等待生产者
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }
}

4.5 对比总结

维度 synchronized Lock
关键字 vs 接口 Java 关键字 Java 接口
锁释放 自动(代码块结束) 手动(finally 中 unlock)
中断响应 不支持 支持 lockInterruptibly()
超时获取 不支持 支持 tryLock(timeout)
公平性 非公平 支持公平/非公平
条件变量 wait()/notify() Condition.await()/signal()
性能 (低竞争) 较好(偏向锁) 较轻量级锁相当
性能 (高竞争) 较差(升级为重量级) 较好(自旋 + CAS)

选型建议

  • 简单同步场景首选 synchronized——代码简洁,不易出错。
  • 需要超时、可中断、公平锁或读写分离时,使用 Lock

5. 线程池(ThreadPoolExecutor)

5.1 为什么使用线程池

  • 降低资源消耗:复用已创建的线程,减少创建/销毁开销。
  • 提高响应速度:任务到达时无需等待线程创建。
  • 提高可管理性:统一分配、调优和监控。

5.2 ThreadPoolExecutor 核心构造参数

public ThreadPoolExecutor(
    int corePoolSize,        // 核心线程数
    int maximumPoolSize,     // 最大线程数
    long keepAliveTime,      // 非核心线程空闲存活时间
    TimeUnit unit,           // 存活时间单位
    BlockingQueue<Runnable> workQueue,  // 任务队列
    ThreadFactory threadFactory,        // 线程工厂(可选)
    RejectedExecutionHandler handler    // 拒绝策略(可选)
)

5.3 任务提交处理流程

提交任务 →
  ① 工作线程数 < corePoolSize? → 创建核心线程执行
  ② 工作线程数 ≥ corePoolSize? → 尝试放入 workQueue
  ③ workQueue 已满? → 创建非核心线程执行
  ④ 线程数 ≥ maximumPoolSize? → 执行拒绝策略

5.4 四种拒绝策略

策略 说明
AbortPolicy(默认) 抛出 RejectedExecutionException
CallerRunsPolicy 由提交任务的线程自己执行
DiscardPolicy 直接丢弃任务,不抛异常
DiscardOldestPolicy 丢弃队列中最旧的任务,重试提交

5.5 完整示例

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolExample {

    public static void main(String[] args) {
        // 自定义线程工厂(命名线程,方便排查)
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "work-thread-" + counter.getAndIncrement());
                t.setDaemon(false);
                t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        };

        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,              // corePoolSize
            5,              // maximumPoolSize
            60,             // keepAliveTime
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10), // 有界队列
            threadFactory,
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );

        // 提交任务
        for (int i = 0; i < 20; i++) {
            int taskId = i;
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " 执行任务 #" + taskId);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 优雅关闭
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // 强制关闭
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }

        System.out.println("所有任务执行完毕");
    }
}

5.6 预定义线程池(不推荐直接使用)

// FixedThreadPool — 固定大小,无界队列(可能 OOM)
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// CachedThreadPool — 可缓存,最大 Integer.MAX_VALUE(可能 OOM)
ExecutorService cachedPool = Executors.newCachedThreadPool();

// SingleThreadExecutor — 单线程,无界队列(可能 OOM)
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// ScheduledThreadPool — 定时/周期任务
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);

阿里巴巴 Java 开发手册强制规定:不允许使用 Executors 创建线程池,必须使用 ThreadPoolExecutor 明确参数。

5.7 如何合理配置线程池

  • CPU 密集型任务corePoolSize = CPU核数 + 1
  • IO 密集型任务corePoolSize = CPU核数 * 2(或更多,取决于 IO 等待时间占比)
  • 混合型任务:拆分 CPU 密集和 IO 密集部分
int cpuCores = Runtime.getRuntime().availableProcessors();

// CPU 密集型
int corePoolSize = cpuCores + 1;

// IO 密集型(估算等待时间占比 80%)
int corePoolSize = cpuCores * 2;
// 更精确:corePoolSize = CPU核数 * (1 + 等待时间/计算时间)

6. CompletableFuture

6.1 概述

CompletableFuture 是 Java 8 引入的异步编程工具,实现了 FutureCompletionStage,支持函数式编程风格的异步编排。

6.2 基础用法

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 运行异步任务(无返回值)
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            System.out.println("异步任务执行中...");
        });

        // 2. 运行异步任务(有返回值)
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return "任务结果";
        });
        System.out.println(future2.get()); // 输出: 任务结果

        // 3. 链式处理
        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> 10)
            .thenApply(r -> r * 2)       // 同步转换
            .thenApply(r -> r + 5);       // 继续转换
        System.out.println(future3.get()); // 输出: 25

        // 4. 消费结果(不返回值)
        CompletableFuture.supplyAsync(() -> "Hello")
            .thenAccept(System.out::println); // 输出: Hello

        // 5. 不处理结果,只执行 Runnable
        CompletableFuture.supplyAsync(() -> "Hello")
            .thenRun(() -> System.out.println("任务完成"));

        Thread.sleep(1000); // 等待异步完成
    }
}

6.3 异常处理

CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("出错了!");
        }
        return "成功";
    })
    .exceptionally(ex -> {
        System.out.println("捕获异常: " + ex.getMessage());
        return "默认值";
    })
    .thenAccept(System.out::println);

6.4 多个 Future 编排

// 1. 两个任务都完成 thenCombine
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> combined = f1.thenCombine(f2, (a, b) -> a + " + " + b);
System.out.println(combined.get()); // 输出: A + B

// 2. 任意一个完成 thenAcceptEither
f1.acceptEither(f2, result -> System.out.println("最快结果: " + result));

// 3. 等待全部完成 allOf
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.join(); // 等待所有

// 4. 等待任意一个完成 anyOf
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2);
System.out.println("最先完成的: " + any.get());

6.5 真实业务场景:并行调用聚合

public class OrderService {
    public OrderInfo getOrderDetail(Long orderId) {
        // 三个查询并行执行,全部完成后聚合
        CompletableFuture<OrderBase> baseFuture =
            CompletableFuture.supplyAsync(() -> queryOrderBase(orderId));
        CompletableFuture<List<OrderItem>> itemsFuture =
            CompletableFuture.supplyAsync(() -> queryOrderItems(orderId));
        CompletableFuture<OrderLogistics> logisticsFuture =
            CompletableFuture.supplyAsync(() -> queryLogistics(orderId));

        return CompletableFuture.allOf(baseFuture, itemsFuture, logisticsFuture)
            .thenApply(v -> {
                OrderInfo info = new OrderInfo();
                info.setBase(baseFuture.join());
                info.setItems(itemsFuture.join());
                info.setLogistics(logisticsFuture.join());
                return info;
            })
            .join(); // 阻塞直到全部完成
    }
}

7. 并发集合(ConcurrentHashMap)

7.1 HashMap 的线程安全问题

HashMap 在多线程环境下:

  • JDK 1.7:头插法导致环形链表,引起 get() 死循环。
  • JDK 1.8:尾插法解决死循环,但 put() 仍可能覆盖数据。
// 多线程下 HashMap 的问题演示
Map<String, Integer> map = new HashMap<>();
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
    int finalI = i;
    pool.submit(() -> map.put("key" + finalI, finalI));
}
pool.shutdown();
System.out.println(map.size()); // 可能小于 1000(数据丢失)

7.2 ConcurrentHashMap 的演进

JDK 1.7:分段锁(Segment)

ConcurrentHashMap
├── Segment 0 (继承 ReentrantLock)
├── Segment 1
├── ...
└── Segment 15 (默认 16 个)
  • 将数据分成多个 Segment,每个 Segment 独立加锁。
  • 理论并发度为 16(Segment 数量)。
  • 查询无锁(volatile 读取),修改锁住对应 Segment。

JDK 1.8+:CAS + synchronized

  • 摒弃分段锁,采用数组 + 链表 + 红黑树结构。
  • 使用 CAS 操作数组节点,synchronized 锁住链表/红黑树头节点。
  • 并发升级为桶粒度,远高于 1.7 的分段锁。

7.3 ConcurrentHashMap 核心方法源码分析(JDK 1.8)

put 方法流程

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 1. 计算 hash
    int hash = spread(key.hashCode());
    int binCount = 0;

    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;

        // 2. 首次 put,初始化数组
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        // 3. 当前桶为空,CAS 插入(无锁)
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;
        }

        // 4. 正在扩容,帮助迁移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);

        // 5. 桶非空,synchronized 锁住头节点
        else {
            synchronized (f) {
                // ... 遍历链表/红黑树插入
            }
        }
    }
    // 6. 检查是否需要扩容
    addCount(1L, binCount);
    return null;
}

7.4 常用并发集合一览

集合类 说明
ConcurrentHashMap 线程安全 HashMap
ConcurrentLinkedQueue 基于 CAS 的无界非阻塞队列
ConcurrentSkipListMap 有序的并发 Map(跳表实现)
ConcurrentSkipListSet 有序的并发 Set
CopyOnWriteArrayList 读多写少场景,写时复制
CopyOnWriteArraySet 基于 CopyOnWriteArrayList 的 Set
BlockingQueue 系列 阻塞队列(ArrayBlockingQueue / LinkedBlockingQueue / DelayQueue 等)

7.5 使用建议

// ✅ 推荐:并发场景使用 ConcurrentHashMap
Map<String, Object> concurrentMap = new ConcurrentHashMap<>();

// ❌ 不推荐:手动加锁包装 HashMap(效率低且易出错)
Map<String, Object> bad = Collections.synchronizedMap(new HashMap<>());

// 注意:ConcurrentHashMap 不允许 key 或 value 为 null
// concurrentMap.put(null, "value"); // 抛 NullPointerException

// 常用原子方法
concurrentMap.putIfAbsent("key", "value");       // 不存在才 put
concurrentMap.computeIfAbsent("key", k -> "v");  // 不存在则计算
concurrentMap.merge("key", "v1", (old, v) -> old + v); // 合并

8. 总结

本文从底层到上层系统性地梳理了 Java 并发编程的核心知识:

主题 核心要点
线程基础 三种创建方式、六种状态、常用 API
synchronized 内置锁、锁升级、可重入、三种用法
volatile 可见性 + 禁止重排序、不保证原子性
Lock 可中断、超时、公平、Condition 灵活控制
线程池 七大参数、四种拒绝策略、合理配置
CompletableFuture 异步编排、异常处理、多 Future 组合
ConcurrentHashMap 分段锁→CAS+synchronized、原子组合方法

学习路线建议

  1. 入门:线程创建 → synchronized → volatile
  2. 进阶:Lock → AQS 原理 → 并发容器
  3. 高级:线程池源码 → CompletableFuture → ForkJoinPool
  4. 实战:结合项目进行压测和调优

并发编程是 Java 开发者的核心能力之一,理解其原理而不只是会用,才能写出真正高效且正确的并发程序。


本文发布于 Java 学习博客,欢迎交流讨论。


评论