|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* librdkafka - Apache Kafka C library
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Copyright (c) 2015, Magnus Edenhill
|
|
Packit |
2997f0 |
* All rights reserved.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Redistribution and use in source and binary forms, with or without
|
|
Packit |
2997f0 |
* modification, are permitted provided that the following conditions are met:
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* 1. Redistributions of source code must retain the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer.
|
|
Packit |
2997f0 |
* 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer in the documentation
|
|
Packit |
2997f0 |
* and/or other materials provided with the distribution.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
Packit |
2997f0 |
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
Packit |
2997f0 |
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
Packit |
2997f0 |
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
Packit |
2997f0 |
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
Packit |
2997f0 |
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
Packit |
2997f0 |
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
Packit |
2997f0 |
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
Packit |
2997f0 |
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
Packit |
2997f0 |
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
Packit |
2997f0 |
* POSSIBILITY OF SUCH DAMAGE.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Apache Kafka high level consumer example program
|
|
Packit |
2997f0 |
* using the Kafka driver from librdkafka
|
|
Packit |
2997f0 |
* (https://github.com/edenhill/librdkafka)
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include <ctype.h>
|
|
Packit |
2997f0 |
#include <signal.h>
|
|
Packit |
2997f0 |
#include <string.h>
|
|
Packit |
2997f0 |
#include <unistd.h>
|
|
Packit |
2997f0 |
#include <stdlib.h>
|
|
Packit |
2997f0 |
#include <syslog.h>
|
|
Packit |
2997f0 |
#include <sys/time.h>
|
|
Packit |
2997f0 |
#include <errno.h>
|
|
Packit |
2997f0 |
#include <getopt.h>
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
|
|
Packit |
2997f0 |
* is builtin from within the librdkafka source tree and thus differs. */
|
|
Packit |
2997f0 |
#include "rdkafka.h" /* for Kafka driver */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static int run = 1;
|
|
Packit |
2997f0 |
static rd_kafka_t *rk;
|
|
Packit |
2997f0 |
static int exit_eof = 0;
|
|
Packit |
2997f0 |
static int wait_eof = 0; /* number of partitions awaiting EOF */
|
|
Packit |
2997f0 |
static int quiet = 0;
|
|
Packit |
2997f0 |
static enum {
|
|
Packit |
2997f0 |
OUTPUT_HEXDUMP,
|
|
Packit |
2997f0 |
OUTPUT_RAW,
|
|
Packit |
2997f0 |
} output = OUTPUT_HEXDUMP;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void stop (int sig) {
|
|
Packit |
2997f0 |
if (!run)
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
run = 0;
|
|
Packit |
2997f0 |
fclose(stdin); /* abort fgets() */
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
|
|
Packit |
2997f0 |
const char *p = (const char *)ptr;
|
|
Packit |
2997f0 |
unsigned int of = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (name)
|
|
Packit |
2997f0 |
fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (of = 0 ; of < len ; of += 16) {
|
|
Packit |
2997f0 |
char hexen[16*3+1];
|
|
Packit |
2997f0 |
char charen[16+1];
|
|
Packit |
2997f0 |
int hof = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int cof = 0;
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = of ; i < (int)of + 16 && i < (int)len ; i++) {
|
|
Packit |
2997f0 |
hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
|
|
Packit |
2997f0 |
cof += sprintf(charen+cof, "%c",
|
|
Packit |
2997f0 |
isprint((int)p[i]) ? p[i] : '.');
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
fprintf(fp, "%08x: %-48s %-16s\n",
|
|
Packit |
2997f0 |
of, hexen, charen);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Kafka logger callback (optional)
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void logger (const rd_kafka_t *rk, int level,
|
|
Packit |
2997f0 |
const char *fac, const char *buf) {
|
|
Packit |
2997f0 |
struct timeval tv;
|
|
Packit |
2997f0 |
gettimeofday(&tv, NULL);
|
|
Packit |
2997f0 |
fprintf(stdout, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
|
|
Packit |
2997f0 |
(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
|
|
Packit |
2997f0 |
level, fac, rd_kafka_name(rk), buf);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Handle and print a consumed message.
|
|
Packit |
2997f0 |
* Internally crafted messages are also used to propagate state from
|
|
Packit |
2997f0 |
* librdkafka to the application. The application needs to check
|
|
Packit |
2997f0 |
* the `rkmessage->err` field for this purpose.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void msg_consume (rd_kafka_message_t *rkmessage) {
|
|
Packit |
2997f0 |
if (rkmessage->err) {
|
|
Packit |
2997f0 |
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Consumer reached end of %s [%"PRId32"] "
|
|
Packit |
2997f0 |
"message queue at offset %"PRId64"\n",
|
|
Packit |
2997f0 |
rd_kafka_topic_name(rkmessage->rkt),
|
|
Packit |
2997f0 |
rkmessage->partition, rkmessage->offset);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (exit_eof && --wait_eof == 0) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% All partition(s) reached EOF: "
|
|
Packit |
2997f0 |
"exiting\n");
|
|
Packit |
2997f0 |
run = 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkmessage->rkt)
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Consume error for "
|
|
Packit |
2997f0 |
"topic \"%s\" [%"PRId32"] "
|
|
Packit |
2997f0 |
"offset %"PRId64": %s\n",
|
|
Packit |
2997f0 |
rd_kafka_topic_name(rkmessage->rkt),
|
|
Packit |
2997f0 |
rkmessage->partition,
|
|
Packit |
2997f0 |
rkmessage->offset,
|
|
Packit |
2997f0 |
rd_kafka_message_errstr(rkmessage));
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Consumer error: %s: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(rkmessage->err),
|
|
Packit |
2997f0 |
rd_kafka_message_errstr(rkmessage));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
|
|
Packit |
2997f0 |
rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
|
|
Packit |
2997f0 |
run = 0;
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!quiet)
|
|
Packit |
2997f0 |
fprintf(stdout, "%% Message (topic %s [%"PRId32"], "
|
|
Packit |
2997f0 |
"offset %"PRId64", %zd bytes):\n",
|
|
Packit |
2997f0 |
rd_kafka_topic_name(rkmessage->rkt),
|
|
Packit |
2997f0 |
rkmessage->partition,
|
|
Packit |
2997f0 |
rkmessage->offset, rkmessage->len);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkmessage->key_len) {
|
|
Packit |
2997f0 |
if (output == OUTPUT_HEXDUMP)
|
|
Packit |
2997f0 |
hexdump(stdout, "Message Key",
|
|
Packit |
2997f0 |
rkmessage->key, rkmessage->key_len);
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
printf("Key: %.*s\n",
|
|
Packit |
2997f0 |
(int)rkmessage->key_len, (char *)rkmessage->key);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (output == OUTPUT_HEXDUMP)
|
|
Packit |
2997f0 |
hexdump(stdout, "Message Payload",
|
|
Packit |
2997f0 |
rkmessage->payload, rkmessage->len);
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
printf("%.*s\n",
|
|
Packit |
2997f0 |
(int)rkmessage->len, (char *)rkmessage->payload);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void print_partition_list (FILE *fp,
|
|
Packit |
2997f0 |
const rd_kafka_topic_partition_list_t
|
|
Packit |
2997f0 |
*partitions) {
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
for (i = 0 ; i < partitions->cnt ; i++) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%s %s [%"PRId32"] offset %"PRId64,
|
|
Packit |
2997f0 |
i > 0 ? ",":"",
|
|
Packit |
2997f0 |
partitions->elems[i].topic,
|
|
Packit |
2997f0 |
partitions->elems[i].partition,
|
|
Packit |
2997f0 |
partitions->elems[i].offset);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
fprintf(stderr, "\n");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
static void rebalance_cb (rd_kafka_t *rk,
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err,
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *partitions,
|
|
Packit |
2997f0 |
void *opaque) {
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Consumer group rebalanced: ");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
switch (err)
|
|
Packit |
2997f0 |
{
|
|
Packit |
2997f0 |
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
|
|
Packit |
2997f0 |
fprintf(stderr, "assigned:\n");
|
|
Packit |
2997f0 |
print_partition_list(stderr, partitions);
|
|
Packit |
2997f0 |
rd_kafka_assign(rk, partitions);
|
|
Packit |
2997f0 |
wait_eof += partitions->cnt;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
|
|
Packit |
2997f0 |
fprintf(stderr, "revoked:\n");
|
|
Packit |
2997f0 |
print_partition_list(stderr, partitions);
|
|
Packit |
2997f0 |
rd_kafka_assign(rk, NULL);
|
|
Packit |
2997f0 |
wait_eof = 0;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
default:
|
|
Packit |
2997f0 |
fprintf(stderr, "failed: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
rd_kafka_assign(rk, NULL);
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static int describe_groups (rd_kafka_t *rk, const char *group) {
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
const struct rd_kafka_group_list *grplist;
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
err = rd_kafka_list_groups(rk, group, &grplist, 10000);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (err) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Failed to acquire group list: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = 0 ; i < grplist->group_cnt ; i++) {
|
|
Packit |
2997f0 |
const struct rd_kafka_group_info *gi = &grplist->groups[i];
|
|
Packit |
2997f0 |
int j;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
printf("Group \"%s\" in state %s on broker %d (%s:%d)\n",
|
|
Packit |
2997f0 |
gi->group, gi->state,
|
|
Packit |
2997f0 |
gi->broker.id, gi->broker.host, gi->broker.port);
|
|
Packit |
2997f0 |
if (gi->err)
|
|
Packit |
2997f0 |
printf(" Error: %s\n", rd_kafka_err2str(gi->err));
|
|
Packit |
2997f0 |
printf(" Protocol type \"%s\", protocol \"%s\", "
|
|
Packit |
2997f0 |
"with %d member(s):\n",
|
|
Packit |
2997f0 |
gi->protocol_type, gi->protocol, gi->member_cnt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (j = 0 ; j < gi->member_cnt ; j++) {
|
|
Packit |
2997f0 |
const struct rd_kafka_group_member_info *mi;
|
|
Packit |
2997f0 |
mi = &gi->members[j];
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
printf(" \"%s\", client id \"%s\" on host %s\n",
|
|
Packit |
2997f0 |
mi->member_id, mi->client_id, mi->client_host);
|
|
Packit |
2997f0 |
printf(" metadata: %d bytes\n",
|
|
Packit |
2997f0 |
mi->member_metadata_size);
|
|
Packit |
2997f0 |
printf(" assignment: %d bytes\n",
|
|
Packit |
2997f0 |
mi->member_assignment_size);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
printf("\n");
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (group && !grplist->group_cnt)
|
|
Packit |
2997f0 |
fprintf(stderr, "%% No matching group (%s)\n", group);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_group_list_destroy(grplist);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static void sig_usr1 (int sig) {
|
|
Packit |
2997f0 |
rd_kafka_dump(stdout, rk);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int main (int argc, char **argv) {
|
|
Packit |
2997f0 |
char mode = 'C';
|
|
Packit |
2997f0 |
char *brokers = "localhost:9092";
|
|
Packit |
2997f0 |
int opt;
|
|
Packit |
2997f0 |
rd_kafka_conf_t *conf;
|
|
Packit |
2997f0 |
rd_kafka_topic_conf_t *topic_conf;
|
|
Packit |
2997f0 |
char errstr[512];
|
|
Packit |
2997f0 |
const char *debug = NULL;
|
|
Packit |
2997f0 |
int do_conf_dump = 0;
|
|
Packit |
2997f0 |
char tmp[16];
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
char *group = NULL;
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_t *topics;
|
|
Packit |
2997f0 |
int is_subscription;
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
quiet = !isatty(STDIN_FILENO);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Kafka configuration */
|
|
Packit |
2997f0 |
conf = rd_kafka_conf_new();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Set logger */
|
|
Packit |
2997f0 |
rd_kafka_conf_set_log_cb(conf, logger);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Quick termination */
|
|
Packit |
2997f0 |
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
|
|
Packit |
2997f0 |
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Topic configuration */
|
|
Packit |
2997f0 |
topic_conf = rd_kafka_topic_conf_new();
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while ((opt = getopt(argc, argv, "g:b:qd:eX:ADO")) != -1) {
|
|
Packit |
2997f0 |
switch (opt) {
|
|
Packit |
2997f0 |
case 'b':
|
|
Packit |
2997f0 |
brokers = optarg;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'g':
|
|
Packit |
2997f0 |
group = optarg;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'e':
|
|
Packit |
2997f0 |
exit_eof = 1;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'd':
|
|
Packit |
2997f0 |
debug = optarg;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'q':
|
|
Packit |
2997f0 |
quiet = 1;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'A':
|
|
Packit |
2997f0 |
output = OUTPUT_RAW;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
case 'X':
|
|
Packit |
2997f0 |
{
|
|
Packit |
2997f0 |
char *name, *val;
|
|
Packit |
2997f0 |
rd_kafka_conf_res_t res;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!strcmp(optarg, "list") ||
|
|
Packit |
2997f0 |
!strcmp(optarg, "help")) {
|
|
Packit |
2997f0 |
rd_kafka_conf_properties_show(stdout);
|
|
Packit |
2997f0 |
exit(0);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!strcmp(optarg, "dump")) {
|
|
Packit |
2997f0 |
do_conf_dump = 1;
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
name = optarg;
|
|
Packit |
2997f0 |
if (!(val = strchr(name, '='))) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Expected "
|
|
Packit |
2997f0 |
"-X property=value, not %s\n", name);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
*val = '\0';
|
|
Packit |
2997f0 |
val++;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
res = RD_KAFKA_CONF_UNKNOWN;
|
|
Packit |
2997f0 |
/* Try "topic." prefixed properties on topic
|
|
Packit |
2997f0 |
* conf first, and then fall through to global if
|
|
Packit |
2997f0 |
* it didnt match a topic configuration property. */
|
|
Packit |
2997f0 |
if (!strncmp(name, "topic.", strlen("topic.")))
|
|
Packit |
2997f0 |
res = rd_kafka_topic_conf_set(topic_conf,
|
|
Packit |
2997f0 |
name+
|
|
Packit |
2997f0 |
strlen("topic."),
|
|
Packit |
2997f0 |
val,
|
|
Packit |
2997f0 |
errstr,
|
|
Packit |
2997f0 |
sizeof(errstr));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (res == RD_KAFKA_CONF_UNKNOWN)
|
|
Packit |
2997f0 |
res = rd_kafka_conf_set(conf, name, val,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (res != RD_KAFKA_CONF_OK) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% %s\n", errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
case 'D':
|
|
Packit |
2997f0 |
case 'O':
|
|
Packit |
2997f0 |
mode = opt;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
default:
|
|
Packit |
2997f0 |
goto usage;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (do_conf_dump) {
|
|
Packit |
2997f0 |
const char **arr;
|
|
Packit |
2997f0 |
size_t cnt;
|
|
Packit |
2997f0 |
int pass;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (pass = 0 ; pass < 2 ; pass++) {
|
|
Packit |
2997f0 |
if (pass == 0) {
|
|
Packit |
2997f0 |
arr = rd_kafka_conf_dump(conf, &cnt);
|
|
Packit |
2997f0 |
printf("# Global config\n");
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
printf("# Topic config\n");
|
|
Packit |
2997f0 |
arr = rd_kafka_topic_conf_dump(topic_conf,
|
|
Packit |
2997f0 |
&cnt);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = 0 ; i < (int)cnt ; i += 2)
|
|
Packit |
2997f0 |
printf("%s = %s\n",
|
|
Packit |
2997f0 |
arr[i], arr[i+1]);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
printf("\n");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_conf_dump_free(arr, cnt);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
exit(0);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (strchr("OC", mode) && optind == argc) {
|
|
Packit |
2997f0 |
usage:
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"Usage: %s [options] <topic[:part]> <topic[:part]>..\n"
|
|
Packit |
2997f0 |
"\n"
|
|
Packit |
2997f0 |
"librdkafka version %s (0x%08x)\n"
|
|
Packit |
2997f0 |
"\n"
|
|
Packit |
2997f0 |
" Options:\n"
|
|
Packit |
2997f0 |
" -g <group> Consumer group (%s)\n"
|
|
Packit |
2997f0 |
" -b <brokers> Broker address (%s)\n"
|
|
Packit |
2997f0 |
" -e Exit consumer when last message\n"
|
|
Packit |
2997f0 |
" in partition has been received.\n"
|
|
Packit |
2997f0 |
" -D Describe group.\n"
|
|
Packit |
2997f0 |
" -O Get commmitted offset(s)\n"
|
|
Packit |
2997f0 |
" -d [facs..] Enable debugging contexts:\n"
|
|
Packit |
2997f0 |
" %s\n"
|
|
Packit |
2997f0 |
" -q Be quiet\n"
|
|
Packit |
2997f0 |
" -A Raw payload output (consumer)\n"
|
|
Packit |
2997f0 |
" -X <prop=name> Set arbitrary librdkafka "
|
|
Packit |
2997f0 |
"configuration property\n"
|
|
Packit |
2997f0 |
" Properties prefixed with \"topic.\" "
|
|
Packit |
2997f0 |
"will be set on topic object.\n"
|
|
Packit |
2997f0 |
" Use '-X list' to see the full list\n"
|
|
Packit |
2997f0 |
" of supported properties.\n"
|
|
Packit |
2997f0 |
"\n"
|
|
Packit |
2997f0 |
"For balanced consumer groups use the 'topic1 topic2..'"
|
|
Packit |
2997f0 |
" format\n"
|
|
Packit |
2997f0 |
"and for static assignment use "
|
|
Packit |
2997f0 |
"'topic1:part1 topic1:part2 topic2:part1..'\n"
|
|
Packit |
2997f0 |
"\n",
|
|
Packit |
2997f0 |
argv[0],
|
|
Packit |
2997f0 |
rd_kafka_version_str(), rd_kafka_version(),
|
|
Packit |
2997f0 |
group, brokers,
|
|
Packit |
2997f0 |
RD_KAFKA_DEBUG_CONTEXTS);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
signal(SIGINT, stop);
|
|
Packit |
2997f0 |
signal(SIGUSR1, sig_usr1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (debug &&
|
|
Packit |
2997f0 |
rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
|
|
Packit |
2997f0 |
RD_KAFKA_CONF_OK) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Debug configuration failed: %s: %s\n",
|
|
Packit |
2997f0 |
errstr, debug);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* Client/Consumer group
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (strchr("CO", mode)) {
|
|
Packit |
2997f0 |
/* Consumer groups require a group id */
|
|
Packit |
2997f0 |
if (!group)
|
|
Packit |
2997f0 |
group = "rdkafka_consumer_example";
|
|
Packit |
2997f0 |
if (rd_kafka_conf_set(conf, "group.id", group,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr)) !=
|
|
Packit |
2997f0 |
RD_KAFKA_CONF_OK) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% %s\n", errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Consumer groups always use broker based offset storage */
|
|
Packit |
2997f0 |
if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
|
|
Packit |
2997f0 |
"broker",
|
|
Packit |
2997f0 |
errstr, sizeof(errstr)) !=
|
|
Packit |
2997f0 |
RD_KAFKA_CONF_OK) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% %s\n", errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Set default topic config for pattern-matched topics. */
|
|
Packit |
2997f0 |
rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Callback called on partition assignment changes */
|
|
Packit |
2997f0 |
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create Kafka handle */
|
|
Packit |
2997f0 |
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
|
|
Packit |
2997f0 |
errstr, sizeof(errstr)))) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to create new consumer: %s\n",
|
|
Packit |
2997f0 |
errstr);
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Add brokers */
|
|
Packit |
2997f0 |
if (rd_kafka_brokers_add(rk, brokers) == 0) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% No valid brokers specified\n");
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (mode == 'D') {
|
|
Packit |
2997f0 |
int r;
|
|
Packit |
2997f0 |
/* Describe groups */
|
|
Packit |
2997f0 |
r = describe_groups(rk, group);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_destroy(rk);
|
|
Packit |
2997f0 |
exit(r == -1 ? 1 : 0);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Redirect rd_kafka_poll() to consumer_poll() */
|
|
Packit |
2997f0 |
rd_kafka_poll_set_consumer(rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
topics = rd_kafka_topic_partition_list_new(argc - optind);
|
|
Packit |
2997f0 |
is_subscription = 1;
|
|
Packit |
2997f0 |
for (i = optind ; i < argc ; i++) {
|
|
Packit |
2997f0 |
/* Parse "topic[:part] */
|
|
Packit |
2997f0 |
char *topic = argv[i];
|
|
Packit |
2997f0 |
char *t;
|
|
Packit |
2997f0 |
int32_t partition = -1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if ((t = strstr(topic, ":"))) {
|
|
Packit |
2997f0 |
*t = '\0';
|
|
Packit |
2997f0 |
partition = atoi(t+1);
|
|
Packit |
2997f0 |
is_subscription = 0; /* is assignment */
|
|
Packit |
2997f0 |
wait_eof++;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_add(topics, topic, partition);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (mode == 'O') {
|
|
Packit |
2997f0 |
/* Offset query */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
err = rd_kafka_committed(rk, topics, 5000);
|
|
Packit |
2997f0 |
if (err) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Failed to fetch offsets: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = 0 ; i < topics->cnt ; i++) {
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_t *p = &topics->elems[i];
|
|
Packit |
2997f0 |
printf("Topic \"%s\" partition %"PRId32,
|
|
Packit |
2997f0 |
p->topic, p->partition);
|
|
Packit |
2997f0 |
if (p->err)
|
|
Packit |
2997f0 |
printf(" error %s",
|
|
Packit |
2997f0 |
rd_kafka_err2str(p->err));
|
|
Packit |
2997f0 |
else {
|
|
Packit |
2997f0 |
printf(" offset %"PRId64"",
|
|
Packit |
2997f0 |
p->offset);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (p->metadata_size)
|
|
Packit |
2997f0 |
printf(" (%d bytes of metadata)",
|
|
Packit |
2997f0 |
(int)p->metadata_size);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
printf("\n");
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
goto done;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (is_subscription) {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Subscribing to %d topics\n", topics->cnt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if ((err = rd_kafka_subscribe(rk, topics))) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to start consuming topics: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
exit(1);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Assigning %d partitions\n", topics->cnt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if ((err = rd_kafka_assign(rk, topics))) {
|
|
Packit |
2997f0 |
fprintf(stderr,
|
|
Packit |
2997f0 |
"%% Failed to assign partitions: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
while (run) {
|
|
Packit |
2997f0 |
rd_kafka_message_t *rkmessage;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkmessage = rd_kafka_consumer_poll(rk, 1000);
|
|
Packit |
2997f0 |
if (rkmessage) {
|
|
Packit |
2997f0 |
msg_consume(rkmessage);
|
|
Packit |
2997f0 |
rd_kafka_message_destroy(rkmessage);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
done:
|
|
Packit |
2997f0 |
err = rd_kafka_consumer_close(rk);
|
|
Packit |
2997f0 |
if (err)
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Failed to close consumer: %s\n",
|
|
Packit |
2997f0 |
rd_kafka_err2str(err));
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
fprintf(stderr, "%% Consumer closed\n");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_topic_partition_list_destroy(topics);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Destroy handle */
|
|
Packit |
2997f0 |
rd_kafka_destroy(rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Let background threads clean up and terminate cleanly. */
|
|
Packit |
2997f0 |
run = 5;
|
|
Packit |
2997f0 |
while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
|
|
Packit |
2997f0 |
printf("Waiting for librdkafka to decommission\n");
|
|
Packit |
2997f0 |
if (run <= 0)
|
|
Packit |
2997f0 |
rd_kafka_dump(stdout, rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
}
|