Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdkafka.h"

/**
 * Message Headers end-to-end tests
 */



static int exp_msgid = 0;

struct expect {
        const char *name;
        const char *value;
};



static void expect_check (const char *what, const struct expect *expected,
                          rd_kafka_message_t *rkmessage, int is_const) {
        const struct expect *exp;
        rd_kafka_resp_err_t err;
        size_t idx = 0;
        const char *name;
        const char *value;
        size_t size;
        rd_kafka_headers_t *hdrs;
        int msgid;

        if (rkmessage->len != sizeof(msgid))
                TEST_FAIL("%s: expected message len %"PRIusz" == sizeof(int)",
                          what, rkmessage->len);

        memcpy(&msgid, rkmessage->payload, rkmessage->len);

        if ((err = rd_kafka_message_headers(rkmessage, &hdrs))) {
                if (msgid == 0) {
                        rd_kafka_resp_err_t err2;
                        TEST_SAYL(3, "%s: Msg #%d: no headers, good\n",
                                  what, msgid);

                        err2 = rd_kafka_message_detach_headers(rkmessage, &hdrs);
                        TEST_ASSERT(err == err2,
                                    "expected detach_headers() error %s "
                                    "to match headers() error %s",
                                    rd_kafka_err2str(err2),
                                    rd_kafka_err2str(err));

                        return; /* No headers expected for first message */
                }

                TEST_FAIL("%s: Expected headers in message %d: %s", what, msgid,
                          rd_kafka_err2str(err));
        } else {
                TEST_ASSERT(msgid != 0,
                            "%s: first message should have no headers", what);
        }

        test_headers_dump(what, 3, hdrs);

        for (idx = 0, exp = expected ;
             !rd_kafka_header_get_all(hdrs, idx, &name,
                                      (const void **)&value, &size) ;
             idx++, exp++) {

                TEST_SAYL(3, "%s: Msg #%d: "
                          "Header #%"PRIusz": %s='%s' (expecting %s='%s')\n",
                          what, msgid, idx, name, value ? value : "(NULL)",
                          exp->name, exp->value ? exp->value : "(NULL)");

                if (strcmp(name, exp->name))
                        TEST_FAIL("%s: Msg #%d: "
                                  "Expected header %s at idx #%"PRIusz
                                  ", not '%s' (%"PRIusz")",
                                  what, msgid, exp->name, idx, name,
                                  strlen(name));

                if (!strcmp(name, "msgid")) {
                        int vid;

                        /* Special handling: compare msgid header value
                         * to message body, should be identical */
                        if (size != rkmessage->len || size != sizeof(int))
                                TEST_FAIL("%s: "
                                          "Expected msgid/int-sized payload "
                                          "%"PRIusz", got %"PRIusz,
                                          what, size, rkmessage->len);

                        /* Copy to avoid unaligned access (by cast) */
                        memcpy(&vid, value, size);

                        if (vid != msgid)
                                TEST_FAIL("%s: Header msgid %d != payload %d",
                                          what, vid, msgid);

                        if (exp_msgid != vid)
                                TEST_FAIL("%s: Expected msgid %d, not %d",
                                          what, exp_msgid, vid);
                        continue;
                }

                if (!exp->value) {
                        /* Expected NULL value */
                        TEST_ASSERT(!value,
                                    "%s: Expected NULL value for %s, got %s",
                                    what, exp->name, value);

                } else {
                        TEST_ASSERT(value,
                                    "%s: "
                                    "Expected non-NULL value for %s, got NULL",
                                    what, exp->name);

                        TEST_ASSERT(size == strlen(exp->value),
                                    "%s: Expected size %"PRIusz" for %s, "
                                    "not %"PRIusz,
                                    what, strlen(exp->value), exp->name, size);

                        TEST_ASSERT(value[size] == '\0',
                                    "%s: "
                                    "Expected implicit null-terminator for %s",
                                    what, exp->name);

                        TEST_ASSERT(!strcmp(exp->value, value),
                                    "%s: "
                                    "Expected value %s for %s, not %s",
                                    what, exp->value, exp->name, value);
                }
        }

        TEST_ASSERT(exp->name == NULL,
                    "%s: Expected the expected, but stuck at %s which was "
                    "unexpected",
                    what, exp->name);

        if (!strcmp(what, "handle_consumed_msg") && !is_const &&
            (msgid % 3) == 0) {
                rd_kafka_headers_t *dhdrs;

                err = rd_kafka_message_detach_headers(rkmessage, &dhdrs);
                TEST_ASSERT(!err,
                            "detach_headers() should not fail, got %s",
                            rd_kafka_err2str(err));
                TEST_ASSERT(hdrs == dhdrs);

                /* Verify that a new headers object can be obtained */
                err = rd_kafka_message_headers(rkmessage, &hdrs);
                TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR);
                TEST_ASSERT(hdrs != dhdrs);
                rd_kafka_headers_destroy(dhdrs);

                expect_check("post_detach_headers", expected,
                             rkmessage, is_const);
       }
}


/**
 * @brief Final (as in no more header modifications) message check.
 */
static void msg_final_check (const char *what,
                             rd_kafka_message_t *rkmessage, int is_const) {
        const struct expect expected[] = {
                { "msgid", NULL }, /* special handling */
                { "static", "hey" },
                { "null", NULL },
                { "empty", "" },
                { "send1", "1" },
                { "multi", "multi5" },
                { NULL }
        };

        expect_check(what, expected, rkmessage, is_const);

        exp_msgid++;


}

/**
 * @brief Handle consumed message, must be identical to dr_msg_cb
 */
