/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
* Copyright (C) The University of Tennessee and The University
* of Tennessee Research Foundation. 2015. ALL RIGHTS RESERVED.
* Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "api/libperf.h"
#include "lib/libperf_int.h"
#include <ucs/sys/string.h>
#include <ucs/sys/sys.h>
#include <ucs/sys/sock.h>
#include <ucs/debug/log.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <netdb.h>
#include <getopt.h>
#include <string.h>
#include <sys/types.h>
#include <sys/poll.h>
#include <locale.h>
#if HAVE_MPI
# include <mpi.h>
#elif HAVE_RTE
# include<rte.h>
#endif
#define MAX_BATCH_FILES 32
#define TL_RESOURCE_NAME_NONE "<none>"
#define TEST_PARAMS_ARGS "t:n:s:W:O:w:D:i:H:oSCqM:r:T:d:x:A:BUm:"
/* Bit flags stored in perftest_context.flags */
enum {
/* print_progress() produces output only when this flag is set */
TEST_FLAG_PRINT_RESULTS = UCS_BIT(0),
/* print_header() prints the test description box when this flag is set */
TEST_FLAG_PRINT_TEST = UCS_BIT(1),
/* set by the "-c" option together with perftest_context.cpu */
TEST_FLAG_SET_AFFINITY = UCS_BIT(8),
/* set by "-N": selects the format with the thousands-grouping flag (') */
TEST_FLAG_NUMERIC_FMT = UCS_BIT(9),
/* set by "-f": print_progress() skips non-final reports */
TEST_FLAG_PRINT_FINAL = UCS_BIT(10),
/* set by "-v": output is printed as comma-separated values */
TEST_FLAG_PRINT_CSV = UCS_BIT(11)
};
/* State of the socket-based run-time environment (a pair of peers) */
typedef struct sock_rte_group {
/* non-zero on the listening side; the server gets group index 0 */
int is_server;
/* connected socket used for all exchanges with the peer */
int connfd;
} sock_rte_group_t;
/* Description of one test selectable with the "-t" option */
typedef struct test_type {
/* name matched against the "-t" argument */
const char *name;
/* API the test runs on (UCT or UCP) */
ucx_perf_api_t api;
/* command copied into ucx_perf_params_t.command */
ucx_perf_cmd_t command;
/* test type copied into ucx_perf_params_t.test_type */
ucx_perf_test_type_t test_type;
/* human-readable description printed by usage() and print_header() */
const char *desc;
} test_type_t;
/* Top-level state of the perftest program */
struct perftest_context {
/* test parameters (defaults from init_test_params(), then options) */
ucx_perf_params_t params;
/* first non-option argument; NULL means this process acts as the server */
const char *server_addr;
/* TCP port used by the socket run-time environment ("-p") */
int port;
/* non-zero when MPI mode is enabled ("-P" and MPI was initialized) */
int mpi;
/* CPU number given with "-c" */
unsigned cpu;
/* combination of TEST_FLAG_xx bits */
unsigned flags;
/* number of valid entries in batch_files[] */
unsigned num_batch_files;
/* batch file names given with "-b" (point into argv) */
char *batch_files[MAX_BATCH_FILES];
/* test names printed by print_progress() / print_test_name() */
char *test_names[MAX_BATCH_FILES];
/* state of the socket run-time environment */
sock_rte_group_t sock_rte_group;
};
/*
 * Table of all known tests. Every entry is
 * { name, API, command, test type, description }; the table is terminated by
 * an entry whose name is NULL. Note that several UCT and UCP entries share the
 * same (command, test type) pair and differ only by API.
 */
test_type_t tests[] = {
/* UCT tests */
{"am_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_AM, UCX_PERF_TEST_TYPE_PINGPONG,
"active message latency"},
{"put_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG,
"put latency"},
{"add_lat", UCX_PERF_API_UCT, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_PINGPONG,
"atomic add latency"},
{"get", UCX_PERF_API_UCT, UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI,
"get latency / bandwidth / message rate"},
{"fadd", UCX_PERF_API_UCT, UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
"atomic fetch-and-add latency / rate"},
{"swap", UCX_PERF_API_UCT, UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
"atomic swap latency / rate"},
{"cswap", UCX_PERF_API_UCT, UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
"atomic compare-and-swap latency / rate"},
{"am_bw", UCX_PERF_API_UCT, UCX_PERF_CMD_AM, UCX_PERF_TEST_TYPE_STREAM_UNI,
"active message bandwidth / message rate"},
{"put_bw", UCX_PERF_API_UCT, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI,
"put bandwidth / message rate"},
{"add_mr", UCX_PERF_API_UCT, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
"atomic add message rate"},
/* UCP tests */
{"tag_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG, UCX_PERF_TEST_TYPE_PINGPONG,
"tag match latency"},
{"tag_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG, UCX_PERF_TEST_TYPE_STREAM_UNI,
"tag match bandwidth"},
{"tag_sync_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG_SYNC, UCX_PERF_TEST_TYPE_PINGPONG,
"tag sync match latency"},
{"tag_sync_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_TAG_SYNC, UCX_PERF_TEST_TYPE_STREAM_UNI,
"tag sync match bandwidth"},
{"ucp_put_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG,
"put latency"},
{"ucp_put_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI,
"put bandwidth"},
{"ucp_get", UCX_PERF_API_UCP, UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI,
"get latency / bandwidth / message rate"},
{"ucp_add", UCX_PERF_API_UCP, UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
"atomic add bandwidth / message rate"},
{"ucp_fadd", UCX_PERF_API_UCP, UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI,
"atomic fetch-and-add latency / bandwidth / rate"},
{"ucp_swap", UCX_PERF_API_UCP, UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
"atomic swap latency / bandwidth / rate"},
{"ucp_cswap", UCX_PERF_API_UCP, UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI,
"atomic compare-and-swap latency / bandwidth / rate"},
{"stream_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_STREAM, UCX_PERF_TEST_TYPE_STREAM_UNI,
"stream bandwidth"},
{"stream_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_STREAM, UCX_PERF_TEST_TYPE_PINGPONG,
"stream latency"},
/* terminator: loops over the table stop at the NULL name */
{NULL}
};
/**
 * Transfer exactly @a size bytes over a socket, invoking a user progress
 * callback between attempts.
 *
 * @param sock         Socket file descriptor.
 * @param sock_call    I/O function with recv()-like signature.
 * @param poll_events  poll() events to wait for (POLLIN / POLLOUT).
 * @param data         Buffer to send from / receive into.
 * @param size         Number of bytes to transfer.
 * @param progress     Optional callback invoked after every poll iteration,
 *                     may be NULL.
 * @param arg          Argument passed to @a progress.
 * @param name         Name of the I/O call, used in error messages.
 *
 * @return 0 when all bytes were transferred, -1 on error or when the peer
 *         closed the connection before the transfer completed.
 */
static int sock_io(int sock, ssize_t (*sock_call)(int, void *, size_t, int),
                   int poll_events, void *data, size_t size,
                   void (*progress)(void *arg), void *arg, const char *name)
{
    size_t total = 0;
    struct pollfd pfd;
    ssize_t nbytes;
    int ret;

    while (total < size) {
        pfd.fd      = sock;
        pfd.events  = poll_events;
        pfd.revents = 0;

        ret = poll(&pfd, 1, 1); /* poll for 1ms */
        if (ret > 0) {
            ucs_assert(ret == 1);
            /* poll() may also wake up with POLLHUP/POLLERR only; in that case
             * the I/O call below reports the failure, so revents is not
             * asserted here */
            nbytes = sock_call(sock, (char*)data + total, size - total, 0);
            if (nbytes < 0) {
                if (errno != EINTR) {
                    ucs_error("%s() failed: %m", name);
                    return -1;
                }
                /* interrupted by a signal - retry on the next iteration */
            } else if (nbytes == 0) {
                /* a zero-length result with bytes still pending means the
                 * peer closed the connection; without this check the loop
                 * would spin forever */
                ucs_error("%s() failed: connection closed by peer", name);
                return -1;
            } else {
                total += (size_t)nbytes;
            }
        } else if ((ret < 0) && (errno != EINTR)) {
            ucs_error("poll(fd=%d) failed: %m", sock);
            return -1;
        }

        /* progress user context */
        if (progress != NULL) {
            progress(arg);
        }
    }

    return 0;
}
/*
 * Send exactly 'size' bytes from 'data' over 'sock', calling 'progress(arg)'
 * (if not NULL) while waiting. Returns the result of sock_io().
 */
static int safe_send(int sock, void *data, size_t size,
                     void (*progress)(void *arg), void *arg)
{
    /* send() takes a const buffer; adapt it to the signature sock_io()
     * expects */
    ssize_t (*send_func)(int, void *, size_t, int) =
        (ssize_t (*)(int, void *, size_t, int))send;

    return sock_io(sock, send_func, POLLOUT, data, size, progress, arg,
                   "send");
}
/*
 * Receive exactly 'size' bytes into 'data' from 'sock', calling
 * 'progress(arg)' (if not NULL) while waiting. Returns the result of
 * sock_io().
 */
static int safe_recv(int sock, void *data, size_t size,
                     void (*progress)(void *arg), void *arg)
{
    const int status = sock_io(sock, recv, POLLIN, data, size, progress, arg,
                               "recv");

    return status;
}
/*
 * Print one line of results to stdout.
 *
 * Nothing is printed unless TEST_FLAG_PRINT_RESULTS is set; when
 * TEST_FLAG_PRINT_FINAL is set, only the final report is printed.
 * In CSV mode the line is prefixed by the given test names. Latencies are
 * scaled by 1e6 and bandwidths are divided by 1024*1024 before printing.
 */
