Hello everyone,
I recently wrote a small functor class that lets threads perform commonly needed collective operations with minimal code. The real application that uses this class has a bug, and my efforts to reproduce it in simpler code have failed. The idea seems simple, but apparently something about it is not.
You create an instance of this class, sync_func, in the main thread. Then n other threads each call its operator(), passing in their piece of data. The earlier threads wait until the n-th thread has arrived, and then the function returns the combined result to every thread. The last thread to arrive resets the state so that the object works again the next time.
I'm using this class to get the sum of a bunch of numbers across threads. Each thread runs the operator() of an instance of class task. main() creates an instance of class method, which owns a sync_func. Each thread holds a reference to the method, through which it calls the sync_func.
The code below all works. After it is an excerpt of my real code, which does not work. The problem is that the sync_func returns 'nan' to every calling thread, even though no calculation leading up to it yields nan. Somewhere between the return statement and the assignment at the calling line, things go awry.
Any help that can be offered is much appreciated. If you need any more information, let me know.
The header, sync_func.hpp:
Code:
#include <boost/thread/mutex.hpp>
#include <boost/thread/barrier.hpp>

template<class T>
struct do_nothing { void operator()(T &) {} };

// Binary functors take (static variable, thread-specific variable).
template<class T>
struct thread_max { void operator()(T & stat, const T & thrd) { if(thrd > stat) stat = thrd; } };

template<class T>
struct thread_min { void operator()(T & stat, const T & thrd) { if(thrd < stat) stat = thrd; } };

template<class T>
struct thread_sum { void operator()(T & stat, const T & thrd) { stat += thrd; } };

template<class T, class T_binary_ref_func, class T_unary_once_func = do_nothing<T> >
class sync_func
{
    T working;                  // accumulator for the round in progress
    const T starting_value;     // value the accumulator is reset to
    T returnable;               // result of the last completed round
    T_binary_ref_func func;     // func( working, thread_piece );
    T_unary_once_func at_end;   // at_end( working );
    boost::mutex moo;
    boost::barrier bar;
    unsigned int n_threads;
    unsigned int counter;
public:
    sync_func(unsigned int _n_threads, const T & _starting_value,
              const T_unary_once_func & _at_end = T_unary_once_func(),
              const T_binary_ref_func & _func = T_binary_ref_func())
        // initializers listed in declaration order
        : working(_starting_value), starting_value(_starting_value), returnable(_starting_value),
          func(_func), at_end(_at_end), bar(_n_threads), n_threads(_n_threads), counter(0) {}

    T operator()(const T & thread_piece)
    {
        {
            boost::mutex::scoped_lock lock(moo);
            func( working, thread_piece );
            if(++counter == n_threads)
            {
                // last thread in: publish the result and reset for the next round
                at_end( working );
                returnable = working;
                working = starting_value;
                counter = 0;
            }
        }
        bar.wait(); // every thread waits here until the result is published
        return returnable;
    }
};
My toy program, to demonstrate how I'm using it:
Code:
#include <boost/thread.hpp>
#include <iostream>
#include "sync_func.hpp"

boost::mutex cout_mutex;

struct method
{
    typedef sync_func< double, thread_sum<double> > sync_sum;

    method(unsigned nthreads, boost::barrier & b) : tfun(nthreads, 0.0), bar(b) {}

    void do_stuff(unsigned thread_id)
    {
        for(int i = 0; i < 2; ++i)
        {
            // every thread contributes 1/(1+id) and gets the total back
            double sum = tfun( 1.0 / (1+thread_id) );
            {
                boost::mutex::scoped_lock lock(cout_mutex);
                std::cout << "Thread " << thread_id << " got a sum of " << sum
                          << " on iteration " << i << std::endl;
            }
            bar.wait();
        }
    }
private:
    // declared but not defined, to forbid copying
    method(method & m);
    method(const method & m);
    sync_sum tfun;
    boost::barrier & bar;
};

class task
{
    unsigned int thread_id;
    unsigned int n;
    method & m;
public:
    // the barrier argument is unused here; the method holds its own reference
    task( unsigned int id, unsigned int _n, boost::barrier &, method & _m )
        : thread_id(id), n(_n), m(_m) {}

    void operator()()
    {
        for(int i = 0; i < 2; ++i)
            m.do_stuff(thread_id);
    }
};

int main()
{
    boost::thread_group tg;
    unsigned int n;
    std::cout << "How many threads? ";
    std::cin >> n;
    boost::barrier bar(n);
    typedef sync_func< double, thread_sum<double> > sync_sum;
    sync_sum tsum( n, 0.0 ); // unused in this toy program
    method meth(n, bar);
    for(unsigned int i = 0; i < n; ++i)
        tg.create_thread( task(i, n, bar, meth) );
    tg.join_all();
}
Example output:
Code:
How many threads? 2
Thread 0 got a sum of 1.5 on iteration 0
Thread 1 got a sum of 1.5 on iteration 0
Thread 0 got a sum of 1.5 on iteration 1
Thread 1 got a sum of 1.5 on iteration 1
Thread 0 got a sum of 1.5 on iteration 0
Thread 1 got a sum of 1.5 on iteration 0
Thread 0 got a sum of 1.5 on iteration 1
Thread 1 got a sum of 1.5 on iteration 1
And now, the actual code that fails:
Code:
int main()
{
    // . . .
    mode_method* mmeth = new . . .
    boost::thread_group tg;
    for(unsigned i = 0; i < nthreads; ++i)
        tg.create_thread(simulator(*tmeth,*mmeth,g,lap,tmeth->region(i),tmeth->boundary(i),
                                   settings::time_steps,settings::dt) );
    tg.join_all();
    // . . .
}
// . . . .
void simulator::operator()() //Here is the simulation
{
    for(uword i = 0; i < n; ++i) // n not related to nthreads
    {
        // . . . .
        mmeth.subtract(reg);
        // . . . . .
    }
}
// . . . .
class high_mode : public mode_method
{
    thread_method & tmeth; //actual object is instantiated by main(), just like *this
public:
    void subtract(grid::region & reg)
    {
        functions::inner_product ip( oldm ); //thread_piece
        newm.iterate_region(reg, ip);
        grid::data_type inner_prod = tmeth.get_sum(ip.value());
        //ip.value() always well-defined but inner_prod always nan
        {
            boost::mutex::scoped_lock lock(debug_mutex);
            std::cout << "thread " << reg.front().x << " has ip " << ip.value()
                      << " and total ip " << inner_prod << std::endl;
        }
    }
};
//. . . .
class multi_thread : public thread_method
{
    // . . .
    sync_func< grid::data_type, thread_sum<grid::data_type> > sync_sum;
public:
    grid::data_type get_sum(const grid::data_type & piece) { sync_sum(piece); }
    // . . .
};
If you've come this far, you truly are a wild goose-chaser.