Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdkafka.h"

#include <stdarg.h>

/**
 * Various partitioner tests
 *
 * - Issue #797 - deadlock on failed partitioning
 * - Verify that partitioning works across partitioners.
 */

int32_t my_invalid_partitioner (const rd_kafka_topic_t *rkt,
				const void *keydata, size_t keylen,
				int32_t partition_cnt,
				void *rkt_opaque,
				void *msg_opaque) {
	int32_t partition = partition_cnt + 10;
	TEST_SAYL(4, "partition \"%.*s\" to %"PRId32"\n",
		 (int)keylen, (const char *)keydata, partition);
	return partition;
}


/* FIXME: This doesn't seem to trigger the bug in #797.
 *        Still a useful test though. */
static void do_test_failed_partitioning (void) {
	rd_kafka_t *rk;
	rd_kafka_topic_t *rkt;
	rd_kafka_topic_conf_t *tconf;
	const char *topic = test_mk_topic_name(__FUNCTION__, 1);
	int i;

	test_conf_init(NULL, &tconf, 0);

	rk = test_create_producer();
	rd_kafka_topic_conf_set_partitioner_cb(tconf, my_invalid_partitioner);
	test_topic_conf_set(tconf, "message.timeout.ms",
                            tsprintf("%d", tmout_multip(10000)));
	rkt = rd_kafka_topic_new(rk, topic, tconf);
	TEST_ASSERT(rkt != NULL, "%s", rd_kafka_err2str(rd_kafka_last_error()));

	/* Produce some messages (to p 0) to create topic */
	test_produce_msgs(rk, rkt, 0, 0, 0, 100, NULL, 0);

	/* Now use partitioner */
	for (i = 0 ; i < 10000 ; i++) {
		rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
		if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
				     0, NULL, 0, NULL, 0, NULL) == -1)
			err = rd_kafka_last_error();
		if (err != RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
			TEST_FAIL("produce(): "
				  "Expected UNKNOWN_PARTITION, got %s\n",
				  rd_kafka_err2str(err));
	}
	test_flush(rk, 5000);

	rd_kafka_topic_destroy(rkt);
	rd_kafka_destroy(rk);
}


static void part_dr_msg_cb (rd_kafka_t *rk,
                            const rd_kafka_message_t *rkmessage, void *opaque) {
        int32_t *partp = rkmessage->_private;
        int *remainsp = opaque;

        if (rkmessage->err) {
                /* Will fail later */
                TEST_WARN("Delivery failed: %s\n",
                          rd_kafka_err2str(rkmessage->err));
                *partp = -1;
        } else {
                *partp = rkmessage->partition;
        }

        (*remainsp)--;
}

/**
 * @brief Test single \p partitioner
 */