static void print_progress(char **test_names, unsigned num_names,
                           const ucx_perf_result_t *result, unsigned flags,
                           int final)
{
    static const char *fmt_csv     = "%.0f,%.3f,%.3f,%.3f,%.2f,%.2f,%.0f,%.0f\n";
    static const char *fmt_numeric = "%'14.0f %9.3f %9.3f %9.3f %10.2f %10.2f %'11.0f %'11.0f\n";
    static const char *fmt_plain   = "%14.0f %9.3f %9.3f %9.3f %10.2f %10.2f %11.0f %11.0f\n";
    const double lat_scale         = 1000000.0;
    const double bw_scale          = 1024.0 * 1024.0;
    const char *fmt;
    unsigned idx;

    if (!(flags & TEST_FLAG_PRINT_RESULTS)) {
        return;
    }

    if ((flags & TEST_FLAG_PRINT_FINAL) && !final) {
        return;
    }

    /* CSV takes precedence over numeric formatting */
    if (flags & TEST_FLAG_PRINT_CSV) {
        for (idx = 0; idx < num_names; ++idx) {
            printf("%s,", test_names[idx]);
        }
        fmt = fmt_csv;
    } else if (flags & TEST_FLAG_NUMERIC_FMT) {
        fmt = fmt_numeric;
    } else {
        fmt = fmt_plain;
    }

    printf(fmt,
           (double)result->iters,
           result->latency.typical * lat_scale,
           result->latency.moment_average * lat_scale,
           result->latency.total_average * lat_scale,
           result->bandwidth.moment_average / bw_scale,
           result->bandwidth.total_average / bw_scale,
           result->msgrate.moment_average,
           result->msgrate.total_average);
    fflush(stdout);
}
/**
 * Print the test description box (when TEST_FLAG_PRINT_TEST is set) and the
 * column headers of the results table or the CSV header line (when
 * TEST_FLAG_PRINT_RESULTS is set).
 *
 * The test entry is looked up by API, command and test type. Matching the API
 * is required because UCT and UCP entries share (command, test type) pairs:
 * without it, e.g. "ucp_put_lat" would be described as the UCT "put_lat" test.
 *
 * @param ctx  Test context; flags, params and batch file names are read.
 */
static void print_header(struct perftest_context *ctx)
{
    const char *test_api_str;
    const char *test_data_str;
    test_type_t *test;
    unsigned i;

    if (ctx->flags & TEST_FLAG_PRINT_TEST) {
        for (test = tests; test->name; ++test) {
            if ((test->api == ctx->params.api) &&
                (test->command == ctx->params.command) &&
                (test->test_type == ctx->params.test_type)) {
                break;
            }
        }

        if (test->name != NULL) {
            if (test->api == UCX_PERF_API_UCT) {
                test_api_str = "transport layer";
                switch (ctx->params.uct.data_layout) {
                case UCT_PERF_DATA_LAYOUT_SHORT:
                    test_data_str = "short";
                    break;
                case UCT_PERF_DATA_LAYOUT_BCOPY:
                    test_data_str = "bcopy";
                    break;
                case UCT_PERF_DATA_LAYOUT_ZCOPY:
                    test_data_str = "zcopy";
                    break;
                default:
                    test_data_str = "(undefined)";
                    break;
                }
            } else if (test->api == UCX_PERF_API_UCP) {
                test_api_str  = "protocol layer";
                test_data_str = "(automatic)"; /* TODO contig/stride/stream */
            } else {
                return;
            }

            printf("+------------------------------------------------------------------------------------------+\n");
            printf("| API: %-60s |\n", test_api_str);
            printf("| Test: %-60s |\n", test->desc);
            printf("| Data layout: %-60s |\n", test_data_str);
            printf("| Send memory: %-60s |\n", ucs_memory_type_names[ctx->params.send_mem_type]);
            printf("| Recv memory: %-60s |\n", ucs_memory_type_names[ctx->params.recv_mem_type]);
            printf("| Message size: %-60zu |\n", ucx_perf_get_message_size(&ctx->params));
        }
    }

    if (ctx->flags & TEST_FLAG_PRINT_CSV) {
        if (ctx->flags & TEST_FLAG_PRINT_RESULTS) {
            /* one leading column per batch file, then the result columns */
            for (i = 0; i < ctx->num_batch_files; ++i) {
                printf("%s,", basename(ctx->batch_files[i]));
            }
            printf("iterations,typical_lat,avg_lat,overall_lat,avg_bw,overall_bw,avg_mr,overall_mr\n");
        }
    } else {
        if (ctx->flags & TEST_FLAG_PRINT_RESULTS) {
            printf("+--------------+-----------------------------+---------------------+-----------------------+\n");
            printf("| | latency (usec) | bandwidth (MB/s) | message rate (msg/s) |\n");
            printf("+--------------+---------+---------+---------+----------+----------+-----------+-----------+\n");
            printf("| # iterations | typical | average | overall | average | overall | average | overall |\n");
            printf("+--------------+---------+---------+---------+----------+----------+-----------+-----------+\n");
        } else if (ctx->flags & TEST_FLAG_PRINT_TEST) {
            /* only the description box was printed - close it */
            printf("+------------------------------------------------------------------------------------------+\n");
        }
    }
}
/**
 * Print a table separator line with the batch test names ("a/b/c") written
 * over its beginning. Nothing is printed in CSV mode, when there are no batch
 * files, or when TEST_FLAG_PRINT_RESULTS is not set.
 *
 * The names are truncated to fit the local buffer: writing never goes past the
 * end of the buffer and the result is always NUL-terminated.
 *
 * @param ctx  Test context; flags, num_batch_files and test_names are read.
 */
static void print_test_name(struct perftest_context *ctx)
{
    /* the initializer zero-fills the rest of the array, and index
     * sizeof(buf) - 1 is never written, so the string stays terminated */
    char buf[200] = "+--------------+---------+---------+---------+----------+----------+-----------+-----------+";
    const size_t max_pos = sizeof(buf) - 1;
    size_t pos, len;
    unsigned i;

    if ((ctx->flags & TEST_FLAG_PRINT_CSV) || (ctx->num_batch_files == 0)) {
        return;
    }

    if (!(ctx->flags & TEST_FLAG_PRINT_RESULTS)) {
        return;
    }

    pos = 1; /* keep the leading '+' */
    for (i = 0; (i < ctx->num_batch_files) && (pos < max_pos); ++i) {
        if (i != 0) {
            buf[pos++] = '/';
        }

        /* copy as much of the name as still fits */
        len = ucs_min(strlen(ctx->test_names[i]), max_pos - pos);
        memcpy(&buf[pos], ctx->test_names[i], len);
        pos += len;
    }

    printf("%s\n", buf);
}
/*
 * Print name and description of every memory type which has an allocator
 * registered in ucx_perf_mem_type_allocators[].
 */
static void print_memory_type_usage(void)
{
    ucs_memory_type_t mem_type = UCS_MEMORY_TYPE_HOST;

    while (mem_type < UCS_MEMORY_TYPE_LAST) {
        if (ucx_perf_mem_type_allocators[mem_type] != NULL) {
            printf(" %s - %s\n",
                   ucs_memory_type_names[mem_type],
                   ucs_memory_type_descs[mem_type]);
        }
        mem_type++;
    }
}
/**
 * Print the command-line help. Current values from @a ctx are shown in
 * parentheses as defaults.
 *
 * In MPI mode only rank 0 prints the help. The rank is queried only when MPI
 * mode is enabled, because ctx->mpi can only be non-zero if MPI was
 * initialized; MPI_Comm_rank() must not be called otherwise.
 *
 * @param ctx      Test context holding the values to display.
 * @param program  Program name shown in the usage line.
 */
