C++ linux线程池

你好旅行者!欢迎来到cyt的练功房。本篇文章来记录一下我之前做的一个在linux下运行的线程池的小项目,中途也碰到了一些死锁之类的小问题,以下代码都是用C写的,因为当时再写NVCC不是很了解C++,但是应该就多了一个函数指针相关的东西,还有队列用指针实现了一下,其他应该都可以用C++的思路看懂。

首先我们来定义一下线程池结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//每次减少线程数最多减少NUM个
const int NUMB = 2;
typedef struct Task
{
//函数指针 代表传入的函数功能是function,函数的参数是arg
void (*function)(void* arg);
void* arg;

}Task;
struct ThreadPool
{
//任务队列,用链表实现,减少锁住的数量,提高线程池的效率,不然就要锁住整个队列
Task* taskQ;
//任务队列最多可以存放多少任务
int queueCapacity;
//任务队列目前有存放多少任务
int queueSize;
// 队头
int queueFront;
//队尾
int queueRear;
//管理者线程id,就一个所以不需要指针
pthread_t managerID;
//工作的线程id
pthread_t* threadIDs;
//最少线程
int minNum;
int maxNum;
//正在工作的忙线程数,需要设定另一个锁,因为这个数容易被改变
int busyNum;
//存活的线程数
int liveNum;
//需要杀死的线程个数
int exitNum;
pthread_mutex_t mutexPool;//互斥锁 锁整个线程池
pthread_mutex_t mutexBusy;//互斥锁 锁在忙的任务数。
int shutdown;
//条件变量 判断任务队列是否空或者是否满
pthread_cond_t notFull;
pthread_cond_t notEmpty;
};

第一次写不出那么全面的线程池结构体也无所谓,毕竟都是一点点加上去的,我这边就只希望我自己能看懂了。

创建和摧毁线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
//创建线程池
typedef struct ThreadPool ThreadPool;
//定义线程池的最少有多少线程,最多有多少线程。
ThreadPool* threadPoolInit(int min, int max, int queueSize) {
//为malloc分配空间,可以用new 无所谓,结构体改成class就行了
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
//do while 呢表示线程池的初始化只进行一次
do {
printf("initialize begin\n");
if (pool == NULL) {
printf("malloc failed");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
// 赋值想必可以看懂
pool->maxNum = max;
pool->minNum = min;
pool->liveNum = min;
pool->busyNum = 0;
pool->exitNum = 0;
// 如果某一个锁或者条件变量创建失败就break
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or cond failed");
break;
}
// 一般pthread_t的栈大小是1kb哟
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//管理者线程
//此函数具体用法可自行百度,manager 是该线程需要执行的函数方法,可以在下文自行定义,可按照我的,最后一个是传入给manager的参数
pthread_create(&pool->managerID, NULL, manager, pool);
//工作者线程
//此函数具体用法可自行百度,worker 是该线程需要执行的函数方法,可以在下文自行定义,可按照我的,最后一个是传入给worker的参数
for (int i = 0; i < min; ++i) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
printf("initialize complete\n");
//初始化完毕
return pool;
} while (0);
if (pool->threadIDs && pool) {
free(pool->threadIDs);
}
if (pool && pool->taskQ) {
free(pool->taskQ);
}
free(pool);
return NULL;
}
//摧毁线程池
int threadDestory(ThreadPool* pool)
{
if (!pool)return -1;
//设置关闭位为1,让工作线程自杀,所以会有下文的唤醒消费者
pool->shutdown = 1;
//阻塞回收管理者
pthread_join(pool->managerID, NULL);
//唤醒消费者
for (int i = 0; i < pool->liveNum; ++i) {
pthread_cond_signal(&pool->notEmpty);
}
//释放堆内存
if (pool->taskQ) {
free(pool->taskQ);
}
if (pool->threadIDs) {
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexBusy);
pthread_mutex_destroy(&pool->mutexPool);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}

写消费者和管理者的工作用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//工作线程对应上文的worker函数arg肯定是pool啦
void* worker(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
//工作用线程要不断循环进行
while (1) {
//每次取任务只能先锁住整个线程池不然会形成多个工作线程做同一件任务
pthread_mutex_lock(&pool->mutexPool);
//当前任务队列是否为空,是否没有任务可以做了
while (pool->queueSize == 0 && !pool->shutdown) {
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
//判断是否需要自毁,在manager中可以设定
if (pool->exitNum > 0) {
pool->exitNum--;
// 如果比最小线程数大才销毁
if (pool->liveNum > pool->minNum) {
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(NULL);
}
}

}
//判断线程池是否关闭
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
Task task;
//取出队头的工作函数
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
//双向队列,对头那个出了
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
//唤醒生产者因为又可以加任务了
pthread_cond_signal(&pool->notFull);
//解锁咯和上面那个对应
pthread_mutex_unlock(&pool->mutexPool);
//这个代表开始工作了,因为busynum很容易变化就给他单独搞个锁
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
//运行一手
task.function(task.arg);
free(task.arg);
task.arg = NULL;
//执行结束就释放掉
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
//管理者线程
void* manager(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown) {
//每隔2s检测一次
sleep(2);
pthread_mutex_lock(&pool->mutexPool);
int queue = pool->queueSize;
int live = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
//取出忙的线程数量
pthread_mutex_lock(&pool->mutexBusy);
int busy = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
//添加线程
//任务个数>存货线程 就增加线程
if (queue > live && live < pool->maxNum) {
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && live < pool->maxNum && counter < NUMB; ++i) {
if (pool->threadIDs[i] == 0) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
//销毁线程我定义的是线程池中的线程数大于需要的任务的两倍
if (busy * 2 < live && live > pool->minNum) {
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMB;
pthread_mutex_unlock(&pool->mutexPool);
//让线程自己关掉具体表现为型几个,然后走入worker中的while
for (int i = 0; i < NUMB; ++i) {
pthread_cond_signal(&pool->notEmpty);
}
}
}
}

增加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
//阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
return;
}
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);

}

看一下live和work有几个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int getWorkNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int work = pool->busyNum;
pthread_mutex_unlock(&pool->mutexPool);
return work;
}
int getLiveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int live = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return live;
}

线程退出

1
2
3
4
5
6
7
8
9
10
11
void threadExit(ThreadPool* pool) {
pthread_t tid = pthread_self();
//找到自己这个线程的位置并置零
for (int i = 0; i < pool->maxNum; ++i) {
if (pool->threadIDs[i] == tid) {
pool->threadIDs[i] = 0;
break;
}
}
pthread_exit(NULL);
}

以上就完成了一个线程池的编写啦,无敌!!!!

使用搜索:谷歌必应百度