如何写代码来解决生产者消费者问题?

这个题目考验的实际上是线程间的通信和同步问题。我们再有一篇文章中,使用Object的wait和notify实现了线程间的同步。参见 : https://huster.top/htmls/615.html

一、最简单的一个实现

1. 生产者

public class ProductClient implements Runnable {

    private Queue names ;


    public ProductClient(Queue names) {
        this.names = names;
    }

    public void run() {
        this.startProducter();
    }
    private void startProducter() {
        ScheduledExecutorService executorService  = Executors.newScheduledThreadPool(3);
        executorService.scheduleAtFixedRate( () -> {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //有失败和丢失的可能性
            names.offer("test - " + simpleDateFormat.format(new Date()));
        }, 1, 1, TimeUnit.SECONDS);
    }

}

2. 消费者

public class ConsumerClient implements Runnable {
    private Queue names;

    public ConsumerClient(Queue names) {
        this.names = names;
    }

    public void run() {
           while (true) {
            String name  = names.poll();
            if (name != null) {
                System.out.println("finish : " + name);
            }
        }
    }
}

3. 主程序

public class MainClass {

    public static void main(String[] args) {
        Queue queue = new LinkedList<>();
        ProductClient productClient = new ProductClient(queue);
        Thread thread = new Thread(productClient);
        Thread thread2 = new Thread(productClient);
        thread.start();
        thread2.start();
        ConsumerClient consumerClient = new ConsumerClient(queue);
        Thread consumer = new Thread(consumerClient);
        consumer.start();
        Thread consumer2 = new Thread(new ConsumerClient(queue));
        consumer2.start();
    }
}

分析代码,我们知道,在生产者中,我们使用一个定时任务,每隔一秒钟,就生成一个名字。而在消费者,我们使用一个while循环来不断的check是否有心的name到达,如果有,则取出,然后打印。
缺点: 这个程序的缺点,显而易见,在生产者中,一旦有界队列的空间不足够,offer是可能失败的,这样就丢失了信息。而在消费者中,也有问题,while不断的在运行,但是如果queue为空的话,他其实是在空转的。他无法感知到队列的情况。

二、 针对以上情况,我们做一个补救措施,并且使用object的wait和notify来在线程之间进行同步。
1. 生产者2

public class ProductClient2 implements Runnable {
    private Queue names ;
    private Object object;
    public ProductClient2(Queue names, Object o) {
        this.names = names;
        this.object = o;
    }
    public void run() {
        this.startProducter();
    }
    private void startProducter() {
        ScheduledExecutorService executorService  = Executors.newScheduledThreadPool(3);
        executorService.scheduleAtFixedRate( () -> {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //注意是while不是if
            while (names.size() >= Integer.MAX_VALUE) {
                synchronized (object) {
                    try {
                        object.wait();
                    } catch (InterruptedException e) {
                        System.err.println(e.getMessage());
                    }

                }
            }
            names.offer("test - " + simpleDateFormat.format(new Date()));
            object.notifyAll();
        }, 1, 1, TimeUnit.SECONDS);
    }

}

2. 消费者2

public class ConsumerClient2 implements Runnable {
    private Queue names;

    private Object object;

    public ConsumerClient2(Queue names, Object object) {
        this.names = names;
        this.object = object;
    }

    public void run() {
        while (true) {
            //注意是while not if
            while (names.size() <= 0) {
                synchronized (object) {
                    try {
                        object.wait();
                    } catch (InterruptedException e) {
                        System.err.println(e.getMessage());
                    }
                }
            }
            String name  = names.poll();
            object.notifyAll();
            System.out.println("finish : " + name);
        }
    }
}

分析和总结: 这段代码解决了越界和空转的问题,但是有问题吗?仔细想想看,在Object的notifyAll的时候,当放入了元素来通知消费者的时候,一定唤醒的是消费者吗? 不然,有可能生产者被唤醒了,而消费者却没有被唤醒。是不是有这个问题?
ps : 需要注意的是,使用的是while而不是if,就是为了解决假唤醒的问题。
三、 使用ReentrantLock的实现
为了解决二中带来的乱唤醒的问题,我们继续优化程序如下

