基于C++11的线程池实现

源码如下:

#include <thread>
#include <mutex>
#include <vector>
#include <deque>
#include <condition_variable>
#include <atomic>

namespace stdx
{
    class ThreadPool
    {
    private:
        std::size_t m_pool_size;
        std::size_t m_pool_size_max_growth;
        enum
        {
            DEFAULT_THREAD_POOL_COUNT = 5,
            DEFAULT_THREAD_MAX_COUNT = 24,
            DEFAULT_THREAD_POOL_COUNT_THRESHOLD = 128
        };
        std::vector<std::thread> m_threads;
        std::deque<std::function<void()> > m_threadfuncs;

        std::condition_variable m_cv;
        std::mutex m_mutex;

        std::atomic_int m_atmoic_working_couter;
        std::atomic_bool m_atmoic_quit_sign;

        std::atomic_bool m_atmoic_fifo_lifo;

        using LOCK = std::unique_lock<std::mutex>;

        void init()
        {
            for (int i = 0; i < m_pool_size; ++i)
            {
                pushback_thread();
            }
        }

        //should invoked in lockguard
        void pushback_thread()
        {
            m_threads.emplace_back([this]() -> void
                                   {
                                       while (!m_atmoic_quit_sign.load())
                                       {
                                           std::function<void()> fn;
                                           try
                                           {
                                               LOCK lock(m_mutex);
                                               m_cv.wait(lock,
                                                         [this]() -> bool
                                                         {
                                                             return !m_threadfuncs.empty() ||
                                                                    m_atmoic_quit_sign.load();
                                                         }
                                               );
                                               if (m_atmoic_quit_sign.load())
                                               {
                                                   break;
                                               }
                                               if (!m_threadfuncs.empty())
                                               {
                                                   if (m_atmoic_fifo_lifo.load())
                                                   {
                                                       fn = std::move(m_threadfuncs.front());
                                                       m_threadfuncs.pop_front();
                                                   }
                                                   else
                                                   {
                                                       fn = std::move(m_threadfuncs.back());
                                                       m_threadfuncs.pop_back();
                                                   }

                                               }
                                               lock.unlock();
                                               ++m_atmoic_working_couter;
                                               if (fn)
                                               {
                                                   fn();
                                               }
                                               --m_atmoic_working_couter;
                                           }
                                           catch (...)
                                           {
                                               std::cout << "catch exp:" << __LINE__ << std::endl;
                                           }

                                       }
                                   });
        }

        void uninit()
        {
            for (auto &thread : m_threads)
            {
                if (thread.joinable())
                {
                    thread.join();
                }
            }
            m_threads.clear();
        }

        //should invoked in lockguard
        bool has_idle()
        {
            return m_atmoic_working_couter.load() < m_threads.size();
        }

    public:
        ThreadPool() : ThreadPool(DEFAULT_THREAD_POOL_COUNT, DEFAULT_THREAD_MAX_COUNT)
        {

        }

        ThreadPool(std::size_t count_begin, std::size_t count_max_growth) : m_pool_size(count_begin),
                                                                            m_pool_size_max_growth(count_max_growth),
                                                                            m_atmoic_working_couter(0),
                                                                            m_atmoic_quit_sign(false),
                                                                            m_atmoic_fifo_lifo(true)
        {
            if (m_pool_size > DEFAULT_THREAD_POOL_COUNT_THRESHOLD)
            {
                m_pool_size = DEFAULT_THREAD_POOL_COUNT_THRESHOLD;
            }
            if (m_pool_size_max_growth > DEFAULT_THREAD_POOL_COUNT_THRESHOLD)
            {
                m_pool_size_max_growth = DEFAULT_THREAD_POOL_COUNT_THRESHOLD;
            }
            init();
        }

        ~ThreadPool()
        {
            quit();
            uninit();
        }

        ThreadPool(const ThreadPool &) = delete;

        ThreadPool(ThreadPool &&) = delete;

        void operator=(const ThreadPool &) = delete;

        void operator=(ThreadPool &&) = delete;
        ///////////////////////////////////////////////////////////////

