0%

生产者——消费者问题

问题描述

系统中有一组生产者进程与一组消费者进程,生产者进程每次生产一个产品放入缓冲区,消费者进程每次从缓冲区中取出一个产品并使用。生产者、消费者同时共享一个初始为空,大小为n的缓冲区,否则必须等待。只有缓冲区不空时,消费者才可以从中取出产品,否则必须等待。
如下图所示:
avatar

问题分析

这个问题的关键就在于是要保证生产者不会在缓冲区满时加数据,同时消费者也不会在缓冲区为空时消费数据。对于缓冲区而言,它是一个临界资源,因而各个进程必须互斥的访问。这样我们就需要使用互斥量(mutex),这里如果不使用mutex就会导致竞争条件的出现,进而引发死锁。加入mutex后,就可以限制只有一个进程可以被执行。如图所示:
avatar
若要解决该问题,就必须让生产者在缓冲区满时休眠(亦或者是直接放弃数据),等到下次消费者消费缓冲区中的数据时,生产者才可以被唤醒,开始往缓冲区中添加数据。同理,也可以让消费者在缓冲区为空时进入休眠状态,等到生产者往缓冲区添加数据之后,再唤醒消费者。常见的方法有:信号灯法、管程法。

具体实现

对于C++来说,并没有提供管程。如果对管程实现感兴趣的,可以参考这篇文章
对于信号灯法,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
#include <condition_variable>

using namespace std;

mutex g_mutex;
condition_variable Product;
condition_variable Customer;

queue<int> share_buff; // 共享缓冲区
int maxSize = 20; // 共享缓冲区大小为20
int count;

void producer() {
// 与条件变量所搭配的锁
unique_lock<mutex> lck(g_mutex);
while (true) {
this_thread::sleep_for(chrono::milliseconds(500));
Product.wait(lck,[]{return share_buff.size() != maxSize;});

cout << "->producer " << this_thread::get_id() << " : ";
cout << share_buff.size() << '\n';
share_buff.push(share_buff.size());
Customer.notify_all();
count++;
if (count % 20 == 0) {
cout << "===========================================" << '\n';
cout << " THIS IS CUSTOMER" << '\n';
cout << "==========================================="<< '\n';
}
}
lck.unlock();
}

void customers() {
unique_lock<mutex> lck(g_mutex);
while (true) {
this_thread::sleep_for(chrono::milliseconds(700));
Customer.wait(lck,[] {return share_buff.size() != 0;} );
cout << "Customer " << this_thread::get_id() << ": ";
cout << share_buff.size() << '\n';
share_buff.pop();
Product.notify_all();
count++;
if (count % 20 == 0) {
cout << "===========================================" << '\n';
cout << " THIS IS Producter" << '\n';
cout << "==========================================="<< '\n';
}
}
}

void consumerThread(){ customers();}
void producerThread() {producer();}
int main()
{
thread t1(consumerThread);
thread t2(consumerThread);
t1.join();
t2.join();
return 0;
}

问题推广

上面我们是根据单个生产者与消费者进行讨论的,那么对于多个生产者或多个消费者时。我们又该如何处理

多生产者单消费者(MPSC)

对于多对于多生产者单消费者来说,多生产者之间具有互斥关系,所以这里需要一个互斥锁来实现缓冲区的互斥访问,那么具体的实现方式就是在单生产者单消费者的基础之上,加一个互斥信号量useQueue
如果采用信号量来实现的话可以如下:

  • emptyCount = N ; fullCount = 0 ; useQueue = 1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    produce:
    P(emptyCount)//信号量emptyCount减一
    P(useQueue)//二值信号量useQueue减一,变为0(其他线程不能进入缓冲区,阻塞状态)
    putItemIntoQueue(item)//执行put操作
    V(useQueue)//二值信号量useQueue加一,变为1(其他线程可以进入缓冲区)
    V(fullCount)//信号量fullCount加一
    consume:
    P(fullCount)//fullCount -= 1
    item ← getItemFromQueue()
    V(emptyCount)//emptyCount += 1

单生产者多消费者(SPMC)

对于单生产者多消费者同多生产者多消费者

  • emptyCount = N ; fullCount = 0 ; useQueue = 1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    produce:
    P(emptyCount)//信号量emptyCount减一
    putItemIntoQueue(item)//执行put操作
    V(fullCount)//信号量fullCount加一
    consume:
    P(fullCount)//fullCount -= 1
    P(useQueue)//二值信号量useQueue减一,变为0(其他线程不能进入缓冲区,阻塞状态)
    item ← getItemFromQueue()
    V(useQueue)//二值信号量useQueue加一,变为1(其他线程可以进入缓冲区)
    V(emptyCount)//emptyCount += 1

多生产者多消费者(MPMC)-单缓冲区(SB)

对于多生产者多消费者问题,是一个同步+互斥问题,不仅需要生产者和消费者之间的同步协作,还需要实现对缓冲区资源的互斥访问。
采用信号量:

  • emptyCount = N ; fullCount = 0 ; useQueue = 1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    produce:
    P(emptyCount)//信号量emptyCount减一
    P(useQueue)//二值信号量useQueue减一,变为0(其他线程不能进入缓冲区,阻塞状态)
    putItemIntoQueue(item)//执行put操作
    V(useQueue)//二值信号量useQueue加一,变为1(其他线程可以进入缓冲区)
    V(fullCount)//信号量fullCount加一
    consume:
    P(fullCount)//fullCount -= 1
    P(useQueue)//二值信号量useQueue减一,变为0(其他线程不能进入缓冲区,阻塞状态)
    item ← getItemFromQueue()
    V(useQueue)//二值信号量useQueue加一,变为1(其他线程可以进入缓冲区)
    V(emptyCount)//emptyCount += 1