Blob Blame History Raw
/*
 * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

/*
 ** Build command: g++ -lpthread -lrt trader.cpp -o trader
 */

#include <stdio.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <memory.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>

using namespace std;

#define NUM_PAIR_OF_THREADS 1
#define IF_ADDRESS "1.1.1.19"
#define UC_SERVER_ADDRESS "1.1.1.18"
#define MC_ADDRESS "224.0.1.2"
#define UC_SERVER_PORT 15222
#define MC_LOCAL_PORT 15111
#define UC_LOCAL_PORT 15333
#define MC_BUFFLEN 200
#define UC_BUFFLEN 8
#define MIN_UC_BUFFLEN 10
#define RECV_PACKETS_AMOUNT 300000
#define SEND_PACKETS_AMOUNT 1
#define MAX_PARAM_LENGTH 20
#define MAX_THREADS_PAIRS 50
#define KEEP_ALIVE_INTERVAL 20
#define TCP_DUMMY_SEND_RATE 100
#define TCP_KA_CPU 1
#define MC_CPU 2
#define TCP_RECV_CPU 3


char if_address[MAX_PARAM_LENGTH] = "NO IF ADDRESS!!!";
char uc_server_address[MAX_PARAM_LENGTH] = "NO UC SERV ADDRESS!";
int num_pair_of_threads = NUM_PAIR_OF_THREADS;
uint64_t recv_packets_amount = RECV_PACKETS_AMOUNT;
uint64_t send_packets_amount = SEND_PACKETS_AMOUNT;
char mc_address[MAX_PARAM_LENGTH] = MC_ADDRESS;
uint16_t mc_local_port = MC_LOCAL_PORT;
uint16_t uc_server_port = UC_SERVER_PORT;
uint16_t uc_local_port = UC_LOCAL_PORT;
int mc_bufflen = MC_BUFFLEN;
int uc_bufflen = UC_BUFFLEN;
int keep_alive_interval = KEEP_ALIVE_INTERVAL;
int keep_alive_cpu = TCP_KA_CPU;
int mc_cpu = MC_CPU;
int tcp_recv_cpu = TCP_RECV_CPU;
int tcp_dummy_send_rate = TCP_DUMMY_SEND_RATE;
int disable_ka = 0;
int disable_tcp_recv = 0;


struct ThreadsPair
{
	int mc_fd;
	int uc_fd;
};

ThreadsPair fd_list[MAX_THREADS_PAIRS];
struct timeval tv_order_start, tv_order_end;
pthread_spinlock_t uc_spinlock_arr[MAX_THREADS_PAIRS];

void usage(void)
{
	printf("Usage:\n");
	printf("\t-l\t<MANDATORY! local interface ip address for mc and uc (tcp)>\n");
	printf("\t-ua\t<MANDATORY! uc (tcp) server address>\n");
	//printf("\t[-nt]\t<optional num pair of threads. default 1>\n");
	//printf("\t[-n]\t<optional num of received mc packets before printing. default 300000>\n");
	printf("\t[-ns]\t<optional num of tcp send packets before printing. default 1>\n");
	printf("\t[-m]\t<optional mc address. default - 224.0.1.2>\n");
	printf("\t[-pm]\t<optional mc local port. default 15111>\n");
	printf("\t[-up]\t<optional uc server port. default 15222>\n");
	printf("\t[-lp]\t<optional local uc port. default 15333>\n");
	printf("\t[-sm]\t<optional mc massage payload size. default 200>\n");
	printf("\t[-su]\t<optional uc massage payload size. MIN value = 10. default 12>\n");
	printf("\t[-ka]\t<optional keep alive interval, i.e. time in usec between keep alive packets. default 1000000 usec>\n");
	printf("\t[-kac]\t<optional TCP keep alive thread cpu core. default 1>\n");
	printf("\t[-mcc]\t<optional MC recv thread cpu core. default 2>\n");
	printf("\t[-trc]\t<optional TCP recv thread cpu core. default 3>\n");
	printf("\t[-tds]\t<optional TCP dummy send rate, i.e. for each X MC packets, send dummy TCP. default 100>\n");
	printf("\t[-dtr]\t<optional disable TCP recv thread. default 0 (use 1 to disable)>\n");
	printf("\t[-dka]\t<optional disable TCP keep alive thread. default 0 (use 1 to disable)>\n");
}


