为什么需要线程池目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。
传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即 时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于 不停的创建线程,销毁线程的状态。
我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3。
T1:线程创建时间T2:线程执行时间,包括线程的同步等时间T3:线程销毁时间那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很频繁的话,这笔开销将是不可忽略的。除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。因此线程池的出现正是着眼于减少线程池本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。
1 #ifndef _ThreadPool_H_
2 #define _ThreadPool_H_
3
4 #pragma warning(disable: 4530)
5 #pragma warning(disable: 4786)
6
7 #include <cassert>
8 #include <vector>
9 #include <queue>
10 #include <windows.h>
11
12
13 class ThreadJob //工作基类
14 {
15 public:
16 //供线程池调用的虚函数
17 virtual void DoJob(void *pPara) = 0;
18 };
19
20 class ThreadPool
21 {
22
23 public:
24 //dwNum 线程池规模
25 ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)
26 {
27 InitializeCriticalSection(&_csThreadVector);
28 InitializeCriticalSection(&_csWorkQueue);
29
30 _EventComplete = CreateEvent(0, false, false, NULL);
31 _EventEnd = CreateEvent(0, true, false, NULL);
32 _SemaphoreCall = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);
33 _SemaphoreDel = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);
34
35 assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
36 assert(_EventComplete != INVALID_HANDLE_VALUE);
37 assert(_EventEnd != INVALID_HANDLE_VALUE);
38 assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
39
40 AdjustSize(dwNum <= 0 ? 4 : dwNum);
41 }
42
43 ~ThreadPool()
44 {
45 DeleteCriticalSection(&_csWorkQueue);
46
47 CloseHandle(_EventEnd);
48 CloseHandle(_EventComplete);
49 CloseHandle(_SemaphoreCall);
50 CloseHandle(_SemaphoreDel);
51
52 vector<ThreadItem*>::iterator iter;
53 for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
54 {
55 if(*iter)
56 delete *iter;
57 }
58
59 DeleteCriticalSection(&_csThreadVector);
60 }
61 //调整线程池规模
62 int AdjustSize(int iNum)
63 {
64 if(iNum > 0)
65 {
66 ThreadItem *pNew;
67 EnterCriticalSection(&_csThreadVector);
68 for(int _i=0; _i<iNum; _i++)
69 {
70 _ThreadVector.push_back(pNew = new ThreadItem(this));
71 assert(pNew);
72 pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
73 assert(pNew->_Handle);
74 }
75 LeaveCriticalSection(&_csThreadVector);
76 }
77 else
78 {
79 iNum *= -1;
80 ReleaseSemaphore(_SemaphoreDel, iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
81 }
82 return (int)_lThreadNum;
83 }
84 //调用线程池
85 void Call(void (*pFunc)(void *), void *pPara = NULL)
86 {
87 assert(pFunc);
88
89 EnterCriticalSection(&_csWorkQueue);
90 _JobQueue.push(new JobItem(pFunc, pPara));
91 LeaveCriticalSection(&_csWorkQueue);
92
93 ReleaseSemaphore(_SemaphoreCall, 1, NULL);
94 }
95 //调用线程池
96 inline void Call(ThreadJob * p, void *pPara = NULL)
97 {
98 Call(CallProc, new CallProcPara(p, pPara));
99 }
100 //结束线程池, 并同步等待
101 bool EndAndWait(DWORD dwWaitTime = INFINITE)
102 {
103 SetEvent(_EventEnd);
104 return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
105 }
106 //结束线程池
107 inline void End()
108 {
109 SetEvent(_EventEnd);
110 }
111 inline DWORD Size()
112 {
113 return (DWORD)_lThreadNum;
114 }
115 inline DWORD GetRunningSize()
116 {
117 return (DWORD)_lRunningNum;
118 }
119 bool IsRunning()
120 {
121 return _lRunningNum > 0;
122 }
123
124 protected:
125
126 //工作线程
127 static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
128 {
129 ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
130 assert(pThread);
131
132 ThreadPool *pThreadPoolObj = pThread->_pThis;
133 assert(pThreadPoolObj);
134
135 InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
136
137 HANDLE hWaitHandle[3];
138 hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
139 hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
140 hWaitHandle[2] = pThreadPoolObj->_EventEnd;
141
142 JobItem *pJob;
143 bool fHasJob;
144
145 for(;;)
146 {
147 DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
148
149 //响应删除线程信号
150 if(wr == WAIT_OBJECT_0 + 1)
151 break;
152
153 //从队列里取得用户作业
154 EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
155 if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
156 {
157 pJob = pThreadPoolObj->_JobQueue.front();
158 pThreadPoolObj->_JobQueue.pop();
159 assert(pJob);
160 }
161 LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
162
163 //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
164 if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)
165 break;
166
167 if(fHasJob && pJob)
168 {
169 InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
170 pThread->_dwLastBeginTime = GetTickCount();
171 pThread->_dwCount++;
172 pThread->_fIsRunning = true;
173 pJob->_pFunc(pJob->_pPara); //运行用户作业
174 delete pJob;
175 pThread->_fIsRunning = false;
176 InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
177 }
178 }
179
180 //删除自身结构
181 EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
182 pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
183 LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
184
185 delete pThread;
186
187 InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
188
189 if(!pThreadPoolObj->_lThreadNum) //所有线程结束
190 SetEvent(pThreadPoolObj->_EventComplete);
191
192 return 0;
193 }
194 //调用用户对象虚函数
195 static void CallProc(void *pPara)
196 {
197 CallProcPara *cp = static_cast<CallProcPara *>(pPara);
198 assert(cp);
199 if(cp)
200 {
201 cp->_pObj->DoJob(cp->_pPara);
202 delete cp;
203 }
204 }
205 //用户对象结构
206 struct CallProcPara
207 {
208 ThreadJob* _pObj;//用户对象
209 void *_pPara;//用户参数
210 CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
211 };
212 //用户函数结构
213 struct JobItem
214 {
215 void (*_pFunc)(void *);//函数
216 void *_pPara; //参数
217 JobItem(void (*pFunc)(void *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
218 };
219 //线程池中的线程结构
220 struct ThreadItem
221 {
222 HANDLE _Handle; //线程句柄
223 ThreadPool *_pThis; //线程池的指针
224 DWORD _dwLastBeginTime; //最后一次运行开始时间
225 DWORD _dwCount; //运行次数
226 bool _fIsRunning;
227 ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
228 ~ThreadItem()
229 {
230 if(_Handle)
231 {
232 CloseHandle(_Handle);
233 _Handle = NULL;
234 }
235 }
236 };
237
238 std::queue<JobItem *> _JobQueue; //工作队列
239 std::vector<ThreadItem *> _ThreadVector; //线程数据
240
241 CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界
242
243 HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
244 long _lThreadNum, _lRunningNum; //线程数, 运行的线程数
245
246 };
247
248 #endif //_ThreadPool_H_
View Code
1 //Use:1>:
2 void threadfunc(void *p)
3 {
4 //...
5 }
6 ThreadPool tp;
7 for(i=0; i<100; i++)
8 tp.Call(threadfunc);
9
10 ThreadPool tp(20);//20为初始线程池规模
11 tp.Call(threadfunc, lpPara);
12 tp.AdjustSize(50);//增加50
13 tp.AdjustSize(-30);//减少30
14
15
16 //2>:
17 class MyThreadJob : public ThreadJob //线程对象从ThreadJob扩展
18 {
19 public:
20 virtual void DoJob(void *p)//自定义的虚函数
21 {
22 //....
23 }
24 };
25 MyThreadJob mt[10];
26 ThreadPool tp;
27 for(i=0; i<100 i++)
28 tp.Call(mt + i);//tp.Call(mt + i, para);
View Code
原文链接: https://www.cnblogs.com/jeromesunny/p/3222031.html
欢迎关注
微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍
原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/97559
非原创文章文中已经注明原地址,如有侵权,联系删除
关注公众号【高性能架构探索】,第一时间获取最新文章
转载文章受原作者版权保护。转载请注明原作者出处!