/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
#if WITH_SOCKEM
#include "rdkafka.h"
#include <stdarg.h>
#include <errno.h>
/**
* Request retry testing
*/
/* State shared between the main thread, librdkafka's connect_cb and the
 * sockem control thread.
 * connect_cb hangs on to the first broker socket it sees and rejects all
 * the rest (connection refused), to make sure only a single broker is
 * exercised by this test. */
static struct {
mtx_t lock;     /* protects all fields below; used together with cnd */
cnd_t cnd;      /* broadcast/signalled by connect_cb (socket captured),
                 * set_delay (new command) and the ctrl thread
                 * (delay applied) */
sockem_t *skm;  /* first (and only) accepted broker socket,
                 * set by connect_cb; NULL until then */
thrd_t thrd;    /* sockem control thread running ctrl_thrd_main */
struct {
int64_t ts_at; /* to ctrl thread: at this test_clock() time, set delay;
                * 0 means no command pending */
int delay;     /* to ctrl thread: socket delay to set */
int ack; /* from ctrl thread: new delay acked */
} cmd;
struct {
int64_t ts_at; /* ctrl thread's own pending schedule (copied from cmd):
                * apply delay once test_clock() passes this;
                * 0 means nothing scheduled */
int delay;     /* delay to apply at next.ts_at */
} next;
int term;      /* set by the main thread to stop the ctrl thread */
} ctrl;
static int ctrl_thrd_main (void *arg) {
mtx_lock(&ctrl.lock);
while (!ctrl.term) {
int64_t now;
cnd_timedwait_ms(&ctrl.cnd, &ctrl.lock, 10);
if (ctrl.cmd.ts_at) {
ctrl.next.ts_at = ctrl.cmd.ts_at;
ctrl.next.delay = ctrl.cmd.delay;
ctrl.cmd.ts_at = 0;
ctrl.cmd.ack = 1;
printf(_C_CYA "## %s: sockem: "
"receieved command to set delay "
"to %d in %dms\n" _C_CLR,
__FILE__,
ctrl.next.delay,
(int)(ctrl.next.ts_at - test_clock()) / 1000);
}
now = test_clock();
if (ctrl.next.ts_at && now > ctrl.next.ts_at) {
assert(ctrl.skm);
printf(_C_CYA "## %s: "
"sockem: setting socket delay to %d\n" _C_CLR,
__FILE__, ctrl.next.delay);
sockem_set(ctrl.skm, "delay", ctrl.next.delay, NULL);
ctrl.next.ts_at = 0;
cnd_signal(&ctrl.cnd); /* signal back to caller */
}
}
mtx_unlock(&ctrl.lock);
return 0;
}
/**
 * @brief Sockem connect, called from **internal librdkafka thread** through
 *        librdkafka's connect_cb.
 *
 * Only the very first connection is let through: its sockem handle is
 * stored in ctrl.skm and any thread waiting on ctrl.cnd is woken up.
 * Every later connection attempt is refused.
 *
 * @returns 0 for the first connection, ECONNREFUSED for all others.
 */
static int connect_cb (struct test *test, sockem_t *skm, const char *id) {
        int result = ECONNREFUSED;

        mtx_lock(&ctrl.lock);
        if (!ctrl.skm) {
                /* First connect: keep it and wake up the main thread */
                ctrl.skm = skm;
                cnd_broadcast(&ctrl.cnd);
                result = 0;
        }
        mtx_unlock(&ctrl.lock);

        return result;
}
/**
 * @brief Test-framework filter deciding whether a librdkafka error is fatal.
 *
 * This test deliberately degrades connectivity, so transport,
 * all-brokers-down, authentication and timeout errors are tolerated.
 * Authentication is included because the SASL authenticator will treat a
 * connection drop during the auth state as the broker not supporting
 * SASL PLAIN.
 *
 * @returns 0 for the tolerated errors listed above, 1 for anything else.
 */
static int is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                        const char *reason) {
        TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason);

        switch (err)
        {
        case RD_KAFKA_RESP_ERR__TRANSPORT:
        case RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN:
        case RD_KAFKA_RESP_ERR__AUTHENTICATION:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
                /* Expected fallout from the injected socket delay */
                return 0;
        default:
                return 1;
        }
}
/**
 * @brief Schedule the sockem socket delay to be set to \p delay ms once
 *        \p after ms have elapsed, and block until the control thread has
 *        acknowledged the command.
 *
 * @param after Milliseconds from now until the new delay takes effect
 *              (0 = as soon as the control thread gets to it).
 * @param delay Socket delay to apply through sockem_set(.., "delay", ..).
 *
 * @remark The sockem control thread (ctrl_thrd_main) must be running,
 *         otherwise the ack never arrives and this function never returns.
 */
