Java 中的阻塞队列(BlockingQueue)详解
一、什么是阻塞队列?
阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列:
- 支持阻塞的插入:当队列满时,插入元素的线程会被阻塞,直到队列有空闲位置。
- 支持阻塞的移除:当队列空时,获取元素的线程会被阻塞,直到队列中有可用元素。
BlockingQueue 是 java.util.concurrent 包下的接口,继承自 Queue 接口。
二、BlockingQueue 的四种操作方式
| 操作类型 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
| 移除 | remove() |
poll() |
take() |
poll(time, unit) |
| 检查 | element() |
peek() |
— | — |
说明:
- 抛出异常:队列满时
add()抛IllegalStateException: Queue full;空队列remove()抛NoSuchElementException - 返回特殊值:
offer()返回false,poll()返回null - 一直阻塞:
put()和take()会阻塞直到操作成功 - 超时退出:在指定时间内未成功则返回特殊值
注意: BlockingQueue 不允许插入 null,否则抛出
NullPointerException。
三、Java 提供的 7 种阻塞队列实现
JDK 提供了 7 种 BlockingQueue 实现类:
| 实现类 | 特性 | 适用场景 |
|---|---|---|
ArrayBlockingQueue |
有界、数组、FIFO、单锁 | 固定容量高性能场景 |
LinkedBlockingQueue |
可选有界/无界、链表、双锁分离 | 容量可扩展的生产者消费者 |
PriorityBlockingQueue |
无界、优先级排序(最小堆) | 按优先级处理的任务 |
DelayQueue |
无界、延迟元素 | 定时/延迟任务、缓存过期 |
SynchronousQueue |
零容量、一对一传输 | 高吞吐量直接交换 |
LinkedTransferQueue |
无界、预占模式 | 高并发匹配传输 |
LinkedBlockingDeque |
双向链表、双端操作 | 工作窃取模式 |
四、各实现类详解
4.1 ArrayBlockingQueue
- 基于数组实现的有界阻塞队列,容量固定初始化后不可变
- FIFO 顺序(先进先出)
- 支持公平/非公平锁(公平模式下线程按 FIFO 顺序获取锁,减少饥饿)
核心源码片段:
// put 方法(阻塞插入)
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 队列满则阻塞
enqueue(e);
} finally { lock.unlock(); }
}
// take 方法(阻塞移除)
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 队列空则阻塞
return dequeue();
} finally { lock.unlock(); }
}
示例:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
queue.put("Hello");
String msg = queue.take();
4.2 LinkedBlockingQueue
- 基于单向链表实现的阻塞队列
- 可选择有界(指定容量)或无界(默认
Integer.MAX_VALUE,建议指定容量防 OOM) - 锁分离:
putLock和takeLock分别控制入队和出队,减少锁竞争 - FIFO 顺序
示例:
BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
queue.put("Value");
String value = queue.take();
4.3 PriorityBlockingQueue
- 无界优先级队列(可能 OOM)
- 元素按自然顺序或
Comparator排序 - 内部基于最小二叉堆实现
- 不允许 null,元素需实现
Comparable
示例:
BlockingQueue<Task> queue = new PriorityBlockingQueue<>();
queue.put(new Task(3, "low"));
queue.put(new Task(1, "high")); // 优先级高的先被取出
4.4 DelayQueue
- 无界延迟队列
- 元素必须实现
Delayed接口 - 仅当
getDelay()返回 ≤ 0 时才能被取出 - 适用于:缓存过期清理、定时任务、会话超时
示例:
DelayQueue<DelayedElement> queue = new DelayQueue<>();
queue.put(new DelayedElement(5000, "task")); // 5秒后到期
DelayedElement e = queue.take(); // 阻塞直到到期
4.5 SynchronousQueue
- 不存储元素,内部容量为 0
- 每个
put()必须等待一个take(),反之亦然 - 更像是一个汇合点而非传统队列
- 两种模式:公平(TransferQueue,FIFO)和非公平(TransferStack,LIFO)
- 吞吐量高于 ArrayBlockingQueue / LinkedBlockingQueue
示例:
BlockingQueue<String> queue = new SynchronousQueue<>();
// 线程1: queue.put("A"); // 阻塞直到有线程 take
// 线程2: String s = queue.take(); // 阻塞直到有线程 put
4.6 LinkedTransferQueue
- 无界链表实现
- 采用预占模式:消费者取元素时若队列为空,生成 null 节点并等待;生产者见 null 节点直接匹配
- 实现了
TransferQueue接口,提供transfer()和tryTransfer()方法
4.7 LinkedBlockingDeque
- 双向链表实现的阻塞双端队列
- 支持从两端插入/移除(
addFirst/addLast等) - 适用于工作窃取(Work Stealing)模式
示例:
BlockingDeque<String> deque = new LinkedBlockingDeque<>();
deque.addFirst("1");
deque.addLast("2");
String first = deque.takeFirst(); // "1"
String last = deque.takeLast(); // "2"
五、BlockingDeque 接口
BlockingDeque 继承自 BlockingQueue,表示线程安全的双端队列。对两端分别提供增删查操作:
| 操作 | 队首 | 队尾 |
|---|---|---|
| 抛出异常 | addFirst(e) / removeFirst() / getFirst() |
addLast(e) / removeLast() / getLast() |
| 特殊值 | offerFirst(e) / pollFirst() / peekFirst() |
offerLast(e) / pollLast() / peekLast() |
| 阻塞 | putFirst(e) / takeFirst() |
putLast(e) / takeLast() |
| 超时 | offerFirst(e, t, u) / pollFirst(t, u) |
offerLast(e, t, u) / pollLast(t, u) |
六、使用场景
- 生产者-消费者模式:最经典场景,生产者和消费者通过阻塞队列解耦
- 线程池任务队列:
ThreadPoolExecutor内部使用 BlockingQueue 保存待执行任务 - 消息中间件:内存中的消息队列
- 限流与背压:通过有界队列控制流量
- 定时/延迟任务:使用 DelayQueue 实现