Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2017, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdkafka.h"
#include <ctype.h>

/**
 * Verify interceptor functionality.
 *
 * Producer MO:
 *  - create a chain of N interceptors
 *  - allocate a state struct with unique id for each message produced,
 *    provide as msg_opaque and reference from payload.
 *  - in on_send: verify expected interceptor order by counting number
 *    of consecutive bits.
 *  - in on_acknowledge: same
 *  - produce message to invalid topic which should trigger on_send+on_ack..
 *    from within produce().
 *
 * Consumer MO:
 *  - create a chain of M interceptors
 *  - subscribe to the previously produced topic
 *  - in on_consume: find message by id, verify expected order by bit counting.
 *  - on on_commit: just count order per on_commit chain run.
 */


#define msgcnt 100
static const int producer_ic_cnt = 5;
static const int consumer_ic_cnt = 10;

/* The base values help differentiating opaque values between interceptors */
static const int on_send_base    = 1<<24;
static const int on_ack_base     = 1<<25;
static const int on_consume_base = 1<<26;
static const int on_commit_base  = 1<<27;
static const int base_mask       = 0xff << 24;

#define _ON_SEND    0
#define _ON_ACK     1
#define _ON_CONSUME 2
#define _ON_CNT     3
struct msg_state {
        int id;
        int bits[_ON_CNT];  /* Bit field, one bit per interceptor */
};

/* Per-message state */
static struct msg_state msgs[msgcnt];

/* on_commit bits */
static int on_commit_bits = 0;

/**
 * @brief Verify that \p bits matches the number of expected interceptor
 *        call cnt.
 *
 * Verify interceptor order: the lower bits of ic_id
 * denotes the order in which interceptors were added and it
 * must be reflected here, meaning that all lower bits must be set,
 * and no higher ones.
 */
static void msg_verify_ic_cnt (const struct msg_state *msg, const char *what,
                               int bits, int exp_cnt) {
        int exp_bits = exp_cnt ? (1 << exp_cnt)-1 : 0;

        TEST_ASSERT(bits == exp_bits,
                    "msg #%d: %s: expected bits 0x%x (%d), got 0x%x",
                    msg->id, what, exp_bits, exp_cnt, bits);
}

/*
 * @brief Same as msg_verify_ic_cnt() without the msg reliance
 */
static void verify_ic_cnt (const char *what, int bits, int exp_cnt) {
        int exp_bits = exp_cnt ? (1 << exp_cnt)-1 : 0;

        TEST_ASSERT(bits == exp_bits,
                    "%s: expected bits 0x%x (%d), got 0x%x",
                    what, exp_bits, exp_cnt, bits);
}



static void verify_msg (const char *what, int base, int bitid,
                        rd_kafka_message_t *rkmessage, void *ic_opaque) {
        const char *id_str = rkmessage->key;
        struct msg_state *msg;
        int id;
        int ic_id = (int)(intptr_t)ic_opaque;

        /* Verify opaque (base | ic id) */
        TEST_ASSERT((ic_id & base_mask) == base);
        ic_id &= ~base_mask;

        /* Find message by id */
        TEST_ASSERT(rkmessage->key && rkmessage->key_len > 0 &&
                    id_str[(int)rkmessage->key_len-1] == '\0' &&
                    strlen(id_str) > 0 && isdigit(*id_str));
        id = atoi(id_str);
        TEST_ASSERT(id >= 0 && id < msgcnt,
                    "%s: bad message id %s", what, id_str);
        msg = &msgs[id];
        TEST_ASSERT(msg->id == id, "expected msg #%d has wrong id %d",
                    id, msg->id);

        /* Verify message opaque */
        if (!strcmp(what, "on_send") ||
            !strncmp(what, "on_ack", 6))
                TEST_ASSERT(rkmessage->_private == (void *)msg);

        TEST_SAYL(3, "%s: interceptor #%d called for message #%d (%d)\n",
                  what, ic_id, id, msg->id);

        msg_verify_ic_cnt(msg, what, msg->bits[bitid], ic_id);

        /* Set this interceptor's bit */
        msg->bits[bitid] |= 1 << ic_id;

}


