Thread: threads - threadpool - reset threads - stop threads without destroy - pthreads -linux

  1. #1
    Registered User
    Join Date
    Nov 2012
    Posts
    9

    threads - threadpool - reset threads - stop threads without destroy - pthreads -linux

    Hey there,

    I'm asking me if it is possible to implement a threadpool in C with the feature to stop worker-threads (they should not return their calculated value) and push them back to the threadpool. Normaly, worker-threads get some work, finished it and get back to the threadpool.

    Example:

    1. Server starts threadpool with e.g 20 threads
    2. Client sends request -> Server: all 20 threads waiting for work. 1 thread from threadpool get the work and is calculating
    3. Client sends another request (previous request isn't finished)-> Server: resets the thread (send him back to threadpool) and another/or the same thread get the new work.

    The previous request souldn't be finished because there are new informations and the previous calculation is out of date.

    I dont want to create a new thread an kill the old because of performance issues.

    The whole scenario is for LINUX and pthreads C. NO C++

    Is there an implementation of such a feature, is it possible to implement it and how can i do it?
    thx

  2. #2
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    39,659
    How does the server communicate with each thread?

    If it's some kind of message interface, there are only two messages
    - start (and whatever the work parameters are)
    - stop

    So long as each worker thread can poll at a reasonable interval to see if there is a stop message, you should be able to strike a reasonable balance between frequent message queue checks, and letting a thread run on when it's out of date.
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  3. #3
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Quote Originally Posted by hell-student View Post
    implement a threadpool in C ... LINUX and pthreads C.
    Yes, it is possible to implement a very efficient thread pool for pthread, linux, and gcc. (gcc provides certain atomic built-ins, which allow you to avoid locking overheads.)

    Quote Originally Posted by hell-student View Post
    I dont want to create a new thread an kill the old because of performance issues.
    Exactly what performance issues?
    • Worker setup overhead
      If each worker has a significant scratchpad (temporary memory), you do not want to cancel a thread outright, but just signal the thread that it should abort.
    • Latency
      If each work unit takes a relatively long time -- for example, a long-running loop --, and you don't want to slow it down with extra accesses to a flag variable (to tell the thread it should abort or restart with new parameters), it may take quite a long time between the master thread telling the worker to reload/restart/abort, and the worker actually doing that.
    • Per-client overhead
      If each thread has a lot of preparatory work per client, it may be much more efficient to tell the thread to restart with new parameters, but with the same client, rather than do the entire per-client setup for a new thread.
    • Something else?


    My approach in the three cases would differ quite a bit, so I'd need to know what the actual problem (performance issue) is you are trying to solve. Typically, I don't normally cancel threads, I use flags and signals to inform the threads about their targets, and let them handle it themselves.

    In general, I'd use a per-thread status flag (static __thread worker_changes = 0; or similar), and a signal handler to manipulate it. Calling pthread_kill() or pthread_sigqueue() causes the signal handler to interrupt any blocking I/O calls, and run the signal handler code in the target thread. This means that the worker thread should inspect the status flag at least after every I/O call, but also after other long tasks.

    If I was trying to minimize latency, I'd use a thread pool with N+B threads, with up to N threads doing actual work, and B threads either waiting for work, or in the process of aborting their current (useless, discarded) work. When new parameters arrive, the master thread would change the per-thread status flag for the current worker to a value which indicates "abort, discard results", and launch a new thread from the pool to work on the problem. The soon-to-be aborting thread, after noticing the flag, would do whatever cleanup necessary and then go back to the pool.

    If the number of threads was hard-limited, then I'd let the latency be whatever it is, and just inform the worker threads when parameters change (or if they can return back to the thread pool). This should keep the CPU use under the limit, and yield maximum useful work for the CPU time used. (The number of threads is often limited in high-performance computing clusters.)

    If you care to provide further details on exactly what the performance issue is, I think I could provide with some simplified example code.

  4. #4
    Registered User
    Join Date
    Nov 2012
    Posts
    9
    @Nominal Animal

    Thank you for this answer. As you problaby noticed, my english is not the best. I'm an german student, but I will try my best to explain my problem.

    The work unit has 4 sections named A, B, C and D.

    B waits for the result of A
    C waits for the result of B
    D waits for the result of D

    If A is finished, B, C and D must be performed straight after each other. But if a new request of the same client comes in during A hasn't finished, section B, C, and D

    shouldn't be performed because A needs to be processed again and then follows B,C and D.

    If the master thread noticed such a event, it can cancel the thread and creates a new one. But I think, this cost a lot of time, creating a new thread needs to jump into the

    kernel.

    When new parameters arrive, the master thread would change the per-thread status flag for the current worker to a value which indicates "abort, discard results", and

    launch a new thread from the pool to work on the problem. The soon-to-be aborting thread, after noticing the flag, would do whatever cleanup necessary and then go back to the

    pool.
    Thats exactly what i need. Before section A is finished, it could check the flag and go back to pool and a thread from the pool do the work. Can you post a simpified example

    or other links, books or papers, to implement this feature as also the wohle threadpool.

    thanks alot

  5. #5
    Registered User
    Join Date
    Dec 2007
    Posts
    2,675
    This looks like a job for a message queueing system, rather than (or in addition to) a thread pool.

  6. #6
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Quote Originally Posted by rags_to_riches View Post
    This looks like a job for a message queueing system, rather than (or in addition to) a thread pool.
    Well, I don't see why you'd need a queue. All I can see are simple conditional flags.

    Edit: Unless, of course, you mean a queue for the work themselves (as opposed to using a message queue to communicate between the master thread and worker threads). I think I'll use a simple LIFO queue for the work structures.

    Quote Originally Posted by hell-student View Post
    The work unit has 4 sections named A, B, C and D.
    B waits for the result of A
    C waits for the result of B
    D waits for the result of C
    If A is finished, B, C and D must be performed straight after each other. But if a new request of the same client comes in during A hasn't finished, section B, C, and D shouldn't be performed because A needs to be processed again and then follows B,C and D.
    Okay, that looks like a pretty typical case. What about the latency? How important is it to reduce the latency?

    Consider the case where the master thread sets a simple flag, and the A phase code regularly (or at least at the end of the A phase) checks the flag. If the flag is set, then the work is restarted from the beginning.

    From the point where the master thread sets the flag, there will be some time until the next time the thread checks the flag. This is the latency.

    There are two basic options: either you accept that latency, or you start a new thread from the beginning, and simply tell the currently working thread to discard the results. You can of course use the latter if there are free threads, and fall back to the former (with the increased latency) if all threads are already busy.

    Let me construct some example code; I'll post it in this thread in a bit.

    Note: Since this is Linux-specific, you might wish to ask a moderator to move the thread to the Linux board.
    Last edited by Nominal Animal; 12-12-2012 at 09:13 AM.

  7. #7
    Registered User
    Join Date
    Nov 2012
    Posts
    9
    Note: Since this is Linux-specific, you might wish to ask a moderator to move the thread to the Linux board.
    Sorry, but I can't find any option to send a message to a moderator. If you tell me how, I will do it.

    From the point where the master thread sets the flag, there will be some time until the next time the thread checks the flag. This is the latency.
    In section A there is a long running loop. Check always the flag before a new loop cycle beginns or at the end of the section. I can't tell you now what would be better. Always checking leads to overhead, if the flag dosen't change.

    Let me construct some example code; I'll post it in this thread in a bit.
    Thank you

  8. #8
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    39,659
    Moved
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  9. #9
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Quote Originally Posted by hell-student View Post
    Check always the flag before a new loop cycle beginns or at the end of the section. I can't tell you now what would be better.
    It should be possible to minimize the flag examination overhead, by using a separate flag variable that only transitions between zero and non-zero. That way atomicity is not an issue, and it is enough to have the flag be marked volatile. The actual cost, then, is similar to incrementing a local variable. (Minimal.)

    Before we get to the example code:

    I realized that I didn't mention that the actual creation of a thread is not at all expensive. Consider the following program:
    Code:
    #include <pthread.h>
    #include <time.h>
    #include <string.h>
    #include <stdio.h>
    
    #define   THREAD_SETS      100000
    #define   THREADS_PER_SET  1
    
    struct counter {
        pthread_mutex_t lock;
        unsigned long   value;
    };
    
    void *worker(void *const payload)
    {
        struct counter *const c = (struct counter *)payload;
    
        pthread_mutex_lock(&c->lock);
        c->value++;
        pthread_mutex_unlock(&c->lock);
    
        return NULL;
    }
    
    int main(void)
    {
        struct counter  total = { PTHREAD_MUTEX_INITIALIZER, 0UL };
        struct timespec started, stopped;
        double          elapsed;
        pthread_attr_t  attrs;
        pthread_t       id[THREADS_PER_SET];
        int             i, k, result;
    
        /* 64k stack per thread should suffice for real-world threads,
         * assuming they allocate larger arrays dynamically,
         * instead of relying on stack. */
        pthread_attr_init(&attrs);
        pthread_attr_setstacksize(&attrs, 65536);
    
        /* Start timing. */
        clock_gettime(CLOCK_REALTIME, &started);
    
        for (k = 0; k < THREAD_SETS; k++) {
    
            /* Create the threads. */
            for (i = 0; i < THREADS_PER_SET; i++) {
                result = pthread_create(&id[i], &attrs, worker, &total);
                if (result) {
                    fprintf(stderr, "Error creating a new thread: %s.\n", strerror(result));
                    return 1;
                }
            }
    
            /* Join the threads. */
            for (i = 0; i < THREADS_PER_SET; i++) {
                result = pthread_join(id[i], NULL);
                if (result) {
                    fprintf(stderr, "Error joining a thread: %s.\n", strerror(result));
                    return 1;
                }
            }
        }
    
        pthread_attr_destroy(&attrs);    
    
        /* Stop timing. */
        clock_gettime(CLOCK_REALTIME, &stopped);
    
        elapsed = (double)(stopped.tv_sec - started.tv_sec)
                + (double)(stopped.tv_nsec - started.tv_nsec) / 1000000000.0;
    
        printf("%lu threads (%d sets of %d threads) created in %.6f seconds real time,\n",
               total.value, THREAD_SETS, THREADS_PER_SET, elapsed);
        if (elapsed > 0.0)
            printf("%.0f threads per second, %.9f seconds per thread on average.\n",
                   (double)total.value / elapsed, elapsed / (double)total.value);
    
        return 0;
    }
    If you save the above as spawn.c, you can compile and run it using e.g.
    Code:
    gcc -W -Wall -m64 -O3 spawn.c -lpthread -lrt -o spawn
    ./spawn
    My desktop machine (AMD Athlon X4 640) can create and join about 65000 single threads per second, or about 110000 in groups of ten. This means that the time taken to create a new thread (and join an old one, assuming the old thread has already completed), takes less than 0.000009 seconds real time on it.

    Note that it is quite important to specify the per-thread stack size (see pthread_attr_t attrs in above code). While the stack given to the original thread in the process will grow dynamically, per-thread stacks are fixed in size. Because of that, the default tends to be ridiculously large, something like 8 megabytes per thread. Unless you use large local arrays, a small fraction of that is sufficient. The code above uses 64k. (The stack size is a power of two, minimum 16384 on Linux pthreads. The lower limit is not likely to be any larger on any future architectures.)

    Even if you modify the program to cancel the threads asynchronously,
    Code:
    #include <pthread.h>
    #include <time.h>
    #include <string.h>
    #include <stdio.h>
    
    #define   THREAD_SETS      100000
    #define   THREADS_PER_SET  1
    
    struct counter {
        pthread_mutex_t lock;
        unsigned long   value;
    };
    
    pthread_mutex_t  locked = PTHREAD_MUTEX_INITIALIZER;
    unsigned long    failed = 0UL;
    
    void *worker(void *const payload)
    {
        struct counter *const c = (struct counter *)payload;
    
        pthread_mutex_lock(&c->lock);
        c->value++;
        pthread_mutex_unlock(&c->lock);
    
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);    
    
        pthread_mutex_lock(&locked);
    
        failed++;
    
        return NULL;
    }
    
    int main(void)
    {
        struct counter  total = { PTHREAD_MUTEX_INITIALIZER, 0UL };
        struct timespec started, stopped;
        double          elapsed;
        pthread_attr_t  attrs;
        pthread_t       id[THREADS_PER_SET];
        int             i, k, result;
    
        pthread_mutex_lock(&locked);
    
        /* 64k stack per thread should suffice for real-world threads,
         * assuming they allocate larger arrays dynamically,
         * instead of relying on stack. */
        pthread_attr_init(&attrs);
        pthread_attr_setstacksize(&attrs, 65536);
    
        /* Start timing. */
        clock_gettime(CLOCK_REALTIME, &started);
    
        for (k = 0; k < THREAD_SETS; k++) {
    
            /* Create the threads. */
            for (i = 0; i < THREADS_PER_SET; i++) {
                result = pthread_create(&id[i], &attrs, worker, &total);
                if (result) {
                    fprintf(stderr, "Error creating a new thread: %s.\n", strerror(result));
                    return 1;
                }
            }
    
            /* Cancel the threads. */
            for (i = 0; i < THREADS_PER_SET; i++)
                pthread_cancel(id[i]);
    
            /* Join the threads. */
            for (i = 0; i < THREADS_PER_SET; i++) {
                result = pthread_join(id[i], NULL);
                if (result) {
                    fprintf(stderr, "Error joining a thread: %s.\n", strerror(result));
                    return 1;
                }
            }
        }
    
        pthread_attr_destroy(&attrs);    
    
        /* Stop timing. */
        clock_gettime(CLOCK_REALTIME, &stopped);
    
        elapsed = (double)(stopped.tv_sec - started.tv_sec)
                + (double)(stopped.tv_nsec - started.tv_nsec) / 1000000000.0;
    
        printf("%lu threads (%d sets of %d threads) created in %.6f seconds real time,\n",
               total.value, THREAD_SETS, THREADS_PER_SET, elapsed);
        if (elapsed > 0.0)
            printf("%.0f threads per second, %.9f seconds per thread on average.\n",
                   (double)total.value / elapsed, elapsed / (double)total.value);
    
        if (failed)
            printf("Note: %lu threads increased the failed variable.\n", failed);
    
        return 0;
    }
    I still get over 50000 threads per second, or 0.00002 seconds real time overhead per thread. (All threads will block on the locked mutex, and be canceled asynchronously. No thread will actually get to the failed++ line.)

    Is that really a performance issue for you? If so, I think you have divided the work into too small units.

    (By the way, my first example program grew a bit too complex; I have to rewrite it. )

  10. #10
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    >> It should be possible to minimize the flag examination overhead, by using a separate flag variable that only transitions between zero and non-zero. That way atomicity is not an issue, and it is enough to have the flag be marked volatile.
    I recommend not recommending this. Not only because it's a Posix-violating hack, but it's also premature optimization. If the flag must be checked so often that it becomes a bottleneck, then I like the thread-signal + TLS-flag approach to avoiding synchronization for speed. Otherwise normal synchronization should suffice (or perhaps even "interlocked" intrinsics provided by the compiler).

    Implementing thread cancellation correctly can be tricky - due to the need to setup cancellation clean-up handlers via pthread_cleanup_push/pthread_cleanup_pop.

    gg

  11. #11
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    Code:
        elapsed = (double)(stopped.tv_sec - started.tv_sec)
                + (double)(stopped.tv_nsec - started.tv_nsec) / 1000000000.0;
    started nsec's could be greater...

    Code:
    double timer_diff(struct timespec *started, struct timespec *stopped)
    {
        if (started->tv_nsec > stopped->tv_nsec)
        {
            --stopped->tv_sec;
            started->tv_nsec += 1000000000L;
        }
    
        return (stopped->tv_sec - started->tv_sec) +
               (stopped->tv_nsec - started->tv_nsec) / 1000000000.0;    
    }
    gg

  12. #12
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Quote Originally Posted by Codeplug View Post
    Not only because it's a Posix-violating hack, but it's also premature optimization.
    Exactly what about it violates POSIX? The nature of the flag is such that only one thread sets it, and one thread reads it. The change is not guaranteed to be atomic, but since the transition is always either from zero or to zero, there can be no misdetection; at most, the transition is detected later than if proper locking or atomic operators were used -- but that is acceptable here. Atomicity is not an issue also because the flag only informs the worker thread that they need to examine the proper control variable.

    Quote Originally Posted by Codeplug View Post
    Implementing thread cancellation correctly can be tricky
    Yes. However, there is a very simple trick I wish to show: using the work description structure to maintain all dynamic allocations and descriptors and other features that require cleanup, and protect the sections of code that do allocation/initialization and deallocation/cleanup with cancellation disabled temporarily, via pthread_setcancelstate().

    Quote Originally Posted by Codeplug View Post
    Code:
        elapsed = (double)(stopped.tv_sec - started.tv_sec)
                + (double)(stopped.tv_nsec - started.tv_nsec) / 1000000000.0;
    started nsec's could be greater...
    So what? It's perfectly correct as is. .tv_nsec is of type long, and can be negative.
    Last edited by Nominal Animal; 12-12-2012 at 11:11 PM.

  13. #13
    Registered User
    Join Date
    Nov 2012
    Posts
    9
    Thanks for the answers.

    My desktop machine (AMD Athlon X4 640) can create and join about 65000 single threads per second, or about 110000 in groups of ten. This means that the time taken to create a new thread (and join an old one, assuming the old thread has already completed), takes less than 0.000009 seconds real time on it.
    My Problem is, that I run the whole szenario on a FPGA board with 70 Mhz. So each trap to the kernel cost a lot of time (each creation of an thread and each time I destroy one). I need to implement the feature to stop an executing thread, push him back to the threadpool and take another thread from the pool which performes the work.

  14. #14
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    >> Exactly what about it violates POSIX?
    It violates Posix-2008, section 4.11 in Base Definitions. You can read more about it here: pthread and shared variables

    >> So what? It's perfectly correct ...
    Just plug some #'s into your formula....
    started = [1s, 1ns]
    stopped = [2s, 0ns]
    correct diff = [0s, 999999999ns]

    gg

  15. #15
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    Code:
    double timer_diff(struct timespec *started, struct timespec *stopped)
    {
        if (started->tv_nsec > stopped->tv_nsec)
        {
            --stopped->tv_sec;
            started->tv_nsec += 1000000000L;
        }
    
        return (stopped->tv_sec - started->tv_sec) +
               (stopped->tv_nsec - started->tv_nsec) / 1000000000.0;    
    }
    Hah - I goofed my version too
    That "started" should be "stopped".

    gg

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. Threads , how? Portable Threads Library?
    By adderly in forum C++ Programming
    Replies: 3
    Last Post: 12-15-2011, 07:54 AM
  2. Threads other than pthreads?
    By dayalsoap in forum C Programming
    Replies: 10
    Last Post: 05-28-2010, 01:56 AM
  3. a point of threads to create multiple threads
    By v3dant in forum C Programming
    Replies: 3
    Last Post: 10-06-2004, 09:48 AM
  4. pthreads-win32 POSIX threads
    By c99 in forum C Programming
    Replies: 4
    Last Post: 02-24-2004, 06:59 PM
  5. threads and linux
    By shaka87 in forum C Programming
    Replies: 1
    Last Post: 06-07-2002, 03:59 AM

Tags for this Thread