/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2013, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #define _CRT_RAND_S // rand_s() on MSVC #include #include "test.h" #include #include #include /* Typical include path would be , but this program * is built from within the librdkafka source tree and thus differs. */ #include "rdkafka.h" int test_level = 2; int test_seed = 0; char test_mode[64] = "bare"; static int test_exit = 0; static char test_topic_prefix[128] = "rdkafkatest"; static int test_topic_random = 0; int tests_running_cnt = 0; static int test_concurrent_max = 5; int test_assert_on_fail = 0; double test_timeout_multiplier = 1.0; static char *test_sql_cmd = NULL; int test_session_timeout_ms = 6000; int test_broker_version; static const char *test_broker_version_str = "0.9.0.0"; int test_flags = 0; int test_neg_flags = TEST_F_KNOWN_ISSUE; /* run delete-test-topics.sh between each test (when concurrent_max = 1) */ static int test_delete_topics_between = 0; static const char *test_git_version = "HEAD"; static const char *test_sockem_conf = ""; int test_on_ci = 0; /* Tests are being run on CI, be more forgiving * with regards to timeouts, etc. */ static int show_summary = 1; static int test_summary (int do_lock); /** * Protects shared state, such as tests[] */ mtx_t test_mtx; static const char *test_states[] = { "DNS", "SKIPPED", "RUNNING", "PASSED", "FAILED", }; #define _TEST_DECL(NAME) \ extern int main_ ## NAME (int, char **) #define _TEST(NAME,FLAGS,...) \ { .name = # NAME, .mainfunc = main_ ## NAME, .flags = FLAGS, __VA_ARGS__ } /** * Declare all tests here */ _TEST_DECL(0000_unittests); _TEST_DECL(0001_multiobj); _TEST_DECL(0002_unkpart); _TEST_DECL(0003_msgmaxsize); _TEST_DECL(0004_conf); _TEST_DECL(0005_order); _TEST_DECL(0006_symbols); _TEST_DECL(0007_autotopic); _TEST_DECL(0008_reqacks); _TEST_DECL(0011_produce_batch); _TEST_DECL(0012_produce_consume); _TEST_DECL(0013_null_msgs); _TEST_DECL(0014_reconsume_191); _TEST_DECL(0015_offsets_seek); _TEST_DECL(0017_compression); _TEST_DECL(0018_cgrp_term); _TEST_DECL(0019_list_groups); _TEST_DECL(0020_destroy_hang); _TEST_DECL(0021_rkt_destroy); _TEST_DECL(0022_consume_batch); _TEST_DECL(0025_timers); _TEST_DECL(0026_consume_pause); _TEST_DECL(0028_long_topicnames); _TEST_DECL(0029_assign_offset); _TEST_DECL(0030_offset_commit); _TEST_DECL(0031_get_offsets); _TEST_DECL(0033_regex_subscribe); _TEST_DECL(0033_regex_subscribe_local); _TEST_DECL(0034_offset_reset); _TEST_DECL(0035_api_version); _TEST_DECL(0036_partial_fetch); _TEST_DECL(0037_destroy_hang_local); _TEST_DECL(0038_performance); _TEST_DECL(0039_event); _TEST_DECL(0040_io_event); _TEST_DECL(0041_fetch_max_bytes); _TEST_DECL(0042_many_topics); _TEST_DECL(0043_no_connection); _TEST_DECL(0044_partition_cnt); _TEST_DECL(0045_subscribe_update); _TEST_DECL(0045_subscribe_update_topic_remove); _TEST_DECL(0045_subscribe_update_non_exist_and_partchange); _TEST_DECL(0046_rkt_cache); _TEST_DECL(0047_partial_buf_tmout); _TEST_DECL(0048_partitioner); _TEST_DECL(0049_consume_conn_close); _TEST_DECL(0050_subscribe_adds); _TEST_DECL(0051_assign_adds); _TEST_DECL(0052_msg_timestamps); _TEST_DECL(0053_stats_cb); _TEST_DECL(0054_offset_time); _TEST_DECL(0055_producer_latency); _TEST_DECL(0056_balanced_group_mt); _TEST_DECL(0057_invalid_topic); _TEST_DECL(0058_log); _TEST_DECL(0059_bsearch); _TEST_DECL(0060_op_prio); _TEST_DECL(0061_consumer_lag); _TEST_DECL(0062_stats_event); _TEST_DECL(0063_clusterid); _TEST_DECL(0064_interceptors); _TEST_DECL(0065_yield); _TEST_DECL(0066_plugins); _TEST_DECL(0067_empty_topic); _TEST_DECL(0068_produce_timeout); _TEST_DECL(0069_consumer_add_parts); _TEST_DECL(0070_null_empty); _TEST_DECL(0072_headers_ut); _TEST_DECL(0073_headers); _TEST_DECL(0074_producev); _TEST_DECL(0075_retry); _TEST_DECL(0076_produce_retry); _TEST_DECL(0077_compaction); _TEST_DECL(0078_c_from_cpp); _TEST_DECL(0079_fork); _TEST_DECL(0081_fetch_max_bytes); /* Manual tests */ _TEST_DECL(8000_idle); /** * Define all tests here */ struct test tests[] = { /* Special MAIN test to hold over-all timings, etc. */ { .name = "
", .flags = TEST_F_LOCAL }, _TEST(0000_unittests, TEST_F_LOCAL), _TEST(0001_multiobj, 0), _TEST(0002_unkpart, 0), _TEST(0003_msgmaxsize, 0), _TEST(0004_conf, TEST_F_LOCAL), _TEST(0005_order, 0), _TEST(0006_symbols, TEST_F_LOCAL), _TEST(0007_autotopic, 0), _TEST(0008_reqacks, 0), _TEST(0011_produce_batch, 0), _TEST(0012_produce_consume, 0), _TEST(0013_null_msgs, 0), _TEST(0014_reconsume_191, 0), _TEST(0015_offsets_seek, 0), _TEST(0017_compression, 0), _TEST(0018_cgrp_term, 0, TEST_BRKVER(0,9,0,0)), _TEST(0019_list_groups, 0, TEST_BRKVER(0,9,0,0)), _TEST(0020_destroy_hang, 0, TEST_BRKVER(0,9,0,0)), _TEST(0021_rkt_destroy, 0), _TEST(0022_consume_batch, 0), _TEST(0025_timers, TEST_F_LOCAL), _TEST(0026_consume_pause, 0, TEST_BRKVER(0,9,0,0)), _TEST(0028_long_topicnames, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0), .extra = "https://github.com/edenhill/librdkafka/issues/529"), _TEST(0029_assign_offset, 0), _TEST(0030_offset_commit, 0, TEST_BRKVER(0,9,0,0)), _TEST(0031_get_offsets, 0), _TEST(0033_regex_subscribe, 0, TEST_BRKVER(0,9,0,0)), _TEST(0033_regex_subscribe_local, TEST_F_LOCAL), _TEST(0034_offset_reset, 0), _TEST(0035_api_version, 0), _TEST(0036_partial_fetch, 0), _TEST(0037_destroy_hang_local, TEST_F_LOCAL), _TEST(0038_performance, 0), _TEST(0039_event, 0), _TEST(0040_io_event, 0, TEST_BRKVER(0,9,0,0)), _TEST(0041_fetch_max_bytes, 0), _TEST(0042_many_topics, 0), _TEST(0043_no_connection, TEST_F_LOCAL), _TEST(0044_partition_cnt, 0), _TEST(0045_subscribe_update, 0, TEST_BRKVER(0,9,0,0)), _TEST(0045_subscribe_update_topic_remove, TEST_F_KNOWN_ISSUE, TEST_BRKVER(0,9,0,0)), _TEST(0045_subscribe_update_non_exist_and_partchange, 0, TEST_BRKVER(0,9,0,0)), _TEST(0046_rkt_cache, TEST_F_LOCAL), _TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE), _TEST(0048_partitioner, 0), #if WITH_SOCKEM _TEST(0049_consume_conn_close, 0, TEST_BRKVER(0,9,0,0)), #endif _TEST(0050_subscribe_adds, 0, TEST_BRKVER(0,9,0,0)), _TEST(0051_assign_adds, 0, TEST_BRKVER(0,9,0,0)), _TEST(0052_msg_timestamps, 0, TEST_BRKVER(0,10,0,0)), _TEST(0053_stats_cb, TEST_F_LOCAL), _TEST(0054_offset_time, 0, TEST_BRKVER(0,10,1,0)), _TEST(0055_producer_latency, TEST_F_KNOWN_ISSUE_WIN32), _TEST(0056_balanced_group_mt, 0, TEST_BRKVER(0,9,0,0)), _TEST(0057_invalid_topic, 0, TEST_BRKVER(0,9,0,0)), _TEST(0058_log, TEST_F_LOCAL), _TEST(0059_bsearch, 0, TEST_BRKVER(0,10,0,0)), _TEST(0060_op_prio, 0, TEST_BRKVER(0,9,0,0)), _TEST(0061_consumer_lag, 0), _TEST(0062_stats_event, TEST_F_LOCAL), _TEST(0063_clusterid, 0, TEST_BRKVER(0,10,1,0)), _TEST(0064_interceptors, 0, TEST_BRKVER(0,9,0,0)), _TEST(0065_yield, 0), _TEST(0066_plugins, TEST_F_LOCAL|TEST_F_KNOWN_ISSUE_WIN32|TEST_F_KNOWN_ISSUE_OSX, .extra = "dynamic loading of tests might not be fixed for this platform"), _TEST(0067_empty_topic, 0), #if WITH_SOCKEM _TEST(0068_produce_timeout, 0), #endif _TEST(0069_consumer_add_parts, TEST_F_KNOWN_ISSUE_WIN32, TEST_BRKVER(0,9,0,0)), _TEST(0070_null_empty, 0), _TEST(0072_headers_ut, TEST_F_LOCAL), _TEST(0073_headers, 0, TEST_BRKVER(0,11,0,0)), _TEST(0074_producev, TEST_F_LOCAL), #if WITH_SOCKEM _TEST(0075_retry, 0), #endif _TEST(0076_produce_retry, 0), _TEST(0077_compaction, 0, TEST_BRKVER(0,9,0,0)), _TEST(0078_c_from_cpp, TEST_F_LOCAL), _TEST(0079_fork, TEST_F_LOCAL|TEST_F_KNOWN_ISSUE, .extra = "using a fork():ed rd_kafka_t is not supported and will " "most likely hang"), _TEST(0081_fetch_max_bytes, 0, TEST_BRKVER(0,10,1,0)), /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL), { NULL } }; RD_TLS struct test *test_curr = &tests[0]; #if WITH_SOCKEM /** * Socket network emulation with sockem */ static void test_socket_add (struct test *test, sockem_t *skm) { TEST_LOCK(); rd_list_add(&test->sockets, skm); TEST_UNLOCK(); } static void test_socket_del (struct test *test, sockem_t *skm, int do_lock) { if (do_lock) TEST_LOCK(); /* Best effort, skm might not have been added if connect_cb failed */ rd_list_remove(&test->sockets, skm); if (do_lock) TEST_UNLOCK(); } int test_socket_sockem_set_all (const char *key, int val) { int i; sockem_t *skm; int cnt = 0; TEST_LOCK(); cnt = rd_list_cnt(&test_curr->sockets); TEST_SAY("Setting sockem %s=%d on %s%d socket(s)\n", key, val, cnt > 0 ? "" : _C_RED, cnt); RD_LIST_FOREACH(skm, &test_curr->sockets, i) { if (sockem_set(skm, key, val, NULL) == -1) TEST_FAIL("sockem_set(%s, %d) failed", key, val); } TEST_UNLOCK(); return cnt; } void test_socket_close_all (struct test *test, int reinit) { TEST_LOCK(); rd_list_destroy(&test->sockets); if (reinit) rd_list_init(&test->sockets, 16, (void *)sockem_close); TEST_UNLOCK(); } static int test_connect_cb (int s, const struct sockaddr *addr, int addrlen, const char *id, void *opaque) { struct test *test = opaque; sockem_t *skm; int r; skm = sockem_connect(s, addr, addrlen, test_sockem_conf, 0, NULL); if (!skm) return errno; if (test->connect_cb) { r = test->connect_cb(test, skm, id); if (r) return r; } test_socket_add(test, skm); return 0; } static int test_closesocket_cb (int s, void *opaque) { struct test *test = opaque; sockem_t *skm; TEST_LOCK(); skm = sockem_find(s); if (skm) { sockem_close(skm); test_socket_del(test, skm, 0/*nolock*/); } else { #ifdef _MSC_VER closesocket(s); #else close(s); #endif } TEST_UNLOCK(); return 0; } void test_socket_enable (rd_kafka_conf_t *conf) { rd_kafka_conf_set_connect_cb(conf, test_connect_cb); rd_kafka_conf_set_closesocket_cb(conf, test_closesocket_cb); rd_kafka_conf_set_opaque(conf, test_curr); } #endif /* WITH_SOCKEM */ static void test_error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) { if (test_curr->is_fatal_cb && !test_curr->is_fatal_cb(rk, err, reason)) TEST_SAY(_C_YEL "rdkafka error (non-fatal): %s: %s\n", rd_kafka_err2str(err), reason); else TEST_FAIL("rdkafka error: %s: %s", rd_kafka_err2str(err), reason); } static int test_stats_cb (rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { struct test *test = test_curr; if (test->stats_fp) fprintf(test->stats_fp, "{\"test\": \"%s\", \"instance\":\"%s\", " "\"stats\": %s}\n", test->name, rd_kafka_name(rk), json); return 0; } /** * @brief Limit the test run time (in seconds) */ void test_timeout_set (int timeout) { TEST_LOCK(); TEST_SAY("Setting test timeout to %ds * %.1f\n", timeout, test_timeout_multiplier); timeout = (int)((double)timeout * test_timeout_multiplier); test_curr->timeout = test_clock() + (timeout * 1000000); TEST_UNLOCK(); } int tmout_multip (int msecs) { int r; TEST_LOCK(); r = (int)(((double)(msecs)) * test_timeout_multiplier); TEST_UNLOCK(); return r; } #ifdef _MSC_VER static void test_init_win32 (void) { /* Enable VT emulation to support colored output. */ HANDLE hOut = GetStdHandle(STD_OUTPUT_HANDLE); DWORD dwMode = 0; if (hOut == INVALID_HANDLE_VALUE || !GetConsoleMode(hOut, &dwMode)) return; #ifndef ENABLE_VIRTUAL_TERMINAL_PROCESSING #define ENABLE_VIRTUAL_TERMINAL_PROCESSING 0x4 #endif dwMode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING; SetConsoleMode(hOut, dwMode); } #endif static void test_init (void) { int seed; const char *tmp; if (test_seed) return; if ((tmp = test_getenv("TEST_LEVEL", NULL))) test_level = atoi(tmp); if ((tmp = test_getenv("TEST_MODE", NULL))) strncpy(test_mode, tmp, sizeof(test_mode)-1); if ((tmp = test_getenv("TEST_SOCKEM", NULL))) test_sockem_conf = tmp; if ((tmp = test_getenv("TEST_SEED", NULL))) seed = atoi(tmp); else seed = test_clock() & 0xffffffff; #ifdef _MSC_VER test_init_win32(); { LARGE_INTEGER cycl; QueryPerformanceCounter(&cycl); seed = (int)cycl.QuadPart; } #endif srand(seed); test_seed = seed; } const char *test_mk_topic_name (const char *suffix, int randomized) { static RD_TLS char ret[512]; if (test_topic_random || randomized) rd_snprintf(ret, sizeof(ret), "%s_rnd%"PRIx64"_%s", test_topic_prefix, test_id_generate(), suffix); else rd_snprintf(ret, sizeof(ret), "%s_%s", test_topic_prefix, suffix); TEST_SAY("Using topic \"%s\"\n", ret); return ret; } /** * @brief Set special test config property * @returns 1 if property was known, else 0. */ int test_set_special_conf (const char *name, const char *val, int *timeoutp) { if (!strcmp(name, "test.timeout.multiplier")) { TEST_LOCK(); test_timeout_multiplier = strtod(val, NULL); TEST_UNLOCK(); *timeoutp = tmout_multip((*timeoutp)*1000) / 1000; } else if (!strcmp(name, "test.topic.prefix")) { rd_snprintf(test_topic_prefix, sizeof(test_topic_prefix), "%s", val); } else if (!strcmp(name, "test.topic.random")) { if (!strcmp(val, "true") || !strcmp(val, "1")) test_topic_random = 1; else test_topic_random = 0; } else if (!strcmp(name, "test.concurrent.max")) { TEST_LOCK(); test_concurrent_max = (int)strtod(val, NULL); TEST_UNLOCK(); } else if (!strcmp(name, "test.sql.command")) { TEST_LOCK(); if (test_sql_cmd) rd_free(test_sql_cmd); test_sql_cmd = rd_strdup(val); TEST_UNLOCK(); } else return 0; return 1; } static void test_read_conf_file (const char *conf_path, rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *topic_conf, int *timeoutp) { FILE *fp; char buf[1024]; int line = 0; #ifndef _MSC_VER fp = fopen(conf_path, "r"); #else fp = NULL; errno = fopen_s(&fp, conf_path, "r"); #endif if (!fp) { if (errno == ENOENT) { TEST_SAY("Test config file %s not found\n", conf_path); return; } else TEST_FAIL("Failed to read %s: %s", conf_path, strerror(errno)); } while (fgets(buf, sizeof(buf)-1, fp)) { char *t; char *b = buf; rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN; char *name, *val; char errstr[512]; line++; if ((t = strchr(b, '\n'))) *t = '\0'; if (*b == '#' || !*b) continue; if (!(t = strchr(b, '='))) TEST_FAIL("%s:%i: expected name=value format\n", conf_path, line); name = b; *t = '\0'; val = t+1; if (test_set_special_conf(name, val, timeoutp)) continue; if (!strncmp(name, "topic.", strlen("topic."))) { name += strlen("topic."); if (topic_conf) res = rd_kafka_topic_conf_set(topic_conf, name, val, errstr, sizeof(errstr)); else res = RD_KAFKA_CONF_OK; name -= strlen("topic."); } if (res == RD_KAFKA_CONF_UNKNOWN) { if (conf) res = rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)); else res = RD_KAFKA_CONF_OK; } if (res != RD_KAFKA_CONF_OK) TEST_FAIL("%s:%i: %s\n", conf_path, line, errstr); } fclose(fp); } /** * @brief Get path to test config file */ const char *test_conf_get_path (void) { return test_getenv("RDKAFKA_TEST_CONF", "test.conf"); } const char *test_getenv (const char *env, const char *def) { #ifndef _MSC_VER const char *tmp; tmp = getenv(env); if (tmp && *tmp) return tmp; return def; #else static RD_TLS char tmp[512]; DWORD r; r = GetEnvironmentVariableA(env, tmp, sizeof(tmp)); if (r == 0 || r > sizeof(tmp)) return def; return tmp; #endif } void test_conf_common_init (rd_kafka_conf_t *conf, int timeout) { if (conf) { const char *tmp = test_getenv("TEST_DEBUG", NULL); if (tmp) test_conf_set(conf, "debug", tmp); } if (timeout) test_timeout_set(timeout); } /** * Creates and sets up kafka configuration objects. * Will read "test.conf" file if it exists. */ void test_conf_init (rd_kafka_conf_t **conf, rd_kafka_topic_conf_t **topic_conf, int timeout) { const char *test_conf = test_conf_get_path(); if (conf) { *conf = rd_kafka_conf_new(); rd_kafka_conf_set(*conf, "client.id", test_curr->name, NULL, 0); rd_kafka_conf_set_error_cb(*conf, test_error_cb); rd_kafka_conf_set_stats_cb(*conf, test_stats_cb); #ifdef SIGIO { char buf[64]; /* Quick termination */ rd_snprintf(buf, sizeof(buf), "%i", SIGIO); rd_kafka_conf_set(*conf, "internal.termination.signal", buf, NULL, 0); signal(SIGIO, SIG_IGN); } #endif } #if WITH_SOCKEM if (*test_sockem_conf && conf) test_socket_enable(*conf); #endif if (topic_conf) *topic_conf = rd_kafka_topic_conf_new(); /* Open and read optional local test configuration file, if any. */ test_read_conf_file(test_conf, conf ? *conf : NULL, topic_conf ? *topic_conf : NULL, &timeout); test_conf_common_init(conf ? *conf : NULL, timeout); } /** * Wait 'timeout' seconds for rdkafka to kill all its threads and clean up. */ void test_wait_exit (int timeout) { int r; time_t start = time(NULL); while ((r = rd_kafka_thread_cnt()) && timeout-- >= 0) { TEST_SAY("%i thread(s) in use by librdkafka, waiting...\n", r); rd_sleep(1); } TEST_SAY("%i thread(s) in use by librdkafka\n", r); if (r > 0) TEST_FAIL("%i thread(s) still active in librdkafka", r); timeout -= (int)(time(NULL) - start); if (timeout > 0) { TEST_SAY("Waiting %d seconds for all librdkafka memory " "to be released\n", timeout); if (rd_kafka_wait_destroyed(timeout * 1000) == -1) TEST_FAIL("Not all internal librdkafka " "objects destroyed\n"); } } static RD_INLINE unsigned int test_rand(void) { unsigned int r; #if _MSC_VER rand_s(&r); #else r = rand(); #endif return r; } /** * Generate a "unique" test id. */ uint64_t test_id_generate (void) { return (((uint64_t)test_rand()) << 32) | (uint64_t)test_rand(); } /** * Generate a "unique" string id */ char *test_str_id_generate (char *dest, size_t dest_size) { rd_snprintf(dest, dest_size, "%"PRId64, test_id_generate()); return dest; } /** * Same as test_str_id_generate but returns a temporary string. */ const char *test_str_id_generate_tmp (void) { static RD_TLS char ret[64]; return test_str_id_generate(ret, sizeof(ret)); } /** * Format a message token. * Pad's to dest_size. */ void test_msg_fmt (char *dest, size_t dest_size, uint64_t testid, int32_t partition, int msgid) { size_t of; of = rd_snprintf(dest, dest_size, "testid=%"PRIu64", partition=%"PRId32", msg=%i\n", testid, partition, msgid); if (of < dest_size - 1) { memset(dest+of, '!', dest_size-of); dest[dest_size-1] = '\0'; } } /** * @brief Prepare message value and key for test produce. */ void test_prepare_msg (uint64_t testid, int32_t partition, int msg_id, char *val, size_t val_size, char *key, size_t key_size) { size_t of = 0; test_msg_fmt(key, key_size, testid, partition, msg_id); while (of < val_size) { /* Copy-repeat key into val until val_size */ size_t len = RD_MIN(val_size-of, key_size); memcpy(val+of, key, len); of += len; } } /** * Parse a message token */ void test_msg_parse0 (const char *func, int line, uint64_t testid, rd_kafka_message_t *rkmessage, int32_t exp_partition, int *msgidp) { char buf[128]; uint64_t in_testid; int in_part; if (!rkmessage->key) TEST_FAIL("%s:%i: Message (%s [%"PRId32"] @ %"PRId64") " "has empty key\n", func, line, rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); rd_snprintf(buf, sizeof(buf), "%.*s", (int)rkmessage->key_len, (char *)rkmessage->key); if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i\n", &in_testid, &in_part, msgidp) != 3) TEST_FAIL("%s:%i: Incorrect key format: %s", func, line, buf); if (testid != in_testid || (exp_partition != -1 && exp_partition != in_part)) TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i did " "not match message: \"%s\"\n", func, line, testid, (int)exp_partition, buf); } struct run_args { struct test *test; int argc; char **argv; }; static int run_test0 (struct run_args *run_args) { struct test *test = run_args->test; test_timing_t t_run; int r; char stats_file[256]; rd_snprintf(stats_file, sizeof(stats_file), "stats_%s_%"PRIu64".json", test->name, test_id_generate()); if (!(test->stats_fp = fopen(stats_file, "w+"))) TEST_SAY("=== Failed to create stats file %s: %s ===\n", stats_file, strerror(errno)); test_curr = test; #if WITH_SOCKEM rd_list_init(&test->sockets, 16, (void *)sockem_close); #endif TEST_SAY("================= Running test %s =================\n", test->name); if (test->stats_fp) TEST_SAY("==== Stats written to file %s ====\n", stats_file); TIMING_START(&t_run, "%s", test->name); test->start = t_run.ts_start; r = test->mainfunc(run_args->argc, run_args->argv); TIMING_STOP(&t_run); TEST_LOCK(); test->duration = TIMING_DURATION(&t_run); if (test->state == TEST_SKIPPED) { TEST_SAY("================= Test %s SKIPPED " "=================\n", run_args->test->name); } else if (r) { test->state = TEST_FAILED; TEST_SAY("\033[31m" "================= Test %s FAILED =================" "\033[0m\n", run_args->test->name); } else { test->state = TEST_PASSED; TEST_SAY("\033[32m" "================= Test %s PASSED =================" "\033[0m\n", run_args->test->name); } TEST_UNLOCK(); #if WITH_SOCKEM test_socket_close_all(test, 0); #endif if (test->stats_fp) { long pos = ftell(test->stats_fp); fclose(test->stats_fp); test->stats_fp = NULL; /* Delete file if nothing was written */ if (pos == 0) { #ifndef _MSC_VER unlink(stats_file); #else _unlink(stats_file); #endif } } #ifndef _MSC_VER if (test_delete_topics_between && test_concurrent_max == 1) { if (system("./delete-test-topics.sh $ZK_ADDRESS " "$KAFKA_PATH/bin/kafka-topics.sh") != 0) { /* ignore failures but avoid unused-result warning */ } } #endif return r; } static int run_test_from_thread (void *arg) { struct run_args *run_args = arg; thrd_detach(thrd_current()); run_test0(run_args); TEST_LOCK(); tests_running_cnt--; TEST_UNLOCK(); free(run_args); return 0; } static int run_test (struct test *test, int argc, char **argv) { struct run_args *run_args = calloc(1, sizeof(*run_args)); int wait_cnt = 0; run_args->test = test; run_args->argc = argc; run_args->argv = argv; TEST_LOCK(); while (tests_running_cnt >= test_concurrent_max) { if (!(wait_cnt++ % 10)) TEST_SAY("Too many tests running (%d >= %d): " "postponing %s start...\n", tests_running_cnt, test_concurrent_max, test->name); TEST_UNLOCK(); rd_sleep(1); TEST_LOCK(); } tests_running_cnt++; test->timeout = test_clock() + (int64_t)(20.0 * 1000000.0 * test_timeout_multiplier); test->state = TEST_RUNNING; TEST_UNLOCK(); if (thrd_create(&test->thrd, run_test_from_thread, run_args) != thrd_success) { TEST_LOCK(); tests_running_cnt--; test->state = TEST_FAILED; TEST_UNLOCK(); TEST_FAIL("Failed to start thread for test %s\n", test->name); } return 0; } static void run_tests (const char *tests_to_run, int argc, char **argv) { struct test *test; for (test = tests ; test->name ; test++) { char testnum[128]; char *t; const char *skip_reason = NULL; char tmp[128]; if (!test->mainfunc) continue; /* Extract test number, as string */ strncpy(testnum, test->name, sizeof(testnum)-1); testnum[sizeof(testnum)-1] = '\0'; if ((t = strchr(testnum, '_'))) *t = '\0'; if ((test_flags && (test_flags & test->flags) != test_flags)) skip_reason = "filtered due to test flags"; if ((test_neg_flags & ~test_flags) & test->flags) skip_reason = "Filtered due to negative test flags"; if (test_broker_version && (test->minver > test_broker_version || (test->maxver && test->maxver < test_broker_version))) { rd_snprintf(tmp, sizeof(tmp), "not applicable for broker " "version %d.%d.%d.%d", TEST_BRKVER_X(test_broker_version, 0), TEST_BRKVER_X(test_broker_version, 1), TEST_BRKVER_X(test_broker_version, 2), TEST_BRKVER_X(test_broker_version, 3)); skip_reason = tmp; } if (tests_to_run && !strstr(tests_to_run, testnum)) skip_reason = "not included in TESTS list"; else if (!tests_to_run && (test->flags & TEST_F_MANUAL)) skip_reason = "manual test"; if (!skip_reason) { run_test(test, argc, argv); } else { TEST_SAYL(3, "================= Skipping test %s (%s)" "================\n", test->name, skip_reason); TEST_LOCK(); test->state = TEST_SKIPPED; TEST_UNLOCK(); } } } /** * @brief Print summary for all tests. * * @returns the number of failed tests. */ static int test_summary (int do_lock) { struct test *test; FILE *report_fp; char report_path[128]; time_t t; struct tm *tm; char datestr[64]; int64_t total_duration = 0; int tests_run = 0; int tests_failed = 0; int tests_failed_known = 0; int tests_passed = 0; FILE *sql_fp = NULL; const char *tmp; t = time(NULL); tm = localtime(&t); strftime(datestr, sizeof(datestr), "%Y%m%d%H%M%S", tm); if ((tmp = test_getenv("TEST_REPORT", NULL))) rd_snprintf(report_path, sizeof(report_path), "%s", tmp); else rd_snprintf(report_path, sizeof(report_path), "test_report_%s.json", datestr); report_fp = fopen(report_path, "w+"); if (!report_fp) TEST_WARN("Failed to create report file %s: %s\n", report_path, strerror(errno)); else fprintf(report_fp, "{ \"id\": \"%s_%s\", \"mode\": \"%s\", " "\"date\": \"%s\", " "\"git_version\": \"%s\", " "\"broker_version\": \"%s\", " "\"tests\": {", datestr, test_mode, test_mode, datestr, test_git_version, test_broker_version_str); if (do_lock) TEST_LOCK(); if (test_sql_cmd) { #ifdef _MSC_VER sql_fp = _popen(test_sql_cmd, "w"); #else sql_fp = popen(test_sql_cmd, "w"); #endif fprintf(sql_fp, "CREATE TABLE IF NOT EXISTS " "runs(runid text PRIMARY KEY, mode text, " "date datetime, cnt int, passed int, failed int, " "duration numeric);\n" "CREATE TABLE IF NOT EXISTS " "tests(runid text, mode text, name text, state text, " "extra text, duration numeric);\n"); } if (show_summary) printf("TEST %s (%s) SUMMARY\n" "#==================================================================#\n", datestr, test_mode); for (test = tests ; test->name ; test++) { const char *color; int64_t duration; char extra[128] = ""; int do_count = 1; if (!(duration = test->duration) && test->start > 0) duration = test_clock() - test->start; if (test == tests) { /*
test: * test accounts for total runtime. * dont include in passed/run/failed counts. */ total_duration = duration; do_count = 0; } switch (test->state) { case TEST_PASSED: color = _C_GRN; if (do_count) { tests_passed++; tests_run++; } break; case TEST_FAILED: if (test->flags & TEST_F_KNOWN_ISSUE) { rd_snprintf(extra, sizeof(extra), " <-- known issue%s%s", test->extra ? ": " : "", test->extra ? test->extra : ""); if (do_count) tests_failed_known++; } color = _C_RED; if (do_count) { tests_failed++; tests_run++; } break; case TEST_RUNNING: color = _C_MAG; if (do_count) { tests_failed++; /* All tests should be finished */ tests_run++; } break; case TEST_NOT_STARTED: color = _C_YEL; break; default: color = _C_CYA; break; } if (show_summary && test->state != TEST_SKIPPED) printf("|%s %-40s | %10s | %7.3fs %s|%s\n", color, test->name, test_states[test->state], (double)duration/1000000.0, _C_CLR, extra); if (report_fp) { int i; fprintf(report_fp, "%s\"%s\": {" "\"name\": \"%s\", " "\"state\": \"%s\", " "\"known_issue\": %s, " "\"extra\": \"%s\", " "\"duration\": %.3f, " "\"report\": [ ", test == tests ? "": ", ", test->name, test->name, test_states[test->state], test->flags & TEST_F_KNOWN_ISSUE ? "true":"false", test->extra ? test->extra : "", (double)duration/1000000.0); for (i = 0 ; i < test->report_cnt ; i++) { fprintf(report_fp, "%s%s ", i == 0 ? "":",", test->report_arr[i]); } fprintf(report_fp, "] }"); } if (sql_fp) fprintf(sql_fp, "INSERT INTO tests VALUES(" "'%s_%s', '%s', '%s', '%s', '%s', %f);\n", datestr, test_mode, test_mode, test->name, test_states[test->state], test->extra ? test->extra : "", (double)duration/1000000.0); } if (do_lock) TEST_UNLOCK(); if (show_summary) printf("#==================================================================#\n"); if (report_fp) { fprintf(report_fp, "}, " "\"tests_run\": %d, " "\"tests_passed\": %d, " "\"tests_failed\": %d, " "\"duration\": %.3f" "}\n", tests_run, tests_passed, tests_failed, (double)total_duration/1000000.0); fclose(report_fp); TEST_SAY("# Test report written to %s\n", report_path); } if (sql_fp) { fprintf(sql_fp, "INSERT INTO runs VALUES('%s_%s', '%s', datetime(), " "%d, %d, %d, %f);\n", datestr, test_mode, test_mode, tests_run, tests_passed, tests_failed, (double)total_duration/1000000.0); fclose(sql_fp); } return tests_failed - tests_failed_known; } #ifndef _MSC_VER static void test_sig_term (int sig) { if (test_exit) exit(1); fprintf(stderr, "Exiting tests, waiting for running tests to finish.\n"); test_exit = 1; } #endif /** * @brief Test framework cleanup before termination. */ static void test_cleanup (void) { struct test *test; /* Free report arrays */ for (test = tests ; test->name ; test++) { int i; if (!test->report_arr) continue; for (i = 0 ; i < test->report_cnt ; i++) rd_free(test->report_arr[i]); rd_free(test->report_arr); test->report_arr = NULL; } if (test_sql_cmd) rd_free(test_sql_cmd); } int main(int argc, char **argv) { const char *tests_to_run = NULL; /* all */ int i, r; test_timing_t t_all; int a,b,c,d; mtx_init(&test_mtx, mtx_plain); test_init(); #ifndef _MSC_VER signal(SIGINT, test_sig_term); #endif tests_to_run = test_getenv("TESTS", NULL); test_broker_version_str = test_getenv("TEST_KAFKA_VERSION", test_broker_version_str); test_git_version = test_getenv("RDKAFKA_GITVER", "HEAD"); /* Are we running on CI? */ if (test_getenv("CI", NULL)) test_on_ci = 1; test_conf_init(NULL, NULL, 10); for (i = 1 ; i < argc ; i++) { if (!strncmp(argv[i], "-p", 2) && strlen(argv[i]) > 2) test_concurrent_max = (int)strtod(argv[i]+2, NULL); else if (!strcmp(argv[i], "-l")) test_flags |= TEST_F_LOCAL; else if (!strcmp(argv[i], "-L")) test_neg_flags |= TEST_F_LOCAL; else if (!strcmp(argv[i], "-a")) test_assert_on_fail = 1; else if (!strcmp(argv[i], "-k")) test_flags |= TEST_F_KNOWN_ISSUE; else if (!strcmp(argv[i], "-K")) test_neg_flags |= TEST_F_KNOWN_ISSUE; else if (!strcmp(argv[i], "-V") && i+1 < argc) test_broker_version_str = argv[++i]; else if (!strcmp(argv[i], "-S")) show_summary = 0; #ifndef _MSC_VER else if (!strcmp(argv[i], "-D")) test_delete_topics_between = 1; #endif else if (*argv[i] != '-') tests_to_run = argv[i]; else { printf("Unknown option: %s\n" "\n" "Usage: %s [options] []\n" "Options:\n" " -p Run N tests in parallel\n" " -l/-L Only/dont run local tests (no broker needed)\n" " -k/-K Only/dont run tests with known issues\n" " -a Assert on failures\n" " -S Dont show test summary\n" " -V Broker version.\n" #ifndef _MSC_VER " -D Run delete-test-topics.sh between each test (requires -p1)\n" #endif "\n" "Environment variables:\n" " TESTS - substring matched test to run (e.g., 0033)\n" " TEST_KAFKA_VERSION - broker version (e.g., 0.9.0.1)\n" " TEST_LEVEL - Test verbosity level\n" " TEST_MODE - bare, helgrind, valgrind\n" " TEST_SEED - random seed\n" " RDKAFKA_TEST_CONF - test config file (test.conf)\n" " KAFKA_PATH - Path to kafka source dir\n" " ZK_ADDRESS - Zookeeper address\n" "\n", argv[0], argv[i]); exit(1); } } TEST_SAY("Git version: %s\n", test_git_version); if (!strcmp(test_broker_version_str, "trunk")) test_broker_version_str = "0.10.0.0"; /* for now */ d = 0; if (sscanf(test_broker_version_str, "%d.%d.%d.%d", &a, &b, &c, &d) < 3) { printf("%% Expected broker version to be in format " "N.N.N (N=int), not %s\n", test_broker_version_str); exit(1); } test_broker_version = TEST_BRKVER(a, b, c, d); TEST_SAY("Broker version: %s (%d.%d.%d.%d)\n", test_broker_version_str, TEST_BRKVER_X(test_broker_version, 0), TEST_BRKVER_X(test_broker_version, 1), TEST_BRKVER_X(test_broker_version, 2), TEST_BRKVER_X(test_broker_version, 3)); /* Set up fake "
" test for all operations performed in * the main thread rather than the per-test threads. * Nice side effect is that we get timing and status for main as well.*/ test_curr = &tests[0]; test_curr->state = TEST_PASSED; test_curr->start = test_clock(); if (!strcmp(test_mode, "helgrind") || !strcmp(test_mode, "drd")) { TEST_LOCK(); test_timeout_multiplier += 5; TEST_UNLOCK(); } else if (!strcmp(test_mode, "valgrind")) { TEST_LOCK(); test_timeout_multiplier += 3; TEST_UNLOCK(); } /* Broker version 0.9 and api.version.request=true (which is default) * will cause a 10s stall per connection. Instead of fixing * that for each affected API in every test we increase the timeout * multiplier accordingly instead. The typical consume timeout is 5 * seconds, so a multiplier of 3 should be good. */ if ((test_broker_version & 0xffff0000) == 0x00090000) test_timeout_multiplier += 3; test_timeout_multiplier += (double)test_concurrent_max / 3; TEST_SAY("Tests to run: %s\n", tests_to_run ? tests_to_run : "all"); TEST_SAY("Test mode : %s\n", test_mode); TEST_SAY("Test filter : %s\n", (test_flags & TEST_F_LOCAL) ? "local tests only" : "no filter"); TEST_SAY("Test timeout multiplier: %.1f\n", test_timeout_multiplier); TEST_SAY("Action on test failure: %s\n", test_assert_on_fail ? "assert crash" : "continue other tests"); test_timeout_set(20); TIMING_START(&t_all, "ALL-TESTS"); /* Run tests */ run_tests(tests_to_run, argc, argv); TEST_LOCK(); while (tests_running_cnt > 0 && !test_exit) { struct test *test; int64_t now = test_clock(); TEST_SAY("%d test(s) running:", tests_running_cnt); for (test = tests ; test->name ; test++) { if (test->state != TEST_RUNNING) continue; if (test_level >= 2) TEST_SAY0(" %s", test->name); /* Timeout check */ if (now > test->timeout) { struct test *save_test = test_curr; test_curr = test; test->state = TEST_FAILED; test_summary(0/*no-locks*/); TEST_FAIL0(__FILE__,__LINE__,0/*nolock*/, 0/*fail-later*/, "Test %s timed out " "(timeout set to %d seconds)\n", test->name, (int)(test->timeout- test->start)/ 1000000); test_curr = save_test; tests_running_cnt--; /* fail-later misses this*/ #ifdef _MSC_VER TerminateThread(test->thrd, -1); #else pthread_kill(test->thrd, SIGKILL); #endif } } if (test_level >= 2) TEST_SAY0("\n"); TEST_UNLOCK(); rd_sleep(1); TEST_LOCK(); } TIMING_STOP(&t_all); test_curr = &tests[0]; test_curr->duration = test_clock() - test_curr->start; TEST_UNLOCK(); /* Wait for everything to be cleaned up since broker destroys are * handled in its own thread. */ test_wait_exit(0); r = test_summary(1/*lock*/) ? 1 : 0; /* If we havent failed at this point then * there were no threads leaked */ if (r == 0) TEST_SAY("\n============== ALL TESTS PASSED ==============\n"); test_cleanup(); if (r > 0) TEST_FAIL("%d test(s) failed, see previous errors", r); return r; } /****************************************************************************** * * Helpers * ******************************************************************************/ void test_dr_cb (rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) { int *remainsp = msg_opaque; TEST_SAYL(4, "Delivery report: %s\n", rd_kafka_err2str(err)); if (!test_curr->produce_sync && err != test_curr->exp_dr_err) TEST_FAIL("Message delivery failed: expected %s, got %s\n", rd_kafka_err2str(test_curr->exp_dr_err), rd_kafka_err2str(err)); if (*remainsp == 0) TEST_FAIL("Too many messages delivered (remains %i)", *remainsp); (*remainsp)--; if (test_curr->produce_sync) test_curr->produce_sync_err = err; } rd_kafka_t *test_create_handle (int mode, rd_kafka_conf_t *conf) { rd_kafka_t *rk; char errstr[512]; if (!conf) { test_conf_init(&conf, NULL, 0); #if WITH_SOCKEM if (*test_sockem_conf) test_socket_enable(conf); #endif } else { test_conf_set(conf, "client.id", test_curr->name); } /* Creat kafka instance */ rk = rd_kafka_new(mode, conf, errstr, sizeof(errstr)); if (!rk) TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr); TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk)); return rk; } rd_kafka_t *test_create_producer (void) { rd_kafka_conf_t *conf; test_conf_init(&conf, NULL, 0); rd_kafka_conf_set_dr_cb(conf, test_dr_cb); return test_create_handle(RD_KAFKA_PRODUCER, conf); } /** * Create topic_t object with va-arg list as key-value config pairs * terminated by NULL. */ rd_kafka_topic_t *test_create_topic_object (rd_kafka_t *rk, const char *topic, ...) { rd_kafka_topic_t *rkt; rd_kafka_topic_conf_t *topic_conf; va_list ap; const char *name, *val; test_conf_init(NULL, &topic_conf, 0); va_start(ap, topic); while ((name = va_arg(ap, const char *)) && (val = va_arg(ap, const char *))) { test_topic_conf_set(topic_conf, name, val); } va_end(ap); rkt = rd_kafka_topic_new(rk, topic, topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_kafka_err2str(rd_kafka_last_error())); return rkt; } rd_kafka_topic_t *test_create_producer_topic (rd_kafka_t *rk, const char *topic, ...) { rd_kafka_topic_t *rkt; rd_kafka_topic_conf_t *topic_conf; char errstr[512]; va_list ap; const char *name, *val; test_conf_init(NULL, &topic_conf, 0); va_start(ap, topic); while ((name = va_arg(ap, const char *)) && (val = va_arg(ap, const char *))) { if (rd_kafka_topic_conf_set(topic_conf, name, val, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) TEST_FAIL("Conf failed: %s\n", errstr); } va_end(ap); /* Make sure all replicas are in-sync after producing * so that consume test wont fail. */ rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1", errstr, sizeof(errstr)); rkt = rd_kafka_topic_new(rk, topic, topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_kafka_err2str(rd_kafka_last_error())); return rkt; } /** * Produces \p cnt messages and returns immediately. * Does not wait for delivery. * \p msgcounterp is incremented for each produced messages and passed * as \p msg_opaque which is later used in test_dr_cb to decrement * the counter on delivery. * * If \p payload is NULL the message key and payload will be formatted * according to standard test format, otherwise the key will be NULL and * payload send as message payload. * * Default message size is 128 bytes, if \p size is non-zero and \p payload * is NULL the message size of \p size will be used. */ void test_produce_msgs_nowait (rd_kafka_t *rk, rd_kafka_topic_t *rkt, uint64_t testid, int32_t partition, int msg_base, int cnt, const char *payload, size_t size, int *msgcounterp) { int msg_id; test_timing_t t_all, t_poll; char key[128]; void *buf; int64_t tot_bytes = 0; int64_t tot_time_poll = 0; if (payload) buf = (void *)payload; else { if (size == 0) size = 128; buf = calloc(1, size); } TEST_SAY("Produce to %s [%"PRId32"]: messages #%d..%d\n", rd_kafka_topic_name(rkt), partition, msg_base, msg_base+cnt); TIMING_START(&t_all, "PRODUCE"); TIMING_START(&t_poll, "SUM(POLL)"); for (msg_id = msg_base ; msg_id < msg_base + cnt ; msg_id++) { if (!payload) test_prepare_msg(testid, partition, msg_id, buf, size, key, sizeof(key)); if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buf, size, !payload ? key : NULL, !payload ? strlen(key) : 0, msgcounterp) == -1) TEST_FAIL("Failed to produce message %i " "to partition %i: %s", msg_id, (int)partition, rd_kafka_err2str(rd_kafka_last_error())); (*msgcounterp)++; tot_bytes += size; TIMING_RESTART(&t_poll); rd_kafka_poll(rk, 0); tot_time_poll = TIMING_DURATION(&t_poll); if (TIMING_EVERY(&t_all, 3*1000000)) TEST_SAY("produced %3d%%: %d/%d messages " "(%d msgs/s, %d bytes/s)\n", ((msg_id - msg_base) * 100) / cnt, msg_id - msg_base, cnt, (int)((msg_id - msg_base) / (TIMING_DURATION(&t_all) / 1000000)), (int)((tot_bytes) / (TIMING_DURATION(&t_all) / 1000000))); } if (!payload) free(buf); t_poll.duration = tot_time_poll; TIMING_STOP(&t_poll); TIMING_STOP(&t_all); } /** * Waits for the messages tracked by counter \p msgcounterp to be delivered. */ void test_wait_delivery (rd_kafka_t *rk, int *msgcounterp) { test_timing_t t_all; int start_cnt = *msgcounterp; TIMING_START(&t_all, "PRODUCE.DELIVERY.WAIT"); /* Wait for messages to be delivered */ while (*msgcounterp > 0 && rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 10); if (TIMING_EVERY(&t_all, 3*1000000)) { int delivered = start_cnt - *msgcounterp; TEST_SAY("wait_delivery: " "%d/%d messages delivered: %d msgs/s\n", delivered, start_cnt, (int)(delivered / (TIMING_DURATION(&t_all) / 1000000))); } } TIMING_STOP(&t_all); TEST_ASSERT(*msgcounterp == 0, "Not all messages delivered: msgcounter still at %d, " "outq_len %d", *msgcounterp, rd_kafka_outq_len(rk)); } /** * Produces \p cnt messages and waits for succesful delivery */ void test_produce_msgs (rd_kafka_t *rk, rd_kafka_topic_t *rkt, uint64_t testid, int32_t partition, int msg_base, int cnt, const char *payload, size_t size) { int remains = 0; test_produce_msgs_nowait(rk, rkt, testid, partition, msg_base, cnt, payload, size, &remains); test_wait_delivery(rk, &remains); } /** * Create producer, produce \p msgcnt messages to \p topic \p partition, * destroy consumer, and returns the used testid. */ uint64_t test_produce_msgs_easy_size (const char *topic, uint64_t testid, int32_t partition, int msgcnt, size_t size) { rd_kafka_t *rk; rd_kafka_topic_t *rkt; test_timing_t t_produce; if (!testid) testid = test_id_generate(); rk = test_create_producer(); rkt = test_create_producer_topic(rk, topic, NULL); TIMING_START(&t_produce, "PRODUCE"); test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt, NULL, size); TIMING_STOP(&t_produce); rd_kafka_topic_destroy(rkt); rd_kafka_destroy(rk); return testid; } rd_kafka_resp_err_t test_produce_sync (rd_kafka_t *rk, rd_kafka_topic_t *rkt, uint64_t testid, int32_t partition) { test_curr->produce_sync = 1; test_produce_msgs(rk, rkt, testid, partition, 0, 1, NULL, 0); test_curr->produce_sync = 0; return test_curr->produce_sync_err; } rd_kafka_t *test_create_consumer (const char *group_id, void (*rebalance_cb) ( rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque), rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *default_topic_conf) { rd_kafka_t *rk; char tmp[64]; if (!conf) test_conf_init(&conf, NULL, 0); if (group_id) { test_conf_set(conf, "group.id", group_id); rd_snprintf(tmp, sizeof(tmp), "%d", test_session_timeout_ms); test_conf_set(conf, "session.timeout.ms", tmp); if (rebalance_cb) rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb); } else { TEST_ASSERT(!rebalance_cb); } if (default_topic_conf) rd_kafka_conf_set_default_topic_conf(conf, default_topic_conf); /* Create kafka instance */ rk = test_create_handle(RD_KAFKA_CONSUMER, conf); if (group_id) rd_kafka_poll_set_consumer(rk); return rk; } rd_kafka_topic_t *test_create_consumer_topic (rd_kafka_t *rk, const char *topic) { rd_kafka_topic_t *rkt; rd_kafka_topic_conf_t *topic_conf; test_conf_init(NULL, &topic_conf, 0); rkt = rd_kafka_topic_new(rk, topic, topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_kafka_err2str(rd_kafka_last_error())); return rkt; } void test_consumer_start (const char *what, rd_kafka_topic_t *rkt, int32_t partition, int64_t start_offset) { TEST_SAY("%s: consumer_start: %s [%"PRId32"] at offset %"PRId64"\n", what, rd_kafka_topic_name(rkt), partition, start_offset); if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) TEST_FAIL("%s: consume_start failed: %s\n", what, rd_kafka_err2str(rd_kafka_last_error())); } void test_consumer_stop (const char *what, rd_kafka_topic_t *rkt, int32_t partition) { TEST_SAY("%s: consumer_stop: %s [%"PRId32"]\n", what, rd_kafka_topic_name(rkt), partition); if (rd_kafka_consume_stop(rkt, partition) == -1) TEST_FAIL("%s: consume_stop failed: %s\n", what, rd_kafka_err2str(rd_kafka_last_error())); } void test_consumer_seek (const char *what, rd_kafka_topic_t *rkt, int32_t partition, int64_t offset) { int err; TEST_SAY("%s: consumer_seek: %s [%"PRId32"] to offset %"PRId64"\n", what, rd_kafka_topic_name(rkt), partition, offset); if ((err = rd_kafka_seek(rkt, partition, offset, 2000))) TEST_FAIL("%s: consume_seek(%s, %"PRId32", %"PRId64") " "failed: %s\n", what, rd_kafka_topic_name(rkt), partition, offset, rd_kafka_err2str(err)); } /** * Returns offset of the last message consumed */ int64_t test_consume_msgs (const char *what, rd_kafka_topic_t *rkt, uint64_t testid, int32_t partition, int64_t offset, int exp_msg_base, int exp_cnt, int parse_fmt) { int cnt = 0; int msg_next = exp_msg_base; int fails = 0; int64_t offset_last = -1; int64_t tot_bytes = 0; test_timing_t t_first, t_all; TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: expect msg #%d..%d " "at offset %"PRId64"\n", what, rd_kafka_topic_name(rkt), partition, exp_msg_base, exp_msg_base+exp_cnt, offset); if (offset != TEST_NO_SEEK) { rd_kafka_resp_err_t err; test_timing_t t_seek; TIMING_START(&t_seek, "SEEK"); if ((err = rd_kafka_seek(rkt, partition, offset, 5000))) TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: " "seek to %"PRId64" failed: %s\n", what, rd_kafka_topic_name(rkt), partition, offset, rd_kafka_err2str(err)); TIMING_STOP(&t_seek); TEST_SAY("%s: seeked to offset %"PRId64"\n", what, offset); } TIMING_START(&t_first, "FIRST MSG"); TIMING_START(&t_all, "ALL MSGS"); while (cnt < exp_cnt) { rd_kafka_message_t *rkmessage; int msg_id; rkmessage = rd_kafka_consume(rkt, partition, tmout_multip(5000)); if (TIMING_EVERY(&t_all, 3*1000000)) TEST_SAY("%s: " "consumed %3d%%: %d/%d messages " "(%d msgs/s, %d bytes/s)\n", what, cnt * 100 / exp_cnt, cnt, exp_cnt, (int)(cnt / (TIMING_DURATION(&t_all) / 1000000)), (int)(tot_bytes / (TIMING_DURATION(&t_all) / 1000000))); if (!rkmessage) TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: " "expected msg #%d (%d/%d): timed out\n", what, rd_kafka_topic_name(rkt), partition, msg_next, cnt, exp_cnt); if (rkmessage->err) TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: " "expected msg #%d (%d/%d): got error: %s\n", what, rd_kafka_topic_name(rkt), partition, msg_next, cnt, exp_cnt, rd_kafka_err2str(rkmessage->err)); if (cnt == 0) TIMING_STOP(&t_first); if (parse_fmt) test_msg_parse(testid, rkmessage, partition, &msg_id); else msg_id = 0; if (test_level >= 3) TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: " "got msg #%d at offset %"PRId64 " (expect #%d at offset %"PRId64")\n", what, rd_kafka_topic_name(rkt), partition, msg_id, rkmessage->offset, msg_next, offset >= 0 ? offset + cnt : -1); if (parse_fmt && msg_id != msg_next) { TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: " "expected msg #%d (%d/%d): got msg #%d\n", what, rd_kafka_topic_name(rkt), partition, msg_next, cnt, exp_cnt, msg_id); fails++; } cnt++; tot_bytes += rkmessage->len; msg_next++; offset_last = rkmessage->offset; rd_kafka_message_destroy(rkmessage); } TIMING_STOP(&t_all); if (fails) TEST_FAIL("%s: consume_msgs: %s [%"PRId32"]: %d failures\n", what, rd_kafka_topic_name(rkt), partition, fails); TEST_SAY("%s: consume_msgs: %s [%"PRId32"]: " "%d/%d messages consumed succesfully\n", what, rd_kafka_topic_name(rkt), partition, cnt, exp_cnt); return offset_last; } /** * Create high-level consumer subscribing to \p topic from BEGINNING * and expects \d exp_msgcnt with matching \p testid * Destroys consumer when done. * * If \p group_id is NULL a new unique group is generated */ void test_consume_msgs_easy_mv (const char *group_id, const char *topic, uint64_t testid, int exp_eofcnt, int exp_msgcnt, rd_kafka_topic_conf_t *tconf, test_msgver_t *mv) { rd_kafka_t *rk; char grpid0[64]; if (!tconf) test_conf_init(NULL, &tconf, 0); if (!group_id) group_id = test_str_id_generate(grpid0, sizeof(grpid0)); test_topic_conf_set(tconf, "auto.offset.reset", "smallest"); rk = test_create_consumer(group_id, NULL, NULL, tconf); rd_kafka_poll_set_consumer(rk); TEST_SAY("Subscribing to topic %s in group %s " "(expecting %d msgs with testid %"PRIu64")\n", topic, group_id, exp_msgcnt, testid); test_consumer_subscribe(rk, topic); /* Consume messages */ test_consumer_poll("consume.easy", rk, testid, exp_eofcnt, -1, exp_msgcnt, mv); test_consumer_close(rk); rd_kafka_destroy(rk); } void test_consume_msgs_easy (const char *group_id, const char *topic, uint64_t testid, int exp_eofcnt, int exp_msgcnt, rd_kafka_topic_conf_t *tconf) { test_msgver_t mv; test_msgver_init(&mv, testid); test_consume_msgs_easy_mv(group_id, topic, testid, exp_eofcnt, exp_msgcnt, tconf, &mv); test_msgver_clear(&mv); } /** * @brief Start subscribing for 'topic' */ void test_consumer_subscribe (rd_kafka_t *rk, const char *topic) { rd_kafka_topic_partition_list_t *topics; rd_kafka_resp_err_t err; topics = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA); err = rd_kafka_subscribe(rk, topics); if (err) TEST_FAIL("Failed to subscribe to %s: %s\n", topic, rd_kafka_err2str(err)); rd_kafka_topic_partition_list_destroy(topics); } void test_consumer_assign (const char *what, rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions) { rd_kafka_resp_err_t err; test_timing_t timing; TIMING_START(&timing, "ASSIGN.PARTITIONS"); err = rd_kafka_assign(rk, partitions); TIMING_STOP(&timing); if (err) TEST_FAIL("%s: failed to assign %d partition(s): %s\n", what, partitions->cnt, rd_kafka_err2str(err)); else TEST_SAY("%s: assigned %d partition(s)\n", what, partitions->cnt); } void test_consumer_unassign (const char *what, rd_kafka_t *rk) { rd_kafka_resp_err_t err; test_timing_t timing; TIMING_START(&timing, "UNASSIGN.PARTITIONS"); err = rd_kafka_assign(rk, NULL); TIMING_STOP(&timing); if (err) TEST_FAIL("%s: failed to unassign current partitions: %s\n", what, rd_kafka_err2str(err)); else TEST_SAY("%s: unassigned current partitions\n", what); } /** * Message verification services * */ void test_msgver_init (test_msgver_t *mv, uint64_t testid) { memset(mv, 0, sizeof(*mv)); mv->testid = testid; /* Max warning logs before suppressing. */ mv->log_max = (test_level + 1) * 100; } #define TEST_MV_WARN(mv,...) do { \ if ((mv)->log_cnt++ > (mv)->log_max) \ (mv)->log_suppr_cnt++; \ else \ TEST_WARN(__VA_ARGS__); \ } while (0) static void test_mv_mvec_grow (struct test_mv_mvec *mvec, int tot_size) { if (tot_size <= mvec->size) return; mvec->size = tot_size; mvec->m = realloc(mvec->m, sizeof(*mvec->m) * mvec->size); } /** * Make sure there is room for at least \p cnt messages, else grow mvec. */ static void test_mv_mvec_reserve (struct test_mv_mvec *mvec, int cnt) { test_mv_mvec_grow(mvec, mvec->cnt + cnt); } void test_mv_mvec_init (struct test_mv_mvec *mvec, int exp_cnt) { TEST_ASSERT(mvec->m == NULL, "mvec not cleared"); if (!exp_cnt) return; test_mv_mvec_grow(mvec, exp_cnt); } void test_mv_mvec_clear (struct test_mv_mvec *mvec) { if (mvec->m) free(mvec->m); } void test_msgver_clear (test_msgver_t *mv) { int i; for (i = 0 ; i < mv->p_cnt ; i++) { struct test_mv_p *p = mv->p[i]; free(p->topic); test_mv_mvec_clear(&p->mvec); free(p); } free(mv->p); test_msgver_init(mv, mv->testid); } struct test_mv_p *test_msgver_p_get (test_msgver_t *mv, const char *topic, int32_t partition, int do_create) { int i; struct test_mv_p *p; for (i = 0 ; i < mv->p_cnt ; i++) { p = mv->p[i]; if (p->partition == partition && !strcmp(p->topic, topic)) return p; } if (!do_create) TEST_FAIL("Topic %s [%d] not found in msgver", topic, partition); if (mv->p_cnt == mv->p_size) { mv->p_size = (mv->p_size + 4) * 2; mv->p = realloc(mv->p, sizeof(*mv->p) * mv->p_size); } mv->p[mv->p_cnt++] = p = calloc(1, sizeof(*p)); p->topic = rd_strdup(topic); p->partition = partition; p->eof_offset = RD_KAFKA_OFFSET_INVALID; return p; } /** * Add (room for) message to message vector. * Resizes the vector as needed. */ static struct test_mv_m *test_mv_mvec_add (struct test_mv_mvec *mvec) { if (mvec->cnt == mvec->size) { test_mv_mvec_grow(mvec, (mvec->size ? mvec->size * 2 : 10000)); } mvec->cnt++; return &mvec->m[mvec->cnt-1]; } /** * Returns message at index \p mi */ static RD_INLINE struct test_mv_m *test_mv_mvec_get (struct test_mv_mvec *mvec, int mi) { if (mi >= mvec->cnt) return NULL; return &mvec->m[mi]; } /** * Print message list to \p fp */ static RD_UNUSED void test_mv_mvec_dump (FILE *fp, const struct test_mv_mvec *mvec) { int mi; fprintf(fp, "*** Dump mvec with %d messages (capacity %d): ***\n", mvec->cnt, mvec->size); for (mi = 0 ; mi < mvec->cnt ; mi++) fprintf(fp, " msgid %d, offset %"PRId64"\n", mvec->m[mi].msgid, mvec->m[mi].offset); fprintf(fp, "*** Done ***\n"); } static void test_mv_mvec_sort (struct test_mv_mvec *mvec, int (*cmp) (const void *, const void *)) { qsort(mvec->m, mvec->cnt, sizeof(*mvec->m), cmp); } /** * @brief Adds a message to the msgver service. * * @returns 1 if message is from the expected testid, else 0 (not added) */ int test_msgver_add_msg00 (const char *func, int line, test_msgver_t *mv, uint64_t testid, const char *topic, int32_t partition, int64_t offset, int64_t timestamp, rd_kafka_resp_err_t err, int msgnum) { struct test_mv_p *p; struct test_mv_m *m; if (testid != mv->testid) return 0; /* Ignore message */ p = test_msgver_p_get(mv, topic, partition, 1); if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { p->eof_offset = offset; return 1; } m = test_mv_mvec_add(&p->mvec); m->offset = offset; m->msgid = msgnum; m->timestamp = timestamp; if (test_level > 2) { TEST_SAY("%s:%d: " "Recv msg %s [%"PRId32"] offset %"PRId64" msgid %d " "timestamp %"PRId64"\n", func, line, p->topic, p->partition, m->offset, m->msgid, m->timestamp); } mv->msgcnt++; return 1; } /** * Adds a message to the msgver service. * * Message must be a proper message or PARTITION_EOF. * * @returns 1 if message is from the expected testid, else 0 (not added). */ int test_msgver_add_msg0 (const char *func, int line, test_msgver_t *mv, rd_kafka_message_t *rkmessage) { uint64_t in_testid; int in_part; int in_msgnum = -1; char buf[128]; const void *val; size_t valsize; if (mv->fwd) test_msgver_add_msg(mv->fwd, rkmessage); if (rkmessage->err) { if (rkmessage->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) return 0; /* Ignore error */ in_testid = mv->testid; } else { if (!mv->msgid_hdr) { rd_snprintf(buf, sizeof(buf), "%.*s", (int)rkmessage->len, (char *)rkmessage->payload); val = buf; } else { /* msgid is in message header */ rd_kafka_headers_t *hdrs; if (rd_kafka_message_headers(rkmessage, &hdrs) || rd_kafka_header_get_last(hdrs, mv->msgid_hdr, &val, &valsize)) { TEST_SAYL(3, "%s:%d: msgid expected in header %s " "but %s exists for " "message at offset %"PRId64 " has no headers", func, line, mv->msgid_hdr, hdrs ? "no such header" : "no headers", rkmessage->offset); return 0; } } if (sscanf(val, "testid=%"SCNu64", partition=%i, msg=%i\n", &in_testid, &in_part, &in_msgnum) != 3) TEST_FAIL("%s:%d: Incorrect format at offset %"PRId64 ": %s", func, line, rkmessage->offset, (const char *)val); } return test_msgver_add_msg00(func, line, mv, in_testid, rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset, rd_kafka_message_timestamp(rkmessage, NULL), rkmessage->err, in_msgnum); return 1; } /** * Verify that all messages were received in order. * * - Offsets need to occur without gaps * - msgids need to be increasing: but may have gaps, e.g., using partitioner) */ static int test_mv_mvec_verify_order (test_msgver_t *mv, int flags, struct test_mv_p *p, struct test_mv_mvec *mvec, struct test_mv_vs *vs) { int mi; int fails = 0; for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) { struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1); struct test_mv_m *this = test_mv_mvec_get(mvec, mi); if (((flags & TEST_MSGVER_BY_OFFSET) && prev->offset + 1 != this->offset) || ((flags & TEST_MSGVER_BY_MSGID) && prev->msgid > this->msgid)) { TEST_MV_WARN( mv, " %s [%"PRId32"] msg rcvidx #%d/%d: " "out of order (prev vs this): " "offset %"PRId64" vs %"PRId64", " "msgid %d vs %d\n", p ? p->topic : "*", p ? p->partition : -1, mi, mvec->cnt, prev->offset, this->offset, prev->msgid, this->msgid); fails++; } } return fails; } /** * @brief Verify that messages correspond to 'correct' msgver. */ static int test_mv_mvec_verify_corr (test_msgver_t *mv, int flags, struct test_mv_p *p, struct test_mv_mvec *mvec, struct test_mv_vs *vs) { int mi; int fails = 0; struct test_mv_p *corr_p = NULL; struct test_mv_mvec *corr_mvec; TEST_ASSERT(vs->corr); /* Get correct mvec for comparison. */ if (p) corr_p = test_msgver_p_get(vs->corr, p->topic, p->partition, 0); if (!corr_p) { TEST_MV_WARN(mv, " %s [%"PRId32"]: " "no corresponding correct partition found\n", p ? p->topic : "*", p ? p->partition : -1); return 1; } corr_mvec = &corr_p->mvec; for (mi = 0 ; mi < mvec->cnt ; mi++) { struct test_mv_m *this = test_mv_mvec_get(mvec, mi); const struct test_mv_m *corr = test_mv_mvec_get(corr_mvec, mi); if (0) TEST_MV_WARN(mv, "msg #%d: msgid %d, offset %"PRId64"\n", mi, this->msgid, this->offset); if (!corr) { TEST_MV_WARN( mv, " %s [%"PRId32"] msg rcvidx #%d/%d: " "out of range: correct mvec has %d messages: " "message offset %"PRId64", msgid %d\n", p ? p->topic : "*", p ? p->partition : -1, mi, mvec->cnt, corr_mvec->cnt, this->offset, this->msgid); fails++; continue; } if (((flags & TEST_MSGVER_BY_OFFSET) && this->offset != corr->offset) || ((flags & TEST_MSGVER_BY_MSGID) && this->msgid != corr->msgid) || ((flags & TEST_MSGVER_BY_TIMESTAMP) && this->timestamp != corr->timestamp)) { TEST_MV_WARN( mv, " %s [%"PRId32"] msg rcvidx #%d/%d: " "did not match correct msg: " "offset %"PRId64" vs %"PRId64", " "msgid %d vs %d, " "timestamp %"PRId64" vs %"PRId64" (fl 0x%x)\n", p ? p->topic : "*", p ? p->partition : -1, mi, mvec->cnt, this->offset, corr->offset, this->msgid, corr->msgid, this->timestamp, corr->timestamp, flags); fails++; } } return fails; } static int test_mv_m_cmp_offset (const void *_a, const void *_b) { const struct test_mv_m *a = _a, *b = _b; return (int)(a->offset - b->offset); } static int test_mv_m_cmp_msgid (const void *_a, const void *_b) { const struct test_mv_m *a = _a, *b = _b; return a->msgid - b->msgid; } /** * Verify that there are no duplicate message. * * - Offsets are checked * - msgids are checked * * * NOTE: This sorts the message (.m) array, first by offset, then by msgid * and leaves the message array sorted (by msgid) */ static int test_mv_mvec_verify_dup (test_msgver_t *mv, int flags, struct test_mv_p *p, struct test_mv_mvec *mvec, struct test_mv_vs *vs) { int mi; int fails = 0; enum { _P_OFFSET, _P_MSGID } pass; for (pass = _P_OFFSET ; pass <= _P_MSGID ; pass++) { if (pass == _P_OFFSET) { if (!(flags & TEST_MSGVER_BY_OFFSET)) continue; test_mv_mvec_sort(mvec, test_mv_m_cmp_offset); } else if (pass == _P_MSGID) { if (!(flags & TEST_MSGVER_BY_MSGID)) continue; test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid); } for (mi = 1/*skip first*/ ; mi < mvec->cnt ; mi++) { struct test_mv_m *prev = test_mv_mvec_get(mvec, mi-1); struct test_mv_m *this = test_mv_mvec_get(mvec, mi); int is_dup = 0; if (pass == _P_OFFSET) is_dup = prev->offset == this->offset; else if (pass == _P_MSGID) is_dup = prev->msgid == this->msgid; if (!is_dup) continue; TEST_MV_WARN(mv, " %s [%"PRId32"] " "duplicate msg (prev vs this): " "offset %"PRId64" vs %"PRId64", " "msgid %d vs %d\n", p ? p->topic : "*", p ? p->partition : -1, prev->offset, this->offset, prev->msgid, this->msgid); fails++; } } return fails; } /** * Verify that \p mvec contains the expected range: * - TEST_MSGVER_BY_MSGID: msgid within \p vs->msgid_min .. \p vs->msgid_max * - TEST_MSGVER_BY_TIMESTAMP: timestamp with \p vs->timestamp_min .. _max * * * NOTE: TEST_MSGVER_BY_MSGID is required * * * NOTE: This sorts the message (.m) array by msgid * and leaves the message array sorted (by msgid) */ static int test_mv_mvec_verify_range (test_msgver_t *mv, int flags, struct test_mv_p *p, struct test_mv_mvec *mvec, struct test_mv_vs *vs) { int mi; int fails = 0; int cnt = 0; int exp_cnt = vs->msgid_max - vs->msgid_min + 1; int skip_cnt = 0; if (!(flags & TEST_MSGVER_BY_MSGID)) return 0; test_mv_mvec_sort(mvec, test_mv_m_cmp_msgid); //test_mv_mvec_dump(stdout, mvec); for (mi = 0 ; mi < mvec->cnt ; mi++) { struct test_mv_m *prev = mi ? test_mv_mvec_get(mvec, mi-1):NULL; struct test_mv_m *this = test_mv_mvec_get(mvec, mi); if (this->msgid < vs->msgid_min) { skip_cnt++; continue; } else if (this->msgid > vs->msgid_max) break; if (flags & TEST_MSGVER_BY_TIMESTAMP) { if (this->timestamp < vs->timestamp_min || this->timestamp > vs->timestamp_max) { TEST_MV_WARN( mv, " %s [%"PRId32"] range check: " "msgid #%d (at mi %d): " "timestamp %"PRId64" outside " "expected range %"PRId64"..%"PRId64"\n", p ? p->topic : "*", p ? p->partition : -1, this->msgid, mi, this->timestamp, vs->timestamp_min, vs->timestamp_max); fails++; } } if (cnt++ == 0) { if (this->msgid != vs->msgid_min) { TEST_MV_WARN(mv, " %s [%"PRId32"] range check: " "first message #%d (at mi %d) " "is not first in " "expected range %d..%d\n", p ? p->topic : "*", p ? p->partition : -1, this->msgid, mi, vs->msgid_min, vs->msgid_max); fails++; } } else if (cnt > exp_cnt) { TEST_MV_WARN(mv, " %s [%"PRId32"] range check: " "too many messages received (%d/%d) at " "msgid %d for expected range %d..%d\n", p ? p->topic : "*", p ? p->partition : -1, cnt, exp_cnt, this->msgid, vs->msgid_min, vs->msgid_max); fails++; } if (!prev) { skip_cnt++; continue; } if (prev->msgid + 1 != this->msgid) { TEST_MV_WARN(mv, " %s [%"PRId32"] range check: " " %d message(s) missing between " "msgid %d..%d in expected range %d..%d\n", p ? p->topic : "*", p ? p->partition : -1, this->msgid - prev->msgid - 1, prev->msgid+1, this->msgid-1, vs->msgid_min, vs->msgid_max); fails++; } } if (cnt != exp_cnt) { TEST_MV_WARN(mv, " %s [%"PRId32"] range check: " " wrong number of messages seen, wanted %d got %d " "in expected range %d..%d (%d messages skipped)\n", p ? p->topic : "*", p ? p->partition : -1, exp_cnt, cnt, vs->msgid_min, vs->msgid_max, skip_cnt); fails++; } return fails; } /** * Run verifier \p f for all partitions. */ #define test_mv_p_verify_f(mv,flags,f,vs) \ test_mv_p_verify_f0(mv,flags,f, # f, vs) static int test_mv_p_verify_f0 (test_msgver_t *mv, int flags, int (*f) (test_msgver_t *mv, int flags, struct test_mv_p *p, struct test_mv_mvec *mvec, struct test_mv_vs *vs), const char *f_name, struct test_mv_vs *vs) { int i; int fails = 0; for (i = 0 ; i < mv->p_cnt ; i++) { TEST_SAY("Verifying %s [%"PRId32"] %d msgs with %s\n", mv->p[i]->topic, mv->p[i]->partition, mv->p[i]->mvec.cnt, f_name); fails += f(mv, flags, mv->p[i], &mv->p[i]->mvec, vs); } return fails; } /** * Collect all messages from all topics and partitions into vs->mvec */ static void test_mv_collect_all_msgs (test_msgver_t *mv, struct test_mv_vs *vs) { int i; for (i = 0 ; i < mv->p_cnt ; i++) { struct test_mv_p *p = mv->p[i]; int mi; test_mv_mvec_reserve(&vs->mvec, p->mvec.cnt); for (mi = 0 ; mi < p->mvec.cnt ; mi++) { struct test_mv_m *m = test_mv_mvec_get(&p->mvec, mi); struct test_mv_m *m_new = test_mv_mvec_add(&vs->mvec); *m_new = *m; } } } /** * Verify that all messages (by msgid) in range msg_base+exp_cnt were received * and received only once. * This works across all partitions. */ static int test_msgver_verify_range (test_msgver_t *mv, int flags, struct test_mv_vs *vs) { int fails = 0; /** * Create temporary array to hold expected message set, * then traverse all topics and partitions and move matching messages * to that set. Then verify the message set. */ test_mv_mvec_init(&vs->mvec, vs->exp_cnt); /* Collect all msgs into vs mvec */ test_mv_collect_all_msgs(mv, vs); fails += test_mv_mvec_verify_range(mv, TEST_MSGVER_BY_MSGID|flags, NULL, &vs->mvec, vs); fails += test_mv_mvec_verify_dup(mv, TEST_MSGVER_BY_MSGID|flags, NULL, &vs->mvec, vs); test_mv_mvec_clear(&vs->mvec); return fails; } /** * Verify that \p exp_cnt messages were received for \p topic and \p partition * starting at msgid base \p msg_base. */ int test_msgver_verify_part0 (const char *func, int line, const char *what, test_msgver_t *mv, int flags, const char *topic, int partition, int msg_base, int exp_cnt) { int fails = 0; struct test_mv_vs vs = { .msg_base = msg_base, .exp_cnt = exp_cnt }; struct test_mv_p *p; TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) " "in %s [%d]: expecting msgids %d..%d (%d)\n", func, line, what, mv->msgcnt, flags, topic, partition, msg_base, msg_base+exp_cnt, exp_cnt); p = test_msgver_p_get(mv, topic, partition, 0); /* Per-partition checks */ if (flags & TEST_MSGVER_ORDER) fails += test_mv_mvec_verify_order(mv, flags, p, &p->mvec, &vs); if (flags & TEST_MSGVER_DUP) fails += test_mv_mvec_verify_dup(mv, flags, p, &p->mvec, &vs); if (mv->msgcnt < vs.exp_cnt) { TEST_MV_WARN(mv, "%s:%d: " "%s [%"PRId32"] expected %d messages but only " "%d received\n", func, line, p ? p->topic : "*", p ? p->partition : -1, vs.exp_cnt, mv->msgcnt); fails++; } if (mv->log_suppr_cnt > 0) TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n", func, line, what, mv->log_suppr_cnt); if (fails) TEST_FAIL("%s:%d: %s: Verification of %d received messages " "failed: " "expected msgids %d..%d (%d): see previous errors\n", func, line, what, mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt); else TEST_SAY("%s:%d: %s: Verification of %d received messages " "succeeded: " "expected msgids %d..%d (%d)\n", func, line, what, mv->msgcnt, msg_base, msg_base+exp_cnt, exp_cnt); return fails; } /** * Verify that \p exp_cnt messages were received starting at * msgid base \p msg_base. */ int test_msgver_verify0 (const char *func, int line, const char *what, test_msgver_t *mv, int flags, struct test_mv_vs vs) { int fails = 0; TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x): " "expecting msgids %d..%d (%d)\n", func, line, what, mv->msgcnt, flags, vs.msg_base, vs.msg_base+vs.exp_cnt, vs.exp_cnt); if (flags & TEST_MSGVER_BY_TIMESTAMP) { assert((flags & TEST_MSGVER_BY_MSGID)); /* Required */ TEST_SAY("%s:%d: %s: " " and expecting timestamps %"PRId64"..%"PRId64"\n", func, line, what, vs.timestamp_min, vs.timestamp_max); } /* Per-partition checks */ if (flags & TEST_MSGVER_ORDER) fails += test_mv_p_verify_f(mv, flags, test_mv_mvec_verify_order, &vs); if (flags & TEST_MSGVER_DUP) fails += test_mv_p_verify_f(mv, flags, test_mv_mvec_verify_dup, &vs); /* Checks across all partitions */ if ((flags & TEST_MSGVER_RANGE) && vs.exp_cnt > 0) { vs.msgid_min = vs.msg_base; vs.msgid_max = vs.msgid_min + vs.exp_cnt - 1; fails += test_msgver_verify_range(mv, flags, &vs); } if (mv->log_suppr_cnt > 0) TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n", func, line, what, mv->log_suppr_cnt); if (vs.exp_cnt != mv->msgcnt) { TEST_WARN("%s:%d: %s: expected %d messages, got %d\n", func, line, what, vs.exp_cnt, mv->msgcnt); fails++; } if (fails) TEST_FAIL("%s:%d: %s: Verification of %d received messages " "failed: " "expected msgids %d..%d (%d): see previous errors\n", func, line, what, mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt, vs.exp_cnt); else TEST_SAY("%s:%d: %s: Verification of %d received messages " "succeeded: " "expected msgids %d..%d (%d)\n", func, line, what, mv->msgcnt, vs.msg_base, vs.msg_base+vs.exp_cnt, vs.exp_cnt); return fails; } void test_verify_rkmessage0 (const char *func, int line, rd_kafka_message_t *rkmessage, uint64_t testid, int32_t partition, int msgnum) { uint64_t in_testid; int in_part; int in_msgnum; char buf[128]; rd_snprintf(buf, sizeof(buf), "%.*s", (int)rkmessage->len, (char *)rkmessage->payload); if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i\n", &in_testid, &in_part, &in_msgnum) != 3) TEST_FAIL("Incorrect format: %s", buf); if (testid != in_testid || (partition != -1 && partition != in_part) || (msgnum != -1 && msgnum != in_msgnum) || in_msgnum < 0) goto fail_match; if (test_level > 2) { TEST_SAY("%s:%i: Our testid %"PRIu64", part %i (%i), msg %i\n", func, line, testid, (int)partition, (int)rkmessage->partition, msgnum); } return; fail_match: TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i did " "not match message: \"%s\"\n", func, line, testid, (int)partition, msgnum, buf); } /** * @brief Verify that \p mv is identical to \p corr according to flags. */ void test_msgver_verify_compare0 (const char *func, int line, const char *what, test_msgver_t *mv, test_msgver_t *corr, int flags) { struct test_mv_vs vs; int fails = 0; memset(&vs, 0, sizeof(vs)); TEST_SAY("%s:%d: %s: Verifying %d received messages (flags 0x%x) by " "comparison to correct msgver (%d messages)\n", func, line, what, mv->msgcnt, flags, corr->msgcnt); vs.corr = corr; /* Per-partition checks */ fails += test_mv_p_verify_f(mv, flags, test_mv_mvec_verify_corr, &vs); if (mv->log_suppr_cnt > 0) TEST_WARN("%s:%d: %s: %d message warning logs suppressed\n", func, line, what, mv->log_suppr_cnt); if (corr->msgcnt != mv->msgcnt) { TEST_WARN("%s:%d: %s: expected %d messages, got %d\n", func, line, what, corr->msgcnt, mv->msgcnt); fails++; } if (fails) TEST_FAIL("%s:%d: %s: Verification of %d received messages " "failed: expected %d messages: see previous errors\n", func, line, what, mv->msgcnt, corr->msgcnt); else TEST_SAY("%s:%d: %s: Verification of %d received messages " "succeeded: matching %d messages from correct msgver\n", func, line, what, mv->msgcnt, corr->msgcnt); } /** * Consumer poll but dont expect any proper messages for \p timeout_ms. */ void test_consumer_poll_no_msgs (const char *what, rd_kafka_t *rk, uint64_t testid, int timeout_ms) { int64_t tmout = test_clock() + timeout_ms * 1000; int cnt = 0; test_timing_t t_cons; test_msgver_t mv; test_msgver_init(&mv, testid); TEST_SAY("%s: not expecting any messages for %dms\n", what, timeout_ms); TIMING_START(&t_cons, "CONSUME"); do { rd_kafka_message_t *rkmessage; rkmessage = rd_kafka_consumer_poll(rk, timeout_ms); if (!rkmessage) continue; if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { TEST_SAY("%s [%"PRId32"] reached EOF at " "offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); test_msgver_add_msg(&mv, rkmessage); } else if (rkmessage->err) { TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64 "): %s", rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) : "(no-topic)", rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage)); } else { if (test_msgver_add_msg(&mv, rkmessage)) { TEST_MV_WARN(&mv, "Received unexpected message on " "%s [%"PRId32"] at offset " "%"PRId64"\n", rd_kafka_topic_name(rkmessage-> rkt), rkmessage->partition, rkmessage->offset); cnt++; } } rd_kafka_message_destroy(rkmessage); } while (test_clock() <= tmout); TIMING_STOP(&t_cons); test_msgver_verify(what, &mv, TEST_MSGVER_ALL, 0, 0); test_msgver_clear(&mv); TEST_ASSERT(cnt == 0, "Expected 0 messages, got %d", cnt); } /** * Call consumer poll once and then return. * Messages are handled. * * \p mv is optional * * @returns 0 on timeout, 1 if a message was received or .._PARTITION_EOF * if EOF was reached. * TEST_FAIL()s on all errors. */ int test_consumer_poll_once (rd_kafka_t *rk, test_msgver_t *mv, int timeout_ms){ rd_kafka_message_t *rkmessage; rkmessage = rd_kafka_consumer_poll(rk, timeout_ms); if (!rkmessage) return 0; if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { TEST_SAY("%s [%"PRId32"] reached EOF at " "offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); if (mv) test_msgver_add_msg(mv, rkmessage); rd_kafka_message_destroy(rkmessage); return RD_KAFKA_RESP_ERR__PARTITION_EOF; } else if (rkmessage->err) { TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64 "): %s", rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) : "(no-topic)", rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage)); } else { if (mv) test_msgver_add_msg(mv, rkmessage); } rd_kafka_message_destroy(rkmessage); return 1; } int test_consumer_poll (const char *what, rd_kafka_t *rk, uint64_t testid, int exp_eof_cnt, int exp_msg_base, int exp_cnt, test_msgver_t *mv) { int eof_cnt = 0; int cnt = 0; test_timing_t t_cons; TEST_SAY("%s: consume %d messages\n", what, exp_cnt); TIMING_START(&t_cons, "CONSUME"); while ((exp_eof_cnt == -1 || eof_cnt < exp_eof_cnt) && (exp_cnt == -1 || cnt < exp_cnt)) { rd_kafka_message_t *rkmessage; rkmessage = rd_kafka_consumer_poll(rk, tmout_multip(10*1000)); if (!rkmessage) /* Shouldn't take this long to get a msg */ TEST_FAIL("%s: consumer_poll() timeout " "(%d/%d eof, %d/%d msgs)\n", what, eof_cnt, exp_eof_cnt, cnt, exp_cnt); if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { TEST_SAY("%s [%"PRId32"] reached EOF at " "offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); if (mv) test_msgver_add_msg(mv, rkmessage); eof_cnt++; } else if (rkmessage->err) { TEST_FAIL("%s [%"PRId32"] error (offset %"PRId64 "): %s", rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) : "(no-topic)", rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage)); } else { if (!mv || test_msgver_add_msg(mv, rkmessage)) cnt++; } rd_kafka_message_destroy(rkmessage); } TIMING_STOP(&t_cons); TEST_SAY("%s: consumed %d/%d messages (%d/%d EOFs)\n", what, cnt, exp_cnt, eof_cnt, exp_eof_cnt); return cnt; } void test_consumer_close (rd_kafka_t *rk) { rd_kafka_resp_err_t err; test_timing_t timing; TEST_SAY("Closing consumer\n"); TIMING_START(&timing, "CONSUMER.CLOSE"); err = rd_kafka_consumer_close(rk); TIMING_STOP(&timing); if (err) TEST_FAIL("Failed to close consumer: %s\n", rd_kafka_err2str(err)); } void test_flush (rd_kafka_t *rk, int timeout_ms) { test_timing_t timing; rd_kafka_resp_err_t err; TEST_SAY("%s: Flushing %d messages\n", rd_kafka_name(rk), rd_kafka_outq_len(rk)); TIMING_START(&timing, "FLUSH"); err = rd_kafka_flush(rk, timeout_ms); TIMING_STOP(&timing); if (err) TEST_FAIL("Failed to flush(%s, %d): %s\n", rd_kafka_name(rk), timeout_ms, rd_kafka_err2str(err)); } void test_conf_set (rd_kafka_conf_t *conf, const char *name, const char *val) { char errstr[512]; if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) TEST_FAIL("Failed to set config \"%s\"=\"%s\": %s\n", name, val, errstr); } char *test_conf_get (const rd_kafka_conf_t *conf, const char *name) { static RD_TLS char ret[256]; size_t ret_sz = sizeof(ret); if (rd_kafka_conf_get(conf, name, ret, &ret_sz) != RD_KAFKA_CONF_OK) TEST_FAIL("Failed to get config \"%s\": %s\n", name, "unknown property"); return ret; } /** * @brief Check if property \name matches \p val in \p conf. * If \p conf is NULL the test config will be used. */ int test_conf_match (rd_kafka_conf_t *conf, const char *name, const char *val) { char *real; int free_conf = 0; if (!conf) { test_conf_init(&conf, NULL, 0); free_conf = 1; } real = test_conf_get(conf, name); if (free_conf) rd_kafka_conf_destroy(conf); return !strcmp(real, val); } void test_topic_conf_set (rd_kafka_topic_conf_t *tconf, const char *name, const char *val) { char errstr[512]; if (rd_kafka_topic_conf_set(tconf, name, val, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) TEST_FAIL("Failed to set topic config \"%s\"=\"%s\": %s\n", name, val, errstr); } /** * @brief First attempt to set topic level property, then global. */ void test_any_conf_set (rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf, const char *name, const char *val) { rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN; char errstr[512] = {"Missing conf_t"}; if (tconf) res = rd_kafka_topic_conf_set(tconf, name, val, errstr, sizeof(errstr)); if (res == RD_KAFKA_CONF_UNKNOWN && conf) res = rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)); if (res != RD_KAFKA_CONF_OK) TEST_FAIL("Failed to set any config \"%s\"=\"%s\": %s\n", name, val, errstr); } void test_print_partition_list (const rd_kafka_topic_partition_list_t *partitions) { int i; for (i = 0 ; i < partitions->cnt ; i++) { TEST_SAY(" %s [%"PRId32"] offset %"PRId64"%s%s\n", partitions->elems[i].topic, partitions->elems[i].partition, partitions->elems[i].offset, partitions->elems[i].err ? ": " : "", partitions->elems[i].err ? rd_kafka_err2str(partitions->elems[i].err) : ""); } } /** * @brief Execute kafka-topics.sh from the Kafka distribution. */ void test_kafka_topics (const char *fmt, ...) { #ifdef _MSC_VER TEST_FAIL("%s not supported on Windows, yet", __FUNCTION__); #else char cmd[512]; int r; va_list ap; test_timing_t t_cmd; const char *kpath, *zk; kpath = test_getenv("KAFKA_PATH", NULL); zk = test_getenv("ZK_ADDRESS", NULL); if (!kpath || !zk) TEST_FAIL("%s: KAFKA_PATH and ZK_ADDRESS must be set", __FUNCTION__); r = rd_snprintf(cmd, sizeof(cmd), "%s/bin/kafka-topics.sh --zookeeper %s ", kpath, zk); TEST_ASSERT(r < (int)sizeof(cmd)); va_start(ap, fmt); rd_vsnprintf(cmd+r, sizeof(cmd)-r, fmt, ap); va_end(ap); TEST_SAY("Executing: %s\n", cmd); TIMING_START(&t_cmd, "exec"); r = system(cmd); TIMING_STOP(&t_cmd); if (r == -1) TEST_FAIL("system(\"%s\") failed: %s", cmd, strerror(errno)); else if (WIFSIGNALED(r)) TEST_FAIL("system(\"%s\") terminated by signal %d\n", cmd, WTERMSIG(r)); else if (WEXITSTATUS(r)) TEST_FAIL("system(\"%s\") failed with exit status %d\n", cmd, WEXITSTATUS(r)); #endif } /** * @brief Create topic using kafka-topics.sh --create */ void test_create_topic (const char *topicname, int partition_cnt, int replication_factor) { test_kafka_topics("--create --topic \"%s\" " "--replication-factor %d --partitions %d", topicname, replication_factor, partition_cnt); } int test_get_partition_count (rd_kafka_t *rk, const char *topicname) { rd_kafka_t *use_rk; rd_kafka_resp_err_t err; rd_kafka_topic_t *rkt; if (!rk) use_rk = test_create_producer(); else use_rk = rk; rkt = rd_kafka_topic_new(use_rk, topicname, NULL); do { const struct rd_kafka_metadata *metadata; err = rd_kafka_metadata(use_rk, 0, rkt, &metadata, tmout_multip(15000)); if (err) TEST_WARN("metadata() for %s failed: %s", rkt ? rd_kafka_topic_name(rkt) : "(all-local)", rd_kafka_err2str(err)); else { if (metadata->topic_cnt == 1) { if (metadata->topics[0].err == 0 || metadata->topics[0].partition_cnt > 0) { int32_t cnt; cnt = metadata->topics[0].partition_cnt; rd_kafka_metadata_destroy(metadata); rd_kafka_topic_destroy(rkt); return (int)cnt; } TEST_SAY("metadata(%s) returned %s: retrying\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(metadata-> topics[0].err)); } rd_kafka_metadata_destroy(metadata); rd_sleep(1); } } while (1); rd_kafka_topic_destroy(rkt); if (!rk) rd_kafka_destroy(use_rk); return -1; } /** * @brief Let the broker auto-create the topic for us. */ rd_kafka_resp_err_t test_auto_create_topic_rkt (rd_kafka_t *rk, rd_kafka_topic_t *rkt) { const struct rd_kafka_metadata *metadata; rd_kafka_resp_err_t err; test_timing_t t; do { TIMING_START(&t, "auto_create_topic"); err = rd_kafka_metadata(rk, 0, rkt, &metadata, tmout_multip(15000)); TIMING_STOP(&t); if (err) TEST_WARN("metadata() for %s failed: %s", rkt ? rd_kafka_topic_name(rkt) : "(all-local)", rd_kafka_err2str(err)); else { if (metadata->topic_cnt == 1) { if (metadata->topics[0].err == 0 || metadata->topics[0].partition_cnt > 0) { rd_kafka_metadata_destroy(metadata); return 0; } TEST_SAY("metadata(%s) returned %s: retrying\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(metadata-> topics[0].err)); } rd_kafka_metadata_destroy(metadata); rd_sleep(1); } } while (1); return err; } rd_kafka_resp_err_t test_auto_create_topic (rd_kafka_t *rk, const char *name) { rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, name, NULL); rd_kafka_resp_err_t err; if (!rkt) return rd_kafka_last_error(); err = test_auto_create_topic_rkt(rk, rkt); rd_kafka_topic_destroy(rkt); return err; } /** * @brief Check if topic auto creation works. * @returns 1 if it does, else 0. */ int test_check_auto_create_topic (void) { rd_kafka_t *rk; rd_kafka_conf_t *conf; rd_kafka_resp_err_t err; const char *topic = test_mk_topic_name("autocreatetest", 1); test_conf_init(&conf, NULL, 0); rk = test_create_handle(RD_KAFKA_PRODUCER, conf); err = test_auto_create_topic(rk, topic); if (err) TEST_SAY("Auto topic creation of \"%s\" failed: %s\n", topic, rd_kafka_err2str(err)); rd_kafka_destroy(rk); return err ? 0 : 1; } /** * @brief Check if \p feature is builtin to librdkafka. * @returns returns 1 if feature is built in, else 0. */ int test_check_builtin (const char *feature) { rd_kafka_conf_t *conf; char errstr[128]; int r; conf = rd_kafka_conf_new(); if (rd_kafka_conf_set(conf, "builtin.features", feature, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { TEST_SAY("Feature \"%s\" not built-in: %s\n", feature, errstr); r = 0; } else { TEST_SAY("Feature \"%s\" is built-in\n", feature); r = 1; } rd_kafka_conf_destroy(conf); return r; } char *tsprintf (const char *fmt, ...) { static RD_TLS char ret[8][512]; static RD_TLS int i; va_list ap; i = (i + 1) % 8; va_start(ap, fmt); rd_vsnprintf(ret[i], sizeof(ret[i]), fmt, ap); va_end(ap); return ret[i]; } /** * @brief Add a test report JSON object. * These will be written as a JSON array to the test report file. */ void test_report_add (struct test *test, const char *fmt, ...) { va_list ap; char buf[512]; va_start(ap, fmt); vsnprintf(buf, sizeof(buf), fmt, ap); va_end(ap); if (test->report_cnt == test->report_size) { if (test->report_size == 0) test->report_size = 8; else test->report_size *= 2; test->report_arr = realloc(test->report_arr, sizeof(*test->report_arr) * test->report_size); } test->report_arr[test->report_cnt++] = rd_strdup(buf); TEST_SAYL(1, "Report #%d: %s\n", test->report_cnt-1, buf); } /** * Returns 1 if KAFKA_PATH and ZK_ADDRESS is set to se we can use the * kafka-topics.sh script to manually create topics. * * If \p skip is set TEST_SKIP() will be called with a helpful message. */ int test_can_create_topics (int skip) { #ifdef _MSC_VER if (skip) TEST_SKIP("Cannot create topics on Win32"); return 0; #else if (!test_getenv("KAFKA_PATH", NULL) || !test_getenv("ZK_ADDRESS", NULL)) { if (skip) TEST_SKIP("Cannot create topics " "(set KAFKA_PATH and ZK_ADDRESS)\n"); return 0; } return 1; #endif } /** * Wait for \p event_type, discarding all other events prior to it. */ rd_kafka_event_t *test_wait_event (rd_kafka_queue_t *eventq, rd_kafka_event_type_t event_type, int timeout_ms) { test_timing_t t_w; int64_t abs_timeout = test_clock() + (timeout_ms * 1000); TIMING_START(&t_w, "wait_event"); while (test_clock() < abs_timeout) { rd_kafka_event_t *rkev; rkev = rd_kafka_queue_poll(eventq, (int)(abs_timeout - test_clock())/ 1000); if (rd_kafka_event_type(rkev) == event_type) { TIMING_STOP(&t_w); return rkev; } if (!rkev) continue; if (rd_kafka_event_error(rkev)) TEST_SAY("discarding ignored event %s: %s\n", rd_kafka_event_name(rkev), rd_kafka_event_error_string(rkev)); else TEST_SAY("discarding ignored event %s\n", rd_kafka_event_name(rkev)); rd_kafka_event_destroy(rkev); } TIMING_STOP(&t_w); return NULL; } void test_FAIL (const char *file, int line, int fail_now, const char *str) { TEST_FAIL0(file, line, 1/*lock*/, fail_now, "%s", str); } void test_SAY (const char *file, int line, int level, const char *str) { TEST_SAYL(level, "%s", str); } const char *test_curr_name (void) { return test_curr->name; } /** * @brief Dump/print message haders */ void test_headers_dump (const char *what, int lvl, const rd_kafka_headers_t *hdrs) { size_t idx = 0; const char *name, *value; size_t size; while (!rd_kafka_header_get_all(hdrs, idx++, &name, (const void **)&value, &size)) TEST_SAYL(lvl, "%s: Header #%"PRIusz": %s='%s'\n", what, idx-1, name, value ? value : "(NULL)"); }