Thread: Boost thread request queue design question

  1. #1
    twomers

    Boost thread request queue design question

    Hi folks.

    I want a FIFO URL request queue for an application I'm developing (or rather re-developing). I want the URL requests to be executed on a separate thread, and I want each one to call back to a member function of some class I have. So I'm using libcurl (with a class wrapper I developed a long time ago), sigc++ (for callbacks) and Boost (for threads). I have something that works, but I'm not sure if it's a good/safe design, 'cause I haven't done threading stuff in a long time. Nor have I ever really done a whole bunch of it in the first place.

    So a brief description of the code...

    main: sets up the sigc callbacks and the threaded URL class. It posts two requests to be downloaded and then sleeps for 10 seconds (the application I'm developing is a GUI; this is simply a console demonstrator).

    printStuff: The callback. It simply prints the source of the webpage that was downloaded. It's just a free function; the real application will use a class-member callback, but this demonstrates the point pretty well.

    libURLstream_thread: A templated class which should handle the threads, the posting, and the actual calling backs.
    * I would appreciate feedback particularly about the doPostAndCallback member.
    * Also, I don't do anything with the thread member variable after using it. How dangerous is that? I assumed that once the object is destroyed, the thread variable will be destroyed with it...
    * And instead of the while(beingUsed) wait loop for when the thread is in use, is it OK to simply call thread.join()? I don't intend to, but it was something that occurred to me.
    (Note: my niall::urlStream object will be static. This is because I only want one instance of it, and the template arguments will differ when the callback parameters differ (which they will) in the proper code.)

    So the question: how thread-unsafe is this? And what should I do (read: in all likelihood) to make it more thread-safe?

    All feedback appreciated.

    Code:
    // System headers
    #include <iostream>
    #include <fstream>
    #include <string>
    #include <queue>
    
    // Boost headers
    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    #include <boost/date_time.hpp>
    
    // Sigc headers
    #include <sigc++/sigc++.h>
    
    // My headers
    #include <niall/urlStream.h>
    
    
    namespace niall {
    
      // A class which will post to a URL and callback to a user-defined function 
      // when the posting has completed
      template<typename callbackType>
      class libURLstream_thread {
      private:
        // The request type
        struct requestElement {
          std::string url; 
          std::string params;
    
          // Overload the = operator for convenience later
          requestElement &operator = ( requestElement &e ) {
          // Copy the individual parameters
            url    = e.url; 
            params = e.params; 
    
            return *this; 
          }
        };
        typedef std::queue<requestElement> requestQueue; 
    
    
        // For net access (static, because I only want one instance of it)
        niall::urlStream stream;
    
        // The current queue of requests
        requestQueue theQueue;
    
        // These are for thread things
        bool beingUsed; 
        boost::mutex mutex; 
        boost::thread thread; 
        boost::condition_variable inUse;
        
    
    
    
        // Push a request element to the back of the queue 
        void push( const requestElement &el ) {
          boost::mutex::scoped_lock lock( mutex );
          theQueue.push( el );
          lock.unlock();
    
          inUse.notify_one();
        }
    
        // Pop the first element from the queue, return accordingly
        bool pop( requestElement &el ) {
          boost::mutex::scoped_lock lock( mutex );
    
          // Do nothing if the queue is empty, so return false
          if( theQueue.empty() ) {
            return false;
          }
    
          // Otherwise pop the first element
          el = theQueue.front();
          theQueue.pop();
          return true;
        }
    
    
    
        // The real thread functionality is here
        void doPostAndCallback( const requestElement &req, callbackType &callback ) {
          // Wait until we're good to go
          boost::mutex::scoped_lock lock( mutex ); 
          while ( beingUsed ) 
            inUse.wait( lock ); // possible alternative: thread.join() ?
          beingUsed = true; 
    
          // Do the real posting
          std::cout<< "Posting to: " << req.url << "\n";
          stream.post( req.url, req.params ); 
          std::cout<< "Done!\n\n";
    
          // Call the callback and wait for it to finish
          callback( stream.getSource() );
    
          beingUsed = false; 
        }
    
      public:
        // Constructor, do nothing here
        libURLstream_thread()
        : beingUsed( false ) {
        }
    
        ~libURLstream_thread() {
        }
    
    
    
        // Our post function. This will handle all the stuff for us
        void post( const std::string &url, const std::string &params, callbackType &callback ) {
          // Must wait
          requestElement req;
          req.url = url; 
          req.params = params; 
          push( req ); 
    
          // Pop the frontmost element and start the thread
          if ( pop(req) ) {
            thread = boost::thread( boost::bind( &libURLstream_thread::doPostAndCallback, this, req, callback ) );
          }
        }
      };
    }
    
    void printStuff( const std::string &src ) {
      std::cout<< "Source: " << src << "\n\n"; 
    }
    
    int main( void ) {
      // Set up the callback function
      typedef sigc::signal<void,std::string> urlStreamCallback;
      urlStreamCallback callback; 
      callback.connect( sigc::ptr_fun( printStuff ) );
    
    
      // And now set up the threaded posting
      niall::libURLstream_thread<urlStreamCallback> stream; 
      stream.post( "http://www.something.com", "", callback );
      stream.post( "http://www.google.com",    "", callback );
    
      boost::this_thread::sleep( boost::posix_time::seconds( 10 ) );
      // Just to pause it. The application is a gtkmm GUI.
    
      return 0; 
    }
    Last edited by twomers; 01-27-2012 at 08:12 AM. Reason: Added some questions.

  2. #2
    Elkvis
    for your callbacks, boost::bind is probably going to become your best friend, if your requirement is that printStuff is a class member function.
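    A minimal sketch of binding a member function as a callback. It uses std::bind and std::function, the C++11 standard counterparts of boost::bind and boost::function (same call syntax), so it builds without Boost; PageView and onDownloaded are hypothetical names standing in for the real GUI class.

```cpp
#include <functional>
#include <string>

// Hypothetical GUI-side class standing in for the real callback target.
class PageView {
public:
    void onDownloaded(const std::string &src) { lastSource = src; }
    std::string lastSource;  // last page source received
};

// What a worker would store: any callable that takes the page source.
typedef std::function<void(const std::string &)> DownloadCallback;

// Bind the member function to one particular object; _1 forwards the source.
DownloadCallback makeCallback(PageView &view) {
    return std::bind(&PageView::onDownloaded, &view, std::placeholders::_1);
}
```

    The object passed to makeCallback must outlive every call through the returned callback, since only its address is stored.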

  3. #3
    twomers
    Quote, originally posted by Elkvis:
    for your callbacks, boost::bind is probably going to become your best friend, if your requirement is that printStuff is a class member function.
    Thanks. That part isn't a problem; I just need to change from sigc::ptr_fun to sigc::mem_fun in the callback.connect line and add some arguments. It's nearly the exact same syntax as Boost's, actually.

    Is there any benefit to using boost rather than sigc++ for the callbacks? I'm using gtkmm for the GUI stuff which uses sigc++ natively, so I figured I'd use that to be consistent. I might consider changing it if boost might be superior...

    Thanks for the suggestion.
    Last edited by twomers; 01-27-2012 at 09:11 AM.

  4. #4
    Codeplug
    >> requestElement &operator = ( requestElement &e )
    Make the parameter const, or remove the operator entirely and let the compiler generate it for you.
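    A short sketch of both options. requestElement relies on the compiler-generated copy assignment; requestElementExplicit is a hypothetical name for the hand-written version with a const parameter, which (unlike the original non-const one) accepts const sources.

```cpp
#include <string>

// No user-declared operator=: the compiler generates a member-wise copy
// assignment that takes a const reference.
struct requestElement {
    std::string url;
    std::string params;
};

// Hand-written alternative: the parameter is const, so assigning from a
// const object (or a temporary) compiles.
struct requestElementExplicit {
    std::string url;
    std::string params;

    requestElementExplicit &operator=(const requestElementExplicit &e) {
        url = e.url;
        params = e.params;
        return *this;
    }
};
```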

    In push(), I wouldn't call lock.unlock() before notify_one(); POSIX recommends signalling with the mutex held when predictable scheduling behaviour is required. More reading on it: condvars: signal with mutex locked or not? | Loïc OnStage
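    A minimal sketch of push() notifying while the lock is still held, written with std::mutex and std::condition_variable as stand-ins for the boost:: types; RequestQueue and waitAndPop are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>

class RequestQueue {
public:
    void push(const std::string &url) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(url);
        notEmpty_.notify_one();  // signalled before lock_guard releases the mutex
    }

    // Blocks until an element is available, then removes and returns it.
    std::string waitAndPop() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.empty())
            notEmpty_.wait(lock);  // the loop guards against spurious wakeups
        std::string url = queue_.front();
        queue_.pop();
        return url;
    }

private:
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::queue<std::string> queue_;
};
```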

    >> inUse.wait( lock ); // possible alternative: thread.join() ?
    The intention seems to be "only one thread can stream.post/callback at a time". You're currently using a condition variable/predicate to achieve this. You could have just used another mutex, or you could have kept track of all boost::thread objects created and had thread N join on thread N-1. I wouldn't do any of these...
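    For comparison, a sketch of the "just use another mutex" option: a dedicated mutex held across the post and the callback, so only one worker uses the stream at a time. std::mutex stands in for boost::mutex, and fetchPage is a hypothetical stand-in for niall::urlStream::post followed by getSource().

```cpp
#include <functional>
#include <mutex>
#include <string>

class SerialPoster {
public:
    // fetchPage: hypothetical replacement for stream.post + stream.getSource.
    explicit SerialPoster(std::function<std::string(const std::string &)> fetchPage)
        : fetch_(fetchPage) {}

    // Whichever thread takes streamMutex_ first posts and calls back; any
    // other thread blocks in lock_guard's constructor until it is released.
    void postAndCallback(const std::string &url,
                         const std::function<void(const std::string &)> &callback) {
        std::lock_guard<std::mutex> lock(streamMutex_);
        callback(fetch_(url));
    }

private:
    std::mutex streamMutex_;  // serialises use of the (assumed non-thread-safe) stream
    std::function<std::string(const std::string &)> fetch_;
};
```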

    >> beingUsed = false;
    This is the predicate to your condition variable. So this is where you should notify_one(), instead of in push(). Also notice that 'mutex' is taking on extra responsibility. It's protecting both the predicate condition and 'theQueue'. In this case, accessing 'theQueue' and the predicate are disjoint and each could use their own distinct mutex.
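    A sketch of the predicate with its own mutex and the notification moved to where the flag is cleared (std:: types in place of boost::; UseGate is a hypothetical name). Note that mutex_ is held only while the flag is read or changed; the post and callback would run between acquire() and release() without holding it, whereas the original doPostAndCallback keeps its lock for the whole post and callback.

```cpp
#include <condition_variable>
#include <mutex>

class UseGate {
public:
    // Wait until no one is using the stream, then mark it as in use.
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (beingUsed_)
            released_.wait(lock);
        beingUsed_ = true;
    }

    // The predicate changes here, so this is where waiters are notified.
    void release() {
        std::lock_guard<std::mutex> lock(mutex_);
        beingUsed_ = false;
        released_.notify_one();
    }

    bool busy() {
        std::lock_guard<std::mutex> lock(mutex_);
        return beingUsed_;
    }

private:
    std::mutex mutex_;  // protects only beingUsed_, not the request queue
    std::condition_variable released_;
    bool beingUsed_ = false;
};
```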

    In post(), you push() followed by an immediate pop(). With this design, you don't even need a queue at all - just pass the request from post() directly to the new thread.
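    A sketch of dropping the queue and handing the request straight to the new thread, using std::thread in place of boost::thread. The body of doPostAndCallback is a hypothetical placeholder for the real stream.post/getSource calls. std::thread copies its arguments, so the caller's strings may go out of scope as soon as post() returns.

```cpp
#include <functional>
#include <string>
#include <thread>

// Placeholder worker: builds a fake "source" instead of doing real network I/O.
void doPostAndCallback(std::string url, std::string params,
                       std::function<void(const std::string &)> callback) {
    std::string source = "posted to " + url + (params.empty() ? "" : "?" + params);
    callback(source);
}

// Returns the thread so the caller decides whether to join or detach it.
std::thread post(const std::string &url, const std::string &params,
                 std::function<void(const std::string &)> callback) {
    return std::thread(doPostAndCallback, url, params, callback);
}
```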

    >> thread = boost::thread(...);
    The reassignment of 'thread' causes the old thread object to become detached. Any thread of execution associated with the old object will continue to run in the background. So you're safe there.
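    One portability caveat: std::thread's move assignment calls std::terminate() if the target still owns a joinable thread, and Boost.Thread can be configured to behave the same way (BOOST_THREAD_VERSION 3 and later). A sketch of an alternative that also answers the "what happens to the thread member" question: keep every thread and join them all in the destructor. ThreadOwner is a hypothetical name.

```cpp
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Owns every worker it starts and joins them all on destruction, so no
// thread can outlive the object whose members it uses.
class ThreadOwner {
public:
    void start(std::function<void()> work) {
        threads_.push_back(std::thread(work));
    }

    ~ThreadOwner() {
        for (std::size_t i = 0; i < threads_.size(); ++i)
            if (threads_[i].joinable())
                threads_[i].join();
    }

private:
    std::vector<std::thread> threads_;
};
```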

    Overall I don't see any glaring thread safety issues, other than the use of 3rd party libraries whose thread safety/usage I'm not familiar with. Having said that, the design needs some work.

    Currently, every call to post() incurs the cost of thread creation. This may be "good enough" if post() is only called a handful of times, spread out over a period of time. But if post() is called hundreds of times in rapid succession, you could run out of (or low on) resources such as heap space and stack space.

    Instead, you could create a single thread instance up front. This thread would just wait for requests to come in, post to the stream, invoke the callback, and wait for the next request.
    Here is some sample code you can start with:
    Code:
    #include <boost/thread/thread.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>
    #include <boost/circular_buffer.hpp>
    
    #include <vector>
    #include <algorithm>
    #include <cstdlib>
    #include <iostream>
    using namespace std;
    
    typedef boost::circular_buffer<int> circular_buffer_t;
    
    const size_t PRODUCE_COUNT = 1000;
    const size_t CB_CAPACITY = 32;
    
    circular_buffer_t g_q(CB_CAPACITY);
    boost::mutex g_mux; // protects g_q and the predicate condition below
    boost::condition g_q_not_empty_or_exit;
    bool g_bExit = false;
    
    void random_work();
    
    void consume()
    {
        vector<int> consumed;
        circular_buffer_t q(CB_CAPACITY);
        bool bExit = false;
        while (!bExit)
        {
            {
                boost::unique_lock<boost::mutex> l(g_mux);
                while (g_q.empty() && !g_bExit)
                    g_q_not_empty_or_exit.wait(l);
    
                bExit = g_bExit; // update our cached copy
                q.swap(g_q);
            }//block
    
            cout << "Consuming Q of size " << q.size();
            if (q.size() == CB_CAPACITY)
                cout << "  ** possible overflow **";
            cout << endl;
    
            consumed.insert(consumed.end(), q.begin(), q.end());
            q.clear();
        }//while
    
        cout << "Produced " << PRODUCE_COUNT << " elements" << endl;
        cout << "Consumed " << consumed.size() << " elements" << endl;
    }//consume
    
    int main()
    {
        boost::thread consumer(consume);
    
        // produce some data
        for (int n = 0; n < static_cast<int>(PRODUCE_COUNT); ++n)
        {
            {
                boost::unique_lock<boost::mutex> l(g_mux);
                g_q.push_back(n);
                g_q_not_empty_or_exit.notify_one();
            }//block
    
            random_work(); // slow things down a bit
        }//for
    
        // tell consumer to exit 
        {
            boost::unique_lock<boost::mutex> l(g_mux);
            g_bExit = true;
            g_q_not_empty_or_exit.notify_all();
        }//block
    
        consumer.join();
        
        return 0;
    }//main
    
    void random_work()
    {
        vector<int> v(rand() % 1024);
        generate(v.begin(), v.end(), rand);
        sort(v.begin(), v.end());
    }//random_work
    Notes:
    - Using boost::circular_buffer for efficiency. When a full circular_buffer is pushed to, it overwrites its oldest element, so use a growable container (e.g. std::deque) if you're worried about overflowing it.
    - Notice the lock is held only long enough to call swap(). This lets the consumer process multiple units of data without contending for the lock, which minimizes lock contention.
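    A sketch combining both notes: a std::deque (which grows instead of overwriting) swapped out in one step under the lock. std::mutex replaces boost::mutex, and the condition-variable wait is left out for brevity; enqueue and takeBatch are hypothetical names.

```cpp
#include <deque>
#include <mutex>
#include <string>
#include <vector>

// Shared request list guarded by one mutex.
std::mutex g_requestMutex;
std::deque<std::string> g_requests;

void enqueue(const std::string &url) {
    std::lock_guard<std::mutex> lock(g_requestMutex);
    g_requests.push_back(url);
}

// Swap the whole pending list out in O(1) while holding the lock; the caller
// then works through the returned batch with the lock released.
std::vector<std::string> takeBatch() {
    std::deque<std::string> batch;
    {
        std::lock_guard<std::mutex> lock(g_requestMutex);
        batch.swap(g_requests);
    }
    return std::vector<std::string>(batch.begin(), batch.end());
}
```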

    gg

  5. #5
    twomers
    Much appreciated feedback, Codeplug. Thanks very much.

    The reason I wanted only one thread to stream.post/callback at a time was that I wasn't certain about the consequences of concurrent requests on my urlStream class. I didn't think it was safe enough, so I didn't allow it. I suppose I wanted to keep all the mutexes inside the wrapper class so that I wouldn't have to add more elsewhere.

    I think you're probably right about not needing the queue; I was overcomplicating matters. Might take it out. It sounds reasonable.

    'post' will only be called roughly once a minute, possibly a couple of times a minute. The application I'm developing is almost entirely automatic, so it will only be called more frequently on manual 'refreshes'.

    Thanks for your example. I will look at it and consider adopting that strategy.
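    For reference, a sketch of that single-worker strategy applied to URL requests, using std:: types in place of boost::. UrlWorker is a hypothetical name, and fetch is a hypothetical stand-in for niall::urlStream::post followed by getSource(). At roughly one request a minute, one long-lived thread costs very little.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

class UrlWorker {
public:
    typedef std::function<void(const std::string &)> Callback;
    typedef std::function<std::string(const std::string &, const std::string &)> Fetch;

    explicit UrlWorker(Fetch fetch)
        : fetch_(fetch), exit_(false), worker_(&UrlWorker::run, this) {}

    // Requests already queued are finished before join() returns.
    ~UrlWorker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            exit_ = true;
            cond_.notify_all();
        }
        worker_.join();
    }

    void post(const std::string &url, const std::string &params, Callback cb) {
        std::lock_guard<std::mutex> lock(mutex_);
        requests_.push_back(Request{url, params, cb});
        cond_.notify_one();
    }

private:
    struct Request { std::string url, params; Callback cb; };

    // Wait for a request, handle it in FIFO order, invoke its callback, repeat.
    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mutex_);
            while (requests_.empty() && !exit_)
                cond_.wait(lock);
            if (requests_.empty())
                return;  // exit requested and nothing left to do
            Request req = requests_.front();
            requests_.pop_front();
            lock.unlock();  // do the slow network work without holding the lock
            req.cb(fetch_(req.url, req.params));
        }
    }

    Fetch fetch_;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<Request> requests_;
    bool exit_;
    std::thread worker_;  // declared last so it starts after the members above exist
};
```

    Callbacks run on the worker thread, so a gtkmm callback would still need to hand its result back to the GUI thread before touching widgets.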

    Thanks again for the very helpful comments.
