Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C/C++ library
 *
 * Copyright (c) 2014 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKACPP_INT_H_
#define _RDKAFKACPP_INT_H_

#include <string>
#include <iostream>
#include <cstring>
#include <stdlib.h>

#include "rdkafkacpp.h"

extern "C" {
#include "../src/rdkafka.h"
}

#ifdef _MSC_VER
typedef int mode_t;
#pragma warning(disable : 4250)
#endif


namespace RdKafka {


/*
 * Callback trampoline prototypes; the definitions live outside this header.
 * Each signature takes librdkafka C types and (except log_cb_trampoline)
 * an 'opaque' pointer.
 * NOTE(review): presumably these forward librdkafka C callbacks to the
 * corresponding RdKafka::*Cb C++ objects - confirm against the definitions.
 */
void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque);
void log_cb_trampoline (const rd_kafka_t *rk, int level,
                        const char *fac, const char *buf);
void error_cb_trampoline (rd_kafka_t *rk, int err, const char *reason,
                          void *opaque);
void throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name,
			     int32_t broker_id, int throttle_time_ms,
			     void *opaque);
int stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len,
                         void *opaque);
int socket_cb_trampoline (int domain, int type, int protocol, void *opaque);
/* Uses mode_t, which is typedef'ed to int above for MSVC builds. */
int open_cb_trampoline (const char *pathname, int flags, mode_t mode,
                        void *opaque);
void rebalance_cb_trampoline (rd_kafka_t *rk,
                              rd_kafka_resp_err_t err,
                              rd_kafka_topic_partition_list_t *c_partitions,
                              void *opaque);
/* Passed to rd_kafka_commit_queue() by the KafkaConsumerImpl commitSync()
 * overloads that take an OffsetCommitCb; in those calls 'opaque' is the
 * OffsetCommitCb pointer supplied by the application. */
void offset_commit_cb_trampoline0 (
        rd_kafka_t *rk,
        rd_kafka_resp_err_t err,
        rd_kafka_topic_partition_list_t *c_offsets, void *opaque);

/**
 * @brief Convert the C++ 'partitions' vector to a C partition list.
 *
 * Callers in this header release the returned list with
 * rd_kafka_topic_partition_list_destroy() once they are done with it.
 */
rd_kafka_topic_partition_list_t *
    partitions_to_c_parts (const std::vector<TopicPartition*> &partitions);

/**
 * @brief Update the application provided 'partitions' with info from 'c_parts'
 *
 * Used after C API calls that may modify the list (e.g. offsets_for_times,
 * commit, offsets_store) to propagate the results back to the C++ objects.
 */
void update_partitions_from_c_parts (std::vector<TopicPartition*> &partitions,
                                     const rd_kafka_topic_partition_list_t *c_parts);


/**
 * @brief Event implementation: a plain value holder for the fields exposed
 *        through the Event accessors.
 *
 * All members are public so the code that builds an event can fill in
 * id_ / throttle_time_ after construction.
 */
class EventImpl : public Event {
 public:
  ~EventImpl () {};

  /**
   * @brief Construct a fully specified event.
   *
   * @param fac Facility string, may be NULL (stored as "").
   * @param str Event string, may be NULL (stored as ""). Constructing a
   *            std::string from a NULL pointer is undefined behaviour,
   *            hence the explicit guard on both strings.
   */
  EventImpl (Type type, ErrorCode err, Severity severity,
             const char *fac, const char *str):
  type_(type), err_(err), severity_(severity), fac_(fac ? fac : ""),
	  str_(str ? str : ""), id_(0), throttle_time_(0) {};

  /**
   * @brief Construct an event of the given type with no error, empty
   *        strings and EVENT_SEVERITY_EMERG severity.
   */
  EventImpl (Type type):
  type_(type), err_(ERR_NO_ERROR), severity_(EVENT_SEVERITY_EMERG),
	  fac_(""), str_(""), id_(0), throttle_time_(0) {};

  Type        type () const { return type_; }
  ErrorCode   err () const { return err_; }
  Severity    severity () const { return severity_; }
  std::string fac () const { return fac_; }
  std::string str () const { return str_; }

  /** @returns the broker name for EVENT_THROTTLE events, else "". */
  std::string broker_name () const {
	  if (type_ == EVENT_THROTTLE)
		  return str_;
	  else
		  return std::string("");
  }
  int         broker_id () const { return id_; }
  int         throttle_time () const { return throttle_time_; }

  Type        type_;
  ErrorCode   err_;
  Severity    severity_;
  std::string fac_;
  std::string str_;         /* reused for THROTTLE broker_name */
  int         id_;
  int         throttle_time_;
};


class MessageImpl : public Message {
 public:
  ~MessageImpl () {
    if (free_rkmessage_)
      rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
    if (key_)
            delete key_;
  };

  MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage):
  topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {}

  MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage,
               bool dofree):
  topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL) { }

  MessageImpl (rd_kafka_message_t *rkmessage):
  topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {
    if (rkmessage->rkt) {
      /* Possibly NULL */
      topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt));
    }
  }

  /* Create errored message */
  MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err):
  topic_(topic), free_rkmessage_(false), key_(NULL) {
    rkmessage_ = &rkmessage_err_;
    memset(&rkmessage_err_, 0, sizeof(rkmessage_err_));
    rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err);
  }

  std::string         errstr() const {
    /* FIXME: If there is an error string in payload (for consume_cb)
     *        it wont be shown since 'payload' is reused for errstr
     *        and we cant distinguish between consumer and producer.
     *        For the producer case the payload needs to be the original
     *        payload pointer. */
    const char *es = rd_kafka_err2str(rkmessage_->err);
    return std::string(es ? es : "");
  }

  ErrorCode           err () const {
    return static_cast<RdKafka::ErrorCode>(rkmessage_->err);
  }

  Topic              *topic () const { return topic_; }
  std::string         topic_name  () const {
          if (rkmessage_->rkt)
                  return rd_kafka_topic_name(rkmessage_->rkt);
          else
                  return "";
  }
  int32_t             partition () const { return rkmessage_->partition; }
  void               *payload () const { return rkmessage_->payload; }
  size_t              len () const { return rkmessage_->len; }
  const std::string  *key () const {
    if (key_) {
      return key_;
    } else if (rkmessage_->key) {
      key_ = new std::string(static_cast<char const*>(rkmessage_->key), rkmessage_->key_len);
      return key_;
    }
    return NULL;
  }
  const void         *key_pointer () const { return rkmessage_->key; }
  size_t              key_len () const { return rkmessage_->key_len; }

  int64_t             offset () const { return rkmessage_->offset; }

  MessageTimestamp   timestamp () const {
	  MessageTimestamp ts;
	  rd_kafka_timestamp_type_t tstype;
	  ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
	  ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype);
	  return ts;
  }

  void               *msg_opaque () const { return rkmessage_->_private; };

  int64_t             latency () const {
          return rd_kafka_message_latency(rkmessage_);
  }

  struct rd_kafka_message_s *c_ptr () {
          return rkmessage_;
  }

  RdKafka::Topic *topic_;
  rd_kafka_message_t *rkmessage_;
  bool free_rkmessage_;
  /* For error signalling by the C++ layer the .._err_ message is
   * used as a place holder and rkmessage_ is set to point to it. */
  rd_kafka_message_t rkmessage_err_;
  mutable std::string *key_; /* mutable because it's a cached value */

private:
  /* "delete" copy ctor + copy assignment, for safety of key_ */
  MessageImpl(MessageImpl const&) /*= delete*/;
  MessageImpl& operator=(MessageImpl const&) /*= delete*/;
};


/**
 * @brief Conf implementation holding either a global (rk_conf_) or a
 *        topic (rkt_conf_) C configuration object plus the C++ callback
 *        objects registered by the application.
 *
 * The callback setters only accept their own property name and require
 * the matching configuration scope: rk_conf_ for global callbacks,
 * rkt_conf_ for the partitioner callbacks.
 */
class ConfImpl : public Conf {
 public:
  /* NOTE(review): conf_type_ is not initialized here; presumably it is
   * assigned together with rk_conf_/rkt_conf_ by the code that creates
   * the object - confirm. */
  ConfImpl()
      :consume_cb_(NULL),
      dr_cb_(NULL),
      event_cb_(NULL),
      socket_cb_(NULL),
      open_cb_(NULL),
      partitioner_cb_(NULL),
      partitioner_kp_cb_(NULL),
      rebalance_cb_(NULL),
      offset_commit_cb_(NULL),
      rk_conf_(NULL),
      rkt_conf_(NULL){}

