Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#define _GNU_SOURCE
#include <errno.h>
#include <string.h>
#include <stdarg.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/stat.h>

#include "rdkafka_int.h"
#include "rdkafka_msg.h"
#include "rdkafka_broker.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_offset.h"
#include "rdkafka_transport.h"
#include "rdkafka_cgrp.h"
#include "rdkafka_assignor.h"
#include "rdkafka_request.h"
#include "rdkafka_event.h"
#include "rdkafka_sasl.h"
#include "rdkafka_interceptor.h"

#include "rdtime.h"
#include "crc32c.h"
#include "rdunittest.h"

#ifdef _MSC_VER
#include <sys/types.h>
#include <sys/timeb.h>
#endif



static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;

/**
 * @brief Global counter+lock for all active librdkafka instances
 */
mtx_t rd_kafka_global_lock;
int rd_kafka_global_cnt;


/**
 * Last API error code, per thread.
 * Shared among all rd_kafka_t instances.
 */
rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;


/**
 * Current number of threads created by rdkafka.
 * This is used in regression tests.
 */
rd_atomic32_t rd_kafka_thread_cnt_curr;
int rd_kafka_thread_cnt (void) {
#if ENABLE_SHAREDPTR_DEBUG
        rd_shared_ptrs_dump();
#endif

	return rd_atomic32_get(&rd_kafka_thread_cnt_curr);
}

/**
 * Current thread's log name (TLS)
 */
static char RD_TLS rd_kafka_thread_name[64] = "app";

void rd_kafka_set_thread_name (const char *fmt, ...) {
        va_list ap;

        va_start(ap, fmt);
        rd_vsnprintf(rd_kafka_thread_name, sizeof(rd_kafka_thread_name),
                     fmt, ap);
        va_end(ap);
}

/**
 * @brief Current thread's system name (TLS)
 *
 * Note the name must be 15 characters or less, because it is passed to
 * pthread_setname_np on Linux which imposes this limit.
 */
static char RD_TLS rd_kafka_thread_sysname[16] = "app";

void rd_kafka_set_thread_sysname (const char *fmt, ...) {
        va_list ap;

        va_start(ap, fmt);
        rd_vsnprintf(rd_kafka_thread_sysname, sizeof(rd_kafka_thread_sysname),
                     fmt, ap);
        va_end(ap);

        thrd_setname(rd_kafka_thread_sysname);
}

static void rd_kafka_global_init (void) {
#if ENABLE_SHAREDPTR_DEBUG
        LIST_INIT(&rd_shared_ptr_debug_list);
        mtx_init(&rd_shared_ptr_debug_mtx, mtx_plain);
        atexit(rd_shared_ptrs_dump);
#endif
	mtx_init(&rd_kafka_global_lock, mtx_plain);
#if ENABLE_DEVEL
	rd_atomic32_init(&rd_kafka_op_cnt, 0);
#endif
        crc32c_global_init();
}

/**
 * @returns the current number of active librdkafka instances
 */
static int rd_kafka_global_cnt_get (void) {
	int r;
	mtx_lock(&rd_kafka_global_lock);
	r = rd_kafka_global_cnt;
	mtx_unlock(&rd_kafka_global_lock);
	return r;
}


/**
 * @brief Increase counter for active librdkafka instances.
 * If this is the first instance the global constructors will be called, if any.
 */
static void rd_kafka_global_cnt_incr (void) {
	mtx_lock(&rd_kafka_global_lock);
	rd_kafka_global_cnt++;
	if (rd_kafka_global_cnt == 1) {
		rd_kafka_transport_init();
#if WITH_SSL
		rd_kafka_transport_ssl_init();
#endif
                rd_kafka_sasl_global_init();
	}
	mtx_unlock(&rd_kafka_global_lock);
}

/**
 * @brief Decrease counter for active librdkafka instances.
 * If this counter reaches 0 the global destructors will be called, if any.
 */
static void rd_kafka_global_cnt_decr (void) {
	mtx_lock(&rd_kafka_global_lock);
	rd_kafka_assert(NULL, rd_kafka_global_cnt > 0);
	rd_kafka_global_cnt--;
	if (rd_kafka_global_cnt == 0) {
                rd_kafka_sasl_global_term();
#if WITH_SSL
		rd_kafka_transport_ssl_term();
#endif
	}
	mtx_unlock(&rd_kafka_global_lock);
}


/**
 * Wait for all rd_kafka_t objects to be destroyed.
 * Returns 0 if all kafka objects are now destroyed, or -1 if the
 * timeout was reached.
 */
int rd_kafka_wait_destroyed (int timeout_ms) {
	rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);

	while (rd_kafka_thread_cnt() > 0 ||
	       rd_kafka_global_cnt_get() > 0) {
		if (rd_clock() >= timeout) {
			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
						ETIMEDOUT);
#if ENABLE_SHAREDPTR_DEBUG
                        rd_shared_ptrs_dump();
#endif
			return -1;
		}
		rd_usleep(25000, NULL); /* 25ms */
	}

	return 0;
}

static void rd_kafka_log_buf (const rd_kafka_conf_t *conf,
                              const rd_kafka_t *rk, int level, const char *fac,
                              const char *buf) {
        if (level > conf->log_level)
                return;
        else if (rk && conf->log_queue) {
                rd_kafka_op_t *rko;

                if (!rk->rk_logq)
                        return; /* Terminating */

                rko = rd_kafka_op_new(RD_KAFKA_OP_LOG);
                rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_MEDIUM);
                rko->rko_u.log.level = level;
                strncpy(rko->rko_u.log.fac, fac,
                        sizeof(rko->rko_u.log.fac) - 1);
                rko->rko_u.log.str = rd_strdup(buf);
                rd_kafka_q_enq(rk->rk_logq, rko);

        } else if (conf->log_cb) {
                conf->log_cb(rk, level, fac, buf);
        }
}

/**
 * @brief Logger
 *
 * @remark conf must be set, but rk may be NULL
 */
void rd_kafka_log0 (const rd_kafka_conf_t *conf,
                    const rd_kafka_t *rk,
                    const char *extra, int level,
                    const char *fac, const char *fmt, ...) {
	char buf[2048];
	va_list ap;
	unsigned int elen = 0;
        unsigned int of = 0;

	if (level > conf->log_level)
		return;

	if (conf->log_thread_name) {
		elen = rd_snprintf(buf, sizeof(buf), "[thrd:%s]: ",
				   rd_kafka_thread_name);
		if (unlikely(elen >= sizeof(buf)))
			elen = sizeof(buf);
		of = elen;
	}

	if (extra) {
		elen = rd_snprintf(buf+of, sizeof(buf)-of, "%s: ", extra);
		if (unlikely(elen >= sizeof(buf)-of))
			elen = sizeof(buf)-of;
                of += elen;
	}

	va_start(ap, fmt);
	rd_vsnprintf(buf+of, sizeof(buf)-of, fmt, ap);
	va_end(ap);

        rd_kafka_log_buf(conf, rk, level, fac, buf);
}



void rd_kafka_log_print(const rd_kafka_t *rk, int level,
	const char *fac, const char *buf) {
	int secs, msecs;
	struct timeval tv;
	rd_gettimeofday(&tv, NULL);
	secs = (int)tv.tv_sec;
	msecs = (int)(tv.tv_usec / 1000);
	fprintf(stderr, "%%%i|%u.%03u|%s|%s| %s\n",
		level, secs, msecs,
		fac, rk ? rk->rk_name : "", buf);
}

#ifndef _MSC_VER
void rd_kafka_log_syslog (const rd_kafka_t *rk, int level,
			  const char *fac, const char *buf) {
	static int initialized = 0;

	if (!initialized)
		openlog("rdkafka", LOG_PID|LOG_CONS, LOG_USER);

	syslog(level, "%s: %s: %s", fac, rk ? rk->rk_name : "", buf);
}
#endif

void rd_kafka_set_logger (rd_kafka_t *rk,
			  void (*func) (const rd_kafka_t *rk, int level,
					const char *fac, const char *buf)) {
	rk->rk_conf.log_cb = func;
}

void rd_kafka_set_log_level (rd_kafka_t *rk, int level) {
	rk->rk_conf.log_level = level;
}






static const char *rd_kafka_type2str (rd_kafka_type_t type) {
	static const char *types[] = {
		[RD_KAFKA_PRODUCER] = "producer",
		[RD_KAFKA_CONSUMER] = "consumer",
	};
	return types[type];
}

#define _ERR_DESC(ENUM,DESC) \
	[ENUM - RD_KAFKA_RESP_ERR__BEGIN] = { ENUM, # ENUM + 18/*pfx*/, DESC }

static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
	_ERR_DESC(RD_KAFKA_RESP_ERR__BEGIN, NULL),
	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_MSG,
		  "Local: Bad message format"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__BAD_COMPRESSION,
		  "Local: Invalid compressed data"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__DESTROY,
		  "Local: Broker handle destroyed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__FAIL,
		  "Local: Communication failure with broker"), //FIXME: too specific
	_ERR_DESC(RD_KAFKA_RESP_ERR__TRANSPORT,
		  "Local: Broker transport failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
		  "Local: Critical system resource failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__RESOLVE,
		  "Local: Host resolution failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
		  "Local: Message timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__PARTITION_EOF,
		  "Broker: No more messages"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
		  "Local: Unknown partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__FS,
		  "Local: File or filesystem error"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC,
		  "Local: Unknown topic"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
		  "Local: All broker connections are down"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__INVALID_ARG,
		  "Local: Invalid argument or configuration"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT,
		  "Local: Timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__QUEUE_FULL,
		  "Local: Queue full"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ISR_INSUFF,
		  "Local: ISR count insufficient"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NODE_UPDATE,
		  "Local: Broker node update"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__SSL,
		  "Local: SSL error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_COORD,
		  "Local: Waiting for coordinator"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
		  "Local: Unknown group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__IN_PROGRESS,
		  "Local: Operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
		  "Local: Previous operation in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION,
		  "Local: Existing subscription"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
		  "Local: Assign partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
		  "Local: Revoke partitions"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__CONFLICT,
		  "Local: Conflicting use"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__STATE,
		  "Local: Erroneous state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL,
		  "Local: Unknown protocol"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
		  "Local: Not implemented"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__AUTHENTICATION,
		  "Local: Authentication failure"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__NO_OFFSET,
		  "Local: No offset stored"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__OUTDATED,
		  "Local: Outdated"),
	_ERR_DESC(RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
		  "Local: Timed out in queue"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                  "Local: Required feature not supported by broker"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__WAIT_CACHE,
                  "Local: Awaiting cache update"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__INTR,
                  "Local: Operation interrupted"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_SERIALIZATION,
                  "Local: Key serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION,
                  "Local: Value serialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION,
                  "Local: Key deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION,
                  "Local: Value deserialization error"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__PARTIAL,
                  "Local: Partial response"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__READ_ONLY,
                  "Local: Read-only object"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__NOENT,
                  "Local: No such entry"),
        _ERR_DESC(RD_KAFKA_RESP_ERR__UNDERFLOW,
                  "Local: Read underflow"),

	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN,
		  "Unknown broker error"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NO_ERROR,
		  "Success"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE,
		  "Broker: Offset out of range"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG,
		  "Broker: Invalid message"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
		  "Broker: Unknown topic or partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
		  "Broker: Invalid message size"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
		  "Broker: Leader not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
		  "Broker: Not leader for partition"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
		  "Broker: Request timed out"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE,
		  "Broker: Broker not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
		  "Broker: Replica not available"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
		  "Broker: Message size too large"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH,
		  "Broker: StaleControllerEpochCode"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
		  "Broker: Offset metadata string too large"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION,
		  "Broker: Broker disconnected before response received"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,
		  "Broker: Group coordinator load in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
		  "Broker: Group coordinator not available"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,
		  "Broker: Not coordinator for group"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION,
		  "Broker: Invalid topic"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE,
		  "Broker: Message batch larger than configured server "
		  "segment size"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
		  "Broker: Not enough in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
		  "Broker: Message(s) written to insufficient number of "
		  "in-sync replicas"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS,
		  "Broker: Invalid required acks value"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
		  "Broker: Specified group generation id is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL,
		  "Broker: Inconsistent group protocol"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_GROUP_ID,
		  "Broker: Invalid group.id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
		  "Broker: Unknown member"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT,
		  "Broker: Invalid session timeout"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
		  "Broker: Group rebalance in progress"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
		  "Broker: Commit offset data size is not valid"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
		  "Broker: Topic authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
		  "Broker: Group authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED,
		  "Broker: Cluster authorization failed"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP,
		  "Broker: Invalid timestamp"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM,
		  "Broker: Unsupported SASL mechanism"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE,
		  "Broker: Request not valid in current SASL state"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION,
		  "Broker: API version not supported"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS,
		  "Broker: Topic already exists"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PARTITIONS,
		  "Broker: Invalid number of partitions"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR,
		  "Broker: Invalid replication factor"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT,
		  "Broker: Invalid replica assignment"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_CONFIG,
		  "Broker: Configuration is invalid"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_NOT_CONTROLLER,
		  "Broker: Not controller for cluster"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_REQUEST,
		  "Broker: Invalid request"),
	_ERR_DESC(RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT,
		  "Broker: Message format on broker does not support request"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_POLICY_VIOLATION,
                  "Broker: Isolation policy volation"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
                  "Broker: Broker received an out of order sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
                  "Broker: Broker received a duplicate sequence number"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
                  "Broker: Producer attempted an operation with an old epoch"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                  "Broker: Producer attempted a transactional operation in "
                  "an invalid state"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
                  "Broker: Producer attempted to use a producer id which is "
                  "not currently assigned to its transactional id"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                  "Broker: Transaction timeout is larger than the maximum "
                  "value allowed by the broker's max.transaction.timeout.ms"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                  "Broker: Producer attempted to update a transaction while "
                  "another concurrent operation on the same transaction was "
                  "ongoing"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED,
                  "Broker: Indicates that the transaction coordinator sending "
                  "a WriteTxnMarker is no longer the current coordinator for "
                  "a given producer"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                  "Broker: Transactional Id authorization failed"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_SECURITY_DISABLED,
                  "Broker: Security features are disabled"),
        _ERR_DESC(RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED,
                  "Broker: Operation not attempted"),

	_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)
};


