/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
#include "rdkafka.h"
/**
* Local (no broker) unit-like tests of Message Headers
*/
static int exp_msgid = 0;
struct expect {
const char *name;
const char *value;
};
/**
* @brief returns the message id
*/
static int expect_check (const char *what, const struct expect *expected,
const rd_kafka_message_t *rkmessage) {
const struct expect *exp;
rd_kafka_resp_err_t err;
size_t idx = 0;
const char *name;
const char *value;
size_t size;
rd_kafka_headers_t *hdrs;
int msgid;
if (rkmessage->len != sizeof(msgid))
TEST_FAIL("%s: expected message len %"PRIusz" == sizeof(int)",
what, rkmessage->len);
memcpy(&msgid, rkmessage->payload, rkmessage->len);
if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) {
if (msgid == 0)
return 0; /* No headers expected for first message */
TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid,
rd_kafka_err2str(err));
} else {
TEST_ASSERT(msgid != 0,
"%s: first message should have no headers", what);
}
/* msgid should always be first and has a variable value so hard to
* match with the expect struct. */
for (idx = 0, exp = expected ;
!rd_kafka_header_get_all(hdrs, idx, &name,
(const void **)&value, &size) ;
idx++, exp++) {
TEST_SAYL(3, "%s: Msg #%d: "
"Header #%"PRIusz": %s='%s' (expecting %s='%s')\n",
what, msgid, idx, name, value ? value : "(NULL)",
exp->name, exp->value ? exp->value : "(NULL)");
if (strcmp(name, exp->name))
TEST_FAIL("%s: Expected header %s at idx #%"PRIusz
", not %s",
what, exp->name, idx-1, name);
if (!strcmp(name, "msgid")) {
int vid;
/* Special handling: compare msgid header value
* to message body, should be identical */
if (size != rkmessage->len || size != sizeof(int))
TEST_FAIL("%s: "
"Expected msgid/int-sized payload "
"%"PRIusz", got %"PRIusz,
what, size, rkmessage->len);
/* Copy to avoid unaligned access (by cast) */
memcpy(&vid, value, size);
if (vid != msgid)
TEST_FAIL("%s: Header msgid %d != payload %d",
what, vid, msgid);
if (exp_msgid != vid)
TEST_FAIL("%s: Expected msgid %d, not %d",
what, exp_msgid, vid);
continue;
}
if (!exp->value) {
/* Expected NULL value */
TEST_ASSERT(!value,
"%s: Expected NULL value for %s, got %s",
what, exp->name, value);
} else {
TEST_ASSERT(value,
"%s: "
"Expected non-NULL value for %s, got NULL",
what, exp->name);
TEST_ASSERT(size == strlen(exp->value),
"%s: Expected size %"PRIusz" for %s, "
"not %"PRIusz,
what, strlen(exp->value), exp->name, size);
TEST_ASSERT(value[size] == '\0',
"%s: "
"Expected implicit null-terminator for %s",
what, exp->name);
TEST_ASSERT(!strcmp(exp->value, value),
"%s: "
"Expected value %s for %s, not %s",
what, exp->value, exp->name, value);
}
}
TEST_ASSERT(exp->name == NULL,
"%s: Expected the expected, but stuck at %s which was "
"unexpected",
what, exp->name);
return msgid;
}
/**
* @brief Delivery report callback
*/
static void dr_msg_cb (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
const struct expect expected[] = {
{ "msgid", NULL }, /* special handling */
{ "static", "hey" },
{ "null", NULL },
{ "empty", "" },
{ "send1", "1" },
{ "multi", "multi5" },
{ NULL }
};
const struct expect replace_expected[] = {
{ "msgid", NULL },
{ "new", "one" },
{ "this is the", NULL },
{ "replaced headers\"", "" },
{ "new", "right?" },
{ NULL }
};
const struct expect *exp;
rd_kafka_headers_t *new_hdrs;
int msgid;
TEST_ASSERT(rkmessage->err == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
"Expected message to fail with MSG_TIMED_OUT, not %s",
rd_kafka_err2str(rkmessage->err));
msgid = expect_check(__FUNCTION__, expected, rkmessage);
/* Replace entire headers list */
if (msgid > 0) {
new_hdrs = rd_kafka_headers_new(1);
rd_kafka_header_add(new_hdrs, "msgid", -1,
&msgid, sizeof(msgid));
for (exp = &replace_expected[1] ; exp->name ; exp++)
rd_kafka_header_add(new_hdrs,
exp->name, -1, exp->value, -1);
rd_kafka_message_set_headers((rd_kafka_message_t *)rkmessage,
new_hdrs);
expect_check(__FUNCTION__, replace_expected, rkmessage);
}
exp_msgid++;
}
static void expect_iter (const char *what,
const rd_kafka_headers_t *hdrs, const char *name,
const char **expected, size_t cnt) {
size_t idx;
rd_kafka_resp_err_t err;
const void *value;
size_t size;
for (idx = 0 ;
!(err = rd_kafka_header_get(hdrs, idx, name, &value, &size)) ;\
idx++) {
TEST_ASSERT(idx < cnt,
"%s: too many headers matching '%s', "
"expected %"PRIusz,
what, name, cnt);
TEST_SAYL(3, "%s: get(%"PRIusz", '%s') "
"expecting '%s' =? '%s'\n",
what, idx, name, expected[idx], (const char *)value);
TEST_ASSERT(!strcmp((const char *)value, expected[idx]),
"%s: get(%"PRIusz", '%s') expected '%s', not '%s'",
what, idx, name, expected[idx],
(const char *)value);
}
TEST_ASSERT(idx == cnt,
"%s: expected %"PRIusz" headers matching '%s', not %"PRIusz,
what, cnt, name, idx);
}
/**
* @brief First on_send() interceptor
*/
static rd_kafka_resp_err_t on_send1 (rd_kafka_t *rk,
rd_kafka_message_t *rkmessage,
void *ic_opaque) {
const struct expect expected[] = {
{ "msgid", NULL }, /* special handling */
{ "static", "hey" },
{ "multi", "multi1" },
{ "multi", "multi2" },
{ "multi", "multi3" },
{ "null", NULL },
{ "empty", "" },
{ NULL }
};
const char *expect_iter_multi[4] = {
"multi1",
"multi2",
"multi3",
"multi4" /* added below */
};
const char *expect_iter_static[1] = {
"hey"
};
rd_kafka_headers_t *hdrs;
size_t header_cnt;
rd_kafka_resp_err_t err;
const void *value;
size_t size;
expect_check(__FUNCTION__, expected, rkmessage);
err = rd_kafka_message_headers(rkmessage, &hdrs);
if (err) /* First message has no headers. */
return RD_KAFKA_RESP_ERR_NO_ERROR;
header_cnt = rd_kafka_header_cnt(hdrs);
TEST_ASSERT(header_cnt == 7,
"Expected 7 length got %zd", header_cnt);
rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1);
header_cnt = rd_kafka_header_cnt(hdrs);
TEST_ASSERT(header_cnt == 8,
"Expected 8 length got %zd", header_cnt);
/* test iter() */
expect_iter(__FUNCTION__, hdrs, "multi", expect_iter_multi, 4);
expect_iter(__FUNCTION__, hdrs, "static", expect_iter_static, 1);
expect_iter(__FUNCTION__, hdrs, "notexists", NULL, 0);
rd_kafka_header_add(hdrs, "send1", -1, "1", -1);
header_cnt = rd_kafka_header_cnt(hdrs);
TEST_ASSERT(header_cnt == 9,
"Expected 9 length got %zd", header_cnt);
rd_kafka_header_remove(hdrs, "multi");
header_cnt = rd_kafka_header_cnt(hdrs);
TEST_ASSERT(header_cnt == 5,
"Expected 5 length got %zd", header_cnt);
rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1);
header_cnt = rd_kafka_header_cnt(hdrs);
TEST_ASSERT(header_cnt == 6,
"Expected 6 length got %zd", header_cnt);
/* test get_last() */
err = rd_kafka_header_get_last(hdrs, "multi", &value, &size);
TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
TEST_ASSERT(size == strlen("multi5") &&
!strcmp((const char *)value, "multi5"),
"expected 'multi5', not '%s'",
(const char *)value);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* @brief Second on_send() interceptor
*/
static rd_kafka_resp_err_t on_send2 (rd_kafka_t *rk,
rd_kafka_message_t *rkmessage,
void *ic_opaque) {
const struct expect expected[] = {
{ "msgid", NULL }, /* special handling */
{ "static", "hey" },
{ "null", NULL },
{ "empty", "" },
{ "send1", "1" },
{ "multi", "multi5" },
{ NULL }
};
expect_check(__FUNCTION__, expected, rkmessage);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
/**
* @brief on_new() interceptor to set up message interceptors
* from rd_kafka_new().
*/
static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
void *ic_opaque,
char *errstr, size_t errstr_size) {
rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL);
rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
int main_0072_headers_ut (int argc, char **argv) {
const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 0);
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
int i;
size_t header_cnt;
const int msgcnt = 10;
rd_kafka_resp_err_t err;
conf = rd_kafka_conf_new();
test_conf_set(conf, "message.timeout.ms", "1");
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
/* First message is without headers (negative testing) */
i = 0;
err = rd_kafka_producev(
rk,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_VALUE(&i, sizeof(i)),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_END);
TEST_ASSERT(!err,
"producev() failed: %s", rd_kafka_err2str(err));
exp_msgid++;
for (i = 1 ; i < msgcnt ; i++, exp_msgid++) {
/* Use headers list on one message */
if (i == 3) {
rd_kafka_headers_t *hdrs = rd_kafka_headers_new(4);
header_cnt = rd_kafka_header_cnt(hdrs);
TEST_ASSERT(header_cnt == 0,
"Expected 0 length got %zd", header_cnt);
rd_kafka_headers_t *copied;
rd_kafka_header_add(hdrs, "msgid", -1, &i, sizeof(i));
rd_kafka_header_add(hdrs, "static", -1, "hey", -1);
rd_kafka_header_add(hdrs, "multi", -1, "multi1", -1);
rd_kafka_header_add(hdrs, "multi", -1, "multi2", 6);
rd_kafka_header_add(hdrs, "multi", -1, "multi3", strlen("multi3"));
rd_kafka_header_add(hdrs, "null", -1, NULL, 0);
/* Make a copy of the headers to verify copy() */
copied = rd_kafka_headers_copy(hdrs);
header_cnt = rd_kafka_header_cnt(hdrs);
TEST_ASSERT(header_cnt == 6,
"Expected 6 length got %zd", header_cnt);
rd_kafka_headers_destroy(hdrs);
/* Last header ("empty") is added below */
/* Try unsupported _V_HEADER() and _V_HEADERS() mix,
* must fail with CONFLICT */
err = rd_kafka_producev(
rk,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_VALUE(&i, sizeof(i)),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_HEADER("will_be_removed", "yep", -1),
RD_KAFKA_V_HEADERS(copied),
RD_KAFKA_V_HEADER("empty", "", 0),
RD_KAFKA_V_END);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__CONFLICT,
"producev(): expected CONFLICT, got %s",
rd_kafka_err2str(err));
/* Proper call using only _V_HEADERS() */
rd_kafka_header_add(copied, "empty", -1, "", -1);
err = rd_kafka_producev(
rk,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_VALUE(&i, sizeof(i)),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_HEADERS(copied),
RD_KAFKA_V_END);
TEST_ASSERT(!err, "producev() failed: %s",
rd_kafka_err2str(err));
} else {
err = rd_kafka_producev(
rk,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_VALUE(&i, sizeof(i)),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)),
RD_KAFKA_V_HEADER("static", "hey", -1),
RD_KAFKA_V_HEADER("multi", "multi1", -1),
RD_KAFKA_V_HEADER("multi", "multi2", 6),
RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")),
RD_KAFKA_V_HEADER("null", NULL, 0),
RD_KAFKA_V_HEADER("empty", "", 0),
RD_KAFKA_V_END);
TEST_ASSERT(!err,
"producev() failed: %s", rd_kafka_err2str(err));
}
}
/* Reset expected message id for dr */
exp_msgid = 0;
/* Wait for timeouts and delivery reports */
rd_kafka_flush(rk, 5000);
rd_kafka_destroy(rk);
return 0;
}