Blame examples/rdkafka_consume_batch.cpp

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2018, Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 *
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met:
Packit 2997f0
 *
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer.
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution.
Packit 2997f0
 *
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Apache Kafka consumer & producer example programs
Packit 2997f0
 * using the Kafka driver from librdkafka
Packit 2997f0
 * (https://github.com/edenhill/librdkafka)
Packit 2997f0
 *
Packit 2997f0
 * This example shows how to read batches of messages.
Packit 2997f0
 * Note that messages are fetched from the broker in batches regardless
Packit 2997f0
 * of how the application polls messages from librdkafka; this example
Packit 2997f0
 * merely shows how to accumulate a set of messages in the application.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#include <chrono>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>
Packit 2997f0
Packit 2997f0
#ifndef _MSC_VER
Packit 2997f0
#include <sys/time.h>
Packit 2997f0
#endif
Packit 2997f0
Packit 2997f0
#ifdef _MSC_VER
Packit 2997f0
#include "../win32/wingetopt.h"
Packit 2997f0
#include <atltime.h>
Packit 2997f0
#elif _AIX
Packit 2997f0
#include <unistd.h>
Packit 2997f0
#else
Packit 2997f0
#include <getopt.h>
Packit 2997f0
#include <unistd.h>
Packit 2997f0
#endif
Packit 2997f0
Packit 2997f0
/*
Packit 2997f0
 * Typically the include path in a real application would be
Packit 2997f0
 * #include <librdkafka/rdkafkacpp.h>
Packit 2997f0
 */
Packit 2997f0
#include "rdkafkacpp.h"
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
 * Run flag polled by the main consume loop.
 *
 * It is written from a signal handler, so it must be a
 * `volatile std::sig_atomic_t`: that is the only plain object type a
 * handler may portably modify. Non-zero means "keep running".
 */
static volatile std::sig_atomic_t run = 1;

/**
 * @brief Signal handler: request termination by clearing the run flag.
 *
 * The signal number is not needed, hence the unnamed parameter.
 */
static void sigterm (int /* sig */) {
  run = 0;
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
 * @returns the current wall-clock time in milliseconds.
 *
 * Implemented with std::chrono::system_clock rather than gettimeofday()
 * so that it builds on every platform, Windows included. The value is
 * counted from the system clock's epoch (the Unix epoch on mainstream
 * implementations).
 */
static int64_t now () {
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;
  using std::chrono::system_clock;

  const system_clock::duration since_epoch =
      system_clock::now().time_since_epoch();
  return static_cast<int64_t>(duration_cast<milliseconds>(since_epoch).count());
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Accumulate a batch of \p batch_size messages, but wait
Packit 2997f0
 *        no longer than \p batch_tmout milliseconds.
Packit 2997f0
 */
Packit 2997f0
static std::vector<RdKafka::Message *>
Packit 2997f0
consume_batch (RdKafka::KafkaConsumer *consumer, size_t batch_size, int batch_tmout) {
Packit 2997f0
Packit 2997f0
  std::vector<RdKafka::Message *> msgs;
Packit 2997f0
  msgs.reserve(batch_size);
Packit 2997f0
Packit 2997f0
  int64_t end = now() + batch_tmout;
Packit 2997f0
  int remaining_timeout = batch_tmout;
Packit 2997f0
Packit 2997f0
  while (msgs.size() < batch_size) {
Packit 2997f0
    RdKafka::Message *msg = consumer->consume(remaining_timeout);
Packit 2997f0
Packit 2997f0
    switch (msg->err()) {
Packit 2997f0
    case RdKafka::ERR__TIMED_OUT:
Packit 2997f0
      delete msg;
Packit 2997f0
      return msgs;
Packit 2997f0
Packit 2997f0
    case RdKafka::ERR_NO_ERROR:
Packit 2997f0
      msgs.push_back(msg);
Packit 2997f0
      break;
Packit 2997f0
Packit 2997f0
    default:
Packit 2997f0
      std::cerr << "%% Consumer error: " << msg->errstr() << std::endl;
Packit 2997f0
      run = false;
Packit 2997f0
      delete msg;
Packit 2997f0
      return msgs;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    remaining_timeout = end - now();
Packit 2997f0
    if (remaining_timeout < 0)
Packit 2997f0
      break;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  return msgs;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
int main (int argc, char **argv) {
Packit 2997f0
  std::string errstr;
Packit 2997f0
  std::string topic_str;
Packit 2997f0
  std::vector<std::string> topics;
Packit 2997f0
  int batch_size = 100;
Packit 2997f0
  int batch_tmout = 1000;
Packit 2997f0
Packit 2997f0
  /* Create configuration objects */
Packit 2997f0
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
Packit 2997f0
Packit 2997f0
  if (conf->set("enable.partition.eof", "false", errstr) != RdKafka::Conf::CONF_OK) {
Packit 2997f0
    std::cerr << errstr << std::endl;
Packit 2997f0
    exit(1);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  /* Read command line arguments */
Packit 2997f0
  int opt;
Packit 2997f0
  while ((opt = getopt(argc, argv, "g:B:T::b:X:")) != -1) {
Packit 2997f0
    switch (opt) {
Packit 2997f0
    case 'g':
Packit 2997f0
      if (conf->set("group.id",  optarg, errstr) != RdKafka::Conf::CONF_OK) {
Packit 2997f0
        std::cerr << errstr << std::endl;
Packit 2997f0
        exit(1);
Packit 2997f0
      }
Packit 2997f0
      break;
Packit 2997f0
Packit 2997f0
    case 'B':
Packit 2997f0
      batch_size = atoi(optarg);
Packit 2997f0
      break;
Packit 2997f0
Packit 2997f0
    case 'T':
Packit 2997f0
      batch_tmout = atoi(optarg);
Packit 2997f0
      break;
Packit 2997f0
Packit 2997f0
    case 'b':
Packit 2997f0
      if (conf->set("bootstrap.servers", optarg, errstr) != RdKafka::Conf::CONF_OK) {
Packit 2997f0
        std::cerr << errstr << std::endl;
Packit 2997f0
        exit(1);
Packit 2997f0
      }
Packit 2997f0
      break;
Packit 2997f0
Packit 2997f0
    case 'X':
Packit 2997f0
      {
Packit 2997f0
        char *name, *val;
Packit 2997f0
Packit 2997f0
        name = optarg;
Packit 2997f0
        if (!(val = strchr(name, '='))) {
Packit 2997f0
          std::cerr << "%% Expected -X property=value, not " <<
Packit 2997f0
              name << std::endl;
Packit 2997f0
          exit(1);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        *val = '\0';
Packit 2997f0
        val++;
Packit 2997f0
Packit 2997f0
        if (conf->set(name, val, errstr) != RdKafka::Conf::CONF_OK) {
Packit 2997f0
          std::cerr << errstr << std::endl;
Packit 2997f0
          exit(1);
Packit 2997f0
        }
Packit 2997f0
      }
Packit 2997f0
      break;
Packit 2997f0
Packit 2997f0
    default:
Packit 2997f0
      goto usage;
Packit 2997f0
    }
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  /* Topics to consume */
Packit 2997f0
  for (; optind < argc ; optind++)
Packit 2997f0
    topics.push_back(std::string(argv[optind]));
Packit 2997f0
Packit 2997f0
  if (topics.empty() || optind != argc) {
Packit 2997f0
  usage:
Packit 2997f0
    fprintf(stderr,
Packit 2997f0
            "Usage: %s -g <group-id> -B <batch-size> [options] topic1 topic2..\n"
Packit 2997f0
            "\n"
Packit 2997f0
            "librdkafka version %s (0x%08x)\n"
Packit 2997f0
            "\n"
Packit 2997f0
            " Options:\n"
Packit 2997f0
            "  -g <group-id>    Consumer group id\n"
Packit 2997f0
            "  -B <batch-size>  How many messages to batch (default: 100).\n"
Packit 2997f0
            "  -T <batch-tmout> How long to wait for batch-size to accumulate in milliseconds. (default 1000 ms)\n"
Packit 2997f0
            "  -b <brokers>    Broker address (localhost:9092)\n"
Packit 2997f0
            "  -X <prop=name>  Set arbitrary librdkafka configuration property\n"
Packit 2997f0
            "\n",
Packit 2997f0
            argv[0],
Packit 2997f0
            RdKafka::version_str().c_str(), RdKafka::version());
Packit 2997f0
        exit(1);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
Packit 2997f0
  signal(SIGINT, sigterm);
Packit 2997f0
  signal(SIGTERM, sigterm);
Packit 2997f0
Packit 2997f0
  /* Create consumer */
Packit 2997f0
  RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
Packit 2997f0
  if (!consumer) {
Packit 2997f0
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
Packit 2997f0
    exit(1);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  delete conf;
Packit 2997f0
Packit 2997f0
  /* Subscribe to topics */
Packit 2997f0
  RdKafka::ErrorCode err = consumer->subscribe(topics);
Packit 2997f0
  if (err) {
Packit 2997f0
    std::cerr << "Failed to subscribe to " << topics.size() << " topics: "
Packit 2997f0
              << RdKafka::err2str(err) << std::endl;
Packit 2997f0
    exit(1);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  /* Consume messages in batches of \p batch_size */
Packit 2997f0
  while (run) {
Packit 2997f0
    auto msgs = consume_batch(consumer, batch_size, batch_tmout);
Packit 2997f0
    std::cout << "Accumulated " << msgs.size() << " messages:" << std::endl;
Packit 2997f0
Packit 2997f0
    for (auto &msg : msgs) {
Packit 2997f0
      std::cout << " Message in " << msg->topic_name() << " [" << msg->partition() << "] at offset " << msg->offset() << std::endl;
Packit 2997f0
      delete msg;
Packit 2997f0
    }
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  /* Close and destroy consumer */
Packit 2997f0
  consumer->close();
Packit 2997f0
  delete consumer;
Packit 2997f0
Packit 2997f0
  return 0;
Packit 2997f0
}