void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
			     size_t *cntp) {
	*errdescs = rd_kafka_err_descs;
	*cntp = RD_ARRAYSIZE(rd_kafka_err_descs);
}


const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
	static RD_TLS char ret[32];
	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

	if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
		     err >= RD_KAFKA_RESP_ERR_END_ALL ||
		     !rd_kafka_err_descs[idx].desc)) {
		rd_snprintf(ret, sizeof(ret), "Err-%i?", err);
		return ret;
	}

	return rd_kafka_err_descs[idx].desc;
}


const char *rd_kafka_err2name (rd_kafka_resp_err_t err) {
	static RD_TLS char ret[32];
	int idx = err - RD_KAFKA_RESP_ERR__BEGIN;

	if (unlikely(err <= RD_KAFKA_RESP_ERR__BEGIN ||
		     err >= RD_KAFKA_RESP_ERR_END_ALL ||
		     !rd_kafka_err_descs[idx].desc)) {
		rd_snprintf(ret, sizeof(ret), "ERR_%i?", err);
		return ret;
	}

	return rd_kafka_err_descs[idx].name;
}


rd_kafka_resp_err_t rd_kafka_last_error (void) {
	return rd_kafka_last_error_code;
}


rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
	switch (errnox)
	{
	case EINVAL:
		return RD_KAFKA_RESP_ERR__INVALID_ARG;

        case EBUSY:
                return RD_KAFKA_RESP_ERR__CONFLICT;

	case ENOENT:
		return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;

	case ESRCH:
		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

	case ETIMEDOUT:
		return RD_KAFKA_RESP_ERR__TIMED_OUT;

	case EMSGSIZE:
		return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;

	case ENOBUFS:
		return RD_KAFKA_RESP_ERR__QUEUE_FULL;

	default:
		return RD_KAFKA_RESP_ERR__FAIL;
	}
}



/**
 * @brief Final destructor for rd_kafka_t, must only be called with refcnt 0.
 *
 * @locality application thread
 */
void rd_kafka_destroy_final (rd_kafka_t *rk) {

        rd_kafka_assert(rk, rd_atomic32_get(&rk->rk_terminate) != 0);

        /* Synchronize state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        rd_kafka_assignors_term(rk);

        rd_kafka_metadata_cache_destroy(rk);

        rd_kafka_timers_destroy(&rk->rk_timers);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying op queues");

        /* Destroy cgrp */
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Destroying cgrp");
                /* Reset queue forwarding (rep -> cgrp) */
                rd_kafka_q_fwd_set(rk->rk_rep, NULL);
                rd_kafka_cgrp_destroy_final(rk->rk_cgrp);
        }

	/* Purge op-queues */
	rd_kafka_q_destroy_owner(rk->rk_rep);
	rd_kafka_q_destroy_owner(rk->rk_ops);

#if WITH_SSL
	if (rk->rk_conf.ssl.ctx) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Destroying SSL CTX");
                rd_kafka_transport_ssl_ctx_term(rk);
        }
#endif

        /* It is not safe to log after this point. */
        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Termination done: freeing resources");

        if (rk->rk_logq) {
                rd_kafka_q_destroy_owner(rk->rk_logq);
                rk->rk_logq = NULL;
        }

        if (rk->rk_type == RD_KAFKA_PRODUCER) {
		cnd_destroy(&rk->rk_curr_msgs.cnd);
		mtx_destroy(&rk->rk_curr_msgs.lock);
	}

	cnd_destroy(&rk->rk_broker_state_change_cnd);
	mtx_destroy(&rk->rk_broker_state_change_lock);

	if (rk->rk_full_metadata)
		rd_kafka_metadata_destroy(rk->rk_full_metadata);
        rd_kafkap_str_destroy(rk->rk_client_id);
        rd_kafkap_str_destroy(rk->rk_group_id);
        rd_kafkap_str_destroy(rk->rk_eos.TransactionalId);
	rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
        rd_list_destroy(&rk->rk_broker_by_id);

	rd_kafkap_bytes_destroy((rd_kafkap_bytes_t *)rk->rk_null_bytes);
	rwlock_destroy(&rk->rk_lock);

	rd_free(rk);
	rd_kafka_global_cnt_decr();
}


static void rd_kafka_destroy_app (rd_kafka_t *rk, int blocking) {
        thrd_t thrd;
#ifndef _MSC_VER
	int term_sig = rk->rk_conf.term_sig;
#endif
        rd_kafka_dbg(rk, ALL, "DESTROY", "Terminating instance");

        /* The legacy/simple consumer lacks an API to close down the consumer*/
        if (rk->rk_cgrp) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Closing consumer group");
                rd_kafka_consumer_close(rk);
        }

        rd_kafka_dbg(rk, GENERIC, "TERMINATE", "Interrupting timers");
        rd_kafka_wrlock(rk);
        thrd = rk->rk_thread;
	rd_atomic32_add(&rk->rk_terminate, 1);
        rd_kafka_timers_interrupt(&rk->rk_timers);
        rd_kafka_wrunlock(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Sending TERMINATE to main background thread");
        /* Send op to trigger queue/io wake-up.
         * The op itself is (likely) ignored by the receiver. */
        rd_kafka_q_enq(rk->rk_ops, rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

	rd_kafka_brokers_broadcast_state_change(rk);

#ifndef _MSC_VER
        /* Interrupt main kafka thread to speed up termination. */
	if (term_sig) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Sending thread kill signal %d", term_sig);
                pthread_kill(thrd, term_sig);
        }
#endif

        if (!blocking)
                return; /* FIXME: thread resource leak */

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Joining main background thread");

        if (thrd_join(thrd, NULL) != thrd_success)
                rd_kafka_log(rk, LOG_ERR, "DESTROY",
                             "Failed to join main thread: %s "
                             "(was process forked?)",
                             rd_strerror(errno));

        rd_kafka_destroy_final(rk);
}


/* NOTE: Must only be called by application.
 *       librdkafka itself must use rd_kafka_destroy0(). */
void rd_kafka_destroy (rd_kafka_t *rk) {
        rd_kafka_destroy_app(rk, 1);
}


/**
 * Main destructor for rd_kafka_t
 *
 * Locality: rdkafka main thread or application thread during rd_kafka_new()
 */
static void rd_kafka_destroy_internal (rd_kafka_t *rk) {
	rd_kafka_itopic_t *rkt, *rkt_tmp;
	rd_kafka_broker_t *rkb, *rkb_tmp;
        rd_list_t wait_thrds;
        thrd_t *thrd;
        int i;

        rd_kafka_dbg(rk, ALL, "DESTROY", "Destroy internal");

        /* Call on_destroy() interceptors */
        rd_kafka_interceptors_on_destroy(rk);

	/* Brokers pick up on rk_terminate automatically. */

        /* List of (broker) threads to join to synchronize termination */
        rd_list_init(&wait_thrds, rd_atomic32_get(&rk->rk_broker_cnt), NULL);

	rd_kafka_wrlock(rk);

        rd_kafka_dbg(rk, ALL, "DESTROY", "Removing all topics");
	/* Decommission all topics */
	TAILQ_FOREACH_SAFE(rkt, &rk->rk_topics, rkt_link, rkt_tmp) {
		rd_kafka_wrunlock(rk);
		rd_kafka_topic_partitions_remove(rkt);
		rd_kafka_wrlock(rk);
	}

        /* Decommission brokers.
         * Broker thread holds a refcount and detects when broker refcounts
         * reaches 1 and then decommissions itself. */
        TAILQ_FOREACH_SAFE(rkb, &rk->rk_brokers, rkb_link, rkb_tmp) {
                /* Add broker's thread to wait_thrds list for later joining */
                thrd = malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
                rd_kafka_wrunlock(rk);

                /* Send op to trigger queue/io wake-up.
                 * The op itself is (likely) ignored by the broker thread. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

#ifndef _MSC_VER
                /* Interrupt IO threads to speed up termination. */
                if (rk->rk_conf.term_sig)
			pthread_kill(rkb->rkb_thread, rk->rk_conf.term_sig);
#endif

                rd_kafka_broker_destroy(rkb);

                rd_kafka_wrlock(rk);
        }

        if (rk->rk_clusterid) {
                rd_free(rk->rk_clusterid);
                rk->rk_clusterid = NULL;
        }

        rd_kafka_wrunlock(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Purging reply queue");

	/* Purge op-queue */
        rd_kafka_q_disable(rk->rk_rep);
	rd_kafka_q_purge(rk->rk_rep);

	/* Loose our special reference to the internal broker. */
        mtx_lock(&rk->rk_internal_rkb_lock);
	if ((rkb = rk->rk_internal_rkb)) {
                rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                             "Decommissioning internal broker");

                /* Send op to trigger queue wake-up. */
                rd_kafka_q_enq(rkb->rkb_ops,
                               rd_kafka_op_new(RD_KAFKA_OP_TERMINATE));

                rk->rk_internal_rkb = NULL;
                thrd = malloc(sizeof(*thrd));
                *thrd = rkb->rkb_thread;
                rd_list_add(&wait_thrds, thrd);
        }
        mtx_unlock(&rk->rk_internal_rkb_lock);
	if (rkb)
		rd_kafka_broker_destroy(rkb);


        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Join %d broker thread(s)", rd_list_cnt(&wait_thrds));

        /* Join broker threads */
        RD_LIST_FOREACH(thrd, &wait_thrds, i) {
                if (thrd_join(*thrd, NULL) != thrd_success)
                        ;
                free(thrd);
        }

        rd_list_destroy(&wait_thrds);
}


/* Stats buffer printf */
#define _st_printf(...) do {					\
		ssize_t r;					\
		ssize_t rem = size-of;				\
		r = rd_snprintf(buf+of, rem, __VA_ARGS__);	\
		if (r >= rem) {					\
			size *= 2;				\
			rem = size-of;				\
			buf = rd_realloc(buf, size);		\
			r = rd_snprintf(buf+of, rem, __VA_ARGS__);	\
		}						\
		of += r;					\
	} while (0)

/**
 * Emit stats for toppar
 */
static RD_INLINE void rd_kafka_stats_emit_toppar (char **bufp, size_t *sizep,
					       size_t *ofp,
					       rd_kafka_toppar_t *rktp,
					       int first) {
	char *buf = *bufp;
	size_t size = *sizep;
	size_t of = *ofp;
        int64_t consumer_lag = -1;
        struct offset_stats offs;
        int32_t leader_nodeid = -1;

        rd_kafka_toppar_lock(rktp);

        if (rktp->rktp_leader) {
                rd_kafka_broker_lock(rktp->rktp_leader);
                leader_nodeid = rktp->rktp_leader->rkb_nodeid;
                rd_kafka_broker_unlock(rktp->rktp_leader);
        }

        /* Grab a copy of the latest finalized offset stats */
        offs = rktp->rktp_offsets_fin;

        if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID &&
            rktp->rktp_app_offset >= 0) {
                if (unlikely(rktp->rktp_app_offset > rktp->rktp_hi_offset))
                        consumer_lag = 0;
                else
                        consumer_lag = rktp->rktp_hi_offset -
                                rktp->rktp_app_offset;
        }

	_st_printf("%s\"%"PRId32"\": { "
		   "\"partition\":%"PRId32", "
		   "\"leader\":%"PRId32", "
		   "\"desired\":%s, "
		   "\"unknown\":%s, "
		   "\"msgq_cnt\":%i, "
		   "\"msgq_bytes\":%"PRIusz", "
		   "\"xmit_msgq_cnt\":%i, "
		   "\"xmit_msgq_bytes\":%"PRIusz", "
		   "\"fetchq_cnt\":%i, "
		   "\"fetchq_size\":%"PRIu64", "
		   "\"fetch_state\":\"%s\", "
		   "\"query_offset\":%"PRId64", "
		   "\"next_offset\":%"PRId64", "
		   "\"app_offset\":%"PRId64", "
		   "\"stored_offset\":%"PRId64", "
		   "\"commited_offset\":%"PRId64", " /*FIXME: issue #80 */
		   "\"committed_offset\":%"PRId64", "
		   "\"eof_offset\":%"PRId64", "
		   "\"lo_offset\":%"PRId64", "
		   "\"hi_offset\":%"PRId64", "
                   "\"consumer_lag\":%"PRId64", "
		   "\"txmsgs\":%"PRIu64", "
		   "\"txbytes\":%"PRIu64", "
                   "\"msgs\": %"PRIu64", "
                   "\"rx_ver_drops\": %"PRIu64" "
		   "} ",
		   first ? "" : ", ",
		   rktp->rktp_partition,
		   rktp->rktp_partition,
                   leader_nodeid,
		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_DESIRED)?"true":"false",
		   (rktp->rktp_flags&RD_KAFKA_TOPPAR_F_UNKNOWN)?"true":"false",
                   rd_kafka_msgq_len(&rktp->rktp_msgq),
		   rd_kafka_msgq_size(&rktp->rktp_msgq),
                   /* FIXME: xmit_msgq is local to the broker thread. */
                   0,
                   (size_t)0,
		   rd_kafka_q_len(rktp->rktp_fetchq),
		   rd_kafka_q_size(rktp->rktp_fetchq),
		   rd_kafka_fetch_states[rktp->rktp_fetch_state],
		   rktp->rktp_query_offset,
                   offs.fetch_offset,
		   rktp->rktp_app_offset,
		   rktp->rktp_stored_offset,
		   rktp->rktp_committed_offset, /* FIXME: issue #80 */
		   rktp->rktp_committed_offset,
                   offs.eof_offset,
		   rktp->rktp_lo_offset,
		   rktp->rktp_hi_offset,
                   consumer_lag,
                   rd_atomic64_get(&rktp->rktp_c.tx_msgs),
		   rd_atomic64_get(&rktp->rktp_c.tx_bytes),
		   rd_atomic64_get(&rktp->rktp_c.msgs),
                   rd_atomic64_get(&rktp->rktp_c.rx_ver_drops));

        rd_kafka_toppar_unlock(rktp);

	*bufp = buf;
	*sizep = size;
	*ofp = of;
}

