手写线程池-C++版

重写C++版

C 语言版的线程池改为 C++ 版,这样代码从使用和感观上都会更简洁一些。
从 C 到 C++ 的迁移主要用到了 C++ 三大特性中的封装。
线程池按照面向对象的思想进行拆分可以分为两部分:
(1)任务队列类
(2)线程池类

任务队列

类声明,TaskQueue.h

Task 是任务类,有两个成员分别是两个指针 void(*)(void*)void*
TaskQueue 是任务队列,提供了添加任务、取出任务、获取任务个数的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
using callback = void (*)(void* arg);
// 任务结构体
template <typename T>
struct Task
{
Task<T>()
{
function = nullptr;
arg = nullptr;
}
Task<T>(callback f, void* arg)
{
function = f;
this->arg = (T*)arg;
}

callback function;
T* arg;
};

template <typename T>
class TaskQueue
{
public:
TaskQueue();
~TaskQueue();

// 添加任务
void addTask(Task<T> task);
void addTask(callback f, void* arg);
// 取出一个任务
Task<T> takeTask();
// 获取当前任务的个数
inline int taskNumber()
{
return m_taskQ.size();
}

private:
queue<Task<T>> m_taskQ; // 任务队列
pthread_mutex_t m_mutex; // 互斥锁
};

类定义,TaskQueue.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
template <typename T>
TaskQueue<T>::TaskQueue()
{
pthread_mutex_init(&m_mutex, NULL);
}

template <typename T>
TaskQueue<T>::~TaskQueue()
{
pthread_mutex_destroy(&m_mutex);
}

template <typename T>
void TaskQueue<T>::addTask(Task<T> task)
{
pthread_mutex_lock(&m_mutex);
m_taskQ.push(task);
pthread_mutex_unlock(&m_mutex);
}

template <typename T>
void TaskQueue<T>::addTask(callback f, void* arg)
{
pthread_mutex_lock(&m_mutex);
m_taskQ.push(Task<T>(f, arg));
pthread_mutex_unlock(&m_mutex);
}

template <typename T>
Task<T> TaskQueue<T>::takeTask()
{
Task<T> t;
pthread_mutex_lock(&m_mutex);
if (!m_taskQ.empty())
{
t = m_taskQ.front();
m_taskQ.pop();
}
pthread_mutex_unlock(&m_mutex);
return t;
}

线程池

类声明,ThreadPool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template <typename T>
class ThreadPool
{
public:
// 创建线程池并初始化
ThreadPool(int min, int max);
// 销毁线程池
~ThreadPool();
// 给线程池添加任务
void addTask(Task<T> task);
// 获取线程池中工作的线程的个数
int getBusyNum();
// 获取线程池中活着的线程的个数
int getAliveNum();

private:
// 工作的线程(消费者线程)任务函数
static void* worker(void* arg);
// 管理者线程任务函数
static void* manager(void* arg);
// 单个线程退出
void threadExit();

private:
// 任务队列
TaskQueue<T>* taskQ;

pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_cond_t notEmpty; // 任务队列是不是空了
bool shutdown; // 是否销毁线程池(销毁为1,不销毁为0)
};

类定义,ThreadPool.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
template <typename T>
ThreadPool<T>::ThreadPool(int min, int max)
{
do
{
// 实例化任务队列
taskQ = new TaskQueue<T>;
if (taskQ == nullptr)
{
cout << "new taskQ fail...\n";
break;
}

threadIDs = new pthread_t[max];
if (threadIDs == nullptr)
{
cout << "new threadIDs fail...\n";
break;
}
memset(threadIDs, 0, sizeof(pthread_t) * max);
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min; // 等于最小个数
exitNum = 0;

// 初始化互斥锁、条件变量
if (pthread_mutex_init(&mutexPool, NULL) != 0 ||
pthread_cond_init(&notEmpty, NULL) != 0)
{
cout << "mutex or condition init fail...\n";
break;
}

shutdown = false;

// 创建线程
pthread_create(&managerID, NULL, manager, this);
for (int i = 0; i < min; ++i)
{
pthread_create(&threadIDs[i], NULL, worker, this);
}
return;
} while (0);

// 释放资源
if (threadIDs) delete[] threadIDs;
if (taskQ) delete taskQ;
}