int prepare_mc_socket(int sock_num)
{
	struct sockaddr_in localSock;
	struct ip_mreq group;
	int fd;

	fd = socket(AF_INET, SOCK_DGRAM, 0);
	if(fd < 0)
	{
		printf("Opening MC datagram socket num = %d error", sock_num);
		exit(1);
	}
	else
	{
		printf("Opening MC datagram socket num = %d....OK.\n", sock_num);
	}

	/* Enable SO_REUSEADDR to allow multiple instances of this */
	/* application to receive copies of the multicast datagrams. */
	int reuse = 1;
	if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) < 0)
	{
		printf("Setting SO_REUSEADDR for MC datagram socket num = %d error!!!", sock_num);
		close(fd);
		exit(1);
	}
	else
		printf("Setting SO_REUSEADDR on MC socket num = %d...OK.\n", sock_num);

	/* Bind to the proper port number with the IP address */
	/* specified as INADDR_ANY. */
	memset((char *)&localSock, 0, sizeof(localSock));
	localSock.sin_family = AF_INET;
	localSock.sin_addr.s_addr = INADDR_ANY;
	localSock.sin_port = htons(mc_local_port);
	if(bind(fd, (struct sockaddr*)&localSock, sizeof(struct sockaddr)))
	{
		printf("Binding MC datagram socket num = %d error", sock_num);
		close(fd);
		exit(1);
	}
	else
	{
		printf("Binding MC datagram socket num = %d...OK.\n", sock_num);
	}

	/* Join the multicast group on the local 1.1.1.19 */
	/* interface. Note that this IP_ADD_MEMBERSHIP option must be */
	/* called for each local interface over which the multicast */
	/* datagrams are to be received. */
	group.imr_multiaddr.s_addr = inet_addr(mc_address);
	group.imr_interface.s_addr = inet_addr(if_address);
	if(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&group, sizeof(group)) < 0)
	{
		printf("Adding multicast group for socket num = %d error", sock_num);
		close(fd);
		exit(1);
	}
	else
	{
		printf("Adding multicast group for socket num = %d...OK.\n", sock_num);
	}

	return fd;
}

#include <netinet/tcp.h>
int prepare_tcp_socket(int sock_num)
{
	struct sockaddr_in localSock;
	int fd;

	fd = socket(AF_INET, SOCK_STREAM, 0);
	if(fd < 0)
	{
		perror("Opening tcp socket error");
		exit(1);
	}
	printf("Opening tcp socket fd=%d....OK.\n", fd);
	memset((char *) &localSock, 0, sizeof(localSock));
	localSock.sin_family = AF_INET;
	localSock.sin_addr.s_addr = inet_addr(if_address);
	localSock.sin_port = htons(uc_local_port+sock_num-1);

	int flag = 1;
	if (setsockopt(fd,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(flag)) == -1)
	{
		perror("SETSOCKOPT tcp socket error");
		close(fd);
		exit(1);
	}

	if(bind(fd, (struct sockaddr*)&localSock, sizeof(struct sockaddr)))
	{
		perror("Binding tcp socket error");
		close(fd);
		exit(1);
	}

	printf("Binding tcp socket num %d....OK.\n", sock_num);


	struct sockaddr_in remoteSock;
	memset((char *) &remoteSock, 0, sizeof(remoteSock));
	remoteSock.sin_family = AF_INET;
	remoteSock.sin_addr.s_addr = inet_addr(uc_server_address);
	remoteSock.sin_port = htons(uc_server_port);

	if (connect(fd, (struct sockaddr*)&remoteSock, sizeof(struct sockaddr)) != 0) {
		perror("Connect tcp socket error");
		close(fd);
		exit(1);
	}

	printf("Connect tcp socket num %d....OK.\n", sock_num);

	return fd;
}

