Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */

/**
 * KafkaConsumer balanced group with multithreading tests
 *
 * Runs a consumer subscribing to a topic with multiple partitions and farms
 * consuming of each partition to a separate thread.
 */

/* Capacity of the tids[] array; rebalance_cb() starts at most this many
 * consumer threads per assignment. */
#define MAX_THRD_CNT 4

/* Incremented on each partition assignment and decremented on each
 * revocation in rebalance_cb(). */
static int assign_cnt = 0;
/* Messages consumed so far by all partition_consume() threads combined.
 * Updated while holding `lock`. */
static int consumed_msg_cnt = 0;
/* Set to 1 on assignment and to 0 on revocation by rebalance_cb();
 * read by the consumer threads through is_consuming(). Guarded by `lock`. */
static int consumers_running = 0;
/* Total number of messages expected; set by the test's main function
 * before the consumer is created. */
static int exp_msg_cnt;

/* Mutex guarding consumers_running and consumed_msg_cnt. */
static mtx_t lock;
/* Consumer thread handles, stored by rebalance_cb() and joined by the
 * test's main function. */
static thrd_t tids[MAX_THRD_CNT];

/**
 * Arguments handed to a partition_consume() thread.
 *
 * Allocated by spawn_thread(); partition_consume() frees the struct and
 * destroys the queue it references.
 */
typedef struct part_consume_info_s {
        rd_kafka_queue_t * rkqu; /* Queue the thread consumes from */
        int partition;           /* Partition id every consumed message
                                  * is expected to carry */
} part_consume_info_t;

/**
 * @brief Read the shared consumers_running flag while holding `lock`.
 *
 * Declared with an explicit (void) parameter list so the definition also
 * serves as a prototype and stray arguments are diagnosed.
 *
 * @returns the value of consumers_running at the time of the call.
 */
static int is_consuming (void) {
        int result;

        mtx_lock(&lock);
        result = consumers_running;
        mtx_unlock(&lock);

        return result;
}

/**
 * @brief Thread entry point: consume the messages of one partition from
 *        its dedicated queue.
 *
 * Takes ownership of \p args and of the queue it references: the argument
 * struct is freed on entry and the queue is destroyed before returning.
 *
 * The consume loop stops as soon as one of these holds:
 *  - the time budget derived from test_session_timeout_ms has elapsed,
 *  - a partition EOF event is received,
 *  - the shared consumed_msg_cnt has reached exp_msg_cnt,
 *  - is_consuming() returns 0.
 *
 * @param args  Heap-allocated part_consume_info_t *.
 *
 * @returns thrd_success.
 */
