Blob Blame History Raw
/* 
** Build command: g++ -lpthread exchange.cpp -o exchange
*/

#include <stdio.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <memory.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <sched.h>
#include <errno.h>

#define NUM_SOCKETS 2
#define MC_SOCKET 0
#define UC_SOCKET 1
#define NUM_PACKETS 200000
#define IF_ADDRESS "1.1.1.18"
#define UC_SERVER_ADDRESS "1.1.1.19"
#define MC_ADDRESS "224.0.1.2"
#define MC_DEST_PORT 15111
#define UC_LOCAL_PORT 15222
#define UC_SERVER_PORT 15333
#define MC_BUFFLEN 200
#define UC_BUFFLEN 12
#define MIN_UC_BUFFLEN 10
#define SLEEP_TIME_USEC 10
#define MAX_PARAM_LENGTH 20

int fd_list[NUM_SOCKETS];
uint64_t tx_pkt_count, delta_usec_quote;
struct timeval tv_quote_start, tv_quote_end;

char if_address[MAX_PARAM_LENGTH] = "NO IF ADDRESS!!!";
int num_packets = NUM_PACKETS;
char mc_address[MAX_PARAM_LENGTH] = MC_ADDRESS;
uint16_t mc_dest_port = MC_DEST_PORT;
uint16_t uc_local_port = UC_LOCAL_PORT;
int mc_bufflen = MC_BUFFLEN;
int uc_bufflen = UC_BUFFLEN;
uint64_t sleep_time_usec = SLEEP_TIME_USEC;


void usage(void)
{
	printf("Usage:\n");
	printf("\t-l\t<MANDATORY! local interface ip address for mc and uc>\n");
	printf("\t[-n]\t<optional num of mc packets before marking for QUOTE. default 200000>\n");
	printf("\t[-m]\t<optional mc address. default - 224.0.1.2>\n");
	printf("\t[-pm]\t<optional mc destination port. default 15111>\n");
	printf("\t[-lp]\t<optional local uc port. default 15222>\n");
	printf("\t[-sm]\t<optional mc massage payload size. default 200>\n");
	printf("\t[-su]\t<optional uc massage payload size. MIN value = 10. default 12>\n");
	printf("\t[-u]\t<optional sleep time in usec between each mc packet send. default 10usec>\n");
}


int prepare_socket()
{
	struct sockaddr_in groupsock;
	struct in_addr localInterface;

	int fd = socket(AF_INET, SOCK_DGRAM, 0);
	if(fd < 0)
	{
			perror("Opening datagram socket error");
			exit(1);
	}

	memset(&groupsock, 0, sizeof(groupsock));
	groupsock.sin_family            = AF_INET;
	groupsock.sin_addr.s_addr       = inet_addr(mc_address);
	groupsock.sin_port              = htons(mc_dest_port);

	/* Disable loopback so you do not receive your own datagrams.*/
	char loopch = 0;
	if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch)) < 0)
	{
		perror("Setting IP_MULTICAST_LOOP error");
		close(fd);
		exit(1);
    	}

	/* Set local interface for outbound multicast datagrams. */
	localInterface.s_addr = inet_addr(if_address);
	if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_IF, (char *)&localInterface, sizeof(localInterface)) < 0)
	{
		perror("Setting local interface error");
 		exit(1);
	}

	printf("Connecting..\n");
	if(connect(fd, (struct sockaddr *) &groupsock, sizeof(struct sockaddr)))
	{
		perror("connect");
		close(fd);
 		exit(1);
	}

        return fd;
}


void* send_mc_loop(void* num)
{
	int ret;
	char databuf[mc_bufflen];
	char quote[] = "QUOTE";
	uint64_t delta_usec, delta_usec_sleep;

	/* Prepare MC socket */
	printf("Opening datagram MC socket\n");
	fd_list[MC_SOCKET] = prepare_socket();

	// Prepare to start measurements
	tx_pkt_count = 0;
	struct timeval tv_start, tv_sleep_start, tv_sleep_end;
	gettimeofday(&tv_start, NULL);
	gettimeofday(&tv_sleep_start, NULL);
	gettimeofday(&tv_sleep_end, NULL);

	while(true)
	{
		delta_usec_sleep = ((tv_sleep_end.tv_sec - tv_sleep_start.tv_sec) * 1000000) + (tv_sleep_end.tv_usec - tv_sleep_start.tv_usec);
		if (delta_usec_sleep > sleep_time_usec)
		{
			ret = send(fd_list[MC_SOCKET], databuf, mc_bufflen, 0);	// Can use send with UDP socket because called connect() before...
			if (ret < 0)
				printf("ERROR on SEND errno = %s\n", strerror(errno));
			tx_pkt_count++;
			tv_sleep_start = tv_sleep_end;
		}
		else
		{
			gettimeofday(&tv_sleep_end, NULL);
		}


		if ((tx_pkt_count != 0) && (tx_pkt_count % num_packets) == 0) {
			struct timeval tv_now;
			gettimeofday(&tv_now, NULL);
			delta_usec = ((tv_now.tv_sec - tv_start.tv_sec) * 1000000) + (tv_now.tv_usec - tv_start.tv_usec);
			tv_start = tv_now;

			double mps = 1000000 * (tx_pkt_count/(double)delta_usec);
			double bwGbps = mps * mc_bufflen * 8/(1024*1024*1024);
			printf("BW(Gbps)=%6.3f, MPS=%10.0f\n", bwGbps, mps);
			tx_pkt_count = 0;

			gettimeofday(&tv_quote_start, NULL);
			ret = send(fd_list[MC_SOCKET], quote, sizeof(quote), 0);
			if (ret < 0)
				printf("ERROR on SEND errno = %s\n", strerror(errno));
		}
	}

	return 0;
}


