/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "test.h"
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/**
* Consumer: various offset commit constellations, matrix:
* enable.auto.commit, enable.auto.offset.store, async
*/
static const char *topic;
static const int msgcnt = 100;
static const int partition = 0;
static uint64_t testid;
static int64_t expected_offset = 0;
static int64_t committed_offset = -1;
static void offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque) {
rd_kafka_topic_partition_t *rktpar;
TEST_SAYL(3, "Offset committed: %s:\n", rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__NO_OFFSET)
return;
test_print_partition_list(offsets);
if (err)
TEST_FAIL("Offset commit failed: %s", rd_kafka_err2str(err));
if (offsets->cnt == 0)
TEST_FAIL("Expected at least one partition in offset_commit_cb");
/* Find correct partition */
if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
topic, partition)))
return;
if (rktpar->err)
TEST_FAIL("Offset commit failed for partitioń : %s",
rd_kafka_err2str(rktpar->err));
if (rktpar->offset > expected_offset)
TEST_FAIL("Offset committed %"PRId64
" > expected offset %"PRId64,
rktpar->offset, expected_offset);
if (rktpar->offset < committed_offset)
TEST_FAIL("Old offset %"PRId64" (re)committed: "
"should be above committed_offset %"PRId64,
rktpar->offset, committed_offset);
else if (rktpar->offset == committed_offset)
TEST_SAYL(1, "Current offset re-commited: %"PRId64"\n",
rktpar->offset);
else
committed_offset = rktpar->offset;
if (rktpar->offset < expected_offset) {
TEST_SAYL(3, "Offset committed %"PRId64
" < expected offset %"PRId64"\n",
rktpar->offset, expected_offset);
return;
}
TEST_SAYL(3, "Expected offset committed: %"PRId64"\n", rktpar->offset);
}
static void do_offset_test (const char *what, int auto_commit, int auto_store,
int async) {
test_timing_t t_all;
char groupid[64];
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *tconf;
int cnt = 0;
const int extra_cnt = 5;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *parts;
rd_kafka_topic_partition_t *rktpar;
int64_t next_offset = -1;
test_conf_init(&conf, &tconf, 30);
test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "enable.auto.commit", auto_commit ? "true":"false");
test_conf_set(conf, "enable.auto.offset.store", auto_store ?"true":"false");
test_conf_set(conf, "auto.commit.interval.ms", "500");
rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb);
test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
test_str_id_generate(groupid, sizeof(groupid));
test_conf_set(conf, "group.id", groupid);
rd_kafka_conf_set_default_topic_conf(conf, tconf);
TEST_SAY(_C_MAG "[ do_offset_test: %s with group.id %s ]\n",
what, groupid);
TIMING_START(&t_all, "%s", what);
expected_offset = 0;
committed_offset = -1;
/* MO:
* - Create consumer.
* - Start consuming from beginning
* - Perform store & commits according to settings
* - Stop storing&committing when half of the messages are consumed,
* - but consume 5 more to check against.
* - Query position.
* - Destroy consumer.
* - Create new consumer with same group.id using stored offsets
* - Should consume the expected message.
*/
/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_CONSUMER, rd_kafka_conf_dup(conf));
rd_kafka_poll_set_consumer(rk);
test_consumer_subscribe(rk, topic);
while (cnt - extra_cnt < msgcnt / 2) {
rd_kafka_message_t *rkm;
rkm = rd_kafka_consumer_poll(rk, 10*1000);
if (!rkm)
continue;
if (rkm->err == RD_KAFKA_RESP_ERR__TIMED_OUT)
TEST_FAIL("%s: Timed out waiting for message %d", what,cnt);
else if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
rd_kafka_message_destroy(rkm);
continue;
} else if (rkm->err)
TEST_FAIL("%s: Consumer error: %s",
what, rd_kafka_message_errstr(rkm));
/* Offset of next message. */
next_offset = rkm->offset + 1;
if (cnt < msgcnt / 2) {
if (!auto_store) {
err = rd_kafka_offset_store(rkm->rkt,rkm->partition,
rkm->offset);
if (err)
TEST_FAIL("%s: offset_store failed: %s\n",
what, rd_kafka_err2str(err));
}
expected_offset = rkm->offset+1;
if (!auto_commit) {
test_timing_t t_commit;
TIMING_START(&t_commit,
"%s @ %"PRId64,
async?
"commit.async":
"commit.sync",
rkm->offset+1);
err = rd_kafka_commit_message(rk, rkm, async);
TIMING_STOP(&t_commit);
if (err)
TEST_FAIL("%s: commit failed: %s\n",
what, rd_kafka_err2str(err));
}
} else if (auto_store && auto_commit)
expected_offset = rkm->offset+1;
rd_kafka_message_destroy(rkm);
cnt++;
}
TEST_SAY("%s: done consuming after %d messages, at offset %"PRId64
", next_offset %"PRId64"\n",
what, cnt, expected_offset, next_offset);
if ((err = rd_kafka_assignment(rk, &parts)))
TEST_FAIL("%s: failed to get assignment(): %s\n",
what, rd_kafka_err2str(err));
/* Verify position */
if ((err = rd_kafka_position(rk, parts)))
TEST_FAIL("%s: failed to get position(): %s\n",
what, rd_kafka_err2str(err));
if (!(rktpar = rd_kafka_topic_partition_list_find(parts,
topic, partition)))
TEST_FAIL("%s: position(): topic lost\n", what);
if (rktpar->offset != next_offset)
TEST_FAIL("%s: Expected position() offset %"PRId64", got %"PRId64,
what, next_offset, rktpar->offset);
TEST_SAY("%s: Position is at %"PRId64", good!\n",
what, rktpar->offset);
/* Pause messages while waiting so we can serve callbacks
* without having more messages received. */
if ((err = rd_kafka_pause_partitions(rk, parts)))
TEST_FAIL("%s: failed to pause partitions: %s\n",
what, rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(parts);
/* Fire off any enqueued offset_commit_cb */
test_consumer_poll_no_msgs(what, rk, testid, 0);
TEST_SAY("%s: committed_offset %"PRId64", expected_offset %"PRId64"\n",
what, committed_offset, expected_offset);
if (!auto_commit && !async) {
/* Sync commits should be up to date at this point. */
if (committed_offset != expected_offset)
TEST_FAIL("%s: Sync commit: committed offset %"PRId64
" should be same as expected offset "
"%"PRId64,
what, committed_offset, expected_offset);
} else {
/* Wait for offset commits to catch up */
while (committed_offset < expected_offset) {
TEST_SAYL(2, "%s: Wait for committed offset %"PRId64
" to reach expected offset %"PRId64"\n",
what, committed_offset, expected_offset);
test_consumer_poll_no_msgs(what, rk, testid, 1000);
}
}
TEST_SAY("%s: phase 1 complete, %d messages consumed, "
"next expected offset is %"PRId64"\n",
what, cnt, expected_offset);
/* Issue #827: cause committed() to return prematurely by specifying
* low timeout. The bug (use after free) will only
* be catched by valgrind. */
do {
parts = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(parts, topic, partition);
err = rd_kafka_committed(rk, parts, 1);
rd_kafka_topic_partition_list_destroy(parts);
TEST_SAY("Issue #827: committed() returned %s\n",
rd_kafka_err2str(err));
} while (err != RD_KAFKA_RESP_ERR__TIMED_OUT);
/* Query position */
parts = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(parts, topic, partition);
err = rd_kafka_committed(rk, parts, tmout_multip(5*1000));
if (err)
TEST_FAIL("%s: committed() failed: %s", what, rd_kafka_err2str(err));
if (!(rktpar = rd_kafka_topic_partition_list_find(parts,
topic, partition)))
TEST_FAIL("%s: committed(): topic lost\n", what);
if (rktpar->offset != expected_offset)
TEST_FAIL("%s: Expected committed() offset %"PRId64", got %"PRId64,
what, expected_offset, rktpar->offset);
TEST_SAY("%s: Committed offset is at %"PRId64", good!\n",
what, rktpar->offset);
rd_kafka_topic_partition_list_destroy(parts);
test_consumer_close(rk);
rd_kafka_destroy(rk);
/* Fire up a new consumer and continue from where we left off. */
TEST_SAY("%s: phase 2: starting new consumer to resume consumption\n",what);
rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
rd_kafka_poll_set_consumer(rk);
test_consumer_subscribe(rk, topic);
while (cnt < msgcnt) {
rd_kafka_message_t *rkm;
rkm = rd_kafka_consumer_poll(rk, 10*1000);
if (!rkm)
continue;
if (rkm->err == RD_KAFKA_RESP_ERR__TIMED_OUT)
TEST_FAIL("%s: Timed out waiting for message %d", what,cnt);
else if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
rd_kafka_message_destroy(rkm);
continue;
} else if (rkm->err)
TEST_FAIL("%s: Consumer error: %s",
what, rd_kafka_message_errstr(rkm));
if (rkm->offset != expected_offset)
TEST_FAIL("%s: Received message offset %"PRId64
", expected %"PRId64" at msgcnt %d/%d\n",
what, rkm->offset, expected_offset,
cnt, msgcnt);
rd_kafka_message_destroy(rkm);
expected_offset++;
cnt++;
}
TEST_SAY("%s: phase 2: complete\n", what);
test_consumer_close(rk);
rd_kafka_destroy(rk);
TIMING_STOP(&t_all);
}
static void empty_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque) {
rd_kafka_resp_err_t expected = *(rd_kafka_resp_err_t *)opaque;
int valid_offsets = 0;
int i;
TEST_SAY("Offset commit callback for %d partitions: %s (expecting %s)\n",
offsets ? offsets->cnt : 0,
rd_kafka_err2str(err),
rd_kafka_err2str(expected));
if (expected != err)
TEST_FAIL("Offset commit cb: expected %s, got %s",
rd_kafka_err2str(expected),
rd_kafka_err2str(err));
for (i = 0 ; i < offsets->cnt ; i++) {
TEST_SAY("committed: %s [%"PRId32"] offset %"PRId64
": %s\n",
offsets->elems[i].topic,
offsets->elems[i].partition,
offsets->elems[i].offset,
rd_kafka_err2str(offsets->elems[i].err));
if (expected == RD_KAFKA_RESP_ERR_NO_ERROR)
TEST_ASSERT(offsets->elems[i].err == expected);
if (offsets->elems[i].offset > 0)
valid_offsets++;
}
if (expected == RD_KAFKA_RESP_ERR_NO_ERROR) {
/* If no error is expected we instead expect one proper offset
* to have been committed. */
TEST_ASSERT(valid_offsets > 0);
}
}
/**
* Trigger an empty cgrp commit (issue #803)
*/
static void do_empty_commit (void) {
rd_kafka_t *rk;
char group_id[64];
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *tconf;
rd_kafka_resp_err_t err, expect;
test_conf_init(&conf, &tconf, 20);
test_conf_set(conf, "enable.auto.commit", "false");
test_topic_conf_set(tconf, "auto.offset.reset", "earliest");
test_str_id_generate(group_id, sizeof(group_id));
TEST_SAY(_C_MAG "[ do_empty_commit group.id %s ]\n", group_id);
rk = test_create_consumer(group_id, NULL, conf, tconf);
test_consumer_subscribe(rk, topic);
test_consumer_poll("consume", rk, testid, -1, -1, 100, NULL);
TEST_SAY("First commit\n");
expect = RD_KAFKA_RESP_ERR_NO_ERROR;
err = rd_kafka_commit_queue(rk, NULL, NULL,
empty_offset_commit_cb, &expect);
if (err != expect)
TEST_FAIL("commit failed: %s", rd_kafka_err2str(err));
else
TEST_SAY("First commit returned %s\n",
rd_kafka_err2str(err));
TEST_SAY("Second commit, should be empty\n");
expect = RD_KAFKA_RESP_ERR__NO_OFFSET;
err = rd_kafka_commit_queue(rk, NULL, NULL,
empty_offset_commit_cb, &expect);
if (err != RD_KAFKA_RESP_ERR__NO_OFFSET)
TEST_FAIL("unexpected commit result, wanted NO_OFFSET, got: %s",
rd_kafka_err2str(err));
else
TEST_SAY("Second commit returned %s\n",
rd_kafka_err2str(err));
test_consumer_close(rk);
rd_kafka_destroy(rk);
}
/**
* Commit non-existent topic (issue #704)
*/
static void nonexist_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque) {
int i;
int failed_offsets = 0;
TEST_SAY("Offset commit callback for %d partitions: %s\n",
offsets ? offsets->cnt : 0,
rd_kafka_err2str(err));
TEST_ASSERT(offsets != NULL);
for (i = 0 ; i < offsets->cnt ; i++) {
TEST_SAY("committed: %s [%"PRId32"] offset %"PRId64
": %s\n",
offsets->elems[i].topic,
offsets->elems[i].partition,
offsets->elems[i].offset,
rd_kafka_err2str(offsets->elems[i].err));
failed_offsets += offsets->elems[i].err ? 1 : 0;
}
TEST_ASSERT(err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
"expected unknown Topic or partition, not %s", rd_kafka_err2str(err));
TEST_ASSERT(offsets->cnt == 2, "expected %d offsets", offsets->cnt);
TEST_ASSERT(failed_offsets == offsets->cnt,
"expected %d offsets to have failed, got %d",
offsets->cnt, failed_offsets);
}
static void do_nonexist_commit (void) {
rd_kafka_t *rk;
char group_id[64];
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *tconf;
rd_kafka_topic_partition_list_t *offsets;
const char *unk_topic = test_mk_topic_name(__FUNCTION__, 1);
rd_kafka_resp_err_t err;
test_conf_init(&conf, &tconf, 20);
/* Offset commit deferrals when the broker is down is limited to
* session.timeout.ms. With 0.9 brokers and api.version.request=true
* the initial connect to all brokers will take 10*2 seconds
* and the commit_queue() below will time out too quickly.
* Set the session timeout high here to avoid it. */
test_conf_set(conf, "session.timeout.ms", "60000");
test_str_id_generate(group_id, sizeof(group_id));
test_conf_set(conf, "group.id", group_id);
rd_kafka_conf_set_default_topic_conf(conf, tconf);
TEST_SAY(_C_MAG "[ do_nonexist_commit group.id %s ]\n", group_id);
rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
rd_kafka_poll_set_consumer(rk);
TEST_SAY("Try nonexist commit\n");
offsets = rd_kafka_topic_partition_list_new(2);
rd_kafka_topic_partition_list_add(offsets, unk_topic, 0)->offset = 123;
rd_kafka_topic_partition_list_add(offsets, unk_topic, 1)->offset = 456;
err = rd_kafka_commit_queue(rk, offsets, NULL,
nonexist_offset_commit_cb, NULL);
TEST_SAY("nonexist commit returned %s\n", rd_kafka_err2str(err));
if (err != RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
TEST_FAIL("commit() should give UnknownTopicOrPart, not: %s",
rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(offsets);
test_consumer_close(rk);
rd_kafka_destroy(rk);
}
int main_0030_offset_commit (int argc, char **argv) {
topic = test_mk_topic_name(__FUNCTION__, 1);
testid = test_produce_msgs_easy(topic, 0, partition, msgcnt);
do_offset_test("AUTO.COMMIT & AUTO.STORE",
1 /* enable.auto.commit */,
1 /* enable.auto.offset.store */,
0 /* not used. */);
do_offset_test("AUTO.COMMIT & MANUAL.STORE",
1 /* enable.auto.commit */,
0 /* enable.auto.offset.store */,
0 /* not used */);
do_offset_test("MANUAL.COMMIT.ASYNC & AUTO.STORE",
0 /* enable.auto.commit */,
1 /* enable.auto.offset.store */,
1 /* async */);
do_offset_test("MANUAL.COMMIT.SYNC & AUTO.STORE",
0 /* enable.auto.commit */,
1 /* enable.auto.offset.store */,
0 /* async */);
do_offset_test("MANUAL.COMMIT.ASYNC & MANUAL.STORE",
0 /* enable.auto.commit */,
0 /* enable.auto.offset.store */,
1 /* sync */);
do_offset_test("MANUAL.COMMIT.SYNC & MANUAL.STORE",
0 /* enable.auto.commit */,
0 /* enable.auto.offset.store */,
0 /* sync */);
do_empty_commit();
do_nonexist_commit();
return 0;
}