Java 使用阻塞队列实现生产者消费者模式
一、什么是生产者消费者模式?
生产者消费者模式是一种经典的多线程协作设计模式。它通过一个共享缓冲区(通常是一个队列)将"生产数据"和"消费数据"的线程解耦:
- 生产者线程:负责生产数据,将数据放入共享缓冲区
- 消费者线程:负责消费数据,从共享缓冲区取出数据
- 共享缓冲区(阻塞队列):协调生产者与消费者之间的速率差异
核心规则:
- 当缓冲区满时,生产者线程被阻塞,直到有消费者取出数据
- 当缓冲区空时,消费者线程被阻塞,直到有生产者放入数据
二、为什么使用 BlockingQueue?
Java 的 BlockingQueue 接口(位于 java.util.concurrent 包)提供了线程安全的阻塞操作:
| 方法 | 说明 |
|---|---|
put(E e) |
队列满时阻塞,直到有空位 |
take() |
队列空时阻塞,直到有元素 |
offer(E e, long timeout, TimeUnit unit) |
超时插入 |
poll(long timeout, TimeUnit unit) |
超时取出 |
使用 BlockingQueue 实现生产者消费者模式的优点:
- ✅ 代码极简,无需手动管理锁和条件变量
- ✅ 内置线程安全,避免并发 bug
- ✅ 支持多种队列实现,灵活适应不同场景
- ✅ 避免 wait/notify 的常见陷阱(假死、早期通知等)
三、三种实现方式对比
| 方式 | 关键方法 | 优点 | 缺点 |
|---|---|---|---|
| Object.wait/notify | wait(), notifyAll() |
基础方式,理解原理 | 代码复杂,易出错 |
| Lock + Condition | await(), signalAll() |
精细控制唤醒类型 | 仍需手动管理锁 |
| BlockingQueue ✅ 推荐 | put(), take() |
代码最简洁,开箱即用 | 需理解阻塞队列原理 |
四、基于 BlockingQueue 的完整实现
4.1 基础版:单生产者 + 单消费者
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
// 生产者
class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("生产者生产: " + i);
queue.put(i); // 队列满时自动阻塞
Thread.sleep(500); // 模拟生产耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Integer data = queue.take(); // 队列空时自动阻塞
System.out.println("消费者消费: " + data);
Thread.sleep(1000); // 模拟消费耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 主程序
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5); // 容量为5
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer, "生产者").start();
new Thread(consumer, "消费者").start();
}
}
4.2 进阶版:多生产者 + 多消费者
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
// 日志事件类
class LogEvent {
private final String timestamp;
private final String level;
private final String message;
private static final AtomicInteger counter = new AtomicInteger(0);
private final int id;
public LogEvent(String level, String message) {
this.id = counter.incrementAndGet();
this.timestamp = java.time.LocalDateTime.now().toString();
this.level = level;
this.message = message;
}
@Override
public String toString() {
return String.format("[%s] #%d %s - %s", timestamp, id, level, message);
}
}
// 生产者
class MultiProducer implements Runnable {
private final BlockingQueue<LogEvent> queue;
private final String name;
public MultiProducer(BlockingQueue<LogEvent> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
LogEvent event = new LogEvent("INFO", name + " 生产事件 #" + i);
queue.put(event);
System.out.println("[" + name + "] 生产: " + event);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
class MultiConsumer implements Runnable {
private final BlockingQueue<LogEvent> queue;
private final String name;
public MultiConsumer(BlockingQueue<LogEvent> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
while (true) {
LogEvent event = queue.take();
System.out.println("[" + name + "] 消费: " + event);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 主程序
public class MultiProducerConsumerDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(10);
// 启动3个生产者
for (int i = 1; i <= 3; i++) {
new Thread(new MultiProducer(queue, "生产者" + i)).start();
}
// 启动2个消费者
for (int i = 1; i <= 2; i++) {
new Thread(new MultiConsumer(queue, "消费者" + i)).start();
}
// 运行10秒后退出
Thread.sleep(10000);
System.out.println("主程序退出");
System.exit(0);
}
}
4.3 实用版:含优雅关闭的生产者消费者
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
class ShutdownProducer implements Runnable {
private final BlockingQueue<String> queue;
private final AtomicBoolean running;
public ShutdownProducer(BlockingQueue<String> queue, AtomicBoolean running) {
this.queue = queue;
this.running = running;
}
@Override
public void run() {
try {
int count = 0;
while (running.get()) {
String data = "消息-" + (++count);
queue.put(data);
System.out.println("[生产者] 发送: " + data);
Thread.sleep(300);
}
System.out.println("[生产者] 已关闭");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class ShutdownConsumer implements Runnable {
private final BlockingQueue<String> queue;
private final AtomicBoolean running;
private static final String POISON_PILL = "POISON";
public ShutdownConsumer(BlockingQueue<String> queue, AtomicBoolean running) {
this.queue = queue;
this.running = running;
}
@Override
public void run() {
try {
while (running.get() || !queue.isEmpty()) {
String data = queue.poll(1, TimeUnit.SECONDS);
if (data != null) {
System.out.println("[消费者] 收到: " + data);
Thread.sleep(500);
}
}
System.out.println("[消费者] 已关闭");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class GracefulShutdownDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(20);
AtomicBoolean running = new AtomicBoolean(true);
Thread producer = new Thread(new ShutdownProducer(queue, running));
Thread consumer = new Thread(new ShutdownConsumer(queue, running));
producer.start();
consumer.start();
// 运行5秒后优雅关闭
Thread.sleep(5000);
System.out.println("=== 准备关闭 ===");
running.set(false);
producer.join();
consumer.join();
System.out.println("=== 已安全关闭 ===");
}
}
五、选择合适的阻塞队列
| 队列类型 | 适用生产者消费者场景 |
|---|---|
LinkedBlockingQueue |
最常用,可选有界/无界,适合大多数场景 |
ArrayBlockingQueue |
固定容量、高性能场景 |
PriorityBlockingQueue |
需要按优先级消费的场景 |
DelayQueue |
需要延迟消费的场景(如订单超时取消) |
SynchronousQueue |
零缓存、直接交付场景 |
六、最佳实践
- 始终指定容量:使用
LinkedBlockingQueue时务必设置容量上限,防止内存溢出 - 使用 try-catch 处理中断:
put()和take()会抛出InterruptedException - 优雅关闭:通过标志位 + 超时 poll 实现安全关闭
- 监控队列大小:生产中监控队列积压情况,及时发现问题
- 选择合适的队列:根据吞吐量、优先级需求等因素选择实现类