多线程编程已经是现在网络编程中常用的编程技术,设计一个良好的线程池库显得尤为重要。在 UNIX(WIN32下可以采用类似的方法,acl 库中的线程池是跨平台的) 环境下设计线程池库主要是如何用好如下系统 API:
1、pthread_cond_signal/pthread_cond_broadcast:生产者线程通知线程池中的某个或一些消费者线程池,接收处理任务;
2、pthread_cond_wait:线程池中的消费者线程等待线程条件变量被通知;
3、pthread_mutex_lock/pthread_mutex_unlock:线程互斥锁的加锁及解锁函数。
下面的代码示例是大家常见的线程池的设计方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 struct thread_job { struct thread_job *next ; void (*func)(void *); void *arg; ... };struct thread_pool { int max_threads; int curr_threads; int idle_threads; pthread_mutex_t mutex; pthread_cond_t cond; thread_job *first; thread_job *last; ... }static void *consumer_thread (void *arg) { struct thread_pool *pool = (struct thread_pool*) arg; struct thread_job *job ; int status; pthread_mutex_lock(&pool->mutex); while (1 ) { if (pool->first != NULL ) { job = pool->first; pool->first = job->next; if (pool->last == job) pool->last = NULL ; pthread_mutex_unlock(&pool->mutex); job->func(job->arg); free (job); pthread_mutex_lock(&pool->mutex); } else { pool->idle_threads++; status = pthread_cond_wait(&pool->cond, &pool->mutex); pool->idle_threads--; if (status == 0 ) continue ; pool->curr_threads--; pthread_mutex_unlock(&pool->mutex); break ; } } return NULL ; }void add_thread_job (struct thread_pool *pool, void (*func)(void *), void *arg) { struct thread_job *job = (struct thread_job*) calloc (1 , sizeof (*job)); job->func = func; job->arg = arg; pthread_mutex_lock(&pool->mutex); if (pool->first == NULL ) pool->first = job; else pool->last->next = job; pool->last = job; job->next = NULL ; if (pool->idle_threads > 0 ) { pthread_mutex_unlock(&pool->mutex); pthread_cond_signal(&pool->cond); } else if (pool->curr_threads < pool->max_threads) { pthread_t id; pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (pthread_create(&id, &attr, consumer_thread, pool) == 0 ) pool->curr_threads++; pthread_mutex_unlock(&pool->mutex); pthread_attr_destroy(&attr); } }struct thread_pool *create_thread_pool (int max_threads) { struct thread_pool *pool = (struct thread_pool*) calloc (1 , sizeof (*pool)); pool->max_threads = max_threads; pthread_mutex_init(&pool->mutex); pthread_cond_init(&pool->cond); ... return pool; }static void thread_callback (void * arg) { ... }void test (void ) { struct thread_pool *pool = create_thread_pool(100 ); int i; for (i = 0 ; i < 1000000 ; i++) add_thread_job(pool, thread_callback, NULL ); }
乍一看去,似乎也没有什么问题,象很多经典的开源代码中也是这样设计的,但有一个重要问题被忽视了:线程池设计中的惊群现象。大家可以看到,整个线程池只有一个线程条件变量和线程互斥锁,生产者线程和消费者线程(即线程池中的子线程)正是通过这两个变量进行同步的。生产者线程每添加一个新任务,都会调用 pthread_cond_signal 一次,由操作系统唤醒一个在线程条件变量等待的消费者线程,但如果查看 pthread_cond_signal API 的系统帮助,你会发现其中有一句话:调用此函数后,系统会唤醒在相同条件变量上等待的一个或多个线程。而正是这句模棱两可的话没有引起很多线程池设计者的注意,这也是整个线程池中消费者线程收到信号通知后产生惊群现象的根源所在,并且是消费者线程数量越多,惊群现象越严重—-意味着 CPU 占用越高,线程池的调度性能越低。
要想避免如上线程池设计中的惊群问题,在仍然共用一个线程互斥锁的条件下,给每一个消费者线程创建一个线程条件变量,生产者线程在添加任务时,找到空闲的消费者线程,将任务置入该消费者的任务队列中同时只通知 (pthread_cond_signal) 该消费者的线程条件变量,消费者线程与生产者线程虽然共用相同的线程互斥锁(因为有全局资源及调用 pthread_cond_wait 所需),但线程条件变量的通知过程却是定向通知的,未被通知的消费者线程不会被唤醒,这样惊群现象也就不会产生了。
当然,还有一些设计上的细节需要注意,比如:当没有空闲消费者线程时,需要将任务添加进线程池的全局任务队列中,消费者线程处理完自己的任务后需要查看一下线程池中的全局任务队列中是否还有未处理的任务。
更多的线程池的设计细节请参考 acl (https://sourceforge.net/projects/acl/ ) 库中 lib_acl/src/thread/acl_pthread_pool.c 中的代码。
github:https://github.com/acl-dev/acl gitee: https://github.com/acl-dev/acl