Code:
#include <windows.h>
#include <iostream>
#include <deque>
#include <vector>
#include <string>
#include <stdexcept>
//------------------------------------------------------------------------------
// Base class that makes every type derived from it non-copyable.
struct no_copy
{
private:
    // Declared but deliberately never defined: copying or assigning a derived
    // object is rejected because these members are inaccessible.
    const no_copy& operator=(const no_copy&);
    no_copy(const no_copy&);
protected:
    // Construction and destruction are only available to derived classes,
    // so a bare no_copy can never be created or destroyed on its own.
    ~no_copy() {}
    no_copy() {}
};//no_copy
//------------------------------------------------------------------------------
// Holds a Win32 HANDLE and closes it in the destructor.
class scoped_HANDLE : public no_copy
{
    HANDLE m_h;
public:
    // Takes over the handle passed in; conversion from HANDLE is explicit only.
    explicit scoped_HANDLE(HANDLE h) : m_h(h) {}

    ~scoped_HANDLE()
    {
        // A null handle and INVALID_HANDLE_VALUE are both treated as
        // "nothing to close".
        const bool closable = (m_h != 0) && (m_h != INVALID_HANDLE_VALUE);
        if (closable)
            ::CloseHandle(m_h);
    }
};//scoped_HANDLE
//------------------------------------------------------------------------------
// Thin owning wrapper around a Win32 event object.
// The handle is created by Create() and closed by Close() or the destructor.
class Event : public no_copy
{
    HANDLE m_handle; // 0 while no event object is owned
public:
    Event() : m_handle(0) {}
    ~Event() {Close();}

    // Gives access to the raw handle (0 if Create() has not succeeded), so the
    // object can be passed directly to wait functions. Does not transfer
    // ownership.
    operator HANDLE() const {return m_handle;}

    // Closes the owned handle, if any. Safe to call repeatedly.
    void Close()
    {
        if (m_handle)
            ::CloseHandle(m_handle);
        m_handle = 0;
    }//Close

    // Creates the event object via CreateEventW.
    //   manualReset  - true for a manual-reset event, false for auto-reset.
    //   initialOwner - forwarded as CreateEventW's initial-state argument:
    //                  true creates the event already signalled.
    //   name, sa     - optional object name and security attributes.
    // Returns true when a handle was obtained.
    bool Create(bool manualReset = false, bool initialOwner = false,
                const wchar_t *name = 0, LPSECURITY_ATTRIBUTES sa = 0)
    {
        // Release any event created by an earlier call; otherwise the old
        // handle would be overwritten and leaked.
        Close();
        m_handle = ::CreateEventW(sa, manualReset, initialOwner, name);
        return m_handle != 0;
    }//Create

    // Sets the event to the signalled state; returns true on success.
    bool Signal() {return ::SetEvent(m_handle) != FALSE;}
    // Sets the event to the non-signalled state; returns true on success.
    bool Reset() {return ::ResetEvent(m_handle) != FALSE;}
};//Event
//------------------------------------------------------------------------------
// Scope guard for any synchronisation type exposing Acquire() and Release():
// the constructor acquires, the destructor releases.
template<class sync_t>
class scoped_lock : public no_copy
{
    sync_t &m_sync; // the guarded object; referenced, not owned
public:
    explicit scoped_lock(sync_t &s)
        : m_sync(s)
    {
        m_sync.Acquire();
    }

    ~scoped_lock()
    {
        m_sync.Release();
    }
};//scoped_lock
//------------------------------------------------------------------------------
class CriticalSection : public no_copy
{
CRITICAL_SECTION cs;
public:
typedef scoped_lock<CriticalSection> scoped_lock;
CriticalSection() {::InitializeCriticalSection(&cs);}
~CriticalSection() {::DeleteCriticalSection(&cs);}
void Acquire() {::EnterCriticalSection(&cs);}
void Release() {::LeaveCriticalSection(&cs);}
};//CriticalSection
//------------------------------------------------------------------------------
// FIFO queue guarded by a critical section, with a manual-reset event that
// consumers can wait on. The event is signalled when items are added and
// reset whenever the queue is found or left empty.
template <typename T>
class queue_MT
{
    mutable CriticalSection m_cs;
    mutable Event m_consumerEvent;
    typedef CriticalSection::scoped_lock lock;
    std::deque<T> m_queue; // front = oldest item, back = newest item
public:
    // Throws std::runtime_error if the consumer event cannot be created.
    queue_MT()
    {
        if (!m_consumerEvent.Create(true)) // manual reset
            throw std::runtime_error("queue_MT: failed to create event");
    }//constructor

