Blame examples/rdkafka_consumer_example.c

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2015, Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 * 
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met: 
Packit 2997f0
 * 
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer. 
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution. 
Packit 2997f0
 * 
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Apache Kafka high level consumer example program
Packit 2997f0
 * using the Kafka driver from librdkafka
Packit 2997f0
 * (https://github.com/edenhill/librdkafka)
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#include <ctype.h>
Packit 2997f0
#include <signal.h>
Packit 2997f0
#include <string.h>
Packit 2997f0
#include <unistd.h>
Packit 2997f0
#include <stdlib.h>
Packit 2997f0
#include <syslog.h>
Packit 2997f0
#include <sys/time.h>
Packit 2997f0
#include <errno.h>
Packit 2997f0
#include <getopt.h>
Packit 2997f0
Packit 2997f0
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
Packit 2997f0
 * is builtin from within the librdkafka source tree and thus differs. */
Packit 2997f0
#include "rdkafka.h"  /* for Kafka driver */
Packit 2997f0
Packit 2997f0
Packit 2997f0
static int run = 1;
Packit 2997f0
static rd_kafka_t *rk;
Packit 2997f0
static int exit_eof = 0;
Packit 2997f0
static int wait_eof = 0;  /* number of partitions awaiting EOF */
Packit 2997f0
static int quiet = 0;
Packit 2997f0
static 	enum {
Packit 2997f0
	OUTPUT_HEXDUMP,
Packit 2997f0
	OUTPUT_RAW,
Packit 2997f0
} output = OUTPUT_HEXDUMP;
Packit 2997f0
Packit 2997f0
static void stop (int sig) {
Packit 2997f0
        if (!run)
Packit 2997f0
                exit(1);
Packit 2997f0
	run = 0;
Packit 2997f0
	fclose(stdin); /* abort fgets() */
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void hexdump (FILE *fp, const char *name, const void *ptr, size_t len) {
Packit 2997f0
	const char *p = (const char *)ptr;
Packit 2997f0
	unsigned int of = 0;
Packit 2997f0
Packit 2997f0
Packit 2997f0
	if (name)
Packit 2997f0
		fprintf(fp, "%s hexdump (%zd bytes):\n", name, len);
Packit 2997f0
Packit 2997f0
	for (of = 0 ; of < len ; of += 16) {
Packit 2997f0
		char hexen[16*3+1];
Packit 2997f0
		char charen[16+1];
Packit 2997f0
		int hof = 0;
Packit 2997f0
Packit 2997f0
		int cof = 0;
Packit 2997f0
		int i;
Packit 2997f0
Packit 2997f0
		for (i = of ; i < (int)of + 16 && i < (int)len ; i++) {
Packit 2997f0
			hof += sprintf(hexen+hof, "%02x ", p[i] & 0xff);
Packit 2997f0
			cof += sprintf(charen+cof, "%c",
Packit 2997f0
				       isprint((int)p[i]) ? p[i] : '.');
Packit 2997f0
		}
Packit 2997f0
		fprintf(fp, "%08x: %-48s %-16s\n",
Packit 2997f0
			of, hexen, charen);
Packit 2997f0
	}
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Kafka logger callback (optional)
Packit 2997f0
 */
Packit 2997f0
static void logger (const rd_kafka_t *rk, int level,
Packit 2997f0
		    const char *fac, const char *buf) {
Packit 2997f0
	struct timeval tv;
Packit 2997f0
	gettimeofday(&tv, NULL);
Packit 2997f0
	fprintf(stdout, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
Packit 2997f0
		(int)tv.tv_sec, (int)(tv.tv_usec / 1000),
Packit 2997f0
		level, fac, rd_kafka_name(rk), buf);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Handle and print a consumed message.
Packit 2997f0
 * Internally crafted messages are also used to propagate state from
Packit 2997f0
 * librdkafka to the application. The application needs to check
Packit 2997f0
 * the `rkmessage->err` field for this purpose.
Packit 2997f0
 */
Packit 2997f0
static void msg_consume (rd_kafka_message_t *rkmessage) {
Packit 2997f0
	if (rkmessage->err) {
Packit 2997f0
		if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
Packit 2997f0
			fprintf(stderr,
Packit 2997f0
				"%% Consumer reached end of %s [%"PRId32"] "
Packit 2997f0
			       "message queue at offset %"PRId64"\n",
Packit 2997f0
			       rd_kafka_topic_name(rkmessage->rkt),
Packit 2997f0
			       rkmessage->partition, rkmessage->offset);
Packit 2997f0
Packit 2997f0
			if (exit_eof && --wait_eof == 0) {
Packit 2997f0
                                fprintf(stderr,
Packit 2997f0
                                        "%% All partition(s) reached EOF: "
Packit 2997f0
                                        "exiting\n");
Packit 2997f0
				run = 0;
Packit 2997f0
                        }
Packit 2997f0
Packit 2997f0
			return;
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
                if (rkmessage->rkt)
Packit 2997f0
                        fprintf(stderr, "%% Consume error for "
Packit 2997f0
                                "topic \"%s\" [%"PRId32"] "
Packit 2997f0
                                "offset %"PRId64": %s\n",
Packit 2997f0
                                rd_kafka_topic_name(rkmessage->rkt),
Packit 2997f0
                                rkmessage->partition,
Packit 2997f0
                                rkmessage->offset,
Packit 2997f0
                                rd_kafka_message_errstr(rkmessage));
Packit 2997f0
                else
Packit 2997f0
                        fprintf(stderr, "%% Consumer error: %s: %s\n",
Packit 2997f0
                                rd_kafka_err2str(rkmessage->err),
Packit 2997f0
                                rd_kafka_message_errstr(rkmessage));
Packit 2997f0
Packit 2997f0
                if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
Packit 2997f0
                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
Packit 2997f0
                        run = 0;
Packit 2997f0
		return;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	if (!quiet)
Packit 2997f0
		fprintf(stdout, "%% Message (topic %s [%"PRId32"], "
Packit 2997f0
                        "offset %"PRId64", %zd bytes):\n",
Packit 2997f0
                        rd_kafka_topic_name(rkmessage->rkt),
Packit 2997f0
                        rkmessage->partition,
Packit 2997f0
			rkmessage->offset, rkmessage->len);
Packit 2997f0
Packit 2997f0
	if (rkmessage->key_len) {
Packit 2997f0
		if (output == OUTPUT_HEXDUMP)
Packit 2997f0
			hexdump(stdout, "Message Key",
Packit 2997f0
				rkmessage->key, rkmessage->key_len);
Packit 2997f0
		else
Packit 2997f0
			printf("Key: %.*s\n",
Packit 2997f0
			       (int)rkmessage->key_len, (char *)rkmessage->key);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	if (output == OUTPUT_HEXDUMP)
Packit 2997f0
		hexdump(stdout, "Message Payload",
Packit 2997f0
			rkmessage->payload, rkmessage->len);
Packit 2997f0
	else
Packit 2997f0
		printf("%.*s\n",
Packit 2997f0
		       (int)rkmessage->len, (char *)rkmessage->payload);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void print_partition_list (FILE *fp,
Packit 2997f0
                                  const rd_kafka_topic_partition_list_t
Packit 2997f0
                                  *partitions) {
Packit 2997f0
        int i;
Packit 2997f0
        for (i = 0 ; i < partitions->cnt ; i++) {
Packit 2997f0
                fprintf(stderr, "%s %s [%"PRId32"] offset %"PRId64,
Packit 2997f0
                        i > 0 ? ",":"",
Packit 2997f0
                        partitions->elems[i].topic,
Packit 2997f0
                        partitions->elems[i].partition,
Packit 2997f0
			partitions->elems[i].offset);
Packit 2997f0
        }
Packit 2997f0
        fprintf(stderr, "\n");
Packit 2997f0
Packit 2997f0
}
Packit 2997f0
static void rebalance_cb (rd_kafka_t *rk,
Packit 2997f0
                          rd_kafka_resp_err_t err,
Packit 2997f0
			  rd_kafka_topic_partition_list_t *partitions,
Packit 2997f0
                          void *opaque) {
Packit 2997f0
Packit 2997f0
	fprintf(stderr, "%% Consumer group rebalanced: ");
Packit 2997f0
Packit 2997f0
	switch (err)
Packit 2997f0
	{
Packit 2997f0
	case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
Packit 2997f0
		fprintf(stderr, "assigned:\n");
Packit 2997f0
		print_partition_list(stderr, partitions);
Packit 2997f0
		rd_kafka_assign(rk, partitions);
Packit 2997f0
		wait_eof += partitions->cnt;
Packit 2997f0
		break;
Packit 2997f0
Packit 2997f0
	case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
Packit 2997f0
		fprintf(stderr, "revoked:\n");
Packit 2997f0
		print_partition_list(stderr, partitions);
Packit 2997f0
		rd_kafka_assign(rk, NULL);
Packit 2997f0
		wait_eof = 0;
Packit 2997f0
		break;
Packit 2997f0
Packit 2997f0
	default:
Packit 2997f0
		fprintf(stderr, "failed: %s\n",
Packit 2997f0
                        rd_kafka_err2str(err));
Packit 2997f0
                rd_kafka_assign(rk, NULL);
Packit 2997f0
		break;
Packit 2997f0
	}
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static int describe_groups (rd_kafka_t *rk, const char *group) {
Packit 2997f0
        rd_kafka_resp_err_t err;
Packit 2997f0
        const struct rd_kafka_group_list *grplist;
Packit 2997f0
        int i;
Packit 2997f0
Packit 2997f0
        err = rd_kafka_list_groups(rk, group, &grplist, 10000);
Packit 2997f0
Packit 2997f0
        if (err) {
Packit 2997f0
                fprintf(stderr, "%% Failed to acquire group list: %s\n",
Packit 2997f0
                        rd_kafka_err2str(err));
Packit 2997f0
                return -1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        for (i = 0 ; i < grplist->group_cnt ; i++) {
Packit 2997f0
                const struct rd_kafka_group_info *gi = &grplist->groups[i];
Packit 2997f0
                int j;
Packit 2997f0
Packit 2997f0
                printf("Group \"%s\" in state %s on broker %d (%s:%d)\n",
Packit 2997f0
                       gi->group, gi->state,
Packit 2997f0
                       gi->broker.id, gi->broker.host, gi->broker.port);
Packit 2997f0
                if (gi->err)
Packit 2997f0
                        printf(" Error: %s\n", rd_kafka_err2str(gi->err));
Packit 2997f0
                printf(" Protocol type \"%s\", protocol \"%s\", "
Packit 2997f0
                       "with %d member(s):\n",
Packit 2997f0
                       gi->protocol_type, gi->protocol, gi->member_cnt);
Packit 2997f0
Packit 2997f0
                for (j = 0 ; j < gi->member_cnt ; j++) {
Packit 2997f0
                        const struct rd_kafka_group_member_info *mi;
Packit 2997f0
                        mi = &gi->members[j];
Packit 2997f0
Packit 2997f0
                        printf("  \"%s\", client id \"%s\" on host %s\n",
Packit 2997f0
                               mi->member_id, mi->client_id, mi->client_host);
Packit 2997f0
                        printf("    metadata: %d bytes\n",
Packit 2997f0
                               mi->member_metadata_size);
Packit 2997f0
                        printf("    assignment: %d bytes\n",
Packit 2997f0
                               mi->member_assignment_size);
Packit 2997f0
                }
Packit 2997f0
                printf("\n");
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        if (group && !grplist->group_cnt)
Packit 2997f0
                fprintf(stderr, "%% No matching group (%s)\n", group);
Packit 2997f0
Packit 2997f0
        rd_kafka_group_list_destroy(grplist);
Packit 2997f0
Packit 2997f0
        return 0;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
static void sig_usr1 (int sig) {
Packit 2997f0
	rd_kafka_dump(stdout, rk);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
int main (int argc, char **argv) {
Packit 2997f0
        char mode = 'C';
Packit 2997f0
	char *brokers = "localhost:9092";
Packit 2997f0
	int opt;
Packit 2997f0
	rd_kafka_conf_t *conf;
Packit 2997f0
	rd_kafka_topic_conf_t *topic_conf;
Packit 2997f0
	char errstr[512];
Packit 2997f0
	const char *debug = NULL;
Packit 2997f0
	int do_conf_dump = 0;
Packit 2997f0
	char tmp[16];
Packit 2997f0
        rd_kafka_resp_err_t err;
Packit 2997f0
        char *group = NULL;
Packit 2997f0
        rd_kafka_topic_partition_list_t *topics;
Packit 2997f0
        int is_subscription;
Packit 2997f0
        int i;
Packit 2997f0
Packit 2997f0
	quiet = !isatty(STDIN_FILENO);
Packit 2997f0
Packit 2997f0
	/* Kafka configuration */
Packit 2997f0
	conf = rd_kafka_conf_new();
Packit 2997f0
Packit 2997f0
        /* Set logger */
Packit 2997f0
        rd_kafka_conf_set_log_cb(conf, logger);
Packit 2997f0
Packit 2997f0
	/* Quick termination */
Packit 2997f0
	snprintf(tmp, sizeof(tmp), "%i", SIGIO);
Packit 2997f0
	rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
Packit 2997f0
Packit 2997f0
	/* Topic configuration */
Packit 2997f0
	topic_conf = rd_kafka_topic_conf_new();
Packit 2997f0
Packit 2997f0
	while ((opt = getopt(argc, argv, "g:b:qd:eX:ADO")) != -1) {
Packit 2997f0
		switch (opt) {
Packit 2997f0
		case 'b':
Packit 2997f0
			brokers = optarg;
Packit 2997f0
			break;
Packit 2997f0
                case 'g':
Packit 2997f0
                        group = optarg;
Packit 2997f0
                        break;
Packit 2997f0
		case 'e':
Packit 2997f0
			exit_eof = 1;
Packit 2997f0
			break;
Packit 2997f0
		case 'd':
Packit 2997f0
			debug = optarg;
Packit 2997f0
			break;
Packit 2997f0
		case 'q':
Packit 2997f0
			quiet = 1;
Packit 2997f0
			break;
Packit 2997f0
		case 'A':
Packit 2997f0
			output = OUTPUT_RAW;
Packit 2997f0
			break;
Packit 2997f0
		case 'X':
Packit 2997f0
		{
Packit 2997f0
			char *name, *val;
Packit 2997f0
			rd_kafka_conf_res_t res;
Packit 2997f0
Packit 2997f0
			if (!strcmp(optarg, "list") ||
Packit 2997f0
			    !strcmp(optarg, "help")) {
Packit 2997f0
				rd_kafka_conf_properties_show(stdout);
Packit 2997f0
				exit(0);
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			if (!strcmp(optarg, "dump")) {
Packit 2997f0
				do_conf_dump = 1;
Packit 2997f0
				continue;
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			name = optarg;
Packit 2997f0
			if (!(val = strchr(name, '='))) {
Packit 2997f0
				fprintf(stderr, "%% Expected "
Packit 2997f0
					"-X property=value, not %s\n", name);
Packit 2997f0
				exit(1);
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			*val = '\0';
Packit 2997f0
			val++;
Packit 2997f0
Packit 2997f0
			res = RD_KAFKA_CONF_UNKNOWN;
Packit 2997f0
			/* Try "topic." prefixed properties on topic
Packit 2997f0
			 * conf first, and then fall through to global if
Packit 2997f0
			 * it didnt match a topic configuration property. */
Packit 2997f0
			if (!strncmp(name, "topic.", strlen("topic.")))
Packit 2997f0
				res = rd_kafka_topic_conf_set(topic_conf,
Packit 2997f0
							      name+
Packit 2997f0
							      strlen("topic."),
Packit 2997f0
							      val,
Packit 2997f0
							      errstr,
Packit 2997f0
							      sizeof(errstr));
Packit 2997f0
Packit 2997f0
			if (res == RD_KAFKA_CONF_UNKNOWN)
Packit 2997f0
				res = rd_kafka_conf_set(conf, name, val,
Packit 2997f0
							errstr, sizeof(errstr));
Packit 2997f0
Packit 2997f0
			if (res != RD_KAFKA_CONF_OK) {
Packit 2997f0
				fprintf(stderr, "%% %s\n", errstr);
Packit 2997f0
				exit(1);
Packit 2997f0
			}
Packit 2997f0
		}
Packit 2997f0
		break;
Packit 2997f0
Packit 2997f0
                case 'D':
Packit 2997f0
                case 'O':
Packit 2997f0
                        mode = opt;
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
		default:
Packit 2997f0
			goto usage;
Packit 2997f0
		}
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
Packit 2997f0
	if (do_conf_dump) {
Packit 2997f0
		const char **arr;
Packit 2997f0
		size_t cnt;
Packit 2997f0
		int pass;
Packit 2997f0
Packit 2997f0
		for (pass = 0 ; pass < 2 ; pass++) {
Packit 2997f0
			if (pass == 0) {
Packit 2997f0
				arr = rd_kafka_conf_dump(conf, &cnt);
Packit 2997f0
				printf("# Global config\n");
Packit 2997f0
			} else {
Packit 2997f0
				printf("# Topic config\n");
Packit 2997f0
				arr = rd_kafka_topic_conf_dump(topic_conf,
Packit 2997f0
							       &cnt);
Packit 2997f0
			}
Packit 2997f0
Packit 2997f0
			for (i = 0 ; i < (int)cnt ; i += 2)
Packit 2997f0
				printf("%s = %s\n",
Packit 2997f0
				       arr[i], arr[i+1]);
Packit 2997f0
Packit 2997f0
			printf("\n");
Packit 2997f0
Packit 2997f0
			rd_kafka_conf_dump_free(arr, cnt);
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
		exit(0);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
Packit 2997f0
	if (strchr("OC", mode) && optind == argc) {
Packit 2997f0
	usage:
Packit 2997f0
		fprintf(stderr,
Packit 2997f0
			"Usage: %s [options] <topic[:part]> <topic[:part]>..\n"
Packit 2997f0
			"\n"
Packit 2997f0
			"librdkafka version %s (0x%08x)\n"
Packit 2997f0
			"\n"
Packit 2997f0
			" Options:\n"
Packit 2997f0
                        "  -g <group>      Consumer group (%s)\n"
Packit 2997f0
			"  -b <brokers>    Broker address (%s)\n"
Packit 2997f0
			"  -e              Exit consumer when last message\n"
Packit 2997f0
			"                  in partition has been received.\n"
Packit 2997f0
                        "  -D              Describe group.\n"
Packit 2997f0
                        "  -O              Get commmitted offset(s)\n"
Packit 2997f0
			"  -d [facs..]     Enable debugging contexts:\n"
Packit 2997f0
			"                  %s\n"
Packit 2997f0
			"  -q              Be quiet\n"
Packit 2997f0
			"  -A              Raw payload output (consumer)\n"
Packit 2997f0
			"  -X <prop=name> Set arbitrary librdkafka "
Packit 2997f0
			"configuration property\n"
Packit 2997f0
			"               Properties prefixed with \"topic.\" "
Packit 2997f0
			"will be set on topic object.\n"
Packit 2997f0
			"               Use '-X list' to see the full list\n"
Packit 2997f0
			"               of supported properties.\n"
Packit 2997f0
			"\n"
Packit 2997f0
                        "For balanced consumer groups use the 'topic1 topic2..'"
Packit 2997f0
                        " format\n"
Packit 2997f0
                        "and for static assignment use "
Packit 2997f0
                        "'topic1:part1 topic1:part2 topic2:part1..'\n"
Packit 2997f0
			"\n",
Packit 2997f0
			argv[0],
Packit 2997f0
			rd_kafka_version_str(), rd_kafka_version(),
Packit 2997f0
                        group, brokers,
Packit 2997f0
			RD_KAFKA_DEBUG_CONTEXTS);
Packit 2997f0
		exit(1);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
Packit 2997f0
	signal(SIGINT, stop);
Packit 2997f0
	signal(SIGUSR1, sig_usr1);
Packit 2997f0
Packit 2997f0
	if (debug &&
Packit 2997f0
	    rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
Packit 2997f0
	    RD_KAFKA_CONF_OK) {
Packit 2997f0
		fprintf(stderr, "%% Debug configuration failed: %s: %s\n",
Packit 2997f0
			errstr, debug);
Packit 2997f0
		exit(1);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
        /*
Packit 2997f0
         * Client/Consumer group
Packit 2997f0
         */
Packit 2997f0
Packit 2997f0
        if (strchr("CO", mode)) {
Packit 2997f0
                /* Consumer groups require a group id */
Packit 2997f0
                if (!group)
Packit 2997f0
                        group = "rdkafka_consumer_example";
Packit 2997f0
                if (rd_kafka_conf_set(conf, "group.id", group,
Packit 2997f0
                                      errstr, sizeof(errstr)) !=
Packit 2997f0
                    RD_KAFKA_CONF_OK) {
Packit 2997f0
                        fprintf(stderr, "%% %s\n", errstr);
Packit 2997f0
                        exit(1);
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                /* Consumer groups always use broker based offset storage */
Packit 2997f0
                if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
Packit 2997f0
                                            "broker",
Packit 2997f0
                                            errstr, sizeof(errstr)) !=
Packit 2997f0
                    RD_KAFKA_CONF_OK) {
Packit 2997f0
                        fprintf(stderr, "%% %s\n", errstr);
Packit 2997f0
                        exit(1);
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                /* Set default topic config for pattern-matched topics. */
Packit 2997f0
                rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
Packit 2997f0
Packit 2997f0
                /* Callback called on partition assignment changes */
Packit 2997f0
                rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Create Kafka handle */
Packit 2997f0
        if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
Packit 2997f0
                                errstr, sizeof(errstr)))) {
Packit 2997f0
                fprintf(stderr,
Packit 2997f0
                        "%% Failed to create new consumer: %s\n",
Packit 2997f0
                        errstr);
Packit 2997f0
                exit(1);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Add brokers */
Packit 2997f0
        if (rd_kafka_brokers_add(rk, brokers) == 0) {
Packit 2997f0
                fprintf(stderr, "%% No valid brokers specified\n");
Packit 2997f0
                exit(1);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
Packit 2997f0
        if (mode == 'D') {
Packit 2997f0
                int r;
Packit 2997f0
                /* Describe groups */
Packit 2997f0
                r = describe_groups(rk, group);
Packit 2997f0
Packit 2997f0
                rd_kafka_destroy(rk);
Packit 2997f0
                exit(r == -1 ? 1 : 0);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Redirect rd_kafka_poll() to consumer_poll() */
Packit 2997f0
        rd_kafka_poll_set_consumer(rk);
Packit 2997f0
Packit 2997f0
        topics = rd_kafka_topic_partition_list_new(argc - optind);
Packit 2997f0
        is_subscription = 1;
Packit 2997f0
        for (i = optind ; i < argc ; i++) {
Packit 2997f0
                /* Parse "topic[:part] */
Packit 2997f0
                char *topic = argv[i];
Packit 2997f0
                char *t;
Packit 2997f0
                int32_t partition = -1;
Packit 2997f0
Packit 2997f0
                if ((t = strstr(topic, ":"))) {
Packit 2997f0
                        *t = '\0';
Packit 2997f0
                        partition = atoi(t+1);
Packit 2997f0
                        is_subscription = 0; /* is assignment */
Packit 2997f0
                        wait_eof++;
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                rd_kafka_topic_partition_list_add(topics, topic, partition);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        if (mode == 'O') {
Packit 2997f0
                /* Offset query */
Packit 2997f0
Packit 2997f0
                err = rd_kafka_committed(rk, topics, 5000);
Packit 2997f0
                if (err) {
Packit 2997f0
                        fprintf(stderr, "%% Failed to fetch offsets: %s\n",
Packit 2997f0
                                rd_kafka_err2str(err));
Packit 2997f0
                        exit(1);
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                for (i = 0 ; i < topics->cnt ; i++) {
Packit 2997f0
                        rd_kafka_topic_partition_t *p = &topics->elems[i];
Packit 2997f0
                        printf("Topic \"%s\" partition %"PRId32,
Packit 2997f0
                               p->topic, p->partition);
Packit 2997f0
                        if (p->err)
Packit 2997f0
                                printf(" error %s",
Packit 2997f0
                                       rd_kafka_err2str(p->err));
Packit 2997f0
                        else {
Packit 2997f0
                                printf(" offset %"PRId64"",
Packit 2997f0
                                       p->offset);
Packit 2997f0
Packit 2997f0
                                if (p->metadata_size)
Packit 2997f0
                                        printf(" (%d bytes of metadata)",
Packit 2997f0
                                               (int)p->metadata_size);
Packit 2997f0
                        }
Packit 2997f0
                        printf("\n");
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                goto done;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
Packit 2997f0
        if (is_subscription) {
Packit 2997f0
                fprintf(stderr, "%% Subscribing to %d topics\n", topics->cnt);
Packit 2997f0
Packit 2997f0
                if ((err = rd_kafka_subscribe(rk, topics))) {
Packit 2997f0
                        fprintf(stderr,
Packit 2997f0
                                "%% Failed to start consuming topics: %s\n",
Packit 2997f0
                                rd_kafka_err2str(err));
Packit 2997f0
                        exit(1);
Packit 2997f0
                }
Packit 2997f0
        } else {
Packit 2997f0
                fprintf(stderr, "%% Assigning %d partitions\n", topics->cnt);
Packit 2997f0
Packit 2997f0
                if ((err = rd_kafka_assign(rk, topics))) {
Packit 2997f0
                        fprintf(stderr,
Packit 2997f0
                                "%% Failed to assign partitions: %s\n",
Packit 2997f0
                                rd_kafka_err2str(err));
Packit 2997f0
                }
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        while (run) {
Packit 2997f0
                rd_kafka_message_t *rkmessage;
Packit 2997f0
Packit 2997f0
                rkmessage = rd_kafka_consumer_poll(rk, 1000);
Packit 2997f0
                if (rkmessage) {
Packit 2997f0
                        msg_consume(rkmessage);
Packit 2997f0
                        rd_kafka_message_destroy(rkmessage);
Packit 2997f0
                }
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
done:
Packit 2997f0
        err = rd_kafka_consumer_close(rk);
Packit 2997f0
        if (err)
Packit 2997f0
                fprintf(stderr, "%% Failed to close consumer: %s\n",
Packit 2997f0
                        rd_kafka_err2str(err));
Packit 2997f0
        else
Packit 2997f0
                fprintf(stderr, "%% Consumer closed\n");
Packit 2997f0
Packit 2997f0
        rd_kafka_topic_partition_list_destroy(topics);
Packit 2997f0
Packit 2997f0
        /* Destroy handle */
Packit 2997f0
        rd_kafka_destroy(rk);
Packit 2997f0
Packit 2997f0
	/* Let background threads clean up and terminate cleanly. */
Packit 2997f0
	run = 5;
Packit 2997f0
	while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
Packit 2997f0
		printf("Waiting for librdkafka to decommission\n");
Packit 2997f0
	if (run <= 0)
Packit 2997f0
		rd_kafka_dump(stdout, rk);
Packit 2997f0
Packit 2997f0
	return 0;
Packit 2997f0
}