/**
 * Emit all statistics
 */
static void rd_kafka_stats_emit_all (rd_kafka_t *rk) {
	char  *buf;
	size_t size = 1024*10;
	size_t of = 0;
	rd_kafka_broker_t *rkb;
	rd_kafka_itopic_t *rkt;
	shptr_rd_kafka_toppar_t *s_rktp;
	rd_ts_t now;
	rd_kafka_op_t *rko;
	unsigned int tot_cnt;
	size_t tot_size;

	buf = rd_malloc(size);


	rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);
	rd_kafka_rdlock(rk);

	now = rd_clock();
	_st_printf("{ "
                   "\"name\": \"%s\", "
                   "\"type\": \"%s\", "
		   "\"ts\":%"PRId64", "
		   "\"time\":%lli, "
		   "\"replyq\":%i, "
                   "\"msg_cnt\":%u, "
		   "\"msg_size\":%"PRIusz", "
                   "\"msg_max\":%u, "
		   "\"msg_size_max\":%"PRIusz", "
                   "\"simple_cnt\":%i, "
                   "\"metadata_cache_cnt\":%i, "
		   "\"brokers\":{ "/*open brokers*/,
                   rk->rk_name,
                   rd_kafka_type2str(rk->rk_type),
		   now,
		   (signed long long)time(NULL),
		   rd_kafka_q_len(rk->rk_rep),
		   tot_cnt, tot_size,
		   rk->rk_curr_msgs.max_cnt, rk->rk_curr_msgs.max_size,
                   rd_atomic32_get(&rk->rk_simple_cnt),
                   rk->rk_metadata_cache.rkmc_cnt);


	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
		rd_avg_t rtt, throttle, int_latency;
		rd_kafka_toppar_t *rktp;

		rd_kafka_broker_lock(rkb);
		rd_avg_rollover(&int_latency, &rkb->rkb_avg_int_latency);
		rd_avg_rollover(&rtt, &rkb->rkb_avg_rtt);
		rd_avg_rollover(&throttle, &rkb->rkb_avg_throttle);
		_st_printf("%s\"%s\": { "/*open broker*/
			   "\"name\":\"%s\", "
			   "\"nodeid\":%"PRId32", "
			   "\"state\":\"%s\", "
                           "\"stateage\":%"PRId64", "
			   "\"outbuf_cnt\":%i, "
			   "\"outbuf_msg_cnt\":%i, "
			   "\"waitresp_cnt\":%i, "
			   "\"waitresp_msg_cnt\":%i, "
			   "\"tx\":%"PRIu64", "
			   "\"txbytes\":%"PRIu64", "
			   "\"txerrs\":%"PRIu64", "
			   "\"txretries\":%"PRIu64", "
			   "\"req_timeouts\":%"PRIu64", "
			   "\"rx\":%"PRIu64", "
			   "\"rxbytes\":%"PRIu64", "
			   "\"rxerrs\":%"PRIu64", "
                           "\"rxcorriderrs\":%"PRIu64", "
                           "\"rxpartial\":%"PRIu64", "
                           "\"zbuf_grow\":%"PRIu64", "
                           "\"buf_grow\":%"PRIu64", "
                           "\"wakeups\":%"PRIu64", "
			   "\"int_latency\": {"
			   " \"min\":%"PRId64","
			   " \"max\":%"PRId64","
			   " \"avg\":%"PRId64","
			   " \"sum\":%"PRId64","
			   " \"cnt\":%i "
			   "}, "
			   "\"rtt\": {"
			   " \"min\":%"PRId64","
			   " \"max\":%"PRId64","
			   " \"avg\":%"PRId64","
			   " \"sum\":%"PRId64","
			   " \"cnt\":%i "
			   "}, "
			   "\"throttle\": {"
			   " \"min\":%"PRId64","
			   " \"max\":%"PRId64","
			   " \"avg\":%"PRId64","
			   " \"sum\":%"PRId64","
			   " \"cnt\":%i "
			   "}, "
			   "\"toppars\":{ "/*open toppars*/,
			   rkb == TAILQ_FIRST(&rk->rk_brokers) ? "" : ", ",
			   rkb->rkb_name,
			   rkb->rkb_name,
			   rkb->rkb_nodeid,
			   rd_kafka_broker_state_names[rkb->rkb_state],
                           rkb->rkb_ts_state ? now - rkb->rkb_ts_state : 0,
			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
			   rd_atomic32_get(&rkb->rkb_outbufs.rkbq_msg_cnt),
			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt),
			   rd_atomic32_get(&rkb->rkb_waitresps.rkbq_msg_cnt),
			   rd_atomic64_get(&rkb->rkb_c.tx),
			   rd_atomic64_get(&rkb->rkb_c.tx_bytes),
			   rd_atomic64_get(&rkb->rkb_c.tx_err),
			   rd_atomic64_get(&rkb->rkb_c.tx_retries),
			   rd_atomic64_get(&rkb->rkb_c.req_timeouts),
			   rd_atomic64_get(&rkb->rkb_c.rx),
			   rd_atomic64_get(&rkb->rkb_c.rx_bytes),
			   rd_atomic64_get(&rkb->rkb_c.rx_err),
			   rd_atomic64_get(&rkb->rkb_c.rx_corrid_err),
			   rd_atomic64_get(&rkb->rkb_c.rx_partial),
                           rd_atomic64_get(&rkb->rkb_c.zbuf_grow),
                           rd_atomic64_get(&rkb->rkb_c.buf_grow),
                           rd_atomic64_get(&rkb->rkb_c.wakeups),
			   int_latency.ra_v.minv,
			   int_latency.ra_v.maxv,
			   int_latency.ra_v.avg,
			   int_latency.ra_v.sum,
			   int_latency.ra_v.cnt,
			   rtt.ra_v.minv,
			   rtt.ra_v.maxv,
			   rtt.ra_v.avg,
			   rtt.ra_v.sum,
			   rtt.ra_v.cnt,
			   throttle.ra_v.minv,
			   throttle.ra_v.maxv,
			   throttle.ra_v.avg,
			   throttle.ra_v.sum,
			   throttle.ra_v.cnt);

		TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
			_st_printf("%s\"%.*s-%"PRId32"\": { "
				   "\"topic\":\"%.*s\", "
				   "\"partition\":%"PRId32"} ",
				   rktp==TAILQ_FIRST(&rkb->rkb_toppars)?"":", ",
				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
				   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
				   rktp->rktp_partition);
		}

		rd_kafka_broker_unlock(rkb);

		_st_printf("} "/*close toppars*/
			   "} "/*close broker*/);
	}


	_st_printf("}, " /* close "brokers" array */
		   "\"topics\":{ ");

	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
		int i, j;

		rd_kafka_topic_rdlock(rkt);
		_st_printf("%s\"%.*s\": { "
			   "\"topic\":\"%.*s\", "
			   "\"metadata_age\":%"PRId64", "
			   "\"partitions\":{ " /*open partitions*/,
			   rkt==TAILQ_FIRST(&rk->rk_topics)?"":", ",
			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
			   RD_KAFKAP_STR_PR(rkt->rkt_topic),
			   rkt->rkt_ts_metadata ?
			   (rd_clock() - rkt->rkt_ts_metadata)/1000 : 0);

		for (i = 0 ; i < rkt->rkt_partition_cnt ; i++)
			rd_kafka_stats_emit_toppar(&buf, &size, &of,
						   rd_kafka_toppar_s2i(rkt->rkt_p[i]),
						   i == 0);

                RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, j)
			rd_kafka_stats_emit_toppar(&buf, &size, &of,
						   rd_kafka_toppar_s2i(s_rktp),
						   i+j == 0);

                i += j;

		if (rkt->rkt_ua)
			rd_kafka_stats_emit_toppar(&buf, &size, &of,
						   rd_kafka_toppar_s2i(rkt->rkt_ua),
                                                   i++ == 0);
		rd_kafka_topic_rdunlock(rkt);

		_st_printf("} "/*close partitions*/
			   "} "/*close topic*/);

	}
	_st_printf("} "/*close topics*/);

        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                _st_printf(", \"cgrp\": { "
                           "\"rebalance_age\": %"PRId64", "
                           "\"rebalance_cnt\": %d, "
                           "\"assignment_size\": %d }",
                           rkcg->rkcg_c.ts_rebalance ?
                           (rd_clock() - rkcg->rkcg_c.ts_rebalance)/1000 : 0,
                           rkcg->rkcg_c.rebalance_cnt,
                           rkcg->rkcg_c.assignment_size);
        }
	rd_kafka_rdunlock(rk);

        _st_printf("}"/*close object*/);


	/* Enqueue op for application */
	rko = rd_kafka_op_new(RD_KAFKA_OP_STATS);
        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_HIGH);
	rko->rko_u.stats.json = buf;
	rko->rko_u.stats.json_len = of;
	rd_kafka_q_enq(rk->rk_rep, rko);
}



static void rd_kafka_topic_scan_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
	rd_kafka_topic_scan_all(rk, rd_clock());
}

static void rd_kafka_stats_emit_tmr_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
	rd_kafka_stats_emit_all(rk);
}


/**
 * @brief Periodic metadata refresh callback
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
        int sparse = 1;

        /* Dont do sparse requests if there is a consumer group with an
         * active subscription since subscriptions need to be able to match
         * on all topics. */
        if (rk->rk_type == RD_KAFKA_CONSUMER && rk->rk_cgrp &&
            rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
                sparse = 0;

        if (sparse)
                rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
                                                       "periodic refresh");
        else
                rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh");
}


/**
 * Main loop for Kafka handler thread.
 */
static int rd_kafka_thread_main (void *arg) {
        rd_kafka_t *rk = arg;
	rd_kafka_timer_t tmr_topic_scan = RD_ZERO_INIT;
	rd_kafka_timer_t tmr_stats_emit = RD_ZERO_INIT;
	rd_kafka_timer_t tmr_metadata_refresh = RD_ZERO_INIT;

        rd_kafka_set_thread_name("main");
        rd_kafka_set_thread_sysname("rdk:main");

	(void)rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);

	/* Acquire lock (which was held by thread creator during creation)
	 * to synchronise state. */
	rd_kafka_wrlock(rk);
	rd_kafka_wrunlock(rk);

	rd_kafka_timer_start(&rk->rk_timers, &tmr_topic_scan, 1000000,
			     rd_kafka_topic_scan_tmr_cb, NULL);
	rd_kafka_timer_start(&rk->rk_timers, &tmr_stats_emit,
			     rk->rk_conf.stats_interval_ms * 1000ll,
			     rd_kafka_stats_emit_tmr_cb, NULL);
        if (rk->rk_conf.metadata_refresh_interval_ms > 0)
                rd_kafka_timer_start(&rk->rk_timers, &tmr_metadata_refresh,
                                     rk->rk_conf.metadata_refresh_interval_ms *
                                     1000ll,
                                     rd_kafka_metadata_refresh_cb, NULL);

        if (rk->rk_cgrp) {
                rd_kafka_cgrp_reassign_broker(rk->rk_cgrp);
                rd_kafka_q_fwd_set(rk->rk_cgrp->rkcg_ops, rk->rk_ops);
        }

	while (likely(!rd_kafka_terminating(rk) ||
		      rd_kafka_q_len(rk->rk_ops))) {
                rd_ts_t sleeptime = rd_kafka_timers_next(
                        &rk->rk_timers, 1000*1000/*1s*/, 1/*lock*/);
                rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
                                 RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
		if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
			rd_kafka_cgrp_serve(rk->rk_cgrp);
		rd_kafka_timers_run(&rk->rk_timers, RD_POLL_NOWAIT);
	}

	rd_kafka_q_disable(rk->rk_ops);
	rd_kafka_q_purge(rk->rk_ops);

        rd_kafka_timer_stop(&rk->rk_timers, &tmr_topic_scan, 1);
        rd_kafka_timer_stop(&rk->rk_timers, &tmr_stats_emit, 1);
        rd_kafka_timer_stop(&rk->rk_timers, &tmr_metadata_refresh, 1);

        /* Synchronise state */
        rd_kafka_wrlock(rk);
        rd_kafka_wrunlock(rk);

        rd_kafka_destroy_internal(rk);

        rd_kafka_dbg(rk, GENERIC, "TERMINATE",
                     "Main background thread exiting");

	rd_atomic32_sub(&rd_kafka_thread_cnt_curr, 1);

	return 0;
}


static void rd_kafka_term_sig_handler (int sig) {
	/* nop */
}


rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
			  char *errstr, size_t errstr_size) {
	rd_kafka_t *rk;
	static rd_atomic32_t rkid;
        rd_kafka_conf_t *conf;
        rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        int ret_errno = 0;
#ifndef _MSC_VER
        sigset_t newset, oldset;
#endif

	call_once(&rd_kafka_global_init_once, rd_kafka_global_init);

        /* rd_kafka_new() takes ownership of the provided \p app_conf
         * object if rd_kafka_new() succeeds.
         * Since \p app_conf is optional we allocate a default configuration
         * object here if \p app_conf is NULL.
         * The configuration object itself is struct-copied later
         * leaving the default *conf pointer to be ready for freeing.
         * In case new() fails and app_conf was specified we will clear out
         * rk_conf to avoid double-freeing from destroy_internal() and the
         * user's eventual call to rd_kafka_conf_destroy().
         * This is all a bit tricky but that's the nature of
         * legacy interfaces. */
        if (!app_conf)
                conf = rd_kafka_conf_new();
        else
                conf = app_conf;

        /* Verify mandatory configuration */
        if (!conf->socket_cb) {
                rd_snprintf(errstr, errstr_size,
                            "Mandatory config property 'socket_cb' not set");
                if (!app_conf)
                        rd_kafka_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return NULL;
        }

        if (!conf->open_cb) {
                rd_snprintf(errstr, errstr_size,
                            "Mandatory config property 'open_cb' not set");
                if (!app_conf)
                        rd_kafka_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return NULL;
        }

#if WITH_SSL
        if (conf->ssl.keystore_location && !conf->ssl.keystore_password) {
                rd_snprintf(errstr, errstr_size,
                            "Mandatory config property 'ssl.keystore.password' not set (mandatory because 'ssl.keystore.location' is set)");
                if (!app_conf)
                        rd_kafka_conf_destroy(conf);
                rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return NULL;
        }
#endif

        if (type == RD_KAFKA_CONSUMER) {
                /* Automatically adjust `fetch.max.bytes` to be >=
                 * `message.max.bytes`. */
                conf->fetch_max_bytes = RD_MAX(conf->fetch_max_bytes,
                                               conf->max_msg_size);

                /* Automatically adjust 'receive.message.max.bytes' to
                 * be 512 bytes larger than 'fetch.max.bytes' to have enough
                 * room for protocol framing (including topic name). */
                conf->recv_max_msg_size = RD_MAX(conf->recv_max_msg_size,
                                                 conf->fetch_max_bytes + 512);
        }

        if (conf->metadata_max_age_ms == -1) {
                if (conf->metadata_refresh_interval_ms > 0)
                        conf->metadata_max_age_ms =
                                conf->metadata_refresh_interval_ms * 3;
                else /* use default value of refresh * 3 */
                        conf->metadata_max_age_ms = 5*60*1000 * 3;
        }

	rd_kafka_global_cnt_incr();

	/*
	 * Set up the handle.
	 */
	rk = rd_calloc(1, sizeof(*rk));

	rk->rk_type = type;

        /* Struct-copy the config object. */
	rk->rk_conf = *conf;
        if (!app_conf)
                rd_free(conf); /* Free the base config struct only,
                                * not its fields since they were copied to
                                * rk_conf just above. Those fields are
                                * freed from rd_kafka_destroy_internal()
                                * as the rk itself is destroyed. */

        /* Call on_new() interceptors */
        rd_kafka_interceptors_on_new(rk, &rk->rk_conf);

	rwlock_init(&rk->rk_lock);
        mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);

	cnd_init(&rk->rk_broker_state_change_cnd);
	mtx_init(&rk->rk_broker_state_change_lock, mtx_plain);

	rk->rk_rep = rd_kafka_q_new(rk);
	rk->rk_ops = rd_kafka_q_new(rk);
        rk->rk_ops->rkq_serve = rd_kafka_poll_cb;
        rk->rk_ops->rkq_opaque = rk;

        if (rk->rk_conf.log_queue) {
                rk->rk_logq = rd_kafka_q_new(rk);
                rk->rk_logq->rkq_serve = rd_kafka_poll_cb;
                rk->rk_logq->rkq_opaque = rk;
        }

	TAILQ_INIT(&rk->rk_brokers);
	TAILQ_INIT(&rk->rk_topics);
        rd_kafka_timers_init(&rk->rk_timers, rk);
        rd_kafka_metadata_cache_init(rk);

	if (rk->rk_conf.dr_cb || rk->rk_conf.dr_msg_cb)
		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_DR;
	if (rk->rk_conf.rebalance_cb)
		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_REBALANCE;
	if (rk->rk_conf.offset_commit_cb)
		rk->rk_conf.enabled_events |= RD_KAFKA_EVENT_OFFSET_COMMIT;

	/* Convenience Kafka protocol null bytes */
	rk->rk_null_bytes = rd_kafkap_bytes_new(NULL, 0);

	if (rk->rk_conf.debug)
                rk->rk_conf.log_level = LOG_DEBUG;

	rd_snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
                    rk->rk_conf.client_id_str, rd_kafka_type2str(rk->rk_type),
                    rd_atomic32_add(&rkid, 1));

	/* Construct clientid kafka string */
	rk->rk_client_id = rd_kafkap_str_new(rk->rk_conf.client_id_str,-1);

        /* Convert group.id to kafka string (may be NULL) */
        rk->rk_group_id = rd_kafkap_str_new(rk->rk_conf.group_id_str,-1);

        /* Config fixups */
        rk->rk_conf.queued_max_msg_bytes =
                (int64_t)rk->rk_conf.queued_max_msg_kbytes * 1000ll;

	/* Enable api.version.request=true if fallback.broker.version
	 * indicates a supporting broker. */
	if (rd_kafka_ApiVersion_is_queryable(rk->rk_conf.broker_version_fallback))
		rk->rk_conf.api_version_request = 1;

	if (rk->rk_type == RD_KAFKA_PRODUCER) {
		mtx_init(&rk->rk_curr_msgs.lock, mtx_plain);
		cnd_init(&rk->rk_curr_msgs.cnd);
		rk->rk_curr_msgs.max_cnt =
			rk->rk_conf.queue_buffering_max_msgs;
                if ((unsigned long long)rk->rk_conf.queue_buffering_max_kbytes * 1024 >
                    (unsigned long long)SIZE_MAX)
                        rk->rk_curr_msgs.max_size = SIZE_MAX;
                else
                        rk->rk_curr_msgs.max_size =
                        (size_t)rk->rk_conf.queue_buffering_max_kbytes * 1024;
	}

        if (rd_kafka_assignors_init(rk, errstr, errstr_size) == -1) {
                ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                ret_errno = EINVAL;
                goto fail;
        }

        if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL ||
            rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_PLAINTEXT) {
                if (rd_kafka_sasl_select_provider(rk,
                                                  errstr, errstr_size) == -1) {
                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        ret_errno = EINVAL;
                        goto fail;
                }
        }

#if WITH_SSL
	if (rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SSL ||
	    rk->rk_conf.security_protocol == RD_KAFKA_PROTO_SASL_SSL) {
		/* Create SSL context */
		if (rd_kafka_transport_ssl_ctx_init(rk, errstr,
						    errstr_size) == -1) {
                        ret_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        ret_errno = EINVAL;
                        goto fail;
                }
        }
#endif

	/* Client group, eligible both in consumer and producer mode. */
        if (type == RD_KAFKA_CONSUMER &&
	    RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0)
                rk->rk_cgrp = rd_kafka_cgrp_new(rk,
                                                rk->rk_group_id,
                                                rk->rk_client_id);



#ifndef _MSC_VER
        /* Block all signals in newly created thread.
         * To avoid race condition we block all signals in the calling
         * thread, which the new thread will inherit its sigmask from,
         * and then restore the original sigmask of the calling thread when
         * we're done creating the thread. */
        sigemptyset(&oldset);
        sigfillset(&newset);
	if (rk->rk_conf.term_sig) {
		struct sigaction sa_term = {
			.sa_handler = rd_kafka_term_sig_handler
		};
		sigaction(rk->rk_conf.term_sig, &sa_term, NULL);
	}
        pthread_sigmask(SIG_SETMASK, &newset, &oldset);
#endif

	/* Lock handle here to synchronise state, i.e., hold off
	 * the thread until we've finalized the handle. */
	rd_kafka_wrlock(rk);

	/* Create handler thread */
	if ((thrd_create(&rk->rk_thread,
			 rd_kafka_thread_main, rk)) != thrd_success) {
                ret_err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
                ret_errno = errno;
		if (errstr)
			rd_snprintf(errstr, errstr_size,
				    "Failed to create thread: %s (%i)",
				    rd_strerror(errno), errno);
		rd_kafka_wrunlock(rk);
#ifndef _MSC_VER
                /* Restore sigmask of caller */
                pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif
                goto fail;
        }

        rd_kafka_wrunlock(rk);

        rk->rk_eos.PID = -1;
        rk->rk_eos.TransactionalId = rd_kafkap_str_new(NULL, 0);

        mtx_lock(&rk->rk_internal_rkb_lock);
	rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
						  RD_KAFKA_PROTO_PLAINTEXT,
						  "", 0, RD_KAFKA_NODEID_UA);
        mtx_unlock(&rk->rk_internal_rkb_lock);

	/* Add initial list of brokers from configuration */
	if (rk->rk_conf.brokerlist) {
		if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
			rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN,
					"No brokers configured");
	}

#ifndef _MSC_VER
	/* Restore sigmask of caller */
	pthread_sigmask(SIG_SETMASK, &oldset, NULL);
#endif

        /* Free user supplied conf's base pointer on success,
         * but not the actual allocated fields since the struct
         * will have been copied in its entirety above. */
        if (app_conf)
                rd_free(app_conf);
	rd_kafka_set_last_error(0, 0);

        rk->rk_initialized = 1;

	return rk;

fail:
        /*
         * Error out and clean up
         */

        /* If on_new() interceptors have been called we also need
         * to allow interceptor clean-up by calling on_destroy() */
        rd_kafka_interceptors_on_destroy(rk);

        /* If rk_conf is a struct-copy of the application configuration
         * we need to avoid rk_conf fields from being freed from
         * rd_kafka_destroy_internal() since they belong to app_conf.
         * However, there are some internal fields, such as interceptors,
         * that belong to rk_conf and thus needs to be cleaned up.
         * Legacy APIs, sigh.. */
        if (app_conf) {
                rd_kafka_assignors_term(rk);
                rd_kafka_interceptors_destroy(&rk->rk_conf);
                memset(&rk->rk_conf, 0, sizeof(rk->rk_conf));
        }

        rd_atomic32_add(&rk->rk_terminate, 1);
        rd_kafka_destroy_internal(rk);
        rd_kafka_destroy_final(rk);

        rd_kafka_set_last_error(ret_err, ret_errno);

        return NULL;
}





/**
 * Produce a single message.
 * Locality: any application thread
 */
int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
		      int msgflags,
		      void *payload, size_t len,
		      const void *key, size_t keylen,
		      void *msg_opaque) {
	return rd_kafka_msg_new(rd_kafka_topic_a2i(rkt), partition,
				msgflags, payload, len,
				key, keylen, msg_opaque);
}


/**
 * Counts usage of the legacy/simple consumer (rd_kafka_consume_start() with
 * friends) since it does not have an API for stopping the cgrp we will need to
 * sort that out automatically in the background when all consumption
 * has stopped.
 *
 * Returns 0 if a  High level consumer is already instantiated
 * which means a Simple consumer cannot co-operate with it, else 1.
 *
 * A rd_kafka_t handle can never migrate from simple to high-level, or
 * vice versa, so we dont need a ..consumer_del().
 */
int rd_kafka_simple_consumer_add (rd_kafka_t *rk) {
        if (rd_atomic32_get(&rk->rk_simple_cnt) < 0)
                return 0;

        return (int)rd_atomic32_add(&rk->rk_simple_cnt, 1);
}




/**
 * rktp fetch is split up in these parts:
 *   * application side:
 *   * broker side (handled by current leader broker thread for rktp):
 *          - the fetch state, initial offset, etc.
 *          - fetching messages, updating fetched offset, etc.
 *          - offset commits
 *
 * Communication between the two are:
 *    app side -> rdkafka main side: rktp_ops
 *    broker thread -> app side: rktp_fetchq
 *
 * There is no shared state between these threads, instead
 * state is communicated through the two op queues, and state synchronization
 * is performed by version barriers.
 *
 */

static RD_UNUSED
int rd_kafka_consume_start0 (rd_kafka_itopic_t *rkt, int32_t partition,
				    int64_t offset, rd_kafka_q_t *rkq) {
	shptr_rd_kafka_toppar_t *s_rktp;

	if (partition < 0) {
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return -1;
	}

        if (!rd_kafka_simple_consumer_add(rkt->rkt_rk)) {
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
                return -1;
        }

	rd_kafka_topic_wrlock(rkt);
	s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
	rd_kafka_topic_wrunlock(rkt);

        /* Verify offset */
	if (offset == RD_KAFKA_OFFSET_BEGINNING ||
	    offset == RD_KAFKA_OFFSET_END ||
            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
                /* logical offsets */

	} else if (offset == RD_KAFKA_OFFSET_STORED) {
		/* offset manager */

                if (rkt->rkt_conf.offset_store_method ==
                    RD_KAFKA_OFFSET_METHOD_BROKER &&
                    RD_KAFKAP_STR_IS_NULL(rkt->rkt_rk->rk_group_id)) {
                        /* Broker based offsets require a group id. */
                        rd_kafka_toppar_destroy(s_rktp);
			rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
						EINVAL);
                        return -1;
                }

	} else if (offset < 0) {
		rd_kafka_toppar_destroy(s_rktp);
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG,
					EINVAL);
		return -1;

        }

        rd_kafka_toppar_op_fetch_start(rd_kafka_toppar_s2i(s_rktp), offset,
				       rkq, RD_KAFKA_NO_REPLYQ);

        rd_kafka_toppar_destroy(s_rktp);

	rd_kafka_set_last_error(0, 0);
	return 0;
}




