Blame src-cpp/rdkafkacpp_int.h

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C/C++ library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2014 Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 *
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met:
Packit 2997f0
 *
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer.
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution.
Packit 2997f0
 *
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#ifndef _RDKAFKACPP_INT_H_
Packit 2997f0
#define _RDKAFKACPP_INT_H_
Packit 2997f0
Packit 2997f0
#include <string>
Packit 2997f0
#include <iostream>
Packit 2997f0
#include <cstring>
Packit 2997f0
#include <stdlib.h>
Packit 2997f0
Packit 2997f0
#include "rdkafkacpp.h"
Packit 2997f0
Packit 2997f0
extern "C" {
Packit 2997f0
#include "../src/rdkafka.h"
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
#ifdef _MSC_VER
Packit 2997f0
typedef int mode_t;
Packit 2997f0
#pragma warning(disable : 4250)
Packit 2997f0
#endif
Packit 2997f0
Packit 2997f0
Packit 2997f0
namespace RdKafka {
Packit 2997f0
Packit 2997f0
Packit 2997f0
void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque);
Packit 2997f0
void log_cb_trampoline (const rd_kafka_t *rk, int level,
Packit 2997f0
                        const char *fac, const char *buf);
Packit 2997f0
void error_cb_trampoline (rd_kafka_t *rk, int err, const char *reason,
Packit 2997f0
                          void *opaque);
Packit 2997f0
void throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name,
Packit 2997f0
			     int32_t broker_id, int throttle_time_ms,
Packit 2997f0
			     void *opaque);
Packit 2997f0
int stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len,
Packit 2997f0
                         void *opaque);
Packit 2997f0
int socket_cb_trampoline (int domain, int type, int protocol, void *opaque);
Packit 2997f0
int open_cb_trampoline (const char *pathname, int flags, mode_t mode,
Packit 2997f0
                        void *opaque);
Packit 2997f0
void rebalance_cb_trampoline (rd_kafka_t *rk,
Packit 2997f0
                              rd_kafka_resp_err_t err,
Packit 2997f0
                              rd_kafka_topic_partition_list_t *c_partitions,
Packit 2997f0
                              void *opaque);
Packit 2997f0
void offset_commit_cb_trampoline0 (
Packit 2997f0
        rd_kafka_t *rk,
Packit 2997f0
        rd_kafka_resp_err_t err,
Packit 2997f0
        rd_kafka_topic_partition_list_t *c_offsets, void *opaque);
Packit 2997f0
Packit 2997f0
rd_kafka_topic_partition_list_t *
Packit 2997f0
    partitions_to_c_parts (const std::vector<TopicPartition*> &partitions);
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Update the application provided 'partitions' with info from 'c_parts'
Packit 2997f0
 */
Packit 2997f0
void update_partitions_from_c_parts (std::vector<TopicPartition*> &partitions,
Packit 2997f0
                                     const rd_kafka_topic_partition_list_t *c_parts);
Packit 2997f0
Packit 2997f0
Packit 2997f0
class EventImpl : public Event {
Packit 2997f0
 public:
Packit 2997f0
  ~EventImpl () {};
Packit 2997f0
Packit 2997f0
  EventImpl (Type type, ErrorCode err, Severity severity,
Packit 2997f0
             const char *fac, const char *str):
Packit 2997f0
  type_(type), err_(err), severity_(severity), fac_(fac ? fac : ""),
Packit 2997f0
	  str_(str), id_(0), throttle_time_(0) {};
Packit 2997f0
Packit 2997f0
  EventImpl (Type type):
Packit 2997f0
  type_(type), err_(ERR_NO_ERROR), severity_(EVENT_SEVERITY_EMERG),
Packit 2997f0
	  fac_(""), str_(""), id_(0), throttle_time_(0) {};
Packit 2997f0
Packit 2997f0
  Type        type () const { return type_; }
Packit 2997f0
  ErrorCode   err () const { return err_; }
Packit 2997f0
  Severity    severity () const { return severity_; }
Packit 2997f0
  std::string fac () const { return fac_; }
Packit 2997f0
  std::string str () const { return str_; }
Packit 2997f0
  std::string broker_name () const {
Packit 2997f0
	  if (type_ == EVENT_THROTTLE)
Packit 2997f0
		  return str_;
Packit 2997f0
	  else
Packit 2997f0
		  return std::string("");
Packit 2997f0
  }
Packit 2997f0
  int         broker_id () const { return id_; }
Packit 2997f0
  int         throttle_time () const { return throttle_time_; }
Packit 2997f0
Packit 2997f0
  Type        type_;
Packit 2997f0
  ErrorCode   err_;
Packit 2997f0
  Severity    severity_;
Packit 2997f0
  std::string fac_;
Packit 2997f0
  std::string str_;         /* reused for THROTTLE broker_name */
Packit 2997f0
  int         id_;
Packit 2997f0
  int         throttle_time_;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
class MessageImpl : public Message {
Packit 2997f0
 public:
Packit 2997f0
  ~MessageImpl () {
Packit 2997f0
    if (free_rkmessage_)
Packit 2997f0
      rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
Packit 2997f0
    if (key_)
Packit 2997f0
            delete key_;
Packit 2997f0
  };
Packit 2997f0
Packit 2997f0
  MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage):
Packit 2997f0
  topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {}
Packit 2997f0
Packit 2997f0
  MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage,
Packit 2997f0
               bool dofree):
