使用内核链表实现线程池的接口设计
1. 基础知识
关于链表和线程的使用方法可以查看我的技术笔记:数据结构 和 系统编程 ,在这两个页面都可以找到链表和线程的一些基础知识和使用示例。
在 Linux 系统中,线程池是一种用于管理和调度线程的机制。它由一组可以复用的线程组成,这些线程被保存在线程池中,并等待分配任务。
当一个新的任务到达时,线程池会从其中选择一个线程来执行该任务。如果所有线程都正在执行任务,则新任务将被放在队列中等待,直到有线程可用。
使用线程池的优点包括:
- 可以更有效地利用 CPU 资源,因为线程可以在任务之间复用。
- 可以更容易地管理线程,因为所有线程都在线程池中。
- 可以更容易地维护线程的数量,因为可以限制线程池的大小。
线程池的实现原理可以参考这篇笔记:线程池。
内核链表的使用方法可以参考这篇笔记:内核链表。
下面我们来看看怎么通过内核链表跟线程的组合实现线程池。
2. 线程池接口设计
2.1 线程结构体
- 接口文档
原型 | struct thread |
---|---|
功能描述 | 线程节点,包含线程ID,通过内核链表连成一个线程队列,方便后续进行扩展 |
成员列表 | pthread_t id; // 线程 ID struct list_head list; // 小结构体 |
备注 | 线程队列最终是形成一条内核链表 |
- 代码实现
// 线程池中的线程
struct thread {
pthread_t id; // 线程 ID
struct list_head list; // 小结构体
};
2.2 任务结构体
- 接口文档
原型 | struct task |
---|---|
功能描述 | 任务节点,包含需要执行的函数及其参数,通过链表连成一个任务队列 |
成员列表 | void *(*func)(void *arg); void *arg; struct list_head list; |
备注 | 任务队列最终是形成一条内核链表 |
- 代码实现
// 线程池中的任务
struct task {
void *(*func)(void *); // 任务函数
void *arg; // 任务函数的参数
struct list_head list; // 小结构体
};
2.3 线程池结构体
- 接口文档
原型 | thread_pool |
---|---|
功能描述 | 包含线程池的所有信息 |
成员列表 | struct list_head threads; // 线程队列 struct list_head queue; // 任务队列 pthread_mutex_t lock; // 互斥锁 pthread_cond_t cond; // 条件变量 unsigned active_threads; // 线程队列中的线程数量 unsigned waiting_tasks; // 任务链队列中等待的任务个数 bool shutdown; // 线程池销毁标记 |
备注 | 活跃线程个数由用户自行定义,但至少包含一条活跃线程 |
- 代码实现
struct thread_pool {
struct list_head threads; // 线程队列
struct list_head queue; // 任务队列
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件变量
unsigned active_threads; // 线程队列中的线程数量
unsigned waiting_tasks; // 任务链队列中等待的任务个数
bool shutdown; // 线程池销毁标记
};
2.4 线程池初始化
- 接口文档
原型 | void thread_pool_init(struct thread_pool *pool, int num_threads) |
---|---|
功能描述 | 创建一个新的线程池,包含 num_threads 个活跃线程 |
参数 | pool: 线程池指针 num_threads: 初始活跃线程个数(大于等于1) |
返回值 | 无 |
所在头文件 | thread_pool.h |
备注 | 线程池最少线程个数为1,最大值不超过 MAX_ACTIVE_THREADS |
- 代码实现
void thread_pool_init(struct thread_pool *pool, int num_threads)
{
int i;
// 检查 num_threads 的值是否超过了最大值
if (num_threads > MAX_ACTIVE_THREADS) {
num_threads = MAX_ACTIVE_THREADS;
}
pool->waiting_tasks = 0; //初始化任务队列中等待的任务个数
pool->active_threads = num_threads; //初始化活跃线程个数
pool->shutdown = false; //初始化线程池销毁标记
INIT_LIST_HEAD(&pool->threads); // 初始化线程队列
INIT_LIST_HEAD(&pool->queue); // 初始化任务队列
// 是否初始化成功
if (!list_empty(&pool->queue) || !list_empty(&pool->threads)) {
fprintf(stderr, "init fail.\n");
exit(1);
}
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->cond, NULL);
// 创建线程池中的线程
for (i = 0; i < num_threads; i++) {
struct thread *thread = (struct thread *)malloc(sizeof(struct thread));
INIT_LIST_HEAD(&thread->list);
// 使用 pthread_create 创建线程
int ret = pthread_create(&thread->id, NULL, thread_pool_main, pool);
if (ret != 0) {
perror("create threads error");
exit(1);
}
// 将新创建的线程添加到线程池中
list_add_tail(&thread->list, &pool->threads);
}
}
2.5 投送任务
- 接口文档
原型 | void thread_pool_add_task(struct thread_pool *pool, void *(*func)(void *), void *arg) |
---|---|
功能描述 | 往线程池投送任务 |
参数 | pool: 线程池指针 func: 投送至线程池的执行例程 arg: 执行例程 func 的参数,若该执行例程不需要参数可设置为NULL |
返回值 | 无 |
所在头文件 | thread_pool.h |
备注 | 任务队列中最大任务个数为 MAX_WAITING_TASKS |
- 代码实现
void thread_pool_add_task(struct thread_pool *pool, void *(*func)(void *), void *arg)
{
// 分配内存给新任务
struct task *new_task = (struct task *) malloc(sizeof(struct task));
if(new_task == NULL) {
perror("allocate memory error");
exit(1);
}
// 初始化任务节点
new_task->func = func;
new_task->arg = arg;
// 获取锁
pthread_mutex_lock(&pool->lock);
// 超过最大任务
if(pool->waiting_tasks >= MAX_WAITING_TASKS) {
pthread_mutex_unlock(&pool->lock);
fprintf(stderr, "too many tasks.\n");
free(new_task);
exit(1);
}
// 将新任务添加到任务队列的末尾
list_add_tail(&new_task->list, &pool->queue);
// 新任务加一
pool->waiting_tasks++;
// 唤醒一个线程,让它开始执行新任务
pthread_cond_signal(&pool->cond);
// 释放锁
pthread_mutex_unlock(&pool->lock);
}
2.6 增加活跃线程
- 接口文档
原型 | void thread_pool_add_thread(struct thread_pool *pool, int num_threads) |
---|---|
功能描述 | 增加线程池中活跃线程的个数 |
参数 | pool: 需要增加线程的线程池指针 num_threads: 新增线程个数 |
返回值 | 无 |
所在头文件 | thread_pool.h |
备注 | 增加的活跃线程的数量和之前初始化的线程数量不能大于 MAX_ACTIVE_THREADS |
- 代码实现
void thread_pool_add_thread(struct thread_pool *pool, int num_threads)
{
int i;
// 检查 num_threads 的值是否超过了最大值
if (num_threads > MAX_ACTIVE_THREADS - pool->active_threads) {
num_threads = MAX_ACTIVE_THREADS - pool->active_threads;
}
// 创建新线程
for (i = 0; i < num_threads; i++) {
struct thread *thread = (struct thread *)malloc(sizeof(struct thread));
INIT_LIST_HEAD(&thread->list);
// 使用 pthread_create 创建线程
int ret = pthread_create(&thread->id, NULL, thread_pool_main, pool);
if (ret != 0) {
perror("create threads error");
exit(1);
}
// 将新创建的线程添加到线程池中
list_add_tail(&thread->list, &pool->threads);
}
// 更新线程池中的活跃线程数
pool->active_threads += num_threads;
}
2.7 删除线程
- 接口文档
原型 | void thread_pool_remove_thread(struct thread_pool *pool, int num_threads); |
---|---|
功能描述 | 删除线程池中活跃线程的个数 |
参数 | pool: 需要删除线程的线程池指针 num_threads: 要删除的线程个数,该参数设置为0时直接返回当前线程池线程总数,对线程池不造成任何其它影响 |
返回值 | 无 |
所在头文件 | thread_pool.h |
备注 | 1,线程池至少会存在1条活跃线程 2,如果被删除的线程正在执行任务,则将等待其完成任务之后删除 |
- 代码实现
void thread_pool_remove_thread(struct thread_pool *pool, int num_threads)
{
int i;
for (i = 0; i < num_threads; i++) {
// 从线程池中获取最后一个线程
struct thread *thread = list_last_entry(&pool->threads, struct thread, list);
// 使用 pthread_cancel 取消线程
int ret = pthread_cancel(thread->id);
if (ret != 0) {
fprintf(stderr, "pthread_cancel failed: %s\n", strerror(ret));
}
// 检查线程是否正在执行任务
if (pool->waiting_tasks > 0) {
// 如果线程正在执行任务,则等待它完成
pthread_join(thread->id, NULL);
}
// 从线程池中删除线程
list_del(&thread->list);
free(thread);
// 更新线程池中的线程数
pool->active_threads--;
}
}
2.8 销毁线程池
- 接口文档
原型 | void thread_pool_destroy(thread_pool *pool); |
---|---|
功能描述 | 阻塞等待所有任务完成,然后立即销毁整个线程池,释放所有资源和内存 |
参数 | pool:将要销毁的线程池 |
返回值 | 成功返回true,失败返回false |
所在头文件 | thread_pool.h |
备注 | 无 |
- 代码实现
void thread_pool_destroy(struct thread_pool *pool)
{
pool->shutdown = true; //线程池的销毁标记
pthread_cond_broadcast(&pool->cond); //唤醒所有线程
int i;
// 销毁线程池中的所有线程
for (i = 1; i <= pool->active_threads; i++) {
struct thread *thread = list_entry(pool->threads.next, struct thread, list);
// 使用 pthread_join 等待线程结束
int ret = pthread_join(thread->id, NULL);
if(ret != 0) {
printf("join id[%d] error: %s\n", i, strerror(errno));
} else {
printf("[%ld] is joined, i=%d\n", thread->id, i);
}
// 从线程池中删除已退出的线程
list_del(&thread->list);
free(thread);
}
// 重置任务队列
INIT_LIST_HEAD(&pool->queue);
// 销毁锁和条件变量
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->cond);
// 释放线程池占用的内存
// free(pool);
}
2.9 线程入口函数
- 用来执行任务的线程函数:
void *thread_pool_main(void *arg)
{
struct thread_pool *pool = (struct thread_pool *) arg;
struct task *task;
while (1) {
// 访问任务队列前加锁,为防止取消后死锁,注册处理例程 handle
pthread_cleanup_push(handler, (void *)&pool->lock); //防止死锁
// 获取锁
pthread_mutex_lock(&pool->lock);
// 如果任务队列为空,则阻塞等待
while (list_empty(&pool->queue) && !pool->shutdown) {
pthread_cond_wait(&pool->cond, &pool->lock);
}
//如果任务为空,线程池被销毁,则立即释放互斥锁并退出
if(list_empty(&pool->queue) && pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
// 将队列中的第一个任务取出
task = list_first_entry(&pool->queue, struct task, list);
list_del(&task->list);
pool->waiting_tasks--; // 任务数量减1
// 释放锁
pthread_mutex_unlock(&pool->lock);
// 任务取走,解锁,并弹栈 handle(但不执行它)
pthread_cleanup_pop(0);
//执行任务期间拒绝取消请求
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
task->func(task->arg); // 执行任务
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(task); //释放资源
}
pthread_exit(NULL);
}
- 注册死锁处理函数
static void handler(void *arg)
{
// 响应取消请求之后自动处理:释放互斥锁
pthread_mutex_unlock((pthread_mutex_t *)arg);
}
3. 使用示例
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "thread_pool.h"
// 任务1
void *task(void *arg)
{
printf("Task 1 is running...\n");
// 任务1休眠一段时间
sleep(1);
printf("Task 1 is done.\n");
//pthread_exit(NULL);
}
int main(int argc, char *argv[])
{
// 创建线程池
struct thread_pool pool;
thread_pool_init(&pool, 4);
printf("当前线程数为:%u\n", pool.active_threads);
sleep(1);
// 删除线程
thread_pool_remove_thread(&pool, 2);
printf("当前线程数为:%u\n", pool.active_threads);
sleep(1);
// 添加线程到线程池中
thread_pool_add_thread(&pool, 3);
printf("当前线程数为:%u\n", pool.active_threads);
sleep(1);
// 添加任务到线程池中
thread_pool_add_task(&pool, task, NULL);
sleep(1);
thread_pool_add_task(&pool, task, NULL);
sleep(1);
thread_pool_add_task(&pool, task, NULL);
sleep(1);
// 主线程休眠一段时间,让任务完成
sleep(1);
printf("\033[31m""---------Function: [%s]---------Line: [%d]---------\n""\033[m", __FUNCTION__, __LINE__);
// 清理线程池
thread_pool_destroy(&pool);
printf("\033[31m""---------Function: [%s]---------Line: [%d]---------\n""\033[m", __FUNCTION__, __LINE__);
return 0;
}
4. 源码
想要源码的话,不妨上我的🍍GitHub看看吧!