线程池
基本功能模块:
- 线程池创建函数
- 线程池删除函数
- 线程池回调函数
- 线程池添加函数
- 线程池数据结构
- 线程任务数据结构
- 线程本身数据结构(由pid唯一确认)
首先实现数据结构:
线程任务数据结构:
struct nTask {
void (*task_func)(struct nTask *task);
void *user_data;
struct nTask *prev;
struct nTask *next;
};
这是任务中的一个个体,任务队列头存储在线程池数据结构中 void (*task_func)(struct nTask *task)函数指针表明函数为task_func且参数为struct nTask, 参数若为void是否更好
线程本身数据结构:
struct nWorker {
pthread_t threadid;
int terminate;
struct nManager *manager;
struct nWorker *prev;
struct nWorker *next;
};
pid唯一标识线程,terminate用于标识该线程应被删除,存储manager(也就是所属线程池)是为了通过manager找到task队列以获取task
线程池数据结构:
typedef struct nManager {
struct nTask *tasks;
struct nWorker *workers;
pthread_mutex_t mutex;
pthread_cond_t cond;
} ThreadPool;
可以看到线程池其实只是一个管理者,使用mutex控制各个线程对进程内公共资源的访问,保证同时只有一个线程在访问公共资源,cond来控制各个线程的状态(处于等待队列(阻塞)或可以运行(运行、就绪态))细节在回调函数中
然后实现API:
线程池创建函数:
int nThreadPoolCreate(ThreadPool *pool, int numWorkers) {
if (pool == NULL) return -1;
if (numWorkers < 1) numWorkers = 1;
memset(pool, 0, sizeof(ThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
//pthread_mutex_init(&pool->mutex, NULL);
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
int i = 0;
for (i = 0;i < numWorkers;i ++) {
struct nWorker *worker = (struct nWorker*)malloc(sizeof(struct nWorker));
if (worker == NULL) {
perror("malloc");
return -2;
}
memset(worker, 0, sizeof(struct nWorker));
worker->manager = pool; //
int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
if (ret) {
perror("pthread_create");
free(worker);
return -3;
}
LIST_INSERT(worker, pool->workers);
}
// success
return 0;
}
根据传入线程数量参数,创建含有指定数量线程的线程池,初始化条件变量和互斥量,初始化线程本身然后放入队列
线程池回调函数:
static void *nThreadPoolCallback(void *arg) {
struct nWorker *worker = (struct nWorker*)arg;
while (1) {
pthread_mutex_lock(&worker->manager->mutex);
while (worker->manager->tasks == NULL) {
if (worker->terminate) break;
pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
}
if (worker->terminate) {
pthread_mutex_unlock(&worker->manager->mutex);
break;
}
struct nTask *task = worker->manager->tasks;
LIST_REMOVE(task, worker->manager->tasks);
pthread_mutex_unlock(&worker->manager->mutex);
task->task_func(task); //go task
}
free(worker);
}
线程在创建并将其pid放入队列中后就运行回调函数,通过回调函数可以看到线程会阻塞在mutex上也可能阻塞在cond上 pthread_cond_wait函数使用两个参数: cond和mutex 这个函数等待在cond上并在收到signal或broadcast后返回,使主函数继续运行 在函数等待的时候,首先将该线程放到等待队列上然后释放mutex,这样可以保证其他线程对公共资源的访问 在收到cond的single或broadcast后线程会争夺mutex锁住临界区资源,然后自己消费,消费完后释放互斥锁 使用while循环可以保证在有资源到来的时候也就是signal cond的时候,速度慢的线程(没有抢到互斥锁的线程)可以发现资源已经被消耗完并重新通过pthread_cond_wait进入等待区
可以看到只有在对临界区资源的访问中才加锁:访问任务队列并从中获取任务
线程池添加函数:
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) {
pthread_mutex_lock(&pool->mutex);
LIST_INSERT(task, pool->tasks);
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
}
添加后通知整个线程队列,让他们消费
线程池删除函数:
int nThreadPoolDestory(ThreadPool *pool, int nWorker) {
struct nWorker *worker = NULL;
for (worker = pool->workers;worker != NULL;worker = worker->next) {
worker->terminate;
}
pthread_mutex_lock(&pool->mutex);
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
pool->workers = NULL;
pool->tasks = NULL;
return 0;
}
设置删除位terminate之后唤醒所有等待队列中的线程叫他们检查自己的删除位terminate 如果要删除该线程就退出while循环然后释放worker再退出
附一个使用代码:
#if 1
#define THREADPOOL_INIT_COUNT 20
#define TASK_INIT_SIZE 1000
void task_entry(struct nTask *task) { //type
//struct nTask *task = (struct nTask*)task;
int idx = *(int *)task->user_data;
printf("idx: %d\n", idx);
free(task->user_data);
free(task);
}
int main(void) {
ThreadPool pool = {0};
nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
// pool --> memset();
int i = 0;
for (i = 0;i < TASK_INIT_SIZE;i ++) {
struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
if (task == NULL) {
perror("malloc");
exit(1);
}
memset(task, 0, sizeof(struct nTask));
task->task_func = task_entry;
task->user_data = malloc(sizeof(int));
*(int*)task->user_data = i;
nThreadPoolPushTask(&pool, task);
}
getchar();
}
#endif