Thread: Using condition_variable with unique_lock causing periodic crash (GCC 4.7, OSX)

  1. #1
    Registered User

    Using condition_variable with unique_lock causing periodic crash (GCC 4.7, OSX)

    I have some code that I adapted (barely changing it) from the book C++ Concurrency in Action, so I expected it to work, but it doesn't. I'm trying to implement a thread-safe queue in which I can store background jobs for one or more worker threads. The queue looks like this:

    Code:
    /* imgproc/queue.h */
    
    #pragma once
    #include "imgproc/i_queue.h"
    #include <memory>
    #include <thread>
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    
    using namespace std;
    
    namespace imgproc {
    
      /* Blocking, concurrency-safe queue. The method that blocks is pop(),
       * which makes the current thread wait until there is a value to pop
       * from the queue.
       */
      template <typename T>
        struct ConcurrentQueue : public IQueueWriter<T>, public IQueueReader<T>
        {
          ConcurrentQueue() {}
          ConcurrentQueue( const ConcurrentQueue & ) = delete;
          ConcurrentQueue & operator= ( const ConcurrentQueue & ) = delete;
    
          /* Concurrency-safe push to queue.
           */
          virtual void push( shared_ptr<T> val )
          {
            lock_guard<mutex> lk( _mutex );
            _queue.push( val );
            _cvar.notify_one();
          }
    
          /* Concurrency-safe check if queue empty.
           */
          virtual const bool empty() const
          {
            lock_guard<mutex> lk( _mutex );
            bool result( _queue.empty() );
            return result;
          }
    
          /* Waiting, concurrency-safe pop of value. If there are no values in
           * the queue, then this method blocks the current thread until there
           * are.
           */
          virtual shared_ptr<T> pop()
          {
            unique_lock<mutex> lk( _mutex );
            _cvar.wait( lk, [ this ] {return ! _queue.empty(); } );
            auto value( _queue.front() );
            _queue.pop();
            return value;
          }
    
        private:
          mutable mutex _mutex;
          queue<shared_ptr<T>> _queue;
          condition_variable _cvar;
        };
    
    }
    My understanding is that the single mutex protects every access to the queue. However, I have a test that crashes about one run in ten:

    Code:
    /* test-that-crashes-fragment.cpp */
    // Should have threads wait until there is a value to pop
    TEST_F( ConcurrentQueueTest,
            ShouldHaveThreadsWaitUntilThereIsAValueToPop ) {
      int val( -1 );
      thread t1( [ this, &val ] {
          for ( uint i( 0 ) ; i < 1000 ; ++i );
          val = *_r_queue->pop();
        } );
      for ( uint i( 0 ) ; i < 1000 ; ++ i ) {
        for ( uint j( 0 ) ; j < 1000 ; ++ j );
        EXPECT_EQ( -1, val );
      }
      _w_queue->push( make_shared<int>( 27 ) );
      t1.join();
      EXPECT_EQ( 27, val );
      EXPECT_TRUE( _r_queue->empty() );
    }
    Here, _r_queue and _w_queue are just the reader and writer interfaces onto the same ConcurrentQueue instance.

    After hours of obsessively printing debug output with cout, it looks like the call to pop() is what crashes, and (as far as I've seen) always when the _queue member is empty. Could anyone advise me on what I'm doing wrong here? I've seen other posts asking for help with similar problems, but the answer always seems to be condition variables, and I'm already using one!

    Alternatively, is there a better way to debug this? FWIW, I tried replacing the wait with a manual while loop containing sleep( 1 ), and that still crashed periodically, which rather suggests I've created a race condition despite my best efforts; I just can't see where.

    Thanks very much for any & all help, and I promise I've tried to figure this out before bothering you all with it.

    Cheers, Doug.
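For reference, the predicate overload used in pop(), _cvar.wait( lk, pred ), is specified to behave like an explicit loop that calls wait() until the predicate is true. Each wait() releases the mutex while blocked and re-acquires it before returning, and the loop re-checks the condition after every wakeup (spurious ones included), so nothing has to poll on a timer the way a sleep( 1 ) loop does. A minimal sketch of that equivalence; the Flag struct and flag_round_trip() are hypothetical names, not part of the poster's code:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical example type: a one-shot flag guarded by a mutex and
// signalled through a condition variable.
struct Flag
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    void wait_until_ready()
    {
        std::unique_lock<std::mutex> lk( m );
        // Equivalent to: cv.wait( lk, [ this ] { return ready; } );
        // wait() unlocks m while blocked and re-locks it before returning,
        // and the loop re-checks the predicate after every wakeup.
        while ( ! ready )
            cv.wait( lk );
    }

    void set_ready()
    {
        std::lock_guard<std::mutex> lk( m );
        ready = true;
        cv.notify_one();
    }
};

// Starts a waiter, releases it, and reports whether the flag ended up set.
bool flag_round_trip()
{
    Flag f;
    std::thread t( [ & ] { f.wait_until_ready(); } );
    f.set_ready();   // fine whether or not the waiter has started waiting yet
    t.join();
    return f.ready;  // safe to read unlocked: join() has synchronised with t
}
```

Because the flag is checked under the mutex before waiting, a notification sent before the waiter blocks is not lost.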

  2. #2
    Salem
    It would help if we also had a main() that we can just copy and paste to see for ourselves what is going on.

    No one is going to spend hours trying to reconstruct your test environment only to discover that it actually works, because we didn't replicate some bug in some code you haven't posted.

    Also, what are those empty for loops supposed to achieve?
    Most modern compilers will eliminate such loops, even on very modest optimisation levels.

  3. #3
    Registered User
    Ah, a main would be trickier, but worth doing, as I can use the exact code from the book I'm using & see if that fails -- I'll post back here when I have one.
    Cheers,
    Doug.

  4. #4
    Registered User
    Oh, and the empty for loops were to give things time to happen -- I was getting weird results with calls to usleep().
    Perhaps there is a better way of achieving this?

  5. #5
    Registered User
    std::this_thread::sleep_for()
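A minimal sketch of that suggestion, assuming a C++11 standard library (with GCC 4.7 the poster's compile line also defines _GLIBCXX_USE_NANOSLEEP, which that libstdc++ release needed before it would declare sleep_for). Unlike an empty for loop, which the optimiser may remove entirely, sleep_for genuinely blocks the calling thread for at least the requested time. pause_for() is a hypothetical helper name:

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical helper: block the calling thread for `ms` milliseconds
// instead of spinning in an empty for loop, and report how long the
// thread was actually asleep, measured on the steady (monotonic) clock.
std::chrono::steady_clock::duration pause_for( int ms )
{
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for( std::chrono::milliseconds( ms ) );
    return std::chrono::steady_clock::now() - start;
}
```

The actual pause may be somewhat longer than requested, depending on the scheduler.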

  6. #6
    Registered User
    Quote (Elkvis):
    std::this_thread::sleep_for()
    That will come in *very* handy, thank you!

  7. #7
    Registered User
    Join Date
    Jan 2009
    Posts
    11
    Hi -- so I was working on a one-file version of this code with a main, but guess what? I couldn't replicate the error there!
    The only real difference between my example code and the test code that was failing was that the test code used a queue (and two interfaces onto that queue) set up in a gtest fixture.
    So I wrote a version of the test that set up its own queue and interfaces, and found that that test runs without error -- at least so far, although I'll keep testing.
    To be certain, I made my failing test look as much like the one that hasn't yet failed as possible.
    Sure enough, the old one (using the gtest fixture) still periodically fails, whereas the new one (that creates its own queue) hasn't (yet, anyways).
    This is very strange, as a gtest fixture is supposed to be set up afresh before every test, so each test should get a pristine queue.
    I wonder whether I'm getting something wrong in the lambda passed to the thread constructor. Maybe some value it uses is being copied (even though I think I've specified capture by reference) and the copy isn't finished before the test tries to use it?
    Anyway, the test that passes (so far -- ~40 runs) is this:
    Code:
    // Should have threads wait until there is a value to pop
    TEST_F( ConcurrentQueueTest,
            ShouldHaveThreadsWaitUntilThereIsAValueToPop ) {
      auto q( make_shared<ConcurrentQueue<int>>() );
      shared_ptr<IQueueWriter<int>> wq( q );
      shared_ptr<IQueueReader<int>> rq( q );
      int val( -1 );
      thread t( [ & ] {
          val = *rq->pop();
        } );
      for ( uint i( 0 ) ; i < 10 ; ++ i ) {
        this_thread::sleep_for( chrono::milliseconds( 10 ) );
        EXPECT_EQ( -1, val );
      }
      wq->push( make_shared<int>( 27 ) );
      t.join();
      EXPECT_EQ( 27, val );
      EXPECT_TRUE( rq->empty() );
    }
    And the code that (still) periodically fails is this:
    Code:
    // Should have threads wait until there is a value to pop -- old
    TEST_F( ConcurrentQueueTest,
            ShouldHaveThreadsWaitUntilThereIsAValueToPopOld ) {
      int val( -1 );
      thread t( [ & ] {
          val = *_rq->pop();
        } );
      for ( uint i( 0 ) ; i < 10 ; ++ i ) {
        this_thread::sleep_for( chrono::milliseconds( 10 ) );
        EXPECT_EQ( -1, val );
      }
      _wq->push( make_shared<int>( 27 ) );
      t.join();
      EXPECT_EQ( 27, val );
      EXPECT_TRUE( _rq->empty() );
    }
    Obviously, I'm happy I've got this instance working, although I'm still a little concerned that I don't understand why this is.
    Can anyone enlighten me, or point me at a resource that might?
    And thanks to everyone for their replies -- it's really appreciated.
    Cheers,
    Doug.
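On the capture question: a [&] lambda stores references to the enclosing locals, so std::thread copies the small closure object but never copies val itself, and there is no unfinished copy for the test to race with. The final check of val is safe because it comes after join(), which synchronises with the completion of the worker thread. A minimal sketch; write_from_worker() is a hypothetical name:

```cpp
#include <cassert>
#include <thread>

// Hypothetical example: the [&] capture stores a reference to `val`, so the
// worker writes the caller's variable directly; no copy of val is made.
int write_from_worker()
{
    int val( -1 );
    std::thread t( [ & ] { val = 27; } );
    t.join();      // join() synchronises with the end of the worker...
    return val;    // ...so this read is guaranteed to see 27
}
```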

  8. #8
    Registered User
    Join Date
    Jan 2009
    Posts
    11
    Oh -- for completeness, here's the code from the one-file example with a main that I couldn't get to replicate the error:

    Code:
    #include <memory>
    #include <thread>
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    #include <chrono>
    #include <cassert>
    #include <iostream>
    
    using namespace std;
    
    typedef unsigned int uint;
    // ----------------------------------------
    
    template <typename T>
    struct IQueueWriter
    {
      virtual ~IQueueWriter() {}
      virtual void push( shared_ptr<T> ) = 0;
    };
    // ----------------------------------------
    
    template <typename T>
    struct IQueueReader
    {
      virtual ~IQueueReader() {}
      virtual const bool empty() const = 0;
      virtual shared_ptr<T> pop() = 0;
    };
    // ----------------------------------------
    
    /* Blocking, concurrency-safe queue. The method that blocks is pop(),
     * which makes the current thread wait until there is a value to pop
     * from the queue.
     */
    template <typename T>
    struct ConcurrentQueue : public IQueueWriter<T>, public IQueueReader<T>
    {
      ConcurrentQueue() {}
      ConcurrentQueue( const ConcurrentQueue & ) = delete;
      ConcurrentQueue & operator= ( const ConcurrentQueue & ) = delete;
      
      /* Concurrency-safe push to queue.
       */
      virtual void push( shared_ptr<T> val )
      {
        lock_guard<mutex> lk( _mutex );
        _queue.push( val );
        _cvar.notify_one();
      }
      
      /* Concurrency-safe check if queue empty.
       */
      virtual const bool empty() const
      {
        lock_guard<mutex> lk( _mutex );
        bool result( _queue.empty() );
        return result;
      }
      
      /* Waiting, concurrency-safe pop of value. If there are no values in
       * the queue, then this method blocks the current thread until there
       * are.
       */
      virtual shared_ptr<T> pop()
      {
        unique_lock<mutex> lk( _mutex );
        _cvar.wait( lk, [ this ] {return ! _queue.empty(); } );
        auto value( _queue.front() );
        _queue.pop();
        return value;
      }
      
    private:
      mutable mutex _mutex;
      queue<shared_ptr<T>> _queue;
      condition_variable _cvar;
    };
    // ----------------------------------------
    
    int main( const int argc, const char *argv[] ) {
      auto q( make_shared<ConcurrentQueue<int>>() );
      shared_ptr<IQueueWriter<int>> wq( q );
      shared_ptr<IQueueReader<int>> rq( q );
      for ( uint r( 0 ) ; r < 10000 ; ++ r ) {
        cout << r << "\t";
        cout.flush();
        int val( -1 );
        thread t( [ & ] {
            val = *rq->pop();
          } );
        for ( uint i( 0 ) ; i < 5 ; ++ i ) {
          cout.flush();
          this_thread::sleep_for( chrono::milliseconds( 1 ) );
          assert( val == -1 );
        }
        wq->push( make_shared<int>( 27 ) );
        t.join();
        assert( val == 27 );
        assert( rq->empty() );
      }
      cout << endl;
      // ----------
      return 0;
    }

  9. #9
    manasij7479
    What exactly, was the nature of the crash?
    If it was std::system_error saying "Operation not permitted", you just need to link pthreads (On Linux...so I presume the same on OSX) .

  10. #10
    Registered User
    Join Date
    Jan 2009
    Posts
    11
    Quote (manasij7479):
    What exactly, was the nature of the crash?
    If it was std::system_error saying "Operation not permitted", you just need to link pthreads (On Linux...so I presume the same on OSX) .
    Hi -- I don't believe it was that, since I'm already passing -pthread. These are the options I use to compile the program (minus the file names and some ImageMagick options):
    g++-mp-4.7 -c -std=gnu++11 -pthread -Wall -g -D_GLIBCXX_USE_NANOSLEEP -Iinclude -Igtest/include -I/usr/local/include

  11. #11
    Registered User
    Join Date
    Jan 2009
    Posts
    11

    Actually, problem still exists -- main file to illustrate the issue

    Hi -- I was wrong: the error has come back as I've continued development, even with the new code.
    I've pulled it out into a single file with compile instructions at the top, and this recreates the issue:

    queuetest.cpp:
    Code:
    /* Compile with:
     * $CXX -std=gnu++11 -pthread -Wall -g -D_GLIBCXX_USE_NANOSLEEP -I/usr/local/include queuetest.cpp -o queuetest
     *
     * Where $CXX points to a G++ compiler, version 4.7
     */
    
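    #include <memory>
    #include <thread>
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    #include <chrono>
    #include <cassert>
    #include <iostream>
    #include <string>      // std::string (previously only pulled in indirectly)
    #include <stdexcept>   // std::runtime_error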
    
    using namespace std;
    
    typedef unsigned int uint;
    // ----------------------------------------
    
    template <typename T>
    struct IQueueWriter
    {
      virtual ~IQueueWriter() {}
      virtual void push( shared_ptr<T> ) = 0;
    };
    // ----------------------------------------
    
    template <typename T>
    struct IQueueReader
    {
      virtual ~IQueueReader() {}
      virtual const bool empty() const = 0;
      virtual const size_t size() const = 0;
      virtual shared_ptr<T> pop() = 0;
    };
    // ----------------------------------------
    
    /* Blocking, concurrency-safe queue. The method that blocks is pop(),
     * which makes the current thread wait until there is a value to pop
     * from the queue.
     */
    template <typename T>
    struct ConcurrentQueue : public IQueueWriter<T>, public IQueueReader<T>
    {
      ConcurrentQueue() {}
      ConcurrentQueue( const ConcurrentQueue & ) = delete;
      ConcurrentQueue & operator= ( const ConcurrentQueue & ) = delete;
      
      /* Concurrency-safe push to queue.
       */
      virtual void push( shared_ptr<T> val )
      {
        lock_guard<mutex> lk( _mutex );
        _queue.push( val );
        _cvar.notify_one();
      }
      
      /* Concurrency-safe check if queue empty.
       */
      virtual const bool empty() const
      {
        lock_guard<mutex> lk( _mutex );
        bool result( _queue.empty() );
        return result;
      }
    
      virtual const size_t size() const
      {
        lock_guard<mutex> lk( _mutex );
        return _queue.size();
      }
      
      /* Waiting, concurrency-safe pop of value. If there are no values in
       * the queue, then this method blocks the current thread until there
       * are.
       */
      virtual shared_ptr<T> pop()
      {
        unique_lock<mutex> lk( _mutex );
        _cvar.wait( lk, [ this ] { return ! _queue.empty(); } );
        auto value( _queue.front() );
        _queue.pop();
        return value;
      }
      
    private:
      mutable mutex _mutex;
      queue<shared_ptr<T>> _queue;
      condition_variable _cvar;
    };
    // ----------------------------------------
    
    void clear_q( ConcurrentQueue<string> &q )
    {
      while ( ! q.empty() ) q.pop();
      assert( q.empty() );
    }
    
    int main( const int argc, const char *argv[] ) {
      ConcurrentQueue<string> q;
      
      { // Should be able to push values onto the queue
        q.push( make_shared<string>( "Dennis The Menace" ) );
      }
      clear_q( q );
    
      { // Should be able to pop values from the queue
        q.push( make_shared<string>( "Frumpy Towers" ) );
        assert( *q.pop() == "Frumpy Towers" );
      }
      clear_q( q );
    
      { // Should be able to test if queue is empty
        assert( q.empty() );
        q.push( make_shared<string>( "Donkey Balls" ) );
        assert( ! q.empty() );
        q.pop();
        assert( q.empty() );
      }
      clear_q( q );
      
      { // Should be able to concurrently push values
        thread t1( [ & ] {
            for ( uint i( 0 ) ; i < 5000 ; ++ i )
              q.push( make_shared<string>( "Sherlock Holmes" ) );
          } );
        thread t2( [ & ] {
            for ( uint i( 0 ) ; i < 5000 ; ++ i )
              q.push( make_shared<string>( "Professor Moriarty" ) );
          } );
        t1.join();
        t2.join();
        assert( q.size() == static_cast<uint>( 10000 ) );
        pair<uint, uint> counts( 0, 0 );
        for ( uint i( 0 ) ; i < 10000 ; ++ i ) {
          string str( *q.pop() );
          if ( str == "Sherlock Holmes" )
            ++ counts.first;
          else if ( str == "Professor Moriarty" )
            ++ counts.second;
          else
            throw( runtime_error( "Unexpected value in queue" ) );
        }
        assert( counts.first == static_cast<uint>( 5000 ) );
        assert( counts.second == counts.first );
      }
      clear_q( q );
    
      { // Should be able to concurrently pop values
        for ( uint i( 0 ) ; i < 100 ; ++ i )
          q.push( make_shared<string>( "Monty Halfwit" ) );
        pair<uint, uint> counts( 0, 0 );
        thread t1( [ & ] {
            while ( true ) {
              this_thread::sleep_for( chrono::milliseconds( 1 ) );
              q.pop();
              ++ counts.first;
            }
          } );
        thread t2( [ & ] {
            while ( true ) {
              this_thread::sleep_for( chrono::milliseconds( 1 ) );
              q.pop();
              ++ counts.second;
            }
          } );
        t1.detach();
        t2.detach();
        while ( counts.first + counts.second < 100 )
          this_thread::sleep_for( chrono::milliseconds( 100 ) );
        assert( counts.first + counts.second == static_cast<uint>( 100 ) );
        assert( counts.first != static_cast<uint>( 0 ) );
        assert( counts.second != static_cast<uint>( 0 ) );
      }
      clear_q( q );
    
      { // Should have threads wait until there is a value to pop
        string str( "ORIG" );
        thread t( [ & ] { cout << "pop() ..." << endl; str = *q.pop(); } );
        for ( uint i( 0 ) ; i < 10 ; ++ i ) {
          this_thread::sleep_for( chrono::milliseconds( 10 ) );
          assert( str == "ORIG" );
        }
        cout << "push() ..." << endl;
        q.push( make_shared<string>( "Tossy Halfcakes" ) );
        cout << "Size: " << q.size() << endl;
        t.join();
        assert( str == "Tossy Halfcakes" );
      }
      clear_q( q );
      // ----------
      return 0;
    }
    When I run this, I would expect it to complete, but instead the program hangs on the last call to t.join().
    So it looks as though the condition variable is not working.
    Can anyone please advise on this? I'm completely stumped, and in danger of having to abandon the whole project because of the time I've spent on it.
    Cheers,
    Doug.

  12. #12
    Codeplug
    >> // Should be able to concurrently pop values
    After that block, t1 and t2 are still running in the background, blocked on q.pop().

    Undefined behavior due to unsynchronized access to counts.first and counts.second.

    >> // Should have threads wait until there is a value to pop
    Undefined behavior due to unsynchronized access to str.

    gg
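One way to restructure the concurrent-pop block so that no thread outlives it, sketched under a few assumptions: each worker stops when it pops a null shared_ptr (a sentinel chosen for this example, not part of the original design), both workers are joined instead of detached, and the counters are std::atomic. BlockingQueue is a cut-down stand-in with the same push()/pop() logic as ConcurrentQueue, and pop_concurrently() is a hypothetical name:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Cut-down stand-in for ConcurrentQueue: same push()/pop() logic.
template <typename T>
class BlockingQueue
{
public:
    void push( std::shared_ptr<T> val )
    {
        std::lock_guard<std::mutex> lk( _mutex );
        _queue.push( val );
        _cvar.notify_one();
    }

    std::shared_ptr<T> pop()
    {
        std::unique_lock<std::mutex> lk( _mutex );
        _cvar.wait( lk, [ this ] { return ! _queue.empty(); } );
        auto value( _queue.front() );
        _queue.pop();
        return value;
    }

private:
    std::mutex _mutex;
    std::queue<std::shared_ptr<T>> _queue;
    std::condition_variable _cvar;
};

// Hypothetical rewrite of the "concurrently pop values" block: workers stop
// at a null shared_ptr sentinel and are joined, so none outlives the test.
std::pair<unsigned, unsigned> pop_concurrently( unsigned items )
{
    BlockingQueue<std::string> q;
    for ( unsigned i( 0 ) ; i < items ; ++ i )
        q.push( std::make_shared<std::string>( "Monty Halfwit" ) );
    q.push( nullptr );   // one sentinel per worker, queued after the items
    q.push( nullptr );

    std::atomic<unsigned> first( 0 ), second( 0 );
    auto worker = [ &q ]( std::atomic<unsigned> &count ) {
        while ( q.pop() )   // a null pointer means "no more work"
            ++ count;
    };
    std::thread t1( worker, std::ref( first ) );
    std::thread t2( worker, std::ref( second ) );
    t1.join();
    t2.join();
    return std::make_pair( first.load(), second.load() );
}
```

Since each worker consumes exactly one sentinel and then exits, both joins return. The sketch only checks the total, because how the items split between the two workers is up to the scheduler.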

  13. #13
    Registered User
    Join Date
    Jan 2009
    Posts
    11
    Hi -- thanks very much for replying.
    Do you mean that the threads t1 & t2 are still running, so might be stealing the item for the thread in the final test to pop?
    And, with regards to the unsynchronised access, I thought my accesses were safe, there -- should I really be using a lock to access those, too?
    & thanks again for taking the time to reply.
    Doug.

  14. #14
    Codeplug
    >> t1 & t2 are still running, so might be stealing the item
    Yes.

    >> I thought my accesses were safe, there
    When two or more threads access the same memory location, and at least one of them is writing, then synchronization is required.
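To illustrate the rule: in this sketch one thread stores to val while the creating thread keeps loading it. With a plain int that would be a data race and therefore undefined behaviour. Declaring it std::atomic<int>, the fallback suggested further down in this reply, makes every load and store well-defined. read_while_worker_writes() is a hypothetical name:

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical example: one thread stores to `val` while another loads it.
// Because val is std::atomic<int>, these concurrent accesses are not a
// data race; with a plain int they would be undefined behaviour.
int read_while_worker_writes()
{
    std::atomic<int> val( -1 );
    std::thread t( [ & ] { val.store( 27 ); } );
    while ( val.load() == -1 )       // concurrent reads are well-defined
        std::this_thread::yield();
    t.join();
    return val.load();
}
```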

    >> should I really be using a lock to access those, too?
    I would redesign the test case so that synchronization isn't needed. Or use std::atomic<int> if you must.

    gg
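One possible redesign of the wait-until-value test along those lines (a sketch, not the poster's code): run pop() through std::async and let the returned std::future carry the value back, so the test never touches a variable that another thread writes. A future::wait_for() that times out stands in for the repeated "still -1" checks, and the 50 ms timeout is arbitrary. BlockingQueue is a minimal stand-in with the same push()/pop() logic as ConcurrentQueue, and pop_waits_for_push() is a hypothetical name:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <queue>

// Minimal stand-in for ConcurrentQueue with the same push()/pop() logic.
template <typename T>
class BlockingQueue
{
public:
    void push( std::shared_ptr<T> val )
    {
        std::lock_guard<std::mutex> lk( _mutex );
        _queue.push( val );
        _cvar.notify_one();
    }

    std::shared_ptr<T> pop()
    {
        std::unique_lock<std::mutex> lk( _mutex );
        _cvar.wait( lk, [ this ] { return ! _queue.empty(); } );
        auto value( _queue.front() );
        _queue.pop();
        return value;
    }

private:
    std::mutex _mutex;
    std::queue<std::shared_ptr<T>> _queue;
    std::condition_variable _cvar;
};

// Hypothetical redesign of "should have threads wait until there is a value
// to pop": the popped value comes back through a std::future, so the test
// never reads a variable that the worker thread writes.
bool pop_waits_for_push()
{
    BlockingQueue<int> q;
    auto result = std::async( std::launch::async, [ &q ] { return q.pop(); } );

    // The queue is still empty, so pop() must not have returned yet.
    bool blocked = result.wait_for( std::chrono::milliseconds( 50 ) )
                   == std::future_status::timeout;

    q.push( std::make_shared<int>( 27 ) );
    // get() waits for the worker to finish and hands over its return value.
    return blocked && *result.get() == 27;
}
```

The push always happens before the function returns, so the worker cannot be left blocked even if the first check fails.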

  15. #15
    Registered User
    Join Date
    Jan 2009
    Posts
    11
    Brilliant, thank you very much for clarifying that -- it's really helped.
    Doug.

