/* ** Build command: g++ -lpthread exchange.cpp -o exchange */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define NUM_SOCKETS 2 #define MC_SOCKET 0 #define UC_SOCKET 1 #define NUM_PACKETS 200000 #define IF_ADDRESS "1.1.1.18" #define UC_SERVER_ADDRESS "1.1.1.19" #define MC_ADDRESS "224.0.1.2" #define MC_DEST_PORT 15111 #define UC_LOCAL_PORT 15222 #define UC_SERVER_PORT 15333 #define MC_BUFFLEN 200 #define UC_BUFFLEN 12 #define MIN_UC_BUFFLEN 10 #define SLEEP_TIME_USEC 10 #define MAX_PARAM_LENGTH 20 int fd_list[NUM_SOCKETS]; uint64_t tx_pkt_count, delta_usec_quote; struct timeval tv_quote_start, tv_quote_end; char if_address[MAX_PARAM_LENGTH] = "NO IF ADDRESS!!!"; int num_packets = NUM_PACKETS; char mc_address[MAX_PARAM_LENGTH] = MC_ADDRESS; uint16_t mc_dest_port = MC_DEST_PORT; uint16_t uc_local_port = UC_LOCAL_PORT; int mc_bufflen = MC_BUFFLEN; int uc_bufflen = UC_BUFFLEN; uint64_t sleep_time_usec = SLEEP_TIME_USEC; void usage(void) { printf("Usage:\n"); printf("\t-l\t\n"); printf("\t[-n]\t\n"); printf("\t[-m]\t\n"); printf("\t[-pm]\t\n"); printf("\t[-lp]\t\n"); printf("\t[-sm]\t\n"); printf("\t[-su]\t\n"); printf("\t[-u]\t\n"); } int prepare_socket() { struct sockaddr_in groupsock; struct in_addr localInterface; int fd = socket(AF_INET, SOCK_DGRAM, 0); if(fd < 0) { perror("Opening datagram socket error"); exit(1); } memset(&groupsock, 0, sizeof(groupsock)); groupsock.sin_family = AF_INET; groupsock.sin_addr.s_addr = inet_addr(mc_address); groupsock.sin_port = htons(mc_dest_port); /* Disable loopback so you do not receive your own datagrams.*/ char loopch = 0; if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch)) < 0) { perror("Setting IP_MULTICAST_LOOP error"); close(fd); exit(1); } /* Set local interface for outbound multicast datagrams. */ localInterface.s_addr = inet_addr(if_address); if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_IF, (char *)&localInterface, sizeof(localInterface)) < 0) { perror("Setting local interface error"); exit(1); } printf("Connecting..\n"); if(connect(fd, (struct sockaddr *) &groupsock, sizeof(struct sockaddr))) { perror("connect"); close(fd); exit(1); } return fd; } void* send_mc_loop(void* num) { int ret; char databuf[mc_bufflen]; char quote[] = "QUOTE"; uint64_t delta_usec, delta_usec_sleep; /* Prepare MC socket */ printf("Opening datagram MC socket\n"); fd_list[MC_SOCKET] = prepare_socket(); // Prepare to start measurements tx_pkt_count = 0; struct timeval tv_start, tv_sleep_start, tv_sleep_end; gettimeofday(&tv_start, NULL); gettimeofday(&tv_sleep_start, NULL); gettimeofday(&tv_sleep_end, NULL); while(true) { delta_usec_sleep = ((tv_sleep_end.tv_sec - tv_sleep_start.tv_sec) * 1000000) + (tv_sleep_end.tv_usec - tv_sleep_start.tv_usec); if (delta_usec_sleep > sleep_time_usec) { ret = send(fd_list[MC_SOCKET], databuf, mc_bufflen, 0); // Can use send with UDP socket because called connect() before... if (ret < 0) printf("ERROR on SEND errno = %s\n", strerror(errno)); tx_pkt_count++; tv_sleep_start = tv_sleep_end; } else { gettimeofday(&tv_sleep_end, NULL); } if ((tx_pkt_count != 0) && (tx_pkt_count % num_packets) == 0) { struct timeval tv_now; gettimeofday(&tv_now, NULL); delta_usec = ((tv_now.tv_sec - tv_start.tv_sec) * 1000000) + (tv_now.tv_usec - tv_start.tv_usec); tv_start = tv_now; double mps = 1000000 * (tx_pkt_count/(double)delta_usec); double bwGbps = mps * mc_bufflen * 8/(1024*1024*1024); printf("BW(Gbps)=%6.3f, MPS=%10.0f\n", bwGbps, mps); tx_pkt_count = 0; gettimeofday(&tv_quote_start, NULL); ret = send(fd_list[MC_SOCKET], quote, sizeof(quote), 0); if (ret < 0) printf("ERROR on SEND errno = %s\n", strerror(errno)); } } return 0; } void * uc_func(void * num) { struct sockaddr_in localSock, servaddr; socklen_t servaddrlen = sizeof(struct sockaddr); char buf[uc_bufflen], ord_ack[] = "ORD_ACK", ka_ack[] = "KA_ACK"; int ret, print = 0; fd_list[UC_SOCKET] = socket(AF_INET, SOCK_DGRAM, 0); if(fd_list[UC_SOCKET] < 0) { perror("Opening datagram UC socket error"); exit(1); } printf("Opening datagram UC socket....OK.\n"); memset((char *) &localSock, 0, sizeof(localSock)); localSock.sin_family = AF_INET; localSock.sin_addr.s_addr = inet_addr(if_address); localSock.sin_port = htons(uc_local_port); if(bind(fd_list[UC_SOCKET], (struct sockaddr*)&localSock, sizeof(struct sockaddr))) { perror("Binding datagram UC socket error"); close(fd_list[UC_SOCKET]); exit(1); } else { printf("Binding datagram UC socket...OK.\n"); } while(1) { ret = recvfrom(fd_list[UC_SOCKET], buf, uc_bufflen, 0, (struct sockaddr *)&servaddr, &servaddrlen); gettimeofday(&tv_quote_end, NULL); if (ret < 0) { printf("ERROR on RECV errno = %s \n", strerror(errno)); printf("errno value = %d\n", errno); } else { if (strcmp(buf, "ORD") == 0) { ret = sendto(fd_list[UC_SOCKET], ord_ack, sizeof(ord_ack), 0, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)); if (ret < 0) { printf("ERROR on SEND UC errno = %s \n", strerror(errno)); printf("errno value = %d\n", errno); } print = 1; } else if (strcmp(buf, "KA") == 0){ ret = sendto(fd_list[UC_SOCKET], ka_ack, sizeof(ka_ack), 0, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)); if (ret < 0) { printf("ERROR on SEND UC errno = %s \n", strerror(errno)); printf("errno value = %d\n", errno); } } else{ printf("Internal error: Exchange received UC packet- not ORD or KA\n"); } if (print) { delta_usec_quote = ((tv_quote_end.tv_sec - tv_quote_start.tv_sec) * 1000000) + (tv_quote_end.tv_usec - tv_quote_start.tv_usec); printf("@@@@@@@ QUOTE from port %u RTT in usec = %llu @@@@@@@\n", ntohs(servaddr.sin_port), (long long unsigned int)delta_usec_quote); print = 0; } } } close(fd_list[UC_SOCKET]); printf("closed UC socket\n"); return 0; } int main(int argc, char *argv[]) { pthread_t uc_thread; int nThreadId = 1, i; for (i=1; i