Java 中的阻塞队列(BlockingQueue)详解
一、什么是 BlockingQueue?
BlockingQueue 是 Java 并发包(java.util.concurrent)中提供的一个接口,它代表一个支持阻塞操作的线程安全队列。它继承自 java.util.Queue,在普通队列的基础上增加了两个核心行为:
| 操作 | 队列满时 | 队列空时 |
|---|---|---|
| put(e) | 阻塞等待,直到有空位 | — |
| take() | — | 阻塞等待,直到有元素 |
| offer(e, time, unit) | 等待指定时间后超时返回 false | — |
| poll(time, unit) | — | 等待指定时间后超时返回 null |
简单来说,当生产者线程试图向已满的队列中放入元素时,它会被阻塞;当消费者线程试图从空队列中取出元素时,它也会被阻塞。这种机制天然地解决了生产者-消费者问题,无需我们手动使用 wait/notify。
核心接口方法
| 操作类型 | 抛异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
| 移除 | remove() |
poll() |
take() |
poll(time, unit) |
| 检查 | element() |
peek() |
不支持 | 不支持 |
- 抛异常:操作失败时抛出
IllegalStateException或NoSuchElementException。 - 返回特殊值:插入返回
false,移除/检查返回null。 - 阻塞:线程一直阻塞直到操作成功。
- 超时:在指定时间内阻塞,超时后放弃并返回特殊值。
二、BlockingQueue 的常见实现类
Java 提供了 7 种阻塞队列实现,下面逐一介绍最常用的 6 种。
1. ArrayBlockingQueue
基于数组实现的有界阻塞队列。内部使用一个定长的数组存储元素,一旦创建容量不可改变。采用 FIFO(先进先出) 顺序。
// 创建一个容量为 10 的队列,默认非公平模式
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 创建公平模式的队列(吞吐量更低,但能避免饥饿)
BlockingQueue<Integer> fairQueue = new ArrayBlockingQueue<>(10, true);
特点:
- 有界,必须指定容量
- 只使用一把锁(
ReentrantLock),但通过两个 Condition(notFull、notEmpty)管理 - 支持公平/非公平策略
适用场景: 有界缓冲池、线程池的工作队列。
2. LinkedBlockingQueue
基于链表实现的可选有界阻塞队列。若不指定容量,默认容量为 Integer.MAX_VALUE,相当于无界。
// 无界队列
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
// 有界队列,容量 1000
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);
特点:
- 有界/无界可选
- 使用两把锁:
takeLock和putLock,分别控制读取和写入,吞吐量通常高于ArrayBlockingQueue - 链表结构,插入/删除时间复杂度 O(1)
适用场景: 需要高吞吐量的生产者-消费者场景,如 Executors.newFixedThreadPool() 默认使用它。
3. PriorityBlockingQueue
支持优先级排序的无界阻塞队列。元素按照自然顺序或 Comparator 排序,内部使用二叉堆(binary heap)实现。
// 使用自然顺序
BlockingQueue<Task> queue = new PriorityBlockingQueue<>();
// 使用 Comparator
BlockingQueue<Task> queue = new PriorityBlockingQueue<>(11, (a, b) -> b.priority - a.priority);
特点:
- 无界(会动态扩容)
- 排序非严格 FIFO:优先级相同的元素顺序不保证
- 插入的元素必须实现
Comparable,或在构造时提供Comparator - 不允许 null 元素
适用场景: 任务调度、需要按优先级处理的场景。
4. SynchronousQueue
一个不存储元素的阻塞队列。每个 put 操作必须等待一个 take 操作,反之亦然。容量为 0。
BlockingQueue<String> queue = new SynchronousQueue<>();
// 必须配对使用
new Thread(() -> {
try { queue.put("hello"); } catch (InterruptedException e) { }
}).start();
String value = queue.take(); // 与 put 配对
System.out.println(value); // 输出 hello
特点:
- 容量为 0,内部不存储任何元素
- 相当于线程之间的直接握手
- 支持公平/非公平策略
- 非常适合传递性设计(hand-off design)
适用场景: Executors.newCachedThreadPool() 内部使用 SynchronousQueue,当任务到达时直接交给空闲线程处理。
5. DelayQueue
存放实现了 Delayed 接口的元素的无界阻塞队列。只有延迟期满的元素才能被取出。
public class DelayedTask implements Delayed {
private final String name;
private final long startTime; // 到期时间戳
public DelayedTask(String name, long delayMillis) {
this.name = name;
this.startTime = System.currentTimeMillis() + delayMillis;
}
@Override
public long getDelay(TimeUnit unit) {
long remaining = startTime - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.startTime, ((DelayedTask) o).startTime);
}
@Override
public String toString() { return name; }
}
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask("任务1", 3000));
queue.put(new DelayedTask("任务2", 1000));
DelayedTask task = queue.take(); // 1 秒后取出"任务2"
System.out.println(task); // 输出:任务2
特点:
- 无界
- 元素必须实现
Delayed接口(继承自Comparable) - 内部使用
PriorityQueue实现排序 take()会阻塞直到有元素到期
适用场景: 定时任务调度、会话过期清理、缓存过期处理。
6. LinkedTransferQueue
基于链表的无界 TransferQueue 实现。在 BlockingQueue 的基础上增加了 transfer、tryTransfer 等方法,支持直接传递语义。
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
// transfer:直接交给消费者,如果没有消费者则阻塞
queue.transfer("直接传递");
// tryTransfer:尝试直接传递,没有消费者则返回 false
boolean success = queue.tryTransfer("试试看");
// tryTransfer(timeout):带超时的尝试
boolean success = queue.tryTransfer("超时传递", 2, TimeUnit.SECONDS);
特点:
- 无界
- 融合了
SynchronousQueue的直接传递和LinkedBlockingQueue的缓冲存储 - 吞吐量在三者中通常最高
- 提供
getWaitingConsumerCount()和hasWaitingConsumer()方法
适用场景: 需要精确控制"数据是否已被消费"的消息传递场景。
三、put / take 方法详解
put(e) 和 take() 是 BlockingQueue 最核心的两个阻塞方法。
put 的阻塞原理
// ArrayBlockingQueue.put 简化源码
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) // 队列已满
notFull.await(); // 在 notFull 条件上等待
enqueue(e); // 入队
} finally {
lock.unlock();
}
}
take 的阻塞原理
// ArrayBlockingQueue.take 简化源码
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 队列为空
notEmpty.await(); // 在 notEmpty 条件上等待
return dequeue(); // 出队
} finally {
lock.unlock();
}
}
完整的生产者-消费者示例:
public class ProducerConsumerExample {
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
queue.put(i);
System.out.println(Thread.currentThread().getName() + " 生产: " + i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
Integer value = queue.take();
System.out.println(Thread.currentThread().getName() + " 消费: " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
new Thread(new Producer(), "生产者-1").start();
new Thread(new Consumer(), "消费者-1").start();
new Thread(new Consumer(), "消费者-2").start();
}
}
运行效果:
生产者-1 生产: 0
消费者-1 消费: 0
生产者-1 生产: 1
消费者-2 消费: 1
生产者-1 生产: 2 ← 队列满时 put 阻塞,等待消费者取出
消费者-1 消费: 2
...
四、线程安全实现剖析
BlockingQueue 的线程安全依赖于 ReentrantLock + Condition 机制。
ArrayBlockingQueue 的内部结构
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
final Object[] items; // 存储元素的数组
int takeIndex; // 下一个 take 的位置
int putIndex; // 下一个 put 的位置
int count; // 元素数量
final ReentrantLock lock; // 全局锁
private final Condition notEmpty; // 等待队列非空的条件
private final Condition notFull; // 等待队列非满的条件
}
- 一把锁保护所有操作
- 两个 Condition:
notEmpty(消费者等待)、notFull(生产者等待) - 生产者
put时,如果队列满,调用notFull.await()阻塞;插入成功后,调用notEmpty.signal()唤醒消费者 - 消费者
take时,如果队列空,调用notEmpty.await()阻塞;取出成功后,调用notFull.signal()唤醒生产者
LinkedBlockingQueue 的内部结构
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final int capacity; // 容量,默认 MAX_VALUE
private final ReentrantLock takeLock = new ReentrantLock(); // take 锁
private final Condition notEmpty = takeLock.newCondition(); // 非空条件
private final ReentrantLock putLock = new ReentrantLock(); // put 锁
private final Condition notFull = putLock.newCondition(); // 非满条件
}
- 两把锁分离了读和写,允许生产者线程和消费者线程并发执行
- 使用
AtomicInteger记录count,保证原子性 - 这种设计使得
LinkedBlockingQueue在并发场景下吞吐量通常优于ArrayBlockingQueue
两种实现的对比
| 特性 | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| 数据结构 | 数组(定长) | 链表(动态节点) |
| 容量 | 必须指定,固定 | 可选有界,默认无界 |
| 锁机制 | 单锁 + 双 Condition | 双锁 + 双 Condition |
| 吞吐量 | 中等 | 通常更高 |
| 内存分配 | 预分配,无 GC 压力 | 动态分配,有 GC 开销 |
| 公平策略 | 支持 | 不支持 |
五、如何选择合适的阻塞队列?
| 需求场景 | 推荐实现 |
|---|---|
| 固定大小的有界缓冲池 | ArrayBlockingQueue |
| 高吞吐量的生产者-消费者 | LinkedBlockingQueue(有界或无界) |
| 任务按优先级处理 | PriorityBlockingQueue |
| 线程间直接传递,不缓存 | SynchronousQueue |
| 延迟执行/定时任务 | DelayQueue |
| 需要直接传递且可缓存 | LinkedTransferQueue |
六、使用注意事项
- 不允许 null 元素:所有
BlockingQueue实现都拒绝null元素,插入null会抛出NullPointerException。 - 中断响应:
put/take等阻塞方法都会响应中断(InterruptedException),建议在 catch 块中调用Thread.currentThread().interrupt()恢复中断状态。 - 无界队列的风险:
LinkedBlockingQueue(默认)、PriorityBlockingQueue、DelayQueue都是无界的,如果生产者速度远快于消费者,可能导致内存溢出。 - 容量规划:
ArrayBlockingQueue预分配数组,适合稳定流量;LinkedBlockingQueue动态分配节点,适合流量波动较大的场景。
七、总结
BlockingQueue 是 Java 并发编程中的一把利器,它优雅地解决了生产者-消费者问题。通过阻塞机制,我们无需手动管理 wait/notify,代码更加简洁、健壮。不同实现类各有侧重:
- 需要有界缓冲 →
ArrayBlockingQueue或LinkedBlockingQueue(指定容量) - 需要优先级 →
PriorityBlockingQueue - 需要直接传递 →
SynchronousQueue或LinkedTransferQueue - 需要延迟执行 →
DelayQueue
掌握 BlockingQueue 是深入理解 Java 并发的基础,也是编写高并发、高可靠服务的关键技能。