        template<typename Fn, typename ...Params>
        void commit(Fn &&fn, Params &&... params)
        {
            try
            {
                LOCK lock(m_mutex);

                if (!has_idle() && m_threads.size() < m_pool_size_max_growth)
                {
                    pushback_thread();
                }

                m_threadfuncs.emplace_back(
                        std::bind(std::forward<Fn>(fn), std::forward<Params>(params)...)
                );
                m_cv.notify_one();
                lock.unlock();
            }
            catch (...)
            {
                std::cout << "catch exp:" << __LINE__ << std::endl;
            }
        }

        void quit()
        {
            m_atmoic_quit_sign.store(true);
            try
            {
                LOCK lock(m_mutex);
                m_cv.notify_all();
            }
            catch (...)
            {
                std::cout << "catch exp:" << __LINE__ << std::endl;
            }
        }

        std::size_t get_pending_count()
        {
            std::size_t count = 0;
            try
            {
                LOCK lock(m_mutex);
                count = m_threadfuncs.size();
            }
            catch (...)
            {
                std::cout << "catch exp:" << __LINE__ << std::endl;
            }

            return count;
        }

        std::size_t get_working_thread_count()
        {
            std::size_t count = 0;
            try
            {
                LOCK lock(m_mutex);
                count = m_threads.size();
            }
            catch (...)
            {
                std::cout << "catch exp:" << __LINE__ << std::endl;
            }

            return count;
        }

        int get_working_count()
        {
            return m_atmoic_working_couter.load();
        }

        void set_execute_fifo_lifo(bool b)
        {
            m_atmoic_fifo_lifo.store(b);
        }

    };
}

测试代码如下:

lass ThreadPoolTest
{
#define SLEEP(x) std::this_thread::sleep_for(x)
    using TIMESECONDS = std::chrono::seconds;
    using TIMEMILLI = std::chrono::milliseconds;
public:
    static void callback(std::string str, float f, const char *sztmp)
    {
        std::cout << "commit2:" << str << " " << f << sztmp << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    struct functor
    {
        void operator()()
        {
            std::cout << "commit3:" << "nothing" << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(3));
        }
    };

    template<typename CALLBACK>
    static void thread_callback(int a, CALLBACK fn)
    {
        std::cout << "commit 5:" << a << std::endl;
        fn();
    }

    static void execute()
    {
        stdx::ThreadPool thread_pool(1, 5);
//        thread_pool.set_execute_fifo_lifo(false);

        thread_pool.commit(
                [](int a, char b) -> int
                {
                    std::cout << "commit1:" << a << " " << b << std::endl;
                    SLEEP(TIMESECONDS(3));
                    return 12;
                },
                123,
                'c'
        );

        SLEEP(TIMEMILLI (10));

        thread_pool.commit(
                callback,
                std::string("456"),
                1.1f,
                "789"
        );

        thread_pool.commit(
                functor()
        );

        auto lambdacallback = []() -> void
        {
            std::cout << "thread rans end" << std::endl;
        };
        thread_pool.commit(thread_callback<decltype(lambdacallback)>,
                           999,
                           lambdacallback
        );

        for (int i = 0; i < 10; ++i)
        {
            thread_pool.commit(
                    [=]
                    {
                        std::cout << "commitn:" << i << std::endl;
                        if (i == 9)
                        {
                            SLEEP(TIMESECONDS(0));
                        }
                    }
            );
        }

        SLEEP(TIMESECONDS(1));

        std::cout << "idle?:" << thread_pool.has_idle() << std::endl;
        std::cout << "pending count:" << thread_pool.get_pending_count() << std::endl;

        SLEEP(TIMESECONDS(3));
        std::cout << "working count:" << thread_pool.get_working_count() << std::endl;

        SLEEP(TIMESECONDS(5));
    }
};

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍,也欢迎加入技术群,里面有自动驾驶、嵌入式、搜索广告以及推荐等BAT大厂大佬
基于C++11的线程池实现

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/318681

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年2月21日 上午9:24
下一篇 2023年2月24日 下午2:59

相关推荐