Java 使用阻塞队列实现生产者消费者模式
一、什么是生产者消费者模式
生产者消费者模式(Producer-Consumer Pattern)是并发编程中最经典的设计模式之一。它解决的是一个共享数据缓冲区的协调问题:
- 生产者(Producer):负责生成数据,将数据放入共享缓冲区
- 消费者(Consumer):负责处理数据,从共享缓冲区取出数据
- 共享缓冲区:生产者与消费者之间的中介容器,通常是一个线程安全的队列
生产者线程 ——→ [共享缓冲区(阻塞队列)] ——→ 消费者线程
为什么需要这个模式?
在没有缓冲区的情况下,生产者和消费者必须相互等待,导致耦合度高、效率低下。引入缓冲区后带来了以下好处:
| 优势 | 说明 |
|---|---|
| 解耦 | 生产者和消费者不直接依赖,只需知道缓冲区接口 |
| 削峰填谷 | 生产者生产速度快时,数据暂存在缓冲区,消费者可以慢慢处理 |
| 支持忙闲不均 | 生产者和消费者可以有不同的处理速度,互不阻塞等待 |
核心问题
生产者消费者模式需要解决的核心问题是线程间的协调与同步:
- 当缓冲区已满时,生产者必须等待,直到消费者消费了数据腾出空间
- 当缓冲区为空时,消费者必须等待,直到生产者生产了新的数据
- 多个生产者/消费者同时操作缓冲区时,必须保证线程安全
在 Java 中,有两种典型的实现方式:wait/notify 机制 和 阻塞队列(BlockingQueue)。下面通过完整的代码示例来对比这两种方式。
二、基于 wait/notify 的实现
这是最底层的实现方式,使用 synchronized + wait() / notifyAll() 手工控制线程的等待与唤醒。
原理
- 使用
synchronized保证对缓冲区的互斥访问 - 当条件不满足时(缓冲区满/空),调用
wait()让当前线程进入等待集 - 当条件可能满足时,调用
notifyAll()唤醒所有等待线程
完整代码
import java.util.LinkedList;
import java.util.Queue;
/**
* 使用 wait/notify 实现生产者消费者模式
*/
public class WaitNotifyProducerConsumer {
private static final int CAPACITY = 5; // 缓冲区容量
private static final Queue<Integer> buffer = new LinkedList<>();
private static class Producer extends Thread {
@Override
public void run() {
int value = 0;
while (true) {
synchronized (buffer) {
// 1. 缓冲区满时,生产者等待
while (buffer.size() == CAPACITY) {
try {
System.out.println("[生产者] 缓冲区已满,等待消费...");
buffer.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
// 2. 生产数据并放入缓冲区
value++;
buffer.offer(value);
System.out.println("[生产者] 生产了数据: " + value + ",当前大小: " + buffer.size());
// 3. 唤醒可能等待的消费者
buffer.notifyAll();
}
// 模拟生产间隔
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
private static class Consumer extends Thread {
@Override
public void run() {
while (true) {
synchronized (buffer) {
// 1. 缓冲区空时,消费者等待
while (buffer.isEmpty()) {
try {
System.out.println("[消费者] 缓冲区为空,等待生产...");
buffer.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
// 2. 从缓冲区取数据并消费
int value = buffer.poll();
System.out.println("[消费者] 消费了数据: " + value + ",剩余: " + buffer.size());
// 3. 唤醒可能等待的生产者
buffer.notifyAll();
}
// 模拟消费间隔(比生产者慢,体验缓冲效果)
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
public static void main(String[] args) {
System.out.println("===== wait/notify 实现生产者消费者模式 =====");
new Producer().start();
new Consumer().start();
}
}
代码要点分析
⚠️ 必须使用
while而非if来检查条件线程被唤醒后,条件可能再次不满足(虚假唤醒),因此必须在循环中重新检查条件。
这是《Java 并发编程实战》中明确要求的编程规范。
| 要点 | 说明 |
|---|---|
synchronized (buffer) |
使用共享缓冲区对象作为锁,保证互斥 |
buffer.wait() |
释放锁,线程进入等待状态 |
buffer.notifyAll() |
唤醒所有等待线程,而非 notify()(避免信号丢失) |
while 循环 |
防止虚假唤醒,条件不满足时继续等待 |
wait/notify 的痛点
- 代码繁琐:每个条件判断、等待、唤醒都需要手动编写
- 容易出错:
notify()和notifyAll()用错可能导致死锁或信号丢失 - 锁粒度粗:整个缓冲区操作都在同步块中,并发度受限
- 条件管理复杂:多个条件共用一把锁,
notifyAll()会唤醒所有等待线程,造成不必要的上下文切换
三、基于 BlockingQueue 的实现
BlockingQueue 是 java.util.concurrent 包提供的线程安全队列,它内置了等待/通知机制,开发者无需手动处理复杂的同步逻辑。
什么是 BlockingQueue?
BlockingQueue 是一个接口,它扩展了 Queue,增加了阻塞的存取方法:
| 方法 | 含义 |
|---|---|
put(E e) |
队列满时阻塞,直到有空间 |
take() |
队列空时阻塞,直到有元素 |
offer(E e, long timeout, TimeUnit unit) |
超时版入队 |
poll(long timeout, TimeUnit unit) |
超时版出队 |
常用实现类
| 实现类 | 特点 |
|---|---|
| ArrayBlockingQueue | 基于数组的有界阻塞队列,FIFO |
| LinkedBlockingQueue | 基于链表的可选有界阻塞队列,FIFO |
| PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
| SynchronousQueue | 不存储元素的阻塞队列,每个 put 必须等待一个 take |
| DelayQueue | 支持延迟获取元素的无界阻塞队列 |
完整代码
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 使用 BlockingQueue 实现生产者消费者模式
*/
public class BlockingQueueProducerConsumer {
private static final int CAPACITY = 5; // 缓冲区容量
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);
private static class Producer extends Thread {
@Override
public void run() {
int value = 0;
while (true) {
try {
value++;
// put 方法:队列满时自动阻塞,直到有空间
queue.put(value);
System.out.println("[生产者] 生产了数据: " + value + ",队列中元素数: " + queue.size());
// 模拟生产间隔
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
private static class Consumer extends Thread {
@Override
public void run() {
while (true) {
try {
// take 方法:队列空时自动阻塞,直到有元素
int value = queue.take();
System.out.println("[消费者] 消费了数据: " + value + ",队列中元素数: " + queue.size());
// 模拟消费间隔(比生产者慢,体验缓冲效果)
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
public static void main(String[] args) {
System.out.println("===== BlockingQueue 实现生产者消费者模式 =====");
new Producer().start();
new Consumer().start();
}
}
代码要点
| 要点 | 说明 |
|---|---|
queue.put(value) |
队列满时自动阻塞,无需手写等待逻辑 |
queue.take() |
队列空时自动阻塞,无需手写等待逻辑 |
无 synchronized |
所有线程安全由 BlockingQueue 内部保证 |
无 wait/notify |
阻塞逻辑由队列内置的 Condition 管理 |
四、两种实现方式的对比
| 对比维度 | wait/notify | BlockingQueue |
|---|---|---|
| 代码量 | ~60 行核心逻辑 | ~20 行核心逻辑 |
| 线程安全 | 需手动使用 synchronized |
队列内部已实现 |
| 条件等待 | 手动 wait() + while 检查 |
put() / take() 自动阻塞 |
| 唤醒机制 | 手动 notifyAll() |
队列内部使用 Condition.signal() |
| 可读性 | 需要理解底层并发机制 | 声明式,一目了然 |
| 扩展性 | 修改条件逻辑容易出错 | 替换队列实现类即可改变行为 |
| 性能 | 依赖 JVM 锁实现 | 使用更高效的 AQS + LockSupport |
为什么说 BlockingQueue 更好?
-
消除样板代码
wait/notify 实现中,每个读写操作都需要加锁、检查条件、处理等待/唤醒。BlockingQueue 将这些重复劳动封装到了队列内部,开发者只需关注业务逻辑。 -
更可靠的并发语义
即使经验丰富的开发者,在 wait/notify 中使用if而非while、误用notify()而非notifyAll()也是常见错误。BlockingQueue 经过 JDK 团队的严格测试,不存在这类隐患。 -
更细粒度的锁
ArrayBlockingQueue内部使用两把锁(读锁、写锁)和Condition分别管理"队列空"和"队列满"两种等待条件,相比synchronized一把大锁,并发性能更高。 -
灵活替换实现
如果需要优先级队列,把ArrayBlockingQueue换成PriorityBlockingQueue即可;如果需要无界队列,换成LinkedBlockingQueue。无需改动任何业务代码。 -
支持超时控制
offer(e, timeout, unit)和poll(timeout, unit)提供了超时版本,让生产者/消费者可以定义"最多等待多久",避免永久阻塞。这在 wait/notify 中需要额外实现。
五、多生产者多消费者场景
BlockingQueue 的优势在多生产者/多消费者场景下更为明显。以下是一个示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 多生产者多消费者 —— BlockingQueue 实现
*/
public class MultiProducerConsumer {
private static final int CAPACITY = 10;
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);
private static final AtomicInteger counter = new AtomicInteger(0);
static class Producer implements Runnable {
private final String name;
Producer(String name) {
this.name = name;
}
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
int value = counter.incrementAndGet();
queue.put(value);
System.out.println(name + " 生产: " + value + " [队列: " + queue.size() + "]");
Thread.sleep((long) (Math.random() * 800));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
private final String name;
Consumer(String name) {
this.name = name;
}
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
int value = queue.take();
System.out.println(name + " 消费: " + value + " [队列: " + queue.size() + "]");
Thread.sleep((long) (Math.random() * 1200));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("===== 多生产者多消费者 =====");
ExecutorService producers = Executors.newFixedThreadPool(3);
Executors.newFixedThreadPool(2); // 2 个消费者
ExecutorService consumers = Executors.newFixedThreadPool(2);
// 启动 3 个生产者
for (int i = 1; i <= 3; i++) {
producers.submit(new Producer("生产者-" + i));
}
// 启动 2 个消费者
for (int i = 1; i <= 2; i++) {
consumers.submit(new Consumer("消费者-" + i));
}
// 运行 10 秒后关闭
Thread.sleep(10000);
producers.shutdownNow();
consumers.shutdownNow();
System.out.println("系统关闭");
}
}
如果是 wait/notify 实现,需要处理:
- 多个生产者线程同时
wait和notifyAll带来的唤醒风暴 - 正确的条件判断(防止一个生产者被唤醒后空间又被其他生产者占满)
- 虚假唤醒的防御
而 BlockingQueue 内部已经优雅地解决了所有这些复杂性。
六、BlockingQueue 的底层原理(简要)
从 JDK 源码看,ArrayBlockingQueue 的核心实现如下(简化示意):
// ArrayBlockingQueue 内部(简化版)
public class ArrayBlockingQueue<E> {
final ReentrantLock lock;
private final Condition notFull; // "不满"条件
private final Condition notEmpty; // "不空"条件
public void put(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 队列满则等待
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 队列空则等待
return dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// ... 实际入队操作
notEmpty.signal(); // 唤醒等待的消费者
}
private E dequeue() {
// ... 实际出队操作
notFull.signal(); // 唤醒等待的生产者
}
}
关键点:
- 使用
ReentrantLock替代synchronized,支持可中断、超时等特性 - 使用 两个
Condition分别管理"队列满"和"队列空",避免了notifyAll()唤醒所有线程的低效问题 —— 生产者满时只等待notFull,消费者空时只等待notEmpty,唤醒也是精准唤醒 - 本质上仍然是 wait/notify 模式,但封装在了经过高度优化的并发容器中
七、总结与最佳实践
何时选择哪种方式?
| 场景 | 推荐方式 |
|---|---|
| 简单生产消费、对并发要求不高 | wait/notify 可学习,但生产环境不推荐 |
| 生产环境、标准生产者消费者 | BlockingQueue — 首选 |
| 需要自定义复杂的缓冲区策略 | 可基于 ReentrantLock + Condition 自行实现 |
| 需要精确控制等待/唤醒行为 | 考虑 Phaser、Exchanger 等更高级的并发工具 |
关键原则
- 优先使用
java.util.concurrent包中的高级并发工具,而非自己编写底层同步代码 - BlockingQueue 是生产者消费者模式的最佳实践,它安全、高效、可读性强
- 理解 wait/notify 机制仍然重要,它是理解 Java 并发锁机制的基础,也是阅读许多开源项目源码的前提
- 对于多生产者多消费者场景,务必使用
BlockingQueue或ReentrantLock+Condition,避免手动synchronized的复杂性
参考资源
- 《Java 并发编程实战》第 5 章 —— 同步工具类
- Java
BlockingQueueJavadoc- JDK 源码:
ArrayBlockingQueue、LinkedBlockingQueue