Blame examples/rdkafka_example.c

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2012, Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 * 
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met: 
Packit 2997f0
 * 
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer. 
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution. 
Packit 2997f0
 * 
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Apache Kafka consumer & producer example programs
Packit 2997f0
 * using the Kafka driver from librdkafka
Packit 2997f0
 * (https://github.com/edenhill/librdkafka)
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#include <ctype.h>
Packit 2997f0
#include <signal.h>
Packit 2997f0
#include <string.h>
Packit 2997f0
#include <unistd.h>
Packit 2997f0
#include <stdlib.h>
Packit 2997f0
#include <syslog.h>
Packit 2997f0
#include <time.h>
Packit 2997f0
#include <sys/time.h>
Packit 2997f0
#include <getopt.h>
Packit 2997f0
Packit 2997f0
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
Packit 2997f0
 * is built from within the librdkafka source tree and thus differs. */
Packit 2997f0
#include "rdkafka.h"  /* for Kafka driver */
Packit 2997f0
Packit 2997f0
Packit 2997f0
static int run = 1;
Packit 2997f0
static rd_kafka_t *rk;
Packit 2997f0
static int exit_eof = 0;
Packit 2997f0
static int quiet = 0;
Packit 2997f0
static 	enum {
Packit 2997f0
	OUTPUT_HEXDUMP,
Packit 2997f0
	OUTPUT_RAW,
Packit 2997f0
} output = OUTPUT_HEXDUMP;
Packit 2997f0
Packit 2997f0
static void stop (int sig) {
Packit 2997f0
	run = 0;
Packit 2997f0
	fclose(stdin); /* abort fgets() */
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
 * Write a hex + ASCII dump of a memory region to a stream.
 *
 * Output is an optional header line "<name> hexdump (<len> bytes):"
 * followed by one line per 16 input bytes:
 *   "<8-digit hex offset>: <hex bytes, padded to 48> <chars, padded to 16>"
 * Bytes for which isprint() is false are shown as '.'.
 *
 * @param fp    Destination stream.
 * @param name  Label for the header line, or NULL to omit the header.
 * @param ptr   Start of the region to dump (not dereferenced if len is 0).
 * @param len   Number of bytes to dump.
 */
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
	/* unsigned char: keeps isprint() defined for bytes >= 0x80 and
	 * makes the hex conversion independent of char signedness. */
	const unsigned char *p = (const unsigned char *)ptr;
	size_t of;

	if (name)
		fprintf(fp, "%s hexdump (%zu bytes):\n", name, len);

	for (of = 0 ; of < len ; of += 16) {
		char hexen[16*3+1];
		char charen[16+1];
		size_t hof = 0;
		size_t cof = 0;
		size_t i;

		/* At most 16 bytes per row; size_t indices avoid the
		 * truncation an int index would suffer on huge buffers. */
		for (i = of ; i < len && i - of < 16 ; i++) {
			hof += (size_t)snprintf(hexen+hof, sizeof(hexen)-hof,
						"%02x ", (unsigned int)p[i]);
			charen[cof++] = isprint(p[i]) ? (char)p[i] : '.';
		}
		charen[cof] = '\0';

		fprintf(fp, "%08zx: %-48s %-16s\n",
			of, hexen, charen);
	}
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Kafka logger callback (optional)
Packit 2997f0
 */
Packit 2997f0
static void logger (const rd_kafka_t *rk, int level,
Packit 2997f0
		    const char *fac, const char *buf) {
Packit 2997f0
	struct timeval tv;
Packit 2997f0
	gettimeofday(&tv, NULL);
Packit 2997f0
	fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
Packit 2997f0
		(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
Packit 2997f0
		level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Message delivery report callback.
Packit 2997f0
 * Called once for each message.
Packit 2997f0
 * See rdkafka.h for more information.
Packit 2997f0
 */
Packit 2997f0
static void msg_delivered (rd_kafka_t *rk,
Packit 2997f0
			   void *payload, size_t len,
Packit 2997f0
			   int error_code,
Packit 2997f0
			   void *opaque, void *msg_opaque) {
Packit 2997f0
Packit 2997f0
	if (error_code)
Packit 2997f0
		fprintf(stderr, "%% Message delivery failed: %s\n",
Packit 2997f0
			rd_kafka_err2str(error_code));
Packit 2997f0
	else if (!quiet)
Packit 2997f0
		fprintf(stderr, "%% Message delivered (%zd bytes): %.*s\n", len,
Packit 2997f0
			(int)len, (const char *)payload);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Message delivery report callback using the richer rd_kafka_message_t object.
Packit 2997f0
 */
Packit 2997f0
static void msg_delivered2 (rd_kafka_t *rk,
Packit 2997f0
                            const rd_kafka_message_t *rkmessage, void *opaque) {
Packit 2997f0
	printf("del: %s: offset %"PRId64"\n",
Packit 2997f0
	       rd_kafka_err2str(rkmessage->err), rkmessage->offset);
Packit 2997f0
        if (rkmessage->err)
Packit 2997f0
		fprintf(stderr, "%% Message delivery failed: %s\n",
Packit 2997f0
                        rd_kafka_err2str(rkmessage->err));
Packit 2997f0
	else if (!quiet)
Packit 2997f0
		fprintf(stderr,
Packit 2997f0
                        "%% Message delivered (%zd bytes, offset %"PRId64", "
Packit 2997f0
                        "partition %"PRId32"): %.*s\n",
Packit 2997f0
                        rkmessage->len, rkmessage->offset,
Packit 2997f0
			rkmessage->partition,
Packit 2997f0
			(int)rkmessage->len, (const char *)rkmessage->payload);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
 * Handle one object returned by the consumer: either an error/event
 * or a proper message.
 *
 * Errors:
 *  - Partition EOF is reported on stderr and, when `exit_eof` is set,
 *    clears `run`.
 *  - Any other error is reported on stderr; unknown topic/partition
 *    errors also clear `run`.
 *
 * Messages:
 *  - Unless `quiet` is set, prints offset, length, timestamp (when one
 *    is available) and headers (when present) to stdout.
 *  - Prints the key (if key_len is non-zero) and the payload, either as
 *    a hexdump or as raw text depending on `output`.
 *
 * @param rkmessage  Message or error event to process.
 * @param opaque     Unused.
 */
static void msg_consume (rd_kafka_message_t *rkmessage,
			 void *opaque) {
	(void)opaque;

	if (rkmessage->err) {
		if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
			fprintf(stderr,
				"%% Consumer reached end of %s [%"PRId32"] "
				"message queue at offset %"PRId64"\n",
				rd_kafka_topic_name(rkmessage->rkt),
				rkmessage->partition, rkmessage->offset);

			if (exit_eof)
				run = 0;

			return;
		}

		fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
			"offset %"PRId64": %s\n",
			rd_kafka_topic_name(rkmessage->rkt),
			rkmessage->partition,
			rkmessage->offset,
			rd_kafka_message_errstr(rkmessage));

		/* Stop the consume loop on unknown topic or partition. */
		if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
		    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
			run = 0;
		return;
	}

	if (!quiet) {
		rd_kafka_timestamp_type_t tstype;
		int64_t timestamp;
		rd_kafka_headers_t *hdrs;

		/* %zu: rkmessage->len is a size_t */
		fprintf(stdout, "%% Message (offset %"PRId64", %zu bytes):\n",
			rkmessage->offset, rkmessage->len);

		timestamp = rd_kafka_message_timestamp(rkmessage, &tstype);
		if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
			const char *tsname = "?";
			int age = 0;

			if (tstype == RD_KAFKA_TIMESTAMP_CREATE_TIME)
				tsname = "create time";
			else if (tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
				tsname = "log append time";

			/* timestamp/1000 converts to seconds for comparison
			 * with time(). Subtract in 64 bits and narrow only
			 * the difference, so large epoch values are not
			 * truncated before the subtraction. */
			if (timestamp)
				age = (int)((int64_t)time(NULL) -
					    timestamp / 1000);

			fprintf(stdout, "%% Message timestamp: %s %"PRId64
				" (%ds ago)\n",
				tsname, timestamp, age);
		}

		/* A zero return means headers were retrieved. */
		if (!rd_kafka_message_headers(rkmessage, &hdrs)) {
			size_t idx = 0;
			const char *name;
			const void *val;
			size_t size;

			fprintf(stdout, "%% Headers:");

			/* Walk headers by index until the getter reports
			 * an error (non-zero). */
			while (!rd_kafka_header_get_all(hdrs, idx++,
							&name, &val, &size)) {
				/* idx was already incremented, so the first
				 * header is seen here as idx == 1. */
				fprintf(stdout, "%s%s=",
					idx == 1 ? " " : ", ", name);
				if (val)
					fprintf(stdout, "\"%.*s\"",
						(int)size, (const char *)val);
				else
					fprintf(stdout, "NULL");
			}
			fprintf(stdout, "\n");
		}
	}

	if (rkmessage->key_len) {
		if (output == OUTPUT_HEXDUMP)
			hexdump(stdout, "Message Key",
				rkmessage->key, rkmessage->key_len);
		else
			printf("Key: %.*s\n",
			       (int)rkmessage->key_len, (char *)rkmessage->key);
	}

	if (output == OUTPUT_HEXDUMP)
		hexdump(stdout, "Message Payload",
			rkmessage->payload, rkmessage->len);
	else
		/* Never hand a NULL pointer to %s, even with a zero
		 * precision; an absent payload prints as an empty line. */
		printf("%.*s\n",
		       (int)rkmessage->len,
		       rkmessage->payload ?
		       (const char *)rkmessage->payload : "");
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void metadata_print (const char *topic,
Packit 2997f0
                            const struct rd_kafka_metadata *metadata) {
Packit 2997f0
        int i, j, k;
Packit 2997f0
Packit 2997f0
        printf("Metadata for %s (from broker %"PRId32": %s):\n",
Packit 2997f0
               topic ? : "all topics",
Packit 2997f0
               metadata->orig_broker_id,
Packit 2997f0
               metadata->orig_broker_name);
Packit 2997f0
Packit 2997f0
Packit 2997f0
        /* Iterate brokers */
Packit 2997f0
        printf(" %i brokers:\n", metadata->broker_cnt);
Packit 2997f0
        for (i = 0 ; i < metadata->broker_cnt ; i++)
Packit 2997f0
                printf("  broker %"PRId32" at %s:%i\n",
Packit 2997f0
                       metadata->brokers[i].id,
Packit 2997f0
                       metadata->brokers[i].host,
Packit 2997f0
                       metadata->brokers[i].port);
Packit 2997f0
Packit 2997f0
        /* Iterate topics */
Packit 2997f0
        printf(" %i topics:\n", metadata->topic_cnt);
Packit 2997f0
        for (i = 0 ; i < metadata->topic_cnt ; i++) {
Packit 2997f0
                const struct rd_kafka_metadata_topic *t = &metadata->topics[i];
Packit 2997f0
                printf("  topic \"%s\" with %i partitions:",
Packit 2997f0
                       t->topic,
Packit 2997f0
                       t->partition_cnt);
Packit 2997f0
                if (t->err) {
Packit 2997f0
                        printf(" %s", rd_kafka_err2str(t->err));
Packit 2997f0
                        if (t->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
Packit 2997f0
                                printf(" (try again)");
Packit 2997f0
                }
Packit 2997f0
                printf("\n");
Packit 2997f0
Packit 2997f0
                /* Iterate topic's partitions */
Packit 2997f0
                for (j = 0 ; j < t->partition_cnt ; j++) {
Packit 2997f0
                        const struct rd_kafka_metadata_partition *p;
Packit 2997f0
                        p = &t->partitions[j];
Packit 2997f0
                        printf("    partition %"PRId32", "
Packit 2997f0
                               "leader %"PRId32", replicas: ",
Packit 2997f0
                               p->id, p->leader);
Packit 2997f0
Packit 2997f0
                        /* Iterate partition's replicas */
Packit 2997f0
                        for (k = 0 ; k < p->replica_cnt ; k++)
Packit 2997f0
                                printf("%s%"PRId32,
Packit 2997f0
                                       k > 0 ? ",":"", p->replicas[k]);
Packit 2997f0
Packit 2997f0
                        /* Iterate partition's ISRs */
Packit 2997f0
                        printf(", isrs: ");
Packit 2997f0
                        for (k = 0 ; k < p->isr_cnt ; k++)
Packit 2997f0
                                printf("%s%"PRId32,
Packit 2997f0
                                       k > 0 ? ",":"", p->isrs[k]);
Packit 2997f0
                        if (p->err)
Packit 2997f0
                                printf(", %s\n", rd_kafka_err2str(p->err));
Packit 2997f0
                        else
Packit 2997f0
                                printf("\n");
Packit 2997f0
                }
Packit 2997f0
        }
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void sig_usr1 (int sig) {
Packit 2997f0
	rd_kafka_dump(stdout, rk);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
int main (int argc, char **argv) {
Packit 2997f0
	rd_kafka_topic_t *rkt;
Packit 2997f0
	char *brokers = "localhost:9092";
Packit 2997f0
	char mode = 'C';
Packit 2997f0
	char *topic = NULL;
Packit 2997f0
	int partition = RD_KAFKA_PARTITION_UA;
Packit 2997f0
	int opt;
Packit 2997f0
	rd_kafka_conf_t *conf;
Packit 2997f0
	rd_kafka_topic_conf_t *topic_conf;
Packit 2997f0
	char errstr[512];
Packit 2997f0
	int64_t start_offset = 0;
Packit 2997f0
        int report_offsets = 0;
Packit 2997f0
	int do_conf_dump = 0;
Packit 2997f0
	char tmp[16];
Packit 2997f0
        int64_t seek_offset = 0;
Packit 2997f0
        int64_t tmp_offset = 0;
Packit 2997f0
	int get_wmarks = 0;
Packit 2997f0
        rd_kafka_headers_t *hdrs = NULL;
Packit 2997f0
        rd_kafka_resp_err_t err;
Packit 2997f0
Packit 2997f0
	/* Kafka configuration */
Packit 2997f0
	conf = rd_kafka_conf_new();
Packit 2997f0
Packit 2997f0
        /* Set logger */
Packit 2997f0
        rd_kafka_conf_set_log_cb(conf, logger);
Packit 2997f0
Packit 2997f0
	/* Quick termination */
Packit 2997f0
	snprintf(tmp, sizeof(tmp), "%i", SIGIO);
Packit 2997f0
	rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
Packit 2997f0
Packit 2997f0
	/* Topic configuration */
Packit 2997f0
	topic_conf = rd_kafka_topic_conf_new();
Packit 2997f0
Packit 2997f0
	while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:H:")) != -1) {
Packit 2997f0
		switch (opt) {
Packit 2997f0
		case 'P':
Packit 2997f0
		case 'C':
Packit 2997f0
                case 'L':
Packit 2997f0
			mode = opt;
Packit 2997f0
			break;
Packit 2997f0
		case 't':
Packit 2997f0
			topic = optarg;
Packit 2997f0
			break;
Packit 2997f0
		case 'p':
Packit 2997f0
			partition = atoi(optarg);
Packit 2997f0
			break;
Packit 2997f0
		case 'b':
Packit 2997f0
			brokers = optarg;
Packit 2997f0
			break;
Packit 2997f0
		case 'z':
Packit 2997f0
			if (rd_kafka_conf_set(conf, "compression.codec",
Packit 2997f0
					      optarg,
Packit 2997f0
					      errstr, sizeof(errstr)) !=
Packit 2997f0
			    RD_KAFKA_CONF_OK) {
Packit 2997f0
				fprintf(stderr, "%% %s\n", errstr);
Packit 2997f0
				exit(1);
Packit 2997f0
			}
Packit 2997f0
			break;
Packit 2997f0
		case 'o':
Packit 2997f0
                case 's':
Packit 2997f0
			if (!strcmp(optarg, "end"))
Packit 2997f0
				tmp_offset = RD_KAFKA_OFFSET_END;
Packit 2997f0
			else if (!strcmp(optarg, "beginning"))
Packit 2997f0
				tmp_offset = RD_KAFKA_OFFSET_BEGINNING;
Packit 2997f0
			else if (!strcmp(optarg, "stored"))
Packit 2997f0
				tmp_offset = RD_KAFKA_OFFSET_STORED;
Packit 2997f0
                        else if (!strcmp(optarg, "report"))
Packit 2997f0
                                report_offsets = 1;
Packit 2997f0
			else if (!strcmp(optarg, "wmark"))
Packit 2997f0
				get_wmarks = 1;
Packit 2997f0
			else {
Packit 2997f0
				tmp_offset = strtoll(optarg, NULL, 10);
Packit 2997f0
Packit 2997f0
				if (tmp_offset < 0)
Packit 2997f0
					tmp_offset = RD_KAFKA_OFFSET_TAIL(-tmp_offset);
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
                        if (opt == 'o')
Packit 2997f0
                                start_offset = tmp_offset;
Packit 2997f0
                        else if (opt == 's')
Packit 2997f0
                                seek_offset = tmp_offset;
Packit 2997f0
			break;
Packit 2997f0
		case 'e':
Packit 2997f0
			exit_eof = 1;
Packit 2997f0
			break;
Packit 2997f0
		case 'd':
Packit 2997f0
			if (rd_kafka_conf_set(conf, "debug", optarg,
Packit 2997f0
					      errstr, sizeof(errstr)) !=
Packit 2997f0
			    RD_KAFKA_CONF_OK) {
Packit 2997f0
				fprintf(stderr,
Packit 2997f0
					"%% Debug configuration failed: "
Packit 2997f0
					"%s: %s\n",
Packit 2997f0
					errstr, optarg);
Packit 2997f0
				exit(1);
Packit 2997f0
			}
Packit 2997f0
			break;
Packit 2997f0
		case 'q':
Packit 2997f0
			quiet = 1;
Packit 2997f0
			break;
Packit 2997f0
		case 'A':
Packit 2997f0
			output = OUTPUT_RAW;
Packit 2997f0
			break;
Packit 2997f0
                case 'H':
Packit 2997f0
                {
Packit 2997f0
                        char *name, *val;
Packit 2997f0
                        size_t name_sz = -1;
Packit 2997f0
Packit 2997f0
                        name = optarg;
Packit 2997f0
                        val = strchr(name, '=');
Packit 2997f0
                        if (val) {
Packit 2997f0
                                name_sz = (size_t)(val-name);
Packit 2997f0
                                val++; /* past the '=' */
Packit 2997f0
                        }
Packit 2997f0
Packit 2997f0
                        if (!hdrs)
Packit 2997f0
                                hdrs = rd_kafka_headers_new(8);
Packit 2997f0
Packit 2997f0
                        err = rd_kafka_header_add(hdrs, name, name_sz, val, -1);
Packit 2997f0
                        if (err) {
Packit 2997f0
                                fprintf(stderr,
Packit 2997f0
                                        "%% Failed to add header %s: %s\n",
Packit 2997f0
                                        name, rd_kafka_err2str(err));
Packit 2997f0
                                exit(1);
Packit 2997f0
                        }
Packit 2997f0
                }
Packit 2997f0
                break;
Packit 2997f0
Packit 2997f0
		case 'X':
Packit 2997f0
		{
Packit 2997f0
			char *name, *val;
Packit 2997f0
			rd_kafka_conf_res_t res;
Packit 2997f0
Packit 2997f0
			if (!strcmp(optarg, "list") ||
Packit 2997f0
			    !strcmp(optarg, "help")) {
Packit 2997f0
				rd_kafka_conf_properties_show(stdout);
Packit 2997f0
				exit(0);
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			if (!strcmp(optarg, "dump")) {
Packit 2997f0
				do_conf_dump = 1;
Packit 2997f0
				continue;
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			name = optarg;
Packit 2997f0
			if (!(val = strchr(name, '='))) {
Packit 2997f0
				char dest[512];
Packit 2997f0
				size_t dest_size = sizeof(dest);
Packit 2997f0
				/* Return current value for property. */
Packit 2997f0
Packit 2997f0
				res = RD_KAFKA_CONF_UNKNOWN;
Packit 2997f0
				if (!strncmp(name, "topic.", strlen("topic.")))
Packit 2997f0
					res = rd_kafka_topic_conf_get(
Packit 2997f0
						topic_conf,
Packit 2997f0
						name+strlen("topic."),
Packit 2997f0
						dest, &dest_size);
Packit 2997f0
				if (res == RD_KAFKA_CONF_UNKNOWN)
Packit 2997f0
					res = rd_kafka_conf_get(
Packit 2997f0
						conf, name, dest, &dest_size);
Packit 2997f0
Packit 2997f0
				if (res == RD_KAFKA_CONF_OK) {
Packit 2997f0
					printf("%s = %s\n", name, dest);
Packit 2997f0
					exit(0);
Packit 2997f0
				} else {
Packit 2997f0
					fprintf(stderr,
Packit 2997f0
						"%% %s property\n",
Packit 2997f0
						res == RD_KAFKA_CONF_UNKNOWN ?
Packit 2997f0
						"Unknown" : "Invalid");
Packit 2997f0
					exit(1);
Packit 2997f0
				}
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			*val = '\0';
Packit 2997f0
			val++;
Packit 2997f0
Packit 2997f0
			res = RD_KAFKA_CONF_UNKNOWN;
Packit 2997f0
			/* Try "topic." prefixed properties on topic
Packit 2997f0
			 * conf first, and then fall through to global if
Packit 2997f0
			 * it didnt match a topic configuration property. */
Packit 2997f0
			if (!strncmp(name, "topic.", strlen("topic.")))
Packit 2997f0
				res = rd_kafka_topic_conf_set(topic_conf,
Packit 2997f0
							      name+
Packit 2997f0
							      strlen("topic."),
Packit 2997f0
							      val,
Packit 2997f0
							      errstr,
Packit 2997f0
							      sizeof(errstr));
Packit 2997f0
Packit 2997f0
			if (res == RD_KAFKA_CONF_UNKNOWN)
Packit 2997f0
				res = rd_kafka_conf_set(conf, name, val,
Packit 2997f0
							errstr, sizeof(errstr));
Packit 2997f0
Packit 2997f0
			if (res != RD_KAFKA_CONF_OK) {
Packit 2997f0
				fprintf(stderr, "%% %s\n", errstr);
Packit 2997f0
				exit(1);
Packit 2997f0
			}
Packit 2997f0
		}
Packit 2997f0
		break;
Packit 2997f0
Packit 2997f0
		default:
Packit 2997f0
			goto usage;
Packit 2997f0
		}
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
Packit 2997f0
	if (do_conf_dump) {
Packit 2997f0
		const char **arr;
Packit 2997f0
		size_t cnt;
Packit 2997f0
		int pass;
Packit 2997f0
Packit 2997f0
		for (pass = 0 ; pass < 2 ; pass++) {
Packit 2997f0
			int i;
Packit 2997f0
Packit 2997f0
			if (pass == 0) {
Packit 2997f0
				arr = rd_kafka_conf_dump(conf, &cnt);
Packit 2997f0
				printf("# Global config\n");
Packit 2997f0
			} else {
Packit 2997f0
				printf("# Topic config\n");
Packit 2997f0
				arr = rd_kafka_topic_conf_dump(topic_conf,
Packit 2997f0
							       &cnt);
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			for (i = 0 ; i < (int)cnt ; i += 2)
Packit 2997f0
				printf("%s = %s\n",
Packit 2997f0
				       arr[i], arr[i+1]);
Packit 2997f0
Packit 2997f0
			printf("\n");
Packit 2997f0
Packit 2997f0
			rd_kafka_conf_dump_free(arr, cnt);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		exit(0);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
Packit 2997f0
	if (optind != argc || (mode != 'L' && !topic)) {
Packit 2997f0
	usage:
Packit 2997f0
		fprintf(stderr,
Packit 2997f0
			"Usage: %s -C|-P|-L -t <topic> "
Packit 2997f0
			"[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
Packit 2997f0
			"\n"
Packit 2997f0
			"librdkafka version %s (0x%08x)\n"
Packit 2997f0
			"\n"
Packit 2997f0
			" Options:\n"
Packit 2997f0
			"  -C | -P         Consumer or Producer mode\n"
Packit 2997f0
                        "  -L              Metadata list mode\n"
Packit 2997f0
			"  -t <topic>      Topic to fetch / produce\n"
Packit 2997f0
			"  -p <num>        Partition (random partitioner)\n"
Packit 2997f0
			"  -b <brokers>    Broker address (localhost:9092)\n"
Packit 2997f0
			"  -z <codec>      Enable compression:\n"
Packit 2997f0
			"                  none|gzip|snappy\n"
Packit 2997f0
			"  -o <offset>     Start offset (consumer):\n"
Packit 2997f0
			"                  beginning, end, NNNNN or -NNNNN\n"
Packit 2997f0
			"                  wmark returns the current hi&lo "
Packit 2997f0
			"watermarks.\n"
Packit 2997f0
                        "  -o report       Report message offsets (producer)\n"
Packit 2997f0
			"  -e              Exit consumer when last message\n"
Packit 2997f0
			"                  in partition has been received.\n"
Packit 2997f0
			"  -d [facs..]     Enable debugging contexts:\n"
Packit 2997f0
			"                  %s\n"
Packit 2997f0
			"  -q              Be quiet\n"
Packit 2997f0
			"  -A              Raw payload output (consumer)\n"
Packit 2997f0
                        "  -H <name[=value]> Add header to message (producer)\n"
Packit 2997f0
			"  -X <prop=name>  Set arbitrary librdkafka "
Packit 2997f0
			"configuration property\n"
Packit 2997f0
			"                  Properties prefixed with \"topic.\" "
Packit 2997f0
			"will be set on topic object.\n"
Packit 2997f0
			"  -X list         Show full list of supported "
Packit 2997f0
			"properties.\n"
Packit 2997f0
			"  -X <prop>       Get single property value\n"
Packit 2997f0
			"\n"
Packit 2997f0
			" In Consumer mode:\n"
Packit 2997f0
			"  writes fetched messages to stdout\n"
Packit 2997f0
			" In Producer mode:\n"
Packit 2997f0
			"  reads messages from stdin and sends to broker\n"
Packit 2997f0
                        " In List mode:\n"
Packit 2997f0
                        "  queries broker for metadata information, "
Packit 2997f0
                        "topic is optional.\n"
Packit 2997f0
			"\n"
Packit 2997f0
			"\n"
Packit 2997f0
			"\n",
Packit 2997f0
			argv[0],
Packit 2997f0
			rd_kafka_version_str(), rd_kafka_version(),
Packit 2997f0
			RD_KAFKA_DEBUG_CONTEXTS);
Packit 2997f0
		exit(1);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	if ((mode == 'C' && !isatty(STDIN_FILENO)) ||
Packit 2997f0
	    (mode != 'C' && !isatty(STDOUT_FILENO)))
Packit 2997f0
		quiet = 1;
Packit 2997f0
Packit 2997f0
Packit 2997f0
	signal(SIGINT, stop);
Packit 2997f0
	signal(SIGUSR1, sig_usr1);
Packit 2997f0
Packit 2997f0
	if (mode == 'P') {
Packit 2997f0
		/*
Packit 2997f0
		 * Producer
Packit 2997f0
		 */
Packit 2997f0
		char buf[2048];
Packit 2997f0
		int sendcnt = 0;
Packit 2997f0
Packit 2997f0
		/* Set up a message delivery report callback.
Packit 2997f0
		 * It will be called once for each message, either on successful
Packit 2997f0
		 * delivery to broker, or upon failure to deliver to broker. */
Packit 2997f0
Packit 2997f0
                /* If offset reporting (-o report) is enabled, use the
Packit 2997f0
                 * richer dr_msg_cb instead. */
Packit 2997f0
                if (report_offsets) {
Packit 2997f0
                        rd_kafka_topic_conf_set(topic_conf,
Packit 2997f0
                                                "produce.offset.report",
Packit 2997f0
                                                "true", errstr, sizeof(errstr));
Packit 2997f0
                        rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered2);
Packit 2997f0
                } else
Packit 2997f0
                        rd_kafka_conf_set_dr_cb(conf, msg_delivered);
Packit 2997f0
Packit 2997f0
		/* Create Kafka handle */
Packit 2997f0
		if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
Packit 2997f0
					errstr, sizeof(errstr)))) {
Packit 2997f0
			fprintf(stderr,
Packit 2997f0
				"%% Failed to create new producer: %s\n",
Packit 2997f0
				errstr);
Packit 2997f0
			exit(1);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		/* Add brokers */
Packit 2997f0
		if (rd_kafka_brokers_add(rk, brokers) == 0) {
Packit 2997f0
			fprintf(stderr, "%% No valid brokers specified\n");
Packit 2997f0
			exit(1);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		/* Create topic */
Packit 2997f0
		rkt = rd_kafka_topic_new(rk, topic, topic_conf);
Packit 2997f0
                topic_conf = NULL; /* Now owned by topic */
Packit 2997f0
Packit 2997f0
		if (!quiet)
Packit 2997f0
			fprintf(stderr,
Packit 2997f0
				"%% Type stuff and hit enter to send\n");
Packit 2997f0
Packit 2997f0
		while (run && fgets(buf, sizeof(buf), stdin)) {
Packit 2997f0
			size_t len = strlen(buf);
Packit 2997f0
			if (buf[len-1] == '\n')
Packit 2997f0
				buf[--len] = '\0';
Packit 2997f0
Packit 2997f0
			/* Send/Produce message. */
Packit 2997f0
                        if (hdrs) {
Packit 2997f0
                                rd_kafka_headers_t *hdrs_copy;
Packit 2997f0
Packit 2997f0
                                hdrs_copy = rd_kafka_headers_copy(hdrs);
Packit 2997f0
Packit 2997f0
                                err = rd_kafka_producev(
Packit 2997f0
                                        rk,
Packit 2997f0
                                        RD_KAFKA_V_RKT(rkt),
Packit 2997f0
                                        RD_KAFKA_V_PARTITION(partition),
Packit 2997f0
                                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
Packit 2997f0
                                        RD_KAFKA_V_VALUE(buf, len),
Packit 2997f0
                                        RD_KAFKA_V_HEADERS(hdrs_copy),
Packit 2997f0
                                        RD_KAFKA_V_END);
Packit 2997f0
Packit 2997f0
                                if (err)
Packit 2997f0
                                        rd_kafka_headers_destroy(hdrs_copy);
Packit 2997f0
Packit 2997f0
                        } else {
Packit 2997f0
                                if (rd_kafka_produce(
Packit 2997f0
                                            rkt, partition,
Packit 2997f0
                                            RD_KAFKA_MSG_F_COPY,
Packit 2997f0
                                            /* Payload and length */
Packit 2997f0
                                            buf, len,
Packit 2997f0
                                            /* Optional key and its length */
Packit 2997f0
                                            NULL, 0,
Packit 2997f0
                                            /* Message opaque, provided in
Packit 2997f0
                                             * delivery report callback as
Packit 2997f0
                                             * msg_opaque. */
Packit 2997f0
                                            NULL) == -1) {
Packit 2997f0
                                        err = rd_kafka_last_error();
Packit 2997f0
                                }
Packit 2997f0
                        }
Packit 2997f0
Packit 2997f0
                        if (err) {
Packit 2997f0
                                fprintf(stderr,
Packit 2997f0
                                        "%% Failed to produce to topic %s "
Packit 2997f0
					"partition %i: %s\n",
Packit 2997f0
					rd_kafka_topic_name(rkt), partition,
Packit 2997f0
					rd_kafka_err2str(err));
Packit 2997f0
Packit 2997f0
				/* Poll to handle delivery reports */
Packit 2997f0
				rd_kafka_poll(rk, 0);
Packit 2997f0
				continue;
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			if (!quiet)
Packit 2997f0
				fprintf(stderr, "%% Sent %zd bytes to topic "
Packit 2997f0
					"%s partition %i\n",
Packit 2997f0
				len, rd_kafka_topic_name(rkt), partition);
Packit 2997f0
			sendcnt++;
Packit 2997f0
			/* Poll to handle delivery reports */
Packit 2997f0
			rd_kafka_poll(rk, 0);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		/* Poll to handle delivery reports */
Packit 2997f0
		rd_kafka_poll(rk, 0);
Packit 2997f0
Packit 2997f0
		/* Wait for messages to be delivered */
Packit 2997f0
		while (run && rd_kafka_outq_len(rk) > 0)
Packit 2997f0
			rd_kafka_poll(rk, 100);
Packit 2997f0
Packit 2997f0
		/* Destroy topic */
Packit 2997f0
		rd_kafka_topic_destroy(rkt);
Packit 2997f0
Packit 2997f0
		/* Destroy the handle */
Packit 2997f0
		rd_kafka_destroy(rk);
Packit 2997f0
Packit 2997f0
	} else if (mode == 'C') {
Packit 2997f0
		/*
Packit 2997f0
		 * Consumer
Packit 2997f0
		 */
Packit 2997f0
Packit 2997f0
		/* Create Kafka handle */
Packit 2997f0
		if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
Packit 2997f0
					errstr, sizeof(errstr)))) {
Packit 2997f0
			fprintf(stderr,
Packit 2997f0
				"%% Failed to create new consumer: %s\n",
Packit 2997f0
				errstr);
Packit 2997f0
			exit(1);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		/* Add brokers */
Packit 2997f0
		if (rd_kafka_brokers_add(rk, brokers) == 0) {
Packit 2997f0
			fprintf(stderr, "%% No valid brokers specified\n");
Packit 2997f0
			exit(1);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		if (get_wmarks) {
Packit 2997f0
			int64_t lo, hi;
Packit 2997f0
                        rd_kafka_resp_err_t err;
Packit 2997f0
Packit 2997f0
			/* Only query for hi&lo partition watermarks */
Packit 2997f0
Packit 2997f0
			if ((err = rd_kafka_query_watermark_offsets(
Packit 2997f0
				     rk, topic, partition, &lo, &hi, 5000))) {
Packit 2997f0
				fprintf(stderr, "%% query_watermark_offsets() "
Packit 2997f0
					"failed: %s\n",
Packit 2997f0
					rd_kafka_err2str(err));
Packit 2997f0
				exit(1);
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			printf("%s [%d]: low - high offsets: "
Packit 2997f0
			       "%"PRId64" - %"PRId64"\n",
Packit 2997f0
			       topic, partition, lo, hi);
Packit 2997f0
Packit 2997f0
			rd_kafka_destroy(rk);
Packit 2997f0
			exit(0);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
Packit 2997f0
		/* Create topic */
Packit 2997f0
		rkt = rd_kafka_topic_new(rk, topic, topic_conf);
Packit 2997f0
                topic_conf = NULL; /* Now owned by topic */
Packit 2997f0
Packit 2997f0
		/* Start consuming */
Packit 2997f0
		if (rd_kafka_consume_start(rkt, partition, start_offset) == -1){
Packit 2997f0
			rd_kafka_resp_err_t err = rd_kafka_last_error();
Packit 2997f0
			fprintf(stderr, "%% Failed to start consuming: %s\n",
Packit 2997f0
				rd_kafka_err2str(err));
Packit 2997f0
                        if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
Packit 2997f0
                                fprintf(stderr,
Packit 2997f0
                                        "%% Broker based offset storage "
Packit 2997f0
                                        "requires a group.id, "
Packit 2997f0
                                        "add: -X group.id=yourGroup\n");
Packit 2997f0
			exit(1);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		while (run) {
Packit 2997f0
			rd_kafka_message_t *rkmessage;
Packit 2997f0
                        rd_kafka_resp_err_t err;
Packit 2997f0
Packit 2997f0
                        /* Poll for errors, etc. */
Packit 2997f0
                        rd_kafka_poll(rk, 0);
Packit 2997f0
Packit 2997f0
			/* Consume single message.
Packit 2997f0
			 * See rdkafka_performance.c for high speed
Packit 2997f0
			 * consuming of messages. */
Packit 2997f0
			rkmessage = rd_kafka_consume(rkt, partition, 1000);
Packit 2997f0
			if (!rkmessage) /* timeout */
Packit 2997f0
				continue;
Packit 2997f0
Packit 2997f0
			msg_consume(rkmessage, NULL);
Packit 2997f0
Packit 2997f0
			/* Return message to rdkafka */
Packit 2997f0
			rd_kafka_message_destroy(rkmessage);
Packit 2997f0
Packit 2997f0
                        if (seek_offset) {
Packit 2997f0
                                err = rd_kafka_seek(rkt, partition, seek_offset,
Packit 2997f0
                                                    2000);
Packit 2997f0
                                if (err)
Packit 2997f0
                                        printf("Seek failed: %s\n",
Packit 2997f0
                                               rd_kafka_err2str(err));
Packit 2997f0
                                else
Packit 2997f0
                                        printf("Seeked to %"PRId64"\n",
Packit 2997f0
                                               seek_offset);
Packit 2997f0
                                seek_offset = 0;
Packit 2997f0
                        }
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		/* Stop consuming */
Packit 2997f0
		rd_kafka_consume_stop(rkt, partition);
Packit 2997f0
Packit 2997f0
                while (rd_kafka_outq_len(rk) > 0)
Packit 2997f0
                        rd_kafka_poll(rk, 10);
Packit 2997f0
Packit 2997f0
		/* Destroy topic */
Packit 2997f0
		rd_kafka_topic_destroy(rkt);
Packit 2997f0
Packit 2997f0
		/* Destroy handle */
Packit 2997f0
		rd_kafka_destroy(rk);
Packit 2997f0
Packit 2997f0
        } else if (mode == 'L') {
Packit 2997f0
                rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
Packit 2997f0
		/* Create Kafka handle */
Packit 2997f0
		if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
Packit 2997f0
					errstr, sizeof(errstr)))) {
Packit 2997f0
			fprintf(stderr,
Packit 2997f0
				"%% Failed to create new producer: %s\n",
Packit 2997f0
				errstr);
Packit 2997f0
			exit(1);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		/* Add brokers */
Packit 2997f0
		if (rd_kafka_brokers_add(rk, brokers) == 0) {
Packit 2997f0
			fprintf(stderr, "%% No valid brokers specified\n");
Packit 2997f0
			exit(1);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
                /* Create topic */
Packit 2997f0
                if (topic) {
Packit 2997f0
                        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
Packit 2997f0
                        topic_conf = NULL; /* Now owned by topic */
Packit 2997f0
                } else
Packit 2997f0
                        rkt = NULL;
Packit 2997f0
Packit 2997f0
                while (run) {
Packit 2997f0
                        const struct rd_kafka_metadata *metadata;
Packit 2997f0
Packit 2997f0
                        /* Fetch metadata */
Packit 2997f0
                        err = rd_kafka_metadata(rk, rkt ? 0 : 1, rkt,
Packit 2997f0
                                                &metadata, 5000);
Packit 2997f0
                        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
Packit 2997f0
                                fprintf(stderr,
Packit 2997f0
                                        "%% Failed to acquire metadata: %s\n",
Packit 2997f0
                                        rd_kafka_err2str(err));
Packit 2997f0
                                run = 0;
Packit 2997f0
                                break;
Packit 2997f0
                        }
Packit 2997f0
Packit 2997f0
                        metadata_print(topic, metadata);
Packit 2997f0
Packit 2997f0
                        rd_kafka_metadata_destroy(metadata);
Packit 2997f0
                        run = 0;
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
		/* Destroy topic */
Packit 2997f0
		if (rkt)
Packit 2997f0
			rd_kafka_topic_destroy(rkt);
Packit 2997f0
Packit 2997f0
		/* Destroy the handle */
Packit 2997f0
		rd_kafka_destroy(rk);
Packit 2997f0
Packit 2997f0
                if (topic_conf)
Packit 2997f0
                        rd_kafka_topic_conf_destroy(topic_conf);
Packit 2997f0
Packit 2997f0
Packit 2997f0
                /* Exit right away, don't wait for background cleanup, we haven't
Packit 2997f0
                 * done anything important anyway. */
Packit 2997f0
                exit(err ? 2 : 0);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        if (hdrs)
Packit 2997f0
                rd_kafka_headers_destroy(hdrs);
Packit 2997f0
Packit 2997f0
        if (topic_conf)
Packit 2997f0
                rd_kafka_topic_conf_destroy(topic_conf);
Packit 2997f0
Packit 2997f0
	/* Let background threads clean up and terminate cleanly. */
Packit 2997f0
	run = 5;
Packit 2997f0
	while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
Packit 2997f0
		printf("Waiting for librdkafka to decommission\n");
Packit 2997f0
	if (run <= 0)
Packit 2997f0
		rd_kafka_dump(stdout, rk);
Packit 2997f0
Packit 2997f0
	return 0;
Packit 2997f0
}