Thread: threads - threadpool - reset threads - stop threads without destroy - pthreads -linux

  1. #16
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Quote Originally Posted by hell-student
    I run the whole scenario on an FPGA board at 70 MHz.
    Do you mean you have an FPGA board that does the computation, connected to a computer running Linux and your server application? Perhaps you have the server threads write inputs to the FPGA and read the results using a character device? Using read() and write()?

    If so, the signal approach is almost certainly the best option. It will interrupt a blocking read/write immediately, and if the signal handler also sets a per-thread flag, the thread can detect the restart/abort condition with minimal cost.

    Or, do you mean you have a 70 MHz FPGA board running Linux? If so, the processor type the FPGA board implements may have relevant features beyond pthreads and POSIX stuff. In particular, the GCC built-in atomic functions tend to be faster than pthread locking primitives.
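    As a minimal sketch of what the GCC built-in atomics look like for a cancel flag, the flag below is written with __atomic_store_n() and polled with __atomic_load_n(), using release/acquire ordering instead of a volatile read. This assumes GCC 4.7 or later (where the __atomic builtins appeared) and a target that supports them for int; whether they are actually faster than a mutex on this particular board is untested. The struct cancel_flag type and its helper names are hypothetical.

```c
#include <stdbool.h>

/* Hypothetical helper type: a cancel flag written by one thread and polled
 * by a worker. 'value' is only ever accessed through the __atomic builtins. */
struct cancel_flag {
    int value;
};

static inline void cancel_flag_init(struct cancel_flag *flag)
{
    __atomic_store_n(&flag->value, 0, __ATOMIC_RELAXED);
}

/* Called by the thread that wants the work stopped. The release store makes
 * its earlier writes visible to a thread that observes the flag as set. */
static inline void cancel_flag_set(struct cancel_flag *flag)
{
    __atomic_store_n(&flag->value, 1, __ATOMIC_RELEASE);
}

/* Called by the worker between iterations; an acquire load, no locking. */
static inline bool cancel_flag_is_set(struct cancel_flag *flag)
{
    return __atomic_load_n(&flag->value, __ATOMIC_ACQUIRE) != 0;
}
```

    In a loop like work_on() below, `if (*canceled)` would become `if (cancel_flag_is_set(&work->cancel))`, with the field type changed accordingly.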

    I'm having trouble writing simple example code (I cannot seem to get a complete working one under a thousand lines), but perhaps showing the key points will help you understand what I'm getting at.
    Code:
    #define _GNU_SOURCE
    #include <pthread.h>
    
    typedef enum {
        UNUSED = 0,
        PENDING,
        WORKING,
        SUCCESS,
        FAILURE,
        CANCELED
    } status_t;
    
    struct work_struct {
        struct work_struct      *next;
    
        volatile int             canceled;
    
        pthread_mutex_t          mutex;
        status_t                 status;
    
        /* Work parameters. This one iterates n times
         *      i = (i * a + b) modulo m
        */
        unsigned long            n;
        unsigned long            m;
        unsigned long            a;
        unsigned long            b;
        unsigned long            i;
    };
    
    static pthread_mutex_t       work_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t        work_cond = PTHREAD_COND_INITIALIZER;
    static size_t                work_drain = 0;
    static struct work_struct   *work_pending = NULL;
    static struct work_struct   *work_finished = NULL;
    
    
    /* Worker function.
    */
    status_t work_on(struct work_struct *const work)
    {
        volatile int *const canceled = &work->canceled;
        const unsigned long  m = work->m;
        const unsigned long  a = work->a;
        const unsigned long  b = work->b;
        unsigned long        n = work->n;
        unsigned long        i = work->i;
    
        while (n > 0UL) {
    
            if (*canceled)
                return CANCELED;
    
            n--;
    
            i = (i * a + b) % m;
        }
    
        work->i = i;
    
        return SUCCESS;
    }
    
    /* Thread function.
    */
    void *thread_worker(void *payload __attribute__((unused)))
    {
        struct work_struct *work;
        status_t            status;
    
        pthread_mutex_lock(&work_mutex);
    
        while (1) {
     
            /* Draining threads? */
            if (work_drain) {
                /* Yes. Decrease the number of threads to drain, */
                work_drain--;
                /* signal further drainage if necessary, */
                if (work_drain)
                    pthread_cond_signal(&work_cond);
                /* and detach and die. */
                pthread_detach(pthread_self());
                pthread_mutex_unlock(&work_mutex);
                return NULL;
            }
    
            /* No work to do? */
            if (!work_pending) {
                pthread_cond_wait(&work_cond, &work_mutex);
                continue;
            }
    
            /* Grab first work unit. */
            work = work_pending;
            work_pending = work->next;
            work->next = NULL;
    
            /* Done with the queue: release work_mutex so other workers
             * and producers are not blocked while this one works. */
            pthread_mutex_unlock(&work_mutex);
    
            /* Should it be worked on? */
            pthread_mutex_lock(&work->mutex);
            if (work->status == PENDING) {
                /* Work on it. */
                work->status = WORKING;
                pthread_mutex_unlock(&work->mutex);
    
                status = work_on(work);
    
                pthread_mutex_lock(&work->mutex);
                if (work->status == WORKING)
                    work->status = status;
            }
            pthread_mutex_unlock(&work->mutex);
    
            /* Move work unit to finished queue. work_mutex stays locked,
             * because the top of the loop expects to hold it. */
            pthread_mutex_lock(&work_mutex);
            work->next = work_finished;
            work_finished = work;
        }
    }
    
    void cancel_work(struct work_struct *const work)
    {
        pthread_mutex_lock(&work->mutex);
        if (work->status == PENDING || work->status == WORKING) {
            work->canceled = ~0;
            work->status = CANCELED;
        }
        pthread_mutex_unlock(&work->mutex);       
    }
    Assume you have created a few thread_worker() threads. (Remember to use a pthread_attr_t when creating the threads, with the thread stack size set to 16384, 32768, or 65536 bytes using pthread_attr_setstacksize(). The default per-thread stack size is ridiculously large; with glibc it is derived from the stack size resource limit, which is commonly 8 MiB.)
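    A minimal sketch of starting the workers with an explicit stack size. The helper start_workers() and the stand-in thread body echo_worker() are hypothetical names; the size is rounded up to PTHREAD_STACK_MIN when that macro is available, because pthread_attr_setstacksize() rejects smaller values (on some architectures the minimum is larger than 16384).

```c
#include <limits.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical helper: start up to 'count' threads running fn(arg), each
 * with an explicit 'stack_size' instead of the library default. Returns the
 * number of threads actually started (0 if the attributes are rejected). */
size_t start_workers(pthread_t *ids, size_t count, size_t stack_size,
                     void *(*fn)(void *), void *arg)
{
    pthread_attr_t attrs;
    size_t started = 0;

#ifdef PTHREAD_STACK_MIN
    /* pthread_attr_setstacksize() fails with EINVAL below this minimum. */
    if (stack_size < (size_t)PTHREAD_STACK_MIN)
        stack_size = (size_t)PTHREAD_STACK_MIN;
#endif

    if (pthread_attr_init(&attrs) != 0)
        return 0;
    if (pthread_attr_setstacksize(&attrs, stack_size) != 0) {
        pthread_attr_destroy(&attrs);
        return 0;
    }

    while (started < count &&
           pthread_create(&ids[started], &attrs, fn, arg) == 0)
        started++;

    /* The attributes are copied at creation time, so this is safe. */
    pthread_attr_destroy(&attrs);
    return started;
}

/* Illustrative thread body: hands its argument straight back. */
void *echo_worker(void *arg)
{
    return arg;
}
```

    With the code above, the call would be start_workers(ids, n, 65536, thread_worker, NULL).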

    The threads will wait on the work_cond condition variable unless there are work units in work_pending, or work_drain indicates that some threads should exit.

    Any thread can add new work units by taking the work_mutex mutex, appending the new work structure to the end of the list, calling pthread_cond_signal(&work_cond) to wake up a thread waiting for work, and unlocking the mutex. The status field in the work unit should be set to PENDING.
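    A minimal sketch of that producer step, with the declarations trimmed to the fields it touches (a real work unit also needs its own mutex initialized, for example with pthread_mutex_init(), before it is submitted). submit_work() is a hypothetical name. It walks the list with a pointer-to-pointer to find the end, which costs O(n) per submission; keeping a separate tail pointer would avoid the walk.

```c
#include <pthread.h>
#include <stddef.h>

/* Trimmed stand-ins for the declarations in the code above. */
typedef enum { UNUSED = 0, PENDING, WORKING, SUCCESS, FAILURE, CANCELED } status_t;

struct work_struct {
    struct work_struct *next;
    status_t            status;
};

static pthread_mutex_t     work_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t      work_cond = PTHREAD_COND_INITIALIZER;
static struct work_struct *work_pending = NULL;

/* Hypothetical producer helper: append 'work' to the end of work_pending
 * and wake one waiting worker. */
void submit_work(struct work_struct *const work)
{
    struct work_struct **tail;

    /* Not yet visible to other threads, so no lock is needed here. */
    work->next = NULL;
    work->status = PENDING;

    pthread_mutex_lock(&work_mutex);

    /* Find the final 'next' pointer (or the list head if empty). */
    tail = &work_pending;
    while (*tail)
        tail = &(*tail)->next;
    *tail = work;

    pthread_cond_signal(&work_cond);
    pthread_mutex_unlock(&work_mutex);
}
```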

    To cancel a work unit, just call cancel_work() on it. The thread working on it will move it to the work_finished list after it detects the change, and then move on to the next work unit. (You can examine the status field to see whether the work unit completed successfully, failed, or was canceled.)

    If you use blocking syscalls, struct work_struct should also contain the ID of the thread working on it (recorded when its status is set to WORKING). cancel_work() then sets canceled and uses pthread_kill() to send a signal to that thread. The signal handler can be empty, but it must be a real function, installed with sigaction() without the SA_RESTART flag: it is the delivery of the signal to the handler that interrupts a blocking I/O call in that thread (a call that has not transferred any data then fails with errno set to EINTR). With SA_RESTART, calls such as read() on a pipe or device would simply be restarted.
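    A sketch of this mechanism, assuming Linux, SIGUSR1 as the (hypothetical) wake-up signal, and C11 atomics for the flag instead of the volatile int used above. Because the signal can arrive just before the worker enters read(), the canceling side keeps resending it until the worker confirms it has stopped. The names blocking_job, blocking_worker() and cancel_blocking() are illustrative, not part of the code above.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define WAKEUP_SIGNAL SIGUSR1   /* hypothetical choice of signal */

/* Hypothetical per-job state for a worker that blocks in read(). */
struct blocking_job {
    int        fd;        /* descriptor the worker reads from */
    atomic_int canceled;  /* set by the canceling thread */
    atomic_int done;      /* set by the worker just before it returns */
};

void blocking_job_init(struct blocking_job *job, int fd)
{
    job->fd = fd;
    atomic_init(&job->canceled, 0);
    atomic_init(&job->done, 0);
}

/* Empty, but it must be a real function: SIG_IGN would not interrupt read(). */
static void wakeup_handler(int signum)
{
    (void)signum;
}

int install_wakeup_handler(void)
{
    struct sigaction act;

    memset(&act, 0, sizeof act);
    sigemptyset(&act.sa_mask);
    act.sa_handler = wakeup_handler;
    act.sa_flags = 0;   /* no SA_RESTART: an interrupted read() fails with EINTR */
    return sigaction(WAKEUP_SIGNAL, &act, NULL);
}

void *blocking_worker(void *payload)
{
    struct blocking_job *job = payload;
    char buf[256];

    while (!atomic_load(&job->canceled)) {
        ssize_t n = read(job->fd, buf, sizeof buf);
        if (n == -1 && errno == EINTR)
            continue;            /* woken by the signal: re-check the flag */
        if (n <= 0)
            break;               /* end of input or a real error */
        /* ... process n bytes of buf here ... */
    }
    atomic_store(&job->done, 1);
    return NULL;
}

/* Set the flag, then keep signaling until the worker confirms it stopped;
 * this covers a signal that lands just before the worker enters read(). */
void cancel_blocking(struct blocking_job *job, pthread_t worker)
{
    const struct timespec pause = { 0, 1000000L };   /* 1 ms */

    atomic_store(&job->canceled, 1);
    while (!atomic_load(&job->done)) {
        pthread_kill(worker, WAKEUP_SIGNAL);
        nanosleep(&pause, NULL);
    }
}
```

    install_wakeup_handler() must run once, before the first pthread_kill(); the default action of SIGUSR1 terminates the whole process.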

    If you are worried about modifying the canceled field in one thread while it is being read in another, you can change it to sig_atomic_t and modify it in the signal handler. (glibc provides pthread_sigqueue(), a GNU extension that lets you pass a pointer to the sig_atomic_t to the signal handler. It arrives in the si_value field of the siginfo_t argument when you use an SA_SIGINFO signal handler. See man 2 sigaction for details.)
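    A sketch of that variant, assuming glibc (for pthread_sigqueue()) and SIGUSR2 as a hypothetical choice of signal. The canceling thread passes the address of the target thread's flag in si_value.sival_ptr, and the handler, which runs in the target thread, sets it. install_cancel_handler() and request_cancel() are illustrative names.

```c
#define _GNU_SOURCE            /* pthread_sigqueue() is a GNU extension */
#include <pthread.h>
#include <signal.h>
#include <string.h>

#define CANCEL_SIGNAL SIGUSR2  /* hypothetical choice of signal */

/* SA_SIGINFO handler: runs in the thread the signal was sent to and sets
 * the flag whose address the sender passed along. */
static void cancel_handler(int signum, siginfo_t *info, void *context)
{
    volatile sig_atomic_t *flag = info->si_value.sival_ptr;

    (void)signum;
    (void)context;
    if (flag)
        *flag = 1;
}

int install_cancel_handler(void)
{
    struct sigaction act;

    memset(&act, 0, sizeof act);
    sigemptyset(&act.sa_mask);
    act.sa_sigaction = cancel_handler;
    act.sa_flags = SA_SIGINFO;   /* no SA_RESTART: blocking calls fail with EINTR */
    return sigaction(CANCEL_SIGNAL, &act, NULL);
}

/* Ask 'thread' to stop its current work; 'flag' should be that thread's own
 * cancel flag. Returns 0 or an error number, as pthread_sigqueue() does. */
int request_cancel(pthread_t thread, volatile sig_atomic_t *flag)
{
    union sigval value;

    value.sival_ptr = (void *)flag;
    return pthread_sigqueue(thread, CANCEL_SIGNAL, value);
}
```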

    Quote Originally Posted by Codeplug
    It violates Posix-2008, section 4.11 in Base Definitions.
    Section 4.11 only restricts memory access "using functions that synchronize thread execution and also synchronize memory with respect to other threads".

    My understanding is that since the synchronization is not necessary for the correct operation of the program (in this use case, only the change in the flag is monitored, and delay or observing a partial change is acceptable and does not cause an error), the code does not violate POSIX.1-2008.

    My opinion is based on the rationale for POSIX.1-2008 section 4.11, and on the fact that I have never encountered an architecture where it would not work.

    Quote Originally Posted by Codeplug
    Just plug some #'s into your formula....
    started = [1s, 1ns]
    stopped = [2s, 0ns]
    correct diff = [0s, 999999999ns]
    Correct diff what? The formula computes a single floating-point value, not seconds and nanoseconds. For started=[1s, 1ns] and stopped=[2s, 0ns], it yields elapsed == 0.999999999, which is the correct difference. Where is the error?
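    To make the arithmetic concrete: the formula itself is in an earlier post not shown here, so this sketch assumes it has the common form (stop.tv_sec - start.tv_sec) + (stop.tv_nsec - start.tv_nsec) / 1e9. Because tv_nsec is a signed long, a negative nanosecond difference simply reduces the total, and no borrow logic is needed. elapsed_seconds() is a hypothetical name.

```c
#include <time.h>

/* Elapsed seconds between two timespecs, as a double (assumed form of the
 * formula under discussion). tv_nsec is signed, so stop.tv_nsec may be
 * smaller than start.tv_nsec without wrapping around. */
double elapsed_seconds(const struct timespec *started,
                       const struct timespec *stopped)
{
    return (double)(stopped->tv_sec - started->tv_sec)
         + (double)(stopped->tv_nsec - started->tv_nsec) / 1000000000.0;
}
```

    For started = [1s, 1ns] and stopped = [2s, 0ns] this computes 1 + (-1 / 1000000000) = 0.999999999. The trap would only appear if the nanosecond difference were computed in an unsigned type.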

  2. #17
    Registered User Codeplug
    Join Date
    Mar 2003
    Posts
    4,981
    >> So what? It's perfectly correct ...
    You're right. That's what I get for doing math in my head instead of with the compiler. (When I first reviewed your code, I recalled being burned by something very similar that involved unsigned integers.)

    The code is clearly in violation of Posix 4.11 (and C++11, as shown in the other thread).

    Quote Originally Posted by IEEE Std 1003.1-2008, 4.11
    Applications shall ensure that access to any memory location by more than one thread of control (threads or processes) is restricted such that no thread of control can read or modify a memory location while another thread of control may be modifying it.
    That's exactly what the hack is doing - one thread is writing, other threads are reading. How is it "restricted"?
    Next sentence...
    Quote Originally Posted by IEEE Std 1003.1-2008, 4.11
    Such access is restricted using functions that synchronize thread execution and also synchronize memory with respect to other threads. The following functions synchronize memory with respect to other threads: ...
    If you don't call one of the listed functions, then you have not met the "shall ensure" requirement.

    >> and on the fact that I have never encountered an architecture where it would not work.
    Not a reason to violate standards (though you haven't conceded that it does violate all known standards). The volatile hack does not provide synchronization - which includes things like visibility. There is no guarantee that a change to memory in one thread will ever be visible to another thread without synchronization. Just because your code and experience are on architectures whose cache coherency protocols eventually make changes visible to other cores doesn't mean that will always be the case for everyone - nor is it an excuse to violate standards.

    gg

  3. #18
    Ticked and off
    Join Date
    Oct 2011
    Location
    La-la land
    Posts
    1,728
    Quote Originally Posted by Codeplug
    an excuse
    What I described is a technique that is known to work on all multiprocessor/multicore architectures Linux runs on. hell-student specifically excluded everything else.

    Feel free to argue about standards and standards compliance with someone else. I'm not interested.

  4. #19
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    >> Feel free to argue about standards and standards compliance with someone else. I'm not interested.
    Except you were interested - in posts #12 and #16. I'll argue with anyone who presents incorrect opinions/interpretations of the standard. If you don't want to back up your claims, don't make them in the first place. If you concede that it violates the standard, then consider not recommending standards-violating code.

    The work unit has 4 sections named A, B, C and D.

    B waits for the result of A
    C waits for the result of B
    D waits for the result of C
    So the same thread performs A, B, C, D in sequential order - correct?

    But if a new request from the same client comes in before A has finished, sections B, C, and D shouldn't be performed, because A needs to be processed again, followed by B, C and D.
    Give us a general idea of the processing times for A, B, C, D. If they are relatively short, then you could just check for "cancellation" between A-B, B-C, and C-D. With only 3 checks per work-unit you may be able to use normal synchronization (an additional condition variable and mutex) with negligible overhead.
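    A sketch of checking for a restart only between stages, using nothing more than a mutex protecting a boolean. Everything here is hypothetical: stage indices 0..3 stand for A..D, stage_runs exists only to make the behavior observable, and demo_stage() simulates a new request arriving while B runs for the first time.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical per-client job: stages A..D (indices 0..3) run in order by
 * one worker thread. 'restart' is protected by 'lock'. */
struct staged_job {
    pthread_mutex_t lock;
    bool            restart;        /* a newer request for this client arrived */
    int             stage_runs[4];  /* bookkeeping for illustration only */
};

typedef void (*stage_fn)(struct staged_job *job, int index);

void staged_job_init(struct staged_job *job)
{
    pthread_mutex_init(&job->lock, NULL);
    job->restart = false;
    for (int s = 0; s < 4; s++)
        job->stage_runs[s] = 0;
}

/* Called by whichever thread receives a new request for the same client. */
void staged_job_request_restart(struct staged_job *job)
{
    pthread_mutex_lock(&job->lock);
    job->restart = true;
    pthread_mutex_unlock(&job->lock);
}

/* Returns true, and clears the flag, if a restart was requested. */
static bool staged_job_take_restart(struct staged_job *job)
{
    bool requested;

    pthread_mutex_lock(&job->lock);
    requested = job->restart;
    job->restart = false;
    pthread_mutex_unlock(&job->lock);
    return requested;
}

/* Run A, B, C, D in order, checking only between stages (A-B, B-C, C-D):
 * at most three lock/unlock pairs per pass. */
void run_staged_job(struct staged_job *job, stage_fn stage)
{
    int s = 0;

    while (s < 4) {
        stage(job, s);
        job->stage_runs[s]++;
        s++;
        if (s < 4 && staged_job_take_restart(job))
            s = 0;   /* newer input: redo A, then B, C, D */
    }
}

/* Stand-in for real work: pretends a new request arrives during the first B. */
void demo_stage(struct staged_job *job, int index)
{
    if (index == 1 && job->stage_runs[1] == 0)
        staged_job_request_restart(job);
}
```

    With demo_stage(), A and B each run twice and C and D once. The trade-off is latency: a request is only noticed when the current stage ends, which matters if a single stage takes minutes.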

    gg

  5. #20
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    >> ... you may be able to use normal synchronization (an additional condition variable and mutex)
    A condition variable is not needed - just a mutex that protects a boolean would be "normal synchronization".

    I'm also interested in knowing the typical processing load of this device - how many threads in the pool - avg. number of threads that run concurrently (or avg. # that are idle).

    Any particular throughput requirements?

    How many real hardware threads/cores do you have?

    Out of curiosity, what is the architecture?

    Having a better understanding of how it works today will help with choosing a polling method vs. a signaling method.

    gg

  6. #21
    Registered User
    Join Date
    Nov 2012
    Posts
    9
    Thanks for the answers,

    Or, do you mean you have a 70 MHz FPGA board running Linux? If so, the processor type the FPGA board implements may have relevant features beyond pthreads and POSIX stuff. In particular, the GCC built-in atomic functions tend to be faster than pthread locking primitives.
    The FPGA board runs Linux, and the whole application runs on it. I need to ask the hardware designer which core he uses. It's a Leon3 CPU core plus some other cores.

    Give us a general idea of the processing times for A, B, C, D.
    The sections' processing times are very long. They depend on the application and the input data; I'd guess seconds to minutes.

    I'm also interested in knowing the typical processing load of this device - how many threads in the pool - avg. number of threads that run concurrently (or avg. # that are idle).
    It depends on the application, but I would say 200-300 threads.

    How many real hardware threads/cores do you have?
    The threads will run on 1-3 cores

    Any thread can add new work units by taking the work_mutex mutex, appending the new work structure to the end of the list, calling pthread_cond_signal(&work_cond) to wake up a thread waiting for work, and unlocking the mutex. The status field in the work unit should be set to PENDING.
    I tried it and it runs. Thanks.

    Thanks a lot for your time.
    Last edited by hell-student; 12-14-2012 at 05:14 AM.

  7. #22
    Registered User
    Join Date
    Nov 2012
    Posts
    9
    Here is the modified code. My problem is that when I send a message over a FIFO to the runtime_system, only one worker thread starts. The other problem is that it keeps doing the job the whole time: after it finishes the job, the worker starts again from the beginning:

    Code:
    #include <fcntl.h>
    #include <sys/stat.h>
    #include <sys/types.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include "pthread.h"
    #include "runtime_system.h"
    
    #define   THREADS_PER_SET  100
    
    typedef enum {
        UNUSED = 0,
        PENDING,
        WORKING,
        SUCCESS,
        FAILURE,
        CANCELED
    } status_t;
    
    struct invade_job_struct {
        struct invade_job_struct *next;
        volatile int canceled;
        pthread_mutex_t mutex;
        status_t status;
    
        /* Job parameters. This one iterates n times sleep(1)
         */
        unsigned long n;
    };
    
    static pthread_mutex_t       work_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t        work_cond = PTHREAD_COND_INITIALIZER;
    
    static struct invade_job_struct *invade_queue_head = NULL;
    static struct invade_job_struct *invade_queue_ptr = NULL;
    
    /*
     * job worker function
     */
    #define DEBUG 1
    status_t invade(struct invade_job_struct *const job)
    {
        volatile int *const canceld = &job->canceled;
        unsigned long n = job->n;
    
        while (n > 0UL) {
            if (*canceld) {
                return CANCELED;
            }
    
            n--;
            sleep(1);
    
            if (DEBUG) {
                printf("Thread %u : invade = %lu\n",  (unsigned int)pthread_self(), n);
                fflush(stdout);
            }
        }
    
        return SUCCESS;
    }
    
    /* Invade thread function
     *
     */
    void *invade_thread_worker(void *payload __attribute__((unused)))
    {
        struct invade_job_struct *job;
        status_t status;
    
        pthread_mutex_lock(&work_mutex);
    
        while (1) {
    
            /* no job in queue */
            if (!invade_queue_head) {
                pthread_cond_wait(&work_cond, &work_mutex);
                continue;
            }
    
            /* grep first job element of invade queue */
            job = invade_queue_head;
            if (invade_queue_head->next) {
                invade_queue_ptr = invade_queue_head->next;
                invade_queue_head = invade_queue_ptr;
            }
            else {
                invade_queue_ptr = NULL;
                invade_queue_head = NULL;
            }
    
            pthread_mutex_lock(&job->mutex);
            if (job->status == PENDING) {
    
                /* work on it */
                job->status = WORKING;
                pthread_mutex_unlock(&job->mutex);
    
                status = invade(job);
    
                pthread_mutex_lock(&job->mutex);
                if (job->status == WORKING) {
                    job->status = status;
                }
            }
            pthread_mutex_unlock(&job->mutex);
    
            free(job);
        }
    }
    
    void cancel_invade_job(struct invade_job_struct *const job)
    {
        pthread_mutex_lock(&job->mutex);
        if (job->status == PENDING || job->status == WORKING) {
            job->canceled = ~0;
            job->status = CANCELED;
        }
        pthread_mutex_unlock(&job->mutex);
    }
    
    #define DEBUG 1
    int main()
    {
        int i;
        int recv_bytes;
        int fifo_fd;
        char fifo_name[100];
        char msg[100];
        pthread_attr_t attrs;
        pthread_t id[THREADS_PER_SET];
        struct invade_job_struct *job;
    
        sprintf(fifo_name, "runtime_system_fifo%i\0", 1);
        mkfifo(fifo_name, 0666);
        fifo_fd = open(fifo_name, O_RDONLY | O_NONBLOCK);
    
        pthread_attr_init(&attrs);
        pthread_attr_setstacksize(&attrs, 65536);
    
        for (i = 0; i < THREADS_PER_SET; i++) {
            pthread_create(&id[i], &attrs, invade_thread_worker, NULL);
        }
    
        while(1) {
            recv_bytes = read(fifo_fd, msg, sizeof(msg));
            msg[100] = 0;
            if(recv_bytes != 0) {
                pthread_mutex_lock(&work_mutex);
    
                /* empty invade job queue */
                if(invade_queue_head == NULL) {
                    invade_queue_head = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
                    invade_queue_head->status = PENDING;
                    invade_queue_head->n = 10;
                    invade_queue_ptr = invade_queue_head;
                }
                else {
                    invade_queue_ptr = invade_queue_head;
                    while (invade_queue_ptr->next != NULL) {
                        invade_queue_ptr = invade_queue_ptr->next;
                    }
                    invade_queue_ptr->next = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
                    invade_queue_ptr = invade_queue_ptr->next;
                    invade_queue_ptr->status = PENDING;
                    invade_queue_ptr->n = 10;
                    invade_queue_ptr->next = NULL;
                }
    
                pthread_cond_signal(&work_cond);
                pthread_mutex_unlock(&work_mutex);
            }
        }
    
        return EXIT_SUCCESS;
    }

    Something is wrong. I create 100 threads, but only the first job I append to the job queue gets processed; none of the other jobs are computed.
    Last edited by hell-student; 12-14-2012 at 09:36 AM.

  8. #23
    Registered User Codeplug
    Join Date
    Mar 2003
    Posts
    4,981
    >> ... 200-300 threads
    ? Why would you even need that many threads?

    100 is too many also. 5 or so is more reasonable.

    >> I don't want to create a new thread and kill the old one because of performance issues.
    Knowing how to implement a thread-pool design is great and all - but doing so only to avoid "performance issues" is premature optimization. If each work unit is measured in seconds/minutes, then thread creation and startup times are minuscule in comparison.

    invade_thread_worker() locks work_mutex without ever releasing it. Think about what your mutexes are protecting, and unlock each mutex as soon as you're done accessing whatever it protects. For example, work_mutex protects access to invade_queue_head and invade_queue_ptr - so I would lock on line 75 instead of 72 (inside the while loop rather than before it), then unlock on line 92 (right after the job has been removed from the queue).

    Your usage of invade_queue_ptr is strange. I expected invade_queue_ptr to always point at the end of the list, so that producers can append directly to the end of the list without traversing it.
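    A minimal sketch of both points, assuming the tail pointer is renamed invade_queue_tail (a hypothetical rename of invade_queue_ptr) and the job struct is trimmed to two fields. Producers append in O(1) at the tail; the consumer holds work_mutex only while waiting for and unlinking a job, and releases it before the long-running work starts.

```c
#include <pthread.h>
#include <stddef.h>

/* Trimmed stand-in for struct invade_job_struct from the code above. */
struct invade_job {
    struct invade_job *next;
    unsigned long      n;
};

static pthread_mutex_t    work_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t     work_cond = PTHREAD_COND_INITIALIZER;
static struct invade_job *invade_queue_head = NULL;
static struct invade_job *invade_queue_tail = NULL;  /* last job, or NULL */

/* Producer: O(1) append at the tail; the lock covers only the list update. */
void invade_queue_push(struct invade_job *job)
{
    job->next = NULL;
    pthread_mutex_lock(&work_mutex);
    if (invade_queue_tail)
        invade_queue_tail->next = job;
    else
        invade_queue_head = job;
    invade_queue_tail = job;
    pthread_cond_signal(&work_cond);
    pthread_mutex_unlock(&work_mutex);
}

/* Consumer: wait for a job, unlink it, and release the lock before the work
 * starts, so the other worker threads can take the following jobs. */
struct invade_job *invade_queue_pop(void)
{
    struct invade_job *job;

    pthread_mutex_lock(&work_mutex);
    while (!invade_queue_head)
        pthread_cond_wait(&work_cond, &work_mutex);
    job = invade_queue_head;
    invade_queue_head = job->next;
    if (!invade_queue_head)
        invade_queue_tail = NULL;
    pthread_mutex_unlock(&work_mutex);

    job->next = NULL;
    return job;
}
```

    The worker loop then shrinks to: pop a job, run invade() on it with only job->mutex held around the status changes, free it, and repeat.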

    >> pthread_attr_setstacksize
    This is rarely needed on a modern architecture with 4 GB or more of virtual address space. Even if the default stack size is 32 MB, you only "pay" for what you use.
    I have seen one good reason for setting the stack size: a monitoring service that made you "pay" for the virtual memory allocated instead of the actual memory used (see the thread "threads and mem usage").
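    Before deciding either way, it can help to see what the default actually is on the target. A small sketch: default_thread_stack_size() is a hypothetical helper that reads the stack size from a freshly initialized attribute object. On glibc this reports the default even when no size was set; other C libraries may behave differently.

```c
#include <pthread.h>
#include <stddef.h>

/* Report the stack size a new thread gets when none is requested.
 * Returns 0 if the attribute object cannot be queried. */
size_t default_thread_stack_size(void)
{
    pthread_attr_t attrs;
    size_t size = 0;

    if (pthread_attr_init(&attrs) != 0)
        return 0;
    if (pthread_attr_getstacksize(&attrs, &size) != 0)
        size = 0;
    pthread_attr_destroy(&attrs);
    return size;
}
```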

    gg
