IPC producer consumer with Circular Queue

This is a discussion on IPC producer consumer with Circular Queue within the C Programming forums, part of the General Programming Boards category; ok, im working on an assignment implementing a circular queue (fixed size 15), shared memory, normal producer consumer problem, using ...

  1. #1
    Registered User
    Join Date
    Dec 2012
    Posts
    5

    Smile IPC producer consumer with Circular Queue

    ok, im working on an assignment implementing a circular queue (fixed size 15), shared memory, normal producer consumer problem, using three semaphores (one for the queue and one for each process). The objective is to have the producer put the characters from mytest.dat (max 150 chars) into a circular queue, while the consumer pulls them and outputs them to the screen. The exit condition for the consumer is the character * (which should not be output). Currently the producer does pull the information, however the consumer outputs blank characters and then hangs. Any pointers (no pun intended) would be greatly appreciated. After I can get it working as is, I then need to implement the functions in threads, which I believe I can do fairly easily. Am I on the right track as of yet? There arent really any specifications other than what ive said, so I have some play in the implementation. Thank you for your advice and guidance.

    Code:
    1. #include <stdlib.h>
    2. #include <stdio.h>
    3. #include <unistd.h>
    4. #include <sys/types.h>
    5. #include <sys/ipc.h>
    6. #include <sys/sem.h>
    7. #include <sys/shm.h>
    8. #define NUM_ELEM 15 /* Number of elements in shared memory buffer */
    9. #define SEM_MUTEX 0
    10. #define SEM_EMPTY 1
    11. #define SEM_FULL 2
    12. FILE* fp;
    13. int rc, semID, shmID, status, i, x;
    14. char elem;
    15. union semun
    16. {
    17. int val;
    18. struct semid_ds *buf;
    19. ushort *array;
    20. } seminfo;
    21. struct sembuf WaitMutex={SEM_MUTEX, -1, 0};
    22. struct sembuf SignalMutex={SEM_MUTEX, 1, 0};
    23. struct sembuf WaitEmpty={SEM_EMPTY, -1, 0};
    24. struct sembuf SignalEmpty={SEM_EMPTY, 1, 0};
    25. struct sembuf WaitFull={SEM_FULL, -1, 0};
    26. struct sembuf SignalFull={SEM_FULL, 1, 0};
    27. struct shmid_ds shminfo;
    28. char *shmPtr;
    29. void initialize();
    30. void producer();
    31. void consumer();
    32. main()
    33. {
    34. /* Open file */
    35. fp= fopen("mytest.dat", "r");
    36. /* Initialize shared memory and semaphores */
    37. initialize();
    38. /* Start a child process and proceed accordingly*/
    39. if (fork()==0)
    40. {
    41. /* Child becomes the consumer */
    42. consumer();
    43. /* Child quits after consuming 26 characters */
    44. exit(0);
    45. }
    46. else
    47. {
    48. /* Parent becomes the producer */
    49. producer();
    50. /* Wait for child to finish */
    51. wait(&status);
    52. /* Remove shared memory */
    53. shmctl(shmID, IPC_RMID, &shminfo);
    54. /* Remove semaphores */
    55. semctl(semID, SEM_MUTEX, IPC_RMID, seminfo);
    56. /* Close file */
    57. fclose(fp);
    58. /* Parent is done cleaning up, so now quits */
    59. exit(0);
    60. }
    61. }
    62. void initialize()
    63. {
    64. /* Init semaphores */
    65. /* Three semaphores (Empty, Full, Mutex) are created in one set */
    66. semID=semget(IPC_PRIVATE, 3, 0666 | IPC_CREAT);
    67. /* Init Mutex to one, allowing access to critical section */
    68. seminfo.val=1;
    69. semctl(semID, SEM_MUTEX, SETVAL, seminfo);
    70. /* Init Empty to number of elements in shared memory (circular buffer) */
    71. seminfo.val=NUM_ELEM;
    72. semctl(semID, SEM_EMPTY, SETVAL, seminfo);
    73. /* Init Full to zero, no elements are produced yet */
    74. seminfo.val=0;
    75. semctl(semID, SEM_FULL, SETVAL, seminfo);
    76. /* Init Shared memory */
    77. shmID=shmget(IPC_PRIVATE, NUM_ELEM, 0666 | IPC_CREAT);
    78. }
    79. void producer()
    80. {
    81. /* attach shared memory to process */
    82. shmPtr=(char*)shmat(shmID, 0, SHM_W);
    83. while((x = fgetc(fp)) != EOF)
    84. {
    85. /* Wait(Empty) - pause if no empty spots in circular buffer (i.e. all filled) */
    86. semop(semID, &WaitEmpty, 1);
    87. elem = x;
    88. printf("Produced elem '%c'\n", elem);
    89. /* Wait(Mutex) - don't touch shared memory while consumer is using it */
    90. semop(semID, &WaitMutex, 1);
    91. /* Put element into shared memory buffer (circular buffer) */
    92. *(shmPtr + (i%NUM_ELEM))=elem;
    93. /* Signal(Mutex) - allow consumer to access shared memory now */
    94. semop(semID, &SignalMutex, 1);
    95. /* Signal(Full) - record one more filled spot in circular buffer */
    96. semop(semID, &SignalFull, 1);
    97. }
    98. }
    99. void consumer()
    100. {
    101. /* attach shared memory to process */
    102. shmPtr=(char*)shmat(shmID, 0, SHM_R);
    103. while((elem != '*'))
    104. {
    105. /* Wait(Full) - pause if no filled spots in circular buffer (i.e. all empty) */
    106. semop(semID, &WaitFull, 1);
    107. /* Wait(Mutex) - don't touch shared memory while producer is using it */
    108. semop(semID, &WaitMutex, 1);
    109. /* Get element from the shared memory buffer (circular buffer) */
    110. elem=*(shmPtr + (i%NUM_ELEM));
    111. /* Signal(Mutex) - allow producer to access shared memory now */
    112. semop(semID, &SignalMutex, 1);
    113. /* Display character */
    114. printf("Consumed elem '%c'\n", elem);
    115. /* Signal(Empty) - record one more empty spot in circular buffer */
    116. semop(semID, &SignalEmpty, 1);
    117. }
    118. }
    Last edited by mpunzirudu; 12-01-2012 at 01:08 PM. Reason: typo on line 103

  2. #2
    Registered User
    Join Date
    Nov 2010
    Location
    Long Beach, CA
    Posts
    5,467
    Please fix your post to make it readable. By that, I mean remove the line numbers (our forum will add them), and any other formatting, like font, color, etc, so the forum's syntax highlighter can do it's job. Also fix your indentation, so we can see the flow of your code. I'll start looking into your problem in the mean while.

    EDIT: Never mind about the line numbers, they appear to be from the forum, they just look strange. Still, remove any fonts, colors, etc and fix your indentation please.

  3. #3
    Registered User
    Join Date
    Nov 2010
    Location
    Long Beach, CA
    Posts
    5,467
    A few warnings/errors to fix. Crank up the warnings on your compiler if you're not seeing these:
    Code:
    $ make queue
    gcc -g -Wall -std=c99 -o queue queue.c -lpthread -lm -lpq
    In file included from queue.c:5:0:
    /usr/include/sys/ipc.h:25:3: warning: #warning "Files using this header must be compiled with _SVID_SOURCE or _XOPEN_SOURCE" [-Wcpp]
    queue.c:19:5: error: unknown type name ‘ushort’
    queue.c:32:1: warning: return type defaults to ‘int’ [enabled by default]
    queue.c: In function ‘main’:
    queue.c:51:9: warning: implicit declaration of function ‘wait’ [-Wimplicit-function-declaration]
    1. You may/may not need to define one of those, depending on your implementation (OS/compiler).
    2. You might be missing a typedef somewhere.
    3. You should declare main to explicitly return an int, like int main(void).
    4. You need to #include <sys/wait.h> to use the wait() function.


    A sample mytest.dat file would be helpful, so I know what kind of data to put in there, and about how much.

    On first run, without the input file, your program seg faulted. This is not good, you don't handle when fopen fails. Also, you probably only need to open the file in the producer. Also, you should properly check fork for errors.

    You actuall need to add error checking everywhere, you currently have none. Any function that returns success/fail or an error code needs to be checked, and an appropriate message printed out. I recommend perror() or strerror(errno). Google for details/examples. All of your file IO, IPC and system functions will return success/fail/error code. Properly check the return values of fopen, fork, semget, semctl, shmget, shmat, semop. There may be others. If they fail, your program should act appropriately, exiting if necessary. Check the man pages for these functions, either on your computer or on the web. The man pages contain all the details you need to know for error checking.

    That will help you track down your problem. If you can't solve it from there, post back with your updated code, any error messages you don't understand, and any other details we might need to help you.

  4. #4
    Registered User
    Join Date
    Dec 2012
    Posts
    5
    Thanks for the pointers so far, my "mytest.dat" is a plain text file with "abcdef*" without the quotes, but could be any number of chars (under 150) with a * to terminate the output in the consumer. I will add in some error checking and move the opening to within the producer and repost. Thanks again

  5. #5
    Registered User
    Join Date
    Dec 2012
    Posts
    5
    Code:
    #define _XOPEN_SOURCE
    #include <stdlib.h> 
    #include <stdio.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/sem.h>
    #include <sys/shm.h>
    #include <sys/wait.h>
    
    #define NUM_ELEM 15	/* Number of elements in shared memory buffer */
    #define SEM_MUTEX 0
    #define SEM_EMPTY 1
    #define SEM_FULL 2
    
    FILE* fp;
    int rc, semID, shmID, status, i, x;
    char elem;
    union semun
    {
        int val;
        struct semid_ds *buf;
        short *array;
    } seminfo;
    
    struct sembuf WaitMutex={SEM_MUTEX, -1, 0};
    struct sembuf SignalMutex={SEM_MUTEX, 1, 0};
    struct sembuf WaitEmpty={SEM_EMPTY, -1, 0};
    struct sembuf SignalEmpty={SEM_EMPTY, 1, 0};
    struct sembuf WaitFull={SEM_FULL, -1, 0};
    struct sembuf SignalFull={SEM_FULL, 1, 0};
    struct shmid_ds shminfo;
    char *shmPtr;
    
    void initialize();
    void producer();
    void consumer();
    
    int main()
    {
      
      /* Initialize shared memory and semaphores */
      initialize();
    
      /* Start a child process and proceed accordingly*/
      if (fork()==0)
      {
        /* Child becomes the consumer */
        consumer();
        exit(0);
      }
      else
      {
        /* Parent becomes the producer */
        producer();
    
        /* Wait for child to finish */
        wait(&status);
    
        /* Remove shared memory */
        shmctl(shmID, IPC_RMID, &shminfo);
    
        /* Remove semaphores */
        semctl(semID, SEM_MUTEX, IPC_RMID, seminfo);
        
        /* Close file */
        fclose(fp);
     	
        /* Parent is done cleaning up, so now quits */
        return 0;
      }
    }
    
    void initialize()
    {
    
      /* Init semaphores */
      /* Three semaphores (Empty, Full, Mutex) are created in one set */
      semID=semget(IPC_PRIVATE, 3, 0666 | IPC_CREAT);
    
      /* Init Mutex to one, allowing access to critical section */
      seminfo.val=1;
      semctl(semID, SEM_MUTEX, SETVAL, seminfo);
    
      /* Init Empty to number of elements in shared memory (circular buffer) */
      seminfo.val=NUM_ELEM;
      semctl(semID, SEM_EMPTY, SETVAL, seminfo);
    
      /* Init Full to zero, no elements are produced yet */
      seminfo.val=0;
      semctl(semID, SEM_FULL, SETVAL, seminfo);
    
      /* Init Shared memory */
      shmID=shmget(IPC_PRIVATE, NUM_ELEM, 0666 | IPC_CREAT);
      
      if (shmID == -1)
      {
      	perror("shmget failed");
      	exit(1);
      }
    }
    
    void producer()
    {
                 
        /* Open file */
         fp= fopen("mytest.dat", "r");
         if (fp == NULL)
        {
            printf("File not found!\n");
        }
    
        /* attach shared memory to process */
        shmPtr=(char*)shmat(shmID, 0, SHM_W);
       
        /* Check for error in shmptr assignment */
        if (shmPtr == (void *)(-1))
        {
           perror("shmat failed in producer");
           exit(1);
        }
     
        while((x = fgetc(fp)) != EOF)
        {
     
    	/* Wait(Empty) - pause if no empty spots in circular buffer (i.e. all filled) */
            semop(semID, &WaitEmpty, 1);
    
            elem = x;
            printf("Produced elem '%c'\n", elem);
    
            /* Wait(Mutex) - don't touch shared memory while consumer is using it */
            semop(semID, &WaitMutex, 1);
    
            /* Put element into shared memory buffer (circular buffer) */
            *(shmPtr + (i%NUM_ELEM))=elem;
    
            /* Signal(Mutex) - allow consumer to access shared memory now */
            semop(semID, &SignalMutex, 1);
    
            /* Signal(Full) - record one more filled spot in circular buffer */
            semop(semID, &SignalFull, 1);
        }
    }
    
    void consumer()
    {
        /* attach shared memory to process */
        shmPtr=(char*)shmat(shmID, 0, SHM_R) = 0;
    
        while((x = fgetc(fp)) != EOF)
        {
            /* Wait(Full) - pause if no filled spots in circular buffer (i.e. all empty) */
            semop(semID, &WaitFull, 1);
    
            /* Wait(Mutex) - don't touch shared memory while producer is using it */
    	 semop(semID, &WaitMutex, 1);
    
            /* Get element from the shared memory buffer (circular buffer) */
            elem=*(shmPtr + (i%NUM_ELEM));
    
            /* Signal(Mutex) - allow producer to access shared memory now */
            semop(semID, &SignalMutex, 1);
    
            /* Display character */
            printf("%c", (char)elem);
    
            /* Signal(Empty) - record one more empty spot in circular buffer */
            semop(semID, &SignalEmpty, 1);
        }
    }

  6. #6
    Registered User
    Join Date
    Dec 2012
    Posts
    5
    I apologize for the bad cut and paste job before, I went back and reread the tutorials on tags etc. Currently the only error I am getting is an invalid lvalue assignment in the first line of the consumer code. Ive tried multiple different types of values, I am missing something elementary I believe, Ive just been stairing at this for so long..lol, im sure everyones been there more than once.

  7. #7
    Ultraviolence Connoisseur
    Join Date
    Mar 2004
    Posts
    472
    Code:
    shmPtr=(char*)shmat(shmID, 0, SHM_R) = 0;
    You are trying to assign the return of the function shmat() to 0 which is not possible. Thats the invalid lvalue assignment, by the way there is no need to cast the return of shmat to (char*) all that is going to do is hide errors if you happen to be using an invalid type or if the return value changes foro shmat in the future.

  8. #8
    Registered User
    Join Date
    Nov 2010
    Location
    Long Beach, CA
    Posts
    5,467
    Code:
    shmPtr=(char*)shmat(shmID, 0, SHM_R) = 0;
    You have two assignments in there. I'm pretty sure you don't need the = 0 part. The reason for that error is because it's trying to assign 0 to the shmat() call, and you can only assign to an lvalue. Function calls do not result in lvalues, neither do any type-cast expressions. Perhaps if you explain what you were trying to do there, we can find a workaround.

  9. #9
    Registered User
    Join Date
    Dec 2012
    Posts
    5
    That was a mistake/typo that I did not catch, when thats out, it compiles. The producer outputs from the file to the screen, however the consumer doesnt at all. I tried changing 'while((x = fgetc(fp)) != EOF)' to 'while((elem != '*')' within the Consumer function, because that ultimatly is what Id like the result to be. When the producer puts a * character into the buffer, the consumer should end and not print the *. The expected output is just the contents of the mytest.dat file (up to 150 chars ending with a *) which is put into the shared memory buffer and printed to the screen from the consumer.

    Thank you all very much for helping, I appreciate the understanding I am gaining just as much, if not more, than the solution. I just have scowered the man pages and internet for awhile and im at a "aw crap" point.

  10. #10
    Registered User
    Join Date
    Nov 2010
    Location
    Long Beach, CA
    Posts
    5,467
    I'm thinking, since you're at an "aw crap" point, that maybe we should ditch your current implementation. I'll start with some notes on your most recent version of code anyway, just so you can avoid reproducing the same errors in your new version (if you choose to start over).

    Your current handling of the return value of fork lumps the error case (returns -1) in with the parent case (returns > 0). Here's what a proper fork setup should look like:
    Code:
    pid_t pid;
    
    pid = fork();
    if (pid == 0)
        // child
    else if (pid > 0)
        // parent
    else
        // perror, exit
    Also, if fopen fails, you need to exit, otherwise there is still a potential seg fault. Having no input file means you should end the program, there is no point in continuing.

    You should check whether you got a '*' or an EOF (if you reach the end of file, no point in trying to keep producing elements):
    Code:
    while ((x = fgetc(fp)) != EOF && x != '*')
    Why is the consumer reading from the file pointer? He should be checking the character he pulled from the queue for '*'. Also, i is never updated when you write to or read from the queue, so you are not going to be reading/writing the right data.

    If you do start over:

    Forget about the semaphores and shared memory for a minute, and work on your queue/circular buffer implementation. Make sure you have a working circular buffer, and test it thoroughly. Then, worry about adding your shared memory. You definitely need insert and remove functions, also you might want is_full, is_empty, get_count. Make sure your insert and remove return a code that indicates whether the operation failed, can't insert because queue is full, or can't remove, queue empty.

    Then start on the forking/shared memory/semaphore part.

    Make sure you check the return value of every system call, shared memory function call and semaphore call. That means every call to shmget, shmat, semget, semop, semctl, etc. Every man page for those functions will have a section labeled "RETURN VALUE" that will tell you what values to check for on error. If a call fails, print out a message with perror and exit the program.

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. Replies: 3
    Last Post: 11-11-2010, 11:05 AM
  2. producer consumer deadlock
    By strider1974 in forum C Programming
    Replies: 11
    Last Post: 08-17-2009, 10:47 AM
  3. producer / consumer question
    By pheres in forum C++ Programming
    Replies: 8
    Last Post: 08-02-2007, 05:55 PM
  4. Consumer - Producer
    By MethodMan in forum C Programming
    Replies: 0
    Last Post: 04-18-2003, 07:14 PM
  5. Producer consumer problem
    By traz in forum C Programming
    Replies: 2
    Last Post: 11-08-2002, 07:04 PM

Tags for this Thread


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21