Thread: Lockless Threaded Queue

  1. #16
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    >> So this is not correct?
    Haven't analyzed it since it was first published, but coming from Herb Sutter it's probably safe to assume that it's correct (at least against the C++0x draft that was available at the time).

    >> or did I misunderstand something?
    C++ volatile isn't even mentioned in that article. atomic<T> from the upcoming C++0x is used to guarantee ordering and memory consistency among threads. It's not really related to POSIX C.
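
    For a C reader, the ordering guarantee described above can be sketched with the atomics that C11 later standardized in <stdatomic.h>. This is only an illustration, not code from the article: the names payload, ready, publish and try_consume are hypothetical, and it assumes a C11 compiler.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical names, chosen for this sketch only. */
static int payload;        /* plain data, written before the flag is set */
static atomic_bool ready;  /* static storage, so it starts out false */

/* Writer side: store the data, then set the flag with release ordering so
 * the write to payload cannot be reordered after the flag store. */
void publish(int value)
{
    payload = value;
    atomic_store_explicit(&ready, true, memory_order_release);
}

/* Reader side: an acquire load that observes true also makes the earlier
 * write to payload visible. Returns false if nothing was published yet. */
bool try_consume(int *out)
{
    assert(out != NULL);
    if (!atomic_load_explicit(&ready, memory_order_acquire))
        return false;
    *out = payload;
    return true;
}
```

    A plain volatile flag would not give the reader that visibility guarantee for payload, which is the distinction the reply is drawing.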

    More good reading from Herb:
    Dr. Dobb's | Lock-Free Code: A False Sense of Security | September 8, 2008
    Dr. Dobb's | volatile vs. volatile | January 8, 2009

    >> The lock on the front end is really gonna slow me down...
    You don't know this without profiling a working MT-Q, under load.

    Writing correct, wait-free data structures is extremely difficult in a language like C or C++03, which doesn't define a memory model for the "abstract machine". It would require assembly language and knowledge of the architecture you're writing for. Even with a well-defined memory model, correctness is difficult.

    If on Windows, assembly language isn't necessarily required, thanks to the Interlocked APIs, but correctness is still just as difficult, which is why there's Interlocked SList as well.

    gg

  2. #17
    Registered User
    Join Date
    Jun 2008
    Posts
    93
    I previously tried a queue with locks on the front and back, and when I start capturing over 20k packets per second the front end starts dropping packets because it has to wait to access the queue. I haven't tried it with the thread pool back end, though, so I'll give it a whirl and see what happens. Is there any way to give the producer any kind of priority with this type of queue?
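
    One possible way to cut the producer's waiting, sketched here under assumptions and not taken from the thread, is to let a consumer detach the whole list in a single lock acquisition and process it outside the lock. This does not give the producer strict priority; it only reduces how often consumers hold the mutex. The names bnode_t, bqueue_t, bqueue_push and bqueue_take_all are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical types for this sketch; the caller owns node allocation. */
typedef struct bnode {
    void *data;
    struct bnode *next;
} bnode_t;

typedef struct {
    pthread_mutex_t lock;
    bnode_t *front;
    bnode_t *back;
} bqueue_t;

/* Producer: link a caller-allocated node at the back. The critical section
 * is a few pointer writes and contains no malloc. */
void bqueue_push(bqueue_t *q, bnode_t *n)
{
    assert(q != NULL && n != NULL);
    n->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->back == NULL)
        q->front = n;
    else
        q->back->next = n;
    q->back = n;
    pthread_mutex_unlock(&q->lock);
}

/* Consumer: detach every queued node at once and return the head of the
 * detached list (NULL if the queue was empty). The caller walks the list
 * without holding the lock. */
bnode_t *bqueue_take_all(bqueue_t *q)
{
    assert(q != NULL);
    pthread_mutex_lock(&q->lock);
    bnode_t *head = q->front;
    q->front = NULL;
    q->back = NULL;
    pthread_mutex_unlock(&q->lock);
    return head;
}
```

    Whether this helps at 20k packets per second would still have to be measured under load.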

  3. #18
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    How do you know that malloc wasn't the bottleneck? It needs to be profiled to know.

    How many consumers and producers per Q? How many Q's?
    How many cores will it run on?
    What OS?

    gg

  4. #19
    Registered User
    Join Date
    Jun 2008
    Posts
    93
    When you say "profile", I am not sure exactly what you mean. I haven't used any memory analysis tools yet. How would malloc be the bottleneck? How would I find out? It would definitely be interesting to know.

    -There is one producer thread that receives packets from the pcap library and stores them in the queue.

    -One queue currently

    -There are 20 consumer threads that remove from the queue.

    -It is running on Debian OS 2.6.26 kernel.

    -Two i7 processors for a total of 16 cores.

  5. #20
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    Here's a start on profiling: gprof, bprof and Time Profilers
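
    Before reaching for a full profiler, a rough first measurement can be taken by hand. The sketch below times a run of malloc/free pairs with clock_gettime; the function name time_malloc_free is hypothetical, and it assumes a POSIX system that provides CLOCK_MONOTONIC. It measures the allocator in isolation on one thread, so it says nothing about contention between threads.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <time.h>

/* Seconds elapsed between two clock readings. */
static double elapsed(struct timespec a, struct timespec b)
{
    return (double)(b.tv_sec - a.tv_sec) +
           (double)(b.tv_nsec - a.tv_nsec) / 1e9;
}

/* Time `count` malloc/free pairs of `size` bytes each.
 * Returns the elapsed time in seconds, or -1.0 on failure. */
double time_malloc_free(size_t count, size_t size)
{
    struct timespec start, end;

    assert(size > 0);
    if (clock_gettime(CLOCK_MONOTONIC, &start) != 0)
        return -1.0;

    for (size_t i = 0; i < count; i++) {
        volatile unsigned char *p = malloc(size);
        if (p == NULL)
            return -1.0;
        p[0] = 1;          /* touch the block so the pair is not optimized away */
        free((void *)p);
    }

    if (clock_gettime(CLOCK_MONOTONIC, &end) != 0)
        return -1.0;
    return elapsed(start, end);
}
```

    Calling time_malloc_free(20000, 1000) gives a feel for what one second's worth of packet-sized allocations costs on a given machine, which can then be compared against the time spent waiting on the queue lock.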

    2 i7's would be 8 cores, I believe, with each core hyper-threaded for a total of 16 threads.
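
    The count the OS reports is the number of logical processors, so hyper-threads are included. A small sketch for querying it on Linux with sysconf follows; logical_cpus and suggested_workers are hypothetical names, and the "leave some CPUs free" rule is only an assumption to start tuning from, not a recommendation from this thread.

```c
#include <assert.h>
#include <unistd.h>

/* Logical processors currently online, as reported by the OS.
 * Returns -1 if the value cannot be determined. */
long logical_cpus(void)
{
    return sysconf(_SC_NPROCESSORS_ONLN);
}

/* Assumed heuristic: use the online CPUs minus `reserved` (for example one
 * for the capture thread), but never fewer than one worker. */
long suggested_workers(long reserved)
{
    assert(reserved >= 0);
    long n = logical_cpus() - reserved;
    return n > 0 ? n : 1;
}
```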

    gg

  6. #21
    Registered User
    Join Date
    Jun 2008
    Posts
    93
    OK, I switched the code around to make it more readable. My previous implementation was too hard to follow. I also changed the queue so you do not have to use the worker threads. I commented the entire thing to give an explanation of what is going on. I am still getting random seg faults on the remove when the queue is under load. I tried running it in gdb, but gdb slows down the execution so much that it never crashes. Do you see anything that I missed when rewriting this?

    Code:
    #ifndef QUEUE_H_
    #define QUEUE_H_
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    
    typedef struct Node
    {
        void* data;
        int size;
        struct Node* next;
    } node_t;
    
    typedef struct queue
    {
    	unsigned int length;
    	unsigned int memory_allocated;
    	pthread_t* workers;
    	int num_workers;
    	pthread_cond_t  ready;
    	pthread_mutex_t queue_mutex;
    	node_t* front;
    	node_t* back;
    	void* (*worker_task)(void *arg);
    } queue_t;
    
    void queue_new(queue_t **q);
    void queue_init(queue_t *queue, void *(*worker_task)(void *), int num_worker_threads);
    void queue_destroy(queue_t *q);
    void* queue_process(void *ptr);
    void queue_clear(queue_t *q);
    void queue_add(queue_t* queue, void* item, int size);
    void* queue_remove(queue_t* queue);
    unsigned int queue_length(queue_t* queue);
    unsigned int queue_mem(queue_t* queue);
    
    #endif
    Code:
    #include "queue.h"
    
    /* Performs various tasks in the event a worker thread is cancelled.  Each worker thread is assigned a cleanup handler. */
    static void cleanup_handler(void *arg) 
    {
      	pthread_mutex_unlock(&((queue_t*)arg)->queue_mutex);
    }
    
    void queue_init(queue_t *queue, void *(*f)(void *), int num_worker_threads)
    {
    	/* Initialize the pointer to the front of the queue to NULL */
    	queue->front = NULL;
    	
    	/* Initialize the pointer to the back of the queue to NULL */
    	queue->back = NULL;
    	
    	/* Assign the task that will be performing the work on the removed packets */
    	queue->worker_task = f;
    	
    	/* Set the length of the queue to 0 */
    	queue->length = 0;
        
        /* Set the amount of memory currently allocated by the queue to 0 */
        queue->memory_allocated = 0;
        
        /* Assign the number of worker threads this queue will be using */
     	queue->num_workers = num_worker_threads;
        
        if(queue->num_workers > 0)
        {
    	    /* Allocate memory for the worker thread array */
    	    queue->workers = (pthread_t *)calloc((num_worker_threads), sizeof(pthread_t));
        }
        else
        {
    	    /* No worker threads: do not leave the pointer uninitialized */
    	    queue->workers = NULL;
        }
     	
     	/* Initialize the item removal condition for this queue */
        pthread_cond_init(&queue->ready, NULL);
        
        /* Initialize the lock for this queue */
        pthread_mutex_init(&queue->queue_mutex, NULL);
    
    	/* Create all of the worker threads */
    	int x;
    	for(x =0; x<num_worker_threads; x++)
    	{
    		pthread_create(&queue->workers[x], NULL, queue_process, (void *)queue);
    	}
    }
    
    void queue_add(queue_t* queue, void* item, int size) 
    {
    	/* Lock the queue */
    	pthread_mutex_lock(&queue->queue_mutex);
    		
    	/* Allocate memory for the new node that will be added to the queue */
    	struct Node* new_node = malloc(sizeof(struct Node));
    	
    	/* The pointer to the new node will be null if memory could not be allocated for it */
    	if (new_node == NULL)
    	{
    		fprintf(stderr, "Error: Can not allocate memory for packet queue element!\n\n");
        	exit(EXIT_FAILURE);
    	}
    	else 
    	{	
    		/* Point the new node to the data that will be added and copy in the size of the new data */
    		new_node->data = item;
    		new_node->size = size;
    		
    		/* The new element is being added to the end of the queue, so the next pointer of the new node will be null */
        	new_node->next = NULL;
        	
    	    /* If the queue is empty ... */ 
    	    if (queue->back == NULL) 
    	    {
    	    	/* We have added the only node in the queue, so make the new node the front and back of the queue */
            	queue->front = new_node;
            	queue->back = new_node;
    	    } 
    	    
    	    /* If the queue is not empty ... */
    	    else 
    	    {
    			/* The next node that the current node at the end of the queue points to will be the new node */
            	queue->back->next = new_node;
            	
    			/* Point the queue's back pointer to the new node */
            	queue->back = new_node;
    	    }
        
        	/* Increase the amount of memory now being used by the queue */
        	queue->memory_allocated += size + sizeof(node_t);
    	    
    	    /* Increase the number of nodes in the queue */
    		queue->length++;
    		
    		/* Signal the worker threads that they can now remove from the queue */
    		pthread_cond_signal(&queue->ready);
    		
    		/* Unlock the queue */
    		pthread_mutex_unlock(&queue->queue_mutex);
    	}
    }
    
    void* queue_remove(queue_t* queue) 
    {
    	/* Pointer to the item we will be removing */
    	void* item = NULL;
    	
    	/* Lock the queue */
    	pthread_mutex_lock(&queue->queue_mutex);
    	
    	/* Push this thread's cleanup handler onto the stack */
    	pthread_cleanup_push(cleanup_handler,(void*)queue);
    	
    	if(queue->num_workers > 0)
    	{
    		/* Re-check the condition in a loop: pthread_cond_wait can wake up spuriously, and another worker may already have taken the new node. */
    		while(queue->length == 0)
    		{	
    			/* Wait for queue_add to signal that it has added a new node */
    		    pthread_cond_wait(&queue->ready, &queue->queue_mutex);
    		}
    	}
    	
    	/* Temporary node pointer that will allow us to free the node being removed */				
    	node_t *tmp_node;
    	
       	/* Point the temporary node pointer at the first node of the queue */
       	tmp_node = queue->front;
        
    	/* With no worker threads we never wait above, so the queue can be empty here.
    	 * In that case leave item NULL instead of dereferencing a NULL front pointer. */
    	if (tmp_node != NULL)
    	{
    		/* The second node in the queue is going to become the front so point the front to the second node */
    		queue->front = tmp_node->next;
    		
    		/* Point the item we will be removing to the data portion of the node being removed */
    		item = tmp_node->data;
    		
    		/* Decrease the amount of memory being used by the queue by the size of the data being removed. */
    		queue->memory_allocated -= tmp_node->size + sizeof(node_t);
    		
    		/* Free the node we are removing */
    		free(tmp_node);
    		
    		/* Decrease the number of nodes in the queue */
    		queue->length--;
    		
    		/* If the queue becomes empty as a result of removing this element ... */
    		if (queue->front == NULL) 
    		{
    			/* Assign the back of the queue to null */
    			queue->back = NULL;
    		}
    	}
    
    	pthread_cleanup_pop(1);
    	
    	return item;
    }
    
    void queue_destroy(queue_t* queue)
    {	
    	/* Cancel all of the worker threads */
    	int x;
    	for(x =0; x<queue->num_workers; x++)
    	{
    		pthread_cancel(queue->workers[x]);
    	}
    	
    	/* Join all of the worker threads one by one waiting for each of them to finish */
    	for(x =0; x<queue->num_workers; x++)
    	{
    		pthread_join(queue->workers[x], NULL);
    	}
    	
    	/* Free all nodes still in the queue */
    	while(queue->length > 0)
    	{
    		/* Pointer to packet queue element to remove */				
    		node_t *tmp_node;
    		
    	   	/* Point the first element of the queue to the new temporary element*/
    	   	tmp_node = queue->front;
    	    
    	   	/* The second element in the queue becomes the front of the queue so 
    	   	 * the first element can no longer be acessed */
    		queue->front = tmp_node->next;
    		
    		/* Subtract the size of the packet and of its node from the total memory allocated size, matching queue_add. */
    		queue->memory_allocated -= tmp_node->size + sizeof(node_t);
    		
    		/* Deallocate the removed element from memory */
    		free(tmp_node->data);
    		free(tmp_node);
    		
    		/* Decrement the size of the queue in reference to the number of elements */
    		queue->length--;
    	}
    	
    	/* Destroy the removal condition variable */
    	pthread_cond_destroy(&queue->ready);
    	
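    	/* Destroy the lock for this queue */	
    	pthread_mutex_destroy(&queue->queue_mutex);
    	
    	/* Release the worker thread array allocated in queue_init */
    	if(queue->num_workers > 0)
    	{
    		free(queue->workers);
    		queue->workers = NULL;
    	}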
    }
    
    void *queue_process(void *ptr)
    {	
    	/* Set this thread to be cancelable */
    	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    	
    	/* Set this thread to only check for cancellation at the cancellation points */
    	pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    	
    	/* Continuously remove items from the queue */
    	while(1)
    	{
    		/* Remove an item from the queue */
    		void* item = queue_remove((queue_t *)ptr);
    		
    		/* Send the removed item to the task that was assigned to this queue */
    		((queue_t *)ptr)->worker_task(item);
    	}
    }
    
    unsigned int queue_length(queue_t* queue)
    {
    	unsigned int length = 0;
    	pthread_mutex_lock(&queue->queue_mutex);
    	length = queue->length;
    	pthread_mutex_unlock(&queue->queue_mutex);
    	
    	return length;
    }
    
    unsigned int queue_mem(queue_t* queue)
    {
    	unsigned int mem = 0;
    	pthread_mutex_lock(&queue->queue_mutex);
    	mem = queue->memory_allocated;
    	pthread_mutex_unlock(&queue->queue_mutex);
    	
    	return mem;
    }
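
    The queue above stops its workers with pthread_cancel and a cleanup handler. An alternative that avoids cancellation, sketched here with hypothetical names and independent of queue.h, is a "done" flag checked inside the pthread_cond_wait loop plus a pthread_cond_broadcast at shutdown. The same sketch doubles as a loss check: it pushes the values 1..items through several consumers and compares the count and checksum afterwards.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* All names here are hypothetical and unrelated to queue.h. */
typedef struct item { int value; struct item *next; } item_t;

static item_t *head, *tail;   /* FIFO list, guarded by qlock */
static int done;              /* set once the producer has finished */
static long sum;              /* checksum of consumed values */
static int count;             /* number of consumed items */
static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ready = PTHREAD_COND_INITIALIZER;

static void *consumer(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&qlock);
        /* Predicate loop: a wake-up may be spurious, or another consumer
         * may already have taken the item. */
        while (head == NULL && !done)
            pthread_cond_wait(&ready, &qlock);
        if (head == NULL) {           /* empty and done: leave normally */
            pthread_mutex_unlock(&qlock);
            return NULL;
        }
        item_t *it = head;
        head = it->next;
        if (head == NULL)
            tail = NULL;
        sum += it->value;
        count++;
        pthread_mutex_unlock(&qlock);
        free(it);
    }
}

/* Push 1..items through `consumers` threads; return 1 if nothing was lost. */
int run_check(int items, int consumers)
{
    assert(items >= 0 && consumers > 0);
    pthread_t *tids = calloc((size_t)consumers, sizeof *tids);
    if (tids == NULL)
        return 0;
    head = tail = NULL;
    done = 0; sum = 0; count = 0;

    int started = 0;
    while (started < consumers &&
           pthread_create(&tids[started], NULL, consumer, NULL) == 0)
        started++;
    if (started == 0) { free(tids); return 0; }

    for (int v = 1; v <= items; v++) {
        item_t *it = malloc(sizeof *it);
        if (it == NULL)
            break;
        it->value = v;
        it->next = NULL;
        pthread_mutex_lock(&qlock);
        if (tail == NULL) head = it; else tail->next = it;
        tail = it;
        pthread_cond_signal(&ready);
        pthread_mutex_unlock(&qlock);
    }

    pthread_mutex_lock(&qlock);
    done = 1;                          /* tell every waiting consumer to drain and exit */
    pthread_cond_broadcast(&ready);
    pthread_mutex_unlock(&qlock);

    for (int i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    free(tids);
    return count == items && sum == (long)items * (items + 1) / 2;
}
```

    Because the consumers return on their own, no item is abandoned halfway through processing, which can happen when a thread is cancelled inside its worker task.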

  7. #22
    Registered User
    Join Date
    Jun 2008
    Posts
    93
    Wrote this to give it the gorilla test and isolate the problem but no luck in crashing it.....

    Code:
    #include<stdio.h>
    #include<pthread.h>
    #include<stdlib.h>
    #include<string.h>
    #include<unistd.h>
    #include "queue.h"
    
    void* task(void* vp);
    void* writer(void* vp);
    void* status(void* vp);
    
    queue_t Q;
    
    int main(int argc, char* argv[])
    {
    	char answer[50];
    
    	system("clear");
    	while(1)
    	{
    		printf("Would you like to add (a), remove (r), auto (au)? ");
    		scanf("%49s", answer);
    
    		if(strcmp(answer, "a") == 0)
    		{
    			queue_init(&Q, task, 0);
    		}
    		else if(strcmp(answer, "au") == 0)
    		{
    			queue_init(&Q, task, 10);
    		}
    
    		if(strcmp(answer, "a") == 0)
    		{
    			int items;
    			printf("How many items would you like to add? ");
    			scanf("%d", &items);
    
    			int x;
    			int y;
    			for(x=0; x<items; x++)
    			{
    				char* stuff = malloc(1000);
    				
    				for(y=0; y<1000; y++)
    				{
    					stuff[y] = 'a';
    				}
    				
    				queue_add(&Q, (void*)stuff, 1000);
    			}
    			
    			printf("There are %u items in the queue and a total of %u bytes in the queue.\n", queue_length(&Q), queue_mem(&Q));
    		}
    		else if(strcmp(answer, "r") == 0)
    		{
    			int items;
    			printf("How many items would you like to remove? ");
    			scanf("%d", &items);
    			
    			int x;
    			for(x=0; x<items; x++)
    			{
    				char* item = queue_remove(&Q);
    				free(item);
    			}
    			
    			printf("There are %u items in the queue and a total of %u bytes in the queue.\n", queue_length(&Q), queue_mem(&Q));
    		}
    		else if(strcmp(answer, "au") == 0)
    		{	
    			pthread_t thread_id;
    			pthread_create(&thread_id, NULL, status, NULL);
    			
    			pthread_t thread_id2;
    			pthread_create(&thread_id2, NULL, writer, NULL);
    			
    			sleep(2);
    			while(queue_length(&Q))
    			{
    				sleep(1);
    			}
    			
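    			pthread_cancel(thread_id);
    			pthread_join(thread_id, NULL);
    			
    			/* Wait for the writer so it cannot add to the queue while it is being destroyed */
    			pthread_join(thread_id2, NULL);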
    			
    			system("clear");
    			status((void*)1);
    			
    			queue_destroy(&Q);
    		}
    	}
    	
    	return 0;
    }
    
    void* writer(void* vp)
    {
    	int x;
    	for(x=0; x<3000000; x++)
    	{
    		unsigned char* stuff = malloc(1000 * sizeof(unsigned char));
    		
    		int y;
    		for(y=0; y<1000; y++)
    		{
    			stuff[y] = 'a';
    		}
    		
    		queue_add(&Q, (void*)stuff, 1000);
    	}
    	
    	return NULL;
    }
    
    void* status(void* vp)
    { 
    	while(1)
    	{
    		pthread_testcancel();
    		printf("There are %u items in the queue and a total of %u bytes in the queue.\n", queue_length(&Q), queue_mem(&Q));
    		fflush(stdout);
    		usleep(100000);
    		
    		if((int*)vp)
    			break;
    			
    		system("clear");
    	}
    	
    	return NULL;
    }
    
    void* task(void* vp)
    {
    	usleep(100);
    	free(vp);
    	return NULL;
    }

  8. #23
    Registered User
    Join Date
    Jun 2008
    Posts
    93
    I was also looking into gprof but it gives very inaccurate results with multithreaded applications. There is a workaround preload that someone wrote but it appears to have the same issues. I'll keep working on it.

  9. #24
    Registered User
    Join Date
    Jun 2008
    Posts
    93
    Does anyone have any thoughts why pcap_remove might seg fault?

  10. #25
    Registered User
    Join Date
    Jun 2008
    Posts
    93
    Sorry, I meant queue_remove.

  11. #26
    {Jaxom,Imriel,Liam}'s Dad Kennedy's Avatar
    Join Date
    Aug 2006
    Location
    Alabama
    Posts
    1,065
    Please read post #4 of this thread for your answer -- he gave it to you at least twice.

  12. #27
    Registered User Codeplug's Avatar
    Join Date
    Mar 2003
    Posts
    4,981
    queue_add and queue_remove look correct to me. I don't believe they are the source of your issue.

    gg

  13. #28
    Registered User
    Join Date
    Jun 2008
    Posts
    93
    Yes. The problem was the method to which I was sending the items I was removing.

    Kennedy - I do not add null items to my queue so I should never have a null item. I believe this was explained earlier.
