菜单

Administrator
发布于 2026-05-15 / 1 阅读
0
0

Java生产者消费者模式

Java 使用阻塞队列实现生产者消费者模式

一、什么是生产者消费者模式?

生产者消费者模式是一种经典的多线程协作设计模式。它通过一个共享缓冲区(通常是一个队列)将"生产数据"和"消费数据"的线程解耦:

  • 生产者线程:负责生产数据,将数据放入共享缓冲区
  • 消费者线程:负责消费数据,从共享缓冲区取出数据
  • 共享缓冲区(阻塞队列):协调生产者与消费者之间的速率差异

核心规则:

  • 当缓冲区时,生产者线程被阻塞,直到有消费者取出数据
  • 当缓冲区时,消费者线程被阻塞,直到有生产者放入数据

二、为什么使用 BlockingQueue?

Java 的 BlockingQueue 接口(位于 java.util.concurrent 包)提供了线程安全的阻塞操作

方法 说明
put(E e) 队列满时阻塞,直到有空位
take() 队列空时阻塞,直到有元素
offer(E e, long timeout, TimeUnit unit) 超时插入
poll(long timeout, TimeUnit unit) 超时取出

使用 BlockingQueue 实现生产者消费者模式的优点:

  • ✅ 代码极简,无需手动管理锁和条件变量
  • ✅ 内置线程安全,避免并发 bug
  • ✅ 支持多种队列实现,灵活适应不同场景
  • ✅ 避免 wait/notify 的常见陷阱(假死、早期通知等)

三、三种实现方式对比

方式 关键方法 优点 缺点
Object.wait/notify wait(), notifyAll() 基础方式,理解原理 代码复杂,易出错
Lock + Condition await(), signalAll() 精细控制唤醒类型 仍需手动管理锁
BlockingQueue ✅ 推荐 put(), take() 代码最简洁,开箱即用 需理解阻塞队列原理

四、基于 BlockingQueue 的完整实现

4.1 基础版:单生产者 + 单消费者

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// 生产者
class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println("生产者生产: " + i);
                queue.put(i);           // 队列满时自动阻塞
                Thread.sleep(500);      // 模拟生产耗时
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 消费者
class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer data = queue.take();  // 队列空时自动阻塞
                System.out.println("消费者消费: " + data);
                Thread.sleep(1000);           // 模拟消费耗时
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 主程序
public class ProducerConsumerDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5); // 容量为5

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer, "生产者").start();
        new Thread(consumer, "消费者").start();
    }
}

4.2 进阶版:多生产者 + 多消费者

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// 日志事件类
class LogEvent {
    private final String timestamp;
    private final String level;
    private final String message;
    private static final AtomicInteger counter = new AtomicInteger(0);
    private final int id;

    public LogEvent(String level, String message) {
        this.id = counter.incrementAndGet();
        this.timestamp = java.time.LocalDateTime.now().toString();
        this.level = level;
        this.message = message;
    }

    @Override
    public String toString() {
        return String.format("[%s] #%d %s - %s", timestamp, id, level, message);
    }
}

// 生产者
class MultiProducer implements Runnable {
    private final BlockingQueue<LogEvent> queue;
    private final String name;

    public MultiProducer(BlockingQueue<LogEvent> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                LogEvent event = new LogEvent("INFO", name + " 生产事件 #" + i);
                queue.put(event);
                System.out.println("[" + name + "] 生产: " + event);
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 消费者
class MultiConsumer implements Runnable {
    private final BlockingQueue<LogEvent> queue;
    private final String name;

    public MultiConsumer(BlockingQueue<LogEvent> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            while (true) {
                LogEvent event = queue.take();
                System.out.println("[" + name + "] 消费: " + event);
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 主程序
public class MultiProducerConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<>(10);

        // 启动3个生产者
        for (int i = 1; i <= 3; i++) {
            new Thread(new MultiProducer(queue, "生产者" + i)).start();
        }

        // 启动2个消费者
        for (int i = 1; i <= 2; i++) {
            new Thread(new MultiConsumer(queue, "消费者" + i)).start();
        }

        // 运行10秒后退出
        Thread.sleep(10000);
        System.out.println("主程序退出");
        System.exit(0);
    }
}

4.3 实用版:含优雅关闭的生产者消费者

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final AtomicBoolean running;

    public ShutdownProducer(BlockingQueue<String> queue, AtomicBoolean running) {
        this.queue = queue;
        this.running = running;
    }

    @Override
    public void run() {
        try {
            int count = 0;
            while (running.get()) {
                String data = "消息-" + (++count);
                queue.put(data);
                System.out.println("[生产者] 发送: " + data);
                Thread.sleep(300);
            }
            System.out.println("[生产者] 已关闭");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class ShutdownConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final AtomicBoolean running;
    private static final String POISON_PILL = "POISON";

    public ShutdownConsumer(BlockingQueue<String> queue, AtomicBoolean running) {
        this.queue = queue;
        this.running = running;
    }

    @Override
    public void run() {
        try {
            while (running.get() || !queue.isEmpty()) {
                String data = queue.poll(1, TimeUnit.SECONDS);
                if (data != null) {
                    System.out.println("[消费者] 收到: " + data);
                    Thread.sleep(500);
                }
            }
            System.out.println("[消费者] 已关闭");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class GracefulShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(20);
        AtomicBoolean running = new AtomicBoolean(true);

        Thread producer = new Thread(new ShutdownProducer(queue, running));
        Thread consumer = new Thread(new ShutdownConsumer(queue, running));

        producer.start();
        consumer.start();

        // 运行5秒后优雅关闭
        Thread.sleep(5000);
        System.out.println("=== 准备关闭 ===");
        running.set(false);
        
        producer.join();
        consumer.join();
        System.out.println("=== 已安全关闭 ===");
    }
}

五、选择合适的阻塞队列

队列类型 适用生产者消费者场景
LinkedBlockingQueue 最常用,可选有界/无界,适合大多数场景
ArrayBlockingQueue 固定容量、高性能场景
PriorityBlockingQueue 需要按优先级消费的场景
DelayQueue 需要延迟消费的场景(如订单超时取消)
SynchronousQueue 零缓存、直接交付场景

六、最佳实践

  1. 始终指定容量:使用 LinkedBlockingQueue 时务必设置容量上限,防止内存溢出
  2. 使用 try-catch 处理中断put()take() 会抛出 InterruptedException
  3. 优雅关闭:通过标志位 + 超时 poll 实现安全关闭
  4. 监控队列大小:生产中监控队列积压情况,及时发现问题
  5. 选择合适的队列:根据吞吐量、优先级需求等因素选择实现类

评论