Thread: Need help on multithreading - receiving and storing data in parallel

  1. #1
    Registered User
    Join Date: Feb 2008
    Posts: 12

    I have been trying to learn POSIX threads programming for my application: receiving data from an external device and storing it in a log file, with both tasks running in parallel.

    I've succeeded in getting the two tasks to run simultaneously. I've used a binning approach to store the data, i.e. the read buffer is handed over to the write thread whenever the sample bin is full. However, I found a problem when I looked at the log file: there is a delay gap between reading the samples and writing the data to the file. I'm using fwrite() for the writing (which should be fast enough). I've also tried using memcpy() to copy the read buffer into a separate buffer for writing, but that did not solve the delay problem. I've used flags to "communicate" between the two threads.

    I'm new to C programming and rely mostly on the pthread examples available online. I'd greatly appreciate it if anyone could point out the mistakes I might have made, and whether there is a better or smarter way to use pthreads.

    Comments from robotics enthusiasts are especially welcome, as I believe this is a common issue when receiving and processing sensor data. My application is not related to robotics, but I'm hoping to receive and store the data at 7 kHz.

    Code:
    /*****************************************************
    * DESCRIPTION:
    *   This is a multi-thread example to read and write samples to an output
    *   file. There are two threads in this program: the readval and writeval threads.
    *
    *   The readval thread reads in samples from a device and when the number of
    *   samples reaches a specific size, it is flagged to the 2nd thread and the 
    *   existing data is copied to a new buffer before the 1st thread reads again.
    *
    *   The writeval thread writes the binary data to a log file.
    *******************************************************/
    
    // ---- Include Files ----------------------------------------------------
    #include <fcntl.h>
    #include <unistd.h>
    #include <sys/mman.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <math.h>
    #include <string.h>
    #include <errno.h>
    #include <time.h>
    #include <pthread.h>
    
    pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond_sample_limit = PTHREAD_COND_INITIALIZER;
    
    int data_receive(int b);
    void *readval(void *arg);
    void *writeval(void *arg);
    
    #define NUM_THREADS	2
    #define NCH		3
    #define NSAMPLES	5000
    
    char filename1[50];
    char filename2[50];
    
    int data[NSAMPLES][NCH];
    int data1[NSAMPLES][NCH];
    
    int flag = 1;
    int nCount = 0;
    int totCount = 0;
    
    //*************** Main Program ***********************
    
    int main()
    {
    	char *dirname ="/data/";
    	char *input_fname = "resultlog.log";
    	strcpy(filename1,dirname);
    	strcat(filename1,input_fname);
    
    
    	pthread_t threads[NUM_THREADS];
      
    	if (pthread_create(&threads[0], NULL, readval, NULL)) {
    	printf("Error creating reading_thread.\n"); abort();
    	}
    
    	if (pthread_create(&threads[1], NULL, writeval, NULL)) {
    	printf("Error creating writing_thread.\n"); abort();
    	}
    
    	// Join all threads after completion
    	int tmp;
    	for (tmp = 0; tmp < NUM_THREADS; tmp++) {
    	pthread_join(threads[tmp], NULL);
    	}
    
    	pthread_mutex_destroy(&mymutex);
    	pthread_cond_destroy(&cond_sample_limit);
    
    	fflush(stdout);
    	return 0;
    }
    // ================== Reading Values ==================
    
    void *readval(void *arg)
    {
    	int ch,t;
    
    	time_t	prevTime;
    	time_t	endTime;
    	prevTime = time( NULL );
    	while ( time( NULL ) == prevTime ) {;}
    	endTime = prevTime + 50;
    
    	while (time( NULL ) <= endTime) {
    
    		// receiving samples from external device
    		for (ch=0; ch < NCH; ch++) {
    			data1[nCount][ch] = data_receive(channel[ch]);
    		}
    		nCount++;
    		
    		// start to bin the samples and proceed to 2nd thread when nCount==5000
    		if (nCount == NSAMPLES) {
    
    			memcpy(data, data1, NSAMPLES*NCH *sizeof(int));
    			nCount=0;
    			flag=2;
    			pthread_mutex_lock(&mymutex);
    			pthread_cond_broadcast(&cond_sample_limit);
    			pthread_mutex_unlock(&mymutex);
    		}
    		
    	totCount++;
    	}
    
    	// Last sample bin
    	pthread_mutex_lock(&mymutex);
    	flag=3;
    	pthread_cond_broadcast(&cond_sample_limit);
    	pthread_mutex_unlock(&mymutex);
    
    	pthread_exit(NULL);
    	fflush(stdout);
    
    }
    
    // ============ Writing Binaries to File ================
    
    void *writeval(void *arg) 
    {
    	size_t obj_size=sizeof(int);
    	size_t obj_cnt=sizeof(data)/sizeof(int);
    
    	FILE *p_file1;
    	p_file1=fopen(filename1,"a");
    	pthread_mutex_lock(&mymutex);
    
    	// writing data to file
    	while (flag!=0) {
    		pthread_cond_wait(&cond_sample_limit, &mymutex);
    	
    		if (flag==3) {
    			obj_cnt=nCount*NCH;		
    			flag = 0;
    		}
    		fwrite(&data, obj_size,obj_cnt, p_file1);
    
    	}
    	fflush(stdout);
    	pthread_mutex_unlock(&mymutex);
    	pthread_exit(NULL);
    	fclose(p_file1);
    }

  2. #2
    Salem (and the hat of int overfl)
    Join Date: Aug 2001
    Location: The edge of the known universe
    Posts: 39,660

    > there is a delay gap between reading the samples and writing the data to the file.
    What do you mean by "gap"?
    A gap in time, like a delay between when you thought the file was written and when you saw the file change on disk?

    Some comments.
    1. Anything after a pthread_exit() call isn't going to happen. So if you've got files to close, do so beforehand.
    2. Try fflush(p_file1) immediately after the fwrite() calls.
    3. What are the fflush(stdout) calls doing? There seems to be no stdout activity.
    4. That global "flag" is horrible.
    5. Similarly, it seems that nCount is incremented and read outside of mutex protection.

    Code:
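    #define SIZE (NSAMPLES * NCH)   /* assumed: one full bin of ints */

    struct buff {
        int count;        /* how many ints in data[] are valid */
        int data[SIZE];
    };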
    If you had two of these, you could swap them over and preserve a meaningful count for each buffer.

  3. #3
    Registered User
    Join Date: Feb 2008
    Posts: 12

    Hi Salem,

    Thanks for your comments.

    Yes, there seems to be a time delay between when the data is in the buffer and when it is actually written to the file. I simulated some data (values incrementing with time) in data_receive(), and while the program receives and stores the data, some values are missing whenever the bin is full (i.e. when nCount==5000 in readval()).

    I've tried to implement your suggestions, but the delay doesn't seem to have improved.

    The changes I've made include:
    Code:
    void *readval(void *arg)
    {
    .
    .
    	pthread_mutex_lock(&mymutex);
    	if (nCount == NSAMPLES) {
    		memcpy(data, data1, NSAMPLES*NCH *sizeof(int));
    		nCount=0;
    		flag=2;
    		pthread_cond_broadcast(&cond_sample_limit);
    	}
    	else {
    		nCount++;
    	}
    	pthread_mutex_unlock(&mymutex);

    	totCount++;
    	}

    	fflush(stdout);
    	pthread_exit(NULL);
    }
    Code:
    void *writeval(void *arg) 
    {
    	size_t obj_size=sizeof(int);
    	size_t obj_cnt=sizeof(data)/sizeof(int);
    
    	FILE *p_file1;
    	p_file1=fopen(filename1,"a");
    	pthread_mutex_lock(&mymutex);
    
    	// writing data to file
    	while (flag!=0) {
    		pthread_cond_wait(&cond_sample_limit, &mymutex);
    	
    		if (flag==3) {
    			obj_cnt=nCount*NCH;		
    			flag = 0;
    		}
    		fwrite(&data, obj_size,obj_cnt, p_file1);
    		fflush(p_file1);

    	}
    	fclose(p_file1);
    	pthread_mutex_unlock(&mymutex);
    	pthread_exit(NULL);
    }
    However, I don't understand the struct buff idea. Can you tell me how I could implement it in my program?

    I'm sorry if this is asking too much; I'm really new to programming, and especially to multithreaded programming.

    Thanks for your help.

    Peck

  4. #4
    Codeplug (Registered User)
    Join Date: Mar 2003
    Posts: 4,981

    Have you already written a single-threaded version of this app and determined that multiple threads were needed? Or are you just interested in learning MT programming?

    What you have now is no faster than a single-threaded version of the same app.

    The main bottleneck is the disk, so the best you can do is to do "work" while the disk is off doing its work. Since your "work" is only receiving data from a device, only one "work" thread is needed (assuming only one thread can read from the device at a time). If you need to process the data in some way before writing it to the disk, then that may be something that can be done in parallel to take advantage of multiple cores. In the end, you want to be doing your work at the same time the disk is doing its work. You also have to think about how you handle data coming in faster than you can write it. The strategy you use can be very simple or complex.

    In this application, reading and writing never overlap, because mymutex is held during the writing/flushing and the reader thread cannot continue looping until it acquires mymutex.

    Salem suggested a simple strategy that lets you read and write at the same time. Use 2 buffers (like you have now) *and* two counts, one for each buffer. While one thread is reading into one buffer, the other thread can be writing out the other buffer. At each synchronization point, the threads swap buffer pointers and carry on.

    This simple strategy lets you read as fast as you can write. The drawback is that the reading bandwidth is limited by the writing bandwidth.

    gg
