|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* librdkafka - Apache Kafka C/C++ library
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Copyright (c) 2014 Magnus Edenhill
|
|
Packit |
2997f0 |
* All rights reserved.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Redistribution and use in source and binary forms, with or without
|
|
Packit |
2997f0 |
* modification, are permitted provided that the following conditions are met:
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* 1. Redistributions of source code must retain the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer.
|
|
Packit |
2997f0 |
* 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer in the documentation
|
|
Packit |
2997f0 |
* and/or other materials provided with the distribution.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
Packit |
2997f0 |
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
Packit |
2997f0 |
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
Packit |
2997f0 |
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
Packit |
2997f0 |
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
Packit |
2997f0 |
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
Packit |
2997f0 |
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
Packit |
2997f0 |
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
Packit |
2997f0 |
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
Packit |
2997f0 |
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
Packit |
2997f0 |
* POSSIBILITY OF SUCH DAMAGE.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#ifndef _RDKAFKACPP_INT_H_
|
|
Packit |
2997f0 |
#define _RDKAFKACPP_INT_H_
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include <string>
|
|
Packit |
2997f0 |
#include <iostream>
|
|
Packit |
2997f0 |
#include <cstring>
|
|
Packit |
2997f0 |
#include <stdlib.h>
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include "rdkafkacpp.h"
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
extern "C" {
|
|
Packit |
2997f0 |
#include "../src/rdkafka.h"
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#ifdef _MSC_VER
|
|
Packit |
2997f0 |
typedef int mode_t;
|
|
Packit |
2997f0 |
#pragma warning(disable : 4250)
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
namespace RdKafka {
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque);
|
|
Packit |
2997f0 |
void log_cb_trampoline (const rd_kafka_t *rk, int level,
|
|
Packit |
2997f0 |
const char *fac, const char *buf);
|
|
Packit |
2997f0 |
void error_cb_trampoline (rd_kafka_t *rk, int err, const char *reason,
|
|
Packit |
2997f0 |
void *opaque);
|
|
Packit |
2997f0 |
void throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name,
|
|
Packit |
2997f0 |
int32_t broker_id, int throttle_time_ms,
|
|
Packit |
2997f0 |
void *opaque);
|
|
Packit |
2997f0 |
int stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len,
|
|
Packit |
2997f0 |
void *opaque);
|
|
Packit |
2997f0 |
int socket_cb_trampoline (int domain, int type, int protocol, void *opaque);
|
|
Packit |
2997f0 |
int open_cb_trampoline (const char *pathname, int flags, mode_t mode,
|
|
Packit |
2997f0 |
void *opaque);
|
|
Packit |
2997f0 |
void rebalance_cb_trampoline (rd_kafka_t *rk,
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err,
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *c_partitions,
|
|
Packit |
2997f0 |
void *opaque);
|
|
Packit |
2997f0 |
void offset_commit_cb_trampoline0 (
|
|
Packit |
2997f0 |
rd_kafka_t *rk,
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err,
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *c_offsets, void *opaque);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *
|
|
Packit |
2997f0 |
partitions_to_c_parts (const std::vector<TopicPartition*> &partitions);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Update the application provided 'partitions' with info from 'c_parts'
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
void update_partitions_from_c_parts (std::vector<TopicPartition*> &partitions,
|
|
Packit |
2997f0 |
const rd_kafka_topic_partition_list_t *c_parts);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class EventImpl : public Event {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~EventImpl () {};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
EventImpl (Type type, ErrorCode err, Severity severity,
|
|
Packit |
2997f0 |
const char *fac, const char *str):
|
|
Packit |
2997f0 |
type_(type), err_(err), severity_(severity), fac_(fac ? fac : ""),
|
|
Packit |
2997f0 |
str_(str), id_(0), throttle_time_(0) {};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
EventImpl (Type type):
|
|
Packit |
2997f0 |
type_(type), err_(ERR_NO_ERROR), severity_(EVENT_SEVERITY_EMERG),
|
|
Packit |
2997f0 |
fac_(""), str_(""), id_(0), throttle_time_(0) {};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Type type () const { return type_; }
|
|
Packit |
2997f0 |
ErrorCode err () const { return err_; }
|
|
Packit |
2997f0 |
Severity severity () const { return severity_; }
|
|
Packit |
2997f0 |
std::string fac () const { return fac_; }
|
|
Packit |
2997f0 |
std::string str () const { return str_; }
|
|
Packit |
2997f0 |
std::string broker_name () const {
|
|
Packit |
2997f0 |
if (type_ == EVENT_THROTTLE)
|
|
Packit |
2997f0 |
return str_;
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
return std::string("");
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
int broker_id () const { return id_; }
|
|
Packit |
2997f0 |
int throttle_time () const { return throttle_time_; }
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Type type_;
|
|
Packit |
2997f0 |
ErrorCode err_;
|
|
Packit |
2997f0 |
Severity severity_;
|
|
Packit |
2997f0 |
std::string fac_;
|
|
Packit |
2997f0 |
std::string str_; /* reused for THROTTLE broker_name */
|
|
Packit |
2997f0 |
int id_;
|
|
Packit |
2997f0 |
int throttle_time_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class MessageImpl : public Message {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~MessageImpl () {
|
|
Packit |
2997f0 |
if (free_rkmessage_)
|
|
Packit |
2997f0 |
rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
|
|
Packit |
2997f0 |
if (key_)
|
|
Packit |
2997f0 |
delete key_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage):
|
|
Packit |
2997f0 |
topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage,
|
|
Packit |
2997f0 |
bool dofree):
|
|
Packit |
2997f0 |
topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL) { }
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
MessageImpl (rd_kafka_message_t *rkmessage):
|
|
Packit |
2997f0 |
topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL) {
|
|
Packit |
2997f0 |
if (rkmessage->rkt) {
|
|
Packit |
2997f0 |
/* Possibly NULL */
|
|
Packit |
2997f0 |
topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create errored message */
|
|
Packit |
2997f0 |
MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err):
|
|
Packit |
2997f0 |
topic_(topic), free_rkmessage_(false), key_(NULL) {
|
|
Packit |
2997f0 |
rkmessage_ = &rkmessage_err_;
|
|
Packit |
2997f0 |
memset(&rkmessage_err_, 0, sizeof(rkmessage_err_));
|
|
Packit |
2997f0 |
rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::string errstr() const {
|
|
Packit |
2997f0 |
/* FIXME: If there is an error string in payload (for consume_cb)
|
|
Packit |
2997f0 |
* it wont be shown since 'payload' is reused for errstr
|
|
Packit |
2997f0 |
* and we cant distinguish between consumer and producer.
|
|
Packit |
2997f0 |
* For the producer case the payload needs to be the original
|
|
Packit |
2997f0 |
* payload pointer. */
|
|
Packit |
2997f0 |
const char *es = rd_kafka_err2str(rkmessage_->err);
|
|
Packit |
2997f0 |
return std::string(es ? es : "");
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode err () const {
|
|
Packit |
2997f0 |
return static_cast<RdKafka::ErrorCode>(rkmessage_->err);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Topic *topic () const { return topic_; }
|
|
Packit |
2997f0 |
std::string topic_name () const {
|
|
Packit |
2997f0 |
if (rkmessage_->rkt)
|
|
Packit |
2997f0 |
return rd_kafka_topic_name(rkmessage_->rkt);
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
return "";
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
int32_t partition () const { return rkmessage_->partition; }
|
|
Packit |
2997f0 |
void *payload () const { return rkmessage_->payload; }
|
|
Packit |
2997f0 |
size_t len () const { return rkmessage_->len; }
|
|
Packit |
2997f0 |
const std::string *key () const {
|
|
Packit |
2997f0 |
if (key_) {
|
|
Packit |
2997f0 |
return key_;
|
|
Packit |
2997f0 |
} else if (rkmessage_->key) {
|
|
Packit |
2997f0 |
key_ = new std::string(static_cast<char const*>(rkmessage_->key), rkmessage_->key_len);
|
|
Packit |
2997f0 |
return key_;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
return NULL;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
const void *key_pointer () const { return rkmessage_->key; }
|
|
Packit |
2997f0 |
size_t key_len () const { return rkmessage_->key_len; }
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int64_t offset () const { return rkmessage_->offset; }
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
MessageTimestamp timestamp () const {
|
|
Packit |
2997f0 |
MessageTimestamp ts;
|
|
Packit |
2997f0 |
rd_kafka_timestamp_type_t tstype;
|
|
Packit |
2997f0 |
ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
|
|
Packit |
2997f0 |
ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype);
|
|
Packit |
2997f0 |
return ts;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
void *msg_opaque () const { return rkmessage_->_private; };
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int64_t latency () const {
|
|
Packit |
2997f0 |
return rd_kafka_message_latency(rkmessage_);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct rd_kafka_message_s *c_ptr () {
|
|
Packit |
2997f0 |
return rkmessage_;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
RdKafka::Topic *topic_;
|
|
Packit |
2997f0 |
rd_kafka_message_t *rkmessage_;
|
|
Packit |
2997f0 |
bool free_rkmessage_;
|
|
Packit |
2997f0 |
/* For error signalling by the C++ layer the .._err_ message is
|
|
Packit |
2997f0 |
* used as a place holder and rkmessage_ is set to point to it. */
|
|
Packit |
2997f0 |
rd_kafka_message_t rkmessage_err_;
|
|
Packit |
2997f0 |
mutable std::string *key_; /* mutable because it's a cached value */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
private:
|
|
Packit |
2997f0 |
/* "delete" copy ctor + copy assignment, for safety of key_ */
|
|
Packit |
2997f0 |
MessageImpl(MessageImpl const&) /*= delete*/;
|
|
Packit |
2997f0 |
MessageImpl& operator=(MessageImpl const&) /*= delete*/;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class ConfImpl : public Conf {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
ConfImpl()
|
|
Packit |
2997f0 |
:consume_cb_(NULL),
|
|
Packit |
2997f0 |
dr_cb_(NULL),
|
|
Packit |
2997f0 |
event_cb_(NULL),
|
|
Packit |
2997f0 |
socket_cb_(NULL),
|
|
Packit |
2997f0 |
open_cb_(NULL),
|
|
Packit |
2997f0 |
partitioner_cb_(NULL),
|
|
Packit |
2997f0 |
partitioner_kp_cb_(NULL),
|
|
Packit |
2997f0 |
rebalance_cb_(NULL),
|
|
Packit |
2997f0 |
offset_commit_cb_(NULL),
|
|
Packit |
2997f0 |
rk_conf_(NULL),
|
|
Packit |
2997f0 |
rkt_conf_(NULL){}
|
|
Packit |
2997f0 |
~ConfImpl () {
|
|
Packit |
2997f0 |
if (rk_conf_)
|
|
Packit |
2997f0 |
rd_kafka_conf_destroy(rk_conf_);
|
|
Packit |
2997f0 |
else if (rkt_conf_)
|
|
Packit |
2997f0 |
rd_kafka_topic_conf_destroy(rkt_conf_);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set(const std::string &name,
|
|
Packit |
2997f0 |
const std::string &value,
|
|
Packit |
2997f0 |
std::string &errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name, DeliveryReportCb *dr_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "dr_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::DeliveryReportCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rk_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
dr_cb_ = dr_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name, EventCb *event_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "event_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::EventCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rk_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
event_cb_ = event_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name, const Conf *topic_conf,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
const ConfImpl *tconf_impl =
|
|
Packit |
2997f0 |
dynamic_cast<const RdKafka::ConfImpl *>(topic_conf);
|
|
Packit |
2997f0 |
if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::Conf";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rk_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_conf_set_default_topic_conf(rk_conf_,
|
|
Packit |
2997f0 |
rd_kafka_topic_conf_dup(tconf_impl->
|
|
Packit |
2997f0 |
rkt_conf_));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name, PartitionerCb *partitioner_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "partitioner_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::PartitionerCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rkt_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
partitioner_cb_ = partitioner_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name,
|
|
Packit |
2997f0 |
PartitionerKeyPointerCb *partitioner_kp_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "partitioner_key_pointer_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rkt_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
partitioner_kp_cb_ = partitioner_kp_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "socket_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::SocketCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rk_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
socket_cb_ = socket_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "open_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::OpenCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rk_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
open_cb_ = open_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name, RebalanceCb *rebalance_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "rebalance_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::RebalanceCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rk_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rebalance_cb_ = rebalance_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name,
|
|
Packit |
2997f0 |
OffsetCommitCb *offset_commit_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "offset_commit_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::OffsetCommitCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rk_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
offset_commit_cb_ = offset_commit_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(const std::string &name, std::string &value) const {
|
|
Packit |
2997f0 |
if (name.compare("dr_cb") == 0 ||
|
|
Packit |
2997f0 |
name.compare("event_cb") == 0 ||
|
|
Packit |
2997f0 |
name.compare("partitioner_cb") == 0 ||
|
|
Packit |
2997f0 |
name.compare("partitioner_key_pointer_cb") == 0 ||
|
|
Packit |
2997f0 |
name.compare("socket_cb") == 0 ||
|
|
Packit |
2997f0 |
name.compare("open_cb") == 0 ||
|
|
Packit |
2997f0 |
name.compare("rebalance_cb") == 0 ||
|
|
Packit |
2997f0 |
name.compare("offset_commit_cb") == 0 ) {
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Get size of property */
|
|
Packit |
2997f0 |
size_t size;
|
|
Packit |
2997f0 |
if (rk_conf_)
|
|
Packit |
2997f0 |
res = rd_kafka_conf_get(rk_conf_,
|
|
Packit |
2997f0 |
name.c_str(), NULL, &size);
|
|
Packit |
2997f0 |
else if (rkt_conf_)
|
|
Packit |
2997f0 |
res = rd_kafka_topic_conf_get(rkt_conf_,
|
|
Packit |
2997f0 |
name.c_str(), NULL, &size);
|
|
Packit |
2997f0 |
if (res != RD_KAFKA_CONF_OK)
|
|
Packit |
2997f0 |
return static_cast<Conf::ConfResult>(res);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
char *tmpValue = new char[size];
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rk_conf_)
|
|
Packit |
2997f0 |
res = rd_kafka_conf_get(rk_conf_, name.c_str(),
|
|
Packit |
2997f0 |
tmpValue, &size);
|
|
Packit |
2997f0 |
else if (rkt_conf_)
|
|
Packit |
2997f0 |
res = rd_kafka_topic_conf_get(rkt_conf_,
|
|
Packit |
2997f0 |
name.c_str(), NULL, &size);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (res == RD_KAFKA_CONF_OK)
|
|
Packit |
2997f0 |
value.assign(tmpValue);
|
|
Packit |
2997f0 |
delete[] tmpValue;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return static_cast<Conf::ConfResult>(res);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(DeliveryReportCb *&dr_cb) const {
|
|
Packit |
2997f0 |
if (!rk_conf_)
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
dr_cb = this->dr_cb_;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(EventCb *&event_cb) const {
|
|
Packit |
2997f0 |
if (!rk_conf_)
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
event_cb = this->event_cb_;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(PartitionerCb *&partitioner_cb) const {
|
|
Packit |
2997f0 |
if (!rkt_conf_)
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
partitioner_cb = this->partitioner_cb_;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const {
|
|
Packit |
2997f0 |
if (!rkt_conf_)
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
partitioner_kp_cb = this->partitioner_kp_cb_;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(SocketCb *&socket_cb) const {
|
|
Packit |
2997f0 |
if (!rk_conf_)
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
socket_cb = this->socket_cb_;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(OpenCb *&open_cb) const {
|
|
Packit |
2997f0 |
if (!rk_conf_)
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
open_cb = this->open_cb_;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(RebalanceCb *&rebalance_cb) const {
|
|
Packit |
2997f0 |
if (!rk_conf_)
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
rebalance_cb = this->rebalance_cb_;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const {
|
|
Packit |
2997f0 |
if (!rk_conf_)
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
offset_commit_cb = this->offset_commit_cb_;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::list<std::string> *dump ();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
|
|
Packit |
2997f0 |
std::string &errstr) {
|
|
Packit |
2997f0 |
if (name != "consume_cb") {
|
|
Packit |
2997f0 |
errstr = "Invalid value type, expected RdKafka::ConsumeCb";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rk_conf_) {
|
|
Packit |
2997f0 |
errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
|
|
Packit |
2997f0 |
return Conf::CONF_INVALID;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
consume_cb_ = consume_cb;
|
|
Packit |
2997f0 |
return Conf::CONF_OK;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ConsumeCb *consume_cb_;
|
|
Packit |
2997f0 |
DeliveryReportCb *dr_cb_;
|
|
Packit |
2997f0 |
EventCb *event_cb_;
|
|
Packit |
2997f0 |
SocketCb *socket_cb_;
|
|
Packit |
2997f0 |
OpenCb *open_cb_;
|
|
Packit |
2997f0 |
PartitionerCb *partitioner_cb_;
|
|
Packit |
2997f0 |
PartitionerKeyPointerCb *partitioner_kp_cb_;
|
|
Packit |
2997f0 |
RebalanceCb *rebalance_cb_;
|
|
Packit |
2997f0 |
OffsetCommitCb *offset_commit_cb_;
|
|
Packit |
2997f0 |
ConfType conf_type_;
|
|
Packit |
2997f0 |
rd_kafka_conf_t *rk_conf_;
|
|
Packit |
2997f0 |
rd_kafka_topic_conf_t *rkt_conf_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class HandleImpl : virtual public Handle {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~HandleImpl() {};
|
|
Packit |
2997f0 |
HandleImpl () {};
|
|
Packit |
2997f0 |
const std::string name () const { return std::string(rd_kafka_name(rk_)); };
|
|
Packit |
2997f0 |
const std::string memberid () const {
|
|
Packit |
2997f0 |
char *str = rd_kafka_memberid(rk_);
|
|
Packit |
2997f0 |
std::string memberid = str ? str : "";
|
|
Packit |
2997f0 |
if (str)
|
|
Packit |
2997f0 |
rd_kafka_mem_free(rk_, str);
|
|
Packit |
2997f0 |
return memberid;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
int poll (int timeout_ms) { return rd_kafka_poll(rk_, timeout_ms); };
|
|
Packit |
2997f0 |
int outq_len () { return rd_kafka_outq_len(rk_); };
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
void set_common_config (RdKafka::ConfImpl *confimpl);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
RdKafka::ErrorCode metadata (bool all_topics,const Topic *only_rkt,
|
|
Packit |
2997f0 |
Metadata **metadatap, int timeout_ms);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode pause (std::vector<TopicPartition*> &partitions);
|
|
Packit |
2997f0 |
ErrorCode resume (std::vector<TopicPartition*> &partitions);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode query_watermark_offsets (const std::string &topic,
|
|
Packit |
2997f0 |
int32_t partition,
|
|
Packit |
2997f0 |
int64_t *low, int64_t *high,
|
|
Packit |
2997f0 |
int timeout_ms) {
|
|
Packit |
2997f0 |
return static_cast<RdKafka::ErrorCode>(
|
|
Packit |
2997f0 |
rd_kafka_query_watermark_offsets(
|
|
Packit |
2997f0 |
rk_, topic.c_str(), partition,
|
|
Packit |
2997f0 |
low, high, timeout_ms));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode get_watermark_offsets (const std::string &topic,
|
|
Packit |
2997f0 |
int32_t partition,
|
|
Packit |
2997f0 |
int64_t *low, int64_t *high) {
|
|
Packit |
2997f0 |
return static_cast<RdKafka::ErrorCode>(
|
|
Packit |
2997f0 |
rd_kafka_get_watermark_offsets(
|
|
Packit |
2997f0 |
rk_, topic.c_str(), partition,
|
|
Packit |
2997f0 |
low, high));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Queue *get_partition_queue (const TopicPartition *partition);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
|
|
Packit |
2997f0 |
int timeout_ms) {
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
|
|
Packit |
2997f0 |
ErrorCode err = static_cast<ErrorCode>(
|
|
Packit |
2997f0 |
rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms));
|
|
Packit |
2997f0 |
update_partitions_from_c_parts(offsets, c_offsets);
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_destroy(c_offsets);
|
|
Packit |
2997f0 |
return err;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode set_log_queue (Queue *queue);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
void yield () {
|
|
Packit |
2997f0 |
rd_kafka_yield(rk_);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
const std::string clusterid (int timeout_ms) {
|
|
Packit |
2997f0 |
char *str = rd_kafka_clusterid(rk_, timeout_ms);
|
|
Packit |
2997f0 |
std::string clusterid = str ? str : "";
|
|
Packit |
2997f0 |
if (str)
|
|
Packit |
2997f0 |
rd_kafka_mem_free(rk_, str);
|
|
Packit |
2997f0 |
return clusterid;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct rd_kafka_s *c_ptr () {
|
|
Packit |
2997f0 |
return rk_;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_t *rk_;
|
|
Packit |
2997f0 |
/* All Producer and Consumer callbacks must reside in HandleImpl and
|
|
Packit |
2997f0 |
* the opaque provided to rdkafka must be a pointer to HandleImpl, since
|
|
Packit |
2997f0 |
* ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
|
|
Packit |
2997f0 |
* HandleImpl due to the skewed diamond inheritance. */
|
|
Packit |
2997f0 |
ConsumeCb *consume_cb_;
|
|
Packit |
2997f0 |
EventCb *event_cb_;
|
|
Packit |
2997f0 |
SocketCb *socket_cb_;
|
|
Packit |
2997f0 |
OpenCb *open_cb_;
|
|
Packit |
2997f0 |
DeliveryReportCb *dr_cb_;
|
|
Packit |
2997f0 |
PartitionerCb *partitioner_cb_;
|
|
Packit |
2997f0 |
PartitionerKeyPointerCb *partitioner_kp_cb_;
|
|
Packit |
2997f0 |
RebalanceCb *rebalance_cb_;
|
|
Packit |
2997f0 |
OffsetCommitCb *offset_commit_cb_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class TopicImpl : public Topic {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~TopicImpl () {
|
|
Packit |
2997f0 |
rd_kafka_topic_destroy(rkt_);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
const std::string name () const {
|
|
Packit |
2997f0 |
return rd_kafka_topic_name(rkt_);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
bool partition_available (int32_t partition) const {
|
|
Packit |
2997f0 |
return !!rd_kafka_topic_partition_available(rkt_, partition);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode offset_store (int32_t partition, int64_t offset) {
|
|
Packit |
2997f0 |
return static_cast<RdKafka::ErrorCode>(
|
|
Packit |
2997f0 |
rd_kafka_offset_store(rkt_, partition, offset));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static Topic *create (Handle &base, const std::string &topic,
|
|
Packit |
2997f0 |
Conf *conf);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct rd_kafka_topic_s *c_ptr () {
|
|
Packit |
2997f0 |
return rkt_;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_topic_t *rkt_;
|
|
Packit |
2997f0 |
PartitionerCb *partitioner_cb_;
|
|
Packit |
2997f0 |
PartitionerKeyPointerCb *partitioner_kp_cb_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Topic and Partition
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
class TopicPartitionImpl : public TopicPartition {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~TopicPartitionImpl() {};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static TopicPartition *create (const std::string &topic, int partition);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
TopicPartitionImpl (const std::string &topic, int partition):
|
|
Packit |
2997f0 |
topic_(topic), partition_(partition), offset_(RdKafka::Topic::OFFSET_INVALID),
|
|
Packit |
2997f0 |
err_(ERR_NO_ERROR) {}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
TopicPartitionImpl (const std::string &topic, int partition, int64_t offset):
|
|
Packit |
2997f0 |
topic_(topic), partition_(partition), offset_(offset),
|
|
Packit |
2997f0 |
err_(ERR_NO_ERROR) {}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
TopicPartitionImpl (const rd_kafka_topic_partition_t *c_part) {
|
|
Packit |
2997f0 |
topic_ = std::string(c_part->topic);
|
|
Packit |
2997f0 |
partition_ = c_part->partition;
|
|
Packit |
2997f0 |
offset_ = c_part->offset;
|
|
Packit |
2997f0 |
err_ = static_cast<ErrorCode>(c_part->err);
|
|
Packit |
2997f0 |
// FIXME: metadata
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void destroy (std::vector<TopicPartition*> &partitions);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int partition () const { return partition_; }
|
|
Packit |
2997f0 |
const std::string &topic () const { return topic_ ; }
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int64_t offset () const { return offset_; }
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode err () const { return err_; }
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
void set_offset (int64_t offset) { offset_ = offset; }
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::ostream& operator<<(std::ostream &ostrm) const {
|
|
Packit |
2997f0 |
return ostrm << topic_ << " [" << partition_ << "]";
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::string topic_;
|
|
Packit |
2997f0 |
int partition_;
|
|
Packit |
2997f0 |
int64_t offset_;
|
|
Packit |
2997f0 |
ErrorCode err_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class KafkaConsumerImpl : virtual public KafkaConsumer, virtual public HandleImpl {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~KafkaConsumerImpl () {
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static KafkaConsumer *create (Conf *conf, std::string &errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode assignment (std::vector<TopicPartition*> &partitions);
|
|
Packit |
2997f0 |
ErrorCode subscription (std::vector<std::string> &topics);
|
|
Packit |
2997f0 |
ErrorCode subscribe (const std::vector<std::string> &topics);
|
|
Packit |
2997f0 |
ErrorCode unsubscribe ();
|
|
Packit |
2997f0 |
ErrorCode assign (const std::vector<TopicPartition*> &partitions);
|
|
Packit |
2997f0 |
ErrorCode unassign ();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
Message *consume (int timeout_ms);
|
|
Packit |
2997f0 |
ErrorCode commitSync () {
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0/*sync*/));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
ErrorCode commitAsync () {
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1/*async*/));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
ErrorCode commitSync (Message *message) {
|
|
Packit |
2997f0 |
MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message);
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(
|
|
Packit |
2997f0 |
rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0/*sync*/));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
ErrorCode commitAsync (Message *message) {
|
|
Packit |
2997f0 |
MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message);
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(
|
|
Packit |
2997f0 |
rd_kafka_commit_message(rk_, msgimpl->rkmessage_,1/*async*/));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode commitSync (std::vector<TopicPartition*> &offsets) {
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *c_parts =
|
|
Packit |
2997f0 |
partitions_to_c_parts(offsets);
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err =
|
|
Packit |
2997f0 |
rd_kafka_commit(rk_, c_parts, 0);
|
|
Packit |
2997f0 |
if (!err)
|
|
Packit |
2997f0 |
update_partitions_from_c_parts(offsets, c_parts);
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_destroy(c_parts);
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(err);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) {
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *c_parts =
|
|
Packit |
2997f0 |
partitions_to_c_parts(offsets);
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err =
|
|
Packit |
2997f0 |
rd_kafka_commit(rk_, c_parts, 1);
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_destroy(c_parts);
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(err);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) {
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(
|
|
Packit |
2997f0 |
rd_kafka_commit_queue(rk_, NULL, NULL,
|
|
Packit |
2997f0 |
RdKafka::offset_commit_cb_trampoline0,
|
|
Packit |
2997f0 |
offset_commit_cb));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
|
|
Packit |
2997f0 |
OffsetCommitCb *offset_commit_cb) {
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *c_parts =
|
|
Packit |
2997f0 |
partitions_to_c_parts(offsets);
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err =
|
|
Packit |
2997f0 |
rd_kafka_commit_queue(rk_, c_parts, NULL,
|
|
Packit |
2997f0 |
RdKafka::offset_commit_cb_trampoline0,
|
|
Packit |
2997f0 |
offset_commit_cb);
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_destroy(c_parts);
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(err);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode committed (std::vector<TopicPartition*> &partitions, int timeout_ms);
|
|
Packit |
2997f0 |
ErrorCode position (std::vector<TopicPartition*> &partitions);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode close ();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode seek (const TopicPartition &partition, int timeout_ms);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) {
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *c_parts =
|
|
Packit |
2997f0 |
partitions_to_c_parts(offsets);
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err =
|
|
Packit |
2997f0 |
rd_kafka_offsets_store(rk_, c_parts);
|
|
Packit |
2997f0 |
update_partitions_from_c_parts(offsets, c_parts);
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_destroy(c_parts);
|
|
Packit |
2997f0 |
return static_cast<ErrorCode>(err);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class MetadataImpl : public Metadata {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
MetadataImpl(const rd_kafka_metadata_t *metadata);
|
|
Packit |
2997f0 |
~MetadataImpl();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
const std::vector<const BrokerMetadata *> *brokers() const {
|
|
Packit |
2997f0 |
return &brokers_;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
const std::vector<const TopicMetadata *> *topics() const {
|
|
Packit |
2997f0 |
return &topics_;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
const std::string orig_broker_name() const {
|
|
Packit |
2997f0 |
return std::string(metadata_->orig_broker_name);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int32_t orig_broker_id() const {
|
|
Packit |
2997f0 |
return metadata_->orig_broker_id;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
private:
|
|
Packit |
2997f0 |
const rd_kafka_metadata_t *metadata_;
|
|
Packit |
2997f0 |
std::vector<const BrokerMetadata *> brokers_;
|
|
Packit |
2997f0 |
std::vector<const TopicMetadata *> topics_;
|
|
Packit |
2997f0 |
std::string orig_broker_name_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class QueueImpl : virtual public Queue {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~QueueImpl () {
|
|
Packit |
2997f0 |
rd_kafka_queue_destroy(queue_);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
static Queue *create (Handle *base);
|
|
Packit |
2997f0 |
ErrorCode forward (Queue *queue);
|
|
Packit |
2997f0 |
Message *consume (int timeout_ms);
|
|
Packit |
2997f0 |
int poll (int timeout_ms);
|
|
Packit |
2997f0 |
void io_event_enable(int fd, const void *payload, size_t size);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_queue_t *queue_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class ConsumerImpl : virtual public Consumer, virtual public HandleImpl {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~ConsumerImpl () {
|
|
Packit |
2997f0 |
rd_kafka_destroy(rk_); };
|
|
Packit |
2997f0 |
static Consumer *create (Conf *conf, std::string &errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode start (Topic *topic, int32_t partition, int64_t offset);
|
|
Packit |
2997f0 |
ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
|
|
Packit |
2997f0 |
Queue *queue);
|
|
Packit |
2997f0 |
ErrorCode stop (Topic *topic, int32_t partition);
|
|
Packit |
2997f0 |
ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
|
|
Packit |
2997f0 |
int timeout_ms);
|
|
Packit |
2997f0 |
Message *consume (Topic *topic, int32_t partition, int timeout_ms);
|
|
Packit |
2997f0 |
Message *consume (Queue *queue, int timeout_ms);
|
|
Packit |
2997f0 |
int consume_callback (Topic *topic, int32_t partition, int timeout_ms,
|
|
Packit |
2997f0 |
ConsumeCb *cb, void *opaque);
|
|
Packit |
2997f0 |
int consume_callback (Queue *queue, int timeout_ms,
|
|
Packit |
2997f0 |
RdKafka::ConsumeCb *consume_cb, void *opaque);
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class ProducerImpl : virtual public Producer, virtual public HandleImpl {
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
~ProducerImpl () { if (rk_) rd_kafka_destroy(rk_); };
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode produce (Topic *topic, int32_t partition,
|
|
Packit |
2997f0 |
int msgflags,
|
|
Packit |
2997f0 |
void *payload, size_t len,
|
|
Packit |
2997f0 |
const std::string *key,
|
|
Packit |
2997f0 |
void *msg_opaque);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode produce (Topic *topic, int32_t partition,
|
|
Packit |
2997f0 |
int msgflags,
|
|
Packit |
2997f0 |
void *payload, size_t len,
|
|
Packit |
2997f0 |
const void *key, size_t key_len,
|
|
Packit |
2997f0 |
void *msg_opaque);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode produce (Topic *topic, int32_t partition,
|
|
Packit |
2997f0 |
const std::vector<char> *payload,
|
|
Packit |
2997f0 |
const std::vector<char> *key,
|
|
Packit |
2997f0 |
void *msg_opaque);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode produce (const std::string topic_name, int32_t partition,
|
|
Packit |
2997f0 |
int msgflags,
|
|
Packit |
2997f0 |
void *payload, size_t len,
|
|
Packit |
2997f0 |
const void *key, size_t key_len,
|
|
Packit |
2997f0 |
int64_t timestamp,
|
|
Packit |
2997f0 |
void *msg_opaque);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ErrorCode flush (int timeout_ms) {
|
|
Packit |
2997f0 |
return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_,
|
|
Packit |
2997f0 |
timeout_ms));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static Producer *create (Conf *conf, std::string &errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#endif /* _RDKAFKACPP_INT_H_ */
|