/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdkafka.h"

#include <stdarg.h>

/**
 * Verify that subscription is updated on metadata changes:
 *  - topic additions
 *  - topic deletions
 *  - partition count changes
 */



/**
 * Wait for a REBALANCE ASSIGN event on \p queue, verify it against the
 * expected assignment and then apply the assignment to \p rk.
 *
 * Va-args are \p topic_cnt tuples of the expected assignment:
 *   { const char *topic, int partition_cnt }
 * Partitions 0..partition_cnt-1 of each listed topic must be present in the
 * assignment, and the total number of assigned partitions must equal the
 * sum of all partition_cnt values.
 *
 * The test fails if no rebalance event is seen within 30 seconds, if the
 * event is not an ASSIGN, or if the assignment differs from the expectation.
 *
 * @param pfx        Prefix used in log and failure messages.
 * @param rk         Consumer instance the assignment is applied to.
 * @param queue      Queue on which the rebalance event is awaited.
 * @param topic_cnt  Number of { topic, partition_cnt } tuples that follow.
 */
static void await_assignment (const char *pfx, rd_kafka_t *rk,
			      rd_kafka_queue_t *queue,
			      int topic_cnt, ...) {
	rd_kafka_event_t *rkev;
	rd_kafka_topic_partition_list_t *tps;
	int i;
	va_list ap;
	int fails = 0;
	int exp_part_cnt = 0;

	TEST_SAY("%s: waiting for assignment\n", pfx);
	rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, 30000);
	if (!rkev)
		TEST_FAIL("timed out waiting for assignment");
	TEST_ASSERT(rd_kafka_event_error(rkev) ==
		    RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
		    "expected ASSIGN, got %s",
		    rd_kafka_err2str(rd_kafka_event_error(rkev)));
	tps = rd_kafka_event_topic_partition_list(rkev);

	TEST_SAY("%s: assignment:\n", pfx);
	test_print_partition_list(tps);

	/* Check that every expected partition is part of the assignment.
	 * Misses are recorded with TEST_FAIL_LATER so that all of them are
	 * reported before the test is failed below. */
	va_start(ap, topic_cnt);
	for (i = 0 ; i < topic_cnt ; i++) {
		const char *topic = va_arg(ap, const char *);
		int partition_cnt = va_arg(ap, int);
		int p;
		TEST_SAY("%s: expecting %s with %d partitions\n",
			 pfx, topic, partition_cnt);
		for (p = 0 ; p < partition_cnt ; p++) {
			if (!rd_kafka_topic_partition_list_find(tps, topic, p)) {
				TEST_FAIL_LATER("%s: expected partition %s [%d] "
						"not found in assignment",
						pfx, topic, p);
				fails++;
			}
		}
		exp_part_cnt += partition_cnt;
	}
	va_end(ap);

	/* The loop above only checks for missing partitions; comparing the
	 * totals also catches an assignment that is larger than expected. */
	TEST_ASSERT(exp_part_cnt == tps->cnt,
		    "expected assignment of %d partitions, got %d",
		    exp_part_cnt, tps->cnt);

	if (fails > 0)
		TEST_FAIL("%s: assignment mismatch: see above", pfx);

	/* tps was obtained from rkev, so the event is destroyed only after
	 * the list has been handed to rd_kafka_assign(). */
	rd_kafka_assign(rk, tps);
	rd_kafka_event_destroy(rkev);
}


/**
 * Block until the next REBALANCE event on \p queue, require it to be a
 * REVOKE and respond by unassigning the partitions of \p rk.
 *
 * The test fails if no rebalance event is seen within 30 seconds or if the
 * event is anything other than a revoke.
 *
 * @param pfx    Prefix used in log messages.
 * @param rk     Consumer instance whose assignment is dropped.
 * @param queue  Queue on which the rebalance event is awaited.
 */