int rd_kafka_consume_start (rd_kafka_topic_t *app_rkt, int32_t partition,
			    int64_t offset) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "START",
                     "Start consuming partition %"PRId32,partition);
 	return rd_kafka_consume_start0(rkt, partition, offset, NULL);
}

int rd_kafka_consume_start_queue (rd_kafka_topic_t *app_rkt, int32_t partition,
				  int64_t offset, rd_kafka_queue_t *rkqu) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);

 	return rd_kafka_consume_start0(rkt, partition, offset, rkqu->rkqu_q);
}




static RD_UNUSED int rd_kafka_consume_stop0 (rd_kafka_toppar_t *rktp) {
        rd_kafka_q_t *tmpq = NULL;
        rd_kafka_resp_err_t err;

        rd_kafka_topic_wrlock(rktp->rktp_rkt);
        rd_kafka_toppar_lock(rktp);
	rd_kafka_toppar_desired_del(rktp);
        rd_kafka_toppar_unlock(rktp);
	rd_kafka_topic_wrunlock(rktp->rktp_rkt);

        tmpq = rd_kafka_q_new(rktp->rktp_rkt->rkt_rk);

        rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(tmpq, 0));

        /* Synchronisation: Wait for stop reply from broker thread */
        err = rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
        rd_kafka_q_destroy_owner(tmpq);

	rd_kafka_set_last_error(err, err ? EINVAL : 0);

	return err ? -1 : 0;
}


int rd_kafka_consume_stop (rd_kafka_topic_t *app_rkt, int32_t partition) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
	shptr_rd_kafka_toppar_t *s_rktp;
        int r;

	if (partition == RD_KAFKA_PARTITION_UA) {
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INVALID_ARG, EINVAL);
		return -1;
	}

	rd_kafka_topic_wrlock(rkt);
	if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
	    !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
		rd_kafka_topic_wrunlock(rkt);
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return -1;
	}
        rd_kafka_topic_wrunlock(rkt);

        r = rd_kafka_consume_stop0(rd_kafka_toppar_s2i(s_rktp));
	/* set_last_error() called by stop0() */

        rd_kafka_toppar_destroy(s_rktp);

        return r;
}



rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *app_rkt,
                                   int32_t partition,
                                   int64_t offset,
                                   int timeout_ms) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        shptr_rd_kafka_toppar_t *s_rktp;
	rd_kafka_toppar_t *rktp;
        rd_kafka_q_t *tmpq = NULL;
        rd_kafka_resp_err_t err;

        /* FIXME: simple consumer check */

	if (partition == RD_KAFKA_PARTITION_UA)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

	rd_kafka_topic_rdlock(rkt);
	if (!(s_rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
	    !(s_rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
		rd_kafka_topic_rdunlock(rkt);
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
	}
	rd_kafka_topic_rdunlock(rkt);

        if (timeout_ms)
                tmpq = rd_kafka_q_new(rkt->rkt_rk);

        rktp = rd_kafka_toppar_s2i(s_rktp);
        if ((err = rd_kafka_toppar_op_seek(rktp, offset,
					   RD_KAFKA_REPLYQ(tmpq, 0)))) {
                if (tmpq)
                        rd_kafka_q_destroy_owner(tmpq);
                rd_kafka_toppar_destroy(s_rktp);
                return err;
        }

	rd_kafka_toppar_destroy(s_rktp);

        if (tmpq) {
                err = rd_kafka_q_wait_result(tmpq, timeout_ms);
                rd_kafka_q_destroy_owner(tmpq);
                return err;
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}



static ssize_t rd_kafka_consume_batch0 (rd_kafka_q_t *rkq,
					int timeout_ms,
					rd_kafka_message_t **rkmessages,
					size_t rkmessages_size) {
	/* Populate application's rkmessages array. */
	return rd_kafka_q_serve_rkmessages(rkq, timeout_ms,
					   rkmessages, rkmessages_size);
}


ssize_t rd_kafka_consume_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
				int timeout_ms,
				rd_kafka_message_t **rkmessages,
				size_t rkmessages_size) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
	shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
	ssize_t cnt;

	/* Get toppar */
	rd_kafka_topic_rdlock(rkt);
	s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
	if (unlikely(!s_rktp))
		s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
	rd_kafka_topic_rdunlock(rkt);

	if (unlikely(!s_rktp)) {
		/* No such toppar known */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return -1;
	}

        rktp = rd_kafka_toppar_s2i(s_rktp);

	/* Populate application's rkmessages array. */
	cnt = rd_kafka_q_serve_rkmessages(rktp->rktp_fetchq, timeout_ms,
					  rkmessages, rkmessages_size);

	rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */

	rd_kafka_set_last_error(0, 0);

	return cnt;
}

ssize_t rd_kafka_consume_batch_queue (rd_kafka_queue_t *rkqu,
				      int timeout_ms,
				      rd_kafka_message_t **rkmessages,
				      size_t rkmessages_size) {
	/* Populate application's rkmessages array. */
	return rd_kafka_consume_batch0(rkqu->rkqu_q, timeout_ms,
				       rkmessages, rkmessages_size);
}


struct consume_ctx {
	void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
	void *opaque;
};


/**
 * Trampoline for application's consume_cb()
 */
static rd_kafka_op_res_t
rd_kafka_consume_cb (rd_kafka_t *rk,
                     rd_kafka_q_t *rkq,
                     rd_kafka_op_t *rko,
                     rd_kafka_q_cb_type_t cb_type, void *opaque) {
	struct consume_ctx *ctx = opaque;
	rd_kafka_message_t *rkmessage;

        if (unlikely(rd_kafka_op_version_outdated(rko, 0))) {
                rd_kafka_op_destroy(rko);
                return RD_KAFKA_OP_RES_HANDLED;
        }

	rkmessage = rd_kafka_message_get(rko);

	rd_kafka_op_offset_store(rk, rko, rkmessage);

	ctx->consume_cb(rkmessage, ctx->opaque);

        rd_kafka_op_destroy(rko);

        return RD_KAFKA_OP_RES_HANDLED;
}



static rd_kafka_op_res_t
rd_kafka_consume_callback0 (rd_kafka_q_t *rkq, int timeout_ms, int max_cnt,
                            void (*consume_cb) (rd_kafka_message_t
                                                *rkmessage,
                                                void *opaque),
                            void *opaque) {
        struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
        return rd_kafka_q_serve(rkq, timeout_ms, max_cnt,
                                RD_KAFKA_Q_CB_RETURN,
                                rd_kafka_consume_cb, &ctx);

}


int rd_kafka_consume_callback (rd_kafka_topic_t *app_rkt, int32_t partition,
			       int timeout_ms,
			       void (*consume_cb) (rd_kafka_message_t
						   *rkmessage,
						   void *opaque),
			       void *opaque) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        shptr_rd_kafka_toppar_t *s_rktp;
	rd_kafka_toppar_t *rktp;
	int r;

	/* Get toppar */
	rd_kafka_topic_rdlock(rkt);
	s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
	if (unlikely(!s_rktp))
		s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
	rd_kafka_topic_rdunlock(rkt);

	if (unlikely(!s_rktp)) {
		/* No such toppar known */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return -1;
	}

        rktp = rd_kafka_toppar_s2i(s_rktp);
	r = rd_kafka_consume_callback0(rktp->rktp_fetchq, timeout_ms,
                                       rkt->rkt_conf.consume_callback_max_msgs,
				       consume_cb, opaque);

	rd_kafka_toppar_destroy(s_rktp);

	rd_kafka_set_last_error(0, 0);

	return r;
}



int rd_kafka_consume_callback_queue (rd_kafka_queue_t *rkqu,
				     int timeout_ms,
				     void (*consume_cb) (rd_kafka_message_t
							 *rkmessage,
							 void *opaque),
				     void *opaque) {
	return rd_kafka_consume_callback0(rkqu->rkqu_q, timeout_ms, 0,
					  consume_cb, opaque);
}


/**
 * Serve queue 'rkq' and return one message.
 * By serving the queue it will also call any registered callbacks
 * registered for matching events, this includes consumer_cb()
 * in which case no message will be returned.
 */
static rd_kafka_message_t *rd_kafka_consume0 (rd_kafka_t *rk,
                                              rd_kafka_q_t *rkq,
					      int timeout_ms) {
	rd_kafka_op_t *rko;
	rd_kafka_message_t *rkmessage = NULL;
	rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

	rd_kafka_yield_thread = 0;
        while ((rko = rd_kafka_q_pop(rkq,
                                     rd_timeout_remains(abs_timeout), 0))) {
                rd_kafka_op_res_t res;

                res = rd_kafka_poll_cb(rk, rkq, rko,
                                       RD_KAFKA_Q_CB_RETURN, NULL);

                if (res == RD_KAFKA_OP_RES_PASS)
                        break;

                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                            rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop dispatching the queue and return. */
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR,
                                                EINTR);
                        return NULL;
                }

                /* Message was handled by callback. */
                continue;
        }

	if (!rko) {
		/* Timeout reached with no op returned. */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
					ETIMEDOUT);
		return NULL;
	}

        rd_kafka_assert(rk,
                        rko->rko_type == RD_KAFKA_OP_FETCH ||
                        rko->rko_type == RD_KAFKA_OP_CONSUMER_ERR);

	/* Get rkmessage from rko */
	rkmessage = rd_kafka_message_get(rko);

	/* Store offset */
	rd_kafka_op_offset_store(rk, rko, rkmessage);

	rd_kafka_set_last_error(0, 0);

	return rkmessage;
}

rd_kafka_message_t *rd_kafka_consume (rd_kafka_topic_t *app_rkt,
                                      int32_t partition,
				      int timeout_ms) {
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
        shptr_rd_kafka_toppar_t *s_rktp;
	rd_kafka_toppar_t *rktp;
	rd_kafka_message_t *rkmessage;

	rd_kafka_topic_rdlock(rkt);
	s_rktp = rd_kafka_toppar_get(rkt, partition, 0/*no ua on miss*/);
	if (unlikely(!s_rktp))
		s_rktp = rd_kafka_toppar_desired_get(rkt, partition);
	rd_kafka_topic_rdunlock(rkt);

	if (unlikely(!s_rktp)) {
		/* No such toppar known */
		rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION,
					ESRCH);
		return NULL;
	}

        rktp = rd_kafka_toppar_s2i(s_rktp);
	rkmessage = rd_kafka_consume0(rkt->rkt_rk,
                                      rktp->rktp_fetchq, timeout_ms);

	rd_kafka_toppar_destroy(s_rktp); /* refcnt from .._get() */

	return rkmessage;
}


rd_kafka_message_t *rd_kafka_consume_queue (rd_kafka_queue_t *rkqu,
					    int timeout_ms) {
	return rd_kafka_consume0(rkqu->rkqu_rk, rkqu->rkqu_q, timeout_ms);
}




rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk) {
        rd_kafka_cgrp_t *rkcg;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

        rd_kafka_q_fwd_set(rk->rk_rep, rkcg->rkcg_q);
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}




rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,
                                            int timeout_ms) {
        rd_kafka_cgrp_t *rkcg;

        if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) {
                rd_kafka_message_t *rkmessage = rd_kafka_message_new();
                rkmessage->err = RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
                return rkmessage;
        }

        return rd_kafka_consume0(rk, rkcg->rkcg_q, timeout_ms);
}


rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk) {
        rd_kafka_cgrp_t *rkcg;
        rd_kafka_op_t *rko;
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
	rd_kafka_q_t *rkq;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

        rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Closing consumer");

	/* Redirect cgrp queue to our temporary queue to make sure
	 * all posted ops (e.g., rebalance callbacks) are served by
	 * this function. */
	rkq = rd_kafka_q_new(rk);
	rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);

        rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */

        while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
                rd_kafka_op_res_t res;
                if ((rko->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
		    RD_KAFKA_OP_TERMINATE) {
                        err = rko->rko_err;
                        rd_kafka_op_destroy(rko);
                        break;
                }
                res = rd_kafka_poll_cb(rk, rkq, rko,
                                       RD_KAFKA_Q_CB_RETURN, NULL);
                if (res == RD_KAFKA_OP_RES_PASS)
                        rd_kafka_op_destroy(rko);
                /* Ignore YIELD, we need to finish */
        }

        rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);

        rd_kafka_q_destroy_owner(rkq);


        return err;
}



