Blame tests/0059-bsearch.cpp

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2016, Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 *
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met:
Packit 2997f0
 *
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer.
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution.
Packit 2997f0
 *
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#include <iostream>
Packit 2997f0
#include "testcpp.h"
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Binary search by timestamp: exercises KafkaConsumer's seek() API.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
Packit 2997f0
/** Name of the test topic; assigned in do_test_bsearch() and read by
 *  get_msg() when building the partition to assign/seek. */
static std::string topic;
Packit 2997f0
/** The single partition that messages are produced to and fetched from. */
static const int partition = 0;
Packit 2997f0
/** Timestamp of the golden message as recorded by the delivery report
 *  callback; stays -1 until that report has been seen. */
static int64_t golden_timestamp = -1;
Packit 2997f0
/** Offset of the golden message as recorded by the delivery report
 *  callback; stays -1 until that report has been seen. */
static int64_t golden_offset = -1;
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Seek to offset and consume that message.
Packit 2997f0
 *
Packit 2997f0
 * Asserts on failure.
Packit 2997f0
 */
Packit 2997f0
static RdKafka::Message *get_msg (RdKafka::KafkaConsumer *c, int64_t offset,
Packit 2997f0
                                  bool use_seek) {
Packit 2997f0
  RdKafka::TopicPartition *next =
Packit 2997f0
    RdKafka::TopicPartition::create(topic, partition, offset);
Packit 2997f0
  RdKafka::ErrorCode err;
Packit 2997f0
Packit 2997f0
  /* Since seek() can only be used to change the currently consumed
Packit 2997f0
   * offset we need to start consuming the first time we run this
Packit 2997f0
   * loop by calling assign() */
Packit 2997f0
Packit 2997f0
  test_timing_t t_seek;
Packit 2997f0
  TIMING_START(&t_seek, "seek");
Packit 2997f0
  if (!use_seek) {
Packit 2997f0
    std::vector<RdKafka::TopicPartition*> parts;
Packit 2997f0
    parts.push_back(next);
Packit 2997f0
    err = c->assign(parts);
Packit 2997f0
    if (err)
Packit 2997f0
      Test::Fail("assign() failed: " + RdKafka::err2str(err));
Packit 2997f0
  } else {
Packit 2997f0
    err = c->seek(*next, tmout_multip(5000));
Packit 2997f0
    if (err)
Packit 2997f0
      Test::Fail("seek() failed: " + RdKafka::err2str(err));
Packit 2997f0
  }
Packit 2997f0
  TIMING_STOP(&t_seek);
Packit 2997f0
  delete next;
Packit 2997f0
Packit 2997f0
  test_timing_t t_consume;
Packit 2997f0
  TIMING_START(&t_consume, "consume");
Packit 2997f0
Packit 2997f0
  RdKafka::Message *msg = c->consume(tmout_multip(5000));
Packit 2997f0
  if (!msg)
Packit 2997f0
    Test::Fail("consume() returned NULL");
Packit 2997f0
  TIMING_STOP(&t_consume);
Packit 2997f0
Packit 2997f0
  if (msg->err())
Packit 2997f0
    Test::Fail("consume() returned error: " + msg->errstr());
Packit 2997f0
Packit 2997f0
  if (msg->offset() != offset)
Packit 2997f0
    Test::Fail(tostr() << "seek()ed to offset " << offset <<
Packit 2997f0
               " but consume() returned offset " << msg->offset());
Packit 2997f0
Packit 2997f0
  return msg;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
class MyDeliveryReportCb : public RdKafka::DeliveryReportCb {
Packit 2997f0
 public:
Packit 2997f0
  void dr_cb (RdKafka::Message &msg) {
Packit 2997f0
    if (msg.err())
Packit 2997f0
      Test::Fail("Delivery failed: " + msg.errstr());
Packit 2997f0
Packit 2997f0
    if (!msg.msg_opaque())
Packit 2997f0
      return;
Packit 2997f0
Packit 2997f0
    RdKafka::MessageTimestamp ts = msg.timestamp();
Packit 2997f0
    if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
Packit 2997f0
      Test::Fail(tostr() << "Dr msg timestamp type wrong: " << ts.type);
Packit 2997f0
Packit 2997f0
    golden_timestamp = ts.timestamp;
Packit 2997f0
    golden_offset = msg.offset();
Packit 2997f0
  }
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
static void do_test_bsearch (void) {
Packit 2997f0
  RdKafka::Conf *conf, *tconf;
Packit 2997f0
  int msgcnt = 1000;
Packit 2997f0
  int64_t timestamp;
Packit 2997f0
  std::string errstr;
Packit 2997f0
  RdKafka::ErrorCode err;
Packit 2997f0
  MyDeliveryReportCb my_dr;
Packit 2997f0
Packit 2997f0
  topic = Test::mk_topic_name("0059-bsearch", 1);
Packit 2997f0
  Test::conf_init(&conf, &tconf, 0);
Packit 2997f0
  Test::conf_set(tconf, "produce.offset.report", "true");
Packit 2997f0
  Test::conf_set(conf, "api.version.request", "true");
Packit 2997f0
  conf->set("dr_cb", &my_dr, errstr);
Packit 2997f0
  conf->set("default_topic_conf", tconf, errstr);
Packit 2997f0
Packit 2997f0
  RdKafka::Producer *p = RdKafka::Producer::create(conf, errstr);
Packit 2997f0
  if (!p)
Packit 2997f0
    Test::Fail("Failed to create Producer: " + errstr);
Packit 2997f0
  delete conf;
Packit 2997f0
  delete tconf;
Packit 2997f0
Packit 2997f0
  timestamp = 1000;
Packit 2997f0
  for (int i = 0 ; i < msgcnt ; i++) {
Packit 2997f0
    err = p->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY,
Packit 2997f0
                     (void *)topic.c_str(), topic.size(), NULL, 0,
Packit 2997f0
                     timestamp,
Packit 2997f0
                     i == 357 ? (void *)1 /*golden*/ : NULL);
Packit 2997f0
      if (err != RdKafka::ERR_NO_ERROR)
Packit 2997f0
        Test::Fail("Produce failed: " + RdKafka::err2str(err));
Packit 2997f0
      timestamp += 100 + (timestamp % 9);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  if (p->flush(tmout_multip(5000)) != 0)
Packit 2997f0
    Test::Fail("Not all messages flushed");
Packit 2997f0
Packit 2997f0
  Test::Say(tostr() << "Produced " << msgcnt << " messages, " <<
Packit 2997f0
            "golden message with timestamp " << golden_timestamp <<
Packit 2997f0
            " at offset " << golden_offset << "\n");
Packit 2997f0
Packit 2997f0
  delete p;
Packit 2997f0
Packit 2997f0
  /*
Packit 2997f0
   * Now find the golden message using bsearch
Packit 2997f0
   */
Packit 2997f0
Packit 2997f0
  /* Create consumer */
Packit 2997f0
  Test::conf_init(&conf, NULL, 10);
Packit 2997f0
  Test::conf_set(conf, "group.id", topic);
Packit 2997f0
  Test::conf_set(conf, "api.version.request", "true");
Packit 2997f0
  Test::conf_set(conf, "fetch.wait.max.ms", "1");
Packit 2997f0
  Test::conf_set(conf, "fetch.error.backoff.ms", "1");
Packit 2997f0
  Test::conf_set(conf, "queued.min.messages", "1");
Packit 2997f0
  Test::conf_set(conf, "enable.auto.commit", "false");
Packit 2997f0
  Test::conf_set(conf, "socket.blocking.max.ms", "2000");
Packit 2997f0
Packit 2997f0
  RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
Packit 2997f0
  if (!c)
Packit 2997f0
    Test::Fail("Failed to create KafkaConsumer: " + errstr);
Packit 2997f0
  delete conf;
Packit 2997f0
Packit 2997f0
  Test::Say("Find initial middle offset\n");
Packit 2997f0
  int64_t low, high;
Packit 2997f0
  test_timing_t t_qr;
Packit 2997f0
  TIMING_START(&t_qr, "query_watermark_offsets");
Packit 2997f0
  err = c->query_watermark_offsets(topic, partition, &low, &high,
Packit 2997f0
                                   tmout_multip(5000));
Packit 2997f0
  TIMING_STOP(&t_qr);
Packit 2997f0
  if (err)
Packit 2997f0
    Test::Fail("query_watermark_offsets failed: " + RdKafka::err2str(err));
Packit 2997f0
Packit 2997f0
  /* Divide and conquer */
Packit 2997f0
  test_timing_t t_bsearch;
Packit 2997f0
  TIMING_START(&t_bsearch, "actual bsearch");
Packit 2997f0
  int itcnt = 0;
Packit 2997f0
  do {
Packit 2997f0
    int64_t mid;
Packit 2997f0
Packit 2997f0
    mid = low + ((high - low) / 2);
Packit 2997f0
Packit 2997f0
    Test::Say(1, tostr() << "Get message at mid point of " << low <<
Packit 2997f0
              ".." << high << " -> " << mid << "\n");
Packit 2997f0
Packit 2997f0
    RdKafka::Message *msg = get_msg(c, mid,
Packit 2997f0
                                    /* use assign() on first iteration,
Packit 2997f0
                                     * then seek() */
Packit 2997f0
                                    itcnt > 0);
Packit 2997f0
Packit 2997f0
    RdKafka::MessageTimestamp ts = msg->timestamp();
Packit 2997f0
    if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
Packit 2997f0
      Test::Fail(tostr() << "Expected CreateTime timestamp, not " <<
Packit 2997f0
                 ts.type << " at offset " << msg->offset());
Packit 2997f0
Packit 2997f0
    Test::Say(1, tostr() << "Message at offset " << msg->offset() <<
Packit 2997f0
              " with timestamp " << ts.timestamp << "\n");
Packit 2997f0
Packit 2997f0
    if (ts.timestamp == golden_timestamp) {
Packit 2997f0
      Test::Say(1, tostr() << "Found golden timestamp " << ts.timestamp <<
Packit 2997f0
                " at offset " << msg->offset() << " in " << itcnt+1 <<
Packit 2997f0
                " iterations\n");
Packit 2997f0
      delete msg;
Packit 2997f0
      break;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (low == high) {
Packit 2997f0
      Test::Fail(tostr() << "Search exhausted at offset " << msg->offset() <<
Packit 2997f0
                 " with timestamp " << ts.timestamp <<
Packit 2997f0
                 " without finding golden timestamp " << golden_timestamp <<
Packit 2997f0
                 " at offset " << golden_offset);
Packit 2997f0
Packit 2997f0
    } else if (ts.timestamp < golden_timestamp)
Packit 2997f0
      low = msg->offset() + 1;
Packit 2997f0
    else if (ts.timestamp > golden_timestamp)
Packit 2997f0
      high = msg->offset() - 1;
Packit 2997f0
Packit 2997f0
    delete msg;
Packit 2997f0
    itcnt++;
Packit 2997f0
  } while (true);
Packit 2997f0
  TIMING_STOP(&t_bsearch);
Packit 2997f0
Packit 2997f0
  c->close();
Packit 2997f0
Packit 2997f0
  delete c;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
extern "C" {
Packit 2997f0
  int main_0059_bsearch (int argc, char **argv) {
Packit 2997f0
    do_test_bsearch();
Packit 2997f0
    return 0;
Packit 2997f0
  }
Packit 2997f0
}