Back
Featured image of post 线程池

线程池

基础知识

线程池

基本功能模块:

  1. 线程池创建函数
  2. 线程池删除函数
  3. 线程池回调函数
  4. 线程池添加函数
  5. 线程池数据结构
  6. 线程任务数据结构
  7. 线程本身数据结构(由pid唯一确认)

首先实现数据结构:

线程任务数据结构:

struct nTask {
	void (*task_func)(struct nTask *task);
	void *user_data;

	struct nTask *prev;
	struct nTask *next;
};

这是任务中的一个个体,任务队列头存储在线程池数据结构中 void (*task_func)(struct nTask *task)函数指针表明函数为task_func且参数为struct nTask, 参数若为void是否更好

线程本身数据结构:

struct nWorker {
	pthread_t threadid;
	int terminate;
	struct nManager *manager;

	struct nWorker *prev;
	struct nWorker *next;
};

pid唯一标识线程,terminate用于标识该线程应被删除,存储manager(也就是所属线程池)是为了通过manager找到task队列以获取task

线程池数据结构:

typedef struct nManager {
	struct nTask *tasks;
	struct nWorker *workers;

	pthread_mutex_t mutex;
	pthread_cond_t cond;
} ThreadPool;

可以看到线程池其实只是一个管理者,使用mutex控制各个线程对进程内公共资源的访问,保证同时只有一个线程在访问公共资源,cond来控制各个线程的状态(处于等待队列(阻塞)或可以运行(运行、就绪态))细节在回调函数中

然后实现API:

线程池创建函数:

int nThreadPoolCreate(ThreadPool *pool, int numWorkers) {

	if (pool == NULL) return -1;
	if (numWorkers < 1) numWorkers = 1;
	memset(pool, 0, sizeof(ThreadPool));

	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));

	//pthread_mutex_init(&pool->mutex, NULL);
	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
	

	int i = 0;
	for (i = 0;i < numWorkers;i ++) {
		struct nWorker *worker = (struct nWorker*)malloc(sizeof(struct nWorker));
		if (worker == NULL) {
			perror("malloc");
			return -2;
		}
		memset(worker, 0, sizeof(struct nWorker));
		worker->manager = pool; //

		int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
		if (ret) {
			perror("pthread_create");
			free(worker);
			return -3;
		}
		
		LIST_INSERT(worker, pool->workers);
	}

	// success
	return 0; 

}

根据传入线程数量参数,创建含有指定数量线程的线程池,初始化条件变量和互斥量,初始化线程本身然后放入队列

线程池回调函数:

static void *nThreadPoolCallback(void *arg) {

	struct nWorker *worker = (struct nWorker*)arg;

	while (1) {

		pthread_mutex_lock(&worker->manager->mutex);
		while (worker->manager->tasks == NULL) {
			if (worker->terminate) break;
			pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
		}
		if (worker->terminate) {
			pthread_mutex_unlock(&worker->manager->mutex);
			break;
		}

		struct nTask *task = worker->manager->tasks;
		LIST_REMOVE(task, worker->manager->tasks);

		pthread_mutex_unlock(&worker->manager->mutex);

		task->task_func(task); //go task
	}

	free(worker);
	
}

线程在创建并将其pid放入队列中后就运行回调函数,通过回调函数可以看到线程会阻塞在mutex上也可能阻塞在cond上 pthread_cond_wait函数使用两个参数: cond和mutex 这个函数等待在cond上并在收到signal或broadcast后返回,使主函数继续运行 在函数等待的时候,首先将该线程放到等待队列上然后释放mutex,这样可以保证其他线程对公共资源的访问 在收到cond的single或broadcast后线程会争夺mutex锁住临界区资源,然后自己消费,消费完后释放互斥锁 使用while循环可以保证在有资源到来的时候也就是signal cond的时候,速度慢的线程(没有抢到互斥锁的线程)可以发现资源已经被消耗完并重新通过pthread_cond_wait进入等待区

可以看到只有在对临界区资源的访问中才加锁:访问任务队列并从中获取任务

线程池添加函数:

int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) {

	pthread_mutex_lock(&pool->mutex);

	LIST_INSERT(task, pool->tasks);

	pthread_cond_signal(&pool->cond);

	pthread_mutex_unlock(&pool->mutex);

}

添加后通知整个线程队列,让他们消费

线程池删除函数:

int nThreadPoolDestory(ThreadPool *pool, int nWorker) {

	struct nWorker *worker = NULL;

	for (worker = pool->workers;worker != NULL;worker = worker->next) {
		worker->terminate;
	}

	pthread_mutex_lock(&pool->mutex);

	pthread_cond_broadcast(&pool->cond);

	pthread_mutex_unlock(&pool->mutex);

	pool->workers = NULL;
	pool->tasks = NULL;

	return 0;
	
}

设置删除位terminate之后唤醒所有等待队列中的线程叫他们检查自己的删除位terminate 如果要删除该线程就退出while循环然后释放worker再退出

附一个使用代码:

#if 1

#define THREADPOOL_INIT_COUNT	20
#define TASK_INIT_SIZE			1000


void task_entry(struct nTask *task) { //type 

	//struct nTask *task = (struct nTask*)task;
	int idx = *(int *)task->user_data;

	printf("idx: %d\n", idx);

	free(task->user_data);
	free(task);
}


int main(void) {

	ThreadPool pool = {0};
	
	nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
	// pool --> memset();
	
	int i = 0;
	for (i = 0;i < TASK_INIT_SIZE;i ++) {
		struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
		if (task == NULL) {
			perror("malloc");
			exit(1);
		}
		memset(task, 0, sizeof(struct nTask));

		task->task_func = task_entry;
		task->user_data = malloc(sizeof(int));
		*(int*)task->user_data  = i;

		
		nThreadPoolPushTask(&pool, task);
	}

	getchar();
	
}


#endif
comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy