/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2016, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <iostream>
#include "testcpp.h"
/**
* binary search by timestamp: excercices KafkaConsumer's seek() API.
*/
static std::string topic;
static const int partition = 0;
static int64_t golden_timestamp = -1;
static int64_t golden_offset = -1;
/**
* @brief Seek to offset and consume that message.
*
* Asserts on failure.
*/
static RdKafka::Message *get_msg (RdKafka::KafkaConsumer *c, int64_t offset,
bool use_seek) {
RdKafka::TopicPartition *next =
RdKafka::TopicPartition::create(topic, partition, offset);
RdKafka::ErrorCode err;
/* Since seek() can only be used to change the currently consumed
* offset we need to start consuming the first time we run this
* loop by calling assign() */
test_timing_t t_seek;
TIMING_START(&t_seek, "seek");
if (!use_seek) {
std::vector<RdKafka::TopicPartition*> parts;
parts.push_back(next);
err = c->assign(parts);
if (err)
Test::Fail("assign() failed: " + RdKafka::err2str(err));
} else {
err = c->seek(*next, tmout_multip(5000));
if (err)
Test::Fail("seek() failed: " + RdKafka::err2str(err));
}
TIMING_STOP(&t_seek);
delete next;
test_timing_t t_consume;
TIMING_START(&t_consume, "consume");
RdKafka::Message *msg = c->consume(tmout_multip(5000));
if (!msg)
Test::Fail("consume() returned NULL");
TIMING_STOP(&t_consume);
if (msg->err())
Test::Fail("consume() returned error: " + msg->errstr());
if (msg->offset() != offset)
Test::Fail(tostr() << "seek()ed to offset " << offset <<
" but consume() returned offset " << msg->offset());
return msg;
}
class MyDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb (RdKafka::Message &msg) {
if (msg.err())
Test::Fail("Delivery failed: " + msg.errstr());
if (!msg.msg_opaque())
return;
RdKafka::MessageTimestamp ts = msg.timestamp();
if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
Test::Fail(tostr() << "Dr msg timestamp type wrong: " << ts.type);
golden_timestamp = ts.timestamp;
golden_offset = msg.offset();
}
};
static void do_test_bsearch (void) {
RdKafka::Conf *conf, *tconf;
int msgcnt = 1000;
int64_t timestamp;
std::string errstr;
RdKafka::ErrorCode err;
MyDeliveryReportCb my_dr;
topic = Test::mk_topic_name("0059-bsearch", 1);
Test::conf_init(&conf, &tconf, 0);
Test::conf_set(tconf, "produce.offset.report", "true");
Test::conf_set(conf, "api.version.request", "true");
conf->set("dr_cb", &my_dr, errstr);
conf->set("default_topic_conf", tconf, errstr);
RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
if (!p)
Test::Fail("Failed to create Producer: " + errstr);
delete conf;
delete tconf;
timestamp = 1000;
for (int i = 0 ; i < msgcnt ; i++) {
err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY,
(void *)topic.c_str(), topic.size(), NULL, 0,
timestamp,
i == 357 ? (void *)1 /*golden*/ : NULL);
if (err != RdKafka::ERR_NO_ERROR)
Test::Fail("Produce failed: " + RdKafka::err2str(err));
timestamp += 100 + (timestamp % 9);
}
if (p->flush(tmout_multip(5000)) != 0)
Test::Fail("Not all messages flushed");
Test::Say(tostr() << "Produced " << msgcnt << " messages, " <<
"golden message with timestamp " << golden_timestamp <<
" at offset " << golden_offset << "\n");
delete p;
/*
* Now find the golden message using bsearch
*/
/* Create consumer */
Test::conf_init(&conf, NULL, 10);
Test::conf_set(conf, "group.id", topic);
Test::conf_set(conf, "api.version.request", "true");
Test::conf_set(conf, "fetch.wait.max.ms", "1");
Test::conf_set(conf, "fetch.error.backoff.ms", "1");
Test::conf_set(conf, "queued.min.messages", "1");
Test::conf_set(conf, "enable.auto.commit", "false");
Test::conf_set(conf, "socket.blocking.max.ms", "2000");
RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
if (!c)
Test::Fail("Failed to create KafkaConsumer: " + errstr);
delete conf;
Test::Say("Find initial middle offset\n");
int64_t low, high;
test_timing_t t_qr;
TIMING_START(&t_qr, "query_watermark_offsets");
err = c->query_watermark_offsets(topic, partition, &low, &high,
tmout_multip(5000));
TIMING_STOP(&t_qr);
if (err)
Test::Fail("query_watermark_offsets failed: " + RdKafka::err2str(err));
/* Divide and conquer */
test_timing_t t_bsearch;
TIMING_START(&t_bsearch, "actual bsearch");
int itcnt = 0;
do {
int64_t mid;
mid = low + ((high - low) / 2);
Test::Say(1, tostr() << "Get message at mid point of " << low <<
".." << high << " -> " << mid << "\n");
RdKafka::Message *msg = get_msg(c, mid,
/* use assign() on first iteration,
* then seek() */
itcnt > 0);
RdKafka::MessageTimestamp ts = msg->timestamp();
if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
Test::Fail(tostr() << "Expected CreateTime timestamp, not " <<
ts.type << " at offset " << msg->offset());
Test::Say(1, tostr() << "Message at offset " << msg->offset() <<
" with timestamp " << ts.timestamp << "\n");
if (ts.timestamp == golden_timestamp) {
Test::Say(1, tostr() << "Found golden timestamp " << ts.timestamp <<
" at offset " << msg->offset() << " in " << itcnt+1 <<
" iterations\n");
delete msg;
break;
}
if (low == high) {
Test::Fail(tostr() << "Search exhausted at offset " << msg->offset() <<
" with timestamp " << ts.timestamp <<
" without finding golden timestamp " << golden_timestamp <<
" at offset " << golden_offset);
} else if (ts.timestamp < golden_timestamp)
low = msg->offset() + 1;
else if (ts.timestamp > golden_timestamp)
high = msg->offset() - 1;
delete msg;
itcnt++;
} while (true);
TIMING_STOP(&t_bsearch);
c->close();
delete c;
}
extern "C" {
int main_0059_bsearch (int argc, char **argv) {
do_test_bsearch();
return 0;
}
}