菜单

Administrator
发布于 2026-05-18 / 1 阅读
0
0

Java 中的阻塞队列 (BlockingQueue)

Java 中的阻塞队列(BlockingQueue)详解

一、什么是 BlockingQueue?

BlockingQueue 是 Java 并发包(java.util.concurrent)中提供的一个接口,它代表一个支持阻塞操作的线程安全队列。它继承自 java.util.Queue,在普通队列的基础上增加了两个核心行为:

操作 队列满时 队列空时
put(e) 阻塞等待,直到有空位
take() 阻塞等待,直到有元素
offer(e, time, unit) 等待指定时间后超时返回 false
poll(time, unit) 等待指定时间后超时返回 null

简单来说,当生产者线程试图向已满的队列中放入元素时,它会被阻塞;当消费者线程试图从空队列中取出元素时,它也会被阻塞。这种机制天然地解决了生产者-消费者问题,无需我们手动使用 wait/notify

核心接口方法

操作类型 抛异常 返回特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不支持 不支持
  • 抛异常:操作失败时抛出 IllegalStateExceptionNoSuchElementException
  • 返回特殊值:插入返回 false,移除/检查返回 null
  • 阻塞:线程一直阻塞直到操作成功。
  • 超时:在指定时间内阻塞,超时后放弃并返回特殊值。

二、BlockingQueue 的常见实现类

Java 提供了 7 种阻塞队列实现,下面逐一介绍最常用的 6 种。

1. ArrayBlockingQueue

基于数组实现的有界阻塞队列。内部使用一个定长的数组存储元素,一旦创建容量不可改变。采用 FIFO(先进先出) 顺序。

// 创建一个容量为 10 的队列,默认非公平模式
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

// 创建公平模式的队列(吞吐量更低,但能避免饥饿)
BlockingQueue<Integer> fairQueue = new ArrayBlockingQueue<>(10, true);

特点:

  • 有界,必须指定容量
  • 只使用一把锁ReentrantLock),但通过两个 Condition(notFullnotEmpty)管理
  • 支持公平/非公平策略

适用场景: 有界缓冲池、线程池的工作队列。

2. LinkedBlockingQueue

基于链表实现的可选有界阻塞队列。若不指定容量,默认容量为 Integer.MAX_VALUE,相当于无界。

// 无界队列
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

// 有界队列,容量 1000
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);

特点:

  • 有界/无界可选
  • 使用两把锁takeLockputLock,分别控制读取和写入,吞吐量通常高于 ArrayBlockingQueue
  • 链表结构,插入/删除时间复杂度 O(1)

适用场景: 需要高吞吐量的生产者-消费者场景,如 Executors.newFixedThreadPool() 默认使用它。

3. PriorityBlockingQueue

支持优先级排序的无界阻塞队列。元素按照自然顺序或 Comparator 排序,内部使用二叉堆(binary heap)实现。

// 使用自然顺序
BlockingQueue<Task> queue = new PriorityBlockingQueue<>();

// 使用 Comparator
BlockingQueue<Task> queue = new PriorityBlockingQueue<>(11, (a, b) -> b.priority - a.priority);

特点:

  • 无界(会动态扩容)
  • 排序非严格 FIFO:优先级相同的元素顺序不保证
  • 插入的元素必须实现 Comparable,或在构造时提供 Comparator
  • 不允许 null 元素

适用场景: 任务调度、需要按优先级处理的场景。

4. SynchronousQueue

一个不存储元素的阻塞队列。每个 put 操作必须等待一个 take 操作,反之亦然。容量为 0。

BlockingQueue<String> queue = new SynchronousQueue<>();

// 必须配对使用
new Thread(() -> {
    try { queue.put("hello"); } catch (InterruptedException e) { }
}).start();

String value = queue.take();  // 与 put 配对
System.out.println(value);    // 输出 hello

特点:

  • 容量为 0,内部不存储任何元素
  • 相当于线程之间的直接握手
  • 支持公平/非公平策略
  • 非常适合传递性设计(hand-off design)

适用场景: Executors.newCachedThreadPool() 内部使用 SynchronousQueue,当任务到达时直接交给空闲线程处理。

5. DelayQueue

存放实现了 Delayed 接口的元素的无界阻塞队列。只有延迟期满的元素才能被取出。

public class DelayedTask implements Delayed {
    private final String name;
    private final long startTime;  // 到期时间戳

    public DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.startTime = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long remaining = startTime - System.currentTimeMillis();
        return unit.convert(remaining, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.startTime, ((DelayedTask) o).startTime);
    }

    @Override
    public String toString() { return name; }
}
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask("任务1", 3000));
queue.put(new DelayedTask("任务2", 1000));

DelayedTask task = queue.take();  // 1 秒后取出"任务2"
System.out.println(task);         // 输出:任务2

