Blame tests/0004-conf.c

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2012-2013, Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 * 
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met: 
Packit 2997f0
 * 
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer. 
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution. 
Packit 2997f0
 * 
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Tests various config related things
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
Packit 2997f0
#include "test.h"
Packit 2997f0
Packit 2997f0
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
Packit 2997f0
 * is built from within the librdkafka source tree and thus differs. */
Packit 2997f0
#include "rdkafka.h"  /* for Kafka driver */
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
Packit 2997f0
		   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
 * @brief Error callback stub.
 *
 * Does nothing; it only exists so that main_0004_conf() has a callback
 * to register with rd_kafka_conf_set_error_cb().
 */
static void error_cb (rd_kafka_t *rk, int err, const char *reason,
                      void *opaque) {
        /* All arguments are intentionally ignored. */
        (void)rk;
        (void)err;
        (void)reason;
        (void)opaque;
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
 * @brief Partitioner stub that always selects partition 0.
 *
 * Registered on the topic config with
 * rd_kafka_topic_conf_set_partitioner_cb() in main_0004_conf().
 *
 * @returns 0 regardless of the arguments.
 */
static int32_t partitioner (const rd_kafka_topic_t *rkt,
                            const void *keydata,
                            size_t keylen,
                            int32_t partition_cnt,
                            void *rkt_opaque,
                            void *msg_opaque) {
        const int32_t chosen_partition = 0;

        /* None of the inputs influence the result. */
        (void)rkt;
        (void)keydata;
        (void)keylen;
        (void)partition_cnt;
        (void)rkt_opaque;
        (void)msg_opaque;

        return chosen_partition;
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
 * @brief Check the expected properties in \p confs against the
 *        configuration dump \p arr.
 *
 * @param line  Source line of the caller, included in every message.
 * @param arr   Dump laid out as alternating name, value strings.
 * @param cnt   Number of strings in \p arr (names and values together).
 * @param confs NULL-terminated list of alternating name, value strings
 *              holding the expected settings.
 *
 * A property that is present in the dump with a different value is
 * reported through TEST_FAIL().
 *
 * A property that is missing from the dump is reported through TEST_SAY().
 * Previously this check was placed inside the inner loop, guarded by a
 * condition the loop itself excludes, so it could never trigger.
 *
 * NOTE(review): the missing-property report is a message rather than a
 * TEST_FAIL() because the expected global list contains a global->topic
 * fallthru property (auto.offset.reset) which presumably is not part of
 * the global dump. Confirm, and promote to TEST_FAIL() if every expected
 * property is supposed to show up in the dump.
 */
static void conf_verify (int line,
                         const char **arr, size_t cnt, const char **confs) {
        size_t i, j;

        for (i = 0 ; confs[i] ; i += 2) {
                int found = 0;

                /* j+1 < cnt: never read a value slot past the dump,
                 * even if cnt were odd. */
                for (j = 0 ; j + 1 < cnt ; j += 2) {
                        if (strcmp(confs[i], arr[j]))
                                continue;

                        found = 1;

                        if (strcmp(confs[i+1], arr[j+1]))
                                TEST_FAIL("%i: Property %s mismatch: "
                                          "expected %s != retrieved %s",
                                          line,
                                          confs[i],
                                          confs[i+1], arr[j+1]);
                        break;
                }

                if (!found)
                        TEST_SAY("%i: "
                                 "Property %s not found in config\n",
                                 line,
                                 confs[i]);
        }
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
 * @brief Compare two configuration dumps entry by entry.
 *
 * @param desc Label used as prefix in failure messages.
 * @param a    First dump: alternating name, value strings.
 * @param acnt Number of strings in \p a.
 * @param b    Second dump, same layout as \p a.
 * @param bcnt Number of strings in \p b.
 *
 * Reports through TEST_FAIL() when the string counts differ, when the
 * names at the same position differ, or when the values of a same-named
 * property differ (with the exception of "default_topic_conf").
 */
static void conf_cmp (const char *desc,
                      const char **a, size_t acnt,
                      const char **b, size_t bcnt) {
        size_t i;

        /* size_t must be printed with %zu (was %zd). */
        if (acnt != bcnt)
                TEST_FAIL("%s config compare: count %zu != %zu mismatch",
                          desc, acnt, bcnt);

        for (i = 0 ; i < acnt ; i += 2) {
                if (strcmp(a[i], b[i])) {
                        TEST_FAIL("%s conf mismatch: %s != %s",
                                  desc, a[i], b[i]);
                } else if (strcmp(a[i+1], b[i+1])) {
                        /* The default_topic_conf will be auto-created
                         * when global->topic fallthru is used, so its
                         * value will not match here. */
                        if (!strcmp(a[i], "default_topic_conf"))
                                continue;
                        TEST_FAIL("%s conf value mismatch for %s: %s != %s",
                                  desc, a[i], a[i+1], b[i+1]);
                }
        }
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Not called, just used for config
Packit 2997f0
 */
Packit 2997f0
static int on_new_call_cnt;
Packit 2997f0
static rd_kafka_resp_err_t my_on_new (rd_kafka_t *rk,
Packit 2997f0
                                      const rd_kafka_conf_t *conf,
Packit 2997f0
                                      void *ic_opaque,
Packit 2997f0
                                      char *errstr, size_t errstr_size) {
Packit 2997f0
        TEST_SAY("%s: on_new() called\n", rd_kafka_name(rk));
Packit 2997f0
        on_new_call_cnt++;
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief When rd_kafka_new() succeeds it takes ownership of the config object,
Packit 2997f0
 *        but when it fails the config object remains in application custody.
Packit 2997f0
 *        These tests makes sure that's the case (preferably run with valgrind)
Packit 2997f0
 */
Packit 2997f0
static void do_test_kafka_new_failures (void) {
Packit 2997f0
        rd_kafka_conf_t *conf;
Packit 2997f0
        rd_kafka_t *rk;
Packit 2997f0
        char errstr[512];
Packit 2997f0
Packit 2997f0
        conf = rd_kafka_conf_new();
Packit 2997f0
Packit 2997f0
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
Packit 2997f0
        TEST_ASSERT(rk, "kafka_new() failed: %s", errstr);
Packit 2997f0
        rd_kafka_destroy(rk);
Packit 2997f0
Packit 2997f0
        /* Set an erroneous configuration value that is not checked
Packit 2997f0
         * by conf_set() but by rd_kafka_new() */
Packit 2997f0
        conf = rd_kafka_conf_new();
Packit 2997f0
        if (rd_kafka_conf_set(conf, "partition.assignment.strategy",
Packit 2997f0
                              "range,thiswillfail", errstr, sizeof(errstr)) !=
Packit 2997f0
            RD_KAFKA_CONF_OK)
Packit 2997f0
                TEST_FAIL("%s", errstr);
Packit 2997f0
Packit 2997f0
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
Packit 2997f0
        TEST_ASSERT(!rk, "kafka_new() should have failed");
Packit 2997f0
Packit 2997f0
        /* config object should still belong to us,
Packit 2997f0
         * correct the erroneous config and try again. */
Packit 2997f0
        if (rd_kafka_conf_set(conf, "partition.assignment.strategy", NULL,
Packit 2997f0
                              errstr, sizeof(errstr)) !=
Packit 2997f0
            RD_KAFKA_CONF_OK)
Packit 2997f0
                TEST_FAIL("%s", errstr);
Packit 2997f0
Packit 2997f0
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
Packit 2997f0
        TEST_ASSERT(rk, "kafka_new() failed: %s", errstr);
Packit 2997f0
        rd_kafka_destroy(rk);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Verify that INVALID properties (such as for Java SSL properties)
Packit 2997f0
 *        work, as well as INTERNAL properties.
Packit 2997f0
 */
Packit 2997f0
static void do_test_special_invalid_conf (void) {
Packit 2997f0
        rd_kafka_conf_t *conf;
Packit 2997f0
        char errstr[512];
Packit 2997f0
        rd_kafka_conf_res_t res;
Packit 2997f0
Packit 2997f0
        conf = rd_kafka_conf_new();
Packit 2997f0
Packit 2997f0
        res = rd_kafka_conf_set(conf, "ssl.truststore.location", "abc",
Packit 2997f0
                                errstr, sizeof(errstr));
Packit 2997f0
        /* Existing apps might not print the error string when conf_set
Packit 2997f0
         * returns UNKNOWN, only on INVALID, so make sure that is
Packit 2997f0
         * what is being returned. */
Packit 2997f0
        TEST_ASSERT(res == RD_KAFKA_CONF_INVALID,
Packit 2997f0
                    "expected ssl.truststore.location to fail with INVALID, "
Packit 2997f0
                    "not %d", res);
Packit 2997f0
        /* Make sure there is a link to documentation */
Packit 2997f0
        TEST_ASSERT(strstr(errstr, "http"),
Packit 2997f0
                    "expected ssl.truststore.location to provide link to "
Packit 2997f0
                    "documentation, not \"%s\"", errstr);
Packit 2997f0
        TEST_SAY(_C_GRN "Ok: %s\n" _C_CLR, errstr);
Packit 2997f0
Packit 2997f0
Packit 2997f0
        res = rd_kafka_conf_set(conf, "sasl.jaas.config", "abc",
Packit 2997f0
                                errstr, sizeof(errstr));
Packit 2997f0
        /* Existing apps might not print the error string when conf_set
Packit 2997f0
         * returns UNKNOWN, only on INVALID, so make sure that is
Packit 2997f0
         * what is being returned. */
Packit 2997f0
        TEST_ASSERT(res == RD_KAFKA_CONF_INVALID,
Packit 2997f0
                    "expected sasl.jaas.config to fail with INVALID, "
Packit 2997f0
                    "not %d", res);
Packit 2997f0
        /* Make sure there is a link to documentation */
Packit 2997f0
        TEST_ASSERT(strstr(errstr, "http"),
Packit 2997f0
                    "expected sasl.jaas.config to provide link to "
Packit 2997f0
                    "documentation, not \"%s\"", errstr);
Packit 2997f0
        TEST_SAY(_C_GRN "Ok: %s\n" _C_CLR, errstr);
Packit 2997f0
Packit 2997f0
Packit 2997f0
        res = rd_kafka_conf_set(conf, "interceptors", "1",
Packit 2997f0
                                errstr, sizeof(errstr));
Packit 2997f0
        TEST_ASSERT(res == RD_KAFKA_CONF_INVALID,
Packit 2997f0
                    "expected interceptors to fail with INVALID, "
Packit 2997f0
                    "not %d", res);
Packit 2997f0
        TEST_SAY(_C_GRN "Ok: %s\n" _C_CLR, errstr);
Packit 2997f0
Packit 2997f0
        rd_kafka_conf_destroy(conf);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
int main_0004_conf (int argc, char **argv) {
Packit 2997f0
	rd_kafka_t *rk;
Packit 2997f0
	rd_kafka_topic_t *rkt;
Packit 2997f0
	rd_kafka_conf_t *ignore_conf, *conf, *conf2;
Packit 2997f0
	rd_kafka_topic_conf_t *ignore_topic_conf, *tconf, *tconf2;
Packit 2997f0
	char errstr[512];
Packit 2997f0
        rd_kafka_resp_err_t err;
Packit 2997f0
	const char **arr_orig, **arr_dup;
Packit 2997f0
	size_t cnt_orig, cnt_dup;
Packit 2997f0
	int i;
Packit 2997f0
        const char *topic;
Packit 2997f0
	static const char *gconfs[] = {
Packit 2997f0
		"message.max.bytes", "12345", /* int property */
Packit 2997f0
		"client.id", "my id", /* string property */
Packit 2997f0
		"debug", "topic,metadata,interceptor", /* S2F property */
Packit 2997f0
		"topic.blacklist", "__.*", /* #778 */
Packit 2997f0
                "auto.offset.reset", "earliest", /* Global->Topic fallthru */
Packit 2997f0
#if WITH_ZLIB
Packit 2997f0
		"compression.codec", "gzip", /* S2I property */
Packit 2997f0
#endif
Packit 2997f0
		NULL
Packit 2997f0
	};
Packit 2997f0
	static const char *tconfs[] = {
Packit 2997f0
		"request.required.acks", "-1", /* int */
Packit 2997f0
		"auto.commit.enable", "false", /* bool */
Packit 2997f0
		"auto.offset.reset", "error",  /* S2I */
Packit 2997f0
		"offset.store.path", "my/path", /* string */
Packit 2997f0
		NULL
Packit 2997f0
	};
Packit 2997f0
Packit 2997f0
	test_conf_init(&ignore_conf, &ignore_topic_conf, 10);
Packit 2997f0
	rd_kafka_conf_destroy(ignore_conf);
Packit 2997f0
	rd_kafka_topic_conf_destroy(ignore_topic_conf);
Packit 2997f0
Packit 2997f0
        topic = test_mk_topic_name("0004", 0);
Packit 2997f0
Packit 2997f0
	/* Set up a global config object */
Packit 2997f0
	conf = rd_kafka_conf_new();
Packit 2997f0
Packit 2997f0
        for (i = 0 ; gconfs[i] ; i += 2) {
Packit 2997f0
                if (rd_kafka_conf_set(conf, gconfs[i], gconfs[i+1],
Packit 2997f0
                                      errstr, sizeof(errstr)) !=
Packit 2997f0
                    RD_KAFKA_CONF_OK)
Packit 2997f0
                        TEST_FAIL("%s\n", errstr);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
	rd_kafka_conf_set_dr_cb(conf, dr_cb);
Packit 2997f0
	rd_kafka_conf_set_error_cb(conf, error_cb);
Packit 2997f0
        /* interceptor configs are not exposed as strings or in dumps
Packit 2997f0
         * so the dump verification step will not cover them, but valgrind
Packit 2997f0
         * will help track down memory leaks/use-after-free etc. */
Packit 2997f0
        err = rd_kafka_conf_interceptor_add_on_new(conf, "testic",
Packit 2997f0
                                                   my_on_new, NULL);
Packit 2997f0
        TEST_ASSERT(!err, "add_on_new() failed: %s", rd_kafka_err2str(err));
Packit 2997f0
Packit 2997f0
	/* Set up a topic config object */
Packit 2997f0
	tconf = rd_kafka_topic_conf_new();
Packit 2997f0
Packit 2997f0
	rd_kafka_topic_conf_set_partitioner_cb(tconf, partitioner);
Packit 2997f0
	rd_kafka_topic_conf_set_opaque(tconf, (void *)0xbeef);
Packit 2997f0
Packit 2997f0
	for (i = 0 ; tconfs[i] ; i += 2) {
Packit 2997f0
		if (rd_kafka_topic_conf_set(tconf, tconfs[i], tconfs[i+1],
Packit 2997f0
				      errstr, sizeof(errstr)) !=
Packit 2997f0
		    RD_KAFKA_CONF_OK)
Packit 2997f0
			TEST_FAIL("%s\n", errstr);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
Packit 2997f0
	/* Verify global config */
Packit 2997f0
	arr_orig = rd_kafka_conf_dump(conf, &cnt_orig);
Packit 2997f0
	conf_verify(__LINE__, arr_orig, cnt_orig, gconfs);
Packit 2997f0
Packit 2997f0
	/* Verify copied global config */
Packit 2997f0
	conf2 = rd_kafka_conf_dup(conf);
Packit 2997f0
	arr_dup = rd_kafka_conf_dump(conf2, &cnt_dup);
Packit 2997f0
	conf_verify(__LINE__, arr_dup, cnt_dup, gconfs);
Packit 2997f0
	conf_cmp("global", arr_orig, cnt_orig, arr_dup, cnt_dup);
Packit 2997f0
	rd_kafka_conf_dump_free(arr_orig, cnt_orig);
Packit 2997f0
	rd_kafka_conf_dump_free(arr_dup, cnt_dup);
Packit 2997f0
Packit 2997f0
	/* Verify topic config */
Packit 2997f0
	arr_orig = rd_kafka_topic_conf_dump(tconf, &cnt_orig);
Packit 2997f0
	conf_verify(__LINE__, arr_orig, cnt_orig, tconfs);
Packit 2997f0
Packit 2997f0
	/* Verify copied topic config */
Packit 2997f0
	tconf2 = rd_kafka_topic_conf_dup(tconf);
Packit 2997f0
	arr_dup = rd_kafka_topic_conf_dump(tconf2, &cnt_dup);
Packit 2997f0
	conf_verify(__LINE__, arr_dup, cnt_dup, tconfs);
Packit 2997f0
	conf_cmp("topic", arr_orig, cnt_orig, arr_dup, cnt_dup);
Packit 2997f0
	rd_kafka_conf_dump_free(arr_orig, cnt_orig);
Packit 2997f0
	rd_kafka_conf_dump_free(arr_dup, cnt_dup);
Packit 2997f0
Packit 2997f0
Packit 2997f0
	/*
Packit 2997f0
	 * Create kafka instances using original and copied confs
Packit 2997f0
	 */
Packit 2997f0
Packit 2997f0
	/* original */
Packit 2997f0
        TEST_ASSERT(on_new_call_cnt == 0, "expected 0 on_new call, not %d",
Packit 2997f0
                    on_new_call_cnt);
Packit 2997f0
        on_new_call_cnt = 0;
Packit 2997f0
	rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
Packit 2997f0
        TEST_ASSERT(on_new_call_cnt == 1, "expected 1 on_new call, not %d",
Packit 2997f0
                on_new_call_cnt);
Packit 2997f0
Packit 2997f0
	rkt = rd_kafka_topic_new(rk, topic, tconf);
Packit 2997f0
	if (!rkt)
Packit 2997f0
		TEST_FAIL("Failed to create topic: %s\n",
Packit 2997f0
			  rd_strerror(errno));
Packit 2997f0
Packit 2997f0
	rd_kafka_topic_destroy(rkt);
Packit 2997f0
	rd_kafka_destroy(rk);
Packit 2997f0
Packit 2997f0
	/* copied */
Packit 2997f0
        on_new_call_cnt = 0; /* interceptors are not copied. */
Packit 2997f0
	rk = test_create_handle(RD_KAFKA_PRODUCER, conf2);
Packit 2997f0
        TEST_ASSERT(on_new_call_cnt == 0, "expected 0 on_new call, not %d",
Packit 2997f0
                on_new_call_cnt);
Packit 2997f0
Packit 2997f0
	rkt = rd_kafka_topic_new(rk, topic, tconf2);
Packit 2997f0
	if (!rkt)
Packit 2997f0
		TEST_FAIL("Failed to create topic: %s\n",
Packit 2997f0
			  rd_strerror(errno));
Packit 2997f0
	rd_kafka_topic_destroy(rkt);
Packit 2997f0
	rd_kafka_destroy(rk);
Packit 2997f0
Packit 2997f0
Packit 2997f0
	/* Incremental S2F property.
Packit 2997f0
	 * NOTE: The order of fields returned in get() is hardcoded here. */
Packit 2997f0
	{
Packit 2997f0
		static const char *s2fs[] = {
Packit 2997f0
			"generic,broker,queue,cgrp",
Packit 2997f0
			"generic,broker,queue,cgrp",
Packit 2997f0
Packit 2997f0
			"-broker,+queue,topic",
Packit 2997f0
			"generic,topic,queue,cgrp",
Packit 2997f0
Packit 2997f0
			"-all,security,-fetch,+metadata",
Packit 2997f0
			"metadata,security",
Packit 2997f0
Packit 2997f0
			NULL
Packit 2997f0
		};
Packit 2997f0
Packit 2997f0
		TEST_SAY("Incremental S2F tests\n");
Packit 2997f0
		conf = rd_kafka_conf_new();
Packit 2997f0
Packit 2997f0
		for (i = 0 ; s2fs[i] ; i += 2) {
Packit 2997f0
			const char *val;
Packit 2997f0
Packit 2997f0
			TEST_SAY("  Set: %s\n", s2fs[i]);
Packit 2997f0
			test_conf_set(conf, "debug", s2fs[i]);
Packit 2997f0
			val = test_conf_get(conf, "debug");
Packit 2997f0
			TEST_SAY("  Now: %s\n", val);
Packit 2997f0
Packit 2997f0
			if (strcmp(val, s2fs[i+1]))
Packit 2997f0
				TEST_FAIL_LATER("\n"
Packit 2997f0
						"Expected: %s\n"
Packit 2997f0
						"     Got: %s",
Packit 2997f0
						s2fs[i+1], val);
Packit 2997f0
		}
Packit 2997f0
		rd_kafka_conf_destroy(conf);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	/* Canonical int values, aliases, s2i-verified strings */
Packit 2997f0
	{
Packit 2997f0
		static const struct {
Packit 2997f0
			const char *prop;
Packit 2997f0
			const char *val;
Packit 2997f0
			const char *exp;
Packit 2997f0
			int is_global;
Packit 2997f0
		} props[] = {
Packit 2997f0
			{ "request.required.acks", "0", "0" },
Packit 2997f0
			{ "request.required.acks", "-1", "-1" },
Packit 2997f0
			{ "request.required.acks", "1", "1" },
Packit 2997f0
			{ "acks", "3", "3" }, /* alias test */
Packit 2997f0
			{ "request.required.acks", "393", "393" },
Packit 2997f0
			{ "request.required.acks", "bad", NULL },
Packit 2997f0
			{ "request.required.acks", "all", "-1" },
Packit 2997f0
                        { "request.required.acks", "all", "-1", 1/*fallthru*/ },
Packit 2997f0
			{ "acks", "0", "0" }, /* alias test */
Packit 2997f0
#if WITH_SASL
Packit 2997f0
			{ "sasl.mechanisms", "GSSAPI", "GSSAPI", 1 },
Packit 2997f0
			{ "sasl.mechanisms", "PLAIN", "PLAIN", 1  },
Packit 2997f0
			{ "sasl.mechanisms", "GSSAPI,PLAIN", NULL, 1  },
Packit 2997f0
			{ "sasl.mechanisms", "", NULL, 1  },
Packit 2997f0
#endif
Packit 2997f0
			{ NULL }
Packit 2997f0
		};
Packit 2997f0
Packit 2997f0
		TEST_SAY("Canonical tests\n");
Packit 2997f0
		tconf = rd_kafka_topic_conf_new();
Packit 2997f0
		conf = rd_kafka_conf_new();
Packit 2997f0
Packit 2997f0
		for (i = 0 ; props[i].prop ; i++) {
Packit 2997f0
			char dest[64];
Packit 2997f0
			size_t destsz;
Packit 2997f0
			rd_kafka_conf_res_t res;
Packit 2997f0
Packit 2997f0
			TEST_SAY("  Set: %s=%s expect %s (%s)\n",
Packit 2997f0
				 props[i].prop, props[i].val, props[i].exp,
Packit 2997f0
                                 props[i].is_global ? "global":"topic");
Packit 2997f0
Packit 2997f0
Packit 2997f0
			/* Set value */
Packit 2997f0
			if (props[i].is_global)
Packit 2997f0
				res = rd_kafka_conf_set(conf,
Packit 2997f0
						      props[i].prop,
Packit 2997f0
						      props[i].val,
Packit 2997f0
						      errstr, sizeof(errstr));
Packit 2997f0
			else
Packit 2997f0
				res = rd_kafka_topic_conf_set(tconf,
Packit 2997f0
							      props[i].prop,
Packit 2997f0
							      props[i].val,
Packit 2997f0
							      errstr,
Packit 2997f0
							      sizeof(errstr));
Packit 2997f0
			if ((res == RD_KAFKA_CONF_OK ? 1:0) !=
Packit 2997f0
			    (props[i].exp ? 1:0))
Packit 2997f0
				TEST_FAIL("Expected %s, got %s",
Packit 2997f0
					  props[i].exp ? "success" : "failure",
Packit 2997f0
					  (res == RD_KAFKA_CONF_OK ? "OK" :
Packit 2997f0
					   (res == RD_KAFKA_CONF_INVALID ? "INVALID" :
Packit 2997f0
					    "UNKNOWN")));
Packit 2997f0
Packit 2997f0
			if (!props[i].exp)
Packit 2997f0
				continue;
Packit 2997f0
Packit 2997f0
			/* Get value and compare to expected result */
Packit 2997f0
			destsz = sizeof(dest);
Packit 2997f0
			if (props[i].is_global)
Packit 2997f0
				res = rd_kafka_conf_get(conf,
Packit 2997f0
							props[i].prop,
Packit 2997f0
							dest, &destsz);
Packit 2997f0
			else
Packit 2997f0
				res = rd_kafka_topic_conf_get(tconf,
Packit 2997f0
							      props[i].prop,
Packit 2997f0
							      dest, &destsz);
Packit 2997f0
			TEST_ASSERT(res == RD_KAFKA_CONF_OK,
Packit 2997f0
				    ".._conf_get(%s) returned %d",
Packit 2997f0
                                    props[i].prop, res);
Packit 2997f0
Packit 2997f0
			TEST_ASSERT(!strcmp(props[i].exp, dest),
Packit 2997f0
				    "Expected \"%s\", got \"%s\"",
Packit 2997f0
				    props[i].exp, dest);
Packit 2997f0
		}
Packit 2997f0
		rd_kafka_topic_conf_destroy(tconf);
Packit 2997f0
		rd_kafka_conf_destroy(conf);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
        do_test_kafka_new_failures();
Packit 2997f0
Packit 2997f0
        do_test_special_invalid_conf();
Packit 2997f0
Packit 2997f0
	return 0;
Packit 2997f0
}