/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"  /* for Kafka driver */


/**
 * List consumer groups
 *
 * Runs two consumers in two different groups and lists them.
 */



/**
 * Verify that all groups in 'groups' are seen, if so returns group_cnt,
 * else returns -1.
 */
static int verify_groups (const struct rd_kafka_group_list *grplist,
                          char **groups, int group_cnt) {
        int i;
        int seen = 0;

        for (i = 0 ; i < grplist->group_cnt ; i++) {
                const struct rd_kafka_group_info *gi = &grplist->groups[i];
                int j;

                for (j = 0 ; j < group_cnt ; j++) {
                        if (strcmp(gi->group, groups[j]))
                                continue;

                        if (gi->err)
                                TEST_SAY("Group %s has broker-reported "
                                         "error: %s\n", gi->group,
                                         rd_kafka_err2str(gi->err));

                        seen++;
                }
        }

        TEST_SAY("Found %d/%d desired groups in list of %d groups\n",
                 seen, group_cnt, grplist->group_cnt);

        if (seen != group_cnt)
                return -1;
        else
                return seen;
}


/**
 * List groups by:
 *   - Listing all groups and checking that the groups in 'groups' are seen.
 *   - Listing each group in 'groups' individually, one by one.
 *
 * The list-all request is retried (up to 5 times, 1s apart) on transport
 * errors and while the broker reports that group loading is in progress.
 *
 * 'rk' is the client handle to issue the requests on, 'groups' is an array
 * of 'group_cnt' group names (not accessed when 'group_cnt' is 0) and
 * 'desc' is a free-form description that is only used for logging.
 *
 * Returns 'group_cnt' if all groups in 'groups' were seen by both
 * methods, 0 if any group was missed by either method (including when a
 * per-group request failed), or -1 if the list-all request failed.
 */
static int list_groups (rd_kafka_t *rk, char **groups, int group_cnt,
                        const char *desc) {
        rd_kafka_resp_err_t err = 0;
        const struct rd_kafka_group_list *grplist;
        int i;
        int seen = 0;
        int seen_all;
        int retries = 5;

        TEST_SAY("List groups (expect %d): %s\n", group_cnt, desc);

        /* FIXME: Wait for broker to come up. This should really be abstracted
         *        by librdkafka. */
        do {
                /* 'err' is only set from the second iteration onwards,
                 * so the first attempt is made without delay. */
                if (err) {
                        TEST_SAY("Retrying group list in 1s because of: %s\n",
                                 rd_kafka_err2str(err));
                        rd_sleep(1);
                }
                err = rd_kafka_list_groups(rk, NULL, &grplist,
                                           tmout_multip(5000));
        } while ((err == RD_KAFKA_RESP_ERR__TRANSPORT ||
                  err == RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS) &&
                 retries-- > 0);

        if (err) {
                TEST_SAY("Failed to list all groups: %s\n",
                         rd_kafka_err2str(err));
                return -1;
        }

        /* Yields group_cnt if every wanted group was listed, else -1. */
        seen_all = verify_groups(grplist, groups, group_cnt);
        rd_kafka_group_list_destroy(grplist);

        for (i = 0 ; i < group_cnt ; i++) {
                /* Use the same scaled timeout as the list-all request
                 * above so that slow test runs are treated consistently. */
                err = rd_kafka_list_groups(rk, groups[i], &grplist,
                                           tmout_multip(5000));
                if (err) {
                        /* A failed per-group request leaves 'seen' short,
                         * which makes the comparison below return 0. */
                        TEST_SAY("Failed to list group %s: %s\n",
                                 groups[i], rd_kafka_err2str(err));
                        continue;
                }

                if (verify_groups(grplist, &groups[i], 1) == 1)
                        seen++;
                rd_kafka_group_list_destroy(grplist);
        }

        /* Both methods must agree; since 'seen' is never negative this
         * also covers seen_all == -1 (not all groups in the full list). */
        if (seen_all != seen)
                return 0;

        return seen;
}



