In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. — Wikipedia
Photo by Steve Johnson on Unsplash
If we want to run tasks concurrently, the most direct approach is to create a new thread for every task. (A task can be an API request, a new WebSocket connection, etc.)
But creating and destroying threads is not cheap, and when there are many more threads than CPU cores, the threads spend much of their time waiting to be scheduled. Both effects degrade performance.
Thread pools are a solution to this. A thread pool contains a fixed number of worker threads that stay alive and keep running until they are told to stop (e.g. through a stop flag or cancellation token).
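To make the "run until told to stop" idea concrete, here is a minimal sketch of worker threads that loop until a shared stop flag is set. The names `stop_requested`, `worker_loop`, `run_stop_demo`, and `DEMO_WORKERS` are made up for this illustration, and the short sleep only stands in for real work; it is one possible approach, not the only way to signal a stop.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <unistd.h>

#define DEMO_WORKERS 3 // hypothetical pool size for this sketch

static atomic_bool stop_requested; // the "stop token" (assumed name)
static atomic_int exited;          // how many workers left their loop

static void *worker_loop(void *arg) {
  (void)arg;
  // Keep working until someone asks us to stop
  while (!atomic_load(&stop_requested)) {
    usleep(1000); // stand-in for real work
  }
  atomic_fetch_add(&exited, 1);
  return NULL;
}

// Starts the workers, lets them run briefly, requests a stop and joins them.
// Returns the number of workers that left their loop.
int run_stop_demo(void) {
  pthread_t th[DEMO_WORKERS];
  int started = 0;
  atomic_store(&stop_requested, false);
  atomic_store(&exited, 0);
  for (int i = 0; i < DEMO_WORKERS; i++) {
    if (pthread_create(&th[started], NULL, worker_loop, NULL) == 0) {
      started++;
    }
  }
  usleep(20000);                       // let the workers run for about 20 ms
  atomic_store(&stop_requested, true); // request the stop
  for (int i = 0; i < started; i++) {
    pthread_join(th[i], NULL); // returns because every loop has ended
  }
  return atomic_load(&exited);
}
```

Because the workers poll the flag, they notice the stop request within one loop iteration. A worker that sleeps on a condition variable instead would also need to be woken up when the flag changes.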
In most implementations, a queue is used to hand tasks to the threads in the pool. Multiple threads wait on the queue to fetch tasks, so the queue must be thread-safe. A thread takes a task, runs it, and then waits for the next one. (This is an instance of the single-producer, multiple-consumer problem.)
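As a rough illustration of what "thread-safe" means here, the sketch below guards a small fixed-size queue of integers with a mutex, so that only one thread at a time can change it. The type `int_queue`, its functions, and `QUEUE_CAPACITY` are hypothetical names chosen for this example; a real pool would store tasks rather than integers and would usually let idle workers sleep instead of returning immediately.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAPACITY 8 // hypothetical capacity for this sketch

typedef struct {
  int items[QUEUE_CAPACITY];
  size_t head;  // index of the oldest item
  size_t count; // number of stored items
  pthread_mutex_t lock;
} int_queue;

void int_queue_init(int_queue *q) {
  q->head = 0;
  q->count = 0;
  pthread_mutex_init(&q->lock, NULL);
}

// Appends value at the back; returns false if the queue is full.
bool int_queue_push(int_queue *q, int value) {
  bool ok = false;
  pthread_mutex_lock(&q->lock);
  if (q->count < QUEUE_CAPACITY) {
    q->items[(q->head + q->count) % QUEUE_CAPACITY] = value;
    q->count++;
    ok = true;
  }
  pthread_mutex_unlock(&q->lock);
  return ok;
}

// Removes the oldest item into *out; returns false if the queue is empty.
bool int_queue_try_pop(int_queue *q, int *out) {
  bool ok = false;
  pthread_mutex_lock(&q->lock);
  if (q->count > 0) {
    *out = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAPACITY;
    q->count--;
    ok = true;
  }
  pthread_mutex_unlock(&q->lock);
  return ok;
}
```

Every read or write of `head` and `count` happens between `pthread_mutex_lock` and `pthread_mutex_unlock`, which is what keeps two consumers from taking the same item. Moving `head` forward instead of shifting the array also keeps each pop at constant cost.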
Thread Pool, Image by Author
Practical Example with POSIX Threads in C
Don’t forget to link with the POSIX threads library (-lpthread).
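#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

// Number of worker threads; tune this for your machine and workload
#define THREAD_NUM 4
#define TASK_QUEUE_SIZE 128

typedef struct _task {
  void (*task_fn)(int, int);
  int arg1, arg2;
} task;

// Shared state, protected by _mtx
task task_queue[TASK_QUEUE_SIZE];
size_t task_count = 0;
bool stop = false; // Set once no more tasks will be registered

pthread_mutex_t _mtx;
// Condition variable: signaled when a task is added or the pool is stopped
pthread_cond_t cond_queue;

// Run a task with its stored arguments
void execute_task(task *task) { task->task_fn(task->arg1, task->arg2); }

// Add a task to the queue. Returns 0 on success, -1 if the queue is full.
int register_task(task task) {
  pthread_mutex_lock(&_mtx);
  if (task_count == TASK_QUEUE_SIZE) {
    pthread_mutex_unlock(&_mtx);
    return -1;
  }
  task_queue[task_count++] = task;
  pthread_mutex_unlock(&_mtx);
  // Signal that there is work to do.
  // We add a single task and a single task is run by a single thread,
  // so waking up one thread is enough.
  pthread_cond_signal(&cond_queue);
  return 0;
}

void *start_thread(void *args) {
  (void)args; // Unused
  while (1) {
    task task;
    // Critical section: only one thread at a time may touch the queue
    pthread_mutex_lock(&_mtx);
    while (task_count == 0 && !stop) {
      // Releases the mutex while waiting and reacquires it on wake-up
      pthread_cond_wait(&cond_queue, &_mtx);
    }
    if (task_count == 0) {
      // Stop was requested and the queue is drained: leave the loop
      pthread_mutex_unlock(&_mtx);
      break;
    }
    task = task_queue[0];
    for (size_t i = 0; i + 1 < task_count; i++) {
      task_queue[i] = task_queue[i + 1]; // Shift the queue
    }
    task_count--;
    pthread_mutex_unlock(&_mtx);
    // -----------------------------------------------------
    execute_task(&task); // Run the task outside the lock
  }
  return NULL;
}

void sum_and_product(int a, int b) {
  // Simulate some work
  usleep(50000);
  int sum = a + b;
  int product = a * b;
  printf("The sum of %d and %d is %d, and the product is %d\n", a, b, sum,
         product);
}

int main(int argc, char **argv) {
  (void)argc;
  (void)argv;
  pthread_t th[THREAD_NUM];
  pthread_mutex_init(&_mtx, NULL);
  pthread_cond_init(&cond_queue, NULL);
  int i, rc;
  for (i = 0; i < THREAD_NUM; i++) {
    // pthread functions return the error code instead of setting errno
    rc = pthread_create(&th[i], NULL, &start_thread, NULL);
    if (rc != 0) {
      fprintf(stderr, "Failed to create the thread: %s\n", strerror(rc));
      return EXIT_FAILURE;
    }
  }
  srand(time(NULL));
  for (i = 0; i < TASK_QUEUE_SIZE - 1; i++) {
    task t = {
        .task_fn = &sum_and_product,
        .arg1 = rand() % 100,
        .arg2 = rand() % 100,
    };
    if (register_task(t) != 0) {
      fprintf(stderr, "Task queue is full, dropping the task\n");
    }
  }
  // No more tasks: tell the workers to exit once the queue is empty.
  // Without this, the workers wait forever and pthread_join never returns.
  pthread_mutex_lock(&_mtx);
  stop = true;
  pthread_mutex_unlock(&_mtx);
  pthread_cond_broadcast(&cond_queue); // Wake up every waiting worker
  for (i = 0; i < THREAD_NUM; i++) {
    rc = pthread_join(th[i], NULL);
    if (rc != 0) {
      fprintf(stderr, "Failed to join the thread: %s\n", strerror(rc));
    }
  }
  pthread_mutex_destroy(&_mtx);
  pthread_cond_destroy(&cond_queue);
  return 0;
}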
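The task struct in the listing above is tied to functions that take two int arguments. One common way to carry arbitrary arguments, sketched here under the assumption that the caller owns the argument memory, is to store a function pointer together with a `void *`. The names `generic_task`, `add_args`, `add_task`, and `run_generic_task` are hypothetical and exist only for this example.

```c
// A task that can carry any argument type behind a void pointer
typedef struct {
  void (*fn)(void *); // work to run
  void *arg;          // caller-owned argument, passed to fn unchanged
} generic_task;

// Hypothetical argument bundle for one kind of task
typedef struct {
  int a, b;
  int result; // filled in by the task
} add_args;

void add_task(void *arg) {
  add_args *p = arg; // recover the concrete type
  p->result = p->a + p->b;
}

// What a worker would call after taking the task from the queue
void run_generic_task(const generic_task *t) { t->fn(t->arg); }
```

With this layout the queue and the workers never need to know what a task does. The price is that whatever `arg` points to must stay valid until a worker has finished running the task, so pointing it at a local variable that goes out of scope early would be a bug.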