1. 生产者3

public class ProductClient3 implements Runnable {
    private Queue names ;
    private ReentrantLock lock;
    private Condition full;
    private Condition empty;

    public ProductClient3(Queue names, ReentrantLock lock, Condition full, Condition empty) {
        this.names = names;
        this.lock = lock;
        this.full = full;
        this.empty = empty;
    }

    public void run() {
        this.startProducter();
    }
    private void startProducter() {
        ScheduledExecutorService executorService  = Executors.newScheduledThreadPool(3);
        executorService.scheduleAtFixedRate( () -> {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //注意是while不是if
            while (names.size() >= Integer.MAX_VALUE) {
                lock.lock();
                    try {
                        full.await();
                    } catch (InterruptedException e) {
                        System.err.println(e.getMessage());
                    } finally {
                        lock.unlock();
                    }

            }
            names.offer("test - " + simpleDateFormat.format(new Date()));
            empty.signal();
        }, 1, 1, TimeUnit.SECONDS);
    }

}

2. 消费者3

public class ConsumerClient3 implements Runnable {
    private Queue names;
    private ReentrantLock lock;
    private Condition full;
    private Condition empty;

    public ConsumerClient3(Queue names, ReentrantLock lock, Condition full, Condition empty) {
        this.names = names;
        this.lock = lock;
        this.full = full;
        this.empty = empty;
    }

    public void run() {
        while (true) {
            //注意是while not if
            while (names.size() <= 0) {
                lock.lock();
                    try {
                        empty.await();
                    } catch (InterruptedException e) {
                        System.err.println(e.getMessage());
                    } finally {
                        lock.unlock();
                    }
                }
            String name  = names.poll();
            full.signal();
            System.out.println("finish : " + name);
        }
    }


}

我们将empty和full分开考虑,这样就能确保正确的唤醒。

四、 终极态,使用阻塞队列。

到3其实我们的阻塞队列的方式呼之欲出。我们先看看阻塞队列的定义。
public interface BlockingQueue extends Queue
其中Queue定义如下
添加:

  1. add(E element) 添加一个,如果超出了界限,则抛出异常
  2. Offer(E element) 添加一个,如果超出了界限,则返回false

获取并删除

  1. remove() 获取并删除队头,如果队列为空,则抛出异常
  2. Poll() 获取并删除队头, 如果队列为空,则返回null


获取但不删除

  1. element() 获取但是并不删除,如果队列为空,则抛出异常、
  2. peek() 获取但是并不删除,如果队列为空,则返回null.

而阻塞队列则多了两个阻塞的方法,put 和 take,参见下表

hrows exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

 

最终实现如下

1. 生产者

public class ConsumerClient4 implements Runnable {
    private BlockingDeque names;

    public ConsumerClient4(BlockingDeque names) {
        this.names = names;
    }
    public void run() {
        while (true) {
            try {
                String name  = names.take();
                System.out.println("finish : " + name);
            }catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}

2. 消费者

public class ProductClient4 implements Runnable {
    private BlockingDeque names ;

    public ProductClient4(BlockingDeque names) {
        this.names = names;
    }

    public void run() {
        this.startProducter();
    }
    private void startProducter() {
        ScheduledExecutorService executorService  = Executors.newScheduledThreadPool(3);
        executorService.scheduleAtFixedRate( () -> {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                names.put("test - " + simpleDateFormat.format(new Date()));
            } catch (InterruptedException e) {
                System.err.println(e.getMessage());
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

}

总结: 程序瞬间短了很多有么有,其实细看阻塞队列的实现,跟我们的第三种实现基本一样,我们看下ArrayBlockingQueue的实现

 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

  private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

基本一样,有没有。

五、 其他实现,诸如,我们还可以使用Semaphore来实现线程间的同步,所有线程间的同步工具,都可以用来实现生产者和消费者模式,但是都不如使用阻塞队列来的简单。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注