线程池

线程池

线程池的思想

线程池的思想

线程池的流程图

线程池的流程图

简单版本的线程池

主线程添加任务;子线程执行任务和销毁线程池

  • threadpoolsimple.h
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    #ifndef _THREADPOOL_H
    #define _THREADPOOL_H

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <unistd.h>
    #include <pthread.h>

    typedef struct _PoolTask
    {
    int tasknum; //模拟任务编号
    void *arg; //回调函数的参数
    void (*task_func)(void *arg); //任务的回调函数
    } PoolTask;

    typedef struct _ThreadPool
    {
    int max_job_num; //最大任务个数
    int job_num; //实际任务个数
    PoolTask *tasks; //任务队列数组
    int job_push; //入队位置
    int job_pop; // 出队位置

    int thr_num; //线程池内线程个数
    pthread_t *threads; //线程池内线程数组
    int shutdown; //是否关闭线程池
    pthread_mutex_t pool_lock; //线程池的锁
    pthread_cond_t empty_task; //任务队列为空的条件
    pthread_cond_t not_empty_task; //任务队列不为空的条件

    } ThreadPool;

    void create_threadpool(int thrnum, int maxtasknum); //创建线程池:thrnum 代表线程个数,maxtasknum 最大任务个数
    void destroy_threadpool(ThreadPool *pool); //摧毁线程池
    void addtask(ThreadPool *pool); //添加任务到线程池
    void taskRun(void *arg); //任务回调函数

    #endif
  • threadpoolsimple.c
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    //简易版线程池
    #include "threadpoolsimple.h"

    ThreadPool *thrPool = NULL;

    int beginnum = 1000;

    void *thrRun(void *arg)
    {
    //printf("begin call %s-----\n",__FUNCTION__);
    ThreadPool *pool = (ThreadPool *)arg;
    int taskpos = 0; //任务位置
    PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));

    while (1)
    {
    //获取任务,先要尝试加锁
    pthread_mutex_lock(&thrPool->pool_lock);

    //无任务并且线程池不摧毁
    while (thrPool->job_num <= 0 && !thrPool->shutdown)
    {
    //如果没有任务,线程会阻塞
    pthread_cond_wait(&thrPool->not_empty_task, &thrPool->pool_lock);
    }

    if (thrPool->job_num)
    {
    //有任务需要处理
    taskpos = (thrPool->job_pop++) % thrPool->max_job_num;
    //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
    //为什么要拷贝?避免任务被修改,生产者会添加任务
    memcpy(task, &thrPool->tasks[taskpos], sizeof(PoolTask));
    task->arg = task;
    thrPool->job_num--;
    //task = &thrPool->tasks[taskpos];
    pthread_cond_signal(&thrPool->empty_task); //通知主线程
    }

    if (thrPool->shutdown)
    {
    //代表要摧毁线程池,此时线程退出即可
    //pthread_detach(pthread_self());//临死前分家
    pthread_mutex_unlock(&thrPool->pool_lock);
    free(task);
    pthread_exit(NULL);
    }

    //释放锁
    pthread_mutex_unlock(&thrPool->pool_lock);
    task->task_func(task->arg); //执行回调函数
    }

    //printf("end call %s-----\n",__FUNCTION__);
    }

    //创建线程池
    void create_threadpool(int thrnum, int maxtasknum)
    {
    printf("begin call %s-----\n", __FUNCTION__);
    thrPool = (ThreadPool *)malloc(sizeof(ThreadPool));

    thrPool->thr_num = thrnum;
    thrPool->max_job_num = maxtasknum;
    thrPool->job_num = 0; //初始化的任务个数为0
    thrPool->job_push = 0; //任务队列添加的位置
    thrPool->job_pop = 0; //任务队列出队的位置
    thrPool->shutdown = 0; //是否摧毁线程池,1代表摧毁

    thrPool->tasks = (PoolTask *)malloc((sizeof(PoolTask) * maxtasknum)); //申请最大的任务队列

    //初始化锁和条件变量
    pthread_mutex_init(&thrPool->pool_lock, NULL);
    pthread_cond_init(&thrPool->empty_task, NULL);
    pthread_cond_init(&thrPool->not_empty_task, NULL);

    int i = 0;
    thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thrnum); //申请n个线程id的空间

    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    for (i = 0; i < thrnum; i++)
    {
    pthread_create(&thrPool->threads[i], &attr, thrRun, (void *)thrPool); //创建多个线程
    }
    //printf("end call %s-----\n",__FUNCTION__);
    }

    //摧毁线程池
    void destroy_threadpool(ThreadPool *pool)
    {
    pool->shutdown = 1; //开始自爆
    pthread_cond_broadcast(&pool->not_empty_task); //诱杀

    int i = 0;
    for (i = 0; i < pool->thr_num; i++)
    {
    pthread_join(pool->threads[i], NULL);
    }

    pthread_cond_destroy(&pool->not_empty_task);
    pthread_cond_destroy(&pool->empty_task);
    pthread_mutex_destroy(&pool->pool_lock);

    free(pool->tasks);
    free(pool->threads);
    free(pool);
    }

    //添加任务到线程池
    void addtask(ThreadPool *pool)
    {
    //printf("begin call %s-----\n",__FUNCTION__);
    pthread_mutex_lock(&pool->pool_lock);

    //实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
    while (pool->max_job_num <= pool->job_num)
    {
    pthread_cond_wait(&pool->empty_task, &pool->pool_lock);
    }

    int taskpos = (pool->job_push++) % pool->max_job_num;
    //printf("add task %d tasknum===%d\n",taskpos,beginnum);
    pool->tasks[taskpos].tasknum = beginnum++;
    pool->tasks[taskpos].arg = (void *)&pool->tasks[taskpos];
    pool->tasks[taskpos].task_func = taskRun;
    pool->job_num++;

    pthread_mutex_unlock(&pool->pool_lock);

    pthread_cond_signal(&pool->not_empty_task); //通知子线程
    //printf("end call %s-----\n",__FUNCTION__);
    }

    //任务回调函数
    void taskRun(void *arg)
    {
    PoolTask *task = (PoolTask *)arg;
    int num = task->tasknum;
    printf("task %d is runing %lu\n", num, pthread_self());

    sleep(1);
    printf("task %d is done %lu\n", num, pthread_self());
    }

    int main()
    {
    create_threadpool(3, 20);
    int i = 0;
    for (i = 0; i < 50; i++)
    {
    addtask(thrPool); //模拟添加任务
    }

    sleep(20);
    destroy_threadpool(thrPool);

    return 0;
    }