static void usage(const struct perftest_context *ctx, const char *program)
{
    static const char* api_names[] = {
        [UCX_PERF_API_UCT] = "UCT",
        [UCX_PERF_API_UCP] = "UCP"
    };
    test_type_t *test;
    int UCS_V_UNUSED rank;

#if HAVE_MPI
    if (ctx->mpi) {
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        if (rank != 0) {
            return;
        }
    }

    printf(" Note: test can be also launched as an MPI application\n");
    printf("\n");
#elif HAVE_RTE
    printf(" Note: this test can be also launched as an libRTE application\n");
    printf("\n");
#endif
    printf(" Usage: %s [ server-hostname ] [ options ]\n", program);
    printf("\n");
    printf(" Common options:\n");
    printf(" -t <test> test to run:\n");
    for (test = tests; test->name; ++test) {
        printf(" %13s - %s %s\n", test->name,
               api_names[test->api], test->desc);
    }
    printf("\n");
    printf(" -s <size> list of scatter-gather sizes for single message (%zu)\n",
           ctx->params.msg_size_list[0]);
    printf(" for example: \"-s 16,48,8192,8192,14\"\n");
    printf(" -m <send mem type>[,<recv mem type>]\n");
    printf(" memory type of message for sender and receiver (host)\n");
    print_memory_type_usage();
    printf(" -n <iters> number of iterations to run (%ld)\n", ctx->params.max_iter);
    printf(" -w <iters> number of warm-up iterations (%zu)\n",
           ctx->params.warmup_iter);
    printf(" -c <cpu> set affinity to this CPU (off)\n");
    printf(" -O <count> maximal number of uncompleted outstanding sends (%u)\n",
           ctx->params.max_outstanding);
    printf(" -i <offset> distance between consecutive scatter-gather entries (%zu)\n",
           ctx->params.iov_stride);
    printf(" -T <threads> number of threads in the test (%d), if >1 implies \"-M multi\"\n",
           ctx->params.thread_count);
    printf(" -B register memory with NONBLOCK flag\n");
    printf(" -b <file> read and execute tests from a batch file: every line in the\n");
    printf(" file is a test to run, first word is test name, the rest of\n");
    printf(" the line is command-line arguments for the test.\n");
    printf(" -p <port> TCP port to use for data exchange (%d)\n", ctx->port);
#if HAVE_MPI
    printf(" -P <0|1> disable/enable MPI mode (%d)\n", ctx->mpi);
#endif
    printf(" -h show this help message\n");
    printf("\n");
    printf(" Output format:\n");
    printf(" -N use numeric formatting (thousands separator)\n");
    printf(" -f print only final numbers\n");
    printf(" -v print CSV-formatted output\n");
    printf("\n");
    printf(" UCT only:\n");
    printf(" -d <device> device to use for testing\n");
    printf(" -x <tl> transport to use for testing\n");
    printf(" -D <layout> data layout for sender side:\n");
    printf(" short - short messages (default, cannot be used for get)\n");
    printf(" bcopy - copy-out (cannot be used for atomics)\n");
    printf(" zcopy - zero-copy (cannot be used for atomics)\n");
    printf(" iov - scatter-gather list (iovec)\n");
    printf(" -W <count> flow control window size, for active messages (%u)\n",
           ctx->params.uct.fc_window);
    printf(" -H <size> active message header size (%zu)\n",
           ctx->params.am_hdr_size);
    printf(" -A <mode> asynchronous progress mode (thread_spinlock)\n");
    printf(" thread_spinlock - separate progress thread with spin locking\n");
    printf(" thread_mutex - separate progress thread with mutex locking\n");
    printf(" signal - signal-based timer\n");
    printf("\n");
    printf(" UCP only:\n");
    printf(" -M <thread> thread support level for progress engine (single)\n");
    printf(" single - only the master thread can access\n");
    printf(" serialized - one thread can access at a time\n");
    printf(" multi - multiple threads can access\n");
    printf(" -D <layout>[,<layout>]\n");
    printf(" data layout for sender and receiver side (contig)\n");
    printf(" contig - Continuous datatype\n");
    printf(" iov - Scatter-gather list\n");
    printf(" -C use wild-card tag for tag tests\n");
    printf(" -U force unexpected flow by using tag probe\n");
    printf(" -r <mode> receive mode for stream tests (recv)\n");
    printf(" recv : Use ucp_stream_recv_nb\n");
    printf(" recv_data : Use ucp_stream_recv_data_nb\n");
    printf("\n");
    printf(" NOTE: When running UCP tests, transport and device should be specified by\n");
    printf(" environment variables: UCX_TLS and UCX_[SELF|SHM|NET]_DEVICES.\n");
    printf("\n");
}
/*
 * Translate a datatype name into a UCP perf datatype. Only the beginning of
 * 'optarg' is compared ("iov" is tested before "contig"), so a string such as
 * "iov,contig" selects IOV.
 *
 * Returns UCS_OK and stores the datatype, or UCS_ERR_INVALID_PARAM when
 * 'optarg' starts with neither name.
 */
static ucs_status_t parse_ucp_datatype_params(const char *optarg,
                                              ucp_perf_datatype_t *datatype)
{
    static const char iov_name[]    = "iov";
    static const char contig_name[] = "contig";

    /* sizeof() - 1 is the name length without the terminating NUL */
    if (strncmp(optarg, iov_name, sizeof(iov_name) - 1) == 0) {
        *datatype = UCP_PERF_DATATYPE_IOV;
        return UCS_OK;
    }

    if (strncmp(optarg, contig_name, sizeof(contig_name) - 1) == 0) {
        *datatype = UCP_PERF_DATATYPE_CONTIG;
        return UCS_OK;
    }

    return UCS_ERR_INVALID_PARAM;
}
/*
 * Find the memory type whose name equals 'optarg' and which has an allocator
 * registered in ucx_perf_mem_type_allocators[].
 *
 * Returns UCS_OK and stores the type in '*mem_type'; otherwise logs an error
 * and returns UCS_ERR_INVALID_PARAM.
 */
static ucs_status_t parse_mem_type(const char *optarg,
                                   ucs_memory_type_t *mem_type)
{
    ucs_memory_type_t candidate;

    for (candidate = UCS_MEMORY_TYPE_HOST; candidate < UCS_MEMORY_TYPE_LAST;
         candidate++) {
        if (strcmp(optarg, ucs_memory_type_names[candidate]) != 0) {
            continue;
        }

        if (ucx_perf_mem_type_allocators[candidate] == NULL) {
            continue;
        }

        *mem_type = candidate;
        return UCS_OK;
    }

    ucs_error("Unsupported memory type: \"%s\"", optarg);
    return UCS_ERR_INVALID_PARAM;
}
/**
 * Parse the "-m <send mem type>[,<recv mem type>]" argument.
 *
 * @param optarg         Option argument; it is tokenized in place by strtok(),
 *                       so the string is modified.
 * @param send_mem_type  Filled with the first memory type.
 * @param recv_mem_type  Filled with the second memory type, or with the send
 *                       memory type when only one is given.
 *
 * @return UCS_OK on success, UCS_ERR_INVALID_PARAM if a memory type is missing
 *         or not supported.
 */
static ucs_status_t parse_mem_type_params(const char *optarg,
                                          ucs_memory_type_t *send_mem_type,
                                          ucs_memory_type_t *recv_mem_type)
{
    const char *delim = ",";
    char *token       = strtok((char*)optarg, delim);

    /* strtok() returns NULL for an empty or delimiter-only argument */
    if (NULL == token) {
        ucs_error("Missing memory type");
        return UCS_ERR_INVALID_PARAM;
    }

    if (UCS_OK != parse_mem_type(token, send_mem_type)) {
        return UCS_ERR_INVALID_PARAM;
    }

    token = strtok(NULL, delim);
    if (NULL == token) {
        *recv_mem_type = *send_mem_type;
        return UCS_OK;
    } else {
        return parse_mem_type(token, recv_mem_type);
    }
}
/**
 * Parse the "-s" argument: a comma-separated list of message sizes.
 *
 * The sizes are parsed into a new array, which replaces
 * params->msg_size_list only after the whole argument was parsed
 * successfully. On failure @a params is left untouched, so it can still be
 * used (for example by usage(), which prints msg_size_list[0]).
 *
 * Every element must be a decimal number followed by ',' or the end of the
 * string.
 *
 * @param optarg  Option argument.
 * @param params  Test parameters to update.
 *
 * @return UCS_OK, UCS_ERR_NO_MEMORY or UCS_ERR_INVALID_PARAM.
 */
static ucs_status_t parse_message_sizes_params(const char *optarg,
                                               ucx_perf_params_t *params)
{
    const char delim = ',';
    size_t *msg_size_list, token_num, token_it;
    const char *cursor;
    unsigned long value;
    char *end;

    /* count the number of given message sizes */
    token_num = 1;
    for (cursor = strchr(optarg, delim); cursor != NULL;
         cursor = strchr(cursor + 1, delim)) {
        ++token_num;
    }

    msg_size_list = malloc(sizeof(*msg_size_list) * token_num);
    if (NULL == msg_size_list) {
        return UCS_ERR_NO_MEMORY;
    }

    cursor = optarg;
    for (token_it = 0; token_it < token_num; ++token_it) {
        errno = 0; /* reset for every element to detect its own overflow */
        value = strtoul(cursor, &end, 10);
        if ((errno != 0) || (end == cursor) ||
            ((*end != delim) && (*end != '\0'))) {
            free(msg_size_list);
            ucs_error("Invalid option substring argument at position %zu",
                      token_it);
            return UCS_ERR_INVALID_PARAM;
        }

        msg_size_list[token_it] = value;
        if (*end == delim) {
            cursor = end + 1;
        }
    }

    /* commit: replace the previous list only after successful parsing */
    free(params->msg_size_list);
    params->msg_size_list = msg_size_list;
    params->msg_size_cnt  = token_num;
    return UCS_OK;
}
/*
 * Reset 'params' to the default test parameters and allocate the message size
 * list with a single element (8).
 *
 * Returns UCS_OK, or UCS_ERR_NO_MEMORY if the list cannot be allocated.
 */
