>> but I want to set bDataConsumed well before the thread is actually finished
Yes, that's what you should be doing. Just copy off the data, set/signal bDataConsumed, unlock, then process the data. Otherwise, the variable should be named "bDataConsumedAndProcessed" .
>> bTimeToExit = 1;
That's unsynchronized access since bTimeToExit is a shared variable. You have to acquire the lock, set bTimeToExit, broadcast the condition, release the lock.
>> while (!bDataReady || bTimeToExit)
This was an error on my part (post #9) since it allows consumers to exit while there is still data to consume.
Here's an updated example - the way it should have been done the first time:
Code:
// compiled with: gcc -Wall -pthread -D_XOPEN_SOURCE=700 -std=c99 main.c
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <pthread.h>
#include <time.h>
pthread_cond_t DataReady = PTHREAD_COND_INITIALIZER;
int bDataReady = 0;
pthread_cond_t DataConsumed = PTHREAD_COND_INITIALIZER;
int bDataConsumed = 0;
pthread_mutex_t Lock = PTHREAD_MUTEX_INITIALIZER;
int Data;
int bTimeToExit = 0;
pthread_mutex_t IOLock = PTHREAD_MUTEX_INITIALIZER;
void sync_printf(const char *format, ...)
{
va_list marker;
va_start(marker, format);
pthread_mutex_lock(&IOLock);
vprintf(format, marker);
fflush(stdout);
pthread_mutex_unlock(&IOLock);
}//sync_printf
void* consumer(void *p)
{
int myid = (int)p;
int currentData;
while (1)
{
pthread_mutex_lock(&Lock);
while (!bDataReady)
{
if (bTimeToExit)
{
pthread_mutex_unlock(&Lock);
sync_printf("%d: Consumer Exiting\n", myid);
return NULL;
}//if
pthread_cond_wait(&DataReady, &Lock);
}//while
// "consume" the data and release the lock asap
currentData = Data;
bDataConsumed = 1;
pthread_cond_signal(&DataConsumed);
bDataReady = 0;
pthread_mutex_unlock(&Lock);
// now "process" the consumed data w/o the lock being held
sync_printf("%d: consumed %d\n", myid, currentData);
struct timespec delay = {0};
delay.tv_nsec = rand() % 10000;
nanosleep(&delay, NULL);
}//while
return NULL; // never reached
}//consumer
void* producer(void *p)
{
pthread_mutex_lock(&Lock);
for (int i = 1; i <= 30; ++i)
{
// produce i
Data = i;
bDataReady = 1;
pthread_cond_signal(&DataReady);
bDataConsumed = 0;
pthread_mutex_unlock(&Lock);
// Do any other producer work while not holding the lock
pthread_mutex_lock(&Lock);
while (!bDataConsumed)
pthread_cond_wait(&DataConsumed, &Lock);
}//for
bDataReady = 0;
pthread_mutex_unlock(&Lock);
sync_printf("Producer Exiting\n");
return NULL;
}//producer
int main()
{
const int NumConsumers = 4;
pthread_t p, c[NumConsumers];
for (int n = 0; n < NumConsumers; ++n)
pthread_create(c + n, NULL, consumer, (void*)(n + 1));
pthread_create(&p, NULL, producer, NULL);
pthread_join(p, NULL);
// producer has produced all it can
// for testing, produce one more thing and tell consumers to exit
pthread_mutex_lock(&Lock);
Data = 100;
bDataReady = 1;
bTimeToExit = 1;
pthread_cond_broadcast(&DataReady);
pthread_mutex_unlock(&Lock);
for (int n = 0; n < NumConsumers; ++n)
pthread_join(c[n], NULL);
printf("All done\n");
return 0;
}//main
Another common way of doing this is to use some kind of "container" data type (linked-list etc.) instead of a single shared variable (Data) to move data to the consumers. This allows you to eliminate the bDataConsumed condition since the container can buffer up produced data until it's ready for consumption. Then the predicate condition in the consumer becomes "while container not empty". With this method, you just have to watch out for run-away production (consumers can't keep up).
gg