static rd_kafka_resp_err_t on_send (rd_kafka_t *rk,
                                    rd_kafka_message_t *rkmessage,
                                    void *ic_opaque) {
        TEST_ASSERT(ic_opaque != NULL);
        verify_msg("on_send", on_send_base, _ON_SEND, rkmessage, ic_opaque);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

static rd_kafka_resp_err_t on_ack (rd_kafka_t *rk,
                                   rd_kafka_message_t *rkmessage,
                                   void *ic_opaque) {
        TEST_ASSERT(ic_opaque != NULL);
        verify_msg("on_ack", on_ack_base, _ON_ACK, rkmessage, ic_opaque);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

static rd_kafka_resp_err_t on_consume (rd_kafka_t *rk,
                                       rd_kafka_message_t *rkmessage,
                                       void *ic_opaque) {
        TEST_ASSERT(ic_opaque != NULL);
        verify_msg("on_consume", on_consume_base, _ON_CONSUME, rkmessage,
                   ic_opaque);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


static rd_kafka_resp_err_t on_commit (
        rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
        rd_kafka_resp_err_t err, void *ic_opaque) {
        int ic_id = (int)(intptr_t)ic_opaque;

        /* Since on_commit is triggered a bit randomly and not per
         * message we only try to make sure it gets fully set at least once. */
        TEST_ASSERT(ic_opaque != NULL);

        /* Verify opaque (base | ic id) */
        TEST_ASSERT((ic_id & base_mask) == on_commit_base);
        ic_id &= ~base_mask;

        TEST_ASSERT(ic_opaque != NULL);

        TEST_SAYL(3, "on_commit: interceptor #%d called: %s\n", ic_id,
                  rd_kafka_err2str(err));
        if (test_level >= 4)
                test_print_partition_list(offsets);

        /* Check for rollover where a previous on_commit stint was
         * succesful and it just now started over */
        if (on_commit_bits > 0 && ic_id == 0) {
                /* Verify completeness of previous stint */
                verify_ic_cnt("on_commit", on_commit_bits, consumer_ic_cnt);
                /* Reset */
                on_commit_bits = 0;
        }

        verify_ic_cnt("on_commit", on_commit_bits, ic_id);

        /* Set this interceptor's bit */
        on_commit_bits |= 1 << ic_id;


        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


static void do_test_produce (rd_kafka_t *rk, const char *topic,
                             int32_t partition, int msgid, int exp_fail,
                             int exp_ic_cnt) {
        rd_kafka_resp_err_t err;
        char key[16];
        struct msg_state *msg = &msgs[msgid];
        int i;

        /* Message state should be empty, no interceptors should have
         * been called yet.. */
        for (i = 0 ; i < _ON_CNT ; i++)
                TEST_ASSERT(msg->bits[i] == 0);

        msg->id = msgid;
        rd_snprintf(key, sizeof(key), "%d", msgid);

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_PARTITION(partition),
                                RD_KAFKA_V_KEY(key, strlen(key)+1),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_OPAQUE(msg),
                                RD_KAFKA_V_END);
        msg_verify_ic_cnt(msg, "on_send", msg->bits[_ON_SEND], exp_ic_cnt);

        if (err) {
                msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK], exp_ic_cnt);
                TEST_ASSERT(exp_fail,
                            "producev() failed: %s", rd_kafka_err2str(err));
        } else {
                msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK], 0);
                TEST_ASSERT(!exp_fail,
                            "expected produce failure for msg #%d, not %s",
                            msgid, rd_kafka_err2str(err));
        }
}



static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
                                            const rd_kafka_conf_t *conf,
                                            void *ic_opaque,
                                            char *errstr, size_t errstr_size) {
        int i;

        for (i = 0 ; i < producer_ic_cnt ; i++) {
                rd_kafka_resp_err_t err;

                err = rd_kafka_interceptor_add_on_send(
                        rk, tsprintf("on_send:%d",i),
                        on_send, (void *)(intptr_t)(on_send_base | i));
                TEST_ASSERT(!err, "add_on_send failed: %s",
                            rd_kafka_err2str(err));

                err = rd_kafka_interceptor_add_on_acknowledgement(
                        rk, tsprintf("on_acknowledgement:%d",i),
                        on_ack, (void *)(intptr_t)(on_ack_base | i));
                TEST_ASSERT(!err, "add_on_ack.. failed: %s",
                            rd_kafka_err2str(err));


                /* Add consumer interceptors as well to make sure
                 * they are not called. */
                err = rd_kafka_interceptor_add_on_consume(
                        rk, tsprintf("on_consume:%d",i),
                        on_consume, NULL);
                TEST_ASSERT(!err, "add_on_consume failed: %s",
                            rd_kafka_err2str(err));


                err = rd_kafka_interceptor_add_on_commit(
                        rk, tsprintf("on_commit:%d",i),
                        on_commit, NULL);
                TEST_ASSERT(!err, "add_on_commit failed: %s",
                            rd_kafka_err2str(err));
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

static void do_test_producer (const char *topic) {
        rd_kafka_conf_t *conf;
        int i;
        rd_kafka_t *rk;

        TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);

        test_conf_init(&conf, NULL, 0);

        rd_kafka_conf_interceptor_add_on_new(conf, "on_new_prodcer",
                                             on_new_producer, NULL);

        /* Create producer */
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        for (i = 0 ; i < msgcnt-1 ; i++)
                do_test_produce(rk, topic, RD_KAFKA_PARTITION_UA, i, 0,
                                producer_ic_cnt);

        /* Wait for messages to be delivered */
        test_flush(rk, -1);

        /* Now send a message that will fail in produce()
         * due to bad partition */
        do_test_produce(rk, topic, 1234, i, 1, producer_ic_cnt);


        /* Verify acks */
        for (i = 0 ; i < msgcnt ; i++) {
                struct msg_state *msg = &msgs[i];
                msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK],
                                  producer_ic_cnt);
        }

        rd_kafka_destroy(rk);
}