static void handle_consumed_msg (rd_kafka_message_t *rkmessage) {
        msg_final_check(__FUNCTION__, rkmessage, 0);
}

/**
 * @brief Delivery report callback
 */
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        TEST_ASSERT(!rkmessage->err,
                    "Message delivery failed: %s",
                    rd_kafka_err2str(rkmessage->err));

        msg_final_check(__FUNCTION__, (rd_kafka_message_t *)rkmessage, 1);
}


/**
 * @brief First on_send() interceptor
 */
static rd_kafka_resp_err_t on_send1 (rd_kafka_t *rk,
                                     rd_kafka_message_t *rkmessage,
                                     void *ic_opaque) {
        const struct expect expected[] = {
                { "msgid", NULL }, /* special handling */
                { "static", "hey" },
                { "multi", "multi1" },
                { "multi", "multi2" },
                { "multi", "multi3" },
                { "null", NULL },
                { "empty", "" },
                { NULL }
        };
        rd_kafka_headers_t *hdrs;
        rd_kafka_resp_err_t err;

        expect_check(__FUNCTION__, expected, rkmessage, 0);

        err = rd_kafka_message_headers(rkmessage, &hdrs);
        if (err) /* First message has no headers. */
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        rd_kafka_header_add(hdrs, "multi", -1, "multi4", -1);
        rd_kafka_header_add(hdrs, "send1", -1, "1", -1);
        rd_kafka_header_remove(hdrs, "multi");
        rd_kafka_header_add(hdrs, "multi", -1, "multi5", -1);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief Second on_send() interceptor
 */
static rd_kafka_resp_err_t on_send2 (rd_kafka_t *rk,
                                     rd_kafka_message_t *rkmessage,
                                     void *ic_opaque) {
        const struct expect expected[] = {
                { "msgid", NULL }, /* special handling */
                { "static", "hey" },
                { "null", NULL },
                { "empty", "" },
                { "send1", "1" },
                { "multi", "multi5" },
                { NULL }
        };

        expect_check(__FUNCTION__, expected, rkmessage, 0);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
 * @brief on_new() interceptor to set up message interceptors
 *        from rd_kafka_new().
 */
static rd_kafka_resp_err_t on_new (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
                                   void *ic_opaque,
                                   char *errstr, size_t errstr_size) {
        rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send1, NULL);
        rd_kafka_interceptor_add_on_send(rk, __FILE__, on_send2, NULL);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


static void do_produce (const char *topic, int msgcnt) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        int i;
        rd_kafka_resp_err_t err;

        test_conf_init(&conf, NULL, 0);
        test_conf_set(conf, "acks", "all");
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

        rd_kafka_conf_interceptor_add_on_new(conf, __FILE__, on_new, NULL);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* First message is without headers (negative testing) */
        i = 0;
        err = rd_kafka_producev(
                rk,
                RD_KAFKA_V_TOPIC(topic),
                RD_KAFKA_V_PARTITION(0),
                RD_KAFKA_V_VALUE(&i, sizeof(i)),
                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                RD_KAFKA_V_END);
        TEST_ASSERT(!err,
                    "producev() failed: %s", rd_kafka_err2str(err));
        exp_msgid++;

        for (i = 1 ; i < msgcnt ; i++, exp_msgid++) {
                err = rd_kafka_producev(
                        rk,
                        RD_KAFKA_V_TOPIC(topic),
                        RD_KAFKA_V_PARTITION(0),
                        RD_KAFKA_V_VALUE(&i, sizeof(i)),
                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                        RD_KAFKA_V_HEADER("msgid", &i, sizeof(i)),
                        RD_KAFKA_V_HEADER("static", "hey", -1),
                        RD_KAFKA_V_HEADER("multi", "multi1", -1),
                        RD_KAFKA_V_HEADER("multi", "multi2", 6),
                        RD_KAFKA_V_HEADER("multi", "multi3", strlen("multi3")),
                        RD_KAFKA_V_HEADER("null", NULL, 0),
                        RD_KAFKA_V_HEADER("empty", "", 0),
                        RD_KAFKA_V_END);
                TEST_ASSERT(!err,
                            "producev() failed: %s", rd_kafka_err2str(err));
        }

        /* Reset expected message id for dr */
        exp_msgid = 0;

        /* Wait for timeouts and delivery reports */
        rd_kafka_flush(rk, tmout_multip(5000));

        rd_kafka_destroy(rk);
}

static void do_consume (const char *topic, int msgcnt) {
        rd_kafka_t *rk;
        rd_kafka_topic_partition_list_t *parts;

        rk = test_create_consumer(topic, NULL, NULL, NULL);

        parts = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(parts, topic, 0)->offset =
                RD_KAFKA_OFFSET_BEGINNING;

        test_consumer_assign("assign", rk, parts);

        rd_kafka_topic_partition_list_destroy(parts);

        exp_msgid = 0;

        while (exp_msgid < msgcnt) {
                rd_kafka_message_t *rkm;

                rkm = rd_kafka_consumer_poll(rk, 1000);
                if (!rkm)
                        continue;

                if (rkm->err)
                        TEST_FAIL("consume error while expecting msgid %d/%d: "
                                  "%s",
                                  exp_msgid, msgcnt,
                                  rd_kafka_message_errstr(rkm));

                handle_consumed_msg(rkm);

                rd_kafka_message_destroy(rkm);
        }

        test_consumer_close(rk);
        rd_kafka_destroy(rk);
}


int main_0073_headers (int argc, char **argv) {
        const char *topic = test_mk_topic_name(__FUNCTION__ + 5, 1);
        const int msgcnt = 10;

        do_produce(topic, msgcnt);
        do_consume(topic, msgcnt);

        return 0;
}