I have written the code below for a lockles threaded queue I am trying to implement. I am almost done, but I have a few questions/issues. I have put comments into my code that relate to each issue to provide a better understanding of where I am in the code.

AREA 1 (main.c): I get a seg fault when I uncomment the code that frees my nodes. If you comment these two lines the program runs just fine but with a very large memory leak because nothing is actually being freed.

AREA 2 (queue.c): Am I calling the cleanup_handler and passing it the queue to unlock correctly? Also, did I implement the pthread_cleanup_push and pop correctly? I don't think the cleanup handler is popped off of the stack when a thread is cancelled. Therefore I only pushed it on one time for each worker thread, and I am assuming it never gets popped because the threads will not be terminating until the queue is destroyed.

AREA 3 (queue.c): I am not sure how to free the data portion of each node that is stored into the queue. I believe the way I have it is correct but again it seg faults when I use this code. I guess this is essentially the same problem as AREA 1.

AREA 4 (queue.c): I am very confused about spurious wakeups that arise when i call pthread_cond_signal. I understand what the spurious wakeups are, but I am not sure if the while() condition I am using will prevent them from occurring.

If you notice anything else that appears to be incorrect please let me know. I think most of the code is OK but I am confused about the issues above. Thanks.

main.c

Code:
#include "queue.h"
#include <unistd.h>

queue_t queue;

#define DEBUG_PRINT_STATUS
//#define DEBUG_PRINT_ADD
//#define DEBUG_PRINT_REMOVE

struct data
{
	int x;
	int y;
};

void* task(void* item);
void* writer();
void* status();

int main()
{
	queue_init(&queue, task, 10);
	
#ifdef DEBUG_PRINT_STATUS
	pthread_t status_id;
	pthread_create(&status_id, NULL, status, NULL);
#endif
	
	pthread_t writer_id;
	pthread_create(&writer_id, NULL, writer, NULL);
	
	pthread_join(writer_id, NULL);

	queue_destroy(&queue);
	
	return 0;
}

void* status()
{
	char buffer[500];
	while(1)
	{
		system("clear");
		
		sprintf(buffer, "Length:\t%d\nMemory Usage:\t%d\n", queue.length, queue.memory_allocated);
		
		printf("%s", buffer);
		
		usleep(100000);
	}
	
	return NULL;
}

void* writer()
{
	int x;

	for(;;)
	{
		struct data* information = malloc(sizeof(struct data));
		information->x = x+1;
		information->y = x+2;
		
#ifdef DEBUG_PRINT_ADD
		printf("\nADD:  %lu has added an item with values x=%d y=%d\n", pthread_self(), ((struct data*)information)->x, ((struct data*)information)->y);
#endif

		queue_add(&queue, (void*)information, sizeof(struct data));
	}
	
	return NULL;
}

void* task(void* item)
{
	
#ifdef DEBUG_PRINT_ADD
	printf("\nREMOVE:  %lu has retrieved an item with values x=%d y=%d\n", pthread_self(), ((struct data*)(((node_t*)item)->data))->x,  ((struct data*)(((node_t*)item)->data))->y);
#endif

	/****** AREA 1 ******/
	free(((node_t*)item)->data);
	free(item);
	
	return NULL;
}
main.h
Code:
#ifndef MAIN_H_
#define MAIN_H_

void* writer();
int task();

#endif
queue.c

Code:
#include "queue.h"

/***** AREA 2 *****/
static void cleanup_handler(void *arg) 
{
  	pthread_mutex_unlock(&((queue_t*)arg)->queue_mutex);
}

void queue_init(queue_t *queue, void *(*f)(void *), int num_worker_threads)
{
	node_t* dummy = malloc(sizeof(node_t));
	queue->first = dummy;
	queue->divider = dummy;
	queue->last = dummy;
	
	queue->worker_task = f;
	
	queue->length = 0;
    
    queue->memory_allocated = 0;
    
    queue->workers = (pthread_t *)calloc((num_worker_threads), sizeof(pthread_t));
 	
 	queue->num_workers = num_worker_threads;
 	
    pthread_cond_init(&queue->ready, NULL);
    pthread_mutex_init(&queue->queue_mutex, NULL);
	pthread_mutex_init(&queue->backup_mutex, NULL);

	int x;
	for(x =0; x<num_worker_threads; x++)
	{
		pthread_create(&queue->workers[x], NULL, queue_process, (void *)queue);
	}
}

