/* * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU * General Public License (GPL) Version 2, available from the file * COPYING in the main directory of this source tree, or the * BSD license below: * * Redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following * conditions are met: * * - Redistributions of source code must retain the above * copyright notice, this list of conditions and the following * disclaimer. * * - Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimer in the documentation and/or other materials * provided with the distribution. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ /* ** Build command: g++ -lpthread -lrt trader.cpp -o trader */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std; #define NUM_PAIR_OF_THREADS 1 #define IF_ADDRESS "1.1.1.19" #define UC_SERVER_ADDRESS "1.1.1.18" #define MC_ADDRESS "224.0.1.2" #define UC_SERVER_PORT 15222 #define MC_LOCAL_PORT 15111 #define UC_LOCAL_PORT 15333 #define MC_BUFFLEN 200 #define UC_BUFFLEN 8 #define MIN_UC_BUFFLEN 10 #define RECV_PACKETS_AMOUNT 300000 #define SEND_PACKETS_AMOUNT 1 #define MAX_PARAM_LENGTH 20 #define MAX_THREADS_PAIRS 50 #define KEEP_ALIVE_INTERVAL 20 #define TCP_DUMMY_SEND_RATE 100 #define TCP_KA_CPU 1 #define MC_CPU 2 #define TCP_RECV_CPU 3 char if_address[MAX_PARAM_LENGTH] = "NO IF ADDRESS!!!"; char uc_server_address[MAX_PARAM_LENGTH] = "NO UC SERV ADDRESS!"; int num_pair_of_threads = NUM_PAIR_OF_THREADS; uint64_t recv_packets_amount = RECV_PACKETS_AMOUNT; uint64_t send_packets_amount = SEND_PACKETS_AMOUNT; char mc_address[MAX_PARAM_LENGTH] = MC_ADDRESS; uint16_t mc_local_port = MC_LOCAL_PORT; uint16_t uc_server_port = UC_SERVER_PORT; uint16_t uc_local_port = UC_LOCAL_PORT; int mc_bufflen = MC_BUFFLEN; int uc_bufflen = UC_BUFFLEN; int keep_alive_interval = KEEP_ALIVE_INTERVAL; int keep_alive_cpu = TCP_KA_CPU; int mc_cpu = MC_CPU; int tcp_recv_cpu = TCP_RECV_CPU; int tcp_dummy_send_rate = TCP_DUMMY_SEND_RATE; int disable_ka = 0; int disable_tcp_recv = 0; struct ThreadsPair { int mc_fd; int uc_fd; }; ThreadsPair fd_list[MAX_THREADS_PAIRS]; struct timeval tv_order_start, tv_order_end; pthread_spinlock_t uc_spinlock_arr[MAX_THREADS_PAIRS]; void usage(void) { printf("Usage:\n"); printf("\t-l\t\n"); printf("\t-ua\t\n"); //printf("\t[-nt]\t\n"); //printf("\t[-n]\t\n"); printf("\t[-ns]\t\n"); printf("\t[-m]\t\n"); printf("\t[-pm]\t\n"); printf("\t[-up]\t\n"); printf("\t[-lp]\t\n"); printf("\t[-sm]\t\n"); printf("\t[-su]\t\n"); printf("\t[-ka]\t\n"); printf("\t[-kac]\t\n"); printf("\t[-mcc]\t\n"); printf("\t[-trc]\t\n"); printf("\t[-tds]\t\n"); printf("\t[-dtr]\t\n"); printf("\t[-dka]\t\n"); } int prepare_mc_socket(int sock_num) { struct sockaddr_in localSock; struct ip_mreq group; int fd; fd = socket(AF_INET, SOCK_DGRAM, 0); if(fd < 0) { printf("Opening MC datagram socket num = %d error", sock_num); exit(1); } else { printf("Opening MC datagram socket num = %d....OK.\n", sock_num); } /* Enable SO_REUSEADDR to allow multiple instances of this */ /* application to receive copies of the multicast datagrams. */ int reuse = 1; if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) < 0) { printf("Setting SO_REUSEADDR for MC datagram socket num = %d error!!!", sock_num); close(fd); exit(1); } else printf("Setting SO_REUSEADDR on MC socket num = %d...OK.\n", sock_num); /* Bind to the proper port number with the IP address */ /* specified as INADDR_ANY. */ memset((char *)&localSock, 0, sizeof(localSock)); localSock.sin_family = AF_INET; localSock.sin_addr.s_addr = INADDR_ANY; localSock.sin_port = htons(mc_local_port); if(bind(fd, (struct sockaddr*)&localSock, sizeof(struct sockaddr))) { printf("Binding MC datagram socket num = %d error", sock_num); close(fd); exit(1); } else { printf("Binding MC datagram socket num = %d...OK.\n", sock_num); } /* Join the multicast group on the local 1.1.1.19 */ /* interface. Note that this IP_ADD_MEMBERSHIP option must be */ /* called for each local interface over which the multicast */ /* datagrams are to be received. */ group.imr_multiaddr.s_addr = inet_addr(mc_address); group.imr_interface.s_addr = inet_addr(if_address); if(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&group, sizeof(group)) < 0) { printf("Adding multicast group for socket num = %d error", sock_num); close(fd); exit(1); } else { printf("Adding multicast group for socket num = %d...OK.\n", sock_num); } return fd; } #include int prepare_tcp_socket(int sock_num) { struct sockaddr_in localSock; int fd; fd = socket(AF_INET, SOCK_STREAM, 0); if(fd < 0) { perror("Opening tcp socket error"); exit(1); } printf("Opening tcp socket fd=%d....OK.\n", fd); memset((char *) &localSock, 0, sizeof(localSock)); localSock.sin_family = AF_INET; localSock.sin_addr.s_addr = inet_addr(if_address); localSock.sin_port = htons(uc_local_port+sock_num-1); int flag = 1; if (setsockopt(fd,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(flag)) == -1) { perror("SETSOCKOPT tcp socket error"); close(fd); exit(1); } if(bind(fd, (struct sockaddr*)&localSock, sizeof(struct sockaddr))) { perror("Binding tcp socket error"); close(fd); exit(1); } printf("Binding tcp socket num %d....OK.\n", sock_num); struct sockaddr_in remoteSock; memset((char *) &remoteSock, 0, sizeof(remoteSock)); remoteSock.sin_family = AF_INET; remoteSock.sin_addr.s_addr = inet_addr(uc_server_address); remoteSock.sin_port = htons(uc_server_port); if (connect(fd, (struct sockaddr*)&remoteSock, sizeof(struct sockaddr)) != 0) { perror("Connect tcp socket error"); close(fd); exit(1); } printf("Connect tcp socket num %d....OK.\n", sock_num); return fd; } void * tcp_ka_func(void * num) { int thread_num = (long int)num; char ka[] = "KAA"; int ret; while(1) { if (!fd_list[thread_num].uc_fd) continue; ret = send(fd_list[thread_num].uc_fd, ka, sizeof(ka), 0); if (ret < 0) { printf("ERROR on SEND KA errno = %s\n", strerror(errno)); printf("errno value = %d\n", errno); } usleep(keep_alive_interval); } return 0; } void * tcp_recv_func(void * num) { struct sockaddr_in servaddr; char buf[uc_bufflen], ka[] = "KAA"; int ret; int thread_num = (long int)num; uint64_t delta_usec; while(1) { if (!fd_list[thread_num].uc_fd) continue; /* Timeout on recvfrom using setsockopt */ ret = recv(fd_list[thread_num].uc_fd, buf, uc_bufflen, MSG_WAITALL); if (ret < 0) { if (errno == EAGAIN){ // meaning that Timeout occured ret = send(fd_list[thread_num].uc_fd, ka, sizeof(ka), 0); if (ret < 0) { printf("ERROR on SEND 1 errno = %s\n", strerror(errno)); printf("errno value = %d\n", errno); for (int i=0; i< num_pair_of_threads; i++) { close(fd_list[i].uc_fd); } exit(1); } } else { printf("ERROR on SEND 2 errno = %s\n", strerror(errno)); printf("errno value = %d\n", errno); for (int i=0; i< num_pair_of_threads; i++) { close(fd_list[i].uc_fd); } exit(1); } } else { // packet received if (strcmp(buf, "ORD_ACK") == 0) { gettimeofday(&tv_order_end, NULL); delta_usec = ((tv_order_end.tv_sec - tv_order_start.tv_sec) * 1000000) + (tv_order_end.tv_usec - tv_order_start.tv_usec); //printf("#### Thread num = %d - ORDER sent and received ####. RTT time = %llu\n", thread_num+1, (long long unsigned int)delta_usec); } else if (strcmp(buf, "KAA_ACK") == 0) { //printf("DEBUG: *** Keep Alive sent and received ***\n"); } else { printf("Internal error! UC packet received, not ORD_ACK or KA_ACK, buf=%s\n", buf); for (int i=0; i< num_pair_of_threads; i++) { close(fd_list[i].uc_fd); } exit(1); } } } return 0; } void * recv_loop(void * num) { int ret; int thread_num = (long int)num; char buf[mc_bufflen], order[] = "ORD"; struct sockaddr_in servaddr; uint64_t rx_pkt_count, tx_pkt_count, delta_usec; int t = 0; char ka[] = "KAA"; printf("MC Thread number %d entered recv_loop\n", thread_num+1); int dummy = socket(AF_INET, SOCK_STREAM, 0); int flag = 1; setsockopt(dummy,IPPROTO_TCP,TCP_NODELAY,(char *)&flag,sizeof(flag)); struct sockaddr_in remoteSock; memset((char *) &remoteSock, 0, sizeof(remoteSock)); remoteSock.sin_family = AF_INET; remoteSock.sin_addr.s_addr = inet_addr(uc_server_address); remoteSock.sin_port = htons(uc_server_port); connect(dummy, (struct sockaddr*)&remoteSock, sizeof(struct sockaddr)); memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(uc_server_port); if (inet_aton(uc_server_address, &(servaddr.sin_addr)) <= 0) { printf("ERROR: Invalid IP address.\n"); exit(1); } rx_pkt_count=0; tx_pkt_count = 0; struct timeval tv_start, tv_end; gettimeofday(&tv_start, NULL); while (true) { t++; ret = recv(fd_list[thread_num].mc_fd, buf, mc_bufflen, 0); if(ret == -1) { printf("ERROR in recv! errno = %s\n", strerror(errno)); exit(1); } /* rx_pkt_count++; if (rx_pkt_count > recv_packets_amount) { gettimeofday(&tv_end, NULL); delta_usec = ((tv_end.tv_sec - tv_start.tv_sec) * 1000000) + (tv_end.tv_usec - tv_start.tv_usec); printf("MC thread num %d received %llu packets in usec = %llu\n", thread_num+1, (long long unsigned int)recv_packets_amount, (long long unsigned int)delta_usec); rx_pkt_count=0; gettimeofday(&tv_start, NULL); } */ if (strcmp(buf, "QUOTE") == 0) { gettimeofday(&tv_order_start, NULL); struct timespec ts_start = {0,0}, ts_end = {0,0}, ts_start1 = {0,0}, ts_end1 = {0,0}; tx_pkt_count++; clock_gettime(CLOCK_MONOTONIC, &ts_start); ret = send(fd_list[thread_num].uc_fd, order, sizeof(order), 0); clock_gettime(CLOCK_MONOTONIC, &ts_end); if (tx_pkt_count >= send_packets_amount) { tx_pkt_count = 0; uint64_t delta_usec = ((ts_end.tv_sec - ts_start.tv_sec) * 1000000000) + (ts_end.tv_nsec - ts_start.tv_nsec); printf("MC thread number %d got QUOTE, sending TCP order (send time = %llu nsec) \n", thread_num+1, (long long unsigned int)delta_usec); } if (ret < 0) { printf("ERROR on SEND errno = %s\n", strerror(errno)); printf("errno value = %d\n", errno); } } else if (t % tcp_dummy_send_rate == 0){ //dummy send send(fd_list[thread_num].uc_fd, NULL, 1, 0); } } return 0; } #include int main(int argc, char *argv[]) { int i; for (i=1; i