Code:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <pthread.h>
#include "t_semaphore.h"
/* Thread entry points: file_to_buffer copies lines from `input` into
 * `buffer`; buffer_to_file copies lines from `buffer` into `output`.
 * Neither uses its argument. */
void *file_to_buffer( void *ptr );
void *buffer_to_file( void *ptr );
/* Returns 1 when fname can be opened for reading, 0 otherwise. */
int exists(const char *fname);
/* Shared state, zero-initialised as file-scope objects:
 *   writeToBuffferLineNum - next slot of `buffer` a writer fills
 *   readFromBufferLineNum - next slot of `buffer` a reader drains
 *   fileEnd               - set to 1 by a writer once the input is exhausted
 *   bufferEnd             - set to 1 by a reader once it sees the "*EOF*" slot
 *   currReaderIndex       - not referenced in the visible code
 *   p                     - loop index used by main */
int writeToBuffferLineNum, readFromBufferLineNum, fileEnd, bufferEnd,
currReaderIndex, p;
/* File-backed semaphores created in main ("mutex.sem", "fillCount.sem",
 * "emptyCount.sem"). */
semaphore_t *mutex;
semaphore_t *fillCount;
semaphore_t *emptyCount;
/* Array of 16 line slots allocated in main; a NULL slot is empty. */
char **buffer;
/* Input file named on the command line and the "output.dat" result file. */
FILE *input;
FILE *output;
int main(int argc, char *argv[])
{
if ( argc != 2 ) /* argc should be 2 for correct execution */
{
printf( "usage: %s filename\n", argv[0] );
}
else
{
pthread_t thread1, thread2, thread3, thread4, thread5, thread6;
buffer = malloc(16*(sizeof(char *)));
if(exists("mutex.sem")==1)
remove("mutex.sem");
if(exists("fillCount.sem")==1)
remove("fillCount.sem");
if(exists("emptyCount.sem")==1)
remove("emptyCount.sem");
mutex = semaphore_create("mutex.sem");
mutex = semaphore_open("mutex.sem");
fillCount = semaphore_create("fillCount.sem");
fillCount = semaphore_open("fillCount.sem");
emptyCount = semaphore_create("emptyCount.sem");
emptyCount = semaphore_open("emptyCount.sem");
mutex->count = 1;
fillCount->count = 3;
emptyCount->count = 3;
for(p=0;p<16;p++)
{
buffer[p] = NULL;
}
input = fopen (argv[1], "r+");
if(exists("output.dat")==1)
remove("output.dat");
output = fopen("output.dat","a+");
/*Create independent threads each of which will execute function */
pthread_create( &thread1, NULL, file_to_buffer, NULL);
pthread_create( &thread4, NULL, buffer_to_file, NULL);
pthread_create( &thread2, NULL, file_to_buffer, NULL);
pthread_create( &thread5, NULL, buffer_to_file, NULL);
pthread_create( &thread3, NULL, file_to_buffer, NULL);
pthread_create( &thread6, NULL, buffer_to_file, NULL);
/* Wait till threads are complete before main continues. Unless we */
/* wait we run the risk of executing an exit which will terminate */
/* the process and all threads before the threads have completed. */
pthread_join( thread1, NULL);
pthread_join( thread4, NULL);
pthread_join( thread2, NULL);
pthread_join( thread3, NULL);
pthread_join( thread5, NULL);
pthread_join( thread6, NULL);
}
exit(0);
}
/*
 * Writer thread: read lines from `input` and store a heap copy of each in
 * the next free slot of `buffer`.
 *
 * ptr is unused. Always returns NULL. When the input is exhausted the
 * string literal "*EOF*" is stored in the next free slot and fileEnd is set
 * so that every writer stops. Exits the process if `input` was never opened
 * or a line copy cannot be allocated.
 *
 * All access to the shared indices, slots and `input` happens while holding
 * mutex->lock, so the slot check, the read and the store form one step.
 */
