# Simulating a pipe using threads and condition variables

• 07-23-2011
Phoenix_Rebirth
Simulating a pipe using threads and condition variables
So, this is an assignment I have for my parallel programming course. The objective of this part is to simulate pipelining a task. I have a task that is divided in 16 parts and I simulate this with a struct like the following:
Code:

```typedef struct _work_ {         int        part[WORK_PARTS] ; } work_t ;```
where each cell has an integer indicating how many milliseconds it takes to complete that part of the task. I am simulating the millisecond with a for-loop calling some mathematical functions. For anyone interested is like this:
Code:

```void millisec () {         int                i;         double        res = 4.0 ;                 for ( i=0 ; i<142350; i++ )                 res = pow( sqrt(res), 2.0) ; } void comp (int millis) {         int        i;                 for ( i=0; i<millis; i++ )                 millisec(); }```
The 142350 was choses with experiments timing the functions.

So what I wrote so far and my thought process. I have a main thread starting up and taking as an argument how many threads it will create (how many stages our pipe will have basically). This is purely for my experiments later on. I will have to time the same thing using different number of threads/stages. (The program is run in a machine at my university which has the necessary hardware). So after I turn the argument in a long integer called 'numThreads' I call a function where I build a table 'schedule' with 'numThreads+1' cells. This is a global variable. I initialize the table with zeros and then I start distributing the parts of the task at the cells of the table except for the first one. That stays zero. Basically I go round the cells 1-numThreads incrementing their value by one until the task parts are over (16 parts). Then I create the threads with a for-loop and call the function where I simulate the task. This is my main:
Code:

```#include                        <stdio.h> #include                        <stdlib.h> #include                        <pthread.h> #include                        "fun_n_str.h" pthread_mutex_t                sched_mutex; pthread_cond_t                sched_val_cv; int main (int argc, char **argv) {         uint32_t                start, stop;                pthread_t                *threadIDs;         pthread_attr_t        threadAttr;         void                        *status;         if ( argc < 2 ) {                 fprintf(stderr, "Usage: %s <No of threads>\nExiting...\n", argv[0]);                 return 1;         }                char*                        p;                        // gia xrisi me tin 'strtol', dixtis ston xaraktira pou den ine arithmos         long                        numThreads;        // apotelesma tis 'strtol'                 numThreads = strtol(argv[1], &p, 10);         if ( (*p != '\n') && (*p != '\0') ) {                 fprintf(stderr, "Usage: <No of threads> must be numerical.\n<%s> is NOT!\nExiting...", argv[1]);                 return 1;         }                 threadIDs = (pthread_t*) malloc( numThreads * sizeof(pthread_t) );         printf("No of Threads : %li\n",numThreads);                 distribute_load(numThreads);                 pthread_mutex_init(&sched_mutex, NULL);         pthread_cond_init (&sched_val_cv, NULL);         pthread_attr_init(&threadAttr);         pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);                 start = stampstart();                 long i;         for( i=0; i<numThreads; i++ )         {                 pthread_create(&threadIDs[i], &threadAttr, someWork, (void *)(i+1));         }                 for( i=0; i<numThreads; i++)         {                 pthread_join(threadIDs[i], &status);         }                 stop = stampstop(start);                 int sum = 0;         for ( i=0; i<numThreads+1; i++ )         {                 printf("schedule[%d] : %d\n",i,schedule[i]);                 sum+=schedule[i];         }         printf("SUM : %d\n\n",sum);         for ( i=0; i<numThreads; i++ )         {                 printf("threadIDs[%d] : %d\n",i,threadIDs[i]);                 sum+=schedule[i];         }         return 0; }```
Code:

```void distribute_load(long numThreadsC) {         int i, j, sum;                 schedule = (int*) malloc( numThreadsC+1 * sizeof(int) );                 for ( i=0; i<numThreadsC+1; i++ )                 schedule[i] = 0;                 j = 1;         for ( i=WORK_PARTS; i>0; i-- )         {                 schedule[j]++;                 j = ( (j)%numThreadsC )+1;         }                 sum = 0;         for ( i=0; i<numThreadsC+1; i++ )         {                 printf("schedule[%d] : %d\n",i,schedule[i]);                 sum+=schedule[i];         } printf("SUM : %d\n\n",sum);         }```
The function I now call with the thread creation goes like this. I sent the pointer of the table cell that has the thread id stored incremented by 1 as a variable to use as a thread id. So each thread has a variable called 'myID' ranging from 1 to numThreads. The table I created before where I distributed the task parts has numThreads cells and except the first cell (schedule[0]) all the others have an integer value greater than 0. I made the program so it prints the table. For instance if I decided to use 5 threads the table will be like this:
schedule[0] : 0
schedule[1] : 4
schedule[2] : 3
schedule[3] : 3
schedule[4] : 3
schedule[5] : 3