  /** Destroys whichever C configuration object is held. */
  ~ConfImpl () {
    if (rk_conf_)
      rd_kafka_conf_destroy(rk_conf_);
    else if (rkt_conf_)
      rd_kafka_topic_conf_destroy(rkt_conf_);
  }

  /** String property setter, defined outside this header. */
  Conf::ConfResult set(const std::string &name,
                       const std::string &value,
                       std::string &errstr);

  /** Register the delivery report callback ("dr_cb", global conf only). */
  Conf::ConfResult set (const std::string &name, DeliveryReportCb *dr_cb,
                        std::string &errstr) {
    if (name != "dr_cb") {
      errstr = "Invalid value type, expected RdKafka::DeliveryReportCb";
      return Conf::CONF_INVALID;
    }

    if (!rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      return Conf::CONF_INVALID;
    }

    dr_cb_ = dr_cb;
    return Conf::CONF_OK;
  }

  /** Register the event callback ("event_cb", global conf only). */
  Conf::ConfResult set (const std::string &name, EventCb *event_cb,
                        std::string &errstr) {
    if (name != "event_cb") {
      errstr = "Invalid value type, expected RdKafka::EventCb";
      return Conf::CONF_INVALID;
    }

    if (!rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      return Conf::CONF_INVALID;
    }

    event_cb_ = event_cb;
    return Conf::CONF_OK;
  }

  /**
   * @brief Set the default topic configuration ("default_topic_conf")
   *        on a global conf; a duplicate of the topic conf is stored.
   *
   * @returns CONF_INVALID if \p name is wrong, if \p topic_conf is NULL
   *          or not a ConfImpl, if it holds no topic configuration, or
   *          if this object is not a global conf.
   */
  Conf::ConfResult set (const std::string &name, const Conf *topic_conf,
                        std::string &errstr) {
    const ConfImpl *tconf_impl =
        dynamic_cast<const RdKafka::ConfImpl *>(topic_conf);
    /* dynamic_cast yields NULL for a NULL or foreign Conf object, so the
     * pointer must be checked before rkt_conf_ is read through it. */
    if (name != "default_topic_conf" || !tconf_impl ||
        !tconf_impl->rkt_conf_) {
      errstr = "Invalid value type, expected RdKafka::Conf";
      return Conf::CONF_INVALID;
    }

    if (!rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      return Conf::CONF_INVALID;
    }

    rd_kafka_conf_set_default_topic_conf(rk_conf_,
                                         rd_kafka_topic_conf_dup(tconf_impl->
                                                                 rkt_conf_));

    return Conf::CONF_OK;
  }

  /** Register the partitioner callback ("partitioner_cb", topic conf only). */
  Conf::ConfResult set (const std::string &name, PartitionerCb *partitioner_cb,
                        std::string &errstr) {
    if (name != "partitioner_cb") {
      errstr = "Invalid value type, expected RdKafka::PartitionerCb";
      return Conf::CONF_INVALID;
    }

    if (!rkt_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
      return Conf::CONF_INVALID;
    }

    partitioner_cb_ = partitioner_cb;
    return Conf::CONF_OK;
  }

  /** Register the key-pointer partitioner callback
   *  ("partitioner_key_pointer_cb", topic conf only). */
  Conf::ConfResult set (const std::string &name,
                        PartitionerKeyPointerCb *partitioner_kp_cb,
                        std::string &errstr) {
    if (name != "partitioner_key_pointer_cb") {
      errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb";
      return Conf::CONF_INVALID;
    }

    if (!rkt_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
      return Conf::CONF_INVALID;
    }

    partitioner_kp_cb_ = partitioner_kp_cb;
    return Conf::CONF_OK;
  }