static ucs_status_t init_test_params(ucx_perf_params_t *params)
{
    memset(params, 0, sizeof(*params));

    /* test selection: nothing selected until "-t" is parsed */
    params->api               = UCX_PERF_API_LAST;
    params->command           = UCX_PERF_CMD_LAST;
    params->test_type         = UCX_PERF_TEST_TYPE_LAST;

    /* threading and progress */
    params->thread_mode       = UCS_THREAD_MODE_SINGLE;
    params->thread_count      = 1;
    params->async_mode        = UCS_ASYNC_THREAD_LOCK_TYPE;
    params->wait_mode         = UCX_PERF_WAIT_MODE_LAST;

    /* iterations and reporting */
    params->max_outstanding   = 1;
    params->warmup_iter       = 10000;
    params->max_iter          = 1000000l;
    params->max_time          = 0.0;
    params->report_interval   = 1.0;
    params->flags             = UCX_PERF_TEST_FLAG_VERBOSE;

    /* message layout and memory */
    params->am_hdr_size       = 8;
    params->alignment         = ucs_get_page_size();
    params->iov_stride        = 0;
    params->send_mem_type     = UCS_MEMORY_TYPE_HOST;
    params->recv_mem_type     = UCS_MEMORY_TYPE_HOST;

    /* UCT-specific */
    params->uct.fc_window     = UCT_PERF_TEST_MAX_FC_WINDOW;
    params->uct.data_layout   = UCT_PERF_DATA_LAYOUT_SHORT;
    strcpy(params->uct.dev_name, TL_RESOURCE_NAME_NONE);
    strcpy(params->uct.tl_name, TL_RESOURCE_NAME_NONE);

    /* UCP-specific */
    params->ucp.send_datatype = UCP_PERF_DATATYPE_CONTIG;
    params->ucp.recv_datatype = UCP_PERF_DATATYPE_CONTIG;

    /* default message: one element of 8 bytes */
    params->msg_size_cnt      = 1;
    params->msg_size_list     = calloc(params->msg_size_cnt,
                                       sizeof(*params->msg_size_list));
    if (NULL == params->msg_size_list) {
        return UCS_ERR_NO_MEMORY;
    }

    params->msg_size_list[0] = 8;
    return UCS_OK;
}
/**
 * Apply one per-test option (one of TEST_PARAMS_ARGS) to the test parameters.
 *
 * @param params  Test parameters to update.
 * @param opt     Option character as returned by getopt().
 * @param optarg  Option argument, for options that take one.
 *
 * @return UCS_OK on success; UCS_ERR_INVALID_PARAM for an unknown option or an
 *         invalid argument; parse_message_sizes_params() status for "-s".
 */
static ucs_status_t parse_test_params(ucx_perf_params_t *params, char opt, const char *optarg)
{
    test_type_t *test;
    const char *optarg2 = NULL;

    switch (opt) {
    case 'd':
        ucs_snprintf_zero(params->uct.dev_name, sizeof(params->uct.dev_name),
                          "%s", optarg);
        return UCS_OK;
    case 'x':
        ucs_snprintf_zero(params->uct.tl_name, sizeof(params->uct.tl_name),
                          "%s", optarg);
        return UCS_OK;
    case 't':
        for (test = tests; test->name; ++test) {
            if (!strcmp(optarg, test->name)) {
                params->api       = test->api;
                params->command   = test->command;
                params->test_type = test->test_type;
                break;
            }
        }
        if (test->name == NULL) {
            ucs_error("Invalid option argument for -t");
            return UCS_ERR_INVALID_PARAM;
        }
        return UCS_OK;
    case 'D':
        /* UCT layout names are tried first; anything else is parsed as
         * "<send datatype>[,<recv datatype>]" for UCP */
        if (!strcmp(optarg, "short")) {
            params->uct.data_layout = UCT_PERF_DATA_LAYOUT_SHORT;
        } else if (!strcmp(optarg, "bcopy")) {
            params->uct.data_layout = UCT_PERF_DATA_LAYOUT_BCOPY;
        } else if (!strcmp(optarg, "zcopy")) {
            params->uct.data_layout = UCT_PERF_DATA_LAYOUT_ZCOPY;
        } else if (UCS_OK == parse_ucp_datatype_params(optarg,
                                                       &params->ucp.send_datatype)) {
            optarg2 = strchr(optarg, ',');
            if (optarg2) {
                if (UCS_OK != parse_ucp_datatype_params(optarg2 + 1,
                                                        &params->ucp.recv_datatype)) {
                    ucs_error("Invalid option argument for -D");
                    return UCS_ERR_INVALID_PARAM;
                }
            }
        } else {
            ucs_error("Invalid option argument for -D");
            return UCS_ERR_INVALID_PARAM;
        }
        return UCS_OK;
    case 'i':
        params->iov_stride = atol(optarg);
        return UCS_OK;
    case 'n':
        params->max_iter = atol(optarg);
        return UCS_OK;
    case 's':
        return parse_message_sizes_params(optarg, params);
    case 'H':
        params->am_hdr_size = atol(optarg);
        return UCS_OK;
    case 'W':
        params->uct.fc_window = atoi(optarg);
        return UCS_OK;
    case 'O':
        params->max_outstanding = atoi(optarg);
        return UCS_OK;
    case 'w':
        params->warmup_iter = atol(optarg);
        return UCS_OK;
    case 'o':
        params->flags |= UCX_PERF_TEST_FLAG_ONE_SIDED;
        return UCS_OK;
    case 'B':
        params->flags |= UCX_PERF_TEST_FLAG_MAP_NONBLOCK;
        return UCS_OK;
    case 'q':
        params->flags &= ~UCX_PERF_TEST_FLAG_VERBOSE;
        return UCS_OK;
    case 'C':
        params->flags |= UCX_PERF_TEST_FLAG_TAG_WILDCARD;
        return UCS_OK;
    case 'U':
        params->flags |= UCX_PERF_TEST_FLAG_TAG_UNEXP_PROBE;
        return UCS_OK;
    case 'M':
        if (!strcmp(optarg, "single")) {
            params->thread_mode = UCS_THREAD_MODE_SINGLE;
            return UCS_OK;
        } else if (!strcmp(optarg, "serialized")) {
            params->thread_mode = UCS_THREAD_MODE_SERIALIZED;
            return UCS_OK;
        } else if (!strcmp(optarg, "multi")) {
            params->thread_mode = UCS_THREAD_MODE_MULTI;
            return UCS_OK;
        } else {
            ucs_error("Invalid option argument for -M");
            return UCS_ERR_INVALID_PARAM;
        }
    case 'T':
        /* any explicit thread count switches to multi-thread mode */
        params->thread_count = atoi(optarg);
        params->thread_mode  = UCS_THREAD_MODE_MULTI;
        return UCS_OK;
    case 'A':
        if (!strcmp(optarg, "thread") || !strcmp(optarg, "thread_spinlock")) {
            params->async_mode = UCS_ASYNC_MODE_THREAD_SPINLOCK;
            return UCS_OK;
        } else if (!strcmp(optarg, "thread_mutex")) {
            params->async_mode = UCS_ASYNC_MODE_THREAD_MUTEX;
            return UCS_OK;
        } else if (!strcmp(optarg, "signal")) {
            params->async_mode = UCS_ASYNC_MODE_SIGNAL;
            return UCS_OK;
        } else {
            ucs_error("Invalid option argument for -A");
            return UCS_ERR_INVALID_PARAM;
        }
    case 'r':
        if (!strcmp(optarg, "recv_data")) {
            params->flags |= UCX_PERF_TEST_FLAG_STREAM_RECV_DATA;
            return UCS_OK;
        } else if (!strcmp(optarg, "recv")) {
            params->flags &= ~UCX_PERF_TEST_FLAG_STREAM_RECV_DATA;
            return UCS_OK;
        }
        ucs_error("Invalid option argument for -r");
        return UCS_ERR_INVALID_PARAM;
    case 'm':
        if (UCS_OK != parse_mem_type_params(optarg,
                                            &params->send_mem_type,
                                            &params->recv_mem_type)) {
            return UCS_ERR_INVALID_PARAM;
        }
        return UCS_OK;
    default:
        return UCS_ERR_INVALID_PARAM;
    }
}
/**
 * Read the next test from a batch file. Empty lines and lines whose first
 * word starts with '#' are skipped. The first word of the line is the test
 * name, the remaining words are parsed as per-test options.
 *
 * @param batch_file   Open batch file.
 * @param file_name    File name, used in error messages.
 * @param line_num     Line counter, incremented for every line read.
 * @param params       Test parameters updated by the options of the line.
 * @param test_name_p  Filled with a strdup()-allocated copy of the test name.
 *
 * @return UCS_OK on success, UCS_ERR_NO_ELEM when no more lines can be read,
 *         UCS_ERR_NO_MEMORY if the name cannot be duplicated, or the status
 *         of a failed option.
 */
static ucs_status_t read_batch_file(FILE *batch_file, const char *file_name,
                                    int *line_num, ucx_perf_params_t *params,
                                    char** test_name_p)
{
#define MAX_SIZE 256
#define MAX_ARG_SIZE 2048
    ucs_status_t status;
    char buf[MAX_ARG_SIZE];
    int argc;
    char *argv[MAX_SIZE + 1];
    int c;
    char *p;

    do {
        if (fgets(buf, sizeof(buf) - 1, batch_file) == NULL) {
            return UCS_ERR_NO_ELEM;
        }
        ++(*line_num);

        /* split the line into words; argv[0] is the test name */
        argc = 0;
        p = strtok(buf, " \t\n\r");
        while (p && (argc < MAX_SIZE)) {
            argv[argc++] = p;
            p = strtok(NULL, " \t\n\r");
        }
        argv[argc] = NULL;
    } while ((argc == 0) || (argv[0][0] == '#'));

    optind = 1; /* restart getopt() scanning for this line */
    while ((c = getopt (argc, argv, TEST_PARAMS_ARGS)) != -1) {
        status = parse_test_params(params, c, optarg);
        if (status != UCS_OK) {
            /* optarg is NULL for options without an argument */
            ucs_error("in batch file '%s' line %d: -%c %s: %s",
                      file_name, *line_num, c, (optarg != NULL) ? optarg : "",
                      ucs_status_string(status));
            return status;
        }
    }

    *test_name_p = strdup(argv[0]);
    if (*test_name_p == NULL) {
        ucs_error("in batch file '%s' line %d: failed to allocate test name",
                  file_name, *line_num);
        return UCS_ERR_NO_MEMORY;
    }

    return UCS_OK;
}
/*
 * Initialize the test context with defaults and apply the command-line
 * options. General options are handled here, all others are passed to
 * parse_test_params(). The first non-option argument, if present, becomes the
 * server address.
 *
 * Returns UCS_OK on success, UCS_ERR_CANCELED after printing the help, the
 * init_test_params() error, or the parse_test_params() error (the help is
 * printed in that case as well).
 */
