菜单

Administrator
发布于 2026-05-18 / 0 阅读
0
0

Java 使用阻塞队列实现生产者消费者模式

Java 使用阻塞队列实现生产者消费者模式

一、什么是生产者消费者模式

生产者消费者模式(Producer-Consumer Pattern)是并发编程中最经典的设计模式之一。它解决的是一个共享数据缓冲区的协调问题:

  • 生产者(Producer):负责生成数据,将数据放入共享缓冲区
  • 消费者(Consumer):负责处理数据,从共享缓冲区取出数据
  • 共享缓冲区:生产者与消费者之间的中介容器,通常是一个线程安全的队列
  生产者线程  ——→  [共享缓冲区(阻塞队列)]  ——→  消费者线程

为什么需要这个模式?

在没有缓冲区的情况下,生产者和消费者必须相互等待,导致耦合度高、效率低下。引入缓冲区后带来了以下好处:

优势 说明
解耦 生产者和消费者不直接依赖,只需知道缓冲区接口
削峰填谷 生产者生产速度快时,数据暂存在缓冲区,消费者可以慢慢处理
支持忙闲不均 生产者和消费者可以有不同的处理速度,互不阻塞等待

核心问题

生产者消费者模式需要解决的核心问题是线程间的协调与同步

  1. 当缓冲区已满时,生产者必须等待,直到消费者消费了数据腾出空间
  2. 当缓冲区为空时,消费者必须等待,直到生产者生产了新的数据
  3. 多个生产者/消费者同时操作缓冲区时,必须保证线程安全

在 Java 中,有两种典型的实现方式:wait/notify 机制阻塞队列(BlockingQueue)。下面通过完整的代码示例来对比这两种方式。


二、基于 wait/notify 的实现

这是最底层的实现方式,使用 synchronized + wait() / notifyAll() 手工控制线程的等待与唤醒。

原理

  • 使用 synchronized 保证对缓冲区的互斥访问
  • 当条件不满足时(缓冲区满/空),调用 wait() 让当前线程进入等待集
  • 当条件可能满足时,调用 notifyAll() 唤醒所有等待线程

完整代码

import java.util.LinkedList;
import java.util.Queue;

/**
 * 使用 wait/notify 实现生产者消费者模式
 */
public class WaitNotifyProducerConsumer {

    private static final int CAPACITY = 5;  // 缓冲区容量
    private static final Queue<Integer> buffer = new LinkedList<>();