rd_kafka_resp_err_t
rd_kafka_committed (rd_kafka_t *rk,
		    rd_kafka_topic_partition_list_t *partitions,
		    int timeout_ms) {
        rd_kafka_q_t *rkq;
        rd_kafka_resp_err_t err;
        rd_kafka_cgrp_t *rkcg;
	rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        if (!partitions)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        if (!(rkcg = rd_kafka_cgrp_get(rk)))
                return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;

	/* Set default offsets. */
	rd_kafka_topic_partition_list_reset_offsets(partitions,
                                                    RD_KAFKA_OFFSET_INVALID);

	rkq = rd_kafka_q_new(rk);

        do {
                rd_kafka_op_t *rko;
		int state_version = rd_kafka_brokers_get_state_version(rk);

                rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
		rd_kafka_op_set_replyq(rko, rkq, NULL);

                /* Issue #827
                 * Copy partition list to avoid use-after-free if we time out
                 * here, the app frees the list, and then cgrp starts
                 * processing the op. */
		rko->rko_u.offset_fetch.partitions =
                        rd_kafka_topic_partition_list_copy(partitions);
		rko->rko_u.offset_fetch.do_free = 1;

                if (!rd_kafka_q_enq(rkcg->rkcg_ops, rko)) {
                        err = RD_KAFKA_RESP_ERR__DESTROY;
                        break;
                }

                rko = rd_kafka_q_pop(rkq, rd_timeout_remains(abs_timeout), 0);
                if (rko) {
                        if (!(err = rko->rko_err))
                                rd_kafka_topic_partition_list_update(
                                        partitions,
                                        rko->rko_u.offset_fetch.partitions);
                        else if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
				    err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
				   !rd_kafka_brokers_wait_state_change(
					   rk, state_version,
					   rd_timeout_remains(abs_timeout)))
				err = RD_KAFKA_RESP_ERR__TIMED_OUT;

                        rd_kafka_op_destroy(rko);
                } else
                        err = RD_KAFKA_RESP_ERR__TIMED_OUT;
        } while (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
		 err == RD_KAFKA_RESP_ERR__WAIT_COORD);

        rd_kafka_q_destroy_owner(rkq);

        return err;
}



rd_kafka_resp_err_t
rd_kafka_position (rd_kafka_t *rk,
		   rd_kafka_topic_partition_list_t *partitions) {
 	int i;

	/* Set default offsets. */
	rd_kafka_topic_partition_list_reset_offsets(partitions,
						    RD_KAFKA_OFFSET_INVALID);

	for (i = 0 ; i < partitions->cnt ; i++) {
		rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
		shptr_rd_kafka_toppar_t *s_rktp;
		rd_kafka_toppar_t *rktp;

		if (!(s_rktp = rd_kafka_toppar_get2(rk, rktpar->topic,
						    rktpar->partition, 0, 1))) {
			rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
			rktpar->offset = RD_KAFKA_OFFSET_INVALID;
			continue;
		}

		rktp = rd_kafka_toppar_s2i(s_rktp);
		rd_kafka_toppar_lock(rktp);
		rktpar->offset = rktp->rktp_app_offset;
		rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
		rd_kafka_toppar_unlock(rktp);
		rd_kafka_toppar_destroy(s_rktp);
	}

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}



struct _query_wmark_offsets_state {
	rd_kafka_resp_err_t err;
	const char *topic;
	int32_t partition;
	int64_t offsets[2];
	int     offidx;  /* next offset to set from response */
	rd_ts_t ts_end;
	int     state_version;  /* Broker state version */
};

static void rd_kafka_query_wmark_offsets_resp_cb (rd_kafka_t *rk,
						  rd_kafka_broker_t *rkb,
						  rd_kafka_resp_err_t err,
						  rd_kafka_buf_t *rkbuf,
						  rd_kafka_buf_t *request,
						  void *opaque) {
	struct _query_wmark_offsets_state *state;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when query_watermark..()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        offsets = rd_kafka_topic_partition_list_new(1);
        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request, offsets);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                rd_kafka_topic_partition_list_destroy(offsets);
                return; /* Retrying */
        }

	/* Retry if no broker connection is available yet. */
	if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
	     err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
	    rkb &&
	    rd_kafka_brokers_wait_state_change(
		    rkb->rkb_rk, state->state_version,
		    rd_timeout_remains(state->ts_end))) {
		/* Retry */
		state->state_version = rd_kafka_brokers_get_state_version(rk);
		request->rkbuf_retries = 0;
		if (rd_kafka_buf_retry(rkb, request)) {
                        rd_kafka_topic_partition_list_destroy(offsets);
                        return; /* Retry in progress */
                }
		/* FALLTHRU */
	}

        /* Partition not seen in response. */
        if (!(rktpar = rd_kafka_topic_partition_list_find(offsets,
                                                          state->topic,
                                                          state->partition)))
                err = RD_KAFKA_RESP_ERR__BAD_MSG;
        else if (rktpar->err)
                err = rktpar->err;
        else
                state->offsets[state->offidx] = rktpar->offset;

        state->offidx++;

        if (err || state->offidx == 2) /* Error or Done */
                state->err = err;

        rd_kafka_topic_partition_list_destroy(offsets);
}


rd_kafka_resp_err_t
rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
                                  int32_t partition,
                                  int64_t *low, int64_t *high, int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _query_wmark_offsets_state state;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_topic_partition_t *rktpar;
        struct rd_kafka_partition_leader *leader;
        rd_list_t leaders;
        rd_kafka_resp_err_t err;

        partitions = rd_kafka_topic_partition_list_new(1);
        rktpar = rd_kafka_topic_partition_list_add(partitions,
                                                   topic, partition);

        rd_list_init(&leaders, partitions->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        err = rd_kafka_topic_partition_list_query_leaders(rk, partitions,
                                                          &leaders, timeout_ms);
        if (err) {
                         rd_list_destroy(&leaders);
                         rd_kafka_topic_partition_list_destroy(partitions);
                         return err;
        }

        leader = rd_list_elem(&leaders, 0);

        rkq = rd_kafka_q_new(rk);

        /* Due to KAFKA-1588 we need to send a request for each wanted offset,
         * in this case one for the low watermark and one for the high. */
        state.topic = topic;
        state.partition = partition;
        state.offsets[0] = RD_KAFKA_OFFSET_BEGINNING;
        state.offsets[1] = RD_KAFKA_OFFSET_END;
        state.offidx = 0;
        state.err = RD_KAFKA_RESP_ERR__IN_PROGRESS;
        state.ts_end = ts_end;
        state.state_version = rd_kafka_brokers_get_state_version(rk);


        rktpar->offset =  RD_KAFKA_OFFSET_BEGINNING;
        rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
                               RD_KAFKA_REPLYQ(rkq, 0),
                               rd_kafka_query_wmark_offsets_resp_cb,
                               &state);

        rktpar->offset =  RD_KAFKA_OFFSET_END;
        rd_kafka_OffsetRequest(leader->rkb, partitions, 0,
                               RD_KAFKA_REPLYQ(rkq, 0),
                               rd_kafka_query_wmark_offsets_resp_cb,
                               &state);

        rd_kafka_topic_partition_list_destroy(partitions);
        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
                                rd_kafka_poll_cb, NULL) !=
               RD_KAFKA_OP_RES_YIELD)
                ;

        rd_kafka_q_destroy_owner(rkq);

        if (state.err)
                return state.err;
        else if (state.offidx != 2)
                return RD_KAFKA_RESP_ERR__FAIL;

        /* We are not certain about the returned order. */
        if (state.offsets[0] < state.offsets[1]) {
                *low = state.offsets[0];
                *high  = state.offsets[1];
        } else {
                *low = state.offsets[1];
                *high = state.offsets[0];
        }

        /* If partition is empty only one offset (the last) will be returned. */
        if (*low < 0 && *high >= 0)
                *low = *high;

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


rd_kafka_resp_err_t
rd_kafka_get_watermark_offsets (rd_kafka_t *rk, const char *topic,
				int32_t partition,
				int64_t *low, int64_t *high) {
	shptr_rd_kafka_toppar_t *s_rktp;
	rd_kafka_toppar_t *rktp;

	s_rktp = rd_kafka_toppar_get2(rk, topic, partition, 0, 1);
	if (!s_rktp)
		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
	rktp = rd_kafka_toppar_s2i(s_rktp);

	rd_kafka_toppar_lock(rktp);
	*low = rktp->rktp_lo_offset;
	*high = rktp->rktp_hi_offset;
	rd_kafka_toppar_unlock(rktp);

	rd_kafka_toppar_destroy(s_rktp);

	return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief get_offsets_for_times() state
 */
struct _get_offsets_for_times {
        rd_kafka_topic_partition_list_t *results;
        rd_kafka_resp_err_t err;
        int wait_reply;
        int state_version;
        rd_ts_t ts_end;
};

/**
 * @brief Handle OffsetRequest responses
 */
static void rd_kafka_get_offsets_for_times_resp_cb (rd_kafka_t *rk,
                                                  rd_kafka_broker_t *rkb,
                                                  rd_kafka_resp_err_t err,
                                                  rd_kafka_buf_t *rkbuf,
                                                  rd_kafka_buf_t *request,
                                                  void *opaque) {
        struct _get_offsets_for_times *state;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope when offsets_for_times()
                 * timed out and returned to the caller. */
                return;
        }

        state = opaque;

        err = rd_kafka_handle_Offset(rk, rkb, err, rkbuf, request,
                                     state->results);
        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
                return; /* Retrying */

        /* Retry if no broker connection is available yet. */
        if ((err == RD_KAFKA_RESP_ERR__WAIT_COORD ||
             err == RD_KAFKA_RESP_ERR__TRANSPORT) &&
            rkb &&
            rd_kafka_brokers_wait_state_change(
                    rkb->rkb_rk, state->state_version,
                    rd_timeout_remains(state->ts_end))) {
                /* Retry */
                state->state_version = rd_kafka_brokers_get_state_version(rk);
                request->rkbuf_retries = 0;
                if (rd_kafka_buf_retry(rkb, request))
                        return; /* Retry in progress */
                /* FALLTHRU */
        }

        if (err && !state->err)
                state->err = err;

        state->wait_reply--;
}


rd_kafka_resp_err_t
rd_kafka_offsets_for_times (rd_kafka_t *rk,
                            rd_kafka_topic_partition_list_t *offsets,
                            int timeout_ms) {
        rd_kafka_q_t *rkq;
        struct _get_offsets_for_times state = RD_ZERO_INIT;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_list_t leaders;
        int i;
        rd_kafka_resp_err_t err;
        struct rd_kafka_partition_leader *leader;
        int tmout;

        if (offsets->cnt == 0)
                return RD_KAFKA_RESP_ERR__INVALID_ARG;

        rd_list_init(&leaders, offsets->cnt,
                     (void *)rd_kafka_partition_leader_destroy);

        err = rd_kafka_topic_partition_list_query_leaders(rk, offsets, &leaders,
                                                          timeout_ms);
        if (err) {
                rd_list_destroy(&leaders);
                return err;
        }


        rkq = rd_kafka_q_new(rk);

        state.wait_reply = 0;
        state.results = rd_kafka_topic_partition_list_new(offsets->cnt);

        /* For each leader send a request for its partitions */
        RD_LIST_FOREACH(leader, &leaders, i) {
                state.wait_reply++;
                rd_kafka_OffsetRequest(leader->rkb, leader->partitions, 1,
                                       RD_KAFKA_REPLYQ(rkq, 0),
                                       rd_kafka_get_offsets_for_times_resp_cb,
                                       &state);
        }

        rd_list_destroy(&leaders);

        /* Wait for reply (or timeout) */
        while (state.wait_reply > 0 &&
               !rd_timeout_expired((tmout = rd_timeout_remains(ts_end))))
                rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
                                 rd_kafka_poll_cb, NULL);

        rd_kafka_q_destroy_owner(rkq);

        if (state.wait_reply > 0 && !state.err)
                state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;

        /* Then update the queried partitions. */
        if (!state.err)
                rd_kafka_topic_partition_list_update(offsets, state.results);

        rd_kafka_topic_partition_list_destroy(state.results);

        return state.err;
}


/**
 * rd_kafka_poll() (and similar) op callback handler.
 * Will either call registered callback depending on cb_type and op type
 * or return op to application, if applicable (e.g., fetch message).
 *
 * Returns 1 if op was handled, else 0.
 *
 * Locality: application thread
 */
