Thread: Simulating a pipe using threads and condition variables

  1. #1
    Registered User Phoenix_Rebirth
    Join Date
    Dec 2005
    Location
    Cyprus
    Posts
    68

    Simulating a pipe using threads and condition variables

    So, this is an assignment I have for my parallel programming course. The objective of this part is to simulate pipelining a task. I have a task that is divided into 16 parts, and I simulate it with a struct like the following:
    Code:
    typedef struct _work_ {
    	int 	part[WORK_PARTS] ;
    } work_t ;
    where each cell holds an integer indicating how many milliseconds it takes to complete that part of the task. I am simulating a millisecond with a for-loop that calls some mathematical functions. For anyone interested, it looks like this:
    Code:
    void millisec ()
    {
    	int		i;
    	double	res = 4.0 ;
    	
    	for ( i=0 ; i<142350; i++ )
    		res = pow( sqrt(res), 2.0) ;
    }
    
    
    void comp (int millis)
    {
    	int	i;
    	
    	for ( i=0; i<millis; i++ )
    		millisec();
    }
    The 142350 was chosen by experiment, timing the functions.
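One way a constant like 142350 could be arrived at is to time a fixed number of loop iterations and scale the result. The sketch below is only an illustration, not the code used for the assignment: `busy`, `now_ms` and `iters_per_ms` are hypothetical names, and `busy` uses plain arithmetic as a stand-in for the pow/sqrt loop so that it builds without -lm. It assumes a POSIX system that provides clock_gettime.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L	/* asks for clock_gettime in strict modes */
#endif
#include <assert.h>
#include <time.h>

/* Stand-in busy loop (hypothetical): plain arithmetic instead of the
   pow/sqrt calls in the post. volatile keeps -O2 from deleting it. */
void busy(long iters)
{
	volatile double res = 4.0;
	long i;

	for (i = 0; i < iters; i++)
		res = res * 0.5 + 2.0;
}

/* Current time in milliseconds, read from a monotonic clock. */
double now_ms(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return ts.tv_sec * 1000.0 + ts.tv_nsec / 1e6;
}

/* Time `probe` iterations and scale to an iterations-per-millisecond
   estimate. Returns 0 if the probe was too short to measure. */
long iters_per_ms(long probe)
{
	double t0, elapsed;

	assert(probe > 0);
	t0 = now_ms();
	busy(probe);
	elapsed = now_ms() - t0;
	if (elapsed <= 0.0)
		return 0;
	return (long)(probe / elapsed);
}
```

Calling iters_per_ms(5000000) a few times and averaging should give a steadier figure than a single run. The result depends on the machine and on the optimization level, so it has to be measured on the machine where the experiments run.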

    Here is what I have written so far, and my thought process. The main thread takes as an argument how many threads to create (basically, how many stages the pipe will have). This is purely for my experiments later on: I will have to time the same task using different numbers of threads/stages. (The program runs on a machine at my university which has the necessary hardware.) After converting the argument to a long integer called 'numThreads', I call a function that builds a table 'schedule' with 'numThreads+1' cells. This is a global variable. I initialize the table with zeros and then distribute the parts of the task over the cells of the table, except for the first one, which stays zero. Basically I go round cells 1 to numThreads, incrementing each value by one, until all 16 parts of the task have been handed out. Then I create the threads with a for-loop, each running the function where I simulate the task. This is my main:
    Code:
    #include			<stdio.h>
    #include			<stdlib.h>
    #include			<pthread.h>
    #include			"fun_n_str.h"
    
    pthread_mutex_t		sched_mutex;
    pthread_cond_t		sched_val_cv;
    
    int main (int argc, char **argv)
    {
    	uint32_t		start, stop;	
    	pthread_t		*threadIDs;
    	pthread_attr_t	 threadAttr;
    	void			*status;
    
    	if ( argc < 2 ) {
    		fprintf(stderr, "Usage: %s <No of threads>\nExiting...\n", argv[0]);
    		return 1;
    	}	
    
    	char*			p;			// for use with 'strtol': points at the first character that is not part of the number
    	long			numThreads;	// result of 'strtol'
    	
    	numThreads = strtol(argv[1], &p, 10);
    
    	if ( (*p != '\n') && (*p != '\0') ) {
    		fprintf(stderr, "Usage: <No of threads> must be numerical.\n<%s> is NOT!\nExiting...", argv[1]);
    		return 1;
    	}
    	
    	threadIDs = (pthread_t*) malloc( numThreads * sizeof(pthread_t) );
    	printf("No of Threads : %li\n",numThreads);
    	
    	distribute_load(numThreads);
    	
    	pthread_mutex_init(&sched_mutex, NULL);
    	pthread_cond_init (&sched_val_cv, NULL);
    
    	pthread_attr_init(&threadAttr);
    	pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_JOINABLE);
    	
    	start = stampstart();
    	
    	long i;
    	for( i=0; i<numThreads; i++ )
    	{
    		pthread_create(&threadIDs[i], &threadAttr, someWork, (void *)(i+1));
    	}
    	
    	for( i=0; i<numThreads; i++)
    	{
    		pthread_join(threadIDs[i], &status);
    	}
    	
    	stop = stampstop(start);
    	
    	int sum = 0;
    	for ( i=0; i<numThreads+1; i++ )
    	{
    		printf("schedule[%ld] : %d\n",i,schedule[i]);
    		sum+=schedule[i];
    	}
    	printf("SUM : %d\n\n",sum);
    
    	for ( i=0; i<numThreads; i++ )
    	{
    		// pthread_t is opaque; the cast is for display only and assumes an integer type (true on Linux)
    		printf("threadIDs[%ld] : %lu\n",i,(unsigned long)threadIDs[i]);
    	}
    	return 0;
    }
    Code:
    void distribute_load(long numThreadsC)
    {
    	int i, j, sum;
    	
    	schedule = (int*) malloc( numThreadsC+1 * sizeof(int) );
    	
    	for ( i=0; i<numThreadsC+1; i++ )
    		schedule[i] = 0;
    	
    	j = 1;
    	for ( i=WORK_PARTS; i>0; i-- )
    	{
    		schedule[j]++;
    		j = ( (j)%numThreadsC )+1;
    	}
    	
    	sum = 0;
    	for ( i=0; i<numThreadsC+1; i++ )
    	{
    		printf("schedule[%d] : %d\n",i,schedule[i]);
    		sum+=schedule[i];
    	}
    	printf("SUM : %d\n\n",sum);
    	
    }
    The function each thread runs goes like this. I pass the loop index incremented by 1 (cast to a pointer) as the thread's argument, to use as a thread id. So each thread has a variable called 'myID' ranging from 1 to numThreads. The table I created before, where I distributed the task parts, has numThreads+1 cells, and except for the first cell (schedule[0]) all the others hold an integer value greater than 0. I made the program print the table. For instance, if I decide to use 5 threads the table looks like this:
    schedule[0] : 0
    schedule[1] : 4
    schedule[2] : 3
    schedule[3] : 3
    schedule[4] : 3
    schedule[5] : 3
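The round-robin distribution described above can be sketched as a standalone function that returns the table instead of filling a global. This is a minimal sketch, not the code from the attached files: `make_schedule` is a hypothetical name, and it uses calloc (which takes the element count and element size as separate arguments and zero-fills the result) in place of malloc plus a clearing loop.

```c
#include <assert.h>
#include <stdlib.h>

/* Deal `parts` work parts round-robin over cells 1..stages of a freshly
   allocated table; cell 0 stays 0. The caller frees the result.
   Returns NULL if the allocation fails. */
int *make_schedule(long stages, int parts)
{
	int *table;
	long j = 1;
	int i;

	assert(stages > 0 && parts >= 0);

	/* stages+1 cells, all initialised to zero */
	table = calloc((size_t)stages + 1, sizeof *table);
	if (table == NULL)
		return NULL;

	for (i = 0; i < parts; i++) {
		table[j]++;
		j = (j % stages) + 1;	/* wrap from `stages` back to 1 */
	}
	return table;
}
```

With 5 stages and 16 parts, the first cell after schedule[0] is visited four times and the rest three times, which matches the table printed above.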
    So each thread uses as its wait condition the value of the cell indexed by the id of the previous thread: the thread with 'myID=1' checks 'schedule[0]', the thread with 'myID=3' checks 'schedule[2]', and so on. If the value is different from 0, the thread calls pthread_cond_wait. If it is 0 (which, when the threads are created, is true only for the thread with 'myID=1'), the thread enters a while-loop where it calls the function that simulates the milliseconds as many times as the value in 'schedule[myID]' indicates. With 5 threads, the thread with 'myID=1' would call the function comp four times. When it finishes, that is, when the value of 'schedule[myID]' reaches 0, it calls pthread_cond_broadcast to wake all the threads. Each of them checks its corresponding cell again and goes back to sleep, except for the one whose 'myID' is larger by 1 than that of the current thread.

    Code:
    void *someWork(void *id)
    {
    long myID = (long)id;
    
    pthread_mutex_lock(&sched_mutex);
    printf("Hello World, this is thread No. %li\tI am assigned %d parts\n",myID,schedule[myID]);
    pthread_mutex_unlock(&sched_mutex);
    
    pthread_mutex_lock(&sched_mutex);
    while( schedule[myID-1]!=0 )
    	pthread_cond_wait(&sched_val_cv, &sched_mutex);
    pthread_mutex_unlock(&sched_mutex);
    
    pthread_mutex_lock(&sched_mutex);
    while( schedule[myID]!=0 )
    {
    	//printf("----->Thread No. %li\t%d Parts remaining.\n",myID,schedule[myID]);
    	pthread_mutex_unlock(&sched_mutex);
    	comp(10);
    	
    	pthread_mutex_lock(&sched_mutex);
    	schedule[myID]--;
    	pthread_mutex_unlock(&sched_mutex);
    }
    printf("---->Thread No. %li\t%d Parts remaining... Finished. Waking other thread!\n",myID,schedule[myID]);
    	
    pthread_cond_broadcast(&sched_val_cv);
    
    pthread_exit( (void*) 0 );
    }
    I admit that I am not sure I have understood 100% how condition variables work, or whether they should be used like this (I am still a little iffy about how the wait works, with the mutex unlocking and locking automatically, and what happens when it gets a signal...). I tried it anyway just to see how I was doing, and the thing is, it worked (well, at least the program ended with no errors and printed what I was expecting) when I ran the program with the numbers 2 to 6 as the argument. However, with a greater number as the argument (7 to 16) something weird happens. The schedule array prints correctly, but inside the thread function, cells 6 and above have random values. (Well, actually they are always the same: with 7 threads, schedule[6] suddenly has a value of 33 and schedule[7] a value of 0. With 8 threads it is the same, and in addition schedule[8] has the value 1668572463, which I think may be the thread id returned by the create function.)
    So I just wanted some opinions on what might be going on and, if anyone is in the mood, on my approach in general.
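On how the wait works: pthread_cond_wait must be called with the mutex held. It releases the mutex while the thread sleeps and re-acquires it before returning, which is why the condition is always re-checked in a while-loop. The sketch below shows that pattern in isolation. It is not the posted program: it swaps the schedule[myID-1] test for a single hypothetical `turn` counter, and the names `stage`, `run_stages`, `order` and `turn_lock` are made up for the illustration.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_STAGES 16

static pthread_mutex_t turn_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  turn_cv   = PTHREAD_COND_INITIALIZER;
static int turn;		/* id of the stage allowed to run (guarded by turn_lock) */
static int order[MAX_STAGES];	/* ids in the order the stages finished */
static int finished;		/* number of stages that have finished */

static void *stage(void *arg)
{
	int id = (int)(long)arg;

	pthread_mutex_lock(&turn_lock);
	/* The wait unlocks turn_lock while sleeping and re-locks it before
	   returning; loop because a broadcast wakes every waiter. */
	while (turn != id)
		pthread_cond_wait(&turn_cv, &turn_lock);
	pthread_mutex_unlock(&turn_lock);

	/* ... this stage's work would run here, without holding the lock ... */

	pthread_mutex_lock(&turn_lock);
	order[finished++] = id;
	turn = id + 1;		/* change the condition while holding the lock */
	pthread_cond_broadcast(&turn_cv);
	pthread_mutex_unlock(&turn_lock);
	return NULL;
}

/* Run n stages strictly one after another; returns how many finished. */
int run_stages(int n)
{
	pthread_t tid[MAX_STAGES];
	int created = 0;
	long i;

	assert(n > 0 && n <= MAX_STAGES);
	turn = 1;
	finished = 0;
	for (i = 0; i < n; i++) {
		if (pthread_create(&tid[i], NULL, stage, (void *)(i + 1)) != 0)
			break;
		created++;
	}
	for (i = 0; i < created; i++)
		pthread_join(tid[i], NULL);
	return finished;
}
```

Whatever order the scheduler starts the threads in, each one proceeds only when `turn` reaches its id, so `order` ends up as 1, 2, ..., n.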

    The makefile for anyone wanting to use the files I attached is this:
    Code:
    OBJ = pipe.o timer.o load.o
    CC = gcc
    CFLAGS = -Wall -Wextra -O2 -c
    LFLAGS = -pthread -lm
    
    	
    pipe: $(OBJ)
    	$(CC) $(OBJ) $(LFLAGS) -o pipe
    	rm $(OBJ)
    	
    pipe.o: pipe.c fun_n_str.h
    	$(CC) $(CFLAGS) pipe.c 
    
    timer.o: timer.c fun_n_str.h
    	$(CC) $(CFLAGS) timer.c
    
    load.o: load.c fun_n_str.h
    	$(CC) $(CFLAGS) load.c
    I currently just pass 10 as the argument when calling the comp function, because I want to finish this first before trying to simulate different 'tasks'. Also, I apologize for the lack of comments; unfortunately they were in my language, so I erased them, and frankly I am too pressed for time right now (I have an appointment to go to) to rewrite them in English, which I am rather bad at, might I add.

  2. #2
    and the Hat of Guessing tabstop
    Join Date
    Nov 2007
    Posts
    14,336
    Compare and contrast yours:
    Code:
    schedule = (int*) malloc( numThreadsC+1 * sizeof(int) );
    with mine:
    Code:
    schedule = (int*) malloc( (numThreadsC+1) * sizeof(int) );
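The difference between the two lines is operator precedence: `*` binds tighter than `+`, so the first expression is evaluated as numThreadsC + (1 * sizeof(int)). A small sketch of the arithmetic, with hypothetical function names:

```c
#include <assert.h>
#include <stddef.h>

/* What the first expression computes: n + (1 * sizeof(int)). */
size_t bytes_without_parens(long n)
{
	assert(n >= 0);
	return n+1 * sizeof(int);
}

/* What was intended: room for n+1 ints. */
size_t bytes_with_parens(long n)
{
	assert(n >= 0);
	return (n+1) * sizeof(int);
}
```

Assuming a 4-byte int, 7 threads gives a request of 11 bytes without the parentheses and 32 bytes with them, so the later cells of the table lie outside the memory that was asked for.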

  3. #3
    Registered User Phoenix_Rebirth
    Join Date
    Dec 2005
    Location
    Cyprus
    Posts
    68
    Huh, didn't notice that. I went back and changed my approach (created an extra cell so the thread with myID=1 could start from the beginning), so I added +1 in a lot of places and forgot about the parentheses. * has higher precedence, right, so that would be a problem. But why did it work in some cases? Does C allocate a bit more space just in case or something?

    Thanks for the hint by the way. I doubt I would have seen it - at least not for some time...

  4. #4
    and the Hat of Guessing tabstop
    Join Date
    Nov 2007
    Posts
    14,336
    Probably just sheer luck about how much space you actually requested vs. how much was available. (There may well be padding added, since some variables require addresses divisible by four or by eight, so there could well be rounding there; plus many implementations of malloc put some extra stuff "around" your memory, so you might not notice anything until you get past that.)
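On glibc the rounding can be observed directly with malloc_usable_size, which reports how many bytes the allocator actually set aside for a block. This is a sketch under that assumption: the function is a glibc extension declared in <malloc.h>, not standard C, and `usable_bytes` is a hypothetical helper name.

```c
#include <assert.h>
#include <malloc.h>	/* malloc_usable_size: glibc extension, not standard C */
#include <stdlib.h>

/* Returns how many bytes the allocator really set aside for a request
   of `requested` bytes, or 0 if malloc failed. */
size_t usable_bytes(size_t requested)
{
	void *p;
	size_t usable;

	assert(requested > 0);
	p = malloc(requested);
	if (p == NULL)
		return 0;
	usable = malloc_usable_size(p);
	free(p);
	return usable;
}
```

The reported figure is never smaller than the request and is often larger, which would explain why a small overrun can go unnoticed. Writing past the requested size is still undefined behaviour, so the slack should not be relied on.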