static void await_revoke (const char *pfx, rd_kafka_t *rk,
			  rd_kafka_queue_t *queue) {
	rd_kafka_event_t *rkev;

	TEST_SAY("%s: waiting for revoke\n", pfx);

	rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, 30000);
	if (rkev == NULL) {
		TEST_FAIL("timed out waiting for revoke");
	}

	/* Only a revoke is acceptable at this point. */
	TEST_ASSERT(rd_kafka_event_error(rkev) ==
		    RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
		    "expected REVOKE, got %s",
		    rd_kafka_err2str(rd_kafka_event_error(rkev)));

	/* Acknowledge the revoke by unassigning (NULL partition list). */
	rd_kafka_assign(rk, NULL);

	rd_kafka_event_destroy(rkev);
}

/**
 * Wait \p timeout_ms to make sure no rebalance was triggered.
 *
 * The test fails if a REBALANCE event shows up on \p queue before the
 * timeout expires; a timeout without any such event is the expected outcome.
 *
 * @param pfx         Prefix used in log messages.
 * @param rk          Consumer instance (currently unused).
 * @param queue       Queue that is watched for rebalance events.
 * @param timeout_ms  How long to wait, in milliseconds.
 */
static void await_no_rebalance (const char *pfx, rd_kafka_t *rk,
				rd_kafka_queue_t *queue, int timeout_ms) {
	rd_kafka_event_t *rkev;

	TEST_SAY("%s: waiting for %d ms to not see rebalance\n",
		 pfx, timeout_ms);
	rkev = test_wait_event(queue, RD_KAFKA_EVENT_REBALANCE, timeout_ms);
	if (!rkev)
		return; /* No rebalance within the timeout: success. */

	/* Getting here means a rebalance event was received, which is
	 * exactly what must not happen. Fail unconditionally: asserting on
	 * rkev itself could never trigger since it is known to be non-NULL. */
	TEST_FAIL("did not expect %s: %s",
		  rd_kafka_event_name(rkev),
		  rd_kafka_err2str(rd_kafka_event_error(rkev)));

	/* Not expected to be reached if TEST_FAIL() aborts the test, but
	 * keeps the event from leaking should it return. */
	rd_kafka_event_destroy(rkev);
}

/**
 * Tests #1 and #2, run back to back on the same consumer:
 *
 * #1: subscribe to a topic that does not exist yet, verify that no
 *     rebalance happens, create the topic and verify that it is assigned.
 * #2: increase the partition count of the topic and verify that the
 *     assignment is revoked and then re-issued with the new partitions.
 */
static void do_test_non_exist_and_partchange (void) {
	rd_kafka_conf_t *conf;
	rd_kafka_t *rk;
	rd_kafka_queue_t *rkqu;
	char *topic;

	topic = rd_strdup(test_mk_topic_name("topic_a", 1));

	TEST_SAY("#1 & #2 testing\n");
	test_conf_init(&conf, NULL, 60);

	/* A short metadata refresh interval makes the consumer notice
	 * topic changes sooner. */
	test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");

	/* Rebalances are delivered as events on the consumer queue. */
	rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
	rk = test_create_consumer(test_str_id_generate_tmp(),
				  NULL, conf, NULL);
	rkqu = rd_kafka_queue_get_consumer(rk);

	/*
	 * Test #1: subscription to a topic that is created later.
	 */
	TEST_SAY("#1: Subscribing to %s\n", topic);
	test_consumer_subscribe(rk, topic);

	/* Nothing matches the subscription yet: no rebalance expected. */
	await_no_rebalance("#1: empty", rk, rkqu, 10000);

	TEST_SAY("#1: creating topic %s\n", topic);
	test_create_topic(topic, 2, 1);

	/* Both partitions of the new topic should now be assigned. */
	await_assignment("#1: proper", rk, rkqu, 1,
			 topic, 2);

	/*
	 * Test #2: partition count change on the subscribed topic.
	 */
	test_kafka_topics("--alter --topic %s --partitions 4",
			  topic);

	/* The old assignment is revoked first.. */
	await_revoke("#2", rk, rkqu);

	/* ..followed by a new assignment covering all four partitions. */
	await_assignment("#2: more partitions", rk, rkqu, 1,
			 topic, 4);

	/* Tear down. */
	test_consumer_close(rk);
	rd_kafka_queue_destroy(rkqu);
	rd_kafka_destroy(rk);

	rd_free(topic);
}



