线程池的设计与测试

编写了一个最基本的线程池类,处理用c_work表示的工作任务。C++还很不熟练,欢迎会C++的提出宝贵的修改意见。

程序有注释,所以应该很好读懂。测试程序在下面。

 

 

  1. ///////////////////////////////////////////////////////
  2. //线程池类
  3. ///////////////////////////////////////////////////////
  4. #include <pthread.h>
  5. #include <stdlib.h>
  6. #include <stdio.h>
  7. #include <unistd.h>
  8. #include <assert.h>
  9. constint DEFAULT_MAX_THREAD_NUM = 10;
  10. constint MAX_WORK_NUM = 100000;
  11. //c_worker类
  12. class c_work
  13. {
  14. public:
  15. c_work():process(NULL), arg(NULL), next(NULL){}
  16. c_work(void *(*prcss)(void *), void *arg):
  17. process(prcss), arg(arg), next(NULL) {}
  18. ~c_work();
  19. void *(*process)(void *);
  20. void *arg;
  21. unsigned char type; //最高位表示arg是否需要delete操作
  22. c_work *next;
  23. };
  24. c_work::~c_work()
  25. {
  26. unsigned char ifdel = type >> 7;
  27. if (ifdel)
  28. {
  29. delete arg;
  30. arg = NULL;
  31. }
  32. }
  33. class c_thread_pool
  34. {
  35. public:
  36. c_thread_pool();
  37. c_thread_pool(constint max_thread_num);
  38. ~c_thread_pool();
  39. int add_work(c_work work);
  40. staticvoid *thread_routine(void *arg);
  41. pthread_mutex_t queue_lock;
  42. pthread_cond_t queue_cond;
  43. // private:
  44. c_work *queue_head;
  45. c_work *queue_tail;
  46. int shutdown;
  47. pthread_t *threadid;
  48. int max_thread_num;
  49. int cur_queue_size;
  50. };
  51. c_thread_pool::c_thread_pool()
  52. {
  53. pthread_mutex_init(&queue_lock, NULL);
  54. pthread_cond_init(&queue_cond, NULL);
  55. //工作队列初始化
  56. queue_head = NULL;
  57. queue_tail = NULL;
  58. max_thread_num = max_thread_num;
  59. cur_queue_size = 0;
  60. shutdown = 0;
  61. max_thread_num = DEFAULT_MAX_THREAD_NUM;
  62. threadid = new pthread_t[max_thread_num];
  63. int i = 0;
  64. for (i = 0; i < max_thread_num; i++)
  65. {
  66. pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);
  67. }
  68. }
  69. c_thread_pool::c_thread_pool(int max_thread_num)
  70. {
  71. pthread_mutex_init(&queue_lock, NULL);
  72. pthread_cond_init(&queue_cond, NULL);
  73. //工作队列初始化
  74. queue_head = NULL;
  75. queue_tail = NULL;
  76. max_thread_num = max_thread_num;
  77. cur_queue_size = 0;
  78. threadid = new pthread_t[max_thread_num];
  79. int i = 0;
  80. for (i = 0; i < max_thread_num; i++)
  81. {
  82. pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);
  83. }
  84. }
  85. /*向线程池中的任务队列加入任务*/
  86. int c_thread_pool::add_work(c_work work)
  87. {
  88. c_work *newwork = new c_work;
  89. newwork->process = work.process;
  90. newwork->arg = work.arg;
  91. newwork->next = NULL;
  92. pthread_mutex_lock(&queue_lock);
  93. /*将任务加入到等待队列中*/
  94. if (queue_head != NULL && queue_tail != NULL)
  95. {
  96. queue_tail->next = newwork;
  97. queue_tail = newwork;
  98. }
  99. else
  100. {
  101. //空队列
  102. queue_head = newwork;
  103. queue_tail = newwork;
  104. }
  105. cur_queue_size++;
  106. pthread_mutex_unlock(&queue_lock);
  107. /*等待队列中有任务了,唤醒一个等待线程,注意如果所有线程都在忙碌,这句没有任何作用*/
  108. pthread_cond_signal(&(queue_cond));
  109. printf("add work returned!\n");
  110. return 0;
  111. }
  112. void* c_thread_pool::thread_routine(void *arg)
  113. {
  114. c_thread_pool *pool = (c_thread_pool *)arg;
  115. int i = 0;
  116. while (1)
  117. {
  118. pthread_mutex_lock(&(pool->queue_lock));
  119. //如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
  120. // pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁
  121. //标注:注意这一如果任务队列不为空的话,while语句将被跳过,直接执行下面的调用。
  122. while (pool->cur_queue_size == 0 && pool->shutdown)
  123. {
  124. pthread_cond_wait(&(pool->queue_cond), &(pool->queue_lock));
  125. }
  126. //等待队列长度减去1,并取出链表中的头元素
  127. if (pool->cur_queue_size > 0 && pool->queue_head != NULL)
  128. {
  129. printf("IN THREAD ROUTINE size = %d && queue head is not NULL\n", pool->cur_queue_size);
  130. pool->cur_queue_size--;
  131. c_work *work = pool->queue_head;
  132. pool->queue_head = work->next;
  133. pthread_mutex_unlock(&(pool->queue_lock));
  134. //调用回调函数,执行测试任务
  135. //////////////////////////////////////////
  136. (*(work->process))(work->arg);
  137. free(work);
  138. work = NULL;
  139. }
  140. else//不可达
  141. {
  142. pthread_mutex_unlock(&(pool->queue_lock));
  143. }
  144. }
  145. }
  146. c_thread_pool::~c_thread_pool()
  147. {
  148. for (int i = 0; i < max_thread_num; ++i)
  149. pthread_cancel(threadid[i]);
  150. for (c_work *w_t = queue_head; w_t != NULL;)
  151. {
  152. c_work *temp = w_t->next;
  153. delete w_t;
  154. w_t = temp;
  155. }
  156. delete [] threadid;
  157. }
