|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* librdkafka - Apache Kafka C library
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Copyright (c) 2012, Magnus Edenhill
|
|
Packit |
2997f0 |
* All rights reserved.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Redistribution and use in source and binary forms, with or without
|
|
Packit |
2997f0 |
* modification, are permitted provided that the following conditions are met:
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* 1. Redistributions of source code must retain the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer.
|
|
Packit |
2997f0 |
* 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer in the documentation
|
|
Packit |
2997f0 |
* and/or other materials provided with the distribution.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
Packit |
2997f0 |
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
Packit |
2997f0 |
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
Packit |
2997f0 |
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
Packit |
2997f0 |
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
Packit |
2997f0 |
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
Packit |
2997f0 |
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
Packit |
2997f0 |
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
Packit |
2997f0 |
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
Packit |
2997f0 |
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
Packit |
2997f0 |
* POSSIBILITY OF SUCH DAMAGE.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Apache Kafka consumer & producer example programs
|
|
Packit |
2997f0 |
* using the Kafka driver from librdkafka
|
|
Packit |
2997f0 |
* (https://github.com/edenhill/librdkafka)
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include <ctype.h>
|
|
Packit |
2997f0 |
#include <signal.h>
|
|
Packit |
2997f0 |
#include <string.h>
|
|
Packit |
2997f0 |
#include <unistd.h>
|
|
Packit |
2997f0 |
#include <stdlib.h>
|
|
Packit |
2997f0 |
#include <syslog.h>
|
|
Packit |
2997f0 |
#include <time.h>
|
|
Packit |
2997f0 |
#include <sys/time.h>
|
|
Packit |
2997f0 |
#include <getopt.h>
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
|
|
Packit |
2997f0 |
* is builtin from within the librdkafka source tree and thus differs. */
|
|
Packit |
2997f0 |
#include "rdkafka.h" /* for Kafka driver */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static int run = 1;
|
|
Packit |
2997f0 |
static rd_kafka_t *rk;
|
|
Packit |
2997f0 |
static int exit_eof = 0;
|
|
Packit |
2997f0 |
static int quiet = 0;
|
|
Packit |
2997f0 |
static enum {
|
|
Packit |
2997f0 |
OUTPUT_HEXDUMP,
|
|
Packit |
2997f0 |
OUTPUT_RAW,
|
|
Packit |
2997f0 |
} output = OUTPUT_HEXDUMP;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void stop (int sig) {
|
|
Packit |
2997f0 |
run = 0;
|
|
Packit |
2997f0 |
fclose(stdin); /* abort fgets() */
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
|
|
Packit |
2997f0 |
const char *p = (const char *)ptr;
|
|
Packit |
2997f0 |
size_t of = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (name)
|
|
Packit |
2997f0 |
fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (of = 0 ; of < len ; of += 16) {
|
|
Packit |
2997f0 |
char hexen[16*3+1];
|
|
Packit |
2997f0 |
char charen[16+1];
|
|
Packit |
2997f0 |
int hof = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int cof = 0;
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = of ; i < (int)of + 16 && i < (int)len ; i++) {
|
|
Packit |
2997f0 |
hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
|
|
Packit |
2997f0 |
cof += sprintf(charen+cof, "%c",
|
|
Packit |
2997f0 |
isprint((int)p[i]) ? p[i] : '.');
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
fprintf(fp, "%08zx: %-48s %-16s\n",
|
|
Packit |
2997f0 |
of, hexen, charen);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Kafka logger callback (optional)
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void logger (const rd_kafka_t *rk, int level,
|
|
Packit |
2997f0 |
const char *fac, const char *buf) {
|
|
Packit |
2997f0 |
struct timeval tv;
|
|
Packit |
2997f0 |
gettimeofday(&tv, NULL);
|
|
Packit |
2997f0 |
fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
|
|
Packit |
2997f0 |
(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
|
|
Packit |
2997f0 |
level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Message delivery report callback.
|
|
Packit |
2997f0 |
* Called once for each message.
|
|
Packit |
2997f0 |
* See rdkafka.h for more information.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void msg_delivered (rd_kafka_t *rk,
|
|
Packit |
2997f0 |
void *payload, size_t len,
|
|
Packit |
2997f0 |
int error_code,
|
|
Packit |
2997f0 |
void *opaque, void *msg_opaque) {
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (error_code)
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Message delivery failed: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(error_code));
|
|
Packit |
2997f0 |
else if (!quiet)
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Message delivered (%zd bytes): %.*s\n", len,
|
|
Packit |
2997f0 |
(int)len, (const char *)payload);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Message delivery report callback using the richer rd_kafka_message_t object.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void msg_delivered2 (rd_kafka_t *rk,
|
|
Packit |
2997f0 |
const rd_kafka_message_t *rkmessage, void *opaque) {
|
|
Packit |
2997f0 |
printf("del: %s: offset %"PRId64"\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(rkmessage->err), rkmessage->offset);
|
|
Packit |
2997f0 |
if (rkmessage->err)
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Message delivery failed: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(rkmessage->err));
|
|
Packit |
2997f0 |
else if (!quiet)
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Message delivered (%zd bytes, offset %"PRId64", "
|
|
Packit |
2997f0 |
"partition %"PRId32"): %.*s\n",
|
|
Packit |
2997f0 |
rkmessage->len, rkmessage->offset,
|
|
Packit |
2997f0 |
rkmessage->partition,
|
|
Packit |
2997f0 |
(int)rkmessage->len, (const char *)rkmessage->payload);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void msg_consume (rd_kafka_message_t *rkmessage,
|
|
Packit |
2997f0 |
void *opaque) {
|
|
Packit |
2997f0 |
if (rkmessage->err) {
|
|
Packit |
2997f0 |
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Consumer reached end of %s [%"PRId32"] "
|
|
Packit |
2997f0 |
"message queue at offset %"PRId64"\n",
|
|
Packit |
2997f0 |
rd_kafka_topic_name(rkmessage->rkt),
|
|
Packit |
2997f0 |
rkmessage->partition, rkmessage->offset);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (exit_eof)
|
|
Packit |
2997f0 |
run = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
|
|
Packit |
2997f0 |
"offset %"PRId64": %s\n",
|
|
Packit |
2997f0 |
rd_kafka_topic_name(rkmessage->rkt),
|
|
Packit |
2997f0 |
rkmessage->partition,
|
|
Packit |
2997f0 |
rkmessage->offset,
|
|
Packit |
2997f0 |
rd_kafka_message_errstr(rkmessage));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
|
|
Packit |
2997f0 |
rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
|
|
Packit |
2997f0 |
run = 0;
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!quiet) {
|
|
Packit |
2997f0 |
rd_kafka_timestamp_type_t tstype;
|
|
Packit |
2997f0 |
int64_t timestamp;
|
|
Packit |
2997f0 |
rd_kafka_headers_t *hdrs;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
fprintf(stdout, "%% Message (offset %"PRId64", %zd bytes):\n",
|
|
Packit |
2997f0 |
rkmessage->offset, rkmessage->len);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
|
|
Packit |
2997f0 |
if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
|
|
Packit |
2997f0 |
const char *tsname = "?";
|
|
Packit |
2997f0 |
if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME)
|
|
Packit |
2997f0 |
tsname = "create time";
|
|
Packit |
2997f0 |
else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
|
|
Packit |
2997f0 |
tsname = "log append time";
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
fprintf(stdout, "%% Message timestamp: %s %"PRId64
|
|
Packit |
2997f0 |
" (%ds ago)\n",
|
|
Packit |
2997f0 |
tsname, timestamp,
|
|
Packit |
2997f0 |
!timestamp ? 0 :
|
|
Packit |
2997f0 |
(int)time(NULL) - (int)(timestamp/1000));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
|
|
Packit |
2997f0 |
size_t idx = 0;
|
|
Packit |
2997f0 |
const char *name;
|
|
Packit |
2997f0 |
const void *val;
|
|
Packit |
2997f0 |
size_t size;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
fprintf(stdout, "%% Headers:");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while (!rd_kafka_header_get_all(hdrs, idx++,
|
|
Packit |
2997f0 |
&name, &val, &size)) {
|
|
Packit |
2997f0 |
fprintf(stdout, "%s%s=",
|
|
Packit |
2997f0 |
idx == 1 ? " " : ", ", name);
|
|
Packit |
2997f0 |
if (val)
|
|
Packit |
2997f0 |
fprintf(stdout, "\"%.*s\"",
|
|
Packit |
2997f0 |
(int)size, (const char *)val);
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
fprintf(stdout, "NULL");
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
fprintf(stdout, "\n");
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkmessage->key_len) {
|
|
Packit |
2997f0 |
if (output == OUTPUT_HEXDUMP)
|
|
Packit |
2997f0 |
hexdump(stdout, "Message Key",
|
|
Packit |
2997f0 |
rkmessage->key, rkmessage->key_len);
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
printf("Key: %.*s\n",
|
|
Packit |
2997f0 |
(int)rkmessage->key_len, (char *)rkmessage->key);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (output == OUTPUT_HEXDUMP)
|
|
Packit |
2997f0 |
hexdump(stdout, "Message Payload",
|
|
Packit |
2997f0 |
rkmessage->payload, rkmessage->len);
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
printf("%.*s\n",
|
|
Packit |
2997f0 |
(int)rkmessage->len, (char *)rkmessage->payload);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void metadata_print (const char *topic,
|
|
Packit |
2997f0 |
const struct rd_kafka_metadata *metadata) {
|
|
Packit |
2997f0 |
int i, j, k;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
printf("Metadata for %s (from broker %"PRId32": %s):\n",
|
|
Packit |
2997f0 |
topic ? : "all topics",
|
|
Packit |
2997f0 |
metadata->orig_broker_id,
|
|
Packit |
2997f0 |
metadata->orig_broker_name);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Iterate brokers */
|
|
Packit |
2997f0 |
printf(" %i brokers:\n", metadata->broker_cnt);
|
|
Packit |
2997f0 |
for (i = 0 ; i < metadata->broker_cnt ; i++)
|
|
Packit |
2997f0 |
printf(" broker %"PRId32" at %s:%i\n",
|
|
Packit |
2997f0 |
metadata->brokers[i].id,
|
|
Packit |
2997f0 |
metadata->brokers[i].host,
|
|
Packit |
2997f0 |
metadata->brokers[i].port);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Iterate topics */
|
|
Packit |
2997f0 |
printf(" %i topics:\n", metadata->topic_cnt);
|
|
Packit |
2997f0 |
for (i = 0 ; i < metadata->topic_cnt ; i++) {
|
|
Packit |
2997f0 |
const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
|
|
Packit |
2997f0 |
printf(" topic \"%s\" with %i partitions:",
|
|
Packit |
2997f0 |
t->topic,
|
|
Packit |
2997f0 |
t->partition_cnt);
|
|
Packit |
2997f0 |
if (t->err) {
|
|
Packit |
2997f0 |
printf(" %s", rd_kafka_err2str(t->err));
|
|
Packit |
2997f0 |
if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
|
|
Packit |
2997f0 |
printf(" (try again)");
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
printf("\n");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Iterate topic's partitions */
|
|
Packit |
2997f0 |
for (j = 0 ; j < t->partition_cnt ; j++) {
|
|
Packit |
2997f0 |
const struct rd_kafka_metadata_partition *p;
|
|
Packit |
2997f0 |
p = &t->partitions[j];
|
|
Packit |
2997f0 |
printf(" partition %"PRId32", "
|
|
Packit |
2997f0 |
"leader %"PRId32", replicas: ",
|
|
Packit |
2997f0 |
p->id, p->leader);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Iterate partition's replicas */
|
|
Packit |
2997f0 |
for (k = 0 ; k < p->replica_cnt ; k++)
|
|
Packit |
2997f0 |
printf("%s%"PRId32,
|
|
Packit |
2997f0 |
k > 0 ? ",":"", p->replicas[k]);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Iterate partition's ISRs */
|
|
Packit |
2997f0 |
printf(", isrs: ");
|
|
Packit |
2997f0 |
for (k = 0 ; k < p->isr_cnt ; k++)
|
|
Packit |
2997f0 |
printf("%s%"PRId32,
|
|
Packit |
2997f0 |
k > 0 ? ",":"", p->isrs[k]);
|
|
Packit |
2997f0 |
if (p->err)
|
|
Packit |
2997f0 |
printf(", %s\n", rd_kafka_err2str(p->err));
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
printf("\n");
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void sig_usr1 (int sig) {
|
|
Packit |
2997f0 |
rd_kafka_dump(stdout, rk);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int main (int argc, char **argv) {
|
|
Packit |
2997f0 |
rd_kafka_topic_t *rkt;
|
|
Packit |
2997f0 |
char *brokers = "localhost:9092";
|
|
Packit |
2997f0 |
char mode = 'C';
|
|
Packit |
2997f0 |
char *topic = NULL;
|
|
Packit |
2997f0 |
int partition = RD_KAFKA_PARTITION_UA;
|
|
Packit |
2997f0 |
int opt;
|
|
Packit |
2997f0 |
rd_kafka_conf_t *conf;
|
|
Packit |
2997f0 |
rd_kafka_topic_conf_t *topic_conf;
|
|
Packit |
2997f0 |
char errstr[512];
|
|
Packit |
2997f0 |
int64_t start_offset = 0;
|
|
Packit |
2997f0 |
int report_offsets = 0;
|
|
Packit |
2997f0 |
int do_conf_dump = 0;
|
|
Packit |
2997f0 |
char tmp[16];
|
|
Packit |
2997f0 |
int64_t seek_offset = 0;
|
|
Packit |
2997f0 |
int64_t tmp_offset = 0;
|
|
Packit |
2997f0 |
int get_wmarks = 0;
|
|
Packit |
2997f0 |
rd_kafka_headers_t *hdrs = NULL;
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Kafka configuration */
|
|
Packit |
2997f0 |
conf = rd_kafka_conf_new();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Set logger */
|
|
Packit |
2997f0 |
rd_kafka_conf_set_log_cb(conf, logger);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Quick termination */
|
|
Packit |
2997f0 |
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
|
|
Packit |
2997f0 |
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Topic configuration */
|
|
Packit |
2997f0 |
topic_conf = rd_kafka_topic_conf_new();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:H:")) != -1) {
|
|
Packit |
2997f0 |
switch (opt) {
|
|
Packit |
2997f0 |
case 'P':
|
|
Packit |
2997f0 |
case 'C':
|
|
Packit |
2997f0 |
case 'L':
|
|
Packit |
2997f0 |
mode = opt;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 't':
|
|
Packit |
2997f0 |
topic = optarg;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'p':
|
|
Packit |
2997f0 |
partition = atoi(optarg);
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'b':
|
|
Packit |
2997f0 |
brokers = optarg;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'z':
|
|
Packit |
2997f0 |
if (rd_kafka_conf_set(conf, "compression.codec",
|
|
Packit |
2997f0 |
optarg,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr)) !=
|
|
Packit |
2997f0 |
RD_KAFKA_CONF_OK) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% %s\n", errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'o':
|
|
Packit |
2997f0 |
case 's':
|
|
Packit |
2997f0 |
if (!strcmp(optarg, "end"))
|
|
Packit |
2997f0 |
tmp_offset = RD_KAFKA_OFFSET_END;
|
|
Packit |
2997f0 |
else if (!strcmp(optarg, "beginning"))
|
|
Packit |
2997f0 |
tmp_offset = RD_KAFKA_OFFSET_BEGINNING;
|
|
Packit |
2997f0 |
else if (!strcmp(optarg, "stored"))
|
|
Packit |
2997f0 |
tmp_offset = RD_KAFKA_OFFSET_STORED;
|
|
Packit |
2997f0 |
else if (!strcmp(optarg, "report"))
|
|
Packit |
2997f0 |
report_offsets = 1;
|
|
Packit |
2997f0 |
else if (!strcmp(optarg, "wmark"))
|
|
Packit |
2997f0 |
get_wmarks = 1;
|
|
Packit |
2997f0 |
else {
|
|
Packit |
2997f0 |
tmp_offset = strtoll(optarg, NULL, 10);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (tmp_offset < 0)
|
|
Packit |
2997f0 |
tmp_offset = RD_KAFKA_OFFSET_TAIL(-tmp_offset);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (opt == 'o')
|
|
Packit |
2997f0 |
start_offset = tmp_offset;
|
|
Packit |
2997f0 |
else if (opt == 's')
|
|
Packit |
2997f0 |
seek_offset = tmp_offset;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'e':
|
|
Packit |
2997f0 |
exit_eof = 1;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'd':
|
|
Packit |
2997f0 |
if (rd_kafka_conf_set(conf, "debug", optarg,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr)) !=
|
|
Packit |
2997f0 |
RD_KAFKA_CONF_OK) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Debug configuration failed: "
|
|
Packit |
2997f0 |
"%s: %s\n",
|
|
Packit |
2997f0 |
errstr, optarg);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'q':
|
|
Packit |
2997f0 |
quiet = 1;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'A':
|
|
Packit |
2997f0 |
output = OUTPUT_RAW;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'H':
|
|
Packit |
2997f0 |
{
|
|
Packit |
2997f0 |
char *name, *val;
|
|
Packit |
2997f0 |
size_t name_sz = -1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
name = optarg;
|
|
Packit |
2997f0 |
val = strchr(name, '=');
|
|
Packit |
2997f0 |
if (val) {
|
|
Packit |
2997f0 |
name_sz = (size_t)(val-name);
|
|
Packit |
2997f0 |
val++; /* past the '=' */
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!hdrs)
|
|
Packit |
2997f0 |
hdrs = rd_kafka_headers_new(8);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
|
|
Packit |
2997f0 |
if (err) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to add header %s: %s\n",
|
|
Packit |
2997f0 |
name, rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
case 'X':
|
|
Packit |
2997f0 |
{
|
|
Packit |
2997f0 |
char *name, *val;
|
|
Packit |
2997f0 |
rd_kafka_conf_res_t res;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!strcmp(optarg, "list") ||
|
|
Packit |
2997f0 |
!strcmp(optarg, "help")) {
|
|
Packit |
2997f0 |
rd_kafka_conf_properties_show(stdout);
|
|
Packit |
2997f0 |
exit(0);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!strcmp(optarg, "dump")) {
|
|
Packit |
2997f0 |
do_conf_dump = 1;
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
name = optarg;
|
|
Packit |
2997f0 |
if (!(val = strchr(name, '='))) {
|
|
Packit |
2997f0 |
char dest[512];
|
|
Packit |
2997f0 |
size_t dest_size = sizeof(dest);
|
|
Packit |
2997f0 |
/* Return current value for property. */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
res = RD_KAFKA_CONF_UNKNOWN;
|
|
Packit |
2997f0 |
if (!strncmp(name, "topic.", strlen("topic.")))
|
|
Packit |
2997f0 |
res = rd_kafka_topic_conf_get(
|
|
Packit |
2997f0 |
topic_conf,
|
|
Packit |
2997f0 |
name+strlen("topic."),
|
|
Packit |
2997f0 |
dest, &dest_size);
|
|
Packit |
2997f0 |
if (res == RD_KAFKA_CONF_UNKNOWN)
|
|
Packit |
2997f0 |
res = rd_kafka_conf_get(
|
|
Packit |
2997f0 |
conf, name, dest, &dest_size);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (res == RD_KAFKA_CONF_OK) {
|
|
Packit |
2997f0 |
printf("%s = %s\n", name, dest);
|
|
Packit |
2997f0 |
exit(0);
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% %s property\n",
|
|
Packit |
2997f0 |
res == RD_KAFKA_CONF_UNKNOWN ?
|
|
Packit |
2997f0 |
"Unknown" : "Invalid");
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
*val = '\0';
|
|
Packit |
2997f0 |
val++;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
res = RD_KAFKA_CONF_UNKNOWN;
|
|
Packit |
2997f0 |
/* Try "topic." prefixed properties on topic
|
|
Packit |
2997f0 |
* conf first, and then fall through to global if
|
|
Packit |
2997f0 |
* it didnt match a topic configuration property. */
|
|
Packit |
2997f0 |
if (!strncmp(name, "topic.", strlen("topic.")))
|
|
Packit |
2997f0 |
res = rd_kafka_topic_conf_set(topic_conf,
|
|
Packit |
2997f0 |
name+
|
|
Packit |
2997f0 |
strlen("topic."),
|
|
Packit |
2997f0 |
val,
|
|
Packit |
2997f0 |
errstr,
|
|
Packit |
2997f0 |
sizeof(errstr));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (res == RD_KAFKA_CONF_UNKNOWN)
|
|
Packit |
2997f0 |
res = rd_kafka_conf_set(conf, name, val,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (res != RD_KAFKA_CONF_OK) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% %s\n", errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
default:
|
|
Packit |
2997f0 |
goto usage;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (do_conf_dump) {
|
|
Packit |
2997f0 |
const char **arr;
|
|
Packit |
2997f0 |
size_t cnt;
|
|
Packit |
2997f0 |
int pass;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (pass = 0 ; pass < 2 ; pass++) {
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (pass == 0) {
|
|
Packit |
2997f0 |
arr = rd_kafka_conf_dump(conf, &cnt);
|
|
Packit |
2997f0 |
printf("# Global config\n");
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
printf("# Topic config\n");
|
|
Packit |
2997f0 |
arr = rd_kafka_topic_conf_dump(topic_conf,
|
|
Packit |
2997f0 |
&cnt);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = 0 ; i < (int)cnt ; i += 2)
|
|
Packit |
2997f0 |
printf("%s = %s\n",
|
|
Packit |
2997f0 |
arr[i], arr[i+1]);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
printf("\n");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_conf_dump_free(arr, cnt);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
exit(0);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (optind != argc || (mode != 'L' && !topic)) {
|
|
Packit |
2997f0 |
usage:
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"Usage: %s -C|-P|-L -t <topic> "
|
|
Packit |
2997f0 |
"[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
|
|
Packit |
2997f0 |
"\n"
|
|
Packit |
2997f0 |
"librdkafka version %s (0x%08x)\n"
|
|
Packit |
2997f0 |
"\n"
|
|
Packit |
2997f0 |
" Options:\n"
|
|
Packit |
2997f0 |
" -C | -P Consumer or Producer mode\n"
|
|
Packit |
2997f0 |
" -L Metadata list mode\n"
|
|
Packit |
2997f0 |
" -t <topic> Topic to fetch / produce\n"
|
|
Packit |
2997f0 |
" -p <num> Partition (random partitioner)\n"
|
|
Packit |
2997f0 |
" -b <brokers> Broker address (localhost:9092)\n"
|
|
Packit |
2997f0 |
" -z <codec> Enable compression:\n"
|
|
Packit |
2997f0 |
" none|gzip|snappy\n"
|
|
Packit |
2997f0 |
" -o <offset> Start offset (consumer):\n"
|
|
Packit |
2997f0 |
" beginning, end, NNNNN or -NNNNN\n"
|
|
Packit |
2997f0 |
" wmark returns the current hi&lo "
|
|
Packit |
2997f0 |
"watermarks.\n"
|
|
Packit |
2997f0 |
" -o report Report message offsets (producer)\n"
|
|
Packit |
2997f0 |
" -e Exit consumer when last message\n"
|
|
Packit |
2997f0 |
" in partition has been received.\n"
|
|
Packit |
2997f0 |
" -d [facs..] Enable debugging contexts:\n"
|
|
Packit |
2997f0 |
" %s\n"
|
|
Packit |
2997f0 |
" -q Be quiet\n"
|
|
Packit |
2997f0 |
" -A Raw payload output (consumer)\n"
|
|
Packit |
2997f0 |
" -H <name[=value]> Add header to message (producer)\n"
|
|
Packit |
2997f0 |
" -X <prop=name> Set arbitrary librdkafka "
|
|
Packit |
2997f0 |
"configuration property\n"
|
|
Packit |
2997f0 |
" Properties prefixed with \"topic.\" "
|
|
Packit |
2997f0 |
"will be set on topic object.\n"
|
|
Packit |
2997f0 |
" -X list Show full list of supported "
|
|
Packit |
2997f0 |
"properties.\n"
|
|
Packit |
2997f0 |
" -X <prop> Get single property value\n"
|
|
Packit |
2997f0 |
"\n"
|
|
Packit |
2997f0 |
" In Consumer mode:\n"
|
|
Packit |
2997f0 |
" writes fetched messages to stdout\n"
|
|
Packit |
2997f0 |
" In Producer mode:\n"
|
|
Packit |
2997f0 |
" reads messages from stdin and sends to broker\n"
|
|
Packit |
2997f0 |
" In List mode:\n"
|
|
Packit |
2997f0 |
" queries broker for metadata information, "
|
|
Packit |
2997f0 |
"topic is optional.\n"
|
|
Packit |
2997f0 |
"\n"
|
|
Packit |
2997f0 |
"\n"
|
|
Packit |
2997f0 |
"\n",
|
|
Packit |
2997f0 |
argv[0],
|
|
Packit |
2997f0 |
rd_kafka_version_str(), rd_kafka_version(),
|
|
Packit |
2997f0 |
RD_KAFKA_DEBUG_CONTEXTS);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if ((mode == 'C' && !isatty(STDIN_FILENO)) ||
|
|
Packit |
2997f0 |
(mode != 'C' && !isatty(STDOUT_FILENO)))
|
|
Packit |
2997f0 |
quiet = 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
signal(SIGINT, stop);
|
|
Packit |
2997f0 |
signal(SIGUSR1, sig_usr1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (mode == 'P') {
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Producer
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
char buf[2048];
|
|
Packit |
2997f0 |
int sendcnt = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Set up a message delivery report callback.
|
|
Packit |
2997f0 |
* It will be called once for each message, either on successful
|
|
Packit |
2997f0 |
* delivery to broker, or upon failure to deliver to broker. */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* If offset reporting (-o report) is enabled, use the
|
|
Packit |
2997f0 |
* richer dr_msg_cb instead. */
|
|
Packit |
2997f0 |
if (report_offsets) {
|
|
Packit |
2997f0 |
rd_kafka_topic_conf_set(topic_conf,
|
|
Packit |
2997f0 |
"produce.offset.report",
|
|
Packit |
2997f0 |
"true", errstr, sizeof(errstr));
|
|
Packit |
2997f0 |
rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered2);
|
|
Packit |
2997f0 |
} else
|
|
Packit |
2997f0 |
rd_kafka_conf_set_dr_cb(conf, msg_delivered);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create Kafka handle */
|
|
Packit |
2997f0 |
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr)))) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to create new producer: %s\n",
|
|
Packit |
2997f0 |
errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Add brokers */
|
|
Packit |
2997f0 |
if (rd_kafka_brokers_add(rk, brokers) == 0) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% No valid brokers specified\n");
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create topic */
|
|
Packit |
2997f0 |
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
|
|
Packit |
2997f0 |
topic_conf = NULL; /* Now owned by topic */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!quiet)
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Type stuff and hit enter to send\n");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while (run && fgets(buf, sizeof(buf), stdin)) {
|
|
Packit |
2997f0 |
size_t len = strlen(buf);
|
|
Packit |
2997f0 |
if (buf[len-1] == '\n')
|
|
Packit |
2997f0 |
buf[--len] = '\0';
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Send/Produce message. */
|
|
Packit |
2997f0 |
if (hdrs) {
|
|
Packit |
2997f0 |
rd_kafka_headers_t *hdrs_copy;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
hdrs_copy = rd_kafka_headers_copy(hdrs);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
err = rd_kafka_producev(
|
|
Packit |
2997f0 |
rk,
|
|
Packit |
2997f0 |
RD_KAFKA_V_RKT(rkt),
|
|
Packit |
2997f0 |
RD_KAFKA_V_PARTITION(partition),
|
|
Packit |
2997f0 |
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
|
|
Packit |
2997f0 |
RD_KAFKA_V_VALUE(buf, len),
|
|
Packit |
2997f0 |
RD_KAFKA_V_HEADERS(hdrs_copy),
|
|
Packit |
2997f0 |
RD_KAFKA_V_END);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (err)
|
|
Packit |
2997f0 |
rd_kafka_headers_destroy(hdrs_copy);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (rd_kafka_produce(
|
|
Packit |
2997f0 |
rkt, partition,
|
|
Packit |
2997f0 |
RD_KAFKA_MSG_F_COPY,
|
|
Packit |
2997f0 |
/* Payload and length */
|
|
Packit |
2997f0 |
buf, len,
|
|
Packit |
2997f0 |
/* Optional key and its length */
|
|
Packit |
2997f0 |
NULL, 0,
|
|
Packit |
2997f0 |
/* Message opaque, provided in
|
|
Packit |
2997f0 |
* delivery report callback as
|
|
Packit |
2997f0 |
* msg_opaque. */
|
|
Packit |
2997f0 |
NULL) == -1) {
|
|
Packit |
2997f0 |
err = rd_kafka_last_error();
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (err) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to produce to topic %s "
|
|
Packit |
2997f0 |
"partition %i: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_topic_name(rkt), partition,
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Poll to handle delivery reports */
|
|
Packit |
2997f0 |
rd_kafka_poll(rk, 0);
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!quiet)
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Sent %zd bytes to topic "
|
|
Packit |
2997f0 |
"%s partition %i\n",
|
|
Packit |
2997f0 |
len, rd_kafka_topic_name(rkt), partition);
|
|
Packit |
2997f0 |
sendcnt++;
|
|
Packit |
2997f0 |
/* Poll to handle delivery reports */
|
|
Packit |
2997f0 |
rd_kafka_poll(rk, 0);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Poll to handle delivery reports */
|
|
Packit |
2997f0 |
rd_kafka_poll(rk, 0);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Wait for messages to be delivered */
|
|
Packit |
2997f0 |
while (run && rd_kafka_outq_len(rk) > 0)
|
|
Packit |
2997f0 |
rd_kafka_poll(rk, 100);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Destroy topic */
|
|
Packit |
2997f0 |
rd_kafka_topic_destroy(rkt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Destroy the handle */
|
|
Packit |
2997f0 |
rd_kafka_destroy(rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else if (mode == 'C') {
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Consumer
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create Kafka handle */
|
|
Packit |
2997f0 |
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr)))) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to create new consumer: %s\n",
|
|
Packit |
2997f0 |
errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Add brokers */
|
|
Packit |
2997f0 |
if (rd_kafka_brokers_add(rk, brokers) == 0) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% No valid brokers specified\n");
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (get_wmarks) {
|
|
Packit |
2997f0 |
int64_t lo, hi;
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Only query for hi&lo partition watermarks */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if ((err = rd_kafka_query_watermark_offsets(
|
|
Packit |
2997f0 |
rk, topic, partition, &lo, &hi, 5000))) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% query_watermark_offsets() "
|
|
Packit |
2997f0 |
"failed: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
printf("%s [%d]: low - high offsets: "
|
|
Packit |
2997f0 |
"%"PRId64" - %"PRId64"\n",
|
|
Packit |
2997f0 |
topic, partition, lo, hi);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_destroy(rk);
|
|
Packit |
2997f0 |
exit(0);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create topic */
|
|
Packit |
2997f0 |
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
|
|
Packit |
2997f0 |
topic_conf = NULL; /* Now owned by topic */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Start consuming */
|
|
Packit |
2997f0 |
if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err = rd_kafka_last_error();
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Failed to start consuming: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Broker based offset storage "
|
|
Packit |
2997f0 |
"requires a group.id, "
|
|
Packit |
2997f0 |
"add: -X group.id=yourGroup\n");
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while (run) {
|
|
Packit |
2997f0 |
rd_kafka_message_t *rkmessage;
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Poll for errors, etc. */
|
|
Packit |
2997f0 |
rd_kafka_poll(rk, 0);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Consume single message.
|
|
Packit |
2997f0 |
* See rdkafka_performance.c for high speed
|
|
Packit |
2997f0 |
* consuming of messages. */
|
|
Packit |
2997f0 |
rkmessage = rd_kafka_consume(rkt, partition, 1000);
|
|
Packit |
2997f0 |
if (!rkmessage) /* timeout */
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
msg_consume(rkmessage, NULL);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Return message to rdkafka */
|
|
Packit |
2997f0 |
rd_kafka_message_destroy(rkmessage);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (seek_offset) {
|
|
Packit |
2997f0 |
err = rd_kafka_seek(rkt, partition, seek_offset,
|
|
Packit |
2997f0 |
2000);
|
|
Packit |
2997f0 |
if (err)
|
|
Packit |
2997f0 |
printf("Seek failed: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
printf("Seeked to %"PRId64"\n",
|
|
Packit |
2997f0 |
seek_offset);
|
|
Packit |
2997f0 |
seek_offset = 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Stop consuming */
|
|
Packit |
2997f0 |
rd_kafka_consume_stop(rkt, partition);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while (rd_kafka_outq_len(rk) > 0)
|
|
Packit |
2997f0 |
rd_kafka_poll(rk, 10);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Destroy topic */
|
|
Packit |
2997f0 |
rd_kafka_topic_destroy(rkt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Destroy handle */
|
|
Packit |
2997f0 |
rd_kafka_destroy(rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else if (mode == 'L') {
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create Kafka handle */
|
|
Packit |
2997f0 |
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr)))) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to create new producer: %s\n",
|
|
Packit |
2997f0 |
errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Add brokers */
|
|
Packit |
2997f0 |
if (rd_kafka_brokers_add(rk, brokers) == 0) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% No valid brokers specified\n");
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create topic */
|
|
Packit |
2997f0 |
if (topic) {
|
|
Packit |
2997f0 |
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
|
|
Packit |
2997f0 |
topic_conf = NULL; /* Now owned by topic */
|
|
Packit |
2997f0 |
} else
|
|
Packit |
2997f0 |
rkt = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while (run) {
|
|
Packit |
2997f0 |
const struct rd_kafka_metadata *metadata;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Fetch metadata */
|
|
Packit |
2997f0 |
err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt,
|
|
Packit |
2997f0 |
&metadata, 5000);
|
|
Packit |
2997f0 |
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to acquire metadata: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
run = 0;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
metadata_print(topic, metadata);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_metadata_destroy(metadata);
|
|
Packit |
2997f0 |
run = 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Destroy topic */
|
|
Packit |
2997f0 |
if (rkt)
|
|
Packit |
2997f0 |
rd_kafka_topic_destroy(rkt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Destroy the handle */
|
|
Packit |
2997f0 |
rd_kafka_destroy(rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (topic_conf)
|
|
Packit |
2997f0 |
rd_kafka_topic_conf_destroy(topic_conf);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Exit right away, dont wait for background cleanup, we haven't
|
|
Packit |
2997f0 |
* done anything important anyway. */
|
|
Packit |
2997f0 |
exit(err ? 2 : 0);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (hdrs)
|
|
Packit |
2997f0 |
rd_kafka_headers_destroy(hdrs);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (topic_conf)
|
|
Packit |
2997f0 |
rd_kafka_topic_conf_destroy(topic_conf);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Let background threads clean up and terminate cleanly. */
|
|
Packit |
2997f0 |
run = 5;
|
|
Packit |
2997f0 |
while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
|
|
Packit |
2997f0 |
printf("Waiting for librdkafka to decommission\n");
|
|
Packit |
2997f0 |
if (run <= 0)
|
|
Packit |
2997f0 |
rd_kafka_dump(stdout, rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
}
|