  /** Register the socket callback ("socket_cb", global conf only). */
  Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
                        std::string &errstr) {
    if (name != "socket_cb") {
      errstr = "Invalid value type, expected RdKafka::SocketCb";
      return Conf::CONF_INVALID;
    }

    if (!rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      return Conf::CONF_INVALID;
    }

    socket_cb_ = socket_cb;
    return Conf::CONF_OK;
  }


  /** Register the file open callback ("open_cb", global conf only). */
  Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
                        std::string &errstr) {
    if (name != "open_cb") {
      errstr = "Invalid value type, expected RdKafka::OpenCb";
      return Conf::CONF_INVALID;
    }

    if (!rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      return Conf::CONF_INVALID;
    }

    open_cb_ = open_cb;
    return Conf::CONF_OK;
  }


  /** Register the rebalance callback ("rebalance_cb", global conf only). */
  Conf::ConfResult set (const std::string &name, RebalanceCb *rebalance_cb,
                        std::string &errstr) {
    if (name != "rebalance_cb") {
      errstr = "Invalid value type, expected RdKafka::RebalanceCb";
      return Conf::CONF_INVALID;
    }

    if (!rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      return Conf::CONF_INVALID;
    }

    rebalance_cb_ = rebalance_cb;
    return Conf::CONF_OK;
  }


  /** Register the offset commit callback ("offset_commit_cb",
   *  global conf only). */
  Conf::ConfResult set (const std::string &name,
                        OffsetCommitCb *offset_commit_cb,
                        std::string &errstr) {
    if (name != "offset_commit_cb") {
      errstr = "Invalid value type, expected RdKafka::OffsetCommitCb";
      return Conf::CONF_INVALID;
    }

    if (!rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      return Conf::CONF_INVALID;
    }

    offset_commit_cb_ = offset_commit_cb;
    return Conf::CONF_OK;
  }

  /**
   * @brief Read the string value of property \p name into \p value.
   *
   * Callback properties cannot be read as strings and yield CONF_INVALID;
   * use the typed get() overloads for those.
   *
   * @returns the C library's result code converted to Conf::ConfResult;
   *          \p value is only modified on CONF_OK.
   */
  Conf::ConfResult get(const std::string &name, std::string &value) const {
    if (name.compare("dr_cb") == 0 ||
        name.compare("event_cb") == 0 ||
        name.compare("partitioner_cb") == 0 ||
        name.compare("partitioner_key_pointer_cb") == 0 ||
        name.compare("socket_cb") == 0 ||
        name.compare("open_cb") == 0 ||
        name.compare("rebalance_cb") == 0 ||
        name.compare("offset_commit_cb") == 0 ) {
      return Conf::CONF_INVALID;
    }
    rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID;

    /* Get size of property */
    size_t size;
    if (rk_conf_)
      res = rd_kafka_conf_get(rk_conf_,
                              name.c_str(), NULL, &size);
    else if (rkt_conf_)
      res = rd_kafka_topic_conf_get(rkt_conf_,
                                    name.c_str(), NULL, &size);
    if (res != RD_KAFKA_CONF_OK)
      return static_cast<Conf::ConfResult>(res);

    char *tmpValue = new char[size];

    /* Second pass: fetch the value itself. Both branches must pass the
     * destination buffer, otherwise tmpValue is read uninitialized below. */
    if (rk_conf_)
      res = rd_kafka_conf_get(rk_conf_, name.c_str(),
                              tmpValue, &size);
    else if (rkt_conf_)
      res = rd_kafka_topic_conf_get(rkt_conf_,
                                    name.c_str(), tmpValue, &size);

    if (res == RD_KAFKA_CONF_OK)
      value.assign(tmpValue);
    delete[] tmpValue;

    return static_cast<Conf::ConfResult>(res);
  }

  /* Typed callback getters: each returns CONF_INVALID (leaving the output
   * argument untouched) when this object lacks the required conf scope. */

  Conf::ConfResult get(DeliveryReportCb *&dr_cb) const {
      if (!rk_conf_)
	  return Conf::CONF_INVALID;
      dr_cb = this->dr_cb_;
      return Conf::CONF_OK;
  }

  Conf::ConfResult get(EventCb *&event_cb) const {
      if (!rk_conf_)
	  return Conf::CONF_INVALID;
      event_cb = this->event_cb_;
      return Conf::CONF_OK;
  }

  Conf::ConfResult get(PartitionerCb *&partitioner_cb) const {
      if (!rkt_conf_)
	  return Conf::CONF_INVALID;
      partitioner_cb = this->partitioner_cb_;
      return Conf::CONF_OK;
  }

  Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const {
      if (!rkt_conf_)
	  return Conf::CONF_INVALID;
      partitioner_kp_cb = this->partitioner_kp_cb_;
      return Conf::CONF_OK;
  }

  Conf::ConfResult get(SocketCb *&socket_cb) const {
      if (!rk_conf_)
	  return Conf::CONF_INVALID;
      socket_cb = this->socket_cb_;
      return Conf::CONF_OK;
  }

  Conf::ConfResult get(OpenCb *&open_cb) const {
      if (!rk_conf_)
	  return Conf::CONF_INVALID;
      open_cb = this->open_cb_;
      return Conf::CONF_OK;
  }

  Conf::ConfResult get(RebalanceCb *&rebalance_cb) const {
      if (!rk_conf_)
	  return Conf::CONF_INVALID;
      rebalance_cb = this->rebalance_cb_;
      return Conf::CONF_OK;
  }

  Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const {
      if (!rk_conf_)
	  return Conf::CONF_INVALID;
      offset_commit_cb = this->offset_commit_cb_;
      return Conf::CONF_OK;
    }



  /** Defined outside this header. */
  std::list<std::string> *dump ();


  /** Register the consume callback ("consume_cb", global conf only). */
  Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
                        std::string &errstr) {
    if (name != "consume_cb") {
      errstr = "Invalid value type, expected RdKafka::ConsumeCb";
      return Conf::CONF_INVALID;
    }

    if (!rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      return Conf::CONF_INVALID;
    }

    consume_cb_ = consume_cb;
    return Conf::CONF_OK;
  }


  ConsumeCb *consume_cb_;
  DeliveryReportCb *dr_cb_;
  EventCb *event_cb_;
  SocketCb *socket_cb_;
  OpenCb *open_cb_;
  PartitionerCb *partitioner_cb_;
  PartitionerKeyPointerCb *partitioner_kp_cb_;
  RebalanceCb *rebalance_cb_;
  OffsetCommitCb *offset_commit_cb_;
  ConfType conf_type_;
  rd_kafka_conf_t *rk_conf_;       /* set for global configuration objects */
  rd_kafka_topic_conf_t *rkt_conf_; /* set for topic configuration objects */
};