void *file_to_buffer( void *ptr )
{
    enum { SLOT_COUNT = 16, LINE_CAPACITY = 1024 };
    static int input_closed = 0;    /* guarded by mutex->lock */
    char inputline[LINE_CAPACITY];
    int finished = 0;

    (void) ptr;
    if (input == NULL) {
        printf("file not found!\n");
        exit(EXIT_FAILURE);
    }

    while (!finished) {
        semaphore_wait(emptyCount);
        pthread_mutex_lock(&mutex->lock);
        if (fileEnd) {
            finished = 1;
        } else if (buffer[writeToBuffferLineNum] == NULL) {
            /* Test fgets itself rather than feof afterwards, so a final
             * line without a trailing newline is still delivered. */
            if (fgets(inputline, sizeof inputline, input) == NULL) {
                fileEnd = 1;
                buffer[writeToBuffferLineNum] = "*EOF*";
                finished = 1;
            } else {
                size_t len = strlen(inputline) + 1;
                char *copy = malloc(len);

                if (copy == NULL) {
                    perror("malloc");
                    exit(EXIT_FAILURE);
                }
                memcpy(copy, inputline, len);
                buffer[writeToBuffferLineNum] = copy;
                printf("writer: %d - %s", writeToBuffferLineNum, copy);
                fflush(stdout);
                /* Wrap inside the critical section so no thread ever
                 * observes an index equal to SLOT_COUNT. */
                writeToBuffferLineNum = (writeToBuffferLineNum + 1) % SLOT_COUNT;
            }
        }
        /* A still-occupied slot falls through here and is retried. */
        pthread_mutex_unlock(&mutex->lock);
        semaphore_post(emptyCount);
    }

    /* Every writer reaches this point; only the first one closes the file.
     * No writer reads from it again because fileEnd is already set. */
    pthread_mutex_lock(&mutex->lock);
    if (!input_closed) {
        fclose(input);
        input_closed = 1;
    }
    pthread_mutex_unlock(&mutex->lock);
    return NULL;
}
/*
 * Reader thread: take lines out of `buffer` in slot order, append each to
 * `output`, echo it to stdout and release its storage.
 *
 * ptr is unused. Always returns NULL. Stops when the next slot holds the
 * "*EOF*" marker (which is left in place so the other readers see it too)
 * or when another reader has already set bufferEnd. Exits the process if
 * `output` was never opened.
 *
 * The slot is examined and consumed while holding mutex->lock, so two
 * readers can never act on the same slot or on a stale index.
 */
void *buffer_to_file( void *ptr )
{
    enum { SLOT_COUNT = 16 };
    static int output_closed = 0;   /* guarded by mutex->lock */
    int finished = 0;

    (void) ptr;
    if (output == NULL) {
        fprintf(stderr, "output file is not open\n");
        exit(EXIT_FAILURE);
    }

    while (!finished) {
        semaphore_wait(fillCount);
        pthread_mutex_lock(&mutex->lock);
        if (bufferEnd) {
            finished = 1;
        } else {
            char *line = buffer[readFromBufferLineNum];

            if (line != NULL) {
                if (strcmp(line, "*EOF*") == 0) {
                    bufferEnd = 1;
                    finished = 1;
                } else {
                    fprintf(output, "%s", line);
                    printf("reader: %d - %s", readFromBufferLineNum, line);
                    fflush(stdout);
                    /* Lines are heap copies made by the writer; the marker
                     * is a string literal and never reaches this branch. */
                    free(line);
                    buffer[readFromBufferLineNum] = NULL;
                    readFromBufferLineNum = (readFromBufferLineNum + 1) % SLOT_COUNT;
                }
            }
        }
        /* An empty slot falls through here and is retried. */
        pthread_mutex_unlock(&mutex->lock);
        semaphore_post(fillCount);
    }

    /* Every reader reaches this point; only the first one closes the file.
     * No reader writes to it again because bufferEnd is already set. */
    pthread_mutex_lock(&mutex->lock);
    if (!output_closed) {
        fclose(output);
        output_closed = 1;
    }
    pthread_mutex_unlock(&mutex->lock);
    return NULL;
}
/* Direct implementations of the semaphore functions are added here
 * because the provided files fail to compile.
 */
/*
 * Create a new semaphore backed by the file `semaphore_name`.
 *
 * The file is created exclusively (an existing file makes the call fail),
 * sized to hold one semaphore_t and mapped shared. The embedded mutex and
 * condition variable are initialised process-shared and the count starts
 * at 0.
 *
 * Returns the mapped semaphore, or NULL if any step fails. On failure after
 * the file was created, the file is left on disk.
 */
