Producer/Consumer - problems creating threads

This is a discussion on Producer/Consumer - problems creating threads within the C Programming forums, part of the General Programming Boards category; I'm trying to create 20 producer threads and 5 consumer threads. In my main, I put 20 text file names ...

  1. #1
    Registered User
    Join Date
    May 2009
    Posts
    43

    Producer/Consumer - problems creating threads

    I'm trying to create 20 producer threads and 5 consumer threads. In my main, I put 20 text file names into a fifo queue.

    The producer thread gets the file name from the queue, reads the contents of that file, converts the characters to uppercase, and writes the contents into the buffer slot. (I am supposed to create 5 buffer slots, but to keep things simple, I'm storing everything into just buffer[0] for now until I fix the problem with creating threads).

    The consumer thread reads the contents from the buffer and then prints them. And the buffer and queue have to be empty until the consumer stops from continuing.

    Fairly straightforward! However, right now I am only having lock creating 20 producer threads with 20 consumer threads. Basically it executes like the following:
    -producer stores the contents of in0.txt into the buffer, the consumer reads it and prints the contents.
    -producer stores the contents of in1.txt into the buffer, the consumer reads it and prints the contents.
    -producer stores the contents of in2.txt into the buffer, the consumer reads it and prints the contents.
    etc.. until in19.txt has been read.

    I'm glad things work for 20 producers and 20 consumers, but I need it to work for 20 and 5. Please focus your attention to the bold part. If I remove the red comments, my code won't run and it gets hung after the 5th producer gets created. Anyone know how I can code 20 producer threads and 5 consumer threads to run properly? I've been at this for hours with no luck. Any help would be greatly appreciated.

    Code:
    struct fifo_struct
    {
            char fileName[1024];
            struct fifo_struct* next;
    };
    
    struct linked_list
    {
            struct fifo_struct* head;
            struct fifo_struct* tail;
    };
    
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    sem_t full, empty;
    char buffer[BUFFER_SIZE][128];
    
    
    int main()
    {
            pthread_t producerVar;
            pthread_t consumerVar;
            char line[1024];
            char str[200];
            int i = 0;
            sem_init(&full, 0, 0);
            sem_init(&empty, 0, BUFFER_SIZE);
            struct fifo_struct* fifo;
    
    
            struct linked_list* s = malloc( 1 * sizeof(*s));
            if(s == NULL)
            {
                    fprintf(stderr, "LINE: %d, malloc() failed\n", __LINE__);
            }
            s->head = s->tail = NULL;
    
            for(i = 0; i < (FILESIZE + 1); i++)
            {
                    sprintf(str, "in%d.txt", i);
    
                    fifo = malloc(1 * sizeof(*fifo));
                    if(fifo == NULL)
                    {
                            fprintf(stderr, "IN %s, %s: malloc() failed\n", __FILE__, "list_add");
                    }
                    strcpy(fifo->fileName,str);
                    fifo->next = NULL;
    
                    if(s == NULL)
                    {
                            printf("Error: Queue has not been initialized\n");
                    }
                    else if(s->head == NULL && s->tail == NULL)
                    {
                            s->head = s->tail = fifo;
                    }
                    else if(s->head == NULL || s->tail == NULL)
                    {
                            printf("Error: Problem with code\n");
                            free(fifo);
                    }
                    else
                    {
                            s->tail->next = fifo;
                            s->tail = fifo;
                    }
            }
            //print_queue(s);
            //fifo->mutex = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
            //pthread_mutex_init(fifo->mutex, NULL);
    
            for(i = 0; i < 20; i++)
            {
                    pthread_create(&producerVar, NULL, producer, s);
                    pthread_join(producerVar, NULL);
            //}
            //for(i = 0; i < 20; i++)
            //{
                    pthread_create(&consumerVar, NULL, consumer, s);
                    pthread_join(consumerVar, NULL);
            }
    
            return 0;
    }


    I was hoping something like the following would work, but when I run the code, it hangs after the 5th producer thread or so:
    for(i = 0; i < 20; i++)
    {
    pthread_create(&producerVar, NULL, producer, s);
    pthread_join(producerVar, NULL);
    }
    for(i = 0; i < 20; i++)
    {
    pthread_create(&consumerVar, NULL, consumer, s);
    pthread_join(consumerVar, NULL);
    }

  2. #2
    Registered User
    Join Date
    May 2009
    Posts
    43
    Also if posting my producer and consumer code helps, just let me know and I'll post it.

  3. #3
    and the Hat of Guessing tabstop's Avatar
    Join Date
    Nov 2007
    Posts
    14,185
    Do you really want to call pthread_join after each go round? join will wait for the thread to finish before starting the next one....

    EDIT: And looking some more: how are your fifos set up? You may not be able to reuse a fifo until a consumer thread has come along to take up what's already been written; or maybe you're writing to a fifo that doesn't exist?
    Last edited by tabstop; 05-23-2010 at 08:44 AM.

  4. #4
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    Actually having written a lot of this stuff before, what is the point of having more producers than consumers? I honestly cannot see how you will get any kind of efficiency gain out of it and remember: adding more threads doesn't translate directly to more throughput. The more threads you add, the more management your scheduler has to do and more context switches that happen (and those puppies can be expensive)...ultimately what you want is enough producers to keep your consumers busy (few idle states) but not overloaded with work (piling on more work is just like pounding more sand into a funnel; what is going to come out is going to come out, no matter how hard you pound or how much you put in). You strike that balance at the architectural level where you figure out your overall solution, not at run-time (not w/o a lot of load checking etc but that is the subject of another discussion)...
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  5. #5
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    But tabstop is right: your more immediate problem is calling join() right after creation. The accepted practice is to create an array of thread handles, do a loop to create/start each one, then when your main thread is ready to wait for them to finish( as in, has no more work to do) *then* do a second loop on the thread handles and call join() on each....not until...
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  6. #6
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    Also are you not creating (and then joining) over and over on the same thread handle variable? Would this not produce a resource leak?
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  7. #7
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by tabstop View Post
    Do you really want to call pthread_join after each go round? join will wait for the thread to finish before starting the next one....

    EDIT: And looking some more: how are your fifos set up? You may not be able to reuse a fifo until a consumer thread has come along to take up what's already been written; or maybe you're writing to a fifo that doesn't exist?
    I did get rid of the joins, but my output is all messed up.

    I'm not writing to the fifo. I have a fifo queue of file names. The producer reads the file name, then stores its contents to the bufffer. The consumer writes whats in the buffer.

  8. #8
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by jeffcobb View Post
    Actually having written a lot of this stuff before, what is the point of having more producers than consumers? I honestly cannot see how you will get any kind of efficiency gain out of it and remember: adding more threads doesn't translate directly to more throughput. The more threads you add, the more management your scheduler has to do and more context switches that happen (and those puppies can be expensive)...ultimately what you want is enough producers to keep your consumers busy (few idle states) but not overloaded with work (piling on more work is just like pounding more sand into a funnel; what is going to come out is going to come out, no matter how hard you pound or how much you put in). You strike that balance at the architectural level where you figure out your overall solution, not at run-time (not w/o a lot of load checking etc but that is the subject of another discussion)...
    Teacher's specifications to use 20 producers and 5 consumers. I'm not sure why these numbers were chosen, but I suppose it's to challenge us more.

  9. #9
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by jeffcobb View Post
    But tabstop is right: your more immediate problem is calling join() right after creation. The accepted practice is to create an array of thread handles, do a loop to create/start each one, then when your main thread is ready to wait for them to finish( as in, has no more work to do) *then* do a second loop on the thread handles and call join() on each....not until...
    Right on, I changed my code to do exactly that.

    pthread_t producerVar[PRODUCERS];
    pthread_t consumerVar[CONSUMERS];

    for(i = 0; i < 20; i++)
    {
    pthread_create(&producerVar[i], NULL, producer, s);
    }
    for(i = 0; i < 5; i++)
    {
    pthread_create(&consumerVar[i], NULL, consumer, s);
    }

    Like I said, my output is still having problems. I'm certain there's a problem in my producer and/or consumer.

  10. #10
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    Quote Originally Posted by hansel13 View Post
    Right on, I changed my code to do exactly that.

    pthread_t producerVar[PRODUCERS];
    pthread_t consumerVar[CONSUMERS];

    for(i = 0; i < 20; i++)
    {
    pthread_create(&producerVar[i], NULL, producer, s);
    }
    for(i = 0; i < 5; i++)
    {
    pthread_create(&consumerVar[i], NULL, consumer, s);
    }

    Like I said, my output is still having problems. I'm certain there's a problem in my producer and/or consumer.
    Well your teachers requirements are pretty brain-damaged, I gotta say mate. Put it this way: once you get away from this teacher, learn how to do it right. Pick up Programming POSIX Threads or Modern Multithreading for a master-class in doing this kind of thing.

    As for your messed up output when you fix your join problem you need to gate the access to stdio and that is thankfully easy to fix with a brute-force solution: set up a write method like so (pseudo code; this will be way easier with with a scoped mutex)
    Code:
    void write(char *ptrText)
    {
         writeMutex.lock();
         printf("%s\n", ptrText);
         writeMutex.unlock();
    
    }
    Then in all threads that need to output they just call write("some message"); If two threads get to it at the same time (the thing thats messing up your output) the mutex will let the first one through and write and anyone else has to wait. When the write is finished and the mutex is unlocked, the next thread gets unlocked and gets to print...

    easy-peasy. Multiple threads stepping on each other when using buffered output is an old problem..
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  11. #11
    and the Hat of Guessing tabstop's Avatar
    Join Date
    Nov 2007
    Posts
    14,185
    Quote Originally Posted by hansel13 View Post
    I did get rid of the joins, but my output is all messed up.

    I'm not writing to the fifo. I have a fifo queue of file names. The producer reads the file name, then stores its contents to the bufffer. The consumer writes whats in the buffer.
    I assume you mean the buffer variable that's global. You'll have to define what you mean by "output is all messed up". I assume that you intend your buffer to be a number of 128-byte chunks, and that each consumer takes a chunk? Depending on luck, I suppose you might get the chunks out of order depending on how you're divvying up the work.

  12. #12
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    FWIW, the reason (if your teacher asks) that having so many more writers (producers) than readers is, beyond whats already been stated is that while you can have multiple readers accessing the data at once (consumers), you can never have more than one writer accessing at once, leading to a serious bottleneck in processing.

    I would really be keen to know what wisdom lead the instructor to add that constraint. Really bad for practical designs..
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  13. #13
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by tabstop View Post
    I assume you mean the buffer variable that's global. You'll have to define what you mean by "output is all messed up". I assume that you intend your buffer to be a number of 128-byte chunks, and that each consumer takes a chunk? Depending on luck, I suppose you might get the chunks out of order depending on how you're divvying up the work.
    Well it won't store any of my file contents into the buffer, and right now, I'm have problems all together with my output.

    NOTE: The while loops in the consumer and producer functions isn't formatted correctly, sorry if there's confusion
    Code:
    #include<stdio.h>
    #include<stdlib.h>
    #include<ctype.h>
    #include<unistd.h>
    #include<pthread.h>
    #include<semaphore.h>
    
    #define FILESIZE 19
    #define BUFFER_SIZE 5
    #define PRODUCERS 20
    #define CONSUMERS 20
    
    struct fifo_struct
    {
            char fileName[1024];
            struct fifo_struct* next;
    };
    
    struct linked_list
    {
            struct fifo_struct* head;
            struct fifo_struct* tail;
    };
    
    pthread_mutex_t mutex;
    sem_t full, empty;
    char buffer[BUFFER_SIZE][128];
    int counter;
    
    void print_queue(const struct linked_list* ps)
    {
            struct fifo_struct* p = NULL;
    
            if(ps)
            {
                    for(p = ps->head; p; p = p->next)
                    {
                            if(p)
                                    printf("int = %s\n", p->fileName);
                            else
                                    printf("can't print NULL STRUCT\n");
                    }
            }
    }
    void *producer(void *q);
    void *consumer(void *q);
    
    int main()
    {
            pthread_t producerVar[PRODUCERS];
            pthread_t consumerVar[CONSUMERS];
            char line[1024];
            char str[200];
            int i = 0;
            counter = 0;
            sem_init(&full, 0, 0);
            sem_init(&empty, 0, BUFFER_SIZE);
            pthread_mutex_init(&mutex,NULL);
            struct fifo_struct* fifo;
    
    
            struct linked_list* s = malloc( 1 * sizeof(*s));
            if(s == NULL)
            {
                    fprintf(stderr, "LINE: %d, malloc() failed\n", __LINE__);
            }
            s->head = s->tail = NULL;
    
            for(i = 0; i < (FILESIZE + 1); i++)
            {
                    sprintf(str, "in%d.txt", i);
    
                    fifo = malloc(1 * sizeof(*fifo));
                    if(fifo == NULL)
                    {
                            fprintf(stderr, "IN %s, %s: malloc() failed\n", __FILE__, "list_add");
                    }
                    strcpy(fifo->fileName,str);
                    fifo->next = NULL;
    
                    if(s == NULL)
                    {
                            printf("Error: Queue has not been initialized\n");
                    }
                    else if(s->head == NULL && s->tail == NULL)
                    {
                            s->head = s->tail = fifo;
                    }
                    else if(s->head == NULL || s->tail == NULL)
                    {
                            printf("Error: Problem with code\n");
                            free(fifo);
                    }
                    else
                    {
                            s->tail->next = fifo;
                            s->tail = fifo;
                    }
            }
            //print_queue(s);
    
            for(i = 0; i < 20; i++)
            {
                    pthread_create(&producerVar[i], NULL, producer, s);
                    //pthread_join(producerVar, NULL);
            }
            for(i = 0; i < 5; i++)
            {
                    pthread_create(&consumerVar[i], NULL, consumer, s);
                    //pthread_join(consumerVar, NULL);
            }
    
            return 0;
    }
    
    void *producer(void *arg)
    {
            int i = 0;
            char fileContent[1024];
            char line[1024];
            FILE * myfile;
            struct linked_list *q = arg;
            printf("IN PRODUCER\n");
            //print_queue(q);
            struct fifo_struct* tmp1 = NULL;
            struct fifo_struct* tmp2 = NULL;
    while(1)
    {
            sem_wait(&empty);
            pthread_mutex_lock(&mutex);
    
            if(q == NULL)
            {
                    printf("List is empty\n");
                    return(NULL);
            }
            else if(q->head == NULL && q->tail == NULL)
            {
                    printf("List is empty\n");
                    return(NULL);
            }
            else if(q->head == NULL || q->tail == NULL)
            {
                    printf("Error: Problem with code\n");
                    return(NULL);
            }
    
            myfile = fopen(q->head->fileName,"r+");
            while ((fgets(line, 1024, myfile)) != NULL)
            {
                    strcpy(fileContent, line);
                    printf("%s",fileContent);
            }
            fclose(myfile);
    
            while(fileContent[i] != '\n')
            {
                    fileContent[i] = toupper(fileContent[i]);
                    i++;
            }
            //printf("%s\n", fileContent);
            if(counter < BUFFER_SIZE)
            {
                    strncpy(buffer[counter],fileContent,128);
                    printf("buffer[%d] = %c\n", counter, buffer[counter]);
                    counter++;
            }
            //strncpy(buffer[0],fileContent,128);
            //printf("%s\n", fileContent);
    
    
            tmp1 = q->head;
            tmp2 = tmp1->next;
            free(tmp1);
            q->head = tmp2;
    
            if(q->head == NULL)
                          q->tail = q->head;
    
            pthread_mutex_unlock(&mutex);
            sem_post(&full);
    }
            //print_queue(q);
            return(NULL);
    }
    
    void *consumer(void *q)
    {
            int i = 0;
    
            printf("IN CONSUMER\n");
    while(1)
    {
            sem_wait(&full);
            pthread_mutex_lock(&mutex);
    
            if(counter > 0)
            {
                    printf("\t\tbuffer[%d] = %c\n", counter, buffer[counter]);
                    counter--;
            }
            pthread_mutex_unlock(&mutex);
            sem_post(&empty);
    }
            return(NULL);
    }
    Each file name just has a let of the alphabet repeated like 5-10 times. Output should look like the following (and the consumer output should vary):
    IN PRODUCER
    aaaaaaaaa
    IN PRODUCER
    bbbbbbbbb
    IN PRODUCER
    ccccccccccc
    IN PRODUCER
    ddddddd
    IN PRODUCER
    eeeeeeee
    IN PRODUCER
    ffffffffff
    IN PRODUCER
    gggggggg
    IN PRODUCER
    hhhhhhhh
    IN PRODUCER
    iiiiiiiiiiiii
    IN PRODUCER
    jjjjjjjjjj
    IN PRODUCER
    kkkkkkkkkk
    IN PRODUCER
    llllllllllllllll
    IN PRODUCER
    mmmmmm
    IN PRODUCER
    nnnnnnnnn
    IN PRODUCER
    ooooooooo
    IN PRODUCER
    pppppppp
    IN PRODUCER
    qqqqqqqq
    IN PRODUCER
    rrrrrrrrr
    IN PRODUCER
    sssssss
    IN PRODUCER
    ttttttttt
    IN CONSUMER
    IN CONSUMER
    IN CONSUMER
    IN CONSUMER
    IN CONSUMER


    However, I'm getting something crazy like:
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    aaaaaaaaaa
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN PRODUCER
    IN CONSUMER
    IN CONSUMER
    IN CONSUMER
    IN CONSUMER
    IN CONSUMER

  14. #14
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by jeffcobb View Post
    Well your teachers requirements are pretty brain-damaged, I gotta say mate. Put it this way: once you get away from this teacher, learn how to do it right. Pick up Programming POSIX Threads or Modern Multithreading for a master-class in doing this kind of thing.

    As for your messed up output when you fix your join problem you need to gate the access to stdio and that is thankfully easy to fix with a brute-force solution: set up a write method like so (pseudo code; this will be way easier with with a scoped mutex)
    Code:
    void write(char *ptrText)
    {
         writeMutex.lock();
         printf("%s\n", ptrText);
         writeMutex.unlock();
    
    }
    Then in all threads that need to output they just call write("some message"); If two threads get to it at the same time (the thing thats messing up your output) the mutex will let the first one through and write and anyone else has to wait. When the write is finished and the mutex is unlocked, the next thread gets unlocked and gets to print...

    easy-peasy. Multiple threads stepping on each other when using buffered output is an old problem..
    Yeah I'll keep that in mind, can't wait to graduate with this bachelors degree so I can start my Masters.

    I have my mutex and semaphore set up actually just like that, however when I implemented the buffer to 5 slots instead of 1, I keep getting different output than what I was hoping for. Clearly I'm doing something wrong, but I'm not sure if it's a pthread_create problem, a problem implementing a buffer, or something else?

  15. #15
    and the Hat of Guessing tabstop's Avatar
    Join Date
    Nov 2007
    Posts
    14,185
    Why not check whether the fopen succeeds?

Page 1 of 4 1234 LastLast
Popular pages Recent additions subscribe to a feed

Similar Threads

  1. Need Help: Multi-threading and Synchronization
    By Anom in forum C Programming
    Replies: 7
    Last Post: 12-08-2009, 05:34 PM
  2. CFiledialog threads termination
    By ks_lohith in forum C++ Programming
    Replies: 0
    Last Post: 09-11-2009, 01:57 AM
  3. problems creating a linked list
    By jamjar in forum C Programming
    Replies: 5
    Last Post: 10-23-2002, 06:50 AM
  4. Block and wake up certain threads
    By Spark in forum C Programming
    Replies: 9
    Last Post: 06-01-2002, 04:39 AM
  5. problems using threads.....?
    By stumpert in forum C++ Programming
    Replies: 2
    Last Post: 04-27-2002, 10:30 PM

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21