菜单

Administrator
发布于 2026-05-15 / 1 阅读
0
0

Java中的阻塞队列

Java 中的阻塞队列(BlockingQueue)详解

一、什么是阻塞队列?

阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列:

  1. 支持阻塞的插入:当队列满时,插入元素的线程会被阻塞,直到队列有空闲位置。
  2. 支持阻塞的移除:当队列空时,获取元素的线程会被阻塞,直到队列中有可用元素。

BlockingQueuejava.util.concurrent 包下的接口,继承自 Queue 接口。


二、BlockingQueue 的四种操作方式

操作类型 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek()

说明:

  • 抛出异常:队列满时 add()IllegalStateException: Queue full;空队列 remove()NoSuchElementException
  • 返回特殊值offer() 返回 falsepoll() 返回 null
  • 一直阻塞put()take() 会阻塞直到操作成功
  • 超时退出:在指定时间内未成功则返回特殊值

注意: BlockingQueue 不允许插入 null,否则抛出 NullPointerException


三、Java 提供的 7 种阻塞队列实现

JDK 提供了 7 种 BlockingQueue 实现类

实现类 特性 适用场景
ArrayBlockingQueue 有界、数组、FIFO、单锁 固定容量高性能场景
LinkedBlockingQueue 可选有界/无界、链表、双锁分离 容量可扩展的生产者消费者
PriorityBlockingQueue 无界、优先级排序(最小堆) 按优先级处理的任务
DelayQueue 无界、延迟元素 定时/延迟任务、缓存过期
SynchronousQueue 零容量、一对一传输 高吞吐量直接交换
LinkedTransferQueue 无界、预占模式 高并发匹配传输
LinkedBlockingDeque 双向链表、双端操作 工作窃取模式

四、各实现类详解

4.1 ArrayBlockingQueue

  • 基于数组实现的有界阻塞队列,容量固定初始化后不可变
  • FIFO 顺序(先进先出)
  • 支持公平/非公平锁(公平模式下线程按 FIFO 顺序获取锁,减少饥饿)

核心源码片段:

// put 方法(阻塞插入)
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();    // 队列满则阻塞
        enqueue(e);
    } finally { lock.unlock(); }
}

// take 方法(阻塞移除)
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();   // 队列空则阻塞
        return dequeue();
    } finally { lock.unlock(); }
}

示例:

BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
queue.put("Hello");
String msg = queue.take();

4.2 LinkedBlockingQueue

  • 基于单向链表实现的阻塞队列
  • 可选择有界(指定容量)或无界(默认 Integer.MAX_VALUE,建议指定容量防 OOM)
  • 锁分离putLocktakeLock 分别控制入队和出队,减少锁竞争
  • FIFO 顺序

示例:

BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
queue.put("Value");
String value = queue.take();

4.3 PriorityBlockingQueue

  • 无界优先级队列(可能 OOM)
  • 元素按自然顺序或 Comparator 排序
  • 内部基于最小二叉堆实现
  • 不允许 null,元素需实现 Comparable

示例:

BlockingQueue<Task> queue = new PriorityBlockingQueue<>();
queue.put(new Task(3, "low"));
queue.put(new Task(1, "high"));  // 优先级高的先被取出

4.4 DelayQueue

  • 无界延迟队列
  • 元素必须实现 Delayed 接口
  • 仅当 getDelay() 返回 ≤ 0 时才能被取出
  • 适用于:缓存过期清理、定时任务、会话超时

示例:

DelayQueue<DelayedElement> queue = new DelayQueue<>();
queue.put(new DelayedElement(5000, "task"));  // 5秒后到期
DelayedElement e = queue.take();  // 阻塞直到到期

4.5 SynchronousQueue

  • 不存储元素,内部容量为 0
  • 每个 put() 必须等待一个 take(),反之亦然
  • 更像是一个汇合点而非传统队列
  • 两种模式:公平(TransferQueue,FIFO)和非公平(TransferStack,LIFO)
  • 吞吐量高于 ArrayBlockingQueue / LinkedBlockingQueue

示例:

BlockingQueue<String> queue = new SynchronousQueue<>();
// 线程1: queue.put("A");   // 阻塞直到有线程 take
// 线程2: String s = queue.take(); // 阻塞直到有线程 put

4.6 LinkedTransferQueue

  • 无界链表实现
  • 采用预占模式:消费者取元素时若队列为空,生成 null 节点并等待;生产者见 null 节点直接匹配
  • 实现了 TransferQueue 接口,提供 transfer()tryTransfer() 方法

4.7 LinkedBlockingDeque

  • 双向链表实现的阻塞双端队列
  • 支持从两端插入/移除(addFirst/addLast 等)
  • 适用于工作窃取(Work Stealing)模式

示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<>();
deque.addFirst("1");
deque.addLast("2");
String first = deque.takeFirst();  // "1"
String last = deque.takeLast();    // "2"

五、BlockingDeque 接口

BlockingDeque 继承自 BlockingQueue,表示线程安全的双端队列。对两端分别提供增删查操作:

操作 队首 队尾
抛出异常 addFirst(e) / removeFirst() / getFirst() addLast(e) / removeLast() / getLast()
特殊值 offerFirst(e) / pollFirst() / peekFirst() offerLast(e) / pollLast() / peekLast()
阻塞 putFirst(e) / takeFirst() putLast(e) / takeLast()
超时 offerFirst(e, t, u) / pollFirst(t, u) offerLast(e, t, u) / pollLast(t, u)

六、使用场景

  1. 生产者-消费者模式:最经典场景,生产者和消费者通过阻塞队列解耦
  2. 线程池任务队列ThreadPoolExecutor 内部使用 BlockingQueue 保存待执行任务
  3. 消息中间件:内存中的消息队列
  4. 限流与背压:通过有界队列控制流量
  5. 定时/延迟任务:使用 DelayQueue 实现

评论