|
Packit |
2997f0 |
/*
 * Copyright (c) 2015, Confluent Inc
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * librdkafka version of the Java VerifiableProducer and VerifiableConsumer
 * for use with the official Kafka client tests.
 */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include <iostream>
|
|
Packit |
2997f0 |
#include <fstream>
|
|
Packit |
2997f0 |
#include <sstream>
|
|
Packit |
2997f0 |
#include <map>
|
|
Packit |
2997f0 |
#include <string>
|
|
Packit |
2997f0 |
#include <algorithm>
|
|
Packit |
2997f0 |
#include <cstdlib>
|
|
Packit |
2997f0 |
#include <cstdio>
|
|
Packit |
2997f0 |
#include <csignal>
|
|
Packit |
2997f0 |
#include <cstring>
|
|
Packit |
2997f0 |
#include <unistd.h>
|
|
Packit |
2997f0 |
#include <sys/time.h>
|
|
Packit |
2997f0 |
#include <assert.h>
|
|
Packit |
2997f0 |
#include <ctype.h>
|
|
Packit |
2997f0 |
#include <strings.h>
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#ifdef _MSC_VER
|
|
Packit |
2997f0 |
#include "../win32/wingetopt.h"
|
|
Packit |
2997f0 |
#elif _AIX
|
|
Packit |
2997f0 |
#include <unistd.h>
|
|
Packit |
2997f0 |
#else
|
|
Packit |
2997f0 |
#include <getopt.h>
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Typically include path in a real application would be
|
|
Packit |
2997f0 |
* #include <librdkafka/rdkafkacpp.h>
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
#include "rdkafkacpp.h"
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Global run state, set from command-line arguments and signal handlers. */
static bool run = true;          /* Main loop keeps running while true; cleared
                                  * by sigterm() and on fatal consume errors. */
static bool exit_eof = false;    /* When true, terminate on partition EOF
                                  * (checked in msg_consume()). */
static int verbosity = 1;        /* Diagnostic output level; levels >= 3 dump
                                  * message keys and payloads to stderr. */
static std::string value_prefix; /* Prefix for produced message values —
                                  * presumably set/used in main(); not visible
                                  * in this chunk, TODO confirm. */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Per topic+partition assignment bookkeeping: tracks the number of consumed
 * messages and the min/max offset seen since the last report.
 */
class Assignment {

 public:
  /** @returns the map key for a topic+partition: "<topic>.<partition>" */
  static std::string name (const std::string &t, int partition) {
    std::stringstream stm;
    stm << t << "." << partition;
    return stm.str();
  }

  Assignment(): topic(""), partition(-1), consumedMessages(0),
                minOffset(-1), maxOffset(0) {
    printf("Created assignment\n");
  }

  Assignment(const Assignment &a) {
    topic = a.topic;
    partition = a.partition;
    consumedMessages = a.consumedMessages;
    minOffset = a.minOffset;
    maxOffset = a.maxOffset;
  }

  Assignment &operator=(const Assignment &a) {
    this->topic = a.topic;
    this->partition = a.partition;
    this->consumedMessages = a.consumedMessages;
    this->minOffset = a.minOffset;
    this->maxOffset = a.maxOffset;
    return *this;
  }

  /* NOTE(review): returns non-zero when topic+partition do NOT match —
   * i.e. the semantics are inverted relative to the operator's name.
   * Preserved as-is; verify callers before changing. */
  int operator==(const Assignment &a) const {
    return !(this->topic == a.topic &&
             this->partition == a.partition);
  }

  /* Orders by topic first, then partition. */
  int operator<(const Assignment &a) const {
    if (this->topic < a.topic) return 1;
    if (this->topic >= a.topic) return 0;
    return (this->partition < a.partition);
  }

  /* Bind this assignment to a topic+partition. Asserts it is either
   * unset or already bound to the same topic+partition. */
  void setup (std::string t, int32_t p) {
    assert(!t.empty());
    assert(topic.empty() || topic == t);
    assert(partition == -1 || partition == p);
    topic = t;
    partition = p;
  }