static void do_test_partitioner (const char *topic, const char *partitioner,
                                 int msgcnt, const char **keys,
                                 const int32_t *exp_part) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        int i;
        int32_t *parts;
        int remains = msgcnt;
        int randcnt = 0;
        int fails = 0;

        TEST_SAY(_C_MAG "Test partitioner \"%s\"\n", partitioner);

        test_conf_init(&conf, NULL, 30);
        rd_kafka_conf_set_opaque(conf, &remains);
        rd_kafka_conf_set_dr_msg_cb(conf, part_dr_msg_cb);
        test_conf_set(conf, "partitioner", partitioner);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        parts = malloc(msgcnt * sizeof(*parts));
        for (i = 0 ; i < msgcnt ; i++)
                parts[i] = -1;

        /*
         * Produce messages
         */
        for (i = 0 ; i < msgcnt ; i++) {
                rd_kafka_resp_err_t err;

                err = rd_kafka_producev(rk,
                                        RD_KAFKA_V_TOPIC(topic),
                                        RD_KAFKA_V_KEY(keys[i],
                                                       keys[i] ?
                                                       strlen(keys[i]) : 0),
                                        RD_KAFKA_V_OPAQUE(&parts[i]),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err,
                            "producev() failed: %s", rd_kafka_err2str(err));

                randcnt += exp_part[i] == -1;
        }

        rd_kafka_flush(rk, tmout_multip(10000));

        TEST_ASSERT(remains == 0,
                    "Expected remains=%d, not %d for %d messages",
                    0, remains, msgcnt);

        /*
         * Verify produced partitions to expected partitions.
         */

        /* First look for produce failures */
        for (i = 0 ; i < msgcnt ; i++) {
                if (parts[i] == -1) {
                        TEST_WARN("Message #%d (exp part %"PRId32") "
                                  "was not successfully produced\n",
                                  i, exp_part[i]);
                        fails++;
                }
        }

        TEST_ASSERT(!fails, "See %d previous failure(s)", fails);


        if (randcnt == msgcnt) {
                /* If all expected partitions are random make sure
                 * the produced partitions have some form of
                 * random distribution */
                int32_t last_part = parts[0];
                int samecnt = 0;

                for (i = 0 ; i < msgcnt ; i++) {
                        samecnt += parts[i] == last_part;
                        last_part = parts[i];
                }

                TEST_ASSERT(samecnt < msgcnt,
                            "No random distribution, all on partition %"PRId32,
                            last_part);
        } else {
                for (i = 0 ; i < msgcnt ; i++) {
                        if (exp_part[i] != -1 &&
                            parts[i] != exp_part[i]) {
                                TEST_WARN("Message #%d expected partition "
                                          "%"PRId32" but got %"PRId32": %s\n",
                                          i, exp_part[i], parts[i],
                                          keys[i]);
                                fails++;
                        }
                }


                TEST_ASSERT(!fails, "See %d previous failure(s)", fails);
        }

        free(parts);

        rd_kafka_destroy(rk);

        TEST_SAY(_C_GRN "Test partitioner \"%s\": PASS\n", partitioner);
}

extern uint32_t rd_crc32 (const char *, size_t);

/**
 * @brief Test all builtin partitioners
 */
static void do_test_partitioners (void) {
#define _PART_CNT 17
#define _MSG_CNT 5
        const char *unaligned = "123456";
        /* Message keys */
        const char *keys[_MSG_CNT] = {
                NULL,
                "", // empty
                unaligned+1,
                "this is another string with more length to it perhaps",
                "hejsan"
        };
        struct {
                const char *partitioner;
                /* Expected partition per message (see keys above) */
                int32_t exp_part[_MSG_CNT];
        } ptest[] = {
                { "random", { -1, -1, -1, -1, -1 } },
                { "consistent", {
                                /* These constants were acquired using
                                 * the 'crc32' command on OSX */
                                0x0 % _PART_CNT,
                                0x0 % _PART_CNT,
                                0xb1b451d7 % _PART_CNT,
                                0xb0150df7 % _PART_CNT,
                                0xd077037e % _PART_CNT
                        } },
                { "consistent_random", {
                                -1,
                                -1,
                                0xb1b451d7 % _PART_CNT,
                                0xb0150df7 % _PART_CNT,
                                0xd077037e % _PART_CNT
                        } },
                { "murmur2", {
                                /* .. using tests/java/Murmur2Cli */
                                0x106e08d9 % _PART_CNT,
                                0x106e08d9 % _PART_CNT,
                                0x858d780f % _PART_CNT,
                                0xcf7703da % _PART_CNT,
                                0x5ec19395 % _PART_CNT
                        } },
                { "murmur2_random", {
                                -1,
                                0x106e08d9 % _PART_CNT,
                                0x858d780f % _PART_CNT,
                                0xcf7703da % _PART_CNT,
                                0x5ec19395 % _PART_CNT
                        } },
                { NULL }
        };
        int pi;
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);

        test_create_topic(topic, _PART_CNT, 1);

        for (pi = 0 ; ptest[pi].partitioner ; pi++) {
                do_test_partitioner(topic, ptest[pi].partitioner,
                                    _MSG_CNT, keys, ptest[pi].exp_part);
        }
}

int main_0048_partitioner (int argc, char **argv) {
        do_test_partitioners();
	do_test_failed_partitioning();
	return 0;
}