/*
 * librdkafka - The Apache Kafka C/C++ library
 *
 * Copyright (c) 2015 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#include "rdkafka_int.h"
#include "rdkafka_topic.h"
#include "rdkafka_broker.h"
#include "rdkafka_request.h"
#include "rdkafka_offset.h"
#include "rdkafka_partition.h"
#include "rdregex.h"
#include "rdports.h"  /* rd_qsort_r() */

const char *rd_kafka_fetch_states[] = {
	"none",
	"stopping",
	"stopped",
	"offset-query",
	"offset-wait",
	"active"
};


static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
                          rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                          rd_kafka_q_cb_type_t cb_type, void *opaque);


static RD_INLINE int32_t
rd_kafka_toppar_version_new_barrier0 (rd_kafka_toppar_t *rktp,
				     const char *func, int line) {
	int32_t version = rd_atomic32_add(&rktp->rktp_version, 1);
	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BARRIER",
		     "%s [%"PRId32"]: %s:%d: new version barrier v%"PRId32,
		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
		     func, line, version);
	return version;
}

#define rd_kafka_toppar_version_new_barrier(rktp) \
	rd_kafka_toppar_version_new_barrier0(rktp, __FUNCTION__, __LINE__)
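
/*
 * A version barrier fences off responses to requests that were issued
 * before a state change (stop, seek, pause, ..). A minimal usage sketch
 * (hypothetical call sites; the real ones are in the op handlers below):
 *
 *   // When changing state:
 *   rd_kafka_toppar_version_new_barrier(rktp);
 *   rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);
 *
 *   // In a response handler:
 *   if (rd_kafka_buf_version_outdated(request, rktp->rktp_op_version))
 *           err = RD_KAFKA_RESP_ERR__OUTDATED;  // stale: drop the reply
 */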


/**
 * Toppar based OffsetResponse handling.
 * This is used for updating the low water mark for consumer lag.
 */
static void rd_kafka_toppar_lag_handle_Offset (rd_kafka_t *rk,
					       rd_kafka_broker_t *rkb,
					       rd_kafka_resp_err_t err,
					       rd_kafka_buf_t *rkbuf,
					       rd_kafka_buf_t *request,
					       void *opaque) {
        shptr_rd_kafka_toppar_t *s_rktp = opaque;
        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;

        offsets = rd_kafka_topic_partition_list_new(1);

        /* Parse and return Offset */
        err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
                                     rkbuf, request, offsets);

        if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                rd_kafka_topic_partition_list_destroy(offsets);
                return; /* Retrying */
        }

        if (!err && !(rktpar = rd_kafka_topic_partition_list_find(
                              offsets,
                              rktp->rktp_rkt->rkt_topic->str,
                              rktp->rktp_partition)))
                err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

        if (!err) {
                rd_kafka_toppar_lock(rktp);
                rktp->rktp_lo_offset = rktpar->offset;
                rd_kafka_toppar_unlock(rktp);
        }

        rd_kafka_topic_partition_list_destroy(offsets);

        rktp->rktp_wait_consumer_lag_resp = 0;

        rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
}



/**
 * Request information from broker to keep track of consumer lag.
 *
 * Locality: toppar handler thread
 */
static void rd_kafka_toppar_consumer_lag_req (rd_kafka_toppar_t *rktp) {
	rd_kafka_broker_t *rkb;
        rd_kafka_topic_partition_list_t *partitions;

        if (rktp->rktp_wait_consumer_lag_resp)
                return; /* Previous request not finished yet */

        rkb = rd_kafka_toppar_leader(rktp, 1/*proper brokers only*/);
        if (!rkb)
		return;

        rktp->rktp_wait_consumer_lag_resp = 1;

        partitions = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(partitions,
                                          rktp->rktp_rkt->rkt_topic->str,
                                          rktp->rktp_partition)->offset =
                RD_KAFKA_OFFSET_BEGINNING;

        /* Ask for oldest offset. The newest offset is automatically
         * propagated in FetchResponse.HighwaterMark. */
        rd_kafka_OffsetRequest(rkb, partitions, 0,
                               RD_KAFKA_REPLYQ(rktp->rktp_ops, 0),
                               rd_kafka_toppar_lag_handle_Offset,
                               rd_kafka_toppar_keep(rktp));

        rd_kafka_topic_partition_list_destroy(partitions);

        rd_kafka_broker_destroy(rkb); /* from toppar_leader() */
}
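
/*
 * With rktp_lo_offset refreshed by this request and rktp_hi_offset
 * propagated via FetchResponse.HighwaterMark, consumer lag can be
 * derived. A hedged sketch (one plausible definition; the actual stats
 * emitter lives elsewhere):
 *
 *   int64_t lag = -1;
 *   if (rktp->rktp_hi_offset != RD_KAFKA_OFFSET_INVALID &&
 *       rktp->rktp_app_offset != RD_KAFKA_OFFSET_INVALID)
 *           lag = rktp->rktp_hi_offset - rktp->rktp_app_offset;
 */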



/**
 * Request earliest offset to measure consumer lag
 *
 * Locality: toppar handler thread
 */
static void rd_kafka_toppar_consumer_lag_tmr_cb (rd_kafka_timers_t *rkts,
						 void *arg) {
	rd_kafka_toppar_t *rktp = arg;
	rd_kafka_toppar_consumer_lag_req(rktp);
}


/**
 * Add new partition to topic.
 *
 * Locks: rd_kafka_topic_wrlock() must be held.
 * Locks: rd_kafka_wrlock() must be held.
 */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_new0 (rd_kafka_itopic_t *rkt,
					       int32_t partition,
					       const char *func, int line) {
	rd_kafka_toppar_t *rktp;

	rktp = rd_calloc(1, sizeof(*rktp));

	rktp->rktp_partition = partition;
	rktp->rktp_rkt = rkt;
        rktp->rktp_leader_id = -1;
	rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
        rktp->rktp_fetch_msg_max_bytes
            = rkt->rkt_rk->rk_conf.fetch_msg_max_bytes;
	rktp->rktp_offset_fp = NULL;
        rd_kafka_offset_stats_reset(&rktp->rktp_offsets);
        rd_kafka_offset_stats_reset(&rktp->rktp_offsets_fin);
        rktp->rktp_hi_offset = RD_KAFKA_OFFSET_INVALID;
	rktp->rktp_lo_offset = RD_KAFKA_OFFSET_INVALID;
	rktp->rktp_app_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID;
        rktp->rktp_committed_offset = RD_KAFKA_OFFSET_INVALID;
	rd_kafka_msgq_init(&rktp->rktp_msgq);
        rktp->rktp_msgq_wakeup_fd = -1;
	rd_kafka_msgq_init(&rktp->rktp_xmit_msgq);
	mtx_init(&rktp->rktp_lock, mtx_plain);

        rd_refcnt_init(&rktp->rktp_refcnt, 0);
	rktp->rktp_fetchq = rd_kafka_q_new(rkt->rkt_rk);
        rktp->rktp_ops    = rd_kafka_q_new(rkt->rkt_rk);
        rktp->rktp_ops->rkq_serve = rd_kafka_toppar_op_serve;
        rktp->rktp_ops->rkq_opaque = rktp;
        rd_atomic32_init(&rktp->rktp_version, 1);
	rktp->rktp_op_version = rd_atomic32_get(&rktp->rktp_version);

        /* Consumer: If statistics are enabled we query the oldest offset
         * of each partition.
         * Since the oldest offset only moves on log retention, we cap
         * this interval on the low end to a reasonable value to avoid
         * flooding the brokers with OffsetRequests when our statistics
         * interval is low.
         * FIXME: Use a global timer to collect offsets for all partitions */
        if (rktp->rktp_rkt->rkt_rk->rk_conf.stats_interval_ms > 0 &&
            rkt->rkt_rk->rk_type == RD_KAFKA_CONSUMER &&
            rktp->rktp_partition != RD_KAFKA_PARTITION_UA) {
                int intvl = rkt->rkt_rk->rk_conf.stats_interval_ms;
                if (intvl < 10 * 1000 /* 10s */)
                        intvl = 10 * 1000;
		rd_kafka_timer_start(&rkt->rkt_rk->rk_timers,
				     &rktp->rktp_consumer_lag_tmr,
                                     intvl * 1000ll,
				     rd_kafka_toppar_consumer_lag_tmr_cb,
				     rktp);
        }

        rktp->rktp_s_rkt = rd_kafka_topic_keep(rkt);

	rd_kafka_q_fwd_set(rktp->rktp_ops, rkt->rkt_rk->rk_ops);
	rd_kafka_dbg(rkt->rkt_rk, TOPIC, "TOPPARNEW",
		     "NEW %s [%"PRId32"] %p (at %s:%d)",
		     rkt->rkt_topic->str, rktp->rktp_partition, rktp,
		     func, line);

	return rd_kafka_toppar_keep_src(func, line, rktp);
}



/**
 * Removes a toppar from its duties, global lists, etc.
 *
 * Locks: rd_kafka_toppar_lock() MUST be held
 */
static void rd_kafka_toppar_remove (rd_kafka_toppar_t *rktp) {
        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARREMOVE",
                     "Removing toppar %s [%"PRId32"] %p",
                     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
		     rktp);

	rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
			    &rktp->rktp_offset_query_tmr, 1/*lock*/);
	rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
			    &rktp->rktp_consumer_lag_tmr, 1/*lock*/);

	rd_kafka_q_fwd_set(rktp->rktp_ops, NULL);
}


/**
 * Final destructor for partition.
 */
void rd_kafka_toppar_destroy_final (rd_kafka_toppar_t *rktp) {

        rd_kafka_toppar_remove(rktp);

	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESTROY",
		     "%s [%"PRId32"]: %p DESTROY_FINAL",
		     rktp->rktp_rkt->rkt_topic->str,
                     rktp->rktp_partition, rktp);

	/* Clear queues */
	rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
			rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
	rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
			 RD_KAFKA_RESP_ERR__DESTROY);
	rd_kafka_q_destroy_owner(rktp->rktp_fetchq);
        rd_kafka_q_destroy_owner(rktp->rktp_ops);

	rd_kafka_replyq_destroy(&rktp->rktp_replyq);

	rd_kafka_topic_destroy0(rktp->rktp_s_rkt);

	mtx_destroy(&rktp->rktp_lock);

        rd_refcnt_destroy(&rktp->rktp_refcnt);

	rd_free(rktp);
}


/**
 * Set toppar fetching state.
 *
 * Locality: broker thread
 * Locks: rd_kafka_toppar_lock() MUST be held.
 */
void rd_kafka_toppar_set_fetch_state (rd_kafka_toppar_t *rktp,
                                      int fetch_state) {
	rd_kafka_assert(NULL,
			thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));

        if ((int)rktp->rktp_fetch_state == fetch_state)
                return;

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "PARTSTATE",
                     "Partition %.*s [%"PRId32"] changed fetch state %s -> %s",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition,
                     rd_kafka_fetch_states[rktp->rktp_fetch_state],
                     rd_kafka_fetch_states[fetch_state]);

        rktp->rktp_fetch_state = fetch_state;

        if (fetch_state == RD_KAFKA_TOPPAR_FETCH_ACTIVE)
                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
                             CONSUMER|RD_KAFKA_DBG_TOPIC,
                             "FETCH",
                             "Partition %.*s [%"PRId32"] start fetching "
                             "at offset %s",
                             RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                             rktp->rktp_partition,
                             rd_kafka_offset2str(rktp->rktp_next_offset));
}


/**
 * Returns the appropriate toppar for a given rkt and partition.
 * The returned toppar has increased refcnt and must be unreffed by calling
 *  rd_kafka_toppar_destroy().
 * May return NULL.
 *
 * If 'ua_on_miss' is true the UA (unassigned) toppar is returned if
 * 'partition' was not known locally, else NULL is returned.
 *
 * Locks: Caller must hold rd_kafka_topic_*lock()
 */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_get0 (const char *func, int line,
                                               const rd_kafka_itopic_t *rkt,
                                               int32_t partition,
                                               int ua_on_miss) {
        shptr_rd_kafka_toppar_t *s_rktp;

	if (partition >= 0 && partition < rkt->rkt_partition_cnt)
		s_rktp = rkt->rkt_p[partition];
	else if (partition == RD_KAFKA_PARTITION_UA || ua_on_miss)
		s_rktp = rkt->rkt_ua;
	else
		return NULL;

	if (s_rktp)
                return rd_kafka_toppar_keep_src(func,line,
                                                rd_kafka_toppar_s2i(s_rktp));

	return NULL;
}
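
/*
 * Usage sketch (hypothetical caller): the returned shared pointer holds
 * one refcount that must be released with rd_kafka_toppar_destroy():
 *
 *   shptr_rd_kafka_toppar_t *s_rktp;
 *   rd_kafka_topic_rdlock(rkt);
 *   s_rktp = rd_kafka_toppar_get(rkt, partition, 0);
 *   rd_kafka_topic_rdunlock(rkt);
 *   if (s_rktp) {
 *           rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
 *           ... use rktp under appropriate locks ...
 *           rd_kafka_toppar_destroy(s_rktp);
 *   }
 */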


/**
 * Same as rd_kafka_toppar_get() but requires no locking and
 * looks up the topic first.
 *
 * Locality: any
 * Locks: none
 */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_get2 (rd_kafka_t *rk,
                                               const char *topic,
                                               int32_t partition,
                                               int ua_on_miss,
                                               int create_on_miss) {
	shptr_rd_kafka_itopic_t *s_rkt;
        rd_kafka_itopic_t *rkt;
        shptr_rd_kafka_toppar_t *s_rktp;

        rd_kafka_wrlock(rk);

        /* Find or create topic */
	if (unlikely(!(s_rkt = rd_kafka_topic_find(rk, topic, 0/*no-lock*/)))) {
                if (!create_on_miss) {
                        rd_kafka_wrunlock(rk);
                        return NULL;
                }
                s_rkt = rd_kafka_topic_new0(rk, topic, NULL,
					    NULL, 0/*no-lock*/);
                if (!s_rkt) {
                        rd_kafka_wrunlock(rk);
                        rd_kafka_log(rk, LOG_ERR, "TOPIC",
                                     "Failed to create local topic \"%s\": %s",
                                     topic, rd_strerror(errno));
                        return NULL;
                }
        }

        rd_kafka_wrunlock(rk);

        rkt = rd_kafka_topic_s2i(s_rkt);

	rd_kafka_topic_wrlock(rkt);
	s_rktp = rd_kafka_toppar_desired_add(rkt, partition);
	rd_kafka_topic_wrunlock(rkt);

        rd_kafka_topic_destroy0(s_rkt);

	return s_rktp;
}


/**
 * Returns a toppar if it is available in the cluster.
 * '*errp' is set to the error-code if lookup fails.
 *
 * Locks: topic_*lock() MUST be held
 */
shptr_rd_kafka_toppar_t *
rd_kafka_toppar_get_avail (const rd_kafka_itopic_t *rkt,
                           int32_t partition, int ua_on_miss,
                           rd_kafka_resp_err_t *errp) {
	shptr_rd_kafka_toppar_t *s_rktp;

        switch (rkt->rkt_state)
        {
        case RD_KAFKA_TOPIC_S_UNKNOWN:
                /* No metadata received from cluster yet.
                 * Put message in UA partition and re-run partitioner when
                 * cluster comes up. */
		partition = RD_KAFKA_PARTITION_UA;
                break;

        case RD_KAFKA_TOPIC_S_NOTEXISTS:
                /* Topic not found in cluster.
                 * Fail message immediately. */
                *errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
                return NULL;

        case RD_KAFKA_TOPIC_S_EXISTS:
                /* Topic exists in cluster. */

                /* Topic exists but has no partitions.
                 * This is usually a transient state following the
                 * auto-creation of a topic. */
                if (unlikely(rkt->rkt_partition_cnt == 0)) {
                        partition = RD_KAFKA_PARTITION_UA;
                        break;
                }

                /* Check that partition exists. */
                if (partition >= rkt->rkt_partition_cnt) {
                        *errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        return NULL;
                }
                break;

        default:
                rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
                break;
        }

	/* Get new partition */
	s_rktp = rd_kafka_toppar_get(rkt, partition, 0);

	if (unlikely(!s_rktp)) {
		/* Unknown topic or partition */
		if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
			*errp = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
		else
			*errp = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

		return NULL;
	}

	return s_rktp;
}


/**
 * Looks for partition 'partition' in topic 'rkt's desired list.
 *
 * The desired partition list is the list of partitions that are desired
 * (e.g., by the consumer) but not yet seen on a broker.
 * As soon as the partition is seen on a broker the toppar is moved from
 * the desired list and onto the normal rkt_p array.
 * When the partition on the broker goes away a desired partition is put
 * back on the desired list.
 *
 * Locks: rd_kafka_topic_*lock() must be held.
 * Note: 'rktp' refcount is increased.
 */

shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_get (rd_kafka_itopic_t *rkt,
                                                      int32_t partition) {
	shptr_rd_kafka_toppar_t *s_rktp;
        int i;

	RD_LIST_FOREACH(s_rktp, &rkt->rkt_desp, i) {
                rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
		if (rktp->rktp_partition == partition)
			return rd_kafka_toppar_keep(rktp);
        }

	return NULL;
}


/**
 * Link toppar on desired list.
 *
 * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
 */
void rd_kafka_toppar_desired_link (rd_kafka_toppar_t *rktp) {
        shptr_rd_kafka_toppar_t *s_rktp;

        if (rktp->rktp_s_for_desp)
                return; /* Already linked */

        s_rktp = rd_kafka_toppar_keep(rktp);
        rd_list_add(&rktp->rktp_rkt->rkt_desp, s_rktp);
        rktp->rktp_s_for_desp = s_rktp; /* Desired list refcount */
}

/**
 * Unlink toppar from desired list.
 *
 * Locks: rd_kafka_topic_wrlock() and toppar_lock() must be held.
 */
void rd_kafka_toppar_desired_unlink (rd_kafka_toppar_t *rktp) {
        if (!rktp->rktp_s_for_desp)
                return; /* Not linked */

        rd_list_remove(&rktp->rktp_rkt->rkt_desp, rktp->rktp_s_for_desp);
        rd_kafka_toppar_destroy(rktp->rktp_s_for_desp);
        rktp->rktp_s_for_desp = NULL;
}


/**
 * @brief If rktp is not already desired:
 *  - mark as DESIRED|UNKNOWN
 *  - add to desired list
 *
 * @remark toppar_lock() MUST be held
 */
void rd_kafka_toppar_desired_add0 (rd_kafka_toppar_t *rktp) {
        if ((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
                return;

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESIRED",
                     "%s [%"PRId32"]: adding to DESIRED list",
                     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
	rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
        rd_kafka_toppar_desired_link(rktp);
}


/**
 * Adds 'partition' as a desired partition to topic 'rkt', or updates
 * an existing partition to be desired.
 *
 * Locks: rd_kafka_topic_wrlock() must be held.
 */
shptr_rd_kafka_toppar_t *rd_kafka_toppar_desired_add (rd_kafka_itopic_t *rkt,
                                                      int32_t partition) {
	shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;

	if ((s_rktp = rd_kafka_toppar_get(rkt,
                                          partition, 0/*no_ua_on_miss*/))) {
                rktp = rd_kafka_toppar_s2i(s_rktp);
		rd_kafka_toppar_lock(rktp);
                if (unlikely(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))) {
                        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESP",
                                     "Setting topic %s [%"PRId32"] partition "
                                     "as desired",
                                     rkt->rkt_topic->str, rktp->rktp_partition);
                        rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_DESIRED;
                }
		rd_kafka_toppar_unlock(rktp);
		return s_rktp;
	}

	if ((s_rktp = rd_kafka_toppar_desired_get(rkt, partition)))
		return s_rktp;

	s_rktp = rd_kafka_toppar_new(rkt, partition);
        rktp = rd_kafka_toppar_s2i(s_rktp);

        rd_kafka_toppar_lock(rktp);
        rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
        rd_kafka_toppar_desired_add0(rktp);
        rd_kafka_toppar_unlock(rktp);

	rd_kafka_dbg(rkt->rkt_rk, TOPIC, "DESP",
		     "Adding desired topic %s [%"PRId32"]",
		     rkt->rkt_topic->str, rktp->rktp_partition);

	return s_rktp; /* Callers refcount */
}
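
/*
 * Desired-list lifecycle sketch (hypothetical, simple consumer; the
 * appropriate locks as documented above must be held):
 *
 *   s_rktp = rd_kafka_toppar_desired_add(rkt, 7);  // want partition 7
 *   // partition 7 shows up in broker metadata: the toppar moves onto
 *   //   rkt_p[7] but remains flagged DESIRED
 *   // partition 7 disappears from metadata: the toppar is put back
 *   //   on the desired list until it reappears
 *   rd_kafka_toppar_desired_del(rktp);             // no longer wanted
 *   rd_kafka_toppar_destroy(s_rktp);               // caller's refcount
 */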




/**
 * Unmarks 'rktp' as desired.
 *
 * Locks: rd_kafka_topic_wrlock() and rd_kafka_toppar_lock() MUST be held.
 */
void rd_kafka_toppar_desired_del (rd_kafka_toppar_t *rktp) {

	if (!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_DESIRED))
		return;

	rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_DESIRED;
        rd_kafka_toppar_desired_unlink(rktp);

        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_UNKNOWN)
                rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_UNKNOWN;


	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "DESP",
		     "Removing (un)desired topic %s [%"PRId32"]",
		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
}



/**
 * Append message at tail of 'rktp' message queue.
 */
void rd_kafka_toppar_enq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
        int wakeup_fd, queue_len;

        rd_kafka_toppar_lock(rktp);

        if (!rkm->rkm_u.producer.msgseq &&
            rktp->rktp_partition != RD_KAFKA_PARTITION_UA)
                rkm->rkm_u.producer.msgseq = ++rktp->rktp_msgseq;

        if (rktp->rktp_partition == RD_KAFKA_PARTITION_UA ||
            rktp->rktp_rkt->rkt_conf.queuing_strategy == RD_KAFKA_QUEUE_FIFO) {
                /* No need for enq_sorted(): this is the newest message
                 * (or an unassigned partition), so append at the tail. */
                queue_len = rd_kafka_msgq_enq(&rktp->rktp_msgq, rkm);
        } else {
                queue_len = rd_kafka_msgq_enq_sorted(rktp->rktp_rkt,
                                                     &rktp->rktp_msgq, rkm);
        }

        wakeup_fd = rktp->rktp_msgq_wakeup_fd;
        rd_kafka_toppar_unlock(rktp);

#ifndef _MSC_VER
        if (wakeup_fd != -1 && queue_len == 1) {
                char one = 1;
                int r;
                r = rd_write(wakeup_fd, &one, sizeof(one));
                if (r == -1)
                        rd_kafka_log(rktp->rktp_rkt->rkt_rk, LOG_ERR, "PARTENQ",
                                     "%s [%"PRId32"]: write to "
                                     "wake-up fd %d failed: %s",
                                     rktp->rktp_rkt->rkt_topic->str,
                                     rktp->rktp_partition,
                                     wakeup_fd,
                                     rd_strerror(errno));
        }
#endif
}
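
/*
 * Note: the wake-up fd is expected to be the write end of a pipe (or
 * similar) whose read end is polled by the broker thread; the single
 * byte written when the queue transitions from empty to non-empty
 * (queue_len == 1) interrupts a blocking IO wait. A hedged sketch of
 * the reader side (not the actual broker poll loop; the read-end fd
 * name is hypothetical):
 *
 *   char buf[64];
 *   if (pfd.revents & POLLIN)
 *           while (read(wakeup_fd_read_end, buf, sizeof(buf)) > 0)
 *                   ;  // drain all pending wake-up bytes
 */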


/**
 * Dequeue message from 'rktp' message queue.
 */
void rd_kafka_toppar_deq_msg (rd_kafka_toppar_t *rktp, rd_kafka_msg_t *rkm) {
	rd_kafka_toppar_lock(rktp);
	rd_kafka_msgq_deq(&rktp->rktp_msgq, rkm, 1);
	rd_kafka_toppar_unlock(rktp);
}


void rd_kafka_msgq_insert_msgq (rd_kafka_msgq_t *destq,
                                rd_kafka_msgq_t *srcq,
                                int (*cmp) (const void *a, const void *b)) {
        rd_kafka_msg_t *first, *dest_first;

        first = TAILQ_FIRST(&srcq->rkmq_msgs);
        if (unlikely(!first)) {
                /* srcq is empty */
                return;
        }

        dest_first = TAILQ_FIRST(&destq->rkmq_msgs);

        /*
         * Try to optimize insertion of source list.
         */

        if (unlikely(!dest_first)) {
                /* Dest queue is empty, simply move the srcq. */
                rd_kafka_msgq_move(destq, srcq);

                return;
        }

        /* See if we can optimize the insertion by bulk-loading
         * the messages in place.
         * We know that:
         *  - destq is sorted
         *  - srcq is sorted
         *  - there is no overlap between the two.
         */

        if (cmp(first, dest_first) < 0) {
                /* Prepend src to dest queue.
                 * First append existing dest queue to src queue,
                 * then move src queue to now-empty dest queue,
                 * effectively prepending src queue to dest queue. */
                rd_kafka_msgq_concat(srcq, destq);
                rd_kafka_msgq_move(destq, srcq);

        } else if (cmp(first,
                       TAILQ_LAST(&destq->rkmq_msgs,
                                  rd_kafka_msgs_head_s)) > 0) {
                /* Append src to dest queue */
                rd_kafka_msgq_concat(destq, srcq);

        } else {
                /* Source queue messages reside somewhere
                 * in the dest queue range, find the insert position. */
                rd_kafka_msg_t *at;

                at = rd_kafka_msgq_find_pos(destq, first, cmp);
                rd_assert(at &&
                          *"Bug in msg_order_cmp(): "
                          "could not find insert position");

                /* Insert input queue after 'at' position.
                 * We know that:
                 * - at is non-NULL
                 * - at is not the last element. */
                TAILQ_INSERT_LIST(&destq->rkmq_msgs,
                                  at, &srcq->rkmq_msgs,
                                  rd_kafka_msgs_head_s,
                                  rd_kafka_msg_t *, rkm_link);

                destq->rkmq_msg_cnt   += srcq->rkmq_msg_cnt;
                destq->rkmq_msg_bytes += srcq->rkmq_msg_bytes;
                rd_kafka_msgq_init(srcq);
        }
}
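
/*
 * Worked example (hypothetical msgids, ascending cmp):
 *   destq = [10, 11, 40, 41]
 *   srcq  = [5, 6]    -> sorts before destq's head: the two-step
 *                        concat+move yields [5, 6, 10, 11, 40, 41]
 *   srcq  = [50, 51]  -> sorts after destq's tail: simple append
 *   srcq  = [20, 21]  -> lies wholly between 11 and 40 (no overlap):
 *                        find_pos() locates 'at' and the whole source
 *                        list is spliced in after it, giving
 *                        [10, 11, 20, 21, 40, 41]
 */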


/**
 * @brief Inserts messages from \p srcq according to their sorted position
 *        into \p destq, filtering out messages that cannot be retried.
 *
 * @param incr_retry Increment retry count for messages.
 * @param max_retries Maximum retries allowed per message.
 * @param backoff Absolute retry backoff for retried messages.
 *
 * @returns 0 if no messages were retryable, else 1.
 */
int rd_kafka_retry_msgq (rd_kafka_msgq_t *destq,
                         rd_kafka_msgq_t *srcq,
                         int incr_retry, int max_retries, rd_ts_t backoff,
                         int (*cmp) (const void *a, const void *b)) {
        rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
        rd_kafka_msg_t *rkm, *tmp;

        /* Scan through messages to see which ones are eligible for retry,
         * move the retryable ones to a temporary queue,
         * set the backoff time and optionally increase the retry count
         * for each message.
         * A sorted insert is not necessary since the original srcq
         * order is maintained. */
        TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
                if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
                        continue;

                rd_kafka_msgq_deq(srcq, rkm, 1);
                rd_kafka_msgq_enq(&retryable, rkm);

                rkm->rkm_u.producer.ts_backoff = backoff;
                rkm->rkm_u.producer.retries  += incr_retry;
        }

        /* No messages are retryable */
        if (RD_KAFKA_MSGQ_EMPTY(&retryable))
                return 0;

        /* Insert retryable list at sorted position */
        rd_kafka_msgq_insert_msgq(destq, &retryable, cmp);

        return 1;
}
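
/*
 * Worked example (hypothetical values): with max_retries = 2 and
 * incr_retry = 1, a message whose retries count is already 2 gives
 * 2 + 1 > 2 and stays on srcq (not retryable), while a message with
 * retries == 1 is moved to destq with retries bumped to 2 and
 * ts_backoff set to the supplied absolute backoff time.
 */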

/**
 * @brief Inserts messages from \p rkmq according to their sorted position
 *        into the partition's xmit queue (i.e., the broker xmit work queue).
 *
 * @param incr_retry Increment retry count for messages.
 *
 * @returns 0 if no messages were retryable, else 1.
 *
 * @locality Broker thread
 */

int rd_kafka_toppar_retry_msgq (rd_kafka_toppar_t *rktp, rd_kafka_msgq_t *rkmq,
                                int incr_retry) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        rd_ts_t backoff = rd_clock() + (rk->rk_conf.retry_backoff_ms * 1000);
        int r;

        rd_kafka_toppar_lock(rktp);
        r = rd_kafka_retry_msgq(&rktp->rktp_xmit_msgq, rkmq,
                                incr_retry, rk->rk_conf.max_retries,
                                backoff,
                                rktp->rktp_rkt->rkt_conf.msg_order_cmp);
        rd_kafka_toppar_unlock(rktp);

        return r;
}

/**
 * @brief Insert sorted message list \p rkmq at sorted position in \p rktp 's
 *        message queue. The queues must not overlap.
 * @remark \p rkmq will be cleared.
 */
void rd_kafka_toppar_insert_msgq (rd_kafka_toppar_t *rktp,
                                  rd_kafka_msgq_t *rkmq) {
        rd_kafka_toppar_lock(rktp);
        rd_kafka_msgq_insert_msgq(&rktp->rktp_msgq, rkmq,
                                  rktp->rktp_rkt->rkt_conf.msg_order_cmp);
        rd_kafka_toppar_unlock(rktp);
}



/**
 * Helper method for purging queues when removing a toppar.
 * Locks: rd_kafka_toppar_lock() MUST be held
 */
void rd_kafka_toppar_purge_queues (rd_kafka_toppar_t *rktp) {
        rd_kafka_q_disable(rktp->rktp_fetchq);
        rd_kafka_q_purge(rktp->rktp_fetchq);
        rd_kafka_q_disable(rktp->rktp_ops);
        rd_kafka_q_purge(rktp->rktp_ops);
}


/**
 * Migrate rktp from (optional) \p old_rkb to (optional) \p new_rkb.
 * This is an async operation.
 *
 * Locks: rd_kafka_toppar_lock() MUST be held
 */
static void rd_kafka_toppar_broker_migrate (rd_kafka_toppar_t *rktp,
                                            rd_kafka_broker_t *old_rkb,
                                            rd_kafka_broker_t *new_rkb) {
        rd_kafka_op_t *rko;
        rd_kafka_broker_t *dest_rkb;
        int had_next_leader = rktp->rktp_next_leader ? 1 : 0;

        /* Update next leader */
        if (new_rkb)
                rd_kafka_broker_keep(new_rkb);
        if (rktp->rktp_next_leader)
                rd_kafka_broker_destroy(rktp->rktp_next_leader);
        rktp->rktp_next_leader = new_rkb;

        /* If next_leader is set it means there is already an async
         * migration op going on and we should not send a new one
         * but simply change the next_leader (which we did above). */
        if (had_next_leader)
                return;

	/* Revert from offset-wait state back to offset-query
	 * prior to leaving the broker to avoid stalling
	 * on the new broker waiting for an offset reply from
	 * the old broker (which might never arrive and would
	 * have to time out... slowly). */
	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) {
		rd_kafka_toppar_set_fetch_state(
			rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
		rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
				     &rktp->rktp_offset_query_tmr,
				     500*1000,
				     rd_kafka_offset_query_tmr_cb,
				     rktp);
	}

        if (old_rkb) {
                /* If there is an existing broker for this toppar we let it
                 * first handle its own leave and then trigger the join for
                 * the next leader, if any. */
                rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
                dest_rkb = old_rkb;
        } else {
                /* No existing broker, send join op directly to new leader. */
                rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_JOIN);
                dest_rkb = new_rkb;
        }

        rko->rko_rktp = rd_kafka_toppar_keep(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
                     "Migrating topic %.*s [%"PRId32"] %p from %s to %s "
		     "(sending %s to %s)",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition, rktp,
                     old_rkb ? rd_kafka_broker_name(old_rkb) : "(none)",
                     new_rkb ? rd_kafka_broker_name(new_rkb) : "(none)",
		     rd_kafka_op2str(rko->rko_type),
		     rd_kafka_broker_name(dest_rkb));

        rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
}
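
/*
 * Handshake sketch (hypothetical trace; the op handlers live in the
 * broker thread code): migrating from old_rkb to new_rkb roughly plays
 * out as:
 *
 *   toppar thread:   rktp_next_leader = new_rkb
 *                    OP_PARTITION_LEAVE -> old_rkb->rkb_ops
 *   old broker:      detaches rktp from its fetch/xmit handling,
 *                    then hands off to rktp_next_leader
 *   new broker:      OP_PARTITION_JOIN -> attaches rktp,
 *                    rktp_leader = new_rkb, rktp_next_leader = NULL
 */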


/**
 * Async toppar leave from broker.
 * Only use this when partitions are to be removed.
 *
 * Locks: rd_kafka_toppar_lock() MUST be held
 */
void rd_kafka_toppar_broker_leave_for_remove (rd_kafka_toppar_t *rktp) {
        rd_kafka_op_t *rko;
        rd_kafka_broker_t *dest_rkb;


	if (rktp->rktp_next_leader)
		dest_rkb = rktp->rktp_next_leader;
	else if (rktp->rktp_leader)
		dest_rkb = rktp->rktp_leader;
	else {
		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "TOPPARDEL",
			     "%.*s [%"PRId32"] %p not handled by any broker: "
			     "not sending LEAVE for remove",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition, rktp);
		return;
	}


	/* Revert from offset-wait state back to offset-query
	 * prior to leaving the broker to avoid stalling
	 * on the new broker waiting for an offset reply from
	 * the old broker (which might never arrive and would
	 * have to time out... slowly). */
	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT)
		rd_kafka_toppar_set_fetch_state(
			rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);

	rko = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
        rko->rko_rktp = rd_kafka_toppar_keep(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKMIGR",
                     "%.*s [%"PRId32"] %p sending final LEAVE for removal by %s",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition, rktp,
                     rd_kafka_broker_name(dest_rkb));

        rd_kafka_q_enq(dest_rkb->rkb_ops, rko);
}



/**
 * Delegates broker 'rkb' as leader for toppar 'rktp'.
 * 'rkb' may be NULL to undelegate leader.
 *
 * Locks: Caller must have rd_kafka_topic_wrlock(rktp->rktp_rkt) 
 *        AND rd_kafka_toppar_lock(rktp) held.
 */
void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
				      rd_kafka_broker_t *rkb,
				      int for_removal) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        int internal_fallback = 0;

	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
		     "%s [%"PRId32"]: delegate to broker %s "
		     "(rktp %p, term %d, ref %d, remove %d)",
		     rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
		     rkb ? rkb->rkb_name : "(none)",
		     rktp, rd_kafka_terminating(rk),
		     rd_refcnt_get(&rktp->rktp_refcnt),
		     for_removal);

        /* Delegate toppars with no leader to the
         * internal broker for bookkeeping. */
        if (!rkb && !for_removal && !rd_kafka_terminating(rk)) {
                rkb = rd_kafka_broker_internal(rk);
                internal_fallback = 1;
        }

	if (rktp->rktp_leader == rkb && !rktp->rktp_next_leader) {
                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
			     "%.*s [%"PRId32"]: not updating broker: "
                             "already on correct broker %s",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
                             rkb ? rd_kafka_broker_name(rkb) : "(none)");

                if (internal_fallback)
                        rd_kafka_broker_destroy(rkb);
		return;
        }

	if (rktp->rktp_leader)
		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
			     "%.*s [%"PRId32"]: broker %s no longer leader",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
			     rd_kafka_broker_name(rktp->rktp_leader));


	if (rkb) {
		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
			     "%.*s [%"PRId32"]: broker %s is now leader "
			     "for partition with %i messages "
			     "(%"PRIu64" bytes) queued",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
			     rd_kafka_broker_name(rkb),
                             rktp->rktp_msgq.rkmq_msg_cnt,
                             rktp->rktp_msgq.rkmq_msg_bytes);


	} else {
		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BRKDELGT",
			     "%.*s [%"PRId32"]: no leader broker",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition);
	}

        if (rktp->rktp_leader || rkb)
                rd_kafka_toppar_broker_migrate(rktp, rktp->rktp_leader, rkb);

        if (internal_fallback)
                rd_kafka_broker_destroy(rkb);
}





void
rd_kafka_toppar_offset_commit_result (rd_kafka_toppar_t *rktp,
				      rd_kafka_resp_err_t err,
				      rd_kafka_topic_partition_list_t *offsets){
	if (err) {
		rd_kafka_q_op_err(rktp->rktp_fetchq,
				  RD_KAFKA_OP_CONSUMER_ERR,
				  err, 0 /* FIXME:VERSION*/,
				  rktp, 0,
				  "Offset commit failed: %s",
				  rd_kafka_err2str(err));
		return;
	}

	rd_kafka_toppar_lock(rktp);
	rktp->rktp_committed_offset = offsets->elems[0].offset;

	/* When stopping toppars:
	 * Final commit is now done (or failed), propagate. */
	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING)
		rd_kafka_toppar_fetch_stopped(rktp, err);

	rd_kafka_toppar_unlock(rktp);
}


/**
 * Commit toppar's offset on broker.
 * This is an async operation; this function simply enqueues an op
 * on the cgrp's queue.
 *
 * Locality: rktp's broker thread
 */
void rd_kafka_toppar_offset_commit (rd_kafka_toppar_t *rktp, int64_t offset,
				    const char *metadata) {
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;

        rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_cgrp != NULL);
        rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
                        rktp->rktp_flags & RD_KAFKA_TOPPAR_F_OFFSET_STORE);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "OFFSETCMT",
                     "%.*s [%"PRId32"]: committing offset %"PRId64,
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition, offset);

        offsets = rd_kafka_topic_partition_list_new(1);
        rktpar = rd_kafka_topic_partition_list_add(
                offsets, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
        rktpar->offset = offset;
        if (metadata) {
                rktpar->metadata = rd_strdup(metadata);
                rktpar->metadata_size = strlen(metadata);
        }

        rktp->rktp_committing_offset = offset;

        rd_kafka_commit(rktp->rktp_rkt->rkt_rk, offsets, 1/*async*/);

        rd_kafka_topic_partition_list_destroy(offsets);
}














/**
 * Handle the next offset to consume for a toppar.
 * This is used during initial setup when trying to figure out what
 * offset to start consuming from.
 *
 * Locality: toppar handler thread.
 * Locks: toppar_lock(rktp) must be held
 */
void rd_kafka_toppar_next_offset_handle (rd_kafka_toppar_t *rktp,
                                         int64_t Offset) {

        if (RD_KAFKA_OFFSET_IS_LOGICAL(Offset)) {
                /* Offset storage returned a logical offset (e.g. "end"),
                 * look it up. */
                rd_kafka_offset_reset(rktp, Offset, RD_KAFKA_RESP_ERR_NO_ERROR,
                                      "update");
                return;
        }

        /* Adjust by TAIL count, if wanted */
        if (rktp->rktp_query_offset <=
            RD_KAFKA_OFFSET_TAIL_BASE) {
                int64_t orig_Offset = Offset;
                int64_t tail_cnt =
                        llabs(rktp->rktp_query_offset -
                              RD_KAFKA_OFFSET_TAIL_BASE);

                if (tail_cnt > Offset)
                        Offset = 0;
                else
                        Offset -= tail_cnt;

                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
                             "OffsetReply for topic %s [%"PRId32"]: "
                             "offset %"PRId64": adjusting for "
                             "OFFSET_TAIL(%"PRId64"): "
                             "effective offset %"PRId64,
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             orig_Offset, tail_cnt,
                             Offset);
        }

        rktp->rktp_next_offset = Offset;

        rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_ACTIVE);

        /* Wake-up broker thread which might be idling on IO */
        if (rktp->rktp_leader)
                rd_kafka_broker_wakeup(rktp->rktp_leader);

}
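
/*
 * Worked example (hypothetical numbers): with rktp_query_offset =
 * RD_KAFKA_OFFSET_TAIL(100), tail_cnt = llabs(query_offset -
 * RD_KAFKA_OFFSET_TAIL_BASE) = 100. If the broker returns end offset
 * 5000 the effective next offset becomes 5000 - 100 = 4900; if the
 * returned offset is smaller than 100 it is clamped to 0.
 */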



/**
 * Fetch stored offset for a single partition. (simple consumer)
 *
 * Locality: toppar thread
 */
void rd_kafka_toppar_offset_fetch (rd_kafka_toppar_t *rktp,
                                   rd_kafka_replyq_t replyq) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
        rd_kafka_topic_partition_list_t *part;
        rd_kafka_op_t *rko;

        rd_kafka_dbg(rk, TOPIC, "OFFSETREQ",
                     "Partition %.*s [%"PRId32"]: querying cgrp for "
                     "stored offset (opv %d)",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition, replyq.version);

        part = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add0(part,
                                           rktp->rktp_rkt->rkt_topic->str,
                                           rktp->rktp_partition,
					   rd_kafka_toppar_keep(rktp));

        rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
	rko->rko_rktp = rd_kafka_toppar_keep(rktp);
	rko->rko_replyq = replyq;

	rko->rko_u.offset_fetch.partitions = part;
	rko->rko_u.offset_fetch.do_free = 1;

        rd_kafka_q_enq(rktp->rktp_cgrp->rkcg_ops, rko);
}




/**
 * Toppar based OffsetResponse handling.
 * This is used for finding the next offset to Fetch.
 *
 * Locality: toppar handler thread
 */
static void rd_kafka_toppar_handle_Offset (rd_kafka_t *rk,
					   rd_kafka_broker_t *rkb,
					   rd_kafka_resp_err_t err,
					   rd_kafka_buf_t *rkbuf,
					   rd_kafka_buf_t *request,
					   void *opaque) {
        shptr_rd_kafka_toppar_t *s_rktp = opaque;
        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_t *rktpar;
        int64_t Offset;

	rd_kafka_toppar_lock(rktp);
	/* Drop reply from previous partition leader */
	if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_leader != rkb)
		err = RD_KAFKA_RESP_ERR__OUTDATED;
	rd_kafka_toppar_unlock(rktp);

        offsets = rd_kafka_topic_partition_list_new(1);

        /* Parse and return Offset */
        err = rd_kafka_handle_Offset(rkb->rkb_rk, rkb, err,
                                     rkbuf, request, offsets);

	rd_rkb_dbg(rkb, TOPIC, "OFFSET",
		   "Offset reply for "
		   "topic %.*s [%"PRId32"] (v%d vs v%d)",
		   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
		   rktp->rktp_partition, request->rkbuf_replyq.version,
		   rktp->rktp_op_version);

	rd_dassert(request->rkbuf_replyq.version > 0);
	if (err != RD_KAFKA_RESP_ERR__DESTROY &&
            rd_kafka_buf_version_outdated(request, rktp->rktp_op_version)) {
		/* Outdated request response, ignore. */
		err = RD_KAFKA_RESP_ERR__OUTDATED;
	}

        if (!err &&
            (!(rktpar = rd_kafka_topic_partition_list_find(
                       offsets,
                       rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition))))
                err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

        if (err) {
                rd_kafka_op_t *rko;

                rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                           "Offset reply error for "
                           "topic %.*s [%"PRId32"] (v%d): %s",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition, request->rkbuf_replyq.version,
			   rd_kafka_err2str(err));

                rd_kafka_topic_partition_list_destroy(offsets);

                if (err == RD_KAFKA_RESP_ERR__DESTROY ||
                    err == RD_KAFKA_RESP_ERR__OUTDATED) {
                        /* Termination or outdated, quick cleanup. */

                        /* from request.opaque */
                        rd_kafka_toppar_destroy(s_rktp);
                        return;

		} else if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
			return; /* Retry in progress */


                rd_kafka_toppar_lock(rktp);
                rd_kafka_offset_reset(rktp, rktp->rktp_query_offset,
                                      err,
                                      "failed to query logical offset");

                /* Signal error back to application,
                 * unless this is an intermittent problem
                 * (e.g., connection lost) */
                rko = rd_kafka_op_new(RD_KAFKA_OP_CONSUMER_ERR);
                rko->rko_err = err;
                if (rktp->rktp_query_offset <=
                    RD_KAFKA_OFFSET_TAIL_BASE)
                        rko->rko_u.err.offset =
                                rktp->rktp_query_offset -
                                RD_KAFKA_OFFSET_TAIL_BASE;
                else
                        rko->rko_u.err.offset = rktp->rktp_query_offset;
                rd_kafka_toppar_unlock(rktp);
                rko->rko_rktp = rd_kafka_toppar_keep(rktp);

                rd_kafka_q_enq(rktp->rktp_fetchq, rko);

                rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
                return;
        }

        Offset = rktpar->offset;
        rd_kafka_topic_partition_list_destroy(offsets);

	rd_kafka_toppar_lock(rktp);
        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
                     "Offset %s request for %.*s [%"PRId32"] "
                     "returned offset %s (%"PRId64")",
                     rd_kafka_offset2str(rktp->rktp_query_offset),
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition, rd_kafka_offset2str(Offset), Offset);

        rd_kafka_toppar_next_offset_handle(rktp, Offset);
	rd_kafka_toppar_unlock(rktp);

        rd_kafka_toppar_destroy(s_rktp); /* from request.opaque */
}

/**
 * Send OffsetRequest for toppar.
 *
 * If \p backoff_ms is non-zero only the query timer is started,
 * otherwise a query is triggered directly.
 *
 * Locality: toppar handler thread
 * Locks: toppar_lock() must be held
 */
void rd_kafka_toppar_offset_request (rd_kafka_toppar_t *rktp,
				     int64_t query_offset, int backoff_ms) {
	rd_kafka_broker_t *rkb;

	rd_kafka_assert(NULL,
			thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread));

        rkb = rktp->rktp_leader;

        if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL))
                backoff_ms = 500;

        if (backoff_ms) {
		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
			     "%s [%"PRId32"]: %s"
			     "starting offset query timer for offset %s",
			     rktp->rktp_rkt->rkt_topic->str,
			     rktp->rktp_partition,
                             !rkb ? "no current leader for partition, " : "",
			     rd_kafka_offset2str(query_offset));

                rd_kafka_toppar_set_fetch_state(
                        rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
		rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
				     &rktp->rktp_offset_query_tmr,
				     backoff_ms*1000ll,
				     rd_kafka_offset_query_tmr_cb, rktp);
		return;
        }


        rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
                            &rktp->rktp_offset_query_tmr, 1/*lock*/);


	if (query_offset == RD_KAFKA_OFFSET_STORED &&
            rktp->rktp_rkt->rkt_conf.offset_store_method ==
            RD_KAFKA_OFFSET_METHOD_BROKER) {
                /*
                 * Get stored offset from broker based storage:
                 * ask cgrp manager for offsets
                 */
                rd_kafka_toppar_offset_fetch(
			rktp,
			RD_KAFKA_REPLYQ(rktp->rktp_ops,
					rktp->rktp_op_version));

	} else {
                shptr_rd_kafka_toppar_t *s_rktp;
                rd_kafka_topic_partition_list_t *offsets;

                /*
                 * Look up logical offset (end,beginning,tail,..)
                 */

                rd_rkb_dbg(rkb, TOPIC, "OFFREQ",
                           "Partition %.*s [%"PRId32"]: querying for logical "
                           "offset %s (opv %d)",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           rd_kafka_offset2str(query_offset),
			   rktp->rktp_op_version);

                s_rktp = rd_kafka_toppar_keep(rktp);

		if (query_offset <= RD_KAFKA_OFFSET_TAIL_BASE)
			query_offset = RD_KAFKA_OFFSET_END;

                offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(
                        offsets,
                        rktp->rktp_rkt->rkt_topic->str,
                        rktp->rktp_partition)->offset = query_offset;

                rd_kafka_OffsetRequest(rkb, offsets, 0,
                                       RD_KAFKA_REPLYQ(rktp->rktp_ops,
                                                       rktp->rktp_op_version),
                                       rd_kafka_toppar_handle_Offset,
                                       s_rktp);

                rd_kafka_topic_partition_list_destroy(offsets);
        }

        rd_kafka_toppar_set_fetch_state(rktp,
					RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT);
}


/**
 * Start fetching toppar.
 *
 * Locality: toppar handler thread
 * Locks: none
 */
static void rd_kafka_toppar_fetch_start (rd_kafka_toppar_t *rktp,
					 int64_t offset,
					 rd_kafka_op_t *rko_orig) {
        rd_kafka_cgrp_t *rkcg = rko_orig->rko_u.fetch_start.rkcg;
        rd_kafka_resp_err_t err = 0;
        int32_t version = rko_orig->rko_version;

	rd_kafka_toppar_lock(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
                     "Start fetch for %.*s [%"PRId32"] in "
                     "state %s at offset %s (v%"PRId32")",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition,
                     rd_kafka_fetch_states[rktp->rktp_fetch_state],
                     rd_kafka_offset2str(offset), version);

        if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
                err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
		rd_kafka_toppar_unlock(rktp);
                goto err_reply;
        }

	rktp->rktp_op_version = version;

        if (rkcg) {
                rd_kafka_assert(rktp->rktp_rkt->rkt_rk, !rktp->rktp_cgrp);
                /* Attach toppar to cgrp */
                rktp->rktp_cgrp = rkcg;
                rd_kafka_cgrp_op(rkcg, rktp, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_PARTITION_JOIN, 0);
        }


        if (offset == RD_KAFKA_OFFSET_BEGINNING ||
	    offset == RD_KAFKA_OFFSET_END ||
            offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
		rd_kafka_toppar_next_offset_handle(rktp, offset);

	} else if (offset == RD_KAFKA_OFFSET_STORED) {
                rd_kafka_offset_store_init(rktp);

	} else if (offset == RD_KAFKA_OFFSET_INVALID) {
		rd_kafka_offset_reset(rktp, offset,
				      RD_KAFKA_RESP_ERR__NO_OFFSET,
				      "no previously committed offset "
				      "available");

	} else {
		rktp->rktp_next_offset = offset;
                rd_kafka_toppar_set_fetch_state(rktp,
						RD_KAFKA_TOPPAR_FETCH_ACTIVE);

                /* Wake-up broker thread which might be idling on IO */
                if (rktp->rktp_leader)
                        rd_kafka_broker_wakeup(rktp->rktp_leader);

	}

        rktp->rktp_offsets_fin.eof_offset = RD_KAFKA_OFFSET_INVALID;

	rd_kafka_toppar_unlock(rktp);

        /* Signal back to caller thread that start has commenced, or err */
err_reply:
        if (rko_orig->rko_replyq.q) {
                rd_kafka_op_t *rko;

                rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_START);

                rko->rko_err = err;
                rko->rko_rktp = rd_kafka_toppar_keep(rktp);

                rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
        }
}
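
/*
 * Application-level sketch (hypothetical; simple/legacy consumer API):
 * the logical offsets handled above correspond to the start offsets an
 * application may pass to rd_kafka_consume_start() (app_rkt being the
 * application's public topic handle):
 *
 *   rd_kafka_consume_start(app_rkt, 0, RD_KAFKA_OFFSET_BEGINNING);
 *   rd_kafka_consume_start(app_rkt, 1, RD_KAFKA_OFFSET_STORED);
 *   rd_kafka_consume_start(app_rkt, 2, RD_KAFKA_OFFSET_TAIL(1000));
 */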




/**
 * Mark toppar's fetch state as stopped (all decommissioning is done,
 * offsets are stored, etc.).
 *
 * Locality: toppar handler thread
 * Locks: toppar_lock(rktp) MUST be held
 */
void rd_kafka_toppar_fetch_stopped (rd_kafka_toppar_t *rktp,
                                    rd_kafka_resp_err_t err) {


        rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPED);

        if (rktp->rktp_cgrp) {
                /* Detach toppar from cgrp */
                rd_kafka_cgrp_op(rktp->rktp_cgrp, rktp, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_PARTITION_LEAVE, 0);
                rktp->rktp_cgrp = NULL;
        }

        /* Signal back to application thread that stop is done. */
	if (rktp->rktp_replyq.q) {
		rd_kafka_op_t *rko;
		rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH_STOP|RD_KAFKA_OP_REPLY);
                rko->rko_err = err;
		rko->rko_rktp = rd_kafka_toppar_keep(rktp);

		rd_kafka_replyq_enq(&rktp->rktp_replyq, rko, 0);
	}
}


/**
 * Stop toppar fetcher.
 * This is usually an async operation.
 *
 * Locality: toppar handler thread
 */
void rd_kafka_toppar_fetch_stop (rd_kafka_toppar_t *rktp,
				 rd_kafka_op_t *rko_orig) {
        int32_t version = rko_orig->rko_version;

	rd_kafka_toppar_lock(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
                     "Stopping fetch for %.*s [%"PRId32"] in state %s (v%d)",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition,
                     rd_kafka_fetch_states[rktp->rktp_fetch_state], version);

	rktp->rktp_op_version = version;

	/* Abort pending offset lookups. */
	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
				    &rktp->rktp_offset_query_tmr,
				    1/*lock*/);

        /* Clear out the forwarding queue. */
        rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);

        /* Assign the future replyq to propagate stop results. */
        rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_replyq.q == NULL);
	if (rko_orig) {
		rktp->rktp_replyq = rko_orig->rko_replyq;
		rd_kafka_replyq_clear(&rko_orig->rko_replyq);
	}
        rd_kafka_toppar_set_fetch_state(rktp, RD_KAFKA_TOPPAR_FETCH_STOPPING);

        /* Stop offset store (possibly async).
         * NOTE: will call .._stopped() if store finishes immediately,
         *       so no more operations after this call! */
        rd_kafka_offset_store_stop(rktp);

	rd_kafka_toppar_unlock(rktp);
}


/**
 * Update a toppar's offset.
 * The toppar must have previously been started (FETCH_START).
 *
 * Locality: toppar handler thread
 */
void rd_kafka_toppar_seek (rd_kafka_toppar_t *rktp,
			   int64_t offset, rd_kafka_op_t *rko_orig) {
        rd_kafka_resp_err_t err = 0;
        int32_t version = rko_orig->rko_version;

	rd_kafka_toppar_lock(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCH",
                     "Seek %.*s [%"PRId32"] to offset %s "
                     "in state %s (v%"PRId32")",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition,
		     rd_kafka_offset2str(offset),
                     rd_kafka_fetch_states[rktp->rktp_fetch_state], version);


        if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_STOPPING) {
                err = RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
                goto err_reply;
        } else if (!RD_KAFKA_TOPPAR_FETCH_IS_STARTED(rktp->rktp_fetch_state)) {
                err = RD_KAFKA_RESP_ERR__STATE;
                goto err_reply;
        } else if (offset == RD_KAFKA_OFFSET_STORED) {
		err = RD_KAFKA_RESP_ERR__INVALID_ARG;
		goto err_reply;
	}

	rktp->rktp_op_version = version;

	/* Abort pending offset lookups. */
	if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)
		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
				    &rktp->rktp_offset_query_tmr,
				    1/*lock*/);

	if (RD_KAFKA_OFFSET_IS_LOGICAL(offset))
		rd_kafka_toppar_next_offset_handle(rktp, offset);
	else {
		rktp->rktp_next_offset = offset;
                rd_kafka_toppar_set_fetch_state(rktp,
						RD_KAFKA_TOPPAR_FETCH_ACTIVE);

                /* Wake-up broker thread which might be idling on IO */
                if (rktp->rktp_leader)
                        rd_kafka_broker_wakeup(rktp->rktp_leader);
	}

        /* Signal back to caller thread that seek has commenced, or err */
err_reply:
	rd_kafka_toppar_unlock(rktp);

        if (rko_orig && rko_orig->rko_replyq.q) {
                rd_kafka_op_t *rko;

                rko = rd_kafka_op_new(RD_KAFKA_OP_SEEK|RD_KAFKA_OP_REPLY);

                rko->rko_err = err;
		rko->rko_u.fetch_start.offset =
			rko_orig->rko_u.fetch_start.offset;
                rko->rko_rktp = rd_kafka_toppar_keep(rktp);

                rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko, 0);
        }
}


static void rd_kafka_toppar_pause_resume (rd_kafka_toppar_t *rktp,
					  rd_kafka_op_t *rko_orig) {
	rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
	int pause = rko_orig->rko_u.pause.pause;
	int flag = rko_orig->rko_u.pause.flag;
        int32_t version = rko_orig->rko_version;

	rd_kafka_toppar_lock(rktp);

	rktp->rktp_op_version = version;

	if (pause) {
		/* Pause partition */
		rktp->rktp_flags |= flag;

		if (rk->rk_type == RD_KAFKA_CONSUMER) {
			/* Save the offset of the last consumed message + 1
			 * as the next offset to fetch on resume. */
			if (rktp->rktp_app_offset != RD_KAFKA_OFFSET_INVALID) {
				rktp->rktp_next_offset = rktp->rktp_app_offset;
			}

			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"]: at offset %s "
				     "(state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_offset2str(
					     rktp->rktp_next_offset),
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
		} else {
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"] (state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
		}

	} else {
		/* Resume partition */
		rktp->rktp_flags &= ~flag;

		if (rk->rk_type == RD_KAFKA_CONSUMER) {
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"]: at offset %s "
				     "(state %s, v%d)",
				     rktp->rktp_fetch_state ==
				     RD_KAFKA_TOPPAR_FETCH_ACTIVE ?
				     "Resuming" : "Not resuming stopped",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_offset2str(
					     rktp->rktp_next_offset),
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);

			/* If the resuming offset is logical we
			 * need to trigger a seek (that performs the
			 * logical->absolute lookup logic) to get
			 * things going.
			 * Typical case is when a partition is paused
			 * before anything has been consumed by app
			 * yet thus having rktp_app_offset=INVALID. */
			if ((rktp->rktp_fetch_state ==
			     RD_KAFKA_TOPPAR_FETCH_ACTIVE ||
			     rktp->rktp_fetch_state ==
			     RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT) &&
			    rktp->rktp_next_offset == RD_KAFKA_OFFSET_INVALID)
				rd_kafka_toppar_next_offset_handle(
					rktp, rktp->rktp_next_offset);

		} else
			rd_kafka_dbg(rk, TOPIC, pause?"PAUSE":"RESUME",
				     "%s %s [%"PRId32"] (state %s, v%d)",
				     pause ? "Pause":"Resume",
				     rktp->rktp_rkt->rkt_topic->str,
				     rktp->rktp_partition,
				     rd_kafka_fetch_states[rktp->
							   rktp_fetch_state],
				     version);
	}
	rd_kafka_toppar_unlock(rktp);

	if (pause && rk->rk_type == RD_KAFKA_CONSUMER) {
		/* Flush partition's fetch queue */
		rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
						rko_orig->rko_version);
	}
}




/**
 * @brief Decide whether this toppar should be on the fetch list or not.
 *
 * Also:
 *  - update toppar's op version (for broker thread's copy)
 *  - finalize statistics (move rktp_offsets to rktp_offsets_fin)
 *
 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
 *
 * @locality broker thread
 */
rd_ts_t rd_kafka_toppar_fetch_decide (rd_kafka_toppar_t *rktp,
				   rd_kafka_broker_t *rkb,
				   int force_remove) {
        int should_fetch = 1;
        const char *reason = "";
        int32_t version;
        rd_ts_t ts_backoff = 0;

	rd_kafka_toppar_lock(rktp);

	/* Forced removal from fetch list */
	if (unlikely(force_remove)) {
		reason = "forced removal";
		should_fetch = 0;
		goto done;
	}

	if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) {
		reason = "partition removed";
		should_fetch = 0;
		goto done;
	}

	/* Skip toppars not in active fetch state */
	if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
                reason = "not in active fetch state";
		should_fetch = 0;
		goto done;
	}

        /* Update broker thread's fetch op version */
        version = rktp->rktp_op_version;
        if (version > rktp->rktp_fetch_version ||
	    rktp->rktp_next_offset != rktp->rktp_last_next_offset) {
                /* New version barrier, something was modified from the
                 * control plane. Reset and start over.
		 * Alternatively only the next_offset changed but not the
		 * barrier, which is the case when automatically triggering
		 * offset.reset (such as on PARTITION_EOF). */

                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC",
                             "Topic %s [%"PRId32"]: fetch decide: "
                             "updating to version %d (was %d) at "
                             "offset %"PRId64" (was %"PRId64")",
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition,
                             version, rktp->rktp_fetch_version,
                             rktp->rktp_next_offset,
                             rktp->rktp_offsets.fetch_offset);

                rd_kafka_offset_stats_reset(&rktp->rktp_offsets);

                /* New start offset */
                rktp->rktp_offsets.fetch_offset = rktp->rktp_next_offset;
		rktp->rktp_last_next_offset = rktp->rktp_next_offset;

                rktp->rktp_fetch_version = version;

                rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
                                                version);
        }


	if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) {
		should_fetch = 0;
		reason = "paused";

	} else if (RD_KAFKA_OFFSET_IS_LOGICAL(rktp->rktp_next_offset)) {
                should_fetch = 0;
                reason = "no concrete offset";

        } else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
		   rkb->rkb_rk->rk_conf.queued_min_msgs) {
		/* Skip toppars whose local message queue is already above
		 * the lower threshold. */
                reason = "queued.min.messages exceeded";
                should_fetch = 0;

        } else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
            rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
                reason = "queued.max.messages.kbytes exceeded";
                should_fetch = 0;

        } else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
                reason = "fetch backed off";
                ts_backoff = rktp->rktp_ts_fetch_backoff;
                should_fetch = 0;
        }

 done:
        /* Copy offset stats to finalized place holder. */
        rktp->rktp_offsets_fin = rktp->rktp_offsets;

        if (rktp->rktp_fetch != should_fetch) {
                rd_rkb_dbg(rkb, FETCH, "FETCH",
                           "Topic %s [%"PRId32"] in state %s at offset %s "
                           "(%d/%d msgs, %"PRId64"/%d kb queued, "
			   "opv %"PRId32") is %sfetchable: %s",
                           rktp->rktp_rkt->rkt_topic->str,
                           rktp->rktp_partition,
			   rd_kafka_fetch_states[rktp->rktp_fetch_state],
                           rd_kafka_offset2str(rktp->rktp_next_offset),
                           rd_kafka_q_len(rktp->rktp_fetchq),
                           rkb->rkb_rk->rk_conf.queued_min_msgs,
                           rd_kafka_q_size(rktp->rktp_fetchq) / 1024,
                           rkb->rkb_rk->rk_conf.queued_max_msg_kbytes,
			   rktp->rktp_fetch_version,
                           should_fetch ? "" : "not ", reason);

                if (should_fetch) {
			rd_dassert(rktp->rktp_fetch_version > 0);
                        rd_kafka_broker_active_toppar_add(rkb, rktp);
                } else {
                        rd_kafka_broker_active_toppar_del(rkb, rktp);
                        /* Non-fetching partitions will have an
                         * indefinite backoff, unless explicitly specified. */
                        if (!ts_backoff)
                                ts_backoff = RD_TS_MAX;
                }
        }

        rd_kafka_toppar_unlock(rktp);

        return ts_backoff;
}


/**
 * @brief Serve a toppar in a consumer broker thread.
 *        This is considered the fast path and should be minimal,
 *        mostly focusing on fetch related mechanisms.
 *
 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
 *
 * @locality broker thread
 * @locks none
 */
rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
                                               rd_kafka_toppar_t *rktp) {
        return rd_kafka_toppar_fetch_decide(rktp, rkb, 0);
}
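
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * how a broker thread might aggregate the per-partition backoff
 * timestamps returned above to decide when to attempt the next fetch.
 * The iteration over active toppars is elided.
 *
 * @code
 *   rd_ts_t min_backoff = RD_TS_MAX;
 *   rd_ts_t ts;
 *
 *   // ...for each active toppar rktp on this broker:
 *   ts = rd_kafka_broker_consumer_toppar_serve(rkb, rktp);
 *   if (ts && ts < min_backoff)
 *           min_backoff = ts;  // earliest time any fetch may resume
 *   // ...then block on IO/ops until min_backoff before fetching again
 * @endcode
 */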



/**
 * Serve a toppar op.
 * 'rktp' may be NULL for certain ops (OP_RECV_BUF).
 *
 * @locality toppar handler thread
 */
static rd_kafka_op_res_t
rd_kafka_toppar_op_serve (rd_kafka_t *rk,
                          rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
                          rd_kafka_q_cb_type_t cb_type, void *opaque) {
	rd_kafka_toppar_t *rktp = NULL;
	int outdated = 0;

	if (rko->rko_rktp)
		rktp = rd_kafka_toppar_s2i(rko->rko_rktp);

	if (rktp) {
		outdated = rd_kafka_op_version_outdated(rko,
							rktp->rktp_op_version);

		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OP",
			     "%.*s [%"PRId32"] received %sop %s "
			     "(v%"PRId32") in fetch-state %s (opv%d)",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
			     outdated ? "outdated ": "",
			     rd_kafka_op2str(rko->rko_type),
			     rko->rko_version,
			     rd_kafka_fetch_states[rktp->rktp_fetch_state],
			     rktp->rktp_op_version);

		if (outdated) {
#if ENABLE_DEVEL
			rd_kafka_op_print(stdout, "PART_OUTDATED", rko);
#endif
                        rd_kafka_op_destroy(rko);
			return RD_KAFKA_OP_RES_HANDLED;
		}
	}

	switch ((int)rko->rko_type)
	{
	case RD_KAFKA_OP_FETCH_START:
		rd_kafka_toppar_fetch_start(rktp,
					    rko->rko_u.fetch_start.offset, rko);
		break;

	case RD_KAFKA_OP_FETCH_STOP:
		rd_kafka_toppar_fetch_stop(rktp, rko);
		break;

	case RD_KAFKA_OP_SEEK:
		rd_kafka_toppar_seek(rktp, rko->rko_u.fetch_start.offset, rko);
		break;

	case RD_KAFKA_OP_PAUSE:
		rd_kafka_toppar_pause_resume(rktp, rko);
		break;

        case RD_KAFKA_OP_OFFSET_COMMIT | RD_KAFKA_OP_REPLY:
                rd_kafka_assert(NULL, rko->rko_u.offset_commit.cb);
                rko->rko_u.offset_commit.cb(
                        rk, rko->rko_err,
                        rko->rko_u.offset_commit.partitions,
                        rko->rko_u.offset_commit.opaque);
                break;

	case RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY:
        {
                /* OffsetFetch reply */
                rd_kafka_topic_partition_list_t *offsets =
			rko->rko_u.offset_fetch.partitions;
                shptr_rd_kafka_toppar_t *s_rktp;
		int64_t offset = RD_KAFKA_OFFSET_INVALID;

                s_rktp = offsets->elems[0]._private;
                if (!rko->rko_err) {
                        /* Request succeeded but individual partitions
                         * may still have failed. */
                        rko->rko_err = offsets->elems[0].err;
			offset       = offsets->elems[0].offset;
                }
                offsets->elems[0]._private = NULL;
                rd_kafka_topic_partition_list_destroy(offsets);
		rko->rko_u.offset_fetch.partitions = NULL;
                rktp = rd_kafka_toppar_s2i(s_rktp);

		rd_kafka_timer_stop(&rktp->rktp_rkt->rkt_rk->rk_timers,
				    &rktp->rktp_offset_query_tmr,
				    1/*lock*/);

		rd_kafka_toppar_lock(rktp);

		if (rko->rko_err) {
			rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
				     TOPIC, "OFFSET",
				     "Failed to fetch offset for "
				     "%.*s [%"PRId32"]: %s",
				     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
				     rktp->rktp_partition,
				     rd_kafka_err2str(rko->rko_err));

			/* Keep on querying until we succeed. */
			rd_kafka_toppar_set_fetch_state(
				rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);

			rd_kafka_toppar_unlock(rktp);

			rd_kafka_timer_start(&rktp->rktp_rkt->rkt_rk->rk_timers,
					     &rktp->rktp_offset_query_tmr,
					     500*1000,
					     rd_kafka_offset_query_tmr_cb,
					     rktp);

			/* Propagate error to application */
			if (rko->rko_err != RD_KAFKA_RESP_ERR__WAIT_COORD) {
				rd_kafka_q_op_err(rktp->rktp_fetchq,
						  RD_KAFKA_OP_ERR, rko->rko_err,
						  0, rktp, 0,
						  "Failed to fetch "
						  "offsets from brokers: %s",
						  rd_kafka_err2str(rko->rko_err));
			}

			rd_kafka_toppar_destroy(s_rktp);

			break;
		}

		rd_kafka_dbg(rktp->rktp_rkt->rkt_rk,
			     TOPIC, "OFFSET",
			     "%.*s [%"PRId32"]: OffsetFetch returned "
			     "offset %s (%"PRId64")",
			     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
			     rktp->rktp_partition,
			     rd_kafka_offset2str(offset), offset);

		if (offset > 0)
			rktp->rktp_committed_offset = offset;

		if (offset >= 0)
			rd_kafka_toppar_next_offset_handle(rktp, offset);
		else
			rd_kafka_offset_reset(rktp, offset,
					      RD_KAFKA_RESP_ERR__NO_OFFSET,
					      "no previously committed offset "
					      "available");
		rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(s_rktp);
        }
        break;

        default:
                rd_kafka_assert(NULL, !*"unknown type");
                break;
        }

        rd_kafka_op_destroy(rko);

        return RD_KAFKA_OP_RES_HANDLED;
}





/**
 * Send command op to toppar (handled by toppar's thread).
 *
 * Locality: any thread
 */
static void rd_kafka_toppar_op0 (rd_kafka_toppar_t *rktp, rd_kafka_op_t *rko,
				 rd_kafka_replyq_t replyq) {
        rko->rko_rktp = rd_kafka_toppar_keep(rktp);
	rko->rko_replyq = replyq;

        rd_kafka_q_enq(rktp->rktp_ops, rko);
}


/**
 * Send command op to toppar (handled by toppar's thread).
 *
 * Locality: any thread
 */
static void rd_kafka_toppar_op (rd_kafka_toppar_t *rktp,
				rd_kafka_op_type_t type, int32_t version,
				int64_t offset, rd_kafka_cgrp_t *rkcg,
				rd_kafka_replyq_t replyq) {
        rd_kafka_op_t *rko;

        rko = rd_kafka_op_new(type);
	rko->rko_version = version;
        if (type == RD_KAFKA_OP_FETCH_START ||
	    type == RD_KAFKA_OP_SEEK) {
		if (rkcg)
			rko->rko_u.fetch_start.rkcg = rkcg;
		rko->rko_u.fetch_start.offset = offset;
	}

	rd_kafka_toppar_op0(rktp, rko, replyq);
}



/**
 * Start consuming partition (async operation).
 *  'offset' is the initial offset
 *  'fwdq' is an optional queue to forward messages to; if this is NULL
 *  then messages will be enqueued on rktp_fetchq.
 *  'replyq' is an optional queue for handling the consume_start ack.
 *
 * This is the thread-safe interface that can be called from any thread.
 */
rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_start (rd_kafka_toppar_t *rktp,
                                                    int64_t offset,
                                                    rd_kafka_q_t *fwdq,
                                                    rd_kafka_replyq_t replyq) {
	int32_t version;

        rd_kafka_q_lock(rktp->rktp_fetchq);
        if (fwdq && !(rktp->rktp_fetchq->rkq_flags & RD_KAFKA_Q_F_FWD_APP))
                rd_kafka_q_fwd_set0(rktp->rktp_fetchq, fwdq,
                                    0, /* no do_lock */
                                    0 /* no fwd_app */);
        rd_kafka_q_unlock(rktp->rktp_fetchq);

	/* Bump version barrier. */
	version = rd_kafka_toppar_version_new_barrier(rktp);

	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
		     "Start consuming %.*s [%"PRId32"] at "
		     "offset %s (v%"PRId32")",
		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
		     rktp->rktp_partition, rd_kafka_offset2str(offset),
		     version);

        rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_START, version,
                           offset, rktp->rktp_rkt->rkt_rk->rk_cgrp, replyq);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
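
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * starting a partition from another thread and synchronously waiting
 * for the ack on a temporary reply queue. Setup of rk/rktp and all
 * error handling are omitted.
 *
 * @code
 *   rd_kafka_q_t *replyq = rd_kafka_q_new(rk);
 *   rd_kafka_op_t *rko;
 *
 *   rd_kafka_toppar_op_fetch_start(rktp, RD_KAFKA_OFFSET_BEGINNING,
 *                                  NULL,  // no forward queue
 *                                  RD_KAFKA_REPLYQ(replyq, 0));
 *
 *   rko = rd_kafka_q_pop(replyq, RD_POLL_INFINITE, 0);  // FETCH_START reply
 *   // ...inspect rko->rko_err, then:
 *   rd_kafka_op_destroy(rko);
 *   rd_kafka_q_destroy(replyq);
 * @endcode
 */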


/**
 * Stop consuming partition (async operation).
 * This is the thread-safe interface that can be called from any thread.
 *
 * Locality: any thread
 */
rd_kafka_resp_err_t rd_kafka_toppar_op_fetch_stop (rd_kafka_toppar_t *rktp,
                                                   rd_kafka_replyq_t replyq) {
	int32_t version;

	/* Bump version barrier. */
        version = rd_kafka_toppar_version_new_barrier(rktp);

        rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
		     "Stop consuming %.*s [%"PRId32"] (v%"PRId32")",
		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
		     rktp->rktp_partition, version);

        rd_kafka_toppar_op(rktp, RD_KAFKA_OP_FETCH_STOP, version,
			   0, NULL, replyq);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
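
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * stopping is fire-and-forget unless a reply queue is supplied; the
 * ack is only enqueued once the (possibly async) offset store has
 * finished, as noted in the stop handler above.
 *
 * @code
 *   rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_REPLYQ(replyq, 0));
 *   // ...later: pop the FETCH_STOP reply op from replyq
 * @endcode
 */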


/**
 * Set/Seek offset of a consumed partition (async operation).
 *  'offset' is the target offset
 *  'replyq' is an optional queue for handling the ack.
 *
 * This is the thread-safe interface that can be called from any thread.
 */
rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
                                             int64_t offset,
                                             rd_kafka_replyq_t replyq) {
	int32_t version;

	/* Bump version barrier. */
	version = rd_kafka_toppar_version_new_barrier(rktp);

	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "CONSUMER",
		     "Seek %.*s [%"PRId32"] to "
		     "offset %s (v%"PRId32")",
		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
		     rktp->rktp_partition, rd_kafka_offset2str(offset),
		     version);

        rd_kafka_toppar_op(rktp, RD_KAFKA_OP_SEEK, version,
			   offset, NULL, replyq);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
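
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * the public rd_kafka_seek() resolves the toppar and funnels into the
 * function above; an application-side call would look like:
 *
 * @code
 *   // Seek the consumer to offset 1234, waiting up to 5s for the ack:
 *   rd_kafka_resp_err_t err = rd_kafka_seek(rkt, partition, 1234, 5000);
 * @endcode
 */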


/**
 * Pause/resume partition (async operation).
 * \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
 * depending on if the app paused or librdkafka.
 * \p pause is 1 for pausing or 0 for resuming.
 *
 * Locality: any
 */
static rd_kafka_resp_err_t
rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp,
				 int pause, int flag) {
	int32_t version;
	rd_kafka_op_t *rko;

	/* Bump version barrier. */
	version = rd_kafka_toppar_version_new_barrier(rktp);

	rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, pause ? "PAUSE":"RESUME",
		     "%s %.*s [%"PRId32"] (v%"PRId32")",
		     pause ? "Pause" : "Resume",
		     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
		     rktp->rktp_partition, version);

	rko = rd_kafka_op_new(RD_KAFKA_OP_PAUSE);
	rko->rko_version = version;
	rko->rko_u.pause.pause = pause;
	rko->rko_u.pause.flag = flag;

	rd_kafka_toppar_op0(rktp, rko, RD_KAFKA_NO_REPLYQ);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}





/**
 * Pause or resume a list of partitions.
 * \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
 * depending on if the app paused or librdkafka.
 * \p pause is 1 for pausing or 0 for resuming.
 *
 * Locality: any
 *
 * @remark This is an asynchronous call, the actual pause/resume is performed
 *         by toppar_pause() in the toppar's handler thread.
 */
rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
			       rd_kafka_topic_partition_list_t *partitions) {
	int i;

	rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
		     "%s %s %d partition(s)",
		     flag & RD_KAFKA_TOPPAR_F_APP_PAUSE ? "Application" : "Library",
		     pause ? "pausing" : "resuming", partitions->cnt);

	for (i = 0 ; i < partitions->cnt ; i++) {
		rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
		shptr_rd_kafka_toppar_t *s_rktp;
		rd_kafka_toppar_t *rktp;

                s_rktp = rd_kafka_topic_partition_list_get_toppar(rk, rktpar);
		if (!s_rktp) {
			rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
				     "%s %s [%"PRId32"]: skipped: "
				     "unknown partition",
				     pause ? "Pause":"Resume",
				     rktpar->topic, rktpar->partition);

			rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
			continue;
		}

		rktp = rd_kafka_toppar_s2i(s_rktp);

		rd_kafka_toppar_op_pause_resume(rktp, pause, flag);

		rd_kafka_toppar_destroy(s_rktp);

		rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
	}

	return RD_KAFKA_RESP_ERR_NO_ERROR;
}
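
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * the public pause/resume APIs end up in the function above with the
 * APP_PAUSE flag set. Assumes an assigned consumer \c rk.
 *
 * @code
 *   rd_kafka_topic_partition_list_t *parts =
 *           rd_kafka_topic_partition_list_new(1);
 *   rd_kafka_topic_partition_list_add(parts, "mytopic", 0);
 *
 *   rd_kafka_pause_partitions(rk, parts);   // per-partition result in .err
 *   // ...no messages from mytopic [0] will be fetched meanwhile...
 *   rd_kafka_resume_partitions(rk, parts);
 *
 *   rd_kafka_topic_partition_list_destroy(parts);
 * @endcode
 */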





/**
 * Propagate error for toppar
 */
void rd_kafka_toppar_enq_error (rd_kafka_toppar_t *rktp,
                                rd_kafka_resp_err_t err,
                                const char *reason) {
        rd_kafka_op_t *rko;
        char buf[512];

        rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
        rko->rko_err  = err;
        rko->rko_rktp = rd_kafka_toppar_keep(rktp);

        rd_snprintf(buf, sizeof(buf), "%.*s [%"PRId32"]: %s (%s)",
                    RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                    rktp->rktp_partition, reason,
                    rd_kafka_err2str(err));

        rko->rko_u.err.errstr = rd_strdup(buf);

        rd_kafka_q_enq(rktp->rktp_fetchq, rko);
}





/**
 * Returns the local leader broker for this toppar.
 * If \p proper_broker is set, NULL will be returned if the current handler
 * is not a proper broker (i.e., the INTERNAL broker).
 *
 * The returned broker has an increased refcount.
 *
 * Locks: none
 */
rd_kafka_broker_t *rd_kafka_toppar_leader (rd_kafka_toppar_t *rktp,
                                           int proper_broker) {
        rd_kafka_broker_t *rkb;
        rd_kafka_toppar_lock(rktp);
        rkb = rktp->rktp_leader;
        if (rkb) {
                if (proper_broker && rkb->rkb_source == RD_KAFKA_INTERNAL)
                        rkb = NULL;
                else
                        rd_kafka_broker_keep(rkb);
        }
        rd_kafka_toppar_unlock(rktp);

        return rkb;
}


/**
 * @brief Take action when partition leader becomes unavailable.
 *        This should be called when leader-specific requests fail with
 *        NOT_LEADER_FOR.. or similar error codes, e.g. ProduceRequest.
 *
 * @locks none
 * @locality any
 */
void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
                                         const char *reason,
                                         rd_kafka_resp_err_t err) {
        rd_kafka_itopic_t *rkt = rktp->rktp_rkt;

        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "LEADERUA",
                     "%s [%"PRId32"]: leader unavailable: %s: %s",
                     rkt->rkt_topic->str, rktp->rktp_partition, reason,
                     rd_kafka_err2str(err));

        rd_kafka_topic_wrlock(rkt);
        rkt->rkt_flags |= RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
        rd_kafka_topic_wrunlock(rkt);

        rd_kafka_topic_fast_leader_query(rkt->rkt_rk);
}


const char *
rd_kafka_topic_partition_topic (const rd_kafka_topic_partition_t *rktpar) {
        const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
        return rktp->rktp_rkt->rkt_topic->str;
}

int32_t
rd_kafka_topic_partition_partition (const rd_kafka_topic_partition_t *rktpar) {
        const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
        return rktp->rktp_partition;
}

void rd_kafka_topic_partition_get (const rd_kafka_topic_partition_t *rktpar,
                                   const char **name, int32_t *partition) {
        const rd_kafka_toppar_t *rktp = (const rd_kafka_toppar_t *)rktpar;
        *name = rktp->rktp_rkt->rkt_topic->str;
        *partition = rktp->rktp_partition;
}




/**
 *
 * rd_kafka_topic_partition_t lists
 * Growable list of partitions for propagation to the application.
 *
 */


static void
rd_kafka_topic_partition_list_grow (rd_kafka_topic_partition_list_t *rktparlist,
                                    int add_size) {
        if (add_size < rktparlist->size)
                add_size = RD_MAX(rktparlist->size, 32);

        rktparlist->size += add_size;
        rktparlist->elems = rd_realloc(rktparlist->elems,
                                       sizeof(*rktparlist->elems) *
                                       rktparlist->size);
}

/**
 * Create a list with pre-allocated space for 'size' topic_partitions.
 */
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size) {
        rd_kafka_topic_partition_list_t *rktparlist;

        rktparlist = rd_calloc(1, sizeof(*rktparlist));

        rktparlist->size = size;
        rktparlist->cnt = 0;

        if (size > 0)
                rd_kafka_topic_partition_list_grow(rktparlist, size);

        return rktparlist;
}
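
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * typical list lifecycle using the public API; the initial size is
 * only a capacity hint since the list grows on demand.
 *
 * @code
 *   rd_kafka_topic_partition_list_t *parts =
 *           rd_kafka_topic_partition_list_new(4);
 *
 *   rd_kafka_topic_partition_list_add(parts, "logs", 0);
 *   rd_kafka_topic_partition_list_add_range(parts, "metrics", 0, 2);
 *   // parts->cnt is now 4: logs[0] plus metrics[0..2]
 *
 *   rd_kafka_topic_partition_list_destroy(parts);
 * @endcode
 */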



rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
							  int32_t partition) {
	rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));

	rktpar->topic = rd_strdup(topic);
	rktpar->partition = partition;

	return rktpar;
}


rd_kafka_topic_partition_t *
rd_kafka_topic_partition_new_from_rktp (rd_kafka_toppar_t *rktp) {
	rd_kafka_topic_partition_t *rktpar = rd_calloc(1, sizeof(*rktpar));

	rktpar->topic = RD_KAFKAP_STR_DUP(rktp->rktp_rkt->rkt_topic);
	rktpar->partition = rktp->rktp_partition;

	return rktpar;
}



static void
rd_kafka_topic_partition_destroy0 (rd_kafka_topic_partition_t *rktpar, int do_free) {
	if (rktpar->topic)
		rd_free(rktpar->topic);
	if (rktpar->metadata)
		rd_free(rktpar->metadata);
	if (rktpar->_private)
		rd_kafka_toppar_destroy((shptr_rd_kafka_toppar_t *)
					rktpar->_private);

	if (do_free)
		rd_free(rktpar);
}

void rd_kafka_topic_partition_destroy (rd_kafka_topic_partition_t *rktpar) {
	rd_kafka_topic_partition_destroy0(rktpar, 1);
}


/**
 * Destroys a list previously created with .._list_new() and drops
 * any references to contained toppars.
 */
void
rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist) {
        int i;

        for (i = 0 ; i < rktparlist->cnt ; i++)
		rd_kafka_topic_partition_destroy0(&rktparlist->elems[i], 0);

        if (rktparlist->elems)
                rd_free(rktparlist->elems);

        rd_free(rktparlist);
}


/**
 * Add a partition to an rktpar list.
 * The list is grown as needed to fit it.
 *
 * '_private' must be NULL or a valid 'shptr_rd_kafka_toppar_t *'.
 *
 * Returns a pointer to the added element.
 */
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0 (rd_kafka_topic_partition_list_t *rktparlist,
                                    const char *topic, int32_t partition,
				    shptr_rd_kafka_toppar_t *_private) {
        rd_kafka_topic_partition_t *rktpar;
        if (rktparlist->cnt == rktparlist->size)
                rd_kafka_topic_partition_list_grow(rktparlist, 1);
        rd_kafka_assert(NULL, rktparlist->cnt < rktparlist->size);

        rktpar = &rktparlist->elems[rktparlist->cnt++];
        memset(rktpar, 0, sizeof(*rktpar));
        rktpar->topic = rd_strdup(topic);
        rktpar->partition = partition;
	rktpar->offset = RD_KAFKA_OFFSET_INVALID;
        rktpar->_private = _private;

        return rktpar;
}


rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist,
                                   const char *topic, int32_t partition) {
        return rd_kafka_topic_partition_list_add0(rktparlist,
                                                  topic, partition, NULL);
}


/**
 * Adds a consecutive range of partitions to a list.
 */
void
rd_kafka_topic_partition_list_add_range (rd_kafka_topic_partition_list_t
                                         *rktparlist,
                                         const char *topic,
                                         int32_t start, int32_t stop) {

        for (; start <= stop ; start++)
                rd_kafka_topic_partition_list_add(rktparlist, topic, start);
}


rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_upsert (
        rd_kafka_topic_partition_list_t *rktparlist,
        const char *topic, int32_t partition) {
        rd_kafka_topic_partition_t *rktpar;

        if ((rktpar = rd_kafka_topic_partition_list_find(rktparlist,
                                                         topic, partition)))
                return rktpar;

        return rd_kafka_topic_partition_list_add(rktparlist, topic, partition);
}

/**
 * @brief Creates a copy of \p rktpar and adds it to \p rktparlist
 */
void
rd_kafka_topic_partition_copy (rd_kafka_topic_partition_list_t *rktparlist,
                               const rd_kafka_topic_partition_t *rktpar) {
        rd_kafka_topic_partition_t *dst;

        dst = rd_kafka_topic_partition_list_add0(
                rktparlist,
                rktpar->topic,
                rktpar->partition,
                rktpar->_private ?
                rd_kafka_toppar_keep(
                        rd_kafka_toppar_s2i((shptr_rd_kafka_toppar_t *)
                                            rktpar->_private)) : NULL);
        dst->offset = rktpar->offset;
        dst->opaque = rktpar->opaque;
        dst->err    = rktpar->err;
        if (rktpar->metadata_size > 0) {
                dst->metadata =
                        rd_malloc(rktpar->metadata_size);
                dst->metadata_size = rktpar->metadata_size;
                memcpy((void *)dst->metadata, rktpar->metadata,
                       rktpar->metadata_size);
        }
}



/**
 * Create and return a copy of list 'src'
 */
rd_kafka_topic_partition_list_t *
rd_kafka_topic_partition_list_copy (const rd_kafka_topic_partition_list_t *src){
        rd_kafka_topic_partition_list_t *dst;
        int i;

        dst = rd_kafka_topic_partition_list_new(src->size);

        for (i = 0 ; i < src->cnt ; i++)
                rd_kafka_topic_partition_copy(dst, &src->elems[i]);
        return dst;
}

/**
 * @returns (and sets if necessary) the \p rktpar's _private / toppar.
 * @remark a new reference is returned.
 */
shptr_rd_kafka_toppar_t *
rd_kafka_topic_partition_get_toppar (rd_kafka_t *rk,
                                     rd_kafka_topic_partition_t *rktpar) {
        shptr_rd_kafka_toppar_t *s_rktp;

        if (!(s_rktp = rktpar->_private))
                s_rktp = rktpar->_private =
                        rd_kafka_toppar_get2(rk,
                                             rktpar->topic,
                                             rktpar->partition, 0, 0);
        if (!s_rktp)
                return NULL;

        return rd_kafka_toppar_keep(rd_kafka_toppar_s2i(s_rktp));
}


static int rd_kafka_topic_partition_cmp (const void *_a, const void *_b,
                                         void *opaque) {
        const rd_kafka_topic_partition_t *a = _a;
        const rd_kafka_topic_partition_t *b = _b;
        int r = strcmp(a->topic, b->topic);
        if (r)
                return r;
        else
                return a->partition - b->partition;
}


/**
 * @brief Search 'rktparlist' for 'topic' and 'partition'.
 * @returns the elems[] index or -1 on miss.
 */
int
rd_kafka_topic_partition_list_find0 (rd_kafka_topic_partition_list_t *rktparlist,
				     const char *topic, int32_t partition) {
        rd_kafka_topic_partition_t skel;
        int i;

        skel.topic = (char *)topic;
        skel.partition = partition;

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                if (!rd_kafka_topic_partition_cmp(&skel,
                                                  &rktparlist->elems[i],
                                                  NULL))
                        return i;
        }

        return -1;
}

rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_find (rd_kafka_topic_partition_list_t *rktparlist,
				     const char *topic, int32_t partition) {
	int i = rd_kafka_topic_partition_list_find0(rktparlist,
						    topic, partition);
	if (i == -1)
		return NULL;
	else
		return &rktparlist->elems[i];
}
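
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * find() returns a pointer into elems[] (invalidated by add/del), which
 * may be used to adjust an element in place:
 *
 * @code
 *   rd_kafka_topic_partition_t *rktpar =
 *           rd_kafka_topic_partition_list_find(parts, "logs", 0);
 *   if (rktpar)
 *           rktpar->offset = RD_KAFKA_OFFSET_END;  // e.g. tail the log
 * @endcode
 */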


int
rd_kafka_topic_partition_list_del_by_idx (rd_kafka_topic_partition_list_t *rktparlist,
					  int idx) {
	if (unlikely(idx < 0 || idx >= rktparlist->cnt))
		return 0;

	rktparlist->cnt--;
	rd_kafka_topic_partition_destroy0(&rktparlist->elems[idx], 0);
	memmove(&rktparlist->elems[idx], &rktparlist->elems[idx+1],
		(rktparlist->cnt - idx) * sizeof(rktparlist->elems[idx]));

	return 1;
}


int
rd_kafka_topic_partition_list_del (rd_kafka_topic_partition_list_t *rktparlist,
				   const char *topic, int32_t partition) {
	int i = rd_kafka_topic_partition_list_find0(rktparlist,
						    topic, partition);
	if (i == -1)
		return 0;

	return rd_kafka_topic_partition_list_del_by_idx(rktparlist, i);
}



/**
 * Returns true if 'topic' matches the 'rktpar', else false.
 * On match, 'matched_by_regex' is set to 1 if 'rktpar' is a regex pattern,
 * else to 0.
 */
int rd_kafka_topic_partition_match (rd_kafka_t *rk,
				    const rd_kafka_group_member_t *rkgm,
				    const rd_kafka_topic_partition_t *rktpar,
				    const char *topic, int *matched_by_regex) {
	int ret = 0;

	if (*rktpar->topic == '^') {
		char errstr[128];

		ret = rd_regex_match(rktpar->topic, topic,
				     errstr, sizeof(errstr));
		if (ret == -1) {
			rd_kafka_dbg(rk, CGRP,
				     "SUBMATCH",
				     "Invalid regex for member "
				     "\"%.*s\" subscription \"%s\": %s",
				     RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
				     rktpar->topic, errstr);
			return 0;
		}

		if (ret && matched_by_regex)
			*matched_by_regex = 1;

	} else if (!strcmp(rktpar->topic, topic)) {

		if (matched_by_regex)
			*matched_by_regex = 0;

		ret = 1;
	}

	return ret;
}
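
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * matching a regex subscription entry against a concrete topic name.
 * \c rkgm is assumed to be a valid group member (only used for the
 * debug log on invalid patterns).
 *
 * @code
 *   int by_regex = -1;
 *   rd_kafka_topic_partition_t sub = { 0 };
 *   sub.topic = (char *)"^logs\\..*";
 *
 *   if (rd_kafka_topic_partition_match(rk, rkgm, &sub,
 *                                      "logs.frontend", &by_regex)) {
 *           // matched, by_regex == 1
 *   }
 * @endcode
 */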



void rd_kafka_topic_partition_list_sort (
        rd_kafka_topic_partition_list_t *rktparlist,
        int (*cmp) (const void *, const void *, void *),
        void *opaque) {

        if (!cmp)
                cmp = rd_kafka_topic_partition_cmp;

        rd_qsort_r(rktparlist->elems, rktparlist->cnt,
                   sizeof(*rktparlist->elems),
                   cmp, opaque);
}
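
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * sorting with a custom qsort_r-style comparator, here by offset:
 *
 * @code
 *   static int cmp_by_offset (const void *a, const void *b, void *opaque) {
 *           const rd_kafka_topic_partition_t *pa = a, *pb = b;
 *           if (pa->offset != pb->offset)
 *                   return pa->offset < pb->offset ? -1 : 1;
 *           return 0;
 *   }
 *
 *   rd_kafka_topic_partition_list_sort(parts, cmp_by_offset, NULL);
 * @endcode
 */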


void rd_kafka_topic_partition_list_sort_by_topic (
        rd_kafka_topic_partition_list_t *rktparlist) {
        rd_kafka_topic_partition_list_sort(rktparlist,
                                           rd_kafka_topic_partition_cmp, NULL);
}

rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset (
	rd_kafka_topic_partition_list_t *rktparlist,
	const char *topic, int32_t partition, int64_t offset) {
	rd_kafka_topic_partition_t *rktpar;

	if (!(rktpar = rd_kafka_topic_partition_list_find(rktparlist,
							  topic, partition)))
		return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

	rktpar->offset = offset;

	return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief Reset all offsets to the provided value.
 */
void
rd_kafka_topic_partition_list_reset_offsets (rd_kafka_topic_partition_list_t *rktparlist,
					     int64_t offset) {
        int i;
        for (i = 0 ; i < rktparlist->cnt ; i++)
		rktparlist->elems[i].offset = offset;
}


/**
 * Set offset values in partition list based on toppar's last stored offset.
 *
 *  from_rktp - true: use the toppar's last stored offset,
 *              false: use def_value unless a concrete offset is already set.
 *  is_commit - indicates that the set offset is to be committed (debug log).
 *
 * Returns the number of valid non-logical offsets (>=0).
 */
int rd_kafka_topic_partition_list_set_offsets (
	rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        int from_rktp, int64_t def_value, int is_commit) {
        int i;
	int valid_cnt = 0;

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
		const char *verb = "setting";

                if (from_rktp) {
                        shptr_rd_kafka_toppar_t *s_rktp = rktpar->_private;
                        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
                        rd_kafka_toppar_lock(rktp);

			rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
				     "Topic %s [%"PRId32"]: "
				     "stored offset %"PRId64", committed "
				     "offset %"PRId64,
				     rktpar->topic, rktpar->partition,
				     rktp->rktp_stored_offset,
				     rktp->rktp_committed_offset);

			if (rktp->rktp_stored_offset >
			    rktp->rktp_committed_offset) {
				verb = "setting stored";
				rktpar->offset = rktp->rktp_stored_offset;
			} else {
				rktpar->offset = RD_KAFKA_OFFSET_INVALID;
			}
                        rd_kafka_toppar_unlock(rktp);
                } else {
			if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {
				verb = "setting default";
				rktpar->offset = def_value;
			} else
				verb = "keeping";
                }

		rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_TOPIC, "OFFSET",
			     "Topic %s [%"PRId32"]: "
			     "%s offset %s%s",
			     rktpar->topic, rktpar->partition,
			     verb,
			     rd_kafka_offset2str(rktpar->offset),
			     is_commit ? " for commit" : "");

		if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
			valid_cnt++;
        }

	return valid_cnt;
}


/**
 * @returns the number of partitions with absolute (non-logical) offsets set.
 */
int rd_kafka_topic_partition_list_count_abs_offsets (
	const rd_kafka_topic_partition_list_t *rktparlist) {
	int i;
	int valid_cnt = 0;

        for (i = 0 ; i < rktparlist->cnt ; i++)
		if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktparlist->elems[i].offset))
			valid_cnt++;

	return valid_cnt;
}

/**
 * @returns a new shared toppar pointer for \p rktpar,
 * or NULL if not set or not found.
 *
 * @remark A new reference is returned.
 * @remark The _private field is set to the toppar if not previously set.
 */
shptr_rd_kafka_toppar_t *
rd_kafka_topic_partition_list_get_toppar (
        rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar) {
        return rd_kafka_topic_partition_get_toppar(rk, rktpar);
}


/**
 * @brief Update _private (toppar) field to point to valid s_rktp
 *        for each partition.
 */
void
rd_kafka_topic_partition_list_update_toppars (rd_kafka_t *rk,
                                              rd_kafka_topic_partition_list_t
                                              *rktparlist) {
        int i;
        for (i = 0 ; i < rktparlist->cnt ; i++) {
                rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];

                rd_kafka_topic_partition_list_get_toppar(rk, rktpar);
        }
}


/**
 * @brief Populate \p leaders with the leaders+partitions for the partitions in
 *        \p rktparlist. Duplicates are suppressed.
 *
 *        If no leader is found for a partition that element's \c .err will
 *        be set to RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE.
 *
 *        If the partition does not exist \c .err will be set to
 *        RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION.
 *
 * @param leaders rd_list_t of allocated (struct rd_kafka_partition_leader *)
 * @param query_topics (optional) rd_list of strdupped (char *)
 *
 * @remark This is based on the current topic_t and partition state
 *         which may lag behind the last metadata update due to internal
 *         threading, and because a topic_t may not have been created yet.
 *
 * @returns the number of leaders added.
 *
 * @sa rd_kafka_topic_partition_list_get_leaders_by_metadata
 *
 * @locks rd_kafka_*lock() MUST NOT be held
 */
int
rd_kafka_topic_partition_list_get_leaders (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *leaders,
        rd_list_t *query_topics) {
        int cnt = 0;
        int i;

        rd_kafka_rdlock(rk);

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
                rd_kafka_broker_t *rkb = NULL;
                struct rd_kafka_partition_leader leader_skel;
                struct rd_kafka_partition_leader *leader;
                const rd_kafka_metadata_topic_t *mtopic;
                const rd_kafka_metadata_partition_t *mpart;

                rd_kafka_metadata_cache_topic_partition_get(
                        rk, &mtopic, &mpart,
                        rktpar->topic, rktpar->partition, 1/*valid*/);

                if (mtopic &&
                    mtopic->err != RD_KAFKA_RESP_ERR_NO_ERROR &&
                    mtopic->err != RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) {
                        /* Topic permanently errored */
                        rktpar->err = mtopic->err;
                        continue;
                }

                if (mtopic && !mpart && mtopic->partition_cnt > 0) {
                        /* Topic exists but partition doesn't.
                         * This is a permanent error. */
                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        continue;
                }

                if (mpart &&
                    (mpart->leader == -1 ||
                     !(rkb = rd_kafka_broker_find_by_nodeid0(
                               rk, mpart->leader, -1/*any state*/)))) {
                        /* Partition has no (valid) leader */
                        rktpar->err =
                                mtopic->err ? mtopic->err :
                                RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
                }

                if (!mtopic || !rkb) {
                        /* Topic unknown or no current leader for partition,
                         * add topic to query list. */
                        if (query_topics &&
                            !rd_list_find(query_topics, rktpar->topic,
                                          (void *)strcmp))
                                rd_list_add(query_topics,
                                            rd_strdup(rktpar->topic));
                        continue;
                }

                /* Leader exists, add to leader list. */

                rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;

                memset(&leader_skel, 0, sizeof(leader_skel));
                leader_skel.rkb = rkb;

                leader = rd_list_find(leaders, &leader_skel,
                                      rd_kafka_partition_leader_cmp);

                if (!leader) {
                        leader = rd_kafka_partition_leader_new(rkb);
                        rd_list_add(leaders, leader);
                        cnt++;
                }

                rd_kafka_topic_partition_copy(leader->partitions, rktpar);

                rd_kafka_broker_destroy(rkb);    /* lose refcount */
        }

        rd_kafka_rdunlock(rk);

        return cnt;

}




/**
 * @brief Get leaders for all partitions in \p rktparlist, querying metadata
 *        if needed.
 *
 * @param leaders is a pre-initialized (empty) list which will be populated
 *        with the leader brokers and their partitions
 *        (struct rd_kafka_partition_leader *)
 *
 * @returns an error code on error.
 *
 * @locks rd_kafka_*lock() MUST NOT be held
 */
rd_kafka_resp_err_t
rd_kafka_topic_partition_list_query_leaders (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *leaders, int timeout_ms) {
        rd_ts_t ts_end = rd_timeout_init(timeout_ms);
        rd_ts_t ts_query = 0;
        rd_ts_t now;
        int i = 0;

        /* Get all the partition leaders, try multiple times:
         * if there are no leaders after the first run fire off a leader
         * query and wait for broker state update before trying again,
         * keep trying and re-querying at increasing intervals until
         * success or timeout. */
        do {
                rd_list_t query_topics;
                int query_intvl;

                rd_list_init(&query_topics, rktparlist->cnt, rd_free);

                rd_kafka_topic_partition_list_get_leaders(
                        rk, rktparlist, leaders, &query_topics);

                if (rd_list_empty(&query_topics)) {
                        /* No remaining topics to query: leader-list complete.*/
                        rd_list_destroy(&query_topics);

                        /* No leader(s) for partitions means all partitions
                         * are unknown. */
                        if (rd_list_empty(leaders))
                                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

                        return RD_KAFKA_RESP_ERR_NO_ERROR;
                }

                now = rd_clock();
                /*
                 * Missing leader for some partitions
                 */
                query_intvl = (i+1) * 100; /* add 100ms per iteration */
                if (query_intvl > 2*1000)
                        query_intvl = 2*1000; /* Cap to 2s */

                if (now >= ts_query + (query_intvl*1000)) {
                        /* Query metadata for missing leaders,
                         * possibly creating the topic. */
                        rd_kafka_metadata_refresh_topics(
                                rk, NULL, &query_topics, 1/*force*/,
                                "query partition leaders");
                        ts_query = now;
                } else {
                        /* Wait for broker ids to be updated from
                         * metadata refresh above. */
                        int wait_ms = rd_timeout_remains_limit(ts_end,
                                                               query_intvl);
                        rd_kafka_metadata_cache_wait_change(rk, wait_ms);
                }

                rd_list_destroy(&query_topics);

                i++;
        } while (ts_end == RD_POLL_INFINITE ||
                 now < ts_end); /* now is deliberately outdated here
                                 * since wait_change() will block.
                                 * This gives us one more chance to
                                 * spin through the loop. */

        return RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE;
}
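
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * resolving leaders for a partition list and iterating the result.
 * The leader free callback passed to rd_list_init() is an assumption
 * about the corresponding helper in rdkafka_partition.h.
 *
 * @code
 *   rd_list_t leaders;
 *   struct rd_kafka_partition_leader *leader;
 *   int i;
 *
 *   rd_list_init(&leaders, 0, rd_kafka_partition_leader_destroy_free);
 *
 *   if (!rd_kafka_topic_partition_list_query_leaders(rk, parts,
 *                                                    &leaders, 5000))
 *           RD_LIST_FOREACH(leader, &leaders, i) {
 *                   // leader->rkb serves leader->partitions
 *           }
 *
 *   rd_list_destroy(&leaders);
 * @endcode
 */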


/**
 * @brief Populate \p rkts with the rd_kafka_itopic_t objects for the
 *        partitions in \p rktparlist. Duplicates are suppressed.
 *
 * @returns the number of topics added.
 */
int
rd_kafka_topic_partition_list_get_topics (
        rd_kafka_t *rk,
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *rkts) {
        int cnt = 0;

        int i;
        for (i = 0 ; i < rktparlist->cnt ; i++) {
                rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];
                shptr_rd_kafka_toppar_t *s_rktp;
                rd_kafka_toppar_t *rktp;

                s_rktp = rd_kafka_topic_partition_get_toppar(rk, rktpar);
                if (!s_rktp) {
                        rktpar->err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        continue;
                }

                rktp = rd_kafka_toppar_s2i(s_rktp);

                if (!rd_list_find(rkts, rktp->rktp_s_rkt,
                                  rd_kafka_topic_cmp_s_rkt)) {
                        rd_list_add(rkts, rd_kafka_topic_keep(rktp->rktp_rkt));
                        cnt++;
                }

                rd_kafka_toppar_destroy(s_rktp);
        }

        return cnt;
}


/**
 * @brief Populate \p topics with the strdupped topic names in \p rktparlist.
 *        Duplicates are suppressed.
 *
 * @param include_regex: include regex topics
 *
 * @returns the number of topics added.
 */
int
rd_kafka_topic_partition_list_get_topic_names (
        const rd_kafka_topic_partition_list_t *rktparlist,
        rd_list_t *topics, int include_regex) {
        int cnt = 0;
        int i;

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                const rd_kafka_topic_partition_t *rktpar = &rktparlist->elems[i];

                if (!include_regex && *rktpar->topic == '^')
                        continue;

                if (!rd_list_find(topics, rktpar->topic, (void *)strcmp)) {
                        rd_list_add(topics, rd_strdup(rktpar->topic));
                        cnt++;
                }
        }

        return cnt;
}


/**
 * @brief Create a copy of \p rktparlist only containing the partitions
 *        matched by \p match function.
 *
 * \p match shall return 1 for match, else 0.
 *
 * @returns a new list
 */
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_match (
        const rd_kafka_topic_partition_list_t *rktparlist,
        int (*match) (const void *elem, const void *opaque),
        void *opaque) {
        rd_kafka_topic_partition_list_t *newlist;
        int i;

        newlist = rd_kafka_topic_partition_list_new(0);

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                const rd_kafka_topic_partition_t *rktpar =
                        &rktparlist->elems[i];

                if (!match(rktpar, opaque))
                        continue;

                rd_kafka_topic_partition_copy(newlist, rktpar);
        }

        return newlist;
}
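
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * a match callback that keeps only partitions with an error set, e.g.
 * to report failures back to the application:
 *
 * @code
 *   static int match_has_err (const void *elem, const void *opaque) {
 *           const rd_kafka_topic_partition_t *rktpar = elem;
 *           return rktpar->err != RD_KAFKA_RESP_ERR_NO_ERROR;
 *   }
 *
 *   rd_kafka_topic_partition_list_t *failed =
 *           rd_kafka_topic_partition_list_match(parts, match_has_err, NULL);
 * @endcode
 */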

void
rd_kafka_topic_partition_list_log (rd_kafka_t *rk, const char *fac, int dbg,
				   const rd_kafka_topic_partition_list_t *rktparlist) {
        int i;

	rd_kafka_dbg(rk, NONE|dbg, fac, "List with %d partition(s):",
		     rktparlist->cnt);
        for (i = 0 ; i < rktparlist->cnt ; i++) {
		const rd_kafka_topic_partition_t *rktpar =
			&rktparlist->elems[i];
		rd_kafka_dbg(rk, NONE|dbg, fac, " %s [%"PRId32"] offset %s%s%s",
			     rktpar->topic, rktpar->partition,
			     rd_kafka_offset2str(rktpar->offset),
			     rktpar->err ? ": error: " : "",
			     rktpar->err ? rd_kafka_err2str(rktpar->err) : "");
	}
}

/**
 * @returns a comma-separated list of partitions.
 */
const char *
rd_kafka_topic_partition_list_str (const rd_kafka_topic_partition_list_t *rktparlist,
                                   char *dest, size_t dest_size,
                                   int fmt_flags) {
        int i;
        size_t of = 0;
        int trunc = 0;

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                const rd_kafka_topic_partition_t *rktpar =
                        &rktparlist->elems[i];
                char errstr[128];
                char offsetstr[32];
                int r;

                if (trunc) {
                        if (dest_size > 4)
                                rd_snprintf(&dest[dest_size-4], 4, "...");
                        break;
                }

                if (!rktpar->err && (fmt_flags & RD_KAFKA_FMT_F_ONLY_ERR))
                        continue;

                if (rktpar->err && !(fmt_flags & RD_KAFKA_FMT_F_NO_ERR))
                        rd_snprintf(errstr, sizeof(errstr),
                                    "(%s)", rd_kafka_err2str(rktpar->err));
                else
                        errstr[0] = '\0';

                if (rktpar->offset != RD_KAFKA_OFFSET_INVALID)
                        rd_snprintf(offsetstr, sizeof(offsetstr),
                                    "@%"PRId64, rktpar->offset);
                else
                        offsetstr[0] = '\0';

                r = rd_snprintf(&dest[of], dest_size-of,
                                "%s"
                                "%s[%"PRId32"]"
                                "%s"
                                "%s",
                                of == 0 ? "" : ", ",
                                rktpar->topic, rktpar->partition,
                                offsetstr,
                                errstr);

                if ((size_t)r >= dest_size-of)
                        trunc++;
                else
                        of += r;
        }

        return dest;
}
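
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * rendering a list into a stack buffer for a log line; overlong lists
 * are truncated with a trailing "...":
 *
 * @code
 *   char buf[512];
 *
 *   rd_kafka_topic_partition_list_str(parts, buf, sizeof(buf), 0);
 *   rd_kafka_dbg(rk, TOPIC, "PARTS", "Assigned: %s", buf);
 * @endcode
 */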



/**
 * @brief Update \p dst with info from \p src.
 *
 * Fields updated:
 *  - offset
 *  - err
 *
 * Only partitions present in both \p dst and \p src are updated;
 * other partitions remain unchanged.
 */
void
rd_kafka_topic_partition_list_update (rd_kafka_topic_partition_list_t *dst,
                                      const rd_kafka_topic_partition_list_t *src){
        int i;

        for (i = 0 ; i < dst->cnt ; i++) {
                rd_kafka_topic_partition_t *d = &dst->elems[i];
                rd_kafka_topic_partition_t *s;

                if (!(s = rd_kafka_topic_partition_list_find(
                              (rd_kafka_topic_partition_list_t *)src,
                              d->topic, d->partition)))
                        continue;

                d->offset = s->offset;
                d->err    = s->err;
        }
}


/**
 * @returns the sum of \p cb called for each element.
 */
size_t
rd_kafka_topic_partition_list_sum (
        const rd_kafka_topic_partition_list_t *rktparlist,
        size_t (*cb) (const rd_kafka_topic_partition_t *rktpar, void *opaque),
        void *opaque) {
        int i;
        size_t sum = 0;

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                const rd_kafka_topic_partition_t *rktpar =
                        &rktparlist->elems[i];
                sum += cb(rktpar, opaque);
        }
        return sum;
}
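
/**
 * Illustrative sketch (editor's example, not part of the library source):
 * summing per-partition metadata sizes with a callback:
 *
 * @code
 *   static size_t metadata_size_cb (const rd_kafka_topic_partition_t
 *                                   *rktpar, void *opaque) {
 *           return rktpar->metadata_size;
 *   }
 *
 *   size_t total = rd_kafka_topic_partition_list_sum(parts,
 *                                                    metadata_size_cb, NULL);
 * @endcode
 */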


/**
 * @brief Set \c .err field \p err on all partitions in list.
 */
void rd_kafka_topic_partition_list_set_err (
        rd_kafka_topic_partition_list_t *rktparlist,
        rd_kafka_resp_err_t err) {
        int i;

        for (i = 0 ; i < rktparlist->cnt ; i++)
                rktparlist->elems[i].err = err;
}


/**
 * @returns the number of wildcard/regex topics
 */
int rd_kafka_topic_partition_list_regex_cnt (
        const rd_kafka_topic_partition_list_t *rktparlist) {
        int i;
        int cnt = 0;

        for (i = 0 ; i < rktparlist->cnt ; i++) {
                const rd_kafka_topic_partition_t *rktpar =
                        &rktparlist->elems[i];
                cnt += *rktpar->topic == '^';
        }
        return cnt;
}