I wanted to submit some code to demonstrate a few concepts, including the "on topic" ones.
For those not familiar with the Win32 API, any unfamiliar functions can be looked up on MSDN (and forgive the Win32 specific code on the c++ board).
The Op's initial post describes a classic producer/consumer scenario that is typical in a multi-threaded application.
The following code shows how the std::deque container can be used as the point of access for multiple producer and consumer threads. It also demonstrates simple encapsulation of synchronization objects to make MT programming a little easier.
I am using a Thread class that I posted previously, but I thought it would be more appropriate since this isn't the Windows board.
Questions and comments are welcome. Feel free to ask if anything needs elaboration.
Code:
#include <iostream>
#include <sstream>
#include <string>
#include <deque>
#include <exception>
using namespace std;
// code from this thread.
#include "thread.h"
#include <windows.h>
//-----------------------------------------------------------------------------
// C++ wrapper object for a Win32 CRITICAL_SECTION.
// The section is initialized in the constructor and deleted in the destructor.
class CriticalSection
{
    CRITICAL_SECTION m_cs;

    // Not copyable: an initialized CRITICAL_SECTION must not be copied, and a
    // copy would also be passed to DeleteCriticalSection() by its own
    // destructor. Declared but intentionally not defined.
    CriticalSection(const CriticalSection&);
    CriticalSection& operator=(const CriticalSection&);

public:
    CriticalSection() {InitializeCriticalSection(&m_cs);}
    ~CriticalSection() {DeleteCriticalSection(&m_cs);}

    // Access to the wrapped object for Win32 calls taking a CRITICAL_SECTION*.
    operator CRITICAL_SECTION*() {return &m_cs;}

    void Enter() {EnterCriticalSection(&m_cs);}
    void Leave() {LeaveCriticalSection(&m_cs);}
};//CriticalSection
//-----------------------------------------------------------------------------
// Automatic Enter() of a CriticalSection in the constructor, and Leave() in the
// destructor, so the section is left on every exit path from the enclosing
// scope (including exceptions).
class AutoCSLock
{
    CriticalSection *m_cs;

    // Not copyable: each copy's destructor would call Leave() again for a
    // single Enter(). Declared but intentionally not defined.
    AutoCSLock(const AutoCSLock&);
    AutoCSLock& operator=(const AutoCSLock&);

public:
    // cs must be non-null; it is dereferenced here and again in the destructor.
    explicit AutoCSLock(CriticalSection *cs) : m_cs(cs) {m_cs->Enter();}
    ~AutoCSLock() {m_cs->Leave();}
};//AutoCSLock
//-----------------------------------------------------------------------------
// Thread safe queue object for producer/consumer threads.
// Items are handed out in the order they were queued (FIFO). A manual-reset
// event is set whenever an item is queued and reset when the last item is
// removed, so it stays signaled while the queue is non-empty.
template <class T>
class MTqueue
{
    deque<T> m_q;
    CriticalSection m_cs; // for thread safe access to m_q
    HANDLE m_ev; // event to signal consumers to come and get it

    // Not copyable: a copy would hold the same event handle and close it a
    // second time in its destructor. Declared but intentionally not defined.
    MTqueue(const MTqueue&);
    MTqueue& operator=(const MTqueue&);

public:
    // manual-reset event, initially non-signaled (the queue starts empty)
    MTqueue() {m_ev = CreateEvent(NULL, TRUE, FALSE, NULL);}
    ~MTqueue() {CloseHandle(m_ev);}

    // Handle a consumer can wait on; signaled while items are queued.
    HANDLE consumer_event() {return m_ev;}

    // Appends a copy of val to the back of the queue and signals the event.
    void en_que(const T &val)
    {
        AutoCSLock lock(&m_cs);
        m_q.push_back(val);
        SetEvent(m_ev); // something to consume
    }//en_que

    // Removes the oldest queued item and copies it into val.
    // This should only be called by the owner of consumer_event().
    // Reports on cerr and throws bad_exception if the queue is empty.
    void de_que(T &val)
    {
        AutoCSLock lock(&m_cs);
        if (m_q.empty())
        {
            cerr << "MTqueue::de_que() on empyt queue!!" << endl;
            throw bad_exception();
        }//if
        // take from the front: en_que() pushes to the back, so this is FIFO
        val = m_q.front();
        m_q.pop_front();
        if (m_q.empty())
            ResetEvent(m_ev); // m_q is empty now
    }//de_que
};//MTqueue
//-----------------------------------------------------------------------------
DWORD producer_thread(Thread *t); // implemented below
//-----------------------------------------------------------------------------
int main()
{
// a message queue
MTqueue<string> msg_q;
// a thread object to run our producer_thread() function
Thread t(&producer_thread);
cout << "Starting producer thread...\n" << endl;
t.SetThreadData(&msg_q);
t.Start();
// now we can start consuming messages
int n = 0;
HANDLE consume_ev = msg_q.consumer_event();
string msg;
while (1)
{
if (WaitForSingleObject(consume_ev, INFINITE) != WAIT_OBJECT_0)
break;
n++;
msg_q.de_que(msg);
cout << msg << endl;
if (n == 10)
break;
}//while
cout << "\nStopping producer thread" << endl;
t.Stop(INFINITE);
return 0;
}//main
//-----------------------------------------------------------------------------
// NOTE: remove the "WINAPI" calling convention from ThreadProc_t typedef used
// by the Thread class - it serves no purpose.
DWORD producer_thread(Thread *t)
{
    // Producer thread body: after each one-second wait that times out, queues
    // a numbered message on the MTqueue<string> supplied as the thread data.
    // Always returns 0.
    MTqueue<string> *q = (MTqueue<string>*)t->GetThreadData();
    HANDLE exit_ev = t->GetExitEvent();
    int n = 1;
    while (1)
    {
        // wait up to a second on the exit event; if this doesn't timeout
        // (event signaled or the wait failed), then exit
        if (WaitForSingleObject(exit_ev, 1000) != WAIT_TIMEOUT)
            break;
        // Use a fresh stream per message. Rewinding one shared stream with
        // seekp(0) keeps the old buffer, so a shorter message would carry the
        // tail of a longer previous one.
        ostringstream next_msg;
        next_msg << "producing message #" << n++;
        q->en_que(next_msg.str());
    }//while
    cout << "Producer thread exiting" << endl;
    return 0;
}//producer_thread
//-----------------------------------------------------------------------------
gg