本文目录导读:

- 方式一:使用
wait()和notify()(底层原理,面试常考) - 方式二:使用
BlockingQueue(生产推荐,简单高效) - 方式三:使用
Lock和Condition(更精细的控制) - 总结与选型建议
在Java中模拟生产者-消费者模式,最经典且面试高频的方式是使用 wait() / notify() 或 BlockingQueue。
下面提供两种核心实现方式,并附带一个完整的可运行案例。
使用 wait() 和 notify()(底层原理,面试常考)
这种方式手动控制锁和条件,能让你更深刻地理解线程协作。
核心思想:
- 定义一个缓冲区(如队列),并设定最大容量。
- 生产者:当缓冲区满时,调用
wait()等待;否则生产数据,放入缓冲区,然后调用notify()唤醒消费者。 - 消费者:当缓冲区空时,调用
wait()等待;否则取出数据,然后调用notify()唤醒生产者。
完整代码示例:
import java.util.LinkedList;
import java.util.Queue;
// 1. 共享缓冲区
class MyBlockingQueue {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity;
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
}
// 生产者放入数据
public synchronized void put(int value) throws InterruptedException {
while (queue.size() == capacity) {
System.out.println("队列已满,生产者等待...");
wait(); // 满则等待
}
queue.offer(value);
System.out.println("生产者生产了: " + value + ",当前队列大小: " + queue.size());
notifyAll(); // 唤醒所有等待的线程(消费者)
}
// 消费者取出数据
public synchronized int take() throws InterruptedException {
while (queue.isEmpty()) {
System.out.println("队列为空,消费者等待...");
wait(); // 空则等待
}
int value = queue.poll();
System.out.println("消费者消费了: " + value + ",当前队列大小: " + queue.size());
notifyAll(); // 唤醒所有等待的线程(生产者)
return value;
}
}
// 2. 生产者线程
class Producer extends Thread {
private final MyBlockingQueue queue;
private final int id;
public Producer(MyBlockingQueue queue, int id) {
this.queue = queue;
this.id = id;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
int value = id * 100 + i; // 生成一些数据
queue.put(value);
Thread.sleep((long) (Math.random() * 500)); // 模拟生产耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
// 3. 消费者线程
class Consumer extends Thread {
private final MyBlockingQueue queue;
private final int id;
public Consumer(MyBlockingQueue queue, int id) {
this.queue = queue;
this.id = id;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
int value = queue.take();
System.out.println("消费者" + id + "消费了: " + value);
Thread.sleep((long) (Math.random() * 1000)); // 模拟消费耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
// 4. 主程序
public class ProducerConsumerExample {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(3); // 容量为3
// 创建并启动生产者和消费者
new Producer(queue, 1).start();
new Producer(queue, 2).start();
new Consumer(queue, 1).start();
new Consumer(queue, 2).start();
}
}
关键点说明:
while而非if:判断条件(满/空)时用while循环,因为线程被notify唤醒后,可能由于别的线程抢先操作,条件又变回不满足,需要重新检查。notifyAll而非notify:避免“信号丢失”或“死锁”,因为可能有多个生产者/消费者在等待不同类型的条件。
使用 BlockingQueue(生产推荐,简单高效)
Java并发包 java.util.concurrent 提供了 BlockingQueue 接口及其实现(如 ArrayBlockingQueue),其内部已经封装好了锁和等待/通知机制。
代码相比方式一大幅简化:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) {
// 创建一个容量为2的阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
// 生产者:使用Lambda表达式创建线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.put(i); // 如果队列满,此方法会自动阻塞
System.out.println("生产者生产: " + i + ",队列中元素数: " + queue.size());
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
Integer value = queue.take(); // 如果队列空,此方法会自动阻塞
System.out.println("消费者消费: " + value);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
推荐理由:代码简洁、线程安全、无需手动管理锁。
使用 Lock 和 Condition(更精细的控制)
如果你想要更灵活的锁控制(如超时等待、公平锁),可以使用 ReentrantLock + Condition。
示例片段:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition(); // 不满条件
private final Condition notEmpty = lock.newCondition(); // 不空条件
public BoundedBuffer(int capacity) {
this.capacity = capacity;
}
public void put(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 满则等待“不满”信号
}
queue.offer(value);
System.out.println("生产: " + value);
notEmpty.signal(); // 唤醒等待“不空”信号的消费者
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 空则等待“不空”信号
}
int value = queue.poll();
System.out.println("消费: " + value);
notFull.signal(); // 唤醒等待“不满”信号的生产者
return value;
} finally {
lock.unlock();
}
}
}
优点:可以使用多个 Condition 精确唤醒生产者或消费者,效率略高于 notifyAll。
总结与选型建议
| 方式 | 复杂度 | 适用场景 |
|---|---|---|
wait/notify |
中等 | 面试考察线程协作原理、学习底层机制 |
BlockingQueue |
低 | 实际开发首选,最简洁、最可靠 |
Lock + Condition |
较高 | 需要超时等待、公平锁、或区分唤醒特定类型线程时 |
核心要点:
- 防止死锁:使用
while而不是if检查条件。 - 避免信号丢失:优先使用
notifyAll()或signalAll()。 - 生产消费速率:通常消费者比生产者慢(如案例中
sleep时间更长),模拟真实场景中的“积压”与“等待”。
你可以直接复制上面的 方式二(BlockingQueue) 代码运行,结果会看到生产者和消费者交替执行,且队列不会超过容量。