特点:

  • 无界
  • 元素必须实现 Delayed 接口(继承自 Comparable
  • 内部使用 PriorityQueue 实现排序
  • take() 会阻塞直到有元素到期

适用场景: 定时任务调度、会话过期清理、缓存过期处理。

6. LinkedTransferQueue

基于链表的无界 TransferQueue 实现。在 BlockingQueue 的基础上增加了 transfertryTransfer 等方法,支持直接传递语义。

LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

// transfer:直接交给消费者,如果没有消费者则阻塞
queue.transfer("直接传递");

// tryTransfer:尝试直接传递,没有消费者则返回 false
boolean success = queue.tryTransfer("试试看");

// tryTransfer(timeout):带超时的尝试
boolean success = queue.tryTransfer("超时传递", 2, TimeUnit.SECONDS);

特点:

  • 无界
  • 融合了 SynchronousQueue直接传递LinkedBlockingQueue缓冲存储
  • 吞吐量在三者中通常最高
  • 提供 getWaitingConsumerCount()hasWaitingConsumer() 方法

适用场景: 需要精确控制"数据是否已被消费"的消息传递场景。


三、put / take 方法详解

put(e)take() 是 BlockingQueue 最核心的两个阻塞方法。

put 的阻塞原理

// ArrayBlockingQueue.put 简化源码
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)      // 队列已满
            notFull.await();               // 在 notFull 条件上等待
        enqueue(e);                        // 入队
    } finally {
        lock.unlock();
    }
}

take 的阻塞原理

// ArrayBlockingQueue.take 简化源码
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)                 // 队列为空
            notEmpty.await();              // 在 notEmpty 条件上等待
        return dequeue();                  // 出队
    } finally {
        lock.unlock();
    }
}

完整的生产者-消费者示例:

public class ProducerConsumerExample {

    private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i);
                    System.out.println(Thread.currentThread().getName() + " 生产: " + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    Integer value = queue.take();
                    System.out.println(Thread.currentThread().getName() + " 消费: " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new Producer(), "生产者-1").start();
        new Thread(new Consumer(), "消费者-1").start();
        new Thread(new Consumer(), "消费者-2").start();
    }
}

运行效果:

生产者-1 生产: 0
消费者-1 消费: 0
生产者-1 生产: 1
消费者-2 消费: 1
生产者-1 生产: 2    ← 队列满时 put 阻塞,等待消费者取出
消费者-1 消费: 2
...

四、线程安全实现剖析

BlockingQueue 的线程安全依赖于 ReentrantLock + Condition 机制。

ArrayBlockingQueue 的内部结构

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    final Object[] items;          // 存储元素的数组
    int takeIndex;                 // 下一个 take 的位置
    int putIndex;                  // 下一个 put 的位置
    int count;                     // 元素数量

    final ReentrantLock lock;      // 全局锁
    private final Condition notEmpty;  // 等待队列非空的条件
    private final Condition notFull;   // 等待队列非满的条件
}
  • 一把锁保护所有操作
  • 两个 ConditionnotEmpty(消费者等待)、notFull(生产者等待)
  • 生产者 put 时,如果队列满,调用 notFull.await() 阻塞;插入成功后,调用 notEmpty.signal() 唤醒消费者
  • 消费者 take 时,如果队列空,调用 notEmpty.await() 阻塞;取出成功后,调用 notFull.signal() 唤醒生产者

LinkedBlockingQueue 的内部结构

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private final int capacity;              // 容量,默认 MAX_VALUE

    private final ReentrantLock takeLock = new ReentrantLock();  // take 锁
    private final Condition notEmpty = takeLock.newCondition();  // 非空条件

    private final ReentrantLock putLock = new ReentrantLock();   // put 锁
    private final Condition notFull = putLock.newCondition();    // 非满条件
}
  • 两把锁分离了读和写,允许生产者线程和消费者线程并发执行
  • 使用 AtomicInteger 记录 count,保证原子性
  • 这种设计使得 LinkedBlockingQueue 在并发场景下吞吐量通常优于 ArrayBlockingQueue

两种实现的对比

特性 ArrayBlockingQueue LinkedBlockingQueue
数据结构 数组(定长) 链表(动态节点)
容量 必须指定,固定 可选有界,默认无界
锁机制 单锁 + 双 Condition 双锁 + 双 Condition
吞吐量 中等 通常更高
内存分配 预分配,无 GC 压力 动态分配,有 GC 开销
公平策略 支持 不支持

五、如何选择合适的阻塞队列?

需求场景 推荐实现
固定大小的有界缓冲池 ArrayBlockingQueue
高吞吐量的生产者-消费者 LinkedBlockingQueue(有界或无界)
任务按优先级处理 PriorityBlockingQueue
线程间直接传递,不缓存 SynchronousQueue
延迟执行/定时任务 DelayQueue
需要直接传递且可缓存 LinkedTransferQueue

六、使用注意事项

  1. 不允许 null 元素:所有 BlockingQueue 实现都拒绝 null 元素,插入 null 会抛出 NullPointerException
  2. 中断响应put/take 等阻塞方法都会响应中断(InterruptedException),建议在 catch 块中调用 Thread.currentThread().interrupt() 恢复中断状态。
  3. 无界队列的风险LinkedBlockingQueue(默认)、PriorityBlockingQueueDelayQueue 都是无界的,如果生产者速度远快于消费者,可能导致内存溢出。
  4. 容量规划ArrayBlockingQueue 预分配数组,适合稳定流量;LinkedBlockingQueue 动态分配节点,适合流量波动较大的场景。

七、总结

BlockingQueue 是 Java 并发编程中的一把利器,它优雅地解决了生产者-消费者问题。通过阻塞机制,我们无需手动管理 wait/notify,代码更加简洁、健壮。不同实现类各有侧重:

  • 需要有界缓冲ArrayBlockingQueueLinkedBlockingQueue(指定容量)
  • 需要优先级PriorityBlockingQueue
  • 需要直接传递SynchronousQueueLinkedTransferQueue
  • 需要延迟执行DelayQueue

掌握 BlockingQueue 是深入理解 Java 并发的基础,也是编写高并发、高可靠服务的关键技能。


评论