///////////////////////////////////////////////////////
//线程池类 
///////////////////////////////////////////////////////
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <assert.h>

const int DEFAULT_MAX_THREAD_NUM = 10;
const int MAX_WORK_NUM = 100000;
//c_worker类
class c_work
{
	public:
		c_work():process(NULL), arg(NULL), next(NULL){}
		c_work(void *(*prcss)(void *), void *arg):
			process(prcss), arg(arg), next(NULL) {}
		~c_work(); 

		void *(*process)(void *);
		void *arg;
		unsigned char type; //最高位表示arg是否需要delete操作
		c_work *next;
};

c_work::~c_work()
{
	unsigned char ifdel = type >> 7;
	if (ifdel)
	{
		delete arg;
		arg = NULL;
	}
}

class c_thread_pool
{
	public:
		c_thread_pool();
		c_thread_pool(const int max_thread_num);
		~c_thread_pool();

		int add_work(c_work work);
		static void *thread_routine(void *arg);

		pthread_mutex_t queue_lock;
		pthread_cond_t queue_cond;

//	private:
		c_work *queue_head;
		c_work *queue_tail;
		int shutdown;

		pthread_t *threadid;
		int max_thread_num;
		int cur_queue_size;
};

c_thread_pool::c_thread_pool()
{

	pthread_mutex_init(&queue_lock, NULL);
	pthread_cond_init(&queue_cond, NULL);

	//工作队列初始化
	queue_head = NULL;
	queue_tail = NULL;
	
	max_thread_num = max_thread_num;
	cur_queue_size = 0;

	shutdown = 0;

	max_thread_num = DEFAULT_MAX_THREAD_NUM;
	threadid = new pthread_t[max_thread_num];
	int i = 0;

	for (i = 0; i < max_thread_num; i++)
	{
		pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);
	}
}


