Need Help: Multi-threading and Synchronization
Everything in this program works perfectly except for one thing that I cannot figure out how to fix. I've spent hours rearranging code without success.
There are 10 text files filled with random characters in a directory. Each file's contents are read into a global buffer by 'count' thread(s). Once the buffer is full, the 'count' thread(s) lock and the 'stats' thread stores the data then resets the buffer. There can be 1-10 threads working of the 10 files.
Here is what is going wrong:
When 1 thread is used, it counts 10 files worth of data.
When 2 threads are used, it counts 9 files worth of data.
When 3 threads are used, it counts 8 files worth of data.
When 4 threads are used, it counts 7 files worth of data.
When 5 threads are used, it counts 6 files worth of data.
When 6 threads are used, it counts 5 files worth of data.
When 7 threads are used, it counts 4 files worth of data.
When 8 threads are used, it counts 3 files worth of data.
When 9 threads are used, it counts 2 files worth of data.
When 10 threads are used, it counts 1 file worth of data.
See the pattern?
1 thread - 10 files
2 threads - 9 files
3 threads - 8 files
4 threads - 7 files
5 threads- 6 files
6 threads - 5 files
7 threads - 4 files
8 threads - 3 files
9 threads - 2 files
10 threads - 1 file
All 10 files are supposed to be read, regardless of how many 'count' threads are used.
I believe the program is being stopped prematurely by something in the 'count' thread before the threads can have their file's data read by 'stats'
Here is the output when 10 threads are reading from 10 files:
Code:
Stats locked
buffer full
Stats locked
buffer full <- Everything in red is the first count thread and the stats thread
Stats locked going back and forth filling the buffer then reading the data
buffer full
Stats locked
Counter: 1 <- the first thread is truly done at this point and is correct
Thread dead
Stats locked
Counter: 2 <- threads 2-10 are being stopped prematurely, without their
Thread dead corresponding file contents being counted by stats
Stats locked
Counter: 3
Thread dead
Stats locked
Counter: 4
Thread dead
Stats locked
Counter: 5
Thread dead
Stats locked
Counter: 6
Thread dead
Stats locked
Counter: 7
Thread dead
Stats locked
Counter: 8
Thread dead
Stats locked
Counter: 9
Thread dead
Stats locked
Counter: 10
Thread dead
Here is the source code:
Code:
/*
This program will use multi-threading and synchronization to have 'count' threads read in characters
from the provided text files into a buffer. Once the 'count' threads have completed their run, the
stats thread will count the characters from the buffer, and then reset the buffer. Once the counting
is finished, the program will output the count of each character.
compile with: gcc sync.c -lpthread
*/
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <inttypes.h>
//Declarations
FILE *file;
char *buffer;
volatile int x, counter, filenum, comp, done = 0;
pthread_cond_t unlock_stats;
pthread_cond_t unlock_main;
pthread_cond_t unlock_counts;
pthread_mutex_t mutex1;
pthread_mutex_t mutex2;
//Beginning of count threads
void *count (void *arg)
{
int k;
char c;
char *name = (char *) malloc(2);
printf("counts\n");
pthread_mutex_lock(&mutex1);
while(counter!=10)
{
if (filenum==10)
{
break;
}
else
{
filenum++;
sprintf(name, "%i", filenum);
name = strcat(name, ".txt");
file = fopen(name, "r");
if(file==NULL)
{
printf("Error: can't open file.\n");
}
printf("Filenum: %i\n", filenum);
}
pthread_mutex_unlock(&mutex1);
while(1)
{
pthread_mutex_lock(&mutex1);
for (k=0;k<800;k++)
{
c = fgetc(file);
if(c!=EOF)
{
if (c!='\n')
{
if (comp == 7999)
{
pthread_cond_signal(&unlock_stats);
printf("buffer full\n");
pthread_cond_wait(&unlock_counts, &mutex1);
}
else
{
buffer[x] = c;
x++;
comp++;
}
}
}
else
{
counter++;
pthread_cond_signal(&unlock_stats);
break;
}
}
pthread_mutex_unlock(&mutex1);
if(c==EOF)
{
break;
}
}
int fclose(FILE *file);
}
printf("Counter: %i\n", counter);
printf("Thread dead\n");
}
//Beginning of stats thread
void *stats (void *arg)
{
printf("stats created");
int z;
int count[26];
for (z=0;z<26;z++)
{
count[z]=0;
}
pthread_mutex_lock(&mutex1);
while(counter!=10)
{
printf("Stats locked\n");
pthread_cond_wait(&unlock_stats, &mutex1);
x=0;
comp=0;
for (z=0; z<8000; z++)
{
count[ buffer[z] - 'a']++;
}
pthread_cond_signal(&unlock_main);
}
pthread_mutex_unlock(&mutex1);
//Print the stored data
printf("\nA is %i\n", count[0]);
printf("B is %i\n", count[1]);
printf("C is %i\n", count[2]);
printf("D is %i\n", count[3]);
printf("E is %i\n", count[4]);
printf("F is %i\n", count[5]);
printf("G is %i\n", count[6]);
printf("H is %i\n", count[7]);
printf("I is %i\n", count[8]);
printf("J is %i\n", count[9]);
printf("K is %i\n", count[10]);
printf("L is %i\n", count[11]);
printf("M is %i\n", count[12]);
printf("N is %i\n", count[13]);
printf("O is %i\n", count[14]);
printf("P is %i\n", count[15]);
printf("Q is %i\n", count[16]);
printf("R is %i\n", count[17]);
printf("S is %i\n", count[18]);
printf("T is %i\n", count[19]);
printf("U is %i\n", count[20]);
printf("V is %i\n", count[21]);
printf("W is %i\n", count[22]);
printf("X is %i\n", count[23]);
printf("Y is %i\n", count[24]);
printf("Z is %i\n", count[25]);
done = 1;
}
//Beginning of main function
int main()
{
int rc, i, status, num;
printf("Enter the number of threads, between 1 and 10:\n");
scanf("%i", &num);
if ((num>10) || (num<1))
{
printf("Invalid number.\n\n");
main();
}
pthread_t threads[num+1];
//Initialize mutex and conditions
pthread_mutex_init(&mutex1, NULL);
pthread_mutex_init(&mutex2, NULL);
pthread_cond_init(&unlock_stats, NULL);
pthread_cond_init(&unlock_main, NULL);
pthread_cond_init(&unlock_counts, NULL);
buffer = (char *) malloc (8000);
//Create the threads
rc = pthread_create(&threads[0], NULL, stats, NULL);
if (rc) printf("BAD\n");
for (i=1; i<(num+1); i++)
{
rc = pthread_create(&threads[i], NULL, count, NULL);
if (rc) printf("BAD\n");
}
pthread_mutex_lock(&mutex1);
while(counter!=10)
{
pthread_cond_wait(&unlock_main, &mutex1);
pthread_cond_signal(&unlock_counts);
}
pthread_mutex_unlock(&mutex1);
while(done!=1){}
printf("Done.\n");
pthread_mutex_destroy(&mutex1);
pthread_mutex_destroy(&mutex2);
pthread_join(threads[num+1], NULL);
}
Finding concurrency and synchronization bugs
In the future, the Jinx hypervisor might be helpful here. The current beta version is free.