static rd_kafka_resp_err_t on_new_consumer (rd_kafka_t *rk,
                                            const rd_kafka_conf_t *conf,
                                            void *ic_opaque,
                                            char *errstr, size_t errstr_size) {
        int i;

        for (i = 0 ; i < consumer_ic_cnt ; i++) {
                rd_kafka_interceptor_add_on_consume(
                        rk, tsprintf("on_consume:%d",i),
                        on_consume, (void *)(intptr_t)(on_consume_base | i));

                rd_kafka_interceptor_add_on_commit(
                        rk, tsprintf("on_commit:%d",i),
                        on_commit, (void *)(intptr_t)(on_commit_base | i));

                /* Add producer interceptors as well to make sure they
                 * are not called. */
                rd_kafka_interceptor_add_on_send(
                        rk, tsprintf("on_send:%d",i),
                        on_send, NULL);

                rd_kafka_interceptor_add_on_acknowledgement(
                        rk, tsprintf("on_acknowledgement:%d",i),
                        on_ack, NULL);
        }


        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


static void do_test_consumer (const char *topic) {

        rd_kafka_conf_t *conf;
        int i;
        rd_kafka_t *rk;

        TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);

        test_conf_init(&conf, NULL, 0);

        rd_kafka_conf_interceptor_add_on_new(conf, "on_new_consumer",
                                             on_new_consumer, NULL);

        test_conf_set(conf, "auto.offset.reset", "earliest");

        /* Create producer */
        rk = test_create_consumer(topic, NULL, conf, NULL);

        test_consumer_subscribe(rk, topic);

        /* Consume messages (-1 for the one that failed producing) */
        test_consumer_poll("interceptors.consume", rk, 0, -1, -1, msgcnt-1,
                           NULL);

        /* Verify on_consume */
        for (i = 0 ; i < msgcnt-1 ; i++) {
                struct msg_state *msg = &msgs[i];
                msg_verify_ic_cnt(msg, "on_consume", msg->bits[_ON_CONSUME],
                                  consumer_ic_cnt);
        }

        /* Verify that the produce-failed message didnt have
         * interceptors called */
        msg_verify_ic_cnt(&msgs[msgcnt-1], "on_consume",
                          msgs[msgcnt-1].bits[_ON_CONSUME], 0);

        test_consumer_close(rk);

        verify_ic_cnt("on_commit", on_commit_bits, consumer_ic_cnt);

        rd_kafka_destroy(rk);
}

/**
 * @brief Interceptors must not be copied automatically by conf_dup()
 *        unless the interceptors have added on_conf_dup().
 *        This behaviour makes sure an interceptor's instance
 *        is not duplicated without the interceptor's knowledge or
 *        assistance.
 */
static void do_test_conf_copy (const char *topic) {
        rd_kafka_conf_t *conf, *conf2;
        int i;
        rd_kafka_t *rk;

        TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);

        memset(&msgs[0], 0, sizeof(msgs));

        test_conf_init(&conf, NULL, 0);

        rd_kafka_conf_interceptor_add_on_new(conf, "on_new_conf_copy",
                                             on_new_producer, NULL);

        /* Now copy the configuration to verify that interceptors are
         * NOT copied. */
        conf2 = conf;
        conf = rd_kafka_conf_dup(conf2);
        rd_kafka_conf_destroy(conf2);

        /* Create producer */
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        for (i = 0 ; i < msgcnt-1 ; i++)
                do_test_produce(rk, topic, RD_KAFKA_PARTITION_UA, i, 0, 0);

        /* Wait for messages to be delivered */
        test_flush(rk, -1);

        /* Verify acks */
        for (i = 0 ; i < msgcnt ; i++) {
                struct msg_state *msg = &msgs[i];
                msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK], 0);
        }

        rd_kafka_destroy(rk);
}


int main_0064_interceptors (int argc, char **argv) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);

        do_test_producer(topic);

        do_test_consumer(topic);

        do_test_conf_copy(topic);

        return 0;
}