Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C/C++ library
 *
 * Copyright (c) 2014 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <iostream>
#include <string>
#include <list>
#include <cerrno>

#include "rdkafkacpp_int.h"


RdKafka::Producer::~Producer () {

}

static void dr_msg_cb_trampoline (rd_kafka_t *rk,
                                  const rd_kafka_message_t *
                                  rkmessage,
                                  void *opaque) {
  RdKafka::HandleImpl *handle = static_cast<RdKafka::HandleImpl *>(opaque);
  RdKafka::MessageImpl message(NULL, (rd_kafka_message_t *)rkmessage, false);
  handle->dr_cb_->dr_cb(message);
}



RdKafka::Producer *RdKafka::Producer::create (RdKafka::Conf *conf,
                                              std::string &errstr) {
  char errbuf[512];
  RdKafka::ConfImpl *confimpl = dynamic_cast<RdKafka::ConfImpl *>(conf);
  RdKafka::ProducerImpl *rkp = new RdKafka::ProducerImpl();
  rd_kafka_conf_t *rk_conf = NULL;

  if (confimpl) {
    if (!confimpl->rk_conf_) {
      errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
      delete rkp;
      return NULL;
    }

    rkp->set_common_config(confimpl);

    rk_conf = rd_kafka_conf_dup(confimpl->rk_conf_);

    if (confimpl->dr_cb_) {
      rd_kafka_conf_set_dr_msg_cb(rk_conf, dr_msg_cb_trampoline);
      rkp->dr_cb_ = confimpl->dr_cb_;
    }
  }


  rd_kafka_t *rk;
  if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf,
                          errbuf, sizeof(errbuf)))) {
    errstr = errbuf;
    delete rkp;
    return NULL;
  }

  rkp->rk_ = rk;

  return rkp;
}


RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
                                                   int32_t partition,
                                                   int msgflags,
                                                   void *payload, size_t len,
                                                   const std::string *key,
                                                   void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags,
                       payload, len,
                       key ? key->c_str() : NULL, key ? key->size() : 0,
                       msg_opaque) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}


RdKafka::ErrorCode RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
                                                   int32_t partition,
                                                   int msgflags,
                                                   void *payload, size_t len,
                                                   const void *key,
                                                   size_t key_len,
                                                   void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  if (rd_kafka_produce(topicimpl->rkt_, partition, msgflags,
                       payload, len, key, key_len,
                       msg_opaque) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;
}


RdKafka::ErrorCode
RdKafka::ProducerImpl::produce (RdKafka::Topic *topic,
                                int32_t partition,
                                const std::vector<char> *payload,
                                const std::vector<char> *key,
                                void *msg_opaque) {
  RdKafka::TopicImpl *topicimpl = dynamic_cast<RdKafka::TopicImpl *>(topic);

  if (rd_kafka_produce(topicimpl->rkt_, partition, RD_KAFKA_MSG_F_COPY,
                       payload ? (void *)&(*payload)[0] : NULL,
                       payload ? payload->size() : 0,
                       key ? &(*key)[0] : NULL, key ? key->size() : 0,
                       msg_opaque) == -1)
    return static_cast<RdKafka::ErrorCode>(rd_kafka_last_error());

  return RdKafka::ERR_NO_ERROR;

}


RdKafka::ErrorCode
RdKafka::ProducerImpl::produce (const std::string topic_name,
                                int32_t partition, int msgflags,
                                void *payload, size_t len,
                                const void *key, size_t key_len,
                                int64_t timestamp,
                                void *msg_opaque) {
  return
    static_cast<RdKafka::ErrorCode>
    (
     rd_kafka_producev(rk_,
                       RD_KAFKA_V_TOPIC(topic_name.c_str()),
                       RD_KAFKA_V_PARTITION(partition),
                       RD_KAFKA_V_MSGFLAGS(msgflags),
                       RD_KAFKA_V_VALUE(payload, len),
                       RD_KAFKA_V_KEY(key, key_len),
                       RD_KAFKA_V_TIMESTAMP(timestamp),
                       RD_KAFKA_V_OPAQUE(msg_opaque),
                       RD_KAFKA_V_END)
     );
}