Socket blocking & end to end delay

This is a discussion on Socket blocking & end to end delay within the Networking/Device Communication forums, part of the General Programming Boards category; Hi all, I am doing a simple test with RTP packets. I have a computer with two wireless cards connected ...

  1. #1
    Registered User
    Join Date
    Aug 2010
    Posts
    23

    Socket blocking & end to end delay

    Hi all,

    I am doing a simple test with RTP packets. I have a computer with two wireless cards connected to a wifi-router (11Mbit/s), acting as User Agent (UA), and another computer connected to the router via Ethernet (10Mbit/s), acting as Proxy.

    The UA sends packets to the Proxy from one wireless card and the response from the proxy is received in the other wireless card. This allows me to measure the End to End delay of a packet by looking the sequence number of each packet received.

    Since my UA is programmed with threads, I have one thread sending messages and the other receiving. So basically,each time a packet is sent I check whether something has been received. I also collect the information with Wireshark to see what is happenning at the network. Since the generated packets take audio from snd_pcm_readi(), i have checked that sent packets are always sent at least 20 ms apart.

    Question: Why, if I am using a non-blocking socket at the receiver, the network performs better than if i am using a standar socket? That is, if i set the receiver's socket to NON -blocking the network sends Bursts of packets whereas if i use a blocking socket the network performs really well.

    Here some Pseudo Log from Wireshark

    Consider
    User Agent IP = IPu
    Proxy IP = IPp

    With non blocking

    TIME, IP SRC, IP DEST
    0, IPu, IPp
    1, IPu, IPp
    2, IPu, IPp
    3, IPu, IPp
    4, IPu, IPp
    5, IPu, IPp
    6, IPp, IPu
    7, IPp, IPu
    8, IPp, IPu
    9, IPp, IPu


    With blocking

    TIME, IP SRC, IP DEST
    0, IPu, IPp
    1, IPp, IPu
    2, IPu, IPp
    3, IPp, IPu
    4, IPu, IPp
    5, IPp, IPu
    6, IPu, IPp
    7, IPp, IPu
    8, IPu, IPp
    9, IPp, IPu
    10, IPu, IPp
    11, IPp, IPu

    Any ideas here?

    Thanks

  2. #2
    Banned
    Join Date
    Aug 2010
    Location
    Ontario Canada
    Posts
    9,547
    With a blocking socket the code (and thus the thread) comes to a blinding halt when it encounters the recv function... if send is a result of receive, the system paces itself to the network activity.

    With non-blocking sockets you need some means of detecting when a packet is ready to receive, which is usually done with select(). The idea is that your software can incorporate the select() function into some kind of wait loop (such as Windows message dispatcher) so that the system checks for incoming packets when idle and calls your receive functions. Again if send is the result of receive it will pace itself to the network activity.

    However... if you simply have two free running threads calling send and recv ansynchronously in a loop you are no longer doing send as a function of receive (or vice versa) and errors and collisions will result in the kind of network activity you are seeing.

  3. #3
    Registered User
    Join Date
    Aug 2010
    Posts
    23
    I am a bit confused with "the system paces itself to the network activity ".

    First, yes I am sending packet and receiving them so there is sort of correlation in the User Agent. My point is that if the Sender if the user already has a 20ms delay from packet to packet I do not see any harm on checking the socket whether anything was received or not. So basically if i am receiving something of the same size of my payload I read if not I ignore it. Below my pseudo Code

    Code:
    GLOBAL VALUES
    Int runcond = 1;
    Int queue_position;
    pthread_mutex_t token_mutex;
    pthread_cond_t ReceiveData_cond;//=PTHREAD_COND_INITIALIZER;
    int breceive;
    pthread_cond_t SendData_cond;//=PTHREAD_COND_INITIALIZER;
    int bsend;
    pthread_cond_t Start_cond;//=PTHREAD_COND_INITIALIZER;
    int payloadsize=160;
    int rtp_hdr=12;
    
    Main Proxy
    
    Int sockfd;
    pthread_create (&pid_rtp_sender, NULL, &send_rtp, null;
    pthread_create (&pid_rtp_receiver, NULL, &receive_rtp,null);
    void* send_rtp(void* unused)
    {
    unsigned char * pkt;
    if (!(pkt=malloc(((payloadsize+rtp_hdr)+1)*sizeof(char))))printf("malloc error for pkt\n");
    if ((sockfd=socket(PF_INET,SOCK_DGRAM,0))<0) {
    	printf("socket error");
    	return;
    }
    /*Bind the source port*/
    inet_pton(AF_INET,strdup("192.168.1.2"),&ip_address.s_addr);
    rem_addr.sin_family	= AF_INET;
    rem_addr.sin_port	= htons(ndestport);
    memcpy(&rem_addr.sin_addr.s_addr,&ip_address.s_addr, sizeof(rem_addr.sin_addr));
    addrlen=sizeof(rem_addr);
    
    	
    bind (sockfd,(struct sockaddr *)&local_addr,sizeof(struct sockaddr_in));
    flags = fcntl(sockfd, F_GETFL);
    flags |= O_NONBLOCK;
    fcntl(sockfd, F_SETFL, flags);
    while (runcond)
    {
    	pthread_mutex_lock(&token_mutex);
    if (q_position > 0){			
    	/*Get Payload from Queue*/	  	  
    	pkt=get_queue(queue,q_position);
    snd=sendto(sockfd,pkt,payloadsize+rtp_hdr,0,(struct sockaddr *)&rem_addr,sizeof(rem_addr));
    		}
    	breceive=1;
    	pthread_cond_signal(&ReceiveData_cond);
    	bsend=0;
    
    	pthread_mutex_unlock(&token_mutex);
    }
    void* receive_rtp(void* unused)
    {
    unsigned char * pkt;
    if (!(pkt=malloc(((payloadsize+rtp_hdr)+1)*sizeof(char))))printf("malloc error for pkt\n");
    if ((sockfd=socket(PF_INET,SOCK_DGRAM,0))<0) {
    	printf("socket error");
    	return;
    }
    /*Bind the source port*/
    
    inet_pton(AF_INET,myip,&ip_address.s_addr);
    rem_addr.sin_family	= AF_INET;
    rem_addr.sin_port	= htons(ndestport);
    memcpy(&rem_addr.sin_addr.s_addr,&ip_address.s_addr, sizeof(rem_addr.sin_addr));
    addrlen=sizeof(rem_addr);
    
    	
    bind (sockfd,(struct sockaddr *)&local_addr,sizeof(struct sockaddr_in));
    flags = fcntl(sockfd, F_GETFL);
    flags |= O_NONBLOCK;
    fcntl(sockfd, F_SETFL, flags);
    while (runcond)
    {
    pthread_mutex_lock(&token_mutex);
    err=recvfrom(sockfd,pkt,payloadsize+rtp_hdr, 0, (struct sockaddr *)&address,&addrlen );
    if (err==payloadsize){
    		memcpy(payload,pkt+rtp_hdr,payloadsize);
    		memcpy(&cseq_tmp,pkt+2,sizeof(cseq_tmp));
    		/*ADD PAYLOAD TO A FIFO QUEUE*/
    q_position=q_add(queue,q_position,&cdnbuffer,payloadsize);
    }
    	bsend=1;
    	pthread_cond_signal(&SendData_cond);
    	breceive=0;		
    	while (!breceive)
    		pthread_cond_wait(&ReceiveData_cond,&token_mutex);
      	pthread_mutex_unlock(&token_mutex);		
    }
    USER AGENT
    Code:
    void* send_rtp(void* unused)
    { 
    
    unsigned char * pkt;
    if (!(pkt=malloc(((payloadsize+rtp_hdr)+1)*sizeof(char))))printf("malloc error for pkt\n");
    if ((sockfd=socket(PF_INET,SOCK_DGRAM,0))<0) {
    	printf("socket error");
    	return;
    }
    /*Bind the source port*/
    inet_pton(AF_INET,strdup("192.168.1.2"),&ip_address.s_addr);
    rem_addr.sin_family	= AF_INET;
    rem_addr.sin_port	= htons(ndestport);
    memcpy(&rem_addr.sin_addr.s_addr,&ip_address.s_addr, sizeof(rem_addr.sin_addr));
    addrlen=sizeof(rem_addr);
    
    	
    bind (sockfd,(struct sockaddr *)&local_addr,sizeof(struct sockaddr_in));
    
    /* Open PCM device for recording (capture). */
      	//rc = snd_pcm_open(&handle, "default",SND_PCM_STREAM_CAPTURE, 0);
    	rc = snd_pcm_open(&handle, "plughw:0,0",SND_PCM_STREAM_CAPTURE, 0);
      	if (rc < 0){
      		fprintf(stderr,"unable to open pcm device: %s\n",snd_strerror(rc));
      		exit(1);
      	}
      	printf("PCM opened cap\n");
      	snd_pcm_hw_params_alloca(&params);
      	snd_pcm_hw_params_any(handle, params);
    	snd_pcm_hw_params_set_access(handle, params,SND_PCM_ACCESS_RW_INTERLEAVED);
    	//snd_pcm_hw_params_set_format(handle, params,SND_PCM_FORMAT_MU_LAW);
    	if (snd_pcm_hw_params_set_format(handle, params,SND_PCM_FORMAT_S16_LE)<0)printf("pcm set format\n");
    	snd_pcm_hw_params_set_channels(handle, params, 1);
    
      	snd_pcm_hw_params_set_rate_near(handle, params, &val, &dir);
    
      	snd_pcm_hw_params_set_period_size(handle,params, frames, dir);
      	rc = snd_pcm_hw_params(handle, params);
      	if (rc < 0){
    		fprintf(stderr,"unable to set hw parameters: %s\n",snd_strerror(rc));
      		exit(1);
      	}
      	if (DEBUG) printf("Parameters written to the driver cap\n");
    
    fcntl(sockfd, F_SETFL, flags);
    while (runcond)
    {
    	pthread_mutex_lock(&token_mutex);
    		rc = snd_pcm_readi(handle, pkt, frames);
      		if (rc == -EPIPE) 
      		{
      			printf(" sender overrun occurred\n");fflush(stdout);
      			snd_pcm_prepare(handle);
      		} 
      		else if (rc < 0) printf("error from read \n");
      		else if (rc != (int)frames) printf("short read, read %d frames\n", rc);	
    snd=sendto(sockfd,pkt,payloadsize+rtp_hdr,0,(struct sockaddr *)&rem_addr,sizeof(rem_addr));
    
    	breceive=1;
    	pthread_cond_signal(&ReceiveData_cond);
    	bsend=0;
    
    	pthread_mutex_unlock(&token_mutex);
    }
    
    void* receive_rtp(void* unused)
    {
    unsigned char * pkt;
    if (!(pkt=malloc(((payloadsize+rtp_hdr)+1)*sizeof(char))))printf("malloc error for pkt\n");
    if ((sockfd=socket(PF_INET,SOCK_DGRAM,0))<0) {
    	printf("socket error");
    	return;
    }
    /*Bind the source port*/
    
    inet_pton(AF_INET,myip,&ip_address.s_addr);
    rem_addr.sin_family	= AF_INET;
    rem_addr.sin_port	= htons(ndestport);
    memcpy(&rem_addr.sin_addr.s_addr,&ip_address.s_addr, sizeof(rem_addr.sin_addr));
    addrlen=sizeof(rem_addr);
    
    	
    bind (sockfd,(struct sockaddr *)&local_addr,sizeof(struct sockaddr_in));
    /*OPTIONAL??*/
    flags = fcntl(sockfd, F_GETFL);
    flags |= O_NONBLOCK;
    fcntl(sockfd, F_SETFL, flags);
    while (runcond)
    {
    pthread_mutex_lock(&token_mutex);
    err=recvfrom(sockfd,pkt,payloadsize+rtp_hdr, 0, (struct sockaddr *)&address,&addrlen );
    	if (err==payloadsize){
    			memcpy(payload,pkt+rtp_hdr,payloadsize);
    			memcpy(&cseq_tmp,pkt+2,sizeof(cseq_tmp));	
    			
    		}
    	bsend=1;
    	pthread_cond_signal(&SendData_cond);
    	breceive=0;		
    	while (!breceive)
    		pthread_cond_wait(&ReceiveData_cond,&token_mutex);
      	pthread_mutex_unlock(&token_mutex);		
    }
    I am programming that wrongly? is there any other way to make this more efficient? It is important for me not to use blocking sockets as if one packet is lost the sender will suffer this delay too.

    Any ideas are welcome

  4. #4
    Banned
    Join Date
    Aug 2010
    Location
    Ontario Canada
    Posts
    9,547
    I'm afraid what you've got there is a bit above my grade... Perhaps one of the others can help you better than I can...

  5. #5
    Registered User
    Join Date
    Mar 2010
    Posts
    68

    wellll

    The possible reason is because a non blocking socket will return immediately when a recv is called and there is no data to retrieve. Blocking socket will wait until there is data to receive. So, there might be some small delays between when the data is arriving which is causing your program to actually stop for small periods. When the sockets are non blocking, your program is not at the mercy of the network any longer... . . always good ideas for non blocking on udp . . .

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. Problem building Quake source
    By Silvercord in forum Game Programming
    Replies: 16
    Last Post: 07-11-2010, 10:13 AM
  2. socket blocking problem
    By killerbyte in forum Linux Programming
    Replies: 2
    Last Post: 05-21-2006, 04:25 PM
  3. socket newbie, losing a few chars from server to client
    By registering in forum Linux Programming
    Replies: 2
    Last Post: 06-07-2003, 12:48 PM
  4. Next Question...
    By Azmeos in forum C++ Programming
    Replies: 3
    Last Post: 06-06-2003, 03:40 PM
  5. Socket Blocking Trouble!
    By LearningMan in forum Windows Programming
    Replies: 6
    Last Post: 01-09-2003, 10:09 AM

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21