static ucs_status_t parse_opts(struct perftest_context *ctx, int mpi_initialized,
                               int argc, char **argv)
{
    ucs_status_t status;
    int opt;

    ucs_trace_func("");

    ucx_perf_global_init(); /* initialize memory types */

    status = init_test_params(&ctx->params);
    if (status != UCS_OK) {
        return status;
    }

    ctx->mpi             = mpi_initialized;
    ctx->flags           = 0;
    ctx->port            = 13337;
    ctx->num_batch_files = 0;
    ctx->server_addr     = NULL;

    optind = 1;
    for (;;) {
        opt = getopt (argc, argv, "p:b:Nfvc:P:h" TEST_PARAMS_ARGS);
        if (opt == -1) {
            break;
        }

        switch (opt) {
        case 'p':
            ctx->port = atoi(optarg);
            break;
        case 'b':
            /* batch files beyond MAX_BATCH_FILES are ignored */
            if (ctx->num_batch_files < MAX_BATCH_FILES) {
                ctx->batch_files[ctx->num_batch_files++] = optarg;
            }
            break;
        case 'N':
            ctx->flags |= TEST_FLAG_NUMERIC_FMT;
            break;
        case 'f':
            ctx->flags |= TEST_FLAG_PRINT_FINAL;
            break;
        case 'v':
            ctx->flags |= TEST_FLAG_PRINT_CSV;
            break;
        case 'c':
            ctx->flags |= TEST_FLAG_SET_AFFINITY;
            ctx->cpu    = atoi(optarg);
            break;
        case 'P':
            /* without MPI support this case falls through to the help */
#if HAVE_MPI
            ctx->mpi = atoi(optarg) && mpi_initialized;
            break;
#endif
        case 'h':
            usage(ctx, ucs_basename(argv[0]));
            return UCS_ERR_CANCELED;
        default:
            status = parse_test_params(&ctx->params, opt, optarg);
            if (status != UCS_OK) {
                usage(ctx, ucs_basename(argv[0]));
                return status;
            }
            break;
        }
    }

    if (optind < argc) {
        ctx->server_addr = argv[optind];
    }

    return UCS_OK;
}
/* Number of members in the socket-based group: always 2 */
static unsigned sock_rte_group_size(void *rte_group)
{
    const unsigned num_peers = 2;

    return num_peers;
}
/* Index of this process in the socket-based group: server is 0, client is 1 */
static unsigned sock_rte_group_index(void *rte_group)
{
    const sock_rte_group_t *sock_group = rte_group;

    if (sock_group->is_server) {
        return 0;
    }

    return 1;
}
/**
 * Barrier between the two peers of the socket-based group: the master thread
 * sends a magic token to the peer and waits for the peer's token, calling
 * @a progress while waiting. OpenMP barriers before and after the exchange
 * keep the other threads in step.
 *
 * A failed send or receive is logged; after a failed send the receive is
 * skipped, since waiting for a reply to a token that was never sent cannot
 * succeed.
 *
 * @param rte_group  Pointer to sock_rte_group_t.
 * @param progress   Progress callback passed to the socket I/O helpers.
 * @param arg        Argument for @a progress.
 */
static void sock_rte_barrier(void *rte_group, void (*progress)(void *arg),
                             void *arg)
{
#pragma omp barrier

#pragma omp master
    {
        sock_rte_group_t *group = rte_group;
        const unsigned magic    = 0xdeadbeef;
        unsigned sync;

        sync = magic;
        if (safe_send(group->connfd, &sync, sizeof(unsigned), progress,
                      arg) != 0) {
            ucs_error("barrier: failed to send synchronization token");
        } else {
            sync = 0;
            if (safe_recv(group->connfd, &sync, sizeof(unsigned), progress,
                          arg) != 0) {
                ucs_error("barrier: failed to receive synchronization token");
            } else {
                ucs_assert(sync == magic);
            }
        }
    }

#pragma omp barrier
}
/*
 * Send a vector of buffers to the peer: first the total length of all
 * elements (as size_t), then the data of every element in order.
 * 'req' is not used by this implementation.
 */
static void sock_rte_post_vec(void *rte_group, const struct iovec *iovec,
                              int iovcnt, void **req)
{
    sock_rte_group_t *sock_group = rte_group;
    size_t total_len             = 0;
    int idx;

    for (idx = 0; idx < iovcnt; ++idx) {
        total_len += iovec[idx].iov_len;
    }

    /* length header */
    safe_send(sock_group->connfd, &total_len, sizeof(total_len), NULL, NULL);

    /* payload */
    for (idx = 0; idx < iovcnt; ++idx) {
        const struct iovec *elem = &iovec[idx];

        safe_send(sock_group->connfd, elem->iov_base, elem->iov_len, NULL,
                  NULL);
    }
}
/**
 * Receive the data posted by the peer with sock_rte_post_vec(): a size_t
 * length followed by that many bytes. Nothing is received when @a src is this
 * process itself.
 *
 * If receiving the length fails, the function logs an error and returns
 * without using the (otherwise uninitialized) length.
 *
 * @param rte_group  Pointer to sock_rte_group_t.
 * @param src        Group index of the sender.
 * @param buffer     Destination buffer.
 * @param max        Size of @a buffer; a larger message is a fatal error.
 * @param req        Not used by this implementation.
 */
static void sock_rte_recv(void *rte_group, unsigned src, void *buffer,
                          size_t max, void *req)
{
    sock_rte_group_t *group = rte_group;
    unsigned group_index;
    size_t size;

    group_index = sock_rte_group_index(rte_group);
    if (src == group_index) {
        return;
    }

    ucs_assert_always(src == (1 - group_index));

    size = 0;
    if (safe_recv(group->connfd, &size, sizeof(size), NULL, NULL) != 0) {
        ucs_error("failed to receive message length from peer");
        return;
    }

    ucs_assert_always(size <= max);
    if (safe_recv(group->connfd, buffer, size, NULL, NULL) != 0) {
        ucs_error("failed to receive message data from peer");
    }
}
/*
 * Report callback of the socket-based group: 'arg' is the perftest context,
 * whose test names and flags are forwarded to print_progress().
 */
static void sock_rte_report(void *rte_group, const ucx_perf_result_t *result,
                            void *arg, int is_final)
{
    struct perftest_context *perf_ctx = arg;
    char **names                      = perf_ctx->test_names;
    unsigned num_names                = perf_ctx->num_batch_files;

    print_progress(names, num_names, result, perf_ctx->flags, is_final);
}
/*
 * Run-time environment callbacks of the socket-based (two-process) group.
 * exchange_vec is an empty function: the data is already sent by post_vec.
 */
static ucx_perf_rte_t sock_rte = {
.group_size = sock_rte_group_size,
.group_index = sock_rte_group_index,
.barrier = sock_rte_barrier,
.post_vec = sock_rte_post_vec,
.recv = sock_rte_recv,
.exchange_vec = (ucx_perf_rte_exchange_vec_func_t)ucs_empty_function,
.report = sock_rte_report,
};
/**
 * Establish the socket-based run-time environment.
 *
 * Without a server address this process is the server: it listens on
 * ctx->port, accepts one connection and receives the test parameters and the
 * message size list from the client. Otherwise it is the client: it connects
 * to the server and sends its parameters and message size list.
 *
 * The server replaces its own default message size list: the default list is
 * released before the parameters are received, and the pointer value received
 * from the peer (meaningless in this process) is never kept.
 *
 * On success the server gets TEST_FLAG_PRINT_TEST and the client gets
 * TEST_FLAG_PRINT_RESULTS, and ctx->params is attached to the socket RTE.
 *
 * @param ctx  Test context.
 *
 * @return UCS_OK on success, or an error status; no socket is left open on
 *         failure.
 */