void queue_add(queue_t* queue, void* item, int size) 
{
	struct Node* new_node = malloc(sizeof(struct Node));
	new_node->data = item;
	new_node->size = size;
	
	pthread_mutex_lock(&queue->queue_mutex);
	
    queue->last->next = new_node;
    queue->last = queue->last->next;
    
    while( queue->first != queue->divider ) 
    {
      	node_t* tmp = queue->first;
		queue->first = queue->first->next;
     	free(tmp);
    }
    
    queue->memory_allocated += size + sizeof(node_t);
	    
	queue->length++;
	
	pthread_mutex_unlock(&queue->queue_mutex);
	
	pthread_cond_signal(&queue->ready);
}

void* queue_remove(queue_t* queue, int is_worker_thread) 
{
	void* item = NULL;
	
	if(is_worker_thread)
	{
		pthread_mutex_lock(&queue->queue_mutex);
		
		/***** AREA 4 *****/
		while(queue->length == 0)
		{	
		    pthread_cond_wait(&queue->ready, &queue->queue_mutex);
		}
	}
	
	pthread_mutex_lock(&queue->backup_mutex);
	
    if( queue->divider != queue->last ) 
    {
    	item = malloc(queue->divider->size);
      	item = queue->divider->next;
      	queue->memory_allocated -= (queue->divider->size + sizeof(node_t));
      	queue->divider = queue->divider->next;
    }

	queue->length--;
    
	pthread_mutex_unlock(&queue->backup_mutex);
	
	if(is_worker_thread)
	{
		pthread_mutex_unlock(&queue->queue_mutex);
	}
	
	return item;
}

void queue_destroy(queue_t* queue)
{	
	int x;
	for(x =0; x<queue->num_workers; x++)
	{
		pthread_cancel(queue->workers[x]);
	}
	
	for(x =0; x<queue->num_workers; x++)
	{
		pthread_join(queue->workers[x], NULL);
	}
	
	while(queue->length > 0)
	{
		/***** AREA 3 *****/
		node_t* item = queue_remove(queue, 0);
		free(item->data);
		free(item);
	}
	
	pthread_cond_destroy(&queue->ready);	
	pthread_mutex_destroy(&queue->queue_mutex);
}

void *queue_process(void *ptr)
{	
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
	pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
	
	/***** AREA 2 *****/
	pthread_cleanup_push(cleanup_handler,(void*)ptr);
	
	while(1)
	{
		void* item = queue_remove((queue_t *)ptr, 1);
		((queue_t *)ptr)->worker_task(item);
	}
	
	/***** AREA 2 *****/
	pthread_cleanup_pop(0);
}
queue.h

Code:
#ifndef QUEUE_H_
#define QUEUE_H_

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

typedef struct Node
{
    void* data;
    int size;
    struct Node* next;
} node_t;

typedef struct queue 
{
	unsigned int length;
	unsigned int memory_allocated;
	pthread_t* workers;
	int num_workers;
	pthread_cond_t  ready;
	pthread_mutex_t queue_mutex;
	pthread_mutex_t backup_mutex;
	node_t* first;
	volatile node_t* divider;
	volatile node_t* last;
	void* (*worker_task)(void *arg);
} queue_t;

void queue_new(queue_t **q);
void queue_init(queue_t *queue, void *(*worker_task)(void *), int num_worker_threads);
void queue_destroy(queue_t *q);
void* queue_process(void *ptr);
void queue_clear(queue_t *q);
void queue_add(queue_t* queue, void* item, int size);
void* queue_remove(queue_t* queue, int is_worker_thread);

#endif