Thread: threads using semaphores not working

  1. #1
    Registered User
    Join Date
    Oct 2010
    Posts
    2

    threads using semaphores not working

    The following code here does the job of copying a file, line by line to a buffer and then copying each line from the buffer to a new file-output.dat using threads.
    However the way I did it is wrong as it simply locks the whole writer when writing to the buffer and the same for the reader. Therefore having 3 instances of the writer and 3 instances of the reader thread is pointless.
    I have tried doing this with semaphores and tried placing them in almost everywhere you can imagine within the thread methods but they either seg fault and/or return the wrong order...mainly seg fault is the problem. If anyone is familiar with semaphores I would really really appreciate your help as to where I should put them, thanks


    The t_semaphore.h file:
    Code:
    /*
     * This code implements a counting semaphore for use with
     * threads using the pthread_mutex primitive
     * Code from the Open Group (maintainers of the POSIX 
     * standard)
     */
    
    /* sem.h */
    struct semaphore {
        pthread_mutex_t lock;
        pthread_cond_t nonzero;
        unsigned count;
    };
    typedef struct semaphore semaphore_t;
    
    
    semaphore_t *semaphore_create(char *semaphore_name);
    semaphore_t *semaphore_open(char *semaphore_name);
    void semaphore_post(semaphore_t *semap);
    void semaphore_wait(semaphore_t *semap);
    void semaphore_close(semaphore_t *semap);
    The only .c file:
    Code:
    #include <stdio.h>
    #include <stdlib.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <sys/mman.h>
    #include <fcntl.h>
    #include <pthread.h>
    #include "t_semaphore.h"
    
    void *file_to_buffer( void *ptr );
    void *buffer_to_file( void *ptr );
    int exists(const char *fname);
    
    
    int writeToBuffferLineNum, readFromBufferLineNum, fileEnd, bufferEnd, 
    currReaderIndex, p;
    
    semaphore_t *mutex;
    semaphore_t *fillCount;
    semaphore_t *emptyCount;
    
    char **buffer;
    
    FILE *input;
    FILE *output;
    
    int main(int argc, char *argv[])
    {
    
    	if ( argc != 2 ) /* argc should be 2 for correct execution */
    	{
    		printf( "usage: %s filename\n", argv[0] );
    	}
    	else 
    	{
    		pthread_t thread1, thread2, thread3, thread4, thread5, thread6;
    		
    		buffer = malloc(16*(sizeof(char *)));
    
    		if(exists("mutex.sem")==1)
    			remove("mutex.sem");
    		if(exists("fillCount.sem")==1)
    			remove("fillCount.sem");
    		if(exists("emptyCount.sem")==1)
    			remove("emptyCount.sem");
    
    		mutex = semaphore_create("mutex.sem");
    		mutex = semaphore_open("mutex.sem");
    		fillCount = semaphore_create("fillCount.sem");
    		fillCount = semaphore_open("fillCount.sem");
    		emptyCount = semaphore_create("emptyCount.sem");
    		emptyCount = semaphore_open("emptyCount.sem");
    
    		mutex->count = 1;
    		fillCount->count = 3;
    		emptyCount->count = 3;
    
    		for(p=0;p<16;p++)
    		{
    			buffer[p] = NULL;
    		}
    
    
    		input = fopen (argv[1], "r+");
    
    		if(exists("output.dat")==1)
    			remove("output.dat");
    
    		output = fopen("output.dat","a+");
    
    		/*Create independent threads each of which will execute function */
    		pthread_create( &thread1, NULL, file_to_buffer, NULL);
    	    pthread_create( &thread4, NULL, buffer_to_file, NULL);
    		pthread_create( &thread2, NULL, file_to_buffer, NULL);
    		pthread_create( &thread5, NULL, buffer_to_file, NULL);
    		pthread_create( &thread3, NULL, file_to_buffer, NULL);
    		pthread_create( &thread6, NULL, buffer_to_file, NULL);
    
    		/*  Wait till threads are complete before main continues. Unless we  */
    		/* wait we run the risk of executing an exit which will terminate   */
    		/* the process and all threads before the threads have completed.   */
    
    		pthread_join( thread1, NULL);
    		pthread_join( thread4, NULL);
    
    		pthread_join( thread2, NULL);
    
    		pthread_join( thread3, NULL);
    		pthread_join( thread5, NULL);
    		pthread_join( thread6, NULL); 
    
    	}
    	exit(0);
    
    }
    
    void *file_to_buffer( void *ptr )
    {
    
    	char *inputline = malloc(1024 * sizeof(char));
    
    
    	if(input==NULL) {
    		printf("file not found!\n");
    		exit(0);
    	}
    	else {
    		while(1 && fileEnd==0) {
    
    			semaphore_wait(emptyCount);
    
    			if(fileEnd==1)
    			{
    				pthread_mutex_unlock(emptyCount);
    				break;
    			}
    
    			if(!buffer[writeToBuffferLineNum])
    			{
    
    				fgets(inputline,1024,input);
    
    
    				if(feof(input))
    				{
    					fileEnd = 1;
    					buffer[writeToBuffferLineNum] = "*EOF*";
    					pthread_mutex_unlock(emptyCount);
    					break;
    				}
    
    				pthread_mutex_lock( mutex );
    
    				buffer[writeToBuffferLineNum] = malloc(1024 * sizeof(char));
    
    				strcpy(buffer[writeToBuffferLineNum], inputline);
    				
    				printf("writer: %d - %s", writeToBuffferLineNum, buffer[writeToBuffferLineNum]);
    				fflush(stdout);
    				
    				writeToBuffferLineNum++;
    
    				pthread_mutex_unlock( mutex );
    
    				if(writeToBuffferLineNum==16)
    					writeToBuffferLineNum=0;
    
    			}
    			semaphore_post(emptyCount);
    
    		}
    
    		fclose(input);
    
    	}
    
    
    	return NULL;
    }
    
    void *buffer_to_file( void *ptr )
    {
    
    	while(1 && bufferEnd==0)
    	{
    
    		semaphore_wait(fillCount);
    
    		if(buffer[readFromBufferLineNum])
    		{
    
    			if(strcmp(buffer[readFromBufferLineNum], "*EOF*")==0)
    			{
    				bufferEnd=1;
    				pthread_mutex_unlock(fillCount);
    				break;
    			}
    
    
    			pthread_mutex_lock( mutex );
    
    			fprintf(output,"%s", buffer[readFromBufferLineNum]);
    			
    			printf("reader: %d - %s", readFromBufferLineNum, buffer[readFromBufferLineNum]);
    			fflush(stdout);
    
    			buffer[readFromBufferLineNum] = NULL;
    
    			readFromBufferLineNum++;
    
    			pthread_mutex_unlock( mutex );
    
    			if(readFromBufferLineNum==16)
    				readFromBufferLineNum=0;
    
    
    		}
    		
    		semaphore_post(fillCount);
    
    	}
    
    	fclose(output);
    
    	return NULL;
    }
    
    /*adding direct implementations of semaphore methods as the given files
     *fail to compile 
     */
    
    semaphore_t *
    semaphore_create(char *semaphore_name)
    {
    	int fd;
    	semaphore_t *semap;
    	pthread_mutexattr_t psharedm;
    	pthread_condattr_t psharedc;
    
    
    	fd = open(semaphore_name, O_RDWR | O_CREAT | O_EXCL, 0666);
    	if (fd < 0)
    		return (NULL);
    	(void) ftruncate(fd, sizeof(semaphore_t));
    	(void) pthread_mutexattr_init(&psharedm);
    	(void) pthread_mutexattr_setpshared(&psharedm,
    			PTHREAD_PROCESS_SHARED);
    	(void) pthread_condattr_init(&psharedc);
    	(void) pthread_condattr_setpshared(&psharedc,
    			PTHREAD_PROCESS_SHARED);
    	semap = (semaphore_t *) mmap(NULL, sizeof(semaphore_t),
    			PROT_READ | PROT_WRITE, MAP_SHARED,
    			fd, 0);
    	close (fd);
    	(void) pthread_mutex_init(&semap->lock, &psharedm);
    	(void) pthread_cond_init(&semap->nonzero, &psharedc);
    	semap->count = 0;
    	return (semap);
    }
    
    
    semaphore_t *
    semaphore_open(char *semaphore_name)
    {
    	int fd;
    	semaphore_t *semap;
    
    	fd = open(semaphore_name, O_RDWR, 0666);
    
    	if (fd < 0)
    		return (NULL);
    	semap = (semaphore_t *) mmap(NULL, sizeof(semaphore_t),
    			PROT_READ | PROT_WRITE, MAP_SHARED,
    			fd, 0);
    
    	close (fd);
    	return (semap);
    }
    
    
    void
    semaphore_post(semaphore_t *semap)
    {
    	pthread_mutex_lock(&semap->lock);
    	if (semap->count == 0)
    		pthread_cond_signal(&semap->nonzero);
    	semap->count++;
    	pthread_mutex_unlock(&semap->lock);
    }
    
    
    void
    semaphore_wait(semaphore_t *semap)
    {
    
    	pthread_mutex_lock(&semap->lock);
    
    	while (&semap->count == 0)
    		pthread_cond_wait(&semap->nonzero, &semap->lock);
    
    	semap->count--;
    
    	pthread_mutex_unlock(&semap->lock);
    
    }
    
    
    void
    semaphore_close(semaphore_t *semap)
    {
    	munmap((void *) semap, sizeof(semaphore_t));
    }
    
    int exists(const char *fname)
    {
    	FILE *file;
    	if (file = fopen(fname, "r"))
    	{
    		fclose(file);
    		return 1;
    	}
    	return 0;
    }
    Last edited by davidelias16; 10-16-2010 at 10:19 AM. Reason: forgot the code brackets

  2. #2
    Registered User
    Join Date
    Dec 2007
    Posts
    2,675
    You also forgot the code TAGS!

    << !! Posting Code? Read this First !! >>

  3. #3
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    39,659
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  4. #4
    Registered User
    Join Date
    Oct 2010
    Posts
    2
    No? No one??? Is my question to long?

  5. #5
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    You didn't copy the code correctly. Yours has a bug that isn't in the original: http://www.opengroup.org/onlinepubs/...g_16_441_08_02

    The rest of your code doesn't look right either:
    None of your int globals are initialized.
    There's unsynchronized access to globals.
    There all calls to pthread_XXX functions using semaphore_t parameters.
    The use of semaphores doesn't make sense.


    Your "fillCount" semaphore represents the number of "lines" in buffer that can be consumed. So the consumer thread needs to wait on it before consuming, and the producer thread needs to post to it after producing.

    Your "emptyCount' semaphore represents the number of slots in buffer that can receive new "lines". So the producer thread needs to wait on it before producing, and the consumer thread needs to post to it after consuming.

    At startup, fillCount->count should be 0, and emptyCount->count should be 16.


    I would first get the code working with just a single producer and single consumer threads. With multiple consumers/producers, you will need to take care to synchronize access to shared variables like "writeToBuffferLineNum" and "readFromBufferLineNum".

    gg

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. Need Help: Multi-threading and Synchronization
    By Anom in forum C Programming
    Replies: 7
    Last Post: 12-08-2009, 05:34 PM
  2. Help explaining test questions
    By Sentral in forum General Discussions
    Replies: 26
    Last Post: 11-09-2009, 11:10 PM
  3. another trouble with semaphores
    By jsrig88 in forum C Programming
    Replies: 2
    Last Post: 11-05-2009, 05:33 AM
  4. Do I need to use semaphores for this problem?
    By Voondebah in forum C Programming
    Replies: 1
    Last Post: 04-20-2009, 02:50 PM
  5. Block and wake up certain threads
    By Spark in forum C Programming
    Replies: 9
    Last Post: 06-01-2002, 03:39 AM

Tags for this Thread