Code:
/*
* A program to automatically manage the number of worker threads in response
* to changing demand on the system. This program automatically spawns new
* threads to process requests in the event that no worker thread is available,
* and automatically terminates threads if they have been idle through ten
* consecutive wakeups.
*/
#ifndef _GLIBCXX_USE_NANOSLEEP
#define _GLIBCXX_USE_NANOSLEEP
#endif
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <ctime>
#include <deque>
#include <functional>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <time.h>
// uncomment the next line if StringFormat.h is available
//#define USE_GOOD_STRFORMAT
#ifdef USE_GOOD_STRFORMAT
// StringFormat.h contains the definition for the StrFormat function.
// StrFormat behaves similar to System.String.Format() in the .Net Framework
// and is declared as
// template<typename ... Args>
// std::string StrFormat(const std::string& fmt, Args ... args)
#include "StringFormat.h"
#else
// Reads a run of decimal digits from text[cursor, end) into value and advances
// cursor past them. Returns false if no digit was read. At most nine digits are
// consumed so the accumulated value cannot overflow.
inline bool StrFormatParseNumber(const std::string& text, std::size_t& cursor, std::size_t end, std::size_t& value)
{
    const std::size_t start = cursor;
    value = 0;
    while (cursor < end && cursor - start < 9 && text[cursor] >= '0' && text[cursor] <= '9')
    {
        value = value * 10 + static_cast<std::size_t>(text[cursor] - '0');
        ++cursor;
    }
    return cursor != start;
}
// End of the argument walk: the requested index is past the last argument, so
// nothing is appended and the caller leaves the placeholder untouched.
inline bool StrFormatAppendArg(std::string&, std::size_t, int)
{
    return false;
}
// Appends the index-th argument to out. When precision is non-negative,
// floating-point values are written in fixed notation with that many decimals.
template<typename First, typename... Rest>
bool StrFormatAppendArg(std::string& out, std::size_t index, int precision, const First& first, const Rest&... rest)
{
    if (index != 0)
    {
        return StrFormatAppendArg(out, index - 1, precision, rest...);
    }
    std::ostringstream text;
    if (precision >= 0)
    {
        text.setf(std::ios_base::fixed, std::ios_base::floatfield);
        text.precision(precision);
    }
    text << first;
    out += text.str();
    return true;
}
// Minimal fallback formatter used when StringFormat.h is not available.
//
// Replaces each "{N}" in fmt with the N-th argument (zero based) streamed
// through operator<<, and each "{N:P}" with the N-th argument written with P
// fixed decimal places. Anything that is not a well-formed placeholder, or that
// names an argument that was not supplied, is copied to the result unchanged.
// Argument text is never re-scanned, so braces inside an argument are safe.
template<typename ... Args>
std::string StrFormat(const std::string& fmt, Args ... args)
{
    std::string out;
    out.reserve(fmt.size());
    std::size_t pos = 0;
    while (pos < fmt.size())
    {
        const std::size_t open = fmt.find('{', pos);
        if (open == std::string::npos)
        {
            out.append(fmt, pos, std::string::npos);
            break;
        }
        out.append(fmt, pos, open - pos);
        const std::size_t close = fmt.find('}', open + 1);
        if (close == std::string::npos)
        {
            // no closing brace anywhere: the rest is literal text
            out.append(fmt, open, std::string::npos);
            break;
        }
        std::size_t cursor = open + 1;
        std::size_t index = 0;
        std::size_t digits = 0;
        int precision = -1;
        bool valid = StrFormatParseNumber(fmt, cursor, close, index);
        if (valid && cursor < close && fmt[cursor] == ':')
        {
            ++cursor;
            valid = StrFormatParseNumber(fmt, cursor, close, digits);
            precision = static_cast<int>(digits);
        }
        valid = valid && cursor == close;
        if (valid && StrFormatAppendArg(out, index, precision, args...))
        {
            pos = close + 1;
        }
        else
        {
            // not a placeholder: keep the brace and rescan right after it so a
            // real placeholder nested further along is still recognised
            out += '{';
            pos = open + 1;
        }
    }
    return out;
}
#endif
// Guards rand(), which keeps hidden global state and is not required to be
// thread-safe.
std::mutex randMutex;
// Thread-safe wrapper around rand() with optional bounds.
//
// max == 0 means "no upper bound": the raw rand() value is shifted by min
// (saturating at the largest int instead of overflowing).
// max != 0 yields a value in the inclusive range spanned by min and max; the
// two bounds are put in order first, so reversed or negative bounds (for
// example GetRand(4, 5) or GetRand(-1)) can no longer produce a modulo by zero.
//
// Returns the generated value.
auto GetRand(int max = 0, int min = 0) -> decltype(rand())
{
    long long raw = 0;
    {
        // only the rand() call itself needs the lock
        std::lock_guard<std::mutex> lock(randMutex);
        raw = rand();
    }
    if (max == 0)
    {
        const long long ceiling = std::numeric_limits<decltype(rand())>::max();
        const long long shifted = raw + min;
        return static_cast<decltype(rand())>(shifted > ceiling ? ceiling : shifted);
    }
    // 64-bit arithmetic keeps (hi - lo + 1) from overflowing for extreme bounds
    const long long lo = (min < max) ? min : max;
    const long long hi = (min < max) ? max : min;
    const long long span = hi - lo + 1;
    return static_cast<decltype(rand())>(lo + raw % span);
}
// Serialises writers so output lines from different threads do not interleave.
std::mutex coutMutex;
// Formats the arguments with StrFormat (the first argument is the format
// string) and writes the result to standard output as one flushed line.
// The text is built before the lock is taken, so only the write is serialised.
template<typename... Args>
void Print(Args... args)
{
    const std::string line(StrFormat(args...));
    std::lock_guard<std::mutex> guard(coutMutex);
    std::cout << line << '\n' << std::flush;
}
// Converts a timespec to seconds as a double: the whole seconds are scaled to
// nanoseconds, the nanosecond field is added, and the sum is scaled back down.
double TimespecToDouble(struct timespec& ts)
{
    const double nanosPerSecond = 1000000000LL;
    const double totalNanos = static_cast<double>(ts.tv_sec) * nanosPerSecond
                            + static_cast<double>(ts.tv_nsec);
    return totalNanos / nanosPerSecond;
}
// A unit of work: carries an integer payload and remembers when it was created
// so that it can report how long it waited before being run.
class Request
{
public:
    // Stores the payload and records the creation time.
    // The monotonic clock is used because the timestamp only ever serves to
    // measure an interval; the wall clock can be stepped (NTP, manual changes)
    // and would then yield wrong or even negative queue times.
    Request(int value) : m_value(value)
    {
        clock_gettime(CLOCK_MONOTONIC, &ts);
    }
    // Runs the request: prints the payload and the time elapsed since
    // construction.
    void operator () ()
    {
        struct timespec now;
        clock_gettime(CLOCK_MONOTONIC, &now);
        // subtract field by field before converting, so the result keeps
        // nanosecond resolution regardless of the clock's absolute magnitude
        const double queuedSeconds = static_cast<double>(now.tv_sec - this->ts.tv_sec)
                                   + static_cast<double>(now.tv_nsec - this->ts.tv_nsec) / 1e9;
        // print the value of the request and the time it spent in the queue,
        // to the nearest microsecond (6 digits of precision)
        Print("Request with value {0} running after spending {1:6} seconds in the queue.", this->m_value, queuedSeconds);
    }
    int m_value;        // payload printed when the request runs
    struct timespec ts; // CLOCK_MONOTONIC reading taken at construction
};
// Book-keeping record describing one worker thread.
class ThreadStatus
{
public:
    int id;                              // identifier of the thread
    std::shared_ptr<std::thread> thread; // owns the pointer handed to the constructor
    bool isBusy;                         // busy/available flag