Packit 2997f0
  topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL) { }
Packit 2997f0
Packit 2997f0
  MessageImpl (rd_kafka_message_t *rkmessage):
Packit 2997f0
  topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {
Packit 2997f0
    if (rkmessage->rkt) {
Packit 2997f0
      /* Possibly NULL */
Packit 2997f0
      topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt));
Packit 2997f0
    }
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  /* Create errored message */
Packit 2997f0
  MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err):
Packit 2997f0
  topic_(topic), free_rkmessage_(false), key_(NULL) {
Packit 2997f0
    rkmessage_ = &rkmessage_err_;
Packit 2997f0
    memset(&rkmessage_err_, 0, sizeof(rkmessage_err_));
Packit 2997f0
    rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  std::string         errstr() const {
Packit 2997f0
    /* FIXME: If there is an error string in payload (for consume_cb)
Packit 2997f0
     *        it wont be shown since 'payload' is reused for errstr
Packit 2997f0
     *        and we cant distinguish between consumer and producer.
Packit 2997f0
     *        For the producer case the payload needs to be the original
Packit 2997f0
     *        payload pointer. */
Packit 2997f0
    const char *es = rd_kafka_err2str(rkmessage_->err);
Packit 2997f0
    return std::string(es ? es : "");
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode           err () const {
Packit 2997f0
    return static_cast<RdKafka::ErrorCode>(rkmessage_->err);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Topic              *topic () const { return topic_; }
Packit 2997f0
  std::string         topic_name  () const {
Packit 2997f0
          if (rkmessage_->rkt)
Packit 2997f0
                  return rd_kafka_topic_name(rkmessage_->rkt);
Packit 2997f0
          else
Packit 2997f0
                  return "";
Packit 2997f0
  }
Packit 2997f0
  int32_t             partition () const { return rkmessage_->partition; }
Packit 2997f0
  void               *payload () const { return rkmessage_->payload; }
Packit 2997f0
  size_t              len () const { return rkmessage_->len; }
Packit 2997f0
  const std::string  *key () const {
Packit 2997f0
    if (key_) {
Packit 2997f0
      return key_;
Packit 2997f0
    } else if (rkmessage_->key) {
Packit 2997f0
      key_ = new std::string(static_cast<char const*>(rkmessage_->key), rkmessage_->key_len);
Packit 2997f0
      return key_;
Packit 2997f0
    }
Packit 2997f0
    return NULL;
Packit 2997f0
  }
Packit 2997f0
  const void         *key_pointer () const { return rkmessage_->key; }
Packit 2997f0
  size_t              key_len () const { return rkmessage_->key_len; }
Packit 2997f0
Packit 2997f0
  int64_t             offset () const { return rkmessage_->offset; }
Packit 2997f0
Packit 2997f0
  MessageTimestamp   timestamp () const {
Packit 2997f0
	  MessageTimestamp ts;
Packit 2997f0
	  rd_kafka_timestamp_type_t tstype;
Packit 2997f0
	  ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
Packit 2997f0
	  ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype);
Packit 2997f0
	  return ts;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  void               *msg_opaque () const { return rkmessage_->_private; };
Packit 2997f0
Packit 2997f0
  int64_t             latency () const {
Packit 2997f0
          return rd_kafka_message_latency(rkmessage_);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  struct rd_kafka_message_s *c_ptr () {
Packit 2997f0
          return rkmessage_;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  RdKafka::Topic *topic_;
Packit 2997f0
  rd_kafka_message_t *rkmessage_;
Packit 2997f0
  bool free_rkmessage_;
Packit 2997f0
  /* For error signalling by the C++ layer the .._err_ message is
Packit 2997f0
   * used as a place holder and rkmessage_ is set to point to it. */
Packit 2997f0
  rd_kafka_message_t rkmessage_err_;
Packit 2997f0
  mutable std::string *key_; /* mutable because it's a cached value */
Packit 2997f0
Packit 2997f0
private:
Packit 2997f0
  /* "delete" copy ctor + copy assignment, for safety of key_ */
Packit 2997f0
  MessageImpl(MessageImpl const&) /*= delete*/;
Packit 2997f0
  MessageImpl& operator=(MessageImpl const&) /*= delete*/;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
class ConfImpl : public Conf {
Packit 2997f0
 public:
Packit 2997f0
  ConfImpl()
Packit 2997f0
      :consume_cb_(NULL),
Packit 2997f0
      dr_cb_(NULL),
Packit 2997f0
      event_cb_(NULL),
Packit 2997f0
      socket_cb_(NULL),
Packit 2997f0
      open_cb_(NULL),
Packit 2997f0
      partitioner_cb_(NULL),
Packit 2997f0
      partitioner_kp_cb_(NULL),
Packit 2997f0
      rebalance_cb_(NULL),
Packit 2997f0
      offset_commit_cb_(NULL),
Packit 2997f0
      rk_conf_(NULL),
Packit 2997f0
      rkt_conf_(NULL){}
Packit 2997f0
  ~ConfImpl () {
Packit 2997f0
    if (rk_conf_)
Packit 2997f0
      rd_kafka_conf_destroy(rk_conf_);
Packit 2997f0
    else if (rkt_conf_)
Packit 2997f0
      rd_kafka_topic_conf_destroy(rkt_conf_);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set(const std::string &name,
Packit 2997f0
                       const std::string &value,
Packit 2997f0
                       std::string &errstr);
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name, DeliveryReportCb *dr_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "dr_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::DeliveryReportCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rk_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    dr_cb_ = dr_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name, EventCb *event_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "event_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::EventCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rk_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    event_cb_ = event_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name, const Conf *topic_conf,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    const ConfImpl *tconf_impl =
Packit 2997f0
        dynamic_cast<const RdKafka::ConfImpl *>(topic_conf);
Packit 2997f0
    if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::Conf";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rk_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    rd_kafka_conf_set_default_topic_conf(rk_conf_,
Packit 2997f0
                                         rd_kafka_topic_conf_dup(tconf_impl->
Packit 2997f0
                                                                 rkt_conf_));
Packit 2997f0
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name, PartitionerCb *partitioner_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "partitioner_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::PartitionerCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rkt_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    partitioner_cb_ = partitioner_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name,
Packit 2997f0
                        PartitionerKeyPointerCb *partitioner_kp_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "partitioner_key_pointer_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rkt_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    partitioner_kp_cb_ = partitioner_kp_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "socket_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::SocketCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rk_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    socket_cb_ = socket_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "open_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::OpenCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rk_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    open_cb_ = open_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name, RebalanceCb *rebalance_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "rebalance_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::RebalanceCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rk_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    rebalance_cb_ = rebalance_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name,
Packit 2997f0
                        OffsetCommitCb *offset_commit_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "offset_commit_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::OffsetCommitCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rk_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    offset_commit_cb_ = offset_commit_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(const std::string &name, std::string &value) const {
Packit 2997f0
    if (name.compare("dr_cb") == 0 ||
Packit 2997f0
        name.compare("event_cb") == 0 ||
Packit 2997f0
        name.compare("partitioner_cb") == 0 ||
Packit 2997f0
        name.compare("partitioner_key_pointer_cb") == 0 ||
Packit 2997f0
        name.compare("socket_cb") == 0 ||
Packit 2997f0
        name.compare("open_cb") == 0 ||
Packit 2997f0
        name.compare("rebalance_cb") == 0 ||
Packit 2997f0
        name.compare("offset_commit_cb") == 0 ) {
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
    rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID;
Packit 2997f0
Packit 2997f0
    /* Get size of property */
Packit 2997f0
    size_t size;
Packit 2997f0
    if (rk_conf_)
Packit 2997f0
      res = rd_kafka_conf_get(rk_conf_,
Packit 2997f0
                              name.c_str(), NULL, &size);
Packit 2997f0
    else if (rkt_conf_)
Packit 2997f0
      res = rd_kafka_topic_conf_get(rkt_conf_,
Packit 2997f0
                                    name.c_str(), NULL, &size);
Packit 2997f0
    if (res != RD_KAFKA_CONF_OK)
Packit 2997f0
      return static_cast<Conf::ConfResult>(res);
Packit 2997f0
Packit 2997f0
    char *tmpValue = new char[size];
Packit 2997f0
Packit 2997f0
    if (rk_conf_)
Packit 2997f0
      res = rd_kafka_conf_get(rk_conf_, name.c_str(),
Packit 2997f0
                              tmpValue, &size);
Packit 2997f0
    else if (rkt_conf_)
Packit 2997f0
      res = rd_kafka_topic_conf_get(rkt_conf_,
Packit 2997f0
                                    name.c_str(), NULL, &size);
Packit 2997f0
Packit 2997f0
    if (res == RD_KAFKA_CONF_OK)
Packit 2997f0
      value.assign(tmpValue);
Packit 2997f0
    delete[] tmpValue;
Packit 2997f0
Packit 2997f0
    return static_cast<Conf::ConfResult>(res);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(DeliveryReportCb *&dr_cb) const {
Packit 2997f0
      if (!rk_conf_)
Packit 2997f0
	  return Conf::CONF_INVALID;
Packit 2997f0
      dr_cb = this->dr_cb_;
Packit 2997f0
      return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(EventCb *&event_cb) const {
Packit 2997f0
      if (!rk_conf_)
Packit 2997f0
	  return Conf::CONF_INVALID;
Packit 2997f0
      event_cb = this->event_cb_;
Packit 2997f0
      return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(PartitionerCb *&partitioner_cb) const {
Packit 2997f0
      if (!rkt_conf_)
Packit 2997f0
	  return Conf::CONF_INVALID;
Packit 2997f0
      partitioner_cb = this->partitioner_cb_;
Packit 2997f0
      return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const {
Packit 2997f0
      if (!rkt_conf_)
Packit 2997f0
	  return Conf::CONF_INVALID;
Packit 2997f0
      partitioner_kp_cb = this->partitioner_kp_cb_;
Packit 2997f0
      return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(SocketCb *&socket_cb) const {
Packit 2997f0
      if (!rk_conf_)
Packit 2997f0
	  return Conf::CONF_INVALID;
Packit 2997f0
      socket_cb = this->socket_cb_;
Packit 2997f0
      return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(OpenCb *&open_cb) const {
Packit 2997f0
      if (!rk_conf_)
Packit 2997f0
	  return Conf::CONF_INVALID;
Packit 2997f0
      open_cb = this->open_cb_;
Packit 2997f0
      return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(RebalanceCb *&rebalance_cb) const {
Packit 2997f0
      if (!rk_conf_)
Packit 2997f0
	  return Conf::CONF_INVALID;
Packit 2997f0
      rebalance_cb = this->rebalance_cb_;
Packit 2997f0
      return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const {
Packit 2997f0
      if (!rk_conf_)
Packit 2997f0
	  return Conf::CONF_INVALID;
Packit 2997f0
      offset_commit_cb = this->offset_commit_cb_;
Packit 2997f0
      return Conf::CONF_OK;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
  std::list<std::string> *dump ();
Packit 2997f0
Packit 2997f0
Packit 2997f0
  Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
Packit 2997f0
                        std::string &errstr) {
Packit 2997f0
    if (name != "consume_cb") {
Packit 2997f0
      errstr = "Invalid value type, expected RdKafka::ConsumeCb";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    if (!rk_conf_) {
Packit 2997f0
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
Packit 2997f0
      return Conf::CONF_INVALID;
Packit 2997f0
    }
Packit 2997f0
Packit 2997f0
    consume_cb_ = consume_cb;
Packit 2997f0
    return Conf::CONF_OK;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
Packit 2997f0
  ConsumeCb *consume_cb_;
Packit 2997f0
  DeliveryReportCb *dr_cb_;
Packit 2997f0
  EventCb *event_cb_;
Packit 2997f0
  SocketCb *socket_cb_;
Packit 2997f0
  OpenCb *open_cb_;
Packit 2997f0
  PartitionerCb *partitioner_cb_;
Packit 2997f0
  PartitionerKeyPointerCb *partitioner_kp_cb_;
Packit 2997f0
  RebalanceCb *rebalance_cb_;
Packit 2997f0
  OffsetCommitCb *offset_commit_cb_;
Packit 2997f0
  ConfType conf_type_;
Packit 2997f0
  rd_kafka_conf_t *rk_conf_;
Packit 2997f0
  rd_kafka_topic_conf_t *rkt_conf_;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
class HandleImpl : virtual public Handle {
Packit 2997f0
 public:
Packit 2997f0
  ~HandleImpl() {};
Packit 2997f0
  HandleImpl () {};
Packit 2997f0
  const std::string name () const { return std::string(rd_kafka_name(rk_)); };
Packit 2997f0
  const std::string memberid () const {
Packit 2997f0
	  char *str = rd_kafka_memberid(rk_);
Packit 2997f0
	  std::string memberid = str ? str : "";
Packit 2997f0
	  if (str)
Packit 2997f0
		  rd_kafka_mem_free(rk_, str);
Packit 2997f0
	  return memberid;
Packit 2997f0
  }
Packit 2997f0
  int poll (int timeout_ms) { return rd_kafka_poll(rk_, timeout_ms); };
Packit 2997f0
  int outq_len () { return rd_kafka_outq_len(rk_); };
Packit 2997f0
Packit 2997f0
  void set_common_config (RdKafka::ConfImpl *confimpl);
Packit 2997f0
Packit 2997f0
  RdKafka::ErrorCode metadata (bool all_topics,const Topic *only_rkt,
Packit 2997f0
            Metadata **metadatap, int timeout_ms);
Packit 2997f0
Packit 2997f0
  ErrorCode pause (std::vector<TopicPartition*> &partitions);
Packit 2997f0
  ErrorCode resume (std::vector<TopicPartition*> &partitions);
Packit 2997f0
Packit 2997f0
  ErrorCode query_watermark_offsets (const std::string &topic,
Packit 2997f0
				     int32_t partition,
Packit 2997f0
				     int64_t *low, int64_t *high,
Packit 2997f0
				     int timeout_ms) {
Packit 2997f0
    return static_cast<RdKafka::ErrorCode>(
Packit 2997f0
        rd_kafka_query_watermark_offsets(
Packit 2997f0
            rk_, topic.c_str(), partition,
Packit 2997f0
            low, high, timeout_ms));
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode get_watermark_offsets (const std::string &topic,
Packit 2997f0
                                   int32_t partition,
Packit 2997f0
                                   int64_t *low, int64_t *high) {
Packit 2997f0
    return static_cast<RdKafka::ErrorCode>(
Packit 2997f0
        rd_kafka_get_watermark_offsets(
Packit 2997f0
            rk_, topic.c_str(), partition,
Packit 2997f0
            low, high));
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  Queue *get_partition_queue (const TopicPartition *partition);
Packit 2997f0
Packit 2997f0
  ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
Packit 2997f0
                             int timeout_ms) {
Packit 2997f0
    rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
Packit 2997f0
    ErrorCode err = static_cast<ErrorCode>(
Packit 2997f0
        rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms));
Packit 2997f0
    update_partitions_from_c_parts(offsets, c_offsets);
Packit 2997f0
    rd_kafka_topic_partition_list_destroy(c_offsets);
Packit 2997f0
    return err;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode set_log_queue (Queue *queue);
Packit 2997f0
Packit 2997f0
  void yield () {
Packit 2997f0
    rd_kafka_yield(rk_);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  const std::string clusterid (int timeout_ms) {
Packit 2997f0
          char *str = rd_kafka_clusterid(rk_, timeout_ms);
Packit 2997f0
          std::string clusterid = str ? str : "";
Packit 2997f0
          if (str)
Packit 2997f0
                  rd_kafka_mem_free(rk_, str);
Packit 2997f0
          return clusterid;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  struct rd_kafka_s *c_ptr () {
Packit 2997f0
          return rk_;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  rd_kafka_t *rk_;
Packit 2997f0
  /* All Producer and Consumer callbacks must reside in HandleImpl and
Packit 2997f0
   * the opaque provided to rdkafka must be a pointer to HandleImpl, since
Packit 2997f0
   * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
Packit 2997f0
   * HandleImpl due to the skewed diamond inheritance. */
Packit 2997f0
  ConsumeCb *consume_cb_;
Packit 2997f0
  EventCb *event_cb_;
Packit 2997f0
  SocketCb *socket_cb_;
Packit 2997f0
  OpenCb *open_cb_;
Packit 2997f0
  DeliveryReportCb *dr_cb_;
Packit 2997f0
  PartitionerCb *partitioner_cb_;
Packit 2997f0
  PartitionerKeyPointerCb *partitioner_kp_cb_;
Packit 2997f0
  RebalanceCb *rebalance_cb_;
Packit 2997f0
  OffsetCommitCb *offset_commit_cb_;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
class TopicImpl : public Topic {
Packit 2997f0
 public:
Packit 2997f0
  ~TopicImpl () {
Packit 2997f0
    rd_kafka_topic_destroy(rkt_);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  const std::string name () const {
Packit 2997f0
    return rd_kafka_topic_name(rkt_);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  bool partition_available (int32_t partition) const {
Packit 2997f0
    return !!rd_kafka_topic_partition_available(rkt_, partition);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode offset_store (int32_t partition, int64_t offset) {
Packit 2997f0
    return static_cast<RdKafka::ErrorCode>(
Packit 2997f0
        rd_kafka_offset_store(rkt_, partition, offset));
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  static Topic *create (Handle &base, const std::string &topic,
Packit 2997f0
                        Conf *conf);
Packit 2997f0
Packit 2997f0
  struct rd_kafka_topic_s *c_ptr () {
Packit 2997f0
          return rkt_;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  rd_kafka_topic_t *rkt_;
Packit 2997f0
  PartitionerCb *partitioner_cb_;
Packit 2997f0
  PartitionerKeyPointerCb *partitioner_kp_cb_;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Topic and Partition
Packit 2997f0
 */
Packit 2997f0
class TopicPartitionImpl : public TopicPartition {
Packit 2997f0
public:
Packit 2997f0
  ~TopicPartitionImpl() {};
Packit 2997f0
Packit 2997f0
  static TopicPartition *create (const std::string &topic, int partition);
Packit 2997f0
Packit 2997f0
  TopicPartitionImpl (const std::string &topic, int partition):
Packit 2997f0
  topic_(topic), partition_(partition), offset_(RdKafka::Topic::OFFSET_INVALID),
Packit 2997f0
      err_(ERR_NO_ERROR) {}
Packit 2997f0
Packit 2997f0
  TopicPartitionImpl (const std::string &topic, int partition, int64_t offset):
Packit 2997f0
  topic_(topic), partition_(partition), offset_(offset),
Packit 2997f0
          err_(ERR_NO_ERROR) {}
Packit 2997f0
Packit 2997f0
  TopicPartitionImpl (const rd_kafka_topic_partition_t *c_part) {
Packit 2997f0
    topic_ = std::string(c_part->topic);
Packit 2997f0
    partition_ = c_part->partition;
Packit 2997f0
    offset_ = c_part->offset;
Packit 2997f0
    err_ = static_cast<ErrorCode>(c_part->err);
Packit 2997f0
    // FIXME: metadata
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  static void destroy (std::vector<TopicPartition*> &partitions);
Packit 2997f0
Packit 2997f0
  int partition () const { return partition_; }
Packit 2997f0
  const std::string &topic () const { return topic_ ; }
Packit 2997f0
Packit 2997f0
  int64_t offset () const { return offset_; }
Packit 2997f0
Packit 2997f0
  ErrorCode err () const { return err_; }
Packit 2997f0
Packit 2997f0
  void set_offset (int64_t offset) { offset_ = offset; }
Packit 2997f0
Packit 2997f0
  std::ostream& operator<<(std::ostream &ostrm) const {
Packit 2997f0
    return ostrm << topic_ << " [" << partition_ << "]";
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  std::string topic_;
Packit 2997f0
  int partition_;
Packit 2997f0
  int64_t offset_;
Packit 2997f0
  ErrorCode err_;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
class KafkaConsumerImpl : virtual public KafkaConsumer, virtual public HandleImpl {
Packit 2997f0
public:
Packit 2997f0
  ~KafkaConsumerImpl () {
Packit 2997f0
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  static KafkaConsumer *create (Conf *conf, std::string &errstr);
Packit 2997f0
Packit 2997f0
  ErrorCode assignment (std::vector<TopicPartition*> &partitions);
Packit 2997f0
  ErrorCode subscription (std::vector<std::string> &topics);
Packit 2997f0
  ErrorCode subscribe (const std::vector<std::string> &topics);
Packit 2997f0
  ErrorCode unsubscribe ();
Packit 2997f0
  ErrorCode assign (const std::vector<TopicPartition*> &partitions);
Packit 2997f0
  ErrorCode unassign ();
Packit 2997f0
Packit 2997f0
  Message *consume (int timeout_ms);
Packit 2997f0
  ErrorCode commitSync () {
Packit 2997f0
    return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0/*sync*/));
Packit 2997f0
  }
Packit 2997f0
  ErrorCode commitAsync () {
Packit 2997f0
    return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1/*async*/));
Packit 2997f0
  }
Packit 2997f0
  ErrorCode commitSync (Message *message) {
Packit 2997f0
	  MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message);
Packit 2997f0
	  return static_cast<ErrorCode>(
Packit 2997f0
                  rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0/*sync*/));
Packit 2997f0
  }
Packit 2997f0
  ErrorCode commitAsync (Message *message) {
Packit 2997f0
	  MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message);
Packit 2997f0
	  return static_cast<ErrorCode>(
Packit 2997f0
                  rd_kafka_commit_message(rk_, msgimpl->rkmessage_,1/*async*/));
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode commitSync (std::vector<TopicPartition*> &offsets) {
Packit 2997f0
	  rd_kafka_topic_partition_list_t *c_parts =
Packit 2997f0
		  partitions_to_c_parts(offsets);
Packit 2997f0
	  rd_kafka_resp_err_t err =
Packit 2997f0
		  rd_kafka_commit(rk_, c_parts, 0);
Packit 2997f0
	  if (!err)
Packit 2997f0
		  update_partitions_from_c_parts(offsets, c_parts);
Packit 2997f0
	  rd_kafka_topic_partition_list_destroy(c_parts);
Packit 2997f0
	  return static_cast<ErrorCode>(err);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) {
Packit 2997f0
	  rd_kafka_topic_partition_list_t *c_parts =
Packit 2997f0
		  partitions_to_c_parts(offsets);
Packit 2997f0
	  rd_kafka_resp_err_t err =
Packit 2997f0
		  rd_kafka_commit(rk_, c_parts, 1);
Packit 2997f0
	  rd_kafka_topic_partition_list_destroy(c_parts);
Packit 2997f0
	  return static_cast<ErrorCode>(err);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) {
Packit 2997f0
          return static_cast<ErrorCode>(
Packit 2997f0
                  rd_kafka_commit_queue(rk_, NULL, NULL,
Packit 2997f0
                                        RdKafka::offset_commit_cb_trampoline0,
Packit 2997f0
                                        offset_commit_cb));
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
Packit 2997f0
                        OffsetCommitCb *offset_commit_cb) {
Packit 2997f0
          rd_kafka_topic_partition_list_t *c_parts =
Packit 2997f0
                  partitions_to_c_parts(offsets);
Packit 2997f0
          rd_kafka_resp_err_t err =
Packit 2997f0
                  rd_kafka_commit_queue(rk_, c_parts, NULL,
Packit 2997f0
                                        RdKafka::offset_commit_cb_trampoline0,
Packit 2997f0
                                        offset_commit_cb);
Packit 2997f0
          rd_kafka_topic_partition_list_destroy(c_parts);
Packit 2997f0
          return static_cast<ErrorCode>(err);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  ErrorCode committed (std::vector<TopicPartition*> &partitions, int timeout_ms);
Packit 2997f0
  ErrorCode position (std::vector<TopicPartition*> &partitions);
Packit 2997f0
Packit 2997f0
  ErrorCode close ();
Packit 2997f0
Packit 2997f0
  ErrorCode seek (const TopicPartition &partition, int timeout_ms);
Packit 2997f0
Packit 2997f0
  ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) {
Packit 2997f0
          rd_kafka_topic_partition_list_t *c_parts =
Packit 2997f0
                  partitions_to_c_parts(offsets);
Packit 2997f0
          rd_kafka_resp_err_t err =
Packit 2997f0
                  rd_kafka_offsets_store(rk_, c_parts);
Packit 2997f0
          update_partitions_from_c_parts(offsets, c_parts);
Packit 2997f0
          rd_kafka_topic_partition_list_destroy(c_parts);
Packit 2997f0
          return static_cast<ErrorCode>(err);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
class MetadataImpl : public Metadata {
Packit 2997f0
 public:
Packit 2997f0
  MetadataImpl(const rd_kafka_metadata_t *metadata);
Packit 2997f0
  ~MetadataImpl();
Packit 2997f0
Packit 2997f0
  const std::vector<const BrokerMetadata *> *brokers() const {
Packit 2997f0
    return &brokers_;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  const std::vector<const TopicMetadata *>  *topics() const {
Packit 2997f0
    return &topics_;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  const std::string orig_broker_name() const {
Packit 2997f0
    return std::string(metadata_->orig_broker_name);
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  int32_t orig_broker_id() const {
Packit 2997f0
    return metadata_->orig_broker_id;
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
private:
Packit 2997f0
  const rd_kafka_metadata_t *metadata_;
Packit 2997f0
  std::vector<const BrokerMetadata *> brokers_;
Packit 2997f0
  std::vector<const TopicMetadata *> topics_;
Packit 2997f0
  std::string orig_broker_name_;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
class QueueImpl : virtual public Queue {
Packit 2997f0
 public:
Packit 2997f0
  ~QueueImpl () {
Packit 2997f0
    rd_kafka_queue_destroy(queue_);
Packit 2997f0
  }
Packit 2997f0
  static Queue *create (Handle *base);
Packit 2997f0
  ErrorCode forward (Queue *queue);
Packit 2997f0
  Message *consume (int timeout_ms);
Packit 2997f0
  int poll (int timeout_ms);
Packit 2997f0
  void io_event_enable(int fd, const void *payload, size_t size);
Packit 2997f0
Packit 2997f0
  rd_kafka_queue_t *queue_;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
class ConsumerImpl : virtual public Consumer, virtual public HandleImpl {
Packit 2997f0
 public:
Packit 2997f0
  ~ConsumerImpl () {
Packit 2997f0
    rd_kafka_destroy(rk_); };
Packit 2997f0
  static Consumer *create (Conf *conf, std::string &errstr);
Packit 2997f0
Packit 2997f0
  ErrorCode start (Topic *topic, int32_t partition, int64_t offset);
Packit 2997f0
  ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
Packit 2997f0
                   Queue *queue);
Packit 2997f0
  ErrorCode stop (Topic *topic, int32_t partition);
Packit 2997f0
  ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
Packit 2997f0
		  int timeout_ms);
Packit 2997f0
  Message *consume (Topic *topic, int32_t partition, int timeout_ms);
Packit 2997f0
  Message *consume (Queue *queue, int timeout_ms);
Packit 2997f0
  int consume_callback (Topic *topic, int32_t partition, int timeout_ms,
Packit 2997f0
                        ConsumeCb *cb, void *opaque);
Packit 2997f0
  int consume_callback (Queue *queue, int timeout_ms,
Packit 2997f0
                        RdKafka::ConsumeCb *consume_cb, void *opaque);
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
class ProducerImpl : virtual public Producer, virtual public HandleImpl {
Packit 2997f0
Packit 2997f0
 public:
Packit 2997f0
  ~ProducerImpl () { if (rk_) rd_kafka_destroy(rk_); };
Packit 2997f0
Packit 2997f0
  ErrorCode produce (Topic *topic, int32_t partition,
Packit 2997f0
                     int msgflags,
Packit 2997f0
                     void *payload, size_t len,
Packit 2997f0
                     const std::string *key,
Packit 2997f0
                     void *msg_opaque);
Packit 2997f0
Packit 2997f0
  ErrorCode produce (Topic *topic, int32_t partition,
Packit 2997f0
                     int msgflags,
Packit 2997f0
                     void *payload, size_t len,
Packit 2997f0
                     const void *key, size_t key_len,
Packit 2997f0
                     void *msg_opaque);
Packit 2997f0
Packit 2997f0
  ErrorCode produce (Topic *topic, int32_t partition,
Packit 2997f0
                     const std::vector<char> *payload,
Packit 2997f0
                     const std::vector<char> *key,
Packit 2997f0
                     void *msg_opaque);
Packit 2997f0
Packit 2997f0
  ErrorCode produce (const std::string topic_name, int32_t partition,
Packit 2997f0
                     int msgflags,
Packit 2997f0
                     void *payload, size_t len,
Packit 2997f0
                     const void *key, size_t key_len,
Packit 2997f0
                     int64_t timestamp,
Packit 2997f0
                     void *msg_opaque);
Packit 2997f0
Packit 2997f0
  ErrorCode flush (int timeout_ms) {
Packit 2997f0
	  return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_,
Packit 2997f0
								timeout_ms));
Packit 2997f0
  }
Packit 2997f0
Packit 2997f0
  static Producer *create (Conf *conf, std::string &errstr);
Packit 2997f0
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
#endif /* _RDKAFKACPP_INT_H_ */