template <typename T>
ThreadPool<T>::~ThreadPool()
{
// 关闭线程池
shutdown = true;
// 阻塞回收管理者线程
pthread_join(managerID, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < liveNum; ++i)
{
pthread_cond_signal(&notEmpty);
}
// 释放堆内存
if (taskQ)
{
delete taskQ;
}
if (threadIDs)
{
delete[] threadIDs;
}

pthread_mutex_destroy(&mutexPool);
pthread_cond_destroy(&notEmpty);
}

template <typename T>
void ThreadPool<T>::addTask(Task<T> task)
{
if (shutdown)
{
return;
}
// 添加任务
taskQ->addTask(task);

pthread_cond_signal(&notEmpty);
}

template <typename T>
int ThreadPool<T>::getBusyNum()
{
pthread_mutex_lock(&mutexPool);
int busyNum = this->busyNum;
pthread_mutex_unlock(&mutexPool);
return busyNum;
}

template <typename T>
int ThreadPool<T>::getAliveNum()
{
pthread_mutex_lock(&mutexPool);
int aliveNum = this->liveNum;
pthread_mutex_unlock(&mutexPool);
return aliveNum;
}

template <typename T>
void* ThreadPool<T>::worker(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);

// 一直不停地工作
while (true)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空
while (pool->taskQ->taskNumber() == 0 && !pool->shutdown)
{
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

// 解除阻塞之后, 判断是否要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}
}
}

// 判断线程池是否被关闭
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
pool->threadExit();
}

// 从任务队列中取一个任务
Task<T> task = pool->taskQ->takeTask();
// 解锁
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexPool);

// 执行任务
cout << "thread " << to_string(pthread_self()) << " start working...\n";
task.function(task.arg);
delete task.arg;
task.arg = nullptr;

// 任务处理结束
cout << "thread " << to_string(pthread_self()) << " end working...\n";
pthread_mutex_lock(&pool->mutexPool);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexPool);
}

return nullptr;
}

template <typename T>
void* ThreadPool<T>::manager(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);

while (!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);

// 取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->taskQ->taskNumber();
int liveNum = pool->liveNum;
// 取出忙的线程的数量
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexPool);

const int NUMBER = 2;
// 添加线程
// 任务的个数 > 存活的线程数 && 存活的线程数 < 最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}

// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程数 > 最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}

return NULL;
}

template <typename T>
void ThreadPool<T>::threadExit()
{
pthread_t tid = pthread_self();
for (int i = 0; i < maxNum; ++i)
{
if (threadIDs[i] == tid)
{
threadIDs[i] = 0;
cout << "threadExit() called, " << to_string(tid) << " exiting...\n";
break;
}
}
pthread_exit(NULL);
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "ThreadPool.h"
#include "ThreadPool.cpp"
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>

void taskFunc(void* arg)
{
int num = *(int*)arg;
cout << "thread " << to_string(pthread_self()) << "is working, number = " << num << "\n";
sleep(1);
}

int main()
{
// 创建线程池
ThreadPool<int> pool(3, 10);
for (int i = 0; i < 100; ++i)
{
int* num = new int(i + 100);
pool.addTask(Task<int>(taskFunc, num));
}
// 主线程睡眠一段时间,保证工作线程处理完毕
sleep(20);

return 0;
}
// g++ test.cpp -lpthread -o test

手写线程池-C++版
https://lcf163.github.io/2021/08/31/手写线程池-Cpp版/
作者
乘风的小站
发布于
2021年8月31日
许可协议