void * tcp_ka_func(void * num)
{
	int thread_num = (long int)num;
	char ka[] = "KAA";
	int ret;
	while(1)
	{
		if (!fd_list[thread_num].uc_fd) continue;

		ret = send(fd_list[thread_num].uc_fd, ka, sizeof(ka), 0);
		if (ret < 0)
		{
			printf("ERROR on SEND KA errno = %s\n", strerror(errno));
			printf("errno value = %d\n", errno);
		}
		usleep(keep_alive_interval);
	}
	return 0;
}

void * tcp_recv_func(void * num)
{
	struct sockaddr_in servaddr;
	char buf[uc_bufflen], ka[] = "KAA";
	int ret;
	int thread_num = (long int)num;

	uint64_t delta_usec;


	while(1)
	{
		if (!fd_list[thread_num].uc_fd) continue;
		/* Timeout on recvfrom using setsockopt */
		ret = recv(fd_list[thread_num].uc_fd, buf, uc_bufflen, MSG_WAITALL);
		if (ret < 0)
		{
			if (errno == EAGAIN){	// meaning that Timeout occured

				ret = send(fd_list[thread_num].uc_fd, ka, sizeof(ka), 0);

				if (ret < 0)
				{
					printf("ERROR on SEND 1 errno = %s\n", strerror(errno));
					printf("errno value = %d\n", errno);
					for (int i=0; i< num_pair_of_threads; i++)
					{
						close(fd_list[i].uc_fd);
					}
					exit(1);
				}
			} else {
				printf("ERROR on SEND 2 errno = %s\n", strerror(errno));
				printf("errno value = %d\n", errno);
				for (int i=0; i< num_pair_of_threads; i++)
				{
					close(fd_list[i].uc_fd);
				}
				exit(1);
			}
		} else {		// packet received
			if (strcmp(buf, "ORD_ACK") == 0) {
				gettimeofday(&tv_order_end, NULL);
				delta_usec = ((tv_order_end.tv_sec - tv_order_start.tv_sec) * 1000000) + (tv_order_end.tv_usec - tv_order_start.tv_usec);
				//printf("#### Thread num = %d - ORDER sent and received ####. RTT time = %llu\n", thread_num+1, (long long unsigned int)delta_usec);
			} else if (strcmp(buf, "KAA_ACK") == 0) {
				//printf("DEBUG: *** Keep Alive sent and received ***\n");
			} else {
				printf("Internal error! UC packet received, not ORD_ACK or KA_ACK, buf=%s\n", buf);
				for (int i=0; i< num_pair_of_threads; i++)
				{
					close(fd_list[i].uc_fd);
				}
				exit(1);
			}
		}
	}

	return 0;
}

