/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"  /* for Kafka driver */

/* Messages produced but not yet confirmed delivered; decremented by dr_cb(). */
static int prod_msg_remains = 0;
/* Failure accumulator checked after producing.
 * NOTE(review): never incremented anywhere in this file — presumably a
 * leftover from an earlier error-counting scheme; verify before relying on it. */
static int fails = 0;

/**
 * Delivery report callback, invoked once per produced message.
 * Fails the test on delivery error or if more delivery reports arrive
 * than messages were produced; otherwise decrements the outstanding
 * message counter.
 */
static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
		   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {

	if (err)
		TEST_FAIL("Message delivery failed: %s\n",
			  rd_kafka_err2str(err));

	if (!prod_msg_remains)
		TEST_FAIL("Too many messages delivered (prod_msg_remains %i)",
			  prod_msg_remains);

	--prod_msg_remains;
}


/**
 * Produces 'msgcnt' messages split over 'partition_cnt' partitions.
 */
static void produce_messages (uint64_t testid, const char *topic,
                              int partition_cnt, int msg_base, int msgcnt) {
	int r;
	rd_kafka_t *rk;
	rd_kafka_topic_t *rkt;
	rd_kafka_conf_t *conf;
	rd_kafka_topic_conf_t *topic_conf;
	char errstr[512];
	int i;
	int32_t partition;
	int msgid = msg_base;

	test_conf_init(&conf, &topic_conf, 20);

	rd_kafka_conf_set_dr_cb(conf, dr_cb);

        /* Make sure all replicas are in-sync after producing
         * so that consume test wont fail. */
        rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
                                errstr, sizeof(errstr));

	/* Create kafka instance */
	rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
	if (!rkt)
		TEST_FAIL("Failed to create topic: %s\n",
                          rd_kafka_err2str(rd_kafka_last_error()));

        /* Produce messages */
	prod_msg_remains = msgcnt;
	for (partition = 0 ; partition < partition_cnt ; partition++) {
		int batch_cnt = msgcnt / partition_cnt;

		for (i = 0 ; i < batch_cnt ; i++) {
                        char key[128];
                        char buf[128];
			rd_snprintf(key, sizeof(key),
				 "testid=%"PRIu64", partition=%i, msg=%i",
				 testid, (int)partition, msgid);
                        rd_snprintf(buf, sizeof(buf),
                                 "data: testid=%"PRIu64", partition=%i, msg=%i",
				 testid, (int)partition, msgid);

                        r = rd_kafka_produce(rkt, partition,
                                             RD_KAFKA_MSG_F_COPY,
                                             buf, strlen(buf),
                                             key, strlen(key),
                                             NULL);
                        if (r == -1)
                                TEST_FAIL("Failed to produce message %i "
                                          "to partition %i: %s",
                                          msgid, (int)partition,
                                          rd_kafka_err2str(rd_kafka_last_error()));
			msgid++;
		}
        }


	/* Wait for messages to be delivered */
	while (rd_kafka_outq_len(rk) > 0)
		rd_kafka_poll(rk, 100);

	if (fails)
		TEST_FAIL("%i failures, see previous errors", fails);

	if (prod_msg_remains != 0)
		TEST_FAIL("Still waiting for %i messages to be produced",
			  prod_msg_remains);

	/* Destroy topic */
	rd_kafka_topic_destroy(rkt);

	/* Destroy rdkafka instance */
	TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
	rd_kafka_destroy(rk);
}



/* Bookkeeping for consumed messages, shared between consume_cb() and the
 * verify_consumed_msg*() helpers. */
static int *cons_msgs;                 /* message number per consumed message,
                                        * -1 = slot not yet received */
static int  cons_msgs_size;            /* allocated element count of cons_msgs */
static int  cons_msgs_cnt;             /* number of messages consumed so far */
static int  cons_msg_next;             /* next expected message number */
static int  cons_msg_stop = -1;        /* stop consuming at this message number */
static int64_t cons_last_offset = -1;  /* last offset received */

