06C++11线程池

1.半同步半异步线程池

​ 半同步半异步线程池共分为三层,第一层是同步服务层,它处理来自上层的任务请求。上层的请求可能是并发的,这些请求不是马上就被处理,而是将这些任务放到一个同步排队层中,等待处理。

​ 第二层是同步排队层,来自上层的任务请求都会被加到排队层中等待处理。

​ 第三层是异步服务层,这一层中会有多个线程同时处理排队层中的任务,异步服务层从同步排队成中取出任务并进行处理。

1.1 同步队列实现
#pragma once

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
using namespace std;

template<typename T>
class SyncQueue
{
public:
    //同步队列
    SyncQueue(const int& maxSize) :m_maxSize(maxSize), m_needStop(false)
    {}
    virtual ~SyncQueue()
    {}


    void put(const T& x)
    {
        add(x);
    }

    void put(T&& x)
    {
        add(std::forward<T>(x));
    }

    void take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> lck(m_mutex);
        while(!m_needStop && !notEmpty())
        {
            m_notEmpty.wait(lck);
        }

        if (m_needStop)
        {
            return;
        }

        list = std::move(m_queue);
        m_notFull.notify_one();
    }


    void take(T& t)
    {
        //1.创建互斥锁
        std::unique_lock<std::mutex> lck(m_mutex);
        while(!m_needStop && !notEmpty())
        {
            m_notEmpty.wait(lck);
        }
        //m_notEmpty.wait(lck, [this]{return (m_needStop || notEmpty()); });

        if (m_needStop)
        {
            return;
        }

        t = m_queue.front();
        m_queue.pop_front();
        m_notFull.notify_one();

    }

    void stop()
    {
        {
            std::unique_lock<std::mutex> lck(m_mutex);
            m_needStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool full()
    {
        return m_queue.size() == m_maxSize;
    }

    size_t size()
    {
        return m_queue.size();
    }

    int count()
    {
        return m_queue.size();
    }

private:
    bool notFull() const
    {
        bool isFull = m_queue.size() >= m_maxSize;
        if (isFull)
        {
            cout << "queue is full, please wait..." << endl;
        }
        return !isFull;
    }

    bool notEmpty() const
    {
        bool isEmpty = m_queue.empty();
        if (isEmpty)
        {
            cout << "queue is empty, please wait..." << endl;
        }
        return !isEmpty;
    }

    template<typename F>
    void add(F&& x)
    {
        std::unique_lock<std::mutex> lck(m_mutex);
        while (!m_needStop && !notFull())
        {
            m_notFull.wait(lck);
        }
        //m_notFull.wait(lck, [this]{return (m_needStop || notFull()); });

        if (m_needStop)
        {
            return;
        }

        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    std::list<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_notEmpty;
    std::condition_variable m_notFull;
    size_t m_maxSize;
    bool m_needStop;


};
1.2 线程池实现
#pragma once

#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>
#include "sync_queue.h"

const int MAX_TASK_COUNT = 100;
using Task = std::function<void()>;


class ThreadPool
{
public:
    ThreadPool(const int& threadCount = std::thread::hardware_concurrency())
        :m_queue(MAX_TASK_COUNT)
    {
        startThreadGroup(threadCount);
    }

    ~ThreadPool()
    {
        stop();
    }

    void stop()
    {
        std::call_once(m_flag, [this](){stopThreadGroup();});
    }

    void addTask(Task&& task)
    {
        m_queue.put(std::forward<Task>(task));
    }


    void addTask(const Task& task)
    {
        m_queue.put(task);
    }

private:
    void startThreadGroup(const int& threadCount)
    {
        m_running = true;

        for (int i = 0; i < threadCount; ++i)
        {
            m_threadGroup.push_back(std::make_shared<std::thread>(&ThreadPool::runInThread, this));
        }
    }

    void runInThread()
    {
        while (m_running)
        {
            Task task;
            m_queue.take(task);
            if (!m_running)
            {
                return;
            }
            if (task)
            {
                task();
            }
        }
    }

    void stopThreadGroup()
    {
        m_queue.stop();
        m_running = false;

        for (auto& thread : m_threadGroup)
        {
            if (thread)
            {
                thread->join();
            }
        }
        m_threadGroup.clear();
    }

    std::list<std::shared_ptr<std::thread>> m_threadGroup;
    SyncQueue<Task> m_queue;
    atomic_bool m_running;
    std::once_flag m_flag;
};
1.3 测试demo
#include <iostream>
#include "thread_pool.h"


using namespace std;

int main()
{
    cout << "Hell ThreadPool!" << endl;

    ThreadPool pool;

    for (int i = 0; i < 10; ++i)
    {
        pool.addTask([i]{
            cout << "dooing tast:" << i << " in thread "<< this_thread::get_id() << endl;
            this_thread::sleep_for(std::chrono::seconds(1));
        });
    }

    this_thread::sleep_for(std::chrono::seconds(20));

    pool.stop();

    return 0;
}

image

原文链接: https://www.cnblogs.com/rock-cc/p/13161958.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍;

也有高质量的技术群,里面有嵌入式、搜广推等BAT大佬

    06C++11线程池

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/356942

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年3月2日 上午11:29
下一篇 2023年3月2日 上午11:30

相关推荐