void * recv_loop(void * num)
{
	int  ret;
	int thread_num = (long int)num;
	char buf[mc_bufflen], order[] = "ORD";
	struct sockaddr_in servaddr;
	uint64_t rx_pkt_count, tx_pkt_count, delta_usec;
	int t = 0;
	char ka[] = "KAA";

	printf("MC Thread number %d entered recv_loop\n", thread_num+1);

	int dummy = socket(AF_INET, SOCK_STREAM, 0);
	int flag = 1;
	setsockopt(dummy,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(flag));
	struct sockaddr_in remoteSock;
	memset((char *) &remoteSock, 0, sizeof(remoteSock));
	remoteSock.sin_family = AF_INET;
	remoteSock.sin_addr.s_addr = inet_addr(uc_server_address);
	remoteSock.sin_port = htons(uc_server_port);

	connect(dummy, (struct sockaddr*)&remoteSock, sizeof(struct sockaddr));

	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family      = AF_INET;
	servaddr.sin_port        = htons(uc_server_port);
	if (inet_aton(uc_server_address, &(servaddr.sin_addr)) <= 0)
	{
		printf("ERROR: Invalid IP address.\n");
		exit(1);
	}

	rx_pkt_count=0;
	tx_pkt_count = 0;

	struct timeval tv_start, tv_end;
	gettimeofday(&tv_start, NULL);

	while (true)
	{
		t++;

		ret = recv(fd_list[thread_num].mc_fd, buf, mc_bufflen, 0);

		if(ret == -1)
		{
			printf("ERROR in recv! errno = %s\n", strerror(errno));
			exit(1);
		}
		/*
		rx_pkt_count++;
		if (rx_pkt_count > recv_packets_amount)
		{
			gettimeofday(&tv_end, NULL);
			delta_usec = ((tv_end.tv_sec - tv_start.tv_sec) * 1000000) + (tv_end.tv_usec - tv_start.tv_usec);
			printf("MC thread num %d received %llu packets in usec = %llu\n", thread_num+1, (long long unsigned int)recv_packets_amount, (long long unsigned int)delta_usec);
			rx_pkt_count=0;
			gettimeofday(&tv_start, NULL);
		}
		 */
		if (strcmp(buf, "QUOTE") == 0) {
			gettimeofday(&tv_order_start, NULL);

			struct timespec ts_start = {0,0}, ts_end = {0,0}, ts_start1 = {0,0}, ts_end1 = {0,0};

			tx_pkt_count++;

			clock_gettime(CLOCK_MONOTONIC, &ts_start);
			ret = send(fd_list[thread_num].uc_fd, order, sizeof(order), 0);
			clock_gettime(CLOCK_MONOTONIC, &ts_end);

			if (tx_pkt_count >= send_packets_amount) {
				tx_pkt_count = 0;
				uint64_t delta_usec = ((ts_end.tv_sec - ts_start.tv_sec) * 1000000000) + (ts_end.tv_nsec - ts_start.tv_nsec);
				printf("MC thread number %d got QUOTE, sending TCP order (send time = %llu nsec)  \n", thread_num+1, (long long unsigned int)delta_usec);
			}
			if (ret < 0)
			{
				printf("ERROR on SEND errno = %s\n", strerror(errno));
				printf("errno value = %d\n", errno);
			}
		} else if (t % tcp_dummy_send_rate == 0){
			//dummy send
			send(fd_list[thread_num].uc_fd, NULL, 1, 0);
		}
	}

	return 0;
}

#include <sched.h>