/**
 * Resets the consumed-message bookkeeping and allocates room for
 * 'msgcnt' new messages (all slots initialized to -1 = not received).
 * A 'msgcnt' of 0 just frees the tracking array.
 */
static void verify_consumed_msg_reset (int msgcnt) {
	if (cons_msgs) {
		free(cons_msgs);
		cons_msgs = NULL;
	}

	if (msgcnt) {
		int i;

		cons_msgs = malloc(sizeof(*cons_msgs) * msgcnt);
		if (!cons_msgs)  /* previously unchecked: OOM would crash below */
			TEST_FAIL("Failed to allocate cons_msgs for %d messages",
				  msgcnt);
		for (i = 0 ; i < msgcnt ; i++)
			cons_msgs[i] = -1;
	}

	cons_msgs_size = msgcnt;
	cons_msgs_cnt = 0;
        cons_msg_next = 0;
        cons_msg_stop = -1;
        cons_last_offset = -1;

        TEST_SAY("Reset consumed_msg stats, making room for %d new messages\n",
                 msgcnt);
}


/**
 * qsort(3) comparator for consumed message numbers.
 * Sorts -1 entries (messages not received) last by mapping them to a
 * value larger than any valid message number.
 *
 * Fixes two defects in the previous version: the two operands used
 * different sentinel magnitudes (100000000 vs 10000000), breaking
 * comparator symmetry (cmp(-1,-1) was non-zero), and plain subtraction
 * can overflow signed int; compare instead.
 */
static int int_cmp (const void *_a, const void *_b) {
	int a = *(const int *)_a;
	int b = *(const int *)_b;

        /* Sort -1 (non-received msgs) at the end */
	if (a == -1)
		a = 100000000;
	if (b == -1)
		b = 100000000;

	return (a > b) - (a < b);
}

/**
 * Verifies that exactly the message numbers 0..expected_cnt-1 have been
 * recorded in cons_msgs, reporting any missing or unexpected messages.
 * 'desc' labels the check in the log output; fails the test on mismatch.
 * Note: sorts cons_msgs in place.
 */
static void verify_consumed_msg_check0 (const char *func, int line,
                                        const char *desc,
                                        int expected_cnt) {
	int idx;
	int fail_cnt = 0;
        int missing_cnt = 0;

        TEST_SAY("%s: received %d/%d/%d messages\n",
                 desc, cons_msgs_cnt, expected_cnt, cons_msgs_size);
        if (expected_cnt > cons_msgs_size)
                TEST_FAIL("expected_cnt %d > cons_msgs_size %d\n",
                          expected_cnt, cons_msgs_size);

	if (cons_msgs_cnt < expected_cnt) {
		TEST_SAY("%s: Missing %i messages in consumer\n",
			 desc, expected_cnt - cons_msgs_cnt);
		fail_cnt++;
	}

	/* Sort received message numbers; -1 (not received) sorts last. */
	qsort(cons_msgs, cons_msgs_size, sizeof(*cons_msgs), int_cmp);

	for (idx = 0 ; idx < expected_cnt ; idx++) {
		if (cons_msgs[idx] == idx)
			continue;

		if (cons_msgs[idx] == -1) {
			missing_cnt++;
			TEST_SAY("%s: msg %d/%d not received\n",
				 desc, idx, expected_cnt);
		} else {
			TEST_SAY("%s: Consumed message #%i is wrong, "
				 "expected #%i\n",
				 desc, cons_msgs[idx], idx);
		}
		fail_cnt++;
	}

        if (missing_cnt)
                TEST_SAY("%s: %d messages not received at all\n",
                         desc, missing_cnt);

	if (fail_cnt)
		TEST_FAIL("%s: See above error(s)", desc);
        else
                TEST_SAY("%s: message range check: %d/%d messages consumed: "
                         "succeeded\n", desc, cons_msgs_cnt, expected_cnt);

}


