Thread: Pthread Circular buffer not looping, need help!

  1. #1
    Registered User
    Join Date
    May 2020
    Posts
    29

    Pthread Circular buffer not looping, need help!

    I'm designing a circular buffer that reads a input text file and writes to output.

    It works by creating a pthread named Lift-R and calls request_t() to load values onto the buffer. When the buffer is full, it goes to the three other created pthreads lift-1 to lift-3 which all call lift(). The problem is that once lift-3 is done, the code suddenly becomes stuck a waiting deadlock for another lift thread (which there isn't).

    The code should return to lift-1 once lift-3 is done and continue until text file is finished.

    How can I get the code to cycle through all the pthreads that call lift() until the input text file is empty.

    Attempts I've made was to add a second mutex lock to prevent collisions. I've also tried arranging pthread_join() to no avail.

    I've tried asking stack overflow, but you know what they're like.

    Code:
    header file:
    Code:
        #include <stdbool.h>
        
        typedef struct Buffer
        {
            int from;
            int to;
        }buffer_t; //buffer arrary to store from and to values from sim_input
        
        typedef struct Output
        {
            int p; //previous floor
            int rf; //request from
            int rt; //request to
            int total_m[3]; //total movement
            int c; // current position
            int m; //movement
            int total_r[3]; //total requests
        }write_t;
        typedef struct shared_memory
        {
            void* in;
            void* out;
            int id; //name of thread (more relieable than pthread_self())
        }share_t; //Shared memory to be used across functions (Allow multiple arguements in pthread_create)
    main file:
    Code:
        #include <stdio.h>
        #include <stdlib.h>
        #include <pthread.h> 
        #include <unistd.h>
        #include <assert.h>
        #include "list.h"
        pthread_cond_t full, empty; //declare thread conditions
        pthread_mutex_t lock, lock2; //declare mutex to lock functions
        int counter; //number of requests made
        static int size; //buffer size (must only change once at beginning)
        static int sec; //Time in seconds, needed for lift to execute operation
        int length; //Number of values in buffer
        unsigned int in, out_i; //tail and head of buffer
        buffer_t *A; //the circular queue, is initialized in main due to size declared by user input
        write_t write1; //Used for printing the information from buffer.
        int id; //Lift id to be printed in output
        void *lift(void *vargp) //Consumer
        {
                pthread_mutex_lock(&lock2);
                while (length <= 0)
                {
                    printf("Waiting for request\n");
                    pthread_cond_wait(&empty, &lock2); //buffer is empty
                }
                sleep(sec); //Simulate time of elevator movement
                printf("lifting information\n");
                id++;
                //gather information to print
                if (write1.p == 0) //only for when system begins (Assign Previous to the first record `from` value)
                {
                    write1.p = A[out_i].from;
                }
                write1.rf = A[out_i].from;
                write1.rt = A[out_i].to;
                write1.m = (write1.p - A[out_i].from) + (A[out_i].to - A[out_i].from);
                write1.c = A[out_i].to;
                //Now write the information
                fprintf(((share_t*)vargp)->out, "Lift-%d Operation\n", id);
                fprintf(((share_t*)vargp)->out, "Previous position: Floor %d\n", write1.p);
                fprintf(((share_t*)vargp)->out, "Request: Floor %d to Floor %d\n", write1.rf, write1.rt);
                fprintf(((share_t*)vargp)->out, "Detail operations:\n");
                fprintf(((share_t*)vargp)->out, "    Go from Floor %d to Floor %d\n", write1.p, write1.rf);
                fprintf(((share_t*)vargp)->out, "    Go from Floor %d to Floor %d\n", write1.rf, write1.rt);
                fprintf(((share_t*)vargp)->out, "    #movement for this request: %d\n", write1.m);
                write1.total_r[id] = write1.total_r[id] + 1;
                fprintf(((share_t*)vargp)->out, "    #request: %d\n", write1.total_r[id]);
                write1.total_m[id] = write1.total_m[id] + 1;
                fprintf(((share_t*)vargp)->out, "    Total #movement: %d\n", write1.total_m[id]);
                fprintf(((share_t*)vargp)->out, "Current Position: Floor %d\n", write1.c);
                fprintf(((share_t*)vargp)->out, "\n");
                fprintf(((share_t*)vargp)->out, "----------------------------\n");
                write1.p = write1.c; //for next statement
                length--;
                out_i = (out_i + 1) % size;
                if (id < 3)
                {
                    id = 0;
                }
                if (length == size-1)
                {
                    printf("Signalling Request\n");
                    pthread_cond_signal(&full);
                }
            pthread_mutex_unlock(&lock2);
            return NULL;
        }
        void *request_t(void *vargp) //producer
        {
            pthread_mutex_lock(&lock); //Now only request can operate (mutual exclusion)
            //read the input line by line and into the buffer
            while (fscanf(((share_t*)vargp)->in, "%d %d\n", &A[in].from, &A[in].to) != EOF) //Stores values in buffer
            {
                while (length >= size) //buffer is full
                {
                    printf("Waiting for lift\n");
                    pthread_cond_wait(&full, &lock);
                }
                printf("Requesting information\n");
                //If requests go out of bounds, they will be automatically registered as either
                //maximum level or minimum level
                if (A[in].to > 20)
                {
                    A[in].to = 20;
                }
                else if (A[in].to < 1)
                    {
                        A[in].to = 1;
                    }
                else if (A[in].from > 20)
                    {
                        A[in].from = 20;
                    }
                else if (A[in].from < 1)
                    {
                        A[in].from = 1;
                    }
                //Print buffer information to sim_output
                fprintf(((share_t*)vargp)->out, "New Lift Request from Floor %d to Floor %d \n", A[in].from, A[in].to);
                fprintf(((share_t*)vargp)->out, "Request No %d \n", counter++);
                fprintf(((share_t*)vargp)->out, "----------------------------\n");
                length++;
                in = (in + 1) % size;
                if (length == 1)
                {
                    printf("Signalling Lift\n");
                    pthread_cond_signal(&empty);
                }
            }
            pthread_mutex_unlock(&lock);
            pthread_exit(NULL);
        }
        void main(int argc, char *argv[]) // to avoid segmentation fault
        {
            //establish values for use (convert argv to int values)
            counter = 1;
            share_t *share = (share_t *)malloc(sizeof(share_t)); //Allow multiple arguments in pthread
            in = 0, out_i = 0, id = 0, length = 0; //All slots are empty
            long arg = strtol(argv[1], NULL, 10);
            size = arg;
            if (size <= 1)
            {
                printf("buffer size too small\n");
                exit(0);
            }
            else
            {
                //initialize Buffer as empty array.
                A = calloc(size, sizeof *A);
            }
            arg = strtol(argv[2], NULL, 10);
            sec = arg; //time required by each lift to serve a request
            if (sec <= 0)
            {
                printf("insufficient time\n");
                exit(0);
            }
            FILE *out = fopen("sim_output.txt", "w");
            FILE *in = fopen("sim_input.txt", "r");
            if (in == NULL && out == NULL) //Check if files exists before we can use them
            {
                printf("Input file empty\n");
                exit(0);
            }
            else //Load files into share for transporting.
            {
                share->out = out;
                share->in = in;
            }
            //Create threads
            printf("Creating threads\n");
            pthread_t Lift_R, lift_1, lift_2, lift_3;
            assert(pthread_cond_init(&full, NULL) == 0);
            assert(pthread_cond_init(&empty, NULL) == 0);
            assert(pthread_mutex_init(&lock, NULL) == 0);
            assert(pthread_mutex_init(&lock2, NULL) == 0);
            assert(pthread_create(&Lift_R, NULL, request_t, share) == 0);
            share->id = 1;
            assert(pthread_create(&lift_1, NULL, lift, share) == 0);
            share->id = 2;
            assert(pthread_create(&lift_2, NULL, lift, share) == 0);
            share->id = 3;
            assert(pthread_create(&lift_3, NULL, lift, share) == 0);
            assert(pthread_join(Lift_R, NULL) == 0);
            assert(pthread_join(lift_1, NULL) == 0);
            assert(pthread_join(lift_2, NULL) == 0);
            assert(pthread_join(lift_3, NULL) == 0);
            //pthread_mutex_destroy(NULL);
            int sum_r = write1.total_r[1] + write1.total_r[2] + write1.total_r[3],
                sum_t = write1.total_m[1] + write1.total_m[2] + write1.total_m[3];
            fprintf(out, "\n");
            fprintf(out, "Total number of requests %d \n", sum_r);
            fprintf(out, "Total number of movememts %d \n", sum_t);
            fclose(in);
            fclose(out);
        }

  2. #2
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    38,176
    Can you post an example input file as well.

    > long arg = strtol(argv[1], NULL, 10);
    Not to mention, what command line parameters you pass.
    You should check argc before touching any argv.


    Also, main returns int, not void.
    Code:
    foo.c:136:6: warning: return type of ‘main’ is not ‘int’ [-Wmain]
     void main(int argc, char *argv[]) // to avoid segmentation fault
          ^
    foo.c: In function ‘main’:
    foo.c:136:15: warning: unused parameter ‘argc’ [-Wunused-parameter]
     void main(int argc, char *argv[]) // to avoid segmentation fault
                   ^
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  3. #3
    Registered User
    Join Date
    May 2020
    Posts
    29
    input for the code looks like this:
    1 10
    4 20
    5 14
    6 7
    8 1
    4 5
    10 14
    16 13

    It is compiled with -lpthread and to run the code, use: lift_sim_A m t.(m is size and t is seconds).

  4. #4
    Registered User
    Join Date
    Dec 2017
    Posts
    966
    So what should we enter for "size" and "seconds"???

    And look at lines 168 and 169 above.
    Array indices start at 0, and the max index for a 3-element array is 2.
    If you want the truth to stand clear before you, never be for or against. - Sent-ts'an

  5. #5
    Registered User
    Join Date
    May 2020
    Posts
    29
    when I test this code, I usually use lift_sim_A 3 1. I'll try to cut the code down later to make it easier to read.

  6. #6
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    38,176
    Code:
        assert(pthread_create(&Lift_R, NULL, request_t, share) == 0);
        share->id = 1;
        assert(pthread_create(&lift_1, NULL, lift, share) == 0);
        share->id = 2;
        assert(pthread_create(&lift_2, NULL, lift, share) == 0);
        share->id = 3;
        assert(pthread_create(&lift_3, NULL, lift, share) == 0);
    Yeah, the problem here is all 4 threads are pointing to the same instance of share, so they're all going to end up seeing id == 3 at some point.

    > printf(((share_t*)vargp)->out, "Lift-%d Operation\n", id);
    Create a local pointer of the right type at the start of the function.
    So you don't have cast expressions on every single line of the function.
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  7. #7
    Registered User
    Join Date
    May 2020
    Posts
    29
    I created share so that I can have multiple arguments inside the pthreads which is important for the FILE* in and out. I may try to create a local pointer for id.

  8. #8
    Registered User
    Join Date
    May 2020
    Posts
    29
    But a local pointer can't be shared globally and I need that space in pthread_creare for *out so I can print results to output.
    Even if I did create a local pointer like int* id, what am I supposed to do with it, you can only do one argument in a pthread which is I made share in the first place.
    Last edited by Redsam121; 05-14-2020 at 12:44 AM.

  9. #9
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    38,176
    Like this.
    Code:
    share_t shares[4];
    ...
    for ( i = 0 ; i < 3 ; i++ ) {
      shares[i].in = 
      shares[i].out = 
      shares[i].id = i;
    }
    ...
    assert(pthread_create(&Lift_R, NULL, request_t, &share[0]) == 0);
    assert(pthread_create(&lift_1, NULL, lift, &share[1]) == 0);
    assert(pthread_create(&lift_2, NULL, lift, &share[2]) == 0);
    assert(pthread_create(&lift_3, NULL, lift, &share[3]) == 0);
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  10. #10
    Registered User
    Join Date
    May 2020
    Posts
    29
    I implemented it but it caused the entire code crash on the third lift. Plus I don't think ((share_t*)vargp)->in will work anymore.
    Last edited by Redsam121; 05-14-2020 at 01:20 AM.

  11. #11
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    38,176
    Another problem is both threads are writing to your output file in an unsafe way.

    request_t has a different mutex to lift.
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  12. #12
    Registered User
    Join Date
    May 2020
    Posts
    29
    I've changed mutex to only one lock. Both now have the same mutex. I'm now getting errors for fscanf on line 71. It's reporting too many arguements.

  13. #13
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    38,176
    Post the latest code.
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  14. #14
    Registered User
    Join Date
    May 2020
    Posts
    29
    This is what I've got so far:

    Code:
    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h> 
    #include <unistd.h>
    #include <assert.h>
    #include "list.h"
    pthread_cond_t full, empty; //declare thread conditions
    pthread_mutex_t lock; //declare mutex to lock functions
    int counter; //number of requests made
    static int size; //buffer size (must only change once at beginning)
    static int sec; //Time in seconds, needed for lift to execute operation
    int length; //Number of values in buffer
    unsigned int in_i, out_i; //tail and head of buffer
    buffer_t *A; //the circular queue, is initialized in main due to size declared by user input
    write_t write1; //Used for printing the information from buffer.
    void *lift(void *vargp) //Consumer
    {
        pthread_mutex_lock(&lock);
        while (length <= 0)
        {
            printf("Waiting for request\n");
            pthread_cond_wait(&empty, &lock); //buffer is empty
        }
        sleep(sec); //Simulate time of elevator movement
        printf("lifting information\n");
        //gather information to print
        if (write1.p == 0) //only for when system begins (Assign Previous to the first record `from` value)
        {
            write1.p = A[out_i].from;
        }
        write1.rf = A[out_i].from;
        write1.rt = A[out_i].to;
        write1.m = (write1.p - A[out_i].from) + (A[out_i].to - A[out_i].from);
        write1.c = A[out_i].to;
        int id = ((share_t*)vargp)->id;
        
        //Now write the information
        fprintf(((share_t*)vargp)->out, "Lift-%d Operation\n", id);
        fprintf(((share_t*)vargp)->out, "Previous position: Floor %d\n", write1.p);
        fprintf(((share_t*)vargp)->out, "Request: Floor %d to Floor %d\n", write1.rf, write1.rt);
        fprintf(((share_t*)vargp)->out, "Detail operations:\n");
        fprintf(((share_t*)vargp)->out, "    Go from Floor %d to Floor %d\n", write1.p, write1.rf);
        fprintf(((share_t*)vargp)->out, "    Go from Floor %d to Floor %d\n", write1.rf, write1.rt);
        fprintf(((share_t*)vargp)->out, "    #movement for this request: %d\n", write1.m);
        write1.total_r[id] = write1.total_r[id] + 1;
        fprintf(((share_t*)vargp)->out, "    #request: %d\n", write1.total_r[id]);
        write1.total_m[id] = write1.total_m[id] + 1;
        fprintf(((share_t*)vargp)->out, "    Total #movement: %d\n", write1.total_m[id]);
        fprintf(((share_t*)vargp)->out, "Current Position: Floor %d\n", write1.c);
        fprintf(((share_t*)vargp)->out, "\n");
        fprintf(((share_t*)vargp)->out, "----------------------------\n");
        write1.p = write1.c; //for next statement
        length--;
        out_i = (out_i + 1) % size;
        if (length == size-1)
        {
            printf("Signalling Request\n");
            pthread_cond_signal(&full);
        }
        pthread_mutex_unlock(&lock);
        return NULL;
    }
    void *request_t(void *vargp) //producer
    {
        pthread_mutex_lock(&lock); //Now only request can operate (mutual exclusion)
        //read the input line by line and into the buffer
        while (fscanf(((share_t*)vargp)->in, "%d %d\n", &A[in_i].from, &A[in_i].to) != EOF) //Stores values in buffer
        {
            while (length >= size) //buffer is full
            {
                printf("Waiting for lift\n");
                pthread_cond_wait(&full, &lock);
            }
            printf("Requesting information\n");
            //If requests go out of bounds, they will be automatically registered as either
            //maximum level or minimum level
            if (A[in_i].to > 20)
            {
                A[in_i].to = 20;
            }
            else if (A[in_i].to < 1)
            {
                A[in_i].to = 1;
            }
            else if (A[in_i].from > 20)
            {
                A[in_i].from = 20;
            }
            else if (A[in_i].from < 1)
            {
                A[in_i].from = 1;
            }
            //Print buffer information to sim_output
            fprintf(((share_t*)vargp)->out, "New Lift Request from Floor %d to Floor %d \n", A[in_i].from, A[in_i].to);
            fprintf(((share_t*)vargp)->out, "Request No %d \n", counter++);
            fprintf(((share_t*)vargp)->out, "----------------------------\n");
            length++;
            in_i = (in_i + 1) % size;
            if (length == 1)
            {
                printf("Signalling Lift\n");
                pthread_cond_signal(&empty);
            }
        }
        pthread_mutex_unlock(&lock);
        pthread_exit(NULL);
    }
    void main(int argc, char *argv[]) // to avoid segmentation fault
    {
        //establish values for use (convert argv to int values)
        counter = 1;
        share_t share[4]; //Allow multiple arguments in pthread
        in_i = 0, out_i = 0, length = 0; //All slots are empty
        long arg = strtol(argv[1], NULL, 10);
        size = arg;
        if (size <= 1)
        {
            printf("buffer size too small\n");
            exit(0);
        }
        else
        {
            //initialize Buffer as empty array.
            A = calloc(size, sizeof *A);
        }
        arg = strtol(argv[2], NULL, 10);
        sec = arg; //time required by each lift to serve a request
        if (sec <= 0)
        {
            printf("insufficient time\n");
            exit(0);
        }
        FILE *out_t = fopen("sim_output.txt", "w");
        FILE *in_t = fopen("sim_input.txt", "r");
        if (in_t == NULL && out_t == NULL) //Check if files exists before we can use them
        {
            printf("Input file empty\n");
            exit(0);
        }
        for (int i = 0; i < 3; i++)//Load files into share for transporting.
        {
            share[i].in = in_t;
            share[i].out = out_t;
            share[i].id = i;
        }
        //Create threads
        printf("Creating threads\n");
        pthread_t Lift_R, lift_1, lift_2, lift_3;
        assert(pthread_cond_init(&full, NULL) == 0);
        assert(pthread_cond_init(&empty, NULL) == 0);
        assert(pthread_mutex_init(&lock, NULL) == 0);
        assert(pthread_create(&Lift_R, NULL, request_t, &share[0]) == 0);
        assert(pthread_create(&lift_1, NULL, lift, &share[1]) == 0);
        assert(pthread_create(&lift_2, NULL, lift, &share[2]) == 0);
        assert(pthread_create(&lift_3, NULL, lift, &share[3]) == 0);
        assert(pthread_join(Lift_R, NULL) == 0);
        assert(pthread_join(lift_1, NULL) == 0);
        assert(pthread_join(lift_2, NULL) == 0);
        assert(pthread_join(lift_3, NULL) == 0);
        pthread_mutex_destroy(NULL);
        int sum_r = write1.total_r[1] + write1.total_r[2] + write1.total_r[3],
            sum_t = write1.total_m[1] + write1.total_m[2] + write1.total_m[3];
        fprintf(out_t, "\n");
        fprintf(out_t, "Total number of requests %d \n", sum_r);
        fprintf(out_t, "Total number of movememts %d \n", sum_t);
        fclose(in_t);
        fclose(out_t);
    }

  15. #15
    Registered User
    Join Date
    May 2020
    Posts
    29
    The main problem is that it crashes after lift-3.
    Last edited by Redsam121; 05-14-2020 at 05:29 AM.

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. Circular buffer - console crashing
    By High Voltage in forum C++ Programming
    Replies: 10
    Last Post: 02-26-2017, 08:59 AM
  2. Circular Buffer using Virtual Memory
    By SMurf in forum Windows Programming
    Replies: 5
    Last Post: 03-26-2013, 07:05 PM
  3. Replies: 3
    Last Post: 06-27-2011, 11:25 AM
  4. Circular buffer - logic difficulty
    By erasm in forum C Programming
    Replies: 2
    Last Post: 10-05-2009, 01:08 PM
  5. Circular Buffer
    By parisframe in forum C Programming
    Replies: 11
    Last Post: 09-16-2009, 11:05 PM

Tags for this Thread