生产者/消费者问题

Producer/Consumer problem

  • 概念
  • Java实现
  • Python实现
  • 总结

概念

生产者-消费者(Producer-Consumer)问题是一个著名的进程/线程的同步问题。
- 它描述的是:有一群生产者在生产产品,并将这些产品提供给消费者去消费
- 为了使生产者与消费者能并发执行,在两者之间设置了一个具有n个缓冲区的缓冲池
【生产者】将生产的产品放入缓冲区中;不允许生产者向一个已装满产品的缓冲区投放产品
【消费者】从缓冲区中取走产品去消费;不允许消费者到一个空缓冲区去取产品
- 所有的生产者和消费者进程或线程都是以异步的方式运行的,但它们之间必须保持同步

Java实现

  • 产品类
public class Product {

    private long randID;    // 随机ID

    public Product(long id) {
        this.randID = id;
    }

    public long getRandID() {
        return randID;
    }

    public void setRandID(long randID) {
        this.randID = randID;
    }

}
  • 消费者类
import java.util.concurrent.Semaphore;

public class Consumer implements Runnable {

    private java.util.concurrent.ConcurrentLinkedQueue<Product> q;
    private Semaphore semaphore;
    private String name;

    public Consumer(String name,
            java.util.concurrent.ConcurrentLinkedQueue<Product> q,
            Semaphore semaphore) {
        this.name = name;
        this.q = q;
        this.semaphore = semaphore;
    }

    public void consume() throws InterruptedException {

        semaphore.acquire();

        if (q.size() > 0) {
            Product product = this.q.remove();
            System.out.println("Consumer ["+this.name+
            "] notify : consumed product num is " + product.getRandID() + "\n");
        }

    }

    @Override
    public void run() {
        while (true) {
            // 1.延时2秒
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 2.消费一个
            try {
                this.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
  • 生产者类
import java.util.concurrent.Semaphore;

public class Producer implements Runnable {

    private java.util.concurrent.ConcurrentLinkedQueue<Product> q;
    private Semaphore semaphore;
    private int MAX_AVAILABLE;
    private String name;

    public Producer(String name, int MAX_AVAILABLE, 
            java.util.concurrent.ConcurrentLinkedQueue<Product> q,
            Semaphore semaphore) {
        this.name = name;
        this.MAX_AVAILABLE = MAX_AVAILABLE;
        this.q = q;
        this.semaphore = semaphore;
    }

    public void produce() {

        if (q.size() < MAX_AVAILABLE) {
// long id = System.currentTimeMillis();
            long id = (long) (Math.random() * 100);
            this.q.add(new Product(id));
            System.out.println("Producer ["+this.name
                    +"] notify : produced product num is "+id);
        }

        semaphore.release();

    }

    @Override
    public void run() {
        while (true) {
            // 1.延时1秒
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 2.生产一个
            produce();
        }
    }

}
  • 测试类
import java.util.concurrent.Semaphore;

public class Main {

    private final static int MAX_AVAILABLE = 5;

    public static void main(String[] args) throws InterruptedException {

        java.util.concurrent.ConcurrentLinkedQueue<Product> q = 
                new java.util.concurrent.ConcurrentLinkedQueue<Product>();
        Semaphore semaphore = new Semaphore(0);

        Thread produceThread = new Thread(new Producer("p1", MAX_AVAILABLE, q, semaphore));
        Thread consumeThread = new Thread(new Consumer("c1", q, semaphore));

        produceThread.start();
        consumeThread.start();

        produceThread.join();
        consumeThread.join();
    }

}

Python实现

  • 生产者和消费者
import threading
import time
import random

# 通过信号量进行同步
semaphore = threading.Semaphore(0)  # 初始信号量为0

def consumer(): # 消费者
    print("Consumer is waiting.")
    ## Acquire a semaphore
    semaphore.acquire()  # 当信号量为0时,acquire会阻塞
    ## The consumer have access to the shared resource
    print("\tConsumer notify : consumed item number %s" % (item,))

def producer(): # 生产者
    global item
    time.sleep(2)
    ## Create a random item
    item = random.randint(0, 1000)
    print("\tProducer notify : produced item number %s" % (item,))
    ## Release a semaphore, increamenting the internal counter by one
    semaphore.release()  # 信号量 0 -> 1
  • 测试
if __name__ == '__main__':
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("program terminated")

总结

生产者和消费者问题的实现可以多种多样,同样的,问题的描述也可以多种多样,例如火车入站,对于一条站台轨道,多辆火车如何分配。这类问题都可以总结为对临界资源(Critical Resouce)的管理问题。

其中为实现进程互斥的进入临界区,多采用软件方法,同步机制均遵循下面4条准则:
1. 空闲让进。当无进程处于临界区时,应允许一个请求进入临界区的进程进入临界区,以有效地利用临界资源。
2. 忙则等待。当已有进程进入临界区时,因而其他试图进入临界区的进程必须等待,以保证对临界资源的互斥访问。
3. 有限等待。对要求访问临界资源的进程,应保证在有限时间内能进入自己的临界区,以免陷入死等状态。
4. 让权等待。当进程不能进入自己的临界区时,应立即释放处理机,以免进程陷入忙等状态。

更多相关文章

  1. 线程同步-生产者消费者问题
  2. Linux内核中实现生产者与消费者(避免无效唤醒)
  3. java并发包学习系列:生产者消费者模式

随机推荐

  1. android 来电自动接听和自动挂断
  2. Android之OnGestureListener实现图片的左
  3. Android获取、设置桌面
  4. 常用的android弹出对话框
  5. 2014.01.16(2) ——— android 关于适配的
  6. Android存储之SharedPreferences和File
  7. [gitbook] Android框架分析系列之Android
  8. Android电话秀(三)
  9. 安卓开发之去标题栏
  10. AndroidStudio3.6.3新版本遇到的坑