#define verify_consumed_msg_check(desc,expected_cnt)                        \
	verify_consumed_msg_check0(__FUNCTION__,__LINE__, desc, expected_cnt)



/**
 * Verifies that a consumed message was produced by this test run
 * ('testid') and records its message number in cons_msgs.
 *
 * 'partition' and 'msgnum' may be -1 to skip the corresponding check.
 * Fails the test if the key does not match the expected
 * "testid=..., partition=..., msg=..." format/values, or if more
 * messages than cons_msgs has room for have been consumed.
 */
static void verify_consumed_msg0 (const char *func, int line,
				  uint64_t testid, int32_t partition,
				  int msgnum,
				  rd_kafka_message_t *rkmessage) {
	uint64_t in_testid;
	int in_part;
	int in_msgnum;
	char buf[128];

	/* Keys larger than the scan buffer cannot have been produced by
	 * this test (produce_messages() uses a 128 byte key buffer). */
	if (rkmessage->key_len +1 >= sizeof(buf))
		TEST_FAIL("Incoming message key too large (%i): "
			  "not sourced by this test",
			  (int)rkmessage->key_len);

	rd_snprintf(buf, sizeof(buf), "%.*s",
		 (int)rkmessage->key_len, (char *)rkmessage->key);

	if (sscanf(buf, "testid=%"SCNu64", partition=%i, msg=%i",
		   &in_testid, &in_part, &in_msgnum) != 3)
		TEST_FAIL("Incorrect key format: %s", buf);

        if (test_level > 2) {
		TEST_SAY("%s:%i: Our testid %"PRIu64", part %i (%i), "
			 "msg %i/%i, key's: \"%s\"\n",
			 func, line,
			 testid, (int)partition, (int)rkmessage->partition,
			 msgnum, cons_msgs_size, buf);
	}

	/* Valid message numbers are 0..cons_msgs_size-1: the upper bound
	 * check must be >= (the previous '>' admitted
	 * in_msgnum == cons_msgs_size, an out-of-range message number). */
	if (testid != in_testid ||
	    (partition != -1 && partition != in_part) ||
	    (msgnum != -1 && msgnum != in_msgnum) ||
	    (in_msgnum < 0 || in_msgnum >= cons_msgs_size))
		goto fail_match;

	if (cons_msgs_cnt == cons_msgs_size) {
		TEST_SAY("Too many messages in cons_msgs (%i) while reading "
			 "message key \"%s\"\n",
			 cons_msgs_cnt, buf);
		verify_consumed_msg_check("?", cons_msgs_size);
		TEST_FAIL("See above error(s)");
	}

	cons_msgs[cons_msgs_cnt++] = in_msgnum;
        cons_last_offset = rkmessage->offset;

	return;

 fail_match:
	TEST_FAIL("%s:%i: Our testid %"PRIu64", part %i, msg %i/%i did "
		  "not match message's key: \"%s\"\n",
		  func, line,
		  testid, (int)partition, msgnum, cons_msgs_size, buf);
}

#define verify_consumed_msg(testid,part,msgnum,rkmessage) \
	verify_consumed_msg0(__FUNCTION__,__LINE__,testid,part,msgnum,rkmessage)


/**
 * Per-message consume callback.
 * Verifies each message against the expected testid/partition/sequence
 * ('opaque' points at the testid) and calls rd_kafka_yield() once the
 * stop message number (cons_msg_stop) is reached, which makes
 * rd_kafka_consume_callback() return early.
 */
static void consume_cb (rd_kafka_message_t *rkmessage, void *opaque) {
        int64_t testid = *(int64_t *)opaque;

        if (test_level > 2)
                TEST_SAY("Consumed message #%d? at offset %"PRId64": %s\n",
                         cons_msg_next, rkmessage->offset,
                         rd_kafka_err2str(rkmessage->err));

        switch (rkmessage->err)
        {
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                /* End of partition reached: nothing to verify. */
                TEST_SAY("EOF at offset %"PRId64"\n", rkmessage->offset);
                return;

        case RD_KAFKA_RESP_ERR_NO_ERROR:
                break;

        default:
                TEST_FAIL("Consume message from partition %i "
                          "has error: %s",
                          (int)rkmessage->partition,
                          rd_kafka_err2str(rkmessage->err));
        }

        verify_consumed_msg(testid, rkmessage->partition,
                            cons_msg_next, rkmessage);

        if (cons_msg_next == cons_msg_stop)
                rd_kafka_yield(NULL/*FIXME*/);

        cons_msg_next++;
}