void * uc_func(void * num)
{
	struct sockaddr_in localSock, servaddr;
	socklen_t servaddrlen = sizeof(struct sockaddr);
	char buf[uc_bufflen], ord_ack[] = "ORD_ACK", ka_ack[] = "KA_ACK";
	int ret, print = 0;

	fd_list[UC_SOCKET] = socket(AF_INET, SOCK_DGRAM, 0);
	if(fd_list[UC_SOCKET] < 0)
	{
			perror("Opening datagram UC socket error");
			exit(1);
	}
	printf("Opening datagram UC socket....OK.\n");
	memset((char *) &localSock, 0, sizeof(localSock));
	localSock.sin_family = AF_INET;
	localSock.sin_addr.s_addr = inet_addr(if_address);
	localSock.sin_port = htons(uc_local_port);

	if(bind(fd_list[UC_SOCKET], (struct sockaddr*)&localSock, sizeof(struct sockaddr)))
	{
		perror("Binding datagram UC socket error");
		close(fd_list[UC_SOCKET]);
		exit(1);
	}
	else
	{
		printf("Binding datagram UC socket...OK.\n");
	}

	while(1)
	{
		ret = recvfrom(fd_list[UC_SOCKET], buf, uc_bufflen, 0, (struct sockaddr *)&servaddr, &servaddrlen);
		gettimeofday(&tv_quote_end, NULL);
		if (ret < 0)
		{
			printf("ERROR on RECV errno = %s \n", strerror(errno));
			printf("errno value = %d\n", errno);
		}
		else
		{
			if (strcmp(buf, "ORD") == 0)
			{
				ret = sendto(fd_list[UC_SOCKET], ord_ack, sizeof(ord_ack), 0, (struct sockaddr *) &servaddr, sizeof(struct sockaddr));
				if (ret < 0)
				{
					printf("ERROR on SEND UC errno = %s \n", strerror(errno));
					printf("errno value = %d\n", errno);
				}
				print = 1;
			}
			else if (strcmp(buf, "KA") == 0){
				ret = sendto(fd_list[UC_SOCKET], ka_ack, sizeof(ka_ack), 0, (struct sockaddr *) &servaddr, sizeof(struct sockaddr));
				if (ret < 0)
				{
					printf("ERROR on SEND UC errno = %s \n", strerror(errno));
					printf("errno value = %d\n", errno);
				}
			}
			else{
				printf("Internal error: Exchange received UC packet- not ORD or KA\n");
			}

			if (print)
			{
				delta_usec_quote = ((tv_quote_end.tv_sec - tv_quote_start.tv_sec) * 1000000) + (tv_quote_end.tv_usec - tv_quote_start.tv_usec);
				printf("@@@@@@@ QUOTE from port %u RTT in usec = %llu @@@@@@@\n", ntohs(servaddr.sin_port), (long long unsigned int)delta_usec_quote);
				print = 0;
			}
		}
	}

	close(fd_list[UC_SOCKET]);
	printf("closed UC socket\n");
	return 0;
}


int main(int argc, char *argv[])
{
	pthread_t uc_thread;
	int nThreadId = 1, i;

	for (i=1; i<argc; i++)
	{
		if (strcmp(argv[i], "-l") == 0) {
			strcpy(if_address, argv[i+1]);
		} else if (strcmp(argv[i], "-n") == 0) {
			num_packets = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-m") == 0) {
			strcpy(mc_address, argv[i+1]);
		} else if (strcmp(argv[i], "-pm") == 0) {
			mc_dest_port = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-lp") == 0) {
			uc_local_port = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-sm") == 0) {
			mc_bufflen = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-su") == 0) {
			uc_bufflen = atoi(argv[i+1]);
			if (uc_bufflen < MIN_UC_BUFFLEN) {
				uc_bufflen = MIN_UC_BUFFLEN;
			}
		} else if (strcmp(argv[i], "-u") == 0) {
			sleep_time_usec = atoi(argv[i+1]);
		} else if ((strcmp(argv[i], "-h") == 0) || (strcmp(argv[i], "-help") == 0) || (strcmp(argv[i], "--help") == 0) || (strcmp(argv[i], "--h") == 0)) {
			usage();
			return 0;
		}
	}

	if ((argc == 1) || (strcmp(if_address, "NO IF ADDRESS!!!") == 0)) {
		usage();
		return 0;
	}

	pthread_create(&uc_thread, NULL, uc_func, (void*)nThreadId);

	send_mc_loop(0);

	printf("Going to close MC socket\n");
	close(fd_list[MC_SOCKET]);
	printf("Closed MC socket\n");

	return 0;

}