并发多线程8condition_variable、wait、notify_one、notify_all

第八节 condition_variable、wait、notify_one、notify_all

一、条件变量condition_variable、wait、notify_one、notify_all
std::condition_variable实际上是一个类,是一个和条件相关的类,说白了就是等待一个条件达成。

std::mutex mymutex1;
std::unique_lock<std::mutex> sbguard1(mymutex1);
std::condition_variable condition;
condition.wait(sbguard1, [this] {if (!msgRecvQueue.empty())//一个lambda表达式
                                    return true;
                                return false;
                                });

condition.wait(sbguard1);

wait()用来等一个东西

如果第二个参数的lambda表达式返回值是false,那么wait()将解锁互斥量,并阻塞到本行
如果第二个参数的lambda表达式返回值是true,那么wait()直接返回并继续执行

阻塞到什么时候为止呢?阻塞到其他某个线程调用notify_one()成员函数为止;

如果没有第二个参数,那么效果跟第二个参数lambda表达式返回false效果一样

wait()将解锁互斥量,并阻塞到本行,阻塞到其他某个线程调用notify_one()成员函数为止。

 

1、wait()不断尝试获取互斥量锁,如果获取不到那么流程就卡在wait()这里等待获取,如果获取到了,那么wait()就继续执行,获取到了锁

2.1、如果wait有第二个参数就判断这个lambda表达式。

a)如果表达式为false,那wait又对互斥量解锁,然后又休眠,等待再次被notify_one()唤醒
b)如果lambda表达式为true,则wait返回,流程可以继续执行(此时互斥量已被锁住)。
2.2、如果wait没有第二个参数,则wait只能等待notify_one()唤醒。

#include <thread>
#include <iostream>
#include <list>
#include <mutex>
using namespace std;

class A {
public:
    void inMsgRecvQueue() {
        for (int i = 0; i < 100000; ++i) 
        {
            cout << "inMsgRecvQueue插入一个元素" << i << endl;

            std::unique_lock<std::mutex> sbguard1(mymutex1);
            msgRecvQueue.push_back(i); 
            //尝试把wait()线程唤醒,执行完这行,
            //那么outMsgRecvQueue()里的wait就会被唤醒
            //只有当另外一个线程正在执行wait()时notify_one()才会起效,否则没有作用
            condition.notify_one();
        }
    }

    void outMsgRecvQueue() {
        int command = 0;
        while (true) {
            std::unique_lock<std::mutex> sbguard2(mymutex1);
            // wait()用来等一个东西
            // 如果第二个参数的lambda表达式返回值是false,那么wait()将解锁互斥量,并阻塞到本行
            // 阻塞到什么时候为止呢?阻塞到其他某个线程调用notify_one()成员函数为止;
            //当 wait() 被 notify_one() 激活时,会先执行它的 条件判断表达式 是否为 true,
            //如果为true才会继续往下执行
            condition.wait(sbguard2, [this] {
                if (!msgRecvQueue.empty())
                    return true;
                return false;});
            command = msgRecvQueue.front();
            msgRecvQueue.pop_front();
            //因为unique_lock的灵活性,我们可以随时unlock,以免锁住太长时间
            sbguard2.unlock(); 
            cout << "outMsgRecvQueue()执行,取出第一个元素" << endl;
        }
    }

private:
    std::list<int> msgRecvQueue;
    std::mutex mymutex1;
    std::condition_variable condition;
};

int main() {
    A myobja;
    std::thread myoutobj(&A::outMsgRecvQueue, &myobja);
    std::thread myinobj(&A::inMsgRecvQueue, &myobja);
    myinobj.join();
    myoutobj.join();
}

使用条件变量的线程安全队列

#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;  // 1 互斥量必须是可变的 
  std::queue<T> data_queue;
  std::condition_variable data_cond;
public:
  threadsafe_queue()
  {}
  threadsafe_queue(threadsafe_queue const& other)
  {
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue=other.data_queue;
  }

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value)
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=data_queue.front();
    data_queue.pop();
  }

  std::shared_ptr<T> wait_and_pop()
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return false;
    value=data_queue.front();
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return std::shared_ptr<T>();
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

线程安全队列应用

threadsafe_queue<data_chunk> data_queue;  // 1

void data_preparation_thread()
{
  while(more_data_to_prepare())
  {
    data_chunk const data=prepare_data();
    data_queue.push(data);  // 2
  }
}

void data_processing_thread()
{
  while(true)
  {
    data_chunk data;
    data_queue.wait_and_pop(data);  // 3
    process(data);
    if(is_last_chunk(data))
      break;
  }
}

 

二、深入思考

上面的代码可能导致出现一种情况:
因为outMsgRecvQueue()与inMsgRecvQueue()并不是一对一执行的,所以当程序循环执行很多次以后,可能在msgRecvQueue 中已经有了很多消息,但是,outMsgRecvQueue还是被唤醒一次只处理一条数据。这时可以考虑把outMsgRecvQueue多执行几次,或者对inMsgRecvQueue进行限流。

三、notify_all()

notify_one():通知一个线程的wait()

notify_all():通知所有线程的wait()

原文链接:https://blog.csdn.net/qq_38231713/article/details/106092714

原文链接: https://www.cnblogs.com/gk520/p/16644286.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    并发多线程8condition_variable、wait、notify_one、notify_all

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/404289

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年4月25日 下午4:27
下一篇 2023年4月25日 下午4:27

相关推荐