/* From Learning C by example: Producer-consumer problem with semaphores https://jlmedina123.wordpress.com/2014/04/08/255/ */ #include #include #include #include #include #define SIZE 5 #define NUMB_THREADS 6 #define PRODUCER_LOOPS 2 typedef int buffer_t; buffer_t buffer[SIZE]; int buffer_index; pthread_mutex_t buffer_mutex; /* initially buffer will be empty. full_sem will be initialized to buffer SIZE, which means SIZE number of producer threads can write to it. And empty_sem will be initialized to 0, so no consumer can read from buffer until a producer thread posts to empty_sem */ sem_t full_sem; /* when 0, buffer is full */ sem_t empty_sem; /* when 0, buffer is empty. Kind of like an index for the buffer */ /* sem_post algorithm: mutex_lock sem_t->mutex sem_t->value++ mutex_unlock sem_t->mutex sem_wait algorithn: mutex_lock sem_t->mutex while (sem_t->value > 0) { mutex_unlock sem_t->mutex sleep... wake up mutex_lock sem_t->mutex } sem_t->value-- mutex_unlock sem_t->mutex */ void insertbuffer(buffer_t value) { if (buffer_index < SIZE) { buffer[buffer_index++] = value; } else { printf("Buffer overflow\n"); } } buffer_t dequeuebuffer() { if (buffer_index > 0) { return buffer[--buffer_index]; // buffer_index-- would be error! } else { printf("Buffer underflow\n"); } return 0; } void *producer(void *thread_n) { int thread_numb = *(int *)thread_n; buffer_t value; int i=0; unsigned int seed; time_t now=time(NULL); seed = (unsigned int)now; while (i++ < PRODUCER_LOOPS) { sleep(rand_r(&seed) % 10); value = rand_r(&seed) % 100; sem_wait(&full_sem); // sem=0: wait. sem>0: go and decrement it /* possible race condition here. After this thread wakes up, another thread could aqcuire mutex before this one, and add to list. Then the list would be full again and when this thread tried to insert to buffer there would be a buffer overflow error */ pthread_mutex_lock(&buffer_mutex); /* protecting critical section */ insertbuffer(value); pthread_mutex_unlock(&buffer_mutex); sem_post(&empty_sem); // post (increment) emptybuffer semaphore printf("Producer %d added %d to buffer\n", thread_numb, value); } pthread_exit(0); } void *consumer(void *thread_n) { int thread_numb = *(int *)thread_n; buffer_t value; int i=0; while (i++ < PRODUCER_LOOPS) { sem_wait(&empty_sem); /* there could be race condition here, that could cause buffer underflow error */ pthread_mutex_lock(&buffer_mutex); value = dequeuebuffer(value); pthread_mutex_unlock(&buffer_mutex); sem_post(&full_sem); // post (increment) fullbuffer semaphore printf("Consumer %d dequeue %d from buffer\n", thread_numb, value); } pthread_exit(0); } int main(int argc, int **argv) { buffer_index = 0; pthread_mutex_init(&buffer_mutex, NULL); sem_init(&full_sem, // sem_t *sem 0, // int pshared. 0 = shared between threads of process, 1 = shared between processes SIZE); // unsigned int value. Initial value sem_init(&empty_sem, 0, 0); /* full_sem is initialized to buffer size because SIZE number of producers can add one element to buffer each. They will wait semaphore each time, which will decrement semaphore value. empty_sem is initialized to 0, because buffer starts empty and consumer cannot take any element from it. They will have to wait until producer posts to that semaphore (increments semaphore value) */ pthread_t thread[NUMB_THREADS]; int thread_numb[NUMB_THREADS]; int i; for (i = 0; i < NUMB_THREADS; ) { thread_numb[i] = i; pthread_create(thread + i, // pthread_t *t NULL, // const pthread_attr_t *attr producer, // void *(*start_routine) (void *) thread_numb + i); // void *arg i++; thread_numb[i] = i; // playing a bit with thread and thread_numb pointers... pthread_create(&thread[i], // pthread_t *t NULL, // const pthread_attr_t *attr consumer, // void *(*start_routine) (void *) &thread_numb[i]); // void *arg i++; } for (i = 0; i < NUMB_THREADS; i++) pthread_join(thread[i], NULL); pthread_mutex_destroy(&buffer_mutex); sem_destroy(&full_sem); sem_destroy(&empty_sem); return 0; }