Thread: Understanding pipes...

  1. #1
    Registered User awsdert's Avatar
    Join Date
    Jan 2015
    Posts
    1,429

    Understanding pipes...

    I'm going back to my gasp project in a little while and plan to rewrite it from the ground up once I've got to grips with pipes. It's the "getting to grips" part that I'm having trouble with, however. Here's what I have (ignore the win32 code, as I haven't even checked the reference for those calls yet).

    Code:
    #include <unistd.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <pthread.h>
    
    typedef struct smem
    {
    	size_t have, want;
    	void *data;
    } smem_t;
    
    typedef struct smemv
    {
    	size_t have;
    	smem_t *data;
    } smemv_t;
    
    #ifdef _WIN32
    typedef HANDLE pipe_t;
    #define INVALID_PIPE NULL
    #else
    typedef int pipe_t;
    #define INVALID_PIPE -1
    #endif
    
    int open_pipes( pipe_t pipes[2] );
    void shut_pipes( pipe_t pipes[2] );
    int pipe_err( pipe_t pipe );
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size );
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size );
    
    #define INVALID_TID -1
    
    typedef struct worker
    {
    	int num;
    	pthread_t tid;
    	pipe_t pipes[2];
    	pthread_attr_t pthread_attr;
    } worker_t;
    
    typedef void * (*Worker_t)( void *arg );
    
    void * rdworker( worker_t *worker );
    void * wrworker( worker_t *worker );
    
    int main()
    {
    	int i;
    	worker_t workers[2] = {{0}}, *worker, *worker_ret[2];
    	for ( i = 0; i < 2; ++i )
    	{
    		worker = &(workers[i]);
    		worker->num = i + 1;
    		worker->tid = INVALID_TID;
    		worker->pipes[0] = INVALID_PIPE;
    		worker->pipes[1] = INVALID_PIPE;
    		if ( pthread_attr_init( &(worker->pthread_attr) ) != 0 )
    			goto cleanup;
    	}
    	
    	if ( pthread_create
    		(
    			&(workers[0].tid)
    			, &(workers[0].pthread_attr)
    			, (Worker_t)rdworker
    			, &(workers[0])
    		) != 0
    	) goto cleanup;
    	
    	if ( pthread_create
    		(
    			&(workers[1].tid)
    			, &(workers[1].pthread_attr)
    			, (Worker_t)wrworker
    			, &(workers[1])
    		) != 0
    	) goto cleanup;
    	
    	cleanup:
    	
    	while ( i > 0 )
    	{
    		--i;
    		worker = &(workers[i]);
    		if ( worker->tid != INVALID_TID )
    		{
    			pthread_join( worker->tid, (void**)&(worker_ret[i]) );
    		}
    		pthread_attr_destroy( &(worker->pthread_attr) );
    		worker->pipes[1] = INVALID_PIPE;
    		worker->pipes[0] = INVALID_PIPE;
    		worker->tid = INVALID_TID;
    	}
    	
    	printf("Hello World!\n");
    	return 0;
    }
    
    int open_pipes( pipe_t pipes[2] )
    {
    	pipes[0] = pipes[1] = INVALID_PIPE;
    #ifdef _WIN32
    #error "Unchecked calls"
    	pipes[0] = CreatePipe( FILE_READ, 0 );
    	pipes[1] = CreatePipe( FILE_WRITE, 0 );
    	return 0;
    #else
    	return pipe( pipes );
    #endif
    }
    void shut_pipes( pipe_t pipes[2] )
    {
    #ifdef _WIN32
    	CloseHandle( pipes[1] );
    	CloseHandle( pipes[0] );
    #else
    	close( pipes[1] );
    	close( pipes[0] );
    #endif
    	pipes[0] = pipes[1] = INVALID_PIPE;
    }
    int pipe_err( pipe_t pipe )
    {
    	return error( pipe );
    }
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size )
    {
    #ifdef _WIN32
    	DWORD bytes = 0;
    	ReadFile( pipe, data, size, &bytes );
    	return bytes;
    #else
    	return read( pipe, data, size );
    #endif
    }
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size )
    {
    #ifdef _WIN32
    	DWORD bytes = 0;
    	WriteFile( pipe, data, size, &bytes );
    	return bytes;
    #else
    	return write( pipe, data, size );
    #endif
    }
    
    void * rdworker( worker_t *worker )
    {
    	int num = 0;
    	ssize_t bytes;
    	
    	while (bytes = rdpipe( worker->pipes[0], &num, sizeof(int) ));
    	
    	printf( "Worker %d: Read %zd bytes, num = %d\n", worker->num, bytes, num );
    	
    	return worker;
    }
    
    void * wrworker( worker_t *worker )
    {
    	int num = 0x12345678;
    	size_t bytes = wrpipe( worker->pipes[1], &num, sizeof(int) );
    	
    	printf( "Worker %d: Wrote %zd bytes, num = %d\n", worker->num, bytes, num );
    	
    	return worker;
    }
    I compiled with "cc --std=c89 %f -lpthread && ./a.out". Can someone explain what I'm doing wrong, please? Take your time if you want, as I'm going to bed; I'll read it tomorrow.

  2. #2
    C++ Witch laserlight's Avatar
    Join Date
    Oct 2003
    Location
    Singapore
    Posts
    28,362
    You forgot the part where you tell readers what you are trying to do and how it does not work.
    Quote Originally Posted by Bjarne Stroustrup (2000-10-14)
    I get maybe two dozen requests for help with some sort of programming or design problem every day. Most have more sense than to send me hundreds of lines of code. If they do, I ask them to find the smallest example that exhibits the problem and send me that. Mostly, they then find the error themselves. "Finding the smallest program that demonstrates the error" is a powerful debugging tool.
    Look up a C++ Reference and learn How To Ask Questions The Smart Way

  3. #3
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    38,811
    The first thing I would do is rip out all that ifdef WIN32 rubbish until you've got the POSIX version working properly.

    Next,
    #define P_RD 0
    #define P_WR 1

    so things like
    wrpipe( worker->pipes[P_WR], &num, sizeof(int) );
    become that much more readable.

    > while (bytes = rdpipe( worker->pipes[0], &num, sizeof(int) ));
    Better hope this doesn't return an error before it returns zero.
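    To spell that out: -1 is non-zero, so with that loop a persistent error from read() spins forever, and a zero only arrives once every write end of the pipe has been closed. A minimal sketch of a loop that treats the three cases separately, POSIX side only; read_full is a made-up helper name, not something from the posted code:

```c
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: try to read exactly `size` bytes from `fd`.
 * Returns the number of bytes read (less than `size` only at end-of-file),
 * or -1 with errno set if read() fails. */
ssize_t read_full( int fd, void *data, size_t size )
{
    char *dst = data;
    size_t done = 0;

    while ( done < size )
    {
        ssize_t bytes = read( fd, dst + done, size - done );
        if ( bytes > 0 )
            done += (size_t)bytes;  /* short read: keep going */
        else if ( bytes == 0 )
            break;                  /* all write ends closed: end-of-file */
        else if ( errno != EINTR )
            return -1;              /* real error: stop instead of spinning */
        /* EINTR: interrupted by a signal before any data, so retry */
    }
    return (ssize_t)done;
}
```

    A caller can then tell "got a whole int" (return == sizeof(int)) apart from "writer went away" (a smaller count) and "something broke" (-1).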

    You don't even call open_pipes() or shut_pipes().


    This is an abomination.
    Code:
    typedef struct smem
    {
    	size_t have, want;
    	void *data;
    } smem_t;
    
    typedef struct smemv
    {
    	size_t have;
    	smem_t *data;
    } smemv_t;
    Yeah, the same generically meaningless member names in two structures with names differing only by a single character.
    You're going to be forever mixing these up.
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  4. #4
    Registered User awsdert's Avatar
    Join Date
    Jan 2015
    Posts
    1,429
    Quote Originally Posted by Salem View Post
    The first thing I would do is rip out all that ifdef WIN32 rubbish until you've got the POSIX version working properly.
    WIN32=rubbish, agreed. However, since it's not doing any harm and I will need to understand it at some point, I will leave it there, then correct and test it after getting the POSIX version working.

    Quote Originally Posted by Salem View Post
    Next,
    #define P_RD 0
    #define P_WR 1

    so things like
    wrpipe( worker->pipes[P_WR], &num, sizeof(int) );
    become that much more readable.
    That's fine, done

    Quote Originally Posted by Salem View Post
    > while (bytes = rdpipe( worker->pipes[0], &num, sizeof(int) ));
    Better hope this doesn't return an error before it returns zero.
    That actually caused the opposite of what I wanted: it locked up the app. I forgot to get rid of the loop before posting. What I actually want is for the threads to run out of sync, with the read thread waiting until the write thread sends something. The "out of sync" part is critical for what I want to do with gasp. As you will have noticed if you've ever used Game Conqueror or PINCE, their UI locks up for the duration of a scan. I want to be able to keep using the UI while I wait for the thread to finish, and I also want to report the thread's progress to some extent, so asynchronous threads are the only way for me to go. Anyway, this is what I've got now after fixing two of the points you gave and removing the loop:

    Code:
    #include <stdbool.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <pthread.h>
    
    typedef struct smem
    {
    	size_t have, want;
    	void *data;
    } smem_t;
    
    typedef struct smemv
    {
    	size_t have;
    	smem_t *data;
    } smemv_t;
    
    #define PIPE_RD 0
    #define PIPE_WR 1
    #define PIPE_COUNT 2
    
    #ifdef _WIN32
    typedef HANDLE pipe_t;
    #define INVALID_PIPE NULL
    #else
    typedef int pipe_t;
    #define INVALID_PIPE -1
    #endif
    
    int open_pipes( pipe_t pipes[2] );
    void shut_pipes( pipe_t pipes[2] );
    int pipe_err( pipe_t pipe );
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size );
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size );
    
    #define INVALID_TID -1
    
    typedef struct worker
    {
    	int num;
    	pthread_t tid;
    	pipe_t pipes[PIPE_COUNT];
    	bool pthread_attr_created;
    	pthread_attr_t pthread_attr;
    } worker_t;
    
    typedef void * (*Worker_t)( void *arg );
    
    void * rdworker( worker_t *worker );
    void * wrworker( worker_t *worker );
    
    int main()
    {
    	int i;
    	worker_t workers[2] = {{0}}, *worker, *worker_ret[2];
    	for ( i = 0; i < 2; ++i )
    	{
    		worker = &(workers[i]);
    		worker->num = i + 1;
    		worker->tid = INVALID_TID;
    		worker->pipes[PIPE_RD] = INVALID_PIPE;
    		worker->pipes[PIPE_WR] = INVALID_PIPE;
    		worker->pthread_attr_created = false;
    		
    		if ( open_pipes( worker->pipes ) != 0 )
    			goto cleanup;
    		
    		if ( pthread_attr_init( &(worker->pthread_attr) ) != 0 )
    			goto cleanup;
    		
    		worker->pthread_attr_created = true;
    	}
    	
    	if ( pthread_create
    		(
    			&(workers[0].tid)
    			, &(workers[0].pthread_attr)
    			, (Worker_t)rdworker
    			, &(workers[0])
    		) != 0
    	) goto cleanup;
    	
    	if ( pthread_create
    		(
    			&(workers[1].tid)
    			, &(workers[1].pthread_attr)
    			, (Worker_t)wrworker
    			, &(workers[1])
    		) != 0
    	) goto cleanup;
    	
    	cleanup:
    	
    	while ( i > 0 )
    	{
    		--i;
    		worker = &(workers[i]);
    		
    		if ( worker->tid != INVALID_TID )
    			pthread_join( worker->tid, (void**)&(worker_ret[i]) );
    		
    		if ( worker->pthread_attr_created )
    			pthread_attr_destroy( &(worker->pthread_attr) );
    		
    		shut_pipes( worker->pipes );
    		
    		worker->pthread_attr_created = false;
    		worker->pipes[PIPE_WR] = INVALID_PIPE;
    		worker->pipes[PIPE_RD] = INVALID_PIPE;
    		worker->tid = INVALID_TID;
    	}
    	
    	printf("Hello World!\n");
    	return 0;
    }
    
    int open_pipes( pipe_t pipes[2] )
    {
    	pipes[0] = pipes[1] = INVALID_PIPE;
    #ifdef _WIN32
    #error "Unchecked calls"
    	pipes[0] = CreatePipe( FILE_READ, 0 );
    	pipes[1] = CreatePipe( FILE_WRITE, 0 );
    	return 0;
    #else
    	return pipe( pipes );
    #endif
    }
    void shut_pipes( pipe_t pipes[2] )
    {
    #ifdef _WIN32
    	CloseHandle( pipes[1] );
    	CloseHandle( pipes[0] );
    #else
    	close( pipes[1] );
    	close( pipes[0] );
    #endif
    	pipes[0] = pipes[1] = INVALID_PIPE;
    }
    int pipe_err( pipe_t pipe )
    {
    	return error( pipe );
    }
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size )
    {
    #ifdef _WIN32
    	DWORD bytes = 0;
    	ReadFile( pipe, data, size, &bytes );
    	return bytes;
    #else
    	return read( pipe, data, size );
    #endif
    }
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size )
    {
    #ifdef _WIN32
    	DWORD bytes = 0;
    	WriteFile( pipe, data, size, &bytes );
    	return bytes;
    #else
    	return write( pipe, data, size );
    #endif
    }
    
    void * rdworker( worker_t *worker )
    {
    	int num = 0;
    	ssize_t bytes;
    	
    	bytes = rdpipe( worker->pipes[0], &num, sizeof(int) );
    	
    	printf( "Worker %d: Read %zd bytes, num = %d\n", worker->num, bytes, num );
    	
    	return worker;
    }
    
    void * wrworker( worker_t *worker )
    {
    	int num = 0x12345678;
    	size_t bytes = wrpipe( worker->pipes[1], &num, sizeof(int) );
    	
    	printf( "Worker %d: Wrote %zd bytes, num = %d\n", worker->num, bytes, num );
    	
    	return worker;
    }
    Quote Originally Posted by Salem View Post
    You don't even call open_pipes() or shut_pipes().
    Whoops, didn't even notice, ty

    Quote Originally Posted by Salem View Post
    This is an abomination.
    Code:
    typedef struct smem
    {
    	size_t have, want;
    	void *data;
    } smem_t;
    
    typedef struct smemv
    {
    	size_t have;
    	smem_t *data;
    } smemv_t;
    Yeah, the same generically meaningless member names in two structures with names differing only by a single character.
    You're going to be forever mixing these up.
    How would I mix them up? One is clearly an element of the other. I didn't want to keep typing shared_mem and shared_mem_vector, so I shortened them to reasonable names for test code.
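    On the "keep the UI usable and report progress" goal described earlier in this post, one possible shape is a worker that writes progress values into the pipe while the UI side checks the read end without ever blocking. This is only a sketch under assumptions: scan_worker, make_nonblocking and try_read_progress are hypothetical names, the "scan" is faked with a counter, and O_NONBLOCK via fcntl() is just one way to do it.

```c
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical stand-in for a long scan: reports 25, 50, 75, 100 percent. */
void * scan_worker( void *arg )
{
    int wr = *(int *)arg;
    int pct;

    for ( pct = 25; pct <= 100; pct += 25 )
    {
        /* ... one slice of the real work would go here ... */
        if ( write( wr, &pct, sizeof pct ) != (ssize_t)sizeof pct )
            break;
    }
    close( wr ); /* closing the write end is how the reader learns we are done */
    return NULL;
}

/* Switch a descriptor to non-blocking mode. Returns 0 on success, -1 on error. */
int make_nonblocking( int fd )
{
    int flags = fcntl( fd, F_GETFL );
    if ( flags == -1 )
        return -1;
    return fcntl( fd, F_SETFL, flags | O_NONBLOCK ) == -1 ? -1 : 0;
}

/* Called from the UI loop. Returns 1 and fills *pct if a report was waiting,
 * 0 if there is nothing to read yet, -1 once the worker has closed its end
 * (or on any other error). Never blocks if `rd` is non-blocking. */
int try_read_progress( int rd, int *pct )
{
    ssize_t bytes = read( rd, pct, sizeof *pct );

    if ( bytes == (ssize_t)sizeof *pct )
        return 1;
    if ( bytes < 0 && ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) )
        return 0;
    return -1;
}
```

    The UI loop would call try_read_progress() once per iteration between handling its own events, and join the worker once it returns -1.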

  5. #5
    Registered User awsdert's Avatar
    Join Date
    Jan 2015
    Posts
    1,429
    Just noticed a couple of mistakes I made and corrected them. First, I forgot to open just one set of pipes and copy the fds to each worker; second, I forgot to update the thread functions to use the PIPE_RD/PIPE_WR defines. Now, however, the compilation fails with no indication of why. Here's the updated code:
    Code:
    #define _GNU_SOURCE
    #include <stdbool.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <signal.h>
    #include <poll.h>
    #include <pthread.h>
    
    typedef struct smem
    {
    	size_t have, want;
    	void *data;
    } smem_t;
    
    typedef struct smemv
    {
    	size_t have;
    	smem_t *data;
    } smemv_t;
    
    #define PIPE_RD 0
    #define PIPE_WR 1
    #define PIPE_COUNT 2
    
    #ifdef _WIN32
    typedef HANDLE pipe_t;
    #define INVALID_PIPE NULL
    #else
    typedef int pipe_t;
    #define INVALID_PIPE -1
    #endif
    
    int open_pipes( pipe_t pipes[2] );
    void shut_pipes( pipe_t pipes[2] );
    int pipe_err( pipe_t pipe );
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size );
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size );
    
    #define INVALID_TID -1
    
    typedef struct worker
    {
    	int num;
    	pthread_t tid;
    	pipe_t pipes[PIPE_COUNT];
    	bool attr_created;
    	pthread_attr_t attr;
    } worker_t;
    
    typedef void * (*Worker_t)( void *arg );
    
    void * rdworker( worker_t *worker );
    void * wrworker( worker_t *worker );
    
    int main()
    {
    	int i;
    	pipe_t pipes[PIPE_COUNT] = {INVALID_PIPE,INVALID_PIPE};
    	worker_t workers[2] = {{0}}, *worker, *worker_ret[2];
    	
    	if ( open_pipes( worker->pipes ) != 0 )
    			goto cleanup;
    	
    	for ( i = 0; i < 2; ++i )
    	{
    		worker = &(workers[i]);
    		worker->num = i + 1;
    		worker->tid = INVALID_TID;
    		worker->pipes[PIPE_RD] = pipes[PIPE_RD];
    		worker->pipes[PIPE_WR] = pipes[PIPE_WR];
    		worker->attr_created = false;
    		
    		if ( pthread_attr_init( &(worker->attr) ) != 0 )
    			goto cleanup;
    		
    		worker->attr_created = true;
    	}
    	
    	worker = &(workers[0]);
    	
    	if ( pthread_create
    		(
    			&(worker->tid)
    			, &(worker->attr)
    			, (Worker_t)rdworker
    			, worker
    		) != 0
    	) goto cleanup;
    	
    	worker = &(workers[1]);
    	
    	if ( pthread_create
    		(
    			&(worker->tid)
    			, &(worker->attr)
    			, (Worker_t)wrworker
    			, worker
    		) != 0
    	) goto cleanup;
    	
    	pthread_join( workers[0].tid, (void**)&(worker_ret[0]) );
    	
    	cleanup:
    	
    	while ( i > 0 )
    	{
    		--i;
    		worker = &(workers[i]);
    		
    		if ( worker->num >= 0 )
    			pthread_cancel( worker->tid );
    		
    		if ( worker->attr_created )
    			pthread_attr_destroy( &(worker->attr) );
    		
    		worker->attr_created = false;
    		worker->pipes[PIPE_WR] = INVALID_PIPE;
    		worker->pipes[PIPE_RD] = INVALID_PIPE;
    		worker->tid = INVALID_TID;
    	}
    	
    	shut_pipes( pipes );
    	
    	printf("Hello World!\n");
    	return 0;
    }
    
    int open_pipes( pipe_t pipes[2] )
    {
    	pipes[0] = pipes[1] = INVALID_PIPE;
    #ifdef _WIN32
    #error "Unchecked calls"
    	pipes[0] = CreatePipe( FILE_READ, 0 );
    	pipes[1] = CreatePipe( FILE_WRITE, 0 );
    	return 0;
    #else
    	return pipe( pipes );
    #endif
    }
    void shut_pipes( pipe_t pipes[2] )
    {
    #ifdef _WIN32
    	CloseHandle( pipes[1] );
    	CloseHandle( pipes[0] );
    #else
    	close( pipes[1] );
    	close( pipes[0] );
    #endif
    	pipes[0] = pipes[1] = INVALID_PIPE;
    }
    int pipe_err( pipe_t pipe )
    {
    	return error( pipe );
    }
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size )
    {
    #ifdef _WIN32
    	DWORD bytes = 0;
    	ReadFile( pipe, data, size, &bytes );
    	return bytes;
    #else
    	return read( pipe, data, size );
    #endif
    }
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size )
    {
    #ifdef _WIN32
    	DWORD bytes = 0;
    	WriteFile( pipe, data, size, &bytes );
    	return bytes;
    #else
    	return write( pipe, data, size );
    #endif
    }
    
    void * rdworker( worker_t *worker )
    {
    	int num = 0;
    	ssize_t bytes;
    	/*
    	struct pollfd fds = {0};
    	fds.fd = worker->pipes[PIPE_RD];
    	*/
    	
    	printf( "Worker %d: num = %d\n", worker->num, num );
    	
    	bytes = rdpipe( worker->pipes[PIPE_RD], &num, sizeof(int) );
    	
    	printf( "Worker %d: num = %d, Read %zd bytes\n", worker->num, num, bytes );
    	
    	/* Indicate the thread exited */
    	worker->num = -1;
    	return worker;
    }
    
    void * wrworker( worker_t *worker )
    {
    	int num = 0x12345678;
    	size_t bytes;
    	
    	printf( "Worker %d: num = %d\n", worker->num, num );
    	
    	bytes = wrpipe( worker->pipes[PIPE_WR], &num, sizeof(int) );
    	
    	printf( "Worker %d: num = %d, Wrote %zd bytes\n", worker->num, num, bytes );
    	
    	/* Indicate the thread exited */
    	worker->num = -1;
    	return worker;
    }
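    For comparison, the "one pipe() call, both threads given its descriptors" idea can be sketched in isolation. This is not the code above, just a stripped-down illustration under assumptions: job_t, reader, writer and send_through_pipe are hypothetical names, the pipe array is passed to pipe() directly, and started flags are used because pthread_t is an opaque type that cannot portably be compared against -1.

```c
#include <pthread.h>
#include <unistd.h>

/* Hypothetical per-thread record: which end to use, the value, the result. */
typedef struct job
{
    int fd;
    int num;
    ssize_t bytes;
} job_t;

static void * reader( void *arg )
{
    job_t *job = arg;
    job->bytes = read( job->fd, &job->num, sizeof job->num );
    return job;
}

static void * writer( void *arg )
{
    job_t *job = arg;
    job->bytes = write( job->fd, &job->num, sizeof job->num );
    return job;
}

/* Send `value` from one thread to another through a single pipe.
 * Returns 0 and stores the received value in *received on success. */
int send_through_pipe( int value, int *received )
{
    int fds[2];
    int rd_started, wr_started;
    pthread_t rd_tid, wr_tid;
    job_t rd_job, wr_job;

    if ( pipe( fds ) != 0 )
        return -1;

    rd_job.fd = fds[0]; rd_job.num = 0;     rd_job.bytes = -1;
    wr_job.fd = fds[1]; wr_job.num = value; wr_job.bytes = -1;

    rd_started = pthread_create( &rd_tid, NULL, reader, &rd_job ) == 0;
    wr_started = pthread_create( &wr_tid, NULL, writer, &wr_job ) == 0;

    if ( wr_started )
        pthread_join( wr_tid, NULL );
    close( fds[1] ); /* if nothing was written the reader now sees end-of-file */
    if ( rd_started )
        pthread_join( rd_tid, NULL );
    close( fds[0] );

    if ( rd_job.bytes != (ssize_t)sizeof(int) || wr_job.bytes != (ssize_t)sizeof(int) )
        return -1;

    *received = rd_job.num;
    return 0;
}
```

    Joining both threads before closing the read end means neither thread can still be using a descriptor when it is closed, so no pthread_cancel() is needed in this sketch.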

  6. #6
    Registered User awsdert's Avatar
    Join Date
    Jan 2015
    Posts
    1,429
    I decided to go into the terminal and got more information there: it turned out there was a segfault, and one debugging session later I got the code working. By the way, how do you shut down gdb without closing the terminal? None of the obvious commands I tried worked; I tried both exit and stop. Also, for anyone looking to reference the code, here it is (the win32 code still isn't checked, so don't rely on it):
    Code:
    #define _GNU_SOURCE
    #include <stdbool.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <signal.h>
    #include <poll.h>
    #include <pthread.h>
    
    typedef struct smem
    {
    	size_t have, want;
    	void *data;
    } smem_t;
    
    typedef struct smemv
    {
    	size_t have;
    	smem_t *data;
    } smemv_t;
    
    #define PIPE_RD 0
    #define PIPE_WR 1
    #define PIPE_COUNT 2
    
    #ifdef _WIN32
    typedef HANDLE pipe_t;
    #define INVALID_PIPE NULL
    #else
    typedef int pipe_t;
    #define INVALID_PIPE -1
    #endif
    
    int open_pipes( pipe_t *pipes );
    void shut_pipes( pipe_t *pipes );
    int pipe_err( pipe_t pipe );
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size );
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size );
    
    #define INVALID_TID -1
    
    typedef struct worker
    {
    	int num;
    	pthread_t tid;
    	pipe_t pipes[PIPE_COUNT];
    	bool attr_created;
    	pthread_attr_t attr;
    } worker_t;
    
    typedef void * (*Worker_t)( void *arg );
    
    void * rdworker( worker_t *worker );
    void * wrworker( worker_t *worker );
    
    int main()
    {
    	int i;
    	pipe_t pipes[PIPE_COUNT] = {INVALID_PIPE,INVALID_PIPE};
    	worker_t workers[2] = {{0}}, *worker, *worker_ret[2];
    	
    	if ( open_pipes( pipes ) != 0 )
    		goto cleanup;
    	
    	for ( i = 0; i < 2; ++i )
    	{
    		worker = &(workers[i]);
    		worker->num = i + 1;
    		worker->tid = INVALID_TID;
    		worker->pipes[PIPE_RD] = pipes[PIPE_RD];
    		worker->pipes[PIPE_WR] = pipes[PIPE_WR];
    		worker->attr_created = false;
    		
    		if ( pthread_attr_init( &(worker->attr) ) != 0 )
    			goto cleanup;
    		
    		worker->attr_created = true;
    	}
    	
    	worker = &(workers[0]);
    	
    	if ( pthread_create
    		(
    			&(worker->tid)
    			, &(worker->attr)
    			, (Worker_t)rdworker
    			, worker
    		) != 0
    	) goto cleanup;
    	
    	worker = &(workers[1]);
    	
    	pthread_attr_setdetachstate( &(worker->attr), PTHREAD_CREATE_DETACHED );
    	
    	if ( pthread_create
    		(
    			&(worker->tid)
    			, &(worker->attr)
    			, (Worker_t)wrworker
    			, worker
    		) != 0
    	) goto cleanup;
    	
    	pthread_join( workers[0].tid, (void**)&(worker_ret[0]) );
    	
    	cleanup:
    	
    	while ( i > 0 )
    	{
    		--i;
    		worker = &(workers[i]);
    		
    		if ( worker->num >= 0 )
    			pthread_cancel( worker->tid );
    		
    		if ( worker->attr_created )
    			pthread_attr_destroy( &(worker->attr) );
    		
    		worker->attr_created = false;
    		worker->pipes[PIPE_WR] = INVALID_PIPE;
    		worker->pipes[PIPE_RD] = INVALID_PIPE;
    		worker->tid = INVALID_TID;
    	}
    	
    	shut_pipes( pipes );
    	
    	printf("Hello World!\n");
    	return 0;
    }
    
    int open_pipes( pipe_t *pipes )
    {
    	pipes[0] = pipes[1] = INVALID_PIPE;
    #ifdef _WIN32
    #error "Unchecked calls"
    	pipes[0] = CreatePipe( FILE_READ, 0 );
    	pipes[1] = CreatePipe( FILE_WRITE, 0 );
    	return 0;
    #else
    	return pipe( pipes );
    #endif
    }
    void shut_pipes( pipe_t *pipes )
    {
    #ifdef _WIN32
    	CloseHandle( pipes[1] );
    	CloseHandle( pipes[0] );
    #else
    	close( pipes[1] );
    	close( pipes[0] );
    #endif
    	pipes[0] = pipes[1] = INVALID_PIPE;
    }
    int pipe_err( pipe_t pipe )
    {
    	return error( pipe );
    }
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size )
    {
    #ifdef _WIN32
    	DWORD bytes = 0;
    	ReadFile( pipe, data, size, &bytes );
    	return bytes;
    #else
    	return read( pipe, data, size );
    #endif
    }
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size )
    {
    #ifdef _WIN32
    	DWORD bytes = 0;
    	WriteFile( pipe, data, size, &bytes );
    	return bytes;
    #else
    	return write( pipe, data, size );
    #endif
    }
    
    void * rdworker( worker_t *worker )
    {
    	int num = 0;
    	ssize_t bytes;
    	struct pollfd fds = {0};
    	fds.fd = worker->pipes[PIPE_RD];
    	fds.events = POLLIN | POLLERR | POLLHUP | POLLNVAL;
    	
    	printf( "Worker %d: num = %d\n", worker->num, num );
    	
    	poll( &fds, 1, 1 );
    	
    	bytes = rdpipe( worker->pipes[PIPE_RD], &num, sizeof(int) );
    	
    	printf( "Worker %d: num = %d, Read %zd bytes\n", worker->num, num, bytes );
    	
    	/* Indicate the thread exited */
    	worker->num = -1;
    	return worker;
    }
    
    void * wrworker( worker_t *worker )
    {
    	int num = 0x12345678;
    	size_t bytes;
    	
    	printf( "Worker %d: num = %d\n", worker->num, num );
    	
    	bytes = wrpipe( worker->pipes[PIPE_WR], &num, sizeof(int) );
    	
    	printf( "Worker %d: num = %d, Wrote %zd bytes\n", worker->num, num, bytes );
    	
    	/* Indicate the thread exited */
    	worker->num = -1;
    	return worker;
    }
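    One thing the listing above leaves open is what poll() actually reported: its return value and fds.revents are not looked at, so the read that follows can still block after the 1 ms timeout. A hedged sketch of a wrapper that checks both; wait_readable is a hypothetical name, and retrying on EINTR with the full timeout is a simplification.

```c
#include <errno.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: wait up to `timeout_ms` for `fd` to become readable.
 * Returns 1 if a read() will not block, 0 on timeout, -1 on error or when
 * the other end has hung up with no data left. */
int wait_readable( int fd, int timeout_ms )
{
    struct pollfd pfd;
    int ready;

    pfd.fd = fd;
    pfd.events = POLLIN; /* POLLERR, POLLHUP and POLLNVAL are reported regardless */
    pfd.revents = 0;

    do
        ready = poll( &pfd, 1, timeout_ms );
    while ( ready < 0 && errno == EINTR ); /* simplification: restarts the timeout */

    if ( ready <= 0 )
        return ready;   /* 0 = timed out, -1 = poll() itself failed */
    if ( pfd.revents & POLLIN )
        return 1;       /* data is waiting */
    return -1;          /* POLLHUP, POLLERR or POLLNVAL without data */
}
```

    With this, the reader thread can loop on wait_readable() with a modest timeout, do other work or check a stop flag on 0, and only call rdpipe() on 1.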

  7. #7
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    38,811
    > btw how do you shut down gdb without closing the terminal as no matter what basic sense command I tried nothing worked
    Try the 'quit' command.

    > I didn't want to keep typing shared_mem and shared_mem_vector
    That's why the world moved on and started using auto-completion text editors.
    But your naming is already off, because it looks like shared_mem is already a vector (what with those sizes in there).

    Oh, and a note on portability
    Code:
    // mypipe_win32.h
    #ifndef MYPIPE_WIN32_H
    #define MYPIPE_WIN32_H
    typedef HANDLE pipe_t;
    #define INVALID_PIPE NULL
    #endif
    
    // mypipe_posix.h
    #ifndef MYPIPE_POSIX_H
    #define MYPIPE_POSIX_H
    #include <sys/types.h> /* ssize_t, size_t */
    typedef int pipe_t;
    #define INVALID_PIPE -1
    #endif
    
    // mypipe.h
    #ifndef MYPIPE_H
    #define MYPIPE_H
    
    #ifdef _WIN32
    #include "mypipe_win32.h"
    #else
    #include "mypipe_posix.h"
    #endif
    
    int open_pipes( pipe_t *pipes );
    void shut_pipes( pipe_t *pipes );
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size );
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size );
    
    #endif
    
    // mypipe_posix.c
    #include <unistd.h> /* pipe, close, read, write */
    #include "mypipe.h"
    int open_pipes( pipe_t *pipes )
    {
        pipes[0] = pipes[1] = INVALID_PIPE;
        return pipe( pipes );
    }
    void shut_pipes( pipe_t *pipes )
    {
        close( pipes[1] );
        close( pipes[0] );
        pipes[0] = pipes[1] = INVALID_PIPE;
    }
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size )
    {
        return read( pipe, data, size );
    }
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size )
    {
        return write( pipe, data, size );
    }
    
    
    // mypipe_win32.c
    #include "mypipe.h"
    int open_pipes( pipe_t *pipes )
    {
        pipes[0] = pipes[1] = INVALID_PIPE;
    #error "Unchecked calls"
        pipes[0] = CreatePipe( FILE_READ, 0 );
        pipes[1] = CreatePipe( FILE_WRITE, 0 );
        return 0;
    }
    void shut_pipes( pipe_t *pipes )
    {
        CloseHandle( pipes[1] );
        CloseHandle( pipes[0] );
        pipes[0] = pipes[1] = INVALID_PIPE;
    }
    ssize_t rdpipe( pipe_t pipe, void *data, size_t size )
    {
        DWORD bytes = 0;
        ReadFile( pipe, data, size, &bytes );
        return bytes;
    }
    ssize_t wrpipe( pipe_t pipe, void *data, size_t size )
    {
        DWORD bytes = 0;
        WriteFile( pipe, data, size, &bytes );
        return bytes;
    }
    1. The only file your code includes is mypipe.h
    2. The ONLY place where you need the system selection #ifdef is inside mypipe.h to include the appropriate system specialisation of mypipe_posix.h or mypipe_win32.h
    3. In your build project / makefile, you choose ONE of mypipe_posix.c or mypipe_win32.c as the specialised implementation.

    You don't have to keep mucking about with littering #ifdef BANANA all over your code once you have a decent abstraction in place.
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  8. #8
    Registered User awsdert's Avatar
    Join Date
    Jan 2015
    Posts
    1,429
    Quote Originally Posted by Salem View Post
    > btw how do you shut down gdb without closing the terminal as no matter what basic sense command I tried nothing worked
    Try the 'quit' command.

    > I didn't want to keep typing shared_mem and shared_mem_vector
    That's why the world moved on and started using auto-completion text editors.
    But your naming is already off, because it looks like shared_mem is already a vector (what with those sizes in there).

    Oh, and a note on portability

    You don't have to keep mucking about with littering #ifdef BANANA all over your code once you have a decent abstraction in place.
    I'm answering from my phone, so no split quotes. Anyway, smem_t is not supposed to be a vector specifically; it's supposed to hold shared information about shared memory, which is why it has a "want" member. The idea is that when a thread wants the shared memory to grow, it first updates the "want" member and then sends a signal to the thread responsible for managing the shared memory. It's the only way I can think of to safely share memory that could otherwise change size or be released unexpectedly. As for portability, the code is just a test and isn't in the target project; it's only there to help me understand roughly how I should handle the multithreaded memory situation before I start the rewrite. The win32 code is there because I want to make sure my understanding of both versions is correct, so that I don't have to rewrite my code again later when I implement support for both platforms.
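    The want/have handshake described above could look roughly like the following. This is a sketch under stated assumptions, not the gasp design: it swaps the "signal" for a pthread condition variable, and shared_mem_t, shared_mem_manager, shared_mem_request, shared_mem_start and shared_mem_stop are all hypothetical names.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical variant of smem_t: the same have/want/data idea plus the
 * lock and condition variable needed to hand requests to a manager thread. */
typedef struct shared_mem
{
    pthread_mutex_t lock;
    pthread_cond_t changed; /* broadcast whenever have, want or quit changes */
    size_t have, want;
    void *data;
    int quit;
} shared_mem_t;

/* Manager thread: the only place that ever calls realloc() on mem->data. */
static void * shared_mem_manager( void *arg )
{
    shared_mem_t *mem = arg;

    pthread_mutex_lock( &mem->lock );
    while ( !mem->quit )
    {
        if ( mem->want > mem->have )
        {
            void *grown = realloc( mem->data, mem->want );
            if ( grown )
            {
                mem->data = grown;
                mem->have = mem->want;
            }
            else
                mem->want = mem->have; /* refuse the request */
            pthread_cond_broadcast( &mem->changed );
        }
        else
            pthread_cond_wait( &mem->changed, &mem->lock );
    }
    pthread_mutex_unlock( &mem->lock );
    return NULL;
}

/* Ask for at least `size` bytes and wait for the manager's answer.
 * Returns 0 once mem->have >= size, -1 if the request was refused. */
int shared_mem_request( shared_mem_t *mem, size_t size )
{
    int ok;

    pthread_mutex_lock( &mem->lock );
    if ( size > mem->want )
        mem->want = size;
    pthread_cond_broadcast( &mem->changed );
    while ( mem->have < size && mem->want >= size && !mem->quit )
        pthread_cond_wait( &mem->changed, &mem->lock );
    ok = mem->have >= size;
    pthread_mutex_unlock( &mem->lock );
    return ok ? 0 : -1;
}

/* Initialise the record and start the manager. Returns 0 on success. */
int shared_mem_start( shared_mem_t *mem, pthread_t *tid )
{
    mem->have = mem->want = 0;
    mem->data = NULL;
    mem->quit = 0;
    pthread_mutex_init( &mem->lock, NULL );
    pthread_cond_init( &mem->changed, NULL );
    return pthread_create( tid, NULL, shared_mem_manager, mem );
}

/* Stop the manager and release everything. Call after all users are done. */
void shared_mem_stop( shared_mem_t *mem, pthread_t tid )
{
    pthread_mutex_lock( &mem->lock );
    mem->quit = 1;
    pthread_cond_broadcast( &mem->changed );
    pthread_mutex_unlock( &mem->lock );
    pthread_join( tid, NULL );
    free( mem->data );
    pthread_cond_destroy( &mem->changed );
    pthread_mutex_destroy( &mem->lock );
}
```

    Because realloc() may move the block, other threads in this sketch would have to hold mem->lock whenever they touch mem->data, and must not keep pointers into it across an unlock.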