rd_kafka_op_res_t
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                  rd_kafka_q_cb_type_t cb_type, void *opaque) {
	rd_kafka_msg_t *rkm;

	/* Return-as-event requested, see if op can be converted to event,
	 * otherwise fall through and trigger callbacks. */
	if (cb_type == RD_KAFKA_Q_CB_EVENT && rd_kafka_event_setup(rk, rko))
		return 0; /* Return as event */

        switch ((int)rko->rko_type)
        {
        case RD_KAFKA_OP_FETCH:
                if (!rk->rk_conf.consume_cb ||
                    cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
                        return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
                else {
                        struct consume_ctx ctx = {
                                .consume_cb = rk->rk_conf.consume_cb,
                                .opaque = rk->rk_conf.opaque };

                        return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
                }
                break;

        case RD_KAFKA_OP_REBALANCE:
                /* If EVENT_REBALANCE is enabled but rebalance_cb isnt
                 * we need to perform a dummy assign for the application.
                 * This might happen during termination with consumer_close() */
                if (rk->rk_conf.rebalance_cb)
                        rk->rk_conf.rebalance_cb(
                                rk, rko->rko_err,
                                rko->rko_u.rebalance.partitions,
                                rk->rk_conf.opaque);
                else {
                        rd_kafka_dbg(rk, CGRP, "UNASSIGN",
                                     "Forcing unassign of %d partition(s)",
                                     rko->rko_u.rebalance.partitions ?
                                     rko->rko_u.rebalance.partitions->cnt : 0);
                        rd_kafka_assign(rk, NULL);
                }
                break;

        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
		if (!rko->rko_u.offset_commit.cb)
			return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
		rko->rko_u.offset_commit.cb(
                        rk, rko->rko_err,
			rko->rko_u.offset_commit.partitions,
			rko->rko_u.offset_commit.opaque);
                break;

        case RD_KAFKA_OP_CONSUMER_ERR:
                /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
                 *   Consumer errors are returned to the application
                 *   as rkmessages, not error callbacks.
                 *
                 * rd_kafka_poll() (_Q_CB_GLOBAL):
                 *   convert to ERR op (fallthru)
                 */
                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
                        /* return as message_t to application */
                        return RD_KAFKA_OP_RES_PASS;
                }
		/* FALLTHRU */

	case RD_KAFKA_OP_ERR:
		if (rk->rk_conf.error_cb)
			rk->rk_conf.error_cb(rk, rko->rko_err,
					     rko->rko_u.err.errstr,
                                             rk->rk_conf.opaque);
                else {
                        /* If error string already contains
                         * the err2str then skip including err2str in
                         * the printout */
                        if (rko->rko_u.err.errstr &&
                            strstr(rko->rko_u.err.errstr,
                                   rd_kafka_err2str(rko->rko_err)))
                                rd_kafka_log(rk, LOG_ERR, "ERROR",
                                             "%s: %s",
                                             rk->rk_name,
                                             rko->rko_u.err.errstr);
                        else
                                rd_kafka_log(rk, LOG_ERR, "ERROR",
                                             "%s: %s: %s",
                                             rk->rk_name,
                                             rko->rko_u.err.errstr,
                                             rd_kafka_err2str(rko->rko_err));
                }
                break;

	case RD_KAFKA_OP_DR:
		/* Delivery report:
		 * call application DR callback for each message. */
		while ((rkm = TAILQ_FIRST(&rko->rko_u.dr.msgq.rkmq_msgs))) {
                        rd_kafka_message_t *rkmessage;

			TAILQ_REMOVE(&rko->rko_u.dr.msgq.rkmq_msgs,
				     rkm, rkm_link);

                        rkmessage = rd_kafka_message_get_from_rkm(rko, rkm);

                        if (rk->rk_conf.dr_msg_cb) {
                                rk->rk_conf.dr_msg_cb(rk, rkmessage,
                                                      rk->rk_conf.opaque);

                        } else {

                                rk->rk_conf.dr_cb(rk,
                                                  rkmessage->payload,
                                                  rkmessage->len,
                                                  rkmessage->err,
                                                  rk->rk_conf.opaque,
                                                  rkmessage->_private);
                        }

                        rd_kafka_msg_destroy(rk, rkm);

                        if (unlikely(rd_kafka_yield_thread)) {
                                /* Callback called yield(),
                                 * re-enqueue the op (if there are any
                                 * remaining messages). */
                                if (!TAILQ_EMPTY(&rko->rko_u.dr.msgq.
                                                 rkmq_msgs))
                                        rd_kafka_q_reenq(rkq, rko);
                                else
                                        rd_kafka_op_destroy(rko);
                                return RD_KAFKA_OP_RES_YIELD;
                        }
		}

		rd_kafka_msgq_init(&rko->rko_u.dr.msgq);

		break;

	case RD_KAFKA_OP_THROTTLE:
		if (rk->rk_conf.throttle_cb)
			rk->rk_conf.throttle_cb(rk, rko->rko_u.throttle.nodename,
						rko->rko_u.throttle.nodeid,
						rko->rko_u.throttle.
						throttle_time,
						rk->rk_conf.opaque);
		break;

	case RD_KAFKA_OP_STATS:
		/* Statistics */
		if (rk->rk_conf.stats_cb &&
		    rk->rk_conf.stats_cb(rk, rko->rko_u.stats.json,
                                         rko->rko_u.stats.json_len,
					 rk->rk_conf.opaque) == 1)
			rko->rko_u.stats.json = NULL; /* Application wanted json ptr */
		break;

        case RD_KAFKA_OP_LOG:
                if (likely(rk->rk_conf.log_cb &&
                           rk->rk_conf.log_level >= rko->rko_u.log.level))
                        rk->rk_conf.log_cb(rk,
                                           rko->rko_u.log.level,
                                           rko->rko_u.log.fac,
                                           rko->rko_u.log.str);
                break;

        case RD_KAFKA_OP_TERMINATE:
                /* nop: just a wake-up */
                break;

        default:
                rd_kafka_assert(rk, !*"cant handle op type");
                break;
        }

        rd_kafka_op_destroy(rko);

        return 1; /* op was handled */
}

int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) {
        return rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0,
                                RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
}


rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms) {
        rd_kafka_op_t *rko;
        rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, timeout_ms, 0,
                                   RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);
        if (!rko)
                return NULL;

        return rko;
}

int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms) {
        return rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
                                RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
}



static void rd_kafka_toppar_dump (FILE *fp, const char *indent,
				  rd_kafka_toppar_t *rktp) {

	fprintf(fp, "%s%.*s [%"PRId32"] leader %s\n",
		indent,
		RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
		rktp->rktp_partition,
		rktp->rktp_leader ?
		rktp->rktp_leader->rkb_name : "none");
	fprintf(fp,
		"%s refcnt %i\n"
		"%s msgq:      %i messages\n"
		"%s xmit_msgq: %i messages\n"
		"%s total:     %"PRIu64" messages, %"PRIu64" bytes\n",
		indent, rd_refcnt_get(&rktp->rktp_refcnt),
		indent, rktp->rktp_msgq.rkmq_msg_cnt,
		indent, rktp->rktp_xmit_msgq.rkmq_msg_cnt,
		indent, rd_atomic64_get(&rktp->rktp_c.tx_msgs), rd_atomic64_get(&rktp->rktp_c.tx_bytes));
}

static void rd_kafka_broker_dump (FILE *fp, rd_kafka_broker_t *rkb, int locks) {
	rd_kafka_toppar_t *rktp;

        if (locks)
                rd_kafka_broker_lock(rkb);
        fprintf(fp, " rd_kafka_broker_t %p: %s NodeId %"PRId32
                " in state %s (for %.3fs)\n",
                rkb, rkb->rkb_name, rkb->rkb_nodeid,
                rd_kafka_broker_state_names[rkb->rkb_state],
                rkb->rkb_ts_state ?
                (float)(rd_clock() - rkb->rkb_ts_state) / 1000000.0f :
                0.0f);
        fprintf(fp, "  refcnt %i\n", rd_refcnt_get(&rkb->rkb_refcnt));
        fprintf(fp, "  outbuf_cnt: %i waitresp_cnt: %i\n",
                rd_atomic32_get(&rkb->rkb_outbufs.rkbq_cnt),
                rd_atomic32_get(&rkb->rkb_waitresps.rkbq_cnt));
        fprintf(fp,
                "  %"PRIu64 " messages sent, %"PRIu64" bytes, "
                "%"PRIu64" errors, %"PRIu64" timeouts\n"
                "  %"PRIu64 " messages received, %"PRIu64" bytes, "
                "%"PRIu64" errors\n"
                "  %"PRIu64 " messageset transmissions were retried\n",
                rd_atomic64_get(&rkb->rkb_c.tx), rd_atomic64_get(&rkb->rkb_c.tx_bytes),
                rd_atomic64_get(&rkb->rkb_c.tx_err), rd_atomic64_get(&rkb->rkb_c.req_timeouts),
                rd_atomic64_get(&rkb->rkb_c.rx), rd_atomic64_get(&rkb->rkb_c.rx_bytes),
                rd_atomic64_get(&rkb->rkb_c.rx_err),
                rd_atomic64_get(&rkb->rkb_c.tx_retries));

        fprintf(fp, "  %i toppars:\n", rkb->rkb_toppar_cnt);
        TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink)
                rd_kafka_toppar_dump(fp, "   ", rktp);
        if (locks) {
                rd_kafka_broker_unlock(rkb);
        }
}


static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) {
	rd_kafka_broker_t *rkb;
	rd_kafka_itopic_t *rkt;
	rd_kafka_toppar_t *rktp;
        shptr_rd_kafka_toppar_t *s_rktp;
        int i;
	unsigned int tot_cnt;
	size_t tot_size;

	rd_kafka_curr_msgs_get(rk, &tot_cnt, &tot_size);

	if (locks)
                rd_kafka_rdlock(rk);
#if ENABLE_DEVEL
        fprintf(fp, "rd_kafka_op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
#endif
	fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);

	fprintf(fp, " producer.msg_cnt %u (%"PRIusz" bytes)\n",
		tot_cnt, tot_size);
	fprintf(fp, " rk_rep reply queue: %i ops\n",
		rd_kafka_q_len(rk->rk_rep));

	fprintf(fp, " brokers:\n");
        if (locks)
                mtx_lock(&rk->rk_internal_rkb_lock);
        if (rk->rk_internal_rkb)
                rd_kafka_broker_dump(fp, rk->rk_internal_rkb, locks);
        if (locks)
                mtx_unlock(&rk->rk_internal_rkb_lock);

	TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_dump(fp, rkb, locks);
	}

        fprintf(fp, " cgrp:\n");
        if (rk->rk_cgrp) {
                rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
                fprintf(fp, "  %.*s in state %s, flags 0x%x\n",
                        RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                        rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                        rkcg->rkcg_flags);
                fprintf(fp, "   coord_id %"PRId32", managing broker %s\n",
                        rkcg->rkcg_coord_id,
                        rkcg->rkcg_rkb ?
                        rd_kafka_broker_name(rkcg->rkcg_rkb) : "(none)");

                fprintf(fp, "  toppars:\n");
                RD_LIST_FOREACH(s_rktp, &rkcg->rkcg_toppars, i) {
                        rktp = rd_kafka_toppar_s2i(s_rktp);
                        fprintf(fp, "   %.*s [%"PRId32"] in state %s\n",
                                RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                rktp->rktp_partition,
                                rd_kafka_fetch_states[rktp->rktp_fetch_state]);
                }
        }

	fprintf(fp, " topics:\n");
	TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
		fprintf(fp, "  %.*s with %"PRId32" partitions, state %s, "
                        "refcnt %i\n",
			RD_KAFKAP_STR_PR(rkt->rkt_topic),
			rkt->rkt_partition_cnt,
                        rd_kafka_topic_state_names[rkt->rkt_state],
                        rd_refcnt_get(&rkt->rkt_refcnt));
		if (rkt->rkt_ua)
			rd_kafka_toppar_dump(fp, "   ",
                                             rd_kafka_toppar_s2i(rkt->rkt_ua));
                if (rd_list_empty(&rkt->rkt_desp)) {
                        fprintf(fp, "   desired partitions:");
                        RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp,  i)
                                fprintf(fp, " %"PRId32,
                                        rd_kafka_toppar_s2i(s_rktp)->
                                        rktp_partition);
                        fprintf(fp, "\n");
                }
	}

        fprintf(fp, "\n");
        rd_kafka_metadata_cache_dump(fp, rk);

        if (locks)
                rd_kafka_rdunlock(rk);
}

void rd_kafka_dump (FILE *fp, rd_kafka_t *rk) {

        if (rk)
                rd_kafka_dump0(fp, rk, 1/*locks*/);

#if ENABLE_SHAREDPTR_DEBUG
        rd_shared_ptrs_dump();
#endif
}



const char *rd_kafka_name (const rd_kafka_t *rk) {
	return rk->rk_name;
}

rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk) {
        return rk->rk_type;
}


char *rd_kafka_memberid (const rd_kafka_t *rk) {
	rd_kafka_op_t *rko;
	rd_kafka_cgrp_t *rkcg;
	char *memberid;

	if (!(rkcg = rd_kafka_cgrp_get(rk)))
		return NULL;

	rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_NAME);
	if (!rko)
		return NULL;
	memberid = rko->rko_u.name.str;
	rko->rko_u.name.str = NULL;
	rd_kafka_op_destroy(rko);

	return memberid;
}


char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms) {
        rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

        /* ClusterId is returned in Metadata >=V2 responses and
         * cached on the rk. If no cached value is available
         * it means no metadata has been received yet, or we're
         * using a lower protocol version
         * (e.g., lack of api.version.request=true). */

        while (1) {
                int remains_ms;

                rd_kafka_rdlock(rk);

                if (rk->rk_clusterid) {
                        /* Cached clusterid available. */
                        char *ret = rd_strdup(rk->rk_clusterid);
                        rd_kafka_rdunlock(rk);
                        return ret;
                } else if (rk->rk_ts_metadata > 0) {
                        /* Metadata received but no clusterid,
                         * this probably means the broker is too old
                         * or api.version.request=false. */
                        rd_kafka_rdunlock(rk);
                        return NULL;
                }

                rd_kafka_rdunlock(rk);

                /* Wait for up to timeout_ms for a metadata refresh,
                 * if permitted by application. */
                remains_ms = rd_timeout_remains(abs_timeout);
                if (rd_timeout_expired(remains_ms))
                        return NULL;

                rd_kafka_metadata_cache_wait_change(rk, remains_ms);
        }

        return NULL;
}


void *rd_kafka_opaque (const rd_kafka_t *rk) {
        return rk->rk_conf.opaque;
}


int rd_kafka_outq_len (rd_kafka_t *rk) {
	return rd_kafka_curr_msgs_cnt(rk) + rd_kafka_q_len(rk->rk_rep);
}


rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms) {
        unsigned int msg_cnt = 0;
	int qlen;
	rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        int tmout;

	if (rk->rk_type != RD_KAFKA_PRODUCER)
		return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;

        rd_kafka_yield_thread = 0;
        while (((qlen = rd_kafka_q_len(rk->rk_rep)) > 0 ||
                (msg_cnt = rd_kafka_curr_msgs_cnt(rk)) > 0) &&
               !rd_kafka_yield_thread &&
               (tmout = rd_timeout_remains_limit(ts_end, 100))!=RD_POLL_NOWAIT)
                rd_kafka_poll(rk, tmout);

	return qlen + msg_cnt > 0 ? RD_KAFKA_RESP_ERR__TIMED_OUT :
		RD_KAFKA_RESP_ERR_NO_ERROR;
}



int rd_kafka_version (void) {
	return RD_KAFKA_VERSION;
}