class HandleImpl : virtual public Handle {
 public:
  ~HandleImpl() {};
  HandleImpl () {};
  const std::string name () const { return std::string(rd_kafka_name(rk_)); };
  const std::string memberid () const {
	  char *str = rd_kafka_memberid(rk_);
	  std::string memberid = str ? str : "";
	  if (str)
		  rd_kafka_mem_free(rk_, str);
	  return memberid;
  }
  int poll (int timeout_ms) { return rd_kafka_poll(rk_, timeout_ms); };
  int outq_len () { return rd_kafka_outq_len(rk_); };

  void set_common_config (RdKafka::ConfImpl *confimpl);

  RdKafka::ErrorCode metadata (bool all_topics,const Topic *only_rkt,
            Metadata **metadatap, int timeout_ms);

  ErrorCode pause (std::vector<TopicPartition*> &partitions);
  ErrorCode resume (std::vector<TopicPartition*> &partitions);

  ErrorCode query_watermark_offsets (const std::string &topic,
				     int32_t partition,
				     int64_t *low, int64_t *high,
				     int timeout_ms) {
    return static_cast<RdKafka::ErrorCode>(
        rd_kafka_query_watermark_offsets(
            rk_, topic.c_str(), partition,
            low, high, timeout_ms));
  }

  ErrorCode get_watermark_offsets (const std::string &topic,
                                   int32_t partition,
                                   int64_t *low, int64_t *high) {
    return static_cast<RdKafka::ErrorCode>(
        rd_kafka_get_watermark_offsets(
            rk_, topic.c_str(), partition,
            low, high));
  }

