Thread: Help with threading

  1. #1
    Registered User
    Join Date
    Apr 2005
    Posts
    10

    Help with threading

    I'm working on a 1-to-many consumer/producers problem. The trouble is when I execute more than one producer to connect to the consumer, the other producer that was already connected isn't finishing its execution. I was looking for some suggestions.

    consumer.c
    Code:
    /*
     * PJ
     * usage: ./consumer port
     */
    
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <fcntl.h>
    #include <netinet/in.h>
    #include <unistd.h>
    #include <pthread.h>
    
    #define BUFFER_SIZE 2
    
    typedef struct ITEM ITEM;
    
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t items_are_available = PTHREAD_COND_INITIALIZER;
    pthread_cond_t space_is_available = PTHREAD_COND_INITIALIZER;
    pthread_cond_t buffer_is_empty = PTHREAD_COND_INITIALIZER;
    
    struct BUFFER buffer; //global buffer
    
    struct ITEM
    {
    	int prod_id;
    	int seq_num;
    };
    
    struct BUFFER
    {
    	int buffer_size;
    	ITEM *buffer;
    	//put index: next producer puts item here.
    	//This slot is available.
    	int p_index;
    	//consumer index: next consumer gets item here.
    	//This slot is occupied.
    	int g_index;
    	//number of items currently stored in buffer
    	int count;	
    };
    
    void BUFFER_init(struct BUFFER *buffer_p)
    {
    	buffer_p->p_index = 0;
    	buffer_p->g_index = 0;
    	buffer_p->buffer = (ITEM *) calloc(sizeof(ITEM), BUFFER_SIZE);
    	buffer_p->count = 0;
    	buffer_p->buffer_size = BUFFER_SIZE;
    }
    
    void BUFFER_destroy(struct BUFFER *buffer_p)
    {
    	free(buffer_p->buffer);
    }
    
    int BUFFER_is_empty(struct BUFFER *buffer_p)
    {
    	return (buffer_p->count == 0);
    }
    
    int BUFFER_is_full(struct BUFFER *buffer_p)
    {
    	return (buffer_p->count == buffer_p->buffer_size);
    }
    
    void BUFFER_put(struct BUFFER *buffer_p, ITEM *it)
    {
    	pthread_mutex_lock(&mutex);
    	while(BUFFER_is_full(buffer_p))
    		pthread_cond_wait(&space_is_available, &mutex);
    
    	buffer_p->buffer[buffer_p->p_index] = *it;
    
    	// if next index is going to overflow, loop to the front of the buffer
    	buffer_p->p_index+1 >= buffer_p->buffer_size ? buffer_p->p_index = 0 : buffer_p->p_index++;
    	
    	// buffer isn't empty anymore, let consumer know
    	if(buffer_p->count == 0)
    		pthread_cond_signal(&items_are_available);
    	buffer_p->count++;
    
    	fprintf(stdout, "producer %d: %d\n", it->prod_id, it->seq_num);
    	pthread_mutex_unlock(&mutex);
    }
    
    ITEM BUFFER_get(struct BUFFER *buffer_p)
    {
    	pthread_mutex_lock(&mutex);
    	while(BUFFER_is_empty(buffer_p))
    		pthread_cond_wait(&items_are_available, &mutex);
    
    	ITEM it = buffer_p->buffer[buffer_p->g_index];
    
    	// if next index is going to overflow, loop to the front of the buffer
    	buffer_p->g_index+1 >= buffer_p->buffer_size ? buffer_p->g_index = 0 : buffer_p->g_index++;
    	
    	// buffer isn't full anymore, let producers know
    	if(buffer_p->count >= buffer_p->buffer_size)
    		pthread_cond_broadcast(&space_is_available);
    	buffer_p->count--;
    	
    	fprintf(stdout, "c%d: %d\n", it.prod_id, it.seq_num);
    	pthread_mutex_unlock(&mutex);
    	
    	return it;
    }
    
    int create_tcp_endpoint(char *port)
    {
    	int sock;
    	struct sockaddr_in server;
    
    	sock = socket(AF_INET, SOCK_STREAM, 0);
    	if(sock == -1){
    		perror("Socket error");
    		exit(1);
    	}
    	server.sin_family = AF_INET;
    	server.sin_addr.s_addr = htonl(INADDR_ANY);
    	server.sin_port = htons(atoi(port));
    	if(bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0){
    		perror("Bind error");
    		exit(1);
    	}
    	if(listen(sock, 5) == -1){
    	        perror("Listen error");
    	        exit(1);
    	}
    					        
    	printf("Server is running and listening...\n");
    	return sock;
    }
    
    void *producer(void *arg)
    {
    	int *fd = (int *)arg;
    	int n;
    	ITEM it;
    	
    	/* Collect item from producer */
    	while(n = read(*fd, &it, sizeof it)){
    		
    		if(n == -1){
    			perror("Producer read error");
    			exit(1);
    		}
    
    		/* put item to buffer */		
    		BUFFER_put(&buffer, &it);
    	}
    	close(*fd);
    }
    
    void *consumer(void *arg)
    {
    	while(1)
    		 BUFFER_get(&buffer);
    	return NULL;
    }
    
    int main(int argc, char **argv)
    {
    	int sock;
    	pthread_t cid;
    	
    	if(--argc != 1){
    		fprintf(stderr, "./consumer port\n");
    		return -1;
    	}
    
    	/* Initialize global buffer */
    	BUFFER_init(&buffer);
    
    	/* Start consumer thread */
    	pthread_create(&cid, NULL, consumer, NULL);
    
    	/* Create an endpoint to listen on */
    	sock = create_tcp_endpoint(argv[1]);
    
    	/* Enter our main service loop */
    	while (1) {
    		int sock_fd;
    		pthread_t pid;
    		
    		/* Get a connection from a client */
    		if((sock_fd = accept(sock, NULL, NULL)) == -1){
    			perror("Accept error");
    			exit(1);
    	        }
    
    		/* spawn thread for producer */
    		pthread_create(&pid, NULL, producer, &sock_fd);
            }
    
    	BUFFER_DESTROY(&buffer);	
    	return 0;
    }
    producer.c
    Code:
    /*
     * PJ Hyett
     * usage: ./producer prod-id num-of-items max-delay server-ip-addr server-port
     */
    
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <fcntl.h>
    #include <netdb.h>
    
    typedef struct ITEM ITEM;
    
    struct ITEM
    {
    	int prod_id;
    	int seq_num;
    };
    
    int tcp_connect(char *host, char *port)
    {
    	struct sockaddr_in  server;
    	int sock;
    
    	/* Create a socket */    
    	sock = socket(AF_INET, SOCK_STREAM, 0);
    	if(sock == -1){
    		perror("Socket error");
    		exit(1);
    	}
    
    	/* Copy these into a socket address */
    	memset(&server, 0, sizeof(struct sockaddr_in));
    	server.sin_family = AF_INET;
    	server.sin_family = AF_INET;
    	server.sin_port = htons(atoi(port));
    	server.sin_addr.s_addr = inet_addr(host);
    
    	/* Now make the connection */
    	if((connect(sock, (struct sockaddr *)&server, sizeof server)) == -1){
    	        perror("Connect error");
    	        exit(1);
    	}
    						    
    	return sock;
    }
    
    int main(int argc, char **argv){
    
    	int sock_fd, id, num_items, max_delay;
    	
    	if(--argc != 5){
    		fprintf(stderr, "usage: ./producer prod-id num-of-items max-delay server-ip-addr server-port\n");
    		return -1;
    	}
    
    	/* Convert args to ints */
    	id = atoi(argv[1]);
    	num_items = atoi(argv[2]);
    	max_delay = atoi(argv[3]);
    
    	/* Connect to server */
    	sock_fd = tcp_connect(argv[4],argv[5]);
    	printf("Connected to server.\n");
    	
    	printf("Producing items.\n");
    	int k;
    	for(k = 1; k <= num_items; k++)
    	{
    		/* Producing item */
    		sleep((rand() % max_delay));
    		ITEM it;
    		it.prod_id = id;
    		it.seq_num = k;
    		
    		/* Pass item to server */
    		write(sock_fd, &it, sizeof it);
    	}
    	
    	/* close connection */
    	close(sock_fd);
    	printf("Connection closed.\n");
    
    	return 0;
    }

  2. #2
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    >> pthread_create(&pid, NULL, producer, &sock_fd);
    ...
    >> int *fd = (int *)arg;

    That's not good, since sock_fd is a stack variable that, technically, isn't valid once the while loop loops after pthread_create() returns. What's probably going on is that all producer threads have a pointer to the same stack variable in main().

    I would do something like this to pass sock_fd "by value":

    >> pthread_create(&pid, NULL, producer, (void*)sock_fd);
    ...
    >> int fd = (int)arg;

    Also, the parameters to calloc() are reversed, although it shouldn't matter.

    gg
    Last edited by Codeplug; 06-02-2005 at 04:46 PM.

  3. #3
    Registered User
    Join Date
    Apr 2005
    Posts
    10
    That did the trick, thanks a bunch. I'll chalk that up to my C inexperience, I wasn't even aware you could do that.

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. c++ threading
    By Anddos in forum C++ Programming
    Replies: 4
    Last Post: 12-28-2005, 03:29 PM
  2. C++ Threading?
    By draggy in forum C++ Programming
    Replies: 5
    Last Post: 08-16-2005, 12:16 PM
  3. Problem with threading
    By osal in forum Windows Programming
    Replies: 6
    Last Post: 07-21-2004, 12:41 PM
  4. starting to learn multi threading
    By hanhao in forum C++ Programming
    Replies: 2
    Last Post: 06-09-2004, 01:44 PM
  5. Threading
    By threads in forum C# Programming
    Replies: 0
    Last Post: 01-17-2003, 11:50 PM