static void set_delay (int after, int delay) {
        TEST_SAY("Set delay to %dms (after %dms)\n", delay, after);

        mtx_lock(&ctrl.lock);
        /* Widen before converting ms to test_clock() units so that large
         * values of `after` cannot overflow int arithmetic. */
        ctrl.cmd.ts_at = test_clock() + ((int64_t)after * 1000);
        ctrl.cmd.delay = delay;
        ctrl.cmd.ack = 0;
        cnd_broadcast(&ctrl.cnd);

        /* Wait for ack from sockem thread */
        while (!ctrl.cmd.ack) {
                TEST_SAY("Waiting for sockem control ack\n");
                cnd_timedwait_ms(&ctrl.cnd, &ctrl.lock, 1000);
        }
        mtx_unlock(&ctrl.lock);
}
/**
 * @brief Test that Metadata requests are retried properly when
 *        timing out due to high broker rtt.
 *
 * The flow is:
 * 1. Only the first broker connection is accepted (connect_cb).
 * 2. An undelayed metadata() verifies that the connection is up.
 * 3. A 3000ms socket delay is injected. This is longer than
 *    socket.timeout.ms, so requests time out and are retried after
 *    retry.backoff.ms.
 * 4. The delay is scheduled to be removed after roughly two
 *    timeout+backoff cycles, so the third attempt should succeed within
 *    the metadata() timeout.
 *
 * @param topic Topic to create the producer topic handle for.
 */
static void do_test_low_socket_timeout (const char *topic) {
        /* Keep in sync with the "socket.timeout.ms" and "retry.backoff.ms"
         * configuration strings below. */
        const int socket_timeout_ms = 1000;
        const int retry_backoff_ms = 5000;
        /* Duration of two full request-timeout + retry-backoff cycles */
        const int two_cycles_ms = (socket_timeout_ms + retry_backoff_ms) * 2;
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_t *rkt;
        rd_kafka_resp_err_t err;
        const struct rd_kafka_metadata *md;

        mtx_init(&ctrl.lock, mtx_plain);
        cnd_init(&ctrl.cnd);

        TEST_SAY("Test Metadata request retries on timeout\n");

        test_conf_init(&conf, NULL, 60);
        test_conf_set(conf, "socket.timeout.ms", "1000");
        test_conf_set(conf, "socket.max.fails", "12345");
        test_conf_set(conf, "retry.backoff.ms", "5000");
        /* Avoid api version requests (with their own timeout) to get in
         * the way of our test */
        test_conf_set(conf, "api.version.request", "false");
        test_socket_enable(conf);
        test_curr->connect_cb = connect_cb;
        test_curr->is_fatal_cb = is_fatal_cb;

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
        rkt = test_create_producer_topic(rk, topic, NULL);

        TEST_SAY("Waiting for sockem connect..\n");
        mtx_lock(&ctrl.lock);
        while (!ctrl.skm)
                cnd_wait(&ctrl.cnd, &ctrl.lock);
        mtx_unlock(&ctrl.lock);

        TEST_SAY("Connected, fire off a undelayed metadata() to "
                 "make sure connection is up\n");

        err = rd_kafka_metadata(rk, 0, rkt, &md, tmout_multip(2000));
        TEST_ASSERT(!err, "metadata(undelayed) failed: %s",
                    rd_kafka_err2str(err));
        rd_kafka_metadata_destroy(md);

        if (thrd_create(&ctrl.thrd, ctrl_thrd_main, NULL) != thrd_success)
                TEST_FAIL("Failed to create sockem ctrl thread");

        set_delay(0, 3000); /* Takes effect immediately */

        /* After two retries, remove the delay, the third retry
         * should kick in and work. */
        set_delay(two_cycles_ms - 2000, 0);

        TEST_SAY("Calling metadata() again which should succeed after "
                 "3 internal retries\n");
        /* Metadata should be returned after the third retry */
        err = rd_kafka_metadata(rk, 0, rkt, &md, two_cycles_ms + 5000);
        TEST_SAY("metadata() returned %s\n", rd_kafka_err2str(err));
        TEST_ASSERT(!err, "metadata(delayed) failed: %s",
                    rd_kafka_err2str(err));
        rd_kafka_metadata_destroy(md);

        rd_kafka_topic_destroy(rkt);

        /* Stop the controller thread *before* destroying the client.
         * A still-pending delay command would otherwise call sockem_set()
         * on ctrl.skm, whose connection presumably goes away during
         * rd_kafka_destroy(). */
        mtx_lock(&ctrl.lock);
        ctrl.term = 1;
        cnd_broadcast(&ctrl.cnd); /* wake it up instead of waiting 10ms */
        mtx_unlock(&ctrl.lock);
        thrd_join(ctrl.thrd, NULL);

        rd_kafka_destroy(rk);

        /* ctrl.lock/ctrl.cnd must outlive rk: connect_cb is invoked from
         * internal librdkafka threads. */
        cnd_destroy(&ctrl.cnd);
        mtx_destroy(&ctrl.lock);
}
/**
 * @brief Test entry point: runs the Metadata-retry-on-timeout scenario
 *        against a freshly named topic.
 */
int main_0075_retry (int argc, char **argv) {
        do_test_low_socket_timeout(test_mk_topic_name("0075_retry", 1));

        return 0;
}
#endif