  Queue *get_partition_queue (const TopicPartition *partition);

  ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
                             int timeout_ms) {
    rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
    ErrorCode err = static_cast<ErrorCode>(
        rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms));
    update_partitions_from_c_parts(offsets, c_offsets);
    rd_kafka_topic_partition_list_destroy(c_offsets);
    return err;
  }

  ErrorCode set_log_queue (Queue *queue);

  void yield () {
    rd_kafka_yield(rk_);
  }

  const std::string clusterid (int timeout_ms) {
          char *str = rd_kafka_clusterid(rk_, timeout_ms);
          std::string clusterid = str ? str : "";
          if (str)
                  rd_kafka_mem_free(rk_, str);
          return clusterid;
  }

  struct rd_kafka_s *c_ptr () {
          return rk_;
  }

  rd_kafka_t *rk_;
  /* All Producer and Consumer callbacks must reside in HandleImpl and
   * the opaque provided to rdkafka must be a pointer to HandleImpl, since
   * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
   * HandleImpl due to the skewed diamond inheritance. */
  ConsumeCb *consume_cb_;
  EventCb *event_cb_;
  SocketCb *socket_cb_;
  OpenCb *open_cb_;
  DeliveryReportCb *dr_cb_;
  PartitionerCb *partitioner_cb_;
  PartitionerKeyPointerCb *partitioner_kp_cb_;
  RebalanceCb *rebalance_cb_;
  OffsetCommitCb *offset_commit_cb_;
};


class TopicImpl : public Topic {
 public:
  ~TopicImpl () {
    rd_kafka_topic_destroy(rkt_);
  }

  const std::string name () const {
    return rd_kafka_topic_name(rkt_);
  }

  bool partition_available (int32_t partition) const {
    return !!rd_kafka_topic_partition_available(rkt_, partition);
  }

  ErrorCode offset_store (int32_t partition, int64_t offset) {
    return static_cast<RdKafka::ErrorCode>(
        rd_kafka_offset_store(rkt_, partition, offset));
  }

  static Topic *create (Handle &base, const std::string &topic,
                        Conf *conf);

  struct rd_kafka_topic_s *c_ptr () {
          return rkt_;
  }

  rd_kafka_topic_t *rkt_;
  PartitionerCb *partitioner_cb_;
  PartitionerKeyPointerCb *partitioner_kp_cb_;
};


/**
 * Topic and Partition
 *
 * Value object holding a topic name, partition, offset and error code,
 * constructible either from explicit values or from a C
 * rd_kafka_topic_partition_t.
 */
class TopicPartitionImpl : public TopicPartition {
public:
  /** Offset defaults to RdKafka::Topic::OFFSET_INVALID. */
  TopicPartitionImpl (const std::string &topic, int partition):
  topic_(topic),
      partition_(partition),
      offset_(RdKafka::Topic::OFFSET_INVALID),
      err_(ERR_NO_ERROR) {}

  TopicPartitionImpl (const std::string &topic, int partition, int64_t offset):
  topic_(topic),
      partition_(partition),
      offset_(offset),
      err_(ERR_NO_ERROR) {}

  /** Copy topic, partition, offset and error from the C element. */
  TopicPartitionImpl (const rd_kafka_topic_partition_t *c_part):
  topic_(c_part->topic),
      partition_(c_part->partition),
      offset_(c_part->offset),
      err_(static_cast<ErrorCode>(c_part->err)) {
    // FIXME: metadata
  }

  ~TopicPartitionImpl() {}

  static TopicPartition *create (const std::string &topic, int partition);

  static void destroy (std::vector<TopicPartition*> &partitions);

  const std::string &topic () const { return topic_; }
  int partition () const { return partition_; }
  int64_t offset () const { return offset_; }
  ErrorCode err () const { return err_; }

  void set_offset (int64_t offset) { offset_ = offset; }

  /** Writes "<topic> [<partition>]" to 'ostrm' and returns the stream. */
  std::ostream& operator<<(std::ostream &ostrm) const {
    ostrm << topic_ << " [" << partition_ << "]";
    return ostrm;
  }

  std::string topic_;
  int partition_;
  int64_t offset_;
  ErrorCode err_;
};



/**
 * @brief KafkaConsumer implementation on top of the shared HandleImpl.
 *
 * The inline commit/offset helpers translate between the C++ partition
 * vectors and the C partition lists and forward to the C API on rk_.
 */
class KafkaConsumerImpl : virtual public KafkaConsumer, virtual public HandleImpl {
public:
  ~KafkaConsumerImpl () {}

  static KafkaConsumer *create (Conf *conf, std::string &errstr);

  ErrorCode assignment (std::vector<TopicPartition*> &partitions);
  ErrorCode subscription (std::vector<std::string> &topics);
  ErrorCode subscribe (const std::vector<std::string> &topics);
  ErrorCode unsubscribe ();
  ErrorCode assign (const std::vector<TopicPartition*> &partitions);
  ErrorCode unassign ();