static ucs_status_t setup_sock_rte(struct perftest_context *ctx)
{
    struct sockaddr_in inaddr;
    struct hostent *he;
    ucs_status_t status;
    int optval = 1;
    int sockfd, connfd;
    int ret;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        ucs_error("socket() failed: %m");
        status = UCS_ERR_IO_ERROR;
        goto err;
    }

    if (ctx->server_addr == NULL) {
        status = ucs_socket_setopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
                                   &optval, sizeof(optval));
        if (status != UCS_OK) {
            goto err_close_sockfd;
        }

        inaddr.sin_family      = AF_INET;
        inaddr.sin_port        = htons(ctx->port);
        inaddr.sin_addr.s_addr = INADDR_ANY;
        memset(inaddr.sin_zero, 0, sizeof(inaddr.sin_zero));
        ret = bind(sockfd, (struct sockaddr*)&inaddr, sizeof(inaddr));
        if (ret < 0) {
            ucs_error("bind() failed: %m");
            status = UCS_ERR_INVALID_ADDR;
            goto err_close_sockfd;
        }

        ret = listen(sockfd, 10);
        if (ret < 0) {
            ucs_error("listen() failed: %m");
            status = UCS_ERR_IO_ERROR;
            goto err_close_sockfd;
        }

        printf("Waiting for connection...\n");

        /* Accept next connection */
        connfd = accept(sockfd, NULL, NULL);
        if (connfd < 0) {
            ucs_error("accept() failed: %m");
            status = UCS_ERR_IO_ERROR;
            goto err_close_sockfd;
        }

        close(sockfd);

        /* release the default message size list: the parameters received
         * below overwrite the pointer to it */
        free(ctx->params.msg_size_list);
        ctx->params.msg_size_list = NULL;

        ret = safe_recv(connfd, &ctx->params, sizeof(ctx->params), NULL, NULL);

        /* the received pointer value belongs to the peer's address space */
        ctx->params.msg_size_list = NULL;
        if (ret) {
            ctx->params.msg_size_cnt = 0;
            status                   = UCS_ERR_IO_ERROR;
            goto err_close_connfd;
        }

        if (ctx->params.msg_size_cnt) {
            ctx->params.msg_size_list = calloc(ctx->params.msg_size_cnt,
                                               sizeof(*ctx->params.msg_size_list));
            if (NULL == ctx->params.msg_size_list) {
                ctx->params.msg_size_cnt = 0;
                status                   = UCS_ERR_NO_MEMORY;
                goto err_close_connfd;
            }

            ret = safe_recv(connfd, ctx->params.msg_size_list,
                            sizeof(*ctx->params.msg_size_list) *
                            ctx->params.msg_size_cnt,
                            NULL, NULL);
            if (ret) {
                free(ctx->params.msg_size_list);
                ctx->params.msg_size_list = NULL;
                ctx->params.msg_size_cnt  = 0;
                status                    = UCS_ERR_IO_ERROR;
                goto err_close_connfd;
            }
        }

        ctx->sock_rte_group.connfd    = connfd;
        ctx->sock_rte_group.is_server = 1;
    } else {
        he = gethostbyname(ctx->server_addr);
        if (he == NULL || he->h_addr_list == NULL ||
            he->h_addr_list[0] == NULL) {
            ucs_error("host %s not found: %s", ctx->server_addr,
                      hstrerror(h_errno));
            status = UCS_ERR_INVALID_ADDR;
            goto err_close_sockfd;
        }

        /* only IPv4 addresses fit into sockaddr_in; check at run time
         * instead of relying on a debug-only assertion */
        if ((he->h_addrtype != AF_INET) ||
            (he->h_length != (int)sizeof(inaddr.sin_addr))) {
            ucs_error("host %s has unsupported address type %d",
                      ctx->server_addr, he->h_addrtype);
            status = UCS_ERR_INVALID_ADDR;
            goto err_close_sockfd;
        }

        inaddr.sin_family = he->h_addrtype;
        inaddr.sin_port   = htons(ctx->port);
        memcpy(&inaddr.sin_addr, he->h_addr_list[0], he->h_length);
        memset(inaddr.sin_zero, 0, sizeof(inaddr.sin_zero));

        ret = connect(sockfd, (struct sockaddr*)&inaddr, sizeof(inaddr));
        if (ret < 0) {
            ucs_error("connect() failed: %m");
            status = UCS_ERR_UNREACHABLE;
            goto err_close_sockfd;
        }

        ret = safe_send(sockfd, &ctx->params, sizeof(ctx->params), NULL, NULL);
        if (ret) {
            status = UCS_ERR_IO_ERROR;
            goto err_close_sockfd;
        }

        if (ctx->params.msg_size_cnt) {
            ret = safe_send(sockfd, ctx->params.msg_size_list,
                            sizeof(*ctx->params.msg_size_list) *
                            ctx->params.msg_size_cnt,
                            NULL, NULL);
            if (ret) {
                status = UCS_ERR_IO_ERROR;
                goto err_close_sockfd;
            }
        }

        ctx->sock_rte_group.connfd    = sockfd;
        ctx->sock_rte_group.is_server = 0;
    }

    /* the server prints the test description, the client prints results */
    if (ctx->sock_rte_group.is_server) {
        ctx->flags |= TEST_FLAG_PRINT_TEST;
    } else {
        ctx->flags |= TEST_FLAG_PRINT_RESULTS;
    }

    ctx->params.rte_group  = &ctx->sock_rte_group;
    ctx->params.rte        = &sock_rte;
    ctx->params.report_arg = ctx;
    return UCS_OK;

err_close_connfd:
    close(connfd);
    goto err;
err_close_sockfd:
    close(sockfd);
err:
    return status;
}
/* Close the connection of the socket-based group; always returns UCS_OK */
static ucs_status_t cleanup_sock_rte(struct perftest_context *ctx)
{
    const int conn_fd = ctx->sock_rte_group.connfd;

    close(conn_fd);
    return UCS_OK;
}
#if HAVE_MPI
/* Group size of the MPI-based group: size of MPI_COMM_WORLD */
static unsigned mpi_rte_group_size(void *rte_group)
{
    int world_size;

    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    return world_size;
}
/* Index in the MPI-based group: rank of this process in MPI_COMM_WORLD */
static unsigned mpi_rte_group_index(void *rte_group)
{
    int world_rank;

    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    return world_rank;
}
/**
 * Barrier over MPI_COMM_WORLD which calls the user progress callback while
 * waiting.
 *
 * The whole MPI exchange is enclosed in one block for "#pragma omp master":
 * without the braces the pragma would cover only the first statement, and all
 * threads would run the rest of the exchange.
 *
 * @param rte_group  Not used.
 * @param progress   Callback invoked while waiting, may be NULL.
 * @param arg        Argument for @a progress.
 */
static void mpi_rte_barrier(void *rte_group, void (*progress)(void *arg),
                            void *arg)
{
    int group_size, my_rank, i;
    MPI_Request *reqs;
    int nreqs = 0;
    int dummy;
    int flag;

#pragma omp barrier

#pragma omp master
    {
        /*
         * Naive non-blocking barrier implementation over send/recv, to call
         * user progress while waiting for completion.
         * Not using MPI_Ibarrier to be compatible with MPI-1.
         */
        MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
        MPI_Comm_size(MPI_COMM_WORLD, &group_size);

        /* allocate maximal possible number of requests */
        reqs = (MPI_Request*)alloca(sizeof(*reqs) * group_size);

        if (my_rank == 0) {
            /* root gathers "ping" from all other ranks */
            for (i = 1; i < group_size; ++i) {
                MPI_Irecv(&dummy, 0, MPI_INT,
                          i /* source */,
                          1 /* tag */,
                          MPI_COMM_WORLD,
                          &reqs[nreqs++]);
            }
        } else {
            /* every non-root rank sends "ping" and waits for "pong" */
            MPI_Send(&dummy, 0, MPI_INT,
                     0 /* dest */,
                     1 /* tag */,
                     MPI_COMM_WORLD);
            MPI_Irecv(&dummy, 0, MPI_INT,
                      0 /* source */,
                      2 /* tag */,
                      MPI_COMM_WORLD,
                      &reqs[nreqs++]);
        }

        /* Waiting for receive requests */
        do {
            MPI_Testall(nreqs, reqs, &flag, MPI_STATUSES_IGNORE);
            if (progress != NULL) {
                progress(arg);
            }
        } while (!flag);

        if (my_rank == 0) {
            /* root sends "pong" to all ranks */
            for (i = 1; i < group_size; ++i) {
                MPI_Send(&dummy, 0, MPI_INT,
                         i /* dest */,
                         2 /* tag */,
                         MPI_COMM_WORLD);
            }
        }
    }

#pragma omp barrier
}
/*
 * Send every element of the vector to every other rank of MPI_COMM_WORLD.
 * The last element is sent with tag 1 and all others with tag 0, so the
 * receiver can detect the end of the message. '*req' is set to a non-NULL
 * dummy value.
 */