c_thread_pool::c_thread_pool(int max_thread_num)
{

	pthread_mutex_init(&queue_lock, NULL);
	pthread_cond_init(&queue_cond, NULL);

	//工作队列初始化
	queue_head = NULL;
	queue_tail = NULL;
	
	max_thread_num = max_thread_num;
	cur_queue_size = 0;

	threadid = new pthread_t[max_thread_num];
	int i = 0;
	for (i = 0; i < max_thread_num; i++)
	{
		pthread_create(&(threadid[i]), NULL, thread_routine, (void*)this);
	}
}

/*向线程池中的任务队列加入任务*/
int c_thread_pool::add_work(c_work work)
{
	c_work *newwork = new c_work;
	newwork->process = work.process;
	newwork->arg = work.arg;
	newwork->next = NULL;

	pthread_mutex_lock(&queue_lock);
	
	/*将任务加入到等待队列中*/
	if (queue_head != NULL && queue_tail != NULL)
	{
		queue_tail->next = newwork;
		queue_tail = newwork;	
	}
	else
	{
		//空队列
		queue_head = newwork;
		queue_tail = newwork;
	}

	cur_queue_size++;
	pthread_mutex_unlock(&queue_lock);
	/*等待队列中有任务了,唤醒一个等待线程,注意如果所有线程都在忙碌,这句没有任何作用*/
	pthread_cond_signal(&(queue_cond)); 
	printf("add work returned!\n");
	return 0;
}


void* c_thread_pool::thread_routine(void *arg)
{
	c_thread_pool *pool = (c_thread_pool *)arg;
	int i = 0;
	while (1)
	{
		pthread_mutex_lock(&(pool->queue_lock));
		//如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
		 // pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁

		//标注:注意这一如果任务队列不为空的话,while语句将被跳过,直接执行下面的调用。
		while (pool->cur_queue_size == 0 && pool->shutdown)
		{
			pthread_cond_wait(&(pool->queue_cond), &(pool->queue_lock));
		}


		//等待队列长度减去1,并取出链表中的头元素
		if (pool->cur_queue_size > 0 && pool->queue_head != NULL)
		{
			printf("IN THREAD ROUTINE size = %d && queue head is not NULL\n", pool->cur_queue_size);
			pool->cur_queue_size--;
			c_work *work = pool->queue_head;
			pool->queue_head = work->next;
			pthread_mutex_unlock(&(pool->queue_lock));

			//调用回调函数,执行测试任务
			//////////////////////////////////////////
			(*(work->process))(work->arg);
			free(work);
			work = NULL;
		}
		else //不可达
		{
			pthread_mutex_unlock(&(pool->queue_lock));
		}
	}
	
}

c_thread_pool::~c_thread_pool()
{
	for (int i = 0; i < max_thread_num; ++i)	
		pthread_cancel(threadid[i]);
	for (c_work *w_t = queue_head; w_t != NULL;)
	{
		c_work *temp = w_t->next;
		delete w_t;
		w_t = temp;
	}
	delete [] threadid;
}

 

 
公司简介
 

聚拓互联(http://www.ejutuo.com).Net平台至强虚拟主机供应商,是领先的互联网基础应用服务提供商,主要面向全球客户提供域名注册、国内、香港/美国虚拟主机、企业邮箱、智能建站、虚拟服务器(VPS)、服务器租用、服务器托管等丰富的网络产品服务。

聚拓互联的快速发展与其企业文化密不可分,易网人秉持“团结互助、敬业负责、恪守信誉、积极进取、勇于创新”的企业文化,汇聚了行业内的大量专业人士,拥有多位国内顶尖的linux/freeBSD/unix经验的系统工程师、微软认证工程师和网络安全技术人才。核心团队均为该行业从业多年的专业人士,拥有丰富行业经验和较高威望,并不断改革创新,以满足客户多元化需求为己任,不断进取。 同时,易网坚守 “专业品质、服务为本、诚信经营、恪守信誉”的核心价值观,为客户提供多样、安全、稳定、放心的产品。

 

 

 

 

 

 

 

原文链接: https://www.cnblogs.com/ejutuo/archive/2012/08/23/2653348.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍

    线程池的设计与测试

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/60719

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年2月9日 上午9:35
下一篇 2023年2月9日 上午9:35

相关推荐