/**
 * Consumes 'msg_cnt' messages from 'topic':'partition' using the
 * callback-based consumer API, running 'iterations' back-to-back
 * consume_start()/consume_stop() cycles on the same handle to verify
 * that restarting consumption resumes at the correct position.
 *
 * 'initial_offset' is the starting offset for the first iteration;
 * when it is RD_KAFKA_OFFSET_STORED the stored offset is (re)used on
 * every iteration, otherwise the offset is advanced past the last
 * consumed message between iterations.
 * 'offset_store_method' is "file" or "broker" (the latter requires a
 * group.id, set below).
 * 'desc' labels this run in log output; 'msg_base' is only logged.
 *
 * Shares the cons_msg_next/cons_msg_stop/cons_last_offset globals with
 * consume_cb(), so calls are not reentrant.
 */
static void consume_messages_callback_multi (const char *desc,
                                             uint64_t testid, const char *topic,
                                             int32_t partition,
                                             const char *offset_store_method,
                                             int msg_base,
                                             int msg_cnt,
                                             int64_t initial_offset,
                                             int iterations) {
	rd_kafka_t *rk;
	rd_kafka_topic_t *rkt;
	rd_kafka_conf_t *conf;
	rd_kafka_topic_conf_t *topic_conf;
	int i;

        TEST_SAY("%s: Consume messages %d+%d from %s [%"PRId32"] "
                 "from offset %"PRId64" in %d iterations\n",
                 desc, msg_base, msg_cnt, topic, partition,
                 initial_offset, iterations);

	test_conf_init(&conf, &topic_conf, 20);

        test_topic_conf_set(topic_conf, "offset.store.method",
                            offset_store_method);

        if (!strcmp(offset_store_method, "broker")) {
                /* Broker based offset storage requires a group.id */
                test_conf_set(conf, "group.id", topic);
        }

	/* Create kafka instance */
	rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

        /* Fall back to the earliest offset if no valid stored offset exists. */
        rd_kafka_topic_conf_set(topic_conf, "auto.offset.reset", "smallest",
                                NULL, 0);

	rkt = rd_kafka_topic_new(rk, topic, topic_conf);
	if (!rkt)
		TEST_FAIL("%s: Failed to create topic: %s\n",
                          desc, rd_kafka_err2str(rd_kafka_last_error()));

	/* consume_cb() yields out of the consume loop once this message
	 * number (inclusive) has been consumed. */
	cons_msg_stop = cons_msg_next + msg_cnt - 1;

        /* Consume the same batch of messages multiple times to
         * make sure back-to-back start&stops work. */
        for (i = 0 ; i < iterations ; i++) {
                int cnta;
                test_timing_t t_stop;

                TEST_SAY("%s: Iteration #%i: Consuming from "
                         "partition %i at offset %"PRId64", "
                         "msgs range %d..%d\n",
                         desc, i, partition, initial_offset,
                         cons_msg_next, cons_msg_stop);

                /* Consume messages */
                if (rd_kafka_consume_start(rkt, partition, initial_offset) == -1)
                        TEST_FAIL("%s: consume_start(%i) failed: %s",
                                  desc, (int)partition,
                                  rd_kafka_err2str(rd_kafka_last_error()));


                /* Stop consuming messages when this number of messages
                 * is reached. */
                cnta = cons_msg_next;
                do {
                        rd_kafka_consume_callback(rkt, partition, 1000,
                                                  consume_cb, &testid);
                } while (cons_msg_next < cons_msg_stop);

                TEST_SAY("%s: Iteration #%i: consumed %i messages\n",
                         desc, i, cons_msg_next - cnta);

                TIMING_START(&t_stop, "rd_kafka_consume_stop()");
                rd_kafka_consume_stop(rkt, partition);
                TIMING_STOP(&t_stop);

                /* Advance next offset so we dont reconsume
                 * messages on the next run. */
                if (initial_offset != RD_KAFKA_OFFSET_STORED) {
                        initial_offset = cons_last_offset+1;
			cons_msg_stop = cons_msg_next + msg_cnt - 1;
		}
        }

	/* Destroy topic */
	rd_kafka_topic_destroy(rkt);

	/* Destroy rdkafka instance */
	TEST_SAY("%s: Destroying kafka instance %s\n", desc, rd_kafka_name(rk));
	rd_kafka_destroy(rk);
}