semaphore_t *
semaphore_create(char *semaphore_name)
{
    int fd;
    semaphore_t *semap;
    pthread_mutexattr_t psharedm;
    pthread_condattr_t psharedc;

    fd = open(semaphore_name, O_RDWR | O_CREAT | O_EXCL, 0666);
    if (fd < 0)
        return (NULL);
    if (ftruncate(fd, sizeof(semaphore_t)) != 0) {
        close(fd);
        return (NULL);
    }
    semap = mmap(NULL, sizeof(semaphore_t),
                 PROT_READ | PROT_WRITE, MAP_SHARED,
                 fd, 0);
    /* The mapping stays valid after the descriptor is closed. */
    close(fd);
    /* mmap reports failure with MAP_FAILED, not NULL. */
    if (semap == MAP_FAILED)
        return (NULL);

    if (pthread_mutexattr_init(&psharedm) != 0)
        goto fail_unmap;
    if (pthread_mutexattr_setpshared(&psharedm, PTHREAD_PROCESS_SHARED) != 0 ||
        pthread_mutex_init(&semap->lock, &psharedm) != 0) {
        pthread_mutexattr_destroy(&psharedm);
        goto fail_unmap;
    }
    pthread_mutexattr_destroy(&psharedm);

    if (pthread_condattr_init(&psharedc) != 0)
        goto fail_mutex;
    if (pthread_condattr_setpshared(&psharedc, PTHREAD_PROCESS_SHARED) != 0 ||
        pthread_cond_init(&semap->nonzero, &psharedc) != 0) {
        pthread_condattr_destroy(&psharedc);
        goto fail_mutex;
    }
    pthread_condattr_destroy(&psharedc);

    semap->count = 0;
    return (semap);

fail_mutex:
    pthread_mutex_destroy(&semap->lock);
fail_unmap:
    munmap(semap, sizeof(semaphore_t));
    return (NULL);
}
/*
 * Map an existing semaphore backed by the file `semaphore_name`.
 *
 * The semaphore is not initialised here; the file is only opened
 * read/write and mapped shared.
 *
 * Returns the mapped semaphore, or NULL if the file cannot be opened or
 * mapped.
 */
semaphore_t *
semaphore_open(char *semaphore_name)
{
    int fd;
    semaphore_t *semap;

    /* No mode argument: it is only consulted together with O_CREAT. */
    fd = open(semaphore_name, O_RDWR);
    if (fd < 0)
        return (NULL);
    semap = mmap(NULL, sizeof(semaphore_t),
                 PROT_READ | PROT_WRITE, MAP_SHARED,
                 fd, 0);
    close(fd);
    /* mmap reports failure with MAP_FAILED, not NULL; translate it so
     * callers can rely on a plain NULL check. */
    if (semap == MAP_FAILED)
        return (NULL);
    return (semap);
}
/*
 * Increment the semaphore count and wake one waiter, if any.
 *
 * The signal is sent on every post. Signalling only when the count was 0
 * loses a wakeup when two posts arrive before the first woken waiter has
 * run: the second post sees a non-zero count and a second waiter stays
 * asleep although the count is positive.
 */
void
semaphore_post(semaphore_t *semap)
{
    pthread_mutex_lock(&semap->lock);
    semap->count++;
    pthread_cond_signal(&semap->nonzero);
    pthread_mutex_unlock(&semap->lock);
}
/*
 * Block until the semaphore count is positive, then decrement it.
 *
 * The loop tests the count VALUE. Testing `&semap->count == 0` compares the
 * field's address with a null pointer, which is never true, so the call
 * would never block and the count would be decremented past zero.
 * The while loop also re-checks after each wakeup, which covers spurious
 * wakeups from pthread_cond_wait.
 */
void
semaphore_wait(semaphore_t *semap)
{
    pthread_mutex_lock(&semap->lock);
    while (semap->count == 0)
        pthread_cond_wait(&semap->nonzero, &semap->lock);
    semap->count--;
    pthread_mutex_unlock(&semap->lock);
}
/*
 * Unmap a semaphore previously mapped into this process.
 * The backing file is not touched and the result of munmap is ignored.
 */
void
semaphore_close(semaphore_t *semap)
{
    void *mapping = semap;

    munmap(mapping, sizeof *semap);
}
/*
 * Report whether `fname` can be opened for reading.
 *
 * Returns 1 when fopen(fname, "r") succeeds (the stream is closed again
 * straight away) and 0 otherwise.
 */
int exists(const char *fname)
{
    FILE *probe = fopen(fname, "r");

    if (probe == NULL)
        return 0;
    fclose(probe);
    return 1;
}