/*
 * librdkafka - Apache Kafka C/C++ library
 *
 * Copyright (c) 2014 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <iostream>
#include <string>
#include <list>
#include <vector>
#include <cstring>

#include "rdkafkacpp_int.h"

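/**
 * @brief Trampoline for the C consume callback: wraps the C message in a
 *        C++ MessageImpl (without taking ownership) and passes it on to
 *        the application's ConsumeCb.
 */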
void RdKafka::consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
  RdKafka::Topic* topic = static_cast<Topic *>(rd_kafka_topic_opaque(msg->rkt));

  RdKafka::MessageImpl message(topic, msg, false /*don't free*/);

  handle->consume_cb_->consume_cb(message, opaque);
}

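/**
 * @brief Trampoline for the C log callback: forwards log lines as
 *        EVENT_LOG events to the application's EventCb, falling back to
 *        rd_kafka_log_print() when no handle or EventCb is available.
 */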
void RdKafka::log_cb_trampoline (const rd_kafka_t *rk, int level,
                                 const char *fac, const char *buf) {
  if (!rk) {
    rd_kafka_log_print(rk, level, fac, buf);
    return;
  }

  void *opaque = rd_kafka_opaque(rk);
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);

  if (!handle->event_cb_) {
    rd_kafka_log_print(rk, level, fac, buf);
    return;
  }

  RdKafka::EventImpl event(RdKafka::Event::EVENT_LOG,
                           RdKafka::ERR_NO_ERROR,
                           static_cast<RdKafka::Event::Severity>(level),
                           fac, buf);

  handle->event_cb_->event_cb(event);
}


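/**
 * @brief Trampoline for the C error callback: forwards errors as
 *        EVENT_ERROR events to the application's EventCb.
 */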
void RdKafka::error_cb_trampoline (rd_kafka_t *rk, int err,
                                   const char *reason, void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);

  RdKafka::EventImpl event(RdKafka::Event::EVENT_ERROR,
                           static_cast<RdKafka::ErrorCode>(err),
                           RdKafka::Event::EVENT_SEVERITY_ERROR,
                           NULL,
                           reason);

  handle->event_cb_->event_cb(event);
}


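/**
 * @brief Trampoline for the C throttle callback: forwards broker throttle
 *        reports as EVENT_THROTTLE events to the application's EventCb.
 */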
void RdKafka::throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name,
				      int32_t broker_id,
				      int throttle_time_ms,
				      void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);

  RdKafka::EventImpl event(RdKafka::Event::EVENT_THROTTLE);
  event.str_ = broker_name;
  event.id_ = broker_id;
  event.throttle_time_ = throttle_time_ms;

  handle->event_cb_->event_cb(event);
}


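/**
 * @brief Trampoline for the C statistics callback: forwards the JSON
 *        statistics blob as an EVENT_STATS event to the application's
 *        EventCb. Returning 0 tells librdkafka to free the JSON buffer.
 */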
int RdKafka::stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len,
                                  void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);

  RdKafka::EventImpl event(RdKafka::Event::EVENT_STATS,
                           RdKafka::ERR_NO_ERROR,
                           RdKafka::Event::EVENT_SEVERITY_INFO,
                           NULL, json);

  handle->event_cb_->event_cb(event);

  return 0;
}


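/**
 * @brief Trampoline for the C socket callback: delegates socket creation
 *        to the application's SocketCb.
 */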
int RdKafka::socket_cb_trampoline (int domain, int type, int protocol,
                                   void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);

  return handle->socket_cb_->socket_cb(domain, type, protocol);
}

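/**
 * @brief Trampoline for the C open callback: delegates file opening to
 *        the application's OpenCb (not registered on MSVC builds, see
 *        set_common_config()).
 */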
int RdKafka::open_cb_trampoline (const char *pathname, int flags, mode_t mode,
                                 void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);

  return handle->open_cb_->open_cb(pathname, flags, static_cast<int>(mode));
}

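/**
 * @brief Request cluster metadata and wrap the result in a C++
 *        MetadataImpl; on error '*metadatap' is set to NULL.
 *
 * A minimal usage sketch from the application side ('handle' below stands
 * for any initialized Handle instance):
 * @code
 *   RdKafka::Metadata *metadata;
 *   RdKafka::ErrorCode err = handle->metadata(true, NULL, &metadata, 5000);
 *   if (err == RdKafka::ERR_NO_ERROR)
 *     delete metadata;  // the caller owns the returned Metadata object
 * @endcode
 */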
RdKafka::ErrorCode RdKafka::HandleImpl::metadata (bool all_topics,
                                                  const Topic *only_rkt,
                                                  Metadata **metadatap,
                                                  int timeout_ms) {
  const rd_kafka_metadata_t *cmetadatap = NULL;

  rd_kafka_topic_t *topic = only_rkt ?
    static_cast<const TopicImpl *>(only_rkt)->rkt_ : NULL;

  const rd_kafka_resp_err_t rc = rd_kafka_metadata(rk_, all_topics, topic,
                                                   &cmetadatap, timeout_ms);

  *metadatap = (rc == RD_KAFKA_RESP_ERR_NO_ERROR) ?
    new RdKafka::MetadataImpl(cmetadatap) : NULL;

  return static_cast<RdKafka::ErrorCode>(rc);
}

/**
 * Convert a list of C partitions to C++ partitions
 */
static void c_parts_to_partitions (const rd_kafka_topic_partition_list_t
                                   *c_parts,
                                   std::vector<RdKafka::TopicPartition*>
                                   &partitions) {
  partitions.resize(c_parts->cnt);
  for (int i = 0 ; i < c_parts->cnt ; i++)
    partitions[i] = new RdKafka::TopicPartitionImpl(&c_parts->elems[i]);
}

static void free_partition_vector (std::vector<RdKafka::TopicPartition*> &v) {
  for (unsigned int i = 0 ; i < v.size() ; i++)
    delete v[i];
  v.clear();
}

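/**
 * @brief Trampoline for the C rebalance callback: converts the C partition
 *        list to C++ TopicPartitions, invokes the application's RebalanceCb
 *        and frees the temporary C++ partition objects afterwards.
 */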
void
RdKafka::rebalance_cb_trampoline (rd_kafka_t *rk,
                                  rd_kafka_resp_err_t err,
                                  rd_kafka_topic_partition_list_t *c_partitions,
                                  void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
  std::vector<RdKafka::TopicPartition*> partitions;

  c_parts_to_partitions(c_partitions, partitions);

  handle->rebalance_cb_->rebalance_cb(
				      dynamic_cast<RdKafka::KafkaConsumer*>(handle),
				      static_cast<RdKafka::ErrorCode>(err),
				      partitions);

  free_partition_vector(partitions);
}


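/**
 * @brief Offset commit callback trampoline where the opaque is the
 *        OffsetCommitCb itself: converts the C offset list to C++
 *        TopicPartitions and invokes the callback.
 */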
void
RdKafka::offset_commit_cb_trampoline0 (
    rd_kafka_t *rk,
    rd_kafka_resp_err_t err,
    rd_kafka_topic_partition_list_t *c_offsets, void *opaque) {
  OffsetCommitCb *cb = static_cast<RdKafka::OffsetCommitCb *>(opaque);
  std::vector<RdKafka::TopicPartition*> offsets;

  if (c_offsets)
    c_parts_to_partitions(c_offsets, offsets);

  cb->offset_commit_cb(static_cast<RdKafka::ErrorCode>(err), offsets);

  free_partition_vector(offsets);
}

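/**
 * @brief Offset commit callback trampoline for a callback configured on
 *        the Conf object: looks up the OffsetCommitCb on the handle and
 *        forwards to offset_commit_cb_trampoline0().
 */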
static void
offset_commit_cb_trampoline (
    rd_kafka_t *rk,
    rd_kafka_resp_err_t err,
    rd_kafka_topic_partition_list_t *c_offsets, void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
  RdKafka::offset_commit_cb_trampoline0(rk, err, c_offsets,
                                        handle->offset_commit_cb_);
}


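/**
 * @brief Register the C trampolines for all callbacks set on 'confimpl'
 *        and remember the corresponding C++ callback objects on this
 *        handle. The open callback is not registered on MSVC builds.
 */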
void RdKafka::HandleImpl::set_common_config (RdKafka::ConfImpl *confimpl) {

  rd_kafka_conf_set_opaque(confimpl->rk_conf_, this);

  if (confimpl->event_cb_) {
    rd_kafka_conf_set_log_cb(confimpl->rk_conf_,
                             RdKafka::log_cb_trampoline);
    rd_kafka_conf_set_error_cb(confimpl->rk_conf_,
                               RdKafka::error_cb_trampoline);
    rd_kafka_conf_set_throttle_cb(confimpl->rk_conf_,
				  RdKafka::throttle_cb_trampoline);
    rd_kafka_conf_set_stats_cb(confimpl->rk_conf_,
                               RdKafka::stats_cb_trampoline);
    event_cb_ = confimpl->event_cb_;
  }

  if (confimpl->socket_cb_) {
    rd_kafka_conf_set_socket_cb(confimpl->rk_conf_,
                                RdKafka::socket_cb_trampoline);
    socket_cb_ = confimpl->socket_cb_;
  }

  if (confimpl->open_cb_) {
#ifndef _MSC_VER
    rd_kafka_conf_set_open_cb(confimpl->rk_conf_, RdKafka::open_cb_trampoline);
    open_cb_ = confimpl->open_cb_;
#endif
  }

  if (confimpl->rebalance_cb_) {
    rd_kafka_conf_set_rebalance_cb(confimpl->rk_conf_,
                                   RdKafka::rebalance_cb_trampoline);
    rebalance_cb_ = confimpl->rebalance_cb_;
  }

  if (confimpl->offset_commit_cb_) {
    rd_kafka_conf_set_offset_commit_cb(confimpl->rk_conf_,
                                       offset_commit_cb_trampoline);
    offset_commit_cb_ = confimpl->offset_commit_cb_;
  }

  if (confimpl->consume_cb_) {
    rd_kafka_conf_set_consume_cb(confimpl->rk_conf_,
                                 RdKafka::consume_cb_trampoline);
    consume_cb_ = confimpl->consume_cb_;
  }

}


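/**
 * @brief Pause production/consumption for 'partitions'.
 *        Per-partition results (offset and error) are copied back into
 *        the passed-in TopicPartition objects.
 */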
RdKafka::ErrorCode
RdKafka::HandleImpl::pause (std::vector<RdKafka::TopicPartition*> &partitions) {
  rd_kafka_topic_partition_list_t *c_parts;
  rd_kafka_resp_err_t err;

  c_parts = partitions_to_c_parts(partitions);

  err = rd_kafka_pause_partitions(rk_, c_parts);

  if (!err)
    update_partitions_from_c_parts(partitions, c_parts);

  rd_kafka_topic_partition_list_destroy(c_parts);

  return static_cast<RdKafka::ErrorCode>(err);
}


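/**
 * @brief Resume production/consumption for 'partitions'.
 *        Per-partition results (offset and error) are copied back into
 *        the passed-in TopicPartition objects.
 */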
RdKafka::ErrorCode
RdKafka::HandleImpl::resume (std::vector<RdKafka::TopicPartition*> &partitions) {
  rd_kafka_topic_partition_list_t *c_parts;
  rd_kafka_resp_err_t err;

  c_parts = partitions_to_c_parts(partitions);

  err = rd_kafka_resume_partitions(rk_, c_parts);

  if (!err)
    update_partitions_from_c_parts(partitions, c_parts);

  rd_kafka_topic_partition_list_destroy(c_parts);

  return static_cast<RdKafka::ErrorCode>(err);
}

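/**
 * @brief Return a Queue wrapping the partition's consume queue, or NULL
 *        if the underlying C queue could not be obtained.
 */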
RdKafka::Queue *
RdKafka::HandleImpl::get_partition_queue (const TopicPartition *part) {
  rd_kafka_queue_t *rkqu;
  rkqu = rd_kafka_queue_get_partition(rk_,
                                      part->topic().c_str(),
                                      part->partition());

  if (rkqu == NULL)
    return NULL;

  RdKafka::QueueImpl *queueimpl = new RdKafka::QueueImpl;
  queueimpl->queue_ = rkqu;

  return queueimpl;
}

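/**
 * @brief Forward log events to 'queue', or back to the main queue if
 *        'queue' is NULL.
 */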
RdKafka::ErrorCode
RdKafka::HandleImpl::set_log_queue (RdKafka::Queue *queue) {
  rd_kafka_queue_t *rkqu = NULL;
  if (queue) {
    QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue);
    rkqu = queueimpl->queue_;
  }
  return static_cast<RdKafka::ErrorCode>(rd_kafka_set_log_queue(rk_, rkqu));
}

namespace RdKafka {

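/**
 * @brief Convert a list of C++ partitions to a newly allocated C partition
 *        list. The caller must destroy the returned list with
 *        rd_kafka_topic_partition_list_destroy().
 */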
rd_kafka_topic_partition_list_t *
partitions_to_c_parts (const std::vector<RdKafka::TopicPartition*> &partitions){
  rd_kafka_topic_partition_list_t *c_parts;

  c_parts = rd_kafka_topic_partition_list_new((int)partitions.size());

  for (unsigned int i = 0 ; i < partitions.size() ; i++) {
    const RdKafka::TopicPartitionImpl *tpi =
        dynamic_cast<const RdKafka::TopicPartitionImpl*>(partitions[i]);
    rd_kafka_topic_partition_t *rktpar =
      rd_kafka_topic_partition_list_add(c_parts,
					tpi->topic_.c_str(), tpi->partition_);
    rktpar->offset = tpi->offset_;
  }

  return c_parts;
}


/**
 * @brief Update the application provided 'partitions' with info from 'c_parts'
 */
void
update_partitions_from_c_parts (std::vector<RdKafka::TopicPartition*> &partitions,
				const rd_kafka_topic_partition_list_t *c_parts) {
  for (int i = 0 ; i < c_parts->cnt ; i++) {
    rd_kafka_topic_partition_t *p = &c_parts->elems[i];

    /* Find corresponding C++ entry */
    for (unsigned int j = 0 ; j < partitions.size() ; j++) {
      RdKafka::TopicPartitionImpl *pp =
	dynamic_cast<RdKafka::TopicPartitionImpl*>(partitions[j]);
      if (!strcmp(p->topic, pp->topic_.c_str()) &&
	  p->partition == pp->partition_) {
	pp->offset_ = p->offset;
	pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
      }
    }
  }
}

} /* namespace RdKafka */