/**
 * Test entry point: list consumer groups.
 *
 * Steps:
 *   - Produce messages so that the topic is auto created.
 *   - Call list_groups() with an empty set of wanted groups, which must
 *     return 0.
 *   - Start _CONS_CNT consumers, each in its own generated group, and
 *     call list_groups() once a second until all of the groups are seen.
 *   - Verify that a list request with timeout 0 fails with
 *     RD_KAFKA_RESP_ERR__TIMED_OUT.
 *   - Close and destroy all clients and free the group names.
 *
 * 'argc' and 'argv' are unused.
 *
 * Returns 0; failures are reported through TEST_FAIL()/TEST_ASSERT().
 */
int main_0019_list_groups (int argc, char **argv) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
#define _CONS_CNT 2
        /* Size in bytes of each generated group name buffer. */
        const size_t group_id_size = 32;
        char *groups[_CONS_CNT];
        rd_kafka_t *rk, *rk_c[_CONS_CNT];
        rd_kafka_topic_partition_list_t *topics;
        rd_kafka_resp_err_t err;
        test_timing_t t_grps;
        int i;
        int groups_seen;
        rd_kafka_topic_t *rkt;
        const struct rd_kafka_group_list *grplist;

        /* Handle for group listings */
        rk = test_create_producer();

        /* Produce messages so that topic is auto created */
        rkt = test_create_topic_object(rk, topic, NULL);
        test_produce_msgs(rk, rkt, 0, 0, 0, 10, NULL, 64);
        rd_kafka_topic_destroy(rkt);

        /* Query groups before creation, should not list our groups. */
        groups_seen = list_groups(rk, NULL, 0, "should be none");
        if (groups_seen != 0)
                TEST_FAIL("Saw %d groups when there wasn't "
                          "supposed to be any\n", groups_seen);

        /* Fill in topic subscription set */
        topics = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, topic, -1);

        /* Create consumers and start subscription */
        for (i = 0 ; i < _CONS_CNT ; i++) {
                groups[i] = malloc(group_id_size);
                if (!groups[i])
                        TEST_FAIL("Failed to allocate group name buffer\n");

                test_str_id_generate(groups[i], group_id_size);
                rk_c[i] = test_create_consumer(groups[i],
                                               NULL, NULL, NULL);

                err = rd_kafka_poll_set_consumer(rk_c[i]);
                if (err)
                        TEST_FAIL("poll_set_consumer: %s\n",
                                  rd_kafka_err2str(err));

                err = rd_kafka_subscribe(rk_c[i], topics);
                if (err)
                        TEST_FAIL("subscribe: %s\n", rd_kafka_err2str(err));
        }

        rd_kafka_topic_partition_list_destroy(topics);


        TIMING_START(&t_grps, "WAIT.GROUPS");
        /* Query groups again until both groups are seen. */
        while (1) {
                groups_seen = list_groups(rk, groups, _CONS_CNT,
                                          "should see my groups");
                if (groups_seen == _CONS_CNT)
                        break;
                rd_sleep(1);
        }
        TIMING_STOP(&t_grps);

        /* Try a list_groups with a low enough timeout to fail.
         * 'grplist' is reset first so that the log line below can tell
         * whether the call stored a list. */
        grplist = NULL;
        TIMING_START(&t_grps, "WAIT.GROUPS.TIMEOUT0");
        err = rd_kafka_list_groups(rk, NULL, &grplist, 0);
        TIMING_STOP(&t_grps);
        TEST_SAY("list_groups(timeout=0) returned %d groups and status: %s\n",
                 grplist ? grplist->group_cnt : -1, rd_kafka_err2str(err));
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "expected list_groups(timeout=0) to fail "
                    "with timeout, got %s", rd_kafka_err2str(err));


        TEST_SAY("Closing remaining consumers\n");
        for (i = 0 ; i < _CONS_CNT ; i++) {
                test_timing_t t_close;

                if (rk_c[i]) {
                        TEST_SAY("Closing %s\n", rd_kafka_name(rk_c[i]));
                        TIMING_START(&t_close, "CONSUMER.CLOSE");
                        err = rd_kafka_consumer_close(rk_c[i]);
                        TIMING_STOP(&t_close);
                        if (err)
                                TEST_FAIL("consumer_close failed: %s\n",
                                          rd_kafka_err2str(err));

                        rd_kafka_destroy(rk_c[i]);
                        rk_c[i] = NULL;
                }

                /* The group name is released even when there is no
                 * consumer handle left to close. */
                free(groups[i]);
                groups[i] = NULL;
        }

        rd_kafka_destroy(rk);

        return 0;
}