  Message *consume (int timeout_ms);

  /** Commit with a NULL offset list, synchronously. */
  ErrorCode commitSync () {
    const rd_kafka_resp_err_t c_err = rd_kafka_commit(rk_, NULL, 0/*sync*/);
    return static_cast<ErrorCode>(c_err);
  }

  /** Commit with a NULL offset list, asynchronously. */
  ErrorCode commitAsync () {
    const rd_kafka_resp_err_t c_err = rd_kafka_commit(rk_, NULL, 1/*async*/);
    return static_cast<ErrorCode>(c_err);
  }

  /* NOTE(review): the dynamic_cast result is used unchecked in the two
   * Message overloads below; a NULL or foreign Message would be
   * dereferenced. */
  ErrorCode commitSync (Message *message) {
    MessageImpl *impl = dynamic_cast<MessageImpl*>(message);
    const rd_kafka_resp_err_t c_err =
        rd_kafka_commit_message(rk_, impl->rkmessage_, 0/*sync*/);
    return static_cast<ErrorCode>(c_err);
  }

  ErrorCode commitAsync (Message *message) {
    MessageImpl *impl = dynamic_cast<MessageImpl*>(message);
    const rd_kafka_resp_err_t c_err =
        rd_kafka_commit_message(rk_, impl->rkmessage_,1/*async*/);
    return static_cast<ErrorCode>(c_err);
  }

  /** Synchronous commit of 'offsets'; on success the results are copied
   *  back into 'offsets'. */
  ErrorCode commitSync (std::vector<TopicPartition*> &offsets) {
    rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(offsets);
    const rd_kafka_resp_err_t c_err = rd_kafka_commit(rk_, c_list, 0);
    if (!c_err)
      update_partitions_from_c_parts(offsets, c_list);
    rd_kafka_topic_partition_list_destroy(c_list);
    return static_cast<ErrorCode>(c_err);
  }

  /** Asynchronous commit of 'offsets'; 'offsets' is not updated. */
  ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) {
    rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(offsets);
    const rd_kafka_resp_err_t c_err = rd_kafka_commit(rk_, c_list, 1);
    rd_kafka_topic_partition_list_destroy(c_list);
    return static_cast<ErrorCode>(c_err);
  }

  /** Commit with a NULL offset list and NULL queue; 'offset_commit_cb'
   *  is handed to the trampoline as its opaque. */
  ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) {
    const rd_kafka_resp_err_t c_err =
        rd_kafka_commit_queue(rk_, NULL, NULL,
                              RdKafka::offset_commit_cb_trampoline0,
                              offset_commit_cb);
    return static_cast<ErrorCode>(c_err);
  }

  /** As above, but committing the given 'offsets'. */
  ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
                        OffsetCommitCb *offset_commit_cb) {
    rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(offsets);
    const rd_kafka_resp_err_t c_err =
        rd_kafka_commit_queue(rk_, c_list, NULL,
                              RdKafka::offset_commit_cb_trampoline0,
                              offset_commit_cb);
    rd_kafka_topic_partition_list_destroy(c_list);
    return static_cast<ErrorCode>(c_err);
  }

  ErrorCode committed (std::vector<TopicPartition*> &partitions, int timeout_ms);
  ErrorCode position (std::vector<TopicPartition*> &partitions);

  ErrorCode close ();

  ErrorCode seek (const TopicPartition &partition, int timeout_ms);

  /** Store 'offsets' via rd_kafka_offsets_store(); the C list is always
   *  copied back into 'offsets', regardless of the result. */
  ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) {
    rd_kafka_topic_partition_list_t *c_list = partitions_to_c_parts(offsets);
    const rd_kafka_resp_err_t c_err = rd_kafka_offsets_store(rk_, c_list);
    update_partitions_from_c_parts(offsets, c_list);
    rd_kafka_topic_partition_list_destroy(c_list);
    return static_cast<ErrorCode>(c_err);
  }

};


/**
 * @brief Metadata implementation exposing a C rd_kafka_metadata_t
 *        through the broker and topic vectors held by this object.
 *
 * Constructor and destructor are defined outside this header.
 */
