编写了一个最基本的线程池类,处理用c_work表示的工作任务。C++还很不熟练,欢迎会C++的提出宝贵的修改意见。
程序有注释,所以应该很好读懂。测试程序在下面。
- ///////////////////////////////////////////////////////
- //线程池类
- ///////////////////////////////////////////////////////
- #include <pthread.h>
- #include <stdlib.h>
- #include <stdio.h>
- #include <unistd.h>
- #include <assert.h>
- constint DEFAULT_MAX_THREAD_NUM = 10;
- constint MAX_WORK_NUM = 100000;
- //c_worker类
- class c_work
- {
- public:
- c_work():process(NULL), arg(NULL), next(NULL){}
- c_work(void *(*prcss)(void *), void *arg):
- process(prcss), arg(arg), next(NULL) {}
- ~c_work();
- void *(*process)(void *);
- void *arg;
- unsigned char type; //最高位表示arg是否需要delete操作
- c_work *next;
- };
- c_work::~c_work()
- {
- unsigned char ifdel = type >> 7;
- if (ifdel)
- {
- delete arg;
- arg = NULL;
- }
- }
- class c_thread_pool
- {
- public:
- c_thread_pool();
- c_thread_pool(constint max_thread_num);
- ~c_thread_pool();
- int add_work(c_work work);
- staticvoid *thread_routine(void *arg);
- pthread_mutex_t queue_lock;
- pthread_cond_t queue_cond;
- // private:
- c_work *queue_head;
- c_work *queue_tail;
- int shutdown;
- pthread_t *threadid;
- int max_thread_num;
- int cur_queue_size;
- };
- c_thread_pool::c_thread_pool()
- {
- pthread_mutex_init(&queue_lock, NULL);
- pthread_cond_init(&queue_cond, NULL);
- //工作队列初始化
- queue_head = NULL;
- queue_tail = NULL;
- max_thread_num = max_thread_num;
- cur_queue_size = 0;
- shutdown = 0;
- max_thread_num = DEFAULT_MAX_THREAD_NUM;
- threadid = new pthread_t[max_thread_num];
- int i = 0;
- for (i = 0; i < max_thread_num; i++)
- {
- pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);
- }
- }
- c_thread_pool::c_thread_pool(int max_thread_num)
- {
- pthread_mutex_init(&queue_lock, NULL);
- pthread_cond_init(&queue_cond, NULL);
- //工作队列初始化
- queue_head = NULL;
- queue_tail = NULL;
- max_thread_num = max_thread_num;
- cur_queue_size = 0;
- threadid = new pthread_t[max_thread_num];
- int i = 0;
- for (i = 0; i < max_thread_num; i++)
- {
- pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);
- }
- }
- /*向线程池中的任务队列加入任务*/
- int c_thread_pool::add_work(c_work work)
- {
- c_work *newwork = new c_work;
- newwork->process = work.process;
- newwork->arg = work.arg;
- newwork->next = NULL;
- pthread_mutex_lock(&queue_lock);
- /*将任务加入到等待队列中*/
- if (queue_head != NULL && queue_tail != NULL)
- {
- queue_tail->next = newwork;
- queue_tail = newwork;
- }
- else
- {
- //空队列
- queue_head = newwork;
- queue_tail = newwork;
- }
- cur_queue_size++;
- pthread_mutex_unlock(&queue_lock);
- /*等待队列中有任务了,唤醒一个等待线程,注意如果所有线程都在忙碌,这句没有任何作用*/
- pthread_cond_signal(&(queue_cond));
- printf("add work returned!\n");
- return 0;
- }
- void* c_thread_pool::thread_routine(void *arg)
- {
- c_thread_pool *pool = (c_thread_pool *)arg;
- int i = 0;
- while (1)
- {
- pthread_mutex_lock(&(pool->queue_lock));
- //如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
- // pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁
- //标注:注意这一如果任务队列不为空的话,while语句将被跳过,直接执行下面的调用。
- while (pool->cur_queue_size == 0 && pool->shutdown)
- {
- pthread_cond_wait(&(pool->queue_cond), &(pool->queue_lock));
- }
- //等待队列长度减去1,并取出链表中的头元素
- if (pool->cur_queue_size > 0 && pool->queue_head != NULL)
- {
- printf("IN THREAD ROUTINE size = %d && queue head is not NULL\n", pool->cur_queue_size);
- pool->cur_queue_size--;
- c_work *work = pool->queue_head;
- pool->queue_head = work->next;
- pthread_mutex_unlock(&(pool->queue_lock));
- //调用回调函数,执行测试任务
- //////////////////////////////////////////
- (*(work->process))(work->arg);
- free(work);
- work = NULL;
- }
- else//不可达
- {
- pthread_mutex_unlock(&(pool->queue_lock));
- }
- }
- }
- c_thread_pool::~c_thread_pool()
- {
- for (int i = 0; i < max_thread_num; ++i)
- pthread_cancel(threadid[i]);
- for (c_work *w_t = queue_head; w_t != NULL;)
- {
- c_work *temp = w_t->next;
- delete w_t;
- w_t = temp;
- }
- delete [] threadid;
- }
/////////////////////////////////////////////////////// //线程池类 /////////////////////////////////////////////////////// #include <pthread.h> #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <assert.h> const int DEFAULT_MAX_THREAD_NUM = 10; const int MAX_WORK_NUM = 100000; //c_worker类 class c_work { public: c_work():process(NULL), arg(NULL), next(NULL){} c_work(void *(*prcss)(void *), void *arg): process(prcss), arg(arg), next(NULL) {} ~c_work(); void *(*process)(void *); void *arg; unsigned char type; //最高位表示arg是否需要delete操作 c_work *next; }; c_work::~c_work() { unsigned char ifdel = type >> 7; if (ifdel) { delete arg; arg = NULL; } } class c_thread_pool { public: c_thread_pool(); c_thread_pool(const int max_thread_num); ~c_thread_pool(); int add_work(c_work work); static void *thread_routine(void *arg); pthread_mutex_t queue_lock; pthread_cond_t queue_cond; // private: c_work *queue_head; c_work *queue_tail; int shutdown; pthread_t *threadid; int max_thread_num; int cur_queue_size; }; c_thread_pool::c_thread_pool() { pthread_mutex_init(&queue_lock, NULL); pthread_cond_init(&queue_cond, NULL); //工作队列初始化 queue_head = NULL; queue_tail = NULL; max_thread_num = max_thread_num; cur_queue_size = 0; shutdown = 0; max_thread_num = DEFAULT_MAX_THREAD_NUM; threadid = new pthread_t[max_thread_num]; int i = 0; for (i = 0; i < max_thread_num; i++) { pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this); } } c_thread_pool::c_thread_pool(int max_thread_num) { pthread_mutex_init(&queue_lock, NULL); pthread_cond_init(&queue_cond, NULL); //工作队列初始化 queue_head = NULL; queue_tail = NULL; max_thread_num = max_thread_num; cur_queue_size = 0; threadid = new pthread_t[max_thread_num]; int i = 0; for (i = 0; i < max_thread_num; i++) { pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this); } } /*向线程池中的任务队列加入任务*/ int c_thread_pool::add_work(c_work work) { c_work *newwork = new c_work; newwork->process = work.process; newwork->arg = work.arg; newwork->next = NULL; pthread_mutex_lock(&queue_lock); /*将任务加入到等待队列中*/ if (queue_head != NULL && queue_tail != NULL) { queue_tail->next = newwork; queue_tail = newwork; } else { //空队列 queue_head = newwork; queue_tail = newwork; } cur_queue_size++; pthread_mutex_unlock(&queue_lock); /*等待队列中有任务了,唤醒一个等待线程,注意如果所有线程都在忙碌,这句没有任何作用*/ pthread_cond_signal(&(queue_cond)); printf("add work returned!\n"); return 0; } void* c_thread_pool::thread_routine(void *arg) { c_thread_pool *pool = (c_thread_pool *)arg; int i = 0; while (1) { pthread_mutex_lock(&(pool->queue_lock)); //如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意 // pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁 //标注:注意这一如果任务队列不为空的话,while语句将被跳过,直接执行下面的调用。 while (pool->cur_queue_size == 0 && pool->shutdown) { pthread_cond_wait(&(pool->queue_cond), &(pool->queue_lock)); } //等待队列长度减去1,并取出链表中的头元素 if (pool->cur_queue_size > 0 && pool->queue_head != NULL) { printf("IN THREAD ROUTINE size = %d && queue head is not NULL\n", pool->cur_queue_size); pool->cur_queue_size--; c_work *work = pool->queue_head; pool->queue_head = work->next; pthread_mutex_unlock(&(pool->queue_lock)); //调用回调函数,执行测试任务 ////////////////////////////////////////// (*(work->process))(work->arg); free(work); work = NULL; } else //不可达 { pthread_mutex_unlock(&(pool->queue_lock)); } } } c_thread_pool::~c_thread_pool() { for (int i = 0; i < max_thread_num; ++i) pthread_cancel(threadid[i]); for (c_work *w_t = queue_head; w_t != NULL;) { c_work *temp = w_t->next; delete w_t; w_t = temp; } delete [] threadid; }
公司简介
聚拓互联(http://www.ejutuo.com).Net平台至强虚拟主机供应商,是领先的互联网基础应用服务提供商,主要面向全球客户提供域名注册、国内、香港/美国虚拟主机、企业邮箱、智能建站、虚拟服务器(VPS)、服务器租用、服务器托管等丰富的网络产品服务。
聚拓互联的快速发展与其企业文化密不可分,易网人秉持“团结互助、敬业负责、恪守信誉、积极进取、勇于创新”的企业文化,汇聚了行业内的大量专业人士,拥有多位国内顶尖的linux/freeBSD/unix经验的系统工程师、微软认证工程师和网络安全技术人才。核心团队均为该行业从业多年的专业人士,拥有丰富行业经验和较高威望,并不断改革创新,以满足客户多元化需求为己任,不断进取。 同时,易网坚守 “专业品质、服务为本、诚信经营、恪守信誉”的核心价值观,为客户提供多样、安全、稳定、放心的产品。
原文链接: https://www.cnblogs.com/ejutuo/archive/2012/08/23/2653348.html
欢迎关注
微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍
原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/60719
非原创文章文中已经注明原地址,如有侵权,联系删除
关注公众号【高性能架构探索】,第一时间获取最新文章
转载文章受原作者版权保护。转载请注明原作者出处!