    // Handle to wait on for "items available". Owned by the queue.
    HANDLE consumer_event() const {return m_consumerEvent;}

    // Appends one item and wakes consumers.
    void enqueue(const T& val)
    {
        lock l(m_cs);
        m_queue.push_back(val);
        m_consumerEvent.Signal();
    }//enqueue

    // Appends [first, last) in iteration order, after everything already
    // queued, so that both dequeue() and dequeue_all() see items in the order
    // they were produced.
    template<class It>
    void enqueue(It first, It last)
    {
        lock l(m_cs);
        m_queue.insert(m_queue.end(), first, last);
        // An empty range on an empty queue must not wake consumers.
        if (!m_queue.empty())
            m_consumerEvent.Signal();
    }//enqueue

    // Removes the oldest item into val. Returns false if the queue is empty.
    bool dequeue(T &val)
    {
        lock l(m_cs);
        if (m_queue.empty())
        {
            // Make sure a waiter does not spin on a stale signalled event.
            m_consumerEvent.Reset();
            return false;
        }//if
        val = m_queue.front();
        m_queue.pop_front();
        if (m_queue.empty())
            m_consumerEvent.Reset();
        return true;
    }//dequeue

    // Moves every queued item into q (oldest first) and resets the event.
    // Whatever q held before the call is discarded.
    void dequeue_all(std::deque<T> &q)
    {
        lock l(m_cs);
        m_queue.swap(q);
        m_queue.clear();
        m_consumerEvent.Reset();
    }//dequeue_all
};//class queue_MT
//------------------------------------------------------------------------------
// Enables the named privilege (e.g. SE_RESTORE_NAME) in the current process
// token.
// Returns true only if the privilege was actually enabled. On failure the
// thread's last-error value describes the cause.
bool EnablePrivilege(LPCTSTR name)
{
    HANDLE hToken = 0;
    if (!OpenProcessToken(GetCurrentProcess(),
                          TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY,
                          &hToken))
    {
        return false;
    }//if

    TOKEN_PRIVILEGES tkp;
    tkp.PrivilegeCount = 1;
    tkp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED;

    // AdjustTokenPrivileges reports success even when the token does not hold
    // the privilege; in that case GetLastError() is ERROR_NOT_ALL_ASSIGNED, so
    // the last-error value has to be checked as well as the return value.
    const bool enabled =
        LookupPrivilegeValue(0, name, &tkp.Privileges[0].Luid) &&
        AdjustTokenPrivileges(hToken, FALSE, &tkp, 0, 0, 0) &&
        GetLastError() == ERROR_SUCCESS;

    // Close the token on every path, keeping the error code intact for the
    // caller.
    const DWORD last_error = GetLastError();
    CloseHandle(hToken);
    SetLastError(last_error);
    return enabled;
}//EnablePrivilege
//------------------------------------------------------------------------------
// State shared between main and the worker threads.
struct SharedData : public no_copy
{
    std::wstring m_rootpath; // producer only
    Event m_exit;            // manual-reset event; signalled to make the threads stop
    queue_MT<std::wstring> m_q; // strings handed from the producer to the consumer

