Blame tests/multithread_test/exchange.cpp

Packit Service aa3af4
/* 
Packit Service aa3af4
** Build command: g++ -lpthread exchange.cpp -o exchange
Packit Service aa3af4
*/
Packit Service aa3af4
Packit Service aa3af4
#include <stdio.h>
Packit Service aa3af4
#include <sys/select.h>
Packit Service aa3af4
#include <sys/time.h>
Packit Service aa3af4
#include <sys/types.h>
Packit Service aa3af4
#include <unistd.h>
Packit Service aa3af4
#include <sys/socket.h>
Packit Service aa3af4
#include <arpa/inet.h>
Packit Service aa3af4
#include <netinet/in.h>
Packit Service aa3af4
#include <stdlib.h>
Packit Service aa3af4
#include <memory.h>
Packit Service aa3af4
#include <stdint.h>
Packit Service aa3af4
#include <string.h>
Packit Service aa3af4
#include <pthread.h>
Packit Service aa3af4
#include <sched.h>
Packit Service aa3af4
#include <errno.h>
Packit Service aa3af4
Packit Service aa3af4
#define NUM_SOCKETS 2
Packit Service aa3af4
#define MC_SOCKET 0
Packit Service aa3af4
#define UC_SOCKET 1
Packit Service aa3af4
#define NUM_PACKETS 200000
Packit Service aa3af4
#define IF_ADDRESS "1.1.1.18"
Packit Service aa3af4
#define UC_SERVER_ADDRESS "1.1.1.19"
Packit Service aa3af4
#define MC_ADDRESS "224.0.1.2"
Packit Service aa3af4
#define MC_DEST_PORT 15111
Packit Service aa3af4
#define UC_LOCAL_PORT 15222
Packit Service aa3af4
#define UC_SERVER_PORT 15333
Packit Service aa3af4
#define MC_BUFFLEN 200
Packit Service aa3af4
#define UC_BUFFLEN 12
Packit Service aa3af4
#define MIN_UC_BUFFLEN 10
Packit Service aa3af4
#define SLEEP_TIME_USEC 10
Packit Service aa3af4
#define MAX_PARAM_LENGTH 20
Packit Service aa3af4
Packit Service aa3af4
int fd_list[NUM_SOCKETS];
Packit Service aa3af4
uint64_t tx_pkt_count, delta_usec_quote;
Packit Service aa3af4
struct timeval tv_quote_start, tv_quote_end;
Packit Service aa3af4
Packit Service aa3af4
char if_address[MAX_PARAM_LENGTH] = "NO IF ADDRESS!!!";
Packit Service aa3af4
int num_packets = NUM_PACKETS;
Packit Service aa3af4
char mc_address[MAX_PARAM_LENGTH] = MC_ADDRESS;
Packit Service aa3af4
uint16_t mc_dest_port = MC_DEST_PORT;
Packit Service aa3af4
uint16_t uc_local_port = UC_LOCAL_PORT;
Packit Service aa3af4
int mc_bufflen = MC_BUFFLEN;
Packit Service aa3af4
int uc_bufflen = UC_BUFFLEN;
Packit Service aa3af4
uint64_t sleep_time_usec = SLEEP_TIME_USEC;
Packit Service aa3af4
Packit Service aa3af4
Packit Service aa3af4
void usage(void)
Packit Service aa3af4
{
Packit Service aa3af4
	printf("Usage:\n");
Packit Service aa3af4
	printf("\t-l\t<MANDATORY! local interface ip address for mc and uc>\n");
Packit Service aa3af4
	printf("\t[-n]\t<optional num of mc packets before marking for QUOTE. default 200000>\n");
Packit Service aa3af4
	printf("\t[-m]\t<optional mc address. default - 224.0.1.2>\n");
Packit Service aa3af4
	printf("\t[-pm]\t<optional mc destination port. default 15111>\n");
Packit Service aa3af4
	printf("\t[-lp]\t<optional local uc port. default 15222>\n");
Packit Service aa3af4
	printf("\t[-sm]\t<optional mc massage payload size. default 200>\n");
Packit Service aa3af4
	printf("\t[-su]\t<optional uc massage payload size. MIN value = 10. default 12>\n");
Packit Service aa3af4
	printf("\t[-u]\t<optional sleep time in usec between each mc packet send. default 10usec>\n");
Packit Service aa3af4
}
Packit Service aa3af4
Packit Service aa3af4
Packit Service aa3af4
int prepare_socket()
Packit Service aa3af4
{
Packit Service aa3af4
	struct sockaddr_in groupsock;
Packit Service aa3af4
	struct in_addr localInterface;
Packit Service aa3af4
Packit Service aa3af4
	int fd = socket(AF_INET, SOCK_DGRAM, 0);
Packit Service aa3af4
	if(fd < 0)
Packit Service aa3af4
	{
Packit Service aa3af4
			perror("Opening datagram socket error");
Packit Service aa3af4
			exit(1);
Packit Service aa3af4
	}
Packit Service aa3af4
Packit Service aa3af4
	memset(&groupsock, 0, sizeof(groupsock));
Packit Service aa3af4
	groupsock.sin_family            = AF_INET;
Packit Service aa3af4
	groupsock.sin_addr.s_addr       = inet_addr(mc_address);
Packit Service aa3af4
	groupsock.sin_port              = htons(mc_dest_port);
Packit Service aa3af4
Packit Service aa3af4
	/* Disable loopback so you do not receive your own datagrams.*/
Packit Service aa3af4
	char loopch = 0;
Packit Service aa3af4
	if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch)) < 0)
