Thread: Producer/Consumer - problems creating threads

  1. #31
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by jeffcobb View Post
    Thursday as in tomorrow? OK.

    Well I will do what I can tonight to give some guidance but honestly for a BFO into what is *actually* happening, assign each producer/consumer an id number and have them print it in your output. What you are going to find is something like this:
    1. You need to read 20 files and ultimately uppercase them (the "work" and then print them to the screen.
    2. You want the thing with the most work to have the most threads.
    3. The way your producers are set up is they (all 20) will fire up, fight over a mutex, pull off a filename, read that file, stick it in the data store, set the semaphore for the readers and the *loop* to see if there is more work.
    4. The readers fight over the same mutex, 1 (one) of them wins, pulls off a value and prints it to the screen.

    Now of the work presented, the way you have distributed it, the producers might as well print it to the screen and be done with it if getting the job done is the point; the readers are superfluous.

    *however*

    What actually happens on a dual-core box is:
    The first 1 or 2 producers will do 90% of the work. This is what will be revealed if you assign each one a number as I had done some time ago. That means that 90-95% of your producers will spin on the while(1) loop and do nothing but eat clock cycles. If they all try to lock the same mutex, most won't even finish. This will demonstrate the lunacy of 20 producers: they won't get the job done 20x faster and most won't do anything at all.

    The readers on the other hand really don't have much to do , just fight over a mutex and print a string to the screen.

    In a real producer/consumer model, the producers (usually just a handful if even more than 1 or 2) will simply describe the job which could look like this:

    Code:
    struct TJob
    {
         // data
         char achBuffer[MAX_FILENAME_SIZE];
         bool bProcessed; // has this job been done yet?
    };
    
    // you know you only have 20 jobs so...
    struct TJob jobQueue[20];
    Then you keep a job count (how many are in your jobQueue), the currentAddPosition (init to -1) and the current job being done. Init this to -1;

    Spin your threads as follows:
    1-2 (only one is necessary) producers that feed filenames into the queue

    4-5 readers.

    Make a mutex-protected function to add to the queue which would involve:
    increment the totalJobs variablle.
    if( currentAddPosition == -1)
    Its the first job so set it to 0
    setCurrentJob to 0
    else
    increment the currentAddPosition
    copy job to jobQueue[currentAddPosition]
    incrementTotalJobs

    then you need a reader function (something to dole out the "jobs" to readers" so
    that function would:
    read the jobQueue[currentJob]
    decrement totalJobs
    increment currentJob

    The producers all the top function, the readers call the bottom one (protected by mutexes but ONLY as long as it takes to push data into it or pull data from jobQueue. The rest of the time let the threads work in solitude. The readers producers could read the files and push the contents into the jobQueue, readers (and long as each gets a unique index into the data) multiply access the data store at once, getting the data, upper-casing it and printing to the screen. Be prepared for a bottle-neck at the IO layer (screen)..
    This is very similar to what I did, except I actually just dequeued inside the producer. I also got rid of that while loop in the producer like you pointed out. It was useless!

    Jeff, I sent you a personal message of my new code. I'd rather not post it here now that I have it working.

    Thanks for your help.

  2. #32
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    Well that was part one (dinner was ready). Part 2 (and then I will look at the PM) is this bit of strategy:

    If you have a few producers (since only one can add to the queue at a time anyways, more is pretty useless), the real trick is in how the jobs are doled out to the readers. The easy answer is it just set up a mutex and let them fight over it. This WILL work but not as well as it could.

    A strategy that works pretty well is this: you have a set of producers pushing jobs into one end of the queue.

    On the other end you have (say) 5 threads: 1 is a work "manager" and the others are workers. The workers are stored in in a struct kind of like this:
    Code:
    struct TWorker
    {
         pthread_t hThread; // thread handle
         pthread_cond_t waitVar; // this is a wait variable
         bool bDoWork; // this tells the worker to do "work"
         int nJobIndex; // in this case, it is an index into the array above.
         bool bExit;
    };
    So the reader threads all start up and go into a loop that looks like:
    Code:
    while(!bExit)
    {
          while(pthread_cond_wait(condVar) && !bDoWork)
          {
                sleep(1); // sometimes threads wake up for all kinds of reasons. unless we meant to
                              // bDoWork will be false so sleep and go back to waiting
          }
          if( !bExit)
          {
    
                 // do work here
           }
         return (void*)NULL;
    }
    Now picture these all in an array. When the producers add something to the queue, they set a semaphore saying "work available". The worker "manager" watches the semaphore and when true then it locks a mutex so the producers wait, the for each waiting job, it does something like:
    Code:
    while(bJobsToDo)
    {
          for(x = 0; x < numWorkers; x++)
          {
                if( workerArray[x].bDoWork == false)
                {
                      workerArray[x].nJobIndex = nIndexToCurrentJob; // give it a job
                      workerArray[x].bDoWork = true; // Assure the logic this is real work
                      pthread_cond_signal(workerArray[x].condVar); // wake up the thread;
                      break;
                 }
     
           }
    
    }
    Then when the producers are done and the queue is empty the work mgr releases the works by iterating thru the list, setting the bDoWork, bExit and signalling the condition variable. They wake up one at a time and exit cleanly.

    This way the queue is locked only as long as it takes to get a reference to a job and you keep as many workers busy as there are jobs to to.

    Admittedly this is high-level but this is one way to do a reasonable producer-consumer engine. I doubt you have time for this before tomorrow so let me look at your mail...
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  3. #33
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by jeffcobb View Post
    Well that was part one (dinner was ready). Part 2 (and then I will look at the PM) is this bit of strategy:

    If you have a few producers (since only one can add to the queue at a time anyways, more is pretty useless), the real trick is in how the jobs are doled out to the readers. The easy answer is it just set up a mutex and let them fight over it. This WILL work but not as well as it could.

    A strategy that works pretty well is this: you have a set of producers pushing jobs into one end of the queue.

    On the other end you have (say) 5 threads: 1 is a work "manager" and the others are workers. The workers are stored in in a struct kind of like this:
    Code:
    struct TWorker
    {
         pthread_t hThread; // thread handle
         pthread_cond_t waitVar; // this is a wait variable
         bool bDoWork; // this tells the worker to do "work"
         int nJobIndex; // in this case, it is an index into the array above.
         bool bExit;
    };
    So the reader threads all start up and go into a loop that looks like:
    Code:
    while(!bExit)
    {
          while(pthread_cond_wait(condVar) && !bDoWork)
          {
                sleep(1); // sometimes threads wake up for all kinds of reasons. unless we meant to
                              // bDoWork will be false so sleep and go back to waiting
          }
          if( !bExit)
          {
    
                 // do work here
           }
         return (void*)NULL;
    }
    Now picture these all in an array. When the producers add something to the queue, they set a semaphore saying "work available". The worker "manager" watches the semaphore and when true then it locks a mutex so the producers wait, the for each waiting job, it does something like:
    Code:
    while(bJobsToDo)
    {
          for(x = 0; x < numWorkers; x++)
          {
                if( workerArray[x].bDoWork == false)
                {
                      workerArray[x].nJobIndex = nIndexToCurrentJob; // give it a job
                      workerArray[x].bDoWork = true; // Assure the logic this is real work
                      pthread_cond_signal(workerArray[x].condVar); // wake up the thread;
                      break;
                 }
     
           }
    
    }
    Then when the producers are done and the queue is empty the work mgr releases the works by iterating thru the list, setting the bDoWork, bExit and signalling the condition variable. They wake up one at a time and exit cleanly.

    This way the queue is locked only as long as it takes to get a reference to a job and you keep as many workers busy as there are jobs to to.

    Admittedly this is high-level but this is one way to do a reasonable producer-consumer engine. I doubt you have time for this before tomorrow so let me look at your mail...
    I have set up the mutex as you mentioned. However, I should have mentioned, for this assignment, I can't use conditional variables. That would definitely make things easier.. Instead I'm using semaphores (empty and full), for the section where the buffer is accessed by the producer and consumer.

    In the PM I sent you, it has the semaphores set up, and I am using only one buffer slot.

    Your code actually makes a lot of sense, haha I wish you were the TA..

  4. #34
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    Hansel this kind of thing has been my bread and butter for a number of years. Code for the CellBE sometime if this interests you; it will blow you (and Intel) away). 8 cores. Each operating many many times faster than the primary 9th, primary core. Real multithreading. Yeah. <makes sounds like Tim Allen>
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  5. #35
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by jeffcobb View Post
    Hansel this kind of thing has been my bread and butter for a number of years. Code for the CellBE sometime if this interests you; it will blow you (and Intel) away). 8 cores. Each operating many many times faster than the primary 9th, primary core. Real multithreading. Yeah. <makes sounds like Tim Allen>
    Wow, that stuff already blows me away just trying to comprehend it. I'm actually more interested in moving into management eventually or even sales. I'm going to get my Masters in CS and my MBA. I actually live in Portland, which is intel heaven, so I'll probably work there to start off with then after I get some experience, I'll hopefully move into computer sales.

  6. #36
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    Ah. Well here was hoping; the industry is in dire need of good multi-thread programmers.
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  7. #37
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by jeffcobb View Post
    Ah. Well here was hoping; the industry is in dire need of good multi-thread programmers.
    Hey, if they pay is good, I'm always open. I just know 2 people personally who have taken the route I hope to take, and they are in excellent shape financially. So hopefully with hard work, I can get to that level. I don't mind a challenge though.

  8. #38
    Registered User jeffcobb's Avatar
    Join Date
    Dec 2009
    Location
    Henderson, NV
    Posts
    875
    Quote Originally Posted by hansel13 View Post
    Hey, if they pay is good, I'm always open. I just know 2 people personally who have taken the route I hope to take, and they are in excellent shape financially. So hopefully with hard work, I can get to that level. I don't mind a challenge though.
    Depends on two things:
    1. Where you do it. In Virginia (a typically poor state) I was making ~35k 16 years ago. In San Francisco, six figures. Here in Vegas, 87k.
    C/C++ Environment: GNU CC/Emacs
    Make system: CMake
    Debuggers: Valgrind/GDB

  9. #39
    Registered User
    Join Date
    May 2009
    Posts
    43
    OK Well I almost have everything running, that's the good news. The bad news is I really need this done tonight.

    The problem right now is that it seems like only one buffer is being used the entire time, and only one consumer thread is mostly getting used... Why is this happening? I'm sure my semaphores are set up properly. Anyway, my code is posting below, with the semaphore sections BOLDED. And the code is following by the output. The output to the left is from the producer, the output to the right is from the consumer.

    My TA is pretty much useless in helping me right now because he's busy grading, so if anyone is kind enough to take one last ditch effort at this, please do!

    Code:
    #include<stdio.h>
    #include<stdlib.h>
    #include<ctype.h>
    #include<unistd.h>
    #include<pthread.h>
    #include<string.h>
    #include<semaphore.h>
    
    #define FILESIZE 20
    #define BUFFER_SIZE 5
    #define PRODUCERS 20
    #define CONSUMERS 5
    
    struct fifo_struct
    {
            char fileName[1024];
            struct fifo_struct* next;
    };
    
    struct linked_list
    {
            struct fifo_struct* head;
            struct fifo_struct* tail;
    };
    
    char buffer[BUFFER_SIZE][128];
    int counter;
    
    pthread_mutex_t mutex;
    sem_t full, empty;
    
    
    void print_queue(const struct linked_list* ps)
    {
            struct fifo_struct* p = NULL;
    
            if(ps)
            {
                    for(p = ps->head; p; p = p->next)
                    {
                            if(p)
                                    printf("int = %s\n", p->fileName);
                            else
                                    printf("can't print NULL STRUCT\n");
                    }
            }
    }
    void *producer(void *q);
    void *consumer(void *q);
    struct linked_list *s;
    
    int main()
    {
            pthread_t producerVar[PRODUCERS];
            pthread_t consumerVar[CONSUMERS];
            char str[200];
            int i = 0;
            counter = 0;
            sem_init(&full, 0, 0);
            sem_init(&empty, 0, BUFFER_SIZE);
            pthread_mutex_init(&mutex,NULL);
            struct fifo_struct* fifo;
    
            // Initialize the 5 buffer slots
            for(i = 0; i < BUFFER_SIZE; i++)
            {
                    buffer[i][0] = '\n';
            }
    
            // Create linked list
            s = malloc( 1 * sizeof(*s));
            if(s == NULL)
            {
                    fprintf(stderr, "LINE: %d, malloc() failed\n", __LINE__);
            }
            s->head = s->tail = NULL;
    
            for(i = 0; i < (FILESIZE); i++)
            {
                    // Generates file names to store into queue
                    sprintf(str, "in%d.txt", i);
    
                    // Create queue to store file names
                    fifo = malloc(1 * sizeof(*fifo));
    
                    // Error in creating fifo
                    if(fifo == NULL)
                    {
                            fprintf(stderr, "IN %s, %s: malloc() failed\n", __FILE__, "list_add");
                    }
    
                    // Store filename into queue
                    strcpy(fifo->fileName,str);
                    fifo->next = NULL;
    
                    if(s == NULL)
                    {
                            printf("Error: Queue has not been initialized\n");
                    }
                    else if(s->head == NULL && s->tail == NULL)
                    {
                            // First element in queue
                            s->head = s->tail = fifo;
                    }
                    else if(s->head == NULL || s->tail == NULL)
                    {
                            printf("Error: Problem with code\n");
                            free(fifo);
                    }
                    else
                    {
                            // Increments queue
                            s->tail->next = fifo;
                            s->tail = fifo;
                    }
            }
            //print_queue(s);
    
            // Create producer threads
            for(i = 0; i < PRODUCERS; i++)
            {
                    pthread_create(&producerVar[i], NULL, producer, &i);
                    //pthread_join(producerVar[i], NULL);
            }
            // Create consumer threads
            for(i = 0; i < CONSUMERS; i++)
            {
                    pthread_create(&consumerVar[i], NULL, consumer, s);
                    //pthread_join(consumerVar[i], NULL);
            }
            for(i = 0; i < PRODUCERS; i++)
            {
                    pthread_join(producerVar[i], NULL);
            }
            for(i = 0; i < CONSUMERS; i++)
            {
            //              pthread_join(consumerVar[i], NULL);
            }
    
            return 0;
    }
    
    void *producer(void *idx)
    {
            int myidx = * (int *) idx;
            int i = 0;
            char fileContent[1024];
            char line[1024];
            FILE * myfile;
            struct linked_list *q;
            //print_queue(q);
            struct fifo_struct* tmp1 = NULL;
            struct fifo_struct* tmp2 = NULL;
            //sem_wait(&empty);
            pthread_mutex_lock(&mutex);
            printf("IN PRODUCER\n");
            q = s;
            if(q == NULL)
            {
                    printf("List is empty\n");
                    return(NULL);
            }
            else if(q->head == NULL && q->tail == NULL)
            {
                    printf("List is empty\n");
                    return(NULL);
            }
            else if(q->head == NULL || q->tail == NULL)
            {
                    printf("Error: Problem with code\n");
                    return(NULL);
            }
    
            printf("Producer: %d\n", myidx);
            //print_queue(q);
            myfile = fopen(q->head->fileName,"r");
            tmp1 = q->head;
            tmp2 = tmp1->next;
            free(tmp1);
            q->head = tmp2;
    
            if(q->head == NULL)
                          q->tail = q->head;
            //print_queue(q);
            printf("After printq\n");
            fflush(stdout);
            printf("\n");
    
            if((fgets(line, 1024, myfile)) != NULL)
            {
                    strcpy(fileContent, line);
                    printf("%s",fileContent);
            }
            strcpy(fileContent, line);
            printf("Myfile: %s",fileContent);
            fclose(myfile);
    
            while(fileContent[i] != '\n')
            {
                    fileContent[i] = toupper(fileContent[i]);
                    i++;
            }
    
            pthread_mutex_unlock(&mutex);
            sem_wait(&empty);
            if(counter < BUFFER_SIZE) {
                    strncpy(buffer[counter],fileContent,128);
                    printf("buffer[%d] = %s\n", counter, buffer[counter]);
                    counter++;
            }
    
            sem_post(&full);
            return(NULL);
    }
    
    void *consumer(void *q)
    {
            int myidx = * (int *) q;
    
            printf("\t\t\t\tCONSUMER: %d\n", myidx);
    
            while(1)
            {
                    sem_wait(&full);
                    if(counter > 0) {
                    printf("\t\t\tbuffer[%d] = %s\n", counter - 1, buffer[(counter - 1)]);
                    counter--;
                    }
                    sem_post(&empty);
            }
            return(NULL);
    }


    OUTPUT:
    Code:
    IN PRODUCER
    Producer: 1
    After printq
    
    aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
    Myfile: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
    IN PRODUCER
    Producer: 16
    buffer[0] = AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
    
                            CONSUMER: 141192
                            buffer[0] = AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
    
                            CONSUMER: 141192
                            CONSUMER: 141192
                            CONSUMER: 141192
    After printq
                            CONSUMER: 142232
    
    bbbbbbbb
    Myfile: bbbbbbbb
    buffer[0] = BBBBBBBB
    
    IN PRODUCER
                            buffer[0] = BBBBBBBB
    
    Producer: 0
    After printq
    
    ccccccc
    Myfile: ccccccc
    buffer[0] = CCCCCCC
    
    IN PRODUCER
                            buffer[0] = CCCCCCC
    
    Producer: 19
    After printq
    
    ddddddd
    Myfile: ddddddd
    buffer[0] = DDDDDDD
    
    IN PRODUCER
                            buffer[0] = DDDDDDD
    
    Producer: 17
    After printq
    
    eeeeeee
    Myfile: eeeeeee
    buffer[0] = EEEEEEE
    
    IN PRODUCER
                            buffer[0] = EEEEEEE
    
    Producer: 15
    After printq
    
    ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
    Myfile: ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
    buffer[0] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    
    IN PRODUCER
                            buffer[0] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    
    Producer: 14
    After printq
    
    ggggggg
    Myfile: ggggggg
    buffer[0] = GGGGGGG
    
    IN PRODUCER
                            buffer[0] = GGGGGGG
    
    Producer: 13
    After printq
    
    hhhhhhh
    Myfile: hhhhhhh
    buffer[0] = HHHHHHH
    
    IN PRODUCER
                            buffer[0] = HHHHHHH
    
    Producer: 12
    After printq
    
    iiiiiii
    Myfile: iiiiiii
    buffer[0] = IIIIIII
    
    IN PRODUCER
                            buffer[0] = IIIIIII
    
    Producer: 11
    After printq
    
    jjjjjjj
    Myfile: jjjjjjj
    buffer[0] = JJJJJJJ
    
    IN PRODUCER
                            buffer[0] = JJJJJJJ
    
    Producer: 10
    After printq
    
    kkkkkk
    Myfile: kkkkkk
    buffer[0] = KKKKKK
    
    IN PRODUCER
                            buffer[0] = KKKKKK
    
    Producer: 9
    After printq
    
    llllllll
    Myfile: llllllll
    buffer[0] = LLLLLLLL
    
    IN PRODUCER
                            buffer[0] = LLLLLLLL
    
    Producer: 8
    After printq
    
    mmmmmmm
    Myfile: mmmmmmm
    buffer[0] = MMMMMMM
    
    IN PRODUCER
                            buffer[0] = MMMMMMM
    
    Producer: 7
    After printq
    
    nnnnnnn
    Myfile: nnnnnnn
    buffer[0] = NNNNNNN
    
    IN PRODUCER
                            buffer[0] = NNNNNNN
    
    Producer: 6
    After printq
    
    ooooooo
    Myfile: ooooooo
    buffer[0] = OOOOOOO
    
    IN PRODUCER
                            buffer[0] = OOOOOOO
    
    Producer: 5
    After printq
    
    ppppppp
    Myfile: ppppppp
    buffer[0] = PPPPPPP
    
    IN PRODUCER
                            buffer[0] = PPPPPPP
    
    Producer: 4
    After printq
    
    qqqqqq
    Myfile: qqqqqq
    buffer[0] = QQQQQQ
    
    IN PRODUCER
                            buffer[0] = QQQQQQ
    
    Producer: 3
    After printq
    
    rrrrrrr
    Myfile: rrrrrrr
    buffer[0] = RRRRRRR
    
    IN PRODUCER
                            buffer[0] = RRRRRRR
    
    Producer: 2
    After printq
    
    sssssssss
    Myfile: sssssssss
    buffer[0] = SSSSSSSSS
    
    IN PRODUCER
                            buffer[0] = SSSSSSSSS
    
    Producer: 18
    After printq
    
    tttttt
    Myfile: tttttt
    buffer[0] = TTTTTT
    
                            buffer[0] = TTTTTT
    Last edited by hansel13; 05-27-2010 at 11:47 PM.

  10. #40
    and the Hat of Guessing tabstop's Avatar
    Join Date
    Nov 2007
    Posts
    14,336
    The problem right now is that it seems like only one buffer is being used the entire time, and only one consumer thread is mostly getting used...
    It's not last night any more, and I haven't read the new code very carefully, but if it's anything like the other code this is what I would have expected. There's nothing about anything you're doing that's going to make the threads take turns. Somebody wins the first race, and then when that gets done there's another race, and it's not uncommon for the same somebody to win that race too.

  11. #41
    Registered User
    Join Date
    May 2009
    Posts
    43
    Quote Originally Posted by tabstop View Post
    It's not last night any more, and I haven't read the new code very carefully, but if it's anything like the other code this is what I would have expected. There's nothing about anything you're doing that's going to make the threads take turns. Somebody wins the first race, and then when that gets done there's another race, and it's not uncommon for the same somebody to win that race too.
    Well each producer thread writes exactly once. There's 20 producer threads, and 20 files. I don't care what order they go in, but they do what I want, which is producer a file ONCE.

    Maybe for the consumer function, I should make it sleep very quickly? Like you mentioned, it's a race to print, but I'd at least like the other consumer threads to print once in a while.

  12. #42
    Registered User
    Join Date
    May 2009
    Posts
    43
    I got this figured out, thanks. Feel free to lock this thread.

  13. #43
    Registered User
    Join Date
    Aug 2010
    Posts
    6
    checkout out mystrange problem with semaphore it is showing semaphore out of range out of range
    Code:
    //sema1.c
    --client
    #include<stdio.h>
    
    #include<stdlib.h>
    
    #include<string.h>
    
    #include<sys/sem.h>
    
    #include<sys/shm.h>
    
    #include<pthread.h>
    
    #include<unistd.h>
    
    #include<stdio.h>
    
    #include<stdlib.h>
    #include <sys/un.h>
    #include <netinet/in.h>
    #include<string.h>
    
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/sem.h>
    #include<sys/shm.h>
    
    #include<pthread.h>
    
    #include <sys/types.h>          /* See NOTES */
    #include <sys/socket.h>
    #include<unistd.h>
    
    
    
    int sockfd, portno, clilen;
    
    int sem_id,shmid;
    
    void *sharedptr;
    
    int mysemid[100];
    
    int count=0;
    
    
    
    
    
    struct mystruct
    
    {
    
    	pthread_t tid;
    
    	int count;
    
    	int flag;
    
    };
    
    
    
    void *mythreadfunc(void *arg)
    
    {
    
    
    	while(1)
    
    	{
    
    
    
    		char buffer[4];
    
    		memset(buffer,0,4);
    
    		int segment=rand()%5;
    		//int segment=1;
    
    		sprintf(buffer,"%d", segment);
    
    		printf("\n outer threadid is-->%d segment-->%d, buffer =%s\n",(int)(pthread_self()),segment, buffer);
    
    
    		struct sembuf sem_b;
    
    		sem_b.sem_num=0;
    
    		sem_b.sem_op=-1;
    
    		sem_b.sem_flg=SEM_UNDO;
    
    		if(semop(mysemid[segment],&sem_b,1)==-1)
    
    		{
    
    			fprintf(stderr,"outer semaphore lock failed");
    			//sleep(10);
    			//continue;			
    
    		}
    		
    		while(1)
    
    		{
    
    			if(((((struct mystruct *)sharedptr)+segment)->flag)==0)
    
    			{
    
    				((((struct mystruct *)sharedptr)+segment)->tid)=(pthread_self());
    
    				((((struct mystruct *)sharedptr)+segment)->count)=count;
    
    				((((struct mystruct *)sharedptr)+segment)->flag)=1;
    
    				fflush(stdout);
    
    				count++;
    				
    
    				write(sockfd,buffer,4);
    
    				
    				printf("\n---> done main work\n");
    
    				fflush(stdout);
    
    
    				sem_b.sem_num=0;
    
    				sem_b.sem_op=1;
    
    				sem_b.sem_flg=SEM_UNDO;
    				
    
    				if(semop(mysemid[segment],&sem_b,1)==-1)
    
    				{
    
    					fprintf(stderr,"semaphore inner unlock failed ---------->>> %d", segment);
    					perror("\n semv error  : ");
    					//sleep(10);
    					continue;
    
    				}
    
    				break;
    
    			}else
    
    			{
    
    				sem_b.sem_num=0;
    
    				sem_b.sem_op=1;
    
    				sem_b.sem_flg=SEM_UNDO;
    
    
    
    				if(semop(mysemid[segment],&sem_b,1)==-1)
    
    				{	
    
    					perror("semv error 2 : UNLOCK FAILED");
    					//sleep(10);
    
    				}
    			}
    
    
    
    	}//inner while 
    	
    	/*
    	sem_b.sem_num=0;
    
    	sem_b.sem_op=1;
    
    	sem_b.sem_flg=SEM_UNDO;
    	if(semop(mysemid[segment],&sem_b,1)==-1)
    
    	{
    
    		fprintf(stderr,"semaphore outer unlock failed --------->>>> %d", segment);
    		perror(" \n semv error  : ");
    		sleep(10);
    		
    
    	}*/
    	
    
    	}//outer while
    
    }//end function
    
    
    int main(int argc,char **argv)
    
    {
    
    	struct sockaddr_in serv_addr, cli_addr;
    
    	int n;
    
    	if (argc < 2)
    
    	{
    
    		fprintf(stderr,"ERROR, no port provided\n");
    
    		exit(1);
    
    	}
    
    	sockfd = socket(PF_INET, SOCK_STREAM, 0);
    
    
    	if(sockfd<0)
    	{
    
    		perror("\n ERROR creating socket  : ");
    		exit(0);
    	}
    
    
    
    	struct linger linger;
    
    	linger.l_onoff=1;
    
    	linger.l_linger=1;
    
    	setsockopt(sockfd,SOL_SOCKET,SO_LINGER, &linger,sizeof(linger));
    
    
    	memset(&serv_addr, 0, sizeof(serv_addr));
    
    	portno = atoi(argv[1]);
    	
    
    	serv_addr.sin_family =AF_INET;
    
    	serv_addr.sin_addr.s_addr =inet_addr("127.0.0.1");
    
    	serv_addr.sin_port = htons(portno);
    
    	
    	if(connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr))< 0)
    	{
    		perror("\n ERROR in connect socket  : ");
    		exit(0);
    	}
    
    
    
    	int i=0;
    
    	for ( i=0;i<100 ;i++ )
    
    	{
    
    		if(i==0)
    
    			mysemid[i]=semget((key_t)102,1,0666|IPC_CREAT);
    
    		else
    
    			mysemid[i]=semget((key_t)i,1,0666|IPC_CREAT);
    
    	
    		if(mysemid[i]<0)
    
    		{
    
    			perror("semid error occured");
    
    			exit(1);
    
    		}
    
    
    
    	}//end for 
    	
    
    	key_t key = 1256;
    
    	shmid=shmget(key,100*sizeof(struct mystruct),0644|IPC_CREAT);
    
    	if(shmid==-1)
    	{
    
    		printf("shmid error");
    		exit(0);
    	}
    
    	sharedptr=shmat(shmid,(void*)0,0);
    
    
    
    	if(sharedptr==(void *)-1)
    
    	{
    
    		fprintf(stderr,"shared memory error");
    
    		fflush(stderr);
    
    	}
    
    	pthread_t thread_ID[100];
    
    	int value;
    
    
    
    	i=0;
    	mythreadfunc(&value);
    
    	/*for(i=0;i<100;i++)
    
    	{
    
    		if((pthread_create(&thread_ID[i] , NULL, mythreadfunc , &value ))<0)
    
    		{
    			perror("\n\n\n\nWARNING\n\n\n");
    
    			fflush(stdout);
    
    		}
    
    		//sleep(10);
    
    	}*/
    	while(1);
    
    }//end main

  14. #44
    Registered User
    Join Date
    Aug 2010
    Posts
    6
    sorry first run the server part
    Code:
    //socketserve.c
    
    #include<stdio.h>
    
    #include<stdlib.h>
    #include <sys/un.h>
    #include <netinet/in.h>
    #include<string.h>
    
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/sem.h>
    #include<sys/shm.h>
    
    #include<pthread.h>
    
    #include <sys/types.h>          /* See NOTES */
    #include <sys/socket.h>
    #include<unistd.h>
    
    
    
    int newsockfd;
    
    
    //variables for shared and semaphore
    
    int sem_id,shmid;
    
    int mysemid[100];
    
    void *sharedptr;
    
    struct mystruct
    
    {
    
    	pthread_t tid;
    
    	int count;
    
    	int flag;
    
    };
    
    
    union semun
    
    {
    
    	int val; /* Value for SETVAL */
    
    	struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
    
    	unsigned short *array; /* Array for GETALL, SETALL */
    
    	struct seminfo *__buf; /* Buffer for IPC_INFO
    
    	(Linux specific) */
    
    }sem_union;
    
    
    //end of special purpose variables
    
    void *mythreadfunc(void *arg)
    
    {
    
    	int n;
    
    	int segment;
    
    	char buffer[4];
    
    	memset(buffer,0,sizeof(buffer));
    
    	printf("going to read");
    
    	fflush(stdout);
    
    	while( n= read(newsockfd, buffer, 4))
    
    	{
    
    	
    
    		segment=atoi(buffer);
    
    		memset(buffer,0,sizeof(buffer));
    
    		struct sembuf sem_b;
    
    		sem_b.sem_num=0;
    
    		sem_b.sem_op=-1;
    
    		sem_b.sem_flg=SEM_UNDO;
    		
    
    		printf("inside server--received segment-->%d--data is-->",segment);
    		
    		if(semop(mysemid[segment],&sem_b,1)==-1)
    
    		{
    
    			fprintf(stderr,"semaphore lock failed");
    			sleep(10);
    			continue;
    
    		}
    		printf("threadid-->%d---count-->%d\n",(int)((((struct mystruct *)sharedptr)+segment)->tid),(((struct mystruct *)sharedptr)+segment)->count);
    		
    
    		(((struct mystruct *)sharedptr)+segment)->flag=0;
    
    
    		sem_b.sem_num=0;
    
    		sem_b.sem_op=1;
    
    		sem_b.sem_flg=SEM_UNDO;
    
    
    
    	if(semop(mysemid[segment],&sem_b,1)==-1)
    
    	{	
    
    	
    
    			fprintf(stderr,"semaphore unlock failed");
    			sleep(100);
    
    	}
    
    	}
    
    }
    
    
    int main(int argc,char **argv)
    
    {
    
    
    
    	int sockfd, portno, clilen;
    
    	struct sockaddr_in serv_addr, cli_addr;
    
    	int n;
    
    	if (argc < 2)
    
    	{
    
    		fprintf(stderr,"ERROR, no port provided\n");
    
    		exit(1);
    
    	}
    
    	sockfd = socket(PF_INET, SOCK_STREAM, 0);
    
    	if (sockfd < 0)
    	{
    
    		perror("ERROR creating socket");
    		exit(0);
    	}
    
    	
    	memset( &serv_addr,0, sizeof(serv_addr));
    
    	portno = atoi(argv[1]);
    
    	
    	serv_addr.sin_family =AF_INET;
    
    	serv_addr.sin_addr.s_addr =inet_addr("127.0.0.1");
    
    	serv_addr.sin_port = htons(portno);
    
    
    	if (bind(sockfd, (struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0)
    	{
    
    		perror("ERROR on binding");
    		exit(0);
    	}
    
    
    	listen(sockfd,50);
    
    	clilen = sizeof(cli_addr);
    
    
    	int i=0;
    
    	for ( i=0;i<100 ;i++ )
    
    	{
    
    		if(i==0)
    
    			mysemid[i]=semget((key_t)102,1,0666|IPC_CREAT);
    
    		else
    
    			mysemid[i]=semget((key_t)i,1,0666|IPC_CREAT);
    
    		if(mysemid[i]<0)
    
    		{
    
    			perror("semid error occured");
    
    			exit(1);
    
    		}
    
    	
    		sem_union.val=1;
    
    		if((semctl(mysemid[i],0,SETVAL,sem_union))<0)
    
    		{
    
    			perror("semid error occured initioalization");
    
    			exit(1);
    
    		}
    
    	}//end for
    
    	
    	key_t key = 1256;
    
    	shmid=shmget(key,100*sizeof(struct mystruct),0644|IPC_CREAT);
    
    	
    	if(shmid==-1)
    
    	printf("shmid error");
    
    	
    	sharedptr=shmat(shmid,(void*)0,0);
    
    	if(sharedptr==(void *)-1)
    
    	{
    
    		fprintf(stderr,"shared memory error");
    
    		fflush(stderr);
    		exit(0);
    
    	}
    
    	memset(sharedptr,0,100*sizeof(struct mystruct));
    
    	while(1)
    
    	{
    
    		newsockfd = accept(sockfd,(struct sockaddr *) &cli_addr,&clilen);
    
    		printf("connection accepted");
    
    		fflush(stdout);
    
    
    		if (newsockfd < 0)
    		{
    
    			perror("ERROR on accept");
    			exit(0);
    		}
    
    
    		printf("connection accepted");
    
    		fflush(stdout);
    
    		
    		pthread_t thread_ID;
    
    		int value;	
    
    		pthread_create(&thread_ID , NULL, mythreadfunc , &value ) ;
    
    	}
    
    
    	for(i=0;i<100;i++)
    
    	{
    
    		semctl(mysemid[i],0,IPC_RMID);
    
    	}
    
    	return 0;
    
    } //end main

  15. #45
    Registered User
    Join Date
    Aug 2010
    Posts
    6
    Code:
    #include<stdio.h>
    
    #include<stdlib.h>
    
    #include<string.h>
    
    #include<sys/sem.h>
    
    #include<sys/shm.h>
    
    #include<pthread.h>
    
    #include<unistd.h>
    
    #include<stdio.h>
    
    #include<stdlib.h>
    #include <sys/un.h>
    #include <netinet/in.h>
    #include<string.h>
    
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/sem.h>
    #include<sys/shm.h>
    
    #include<pthread.h>
    
    #include <sys/types.h>          /* See NOTES */
    #include <sys/socket.h>
    #include<unistd.h>
    #include<semaphore.h>
    
    
    
    int sockfd, portno, clilen;
    
    int sem_id,shmid;
    
    void *sharedptr;
    
    int mysemid[100];
    
    long int count=0;
    
    
    
    
    
    struct mystruct
    
    {
    
    	sem_t mysem;
    	pthread_t tid;
    
    	long int count;
    
    	int flag;
    
    };
    
    
    
    void *mythreadfunc(void *arg)
    
    {
    
    
    	while(1)
    
    	{
    
    
    
    		char buffer[4];
    
    		memset(buffer,0,4);
    
    		//int segment=rand()%100;
    		int segment=5;
    		//int segment=1;
    
    		sprintf(buffer,"%d", segment);
    
    		printf("\n outer threadid is-->%d segment-->%d, buffer =%s\n",(int)(pthread_self()),segment, buffer);
    
    
    		/*struct sembuf sem_b;
    
    		sem_b.sem_num=0;
    
    		sem_b.sem_op=-1;
    
    		sem_b.sem_flg=SEM_UNDO;
    
    		if(semop(mysemid[segment],&sem_b,1)==-1)
    
    		{
    
    			fprintf(stderr,"outer semaphore lock failed");
    			//sleep(10);
    			//continue;			
    
    		}*/
    		 sem_wait(&((((struct mystruct *)sharedptr)+segment)->mysem));
    		
    		//while(1)
    
    		//{
    
    			//if(((((struct mystruct *)sharedptr)+segment)->flag)==0)
    
    			{
    
    				((((struct mystruct *)sharedptr)+segment)->tid)=(pthread_self());
    
    				((((struct mystruct *)sharedptr)+segment)->count)=count;
    
    				((((struct mystruct *)sharedptr)+segment)->flag)=1;
    
    				fflush(stdout);
    				printf("count-->%ld\n",count);
    				fflush(stdout);
    
    				count++;
    				
    
    				write(sockfd,buffer,4);
    
    				
    				printf("\n---> done main work\n");
    
    				fflush(stdout);
    
    
    				/*sem_b.sem_num=0;
    
    				sem_b.sem_op=1;
    
    				sem_b.sem_flg=SEM_UNDO;
    				
    
    				if(semop(mysemid[segment],&sem_b,1)==-1)
    
    				{
    
    					fprintf(stderr,"semaphore inner unlock failed ---------->>> %d", segment);
    					perror("\n semv error  : ");
    					sleep(10);
    					continue;
    
    				}
    					*/
    				
    				 sem_post(&((((struct mystruct *)sharedptr)+5)->mysem));
    				//break;
    
    			}
    			
    
    
    	//}//inner while 
    	
    	
    	
    
    	}//outer while
    
    }//end function
    
    
    int main(int argc,char **argv)
    
    {
    
    	struct sockaddr_in serv_addr, cli_addr;
    
    	int n;
    
    	if (argc < 2)
    
    	{
    
    		fprintf(stderr,"ERROR, no port provided\n");
    
    		exit(1);
    
    	}
    
    	sockfd = socket(PF_INET, SOCK_STREAM, 0);
    
    
    	if(sockfd<0)
    	{
    
    		perror("\n ERROR creating socket  : ");
    		exit(0);
    	}
    
    
    
    	struct linger linger;
    
    	linger.l_onoff=1;
    
    	linger.l_linger=1;
    
    	setsockopt(sockfd,SOL_SOCKET,SO_LINGER, &linger,sizeof(linger));
    
    
    	memset(&serv_addr, 0, sizeof(serv_addr));
    
    	portno = atoi(argv[1]);
    	
    
    	serv_addr.sin_family =AF_INET;
    
    	serv_addr.sin_addr.s_addr =inet_addr("127.0.0.1");
    
    	serv_addr.sin_port = htons(portno);
    
    	
    	if(connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr))< 0)
    	{
    		perror("\n ERROR in connect socket  : ");
    		exit(0);
    	}
    
    
    
    	int i=0;
    
    	for ( i=0;i<100 ;i++ )
    
    	{
    
    		if(i==0)
    
    			mysemid[i]=semget((key_t)102,1,0666|IPC_CREAT);
    
    		else
    
    			mysemid[i]=semget((key_t)i,1,0666|IPC_CREAT);
    
    	
    		if(mysemid[i]<0)
    
    		{
    
    			perror("semid error occured");
    
    			exit(1);
    
    		}
    
    
    
    	}//end for 
    	
    
    	key_t key = 1234;
    
    	shmid=shmget(key,100*sizeof(struct mystruct),0644|IPC_CREAT);
    
    	if(shmid==-1)
    	{
    
    		printf("shmid error");
    		exit(0);
    	}
    
    	sharedptr=shmat(shmid,(void*)0,0);
    
    
    
    	if(sharedptr==(void *)-1)
    
    	{
    
    		fprintf(stderr,"shared memory error");
    
    		fflush(stderr);
    
    	}
    
    	pthread_t thread_ID[100];
    
    	int value;
    
    
    
    	i=0;
    	mythreadfunc(&value);
    
    	/*for(i=0;i<100;i++)
    
    	{
    
    		if((pthread_create(&thread_ID[i] , NULL, mythreadfunc , &value ))<0)
    
    		{
    			perror("\n\n\n\nWARNING\n\n\n");
    
    			fflush(stdout);
    
    		}
    
    		//sleep(10);
    
    	}*/
    	//while(1);
    	pthread_exit(NULL);
    
    }//end main

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. Need Help: Multi-threading and Synchronization
    By Anom in forum C Programming
    Replies: 7
    Last Post: 12-08-2009, 05:34 PM
  2. CFiledialog threads termination
    By ks_lohith in forum C++ Programming
    Replies: 0
    Last Post: 09-11-2009, 12:57 AM
  3. problems creating a linked list
    By jamjar in forum C Programming
    Replies: 5
    Last Post: 10-23-2002, 05:50 AM
  4. Block and wake up certain threads
    By Spark in forum C Programming
    Replies: 9
    Last Post: 06-01-2002, 03:39 AM
  5. problems using threads.....?
    By stumpert in forum C++ Programming
    Replies: 2
    Last Post: 04-27-2002, 09:30 PM