这个题目考验的实际上是线程间的通信和同步问题。我们再有一篇文章中,使用Object的wait和notify实现了线程间的同步。参见 : https://huster.top/htmls/615.html
一、最简单的一个实现
1. 生产者
public class ProductClient implements Runnable {
private Queue names ;
public ProductClient(Queue names) {
this.names = names;
}
public void run() {
this.startProducter();
}
private void startProducter() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
executorService.scheduleAtFixedRate( () -> {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//有失败和丢失的可能性
names.offer("test - " + simpleDateFormat.format(new Date()));
}, 1, 1, TimeUnit.SECONDS);
}
}
2. 消费者
public class ConsumerClient implements Runnable {
private Queue names;
public ConsumerClient(Queue names) {
this.names = names;
}
public void run() {
while (true) {
String name = names.poll();
if (name != null) {
System.out.println("finish : " + name);
}
}
}
}
3. 主程序
public class MainClass {
public static void main(String[] args) {
Queue queue = new LinkedList<>();
ProductClient productClient = new ProductClient(queue);
Thread thread = new Thread(productClient);
Thread thread2 = new Thread(productClient);
thread.start();
thread2.start();
ConsumerClient consumerClient = new ConsumerClient(queue);
Thread consumer = new Thread(consumerClient);
consumer.start();
Thread consumer2 = new Thread(new ConsumerClient(queue));
consumer2.start();
}
}
分析代码,我们知道,在生产者中,我们使用一个定时任务,每隔一秒钟,就生成一个名字。而在消费者,我们使用一个while循环来不断的check是否有心的name到达,如果有,则取出,然后打印。
缺点: 这个程序的缺点,显而易见,在生产者中,一旦有界队列的空间不足够,offer是可能失败的,这样就丢失了信息。而在消费者中,也有问题,while不断的在运行,但是如果queue为空的话,他其实是在空转的。他无法感知到队列的情况。
二、 针对以上情况,我们做一个补救措施,并且使用object的wait和notify来在线程之间进行同步。
1. 生产者2
public class ProductClient2 implements Runnable {
private Queue names ;
private Object object;
public ProductClient2(Queue names, Object o) {
this.names = names;
this.object = o;
}
public void run() {
this.startProducter();
}
private void startProducter() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
executorService.scheduleAtFixedRate( () -> {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//注意是while不是if
while (names.size() >= Integer.MAX_VALUE) {
synchronized (object) {
try {
object.wait();
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
}
}
names.offer("test - " + simpleDateFormat.format(new Date()));
object.notifyAll();
}, 1, 1, TimeUnit.SECONDS);
}
}
2. 消费者2
public class ConsumerClient2 implements Runnable {
private Queue names;
private Object object;
public ConsumerClient2(Queue names, Object object) {
this.names = names;
this.object = object;
}
public void run() {
while (true) {
//注意是while not if
while (names.size() <= 0) {
synchronized (object) {
try {
object.wait();
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
}
}
String name = names.poll();
object.notifyAll();
System.out.println("finish : " + name);
}
}
}
分析和总结: 这段代码解决了越界和空转的问题,但是有问题吗?仔细想想看,在Object的notifyAll的时候,当放入了元素来通知消费者的时候,一定唤醒的是消费者吗? 不然,有可能生产者被唤醒了,而消费者却没有被唤醒。是不是有这个问题?
ps : 需要注意的是,使用的是while而不是if,就是为了解决假唤醒的问题。
三、 使用ReentrantLock的实现
为了解决二中带来的乱唤醒的问题,我们继续优化程序如下
1. 生产者3
public class ProductClient3 implements Runnable {
private Queue names ;
private ReentrantLock lock;
private Condition full;
private Condition empty;
public ProductClient3(Queue names, ReentrantLock lock, Condition full, Condition empty) {
this.names = names;
this.lock = lock;
this.full = full;
this.empty = empty;
}
public void run() {
this.startProducter();
}
private void startProducter() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
executorService.scheduleAtFixedRate( () -> {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//注意是while不是if
while (names.size() >= Integer.MAX_VALUE) {
lock.lock();
try {
full.await();
} catch (InterruptedException e) {
System.err.println(e.getMessage());
} finally {
lock.unlock();
}
}
names.offer("test - " + simpleDateFormat.format(new Date()));
empty.signal();
}, 1, 1, TimeUnit.SECONDS);
}
}
2. 消费者3
public class ConsumerClient3 implements Runnable {
private Queue names;
private ReentrantLock lock;
private Condition full;
private Condition empty;
public ConsumerClient3(Queue names, ReentrantLock lock, Condition full, Condition empty) {
this.names = names;
this.lock = lock;
this.full = full;
this.empty = empty;
}
public void run() {
while (true) {
//注意是while not if
while (names.size() <= 0) {
lock.lock();
try {
empty.await();
} catch (InterruptedException e) {
System.err.println(e.getMessage());
} finally {
lock.unlock();
}
}
String name = names.poll();
full.signal();
System.out.println("finish : " + name);
}
}
}
我们将empty和full分开考虑,这样就能确保正确的唤醒。
四、 终极态,使用阻塞队列。
到3其实我们的阻塞队列的方式呼之欲出。我们先看看阻塞队列的定义。
public interface BlockingQueue extends Queue
其中Queue定义如下
添加:
- add(E element) 添加一个,如果超出了界限,则抛出异常
- Offer(E element) 添加一个,如果超出了界限,则返回false
获取并删除
- remove() 获取并删除队头,如果队列为空,则抛出异常
- Poll() 获取并删除队头, 如果队列为空,则返回null
获取但不删除
- element() 获取但是并不删除,如果队列为空,则抛出异常、
- peek() 获取但是并不删除,如果队列为空,则返回null.
而阻塞队列则多了两个阻塞的方法,put 和 take,参见下表
hrows exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
最终实现如下
1. 生产者
public class ConsumerClient4 implements Runnable {
private BlockingDeque names;
public ConsumerClient4(BlockingDeque names) {
this.names = names;
}
public void run() {
while (true) {
try {
String name = names.take();
System.out.println("finish : " + name);
}catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
}
2. 消费者
public class ProductClient4 implements Runnable {
private BlockingDeque names ;
public ProductClient4(BlockingDeque names) {
this.names = names;
}
public void run() {
this.startProducter();
}
private void startProducter() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
executorService.scheduleAtFixedRate( () -> {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
names.put("test - " + simpleDateFormat.format(new Date()));
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
}, 1, 1, TimeUnit.SECONDS);
}
}
总结: 程序瞬间短了很多有么有,其实细看阻塞队列的实现,跟我们的第三种实现基本一样,我们看下ArrayBlockingQueue的实现
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
基本一样,有没有。
五、 其他实现,诸如,我们还可以使用Semaphore来实现线程间的同步,所有线程间的同步工具,都可以用来实现生产者和消费者模式,但是都不如使用阻塞队列来的简单。