/**
 * Regex subscription test:
 *  - Create topic b
 *  - Subscribe with a regex matching topics b, d and e
 *  - Verify assignment of b
 *  - Create topic c (not matched by the regex)
 *  - Verify that no rebalance happens
 *  - Create topic d
 *  - Verify revoke followed by assignment of b & d
 *
 * Topic e is matched by the regex but never created.
 */
static void do_test_regex (void) {
	rd_kafka_conf_t *conf;
	rd_kafka_t *rk;
	rd_kafka_queue_t *rkqu;
	char *base_topic;
	char *topic_b, *topic_c, *topic_d, *topic_e;

	/* All topic names share a common, test-specific base name. */
	base_topic = rd_strdup(test_mk_topic_name("topic", 1));
	topic_b = rd_strdup(tsprintf("%s_b", base_topic));
	topic_c = rd_strdup(tsprintf("%s_c", base_topic));
	topic_d = rd_strdup(tsprintf("%s_d", base_topic));
	topic_e = rd_strdup(tsprintf("%s_e", base_topic));

	TEST_SAY("Regex testing\n");
	test_conf_init(&conf, NULL, 60);

	/* A short metadata refresh interval makes the consumer notice
	 * topic changes sooner. */
	test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");

	/* Rebalances are delivered as events on the consumer queue. */
	rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
	rk = test_create_consumer(test_str_id_generate_tmp(),
				  NULL, conf, NULL);
	rkqu = rd_kafka_queue_get_consumer(rk);

	/* Topic b exists before the subscription is made. */
	TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_b);
	test_create_topic(topic_b, 2, 1);
	rd_sleep(1); /* FIXME: do check&wait loop instead */

	TEST_SAY("Regex: Subscribing to %s & %s & %s\n",
		 topic_b, topic_d, topic_e);
	test_consumer_subscribe(rk, tsprintf("^%s_[bde]$", base_topic));

	/* Only b exists so far. */
	await_assignment("Regex: just one topic exists", rk, rkqu, 1,
			 topic_b, 2);

	/* Topic c is not matched by the regex: creating it must not
	 * cause a rebalance. */
	TEST_SAY("Regex: creating topic %s (not subscribed)\n", topic_c);
	test_create_topic(topic_c, 4, 1);

	await_no_rebalance("Regex: empty", rk, rkqu, 10000);

	/* Topic d is matched by the regex: expect a revoke followed by
	 * an assignment that covers both b and d. */
	TEST_SAY("Regex: creating topic %s (subscribed)\n", topic_d);
	test_create_topic(topic_d, 1, 1);

	await_revoke("Regex: rebalance after topic creation", rk, rkqu);

	await_assignment("Regex: two topics exist", rk, rkqu, 2,
			 topic_b, 2,
			 topic_d, 1);

	/* Tear down. */
	test_consumer_close(rk);
	rd_kafka_queue_destroy(rkqu);
	rd_kafka_destroy(rk);

	rd_free(base_topic);
	rd_free(topic_b);
	rd_free(topic_c);
	rd_free(topic_d);
	rd_free(topic_e);
}


/**
 * Topic removal test:
 *  - Create topics f & g
 *  - Subscribe to f & g
 *  - Verify f & g assignment
 *  - Remove topic f
 *  - Verify revoke followed by g-only assignment
 *  - Remove topic g
 *  - Verify revoke, and that no further rebalance follows
 *
 * @remark This test will fail if auto topic creation is enabled on the broker
 * since the client will issue a topic-creating metadata request to find
 * a new leader when the topic is removed.
 *
 * To run with trivup, do:
 * ./interactive_broker_version.py .. -conf '{"auto_create_topics":"false"}' ..
 * TESTS=0045 ./run-test.sh -k ./merged
 */