static int partition_consume (void *args) {
        part_consume_info_t *info = (part_consume_info_t *)args;
        rd_kafka_queue_t *rkqu = info->rkqu;
        int partition = info->partition;
        int64_t ts_start = test_clock();
        /* Same unit as test_clock(); the /1000 conversion to ms below
         * implies microseconds. */
        int max_time = (test_session_timeout_ms + 3000) * 1000;
        int running = 1;

        free(args); /* Free the parameter struct dynamically allocated for us */

        while (ts_start + max_time > test_clock() && running &&
               is_consuming()) {
                rd_kafka_message_t *rkmsg;

                rkmsg = rd_kafka_consume_queue(rkqu, 500);

                if (!rkmsg)
                        continue;
                else if (rkmsg->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
                        running = 0;
                else if (rkmsg->err) {
                        int consumed;

                        /* Snapshot the shared counter under the lock, but
                         * report the failure after releasing it.
                         * NOTE(review): TEST_FAIL comes from test.h; if it
                         * does not return to the caller, calling it with
                         * the mutex held would leave the mutex locked and
                         * block every other thread that uses it. */
                        mtx_lock(&lock);
                        consumed = consumed_msg_cnt;
                        mtx_unlock(&lock);

                        /* Divide in 64 bits first, then narrow to int. */
                        TEST_FAIL("Message error "
                                  "(at offset %" PRId64 " after "
                                  "%d/%d messages and %dms): %s",
                                  rkmsg->offset, consumed, exp_msg_cnt,
                                  (int)((test_clock() - ts_start) / 1000),
                                  rd_kafka_message_errstr(rkmsg));
                } else if (rkmsg->partition != partition) {
                        /* No shared state is read here, so no lock needed. */
                        TEST_FAIL("Message consumed has partition %d "
                                  "but we expected partition %d.",
                                  rkmsg->partition, partition);
                }

                rd_kafka_message_destroy(rkmsg);

                /* An EOF event clears `running` above and is therefore
                 * not counted as a consumed message. */
                mtx_lock(&lock);
                if (running && ++consumed_msg_cnt >= exp_msg_cnt) {
                        TEST_SAY("All messages consumed\n");
                        running = 0;
                }
                mtx_unlock(&lock);
        }

        rd_kafka_queue_destroy(rkqu);

        return thrd_success;
}

/**
 * @brief Start a partition_consume() thread for one partition queue.
 *
 * Ownership of \p rkqu passes to the new thread, which destroys the queue
 * when it finishes. If the thread cannot be started, the queue reference
 * and the argument struct are released here instead.
 *
 * @param rkqu       Queue the thread will consume from.
 * @param partition  Partition id the consumed messages are expected to carry.
 *
 * @returns the handle of the created thread.
 */
static thrd_t spawn_thread (rd_kafka_queue_t *rkqu, int partition) {
        thrd_t thr;
        part_consume_info_t *info = malloc(sizeof(*info));

        /* NOTE(review): TEST_FAIL (test.h) is assumed not to return here,
         * as it is elsewhere in this file -- confirm. */
        if (!info)
                TEST_FAIL("Failed to allocate consumer thread arguments.");

        info->rkqu = rkqu;
        info->partition = partition;

        if (thrd_create(&thr, &partition_consume, info) != thrd_success) {
                /* No thread exists to take ownership: clean up before
                 * reporting the failure. */
                free(info);
                rd_kafka_queue_destroy(rkqu);
                TEST_FAIL("Failed to create consumer thread.");
        }
        return thr;
}

/* Set to 1 by rebalance_cb() once a partition assignment has been handled;
 * get_assignment() polls the consumer until this becomes non-zero. */
static int rebalanced = 0;

/**
 * @brief Consumer group rebalance callback.
 *
 * On assignment: applies the assignment, marks consumers as running and,
 * for each assigned partition (at most MAX_THRD_CNT), takes the partition's
 * queue, disables its forwarding and starts a dedicated consumer thread
 * on it. The thread handles are stored in tids[].
 *
 * On revocation: clears the assignment and marks consumers as stopped so
 * that the consumer threads leave their loops.
 *
 * @param rk          Consumer instance.
 * @param err         Rebalance event type (assign / revoke) or an error.
 * @param partitions  Partitions being assigned or revoked.
 * @param opaque      Unused.
 */
static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque) {
        int i;
        char *memberid = rd_kafka_memberid(rk);

        /* Never pass a NULL pointer to a %s conversion. */
        TEST_SAY("%s: MemberId \"%s\": Consumer group rebalanced: %s\n",
                 rd_kafka_name(rk), memberid ? memberid : "(null)",
                 rd_kafka_err2str(err));

        free(memberid);

        test_print_partition_list(partitions);

        switch (err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                assign_cnt++;

                rd_kafka_assign(rk, partitions);
                mtx_lock(&lock);
                consumers_running = 1;
                mtx_unlock(&lock);

                for (i = 0; i < partitions->cnt && i < MAX_THRD_CNT; ++i) {
                        const rd_kafka_topic_partition_t *part =
                                &partitions->elems[i];
                        rd_kafka_queue_t *rkqu;

                        /* This queue is destroyed in partition_consume(). */
                        rkqu = rd_kafka_queue_get_partition(rk, part->topic,
                                                            part->partition);
                        if (!rkqu) {
                                TEST_FAIL("No queue available for "
                                          "%s [%" PRId32 "]",
                                          part->topic, part->partition);
                                continue;
                        }

                        /* Stop forwarding so messages stay on this queue
                         * for the dedicated thread to consume. */
                        rd_kafka_queue_forward(rkqu, NULL);

                        /* Index by loop position, which is what the loop
                         * condition bounds; a partition id may be
                         * >= MAX_THRD_CNT and must not be used as index. */
                        tids[i] = spawn_thread(rkqu, part->partition);
                }

                rebalanced = 1;

                break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                if (assign_cnt == 0)
                        TEST_FAIL("asymetric rebalance_cb");
                assign_cnt--;
                rd_kafka_assign(rk, NULL);
                mtx_lock(&lock);
                consumers_running = 0;
                mtx_unlock(&lock);

                break;

        default:
                TEST_FAIL("rebalance failed: %s", rd_kafka_err2str(err));
                break;
        }
}

/**
 * @brief Poll the consumer until the rebalance callback has flagged that
 *        an assignment was handled.
 *
 * Any message returned by the poll in the meantime is destroyed.
 *
 * @param rk_c  Consumer instance to poll.
 */
