/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/**
* List consumer groups
*
* Runs two consumers in two different groups and lists them.
*/
/**
* Verify that all groups in 'groups' are seen, if so returns group_cnt,
* else returns -1.
*/
static int verify_groups (const struct rd_kafka_group_list *grplist,
char **groups, int group_cnt) {
int i;
int seen = 0;
for (i = 0 ; i < grplist->group_cnt ; i++) {
const struct rd_kafka_group_info *gi = &grplist->groups[i];
int j;
for (j = 0 ; j < group_cnt ; j++) {
if (strcmp(gi->group, groups[j]))
continue;
if (gi->err)
TEST_SAY("Group %s has broker-reported "
"error: %s\n", gi->group,
rd_kafka_err2str(gi->err));
seen++;
}
}
TEST_SAY("Found %d/%d desired groups in list of %d groups\n",
seen, group_cnt, grplist->group_cnt);
if (seen != group_cnt)
return -1;
else
return seen;
}
/**
* List groups by:
* - List all groups, check that the groups in 'groups' are seen.
* - List each group in 'groups', one by one.
*
* Returns 'group_cnt' if all groups in 'groups' were seen by both
* methods, else 0, or -1 on error.
*/
static int list_groups (rd_kafka_t *rk, char **groups, int group_cnt,
const char *desc) {
rd_kafka_resp_err_t err = 0;
const struct rd_kafka_group_list *grplist;
int i, r;
int fails = 0;
int seen = 0;
int seen_all = 0;
int retries = 5;
TEST_SAY("List groups (expect %d): %s\n", group_cnt, desc);
/* FIXME: Wait for broker to come up. This should really be abstracted
* by librdkafka. */
do {
if (err) {
TEST_SAY("Retrying group list in 1s because of: %s\n",
rd_kafka_err2str(err));
rd_sleep(1);
}
err = rd_kafka_list_groups(rk, NULL, &grplist,
tmout_multip(5000));
} while ((err == RD_KAFKA_RESP_ERR__TRANSPORT ||
err == RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS) &&
retries-- > 0);
if (err) {
TEST_SAY("Failed to list all groups: %s\n",
rd_kafka_err2str(err));
return -1;
}
seen_all = verify_groups(grplist, groups, group_cnt);
rd_kafka_group_list_destroy(grplist);
for (i = 0 ; i < group_cnt ; i++) {
err = rd_kafka_list_groups(rk, groups[i], &grplist, 5000);
if (err) {
TEST_SAY("Failed to list group %s: %s\n",
groups[i], rd_kafka_err2str(err));
fails++;
continue;
}
r = verify_groups(grplist, &groups[i], 1);
if (r == 1)
seen++;
rd_kafka_group_list_destroy(grplist);
}
if (seen_all != seen)
return 0;
return seen;
}
int main_0019_list_groups (int argc, char **argv) {
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
#define _CONS_CNT 2
char *groups[_CONS_CNT];
rd_kafka_t *rk, *rk_c[_CONS_CNT];
rd_kafka_topic_partition_list_t *topics;
rd_kafka_resp_err_t err;
test_timing_t t_grps;
int i;
int groups_seen;
rd_kafka_topic_t *rkt;
const struct rd_kafka_group_list *grplist;
/* Handle for group listings */
rk = test_create_producer();
/* Produce messages so that topic is auto created */
rkt = test_create_topic_object(rk, topic, NULL);
test_produce_msgs(rk, rkt, 0, 0, 0, 10, NULL, 64);
rd_kafka_topic_destroy(rkt);
/* Query groups before creation, should not list our groups. */
groups_seen = list_groups(rk, NULL, 0, "should be none");
if (groups_seen != 0)
TEST_FAIL("Saw %d groups when there wasn't "
"supposed to be any\n", groups_seen);
/* Fill in topic subscription set */
topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic, -1);
/* Create consumers and start subscription */
for (i = 0 ; i < _CONS_CNT ; i++) {
groups[i] = malloc(32);
test_str_id_generate(groups[i], 32);
rk_c[i] = test_create_consumer(groups[i],
NULL, NULL, NULL);
err = rd_kafka_poll_set_consumer(rk_c[i]);
if (err)
TEST_FAIL("poll_set_consumer: %s\n",
rd_kafka_err2str(err));
err = rd_kafka_subscribe(rk_c[i], topics);
if (err)
TEST_FAIL("subscribe: %s\n", rd_kafka_err2str(err));
}
rd_kafka_topic_partition_list_destroy(topics);
TIMING_START(&t_grps, "WAIT.GROUPS");
/* Query groups again until both groups are seen. */
while (1) {
int groups_seen = list_groups(rk, (char **)groups, _CONS_CNT,
"should see my groups");
if (groups_seen == _CONS_CNT)
break;
rd_sleep(1);
}
TIMING_STOP(&t_grps);
/* Try a list_groups with a low enough timeout to fail. */
grplist = NULL;
TIMING_START(&t_grps, "WAIT.GROUPS.TIMEOUT0");
err = rd_kafka_list_groups(rk, NULL, &grplist, 0);
TIMING_STOP(&t_grps);
TEST_SAY("list_groups(timeout=0) returned %d groups and status: %s\n",
grplist ? grplist->group_cnt : -1, rd_kafka_err2str(err));
TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
"expected list_groups(timeout=0) to fail "
"with timeout, got %s", rd_kafka_err2str(err));
TEST_SAY("Closing remaining consumers\n");
for (i = 0 ; i < _CONS_CNT ; i++) {
test_timing_t t_close;
if (!rk_c[i])
continue;
TEST_SAY("Closing %s\n", rd_kafka_name(rk_c[i]));
TIMING_START(&t_close, "CONSUMER.CLOSE");
err = rd_kafka_consumer_close(rk_c[i]);
TIMING_STOP(&t_close);
if (err)
TEST_FAIL("consumer_close failed: %s\n",
rd_kafka_err2str(err));
rd_kafka_destroy(rk_c[i]);
rk_c[i] = NULL;
free(groups[i]);
}
rd_kafka_destroy(rk);
return 0;
}