Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C/C++ library
 *
 * Copyright (c) 2014 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <iostream>
#include <string>
#include <list>
#include <cerrno>

#include "rdkafkacpp_int.h"

RdKafka::Consumer::~Consumer () {}

RdKafka::Consumer *RdKafka::Consumer::create (RdKafka::Conf *conf,
                                              std::string &errstr) {
  char errbuf[512];
  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::ConsumerImpl *rkc = new RdKafka::ConsumerImpl();
  rd_kafka_conf_t *rk_conf = NULL;

  if (confimpl) {
    if (!confimpl->rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      delete rkc;
      return NULL;
    }

    rkc->set_common_config(confimpl);

    rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);
  }

  rd_kafka_t *rk;
  if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, rk_conf,
                          errbuf, sizeof(errbuf)))) {
    errstr = errbuf;
    delete rkc;
    return NULL;
  }

  rkc->rk_ = rk;


  return rkc;
}

int64_t RdKafka::Consumer::OffsetTail (int64_t offset) {
  return RD_KAFKA_OFFSET_TAIL(offset);
}

RdKafka::ErrorCode RdKafka::ConsumerImpl::start (Topic *topic,
                                                 int32_t partition,
                                                 int64_t offset) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  if (rd_kafka_consume_start(topicimpl->rkt_, partition, offset) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}


RdKafka::ErrorCode RdKafka::ConsumerImpl::start (Topic *topic,
                                                 int32_t partition,
                                                 int64_t offset,
                                                 Queue *queue) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
  RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue);

  if (rd_kafka_consume_start_queue(topicimpl->rkt_, partition, offset,
                                   queueimpl->queue_) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}


RdKafka::ErrorCode RdKafka::ConsumerImpl::stop (Topic *topic,
                                                int32_t partition) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  if (rd_kafka_consume_stop(topicimpl->rkt_, partition) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}

RdKafka::ErrorCode RdKafka::ConsumerImpl::seek (Topic *topic,
						int32_t partition,
						int64_t offset,
						int timeout_ms) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  if (rd_kafka_seek(topicimpl->rkt_, partition, offset, timeout_ms) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}

RdKafka::Message *RdKafka::ConsumerImpl::consume (Topic *topic,
                                                  int32_t partition,
                                                  int timeout_ms) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);
  rd_kafka_message_t *rkmessage;

  rkmessage = rd_kafka_consume(topicimpl->rkt_, partition, timeout_ms);
  if (!rkmessage)
    return new RdKafka::MessageImpl(topic,
                                    static_cast<RdKafka::ErrorCode>
                                    (rd_kafka_last_error()));

  return new RdKafka::MessageImpl(topic, rkmessage);
}

namespace {
  /* Helper struct for `consume_callback'.
   * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
   * and keep track of the C++ callback function and `opaque' value.
   */
  struct ConsumerImplCallback {
    ConsumerImplCallback(RdKafka::Topic* topic, RdKafka::ConsumeCb* cb, void* data)
      : topic(topic), cb_cls(cb), cb_data(data) {
    }
    /* This function is the one we give to `rd_kafka_consume_callback', with
     * the `opaque' pointer pointing to an instance of this struct, in which
     * we can find the C++ callback and `cb_data'.
     */
    static void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
      ConsumerImplCallback *instance = static_cast<ConsumerImplCallback*>(opaque);
      RdKafka::MessageImpl message(instance->topic, msg, false /*don't free*/);
      instance->cb_cls->consume_cb(message, instance->cb_data);
    }
    RdKafka::Topic *topic;
    RdKafka::ConsumeCb *cb_cls;
    void *cb_data;
  };
}

int RdKafka::ConsumerImpl::consume_callback (RdKafka::Topic* topic,
                                             int32_t partition,
                                             int timeout_ms,
                                             RdKafka::ConsumeCb *consume_cb,
                                             void *opaque) {
  RdKafka::TopicImpl *topicimpl = static_cast<RdKafka::TopicImpl *>(topic);
  ConsumerImplCallback context(topic, consume_cb, opaque);
  return rd_kafka_consume_callback(topicimpl->rkt_, partition, timeout_ms,
                                   &ConsumerImplCallback::consume_cb_trampoline, &context);
}


RdKafka::Message *RdKafka::ConsumerImpl::consume (Queue *queue,
                                                  int timeout_ms) {
  RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue);
  rd_kafka_message_t *rkmessage;

  rkmessage = rd_kafka_consume_queue(queueimpl->queue_, timeout_ms);
  if (!rkmessage)
    return new RdKafka::MessageImpl(NULL,
                                    static_cast<RdKafka::ErrorCode>
                                    (rd_kafka_last_error()));
  /*
   * Recover our Topic * from the topic conf's opaque field, which we
   * set in RdKafka::Topic::create() for just this kind of situation.
   */
  void *opaque = rd_kafka_topic_opaque(rkmessage->rkt);
  Topic *topic = static_cast<Topic *>(opaque);

  return new RdKafka::MessageImpl(topic, rkmessage);
}

namespace {
  /* Helper struct for `consume_callback' with a Queue.
   * Encapsulates the values we need in order to call `rd_kafka_consume_callback'
   * and keep track of the C++ callback function and `opaque' value.
   */
  struct ConsumerImplQueueCallback {
    ConsumerImplQueueCallback(RdKafka::ConsumeCb *cb, void *data)
      : cb_cls(cb), cb_data(data) {
    }
    /* This function is the one we give to `rd_kafka_consume_callback', with
     * the `opaque' pointer pointing to an instance of this struct, in which
     * we can find the C++ callback and `cb_data'.
     */
    static void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
      ConsumerImplQueueCallback *instance = static_cast<ConsumerImplQueueCallback *>(opaque);
      /*
       * Recover our Topic * from the topic conf's opaque field, which we
       * set in RdKafka::Topic::create() for just this kind of situation.
       */
      void *topic_opaque = rd_kafka_topic_opaque(msg->rkt);
      RdKafka::Topic *topic = static_cast<RdKafka::Topic *>(topic_opaque);
      RdKafka::MessageImpl message(topic, msg, false /*don't free*/);
      instance->cb_cls->consume_cb(message, instance->cb_data);
    }
    RdKafka::ConsumeCb *cb_cls;
    void *cb_data;
  };
}

int RdKafka::ConsumerImpl::consume_callback (Queue *queue,
                                             int timeout_ms,
                                             RdKafka::ConsumeCb *consume_cb,
                                             void *opaque) {
  RdKafka::QueueImpl *queueimpl = dynamic_cast<RdKafka::QueueImpl *>(queue);
  ConsumerImplQueueCallback context(consume_cb, opaque);
  return rd_kafka_consume_callback_queue(queueimpl->queue_, timeout_ms,
                                         &ConsumerImplQueueCallback::consume_cb_trampoline,
                                         &context);
}