class MetadataImpl : public Metadata {
 public:
  MetadataImpl(const rd_kafka_metadata_t *metadata);
  ~MetadataImpl();

  /** @returns a pointer to the broker vector owned by this object. */
  const std::vector<const BrokerMetadata *> *brokers() const {
    const std::vector<const BrokerMetadata *> *list = &brokers_;
    return list;
  }

  /** @returns a pointer to the topic vector owned by this object. */
  const std::vector<const TopicMetadata *>  *topics() const {
    const std::vector<const TopicMetadata *> *list = &topics_;
    return list;
  }

  /** @returns a copy of the C metadata's orig_broker_name. */
  const std::string orig_broker_name() const {
    const std::string broker_name(metadata_->orig_broker_name);
    return broker_name;
  }

  int32_t orig_broker_id() const {
    return metadata_->orig_broker_id;
  }

private:
  const rd_kafka_metadata_t *metadata_;
  std::vector<const BrokerMetadata *> brokers_;
  std::vector<const TopicMetadata *> topics_;
  std::string orig_broker_name_;
};


/**
 * @brief Queue implementation wrapping a C rd_kafka_queue_t handle.
 *
 * Apart from the destructor, all methods are defined outside this header.
 */
class QueueImpl : virtual public Queue {
 public:
  static Queue *create (Handle *base);

  /** Destroys the wrapped C queue handle. */
  ~QueueImpl () { rd_kafka_queue_destroy(queue_); }

  ErrorCode forward (Queue *queue);
  Message *consume (int timeout_ms);
  int poll (int timeout_ms);
  void io_event_enable(int fd, const void *payload, size_t size);

  rd_kafka_queue_t *queue_;
};





/**
 * @brief Consumer implementation on top of the shared HandleImpl; all
 *        methods except the destructor are defined outside this header.
 */
class ConsumerImpl : virtual public Consumer, virtual public HandleImpl {
 public:
  /* Only destroy the C handle if there is one, matching ~ProducerImpl. */
  ~ConsumerImpl () {
    if (rk_)
      rd_kafka_destroy(rk_);
  };
  static Consumer *create (Conf *conf, std::string &errstr);

  ErrorCode start (Topic *topic, int32_t partition, int64_t offset);
  ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
                   Queue *queue);
  ErrorCode stop (Topic *topic, int32_t partition);
  ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
		  int timeout_ms);
  Message *consume (Topic *topic, int32_t partition, int timeout_ms);
  Message *consume (Queue *queue, int timeout_ms);
  int consume_callback (Topic *topic, int32_t partition, int timeout_ms,
                        ConsumeCb *cb, void *opaque);
  int consume_callback (Queue *queue, int timeout_ms,
                        RdKafka::ConsumeCb *consume_cb, void *opaque);
};



/**
 * @brief Producer implementation on top of the shared HandleImpl; the
 *        produce() overloads and create() are defined outside this header.
 */
class ProducerImpl : virtual public Producer, virtual public HandleImpl {

 public:
  static Producer *create (Conf *conf, std::string &errstr);

  /** Destroys the C client handle if one is set. */
  ~ProducerImpl () {
    if (rk_)
      rd_kafka_destroy(rk_);
  }

  ErrorCode produce (Topic *topic, int32_t partition,
                     int msgflags,
                     void *payload, size_t len,
                     const std::string *key,
                     void *msg_opaque);

  ErrorCode produce (Topic *topic, int32_t partition,
                     int msgflags,
                     void *payload, size_t len,
                     const void *key, size_t key_len,
                     void *msg_opaque);

  ErrorCode produce (Topic *topic, int32_t partition,
                     const std::vector<char> *payload,
                     const std::vector<char> *key,
                     void *msg_opaque);

  ErrorCode produce (const std::string topic_name, int32_t partition,
                     int msgflags,
                     void *payload, size_t len,
                     const void *key, size_t key_len,
                     int64_t timestamp,
                     void *msg_opaque);

  /** Forwards to rd_kafka_flush() and converts the C error code. */
  ErrorCode flush (int timeout_ms) {
    const rd_kafka_resp_err_t c_err = rd_kafka_flush(rk_, timeout_ms);
    return static_cast<RdKafka::ErrorCode>(c_err);
  }

};



}

#endif /* _RDKAFKACPP_INT_H_ */