Thread: Producer Consumer Buffer Problem - Segmentation Faults

    Nov 2010

    Producer Consumer Buffer Problem - Segmentation Faults

    I'm getting segmentation faults for the following code and I can't for the life of me figure out what's causing them. I'm supposed to have multiple threads producing and consuming buffer items and I need to use semaphores to protect shared variables. This is what I have so far:

    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <time.h>
    #include <cmath>
    #include <ctime>
    #include <fstream>
    #include <iomanip>
    #include <iostream>
    #include <string>
    #include <vector>
    using namespace std;
    // Calendar date attached to a sale record.
    struct Date
    {
        int day;   // 1-30
        int month; // 1-12
        int year;  // two-digit year, e.g. 6 for 2006
    };

    // One sales record handed from a producer to a consumer through the buffer.
    struct Item
    {
        Date SalesDate;    // date of the sale
        int storeID;       // 1-p
        int registerNum;   // 1-10
        double saleAmount; // 0.50 - 999.99
    };
    // ---- Forward declarations -------------------------------------------
    // Thread entry points handed to pthread_create; threadid carries the
    // thread's integer id cast to void*.
    void *produce(void *threadid); //function for producer processes
    void *consume(void *threadid); //function for consumer processes
    // Hand-rolled semaphore operations on plain int counters. The consumer
    // variant also receives the caller's local statistics so it can print them.
    void cwait(int& S, int threadid, vector<int>& waiting, double l_Aggregate, double l_Store_Wide[], double l_Month_Wise[]); //wait semaphore function for consumers
    void pwait(int& S, int threadid, vector<int>& waiting); //wait semaphore function for producers
    void signal(int& S); //signal semaphore function
    void printStatistics(double l_Aggregate, double l_Store_Wide[], double l_Month_Wise[], int threadid); //function where consumers print local statistics
    Item produceItem(void *t); //function for production of new items with random values
    // ---- Shared state (read and written by every thread) ------------------
    // NOTE(review): all of these are ordinary, non-atomic globals.
    int numOfRecords, numOfRecordsRead, in, out; //records produced, records consumed, next buffer slot to fill, next buffer slot to drain
    int p, c, b; //number of producers, consumers, and buffer size
    // NOTE(review): with `using namespace std;` in effect the names `empty` and
    // `mutex` may collide with std::empty / std::mutex depending on the language
    // standard and on which headers get pulled in - confirm.
    int empty, full, mutex; //semaphores, implemented as plain int counters
    vector<int> ewaiting; //waiting queue for empty semaphore
    vector<int> fwaiting; //waiting queue for full semaphore
    vector<int> mwaiting; //waiting queue for mutex semaphore
    int consumersDone; //counter (not a vector); main compares it against c to decide that every consumer has finished
    vector<Item> buffer; //bounded buffer, indexed by in/out modulo b
    vector<double> Store_Wide; //Store-wide total sales, indexed by storeID - 1
    double Month_Wise[12]; //Month-wise total sales (in all stores)
    double Aggregate; // Aggregate sales (all sales together)
    double Time; //Total time for simulation (from begin to end)
    int main(int argc, char** argv)
        //start time
        clock_t start, stop;
        start = clock() + CLK_TCK;
        ifstream inFile;
        string fileName = argv[1];
        inFile >> p >> c >> b;
        Item emptyItem; = 0;
        emptyItem.SalesDate.month = 0;
        emptyItem.SalesDate.year = 0;
        emptyItem.storeID = 0;
        emptyItem.registerNum = 0;
        emptyItem.saleAmount =  0;
        for (int i = 0; i < b; i++)
        bool allConsumersDone = false;
        numOfRecords = 0;
        numOfRecordsRead = 0;
        in = 0;
        out = 0;
        empty = b;
    	full = 0;
    	mutex = 1;
    	Aggregate = 0;
    	consumersDone = 0;
    	for (int i = 0; i < p; i++)
        for (int i = 0; i < 12; i++)
            Month_Wise[i] = 0;
        pthread_t p_threads[p], c_threads[c];
        int rc, t;
        for(t=1; t < (p+1); t++)
            rc = pthread_create(&p_threads[t], NULL, produce, (void *)t);
            if (rc)
                printf("ERROR; return code from pthread_create() is %d\n", rc);
        for(t=1+p; t < (c+p+1); t++)
            rc = pthread_create(&c_threads[t], NULL, consume, (void *)t);
            if (rc)
                printf("ERROR; return code from pthread_create() is %d\n", rc);
        while (!allConsumersDone) //main waits until producer/consumer threads are finished executing
            if (consumersDone == c)
                allConsumersDone = true;
        //print global statistics and time
        cout << "GLOBAL STATISTICS" << endl;
        cout << "Global Aggregate: " << Aggregate << endl;
        cout << "Global Store Wide" << endl;
        for (int i = 0; i < p; i++)
            cout << "Store ID: " << (i + 1) << endl;
            cout << "Global Sales: " << Store_Wide[i] << endl;
        cout << "Global Month Wise" << endl;
        for (int i = 0; i < 12; i++)
            cout << "Month: " << (i + 1) << endl;
            cout << "Global Sales: " << Month_Wise[i] << endl;
        stop = clock()+CLK_TCK;
        Time = ((double) stop-start)/1000;
        cout << "Runtime: " << Time << " seconds" << endl << endl;
        return 0;
    // Producer thread entry point.
    // threadid: the thread's integer id, passed through pthread_create as a void*.
    // While fewer than 50 records have been produced it builds an item, waits on
    // the `empty` and `mutex` semaphores, stores the item in buffer[in], advances
    // `in` modulo b and then sleeps.
    // NOTE(review): as it stands in this excerpt the block has no braces, never
    // signals `mutex`/`full`, never updates numOfRecords and returns no value -
    // those statements appear to have been lost from the post.
    // NOTE(review): (int) threadid narrows a pointer to int; 64-bit compilers
    // reject or warn about this cast.
    void *produce(void *threadid) //production thread function
        while (numOfRecords < 50)
            Item nextProduced = produceItem(threadid); //produce item
            pwait(empty, (int) threadid, ewaiting);
            pwait(mutex, (int) threadid, mwaiting);
            //add item to buffer
            buffer[in] = nextProduced;
            in = ( in + 1) % b;
            // rand() % 36 + 5 yields a value in 5..40.
            int sleepTime = rand() % 36 + 5;
            // NOTE(review): POSIX sleep() takes whole seconds, so this pauses for
            // 5-40 s, not ms; it is declared in <unistd.h>, which the file does
            // not include - confirm the intended unit.
            sleep(sleepTime); //sleep for 5-40 ms
    // Consumer thread entry point.
    // threadid: the thread's integer id, passed through pthread_create as a void*.
    // Keeps thread-local totals (aggregate, per store, per month), drains items
    // from buffer[out] while fewer than 50 records have been read, and finally
    // hands the local totals to printStatistics.
    // NOTE(review): as it stands in this excerpt the block has no braces, never
    // signals `mutex`/`empty`, never updates numOfRecordsRead or consumersDone and
    // returns no value - those statements appear to have been lost from the post.
    void *consume(void *threadid) //consumption thread function
        // NOTE(review): p is a run-time value, so this is a variable-length array,
        // which is a compiler extension in C++.
        double l_Store_Wide[p]; //Store-wide total sales (LOCAL)
        double l_Month_Wise[12]; //Month-wise total sales (in all stores) (LOCAL)
        double l_Aggregate = 0; // Aggregate sales (all sales together) (LOCAL)
        //initialize above variables
    	for (int i = 0; i < p; i++)
            l_Store_Wide[i] = 0;
        for (int i = 0; i < 12; i++)
            l_Month_Wise[i] = 0;
        while (numOfRecordsRead < 50)
    		cwait(full, (int) threadid, fwaiting, l_Aggregate, l_Store_Wide, l_Month_Wise);
    		cwait(mutex, (int) threadid, mwaiting, l_Aggregate, l_Store_Wide, l_Month_Wise);
            //remove item from buffer to nextConsumed
            Item nextConsumed = buffer[out];
            //create blank item to replace consumed item in buffer
            Item emptyItem;
            // NOTE(review): the next line is a garbled remnant; presumably it was
            // `emptyItem.SalesDate.day = 0;` - confirm against the original post.
   = 0;
            emptyItem.SalesDate.month = 0;
            emptyItem.SalesDate.year = 0;
            emptyItem.storeID = 0;
            emptyItem.registerNum = 0;
            emptyItem.saleAmount =  0;
            buffer[out] = emptyItem;
            out = (out + 1) % b;
            //consume item in nextConsumed/ record statistics
            // NOTE(review): storeID and month are used as 1-based indices; a blank
            // slot (storeID 0, month 0) would index element -1 here.
            l_Aggregate = l_Aggregate + nextConsumed.saleAmount;
            l_Store_Wide[nextConsumed.storeID - 1] = l_Store_Wide[nextConsumed.storeID - 1] + nextConsumed.saleAmount;
            l_Month_Wise[nextConsumed.SalesDate.month - 1] = l_Month_Wise[nextConsumed.SalesDate.month - 1] + nextConsumed.saleAmount;
        //print local statistics
        printStatistics(l_Aggregate, l_Store_Wide, l_Month_Wise, (int) threadid);
    // Consumer-side "wait" on an int semaphore, implemented as a spin loop.
    // S:         semaphore counter, taken by reference.
    // threadid:  id of the calling consumer.
    // waiting:   queue of thread ids associated with S.
    // l_*:       the caller's local statistics, forwarded to printStatistics.
    // Spins while S <= 0 or the id at the front of `waiting` is not the caller's;
    // inside the loop, once 50 records have been read, it prints the caller's
    // local statistics.
    // NOTE(review): waiting[0] is evaluated whenever S > 0, with no check that
    // `waiting` is non-empty; indexing an empty vector is undefined behaviour.
    // NOTE(review): nothing in this excerpt enqueues threadid, decrements S or
    // leaves the loop after printing - those statements appear to be missing.
    void cwait(int& S, int threadid, vector<int>& waiting, double l_Aggregate, double l_Store_Wide[], double l_Month_Wise[])
        while (S <= 0 || waiting[0] != threadid)
            if (numOfRecordsRead >= 50)
                printStatistics(l_Aggregate, l_Store_Wide, l_Month_Wise, threadid); //print local statistics
    // Producer-side "wait" on an int semaphore, implemented as a spin loop.
    // S:         semaphore counter, taken by reference.
    // threadid:  id of the calling producer.
    // waiting:   queue of thread ids associated with S.
    // Spins while S <= 0 or the id at the front of `waiting` is not the caller's;
    // inside the loop, once 50 records have been produced, the calling thread
    // terminates itself.
    // NOTE(review): waiting[0] is evaluated whenever S > 0, with no check that
    // `waiting` is non-empty; indexing an empty vector is undefined behaviour.
    // NOTE(review): nothing in this excerpt enqueues threadid or decrements S -
    // those statements appear to be missing.
    void pwait(int& S, int threadid, vector<int>& waiting)
        while (S <= 0 || waiting[0] != threadid)
            if (numOfRecords >= 50)
                pthread_exit(NULL); //terminate thread
    // "Signal" operation for the int semaphores; S is the counter, by reference.
    // NOTE(review): only the signature is present in this excerpt - the body is
    // missing, so what it does to S cannot be confirmed from here.
    void signal(int& S)
    // Builds a new sales record with pseudo-random contents.
    //
    // t: the producing thread's integer id carried in a void* (the same value the
    //    thread received from pthread_create); it becomes the record's storeID.
    // Returns an Item with day 1-30, month 1-12, year 6, register 1-10 and a sale
    // amount between 0.50 and 999.99.
    Item produceItem(void *t)
    {
        Item newItem;
        newItem.SalesDate.day = rand() % 30 + 1;
        newItem.SalesDate.month = rand() % 12 + 1;
        newItem.SalesDate.year = 6;
        // Go through intptr_t: a direct pointer-to-int cast loses bits on 64-bit
        // targets and is rejected by the compiler there.
        newItem.storeID = static_cast<int>(reinterpret_cast<intptr_t>(t)); //producerThreadID
        newItem.registerNum = rand() % 10 + 1;
        // rand() * range / RAND_MAX + low
        newItem.saleAmount = static_cast<double>(rand()) * 999.49 / static_cast<double>(RAND_MAX) + 0.5;
        return newItem;
    }
    void printStatistics(double l_Aggregate, double l_Store_Wide[], double l_Month_Wise[], int threadid)
        cout << "LOCAL STATISTICS" << endl;
        cout << "Consumer ID: " << threadid << endl;
        cout << "Local Aggregate: " << l_Aggregate << endl;
        Aggregate = Aggregate + l_Aggregate;
        cout << "Local Store Wide" << endl;
        for (int i = 0; i < p; i++)
            cout << "Store ID: " << (i + 1) << endl;
            cout << "Local Sales: " << l_Store_Wide[i] << endl;
            Store_Wide[i] = Store_Wide[i] + l_Store_Wide[i];
        cout << "Local Month Wise" << endl;
        for (int i = 0; i < 12; i++)
            cout << "Month: " << (i + 1) << endl;
            cout << "Local Sales: " << l_Month_Wise[i] << endl;
            Month_Wise[i] = Month_Wise[i] + l_Month_Wise[i];
    I've tried using gdb to figure it out but haven't had any luck. Any help is appreciated.

        pthread_t p_threads[p], c_threads[c];
        int rc, t;
        for(t=1; t < (p+1); t++)
            rc = pthread_create(&p_threads[t], NULL, produce, (void *)t);
            if (rc)
                printf("ERROR; return code from pthread_create() is %d\n", rc);
        for(t=1+p; t < (c+p+1); t++)
            rc = pthread_create(&c_threads[t], NULL, consume, (void *)t);
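    How about knowing that valid array subscripts run from 0 to N-1? Your loops index p_threads from 1 to p and c_threads from 1+p to c+p, so both write past the end of their arrays.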
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

    >> ... and I need to use semaphores to protect shared variables.
    You haven't used any synchronization at all. If two or more threads access the same memory location, and at least one writes to that location, then all access must be protected using synchronization primitives provided by your threading library.


    Thank you

