Blame tests/0048-partitioner.c

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2012-2015, Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 *
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met:
Packit 2997f0
 *
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer.
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution.
Packit 2997f0
 *
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#include "test.h"
Packit 2997f0
#include "rdkafka.h"
Packit 2997f0
Packit 2997f0
#include <stdarg.h>
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Various partitioner tests
Packit 2997f0
 *
Packit 2997f0
 * - Issue #797 - deadlock on failed partitioning
Packit 2997f0
 * - Verify that partitioning works across partitioners.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
/**
 * @brief Partitioner callback that always selects a partition beyond
 *        the topic's partition count.
 *
 * The chosen partition is \p partition_cnt + 10. The key and the chosen
 * partition are logged through TEST_SAYL() at level 4.
 *
 * @param rkt            Topic handle (unused).
 * @param keydata        Message key, printed using \p keylen as precision.
 * @param keylen         Length of \p keydata in bytes.
 * @param partition_cnt  Partition count passed in by the caller.
 * @param rkt_opaque     Topic opaque (unused).
 * @param msg_opaque     Message opaque (unused).
 *
 * @returns \p partition_cnt + 10
 */
int32_t my_invalid_partitioner (const rd_kafka_topic_t *rkt,
                                const void *keydata, size_t keylen,
                                int32_t partition_cnt,
                                void *rkt_opaque,
                                void *msg_opaque) {
        const int32_t bogus_partition = partition_cnt + 10;

        TEST_SAYL(4, "partition \"%.*s\" to %"PRId32"\n",
                  (int)keylen, (const char *)keydata, bogus_partition);

        return bogus_partition;
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/* FIXME: This doesn't seem to trigger the bug in #797.
Packit 2997f0
 *        Still a useful test though. */
Packit 2997f0
static void do_test_failed_partitioning (void) {
Packit 2997f0
	rd_kafka_t *rk;
Packit 2997f0
	rd_kafka_topic_t *rkt;
Packit 2997f0
	rd_kafka_topic_conf_t *tconf;
Packit 2997f0
	const char *topic = test_mk_topic_name(__FUNCTION__, 1);
Packit 2997f0
	int i;
Packit 2997f0
Packit 2997f0
	test_conf_init(NULL, &tconf, 0);
Packit 2997f0
Packit 2997f0
	rk = test_create_producer();
Packit 2997f0
	rd_kafka_topic_conf_set_partitioner_cb(tconf, my_invalid_partitioner);
Packit 2997f0
	test_topic_conf_set(tconf, "message.timeout.ms",
Packit 2997f0
                            tsprintf("%d", tmout_multip(10000)));
Packit 2997f0
	rkt = rd_kafka_topic_new(rk, topic, tconf);
Packit 2997f0
	TEST_ASSERT(rkt != NULL, "%s", rd_kafka_err2str(rd_kafka_last_error()));
Packit 2997f0
Packit 2997f0
	/* Produce some messages (to p 0) to create topic */
Packit 2997f0
	test_produce_msgs(rk, rkt, 0, 0, 0, 100, NULL, 0);
Packit 2997f0
Packit 2997f0
	/* Now use partitioner */
Packit 2997f0
	for (i = 0 ; i < 10000 ; i++) {
Packit 2997f0
		rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
		if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
Packit 2997f0
				     0, NULL, 0, NULL, 0, NULL) == -1)
Packit 2997f0
			err = rd_kafka_last_error();
Packit 2997f0
		if (err != RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
Packit 2997f0
			TEST_FAIL("produce(): "
Packit 2997f0
				  "Expected UNKNOWN_PARTITION, got %s\n",
Packit 2997f0
				  rd_kafka_err2str(err));
Packit 2997f0
	}
Packit 2997f0
	test_flush(rk, 5000);
Packit 2997f0
Packit 2997f0
	rd_kafka_topic_destroy(rkt);
Packit 2997f0
	rd_kafka_destroy(rk);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void part_dr_msg_cb (rd_kafka_t *rk,
Packit 2997f0
                            const rd_kafka_message_t *rkmessage, void *opaque) {
Packit 2997f0
        int32_t *partp = rkmessage->_private;
Packit 2997f0
        int *remainsp = opaque;
Packit 2997f0
Packit 2997f0
        if (rkmessage->err) {
Packit 2997f0
                /* Will fail later */
Packit 2997f0
                TEST_WARN("Delivery failed: %s\n",
Packit 2997f0
                          rd_kafka_err2str(rkmessage->err));
Packit 2997f0
                *partp = -1;
Packit 2997f0
        } else {
Packit 2997f0
                *partp = rkmessage->partition;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        (*remainsp)--;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Test single \p partitioner
Packit 2997f0
 */
Packit 2997f0
static void do_test_partitioner (const char *topic, const char *partitioner,
Packit 2997f0
                                 int msgcnt, const char **keys,
Packit 2997f0
                                 const int32_t *exp_part) {
Packit 2997f0
        rd_kafka_t *rk;
Packit 2997f0
        rd_kafka_conf_t *conf;
Packit 2997f0
        int i;
Packit 2997f0
        int32_t *parts;
Packit 2997f0
        int remains = msgcnt;
Packit 2997f0
        int randcnt = 0;
Packit 2997f0
        int fails = 0;
Packit 2997f0
Packit 2997f0
        TEST_SAY(_C_MAG "Test partitioner \"%s\"\n", partitioner);
Packit 2997f0
Packit 2997f0
        test_conf_init(&conf, NULL, 30);
Packit 2997f0
        rd_kafka_conf_set_opaque(conf, &remains);
Packit 2997f0
        rd_kafka_conf_set_dr_msg_cb(conf, part_dr_msg_cb);
Packit 2997f0
        test_conf_set(conf, "partitioner", partitioner);
Packit 2997f0
Packit 2997f0
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
Packit 2997f0
Packit 2997f0
        parts = malloc(msgcnt * sizeof(*parts));
Packit 2997f0
        for (i = 0 ; i < msgcnt ; i++)
Packit 2997f0
                parts[i] = -1;
Packit 2997f0
Packit 2997f0
        /*
Packit 2997f0
         * Produce messages
Packit 2997f0
         */
Packit 2997f0
        for (i = 0 ; i < msgcnt ; i++) {
Packit 2997f0
                rd_kafka_resp_err_t err;
Packit 2997f0
Packit 2997f0
                err = rd_kafka_producev(rk,
Packit 2997f0
                                        RD_KAFKA_V_TOPIC(topic),
Packit 2997f0
                                        RD_KAFKA_V_KEY(keys[i],
Packit 2997f0
                                                       keys[i] ?
Packit 2997f0
                                                       strlen(keys[i]) : 0),
Packit 2997f0
                                        RD_KAFKA_V_OPAQUE(&parts[i]),
Packit 2997f0
                                        RD_KAFKA_V_END);
Packit 2997f0
                TEST_ASSERT(!err,
Packit 2997f0
                            "producev() failed: %s", rd_kafka_err2str(err));
Packit 2997f0
Packit 2997f0
                randcnt += exp_part[i] == -1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rd_kafka_flush(rk, tmout_multip(10000));
Packit 2997f0
Packit 2997f0
        TEST_ASSERT(remains == 0,
Packit 2997f0
                    "Expected remains=%d, not %d for %d messages",
Packit 2997f0
                    0, remains, msgcnt);
Packit 2997f0
Packit 2997f0
        /*
Packit 2997f0
         * Verify produced partitions to expected partitions.
Packit 2997f0
         */
Packit 2997f0
Packit 2997f0
        /* First look for produce failures */
Packit 2997f0
        for (i = 0 ; i < msgcnt ; i++) {
Packit 2997f0
                if (parts[i] == -1) {
Packit 2997f0
                        TEST_WARN("Message #%d (exp part %"PRId32") "
Packit 2997f0
                                  "was not successfully produced\n",
Packit 2997f0
                                  i, exp_part[i]);
Packit 2997f0
                        fails++;
Packit 2997f0
                }
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        TEST_ASSERT(!fails, "See %d previous failure(s)", fails);
Packit 2997f0
Packit 2997f0
Packit 2997f0
        if (randcnt == msgcnt) {
Packit 2997f0
                /* If all expected partitions are random make sure
Packit 2997f0
                 * the produced partitions have some form of
Packit 2997f0
                 * random distribution */
Packit 2997f0
                int32_t last_part = parts[0];
Packit 2997f0
                int samecnt = 0;
Packit 2997f0
Packit 2997f0
                for (i = 0 ; i < msgcnt ; i++) {
Packit 2997f0
                        samecnt += parts[i] == last_part;
Packit 2997f0
                        last_part = parts[i];
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                TEST_ASSERT(samecnt < msgcnt,
Packit 2997f0
                            "No random distribution, all on partition %"PRId32,
Packit 2997f0
                            last_part);
Packit 2997f0
        } else {
Packit 2997f0
                for (i = 0 ; i < msgcnt ; i++) {
Packit 2997f0
                        if (exp_part[i] != -1 &&
Packit 2997f0
                            parts[i] != exp_part[i]) {
Packit 2997f0
                                TEST_WARN("Message #%d expected partition "
Packit 2997f0
                                          "%"PRId32" but got %"PRId32": %s\n",
Packit 2997f0
                                          i, exp_part[i], parts[i],
Packit 2997f0
                                          keys[i]);
Packit 2997f0
                                fails++;
Packit 2997f0
                        }
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
Packit 2997f0
                TEST_ASSERT(!fails, "See %d previous failure(s)", fails);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        free(parts);
Packit 2997f0
Packit 2997f0
        rd_kafka_destroy(rk);
Packit 2997f0
Packit 2997f0
        TEST_SAY(_C_GRN "Test partitioner \"%s\": PASS\n", partitioner);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/* Forward declaration kept from the original file; it is not referenced
 * by do_test_partitioners() below. */
extern uint32_t rd_crc32 (const char *, size_t);

/**
 * @brief Test all builtin partitioners
 *
 * Creates a topic with _PART_CNT partitions and runs
 * do_test_partitioner() once per entry in the table below, each time
 * with the same set of _MSG_CNT message keys.
 */
static void do_test_partitioners (void) {
#define _PART_CNT 17
#define _MSG_CNT 5
        const char *unaligned = "123456";
        /* Message keys, one per produced message */
        const char *keys[_MSG_CNT] = {
                NULL,
                "", /* empty */
                unaligned+1,
                "this is another string with more length to it perhaps",
                "hejsan"
        };
        /* One test case per partitioner. The table is terminated by an
         * entry with a NULL partitioner name. */
        struct {
                const char *partitioner;
                /* Expected partition per message (same order as keys
                 * above), -1 meaning that any partition is accepted. */
                int32_t exp_part[_MSG_CNT];
        } cases[] = {
                { "random",
                  { -1, -1, -1, -1, -1 } },

                /* These constants were acquired using
                 * the 'crc32' command on OSX */
                { "consistent",
                  { 0x0 % _PART_CNT,
                    0x0 % _PART_CNT,
                    0xb1b451d7 % _PART_CNT,
                    0xb0150df7 % _PART_CNT,
                    0xd077037e % _PART_CNT } },

                { "consistent_random",
                  { -1,
                    -1,
                    0xb1b451d7 % _PART_CNT,
                    0xb0150df7 % _PART_CNT,
                    0xd077037e % _PART_CNT } },

                /* .. using tests/java/Murmur2Cli */
                { "murmur2",
                  { 0x106e08d9 % _PART_CNT,
                    0x106e08d9 % _PART_CNT,
                    0x858d780f % _PART_CNT,
                    0xcf7703da % _PART_CNT,
                    0x5ec19395 % _PART_CNT } },

                { "murmur2_random",
                  { -1,
                    0x106e08d9 % _PART_CNT,
                    0x858d780f % _PART_CNT,
                    0xcf7703da % _PART_CNT,
                    0x5ec19395 % _PART_CNT } },

                { NULL }
        };
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
        size_t idx = 0;

        test_create_topic(topic, _PART_CNT, 1);

        /* Walk the table until the NULL-named terminator */
        while (cases[idx].partitioner) {
                do_test_partitioner(topic, cases[idx].partitioner,
                                    _MSG_CNT, keys, cases[idx].exp_part);
                idx++;
        }
}
Packit 2997f0
Packit 2997f0
/**
 * @brief Entry point for test 0048: runs the builtin partitioner checks
 *        followed by the failed-partitioning check.
 *
 * @param argc  Unused.
 * @param argv  Unused.
 *
 * @returns 0 once both sub-tests have returned.
 */
int main_0048_partitioner (int argc, char **argv) {
        (void)argc;
        (void)argv;

        do_test_partitioners();
        do_test_failed_partitioning();

        return 0;
}