Skip to content

Thread Pools — In a Nutshell

Posted on:July 23, 2023 at 09:23 PM

In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. — Wikipedia

thread-pools-in-a-nutshell Photo by Steve Johnson Unsplash

If we want to execute our tasks in a multithreaded environment, like running every task in its own thread, we should create a thread for every task. (This task can be an API request, a new WebSocket connection, etc.)

But the creation and running of threads are not cheap and may induce the problem of waiting for multiple threads. This situation deducts performance.

Thread pools come as a solution to this. Thread pools contain a certain number of worker threads that run forever. (e.g. until the stop/cancellation token)

In most of the implementations, queues are used to give tasks to threads in a thread pool. Multiple threads listen for the queue to fetch tasks (so the queue should be thread-safe). A Thread takes a task, does it, and waits for the next task. (e.g. Single Producer — Multiple Consumer problem)

thread-pool Thread Pool, Image by Author

Practical Example with the POSIX Threads in C

Don’t forget to link with the POSIX threads library. ( -lpthread )

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

// Change it
#define THREAD_NUM 4
#define TASK_QUEUE_SIZE 128

typedef struct _task {
  void (*task_fn)(int, int);
  int arg1, arg2;
} task;

task task_queue[TASK_QUEUE_SIZE];
size_t task_count = 0;

pthread_mutex_t _mtx;

// Condition Variable
pthread_cond_t cond_queue;

// Executing task
void execute_task(task *task) { task->task_fn(task->arg1, task->arg2); }

void register_task(task task) {
  pthread_mutex_lock(&_mtx);
  task_queue[task_count++] = task;
  pthread_mutex_unlock(&_mtx);
  // Signal that there is a work
  // Since we are adding a single task and a single task is runned by a
  // single thread,  we just need to wake single thread up
  pthread_cond_signal(&cond_queue);
}

void *start_thread(void *args) {
  while (1) {
    task task;

    // This section should run in single thread
    pthread_mutex_lock(&_mtx);
    while (task_count == 0) {
      // Wait for condition variable
      pthread_cond_wait(&cond_queue, &_mtx);
    }
    task = task_queue[0];
    int i;
    for (i = 0; i < task_count - 1; i++) {
      task_queue[i] = task_queue[i + 1]; // Shift the queue
    }
    task_count--;

    pthread_mutex_unlock(&_mtx);
    // -----------------------------------------------------

    execute_task(&task); // Run the task
  }
}

void sum_and_product(int a, int b) {
  // Simulate a work
  usleep(50000);
  int result = a + b;
  printf("The sum of %d and %d is %d\n", a, b, result);
}

int main(int argc, char **argv) {
  pthread_t th[THREAD_NUM];
  pthread_mutex_init(&_mtx, NULL);
  pthread_cond_init(&cond_queue, NULL);

  int i;
  for (i = 0; i < THREAD_NUM; i++) {
    if (pthread_create(&th[i], NULL, &start_thread, NULL) != 0) {
      perror("Failed to create the thread");
    }
  }

  srand(time(NULL));
  for (i = 0; i < TASK_QUEUE_SIZE - 1; i++) {
    task t = {
        .task_fn = &sum_and_product,
        .arg1 = rand() % 100,
        .arg2 = rand() % 100,
    };
    register_task(t);
  }

  for (i = 0; i < THREAD_NUM; i++) {
    if (pthread_join(th[i], NULL) != 0) {
      perror("Failed to join the thread");
    }
  }
  pthread_mutex_destroy(&_mtx);
  pthread_cond_destroy(&cond_queue);

  return 0;
}

References