生产者消费者问题是多线程并发中一个非常经典的问题,相信学过操作系统课程的同学都清楚这个问题的根源。本文将就四种情况分析并介绍生产者和消费者问题,它们分别是:单生产者-单消费者模型,单生产者-多消费者模型,多生产者-单消费者模型,多生产者-多消费者模型,我会给出四种情况下的 C++11 并发解决方案,如果文中出现了错误或者你对代码有异议,欢迎交流 ;-)。
单生产者-单消费者模型
顾名思义,单生产者-单消费者模型中只有一个生产者和一个消费者,生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。C++11实现单生产者单消费者模型的代码如下:
1 #include <unistd.h>
2
3 #include <cstdlib>
4 #include <condition_variable>
5 #include <iostream>
6 #include <mutex>
7 #include <thread>
8
9 static const int kItemRepositorySize = 10; // Item buffer size.
10 static const int kItemsToProduce = 1000; // How many items we plan to produce.
11
12 struct ItemRepository {
13 int item_buffer[kItemRepositorySize]; // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列.
14 size_t read_position; // 消费者读取产品位置.
15 size_t write_position; // 生产者写入产品位置.
16 std::mutex mtx; // 互斥量,保护产品缓冲区
17 std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为满.
18 std::condition_variable repo_not_empty; // 条件变量, 指示产品缓冲区不为空.
19 } gItemRepository; // 产品库全局变量, 生产者和消费者操作该变量.
20
21 typedef struct ItemRepository ItemRepository;
22
23
24 void ProduceItem(ItemRepository *ir, int item)
25 {
26 std::unique_lock<std::mutex> lock(ir->mtx);
27 while(((ir->write_position + 1) % kItemRepositorySize)
28 == ir->read_position) { // item buffer is full, just wait here.
29 std::cout << "Producer is waiting for an empty slot...\n";
30 (ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.
31 }
32
33 (ir->item_buffer)[ir->write_position] = item; // 写入产品.
34 (ir->write_position)++; // 写入位置后移.
35
36 if (ir->write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置.
37 ir->write_position = 0;
38
39 (ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空.
40 lock.unlock(); // 解锁.
41 }
42
43 int ConsumeItem(ItemRepository *ir)
44 {
45 int data;
46 std::unique_lock<std::mutex> lock(ir->mtx);
47 // item buffer is empty, just wait here.
48 while(ir->write_position == ir->read_position) {
49 std::cout << "Consumer is waiting for items...\n";
50 (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.
51 }
52
53 data = (ir->item_buffer)[ir->read_position]; // 读取某一产品
54 (ir->read_position)++; // 读取位置后移
55
56 if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位.
57 ir->read_position = 0;
58
59 (ir->repo_not_full).notify_all(); // 通知消费者产品库不为满.
60 lock.unlock(); // 解锁.
61
62 return data; // 返回产品.
63 }
64
65
66 void ProducerTask() // 生产者任务
67 {
68 for (int i = 1; i <= kItemsToProduce; ++i) {
69 // sleep(1);
70 std::cout << "Produce the " << i << "^th item..." << std::endl;
71 ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品.
72 }
73 }
74
75 void ConsumerTask() // 消费者任务
76 {
77 static int cnt = 0;
78 while(1) {
79 sleep(1);
80 int item = ConsumeItem(&gItemRepository); // 消费一个产品.
81 std::cout << "Consume the " << item << "^th item" << std::endl;
82 if (++cnt == kItemsToProduce) break; // 如果产品消费个数为 kItemsToProduce, 则退出.
83 }
84 }
85
86 void InitItemRepository(ItemRepository *ir)
87 {
88 ir->write_position = 0; // 初始化产品写入位置.
89 ir->read_position = 0; // 初始化产品读取位置.
90 }
91
92 int main()
93 {
94 InitItemRepository(&gItemRepository);
95 std::thread producer(ProducerTask); // 创建生产者线程.
96 std::thread consumer(ConsumerTask); // 创建消费之线程.
97 producer.join();
98 consumer.join();
99 }
单生产者-多消费者模型
与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器,代码如下:
1 #include <unistd.h>
2
3 #include <cstdlib>
4 #include <condition_variable>
5 #include <iostream>
6 #include <mutex>
7 #include <thread>
8
9 static const int kItemRepositorySize = 4; // Item buffer size.
10 static const int kItemsToProduce = 10; // How many items we plan to produce.
11
12 struct ItemRepository {
13 int item_buffer[kItemRepositorySize];
14 size_t read_position;
15 size_t write_position;
16 size_t item_counter;
17 std::mutex mtx;
18 std::mutex item_counter_mtx;
19 std::condition_variable repo_not_full;
20 std::condition_variable repo_not_empty;
21 } gItemRepository;
22
23 typedef struct ItemRepository ItemRepository;
24
25
26 void ProduceItem(ItemRepository *ir, int item)
27 {
28 std::unique_lock<std::mutex> lock(ir->mtx);
29 while(((ir->write_position + 1) % kItemRepositorySize)
30 == ir->read_position) { // item buffer is full, just wait here.
31 std::cout << "Producer is waiting for an empty slot...\n";
32 (ir->repo_not_full).wait(lock);
33 }
34
35 (ir->item_buffer)[ir->write_position] = item;
36 (ir->write_position)++;
37
38 if (ir->write_position == kItemRepositorySize)
39 ir->write_position = 0;
40
41 (ir->repo_not_empty).notify_all();
42 lock.unlock();
43 }
44
45 int ConsumeItem(ItemRepository *ir)
46 {
47 int data;
48 std::unique_lock<std::mutex> lock(ir->mtx);
49 // item buffer is empty, just wait here.
50 while(ir->write_position == ir->read_position) {
51 std::cout << "Consumer is waiting for items...\n";
52 (ir->repo_not_empty).wait(lock);
53 }
54
55 data = (ir->item_buffer)[ir->read_position];
56 (ir->read_position)++;
57
58 if (ir->read_position >= kItemRepositorySize)
59 ir->read_position = 0;
60
61 (ir->repo_not_full).notify_all();
62 lock.unlock();
63
64 return data;
65 }
66
67
68 void ProducerTask()
69 {
70 for (int i = 1; i <= kItemsToProduce; ++i) {
71 // sleep(1);
72 std::cout << "Producer thread " << std::this_thread::get_id()
73 << " producing the " << i << "^th item..." << std::endl;
74 ProduceItem(&gItemRepository, i);
75 }
76 std::cout << "Producer thread " << std::this_thread::get_id()
77 << " is exiting..." << std::endl;
78 }
79
80 void ConsumerTask()
81 {
82 bool ready_to_exit = false;
83 while(1) {
84 sleep(1);
85 std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
86 if (gItemRepository.item_counter < kItemsToProduce) {
87 int item = ConsumeItem(&gItemRepository);
88 ++(gItemRepository.item_counter);
89 std::cout << "Consumer thread " << std::this_thread::get_id()
90 << " is consuming the " << item << "^th item" << std::endl;
91 } else ready_to_exit = true;
92 lock.unlock();
93 if (ready_to_exit == true) break;
94 }
95 std::cout << "Consumer thread " << std::this_thread::get_id()
96 << " is exiting..." << std::endl;
97 }
98
99 void InitItemRepository(ItemRepository *ir)
100 {
101 ir->write_position = 0;
102 ir->read_position = 0;
103 ir->item_counter = 0;
104 }
105
106 int main()
107 {
108 InitItemRepository(&gItemRepository);
109 std::thread producer(ProducerTask);
110 std::thread consumer1(ConsumerTask);
111 std::thread consumer2(ConsumerTask);
112 std::thread consumer3(ConsumerTask);
113 std::thread consumer4(ConsumerTask);
114
115 producer.join();
116 consumer1.join();
117 consumer2.join();
118 consumer3.join();
119 consumer4.join();
120 }
多生产者-单消费者模型
与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器,代码如下:
1 #include <unistd.h>
2
3 #include <cstdlib>
4 #include <condition_variable>
5 #include <iostream>
6 #include <mutex>
7 #include <thread>
8
9 static const int kItemRepositorySize = 4; // Item buffer size.
10 static const int kItemsToProduce = 10; // How many items we plan to produce.
11
12 struct ItemRepository {
13 int item_buffer[kItemRepositorySize];
14 size_t read_position;
15 size_t write_position;
16 size_t item_counter;
17 std::mutex mtx;
18 std::mutex item_counter_mtx;
19 std::condition_variable repo_not_full;
20 std::condition_variable repo_not_empty;
21 } gItemRepository;
22
23 typedef struct ItemRepository ItemRepository;
24
25
26 void ProduceItem(ItemRepository *ir, int item)
27 {
28 std::unique_lock<std::mutex> lock(ir->mtx);
29 while(((ir->write_position + 1) % kItemRepositorySize)
30 == ir->read_position) { // item buffer is full, just wait here.
31 std::cout << "Producer is waiting for an empty slot...\n";
32 (ir->repo_not_full).wait(lock);
33 }
34
35 (ir->item_buffer)[ir->write_position] = item;
36 (ir->write_position)++;
37
38 if (ir->write_position == kItemRepositorySize)
39 ir->write_position = 0;
40
41 (ir->repo_not_empty).notify_all();
42 lock.unlock();
43 }
44
45 int ConsumeItem(ItemRepository *ir)
46 {
47 int data;
48 std::unique_lock<std::mutex> lock(ir->mtx);
49 // item buffer is empty, just wait here.
50 while(ir->write_position == ir->read_position) {
51 std::cout << "Consumer is waiting for items...\n";
52 (ir->repo_not_empty).wait(lock);
53 }
54
55 data = (ir->item_buffer)[ir->read_position];
56 (ir->read_position)++;
57
58 if (ir->read_position >= kItemRepositorySize)
59 ir->read_position = 0;
60
61 (ir->repo_not_full).notify_all();
62 lock.unlock();
63
64 return data;
65 }
66
67 void ProducerTask()
68 {
69 bool ready_to_exit = false;
70 while(1) {
71 sleep(1);
72 std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
73 if (gItemRepository.item_counter < kItemsToProduce) {
74 ++(gItemRepository.item_counter);
75 ProduceItem(&gItemRepository, gItemRepository.item_counter);
76 std::cout << "Producer thread " << std::this_thread::get_id()
77 << " is producing the " << gItemRepository.item_counter
78 << "^th item" << std::endl;
79 } else ready_to_exit = true;
80 lock.unlock();
81 if (ready_to_exit == true) break;
82 }
83 std::cout << "Producer thread " << std::this_thread::get_id()
84 << " is exiting..." << std::endl;
85 }
86
87 void ConsumerTask()
88 {
89 static int item_consumed = 0;
90 while(1) {
91 sleep(1);
92 ++item_consumed;
93 if (item_consumed <= kItemsToProduce) {
94 int item = ConsumeItem(&gItemRepository);
95 std::cout << "Consumer thread " << std::this_thread::get_id()
96 << " is consuming the " << item << "^th item" << std::endl;
97 } else break;
98 }
99 std::cout << "Consumer thread " << std::this_thread::get_id()
100 << " is exiting..." << std::endl;
101 }
102
103 void InitItemRepository(ItemRepository *ir)
104 {
105 ir->write_position = 0;
106 ir->read_position = 0;
107 ir->item_counter = 0;
108 }
109
110 int main()
111 {
112 InitItemRepository(&gItemRepository);
113 std::thread producer1(ProducerTask);
114 std::thread producer2(ProducerTask);
115 std::thread producer3(ProducerTask);
116 std::thread producer4(ProducerTask);
117 std::thread consumer(ConsumerTask);
118
119 producer1.join();
120 producer2.join();
121 producer3.join();
122 producer4.join();
123 consumer.join();
124 }
多生产者-多消费者模型
该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。
代码如下:
1 #include <unistd.h>
2
3 #include <cstdlib>
4 #include <condition_variable>
5 #include <iostream>
6 #include <mutex>
7 #include <thread>
8
9 static const int kItemRepositorySize = 4; // Item buffer size.
10 static const int kItemsToProduce = 10; // How many items we plan to produce.
11
12 struct ItemRepository {
13 int item_buffer[kItemRepositorySize];
14 size_t read_position;
15 size_t write_position;
16 size_t produced_item_counter;
17 size_t consumed_item_counter;
18 std::mutex mtx;
19 std::mutex produced_item_counter_mtx;
20 std::mutex consumed_item_counter_mtx;
21 std::condition_variable repo_not_full;
22 std::condition_variable repo_not_empty;
23 } gItemRepository;
24
25 typedef struct ItemRepository ItemRepository;
26
27
28 void ProduceItem(ItemRepository *ir, int item)
29 {
30 std::unique_lock<std::mutex> lock(ir->mtx);
31 while(((ir->write_position + 1) % kItemRepositorySize)
32 == ir->read_position) { // item buffer is full, just wait here.
33 std::cout << "Producer is waiting for an empty slot...\n";
34 (ir->repo_not_full).wait(lock);
35 }
36
37 (ir->item_buffer)[ir->write_position] = item;
38 (ir->write_position)++;
39
40 if (ir->write_position == kItemRepositorySize)
41 ir->write_position = 0;
42
43 (ir->repo_not_empty).notify_all();
44 lock.unlock();
45 }
46
47 int ConsumeItem(ItemRepository *ir)
48 {
49 int data;
50 std::unique_lock<std::mutex> lock(ir->mtx);
51 // item buffer is empty, just wait here.
52 while(ir->write_position == ir->read_position) {
53 std::cout << "Consumer is waiting for items...\n";
54 (ir->repo_not_empty).wait(lock);
55 }
56
57 data = (ir->item_buffer)[ir->read_position];
58 (ir->read_position)++;
59
60 if (ir->read_position >= kItemRepositorySize)
61 ir->read_position = 0;
62
63 (ir->repo_not_full).notify_all();
64 lock.unlock();
65
66 return data;
67 }
68
69 void ProducerTask()
70 {
71 bool ready_to_exit = false;
72 while(1) {
73 sleep(1);
74 std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx);
75 if (gItemRepository.produced_item_counter < kItemsToProduce) {
76 ++(gItemRepository.produced_item_counter);
77 ProduceItem(&gItemRepository, gItemRepository.produced_item_counter);
78 std::cout << "Producer thread " << std::this_thread::get_id()
79 << " is producing the " << gItemRepository.produced_item_counter
80 << "^th item" << std::endl;
81 } else ready_to_exit = true;
82 lock.unlock();
83 if (ready_to_exit == true) break;
84 }
85 std::cout << "Producer thread " << std::this_thread::get_id()
86 << " is exiting..." << std::endl;
87 }
88
89 void ConsumerTask()
90 {
91 bool ready_to_exit = false;
92 while(1) {
93 sleep(1);
94 std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx);
95 if (gItemRepository.consumed_item_counter < kItemsToProduce) {
96 int item = ConsumeItem(&gItemRepository);
97 ++(gItemRepository.consumed_item_counter);
98 std::cout << "Consumer thread " << std::this_thread::get_id()
99 << " is consuming the " << item << "^th item" << std::endl;
100 } else ready_to_exit = true;
101 lock.unlock();
102 if (ready_to_exit == true) break;
103 }
104 std::cout << "Consumer thread " << std::this_thread::get_id()
105 << " is exiting..." << std::endl;
106 }
107
108 void InitItemRepository(ItemRepository *ir)
109 {
110 ir->write_position = 0;
111 ir->read_position = 0;
112 ir->produced_item_counter = 0;
113 ir->consumed_item_counter = 0;
114 }
115
116 int main()
117 {
118 InitItemRepository(&gItemRepository);
119 std::thread producer1(ProducerTask);
120 std::thread producer2(ProducerTask);
121 std::thread producer3(ProducerTask);
122 std::thread producer4(ProducerTask);
123
124 std::thread consumer1(ConsumerTask);
125 std::thread consumer2(ConsumerTask);
126 std::thread consumer3(ConsumerTask);
127 std::thread consumer4(ConsumerTask);
128
129 producer1.join();
130 producer2.join();
131 producer3.join();
132 producer4.join();
133
134 consumer1.join();
135 consumer2.join();
136 consumer3.join();
137 consumer4.join();
138 }
转自:http://www.cnblogs.com/haippy/p/3252092.html
原文链接: https://www.cnblogs.com/zl1991/p/6993225.html
欢迎关注
微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍
原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/255162
非原创文章文中已经注明原地址,如有侵权,联系删除
关注公众号【高性能架构探索】,第一时间获取最新文章
转载文章受原作者版权保护。转载请注明原作者出处!