同步队列作为一个线程安全的数据共享区,经常用于线程之间数据读取,比如半同步半异步线程池的同步队列。
其实做起来比较简单,要用到list、锁和条件变量,条件变量的作用是在队列满了或者空了的时候等待通知。先看一个简单的同步队列:
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <iostream>
using namespace std;
template<typename T>
class SimpleSyncQueue
{
public:
void Put(const T& x)
{
std::lock_guard<std::mutex> locker(m_mutex);
m_queue.push_back(x);
m_notEmpty.notify_one();
}
void Take(T& x)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]{return !m_queue.empty();});
x = m_queue.front();
m_queue.pop_front();
}
private:
std::list<T> m_queue;
std::mutex m_mutex;
std::condition_variable_any m_notEmpty;
};
再看一个带上限的同步队列:
#include<list>
#include<mutex>
#include<thread>
#include<condition_variable>
#include <iostream>
using namespace std;
template<typename T>
class SyncQueue
{
public:
SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
{
}
void Put(const T&x)
{
Add(x);
}
void Put(T&&x)
{
Add(std::forward<T>(x));
}
void Take(std::list<T>& list)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
if (m_needStop)
return;
list = std::move(m_queue);
m_notFull.notify_one();
}
void Take(T& t)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); });
if (m_needStop)
return;
t = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}
void Stop()
{
{
std::lock_guard<std::mutex> locker(m_mutex);
m_needStop = true;
}
m_notFull.notify_all();
m_notEmpty.notify_all();
}
bool Empty()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
}
bool Full()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
}
size_t Size()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
}
int Count()
{
return m_queue.size();
}
private:
bool NotFull() const
{
bool full = m_queue.size() >= m_maxSize;
if (full)
cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
return !full;
}
bool NotEmpty() const
{
bool empty = m_queue.empty();
if (empty)
cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
return !empty;
}
template<typename F>
void Add(F&&x)
{
std::unique_lock< std::mutex> locker(m_mutex);
m_notFull.wait(locker, [this]{return m_needStop || NotFull(); });
if (m_needStop)
return;
m_queue.push_back(std::forward<F>(x));
m_notEmpty.notify_one();
}
private:
std::list<T> m_queue; //缓冲区
std::mutex m_mutex; //互斥量和条件变量结合起来使用
std::condition_variable m_notEmpty;//不为空的条件变量
std::condition_variable m_notFull; //没有满的条件变量
int m_maxSize; //同步队列最大的size
bool m_needStop; //停止的标志
};
测试代码比较简单,就不写了。实际的运用在后面的线程池中会用到。
原文链接: https://www.cnblogs.com/qicosmos/archive/2013/05/30/3107975.html
欢迎关注
微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍
原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/90541
非原创文章文中已经注明原地址,如有侵权,联系删除
关注公众号【高性能架构探索】,第一时间获取最新文章
转载文章受原作者版权保护。转载请注明原作者出处!