int main(int argc, char *argv[])
{
	int i;

	for (i=1; i<argc; i++)
	{
		if (strcmp(argv[i], "-l") == 0) {
			strcpy(if_address, argv[i+1]);
		} else if (strcmp(argv[i], "-ua") == 0) {
			strcpy(uc_server_address, argv[i+1]);
		} else if (strcmp(argv[i], "-nt") == 0) {
			num_pair_of_threads = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-n") == 0) {
			recv_packets_amount = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-ns") == 0) {
			send_packets_amount = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-m") == 0) {
			strcpy(mc_address, argv[i+1]);
		} else if (strcmp(argv[i], "-pm") == 0) {
			mc_local_port = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-up") == 0) {
			uc_server_port = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-lp") == 0) {
			uc_local_port = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-sm") == 0) {
			mc_bufflen = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-su") == 0) {
			uc_bufflen = atoi(argv[i+1]);
			if (uc_bufflen < MIN_UC_BUFFLEN) {
				uc_bufflen = MIN_UC_BUFFLEN;
			}
		} else if (strcmp(argv[i], "-ka") == 0) {
			keep_alive_interval = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-kac") == 0) {
			keep_alive_cpu = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-mcc") == 0) {
			mc_cpu = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-trc") == 0) {
			tcp_recv_cpu = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-tds") == 0) {
			tcp_dummy_send_rate = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-dka") == 0) {
			disable_ka = atoi(argv[i+1]);
		} else if (strcmp(argv[i], "-dtr") == 0) {
			disable_tcp_recv = atoi(argv[i+1]);
		} else if ((strcmp(argv[i], "-h") == 0) || (strcmp(argv[i], "-help") == 0) || (strcmp(argv[i], "--help") == 0) || (strcmp(argv[i], "--h") == 0)) {
			usage();
			return 0;
		}
	}

	if ((argc == 1) || (strcmp(if_address, "NO IF ADDRESS!!!") == 0) || (strcmp(uc_server_address, "NO UC SERV ADDRESS!") == 0)) {
		usage();
		return 0;
	}

	for (i=0; i<num_pair_of_threads; i++)
	{
		printf("Opening MC datagram socket %d\n", i+1);
		fd_list[i].mc_fd = prepare_mc_socket(i+1);
		fd_list[i].uc_fd = prepare_tcp_socket(i+1);
		pthread_spin_init(&uc_spinlock_arr[i], 0);
	}

	pthread_t mc_thread_arr[num_pair_of_threads];
	pthread_t tcp_recv_thread_arr[num_pair_of_threads];
	pthread_t tcp_ka_thread_arr[num_pair_of_threads];

	cpu_set_t* cpu_set = NULL;
	cpu_set = CPU_ALLOC(64);
	if (!cpu_set) {
		perror("failed to allocate cpu set");
		return -1;
	}
	size_t cpu_set_size = CPU_ALLOC_SIZE(64);

	for (i=0; i<num_pair_of_threads; i++)
	{
		pthread_create(&mc_thread_arr[i], NULL, recv_loop,(void*)i);
		CPU_ZERO_S(cpu_set_size, cpu_set);
		CPU_SET_S(mc_cpu, cpu_set_size, cpu_set);
		if (pthread_setaffinity_np(mc_thread_arr[i], cpu_set_size, cpu_set)) {
			CPU_FREE(cpu_set);
			perror("failed to SET AFFINITY cpu set");
			return -1;
		}
		printf("attached MC recv thread to cpu = %d\n", mc_cpu);
		pthread_detach(mc_thread_arr[i]);

		if (!disable_tcp_recv) {
			pthread_create(&tcp_recv_thread_arr[i], NULL, tcp_recv_func, (void*)i);
			CPU_ZERO_S(cpu_set_size, cpu_set);
			CPU_SET_S(tcp_recv_cpu, cpu_set_size, cpu_set);
			if (pthread_setaffinity_np(tcp_recv_thread_arr[i], cpu_set_size, cpu_set)) {
				CPU_FREE(cpu_set);
				perror("failed to SET AFFINITY cpu set");
				return -1;
			}
			printf("attached TCP recv thread to cpu = %d\n", tcp_recv_cpu);
			pthread_detach(tcp_recv_thread_arr[i]);
		}

		if (!disable_ka) {
			pthread_create(&tcp_ka_thread_arr[i], NULL, tcp_ka_func, (void*)i);
			CPU_ZERO_S(cpu_set_size, cpu_set);
			CPU_SET_S(keep_alive_cpu, cpu_set_size, cpu_set);
			if (pthread_setaffinity_np(tcp_ka_thread_arr[i], cpu_set_size, cpu_set)) {
				CPU_FREE(cpu_set);
				perror("failed to SET AFFINITY cpu set");
				return -1;
			}
			printf("attached TCP keep alive thread to cpu = %d\n", keep_alive_cpu);
			pthread_detach(tcp_ka_thread_arr[i]);
		}
	}
	pause();

	for (i=0; i< num_pair_of_threads; i++)
	{
		printf("Closing mc socket %d\n", i+1);
		shutdown(fd_list[i].mc_fd, SHUT_RDWR);
		close(fd_list[i].mc_fd);
		shutdown(fd_list[i].uc_fd, SHUT_RDWR);
		close(fd_list[i].uc_fd);
	}
	printf("Closed all MC sockets\n");

	return 0;
}