Hello I am currently working on the producer consumer problem using circular buffers and various amounts of consumers/producers that are supposed to produce MAX_ITEM and then stop. I somewhere in my code am hitting some kind of lock and cannot find where I need to fix my code. Any tips would be appreciated.
Thanks
Kurjack
Code:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define MAX_ITEM 5
#define BUFFER_SIZE 3
#define NUM_BUFFERS 2
#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 2
void *producerProcess(void *ptr);
void *consumerProcess(void *ptr);
int circularQueue_isEmpty(int queue);
int circularQueue_isFull(int queue);
void printBuffer(int selectedBuffer);
int condpflag[NUM_PRODUCERS];
int condcflag[NUM_CONSUMERS];
pthread_cond_t condc[NUM_CONSUMERS];
pthread_cond_t condp[NUM_PRODUCERS];
typedef struct{
int buffer[BUFFER_SIZE];
int in, out;
//pthread_mutex_t mutex;// = PTHREAD_MUTEX_INITIALIZER;
}circularQueue;
circularQueue CQ[NUM_BUFFERS];
pthread_mutex_t mutex[NUM_BUFFERS];
int main() {
pthread_t Producer[NUM_PRODUCERS];
pthread_t Consumer[NUM_CONSUMERS];
int i;
int status;
//pthread_attr_t attr;
//pthread_attr_init(&attr);
for(i = 0; i < NUM_CONSUMERS; i++)
{
pthread_cond_init(&condc[i], 0);
pthread_cond_init(&condp[i], 0);
}
//INITIALIZE QUEUES
for(i = 0; i < NUM_BUFFERS; i++)
{
pthread_mutex_init(&mutex[i], 0);
CQ[i].in = 0;
CQ[i].out = 0;
}
/*CREATE PRODUCERS THREADS*/
for(i = 0; i < NUM_PRODUCERS; i++)
{
condpflag[i] = 0;
status = pthread_create(&Producer[i], NULL, producerProcess, (void*) i);
if(status != 0)
{
printf("I producer thread %d could not be created", i);
exit(1);
}
//pthread_join(Producer[i], NULL);
}
/*CREATE CONSUMER THREADS*/
for(i = 0; i < NUM_CONSUMERS; i++)
{
condcflag[i] = 0;
status = pthread_create(&Consumer[i], NULL, consumerProcess, (void*) i);
if(status != 0)
{
printf("I consumer thread %d could not be created", i);
exit(1);
}
//pthread_join(Consumer[i], NULL);
}
/*for(i = 0; i < NUM_PRODUCERS; i++)
pthread_join(Producer[i], NULL);
for(i = 0; i < NUM_CONSUMERS; i++)
pthread_join(Consumer[i], NULL);*/
for(i = 0; i < NUM_CONSUMERS; i++)
pthread_cond_destroy(&condc[i]);
for(i = 0; i < NUM_CONSUMERS; i++)
pthread_cond_destroy(&condp[i]);
for(i = 0; i < NUM_BUFFERS; i++)
pthread_mutex_destroy(&mutex[i]);
pthread_exit(NULL);
}
void *producerProcess(void *ptr)
{
//unsigned int iseed = (unsigned int)time(NULL);
int producer = (int)ptr;
int selectedBuffer, otherbuff;
int produced;
int i;
int consumer;
srand(pthread_self());
for(i = 0; i < MAX_ITEM; i++)
{
selectedBuffer = rand() % NUM_BUFFERS;
produced = rand() % 100;
//LOCK
pthread_mutex_lock(&mutex[selectedBuffer]);
while(circularQueue_isFull(otherbuff) == 1)
{
condpflag[producer]=1;
pthread_cond_wait(&condp[producer], &mutex[selectedBuffer]);
//BUFFER IS FULL WAIT
otherbuff = rand() % NUM_BUFFERS;
}
if(otherbuff != -1)
{
selectedBuffer = otherbuff;
otherbuff = -1;
}
//pthread_mutex_lock(&mutex[selectedBuffer]);
CQ[selectedBuffer].buffer[CQ[selectedBuffer].in] = produced;
CQ[selectedBuffer].in = (CQ[selectedBuffer].in + 1) % BUFFER_SIZE;
printf("Producer %d: B%d:", producer, selectedBuffer);
printBuffer(selectedBuffer);
while(condcflag[consumer]==0)
consumer = rand() % NUM_CONSUMERS;
if(condcflag[consumer] == 1)
{
condcflag[consumer] = 0;
pthread_cond_signal(&condc[consumer]);
}
pthread_mutex_unlock(&mutex[selectedBuffer]);
}
printf("\nProducer %d finished \n", producer);
}
void *consumerProcess(void *ptr)
{
srand(pthread_self());
int selectedBuffer, otherbuff = -1;
int consumer = (int)ptr;
int item;
int i;
int producer;
for(i = 0; i < MAX_ITEM; i++)
{
selectedBuffer = rand()% NUM_BUFFERS;
pthread_mutex_lock(&mutex[selectedBuffer]);
while(circularQueue_isEmpty(otherbuff) == 1)
{
condpflag[consumer]=1;
pthread_cond_wait(&condc[consumer], &mutex[selectedBuffer]);
otherbuff = rand()% NUM_BUFFERS;
//CONSUMER WAIT BUFFER IS EMPTy
}
if(otherbuff != -1)
{
selectedBuffer = otherbuff;
otherbuff = -1;
}
//pthread_mutex_lock(&mutex[selectedBuffer]);
item = CQ[selectedBuffer].buffer[CQ[selectedBuffer].out];
CQ[selectedBuffer].out = (CQ[selectedBuffer].out + 1) % BUFFER_SIZE;
printf("Consumer %d: B%d:", consumer, selectedBuffer);
printBuffer(selectedBuffer);
//UNLOCK MUTEX
while(condpflag[producer]==0)
producer = rand() % NUM_PRODUCERS;
if(condpflag[producer] == 1)
{
condpflag[producer] = 0;
pthread_cond_signal(&condp[producer]);
}
pthread_mutex_unlock(&mutex[selectedBuffer]);
}
printf("CONSUMER %d finished \n", consumer);
}
int circularQueue_isEmpty(int queue)
{
if(CQ[queue].in == CQ[queue].out)
return 1;
else
return 0;
}
int circularQueue_isFull(int queue)
{
if(((CQ[queue].in + 1) % BUFFER_SIZE) == CQ[queue].out)
return 1;
else
return 0;
}
void printBuffer(int selectedBuffer)
{
int i;
for(i = CQ[selectedBuffer].out; i != CQ[selectedBuffer].in; i = (i+1) % BUFFER_SIZE)
printf(" %d ", CQ[selectedBuffer].buffer[i]);
printf("\n");
fflush(stdout);
}