Packit Service aa3af4
	{
Packit Service aa3af4
		perror("Setting IP_MULTICAST_LOOP error");
Packit Service aa3af4
		close(fd);
Packit Service aa3af4
		exit(1);
Packit Service aa3af4
    	}
Packit Service aa3af4
Packit Service aa3af4
	/* Set local interface for outbound multicast datagrams. */
Packit Service aa3af4
	localInterface.s_addr = inet_addr(if_address);
Packit Service aa3af4
	if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_IF, (char *)&localInterface, sizeof(localInterface)) < 0)
Packit Service aa3af4
	{
Packit Service aa3af4
		perror("Setting local interface error");
Packit Service aa3af4
 		exit(1);
Packit Service aa3af4
	}
Packit Service aa3af4
Packit Service aa3af4
	printf("Connecting..\n");
Packit Service aa3af4
	if(connect(fd, (struct sockaddr *) &groupsock, sizeof(struct sockaddr)))
Packit Service aa3af4
	{
Packit Service aa3af4
		perror("connect");
Packit Service aa3af4
		close(fd);
Packit Service aa3af4
 		exit(1);
Packit Service aa3af4
	}
Packit Service aa3af4
Packit Service aa3af4
        return fd;
Packit Service aa3af4
}
Packit Service aa3af4
Packit Service aa3af4
Packit Service aa3af4
void* send_mc_loop(void* num)
Packit Service aa3af4
{
Packit Service aa3af4
	int ret;
Packit Service aa3af4
	char databuf[mc_bufflen];
Packit Service aa3af4
	char quote[] = "QUOTE";
Packit Service aa3af4
	uint64_t delta_usec, delta_usec_sleep;
Packit Service aa3af4
Packit Service aa3af4
	/* Prepare MC socket */
Packit Service aa3af4
	printf("Opening datagram MC socket\n");
Packit Service aa3af4
	fd_list[MC_SOCKET] = prepare_socket();
Packit Service aa3af4
Packit Service aa3af4
	// Prepare to start measurements
Packit Service aa3af4
	tx_pkt_count = 0;
Packit Service aa3af4
	struct timeval tv_start, tv_sleep_start, tv_sleep_end;
Packit Service aa3af4
	gettimeofday(&tv_start, NULL);
Packit Service aa3af4
	gettimeofday(&tv_sleep_start, NULL);
Packit Service aa3af4
	gettimeofday(&tv_sleep_end, NULL);
Packit Service aa3af4
Packit Service aa3af4
	while(true)
Packit Service aa3af4
	{
Packit Service aa3af4
		delta_usec_sleep = ((tv_sleep_end.tv_sec - tv_sleep_start.tv_sec) * 1000000) + (tv_sleep_end.tv_usec - tv_sleep_start.tv_usec);
Packit Service aa3af4
		if (delta_usec_sleep > sleep_time_usec)
Packit Service aa3af4
		{
Packit Service aa3af4
			ret = send(fd_list[MC_SOCKET], databuf, mc_bufflen, 0);	// Can use send with UDP socket because called connect() before...
Packit Service aa3af4
			if (ret < 0)
Packit Service aa3af4
				printf("ERROR on SEND errno = %s\n", strerror(errno));
Packit Service aa3af4
			tx_pkt_count++;
Packit Service aa3af4
			tv_sleep_start = tv_sleep_end;
Packit Service aa3af4
		}
Packit Service aa3af4
		else
Packit Service aa3af4
		{
Packit Service aa3af4
			gettimeofday(&tv_sleep_end, NULL);
Packit Service aa3af4
		}
Packit Service aa3af4
Packit Service aa3af4
Packit Service aa3af4
		if ((tx_pkt_count != 0) && (tx_pkt_count % num_packets) == 0) {
Packit Service aa3af4
			struct timeval tv_now;
Packit Service aa3af4
			gettimeofday(&tv_now, NULL);
Packit Service aa3af4
			delta_usec = ((tv_now.tv_sec - tv_start.tv_sec) * 1000000) + (tv_now.tv_usec - tv_start.tv_usec);
Packit Service aa3af4
			tv_start = tv_now;
Packit Service aa3af4
Packit Service aa3af4
			double mps = 1000000 * (tx_pkt_count/(double)delta_usec);
Packit Service aa3af4
			double bwGbps = mps * mc_bufflen * 8/(1024*1024*1024);
Packit Service aa3af4
			printf("BW(Gbps)=%6.3f, MPS=%10.0f\n", bwGbps, mps);
Packit Service aa3af4
			tx_pkt_count = 0;
Packit Service aa3af4
Packit Service aa3af4
			gettimeofday(&tv_quote_start, NULL);
Packit Service aa3af4
			ret = send(fd_list[MC_SOCKET], quote, sizeof(quote), 0);
Packit Service aa3af4
			if (ret < 0)
Packit Service aa3af4
				printf("ERROR on SEND errno = %s\n", strerror(errno));
Packit Service aa3af4
		}
Packit Service aa3af4
	}
Packit Service aa3af4
Packit Service aa3af4
	return 0;
Packit Service aa3af4
}
Packit Service aa3af4
Packit Service aa3af4
Packit Service aa3af4
void * uc_func(void * num)
Packit Service aa3af4
{
Packit Service aa3af4
	struct sockaddr_in localSock, servaddr;
Packit Service aa3af4
	socklen_t servaddrlen = sizeof(struct sockaddr);
Packit Service aa3af4
	char buf[uc_bufflen], ord_ack[] = "ORD_ACK", ka_ack[] = "KA_ACK";
Packit Service aa3af4
	int ret, print = 0;
Packit Service aa3af4
Packit Service aa3af4
	fd_list[UC_SOCKET] = socket(AF_INET, SOCK_DGRAM, 0);
Packit Service aa3af4
	if(fd_list[UC_SOCKET] < 0)
Packit Service aa3af4
	{
Packit Service aa3af4
			perror("Opening datagram UC socket error");
Packit Service aa3af4
			exit(1);
Packit Service aa3af4
	}
Packit Service aa3af4
	printf("Opening datagram UC socket....OK.\n");
Packit Service aa3af4
	memset((char *) &localSock, 0, sizeof(localSock));
Packit Service aa3af4
	localSock.sin_family = AF_INET;
Packit Service aa3af4
	localSock.sin_addr.s_addr = inet_addr(if_address);
Packit Service aa3af4
	localSock.sin_port = htons(uc_local_port);
Packit Service aa3af4
Packit Service aa3af4
	if(bind(fd_list[UC_SOCKET], (struct sockaddr*)&localSock, sizeof(struct sockaddr)))
Packit Service aa3af4
	{
Packit Service aa3af4
		perror("Binding datagram UC socket error");
Packit Service aa3af4
		close(fd_list[UC_SOCKET]);
Packit Service aa3af4
		exit(1);
Packit Service aa3af4
	}
Packit Service aa3af4
	else
Packit Service aa3af4
	{
Packit Service aa3af4
		printf("Binding datagram UC socket...OK.\n");
Packit Service aa3af4
	}
Packit Service aa3af4
Packit Service aa3af4
	while(1)
Packit Service aa3af4
	{
Packit Service aa3af4
		ret = recvfrom(fd_list[UC_SOCKET], buf, uc_bufflen, 0, (struct sockaddr *)&servaddr, &servaddrlen);
Packit Service aa3af4
		gettimeofday(&tv_quote_end, NULL);
Packit Service aa3af4
		if (ret < 0)
Packit Service aa3af4
		{
Packit Service aa3af4
			printf("ERROR on RECV errno = %s \n", strerror(errno));
Packit Service aa3af4
			printf("errno value = %d\n", errno);
Packit Service aa3af4
		}
Packit Service aa3af4
		else
Packit Service aa3af4
		{
Packit Service aa3af4
			if (strcmp(buf, "ORD") == 0)
Packit Service aa3af4
			{
Packit Service aa3af4
				ret = sendto(fd_list[UC_SOCKET], ord_ack, sizeof(ord_ack), 0, (struct sockaddr *) &servaddr, sizeof(struct sockaddr));
Packit Service aa3af4
				if (ret < 0)
Packit Service aa3af4
				{
Packit Service aa3af4
					printf("ERROR on SEND UC errno = %s \n", strerror(errno));
Packit Service aa3af4
					printf("errno value = %d\n", errno);
Packit Service aa3af4
				}
Packit Service aa3af4
				print = 1;
Packit Service aa3af4
			}
Packit Service aa3af4
			else if (strcmp(buf, "KA") == 0){
Packit Service aa3af4
				ret = sendto(fd_list[UC_SOCKET], ka_ack, sizeof(ka_ack), 0, (struct sockaddr *) &servaddr, sizeof(struct sockaddr));
Packit Service aa3af4
				if (ret < 0)
Packit Service aa3af4
				{
Packit Service aa3af4
					printf("ERROR on SEND UC errno = %s \n", strerror(errno));
Packit Service aa3af4
					printf("errno value = %d\n", errno);
Packit Service aa3af4
				}
Packit Service aa3af4
			}
Packit Service aa3af4
			else{
Packit Service aa3af4
				printf("Internal error: Exchange received UC packet- not ORD or KA\n");
Packit Service aa3af4
			}
Packit Service aa3af4
Packit Service aa3af4
			if (print)
Packit Service aa3af4
			{
Packit Service aa3af4
				delta_usec_quote = ((tv_quote_end.tv_sec - tv_quote_start.tv_sec) * 1000000) + (tv_quote_end.tv_usec - tv_quote_start.tv_usec);
Packit Service aa3af4
				printf("@@@@@@@ QUOTE from port %u RTT in usec = %llu @@@@@@@\n", ntohs(servaddr.sin_port), (long long unsigned int)delta_usec_quote);
Packit Service aa3af4
				print = 0;
Packit Service aa3af4
			}
Packit Service aa3af4
		}
Packit Service aa3af4
	}
Packit Service aa3af4
Packit Service aa3af4
	close(fd_list[UC_SOCKET]);
Packit Service aa3af4
	printf("closed UC socket\n");
Packit Service aa3af4
	return 0;
Packit Service aa3af4
}
Packit Service aa3af4
Packit Service aa3af4
Packit Service aa3af4
int main(int argc, char *argv[])
Packit Service aa3af4
{
Packit Service aa3af4
	pthread_t uc_thread;
Packit Service aa3af4
	int nThreadId = 1, i;
Packit Service aa3af4
Packit Service aa3af4
	for (i=1; i
Packit Service aa3af4
	{
Packit Service aa3af4
		if (strcmp(argv[i], "-l") == 0) {
Packit Service aa3af4
			strcpy(if_address, argv[i+1]);
Packit Service aa3af4
		} else if (strcmp(argv[i], "-n") == 0) {
Packit Service aa3af4
			num_packets = atoi(argv[i+1]);
Packit Service aa3af4
		} else if (strcmp(argv[i], "-m") == 0) {
Packit Service aa3af4
			strcpy(mc_address, argv[i+1]);
Packit Service aa3af4
		} else if (strcmp(argv[i], "-pm") == 0) {
Packit Service aa3af4
			mc_dest_port = atoi(argv[i+1]);
Packit Service aa3af4
		} else if (strcmp(argv[i], "-lp") == 0) {
Packit Service aa3af4
			uc_local_port = atoi(argv[i+1]);
Packit Service aa3af4
		} else if (strcmp(argv[i], "-sm") == 0) {
Packit Service aa3af4
			mc_bufflen = atoi(argv[i+1]);
Packit Service aa3af4
		} else if (strcmp(argv[i], "-su") == 0) {
Packit Service aa3af4
			uc_bufflen = atoi(argv[i+1]);
Packit Service aa3af4
			if (uc_bufflen < MIN_UC_BUFFLEN) {
Packit Service aa3af4
				uc_bufflen = MIN_UC_BUFFLEN;
Packit Service aa3af4
			}
Packit Service aa3af4
		} else if (strcmp(argv[i], "-u") == 0) {
Packit Service aa3af4
			sleep_time_usec = atoi(argv[i+1]);
Packit Service aa3af4
		} else if ((strcmp(argv[i], "-h") == 0) || (strcmp(argv[i], "-help") == 0) || (strcmp(argv[i], "--help") == 0) || (strcmp(argv[i], "--h") == 0)) {
Packit Service aa3af4
			usage();
Packit Service aa3af4
			return 0;
Packit Service aa3af4
		}
Packit Service aa3af4
	}
Packit Service aa3af4
Packit Service aa3af4
	if ((argc == 1) || (strcmp(if_address, "NO IF ADDRESS!!!") == 0)) {
Packit Service aa3af4
		usage();
Packit Service aa3af4
		return 0;
Packit Service aa3af4
	}
Packit Service aa3af4
Packit Service aa3af4
	pthread_create(&uc_thread, NULL, uc_func, (void*)nThreadId);
Packit Service aa3af4
Packit Service aa3af4
	send_mc_loop(0);
Packit Service aa3af4
Packit Service aa3af4
	printf("Going to close MC socket\n");
Packit Service aa3af4
	close(fd_list[MC_SOCKET]);
Packit Service aa3af4
	printf("Closed MC socket\n");
Packit Service aa3af4
Packit Service aa3af4
	return 0;
Packit Service aa3af4
Packit Service aa3af4
}