static void mpi_rte_post_vec(void *rte_group, const struct iovec *iovec,
                             int iovcnt, void **req)
{
    int world_size, self_rank;
    int peer, idx, tag;

    MPI_Comm_rank(MPI_COMM_WORLD, &self_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    for (peer = 0; peer < world_size; ++peer) {
        if (peer != self_rank) {
            for (idx = 0; idx < iovcnt; ++idx) {
                /* Send last iov with tag == 1 */
                tag = (idx == (iovcnt - 1));
                MPI_Send(iovec[idx].iov_base, iovec[idx].iov_len, MPI_BYTE,
                         peer, tag, MPI_COMM_WORLD);
            }
        }
    }

    *req = (void*)(uintptr_t)1;
}
/**
 * Receive a message posted by mpi_rte_post_vec() from rank @a src. The parts
 * are appended to @a buffer until a part with tag 1 (the last one) arrives.
 * Nothing is received when @a src is this rank.
 *
 * @param rte_group  Not used.
 * @param src        Rank of the sender.
 * @param buffer     Destination buffer.
 * @param max        Size of @a buffer; needing more space is a fatal error.
 * @param req        Not used.
 */
static void mpi_rte_recv(void *rte_group, unsigned src, void *buffer, size_t max,
                         void *req)
{
    MPI_Status status;
    size_t offset;
    int my_rank;
    int count;

    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    if (src == my_rank) {
        return;
    }

    offset = 0;
    do {
        ucs_assert_always(offset < max);
        /* cast to char*: arithmetic on void* is not standard C */
        MPI_Recv((char*)buffer + offset, max - offset, MPI_BYTE, src,
                 MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        MPI_Get_count(&status, MPI_BYTE, &count);
        offset += count;
    } while (status.MPI_TAG != 1);
}
/*
 * Report callback of the MPI-based group: 'arg' is the perftest context,
 * whose test names and flags are forwarded to print_progress().
 */
static void mpi_rte_report(void *rte_group, const ucx_perf_result_t *result,
                           void *arg, int is_final)
{
    struct perftest_context *perf_ctx = arg;
    char **names                      = perf_ctx->test_names;
    unsigned num_names                = perf_ctx->num_batch_files;

    print_progress(names, num_names, result, perf_ctx->flags, is_final);
}
/*
 * Run-time environment callbacks of the MPI-based group.
 * exchange_vec is an empty function: the data is already sent by post_vec.
 */
static ucx_perf_rte_t mpi_rte = {
.group_size = mpi_rte_group_size,
.group_index = mpi_rte_group_index,
.barrier = mpi_rte_barrier,
.post_vec = mpi_rte_post_vec,
.recv = mpi_rte_recv,
.exchange_vec = (void*)ucs_empty_function,
.report = mpi_rte_report,
};
#elif HAVE_RTE
static unsigned ext_rte_group_size(void *rte_group)
{
rte_group_t group = (rte_group_t)rte_group;
return rte_group_size(group);
}
static unsigned ext_rte_group_index(void *rte_group)
{
rte_group_t group = (rte_group_t)rte_group;
return rte_group_rank(group);
}
/*
 * Barrier of the libRTE-based group: the master thread calls rte_barrier()
 * between two OpenMP barriers. A failure is only logged. The progress
 * callback is not used.
 */
static void ext_rte_barrier(void *rte_group, void (*progress)(void *arg),
                            void *arg)
{
#pragma omp barrier

#pragma omp master
    {
        rte_group_t rte_grp = (rte_group_t)rte_group;

        if (rte_barrier(rte_grp) != RTE_SUCCESS) {
            ucs_error("Failed to rte_barrier");
        }
    }

#pragma omp barrier
}
/**
 * Publish a vector of buffers under the key "KEY_PERF" in a new libRTE SRS
 * session and return the session through @a req.
 *
 * If the session cannot be created, the function returns without touching the
 * (uninitialized) session handle. On any early return *req is left unchanged.
 *
 * @param rte_group  libRTE group handle.
 * @param iovec      Buffers to publish.
 * @param iovcnt     Number of elements in @a iovec.
 * @param req        Filled with the session handle on success.
 */
static void ext_rte_post_vec(void *rte_group, const struct iovec* iovec,
                             int iovcnt, void **req)
{
    rte_group_t group = (rte_group_t)rte_group;
    rte_srs_session_t session;
    rte_iovec_t *r_vec;
    int i, rc;

    rc = rte_srs_session_create(group, 0, &session);
    if (RTE_SUCCESS != rc) {
        ucs_error("Failed to rte_srs_session_create");
        return;
    }

    r_vec = calloc(iovcnt, sizeof(rte_iovec_t));
    if (r_vec == NULL) {
        /* NOTE(review): the session created above is not released on this
         * path - confirm whether rte_srs_session_destroy() should be called */
        ucs_error("Failed to allocate rte_iovec_t array");
        return;
    }

    /* describe every buffer as an array of bytes */
    for (i = 0; i < iovcnt; ++i) {
        r_vec[i].iov_base = iovec[i].iov_base;
        r_vec[i].type     = rte_datatype_uint8_t;
        r_vec[i].count    = iovec[i].iov_len;
    }

    rc = rte_srs_set_data(session, "KEY_PERF", r_vec, iovcnt);
    if (RTE_SUCCESS != rc) {
        ucs_error("Failed to rte_srs_set_data");
    }

    *req = session;
    free(r_vec);
}
/**
 * Fetch the "KEY_PERF" data of group member @a src from the SRS session and
 * unpack it into @a buffer.
 *
 * If the data cannot be fetched, the error is logged and the function
 * returns without touching @a buffer or the session. Otherwise the session
 * is destroyed and the buffer returned by rte_srs_get_data() is freed.
 *
 * @param rte_group  RTE group handle (rte_group_t passed as void*).
 * @param src        Index of the group member to read from.
 * @param buffer     Destination buffer.
 * @param max        Element count used for the unpack descriptor.
 * @param req        SRS session (rte_srs_session_t passed as void*).
 */
static void ext_rte_recv(void *rte_group, unsigned src, void *buffer,
                         size_t max, void *req)
{
    rte_srs_session_t srs = (rte_srs_session_t)req;
    rte_group_t grp       = (rte_group_t)rte_group;
    void *packed          = NULL;
    uint32_t unpack_pos;
    rte_iovec_t dst_vec;
    int packed_size;
    int rc;

    rc = rte_srs_get_data(srs, rte_group_index_to_ec(grp, src), "KEY_PERF",
                          &packed, &packed_size);
    if (rc != RTE_SUCCESS) {
        ucs_error("Failed to rte_srs_get_data");
        return;
    }

    /* Describe the destination as 'max' uint8 elements and unpack into it */
    dst_vec.iov_base = buffer;
    dst_vec.type     = rte_datatype_uint8_t;
    dst_vec.count    = max;
    unpack_pos       = 0;
    rte_unpack(&dst_vec, packed, &unpack_pos);

    rc = rte_srs_session_destroy(srs);
    if (rc != RTE_SUCCESS) {
        ucs_error("Failed to rte_srs_session_destroy");
    }

    free(packed);
}
static void ext_rte_exchange_vec(void *rte_group, void * req)
{
rte_srs_session_t session = (rte_srs_session_t)req;
int rc;
rc = rte_srs_exchange_data(session);
if (RTE_SUCCESS != rc) {
ucs_error("Failed to rte_srs_exchange_data");
}
}
/**
 * Report callback of the external RTE: hands a result over to
 * print_progress().
 *
 * @param rte_group  Unused.
 * @param result     Result to print.
 * @param arg        The perftest context (struct perftest_context*).
 * @param is_final   Forwarded unchanged to print_progress().
 */
static void ext_rte_report(void *rte_group, const ucx_perf_result_t *result,
                           void *arg, int is_final)
{
    struct perftest_context *perf_ctx = (struct perftest_context*)arg;

    print_progress(perf_ctx->test_names, perf_ctx->num_batch_files, result,
                   perf_ctx->flags, is_final);
}
/* RTE callback table backed by the ext_rte_* functions */
static ucx_perf_rte_t ext_rte = {
    /* Group queries and synchronization */
    .group_size   = ext_rte_group_size,
    .group_index  = ext_rte_group_index,
    .barrier      = ext_rte_barrier,

    /* Data exchange through an SRS session */
    .post_vec     = ext_rte_post_vec,
    .recv         = ext_rte_recv,
    .exchange_vec = ext_rte_exchange_vec,

    /* Result reporting */
    .report       = ext_rte_report,
};
#endif
/**
 * Select the RTE used by the test, depending on the build configuration.
 *
 * With HAVE_MPI: requires exactly 2 processes in MPI_COMM_WORLD and installs
 * the mpi_rte callbacks with a NULL group. Otherwise, with HAVE_RTE: calls
 * rte_init() and installs the ext_rte callbacks with the resulting group.
 * In both cases the process with rank 1 gets TEST_FLAG_PRINT_RESULTS, and the
 * context itself becomes the report argument. Without either, nothing is
 * configured.
 *
 * @param ctx  Perftest context to update.
 *
 * @return UCS_OK, or UCS_ERR_INVALID_PARAM if the MPI job does not consist of
 *         exactly 2 processes.
 */
static ucs_status_t setup_mpi_rte(struct perftest_context *ctx)
{
    ucs_trace_func("");

#if HAVE_MPI
    int comm_size, comm_rank;

    MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
    if (comm_size != 2) {
        ucs_error("This test should run with exactly 2 processes (actual: %d)", comm_size);
        return UCS_ERR_INVALID_PARAM;
    }

    MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
    if (comm_rank == 1) {
        ctx->flags |= TEST_FLAG_PRINT_RESULTS;
    }

    ctx->params.report_arg = ctx;
    ctx->params.rte        = &mpi_rte;
    ctx->params.rte_group  = NULL;
#elif HAVE_RTE
    rte_group_t rte_grp;

    rte_init(NULL, NULL, &rte_grp);
    if (rte_group_rank(rte_grp) == 1) {
        ctx->flags |= TEST_FLAG_PRINT_RESULTS;
    }

    ctx->params.report_arg = ctx;
    ctx->params.rte        = &ext_rte;
    ctx->params.rte_group  = rte_grp;
#endif

    return UCS_OK;
}
/**
 * Counterpart of setup_mpi_rte(): calls rte_finalize() in HAVE_RTE builds and
 * does nothing otherwise.
 *
 * @param ctx  Unused.
 *
 * @return Always UCS_OK.
 */
static ucs_status_t cleanup_mpi_rte(struct perftest_context *ctx)
{
    (void)ctx;

#if HAVE_RTE
    rte_finalize();
#endif

    return UCS_OK;
}
/**
 * Validate and, if requested, apply the CPU affinity of the calling process.
 *
 * With TEST_FLAG_SET_AFFINITY set, the process is bound to ctx->cpu, which
 * must be below the configured processor count. Without it, the current
 * affinity mask is inspected and a warning is printed when it contains more
 * than 2 CPUs.
 *
 * @param ctx  Perftest context; only 'flags' and 'cpu' are read.
 *
 * @return UCS_OK on success, UCS_ERR_INVALID_PARAM if the processor count or
 *         the affinity cannot be obtained/applied, or if ctx->cpu is out of
 *         range.
 */
static ucs_status_t check_system(struct perftest_context *ctx)
{
    ucs_sys_cpuset_t cpuset;
    unsigned i, count, nr_cpus;
    int ret;

    ucs_trace_func("");

    ret = sysconf(_SC_NPROCESSORS_CONF);
    if (ret < 0) {
        ucs_error("failed to get local cpu count: %m");
        return UCS_ERR_INVALID_PARAM;
    }
    nr_cpus = ret;

    CPU_ZERO(&cpuset);
    if (ctx->flags & TEST_FLAG_SET_AFFINITY) {
        if (ctx->cpu >= nr_cpus) {
            ucs_error("cpu (%u) out of range (0..%u)", ctx->cpu, nr_cpus - 1);
            return UCS_ERR_INVALID_PARAM;
        }

        CPU_SET(ctx->cpu, &cpuset);
        ret = ucs_sys_setaffinity(&cpuset);
        if (ret) {
            ucs_warn("sched_setaffinity() failed: %m");
            return UCS_ERR_INVALID_PARAM;
        }
    } else {
        ret = ucs_sys_getaffinity(&cpuset);
        if (ret) {
            ucs_warn("sched_getaffinity() failed: %m");
            return UCS_ERR_INVALID_PARAM;
        }

        /* Count the CPUs the process is currently allowed to run on */
        count = 0;
        for (i = 0; i < CPU_SETSIZE; ++i) {
            if (CPU_ISSET(i, &cpuset)) {
                ++count;
            }
        }

        if (count > 2) {
            ucs_warn("CPU affinity is not set (bound to %u cpus)."
                     " Performance may be impacted.", count);
        }
    }

    return UCS_OK;
}
/**
 * Copy @a src into @a dest, giving @a dest its own copy of the message size
 * list.
 *
 * @param dest  Destination; any previous content is overwritten. On success
 *              its msg_size_list is malloc()'ed here and has to be released
 *              with free().
 * @param src   Parameters to copy.
 *
 * @return UCS_OK on success (including an empty list for which malloc()
 *         returned NULL), UCS_ERR_NO_MEMORY if a non-empty list could not be
 *         allocated.
 */
static ucs_status_t clone_params(ucx_perf_params_t *dest,
                                 const ucx_perf_params_t *src)
{
    size_t list_bytes;

    *dest      = *src;
    list_bytes = dest->msg_size_cnt * sizeof(*dest->msg_size_list);

    dest->msg_size_list = malloc(list_bytes);
    if (dest->msg_size_list != NULL) {
        memcpy(dest->msg_size_list, src->msg_size_list, list_bytes);
        return UCS_OK;
    }

    /* A NULL result is an error only if bytes were actually requested */
    if (list_bytes != 0) {
        return UCS_ERR_NO_MEMORY;
    }

    return UCS_OK;
}
/**
 * Run the test for every combination of lines of the nested batch files.
 *
 * Level @a depth corresponds to ctx->batch_files[depth]. For every entry that
 * read_batch_file() produces from that file, a fresh copy of
 * @a parent_params is modified by the entry and passed one level deeper.
 * Once @a depth reaches ctx->num_batch_files the test itself is run with
 * @a parent_params.
 *
 * @param ctx            Perftest context.
 * @param parent_params  Parameters inherited from the previous level.
 * @param depth          Index of the batch file handled by this call.
 *
 * @return At the innermost level, the status of ucx_perf_run(). Otherwise
 *         UCS_OK when the batch file was processed until read_batch_file()
 *         reported UCS_ERR_NO_ELEM, UCS_ERR_IO_ERROR if the file cannot be
 *         opened, or the error of clone_params() / read_batch_file().
 */
static ucs_status_t run_test_recurs(struct perftest_context *ctx,
                                    ucx_perf_params_t *parent_params,
                                    unsigned depth)
{
    ucx_perf_params_t params;
    ucx_perf_result_t result;
    ucs_status_t status;
    FILE *batch_file;
    int line_num;

    ucs_trace_func("depth=%u, num_files=%u", depth, ctx->num_batch_files);

    /* Device and transport selection has no effect on UCP tests */
    if (parent_params->api == UCX_PERF_API_UCP) {
        if (strcmp(parent_params->uct.dev_name, TL_RESOURCE_NAME_NONE)) {
            ucs_warn("-d '%s' ignored for UCP test; see NOTES section in help message",
                     parent_params->uct.dev_name);
        }
        if (strcmp(parent_params->uct.tl_name, TL_RESOURCE_NAME_NONE)) {
            ucs_warn("-x '%s' ignored for UCP test; see NOTES section in help message",
                     parent_params->uct.tl_name);
        }
    }

    /* All batch files were consumed - run the actual test */
    if (depth >= ctx->num_batch_files) {
        print_test_name(ctx);
        return ucx_perf_run(parent_params, &result);
    }

    batch_file = fopen(ctx->batch_files[depth], "r");
    if (batch_file == NULL) {
        ucs_error("Failed to open batch file '%s': %m", ctx->batch_files[depth]);
        return UCS_ERR_IO_ERROR;
    }

    status = clone_params(&params, parent_params);
    if (status != UCS_OK) {
        goto out;
    }

    line_num = 0;
    while ((status = read_batch_file(batch_file, ctx->batch_files[depth],
                                     &line_num, &params,
                                     &ctx->test_names[depth])) == UCS_OK) {
        /* The status of the nested run is not propagated; the remaining
         * entries of this batch file are still processed */
        run_test_recurs(ctx, &params, depth + 1);
        free(params.msg_size_list);
        free(ctx->test_names[depth]);
        ctx->test_names[depth] = NULL;

        /* Start the next entry from the unmodified parent parameters */
        status = clone_params(&params, parent_params);
        if (status != UCS_OK) {
            goto out;
        }
    }

    /* UCS_ERR_NO_ELEM ends the loop without being treated as a failure */
    if (status == UCS_ERR_NO_ELEM) {
        status = UCS_OK;
    }

    free(params.msg_size_list);

out:
    fclose(batch_file);
    return status;
}
/**
 * Print the header and run the test(s), starting the batch file recursion at
 * depth 0 with the parameters stored in the context.
 *
 * @param ctx  Perftest context.
 *
 * @return Status of run_test_recurs(); a failure is also logged.
 */
static ucs_status_t run_test(struct perftest_context *ctx)
{
    ucs_status_t status;

    ucs_trace_func("");

    setlocale(LC_ALL, "en_US");
    print_header(ctx);

    status = run_test_recurs(ctx, &ctx->params, 0);
    if (status == UCS_OK) {
        return UCS_OK;
    }

    ucs_error("Failed to run test: %s", ucs_status_string(status));
    return status;
}
/**
 * Program entry point: parse the options, check the system, set up the RTE,
 * run the test and clean up.
 *
 * @return 0 on success or when option parsing returned UCS_ERR_CANCELED,
 *         -127 on any other option parsing failure, -1 if the system check,
 *         the RTE setup or the test itself failed.
 */
int main(int argc, char **argv)
{
    struct perftest_context ctx;
    ucs_status_t status;
    int mpi_initialized;
    int use_mpi_rte;
    int ret;

#if HAVE_MPI
    /* MPI is initialized only when stdin is not a terminal */
    mpi_initialized = !isatty(0) && (MPI_Init(&argc, &argv) == 0);
#else
    mpi_initialized = 0;
#endif

    /* Parse command line */
    status = parse_opts(&ctx, mpi_initialized, argc, argv);
    if (status != UCS_OK) {
        ret = (status == UCS_ERR_CANCELED) ? 0 : -127;
        goto out;
    }

#ifdef __COVERITY__
    /* coverity[dont_call] */
    use_mpi_rte = rand(); /* Shut up deadcode error */
#endif

    /* The MPI/external RTE is used when running under MPI, and always in
     * HAVE_RTE builds; otherwise the socket RTE is used */
    if (ctx.mpi) {
        use_mpi_rte = 1;
    } else {
#if HAVE_RTE
        use_mpi_rte = 1;
#else
        use_mpi_rte = 0;
#endif
    }

    status = check_system(&ctx);
    if (status != UCS_OK) {
        ret = -1;
        goto out;
    }

    /* Create RTE */
    if (use_mpi_rte) {
        status = setup_mpi_rte(&ctx);
    } else {
        status = setup_sock_rte(&ctx);
    }
    if (status != UCS_OK) {
        ret = -1;
        goto out;
    }

    /* Run the test; the RTE is cleaned up regardless of the outcome */
    status = run_test(&ctx);
    ret    = (status == UCS_OK) ? 0 : -1;

    if (use_mpi_rte) {
        cleanup_mpi_rte(&ctx);
    } else {
        cleanup_sock_rte(&ctx);
    }

out:
    free(ctx.params.msg_size_list);

#if HAVE_MPI
    if (mpi_initialized) {
        MPI_Finalize();
    }
#endif

    return ret;
}