Thread: Probably very easy question about pthread mutexes

  1. #1
    Registered User
    Join Date
    Jun 2009
    Posts
    101

    Probably very easy question about pthread mutexes

    I am working on a multi-threaded app that converts files. I want it to launch a new thread for each conversion, up to the number of available CPU cores. To keep track of how many threads are already running, I keep a global counter that each thread decrements when it finishes. To avoid problems with more than one thread updating this variable at the same time, I use a mutex. However, I'm not exactly sure how it works.
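A minimal sketch of picking the thread limit from the number of online cores. It assumes a system where sysconf() accepts _SC_NPROCESSORS_ONLN (glibc, the BSDs and macOS do, but it is a common extension rather than base POSIX). The helper name worker_count() is hypothetical.

```c
#include <assert.h>
#include <unistd.h>

/* Hypothetical helper: how many worker threads to start, capped at max_threads.
 * _SC_NPROCESSORS_ONLN is an extension, so fall back to 1 if it is missing. */
static int worker_count(int max_threads)
{
    long n = -1;
#ifdef _SC_NPROCESSORS_ONLN
    n = sysconf(_SC_NPROCESSORS_ONLN);   /* cores currently online, or -1 */
#endif
    if (n < 1)
        n = 1;                           /* unknown: be conservative */
    if (max_threads > 0 && n > max_threads)
        n = max_threads;                 /* never exceed the caller's cap */
    return (int)n;
}
```

The cap matters because, as Codeplug points out further down, an I/O-bound conversion may not benefit from one thread per core.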

    I assume that the pthread_mutex_t variable must be visible to every part of the code that reads or writes the variable it protects. So I initialize the mutex on the main thread and pass each thread a pointer to it. The code isn't complete, but hopefully you'll get what I'm trying to do:

    Code:
    #include <pthread.h>
    
    #define MAX_THREADS 4
    int counter; //thread counter
    
    void *threadFunction(void *arg){
    
    	pthread_mutex_t *mutex = arg; //the mutex shared with main()
    
    	//do thread stuff
    
    	//decrement counter
    	pthread_mutex_lock(mutex);
    	counter--;
    	pthread_mutex_unlock(mutex);
    	return NULL;
    }
    
    int main(){
    	counter = 0;
    	pthread_mutex_t mutex;
    	pthread_t thread_id = 1;
    	pthread_mutex_init(&mutex, NULL);
    
    	pthread_create(&thread_id, NULL, threadFunction, (void*)&mutex);
    	counter++;
    
    	//check counter to see if more threads can start
    	pthread_mutex_lock(&mutex);
    	if(counter>MAX_THREADS){
    		//wait code
    	}
    	pthread_mutex_unlock(&mutex);
    	pthread_mutex_destroy(&mutex);
    	return 0;
    }
    Does this make sense?

  2. #2
    spurious conceit MK27
    Join Date
    Jul 2008
    Location
    segmentation fault
    Posts
    8,300
    Quote Originally Posted by synthetix
    Does this make sense?
    It's okay except for right here (and a design flaw I'll get to after that):

    Code:
        pthread_create(&thread_id, NULL, threadFunction, (void*)&mutex);
        counter++;
    If a variable is accessed by several threads in mutex-locked sections, then the locking applies to ALL threads, including the first one, and to both writes and reads. E.g., if some thread can change "counter", then main() should not even read "counter" after that thread has been created without locking first.
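A minimal sketch of that rule, assuming a single global counter guarded by one mutex (the names running, count_lock, start_worker() and convert_file() are hypothetical). main() goes through the same lock as the workers, and it counts a thread before creating it, so a fast worker's decrement can never be observed ahead of the matching increment.

```c
#include <assert.h>
#include <pthread.h>

/* Shared state and the one mutex that guards every read and write of it. */
static pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
static int running = 0;

static int running_count(void)           /* locked read, usable from main() */
{
    pthread_mutex_lock(&count_lock);
    int n = running;
    pthread_mutex_unlock(&count_lock);
    return n;
}

static void *convert_file(void *arg)     /* hypothetical worker */
{
    (void)arg;                            /* the real conversion would go here */
    pthread_mutex_lock(&count_lock);
    running--;                            /* locked write */
    pthread_mutex_unlock(&count_lock);
    return NULL;
}

/* Count the worker before creating it; undo the count if creation fails. */
static int start_worker(pthread_t *tid)
{
    pthread_mutex_lock(&count_lock);
    running++;
    pthread_mutex_unlock(&count_lock);

    int rc = pthread_create(tid, NULL, convert_file, NULL);
    if (rc != 0) {
        pthread_mutex_lock(&count_lock);
        running--;
        pthread_mutex_unlock(&count_lock);
    }
    return rc;
}
```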

    Also remember: a mutex should never be unlocked by any thread other than the one that locked it; otherwise it is pointless (and for a default mutex the behaviour is undefined). That creates certain limitations. The way you use the lock in main() is actually prone to a catch-22 because of this; see this thread:

    Communication/Synchronization between threads


    The point is that although the lock saves you from the race condition, it does not stop one thread from acquiring the same lock over and over when the lock is taken in a loop (and it looks like that is what you would have to do in main).*

    So having the first thread fire off a new thread whenever one is needed (meaning the first thread must keep trying to acquire the same lock to check) is not the best idea. Instead, launch your threads as needed, up to whatever limit (number of cores), and have those threads wait on a condition variable (see Codeplug's posts and mine in the thread above). In that case, "counter" does not have to be global, nor does it have to be accessible to the other threads; main() keeps track. And the other threads, rather than being respawned when they finish and there is more work to do, persist and wait for a condition to be met (more work). This is called a thread pool. It is slightly more complicated than what you have now, but AFAIK it is the only truly viable option for stuff like this.

    To reiterate: don't launch a temporary thread to perform a temporary task while limiting the number of concurrent threads. Limit the number of concurrent threads, launch them as needed, and then re-use the existing threads via some synchronization mechanism (such as a condition variable); this is much less processor work than creating a new thread for each task.

    Depending on the nature of the whole thing, you may want threads to die sometimes (eg, if the work queue is totally empty), so you might want to use "counter" globally that way. But they are still better if they have an indeterminate lifetime (until no tasks remain) than a fixed one (just one task then die). Get it?

    * The POSIX spec says that "If there are threads blocked on the mutex object referenced by mutex when pthread_mutex_unlock() is called, resulting in the mutex becoming available, the scheduling policy shall determine which thread shall acquire the mutex." What may not be obvious is that if the thread that just released the lock keeps running (because of the scheduling policy) and immediately loops back to call pthread_mutex_lock() again, it will reacquire the lock it just released, and it can do so indefinitely. YMMV WRT how likely or frequent that is, but the point is: pthread_mutex_unlock() does not hand the lock to a waiting thread. It simply releases it. The next thread to run a pthread_mutex_lock() call gets it, and that depends on the system scheduler (which is a separate beast from the pthreads implementation); it could just as easily be the same thread as any other (it might even be more likely if the loop is tight and there are not many processes running).
    Last edited by MK27; 11-03-2011 at 01:44 PM.
    C programming resources:
    GNU C Function and Macro Index -- glibc reference manual
    The C Book -- nice online learner guide
    Current ISO draft standard
    CCAN -- new CPAN like open source library repository
    3 (different) GNU debugger tutorials: #1 -- #2 -- #3
    cpwiki -- our wiki on sourceforge

  3. #3
    Registered User Codeplug
    Join Date
    Mar 2003
    Posts
    4,981
    >> I am working on a multi-threaded app that converts files.
    Is the conversion primarily "CPU bound" or "I/O bound"? I/O bound means that most of the time is spent reading/writing from/to the disk. If that's the case, then it may be just as fast to use a single thread to do all the conversions sequentially.

    >> pthread_t thread_id = 1;
    pthread_t is not guaranteed to be an arithmetic type. It could be a struct for example. That's why there's pthread_equal() for comparing two pthread_t's.
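A small sketch of comparing thread IDs portably with pthread_equal(), which returns nonzero when both IDs refer to the same thread. The helper names main_thread, report_is_main() and new_thread_is_main() are hypothetical; the point is that the comparison never relies on pthread_t being an integer.

```c
#include <assert.h>
#include <pthread.h>

static pthread_t main_thread;            /* set before any worker is created */

static void *report_is_main(void *out)
{
    /* Compare IDs with pthread_equal(), never with ==. */
    *(int *)out = pthread_equal(pthread_self(), main_thread) != 0;
    return NULL;
}

/* Returns 1 if a new thread thinks it is main_thread, 0 if not,
 * or -1 if the thread could not be created. */
static int new_thread_is_main(void)
{
    pthread_t tid;
    int result = -1;
    main_thread = pthread_self();
    if (pthread_create(&tid, NULL, report_is_main, &result) != 0)
        return -1;
    pthread_join(tid, NULL);             /* join before reading result */
    return result;
}
```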

    You should also call pthread_join on any threads that haven't been explicitly "detached".

    gg

  4. #4
    Registered User
    Join Date
    Jun 2009
    Posts
    101
    Thanks. I rewrote my program using CodePlug's example in post #10 in the link you posted. However, with that code it seems as though no two threads can ever concurrently process data because they lock the entire time they are working, which locks everything else until each thread is complete since everything is referencing the same mutex. That means only one thread works at a time, no matter how many are spawned. This is the result I am getting with my program.

    I only need a filename variable from the producer thread, so I tried unlocking each thread after they acquire the filename. This works well, but once the loop in the producer is complete, the program quits before waiting for the rest of the threads to complete. This happens because those threads freed their locks very early, after they got their filenames. The real number crunching takes a little time so of course the producer thread quits immediately, terminating the worker threads while they're in the middle of crunching.

    I also don't understand the mutex in the producer thread. The lock is outside of the loop, and gets unlocked by the first thread that's spawned. So, it seems useless since anything useful is only done while the producer loop is running. Why are they there?

    Thanks for your help. This is working pretty well, I just need to get these last few problems worked out!

  5. #5
    Officially An Architect brewbuck
    Join Date
    Mar 2007
    Location
    Portland, OR
    Posts
    7,396
    Since threads need to be joined, why do you need to keep track of which ones are running at all?

    Code:
    for (int i = 0; i < NTHREADS; ++i)
        pthread_create(&thread[i], ...);  // returns an error code; the thread ID is stored in thread[i]
    
    // Workers are running, wait for them to finish
    
    for (int i = 0; i < NTHREADS; ++i)
        pthread_join(thread[i], NULL);
    The second loop will only terminate when all the threads have terminated.
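A self-contained version of the create-all, join-all pattern, under the assumption that each worker's result can be handed back through pthread_join()'s second argument. The worker square() and the driver run_all() are made up for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

#define NTHREADS 4

/* Hypothetical worker: squares its index and returns it inside the pointer. */
static void *square(void *arg)
{
    intptr_t i = (intptr_t)arg;
    return (void *)(i * i);
}

/* Create every worker, then join them all. The second loop cannot finish
 * until every thread has finished, so no "running" counter is needed.
 * Returns the sum of the results, or -1 if a thread could not be created. */
static long run_all(void)
{
    pthread_t thread[NTHREADS];
    int created = 0;
    long sum = 0;

    for (int i = 0; i < NTHREADS; ++i) {
        /* pthread_create() returns an error code; the ID goes into thread[i]. */
        if (pthread_create(&thread[i], NULL, square, (void *)(intptr_t)i) != 0)
            break;
        created++;
    }

    for (int i = 0; i < created; ++i) {
        void *ret;
        pthread_join(thread[i], &ret);   /* blocks until thread i has exited */
        sum += (long)(intptr_t)ret;
    }
    return created == NTHREADS ? sum : -1;
}
```

With four workers the results are 0, 1, 4 and 9, so run_all() returns their sum.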

    (Hint drop: Intel TBB. Use Google)

  6. #6
    Registered User
    Join Date
    Jun 2009
    Posts
    101
    Quote Originally Posted by synthetix
    I only need a filename variable from the producer thread, so I tried unlocking each thread after they acquire the filename. This works well, but once the loop in the producer is complete, the program quits before waiting for the rest of the threads to complete. This happens because those threads freed their locks very early, after they got their filenames. The real number crunching takes a little time so of course the producer thread quits immediately, terminating the worker threads while they're in the middle of crunching.
    To deal with this, should I call pthread_join() once for each worker thread?

    Here:

    Code:
    for(i=0;i<num_threads;i++){
    	pthread_join(threads[i],NULL);
    }
    
    return 0; //end of main
    Actually, I just tried this, and it doesn't work since each thread is running an endless while() loop. Should I add a condition to the while()?
    Last edited by synthetix; 11-03-2011 at 07:05 PM.

  7. #7
    spurious conceit MK27
    Join Date
    Jul 2008
    Location
    segmentation fault
    Posts
    8,300
    Quote Originally Posted by synthetix
    Thanks. I rewrote my program using CodePlug's example in post #10 in the link you posted. However, with that code it seems as though no two threads can ever concurrently process data because they lock the entire time they are working, which locks everything else until each thread is complete since everything is referencing the same mutex.
    Hmmm -- did you understand condition_wait()? It unlocks the mutex.

    However, you do have a point, because that example has no concept of a queue; the producer hands off a single I/O task that must complete before it creates another one. But the example is only meant to demonstrate how the wait condition works. Imagine moving the printf() outside the lock (by copying the locked data first) and you may see the value of de-synchronizing these activities: rather than waiting for each printf() to complete, you could move on and accumulate data asynchronously.

    So, instead, use a FIFO structure (aka a queue) guarded by a first condition: the producer waits on it whenever it needs to add to the queue, until there is nothing left to add. Tautology: the specifics of threading depend a lot on the specifics of the task. In your case, it might be that the producer should just populate this queue (if the tasks are a finite list) and then launch the threads detached, in which case you do not need the first of the two conditions, only the second one.

    After that, the detached worker threads all wait for the same condition -- that the queue can be accessed -- pull a task, and release the condition. Then they work on the task, outside of the locked state, hence, asynchronously.
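For a finite list known up front, a minimal sketch can replace the queue with a pre-filled array and a shared "next index", so no condition variable is needed at all. The filenames, worker() and convert_all() are hypothetical, and the threads are joined rather than detached so that main() can wait for the last conversion to finish. The lock is held only long enough to claim one filename, which also guarantees that no two threads get the same file.

```c
#include <assert.h>
#include <pthread.h>

#define NWORKERS 3

/* Hypothetical job list, filled in before any worker starts. */
static const char *files[] = { "a.wav", "b.wav", "c.wav", "d.wav", "e.wav" };
static const int nfiles = sizeof files / sizeof files[0];

static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
static int next_file = 0;                         /* next unclaimed index */
static int done[sizeof files / sizeof files[0]];  /* times each file was handled */

static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        /* Claim exactly one filename under the lock. */
        pthread_mutex_lock(&list_lock);
        int mine = next_file < nfiles ? next_file++ : -1;
        pthread_mutex_unlock(&list_lock);

        if (mine < 0)
            return NULL;              /* list exhausted: this thread exits */

        /* The slow conversion of files[mine] would run here, unlocked.
         * Only this thread ever touches done[mine]. */
        done[mine]++;
    }
}

/* Returns how many workers were started. */
static int convert_all(void)
{
    pthread_t tid[NWORKERS];
    int created = 0;
    for (int i = 0; i < NWORKERS; ++i)
        if (pthread_create(&tid[created], NULL, worker, NULL) == 0)
            created++;
    for (int i = 0; i < created; ++i)
        pthread_join(tid[i], NULL);   /* main() waits for the last file */
    return created;
}
```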

    Quote:
    I also don't understand the mutex in the producer thread. The lock is outside of the loop, and gets unlocked by the first thread that's spawned.
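    Nope. You are perhaps discussing mutexes and conditions as if they are synonymous, but they are not. Conditions depend upon and control mutexes: pthread_cond_wait() atomically releases the mutex while the caller is blocked and reacquires it before returning.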
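A minimal sketch of that relationship, assuming one waiter and one setter (all names are hypothetical). The waiter checks its predicate in a loop while holding the mutex, and pthread_cond_wait() gives the mutex up while blocked, which is what lets the other thread lock it and change the predicate.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t cv_lock  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  ready_cv = PTHREAD_COND_INITIALIZER;
static int ready = 0;     /* the predicate; only read or written under cv_lock */
static int payload = 0;

static void *waiter(void *out)
{
    pthread_mutex_lock(&cv_lock);
    /* Loop, because wakeups can be spurious. While blocked in
     * pthread_cond_wait() this thread does NOT hold cv_lock. */
    while (!ready)
        pthread_cond_wait(&ready_cv, &cv_lock);
    *(int *)out = payload;               /* cv_lock is held again here */
    pthread_mutex_unlock(&cv_lock);
    return NULL;
}

static int hand_off(int value)
{
    pthread_t tid;
    int received = -1;
    if (pthread_create(&tid, NULL, waiter, &received) != 0)
        return -1;

    /* If the waiter is already blocked in pthread_cond_wait(), it has
     * released cv_lock, so this lock call does not deadlock. */
    pthread_mutex_lock(&cv_lock);
    payload = value;
    ready = 1;                           /* change the predicate ... */
    pthread_cond_signal(&ready_cv);      /* ... then wake the waiter */
    pthread_mutex_unlock(&cv_lock);

    pthread_join(tid, NULL);
    return received;
}
```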
    Last edited by MK27; 11-03-2011 at 07:50 PM.

  8. #8
    Registered User
    Join Date
    Jun 2009
    Posts
    101
    Quote Originally Posted by MK27
    Hmmm -- did you understand condition_wait()? It unlocks the mutex.
    Yeah, but I figured out that the producer thread waits until bDataConsumed = 1, which isn't set until the worker thread completes. That's why it refuses to give out any more data until the previous thread finishes, and why it works much better when the worker sets bDataConsumed = 1 before the hard work starts.

    Quote:
    After that, the detached worker threads all wait for the same condition -- that the queue can be accessed -- pull a task, and release the condition. Then they work on the task, outside of the locked state, hence, asynchronously.
    That sounds like what I need. I have a file list whose entries are handed to the worker threads. They process files and write new files independently, so the only thing they need before releasing the list is the next filename. However, I need to ensure that two threads don't grab the same filename. It seems to be working the way it is now, but my only problem is that the producer thread doesn't wait for the worker threads to finish. It seems I need to add a check in the threads for whether the end of the list has been reached, so that they terminate.

    Quote:
    Nope. You are perhaps discussing mutexes and conditions as if they are synonymous, but they are not. Conditions depend upon and control mutexes.
    I think I understand it... I hope!

  9. #9
    Registered User Codeplug
    Join Date
    Mar 2003
    Posts
    4,981
    >> but my only problem is that the producer thread doesn't wait for the worker threads to finish.
    Once main() returns, or exit() is called, all threads are toast: the process is done. It seems you've experienced this first hand.

    >> Actually, I just tried this, and it doesn't work since each thread is running an endless while() loop. Should I add a condition to the while()?
    When there is nothing left to produce, and nothing left to consume, the threads should exit. There are many ways this could be coded up. The most straightforward way in the example is to incorporate the exit condition into the "bDataReady" and "bDataConsumed" predicates. In other words:
    Code:
    while (!bDataReady || bTimeToExit)
    {
        if (bTimeToExit)
        {
            pthread_mutex_unlock(&Lock);
            return NULL;
        }
        pthread_cond_wait(&DataReady, &Lock);
    }
    If the single producer thread controls when it's time to quit, then it should acquire the lock, set bTimeToExit, and call pthread_cond_broadcast() - to tell all consumers that it's time. And main() should be joining with all threads so that it doesn't return until all threads have returned.
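A sketch of that shutdown sequence, assuming a plain counter of pending items stands in for real data (all names are hypothetical). Consumers exit only when nothing is pending and the flag is set, so work produced before shutdown is still drained; the flag is written under the lock and followed by a broadcast, and main() joins every consumer.

```c
#include <assert.h>
#include <pthread.h>

#define NCONSUMERS 4

static pthread_mutex_t q_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  q_cv   = PTHREAD_COND_INITIALIZER;
static int pending = 0;        /* items waiting to be consumed */
static int time_to_exit = 0;   /* written only under q_lock */
static int consumed = 0;

static void *consumer(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&q_lock);
    for (;;) {
        while (pending == 0 && !time_to_exit)
            pthread_cond_wait(&q_cv, &q_lock);
        if (pending == 0)      /* told to exit and nothing left: leave */
            break;
        pending--;             /* take one item */
        consumed++;
        /* real code would unlock, process the item, then re-lock */
    }
    pthread_mutex_unlock(&q_lock);
    return NULL;
}

/* Produce `items` in one go, signal shutdown, and join every consumer. */
static int run_and_shut_down(int items)
{
    pthread_t c[NCONSUMERS];
    int created = 0;
    for (int i = 0; i < NCONSUMERS; ++i)
        if (pthread_create(&c[created], NULL, consumer, NULL) == 0)
            created++;

    pthread_mutex_lock(&q_lock);
    pending += items;
    time_to_exit = 1;               /* set the flag under the lock ... */
    pthread_cond_broadcast(&q_cv);  /* ... and wake every consumer */
    pthread_mutex_unlock(&q_lock);

    for (int i = 0; i < created; ++i)
        pthread_join(c[i], NULL);   /* return only after all consumers do */
    return consumed;
}
```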

    Notice that in my example, main() only joins with (waits for) the producer - which just produces 20 items before returning. The consumers are left to be killed with extreme prejudice (via return from main), which is not ideal. The example could be improved with an exit predicate, and joining on all threads.

    gg

  10. #10
    Registered User
    Join Date
    Jun 2009
    Posts
    101
    Quote Originally Posted by Codeplug
    When there is nothing left to produce, and nothing left to consume, the threads should exit. There are many ways this could be coded up. The most straightforward way in the example is to incorporate the exit condition into the "bDataReady" and "bDataConsumed" predicates.
    Yes, but I want to set bDataConsumed well before the thread is actually finished, otherwise other threads won't be allowed to spawn. This is sort of a catch-22 because you want to give the producer the go ahead to release more data, but you also want it to wait for the threads to give the "all clear" before exiting (by setting bDataConsumed to 1). It works in your post #10 example because the threads set bDataConsumed = 1 after they're finished. That's why it doesn't matter that they get blown away after the producer thread exits and main completes. But it also means that all other threads are tied up until each individual thread completes. So really, it's always operating single-threaded no matter how many threads you spawn, because bDataConsumed is 0 until the thread completes, but the producer won't give out any more data until bDataConsumed is 1.

    What I did was add a bTimeToExit int that is set after pthread_join(producer, NULL). Then I added a for loop that calls pthread_join() on each worker thread that was spawned:

    Code:
    pthread_create(&producer, NULL, producerFunc, (void*)&params);
    pthread_join(producer, NULL);
    
    /*producer thread is finished, but we
    don't want to kill the program yet
    because that would terminate threads
    that may be in the middle of working.*/
    bTimeToExit = 1;
    	
    /*wait for worker threads to finish
    before killing the program. They
    will now quit on their own since
    bTimeToExit has been set to 1.*/
    for(i=0;i<num_threads;i++){
    	pthread_join(threads[i], NULL);
    }
    In each worker thread, I changed while(1) to while(!bTimeToExit).

    Is there a better way to do this?

  11. #11
    Registered User
    Join Date
    Jun 2009
    Posts
    101
    Quote Originally Posted by synthetix
    Yes, but I want to set bDataConsumed well before the thread is actually finished, otherwise other threads won't be allowed to spawn.
    Sorry, I should have said "continue," not "spawn." I create all my threads at once and just re-use them, so I'm not actually spawning any new ones.

  12. #12
    Registered User Codeplug
    Join Date
    Mar 2003
    Posts
    4,981
    >> but I want to set bDataConsumed well before the thread is actually finished
    Yes, that's what you should be doing. Just copy off the data, set/signal bDataConsumed, unlock, then process the data. Otherwise, the variable should be named "bDataConsumedAndProcessed" .

    >> bTimeToExit = 1;
    That's unsynchronized access since bTimeToExit is a shared variable. You have to acquire the lock, set bTimeToExit, broadcast the condition, release the lock.

    >> while (!bDataReady || bTimeToExit)
    This was an error on my part (post #9) since it allows consumers to exit while there is still data to consume.

    Here's an updated example - the way it should have been done the first time:
    Code:
    // compiled with: gcc -Wall -pthread -D_XOPEN_SOURCE=700 -std=c99 main.c
    #include <stdio.h>
    #include <stdlib.h>
    #include <stdarg.h>
    #include <stdint.h>
    #include <pthread.h>
    #include <time.h>
    
    pthread_cond_t DataReady = PTHREAD_COND_INITIALIZER;
    int bDataReady = 0;
    
    pthread_cond_t DataConsumed = PTHREAD_COND_INITIALIZER;
    int bDataConsumed = 0;
    
    pthread_mutex_t Lock = PTHREAD_MUTEX_INITIALIZER;
    int Data;
    int bTimeToExit = 0;
    
    pthread_mutex_t IOLock = PTHREAD_MUTEX_INITIALIZER;
    
    // printf() wrapper that keeps output from different threads from interleaving
    void sync_printf(const char *format, ...)
    {
        va_list marker;
        va_start(marker, format);
    
        pthread_mutex_lock(&IOLock);
            vprintf(format, marker);
            fflush(stdout);
        pthread_mutex_unlock(&IOLock);
    
        va_end(marker);
    }//sync_printf
    
    void* consumer(void *p)
    {
        int myid = (int)(intptr_t)p;            // id was passed by value in the pointer
        unsigned int seed = (unsigned int)myid; // per-thread state for rand_r()
        int currentData;
        while (1)
        {
            pthread_mutex_lock(&Lock);
                while (!bDataReady)
                {
                    if (bTimeToExit)
                    {
                        pthread_mutex_unlock(&Lock);
                        sync_printf("%d: Consumer Exiting\n", myid);
                        return NULL;
                    }//if
    
                    pthread_cond_wait(&DataReady, &Lock);
                }//while
    
                // "consume" the data and release the lock asap
                currentData = Data;
    
                bDataConsumed = 1;
                pthread_cond_signal(&DataConsumed);
    
                bDataReady = 0;
            pthread_mutex_unlock(&Lock);
    
            // now "process" the consumed data w/o the lock being held
            sync_printf("%d: consumed %d\n", myid, currentData);
    
            // random delay of up to 10 microseconds; rand() need not be thread-safe
            struct timespec delay = {0};
            delay.tv_nsec = rand_r(&seed) % 10000;
            nanosleep(&delay, NULL);
        }//while
    
        return NULL; // never reached
    }//consumer
    
    void* producer(void *p)
    {
        pthread_mutex_lock(&Lock);
        for (int i = 1; i <= 30; ++i)
        {
            // produce i
            Data = i;
    
            bDataReady = 1;
            pthread_cond_signal(&DataReady);
    
            bDataConsumed = 0;
            pthread_mutex_unlock(&Lock);
    
            // Do any other producer work while not holding the lock
    
            pthread_mutex_lock(&Lock);
            while (!bDataConsumed)
                pthread_cond_wait(&DataConsumed, &Lock);
        }//for
    
        bDataReady = 0;
        pthread_mutex_unlock(&Lock);
        sync_printf("Producer Exiting\n");
        return NULL;
    }//producer
    
    int main()
    {
        const int NumConsumers = 4;
        pthread_t p, c[NumConsumers];
    
        for (int n = 0; n < NumConsumers; ++n)
            pthread_create(c + n, NULL, consumer, (void*)(intptr_t)(n + 1));
    
        pthread_create(&p, NULL, producer, NULL);
        pthread_join(p, NULL);
    
        // producer has produced all it can
        // for testing, produce one more thing and tell consumers to exit
        pthread_mutex_lock(&Lock);
            Data = 100;
            bDataReady = 1;
            bTimeToExit = 1;
            pthread_cond_broadcast(&DataReady);
        pthread_mutex_unlock(&Lock);
    
        for (int n = 0; n < NumConsumers; ++n)
            pthread_join(c[n], NULL);
    
        printf("All done\n");
        return 0;
    }//main
    Another common way of doing this is to use some kind of "container" data type (linked list etc.) instead of a single shared variable (Data) to move data to the consumers. This lets you eliminate the bDataConsumed condition, since the container can buffer produced data until it's ready for consumption. The predicate in the consumer then becomes "wait while the container is empty". With this method, you just have to watch out for runaway production (consumers that can't keep up).
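A minimal sketch of the container approach, assuming a fixed-capacity ring buffer of ints (the queue type and its functions are hypothetical, not from any library). Consumers wait while the buffer is empty; to guard against runaway production, the sketch adds a second condition variable so the producer blocks while the buffer is full. queue_close() plays the role of bTimeToExit: after it is called, queue_pop() returns false once the buffer has drained.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define QCAP 8   /* small capacity: the producer blocks instead of running away */

typedef struct {
    int items[QCAP];
    int head, count;              /* ring buffer: oldest item at items[head] */
    bool closed;                  /* set once no more items will be pushed */
    pthread_mutex_t lock;
    pthread_cond_t not_empty, not_full;
} queue;

static void queue_init(queue *q)
{
    q->head = q->count = 0;
    q->closed = false;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* Must not be called after queue_close(). */
static void queue_push(queue *q, int v)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == QCAP)                    /* full: wait for a consumer */
        pthread_cond_wait(&q->not_full, &q->lock);
    q->items[(q->head + q->count) % QCAP] = v;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Returns false only when the queue is closed and empty. */
static bool queue_pop(queue *q, int *out)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == 0 && !q->closed)
        pthread_cond_wait(&q->not_empty, &q->lock);
    bool ok = q->count > 0;
    if (ok) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % QCAP;
        q->count--;
        pthread_cond_signal(&q->not_full);
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

static void queue_close(queue *q)
{
    pthread_mutex_lock(&q->lock);
    q->closed = true;
    pthread_cond_broadcast(&q->not_empty);      /* wake every idle consumer */
    pthread_mutex_unlock(&q->lock);
}

/* Hypothetical consumer: sums whatever it pops, outside the queue's lock. */
typedef struct { queue *q; long sum; } consumer_arg;

static void *sum_consumer(void *p)
{
    consumer_arg *a = p;
    int v;
    while (queue_pop(a->q, &v))
        a->sum += v;
    return NULL;
}
```

A producer calls queue_push() for each item and queue_close() at the end, and main() joins every consumer before returning.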

    gg

  13. #13
    Registered User
    Join Date
    Jun 2009
    Posts
    101
    Thanks so much! This is working really well so far. I'll keep testing and let you know if there are any errors.
