I have written the code below for a lockles threaded queue I am trying to implement. I am almost done, but I have a few questions/issues. I have put comments into my code that relate to each issue to provide a better understanding of where I am in the code.
AREA 1 (main.c): I get a seg fault when I uncomment the code that frees my nodes. If you comment these two lines the program runs just fine but with a very large memory leak because nothing is actually being freed.
AREA 2 (queue.c): Am I calling the cleanup_handler and passing it the queue to unlock correctly? Also, did I implement the pthread_cleanup_push and pop correctly? I don't think the cleanup handler is popped off of the stack when a thread is cancelled. Therefore I only pushed it on one time for each worker thread, and I am assuming it never gets popped because the threads will not be terminating until the queue is destroyed.
AREA 3 (queue.c): I am not sure how to free the data portion of each node that is stored into the queue. I believe the way I have it is correct but again it seg faults when I use this code. I guess this is essentially the same problem as AREA 1.
AREA 4 (queue.c): I am very confused about spurious wakeups that arise when i call pthread_cond_signal. I understand what the spurious wakeups are, but I am not sure if the while() condition I am using will prevent them from occurring.
If you notice anything else that appears to be incorrect please let me know. I think most of the code is OK but I am confused about the issues above. Thanks.
main.c
Code:
#include "queue.h"
#include <unistd.h>
queue_t queue;
#define DEBUG_PRINT_STATUS
//#define DEBUG_PRINT_ADD
//#define DEBUG_PRINT_REMOVE
struct data
{
int x;
int y;
};
void* task(void* item);
void* writer();
void* status();
int main()
{
queue_init(&queue, task, 10);
#ifdef DEBUG_PRINT_STATUS
pthread_t status_id;
pthread_create(&status_id, NULL, status, NULL);
#endif
pthread_t writer_id;
pthread_create(&writer_id, NULL, writer, NULL);
pthread_join(writer_id, NULL);
queue_destroy(&queue);
return 0;
}
void* status()
{
char buffer[500];
while(1)
{
system("clear");
sprintf(buffer, "Length:\t%d\nMemory Usage:\t%d\n", queue.length, queue.memory_allocated);
printf("%s", buffer);
usleep(100000);
}
return NULL;
}
void* writer()
{
int x;
for(;;)
{
struct data* information = malloc(sizeof(struct data));
information->x = x+1;
information->y = x+2;
#ifdef DEBUG_PRINT_ADD
printf("\nADD: %lu has added an item with values x=%d y=%d\n", pthread_self(), ((struct data*)information)->x, ((struct data*)information)->y);
#endif
queue_add(&queue, (void*)information, sizeof(struct data));
}
return NULL;
}
void* task(void* item)
{
#ifdef DEBUG_PRINT_ADD
printf("\nREMOVE: %lu has retrieved an item with values x=%d y=%d\n", pthread_self(), ((struct data*)(((node_t*)item)->data))->x, ((struct data*)(((node_t*)item)->data))->y);
#endif
/****** AREA 1 ******/
free(((node_t*)item)->data);
free(item);
return NULL;
}
main.h
Code:
#ifndef MAIN_H_
#define MAIN_H_
void* writer();
int task();
#endif
queue.c
Code:
#include "queue.h"
/***** AREA 2 *****/
static void cleanup_handler(void *arg)
{
pthread_mutex_unlock(&((queue_t*)arg)->queue_mutex);
}
void queue_init(queue_t *queue, void *(*f)(void *), int num_worker_threads)
{
node_t* dummy = malloc(sizeof(node_t));
queue->first = dummy;
queue->divider = dummy;
queue->last = dummy;
queue->worker_task = f;
queue->length = 0;
queue->memory_allocated = 0;
queue->workers = (pthread_t *)calloc((num_worker_threads), sizeof(pthread_t));
queue->num_workers = num_worker_threads;
pthread_cond_init(&queue->ready, NULL);
pthread_mutex_init(&queue->queue_mutex, NULL);
pthread_mutex_init(&queue->backup_mutex, NULL);
int x;
for(x =0; x<num_worker_threads; x++)
{
pthread_create(&queue->workers[x], NULL, queue_process, (void *)queue);
}
}
void queue_add(queue_t* queue, void* item, int size)
{
struct Node* new_node = malloc(sizeof(struct Node));
new_node->data = item;
new_node->size = size;
pthread_mutex_lock(&queue->queue_mutex);
queue->last->next = new_node;
queue->last = queue->last->next;
while( queue->first != queue->divider )
{
node_t* tmp = queue->first;
queue->first = queue->first->next;
free(tmp);
}
queue->memory_allocated += size + sizeof(node_t);
queue->length++;
pthread_mutex_unlock(&queue->queue_mutex);
pthread_cond_signal(&queue->ready);
}
void* queue_remove(queue_t* queue, int is_worker_thread)
{
void* item = NULL;
if(is_worker_thread)
{
pthread_mutex_lock(&queue->queue_mutex);
/***** AREA 4 *****/
while(queue->length == 0)
{
pthread_cond_wait(&queue->ready, &queue->queue_mutex);
}
}
pthread_mutex_lock(&queue->backup_mutex);
if( queue->divider != queue->last )
{
item = malloc(queue->divider->size);
item = queue->divider->next;
queue->memory_allocated -= (queue->divider->size + sizeof(node_t));
queue->divider = queue->divider->next;
}
queue->length--;
pthread_mutex_unlock(&queue->backup_mutex);
if(is_worker_thread)
{
pthread_mutex_unlock(&queue->queue_mutex);
}
return item;
}
void queue_destroy(queue_t* queue)
{
int x;
for(x =0; x<queue->num_workers; x++)
{
pthread_cancel(queue->workers[x]);
}
for(x =0; x<queue->num_workers; x++)
{
pthread_join(queue->workers[x], NULL);
}
while(queue->length > 0)
{
/***** AREA 3 *****/
node_t* item = queue_remove(queue, 0);
free(item->data);
free(item);
}
pthread_cond_destroy(&queue->ready);
pthread_mutex_destroy(&queue->queue_mutex);
}
void *queue_process(void *ptr)
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
/***** AREA 2 *****/
pthread_cleanup_push(cleanup_handler,(void*)ptr);
while(1)
{
void* item = queue_remove((queue_t *)ptr, 1);
((queue_t *)ptr)->worker_task(item);
}
/***** AREA 2 *****/
pthread_cleanup_pop(0);
}
queue.h
Code:
#ifndef QUEUE_H_
#define QUEUE_H_
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
typedef struct Node
{
void* data;
int size;
struct Node* next;
} node_t;
typedef struct queue
{
unsigned int length;
unsigned int memory_allocated;
pthread_t* workers;
int num_workers;
pthread_cond_t ready;
pthread_mutex_t queue_mutex;
pthread_mutex_t backup_mutex;
node_t* first;
volatile node_t* divider;
volatile node_t* last;
void* (*worker_task)(void *arg);
} queue_t;
void queue_new(queue_t **q);
void queue_init(queue_t *queue, void *(*worker_task)(void *), int num_worker_threads);
void queue_destroy(queue_t *q);
void* queue_process(void *ptr);
void queue_clear(queue_t *q);
void queue_add(queue_t* queue, void* item, int size);
void* queue_remove(queue_t* queue, int is_worker_thread);
#endif