I'm working on a 1-to-many consumer/producers problem. The trouble is when I execute more than one producer to connect to the consumer, the other producer that was already connected isn't finishing its execution. I was looking for some suggestions.
consumer.c
Code:
/*
* PJ
* usage: ./consumer port
*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <unistd.h>
#include <pthread.h>
#define BUFFER_SIZE 2
typedef struct ITEM ITEM;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t items_are_available = PTHREAD_COND_INITIALIZER;
pthread_cond_t space_is_available = PTHREAD_COND_INITIALIZER;
pthread_cond_t buffer_is_empty = PTHREAD_COND_INITIALIZER;
struct BUFFER buffer; //global buffer
struct ITEM
{
int prod_id;
int seq_num;
};
struct BUFFER
{
int buffer_size;
ITEM *buffer;
//put index: next producer puts item here.
//This slot is available.
int p_index;
//consumer index: next consumer gets item here.
//This slot is occupied.
int g_index;
//number of items currently stored in buffer
int count;
};
void BUFFER_init(struct BUFFER *buffer_p)
{
buffer_p->p_index = 0;
buffer_p->g_index = 0;
buffer_p->buffer = (ITEM *) calloc(sizeof(ITEM), BUFFER_SIZE);
buffer_p->count = 0;
buffer_p->buffer_size = BUFFER_SIZE;
}
void BUFFER_destroy(struct BUFFER *buffer_p)
{
free(buffer_p->buffer);
}
int BUFFER_is_empty(struct BUFFER *buffer_p)
{
return (buffer_p->count == 0);
}
int BUFFER_is_full(struct BUFFER *buffer_p)
{
return (buffer_p->count == buffer_p->buffer_size);
}
void BUFFER_put(struct BUFFER *buffer_p, ITEM *it)
{
pthread_mutex_lock(&mutex);
while(BUFFER_is_full(buffer_p))
pthread_cond_wait(&space_is_available, &mutex);
buffer_p->buffer[buffer_p->p_index] = *it;
// if next index is going to overflow, loop to the front of the buffer
buffer_p->p_index+1 >= buffer_p->buffer_size ? buffer_p->p_index = 0 : buffer_p->p_index++;
// buffer isn't empty anymore, let consumer know
if(buffer_p->count == 0)
pthread_cond_signal(&items_are_available);
buffer_p->count++;
fprintf(stdout, "producer %d: %d\n", it->prod_id, it->seq_num);
pthread_mutex_unlock(&mutex);
}
ITEM BUFFER_get(struct BUFFER *buffer_p)
{
pthread_mutex_lock(&mutex);
while(BUFFER_is_empty(buffer_p))
pthread_cond_wait(&items_are_available, &mutex);
ITEM it = buffer_p->buffer[buffer_p->g_index];
// if next index is going to overflow, loop to the front of the buffer
buffer_p->g_index+1 >= buffer_p->buffer_size ? buffer_p->g_index = 0 : buffer_p->g_index++;
// buffer isn't full anymore, let producers know
if(buffer_p->count >= buffer_p->buffer_size)
pthread_cond_broadcast(&space_is_available);
buffer_p->count--;
fprintf(stdout, "c%d: %d\n", it.prod_id, it.seq_num);
pthread_mutex_unlock(&mutex);
return it;
}
int create_tcp_endpoint(char *port)
{
int sock;
struct sockaddr_in server;
sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
perror("Socket error");
exit(1);
}
server.sin_family = AF_INET;
server.sin_addr.s_addr = htonl(INADDR_ANY);
server.sin_port = htons(atoi(port));
if(bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0){
perror("Bind error");
exit(1);
}
if(listen(sock, 5) == -1){
perror("Listen error");
exit(1);
}
printf("Server is running and listening...\n");
return sock;
}
void *producer(void *arg)
{
int *fd = (int *)arg;
int n;
ITEM it;
/* Collect item from producer */
while(n = read(*fd, &it, sizeof it)){
if(n == -1){
perror("Producer read error");
exit(1);
}
/* put item to buffer */
BUFFER_put(&buffer, &it);
}
close(*fd);
}
void *consumer(void *arg)
{
while(1)
BUFFER_get(&buffer);
return NULL;
}
int main(int argc, char **argv)
{
int sock;
pthread_t cid;
if(--argc != 1){
fprintf(stderr, "./consumer port\n");
return -1;
}
/* Initialize global buffer */
BUFFER_init(&buffer);
/* Start consumer thread */
pthread_create(&cid, NULL, consumer, NULL);
/* Create an endpoint to listen on */
sock = create_tcp_endpoint(argv[1]);
/* Enter our main service loop */
while (1) {
int sock_fd;
pthread_t pid;
/* Get a connection from a client */
if((sock_fd = accept(sock, NULL, NULL)) == -1){
perror("Accept error");
exit(1);
}
/* spawn thread for producer */
pthread_create(&pid, NULL, producer, &sock_fd);
}
BUFFER_DESTROY(&buffer);
return 0;
}
producer.c
Code:
/*
* PJ Hyett
* usage: ./producer prod-id num-of-items max-delay server-ip-addr server-port
*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <netdb.h>
typedef struct ITEM ITEM;
struct ITEM
{
int prod_id;
int seq_num;
};
int tcp_connect(char *host, char *port)
{
struct sockaddr_in server;
int sock;
/* Create a socket */
sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
perror("Socket error");
exit(1);
}
/* Copy these into a socket address */
memset(&server, 0, sizeof(struct sockaddr_in));
server.sin_family = AF_INET;
server.sin_family = AF_INET;
server.sin_port = htons(atoi(port));
server.sin_addr.s_addr = inet_addr(host);
/* Now make the connection */
if((connect(sock, (struct sockaddr *)&server, sizeof server)) == -1){
perror("Connect error");
exit(1);
}
return sock;
}
int main(int argc, char **argv){
int sock_fd, id, num_items, max_delay;
if(--argc != 5){
fprintf(stderr, "usage: ./producer prod-id num-of-items max-delay server-ip-addr server-port\n");
return -1;
}
/* Convert args to ints */
id = atoi(argv[1]);
num_items = atoi(argv[2]);
max_delay = atoi(argv[3]);
/* Connect to server */
sock_fd = tcp_connect(argv[4],argv[5]);
printf("Connected to server.\n");
printf("Producing items.\n");
int k;
for(k = 1; k <= num_items; k++)
{
/* Producing item */
sleep((rand() % max_delay));
ITEM it;
it.prod_id = id;
it.seq_num = k;
/* Pass item to server */
write(sock_fd, &it, sizeof it);
}
/* close connection */
close(sock_fd);
printf("Connection closed.\n");
return 0;
}