const char *rd_kafka_version_str (void) {
	static char ret[128];
	size_t of = 0, r;

	if (*ret)
		return ret;

#ifdef LIBRDKAFKA_GIT_VERSION
	if (*LIBRDKAFKA_GIT_VERSION) {
		of = rd_snprintf(ret, sizeof(ret), "%s",
				 *LIBRDKAFKA_GIT_VERSION == 'v' ?
                                 LIBRDKAFKA_GIT_VERSION+1 :
                                 LIBRDKAFKA_GIT_VERSION);
		if (of > sizeof(ret))
			of = sizeof(ret);
	}
#endif

#define _my_sprintf(...) do {						\
		r = rd_snprintf(ret+of, sizeof(ret)-of, __VA_ARGS__);	\
		if (r > sizeof(ret)-of)					\
			r = sizeof(ret)-of;				\
		of += r;						\
	} while(0)

	if (of == 0) {
		int ver = rd_kafka_version();
		int prel = (ver & 0xff);
		_my_sprintf("%i.%i.%i",
			    (ver >> 24) & 0xff,
			    (ver >> 16) & 0xff,
			    (ver >> 8) & 0xff);
		if (prel != 0xff) {
			/* pre-builds below 200 are just running numbers,
			 * above 200 are RC numbers. */
			if (prel <= 200)
				_my_sprintf("-pre%d", prel);
			else
				_my_sprintf("-RC%d", prel - 200);
		}
	}

#if ENABLE_DEVEL
	_my_sprintf("-devel");
#endif

#if ENABLE_SHAREDPTR_DEBUG
	_my_sprintf("-shptr");
#endif

#if WITHOUT_OPTIMIZATION
	_my_sprintf("-O0");
#endif

	return ret;
}


/**
 * Assert trampoline to print some debugging information on crash.
 */
void
RD_NORETURN
rd_kafka_crash (const char *file, int line, const char *function,
                rd_kafka_t *rk, const char *reason) {
        fprintf(stderr, "*** %s:%i:%s: %s ***\n",
                file, line, function, reason);
        if (rk)
                rd_kafka_dump0(stderr, rk, 0/*no locks*/);
        abort();
}





struct list_groups_state {
        rd_kafka_q_t *q;
        rd_kafka_resp_err_t err;
        int wait_cnt;
        const char *desired_group;
        struct rd_kafka_group_list *grplist;
        int grplist_size;
};

static void rd_kafka_DescribeGroups_resp_cb (rd_kafka_t *rk,
					     rd_kafka_broker_t *rkb,
                                             rd_kafka_resp_err_t err,
                                             rd_kafka_buf_t *reply,
                                             rd_kafka_buf_t *request,
                                             void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int cnt;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' has gone out of scope due to list_groups()
                 * timing out and returning. */
                return;
        }

        state = opaque;
        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i32(reply, &cnt);

        while (cnt-- > 0) {
                int16_t ErrorCode;
                rd_kafkap_str_t Group, GroupState, ProtoType, Proto;
                int MemberCnt;
                struct rd_kafka_group_info *gi;

                if (state->grplist->group_cnt == state->grplist_size) {
                        /* Grow group array */
                        state->grplist_size *= 2;
                        state->grplist->groups =
                                rd_realloc(state->grplist->groups,
                                           state->grplist_size *
                                           sizeof(*state->grplist->groups));
                }

                gi = &state->grplist->groups[state->grplist->group_cnt++];
                memset(gi, 0, sizeof(*gi));

                rd_kafka_buf_read_i16(reply, &ErrorCode);
                rd_kafka_buf_read_str(reply, &Group);
                rd_kafka_buf_read_str(reply, &GroupState);
                rd_kafka_buf_read_str(reply, &ProtoType);
                rd_kafka_buf_read_str(reply, &Proto);
                rd_kafka_buf_read_i32(reply, &MemberCnt);

                if (MemberCnt > 100000) {
                        err = RD_KAFKA_RESP_ERR__BAD_MSG;
                        goto err;
                }

                rd_kafka_broker_lock(rkb);
                gi->broker.id = rkb->rkb_nodeid;
                gi->broker.host = rd_strdup(rkb->rkb_origname);
                gi->broker.port = rkb->rkb_port;
                rd_kafka_broker_unlock(rkb);

                gi->err = ErrorCode;
                gi->group = RD_KAFKAP_STR_DUP(&Group);
                gi->state = RD_KAFKAP_STR_DUP(&GroupState);
                gi->protocol_type = RD_KAFKAP_STR_DUP(&ProtoType);
                gi->protocol = RD_KAFKAP_STR_DUP(&Proto);

                if (MemberCnt > 0)
                        gi->members =
                                rd_malloc(MemberCnt * sizeof(*gi->members));

                while (MemberCnt-- > 0) {
                        rd_kafkap_str_t MemberId, ClientId, ClientHost;
                        rd_kafkap_bytes_t Meta, Assignment;
                        struct rd_kafka_group_member_info *mi;

                        mi = &gi->members[gi->member_cnt++];
                        memset(mi, 0, sizeof(*mi));

                        rd_kafka_buf_read_str(reply, &MemberId);
                        rd_kafka_buf_read_str(reply, &ClientId);
                        rd_kafka_buf_read_str(reply, &ClientHost);
                        rd_kafka_buf_read_bytes(reply, &Meta);
                        rd_kafka_buf_read_bytes(reply, &Assignment);

                        mi->member_id = RD_KAFKAP_STR_DUP(&MemberId);
                        mi->client_id = RD_KAFKAP_STR_DUP(&ClientId);
                        mi->client_host = RD_KAFKAP_STR_DUP(&ClientHost);

                        if (RD_KAFKAP_BYTES_LEN(&Meta) == 0) {
                                mi->member_metadata_size = 0;
                                mi->member_metadata = NULL;
                        } else {
                                mi->member_metadata_size =
                                        RD_KAFKAP_BYTES_LEN(&Meta);
                                mi->member_metadata =
                                        rd_memdup(Meta.data,
                                                  mi->member_metadata_size);
                        }

                        if (RD_KAFKAP_BYTES_LEN(&Assignment) == 0) {
                                mi->member_assignment_size = 0;
                                mi->member_assignment = NULL;
                        } else {
                                mi->member_assignment_size =
                                        RD_KAFKAP_BYTES_LEN(&Assignment);
                                mi->member_assignment =
                                        rd_memdup(Assignment.data,
                                                  mi->member_assignment_size);
                        }
                }
        }

err:
        state->err = err;
        return;

 err_parse:
        state->err = reply->rkbuf_err;
}

static void rd_kafka_ListGroups_resp_cb (rd_kafka_t *rk,
					 rd_kafka_broker_t *rkb,
                                         rd_kafka_resp_err_t err,
                                         rd_kafka_buf_t *reply,
                                         rd_kafka_buf_t *request,
                                         void *opaque) {
        struct list_groups_state *state;
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode;
        char **grps;
        int cnt, grpcnt, i = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* 'state' is no longer in scope because
                 * list_groups() timed out and returned to the caller.
                 * We must not touch anything here but simply return. */
                return;
        }

        state = opaque;

        state->wait_cnt--;

        if (err)
                goto err;

        rd_kafka_buf_read_i16(reply, &ErrorCode);
        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        rd_kafka_buf_read_i32(reply, &cnt);

        if (state->desired_group)
                grpcnt = 1;
        else
                grpcnt = cnt;

        if (cnt == 0 || grpcnt == 0)
                return;

        grps = rd_malloc(sizeof(*grps) * grpcnt);

        while (cnt-- > 0) {
                rd_kafkap_str_t grp, proto;

                rd_kafka_buf_read_str(reply, &grp);
                rd_kafka_buf_read_str(reply, &proto);

                if (state->desired_group &&
                    rd_kafkap_str_cmp_str(&grp, state->desired_group))
                        continue;

                grps[i++] = RD_KAFKAP_STR_DUP(&grp);

                if (i == grpcnt)
                        break;
        }

        if (i > 0) {
                state->wait_cnt++;
                rd_kafka_DescribeGroupsRequest(rkb,
                                               (const char **)grps, i,
                                               RD_KAFKA_REPLYQ(state->q, 0),
                                               rd_kafka_DescribeGroups_resp_cb,
                                               state);

                while (i-- > 0)
                        rd_free(grps[i]);
        }


        rd_free(grps);

err:
        state->err = err;
        return;

 err_parse:
        state->err = reply->rkbuf_err;
}

rd_kafka_resp_err_t
rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
                      const struct rd_kafka_group_list **grplistp,
                      int timeout_ms) {
        rd_kafka_broker_t *rkb;
        int rkb_cnt = 0;
        struct list_groups_state state = RD_ZERO_INIT;
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
	int state_version = rd_kafka_brokers_get_state_version(rk);

        /* Wait until metadata has been fetched from cluster so
         * that we have a full broker list.
	 * This state only happens during initial client setup, after that
	 * there'll always be a cached metadata copy. */
        rd_kafka_rdlock(rk);
        while (!rk->rk_ts_metadata) {
                rd_kafka_rdunlock(rk);

		if (!rd_kafka_brokers_wait_state_change(
			    rk, state_version, rd_timeout_remains(ts_end)))
                        return RD_KAFKA_RESP_ERR__TIMED_OUT;

                rd_kafka_rdlock(rk);
        }

        state.q = rd_kafka_q_new(rk);
        state.desired_group = group;
        state.grplist = rd_calloc(1, sizeof(*state.grplist));
        state.grplist_size = group ? 1 : 32;

        state.grplist->groups = rd_malloc(state.grplist_size *
                                          sizeof(*state.grplist->groups));

        /* Query each broker for its list of groups */
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
                rd_kafka_broker_lock(rkb);
                if (rkb->rkb_nodeid == -1) {
                        rd_kafka_broker_unlock(rkb);
                        continue;
                }

                state.wait_cnt++;
                rd_kafka_ListGroupsRequest(rkb,
                                           RD_KAFKA_REPLYQ(state.q, 0),
					   rd_kafka_ListGroups_resp_cb,
                                           &state);

                rkb_cnt++;

                rd_kafka_broker_unlock(rkb);

        }
        rd_kafka_rdunlock(rk);

        if (rkb_cnt == 0) {
                state.err = RD_KAFKA_RESP_ERR__TRANSPORT;

        } else {
                int remains;

                while (state.wait_cnt > 0 &&
                       !rd_timeout_expired((remains =
                                            rd_timeout_remains(ts_end)))) {
                        rd_kafka_q_serve(state.q, remains, 0,
                                         RD_KAFKA_Q_CB_CALLBACK,
                                         rd_kafka_poll_cb, NULL);
                        /* Ignore yields */
                }
        }

        rd_kafka_q_destroy_owner(state.q);

        if (state.wait_cnt > 0 && !state.err) {
                if (state.grplist->group_cnt == 0)
                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
                else {
                        *grplistp = state.grplist;
                        return RD_KAFKA_RESP_ERR__PARTIAL;
                }
        }

        if (state.err)
                rd_kafka_group_list_destroy(state.grplist);
        else
                *grplistp = state.grplist;

        return state.err;
}


void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist0) {
        struct rd_kafka_group_list *grplist =
                (struct rd_kafka_group_list *)grplist0;

        while (grplist->group_cnt-- > 0) {
                struct rd_kafka_group_info *gi;
                gi = &grplist->groups[grplist->group_cnt];

                if (gi->broker.host)
                        rd_free(gi->broker.host);
                if (gi->group)
                        rd_free(gi->group);
                if (gi->state)
                        rd_free(gi->state);
                if (gi->protocol_type)
                        rd_free(gi->protocol_type);
                if (gi->protocol)
                        rd_free(gi->protocol);

                while (gi->member_cnt-- > 0) {
                        struct rd_kafka_group_member_info *mi;
                        mi = &gi->members[gi->member_cnt];

                        if (mi->member_id)
                                rd_free(mi->member_id);
                        if (mi->client_id)
                                rd_free(mi->client_id);
                        if (mi->client_host)
                                rd_free(mi->client_host);
                        if (mi->member_metadata)
                                rd_free(mi->member_metadata);
                        if (mi->member_assignment)
                                rd_free(mi->member_assignment);
                }

                if (gi->members)
                        rd_free(gi->members);
        }

        if (grplist->groups)
                rd_free(grplist->groups);

        rd_free(grplist);
}



const char *rd_kafka_get_debug_contexts(void) {
	return RD_KAFKA_DEBUG_CONTEXTS;
}


int rd_kafka_path_is_dir (const char *path) {
#ifdef _MSC_VER
	struct _stat st;
	return (_stat(path, &st) == 0 && st.st_mode & S_IFDIR);
#else
	struct stat st;
	return (stat(path, &st) == 0 && S_ISDIR(st.st_mode));
#endif
}


void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr) {
        free(ptr);
}


int rd_kafka_errno (void) {
        return errno;
}

int rd_kafka_unittest (void) {
        return rd_unittest();
}


#if ENABLE_SHAREDPTR_DEBUG
struct rd_shptr0_head rd_shared_ptr_debug_list;
mtx_t rd_shared_ptr_debug_mtx;

void rd_shared_ptrs_dump (void) {
        rd_shptr0_t *sptr;

        printf("################ Current shared pointers ################\n");
        printf("### op_cnt: %d\n", rd_atomic32_get(&rd_kafka_op_cnt));
        mtx_lock(&rd_shared_ptr_debug_mtx);
        LIST_FOREACH(sptr, &rd_shared_ptr_debug_list, link)
                printf("# shptr ((%s*)%p): object %p refcnt %d: at %s:%d\n",
                       sptr->typename, sptr, sptr->obj,
                       rd_refcnt_get(sptr->ref), sptr->func, sptr->line);
        mtx_unlock(&rd_shared_ptr_debug_mtx);
        printf("#########################################################\n");
}
#endif