    // threadptr is adopted by the shared_ptr member; the caller must not
    // delete it afterwards.
    ThreadStatus(int threadid, std::thread* threadptr, bool busy)
        : id(threadid)
        , thread(threadptr)
        , isBusy(busy)
    {
    }
};
// Streams a request as its payload wrapped in braces, e.g. "{ 5 }".
std::ostream& operator << (std::ostream& os, const Request& rhs)
{
    return os << "{ " << rhs.m_value << " }";
}
// Streams a deque as "{ a, b, c }" using each element's own operator<<.
// An empty deque is written as "{  }".
template<typename T>
std::ostream& operator << (std::ostream& os, const std::deque<T>& rhs)
{
    os << "{ ";
    bool first = true;
    // iterate by const reference: copying every element just to print it is
    // wasted work and would require T to be copyable
    for (const auto& item : rhs)
    {
        if (!first)
        {
            os << ", ";
        }
        os << item;
        first = false;
    }
    os << " }";
    return os;
}
template<typename requestT>
class MultiThreadEngine;

// Scope guard for a worker thread's busy flag: the constructor reports the
// thread as busy to the engine and the destructor reports it as available
// again. The guard can be neither default-constructed, copied nor moved.
template<typename requestT>
class BusyThread
{
public:
    BusyThread(MultiThreadEngine<requestT>* queue, int threadId);
    ~BusyThread();