Code:

```void *someWork(void *id) { long myID = (long)id; pthread_mutex_lock(&sched_mutex); printf("Hello World, this is thread No. %li\tI am assigned %d parts\n",myID,schedule[myID]); pthread_mutex_unlock(&sched_mutex); pthread_mutex_lock(&sched_mutex); while( schedule[myID-1]!=0 )         pthread_cond_wait(&sched_val_cv, &sched_mutex); pthread_mutex_unlock(&sched_mutex); pthread_mutex_lock(&sched_mutex); while( schedule[myID]!=0 ) {         //printf("----->Thread No. %li\t%d Parts remaining.\n",myID,schedule[myID]);         pthread_mutex_unlock(&sched_mutex);         comp(10);                 pthread_mutex_lock(&sched_mutex);         schedule[myID]--;         pthread_mutex_unlock(&sched_mutex); } printf("---->Thread No. %li\t%d Parts remaining... Finished. Waking other thread!\n",myID,schedule[myID]);         pthread_cond_broadcast(&sched_val_cv); pthread_exit( (void*) 0 ); }```
I admit than I am not sure whether I have understood 100% how and if condition variables are used like this (I am a little iffy still about how the wait works, with the mutex locking automatically and unlocking and when it gets a signal....) but I tried it anyway just to see how I was going and the thing is it worked (well at least the program ended with no errors and printing what I was expecting) when I run the program using as an argument munbers 2 to 6. However when using a greater number as an argument (7 to 16) something goes weird. The schedule array prints correct but when entering the thread function, cells 6 and above have random values (well, actually they are always the same, when using 7 threads schedule[6] suddenly has a value of 33 and schedule[7] of 0. When using 8 threads it is the same and in addition schedule[8] has the value 1668572463 which I think may be the thread id returned by the create function.)
So I wanted just some opinions on what might go on and if anyone is in the mood in general at my approach.

The makefile for anyone wanting to use the files I attached is this:
Code:

```OBJ = pipe.o timer.o load.o CC = gcc CFLAGS = -Wall -Wextra -O2 -c LFLAGS = -pthread -lm         pipe: \$(OBJ)         \$(CC) \$(LFLAGS) \$(OBJ) -o pipe         rm \$(OBJ)         pipe.o: pipe.c fun_n_str.h         \$(CC) \$(CFLAGS) pipe.c timer.o: timer.c fun_n_str.h         \$(CC) \$(CFLAGS) timer.c load.o: load.c fun_n_str.h         \$(CC) \$(CFLAGS) load.c```
I currently just use 10 as an argument when calling the comp function because I want to finish this first before going and trying to simulate different 'tasks'. Also I apologize for the lack of comment but unfortunately they are in my language so I erased them, and frankly I am pressed for time right now (have an appointment to go to) to rewrite them in English, which I am rather bad at, might I add.
• 07-23-2011
tabstop
Compare and contrast yours:
Code:

`schedule = (int*) malloc( numThreadsC+1 * sizeof(int) );`
with mine:
Code:

`schedule = (int*) malloc( (numThreadsC+1) * sizeof(int) );`
• 07-23-2011
Phoenix_Rebirth
Huh, didn't noticed that. I went back and changed my approach (created an extra cell so the thread with myID=1 could start from the beginning) so I added +1 to a lot of places and forgot about parentheses. * has higher priority, right, so that would be a problem. But why did it work for some cases. Does C allocates a bit more space just in case or something?

Thanks for the hint by the way. I doubt I would have seen it - at least not for some time...
• 07-23-2011
tabstop
Probably just sheer luck about how much space you actually requested vs. how much was available. (There may well be padding added, since some variables require addresses divisible by four or by eight, so there could well be rounding there; plus many implementations of malloc puts some extra stuff "around" your memory, so you might not notice anything until you get past that.)