/**
 * End-to-end produce+consume test for the given offset storage method
 * ("file" or "broker"): produces 100 messages, consumes them back in
 * two halves using stored offsets, then re-consumes all of them from a
 * logical TAIL offset across multiple start/stop iterations, verifying
 * the consumed message set after each phase.
 */
static void test_produce_consume (const char *offset_store_method) {
	const int msgcnt = 100;
	const int partition_cnt = 1;
	const int msg_base = 0;
	uint64_t testid;
	const char *topic;
	int part;

	/* Generate a testid so we can differentiate messages
	 * from other tests */
	testid = test_id_generate();

	/* Read test.conf to configure topic name */
	test_conf_init(NULL, NULL, 20);
	topic = test_mk_topic_name("0014", 1/*random*/);

	TEST_SAY("Topic %s, testid %"PRIu64", offset.store.method=%s\n",
		 topic, testid, offset_store_method);

	/* Produce messages */
	produce_messages(testid, topic, partition_cnt, msg_base, msgcnt);

	/* Make room for all messages */
	verify_consumed_msg_reset(msgcnt);

	/* Phase 1: consume 50% of the messages via stored offsets,
	 * with no offset stored yet. */
	for (part = 0 ; part < partition_cnt ; part++)
		consume_messages_callback_multi("STORED.1/2", testid, topic,
						part, offset_store_method,
						msg_base,
						(msgcnt / partition_cnt) / 2,
						RD_KAFKA_OFFSET_STORED,
						1);
	verify_consumed_msg_check("STORED.1/2", msgcnt / 2);

	/* Phase 2: consume the remaining 50% from the now stored offset. */
	for (part = 0 ; part < partition_cnt ; part++)
		consume_messages_callback_multi("STORED.2/2", testid, topic,
						part, offset_store_method,
						msg_base,
						(msgcnt / partition_cnt) / 2,
						RD_KAFKA_OFFSET_STORED,
						1);
	verify_consumed_msg_check("STORED.2/2", msgcnt);

	/* Phase 3: consume everything again from a logical TAIL offset,
	 * split into several consume iterations. */
	verify_consumed_msg_reset(msgcnt);
	for (part = 0 ; part < partition_cnt ; part++) {
		const int p_msg_cnt = msgcnt / partition_cnt;
		const int iterations = 4;

		consume_messages_callback_multi("TAIL+", testid, topic, part,
						offset_store_method,
						/* start here (msgid) */
						msg_base,
						/* messages per iteration */
						p_msg_cnt / iterations,
						/* start here (offset) */
						RD_KAFKA_OFFSET_TAIL(p_msg_cnt),
						iterations);
	}
	verify_consumed_msg_check("TAIL+", msgcnt);

	verify_consumed_msg_reset(0);
}




/**
 * Test entry point: exercise both offset storage methods.
 */
int main_0014_reconsume_191 (int argc, char **argv) {
	/* Broker-based offset storage requires broker >= 0.8.2 */
	if (test_broker_version >= TEST_BRKVER(0,8,2,0))
		test_produce_consume("broker");

	test_produce_consume("file");

	return 0;
}