    // Creates the exit event; throws std::runtime_error if that fails.
    SharedData()
    {
        const bool manual_reset = true;
        const bool created = m_exit.Create(manual_reset);
        if (created)
            return;
        throw std::runtime_error("SharedData: failed to create event");
    }//constructor
};//SharedData
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using namespace std;
//------------------------------------------------------------------------------
// Thread entry point: waits for either the exit event or queued work, prints
// every queued string to wcout, and requests shutdown when it sees "exit".
//   p - pointer to the SharedData instance shared with the other threads.
// Returns 0 after a normal exit request, 1 after an unexpected wait result.
DWORD WINAPI ConsumerThread(void *p)
{
    SharedData *shared = static_cast<SharedData*>(p);

    // Index 0 is the exit event, so it wins when both handles are signalled.
    const HANDLE handles[] = {shared->m_exit, shared->m_q.consumer_event()};
    const DWORD handle_count = 2;
    const DWORD signalled_exit = WAIT_OBJECT_0 + 0;
    const DWORD signalled_work = WAIT_OBJECT_0 + 1;

    for (;;)
    {
        const DWORD status = WaitForMultipleObjects(handle_count, handles,
                                                    FALSE, INFINITE);
        if (status == signalled_exit)
            return 0;

        if (status != signalled_work)
        {
            std::wcerr << L"Consumer: Unexpected WFMO result, " << status << std::endl;
            shared->m_exit.Signal();
            return 1;
        }//if

        // Take everything queued so far in one go, then process it without
        // holding the queue's lock.
        std::deque<std::wstring> batch;
        shared->m_q.dequeue_all(batch);
        for (std::deque<std::wstring>::const_iterator item = batch.begin();
             item != batch.end(); ++item)
        {
            std::wcout << *item << std::endl;
            // The remaining items of the batch are still printed; the exit
            // request is picked up by the next wait.
            if (*item == L"exit")
                shared->m_exit.Signal();
        }//for
    }//for
}//ConsumerThread
//------------------------------------------------------------------------------
// Thread entry point: watches psd->m_rootpath (recursively) for file-name
// changes using overlapped ReadDirectoryChangesW, collects the reported names
// and hands them to the queue once no further change has arrived for
// `window` milliseconds.
//   p - pointer to the SharedData instance shared with the other threads.
// Returns 0 after an exit request; on any failure signals the exit event and
// returns 1.
DWORD WINAPI ProducerThread(void *p)
{
    SharedData *psd = static_cast<SharedData*>(p);
    std::vector<std::wstring> work_q; // names collected since the last hand-over

    HANDLE hDir =
        CreateFileW(psd->m_rootpath.c_str(),
                    FILE_LIST_DIRECTORY,
                    FILE_SHARE_READ | FILE_SHARE_WRITE,
                    0, OPEN_EXISTING,
                    FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, 0);
    // CreateFileW reports failure with INVALID_HANDLE_VALUE, not with NULL.
    if (hDir == INVALID_HANDLE_VALUE)
    {
        // Read the error code before any stream output can overwrite it.
        const DWORD le = GetLastError();
        std::wcerr << L"CreateFile failed on " << psd->m_rootpath << L", LE = "
                   << le << std::endl;
        psd->m_exit.Signal();
        return 1;
    }//if
    scoped_HANDLE sh(hDir);

    OVERLAPPED os = {0};
    Event osev;
    if (!osev.Create(true))
    {
        psd->m_exit.Signal();
        return 1;
    }//if
    os.hEvent = osev;

    // Notification buffer. The FILE_NOTIFY_INFORMATION member gives the union
    // the alignment of that structure; buff only sets the size.
    union
    {
        FILE_NOTIFY_INFORMATION fni;
        char buff[1024 * 4];
        // Returns the record following pfni, or 0 if pfni is the last one.
        FILE_NOTIFY_INFORMATION* NextEntry(FILE_NOTIFY_INFORMATION *pfni)
        {
            if (!pfni->NextEntryOffset)
                return 0;
            char *p = reinterpret_cast<char*>(pfni) + pfni->NextEntryOffset;
            return reinterpret_cast<FILE_NOTIFY_INFORMATION*>(p);
        }//NextEntry
    } u;

    if (!ReadDirectoryChangesW(hDir, &u, sizeof(u), TRUE,
                               FILE_NOTIFY_CHANGE_FILE_NAME, 0, &os, 0))
    {
        const DWORD le = GetLastError();
        std::wcerr << L"ReadDirectoryChangesW failed, LE = "
                   << le << std::endl;
        psd->m_exit.Signal();
        return 1;
    }//if

    const DWORD num_objs = 2;
    const HANDLE wait_objs[] = {psd->m_exit, osev};
    enum
    {
        WO_EXIT = WAIT_OBJECT_0 + 0,
        WO_DIR = WAIT_OBJECT_0 + 1,
    };
    DWORD timeout = INFINITE;  // INFINITE while nothing is waiting to be handed over
    const DWORD window = 1000; // quiet period (ms) before collected names are queued
    DWORD nread = 0;
    for (;;)
    {
        DWORD status = WaitForMultipleObjects(num_objs, wait_objs,
                                              FALSE, timeout);
        if (status == WO_EXIT)
        {
            // A read is always outstanding here; cancel it and wait for it to
            // finish before u and os go out of scope.
            CancelIo(hDir);
            GetOverlappedResult(hDir, &os, &nread, TRUE);
            break;
        }//if
        if (status == WO_DIR)
        {
            if (!GetOverlappedResult(hDir, &os, &nread, TRUE))
            {
                const DWORD le = GetLastError();
                std::wcerr << L"GetOverlappedResult failed, LE = "
                           << le << std::endl;
                psd->m_exit.Signal();
                return 1;
            }//if
            if (nread == 0)
            {
                // Zero bytes means the changes did not fit into the buffer;
                // its contents are not valid records and must not be parsed.
                std::wcerr << L"ReadDirectoryChangesW: notification buffer "
                              L"overflowed, some changes were lost" << std::endl;
            }//if
            else
            {
                FILE_NOTIFY_INFORMATION *pfni = &u.fni;
                for (; pfni; pfni = u.NextEntry(pfni))
                {
                    // FileNameLength is in bytes and the name is not
                    // null-terminated.
                    work_q.push_back(std::wstring());
                    work_q.back().assign(
                        pfni->FileName,
                        pfni->FileName + (pfni->FileNameLength / sizeof(WCHAR)));
                }//for
            }//else
            // Re-arm the notification before going back to waiting.
            if (!ReadDirectoryChangesW(hDir, &u, sizeof(u), TRUE,
                                       FILE_NOTIFY_CHANGE_FILE_NAME, 0, &os, 0))
            {
                const DWORD le = GetLastError();
                std::wcerr << L"ReadDirectoryChangesW failed, LE = "
                           << le << std::endl;
                psd->m_exit.Signal();
                return 1;
            }//if
            // Only start the quiet-period timer when there is something to
            // hand over.
            timeout = work_q.empty() ? INFINITE : window;
        }//if
        else if (status == WAIT_TIMEOUT)
        {
            // No change for `window` ms: pass the collected names on.
            psd->m_q.enqueue(work_q.begin(), work_q.end());
            work_q.clear();
            timeout = INFINITE;
        }//else if
        else
        {
            std::wcerr << L"Producer: Unexpected WFMO result, " << status << std::endl;
            // The outstanding read still refers to u and os on this stack
            // frame; cancel it and wait for completion before returning.
            CancelIo(hDir);
            GetOverlappedResult(hDir, &os, &nread, TRUE);
            psd->m_exit.Signal();
            return 1;
        }//else
    }//for
    return 0;
}//ProducerThread
//------------------------------------------------------------------------------
int main()
{
if (!EnablePrivilege(SE_DEBUG_NAME) ||
!EnablePrivilege(SE_RESTORE_NAME))
return 1;
SharedData data;
data.m_rootpath = L"C:\\temp";
HANDLE hCon = CreateThread(0, 0, &ConsumerThread, &data, 0, 0);
if (!hCon)
return 1;
HANDLE hProd = CreateThread(0, 0, &ProducerThread, &data, 0, 0);
if (!hProd)
{
data.m_exit.Signal();
WaitForSingleObject(hCon, INFINITE);
return 1;
}//if
const DWORD num_objs = 2;
const HANDLE wait_objs[num_objs] = {hCon, hProd};
WaitForMultipleObjects(num_objs, wait_objs, TRUE, INFINITE);
cout << "Done" << endl;
return 0;
}//main
You need to make sure the buffer you pass to ReadDirectoryChangesW is DWORD-aligned.