static void get_assignment (rd_kafka_t *rk_c) {
        for (;;) {
                rd_kafka_message_t *polled;

                if (rebalanced)
                        return;

                polled = rd_kafka_consumer_poll(rk_c, 500);
                if (polled != NULL)
                        rd_kafka_message_destroy(polled);
        }
}

int main_0056_balanced_group_mt (int argc, char **argv) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
        rd_kafka_t *rk_p, *rk_c;
        rd_kafka_topic_t *rkt_p;
        int msg_cnt = 1000;
        int msg_base = 0;
        int partition_cnt = 2;
        int partition;
        uint64_t testid;
        rd_kafka_topic_conf_t *default_topic_conf;
        rd_kafka_topic_partition_list_t *sub, *topics;
        rd_kafka_resp_err_t err;
        test_timing_t t_assign, t_close, t_consume;
        int i;

        exp_msg_cnt = msg_cnt * partition_cnt;

        testid = test_id_generate();

        /* Produce messages */
        rk_p = test_create_producer();
        rkt_p = test_create_producer_topic(rk_p, topic, NULL);

        for (partition = 0; partition < partition_cnt; partition++) {
                test_produce_msgs(rk_p, rkt_p, testid, partition,
                                  msg_base + (partition * msg_cnt), msg_cnt,
                                  NULL, 0);
        }

        rd_kafka_topic_destroy(rkt_p);
        rd_kafka_destroy(rk_p);

        if (mtx_init(&lock, mtx_plain) != thrd_success)
                TEST_FAIL("Cannot create mutex.");

        test_conf_init(NULL, &default_topic_conf,
                       (test_session_timeout_ms * 3) / 1000);

        test_topic_conf_set(default_topic_conf, "auto.offset.reset",
                            "smallest");

        /* Fill in topic subscription set */
        topics = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);

        /* Create consumers and start subscription */
        rk_c = test_create_consumer(
                topic /*group_id*/, rebalance_cb, NULL,
                default_topic_conf);

        test_consumer_subscribe(rk_c, topic);

        rd_kafka_topic_partition_list_destroy(topics);

        /* Wait for both consumers to get an assignment */
        TIMING_START(&t_assign, "WAIT.ASSIGN");
        get_assignment(rk_c);
        TIMING_STOP(&t_assign);

        TIMING_START(&t_consume, "CONSUME.WAIT");
        for (i = 0; i < MAX_THRD_CNT; ++i) {
                if (tids[i] != 0)
                        thrd_join(tids[i], NULL);
        }
        TIMING_STOP(&t_consume);

        TEST_SAY("Closing remaining consumers\n");
        /* Query subscription */
        err = rd_kafka_subscription(rk_c, &sub);
        TEST_ASSERT(!err, "%s: subscription () failed: %s", rd_kafka_name(rk_c),
                    rd_kafka_err2str(err));
        TEST_SAY("%s: subscription (%d):\n", rd_kafka_name(rk_c), sub->cnt);
        for (i = 0; i < sub->cnt; ++i)
                TEST_SAY(" %s\n", sub->elems[i].topic);
        rd_kafka_topic_partition_list_destroy(sub);

        /* Run an explicit unsubscribe () (async) prior to close ()
         * to trigger race condition issues on termination. */
        TEST_SAY("Unsubscribing instance %s\n", rd_kafka_name(rk_c));
        err = rd_kafka_unsubscribe(rk_c);
        TEST_ASSERT(!err, "%s: unsubscribe failed: %s", rd_kafka_name(rk_c),
                    rd_kafka_err2str(err));

        TEST_SAY("Closing %s\n", rd_kafka_name(rk_c));
        TIMING_START(&t_close, "CONSUMER.CLOSE");
        err = rd_kafka_consumer_close(rk_c);
        TIMING_STOP(&t_close);
        TEST_ASSERT(!err, "consumer_close failed: %s", rd_kafka_err2str(err));

        rd_kafka_destroy(rk_c);
        rk_c = NULL;

        TEST_SAY("%d/%d messages consumed\n", consumed_msg_cnt, exp_msg_cnt);
        TEST_ASSERT(consumed_msg_cnt >= exp_msg_cnt,
                    "Only %d/%d messages were consumed", consumed_msg_cnt,
                    exp_msg_cnt);

        if (consumed_msg_cnt > exp_msg_cnt)
                TEST_SAY("At least %d/%d messages were consumed "
                         "multiple times\n",
                         consumed_msg_cnt - exp_msg_cnt, exp_msg_cnt);

        mtx_destroy(&lock);

        return 0;
}