  std::string topic;
  int partition;            /* -1 = unassigned */
  int consumedMessages;     /* Messages consumed since creation */
  int64_t minOffset;        /* -1 = no message seen since last report */
  int64_t maxOffset;        /* Highest offset seen */
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static struct {
|
|
Packit |
2997f0 |
int maxMessages;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct {
|
|
Packit |
2997f0 |
int numAcked;
|
|
Packit |
2997f0 |
int numSent;
|
|
Packit |
2997f0 |
int numErr;
|
|
Packit |
2997f0 |
} producer;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct {
|
|
Packit |
2997f0 |
int consumedMessages;
|
|
Packit |
2997f0 |
int consumedMessagesLastReported;
|
|
Packit |
2997f0 |
int consumedMessagesAtLastCommit;
|
|
Packit |
2997f0 |
bool useAutoCommit;
|
|
Packit |
2997f0 |
std::map<std::string, Assignment> assignments;
|
|
Packit |
2997f0 |
} consumer;
|
|
Packit |
2997f0 |
} state = {
|
|
Packit |
2997f0 |
/* .maxMessages = */ -1
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static RdKafka::KafkaConsumer *consumer;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @returns the current local wall-clock time formatted as "HH:MM:SS.mmm",
 *          used to timestamp all diagnostic/JSON output.
 */
static std::string now () {
  struct timeval tv;
  gettimeofday(&tv, NULL);
  time_t t = tv.tv_sec;
  struct tm tm;
  char buf[64];

  /* FIX: source was garbled ("&tm;;") — restore the closing parentheses. */
  localtime_r(&t, &tm);
  strftime(buf, sizeof(buf), "%H:%M:%S", &tm);
  /* Append milliseconds derived from the microsecond field. */
  snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf), ".%03d",
           (int)(tv.tv_usec / 1000));

