手写线程池-C语言版

线程池原理

使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,
这样频繁创建线程就会大大降低系统的效率,因为创建线程和销毁线程需要时间。

有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是继续执行其他的任务呢?
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

线程池的组成主要分为 3 个部分:
(1)任务队列:存储需要处理的任务,由工作的线程来处理这些任务。
通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除。
已处理的任务会被从任务队列中删除。
线程池的使用者,调用线程池函数往任务队列中添加任务的线程就是生产者线程。
(2)工作的线程(任务队列中任务的消费者,N 个)
线程池中维护了一定数量的工作线程,它的作用是不停地读任务队列,从中取出任务并处理。
工作的线程相当于是任务队列的消费者角色,
如果任务队列为空,工作的线程将会被阻塞 (使用条件变量/信号量阻塞);
如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作。
(3)管理者线程(不处理任务队列中的任务,1 个)
它的作用是周期性的检测任务队列中的任务数量以及处于忙状态的工作线程个数,
当任务过多的时候,可以适当的创建一些新的工作线程;
当任务过少的时候,可以适当的销毁一些工作的线程。

任务队列

1
2
3
4
5
6
// 任务结构体
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;

线程池定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 线程池结构体
struct ThreadPool
{
// 任务队列
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据

pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_mutex_t mutexBusy; // 锁busyNum变量
pthread_cond_t notFull; // 任务队列是不是满了
pthread_cond_t notEmpty; // 任务队列是不是空了

int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};

头文件声明,ThreadPool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#ifndef _THREADPOOL_H
#define _THREADPOOL_H

typedef struct ThreadPool ThreadPool;

// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);

#endif // _THREADPOOL_H

源文件定义,ThreadPool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267

ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}

pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 等于最小个数
pool->exitNum = 0;

if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}

// 任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;

pool->shutdown = 0;

// 创建线程
pthread_create(&pool->managerID, NULL, manager, pool);
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);

// 释放资源
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);

return NULL;
}

int threadPoolDestroy(ThreadPool* pool)
{
if (pool == NULL)
{
return -1;
}

// 关闭线程池
pool->shutdown = 1;
// 阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < pool->liveNum; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
// 释放堆内存
if (pool->taskQ)
{
free(pool->taskQ);
}
if (pool->threadIDs)
{
free(pool->threadIDs);
}

pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);

free(pool);
pool = NULL;

return 0;
}

void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;

pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}

int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}

int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
}

void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*) arg;

while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空
while (pool->queueSize == 0 && !pool->shutdown)
{
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

// 判断是不是要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}

// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}

// 从任务队列中取一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动头节点
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// 解锁
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);

printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
free(task.arg);
task.arg = NULL;

printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}

return NULL;
}

void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*) arg;

while (!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);

// 取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);

// 取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);

// 添加线程
// 任务的个数 > 存活的线程数 && 存活的线程数 < 最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程数 > 最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}

return NULL;
}

void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include "ThreadPool.h"
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}

int main()
{
// 创建线程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for (int i = 0; i < 100; ++i)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
// 主线程睡眠一段时间,保证工作线程处理完毕
sleep(30);

threadPoolDestroy(pool);

return 0;
}
// gcc main.c ThreadPool.c -o main -lpthread

手写线程池-C语言版
https://lcf163.github.io/2021/08/31/手写线程池-C语言版/
作者
乘风的小站
发布于
2021年8月31日
许可协议