    private static class Producer extends Thread {
        @Override
        public void run() {
            int value = 0;
            while (true) {
                synchronized (buffer) {
                    // 1. 缓冲区满时,生产者等待
                    while (buffer.size() == CAPACITY) {
                        try {
                            System.out.println("[生产者] 缓冲区已满,等待消费...");
                            buffer.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }

                    // 2. 生产数据并放入缓冲区
                    value++;
                    buffer.offer(value);
                    System.out.println("[生产者] 生产了数据: " + value + ",当前大小: " + buffer.size());

                    // 3. 唤醒可能等待的消费者
                    buffer.notifyAll();
                }

                // 模拟生产间隔
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    private static class Consumer extends Thread {
        @Override
        public void run() {
            while (true) {
                synchronized (buffer) {
                    // 1. 缓冲区空时,消费者等待
                    while (buffer.isEmpty()) {
                        try {
                            System.out.println("[消费者] 缓冲区为空,等待生产...");
                            buffer.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }

                    // 2. 从缓冲区取数据并消费
                    int value = buffer.poll();
                    System.out.println("[消费者] 消费了数据: " + value + ",剩余: " + buffer.size());

                    // 3. 唤醒可能等待的生产者
                    buffer.notifyAll();
                }

                // 模拟消费间隔(比生产者慢,体验缓冲效果)
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("===== wait/notify 实现生产者消费者模式 =====");
        new Producer().start();
        new Consumer().start();
    }
}

代码要点分析

⚠️ 必须使用 while 而非 if 来检查条件

线程被唤醒后,条件可能再次不满足(虚假唤醒),因此必须在循环中重新检查条件。
这是《Java 并发编程实战》中明确要求的编程规范。

要点 说明
synchronized (buffer) 使用共享缓冲区对象作为锁,保证互斥
buffer.wait() 释放锁,线程进入等待状态
buffer.notifyAll() 唤醒所有等待线程,而非 notify()(避免信号丢失)
while 循环 防止虚假唤醒,条件不满足时继续等待

wait/notify 的痛点

  1. 代码繁琐:每个条件判断、等待、唤醒都需要手动编写
  2. 容易出错notify()notifyAll() 用错可能导致死锁或信号丢失
  3. 锁粒度粗:整个缓冲区操作都在同步块中,并发度受限
  4. 条件管理复杂:多个条件共用一把锁,notifyAll() 会唤醒所有等待线程,造成不必要的上下文切换

三、基于 BlockingQueue 的实现

BlockingQueuejava.util.concurrent 包提供的线程安全队列,它内置了等待/通知机制,开发者无需手动处理复杂的同步逻辑。

什么是 BlockingQueue?

BlockingQueue 是一个接口,它扩展了 Queue,增加了阻塞的存取方法:

方法 含义
put(E e) 队列满时阻塞,直到有空间
take() 队列空时阻塞,直到有元素
offer(E e, long timeout, TimeUnit unit) 超时版入队
poll(long timeout, TimeUnit unit) 超时版出队

常用实现类

实现类 特点
ArrayBlockingQueue 基于数组的有界阻塞队列,FIFO
LinkedBlockingQueue 基于链表的可选有界阻塞队列,FIFO
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列,每个 put 必须等待一个 take
DelayQueue 支持延迟获取元素的无界阻塞队列

完整代码

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 使用 BlockingQueue 实现生产者消费者模式
 */
public class BlockingQueueProducerConsumer {

    private static final int CAPACITY = 5;  // 缓冲区容量
    private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);

    private static class Producer extends Thread {
        @Override
        public void run() {
            int value = 0;
            while (true) {
                try {
                    value++;
                    // put 方法:队列满时自动阻塞,直到有空间
                    queue.put(value);
                    System.out.println("[生产者] 生产了数据: " + value + ",队列中元素数: " + queue.size());

                    // 模拟生产间隔
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    private static class Consumer extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    // take 方法:队列空时自动阻塞,直到有元素
                    int value = queue.take();
                    System.out.println("[消费者] 消费了数据: " + value + ",队列中元素数: " + queue.size());

                    // 模拟消费间隔(比生产者慢,体验缓冲效果)
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("===== BlockingQueue 实现生产者消费者模式 =====");
        new Producer().start();
        new Consumer().start();
    }
}

代码要点

要点 说明
queue.put(value) 队列满时自动阻塞,无需手写等待逻辑
queue.take() 队列空时自动阻塞,无需手写等待逻辑
synchronized 所有线程安全由 BlockingQueue 内部保证
wait/notify 阻塞逻辑由队列内置的 Condition 管理

四、两种实现方式的对比

对比维度 wait/notify BlockingQueue
代码量 ~60 行核心逻辑 ~20 行核心逻辑
线程安全 需手动使用 synchronized 队列内部已实现
条件等待 手动 wait() + while 检查 put() / take() 自动阻塞
唤醒机制 手动 notifyAll() 队列内部使用 Condition.signal()
可读性 需要理解底层并发机制 声明式,一目了然
扩展性 修改条件逻辑容易出错 替换队列实现类即可改变行为
性能 依赖 JVM 锁实现 使用更高效的 AQS + LockSupport

为什么说 BlockingQueue 更好?

  1. 消除样板代码
    wait/notify 实现中,每个读写操作都需要加锁、检查条件、处理等待/唤醒。BlockingQueue 将这些重复劳动封装到了队列内部,开发者只需关注业务逻辑。

  2. 更可靠的并发语义
    即使经验丰富的开发者,在 wait/notify 中使用 if 而非 while、误用 notify() 而非 notifyAll() 也是常见错误。BlockingQueue 经过 JDK 团队的严格测试,不存在这类隐患。

  3. 更细粒度的锁
    ArrayBlockingQueue 内部使用两把锁(读锁、写锁)和 Condition 分别管理"队列空"和"队列满"两种等待条件,相比 synchronized 一把大锁,并发性能更高。

  4. 灵活替换实现
    如果需要优先级队列,把 ArrayBlockingQueue 换成 PriorityBlockingQueue 即可;如果需要无界队列,换成 LinkedBlockingQueue。无需改动任何业务代码。

  5. 支持超时控制
    offer(e, timeout, unit)poll(timeout, unit) 提供了超时版本,让生产者/消费者可以定义"最多等待多久",避免永久阻塞。这在 wait/notify 中需要额外实现。


五、多生产者多消费者场景

BlockingQueue 的优势在多生产者/多消费者场景下更为明显。以下是一个示例:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多生产者多消费者 —— BlockingQueue 实现
 */
public class MultiProducerConsumer {

    private static final int CAPACITY = 10;
    private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(CAPACITY);
    private static final AtomicInteger counter = new AtomicInteger(0);

    static class Producer implements Runnable {
        private final String name;

        Producer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    int value = counter.incrementAndGet();
                    queue.put(value);
                    System.out.println(name + " 生产: " + value + " [队列: " + queue.size() + "]");
                    Thread.sleep((long) (Math.random() * 800));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        private final String name;

        Consumer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    int value = queue.take();
                    System.out.println(name + " 消费: " + value + " [队列: " + queue.size() + "]");
                    Thread.sleep((long) (Math.random() * 1200));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("===== 多生产者多消费者 =====");

        ExecutorService producers = Executors.newFixedThreadPool(3);
        Executors.newFixedThreadPool(2);  // 2 个消费者
        ExecutorService consumers = Executors.newFixedThreadPool(2);

        // 启动 3 个生产者
        for (int i = 1; i <= 3; i++) {
            producers.submit(new Producer("生产者-" + i));
        }

        // 启动 2 个消费者
        for (int i = 1; i <= 2; i++) {
            consumers.submit(new Consumer("消费者-" + i));
        }

        // 运行 10 秒后关闭
        Thread.sleep(10000);
        producers.shutdownNow();
        consumers.shutdownNow();
        System.out.println("系统关闭");
    }
}

如果是 wait/notify 实现,需要处理:

  • 多个生产者线程同时 waitnotifyAll 带来的唤醒风暴
  • 正确的条件判断(防止一个生产者被唤醒后空间又被其他生产者占满)
  • 虚假唤醒的防御

而 BlockingQueue 内部已经优雅地解决了所有这些复杂性。


六、BlockingQueue 的底层原理(简要)

从 JDK 源码看,ArrayBlockingQueue 的核心实现如下(简化示意):

// ArrayBlockingQueue 内部(简化版)
public class ArrayBlockingQueue<E> {
    final ReentrantLock lock;
    private final Condition notFull;   // "不满"条件
    private final Condition notEmpty;  // "不空"条件

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();       // 队列满则等待
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();      // 队列空则等待
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // ... 实际入队操作
        notEmpty.signal();             // 唤醒等待的消费者
    }

    private E dequeue() {
        // ... 实际出队操作
        notFull.signal();              // 唤醒等待的生产者
    }
}

关键点:

  • 使用 ReentrantLock 替代 synchronized,支持可中断、超时等特性
  • 使用 两个 Condition 分别管理"队列满"和"队列空",避免了 notifyAll() 唤醒所有线程的低效问题 —— 生产者满时只等待 notFull,消费者空时只等待 notEmpty,唤醒也是精准唤醒
  • 本质上仍然是 wait/notify 模式,但封装在了经过高度优化的并发容器中

七、总结与最佳实践

何时选择哪种方式?

场景 推荐方式
简单生产消费、对并发要求不高 wait/notify 可学习,但生产环境不推荐
生产环境、标准生产者消费者 BlockingQueue — 首选
需要自定义复杂的缓冲区策略 可基于 ReentrantLock + Condition 自行实现
需要精确控制等待/唤醒行为 考虑 PhaserExchanger 等更高级的并发工具

关键原则

  1. 优先使用 java.util.concurrent 包中的高级并发工具,而非自己编写底层同步代码
  2. BlockingQueue 是生产者消费者模式的最佳实践,它安全、高效、可读性强
  3. 理解 wait/notify 机制仍然重要,它是理解 Java 并发锁机制的基础,也是阅读许多开源项目源码的前提
  4. 对于多生产者多消费者场景,务必使用 BlockingQueueReentrantLock + Condition,避免手动 synchronized 的复杂性

参考资源

  • 《Java 并发编程实战》第 5 章 —— 同步工具类
  • Java BlockingQueue Javadoc
  • JDK 源码:ArrayBlockingQueueLinkedBlockingQueue

评论