复杂版本的线程池

主线程;子线程;管理线程

  • threadpool.h
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    #ifndef __THREADPOOL_H_
    #define __THREADPOOL_H_

    typedef struct threadpool_t threadpool_t;

    /**
    * @function threadpool_create
    * @descCreates a threadpool_t object.
    * @param thr_num thread num
    * @param max_thr_num max thread size
    * @param queue_max_size size of the queue.
    * @return a newly created thread pool or NULL
    */
    threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);

    /**
    * @function threadpool_add
    * @desc add a new task in the queue of a thread pool
    * @param pool Thread pool to which add the task.
    * @param function Pointer to the function that will perform the task.
    * @param argument Argument to be passed to the function.
    * @return 0 if all goes well,else -1
    */
    int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg);

    /**
    * @function threadpool_destroy
    * @desc Stops and destroys a thread pool.
    * @param pool Thread pool to destroy.
    * @return 0 if destory success else -1
    */
    int threadpool_destroy(threadpool_t *pool);

    /**
    * @desc get the thread num
    * @pool pool threadpool
    * @return # of the thread
    */
    int threadpool_all_threadnum(threadpool_t *pool);

    /**
    * desc get the busy thread num
    * @param pool threadpool
    * return # of the busy thread
    */
    int threadpool_busy_threadnum(threadpool_t *pool);

    #endif
  • threadpool.c
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    334
    335
    336
    337
    338
    339
    340
    341
    342
    343
    344
    345
    346
    347
    348
    349
    350
    351
    352
    353
    354
    355
    356
    357
    358
    359
    360
    361
    362
    363
    364
    365
    366
    367
    368
    369
    370
    371
    372
    373
    374
    375
    376
    377
    378
    379
    380
    381
    382
    383
    384
    385
    386
    387
    388
    389
    390
    391
    392
    393
    394
    395
    396
    397
    398
    399
    400
    401
    402
    403
    404
    405
    406
    407
    408
    409
    410
    411
    412
    413
    414
    415
    416
    417
    418
    419
    420
    421
    422
    423
    424
    425
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <assert.h>
    #include <stdio.h>
    #include <string.h>
    #include <signal.h>
    #include <errno.h>
    #include "threadpool.h"

    #define DEFAULT_TIME 10 /*10s检测一次*/
    #define MIN_WAIT_TASK_NUM 10 /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/
    #define DEFAULT_THREAD_VARY 10 /*每次创建和销毁线程的个数*/
    #define true 1
    #define false 0

    typedef struct
    {
    void *(*function)(void *); /* 函数指针,回调函数 */
    void *arg; /* 上面函数的参数 */
    } threadpool_task_t; /* 各子线程任务结构体 */

    /* 描述线程池相关信息 */
    struct threadpool_t
    {
    pthread_mutex_t lock; /* 用于锁住本结构体 */
    pthread_mutex_t thread_counter; /* 记录忙状态线程个数的琐 -- busy_thr_num */

    pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
    pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */

    pthread_t *threads; /* 存放线程池中每个线程tid的数组 */
    pthread_t adjust_tid; /* 存管理线程tid */
    threadpool_task_t *task_queue; /* 任务队列(数组首地址) */

    int min_thr_num; /* 线程池最小线程数 */
    int max_thr_num; /* 线程池最大线程数 */
    int live_thr_num; /* 当前存活线程个数 */
    int busy_thr_num; /* 忙状态线程个数 */
    int wait_exit_thr_num; /* 要销毁的线程个数 */

    int queue_front; /* task_queue队头下标 */
    int queue_rear; /* task_queue队尾下标 */
    int queue_size; /* task_queue队中实际任务数 */
    int queue_max_size; /* task_queue队列可容纳任务数上限 */

    int shutdown; /* 标志位,线程池使用状态,true或false */
    };

    void *threadpool_thread(void *threadpool);

    void *adjust_thread(void *threadpool);

    int is_thread_alive(pthread_t tid);
    int threadpool_free(threadpool_t *pool);

    //threadpool_create(3,100,100);
    threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
    {
    int i;
    threadpool_t *pool = NULL;
    do
    {
    if ((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)
    {
    printf("malloc threadpool fail");
    break; /*跳出do while*/
    }

    pool->min_thr_num = min_thr_num;
    pool->max_thr_num = max_thr_num;
    pool->busy_thr_num = 0;
    pool->live_thr_num = min_thr_num; /* 活着的线程数初值=最小线程数 */
    pool->wait_exit_thr_num = 0;
    pool->queue_size = 0; /* 有0个产品 */
    pool->queue_max_size = queue_max_size;
    pool->queue_front = 0;
    pool->queue_rear = 0;
    pool->shutdown = false; /* 不关闭线程池 */

    /* 根据最大线程上限数,给工作线程数组开辟空间,并清零 */
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
    if (pool->threads == NULL)
    {
    printf("malloc threads fail");
    break;
    }
    memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);

    /* 队列开辟空间 */
    pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size);
    if (pool->task_queue == NULL)
    {
    printf("malloc task_queue fail\n");
    break;
    }

    /* 初始化互斥琐、条件变量 */
    if (pthread_mutex_init(&(pool->lock), NULL) != 0 ||
    pthread_mutex_init(&(pool->thread_counter), NULL) != 0 ||
    pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 ||
    pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
    {
    printf("init the lock or cond fail\n");
    break;
    }

    //启动工作线程
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    for (i = 0; i < min_thr_num; i++)
    {
    pthread_create(&(pool->threads[i]), &attr, threadpool_thread, (void *)pool); /*pool指向当前线程池*/
    printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
    }

    //创建管理者线程
    pthread_create(&(pool->adjust_tid), &attr, adjust_thread, (void *)pool);

    return pool;
    } while (0);

    /* 前面代码调用失败时,释放poll存储空间 */
    threadpool_free(pool);

    return NULL;
    }

    /* 向线程池中添加一个任务 */
    //threadpool_add(thp, process, (void*)&num[i]); /* 向线程池中添加任务 process: 小写---->大写*/
    int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg)
    {
    pthread_mutex_lock(&(pool->lock));

    /* ==为真,队列已经满,调wait阻塞 */
    while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
    {
    pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }

    if (pool->shutdown)
    {
    pthread_cond_broadcast(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
    return 0;
    }

    /* 清空工作线程,调用的回调函数的参数arg */
    if (pool->task_queue[pool->queue_rear].arg != NULL)
    {
    pool->task_queue[pool->queue_rear].arg = NULL;
    }

    /*添加任务到任务队列里*/
    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 队尾指针移动,模拟环形 */
    pool->queue_size++;

    /*添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程*/
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));

    return 0;
    }

    /* 线程池中各个工作线程 */
    void *threadpool_thread(void *threadpool)
    {
    threadpool_t *pool = (threadpool_t *)threadpool;
    threadpool_task_t task;

    while (true)
    {
    /* Lock must be taken to wait on conditional variable */
    /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
    pthread_mutex_lock(&(pool->lock));

    /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上,若有任务,跳过该while*/
    while ((pool->queue_size == 0) && (!pool->shutdown))
    {
    printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
    pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); //暂停到这

    /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
    if (pool->wait_exit_thr_num > 0)
    {
    pool->wait_exit_thr_num--;

    /*如果线程池里线程个数大于最小值,可以结束当前线程*/
    if (pool->live_thr_num > pool->min_thr_num)
    {
    printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
    pool->live_thr_num--;
    pthread_mutex_unlock(&(pool->lock));
    //pthread_detach(pthread_self());
    pthread_exit(NULL);
    }
    }
    }

    /*如果指定了true,要关闭线程池里的每个线程,自行退出处理--销毁线程池*/
    if (pool->shutdown)
    {
    pthread_mutex_unlock(&(pool->lock));
    printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
    //pthread_detach(pthread_self());
    pthread_exit(NULL); /* 线程自行结束 */
    }

    /*从任务队列里获取任务, 是一个出队操作*/
    task.function = pool->task_queue[pool->queue_front].function;
    task.arg = pool->task_queue[pool->queue_front].arg;

    pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */
    pool->queue_size--;

    /*通知可以有新的任务添加进来*/
    pthread_cond_broadcast(&(pool->queue_not_full));

    /*任务取出后,立即将线程池琐释放*/
    pthread_mutex_unlock(&(pool->lock));

    /*执行任务*/
    printf("thread 0x%x start working\n", (unsigned int)pthread_self());
    pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/
    pool->busy_thr_num++; /*忙状态线程数+1*/
    pthread_mutex_unlock(&(pool->thread_counter));

    (*(task.function))(task.arg); /*执行回调函数任务*/
    //task.function(task.arg);

    /*任务结束处理*/
    printf("thread 0x%x end working\n", (unsigned int)pthread_self());
    pthread_mutex_lock(&(pool->thread_counter));
    pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/
    pthread_mutex_unlock(&(pool->thread_counter));
    }

    pthread_exit(NULL);
    }

    /* 管理线程 */
    void *adjust_thread(void *threadpool)
    {
    int i;
    threadpool_t *pool = (threadpool_t *)threadpool;
    while (!pool->shutdown)
    {
    sleep(DEFAULT_TIME); /*定时 对线程池管理*/

    pthread_mutex_lock(&(pool->lock));
    int queue_size = pool->queue_size; /* 关注 任务数 */
    int live_thr_num = pool->live_thr_num; /* 存活 线程数 */
    pthread_mutex_unlock(&(pool->lock));

    pthread_mutex_lock(&(pool->thread_counter));
    int busy_thr_num = pool->busy_thr_num; /* 忙着的线程数 */
    pthread_mutex_unlock(&(pool->thread_counter));

    /* 创建新线程:任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
    if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num)
    {
    pthread_mutex_lock(&(pool->lock));
    int add = 0;

    /*一次增加 DEFAULT_THREAD 个线程*/
    for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY && pool->live_thr_num < pool->max_thr_num; i++)
    {
    if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
    {
    pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
    add++;
    pool->live_thr_num++;
    }
    }

    pthread_mutex_unlock(&(pool->lock));
    }

    /* 销毁多余的空闲线程:忙线程X2小于存活的线程数,且存活的线程数大于最小线程数*/
    if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num)
    {
    /* 一次销毁DEFAULT_THREAD个线程,随机10个 */
    pthread_mutex_lock(&(pool->lock));
    pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; /* 要销毁的线程数,设置为10 */
    pthread_mutex_unlock(&(pool->lock));

    for (i = 0; i < DEFAULT_THREAD_VARY; i++)
    {
    /* 通知处在空闲状态的线程, 他们会自行终止*/
    pthread_cond_signal(&(pool->queue_not_empty));
    }
    }
    }

    return NULL;
    }

    int threadpool_destroy(threadpool_t *pool)
    {
    int i;
    if (pool == NULL)
    {
    return -1;
    }
    pool->shutdown = true;

    /*先销毁管理线程*/
    //pthread_join(pool->adjust_tid, NULL);

    for (i = 0; i < pool->live_thr_num; i++)
    {
    /*通知所有的空闲线程*/
    pthread_cond_broadcast(&(pool->queue_not_empty));
    }

    /*for (i = 0; i < pool->live_thr_num; i++)
    {
    pthread_join(pool->threads[i], NULL);
    }*/

    threadpool_free(pool);

    return 0;
    }

    int threadpool_free(threadpool_t *pool)
    {
    if (pool == NULL)
    {
    return -1;
    }

    if (pool->task_queue)
    {
    free(pool->task_queue);
    }

    if (pool->threads)
    {
    free(pool->threads);
    pthread_mutex_lock(&(pool->lock));
    pthread_mutex_destroy(&(pool->lock));
    pthread_mutex_lock(&(pool->thread_counter));
    pthread_mutex_destroy(&(pool->thread_counter));
    pthread_cond_destroy(&(pool->queue_not_empty));
    pthread_cond_destroy(&(pool->queue_not_full));
    }

    free(pool);
    pool = NULL;

    return 0;
    }

    int threadpool_all_threadnum(threadpool_t *pool)
    {
    int all_threadnum = -1;

    pthread_mutex_lock(&(pool->lock));
    all_threadnum = pool->live_thr_num;
    pthread_mutex_unlock(&(pool->lock));

    return all_threadnum;
    }

    int threadpool_busy_threadnum(threadpool_t *pool)
    {
    int busy_threadnum = -1;

    pthread_mutex_lock(&(pool->thread_counter));
    busy_threadnum = pool->busy_thr_num;
    pthread_mutex_unlock(&(pool->thread_counter));

    return busy_threadnum;
    }

    int is_thread_alive(pthread_t tid)
    {
    int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活
    if (kill_rc == ESRCH)
    {
    return false;
    }

    return true;
    }

    /*测试*/

    #if 1
    /* 线程池中的线程,模拟处理业务 */
    void *process(void *arg)
    {
    printf("thread 0x%x working on task %d\n ", (unsigned int)pthread_self(), *(int *)arg);
    sleep(1);
    printf("task %d is end\n", *(int *)arg);

    return NULL;
    }

    int main()
    {
    /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
    threadpool_t *thp = threadpool_create(3, 100, 100); /*创建线程池,池里最小3个线程,最大100,队列最大100*/
    printf("pool inited");

    //int *num = (int *)malloc(sizeof(int)*20);
    int num[20], i;
    for (i = 0; i < 20; i++)
    {
    num[i] = i;
    printf("add task %d\n", i);
    threadpool_add(thp, process, (void *)&num[i]); /* 向线程池中添加任务 */
    }

    sleep(10); /* 等子线程完成任务 */
    threadpool_destroy(thp);

    return 0;
    }

    #endif

线程池
https://lcf163.github.io/2021/07/24/线程池/
作者
乘风的小站
发布于
2021年7月24日
许可协议