static void do_test_topic_remove (void) {
	rd_kafka_conf_t *conf;
	rd_kafka_t *rk;
	rd_kafka_queue_t *rkqu;
	rd_kafka_topic_partition_list_t *subscription;
	rd_kafka_resp_err_t err;
	char *topic_f;
	char *topic_g;
	int parts_f = 5;
	int parts_g = 9;

	topic_f = rd_strdup(test_mk_topic_name("topic_f", 1));
	topic_g = rd_strdup(test_mk_topic_name("topic_g", 1));

	TEST_SAY("Topic removal testing\n");
	test_conf_init(&conf, NULL, 60);

	/* A short metadata refresh interval makes the consumer notice
	 * topic changes sooner. */
	test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");

	/* Rebalances are delivered as events on the consumer queue. */
	rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_REBALANCE);
	rk = test_create_consumer(test_str_id_generate_tmp(),
				  NULL, conf, NULL);
	rkqu = rd_kafka_queue_get_consumer(rk);

	/* Both topics exist before the subscription is made. */
	TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_f);
	test_create_topic(topic_f, parts_f, 1);

	TEST_SAY("Topic removal: creating topic %s (subscribed)\n", topic_g);
	test_create_topic(topic_g, parts_g, 1);

	rd_sleep(1); /* FIXME: do check&wait loop instead */

	/* Subscribe to the two topics by name. */
	TEST_SAY("Topic removal: Subscribing to %s & %s\n", topic_f, topic_g);
	subscription = rd_kafka_topic_partition_list_new(2);
	rd_kafka_topic_partition_list_add(subscription, topic_f,
					  RD_KAFKA_PARTITION_UA);
	rd_kafka_topic_partition_list_add(subscription, topic_g,
					  RD_KAFKA_PARTITION_UA);
	err = rd_kafka_subscribe(rk, subscription);
	TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR,
		    "%s", rd_kafka_err2str(err));
	rd_kafka_topic_partition_list_destroy(subscription);

	await_assignment("Topic removal: both topics exist", rk, rkqu, 2,
			 topic_f, parts_f,
			 topic_g, parts_g);

	/* Removing f: expect a revoke, then an assignment of g only. */
	TEST_SAY("Topic removal: removing %s\n", topic_f);
	test_kafka_topics("--delete --topic %s", topic_f);

	await_revoke("Topic removal: rebalance after topic removal", rk, rkqu);

	await_assignment("Topic removal: one topic exists", rk, rkqu, 1,
			 topic_g, parts_g);

	/* Removing g: expect a revoke.. */
	TEST_SAY("Topic removal: removing %s\n", topic_g);
	test_kafka_topics("--delete --topic %s", topic_g);

	await_revoke("Topic removal: rebalance after 2nd topic removal",
		     rk, rkqu);

	/* ..and nothing after that since no subscribed topic remains. */
	await_no_rebalance("Topic removal: empty", rk, rkqu, 10000);

	/* Tear down. */
	test_consumer_close(rk);
	rd_kafka_queue_destroy(rkqu);
	rd_kafka_destroy(rk);

	rd_free(topic_f);
	rd_free(topic_g);
}


/**
 * Test entry point: regex subscription updates.
 *
 * The test is only run when test_can_create_topics() returns non-zero.
 * Always returns 0.
 */
int main_0045_subscribe_update (int argc, char **argv) {

        if (test_can_create_topics(1)) {
                do_test_regex();
        }

        return 0;
}

/**
 * Test entry point: subscription to a non-existent topic and partition
 * count changes.
 *
 * The test is skipped when test_check_auto_create_topic() returns non-zero,
 * i.e. when topic auto-creation is enabled. Always returns 0.
 */
int main_0045_subscribe_update_non_exist_and_partchange (int argc, char **argv){
        if (!test_check_auto_create_topic()) {
                do_test_non_exist_and_partchange();
        } else {
                TEST_SKIP("do_test_non_exist_and_partchange(): "
                          "topic auto-creation is enabled\n");
        }

        return 0;
}

/**
 * Test entry point: subscription updates on topic removal.
 *
 * The test is only run when test_can_create_topics() returns non-zero.
 * Always returns 0.
 */
int main_0045_subscribe_update_topic_remove (int argc, char **argv) {

        if (test_can_create_topics(1)) {
                do_test_topic_remove();
        }

        return 0;
}