  return buf;
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static time_t watchdog_last_kick;
|
|
Packit |
2997f0 |
static const int watchdog_timeout = 20; /* Must be > socket.timeout.ms */
|
|
Packit |
2997f0 |
static void sigwatchdog (int sig) {
|
|
Packit |
2997f0 |
time_t t = time(NULL);
|
|
Packit |
2997f0 |
if (watchdog_last_kick + watchdog_timeout <= t) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": WATCHDOG TIMEOUT (" <<
|
|
Packit |
2997f0 |
(int)(t - watchdog_last_kick) << "s): TERMINATING" << std::endl;
|
|
Packit |
2997f0 |
int *i = NULL;
|
|
Packit |
2997f0 |
*i = 100;
|
|
Packit |
2997f0 |
abort();
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void watchdog_kick () {
|
|
Packit |
2997f0 |
watchdog_last_kick = time(NULL);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Safe guard against hangs-on-exit */
|
|
Packit |
2997f0 |
alarm(watchdog_timeout);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Print a single-line JSON error report (produce failure etc.) on stdout,
 * in the format expected by the Kafka system test framework.
 *
 * @param key may be NULL, rendered as the literal string "NULL".
 */
static void errorString (const std::string &name,
                         const std::string &errmsg,
                         const std::string &topic,
                         const std::string *key,
                         const std::string &value) {
  std::cout << "{ "
            << "\"name\": \"" << name << "\", "
            << "\"_time\": \"" << now() << "\", "
            << "\"message\": \"" << errmsg << "\", "
            << "\"topic\": \"" << topic << "\", "
            << "\"key\": \"" << (key ? *key : "NULL") << "\", "
            << "\"value\": \"" << value << "\" "
            << "}" << std::endl;
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Print a single-line JSON success report (e.g. producer_send_success) on
 * stdout, in the format expected by the Kafka system test framework.
 *
 * @param key may be NULL, rendered as the literal string "NULL".
 */
static void successString (const std::string &name,
                           const std::string &topic,
                           int partition,
                           int64_t offset,
                           const std::string *key,
                           const std::string &value) {
  std::cout << "{ "
            << "\"name\": \"" << name << "\", "
            << "\"_time\": \"" << now() << "\", "
            << "\"topic\": \"" << topic << "\", "
            << "\"partition\": " << partition << ", "
            << "\"offset\": " << offset << ", "
            << "\"key\": \"" << (key ? *key : "NULL") << "\", "
            << "\"value\": \"" << value << "\" "
            << "}" << std::endl;
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#if FIXME
/* Currently compiled out (FIXME is not defined): would print a JSON
 * "offsets_committed" report for a single topic+partition. */
static void offsetStatus (bool success,
                          const std::string &topic,
                          int partition,
                          int64_t offset,
                          const std::string &errstr) {
  std::cout << "{ "
      "\"name\": \"offsets_committed\", " <<
      "\"success\": " << success << ", " <<
      "\"offsets\": [ " <<
      " { " <<
      "  \"topic\": \"" << topic << "\", " <<
      "  \"partition\": " << partition << ", " <<
      "  \"offset\": " << (int)offset << ", " <<
      "  \"error\": \"" << errstr << "\" " <<
      " } " <<
      "] }" << std::endl;

}
#endif
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Signal handler: first signal requests a graceful shutdown (clears `run`),
 * a second signal while shutdown is pending forces immediate exit.
 */
static void sigterm (int sig) {

  std::cerr << now() << ": Terminating because of signal " << sig << std::endl;

  if (!run) {
    std::cerr << now() << ": Forced termination" << std::endl;
    exit(1);
  }
  run = false;
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Delivery report callback: emits a JSON success/error report for each
 * produced message and updates the global producer counters.
 */
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb (RdKafka::Message &message) {
    if (message.err()) {
      state.producer.numErr++;
      errorString("producer_send_error", message.errstr(),
                  message.topic_name(),
                  message.key(),
                  std::string(static_cast<const char*>(message.payload()),
                              message.len()));
    } else {
      successString("producer_send_success",
                    message.topic_name(),
                    (int)message.partition(),
                    message.offset(),
                    message.key(),
                    std::string(static_cast<const char*>(message.payload()),
                                message.len()));
      state.producer.numAcked++;
    }
  }
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Event callback: logs librdkafka errors, statistics and log events
 * to stderr with a timestamp.
 */
class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << now() << ": ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << now() << ": \"STATS\": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        std::cerr << now() << ": LOG-" << event.severity() << "-"
                  << event.fac() << ": " << event.str() << std::endl;
        break;

      default:
        std::cerr << now() << ": EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
    }
  }
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Use of this partitioner is pretty pointless since no key is provided
|
|
Packit |
2997f0 |
* in the produce() call. */
|
|
Packit |
2997f0 |
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
|
|
Packit |
2997f0 |
int32_t partition_cnt, void *msg_opaque) {
|
|
Packit |
2997f0 |
return djb_hash(key->c_str(), key->size()) % partition_cnt;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
private:
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static inline unsigned int djb_hash (const char *str, size_t len) {
|
|
Packit |
2997f0 |
unsigned int hash = 5381;
|
|
Packit |
2997f0 |
for (size_t i = 0 ; i < len ; i++)
|
|
Packit |
2997f0 |
hash = ((hash << 5) + hash) + str[i];
|
|
Packit |
2997f0 |
return hash;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Print number of records consumed, every 100 messages or on timeout.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void report_records_consumed (int immediate) {
|
|
Packit |
2997f0 |
std::map<std::string,Assignment> *assignments = &state.consumer.assignments;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (state.consumer.consumedMessages <=
|
|
Packit |
2997f0 |
state.consumer.consumedMessagesLastReported + (immediate ? 0 : 999))
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cout << "{ "
|
|
Packit |
2997f0 |
"\"name\": \"records_consumed\", " <<
|
|
Packit |
2997f0 |
"\"_totcount\": " << state.consumer.consumedMessages << ", " <<
|
|
Packit |
2997f0 |
"\"count\": " << (state.consumer.consumedMessages -
|
|
Packit |
2997f0 |
state.consumer.consumedMessagesLastReported) << ", " <<
|
|
Packit |
2997f0 |
"\"partitions\": [ ";
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (std::map<std::string,Assignment>::iterator ii = assignments->begin() ;
|
|
Packit |
2997f0 |
ii != assignments->end() ; ii++) {
|
|
Packit |
2997f0 |
Assignment *a = &(*ii).second;
|
|
Packit |
2997f0 |
assert(!a->topic.empty());
|
|
Packit |
2997f0 |
std::cout << (ii == assignments->begin() ? "": ", ") << " { " <<
|
|
Packit |
2997f0 |
" \"topic\": \"" << a->topic << "\", " <<
|
|
Packit |
2997f0 |
" \"partition\": " << a->partition << ", " <<
|
|
Packit |
2997f0 |
" \"minOffset\": " << a->minOffset << ", " <<
|
|
Packit |
2997f0 |
" \"maxOffset\": " << a->maxOffset << " " <<
|
|
Packit |
2997f0 |
" } ";
|
|
Packit |
2997f0 |
a->minOffset = -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cout << "] }" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
state.consumer.consumedMessagesLastReported = state.consumer.consumedMessages;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class ExampleOffsetCommitCb : public RdKafka::OffsetCommitCb {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
void offset_commit_cb (RdKafka::ErrorCode err,
|
|
Packit |
2997f0 |
std::vector<RdKafka::TopicPartition*> &offsets) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Propagate offset for " << offsets.size() << " partitions, error: " << RdKafka::err2str(err) << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* No offsets to commit, dont report anything. */
|
|
Packit |
2997f0 |
if (err == RdKafka::ERR__NO_OFFSET)
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Send up-to-date records_consumed report to make sure consumed > committed */
|
|
Packit |
2997f0 |
report_records_consumed(1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cout << "{ " <<
|
|
Packit |
2997f0 |
"\"name\": \"offsets_committed\", " <<
|
|
Packit |
2997f0 |
"\"success\": " << (err ? "false" : "true") << ", " <<
|
|
Packit |
2997f0 |
"\"error\": \"" << (err ? RdKafka::err2str(err) : "") << "\", " <<
|
|
Packit |
2997f0 |
"\"_autocommit\": " << (state.consumer.useAutoCommit ? "true":"false") << ", " <<
|
|
Packit |
2997f0 |
"\"offsets\": [ ";
|
|
Packit |
2997f0 |
assert(offsets.size() > 0);
|
|
Packit |
2997f0 |
for (unsigned int i = 0 ; i < offsets.size() ; i++) {
|
|
Packit |
2997f0 |
std::cout << (i == 0 ? "" : ", ") << "{ " <<
|
|
Packit |
2997f0 |
" \"topic\": \"" << offsets[i]->topic() << "\", " <<
|
|
Packit |
2997f0 |
" \"partition\": " << offsets[i]->partition() << ", " <<
|
|
Packit |
2997f0 |
" \"offset\": " << (int)offsets[i]->offset() << ", " <<
|
|
Packit |
2997f0 |
" \"error\": \"" <<
|
|
Packit |
2997f0 |
(offsets[i]->err() ? RdKafka::err2str(offsets[i]->err()) : "") <<
|
|
Packit |
2997f0 |
"\" " <<
|
|
Packit |
2997f0 |
" }";
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
std::cout << " ] }" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static ExampleOffsetCommitCb ex_offset_commit_cb;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Commit every 1000 messages or whenever there is a consume timeout.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void do_commit (RdKafka::KafkaConsumer *consumer,
|
|
Packit |
2997f0 |
int immediate) {
|
|
Packit |
2997f0 |
if (!immediate &&
|
|
Packit |
2997f0 |
(state.consumer.useAutoCommit ||
|
|
Packit |
2997f0 |
state.consumer.consumedMessagesAtLastCommit + 1000 >
|
|
Packit |
2997f0 |
state.consumer.consumedMessages))
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Make sure we report consumption before commit,
|
|
Packit |
2997f0 |
* otherwise tests may fail because of commit > consumed. */
|
|
Packit |
2997f0 |
if (state.consumer.consumedMessagesLastReported <
|
|
Packit |
2997f0 |
state.consumer.consumedMessages)
|
|
Packit |
2997f0 |
report_records_consumed(1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": committing " <<
|
|
Packit |
2997f0 |
(state.consumer.consumedMessages -
|
|
Packit |
2997f0 |
state.consumer.consumedMessagesAtLastCommit) << " messages" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
RdKafka::ErrorCode err;
|
|
Packit |
2997f0 |
err = consumer->commitSync(&ex_offset_commit_cb);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": " <<
|
|
Packit |
2997f0 |
"sync commit returned " << RdKafka::err2str(err) << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
state.consumer.consumedMessagesAtLastCommit =
|
|
Packit |
2997f0 |
state.consumer.consumedMessages;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Handle one consumed message (or consume error): update per-partition
 * assignment stats, emit periodic reports and trigger commits.
 * Fatal errors clear the global `run` flag to stop the main loop.
 */
void msg_consume(RdKafka::KafkaConsumer *consumer,
                 RdKafka::Message* msg, void* opaque) {
  switch (msg->err()) {
    case RdKafka::ERR__TIMED_OUT:
      /* Try reporting consumed messages */
      report_records_consumed(1);
      /* Commit one every consume() timeout instead of on every message.
       * Also commit on every 1000 messages, whichever comes first. */
      do_commit(consumer, 1);
      break;


    case RdKafka::ERR_NO_ERROR:
      {
        /* Real message */
        if (verbosity > 2)
          std::cerr << now() << ": Read msg from " << msg->topic_name() <<
              " [" << (int)msg->partition() << "] at offset " <<
              msg->offset() << std::endl;

        /* Ignore messages beyond the configured maximum. */
        if (state.maxMessages >= 0 &&
            state.consumer.consumedMessages >= state.maxMessages)
          return;


        /* Look up (or create) the assignment for this topic+partition
         * and track consumed count and offset range. */
        Assignment *a =
            &state.consumer.assignments[Assignment::name(msg->topic_name(),
                                                         msg->partition())];
        a->setup(msg->topic_name(), msg->partition());

        a->consumedMessages++;
        if (a->minOffset == -1)
          a->minOffset = msg->offset();
        if (a->maxOffset < msg->offset())
          a->maxOffset = msg->offset();

        if (msg->key()) {
          if (verbosity >= 3)
            std::cerr << now() << ": Key: " << *msg->key() << std::endl;
        }

        if (verbosity >= 3)
          fprintf(stderr, "%.*s\n",
                  static_cast<int>(msg->len()),
                  static_cast<const char *>(msg->payload()));

        state.consumer.consumedMessages++;

        report_records_consumed(0);

        do_commit(consumer, 0);
      }
      break;

    case RdKafka::ERR__PARTITION_EOF:
      /* Last message */
      if (exit_eof) {
        std::cerr << now() << ": Terminate: exit on EOF" << std::endl;
        run = false;
      }
      break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
      std::cerr << now() << ": Consume failed: " << msg->errstr() << std::endl;
      run = false;
      break;

    case RdKafka::ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
      /* Transient: the group coordinator may become available later. */
      std::cerr << now() << ": Warning: " << msg->errstr() << std::endl;
      break;

    default:
      /* Errors */
      std::cerr << now() << ": Consume failed: " << msg->errstr() << std::endl;
      run = false;
  }
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class ExampleConsumeCb : public RdKafka::ConsumeCb {
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
void consume_cb (RdKafka::Message &msg, void *opaque) {
|
|
Packit |
2997f0 |
msg_consume(consumer_, &msg, opaque);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
RdKafka::KafkaConsumer *consumer_;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
class ExampleRebalanceCb : public RdKafka::RebalanceCb {
|
|
Packit |
2997f0 |
private:
|
|
Packit |
2997f0 |
static std::string part_list_json (const std::vector<RdKafka::TopicPartition*> &partitions) {
|
|
Packit |
2997f0 |
std::ostringstream out;
|
|
Packit |
2997f0 |
for (unsigned int i = 0 ; i < partitions.size() ; i++)
|
|
Packit |
2997f0 |
out << (i==0?"":", ") << "{ " <<
|
|
Packit |
2997f0 |
" \"topic\": \"" << partitions[i]->topic() << "\", " <<
|
|
Packit |
2997f0 |
" \"partition\": " << partitions[i]->partition() <<
|
|
Packit |
2997f0 |
" }";
|
|
Packit |
2997f0 |
return out.str();
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
public:
|
|
Packit |
2997f0 |
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
|
|
Packit |
2997f0 |
RdKafka::ErrorCode err,
|
|
Packit |
2997f0 |
std::vector<RdKafka::TopicPartition*> &partitions) {
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": rebalance_cb " << RdKafka::err2str(err) <<
|
|
Packit |
2997f0 |
" for " << partitions.size() << " partitions" << std::endl;
|
|
Packit |
2997f0 |
/* Send message report prior to rebalancing event to make sure they
|
|
Packit |
2997f0 |
* are accounted for on the "right side" of the rebalance. */
|
|
Packit |
2997f0 |
report_records_consumed(1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
|
|
Packit |
2997f0 |
consumer->assign(partitions);
|
|
Packit |
2997f0 |
else {
|
|
Packit |
2997f0 |
do_commit(consumer, 1);
|
|
Packit |
2997f0 |
consumer->unassign();
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cout <<
|
|
Packit |
2997f0 |
"{ " <<
|
|
Packit |
2997f0 |
"\"name\": \"partitions_" << (err == RdKafka::ERR__ASSIGN_PARTITIONS ?
|
|
Packit |
2997f0 |
"assigned" : "revoked") << "\", " <<
|
|
Packit |
2997f0 |
"\"partitions\": [ " << part_list_json(partitions) << "] }" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Read (Java client) configuration file
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void read_conf_file (RdKafka::Conf *conf, const std::string &conf_file) {
|
|
Packit |
2997f0 |
std::ifstream inf(conf_file.c_str());
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!inf) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << conf_file << ": could not open file" << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << conf_file << ": read config file" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::string line;
|
|
Packit |
2997f0 |
int linenr = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while (std::getline(inf, line)) {
|
|
Packit |
2997f0 |
linenr++;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
// Ignore comments and empty lines
|
|
Packit |
2997f0 |
if (line[0] == '#' || line.length() == 0)
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
// Match on key=value..
|
|
Packit |
2997f0 |
size_t d = line.find("=");
|
|
Packit |
2997f0 |
if (d == 0 || d == std::string::npos) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << line << ": ignoring invalid line (expect key=value): " << ::std::endl;
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::string key = line.substr(0, d);
|
|
Packit |
2997f0 |
std::string val = line.substr(d+1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::string errstr;
|
|
Packit |
2997f0 |
if (conf->set(key, val, errstr)) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << key << "=" << val << ": " << errstr << ": ignoring error" << std::endl;
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << conf_file << ":" << linenr << ": " << key << "=" << val << ": applied to configuration" << std::endl;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
inf.close();
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int main (int argc, char **argv) {
|
|
Packit |
2997f0 |
std::string brokers = "localhost";
|
|
Packit |
2997f0 |
std::string errstr;
|
|
Packit |
2997f0 |
std::vector<std::string> topics;
|
|
Packit |
2997f0 |
std::string mode = "P";
|
|
Packit |
2997f0 |
int throughput = 0;
|
|
Packit |
2997f0 |
int32_t partition = RdKafka::Topic::PARTITION_UA;
|
|
Packit |
2997f0 |
MyHashPartitionerCb hash_partitioner;
|
|
Packit |
2997f0 |
int64_t create_time = -1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": librdkafka version " << RdKafka::version_str() <<
|
|
Packit |
2997f0 |
" (" << RdKafka::version() << ")" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Create configuration objects
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Java VerifiableProducer defaults to acks=all */
|
|
Packit |
2997f0 |
if (conf->set("acks", "all", errstr)) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Avoid slow shutdown on error */
|
|
Packit |
2997f0 |
if (conf->set("message.timeout.ms", "60000", errstr)) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
{
|
|
Packit |
2997f0 |
char hostname[128];
|
|
Packit |
2997f0 |
gethostname(hostname, sizeof(hostname)-1);
|
|
Packit |
2997f0 |
conf->set("client.id", std::string("rdkafka@") + hostname, errstr);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
conf->set("log.thread.name", "true", errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* correct producer offsets */
|
|
Packit |
2997f0 |
conf->set("produce.offset.report", "true", errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* auto commit is explicitly enabled with --enable-autocommit */
|
|
Packit |
2997f0 |
conf->set("enable.auto.commit", "false", errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* keep protocol request timeouts under the watchdog timeout
|
|
Packit |
2997f0 |
* to make sure things like commitSync() dont fall victim to the watchdog. */
|
|
Packit |
2997f0 |
conf->set("socket.timeout.ms", "10000", errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
conf->set("fetch.wait.max.ms", "500", errstr);
|
|
Packit |
2997f0 |
conf->set("fetch.min.bytes", "4096", errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (int i = 1 ; i < argc ; i++) {
|
|
Packit |
2997f0 |
const char *name = argv[i];
|
|
Packit |
2997f0 |
const char *val = i+1 < argc ? argv[i+1] : NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (val && !strncmp(val, "-", 1))
|
|
Packit |
2997f0 |
val = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cout << now() << ": argument: " << name << " " <<
|
|
Packit |
2997f0 |
(val?val:"") << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (val) {
|
|
Packit |
2997f0 |
if (!strcmp(name, "--topic"))
|
|
Packit |
2997f0 |
topics.push_back(val);
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--broker-list"))
|
|
Packit |
2997f0 |
brokers = val;
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--max-messages"))
|
|
Packit |
2997f0 |
state.maxMessages = atoi(val);
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--throughput"))
|
|
Packit |
2997f0 |
throughput = atoi(val);
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--producer.config") ||
|
|
Packit |
2997f0 |
!strcmp(name, "--consumer.config"))
|
|
Packit |
2997f0 |
read_conf_file(conf, val);
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--group-id"))
|
|
Packit |
2997f0 |
conf->set("group.id", val, errstr);
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--session-timeout"))
|
|
Packit |
2997f0 |
conf->set("session.timeout.ms", val, errstr);
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--reset-policy")) {
|
|
Packit |
2997f0 |
if (conf->set("auto.offset.reset", val, errstr)) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
} else if (!strcmp(name, "--assignment-strategy")) {
|
|
Packit |
2997f0 |
/* The system tests pass the Java class name(s) rather than
|
|
Packit |
2997f0 |
* the configuration value. Fix it.
|
|
Packit |
2997f0 |
* "org.apache.kafka.clients.consumer.RangeAssignor,.." -> "range,.."
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
std::string s = val;
|
|
Packit |
2997f0 |
size_t pos;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while ((pos = s.find("org.apache.kafka.clients.consumer.")) !=
|
|
Packit |
2997f0 |
std::string::npos)
|
|
Packit |
2997f0 |
s.erase(pos, strlen("org.apache.kafka.clients.consumer."));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while ((pos = s.find("Assignor")) != std::string::npos)
|
|
Packit |
2997f0 |
s.erase(pos, strlen("Assignor"));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::transform(s.begin(), s.end(), s.begin(), tolower);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": converted " << name << " "
|
|
Packit |
2997f0 |
<< val << " to " << s << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (conf->set("partition.assignment.strategy", s.c_str(), errstr)) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
} else if (!strcmp(name, "--value-prefix")) {
|
|
Packit |
2997f0 |
value_prefix = std::string(val) + ".";
|
|
Packit |
2997f0 |
} else if (!strcmp(name, "--acks")) {
|
|
Packit |
2997f0 |
if (conf->set("acks", val, errstr)) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
} else if (!strcmp(name, "--message-create-time")) {
|
|
Packit |
2997f0 |
create_time = (int64_t)atoi(val);
|
|
Packit |
2997f0 |
} else if (!strcmp(name, "--debug")) {
|
|
Packit |
2997f0 |
conf->set("debug", val, errstr);
|
|
Packit |
2997f0 |
} else if (!strcmp(name, "-X")) {
|
|
Packit |
2997f0 |
char *s = strdup(val);
|
|
Packit |
2997f0 |
char *t = strchr(s, '=');
|
|
Packit |
2997f0 |
if (!t)
|
|
Packit |
2997f0 |
t = (char *)"";
|
|
Packit |
2997f0 |
else {
|
|
Packit |
2997f0 |
*t = '\0';
|
|
Packit |
2997f0 |
t++;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
if (conf->set(s, t, errstr)) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
free(s);
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Unknown option " << name << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
i++;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (!strcmp(name, "--consumer"))
|
|
Packit |
2997f0 |
mode = "C";
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--producer"))
|
|
Packit |
2997f0 |
mode = "P";
|
|
Packit |
2997f0 |
else if (!strcmp(name, "--enable-autocommit")) {
|
|
Packit |
2997f0 |
state.consumer.useAutoCommit = true;
|
|
Packit |
2997f0 |
conf->set("enable.auto.commit", "true", errstr);
|
|
Packit |
2997f0 |
} else if (!strcmp(name, "-v"))
|
|
Packit |
2997f0 |
verbosity++;
|
|
Packit |
2997f0 |
else if (!strcmp(name, "-q"))
|
|
Packit |
2997f0 |
verbosity--;
|
|
Packit |
2997f0 |
else {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Unknown option or missing argument to " << name << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (topics.empty() || brokers.empty()) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Missing --topic and --broker-list" << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Set configuration properties
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
conf->set("metadata.broker.list", brokers, errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ExampleEventCb ex_event_cb;
|
|
Packit |
2997f0 |
conf->set("event_cb", &ex_event_cb, errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
signal(SIGINT, sigterm);
|
|
Packit |
2997f0 |
signal(SIGTERM, sigterm);
|
|
Packit |
2997f0 |
signal(SIGALRM, sigwatchdog);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (mode == "P") {
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Producer mode
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ExampleDeliveryReportCb ex_dr_cb;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Set delivery report callback */
|
|
Packit |
2997f0 |
conf->set("dr_cb", &ex_dr_cb, errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Create producer using accumulated global configuration.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
|
|
Packit |
2997f0 |
if (!producer) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Failed to create producer: " << errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": % Created producer " << producer->name() << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Create topic handle.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topics[0],
|
|
Packit |
2997f0 |
NULL, errstr);
|
|
Packit |
2997f0 |
if (!topic) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Failed to create topic: " << errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static const int delay_us = throughput ? 1000000/throughput : 10;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (state.maxMessages == -1)
|
|
Packit |
2997f0 |
state.maxMessages = 1000000; /* Avoid infinite produce */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (int i = 0 ; run && i < state.maxMessages ; i++) {
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Produce message
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
std::ostringstream msg;
|
|
Packit |
2997f0 |
msg << value_prefix << i;
|
|
Packit |
2997f0 |
while (true) {
|
|
Packit |
2997f0 |
RdKafka::ErrorCode resp;
|
|
Packit |
2997f0 |
if (create_time == -1) {
|
|
Packit |
2997f0 |
resp = producer->produce(topic, partition,
|
|
Packit |
2997f0 |
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
|
|
Packit |
2997f0 |
const_cast<char *>(msg.str().c_str()),
|
|
Packit |
2997f0 |
msg.str().size(), NULL, NULL);
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
resp = producer->produce(topics[0], partition,
|
|
Packit |
2997f0 |
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
|
|
Packit |
2997f0 |
const_cast<char *>(msg.str().c_str()),
|
|
Packit |
2997f0 |
msg.str().size(),
|
|
Packit |
2997f0 |
NULL, 0,
|
|
Packit |
2997f0 |
create_time,
|
|
Packit |
2997f0 |
NULL);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (resp == RdKafka::ERR__QUEUE_FULL) {
|
|
Packit |
2997f0 |
producer->poll(100);
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
} else if (resp != RdKafka::ERR_NO_ERROR) {
|
|
Packit |
2997f0 |
errorString("producer_send_error",
|
|
Packit |
2997f0 |
RdKafka::err2str(resp), topic->name(), NULL, msg.str());
|
|
Packit |
2997f0 |
state.producer.numErr++;
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
state.producer.numSent++;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
producer->poll(delay_us / 1000);
|
|
Packit |
2997f0 |
usleep(1000);
|
|
Packit |
2997f0 |
watchdog_kick();
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
run = true;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while (run && producer->outq_len() > 0) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Waiting for " << producer->outq_len() << std::endl;
|
|
Packit |
2997f0 |
producer->poll(1000);
|
|
Packit |
2997f0 |
watchdog_kick();
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": " << state.producer.numAcked << "/" <<
|
|
Packit |
2997f0 |
state.producer.numSent << "/" << state.maxMessages <<
|
|
Packit |
2997f0 |
" msgs acked/sent/max, " << state.producer.numErr <<
|
|
Packit |
2997f0 |
" errored" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
delete topic;
|
|
Packit |
2997f0 |
delete producer;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else if (mode == "C") {
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Consumer mode
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
conf->set("auto.offset.reset", "smallest", errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ExampleRebalanceCb ex_rebalance_cb;
|
|
Packit |
2997f0 |
conf->set("rebalance_cb", &ex_rebalance_cb, errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
conf->set("offset_commit_cb", &ex_offset_commit_cb, errstr);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Create consumer using accumulated global configuration.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
consumer = RdKafka::KafkaConsumer::create(conf, errstr);
|
|
Packit |
2997f0 |
if (!consumer) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Failed to create consumer: " <<
|
|
Packit |
2997f0 |
errstr << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": % Created consumer " << consumer->name() <<
|
|
Packit |
2997f0 |
std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Subscribe to topic(s)
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
RdKafka::ErrorCode resp = consumer->subscribe(topics);
|
|
Packit |
2997f0 |
if (resp != RdKafka::ERR_NO_ERROR) {
|
|
Packit |
2997f0 |
std::cerr << now() << ": Failed to subscribe to " << topics.size() << " topics: "
|
|
Packit |
2997f0 |
<< RdKafka::err2str(resp) << std::endl;
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
watchdog_kick();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Consume messages
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
while (run) {
|
|
Packit |
2997f0 |
RdKafka::Message *msg = consumer->consume(500);
|
|
Packit |
2997f0 |
msg_consume(consumer, msg, NULL);
|
|
Packit |
2997f0 |
delete msg;
|
|
Packit |
2997f0 |
watchdog_kick();
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": Final commit on termination" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Final commit */
|
|
Packit |
2997f0 |
do_commit(consumer, 1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Stop consumer
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
consumer->close();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
delete consumer;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cout << "{ \"name\": \"shutdown_complete\" }" << std::endl;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Wait for RdKafka to decommission.
|
|
Packit |
2997f0 |
* This is not strictly needed (when check outq_len() above), but
|
|
Packit |
2997f0 |
* allows RdKafka to clean up all its resources before the application
|
|
Packit |
2997f0 |
* exits so that memory profilers such as valgrind wont complain about
|
|
Packit |
2997f0 |
* memory leaks.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
RdKafka::wait_destroyed(5000);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
std::cerr << now() << ": EXITING WITH RETURN VALUE 0" << std::endl;
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
}
|