How to implement a POSIX threaded program to solve producer/consumer problem

This is a discussion on How to implement a POSIX threaded program to solve producer/consumer problem within the C++ Programming forums, part of the General Programming Boards category; Hello ALL, In my program, I first receive data from another machine, then processing the data. Data receiveing is fast, ...

  1. #1
    Registered User
    Join Date
    Sep 2008
    Posts
    3

    How to implement a POSIX threaded program to solve producer/consumer problem

    Hello ALL,

    In my program, I first receive data from another machine, then processing the data. Data receiveing is fast, but data processing is relatively time consuming. Hence, I put the code for data processing in a POSIX thread (i am using Red Hat enterpriese linux AS 4). In the main() function, I receive data first. Whevenever a data is received, a new thread for processing the data is created.

    In this thread,
    http://www.codeguru.com/forum/showthread.php?t=459433

    a friend Pelede suggested a class, named SynchronizedQueue as shown below. I think the class is great for solving classic producer/consumer problem. However, I do not exactly know how to use it since my poor C++ knowledge. Can anyone help me with this.

    SynchronizedQueue.cpp


    Code:
    #include "SynchronizedQueue.h"
    /*
    *   SynchronizedQueue.h
    *   Author: Ishay Peled
    */
    using namespace std;
    
    SynchronizedQueue::SynchronizedQueue(){
        m_iSize = 0;
        pthread_mutex_init(&m_ptrMutex,NULL);
        pthread_cond_init(&m_ptrCondition, NULL);
    }
    
    SynchronizedQueue::~SynchronizedQueue(){
        pthread_mutex_destroy(&m_ptrMutex);
        pthread_cond_destroy(&m_ptrCondition);
    }
    
    void SynchronizedQueue::push(Message* obj){
        pthread_mutex_lock(&m_ptrMutex);
        m_vctQueue.push_back(obj);
        m_iSize++;
        if (m_iSize == 1)
            pthread_cond_signal(&m_ptrCondition);
        pthread_mutex_unlock(&m_ptrMutex);
    }
    
    Message* SynchronizedQueue::pop(){
        Message* result;
        void* res;
        pthread_mutex_lock(&m_ptrMutex);
        while (m_iSize == 0)
            pthread_cond_wait(&m_ptrCondition, &m_ptrMutex);
        result = m_vctQueue[0];
        m_vctQueue.erase(m_vctQueue.begin());
        m_iSize--;
        pthread_mutex_unlock(&m_ptrMutex);
        return result;
    }
    
    int SynchronizedQueue::getSize(){
        return m_iSize;
    }
    SynchronizedQueue.h


    Code:
    #ifndef BUFFER_H
    #define BUFFER_H
    
    #include <pthread.h>
    #include <string>
    #include <vector>
    #include "Semaphore.h"
    #include "Message.h"  //Change this to the data type you're using
    
    using namespace std;
    
    class SynchronizedQueue{
    public:
        SynchronizedQueue();
        ~SynchronizedQueue();
        void push(Message* obj);
        Message* pop();
        int getSize();
            
    private:
        std::vector<Message*> m_vctQueue;
        pthread_mutex_t m_ptrMutex;
        pthread_cond_t m_ptrCondition;
        int m_iSize;
    };
    #endif

  2. #2
    a_capitalist_story
    Join Date
    Dec 2007
    Posts
    2,652
    Outside of the CodeGuru reference in the post, this has also been addressed at DevShed.

  3. #3
    Registered User
    Join Date
    Sep 2008
    Posts
    3
    can anyone be of help, Please!

  4. #4
    Registered User C_ntua's Avatar
    Join Date
    Jun 2008
    Posts
    1,853
    You have a class that icontains a vector, called m_vctQueue. This is where the elements are stored. The type of the elements of the vector is Message, which as you are instructed can be changed through message.h. You use the functions pop, push to insert or extract elements from the queue (vector). When you pop you decrease m_size (initialized at 0). When you push you increase it. The point is for you not to pop() when there are no elements int the queue. So you there is the condition if m_size == 0 to wait for a signal. When m_size == 1 again, which means a push() was performed, then you can use pop() again. The pthread_mutex_lock/unlock is to make sure no other thread will use pop(), push() at the same time a pop() or push() is performed (for obvious reasons).
    You also have functions getsize() to get the size of the queue.

    So you create one object SynchronizedQueue and all the threads will pop()/push() elements there. The class will make sure it is properly syncrhonized. Meaning you won't be able to extract elements with pop() if the queue is empty. That is all. If you want more explanation we ll be welcome to help with possibly an example

  5. #5
    Registered User
    Join Date
    Sep 2008
    Posts
    3
    Quote Originally Posted by C_ntua View Post
    You have a class that icontains a vector, called m_vctQueue. This is where the elements are stored. The type of the elements of the vector is Message, which as you are instructed can be changed through message.h. You use the functions pop, push to insert or extract elements from the queue (vector). When you pop you decrease m_size (initialized at 0). When you push you increase it. The point is for you not to pop() when there are no elements int the queue. So you there is the condition if m_size == 0 to wait for a signal. When m_size == 1 again, which means a push() was performed, then you can use pop() again. The pthread_mutex_lock/unlock is to make sure no other thread will use pop(), push() at the same time a pop() or push() is performed (for obvious reasons).
    You also have functions getsize() to get the size of the queue.

    So you create one object SynchronizedQueue and all the threads will pop()/push() elements there. The class will make sure it is properly syncrhonized. Meaning you won't be able to extract elements with pop() if the queue is empty. That is all. If you want more explanation we ll be welcome to help with possibly an example
    Yesterday, 05:29 PM #4
    C_ntua Thank you. If you have got time. Could you please provide me an example program. It would be much helpful for me to understand the SynchronizedQueue class. Thank you in advance

    My application can be simplified as the following problem.

    In the main function, the program constantly receives data (a struct data type, containing two integer type elements), whenever a struct of data is received, the struct are pushed into a queue; On the other hand, once there are idle thread (totally, n threads are created when the program is started), one data unit are popped out, and are processed by one certain thread. ArkM, could you please provide a COMPLETE program to implement the above task using SynchronizedQueue class. Thanks, I really need this.

    the data struct is defined as
    struct DataIn{
    int ItemA;
    int ItemB;
    }

    QUestions:
    (1) How to create n threads.
    (2) Peled told me to use something like ProcessData procD(SynQ); I see this means that I should use a SynchronizedQueue type object as an argument to instantiate a ProcessData type object , but what should be done with SynQ within the ProcessData class.


    I tried my best to make the following code based on my understanding of the SynchronizedQueue class and poor knowledge of C++/OO design. The code is erroreous and incomplete even looks stupid. my apologies for this

    Code:
    #include "SynchronizedQueue.h"
    struct TWO_INT
    {
    	int iA;
    	int iB;
    };
    
    vector<int> SumofTwoNumbers;
    class ProcessData{
    private:
    	SynchronizedQueue* jobs
    		bool destroy;
    public:
    	ProcessData(SynchronizedQueue* jobs)
    	~ProcessData()
    	void destroy()
    	void* run(void* args)
    }
    
    ProcessData::run()
    {
    	while (!destroy){
    		struct TWO_INT* data  = jobs->pop();
    		//process data
    		Sum0fTwoNumbers.push(data->iA + data->iB);
    	}
    }
    
    SynchronizedQueue SynQ;
    SynQ.destroy = 0;
    main()
    {
    	int i;
                    ProcessData ProcD;
    	for(i=0;  i<1000; ++i)
    	{
    		struct TWO_INT* twoint = new TWO_INT;
    		twoint->iA = i;
    		twoint->iB = i + 1;
    		SynQ.push(twoint);
                                    
                                     ProcD.run();
    	}
                    //ProcD.run();// Or put the line here
    	SynQ.destroy = 1;
    
                    ProcD.destroy();//But what the code in the destroy function body should look like?
    }#include "SynchronizedQueue.h"
    struct TWO_INT
    {
    	int iA;
    	int iB;
    };
    
    vector<int> SumofTwoNumbers;
    class ProcessData{
    private:
    	SynchronizedQueue* jobs
    		bool destroy;
    public:
    	ProcessData(SynchronizedQueue* jobs)
    	~ProcessData()
    	void destroy()
    	void* run(void* args)
    }
    
    ProcessData::run()
    {
    	while (!destroy){
    		struct TWO_INT* data  = jobs->pop();
    		//process data
    		Sum0fTwoNumbers.push(data->iA + data->iB);
    	}
    }
    
    SynchronizedQueue SynQ;
    SynQ.destroy = 0;
    main()
    {
    	int i;
                    ProcessData ProcD;
    	for(i=0;  i<1000; ++i)
    	{
    		struct TWO_INT* twoint = new TWO_INT;
    		twoint->iA = i;
    		twoint->iB = i + 1;
    		SynQ.push(twoint);
                                    
                                     ProcD.run();
    	}
                    //ProcD.run();// Or put the line here
    	SynQ.destroy = 1;
    
                    ProcD.destroy();//But what the code in the destroy function body should look like?
    }
    Last edited by cy163cy163; 09-14-2008 at 07:47 PM.

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. Inheritance and Dynamic Memory Program Problem
    By goron350 in forum C++ Programming
    Replies: 1
    Last Post: 07-02-2005, 03:38 PM
  2. Replies: 20
    Last Post: 06-13-2005, 12:53 AM
  3. Problem with Program not Quitting
    By Unregistered in forum Windows Programming
    Replies: 20
    Last Post: 06-12-2002, 12:06 AM
  4. Help to solve this problem
    By Romashka in forum C++ Programming
    Replies: 3
    Last Post: 04-16-2002, 10:32 AM
  5. problem with 2d array program.
    By Niloc1 in forum C Programming
    Replies: 1
    Last Post: 04-08-2002, 06:47 PM

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21