    BusyThread() = delete;
    BusyThread(const BusyThread&) = delete;
    BusyThread(BusyThread&&) = delete;

private:
    MultiThreadEngine<requestT>* m_queue; // engine whose thread list is updated
    int m_threadId;                       // id of the thread being guarded
};
template<typename requestT>
class MultiThreadEngine
{
private:
// private types
typedef std::recursive_mutex mutexType;
typedef MultiThreadEngine<requestT> thisType;
// friend classes
friend class BusyThread<requestT>;
// private functions
// notify all threads waiting on the queue
void Notify()
{
m_conVar.notify_all();
}
std::string QueueToString()
{
std::stringstream ss;
ss << m_queue;
return ss.str();
}
// update the specified thread's availability status
void SetThreadStatus(int threadId, bool isBusy)
{
std::unique_lock<mutexType> lock(m_mutex);
Print("Setting thread {0} status to {1}", threadId, isBusy ? "busy" : "available");
if (m_threadList.find(threadId) != m_threadList.end())
{
m_threadList.at(threadId).isBusy = isBusy;
}
}
// if a thread is available to handle a request, return true
// else return false
bool ThreadsAreAvailable()
{
std::unique_lock<mutexType> lock(m_mutex);
for (auto& it : m_threadList)
{
if (!it.second.isBusy)
{
Print("Thread {0} is available.", it.first);
return true;
}
}
return false;
}
// wait for the queue to have a request added
void Wait()
{
std::unique_lock<mutexType> lock(m_mutex);
m_conVar.wait(lock);
}
// get and remove the first request in the queue
std::unique_ptr<requestT> DequeueRequest()
{
std::unique_lock<mutexType> lock(m_mutex);
if (m_queue.empty())
{
return std::unique_ptr<requestT>();
}
std::unique_ptr<requestT> r(new requestT(m_queue.front()));
m_queue.pop_front();
Print("Queue: {0}", QueueToString());
return r;
}
// the procedure that handles the heavy lifting for each thread
void ThreadProc(int id)
{
int idleCount = 0;
Print("Starting thread {0}", id);
while (true)
{
try
{
Wait();
std::unique_ptr<requestT> request(DequeueRequest());
if (request)
{
idleCount = 0;
BusyThread<requestT>(this, id);
Print("Waking up thread {0}", id);
// tell the request to process
(*request)();
std::this_thread::sleep_for(std::chrono::seconds(10));
}
else
{
++idleCount;
if (idleCount > 10)
{
std::unique_lock<mutexType> lock(m_mutex);
if (m_threadList.size() > m_numThreads && m_threadList.find(id) != m_threadList.end())
{
Print("Terminating thread {0} due to idle timeout. There are now {1} threads.", id, m_threadList.size() - 1);
m_threadList.at(id).thread->detach();
m_threadList.erase(m_threadList.find(id));
}
return;
}
}
}
catch (std::exception& ex)
{
Print("Thread {0} exception caught: {1}", id, ex.what());
}
}
}
// thread-safe function spawn a new thread
void SpawnThread()
{
std::unique_lock<mutexType> lock(m_mutex);
int i = 0;
if (! m_threadList.empty())
{
i = m_threadList.rbegin()->second.id + 1;
}
Print("Spawning thread with id {0}. There are now {1} threads.", i, m_threadList.size() + 1);
std::thread* thread = new std::thread(std::bind(&MultiThreadEngine<requestT>::ThreadProc, this, std::placeholders::_1), i);
m_threadList.insert(std::make_pair(i, std::move(ThreadStatus(i, thread, false))));
}
// private member data
std::deque<requestT> m_queue;
std::map<int, ThreadStatus> m_threadList;
mutexType m_mutex;
std::condition_variable_any m_conVar;
size_t m_numThreads;
public:
MultiThreadEngine(size_t numThreads = 16) : m_numThreads(numThreads)
{
}
// add a request to the end of the queue
void EnqueueRequest(const requestT& request)
{
std::unique_lock<mutexType> lock(m_mutex);
m_queue.push_back(request);
Print("Queue: {0}", QueueToString());
if (!ThreadsAreAvailable())
{
Print("No threads are available. Spawning a new one.");
SpawnThread();
}
Notify();
}
// start the multithreaded request processing engine
void Run()
{
for (size_t i = 0; i < m_numThreads; ++i)
{
SpawnThread();
}
}
};
// Remembers the engine and thread id, then reports the thread as busy.
template<typename requestT>
BusyThread<requestT>::BusyThread(MultiThreadEngine<requestT>* queue, int threadId)
    : m_queue(queue)
    , m_threadId(threadId)
{
    queue->SetThreadStatus(threadId, true);
}
// Reports the guarded thread as available again and drops the engine pointer.
template<typename requestT>
BusyThread<requestT>::~BusyThread()
{
    MultiThreadEngine<requestT>* const engine = m_queue;
    engine->SetThreadStatus(m_threadId, false);
    m_queue = nullptr;
}
// Demo driver: starts an engine with 11 worker threads and then, forever,
// enqueues one request with a random value in [0, 16] every 250 milliseconds.
int main(void)
{
    // seed rand() from the current time
    srand(static_cast<unsigned int>(time(nullptr)));
    MultiThreadEngine<Request> engine(11);
    engine.Run();
    for (;;)
    {
        try
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(250));
            const int value = GetRand(16);
            Print("Enqueuing request with value {0}", value);
            // the request's timestamp is taken here, just before it is queued
            engine.EnqueueRequest(Request(value));
        }
        catch (const std::exception& ex)
        {
            Print("Exception caught: {0}", ex.what());
        }
    }
    return 0;
}
So the question is: why does one thread never wake up? Have I found a bug, or am I doing something wrong?