thread pool issues

This is a discussion on thread pool issues within the C++ Programming forums, part of the General Programming Boards category.

  1. #1
    Registered User
    Join Date: Oct 2006
    Posts: 2,407

    thread pool issues

    I have written a program that creates a thread pool and queues requests for the pool to consume and process. The problem I'm having is that once all threads are busy, one thread still always reports that it is available, so the program doesn't spawn any new threads. That "available" thread never wakes up when I signal the condition variable, so the queue just keeps growing. I have the threads artificially take longer than they naturally would (a 10-second sleep after each request) so that I can test the automatic grow/shrink capability of the engine.

    Because it calls clock_gettime() directly, and uses nanosleep() indirectly through std::this_thread::sleep_for(), this program specifically targets Linux. With a little creativity, anyone able to understand it could make it run on Windows or other Unix-like operating systems.

    This program also makes use of C++11 features. I built it with GCC 4.6.2 on Fedora 15 x64. I don't know what minimum version of GCC is required to compile it.
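    On the portability point, here is a rough sketch of how the queue-time measurement could be done without clock_gettime(). It assumes a standard library that provides std::chrono::steady_clock (older libstdc++ releases may spell that clock differently), and it swaps the wall clock (CLOCK_REALTIME) used in the program for a monotonic clock. QueueTimer and SecondsWaiting are made-up names for illustration; they are not part of the program below.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper (not in the original program): remembers when it was
// created and reports how long ago that was, using only <chrono>.
class QueueTimer
{
  public:
    typedef std::chrono::steady_clock clock;

    QueueTimer() : m_created(clock::now()) { }

    // seconds elapsed since construction, as a double
    double SecondsWaiting() const
    {
      std::chrono::duration<double> elapsed = clock::now() - m_created;
      // a steady clock never runs backwards, so this cannot be negative
      assert(elapsed.count() >= 0.0);
      return elapsed.count();
    }

  private:
    clock::time_point m_created;
};
```

    A request type could hold one of these as a member in place of the timespec and call SecondsWaiting() when it runs.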

    Code:
    /*
     * A program to automatically manage the number of worker threads in response
     * to changing demand on the system.  This program automatically spawns new
     * threads to process requests in the event that no worker thread is available,
     * and automatically terminates threads if they have been idle through ten
     * consecutive wakeups.
     */
    
    #ifndef _GLIBCXX_USE_NANOSLEEP
    #define _GLIBCXX_USE_NANOSLEEP
    #endif
    
    #include <iostream>
    #include <sstream>
    #include <deque>
    #include <thread>
    #include <stdexcept>
    #include <memory>
    #include <map>
    #include <functional>
    
    // uncomment the next line if StringFormat.h is available
    //#define USE_GOOD_STRFORMAT
    
    #ifdef USE_GOOD_STRFORMAT
    // StringFormat.h contains the definition for the StrFormat function.
    // StrFormat behaves similar to System.String.Format() in the .Net Framework
    // and is declared as
    // template<typename ... Args>
    // std::string StrFormat(const std::string& fmt, Args ... args)
    #include "StringFormat.h"
    #else
    
    template<typename ... Args>
    std::string StrFormat(const std::string& fmt, Args ... args)
    {
      return fmt;
    }
    
    #endif
    
    // thread-safe version of rand() that allows a maximum and minimum
    std::mutex randMutex;
    auto GetRand(int max = 0, int min = 0) -> decltype(rand())
    {
      std::lock_guard<std::mutex> lock(randMutex);
      auto r = rand();
      if (min)
      {
        if (max)
        {
          r = (r % ((max + 1)  - min)) + min;
        }
        else
        {
          r += min;
        }
      }
      else
      {
        if (max)
        {
          r = r % (max + 1);
        }
      }
      return r;
    }
    
    // thread-safe formatted access to standard output
    std::mutex coutMutex;
    template<typename... Args>
    void Print(Args... args)
    {
      std::string s = StrFormat(args...);
      std::lock_guard<std::mutex> lock(coutMutex);
      std::cout << s << std::endl;
    }
    
    double TimespecToDouble(struct timespec& ts)
    {
      const long long NSEC_IN_ONE_SECOND = 1000000000;
      double res = ts.tv_sec;
      res *= NSEC_IN_ONE_SECOND;
      res += ts.tv_nsec;
      res /= NSEC_IN_ONE_SECOND;
      return res;
    }
    
    class Request
    {
      public:
        Request(int value) : m_value(value)
        {
            clock_gettime(CLOCK_REALTIME, &ts);
        }
    
        void operator () ()
        {
          struct timespec now;
          clock_gettime(CLOCK_REALTIME, &now);
          double currentTime = TimespecToDouble(now);
          double queuedTime = TimespecToDouble(this->ts);
    
          // print the value of the request and the time it spent in the queue,
          // to the nearest microsecond (6 digits of precision)
          Print("Request with value {0} running after spending {1:6} seconds in the queue.", this->m_value, currentTime - queuedTime);
        }
    
        int m_value;
        struct timespec ts;
    };
    
    class ThreadStatus
    {
      public:
        ThreadStatus(int threadid, std::thread* threadptr, bool busy) : id(threadid), thread(threadptr), isBusy(busy) {  }
    
        int id;
        std::shared_ptr<std::thread> thread;
        bool isBusy;
    };
    
    std::ostream& operator << (std::ostream& os, const Request& rhs)
    {
      os << "{ " << rhs.m_value << " }";
      return os;
    }
    
    template<typename T>
    std::ostream& operator << (std::ostream& os, const std::deque<T>& rhs)
    {
      os << "{ ";
      int i = 0;
      for (const auto& it : rhs)
      {
        if (i) os << ", ";
        os << it;
        ++i;
      }
      os << " }";
      return os;
    }
    
    template<typename requestT>
    class MultiThreadEngine;
    
    template<typename requestT>
    class BusyThread
    {
        MultiThreadEngine<requestT>* m_queue;
        int m_threadId;
    
      public:
        BusyThread() = delete;
        BusyThread(const BusyThread&) = delete;
        BusyThread(BusyThread&&) = delete;
    
        BusyThread(MultiThreadEngine<requestT>* queue, int threadId);
    
        ~BusyThread();
    };
    
    template<typename requestT>
    class MultiThreadEngine
    {
      private:
        // private types
        typedef std::recursive_mutex mutexType;
        typedef MultiThreadEngine<requestT> thisType;
    
        // friend classes
        friend class BusyThread<requestT>;
    
        // private functions
    
        // notify all threads waiting on the queue
        void Notify()
        {
          m_conVar.notify_all();
        }
    
        std::string QueueToString()
        {
          std::stringstream ss;
          ss << m_queue;
          return ss.str();
        }
    
        // update the specified thread's availability status
        void SetThreadStatus(int threadId, bool isBusy)
        {
          std::unique_lock<mutexType> lock(m_mutex);
          Print("Setting thread {0} status to {1}", threadId, isBusy ? "busy" : "available");
          if (m_threadList.find(threadId) != m_threadList.end())
          {
            m_threadList.at(threadId).isBusy = isBusy;
          }
        }
    
        // if a thread is available to handle a request, return true
        // else return false
        bool ThreadsAreAvailable()
        {
          std::unique_lock<mutexType> lock(m_mutex);
          for (auto& it : m_threadList)
          {
            if (!it.second.isBusy)
            {
              Print("Thread {0} is available.", it.first);
              return true;
            }
          }
          return false;
        }
    
        // wait for the queue to have a request added
        void Wait()
        {
          std::unique_lock<mutexType> lock(m_mutex);
          m_conVar.wait(lock);
        }
    
        // get and remove the first request in the queue
        std::unique_ptr<requestT> DequeueRequest()
        {
          std::unique_lock<mutexType> lock(m_mutex);
          if (m_queue.empty())
          {
            return std::unique_ptr<requestT>();
          }
          std::unique_ptr<requestT> r(new requestT(m_queue.front()));
          m_queue.pop_front();
          Print("Queue: {0}", QueueToString());
          return r;
        }
    
        // the procedure that handles the heavy lifting for each thread
        void ThreadProc(int id)
        {
          int idleCount = 0;
          Print("Starting thread {0}", id);
          while (true)
          {
            try
            {
              Wait();
              std::unique_ptr<requestT> request(DequeueRequest());
              if (request)
              {
                idleCount = 0;
                BusyThread<requestT>(this, id);
    
                Print("Waking up thread {0}", id);
    
                // tell the request to process
                (*request)();
    
                std::this_thread::sleep_for(std::chrono::seconds(10));
              }
              else
              {
                ++idleCount;
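                if (idleCount > 10)
                {
                  std::unique_lock<mutexType> lock(m_mutex);
    
                  // only retire this thread if the pool is above its minimum size;
                  // otherwise keep it running, so it is not left in m_threadList
                  // as "available" after it has already exited
                  if (m_threadList.size() > m_numThreads && m_threadList.find(id) != m_threadList.end())
                  {
                    Print("Terminating thread {0} due to idle timeout. There are now {1} threads.", id, m_threadList.size() - 1);
                    m_threadList.at(id).thread->detach();
                    m_threadList.erase(m_threadList.find(id));
                    return;
                  }
                  idleCount = 0;
                }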
              }
            }
            catch (std::exception& ex)
            {
              Print("Thread {0} exception caught: {1}", id, ex.what());
            }
          }
        }
    
        // thread-safe function to spawn a new thread
        void SpawnThread()
        {
          std::unique_lock<mutexType> lock(m_mutex);
    
          int i = 0;
          if (! m_threadList.empty())
          {
            i = m_threadList.rbegin()->second.id + 1;
          }
    
          Print("Spawning thread with id {0}.  There are now {1} threads.", i, m_threadList.size() + 1);
    
          std::thread* thread = new std::thread(std::bind(&MultiThreadEngine<requestT>::ThreadProc, this, std::placeholders::_1), i);
    
          m_threadList.insert(std::make_pair(i, ThreadStatus(i, thread, false)));
        }
    
        // private member data
        std::deque<requestT> m_queue;
        std::map<int, ThreadStatus> m_threadList;
        mutexType m_mutex;
        std::condition_variable_any m_conVar;
        size_t m_numThreads;
    
      public:
        MultiThreadEngine(size_t numThreads = 16) : m_numThreads(numThreads)
        {
    
        }
    
        // add a request to the end of the queue
        void EnqueueRequest(const requestT& request)
        {
          std::unique_lock<mutexType> lock(m_mutex);
          m_queue.push_back(request);
          Print("Queue: {0}", QueueToString());
          if (!ThreadsAreAvailable())
          {
            Print("No threads are available. Spawning a new one.");
            SpawnThread();
          }
          Notify();
        }
    
        // start the multithreaded request processing engine
        void Run()
        {
          for (size_t i = 0; i < m_numThreads; ++i)
          {
            SpawnThread();
          }
        }
    };
    
    template<typename requestT>
    BusyThread<requestT>::BusyThread(MultiThreadEngine<requestT>* queue, int threadId) : m_queue(queue), m_threadId(threadId)
    {
      m_queue->SetThreadStatus(m_threadId, true);
    }
    
    template<typename requestT>
    BusyThread<requestT>::~BusyThread()
    {
      m_queue->SetThreadStatus(m_threadId, false);
      m_queue = nullptr;
    }
    
    int main(void)
    {
      srand(time(nullptr));
    
      MultiThreadEngine<Request> engine(11);
      engine.Run();
    
      while (true)
      {
        try
        {
          std::this_thread::sleep_for(std::chrono::milliseconds(250));
    
          int value = GetRand(16);
          Print("Enqueuing request with value {0}", value);
          Request request(value);
          engine.EnqueueRequest(request);
        }
        catch (std::exception& ex)
        {
          Print("Exception caught: {0}", ex.what());
        }
      }
    
      return 0;
    }
    So the question is: why does one thread never wake up? Have I found a bug, or am I doing something wrong?

    I'm also very much open to suggestions on how to improve the design. I'm relatively new to the concept of a pool of worker threads, so this implementation may be extremely primitive or naive.
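    One design point that may be worth a sketch, separate from the question above: Wait() calls m_conVar.wait(lock) with no predicate, and ThreadProc calls Wait() before every DequeueRequest(). A worker that is busy when Notify() fires never sees that notification, so it can go back to sleep while requests are still queued. The block below is a minimal sketch of the usual predicate-based wait. WaitQueue, Push and Pop are hypothetical names, and it uses plain std::mutex and std::condition_variable instead of the recursive_mutex and condition_variable_any in the program.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical minimal blocking queue, not the MultiThreadEngine above.
template<typename T>
class WaitQueue
{
  public:
    // add an item and wake one waiting consumer
    void Push(const T& item)
    {
      {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_items.push_back(item);
      }
      m_conVar.notify_one();
    }

    // block until an item is available, then remove and return it
    T Pop()
    {
      std::unique_lock<std::mutex> lock(m_mutex);
      // The predicate is checked before sleeping and again after every
      // wakeup, so an item pushed before this call is picked up at once
      // and spurious wakeups are ignored.
      m_conVar.wait(lock, [this] { return !m_items.empty(); });
      assert(!m_items.empty());
      T item = m_items.front();
      m_items.pop_front();
      return item;
    }

  private:
    std::deque<T> m_items;
    std::mutex m_mutex;
    std::condition_variable m_conVar;
};
```

    With something like this, a worker loop can simply call Pop() and run whatever comes back. An idle timeout like the one in ThreadProc would need a timed wait such as wait_for, which this sketch leaves out.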

  2. #2
    Registered User
    Join Date: Oct 2006
    Posts: 2,407
    I figured it out.

    Line 251 of the code above (the BusyThread<requestT>(this, id); statement in ThreadProc) should read
    Code:
    BusyThread<requestT> busyThread(this, id);
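    It seems the temporary object was being destroyed right away, marking the thread as available even though it wasn't. An unnamed temporary only lives until the end of the statement that creates it, so the BusyThread destructor ran before the request was processed. Giving the object a name keeps it alive until the end of the enclosing block.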
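    To illustrate the difference, here is a small stand-alone sketch. BusyFlag is a hypothetical stand-in for BusyThread that just sets and clears a bool, and the two functions are only there to show what each form leaves behind; none of these names come from the program.

```cpp
#include <cassert>

// Hypothetical stand-in for BusyThread: marks a slot busy while it is alive.
class BusyFlag
{
  public:
    BusyFlag(bool* flags, int threadId) : m_flags(flags), m_threadId(threadId)
    {
      m_flags[m_threadId] = true;
    }

    ~BusyFlag()
    {
      m_flags[m_threadId] = false;
    }

    BusyFlag(const BusyFlag&) = delete;
    BusyFlag& operator=(const BusyFlag&) = delete;

  private:
    bool* m_flags;
    int m_threadId;
};

// returns the flag as seen right after creating an unnamed temporary
bool BusyAfterTemporary()
{
  bool flags[1] = { false };
  BusyFlag(flags, 0);        // temporary: destructor runs at the semicolon
  return flags[0];           // already reset to false
}

// returns the flag as seen while a named guard object is still in scope
bool BusyAfterNamed()
{
  bool flags[1] = { false };
  BusyFlag guard(flags, 0);  // named: lives until the closing brace
  return flags[0];           // still true at this point
}

// quick self-check of the two cases
void CheckBusyFlag()
{
  assert(!BusyAfterTemporary());
  assert(BusyAfterNamed());
}
```

    The only difference between the two functions is the name guard, which is the same one-word change as in the fix above.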
