I'm working on a 1-to-many consumer/producers problem. The trouble is when I execute more than one producer to connect to the consumer, the other producer that was already connected isn't finishing its execution. I was looking for some suggestions.
consumer.c
producer.cCode:/* * PJ * usage: ./consumer port */ #include <stdio.h> #include <sys/types.h> #include <sys/socket.h> #include <fcntl.h> #include <netinet/in.h> #include <unistd.h> #include <pthread.h> #define BUFFER_SIZE 2 typedef struct ITEM ITEM; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t items_are_available = PTHREAD_COND_INITIALIZER; pthread_cond_t space_is_available = PTHREAD_COND_INITIALIZER; pthread_cond_t buffer_is_empty = PTHREAD_COND_INITIALIZER; struct BUFFER buffer; //global buffer struct ITEM { int prod_id; int seq_num; }; struct BUFFER { int buffer_size; ITEM *buffer; //put index: next producer puts item here. //This slot is available. int p_index; //consumer index: next consumer gets item here. //This slot is occupied. int g_index; //number of items currently stored in buffer int count; }; void BUFFER_init(struct BUFFER *buffer_p) { buffer_p->p_index = 0; buffer_p->g_index = 0; buffer_p->buffer = (ITEM *) calloc(sizeof(ITEM), BUFFER_SIZE); buffer_p->count = 0; buffer_p->buffer_size = BUFFER_SIZE; } void BUFFER_destroy(struct BUFFER *buffer_p) { free(buffer_p->buffer); } int BUFFER_is_empty(struct BUFFER *buffer_p) { return (buffer_p->count == 0); } int BUFFER_is_full(struct BUFFER *buffer_p) { return (buffer_p->count == buffer_p->buffer_size); } void BUFFER_put(struct BUFFER *buffer_p, ITEM *it) { pthread_mutex_lock(&mutex); while(BUFFER_is_full(buffer_p)) pthread_cond_wait(&space_is_available, &mutex); buffer_p->buffer[buffer_p->p_index] = *it; // if next index is going to overflow, loop to the front of the buffer buffer_p->p_index+1 >= buffer_p->buffer_size ? buffer_p->p_index = 0 : buffer_p->p_index++; // buffer isn't empty anymore, let consumer know if(buffer_p->count == 0) pthread_cond_signal(&items_are_available); buffer_p->count++; fprintf(stdout, "producer %d: %d\n", it->prod_id, it->seq_num); pthread_mutex_unlock(&mutex); } ITEM BUFFER_get(struct BUFFER *buffer_p) { pthread_mutex_lock(&mutex); while(BUFFER_is_empty(buffer_p)) pthread_cond_wait(&items_are_available, &mutex); ITEM it = buffer_p->buffer[buffer_p->g_index]; // if next index is going to overflow, loop to the front of the buffer buffer_p->g_index+1 >= buffer_p->buffer_size ? buffer_p->g_index = 0 : buffer_p->g_index++; // buffer isn't full anymore, let producers know if(buffer_p->count >= buffer_p->buffer_size) pthread_cond_broadcast(&space_is_available); buffer_p->count--; fprintf(stdout, "c%d: %d\n", it.prod_id, it.seq_num); pthread_mutex_unlock(&mutex); return it; } int create_tcp_endpoint(char *port) { int sock; struct sockaddr_in server; sock = socket(AF_INET, SOCK_STREAM, 0); if(sock == -1){ perror("Socket error"); exit(1); } server.sin_family = AF_INET; server.sin_addr.s_addr = htonl(INADDR_ANY); server.sin_port = htons(atoi(port)); if(bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0){ perror("Bind error"); exit(1); } if(listen(sock, 5) == -1){ perror("Listen error"); exit(1); } printf("Server is running and listening...\n"); return sock; } void *producer(void *arg) { int *fd = (int *)arg; int n; ITEM it; /* Collect item from producer */ while(n = read(*fd, &it, sizeof it)){ if(n == -1){ perror("Producer read error"); exit(1); } /* put item to buffer */ BUFFER_put(&buffer, &it); } close(*fd); } void *consumer(void *arg) { while(1) BUFFER_get(&buffer); return NULL; } int main(int argc, char **argv) { int sock; pthread_t cid; if(--argc != 1){ fprintf(stderr, "./consumer port\n"); return -1; } /* Initialize global buffer */ BUFFER_init(&buffer); /* Start consumer thread */ pthread_create(&cid, NULL, consumer, NULL); /* Create an endpoint to listen on */ sock = create_tcp_endpoint(argv[1]); /* Enter our main service loop */ while (1) { int sock_fd; pthread_t pid; /* Get a connection from a client */ if((sock_fd = accept(sock, NULL, NULL)) == -1){ perror("Accept error"); exit(1); } /* spawn thread for producer */ pthread_create(&pid, NULL, producer, &sock_fd); } BUFFER_DESTROY(&buffer); return 0; }
Code:/* * PJ Hyett * usage: ./producer prod-id num-of-items max-delay server-ip-addr server-port */ #include <stdio.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <fcntl.h> #include <netdb.h> typedef struct ITEM ITEM; struct ITEM { int prod_id; int seq_num; }; int tcp_connect(char *host, char *port) { struct sockaddr_in server; int sock; /* Create a socket */ sock = socket(AF_INET, SOCK_STREAM, 0); if(sock == -1){ perror("Socket error"); exit(1); } /* Copy these into a socket address */ memset(&server, 0, sizeof(struct sockaddr_in)); server.sin_family = AF_INET; server.sin_family = AF_INET; server.sin_port = htons(atoi(port)); server.sin_addr.s_addr = inet_addr(host); /* Now make the connection */ if((connect(sock, (struct sockaddr *)&server, sizeof server)) == -1){ perror("Connect error"); exit(1); } return sock; } int main(int argc, char **argv){ int sock_fd, id, num_items, max_delay; if(--argc != 5){ fprintf(stderr, "usage: ./producer prod-id num-of-items max-delay server-ip-addr server-port\n"); return -1; } /* Convert args to ints */ id = atoi(argv[1]); num_items = atoi(argv[2]); max_delay = atoi(argv[3]); /* Connect to server */ sock_fd = tcp_connect(argv[4],argv[5]); printf("Connected to server.\n"); printf("Producing items.\n"); int k; for(k = 1; k <= num_items; k++) { /* Producing item */ sleep((rand() % max_delay)); ITEM it; it.prod_id = id; it.seq_num = k; /* Pass item to server */ write(sock_fd, &it, sizeof it); } /* close connection */ close(sock_fd); printf("Connection closed.\n"); return 0; }



LinkBack URL
About LinkBacks


