Blame tests/0064-interceptors.c

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2017, Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 *
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met:
Packit 2997f0
 *
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer.
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution.
Packit 2997f0
 *
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#include "test.h"
Packit 2997f0
#include "rdkafka.h"
Packit 2997f0
#include <ctype.h>
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Verify interceptor functionality.
Packit 2997f0
 *
Packit 2997f0
 * Producer MO:
Packit 2997f0
 *  - create a chain of N interceptors
Packit 2997f0
 *  - allocate a state struct with unique id for each message produced,
Packit 2997f0
 *    provide as msg_opaque and reference from payload.
Packit 2997f0
 *  - in on_send: verify expected interceptor order by counting number
Packit 2997f0
 *    of consecutive bits.
Packit 2997f0
 *  - in on_acknowledge: same
Packit 2997f0
 *  - produce message to invalid topic which should trigger on_send+on_ack..
Packit 2997f0
 *    from within produce().
Packit 2997f0
 *
Packit 2997f0
 * Consumer MO:
Packit 2997f0
 *  - create a chain of M interceptors
Packit 2997f0
 *  - subscribe to the previously produced topic
Packit 2997f0
 *  - in on_consume: find message by id, verify expected order by bit counting.
Packit 2997f0
 *  - on on_commit: just count order per on_commit chain run.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
