/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2016, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/**
* KafkaConsumer: regex topic subscriptions
*/
struct expect {
char *name; /* sub-test name */
const char *sub[4]; /* subscriptions */
const char *exp[4]; /* expected topics */
int exp_err; /* expected error from subscribe() */
int stat[4]; /* per exp status */
int fails;
enum {
_EXP_NONE,
_EXP_FAIL,
_EXP_OK,
_EXP_ASSIGN,
_EXP_REVOKE,
_EXP_ASSIGNED,
_EXP_REVOKED,
} result;
};
static struct expect *exp_curr;
static uint64_t testid;
static void expect_match (struct expect *exp,
const rd_kafka_topic_partition_list_t *parts) {
int i;
int e = 0;
int fails = 0;
memset(exp->stat, 0, sizeof(exp->stat));
for (i = 0 ; i < parts->cnt ; i++) {
int found = 0;
e = 0;
while (exp->exp[e]) {
if (!strcmp(parts->elems[i].topic, exp->exp[e])) {
exp->stat[e]++;
found++;
}
e++;
}
if (!found) {
TEST_WARN("%s: got unexpected topic match: %s\n",
exp->name, parts->elems[i].topic);
fails++;
}
}
e = 0;
while (exp->exp[e]) {
if (!exp->stat[e]) {
TEST_WARN("%s: expected topic not "
"found in assignment: %s\n",
exp->name, exp->exp[e]);
fails++;
} else {
TEST_SAY("%s: expected topic %s seen in assignment\n",
exp->name, exp->exp[e]);
}
e++;
}
exp->fails += fails;
if (fails) {
TEST_WARN("%s: see %d previous failures\n", exp->name, fails);
exp->result = _EXP_FAIL;
} else {
TEST_SAY(_C_MAG "[ %s: assignment matched ]\n", exp->name);
exp->result = _EXP_OK;
}
}
static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *parts, void *opaque){
struct expect *exp = exp_curr;
TEST_ASSERT(exp_curr, "exp_curr not set");
TEST_SAY("rebalance_cb: %s with %d partition(s)\n",
rd_kafka_err2str(err), parts->cnt);
test_print_partition_list(parts);
switch (err)
{
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
/* Check that provided partitions match our expectations */
if (exp->result != _EXP_ASSIGN) {
TEST_WARN("%s: rebalance called while expecting %d: "
"too many or undesired assignment(s?\n",
exp->name, exp->result);
}
expect_match(exp, parts);
test_consumer_assign("rebalance", rk, parts);
exp->result = _EXP_ASSIGNED;
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
if (exp->result != _EXP_REVOKE) {
TEST_WARN("%s: rebalance called while expecting %d: "
"too many or undesired assignment(s?\n",
exp->name, exp->result);
}
test_consumer_unassign("rebalance", rk);
exp->result = _EXP_REVOKED;
break;
default:
TEST_FAIL("rebalance_cb: error: %s", rd_kafka_err2str(err));
}
}
static int test_subscribe (rd_kafka_t *rk, struct expect *exp) {
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *tlist;
int i;
test_timing_t t_sub, t_assign, t_unsub;
exp_curr = exp;
test_timeout_set((test_session_timeout_ms/1000) * 3);
tlist = rd_kafka_topic_partition_list_new(4);
TEST_SAY(_C_MAG "[ %s: begin ]\n", exp->name);
i = 0;
TEST_SAY("Topic subscription:\n");
while (exp->sub[i]) {
TEST_SAY("%s: %s\n", exp->name, exp->sub[i]);
rd_kafka_topic_partition_list_add(tlist, exp->sub[i],
RD_KAFKA_PARTITION_UA);
i++;
}
/* Subscribe */
TIMING_START(&t_sub, "subscribe");
err = rd_kafka_subscribe(rk, tlist);
TIMING_STOP(&t_sub);
TEST_ASSERT(err == exp->exp_err,
"subscribe() failed: %s (expected %s)",
rd_kafka_err2str(err), rd_kafka_err2str(exp->exp_err));
if (exp->exp[0]) {
/* Wait for assignment, actual messages are ignored. */
exp->result = _EXP_ASSIGN;
TEST_SAY("%s: waiting for assignment\n", exp->name);
TIMING_START(&t_assign, "assignment");
while (exp->result == _EXP_ASSIGN)
test_consumer_poll_once(rk, NULL, 1000);
TIMING_STOP(&t_assign);
TEST_ASSERT(exp->result == _EXP_ASSIGNED,
"got %d instead of assignment", exp->result);
} else {
/* Not expecting any assignment */
int64_t ts_end = test_clock() + 5000;
exp->result = _EXP_NONE; /* Not expecting a rebalance */
while (exp->result == _EXP_NONE && test_clock() < ts_end)
test_consumer_poll_once(rk, NULL, 1000);
TEST_ASSERT(exp->result == _EXP_NONE);
}
/* Unsubscribe */
TIMING_START(&t_unsub, "unsubscribe");
err = rd_kafka_unsubscribe(rk);
TIMING_STOP(&t_unsub);
TEST_ASSERT(!err, "unsubscribe() failed: %s", rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(tlist);
if (exp->exp[0]) {
/* Wait for revoke, actual messages are ignored. */
TEST_SAY("%s: waiting for revoke\n", exp->name);
exp->result = _EXP_REVOKE;
TIMING_START(&t_assign, "revoke");
while (exp->result != _EXP_REVOKED)
test_consumer_poll_once(rk, NULL, 1000);
TIMING_STOP(&t_assign);
TEST_ASSERT(exp->result == _EXP_REVOKED,
"got %d instead of revoke", exp->result);
} else {
/* Not expecting any revoke */
int64_t ts_end = test_clock() + 5000;
exp->result = _EXP_NONE; /* Not expecting a rebalance */
while (exp->result == _EXP_NONE && test_clock() < ts_end)
test_consumer_poll_once(rk, NULL, 1000);
TEST_ASSERT(exp->result == _EXP_NONE);
}
TEST_SAY(_C_MAG "[ %s: done with %d failures ]\n", exp->name, exp->fails);
return exp->fails;
}
static int do_test (const char *assignor) {
static char topics[3][128];
static char nonexist_topic[128];
const int topic_cnt = 3;
rd_kafka_t *rk;
const int msgcnt = 10;
int i;
char groupid[64];
int fails = 0;
rd_kafka_conf_t *conf;
if (!test_check_builtin("regex")) {
TEST_SKIP("regex support not built in\n");
return 0;
}
testid = test_id_generate();
test_str_id_generate(groupid, sizeof(groupid));
rd_snprintf(topics[0], sizeof(topics[0]),
"%s_%s",
test_mk_topic_name("regex_subscribe_TOPIC_0001_UNO", 0),
groupid);
rd_snprintf(topics[1], sizeof(topics[1]),
"%s_%s",
test_mk_topic_name("regex_subscribe_topic_0002_dup", 0),
groupid);
rd_snprintf(topics[2], sizeof(topics[2]),
"%s_%s",
test_mk_topic_name("regex_subscribe_TOOTHPIC_0003_3", 0),
groupid);
/* To avoid auto topic creation to kick in we use
* an invalid topic name. */
rd_snprintf(nonexist_topic, sizeof(nonexist_topic),
"%s_%s",
test_mk_topic_name("regex_subscribe_NONEXISTENT_0004_IV#!",
0),
groupid);
/* Produce messages to topics to ensure creation. */
for (i = 0 ; i < topic_cnt ; i++)
test_produce_msgs_easy(topics[i], testid,
RD_KAFKA_PARTITION_UA, msgcnt);
test_conf_init(&conf, NULL, 20);
test_conf_set(conf, "partition.assignment.strategy", assignor);
/* Speed up propagation of new topics */
test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
/* Create a single consumer to handle all subscriptions.
* Has the nice side affect of testing multiple subscriptions. */
rk = test_create_consumer(groupid, rebalance_cb, conf, NULL);
/*
* Test cases
*/
{
struct expect expect = {
.name = rd_strdup(tsprintf("%s: no regexps (0&1)",
assignor)),
.sub = { topics[0], topics[1], NULL },
.exp = { topics[0], topics[1], NULL }
};
fails += test_subscribe(rk, &expect);
rd_free(expect.name);
}
{
struct expect expect = {
.name = rd_strdup(tsprintf("%s: no regexps "
"(no matches)",
assignor)),
.sub = { nonexist_topic, NULL },
.exp = { NULL }
};
fails += test_subscribe(rk, &expect);
rd_free(expect.name);
}
{
struct expect expect = {
.name = rd_strdup(tsprintf("%s: regex all", assignor)),
.sub = { rd_strdup(tsprintf("^.*_%s", groupid)), NULL },
.exp = { topics[0], topics[1], topics[2], NULL }
};
fails += test_subscribe(rk, &expect);
rd_free(expect.name);
rd_free((void*)expect.sub[0]);
}
{
struct expect expect = {
.name = rd_strdup(tsprintf("%s: regex 0&1", assignor)),
.sub = { rd_strdup(tsprintf("^.*[tToOpPiIcC]_0+[12]_[^_]+_%s",
groupid)), NULL },
.exp = { topics[0], topics[1], NULL }
};
fails += test_subscribe(rk, &expect);
rd_free(expect.name);
rd_free((void*)expect.sub[0]);
}
{
struct expect expect = {
.name = rd_strdup(tsprintf("%s: regex 2", assignor)),
.sub = { rd_strdup(tsprintf("^.*TOOTHPIC_000._._%s",
groupid)), NULL },
.exp = { topics[2], NULL }
};
fails += test_subscribe(rk, &expect);
rd_free(expect.name);
rd_free((void *)expect.sub[0]);
}
{
struct expect expect = {
.name = rd_strdup(tsprintf("%s: regex 2 and "
"nonexistent(not seen)",
assignor)),
.sub = { rd_strdup(tsprintf("^.*_000[34]_..?_%s",
groupid)), NULL },
.exp = { topics[2], NULL }
};
fails += test_subscribe(rk, &expect);
rd_free(expect.name);
rd_free((void *)expect.sub[0]);
}
{
struct expect expect = {
.name = rd_strdup(tsprintf("%s: broken regex (no matches)",
assignor)),
.sub = { "^.*[0", NULL },
.exp = { NULL },
.exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG
};
fails += test_subscribe(rk, &expect);
rd_free(expect.name);
}
test_consumer_close(rk);
rd_kafka_destroy(rk);
if (fails)
TEST_FAIL("See %d previous failures", fails);
return 0;
}
int main_0033_regex_subscribe (int argc, char **argv) {
do_test("range");
do_test("roundrobin");
return 0;
}
/**
* @brief Subscription API tests that dont require a broker
*/
int main_0033_regex_subscribe_local (int argc, char **argv) {
rd_kafka_topic_partition_list_t *valids, *invalids, *none,
*empty, *alot;
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_resp_err_t err;
char errstr[256];
int i;
valids = rd_kafka_topic_partition_list_new(0);
invalids = rd_kafka_topic_partition_list_new(100);
none = rd_kafka_topic_partition_list_new(1000);
empty = rd_kafka_topic_partition_list_new(5);
alot = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(valids, "not_a_regex", 0);
rd_kafka_topic_partition_list_add(valids, "^My[vV]alid..regex+", 0);
rd_kafka_topic_partition_list_add(valids, "^another_one$", 55);
rd_kafka_topic_partition_list_add(invalids, "not_a_regex", 0);
rd_kafka_topic_partition_list_add(invalids, "^My[vV]alid..regex+", 0);
rd_kafka_topic_partition_list_add(invalids, "^??++", 99);
rd_kafka_topic_partition_list_add(empty, "not_a_regex", 0);
rd_kafka_topic_partition_list_add(empty, "", 0);
rd_kafka_topic_partition_list_add(empty, "^ok", 0);
for (i = 0 ; i < 10000 ; i++) {
char topic[32];
rd_snprintf(topic, sizeof(topic), "^Va[lLid]_regex_%d$", i);
rd_kafka_topic_partition_list_add(alot, topic, i);
}
conf = rd_kafka_conf_new();
test_conf_set(conf, "group.id", "group");
test_conf_set(conf, "client.id", test_curr->name);
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk)
TEST_FAIL("Failed to create consumer: %s", errstr);
err = rd_kafka_subscribe(rk, valids);
TEST_ASSERT(!err, "valids failed: %s", rd_kafka_err2str(err));
err = rd_kafka_subscribe(rk, invalids);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
"invalids failed with wrong return: %s",
rd_kafka_err2str(err));
err = rd_kafka_subscribe(rk, none);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
"none failed with wrong return: %s", rd_kafka_err2str(err));
err = rd_kafka_subscribe(rk, empty);
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
"empty failed with wrong return: %s",
rd_kafka_err2str(err));
err = rd_kafka_subscribe(rk, alot);
TEST_ASSERT(!err, "alot failed: %s", rd_kafka_err2str(err));
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);
rd_kafka_topic_partition_list_destroy(valids);
rd_kafka_topic_partition_list_destroy(invalids);
rd_kafka_topic_partition_list_destroy(none);
rd_kafka_topic_partition_list_destroy(empty);
rd_kafka_topic_partition_list_destroy(alot);
return 0;
}