Code:
#define _GNU_SOURCE
#include <unistd.h>
#include <errno.h>
#include <stdbool.h>
#include <inttypes.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <poll.h>
#include <pthread.h>
#ifdef _WIN32
#include <namedpipeapi.h>
#endif
enum
{
ALL_MSG_NIL = 0
, ALL_MSG_DIE
, MM_MSG_NEW
, MM_MSG_DEL
, MM_MSG_ALT
, MM_MSG_INC
, MM_MSG_DEC
, MM_MSG_DIE
, MM_MSG_WAIT
, MM_MSG_READY
, MM_MSG_COUNT
};
int smem_msg_num[MM_MSG_COUNT] = {-1};
char *smem_msg_txt[MM_MSG_COUNT] = {NULL};
void init_smem_msg_txt();
struct memory_block
{
void *block;
size_t bytes;
};
#define get_memory_block_bytes( block ) ((block)->bytes)
#define get_memory_block_block( T, block ) ((T)((block)->block))
/* Initialises memory block structure
*
* @memory_block Pointer to structure you want to initialise, will segfault on
* invalid pointer
*
* @bytes Number of bytes you want to start with
*
* Will attempt to allocate block of bytes to point the structure to
*
* @return NULL on failure, valid pointer on success
*/
void * new_memory_block( struct memory_block *memory_block, size_t bytes )
{
memory_block->block = malloc( bytes );
memory_block->bytes = !!(memory_block) * bytes;
return memory_block->block;
}
/* Reallocates memory pointed by structure
*
* @memory_block Pointer to structure you want to reallocate pointed memory of,
* will segfault on invalid pointer
*
* @bytes New number of bytes to allocate
*
* Will attempt to reallocate block pointed to by structure and update the
* members on success, on failure it is left as is, attempts with invalid
* pointer in memory_block->block can lead to undefined behaviour, currently
* whatever realloc() does in this situation is what happens here as it is
* passed directly onto the function
*
* @return NULL on failure, memory_block->block on success
*/
void * alt_memory_block( struct memory_block *memory_block, size_t bytes )
{
void * block = realloc( memory_block->block, bytes );
uintptr_t old = (uintptr_t)(memory_block->block), nxt = (uintptr_t)block;
memory_block->block = (void*)(nxt + (!nxt * old));
memory_block->bytes = (!!nxt * bytes) + (!nxt * memory_block->bytes);
return block;
}
/* Reallocates memory pointed by structure
*
* @memory_block Pointer to structure you want to reallocate pointed memory of,
* will segfault on invalid pointer
*
* @bytes New number of bytes to allocate
*
* Will attempt to reallocate block pointed to by structure ONLY if the wanted
* bytes are more than the current amount allocated and update the
* members on success, on failure it is left as is, attempts with invalid
* pointer in memory_block->block can lead to undefined behaviour, currently
* whatever realloc() does in this situation is what happens here as it is
* passed directly onto the function
*
* @return NULL on failure, memory_block->block on success
*/
void * inc_memory_block( struct memory_block *memory_block, size_t bytes )
{
if ( bytes > memory_block->bytes )
{
void * block = realloc( memory_block->block, bytes );
uintptr_t old = (uintptr_t)(memory_block->block), nxt = (uintptr_t)block;
memory_block->block = (void*)(nxt + (!nxt * old));
memory_block->bytes = (!!nxt * bytes) + (!nxt * memory_block->bytes);
return block;
}
return NULL;
}
/* Reallocates memory pointed by structure
*
* @memory_block Pointer to structure you want to reallocate pointed memory of,
* will segfault on invalid pointer
*
* @bytes New number of bytes to allocate
*
* Will attempt to reallocate block pointed to by structure ONLY if the wanted
* bytes are less than the current amount allocated and update the
* members on success, on failure it is left as is, attempts with invalid
* pointer in memory_block->block can lead to undefined behaviour, currently
* whatever realloc() does in this situation is what happens here as it is
* passed directly onto the function
*
* @return NULL on failure, memory_block->block on success
*/
void * dec_memory_block( struct memory_block *memory_block, size_t bytes )
{
if ( bytes > memory_block->bytes )
{
void * block = realloc( memory_block->block, bytes );
uintptr_t old = (uintptr_t)(memory_block->block), nxt = (uintptr_t)block;
memory_block->block = (void*)(nxt + (!nxt * old));
memory_block->bytes = (!!nxt * bytes) + (!nxt * memory_block->bytes);
return block;
}
return NULL;
}
/* Releases pointed memory
*
* @memory_block Pointer to memory block structure holding a pointer to the
* block to be released, will segfault on invalid pointer
*
* Will attempt to release memory pointed to by memory_block->block, afterwards
* it will reinitialise the members to 0 & NULL, invalid pointers can lead to
* undefined behaviour, currently whatever free() does in this situation is
* what happens here as the pointer is directly passed on to the function
*/
void del_memory_block( struct memory_block *memory_block )
{
free( memory_block->block );
memory_block->block = NULL;
memory_block->bytes = 0;
}
struct memory_group
{
struct memory_block memory_block;
int total, count, focus;
};
#define get_memory_group_total( group ) ((group)->total)
#define get_memory_group_count( group ) ((group)->count)
#define get_memory_group_focus( group ) ((group)->focus)
#define get_memory_group_bytes( group ) \
get_memory_block_bytes( &((group)->memory_block) )
#define get_memory_group_group( T, group ) \
get_memory_block_block( T*, &((group)->memory_block) )
#define get_memory_group_entry( T, group, index ) \
(get_memory_group_group( T, group )[index])
struct memory_group shared_blocks = {0};
#define get_shared_blocks_total() get_memory_group_total( &shared_blocks )
#define get_shared_blocks_count() get_memory_group_count( &shared_blocks )
#define get_shared_blocks_focus() get_memory_group_focus( &shared_blocks )
#define get_shared_blocks_bytes() get_memory_group_bytes( &shared_blocks )
#define get_shared_blocks_group() \
get_memory_group_group( struct memory_block, &shared_blocks )
#define get_shared_blocks_entry( index ) \
(&(get_memory_group_entry( struct memory_block, &shared_blocks, index )))
#define PIPE_RD 0
#define PIPE_WR 1
#define PIPE_COUNT 2
#ifdef _WIN32
typedef HANDLE pipe_t;
#define INVALID_PIPE NULL
#else
typedef int pipe_t;
#define INVALID_PIPE -1
#endif
int open_pipes( pipe_t *pipes );
void shut_pipes( pipe_t *pipes );
int pipe_err( pipe_t pipe );
ssize_t rdpipe( pipe_t pipe, void *data, size_t size );
ssize_t wrpipe( pipe_t pipe, void *data, size_t size );
struct shared_block
{
size_t want;
struct memory_block *memory_block;
};
struct worker_msg
{
int type;
void *data;
};
#define INVALID_TID -1
typedef struct worker
{
// Thread reference
int num;
// Thread ID
pthread_t tid;
// Memory Manager
pipe_t mmi_pipes[PIPE_COUNT], mmo_pipes[PIPE_COUNT];
// Will an attempt to join be made?
bool join;
// Should an attempt to close member attr be made by main() during cleanup?
bool attr_created;
// Attributes
pthread_attr_t attr;
} worker_t;
size_t size;
int have_workers;
worker_t *workerv;
int new_worker( char const * const name );
#define get_worker( index ) ((index >= 0 && index )
void del_worker( int index );
typedef void * (*Worker_t)( void *arg );
void * mmworker( worker_t *worker );
void * fooworker( worker_t *worker );
int main()
{
int i;
enum
{
WORKER_NULL = 0,
WORKER_MM,
WORKER_FOO,
WORKER_COUNT
};
pipe_t pipes[PIPE_COUNT] = {INVALID_PIPE,INVALID_PIPE};
worker_t workers[WORKER_COUNT] = {{0}}, *worker, *worker_ret[WORKER_COUNT];
Worker_t Worker[WORKER_COUNT] = { NULL };
Worker[WORKER_MM] = (Worker_t)mmworker;
Worker[WORKER_FOO] = (Worker_t)fooworker;
#define JOIN
#ifdef JOIN
workers[WORKER_MM].join = true;
workers[WORKER_FOO].join = true;
#endif
init_smem_msg_txt();
printf("Hello World!\n");
if ( open_pipes( pipes ) != 0 )
goto cleanup;
for ( i = 1; i < WORKER_COUNT; ++i )
{
printf( "Creating worker %d...\n", i );
worker = &(workers[i]);
worker->num = i;
worker->tid = INVALID_TID;
worker->mmi_pipes[PIPE_RD] = pipes[PIPE_RD];
worker->mmi_pipes[PIPE_WR] = pipes[PIPE_WR];
worker->attr_created = false;
if ( pthread_attr_init( &(worker->attr) ) != 0 )
goto cleanup;
worker->attr_created = true;
if
( pthread_create(
&(worker->tid), &(worker->attr), Worker[i], worker ) != 0
) goto cleanup;
if ( !(worker->join) && pthread_detach( worker->tid ) != 0 )
goto cleanup;
}
#ifdef JOIN
pthread_join( workers[WORKER_MM].tid, (void**)&(worker_ret[WORKER_MM]) );
pthread_join( workers[WORKER_FOO].tid, (void**)&(worker_ret[WORKER_FOO]) );
#endif
cleanup:
while ( i > WORKER_MM )
{
--i;
worker = &(workers[i]);
printf( "Cleaning worker %d\n", i );
if ( worker->num >= 0 )
pthread_cancel( worker->tid );
if ( worker->attr_created )
pthread_attr_destroy( &(worker->attr) );
worker->attr_created = false;
worker->mmi_pipes[PIPE_WR] = INVALID_PIPE;
worker->mmi_pipes[PIPE_RD] = INVALID_PIPE;
worker->tid = INVALID_TID;
}
shut_pipes( pipes );
return 0;
}
int open_pipes( pipe_t *pipes )
{
#ifdef _WIN32
SECURITY_ATTRIBUTES sa;
sa.nLength = sizeof(sa);
sa.lpSecurityDescriptor = NULL;
sa.bInheritHandle = true;
#endif
pipes[0] = pipes[1] = INVALID_PIPE;
#ifdef _WIN32
int ret = (CreatePipe( &pipes[PIPE_RD], &pipes[PIPE_WR], 0 ) != 0 );
if ( ret != 0 )
{
pipes[PIPE_RD] = pipes[PIPE_WR] = INVALID_PIPE;
}
#else
return pipe( pipes );
#endif
}
void shut_pipes( pipe_t *pipes )
{
#ifdef _WIN32
CloseHandle( pipes[PIPE_WR] );
CloseHandle( pipes[PIPE_RD] );
#else
close( pipes[PIPE_WR] );
close( pipes[PIPE_RD] );
#endif
pipes[PIPE_RD] = pipes[PIPE_WR] = INVALID_PIPE;
}
int pipe_err( pipe_t pipe )
{
return errno;
}
ssize_t rdpipe( pipe_t pipe, void *data, size_t size )
{
#ifdef _WIN32
DWORD bytes = 0;
ReadFile( pipe, data, size, &bytes, NULL );
return bytes;
#else
return read( pipe, data, size );
#endif
}
ssize_t wrpipe( pipe_t pipe, void *data, size_t size )
{
#ifdef _WIN32
DWORD bytes = 0;
WriteFile( pipe, data, size, &bytes, NULL );
return bytes;
#else
return write( pipe, data, size );
#endif
}
void * mmworker( worker_t *worker )
{
int ret;
struct pollfd fds = {0};
pipe_t mm_pipes[PIPE_COUNT] =
{
worker->mmi_pipes[PIPE_RD],
worker->mmo_pipes[PIPE_WR]
};
fds.fd = mm_pipes[PIPE_RD];
fds.events = POLLIN | POLLERR | POLLHUP | POLLNVAL;
printf( "Worker %d started\n", worker->num );
while ( (ret = poll( &fds, 1, 1 )) >= 0 )
{
if ( ret == 1 )
{
struct worker_msg *worker_msg = NULL;
ssize_t bytes;
bytes = rdpipe(
mm_pipes[PIPE_RD], &worker_msg, sizeof(struct worker_msg) );
printf( "Worker %d: Read %zd bytes\n", worker->num, bytes );
}
}
/* Indicate the thread exited */
worker->num = -1;
return worker;
}
void * fooworker( worker_t *worker )
{
int ret;
struct pollfd fds = {0};
pipe_t mm_pipes[PIPE_COUNT] =
{
worker->mmo_pipes[PIPE_RD],
worker->mmi_pipes[PIPE_WR]
};
ssize_t bytes;
struct worker_msg *worker_msg = NULL;
printf( "Worker %d started\n", worker->num );
#if 0
while ( (ret = poll( &fds, 1, 1 )) >= 0 )
{
if ( ret == 1 )
{
struct worker_msg *worker_msg = NULL;
ssize_t bytes;
bytes = rdpipe
(
mm_pipes[PIPE_RD]
, &worker_msg
, sizeof(struct worker_msg)
);
printf( "Worker %d: Read %zd bytes\n", worker->num, bytes );
}
}
#endif
bytes = wrpipe(
mm_pipes[PIPE_WR], &worker_msg, sizeof(struct worker_msg) );
printf( "Worker %d: Wrote %zd bytes\n", worker->num, bytes );
/* Indicate the thread exited */
worker->num = -1;
return worker;
}
void init_smem_msg_txt()
{
smem_msg_txt[ALL_MSG_NIL] = "ALL_MSG_NIL";
smem_msg_txt[ALL_MSG_DIE] = "ALL_MSG_DIE";
smem_msg_txt[MM_MSG_NEW] = "MM_MSG_NEW";
smem_msg_txt[MM_MSG_DEL] = "MM_MSG_DEL";
smem_msg_txt[MM_MSG_ALT] = "MM_MSG_ALT";
smem_msg_txt[MM_MSG_INC] = "MM_MSG_INC";
smem_msg_txt[MM_MSG_DEC] = "MM_MSG_DEC";
smem_msg_txt[MM_MSG_DIE] = "MM_MSG_DIE";
smem_msg_txt[MM_MSG_WAIT] = "MM_MSG_WAIT";
smem_msg_txt[MM_MSG_READY] = "MM_MSG_READY";
}