Packit 2997f0
#define msgcnt 100
Packit 2997f0
static const int producer_ic_cnt = 5;
Packit 2997f0
static const int consumer_ic_cnt = 10;
Packit 2997f0
Packit 2997f0
/* The base values help differentiating opaque values between interceptors */
Packit 2997f0
static const int on_send_base    = 1<<24;
Packit 2997f0
static const int on_ack_base     = 1<<25;
Packit 2997f0
static const int on_consume_base = 1<<26;
Packit 2997f0
static const int on_commit_base  = 1<<27;
Packit 2997f0
static const int base_mask       = 0xff << 24;
Packit 2997f0
Packit 2997f0
#define _ON_SEND    0
Packit 2997f0
#define _ON_ACK     1
Packit 2997f0
#define _ON_CONSUME 2
Packit 2997f0
#define _ON_CNT     3
Packit 2997f0
struct msg_state {
Packit 2997f0
        int id;
Packit 2997f0
        int bits[_ON_CNT];  /* Bit field, one bit per interceptor */
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
/* Per-message state */
Packit 2997f0
static struct msg_state msgs[msgcnt];
Packit 2997f0
Packit 2997f0
/* on_commit bits */
Packit 2997f0
static int on_commit_bits = 0;
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Verify that \p bits matches the number of expected interceptor
Packit 2997f0
 *        call cnt.
Packit 2997f0
 *
Packit 2997f0
 * Verify interceptor order: the lower bits of ic_id
Packit 2997f0
 * denotes the order in which interceptors were added and it
Packit 2997f0
 * must be reflected here, meaning that all lower bits must be set,
Packit 2997f0
 * and no higher ones.
Packit 2997f0
 */
Packit 2997f0
static void msg_verify_ic_cnt (const struct msg_state *msg, const char *what,
Packit 2997f0
                               int bits, int exp_cnt) {
Packit 2997f0
        int exp_bits = exp_cnt ? (1 << exp_cnt)-1 : 0;
Packit 2997f0
Packit 2997f0
        TEST_ASSERT(bits == exp_bits,
Packit 2997f0
                    "msg #%d: %s: expected bits 0x%x (%d), got 0x%x",
Packit 2997f0
                    msg->id, what, exp_bits, exp_cnt, bits);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/*
Packit 2997f0
 * @brief Same as msg_verify_ic_cnt() without the msg reliance
Packit 2997f0
 */
Packit 2997f0
static void verify_ic_cnt (const char *what, int bits, int exp_cnt) {
Packit 2997f0
        int exp_bits = exp_cnt ? (1 << exp_cnt)-1 : 0;
Packit 2997f0
Packit 2997f0
        TEST_ASSERT(bits == exp_bits,
Packit 2997f0
                    "%s: expected bits 0x%x (%d), got 0x%x",
Packit 2997f0
                    what, exp_bits, exp_cnt, bits);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void verify_msg (const char *what, int base, int bitid,
Packit 2997f0
                        rd_kafka_message_t *rkmessage, void *ic_opaque) {
Packit 2997f0
        const char *id_str = rkmessage->key;
Packit 2997f0
        struct msg_state *msg;
Packit 2997f0
        int id;
Packit 2997f0
        int ic_id = (int)(intptr_t)ic_opaque;
Packit 2997f0
Packit 2997f0
        /* Verify opaque (base | ic id) */
Packit 2997f0
        TEST_ASSERT((ic_id & base_mask) == base);
Packit 2997f0
        ic_id &= ~base_mask;
Packit 2997f0
Packit 2997f0
        /* Find message by id */
Packit 2997f0
        TEST_ASSERT(rkmessage->key && rkmessage->key_len > 0 &&
Packit 2997f0
                    id_str[(int)rkmessage->key_len-1] == '\0' &&
Packit 2997f0
                    strlen(id_str) > 0 && isdigit(*id_str));
Packit 2997f0
        id = atoi(id_str);
Packit 2997f0
        TEST_ASSERT(id >= 0 && id < msgcnt,
Packit 2997f0
                    "%s: bad message id %s", what, id_str);
Packit 2997f0
        msg = &msgs[id];
Packit 2997f0
        TEST_ASSERT(msg->id == id, "expected msg #%d has wrong id %d",
Packit 2997f0
                    id, msg->id);
Packit 2997f0
Packit 2997f0
        /* Verify message opaque */
Packit 2997f0
        if (!strcmp(what, "on_send") ||
Packit 2997f0
            !strncmp(what, "on_ack", 6))
Packit 2997f0
                TEST_ASSERT(rkmessage->_private == (void *)msg);
Packit 2997f0
Packit 2997f0
        TEST_SAYL(3, "%s: interceptor #%d called for message #%d (%d)\n",
Packit 2997f0
                  what, ic_id, id, msg->id);
Packit 2997f0
Packit 2997f0
        msg_verify_ic_cnt(msg, what, msg->bits[bitid], ic_id);
Packit 2997f0
Packit 2997f0
        /* Set this interceptor's bit */
Packit 2997f0
        msg->bits[bitid] |= 1 << ic_id;
Packit 2997f0
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static rd_kafka_resp_err_t on_send (rd_kafka_t *rk,
Packit 2997f0
                                    rd_kafka_message_t *rkmessage,
Packit 2997f0
                                    void *ic_opaque) {
Packit 2997f0
        TEST_ASSERT(ic_opaque != NULL);
Packit 2997f0
        verify_msg("on_send", on_send_base, _ON_SEND, rkmessage, ic_opaque);
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
static rd_kafka_resp_err_t on_ack (rd_kafka_t *rk,
Packit 2997f0
                                   rd_kafka_message_t *rkmessage,
Packit 2997f0
                                   void *ic_opaque) {
Packit 2997f0
        TEST_ASSERT(ic_opaque != NULL);
Packit 2997f0
        verify_msg("on_ack", on_ack_base, _ON_ACK, rkmessage, ic_opaque);
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
static rd_kafka_resp_err_t on_consume (rd_kafka_t *rk,
Packit 2997f0
                                       rd_kafka_message_t *rkmessage,
Packit 2997f0
                                       void *ic_opaque) {
Packit 2997f0
        TEST_ASSERT(ic_opaque != NULL);
Packit 2997f0
        verify_msg("on_consume", on_consume_base, _ON_CONSUME, rkmessage,
Packit 2997f0
                   ic_opaque);
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static rd_kafka_resp_err_t on_commit (
Packit 2997f0
        rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
Packit 2997f0
        rd_kafka_resp_err_t err, void *ic_opaque) {
Packit 2997f0
        int ic_id = (int)(intptr_t)ic_opaque;
Packit 2997f0
Packit 2997f0
        /* Since on_commit is triggered a bit randomly and not per
Packit 2997f0
         * message we only try to make sure it gets fully set at least once. */
Packit 2997f0
        TEST_ASSERT(ic_opaque != NULL);
Packit 2997f0
Packit 2997f0
        /* Verify opaque (base | ic id) */
Packit 2997f0
        TEST_ASSERT((ic_id & base_mask) == on_commit_base);
Packit 2997f0
        ic_id &= ~base_mask;
Packit 2997f0
Packit 2997f0
        TEST_ASSERT(ic_opaque != NULL);
Packit 2997f0
Packit 2997f0
        TEST_SAYL(3, "on_commit: interceptor #%d called: %s\n", ic_id,
Packit 2997f0
                  rd_kafka_err2str(err));
Packit 2997f0
        if (test_level >= 4)
Packit 2997f0
                test_print_partition_list(offsets);
Packit 2997f0
Packit 2997f0
        /* Check for rollover where a previous on_commit stint was
Packit 2997f0
         * succesful and it just now started over */
Packit 2997f0
        if (on_commit_bits > 0 && ic_id == 0) {
Packit 2997f0
                /* Verify completeness of previous stint */
Packit 2997f0
                verify_ic_cnt("on_commit", on_commit_bits, consumer_ic_cnt);
Packit 2997f0
                /* Reset */
Packit 2997f0
                on_commit_bits = 0;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        verify_ic_cnt("on_commit", on_commit_bits, ic_id);
Packit 2997f0
Packit 2997f0
        /* Set this interceptor's bit */
Packit 2997f0
        on_commit_bits |= 1 << ic_id;
Packit 2997f0
Packit 2997f0
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void do_test_produce (rd_kafka_t *rk, const char *topic,
Packit 2997f0
                             int32_t partition, int msgid, int exp_fail,
Packit 2997f0
                             int exp_ic_cnt) {
Packit 2997f0
        rd_kafka_resp_err_t err;
Packit 2997f0
        char key[16];
Packit 2997f0
        struct msg_state *msg = &msgs[msgid];
Packit 2997f0
        int i;
Packit 2997f0
Packit 2997f0
        /* Message state should be empty, no interceptors should have
Packit 2997f0
         * been called yet.. */
Packit 2997f0
        for (i = 0 ; i < _ON_CNT ; i++)
Packit 2997f0
                TEST_ASSERT(msg->bits[i] == 0);
Packit 2997f0
Packit 2997f0
        msg->id = msgid;
Packit 2997f0
        rd_snprintf(key, sizeof(key), "%d", msgid);
Packit 2997f0
Packit 2997f0
        err = rd_kafka_producev(rk,
Packit 2997f0
                                RD_KAFKA_V_TOPIC(topic),
Packit 2997f0
                                RD_KAFKA_V_PARTITION(partition),
Packit 2997f0
                                RD_KAFKA_V_KEY(key, strlen(key)+1),
Packit 2997f0
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
Packit 2997f0
                                RD_KAFKA_V_OPAQUE(msg),
Packit 2997f0
                                RD_KAFKA_V_END);
Packit 2997f0
        msg_verify_ic_cnt(msg, "on_send", msg->bits[_ON_SEND], exp_ic_cnt);
Packit 2997f0
Packit 2997f0
        if (err) {
Packit 2997f0
                msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK], exp_ic_cnt);
Packit 2997f0
                TEST_ASSERT(exp_fail,
Packit 2997f0
                            "producev() failed: %s", rd_kafka_err2str(err));
Packit 2997f0
        } else {
Packit 2997f0
                msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK], 0);
Packit 2997f0
                TEST_ASSERT(!exp_fail,
Packit 2997f0
                            "expected produce failure for msg #%d, not %s",
Packit 2997f0
                            msgid, rd_kafka_err2str(err));
Packit 2997f0
        }
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
Packit 2997f0
                                            const rd_kafka_conf_t *conf,
Packit 2997f0
                                            void *ic_opaque,
Packit 2997f0
                                            char *errstr, size_t errstr_size) {
Packit 2997f0
        int i;
Packit 2997f0
Packit 2997f0
        for (i = 0 ; i < producer_ic_cnt ; i++) {
Packit 2997f0
                rd_kafka_resp_err_t err;
Packit 2997f0
Packit 2997f0
                err = rd_kafka_interceptor_add_on_send(
Packit 2997f0
                        rk, tsprintf("on_send:%d",i),
Packit 2997f0
                        on_send, (void *)(intptr_t)(on_send_base | i));
Packit 2997f0
                TEST_ASSERT(!err, "add_on_send failed: %s",
Packit 2997f0
                            rd_kafka_err2str(err));
Packit 2997f0
Packit 2997f0
                err = rd_kafka_interceptor_add_on_acknowledgement(
Packit 2997f0
                        rk, tsprintf("on_acknowledgement:%d",i),
Packit 2997f0
                        on_ack, (void *)(intptr_t)(on_ack_base | i));
Packit 2997f0
                TEST_ASSERT(!err, "add_on_ack.. failed: %s",
Packit 2997f0
                            rd_kafka_err2str(err));
Packit 2997f0
Packit 2997f0
Packit 2997f0
                /* Add consumer interceptors as well to make sure
Packit 2997f0
                 * they are not called. */
Packit 2997f0
                err = rd_kafka_interceptor_add_on_consume(
Packit 2997f0
                        rk, tsprintf("on_consume:%d",i),
Packit 2997f0
                        on_consume, NULL);
Packit 2997f0
                TEST_ASSERT(!err, "add_on_consume failed: %s",
Packit 2997f0
                            rd_kafka_err2str(err));
Packit 2997f0
Packit 2997f0
Packit 2997f0
                err = rd_kafka_interceptor_add_on_commit(
Packit 2997f0
                        rk, tsprintf("on_commit:%d",i),
Packit 2997f0
                        on_commit, NULL);
Packit 2997f0
                TEST_ASSERT(!err, "add_on_commit failed: %s",
Packit 2997f0
                            rd_kafka_err2str(err));
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
static void do_test_producer (const char *topic) {
Packit 2997f0
        rd_kafka_conf_t *conf;
Packit 2997f0
        int i;
Packit 2997f0
        rd_kafka_t *rk;
Packit 2997f0
Packit 2997f0
        TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);
Packit 2997f0
Packit 2997f0
        test_conf_init(&conf, NULL, 0);
Packit 2997f0
Packit 2997f0
        rd_kafka_conf_interceptor_add_on_new(conf, "on_new_prodcer",
Packit 2997f0
                                             on_new_producer, NULL);
Packit 2997f0
Packit 2997f0
        /* Create producer */
Packit 2997f0
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
Packit 2997f0
Packit 2997f0
        for (i = 0 ; i < msgcnt-1 ; i++)
Packit 2997f0
                do_test_produce(rk, topic, RD_KAFKA_PARTITION_UA, i, 0,
Packit 2997f0
                                producer_ic_cnt);
Packit 2997f0
Packit 2997f0
        /* Wait for messages to be delivered */
Packit 2997f0
        test_flush(rk, -1);
Packit 2997f0
Packit 2997f0
        /* Now send a message that will fail in produce()
Packit 2997f0
         * due to bad partition */
Packit 2997f0
        do_test_produce(rk, topic, 1234, i, 1, producer_ic_cnt);
Packit 2997f0
Packit 2997f0
Packit 2997f0
        /* Verify acks */
Packit 2997f0
        for (i = 0 ; i < msgcnt ; i++) {
Packit 2997f0
                struct msg_state *msg = &msgs[i];
Packit 2997f0
                msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK],
Packit 2997f0
                                  producer_ic_cnt);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rd_kafka_destroy(rk);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static rd_kafka_resp_err_t on_new_consumer (rd_kafka_t *rk,
Packit 2997f0
                                            const rd_kafka_conf_t *conf,
Packit 2997f0
                                            void *ic_opaque,
Packit 2997f0
                                            char *errstr, size_t errstr_size) {
Packit 2997f0
        int i;
Packit 2997f0
Packit 2997f0
        for (i = 0 ; i < consumer_ic_cnt ; i++) {
Packit 2997f0
                rd_kafka_interceptor_add_on_consume(
Packit 2997f0
                        rk, tsprintf("on_consume:%d",i),
Packit 2997f0
                        on_consume, (void *)(intptr_t)(on_consume_base | i));
Packit 2997f0
Packit 2997f0
                rd_kafka_interceptor_add_on_commit(
Packit 2997f0
                        rk, tsprintf("on_commit:%d",i),
Packit 2997f0
                        on_commit, (void *)(intptr_t)(on_commit_base | i));
Packit 2997f0
Packit 2997f0
                /* Add producer interceptors as well to make sure they
Packit 2997f0
                 * are not called. */
Packit 2997f0
                rd_kafka_interceptor_add_on_send(
Packit 2997f0
                        rk, tsprintf("on_send:%d",i),
Packit 2997f0
                        on_send, NULL);
Packit 2997f0
Packit 2997f0
                rd_kafka_interceptor_add_on_acknowledgement(
Packit 2997f0
                        rk, tsprintf("on_acknowledgement:%d",i),
Packit 2997f0
                        on_ack, NULL);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void do_test_consumer (const char *topic) {
Packit 2997f0
Packit 2997f0
        rd_kafka_conf_t *conf;
Packit 2997f0
        int i;
Packit 2997f0
        rd_kafka_t *rk;
Packit 2997f0
Packit 2997f0
        TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);
Packit 2997f0
Packit 2997f0
        test_conf_init(&conf, NULL, 0);
Packit 2997f0
Packit 2997f0
        rd_kafka_conf_interceptor_add_on_new(conf, "on_new_consumer",
Packit 2997f0
                                             on_new_consumer, NULL);
Packit 2997f0
Packit 2997f0
        test_conf_set(conf, "auto.offset.reset", "earliest");
Packit 2997f0
Packit 2997f0
        /* Create producer */
Packit 2997f0
        rk = test_create_consumer(topic, NULL, conf, NULL);
Packit 2997f0
Packit 2997f0
        test_consumer_subscribe(rk, topic);
Packit 2997f0
Packit 2997f0
        /* Consume messages (-1 for the one that failed producing) */
Packit 2997f0
        test_consumer_poll("interceptors.consume", rk, 0, -1, -1, msgcnt-1,
Packit 2997f0
                           NULL);
Packit 2997f0
Packit 2997f0
        /* Verify on_consume */
Packit 2997f0
        for (i = 0 ; i < msgcnt-1 ; i++) {
Packit 2997f0
                struct msg_state *msg = &msgs[i];
Packit 2997f0
                msg_verify_ic_cnt(msg, "on_consume", msg->bits[_ON_CONSUME],
Packit 2997f0
                                  consumer_ic_cnt);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Verify that the produce-failed message didnt have
Packit 2997f0
         * interceptors called */
Packit 2997f0
        msg_verify_ic_cnt(&msgs[msgcnt-1], "on_consume",
Packit 2997f0
                          msgs[msgcnt-1].bits[_ON_CONSUME], 0);
Packit 2997f0
Packit 2997f0
        test_consumer_close(rk);
Packit 2997f0
Packit 2997f0
        verify_ic_cnt("on_commit", on_commit_bits, consumer_ic_cnt);
Packit 2997f0
Packit 2997f0
        rd_kafka_destroy(rk);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Interceptors must not be copied automatically by conf_dup()
Packit 2997f0
 *        unless the interceptors have added on_conf_dup().
Packit 2997f0
 *        This behaviour makes sure an interceptor's instance
Packit 2997f0
 *        is not duplicated without the interceptor's knowledge or
Packit 2997f0
 *        assistance.
Packit 2997f0
 */
Packit 2997f0
static void do_test_conf_copy (const char *topic) {
Packit 2997f0
        rd_kafka_conf_t *conf, *conf2;
Packit 2997f0
        int i;
Packit 2997f0
        rd_kafka_t *rk;
Packit 2997f0
Packit 2997f0
        TEST_SAY(_C_MAG "[ %s ]\n" _C_CLR, __FUNCTION__);
Packit 2997f0
Packit 2997f0
        memset(&msgs[0], 0, sizeof(msgs));
Packit 2997f0
Packit 2997f0
        test_conf_init(&conf, NULL, 0);
Packit 2997f0
Packit 2997f0
        rd_kafka_conf_interceptor_add_on_new(conf, "on_new_conf_copy",
Packit 2997f0
                                             on_new_producer, NULL);
Packit 2997f0
Packit 2997f0
        /* Now copy the configuration to verify that interceptors are
Packit 2997f0
         * NOT copied. */
Packit 2997f0
        conf2 = conf;
Packit 2997f0
        conf = rd_kafka_conf_dup(conf2);
Packit 2997f0
        rd_kafka_conf_destroy(conf2);
Packit 2997f0
Packit 2997f0
        /* Create producer */
Packit 2997f0
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
Packit 2997f0
Packit 2997f0
        for (i = 0 ; i < msgcnt-1 ; i++)
Packit 2997f0
                do_test_produce(rk, topic, RD_KAFKA_PARTITION_UA, i, 0, 0);
Packit 2997f0
Packit 2997f0
        /* Wait for messages to be delivered */
Packit 2997f0
        test_flush(rk, -1);
Packit 2997f0
Packit 2997f0
        /* Verify acks */
Packit 2997f0
        for (i = 0 ; i < msgcnt ; i++) {
Packit 2997f0
                struct msg_state *msg = &msgs[i];
Packit 2997f0
                msg_verify_ic_cnt(msg, "on_ack", msg->bits[_ON_ACK], 0);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rd_kafka_destroy(rk);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
int main_0064_interceptors (int argc, char **argv) {
Packit 2997f0
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
Packit 2997f0
Packit 2997f0
        do_test_producer(topic);
Packit 2997f0
Packit 2997f0
        do_test_consumer(topic);
Packit 2997f0
Packit 2997f0
        do_test_conf_copy(topic);
Packit 2997f0
Packit 2997f0
        return 0;
Packit 2997f0
}
Packit 2997f0