/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdarg.h>

#include "rdkafka_int.h"
#include "rdkafka_request.h"
#include "rdkafka_broker.h"
#include "rdkafka_offset.h"
#include "rdkafka_topic.h"
#include "rdkafka_partition.h"
#include "rdkafka_metadata.h"
#include "rdkafka_msgset.h"

#include "rdrand.h"
#include "rdstring.h"

/**
 * Kafka protocol request and response handling.
 * All of this code runs in the broker thread and uses op queues for
 * propagating results back to the various sub-systems operating in
 * other threads.
 */


/**
 * RD_KAFKA_ERR_ACTION_.. to string map.
 *
 * NULL-terminated list of human readable action names. It is handed to
 * rd_flags2str() by rd_kafka_err_action() to render an action bitmask
 * in debug logs.
 *
 * NOTE(review): the entry order presumably has to follow the bit order of
 * the RD_KAFKA_ERR_ACTION_.. flag definitions - confirm against the header
 * before adding or reordering entries.
 */
static const char *rd_kafka_actions_descs[] = {
        "Permanent",
        "Ignore",
        "Refresh",
        "Retry",
        "Inform",
        "Special",
        NULL,
};


/**
 * @brief Map an error code to the RD_KAFKA_ERR_ACTION_.. flags the caller
 *        should act on.
 *
 * The trailing var-args are (int actions, int error) pairs terminated by a
 * single RD_KAFKA_ERR_ACTION_END. Each pair whose error equals \p err
 * contributes its actions. If at least one pair matched, the accumulated
 * actions are returned as they are and the built-in defaults below are
 * not consulted.
 *
 * Defaults when no pair matched:
 *  - leader/coordinator/broker/replica availability errors: REFRESH
 *  - timeouts, temporary replica shortage, transport failure: RETRY
 *  - anything else: PERMANENT
 * RETRY is removed from the defaults when there is no \p request.
 *
 * @returns 0 if \p err is not an error, else a bitmask of actions.
 *
 * @warning \p request, \p rkbuf and \p rkb may be NULL.
 */
int rd_kafka_err_action (rd_kafka_broker_t *rkb,
                         rd_kafka_resp_err_t err,
                         rd_kafka_buf_t *rkbuf,
                         rd_kafka_buf_t *request, ...) {
        va_list overrides;
        int actions = 0;
        int can_log;
        char flagbuf[64];

        if (!err)
                return 0;

        /* Debug logging needs the broker handle and the request (for its
         * ApiKey). err is known to be non-zero from here on. */
        can_log = rkb != NULL && request != NULL;

        /* Collect the caller's explicit error -> action mappings. */
        va_start(overrides, request);
        for (;;) {
                int override_actions = va_arg(overrides, int);
                int override_err;

                /* A zero action value terminates the list. */
                if (!override_actions)
                        break;

                override_err = va_arg(overrides, int);
                if (err == override_err)
                        actions |= override_actions;
        }
        va_end(overrides);

        if (actions) {
                /* An explicit mapping takes precedence over the defaults. */
                if (can_log)
                        rd_rkb_dbg(rkb, BROKER, "REQERR",
                                   "%sRequest failed: %s: explicit actions %s",
                                   rd_kafka_ApiKey2str(
                                           request->rkbuf_reqhdr.ApiKey),
                                   rd_kafka_err2str(err),
                                   rd_flags2str(flagbuf, sizeof(flagbuf),
                                                rd_kafka_actions_descs,
                                                actions));

                return actions;
        }

        /* No explicit mapping: fall back to the generic classification.
         * actions is still 0 at this point. */
        switch (err)
        {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
                break;

        /* Client-side wait-response/in-queue timeouts */
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
        case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE:
        /* Broker-side request handling timeout */
        case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
        /* Temporary broker-side problems */
        case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS:
        case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND:
        /* Broker connection down */
        case RD_KAFKA_RESP_ERR__TRANSPORT:
                actions = RD_KAFKA_ERR_ACTION_RETRY;
                break;

        /* Leader/coordinator information is stale or not yet available:
         * request a metadata information update. */
        case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
        case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
        case RD_KAFKA_RESP_ERR__WAIT_COORD:
                actions = RD_KAFKA_ERR_ACTION_REFRESH;
                break;

        /* Listed for documentation only, same outcome as the default. */
        case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
        case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT:
        case RD_KAFKA_RESP_ERR__DESTROY:
        default:
                actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                break;
        }

        /* Certain error call chains have no request buffer, in which
         * case there is nothing to retry. */
        if (request == NULL)
                actions &= ~RD_KAFKA_ERR_ACTION_RETRY;

        if (actions && can_log)
                rd_rkb_dbg(rkb, BROKER, "REQERR",
                           "%sRequest failed: %s: actions %s",
                           rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
                           rd_kafka_err2str(err),
                           rd_flags2str(flagbuf, sizeof(flagbuf),
                                        rd_kafka_actions_descs, actions));

        return actions;
}


/**
 * @brief Build a GroupCoordinatorRequest for the group id \p cgrp and
 *        enqueue it on \p rkb.
 *
 * The group id string is the only field written to the request.
 * \p replyq, \p resp_cb and \p opaque are handed unchanged to
 * rd_kafka_broker_buf_enq_replyq() together with the request.
 */
void rd_kafka_GroupCoordinatorRequest (rd_kafka_broker_t *rkb,
                                       const rd_kafkap_str_t *cgrp,
                                       rd_kafka_replyq_t replyq,
                                       rd_kafka_resp_cb_t *resp_cb,
                                       void *opaque) {
        /* Size the buffer for the one and only field. */
        rd_kafka_buf_t *req = rd_kafka_buf_new_request(
                rkb, RD_KAFKAP_GroupCoordinator, 1,
                RD_KAFKAP_STR_SIZE(cgrp));

        rd_kafka_buf_write_kstr(req, cgrp);

        rd_kafka_broker_buf_enq_replyq(rkb, req, replyq, resp_cb, opaque);
}




/**
 * @brief Parses and handles Offset replies.
 *
 * Returns the parsed offsets (and errors) in \p offsets: one entry is
 * appended for each (topic, partition) found in the response, carrying that
 * partition's error code and offset. The response layout is selected by the
 * ApiVersion stored in \p request's header (v0: array of offsets,
 * v1: timestamp followed by a single offset).
 *
 * If \p err is set (or the err_parse label is reached) the error is passed
 * through rd_kafka_err_action(): a REFRESH action forces a metadata refresh
 * of the known topics, a RETRY action calls rd_kafka_buf_retry() on
 * \p request.
 *
 * @returns RD_KAFKA_RESP_ERR__IN_PROGRESS if rd_kafka_buf_retry() accepted
 *          \p request, else the error code. After a successful parse the
 *          returned value is the ErrorCode that was read last, i.e. that of
 *          the last partition in the response (0 if there were none);
 *          inspect \p offsets for the per-partition results.
 */
rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
                                            rd_kafka_broker_t *rkb,
                                            rd_kafka_resp_err_t err,
                                            rd_kafka_buf_t *rkbuf,
                                            rd_kafka_buf_t *request,
                                            rd_kafka_topic_partition_list_t
                                            *offsets) {

        /* Not referenced directly in this function.
         * NOTE(review): presumably consumed by the rd_kafka_buf_read_*()
         * macros, as is the err_parse label below - confirm in the
         * buffer header. */
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        int32_t TopicArrayCnt;
        int actions;
        int16_t api_version;

        if (err) {
                /* Request-level failure: nothing to parse. */
                ErrorCode = err;
                goto err;
        }

        api_version = request->rkbuf_reqhdr.ApiVersion;

        /* NOTE:
         * Broker may return offsets in a different constellation than
         * in the original request .*/

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        while (TopicArrayCnt-- > 0) {
                rd_kafkap_str_t ktopic;
                int32_t PartArrayCnt;
                char *topic_name;

                rd_kafka_buf_read_str(rkbuf, &ktopic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Copy of the topic name for use with the list API. */
                RD_KAFKAP_STR_DUPA(&topic_name, &ktopic);

                while (PartArrayCnt-- > 0) {
                        int32_t kpartition;
                        int32_t OffsetArrayCnt;
                        /* Remains -1 if the response carries no offset. */
                        int64_t Offset = -1;
                        rd_kafka_topic_partition_t *rktpar;

                        rd_kafka_buf_read_i32(rkbuf, &kpartition);
                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                        if (api_version == 1) {
                                /* The timestamp is read to advance the
                                 * reader but is otherwise unused. */
                                int64_t Timestamp;
                                rd_kafka_buf_read_i64(rkbuf, &Timestamp);
                                rd_kafka_buf_read_i64(rkbuf, &Offset);
                        } else if (api_version == 0) {
                                rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt);
                                /* We only request one offset so a single
                                 * element is expected. Should the array
                                 * hold more, every element is read and
                                 * Offset ends up as the last one. */
                                while (OffsetArrayCnt-- > 0)
                                        rd_kafka_buf_read_i64(rkbuf, &Offset);
                        } else {
                                /* Only v0 and v1 are handled. */
                                rd_kafka_assert(NULL, !*"NOTREACHED");
                        }

                        /* Per-partition errors are recorded on the list
                         * entry; they do not abort the parsing. */
                        rktpar = rd_kafka_topic_partition_list_add(
                                offsets, topic_name, kpartition);
                        rktpar->err = ErrorCode;
                        rktpar->offset = Offset;
                }
        }

        /* Successful parse: skip the error action handling. */
        goto done;

 err_parse:
        ErrorCode = rkbuf->rkbuf_err;
 err:
        /* Explicit mappings override rd_kafka_err_action()'s defaults. */
        actions = rd_kafka_err_action(
                rkb, ErrorCode, rkbuf, request,
                RD_KAFKA_ERR_ACTION_PERMANENT,
                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,

                RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
                RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,

                RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                char tmp[256];
                /* Re-query for leader */
                rd_snprintf(tmp, sizeof(tmp),
                            "OffsetRequest failed: %s",
                            rd_kafka_err2str(ErrorCode));
                rd_kafka_metadata_refresh_known_topics(rk, NULL, 1/*force*/,
                                                       tmp);
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
                /* FALLTHRU: retry not accepted, report the error. */
        }

done:
        return ErrorCode;
}






/**
 * @brief Send an OffsetRequest for every partition in \p partitions.
 *
 * \p partitions is sorted by topic (in place) so that all partitions of a
 * topic are written under a single topic entry. For each partition the
 * element's \c offset field is written as the Time/Offset query value and,
 * for \p api_version 0, a MaxNumberOfOffsets of 1 is written as well.
 *
 * \p replyq, \p resp_cb and \p opaque are passed on to
 * rd_kafka_broker_buf_enq_replyq() together with the request.
 */
void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
                             rd_kafka_topic_partition_list_t *partitions,
                             int16_t api_version,
                             rd_kafka_replyq_t replyq,
                             rd_kafka_resp_cb_t *resp_cb,
                             void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int i;
        size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0;
        /* NULL rather than "" so that the very first element always opens
         * a topic entry, whatever its name is. This matches the other
         * request builders in this file. */
        const char *last_topic = NULL;
        int32_t topic_cnt = 0, part_cnt = 0;

        rd_kafka_topic_partition_list_sort_by_topic(partitions);

        rkbuf = rd_kafka_buf_new_request(
                rkb, RD_KAFKAP_Offset, 1,
                /* ReplicaId+TopicArrayCnt+Topic */
                4+4+100+
                /* PartArrayCnt */
                4 +
                /* partition_cnt * Partition+Time+MaxNumOffs */
                (partitions->cnt * (4+8+4)));

        /* ReplicaId */
        rd_kafka_buf_write_i32(rkbuf, -1);
        /* TopicArrayCnt */
        of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */

        for (i = 0 ; i < partitions->cnt ; i++) {
                const rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];

                if (!last_topic || strcmp(rktpar->topic, last_topic)) {
                        /* Finish last topic, if any. */
                        if (topic_cnt > 0)
                                rd_kafka_buf_update_i32(rkbuf,
                                                        of_PartArrayCnt,
                                                        part_cnt);

                        /* Topic */
                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
                        topic_cnt++;
                        last_topic = rktpar->topic;
                        /* New topic so reset partition count */
                        part_cnt = 0;

                        /* PartitionArrayCnt: updated later */
                        of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
                }

                /* Partition */
                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
                part_cnt++;

                /* Time/Offset */
                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);

                if (api_version == 0) {
                        /* MaxNumberOfOffsets */
                        rd_kafka_buf_write_i32(rkbuf, 1);
                }
        }

        /* Back-patch the counts of the last topic and of the topic array.
         * With an empty list both placeholders are left at 0. */
        if (topic_cnt > 0) {
                rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt);
                rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, api_version,
                                    api_version == 1 ?
                                    RD_KAFKA_FEATURE_OFFSET_TIME : 0);

        /* Log the op version from the replyq argument: the buffer's own
         * rkbuf_replyq is not assigned until the request is enqueued
         * below. */
        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "OffsetRequest (v%hd, opv %d) "
                   "for %"PRId32" topic(s) and %"PRId32" partition(s)",
                   api_version, replyq.version,
                   topic_cnt, (int32_t)partitions->cnt);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}


/**
 * Generic handler for OffsetFetch responses.
 * Offsets for included partitions will be propagated through the passed
 * 'offsets' list.
 *
 * Only partitions that are already present in \p offsets are updated
 * (offset, error and metadata); partitions in the response that are not in
 * the list are logged and ignored.
 *
 * \p update_toppar: update toppar's committed_offset
 *
 * On error rd_kafka_err_action() decides what to do: a REFRESH action
 * enqueues a coordinator query op on the cgrp, a RETRY action calls
 * rd_kafka_buf_retry() on \p request.
 *
 * Returns RD_KAFKA_RESP_ERR__IN_PROGRESS if rd_kafka_buf_retry() accepted
 * \p request, else \p err (or the buffer's parse error).
 *
 * NOTE(review): the \p rk argument is not used, rkb->rkb_rk is used
 * instead, and \p rkb is dereferenced without a NULL check.
 */
rd_kafka_resp_err_t
rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
			     rd_kafka_broker_t *rkb,
			     rd_kafka_resp_err_t err,
			     rd_kafka_buf_t *rkbuf,
			     rd_kafka_buf_t *request,
			     rd_kafka_topic_partition_list_t *offsets,
			     int update_toppar) {
        /* Not referenced directly in this function.
         * NOTE(review): presumably consumed by the rd_kafka_buf_read_*()
         * macros, as is the err_parse label at the end - confirm in the
         * buffer header. */
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int64_t offset = RD_KAFKA_OFFSET_INVALID;
        rd_kafkap_str_t metadata;
        int i;
        int actions;
        /* Number of response partitions that matched an entry in offsets */
        int seen_cnt = 0;

        if (err)
                goto err;

        /* Set default offset for all partitions. */
        rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, offsets, 0,
                                                  RD_KAFKA_OFFSET_INVALID,
						  0 /* !is commit */);

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        for (i = 0 ; i < TopicArrayCnt ; i++) {
                rd_kafkap_str_t topic;
                int32_t PartArrayCnt;
                char *topic_name;
                int j;

                rd_kafka_buf_read_str(rkbuf, &topic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Copy of the topic name for the lookups below. */
                RD_KAFKAP_STR_DUPA(&topic_name, &topic);

                for (j = 0 ; j < PartArrayCnt ; j++) {
                        int32_t partition;
                        shptr_rd_kafka_toppar_t *s_rktp;
                        rd_kafka_topic_partition_t *rktpar;
                        int16_t err2;

                        /* All fields are read before the lookup so the
                         * reader stays in sync even if the partition is
                         * skipped. */
                        rd_kafka_buf_read_i32(rkbuf, &partition);
                        rd_kafka_buf_read_i64(rkbuf, &offset);
                        rd_kafka_buf_read_str(rkbuf, &metadata);
                        rd_kafka_buf_read_i16(rkbuf, &err2);

                        rktpar = rd_kafka_topic_partition_list_find(offsets,
                                                                    topic_name,
                                                                    partition);
                        if (!rktpar) {
				rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
					   "OffsetFetchResponse: %s [%"PRId32"] "
					   "not found in local list: ignoring",
					   topic_name, partition);
                                continue;
			}

                        seen_cnt++;

			/* Reuse the toppar reference stored on the list
			 * entry, else look it up and store it there. */
			if (!(s_rktp = rktpar->_private)) {
				s_rktp = rd_kafka_toppar_get2(rkb->rkb_rk,
							      topic_name,
							      partition, 0, 0);
				/* May be NULL if topic is not locally known */
				rktpar->_private = s_rktp;
			}

			/* broker reports invalid offset as -1 */
			if (offset == -1)
				rktpar->offset = RD_KAFKA_OFFSET_INVALID;
			else
				rktpar->offset = offset;
                        rktpar->err = err2;

			rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
				   "OffsetFetchResponse: %s [%"PRId32"] offset %"PRId64,
				   topic_name, partition, offset);

			if (update_toppar && !err2 && s_rktp) {
				rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
				/* Update toppar's committed offset */
				rd_kafka_toppar_lock(rktp);
				rktp->rktp_committed_offset = rktpar->offset;
				rd_kafka_toppar_unlock(rktp);
			}


                        /* Replace any previous metadata on the entry with
                         * a copy of the metadata from the response. */
                        if (rktpar->metadata)
                                rd_free(rktpar->metadata);

                        if (RD_KAFKAP_STR_IS_NULL(&metadata)) {
                                rktpar->metadata = NULL;
                                rktpar->metadata_size = 0;
                        } else {
                                rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata);
                                rktpar->metadata_size =
                                        RD_KAFKAP_STR_LEN(&metadata);
                        }
                }
        }


        /* Reached on error as well as by falling through after a
         * successful parse, in which case err is 0 and
         * rd_kafka_err_action() returns no actions. */
err:
        rd_rkb_dbg(rkb, TOPIC, "OFFFETCH",
                   "OffsetFetch for %d/%d partition(s) returned %s",
                   seen_cnt,
                   offsets ? offsets->cnt : -1, rd_kafka_err2str(err));

        /* No explicit mappings: default actions only. */
        actions = rd_kafka_err_action(rkb, err, rkbuf, request,
				      RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL,
                                 RD_KAFKA_NO_REPLYQ,
				 RD_KAFKA_OP_COORD_QUERY, err);
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
                /* FALLTHRU: retry not accepted, report the error. */
        }

	return err;

        /* Placed after the return so it is only reachable by goto:
         * pick up the buffer's error and run the common error path. */
 err_parse:
        err = rkbuf->rkbuf_err;
        goto err;
}



/**
 * @brief OffsetFetch response callback for requests that were issued on
 *        behalf of an RD_KAFKA_OP_OFFSET_FETCH op.
 *
 * @param opaque the originating rko (type RD_KAFKA_OP_OFFSET_FETCH).
 *
 * A copy of \c rko->rko_u.offset_fetch.partitions is filled in with the
 * fetched offsets and sent on \c rko->rko_replyq in a reply op of type
 * RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY. The reply owns the copy
 * (\c do_free is set). The originating \p opaque op is destroyed, except
 * when the request is being retried.
 *
 * @remark \p rkb, \p rkbuf and \p request are optional.
 *
 * @remark The \p request buffer may be retried on error.
 *
 * @locality cgrp's broker thread
 */
void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
                                     rd_kafka_broker_t *rkb,
                                     rd_kafka_resp_err_t err,
                                     rd_kafka_buf_t *rkbuf,
                                     rd_kafka_buf_t *request,
                                     void *opaque) {
        rd_kafka_op_t *rko = opaque;
        rd_kafka_op_t *reply;
        rd_kafka_topic_partition_list_t *fetched;

        RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH);

        /* Termination: no reply, just release the op. */
        if (err == RD_KAFKA_RESP_ERR__DESTROY)
                goto done;

        fetched = rd_kafka_topic_partition_list_copy(
                rko->rko_u.offset_fetch.partitions);

        /* Without a response buffer no request was sent (all partitions
         * already had usable offsets) and the copied list is used as is.
         * Otherwise parse the response, or handle its error. */
        if (rkbuf != NULL) {
                err = rd_kafka_handle_OffsetFetch(rkb->rkb_rk, rkb, err, rkbuf,
                                                  request, fetched, 0);

                if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                        /* The request is being retried and this handler
                         * will be called again: keep rko for then and
                         * drop only this attempt's list. */
                        rd_kafka_topic_partition_list_destroy(fetched);
                        return;
                }
        }

        reply = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH|RD_KAFKA_OP_REPLY);
        reply->rko_u.offset_fetch.partitions = fetched;
        reply->rko_u.offset_fetch.do_free = 1;
        reply->rko_err = err;

        /* Carry over the toppar of the originating op, with its own
         * reference. */
        if (rko->rko_rktp)
                reply->rko_rktp = rd_kafka_toppar_keep(
                        rd_kafka_toppar_s2i(rko->rko_rktp));

        rd_kafka_replyq_enq(&rko->rko_replyq, reply, 0);

 done:
        rd_kafka_op_destroy(rko);
}






/**
 * @brief Send an OffsetFetchRequest for the partitions in \p parts that
 *        do not yet have a usable offset.
 *
 * A partition is included only if its offset is RD_KAFKA_OFFSET_INVALID or
 * RD_KAFKA_OFFSET_STORED; all others are skipped. \p parts is sorted by
 * topic (in place).
 *
 * If no partition needs fetching the request is not sent; instead
 * \p resp_cb is invoked right away through rd_kafka_buf_callback() with
 * no error and no response buffer.
 */
void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
                                  int16_t api_version,
                                  rd_kafka_topic_partition_list_t *parts,
                                  rd_kafka_replyq_t replyq,
                                  rd_kafka_resp_cb_t *resp_cb,
                                  void *opaque) {
        rd_kafka_buf_t *rkbuf;
        size_t topic_cnt_pos;          /* Buffer position of TopicArrayCnt */
        ssize_t part_cnt_pos = -1;     /* ..of current topic's PartitionCnt */
        const char *cur_topic = NULL;
        int topics = 0;                /* Topics written so far */
        int topic_parts = 0;           /* Partitions in current topic */
        int total_parts = 0;           /* Partitions written in total */
        int idx;

        rkbuf = rd_kafka_buf_new_request(
                rkb, RD_KAFKAP_OffsetFetch, 1,
                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_group_id) +
                4 +
                (parts->cnt * 32));

        /* ConsumerGroup */
        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_group_id);

        /* Group the partitions by topic */
        rd_kafka_topic_partition_list_sort_by_topic(parts);

        /* TopicArrayCnt placeholder, patched once the count is known */
        topic_cnt_pos = rd_kafka_buf_write_i32(rkbuf, 0);

        for (idx = 0 ; idx < parts->cnt ; idx++) {
                rd_kafka_topic_partition_t *rktpar = &parts->elems[idx];
                int needs_fetch =
                        rktpar->offset == RD_KAFKA_OFFSET_INVALID ||
                        rktpar->offset == RD_KAFKA_OFFSET_STORED;

                if (!needs_fetch) {
                        /* Already has a usable offset: leave it out. */
                        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                                   "OffsetFetchRequest: skipping %s [%"PRId32"] "
                                   "with valid offset %s",
                                   rktpar->topic, rktpar->partition,
                                   rd_kafka_offset2str(rktpar->offset));
                        continue;
                }

                if (!cur_topic || strcmp(cur_topic, rktpar->topic) != 0) {
                        /* Topic boundary: close the previous topic's
                         * PartitionCnt before opening the new one. */
                        if (topic_parts > 0)
                                rd_kafka_buf_update_u32(rkbuf, part_cnt_pos,
                                                        topic_parts);

                        /* TopicName */
                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
                        /* PartitionCnt placeholder */
                        part_cnt_pos = rd_kafka_buf_write_i32(rkbuf, 0);

                        cur_topic = rktpar->topic;
                        topic_parts = 0;
                        topics++;
                }

                /* Partition */
                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
                topic_parts++;
                total_parts++;
        }

        /* Close the last topic's PartitionCnt */
        if (topic_parts > 0)
                rd_kafka_buf_update_u32(rkbuf, part_cnt_pos, topic_parts);

        /* Patch TopicArrayCnt */
        rd_kafka_buf_update_u32(rkbuf, topic_cnt_pos, topics);

        rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);

        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "OffsetFetchRequest(v%d) for %d/%d partition(s)",
                   api_version, total_parts, parts->cnt);

        if (total_parts > 0) {
                rd_rkb_dbg(rkb, CGRP|RD_KAFKA_DBG_CONSUMER, "OFFSET",
                           "Fetch committed offsets for %d/%d partition(s)",
                           total_parts, parts->cnt);

                rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq,
                                               resp_cb, opaque);
                return;
        }

        /* Nothing to fetch: attach the reply information to the unsent
         * request and trigger its callback right away, without error and
         * without a response buffer. */
        rkbuf->rkbuf_replyq = replyq;
        rkbuf->rkbuf_cb     = resp_cb;
        rkbuf->rkbuf_opaque = opaque;
        rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
}


/**
 * Handler for OffsetCommit responses.
 *
 * The per-partition error code of each partition in the response is stored
 * on the matching entry in \p offsets; partitions not found in the list
 * are ignored.
 *
 * If the number of entries that received an error equals the number of
 * entries in \p offsets, the last such error code is returned as the
 * global error. That path jumps straight to the return and does NOT go
 * through the action handling (refresh/retry) below.
 *
 * If \p err is set (or the err_parse label is reached) the error is mapped
 * to actions by rd_kafka_err_action(): REFRESH marks the coordinator dead
 * (when combined with SPECIAL) or re-queries it, RETRY calls
 * rd_kafka_buf_retry() on \p request.
 *
 * Returns RD_KAFKA_RESP_ERR__IN_PROGRESS if rd_kafka_buf_retry() accepted
 * \p request, else the resulting error code (0 on success).
 *
 * @remark \p offsets may be NULL if \p err is set
 */
rd_kafka_resp_err_t
rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
			      rd_kafka_broker_t *rkb,
			      rd_kafka_resp_err_t err,
			      rd_kafka_buf_t *rkbuf,
			      rd_kafka_buf_t *request,
			      rd_kafka_topic_partition_list_t *offsets) {
        /* Not referenced directly in this function.
         * NOTE(review): presumably consumed by the rd_kafka_buf_read_*()
         * macros, as is the err_parse label below - confirm in the
         * buffer header. */
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int16_t ErrorCode = 0, last_ErrorCode = 0;
	/* Number of entries in offsets that received a non-zero error */
	int errcnt = 0;
        int i;
	int actions;

        if (err)
		goto err;

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        for (i = 0 ; i < TopicArrayCnt ; i++) {
                rd_kafkap_str_t topic;
                char *topic_str;
                int32_t PartArrayCnt;
                int j;

                rd_kafka_buf_read_str(rkbuf, &topic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Copy of the topic name for the lookups below. */
                RD_KAFKAP_STR_DUPA(&topic_str, &topic);

                for (j = 0 ; j < PartArrayCnt ; j++) {
                        int32_t partition;
                        rd_kafka_topic_partition_t *rktpar;

                        rd_kafka_buf_read_i32(rkbuf, &partition);
                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                        rktpar = rd_kafka_topic_partition_list_find(
                                offsets, topic_str, partition);

                        if (!rktpar) {
                                /* Received offset for topic/partition we didn't
                                 * ask for, this shouldn't really happen. */
                                continue;
                        }

                        rktpar->err = ErrorCode;
			if (ErrorCode) {
				last_ErrorCode = ErrorCode;
				errcnt++;
			}
                }
        }

	/* If all partitions failed use error code
	 * from last partition as the global error.
	 * Note that errcnt is compared to the full size of the list. */
	if (offsets && errcnt == offsets->cnt)
		err = last_ErrorCode;
	/* Parsing succeeded: the action handling below is skipped even
	 * if err was just set. */
	goto done;

 err_parse:
        err = rkbuf->rkbuf_err;

 err:
        /* (actions, error) pairs overriding rd_kafka_err_action()'s
         * default classification. */
        actions = rd_kafka_err_action(
		rkb, err, rkbuf, request,

		RD_KAFKA_ERR_ACTION_PERMANENT,
		RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,

		RD_KAFKA_ERR_ACTION_RETRY,
		RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS,

		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
		RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,

		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_SPECIAL,
		RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP,

		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
		RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,

		RD_KAFKA_ERR_ACTION_REFRESH|RD_KAFKA_ERR_ACTION_RETRY,
		RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,

		RD_KAFKA_ERR_ACTION_RETRY,
		RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,

		RD_KAFKA_ERR_ACTION_PERMANENT,
		RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,

		RD_KAFKA_ERR_ACTION_PERMANENT,
		RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,

		RD_KAFKA_ERR_ACTION_PERMANENT,
		RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,

		RD_KAFKA_ERR_ACTION_END);

	/* '&' binds tighter than '&&': this tests the REFRESH flag and
	 * that a cgrp exists. */
	if (actions & RD_KAFKA_ERR_ACTION_REFRESH && rk->rk_cgrp) {
		/* Mark coordinator dead or re-query for coordinator.
		 * ..dead() will trigger a re-query. */
		if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
			rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
						 "OffsetCommitRequest failed");
		else
			rd_kafka_cgrp_coord_query(rk->rk_cgrp,
						  "OffsetCommitRequest failed");
	}
	if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
		if (rd_kafka_buf_retry(rkb, request))
			return RD_KAFKA_RESP_ERR__IN_PROGRESS;
		/* FALLTHRU: retry not accepted, report the error. */
	}

 done:
	return err;
}




/**
 * @brief Send OffsetCommitRequest for a list of partitions.
 *
 * Entries in \p offsets with a negative offset are left out of the
 * request. \p offsets is sorted by topic (in place). The group id,
 * generation id and member id are taken from \p rkcg, and \p reason is
 * only used for the debug log.
 *
 * If no entry has a valid offset nothing is sent: \p replyq and the
 * request buffer are destroyed and \p resp_cb is not called.
 *
 * @returns 0 if none of the partitions in \p offsets had valid offsets,
 *          else 1.
 */
int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
                                   rd_kafka_cgrp_t *rkcg,
                                   int16_t api_version,
                                   rd_kafka_topic_partition_list_t *offsets,
                                   rd_kafka_replyq_t replyq,
                                   rd_kafka_resp_cb_t *resp_cb,
                                   void *opaque, const char *reason) {
        rd_kafka_buf_t *rkbuf;
        ssize_t topic_cnt_pos = -1;    /* Buffer position of TopicArrayCnt */
        ssize_t part_cnt_pos = -1;     /* ..of current topic's PartitionCnt */
        const char *cur_topic = NULL;
        int topics = 0;                /* Topics written so far */
        int topic_parts = 0;           /* Partitions in current topic */
        int total_parts = 0;           /* Partitions written in total */
        int idx;

        rd_kafka_assert(NULL, offsets != NULL);

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit,
                                         1, 100 + (offsets->cnt * 128));

        /* ConsumerGroup */
        rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_group_id);

        /* Group membership fields, v1 and v2 */
        if (api_version >= 1) {
                /* ConsumerGroupGenerationId */
                rd_kafka_buf_write_i32(rkbuf, rkcg->rkcg_generation_id);
                /* ConsumerId */
                rd_kafka_buf_write_kstr(rkbuf, rkcg->rkcg_member_id);
                /* v2: RetentionTime */
                if (api_version == 2)
                        rd_kafka_buf_write_i64(rkbuf, -1);
        }

        /* Group the offsets by topic */
        rd_kafka_topic_partition_list_sort_by_topic(offsets);

        /* TopicArrayCnt placeholder, patched once the count is known */
        topic_cnt_pos = rd_kafka_buf_write_i32(rkbuf, 0);

        for (idx = 0 ; idx < offsets->cnt ; idx++) {
                rd_kafka_topic_partition_t *rktpar = &offsets->elems[idx];

                /* Negative offsets are not committable: leave them out. */
                if (rktpar->offset < 0)
                        continue;

                if (!cur_topic || strcmp(cur_topic, rktpar->topic) != 0) {
                        /* Topic boundary: close the previous topic's
                         * PartitionCnt before opening the new one. */
                        if (topic_parts > 0)
                                rd_kafka_buf_update_u32(rkbuf, part_cnt_pos,
                                                        topic_parts);

                        /* TopicName */
                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
                        /* PartitionCnt placeholder */
                        part_cnt_pos = rd_kafka_buf_write_i32(rkbuf, 0);

                        cur_topic = rktpar->topic;
                        topic_parts = 0;
                        topics++;
                }

                /* Partition */
                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
                topic_parts++;
                total_parts++;

                /* Offset */
                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);

                /* v1: TimeStamp (FIXME: retention time) */
                if (api_version == 1)
                        rd_kafka_buf_write_i64(rkbuf, -1);

                /* Metadata: a Null string is sent as an empty string since
                 * Java client 0.9.0 and broker <0.10.0 can't parse Null
                 * metadata fields. */
                if (rktpar->metadata)
                        rd_kafka_buf_write_str(rkbuf,
                                               rktpar->metadata,
                                               rktpar->metadata_size);
                else
                        rd_kafka_buf_write_str(rkbuf, "", 0);
        }

        if (total_parts == 0) {
                /* No topic+partitions had valid offsets to commit:
                 * drop everything that was prepared. */
                rd_kafka_replyq_destroy(&replyq);
                rd_kafka_buf_destroy(rkbuf);
                return 0;
        }

        /* Close the last topic's PartitionCnt */
        if (topic_parts > 0)
                rd_kafka_buf_update_u32(rkbuf, part_cnt_pos, topic_parts);

        /* Patch TopicArrayCnt */
        rd_kafka_buf_update_u32(rkbuf, topic_cnt_pos, topics);

        rd_kafka_buf_ApiVersion_set(rkbuf, api_version, 0);

        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
                   api_version, total_parts, offsets->cnt, reason);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);

        return 1;
}



/**
 * @brief Serialize the "consumer" protocol type MemberState of \p rkgm
 *        (as used by SyncGroupRequest) into a temporary buffer and append
 *        it, framed as Kafka Bytes, to the enveloping buffer \p env_rkbuf.
 *
 * The assignment list is walked in order; a new topic entry is started
 * each time an element's topic name differs from the previous element's.
 */
static void rd_kafka_group_MemberState_consumer_write (
        rd_kafka_buf_t *env_rkbuf,
        const rd_kafka_group_member_t *rkgm) {
        rd_kafka_buf_t *rkbuf = rd_kafka_buf_new(1, 100);
        const char *cur_topic = NULL;
        size_t topic_cnt_pos;
        ssize_t part_cnt_pos = -1;
        int topic_cnt = 0;
        int part_cnt = 0;
        int idx;
        rd_slice_t payload;

        /* Version */
        rd_kafka_buf_write_i16(rkbuf, 0);

        /* TopicCnt placeholder, patched once all topics are written */
        topic_cnt_pos = rd_kafka_buf_write_i32(rkbuf, 0);

        for (idx = 0 ; idx < rkgm->rkgm_assignment->cnt ; idx++) {
                const rd_kafka_topic_partition_t *rktpar =
                        &rkgm->rkgm_assignment->elems[idx];
                int same_topic = cur_topic &&
                        !strcmp(cur_topic, rktpar->topic);

                if (!same_topic) {
                        /* Topic changed: close the previous topic's
                         * PartitionCnt (if any) and open a new entry. */
                        if (cur_topic) {
                                rd_kafka_buf_update_i32(rkbuf, part_cnt_pos,
                                                        part_cnt);
                        }

                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);

                        /* PartitionCnt placeholder */
                        part_cnt_pos = rd_kafka_buf_write_i32(rkbuf, 0);
                        part_cnt     = 0;
                        cur_topic    = rktpar->topic;
                        topic_cnt++;
                }

                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
                part_cnt++;
        }

        /* Patch the last topic's PartitionCnt (if a topic was written) */
        if (part_cnt_pos != -1) {
                rd_kafka_buf_update_i32(rkbuf, part_cnt_pos, part_cnt);
        }

        /* Patch TopicCnt */
        rd_kafka_buf_update_i32(rkbuf, topic_cnt_pos, topic_cnt);

        /* UserData */
        rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata);

        /* Take a slice covering the whole temporary buffer ... */
        rd_slice_init_full(&payload, &rkbuf->rkbuf_buf);

        /* ... and emit it as Kafka Bytes (length + data) on the envelope. */
        rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&payload));
        rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &payload);

        rd_kafka_buf_destroy(rkbuf);
}

/**
 * @brief Build a SyncGroupRequest and enqueue it on \p rkb.
 *
 * The request carries \p group_id, \p generation_id, \p member_id and one
 * (member id, "consumer" MemberState) pair for each of the
 * \p assignment_cnt elements in \p assignments.
 *
 * The response is delivered to \p resp_cb (with \p opaque) on \p replyq.
 */
void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
                                const rd_kafkap_str_t *group_id,
                                int32_t generation_id,
                                const rd_kafkap_str_t *member_id,
                                const rd_kafka_group_member_t
                                *assignments,
                                int assignment_cnt,
                                rd_kafka_replyq_t replyq,
                                rd_kafka_resp_cb_t *resp_cb,
                                void *opaque) {
        rd_kafka_buf_t *rkbuf;
        const rd_kafka_group_member_t *rkgm;
        const rd_kafka_group_member_t *end;
        /* Initial buffer size estimate */
        size_t size_hint =
                RD_KAFKAP_STR_SIZE(group_id) +
                4 /* GenerationId */ +
                RD_KAFKAP_STR_SIZE(member_id) +
                4 /* array size group_assignment */ +
                (assignment_cnt * 100/*guess*/);

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SyncGroup,
                                         1, size_hint);

        /* Request header fields */
        rd_kafka_buf_write_kstr(rkbuf, group_id);
        rd_kafka_buf_write_i32(rkbuf, generation_id);
        rd_kafka_buf_write_kstr(rkbuf, member_id);

        /* GroupAssignment array */
        rd_kafka_buf_write_i32(rkbuf, assignment_cnt);
        end = assignments + (assignment_cnt > 0 ? assignment_cnt : 0);
        for (rkgm = assignments ; rkgm < end ; rkgm++) {
                rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
                rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
        }

        /* This is a blocking request: flag it and allow the configured
         * session timeout plus a 3s grace period before timing out. */
        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
        rd_kafka_buf_set_abs_timeout(
                rkbuf,
                rkb->rkb_rk->rk_conf.group_session_timeout_ms +
                3000/* 3s grace period*/,
                0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}

/**
 * @brief Handler for SyncGroup responses.
 *
 * \p opaque must be the cgrp handle.
 *
 * Responses arriving while the cgrp is no longer in the WAIT_SYNC join
 * state are discarded. Otherwise the error code and MemberState are
 * parsed, error actions (coordinator re-query, request retry) are applied
 * and the outcome is passed to rd_kafka_cgrp_handle_SyncGroup().
 */
void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
                                rd_kafka_broker_t *rkb,
                                rd_kafka_resp_err_t err,
                                rd_kafka_buf_t *rkbuf,
                                rd_kafka_buf_t *request,
                                void *opaque) {
        rd_kafka_cgrp_t *rkcg = opaque;
        /* Referenced by the rd_kafka_buf_read_*() macros */
        const int log_decode_errors = LOG_ERR;
        rd_kafkap_bytes_t MemberState = RD_ZERO_INIT;
        int16_t ErrorCode = 0;
        int err_actions;

        if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
                             "SyncGroup response: discarding outdated request "
                             "(now in join-state %s)",
                             rd_kafka_cgrp_join_state_names[
                                     rkcg->rkcg_join_state]);
                return;
        }

        if (err) {
                /* Request-level failure: nothing to parse. */
                ErrorCode = err;
        } else {
                /* The read macros jump to err_parse on decode failure. */
                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
                rd_kafka_buf_read_bytes(rkbuf, &MemberState);
        }

 err:
        err_actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
                                          RD_KAFKA_ERR_ACTION_END);

        /* Re-query for coordinator */
        if (err_actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_COORD_QUERY,
                                 ErrorCode);
        }

        /* A successfully scheduled retry ends handling of this response;
         * otherwise fall through and report the error. */
        if ((err_actions & RD_KAFKA_ERR_ACTION_RETRY) &&
            rd_kafka_buf_retry(rkb, request))
                return;

        rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
                     "SyncGroup response: %s (%d bytes of MemberState data)",
                     rd_kafka_err2str(ErrorCode),
                     RD_KAFKAP_BYTES_LEN(&MemberState));

        /* Termination: do not propagate to the cgrp. */
        if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY)
                rd_kafka_cgrp_handle_SyncGroup(rkcg, rkb, ErrorCode,
                                               &MemberState);

        return;

 err_parse:
        ErrorCode = rkbuf->rkbuf_err;
        goto err;
}


/**
 * @brief Build a JoinGroupRequest and enqueue it on \p rkb.
 *
 * One GroupProtocol entry (protocol name + member metadata obtained from
 * the assignor's rkas_get_metadata_cb for \p topics) is written for each
 * enabled partition assignor in the client configuration.
 *
 * The response is delivered to \p resp_cb (with \p opaque) on \p replyq.
 */
void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
                                const rd_kafkap_str_t *group_id,
                                const rd_kafkap_str_t *member_id,
                                const rd_kafkap_str_t *protocol_type,
                                const rd_list_t *topics,
                                rd_kafka_replyq_t replyq,
                                rd_kafka_resp_cb_t *resp_cb,
                                void *opaque) {
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_buf_t *rkbuf;
        rd_kafka_assignor_t *rkas;
        int i;

        rkbuf = rd_kafka_buf_new_request(
                rkb, RD_KAFKAP_JoinGroup, 1,
                RD_KAFKAP_STR_SIZE(group_id) +
                4 /* sessionTimeoutMs */ +
                RD_KAFKAP_STR_SIZE(member_id) +
                RD_KAFKAP_STR_SIZE(protocol_type) +
                4 /* array count GroupProtocols */ +
                (rd_list_cnt(topics) * 100));

        /* GroupId, SessionTimeout, MemberId, ProtocolType */
        rd_kafka_buf_write_kstr(rkbuf, group_id);
        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
        rd_kafka_buf_write_kstr(rkbuf, member_id);
        rd_kafka_buf_write_kstr(rkbuf, protocol_type);

        /* GroupProtocols array: only enabled assignors are written */
        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);

        RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
                if (rkas->rkas_enabled) {
                        rd_kafkap_bytes_t *metadata;

                        rd_kafka_buf_write_kstr(rkbuf,
                                                rkas->rkas_protocol_name);

                        metadata = rkas->rkas_get_metadata_cb(rkas, topics);
                        rd_kafka_buf_write_kbytes(rkbuf, metadata);
                        rd_kafkap_bytes_destroy(metadata);
                }
        }

        /* This is a blocking request: flag it and allow the configured
         * session timeout plus a 3s grace period before timing out. */
        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
        rd_kafka_buf_set_abs_timeout(
                rkbuf,
                rk->rk_conf.group_session_timeout_ms +
                3000/* 3s grace period*/,
                0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}






/**
 * @brief Build a LeaveGroupRequest for \p member_id in \p group_id and
 *        enqueue it on \p rkb.
 *
 * The response is delivered to \p resp_cb (with \p opaque) on \p replyq.
 */
void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
                                 const rd_kafkap_str_t *group_id,
                                 const rd_kafkap_str_t *member_id,
                                 rd_kafka_replyq_t replyq,
                                 rd_kafka_resp_cb_t *resp_cb,
                                 void *opaque) {
        /* The payload is exactly the two strings. */
        rd_kafka_buf_t *rkbuf = rd_kafka_buf_new_request(
                rkb, RD_KAFKAP_LeaveGroup, 1,
                RD_KAFKAP_STR_SIZE(group_id) +
                RD_KAFKAP_STR_SIZE(member_id));

        /* GroupId */
        rd_kafka_buf_write_kstr(rkbuf, group_id);
        /* MemberId */
        rd_kafka_buf_write_kstr(rkbuf, member_id);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}


/**
 * @brief Handler for LeaveGroup responses.
 *
 * \p opaque must be the cgrp handle.
 *
 * Parses the response error code, applies error actions (coordinator
 * re-query, request retry) and logs any remaining error at debug level.
 */
void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
                                 rd_kafka_broker_t *rkb,
                                 rd_kafka_resp_err_t err,
                                 rd_kafka_buf_t *rkbuf,
                                 rd_kafka_buf_t *request,
                                 void *opaque) {
        rd_kafka_cgrp_t *rkcg = opaque;
        /* Referenced by the rd_kafka_buf_read_*() macros */
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        int err_actions;

        if (err) {
                /* Request-level failure: nothing to parse. */
                ErrorCode = err;
        } else {
                /* Jumps to err_parse on decode failure. */
                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        }

 err:
        err_actions = rd_kafka_err_action(rkb, ErrorCode, rkbuf, request,
                                          RD_KAFKA_ERR_ACTION_END);

        /* Re-query for coordinator */
        if (err_actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
        }

        /* A successfully scheduled retry ends handling of this response;
         * otherwise fall through and report the error. */
        if ((err_actions & RD_KAFKA_ERR_ACTION_RETRY) &&
            rd_kafka_buf_retry(rkb, request))
                return;

        if (ErrorCode) {
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
                             "LeaveGroup response: %s",
                             rd_kafka_err2str(ErrorCode));
        }

        return;

 err_parse:
        ErrorCode = rkbuf->rkbuf_err;
        goto err;
}






/**
 * @brief Build a HeartbeatRequest for \p member_id of \p group_id at
 *        \p generation_id and enqueue it on \p rkb.
 *
 * The request times out after the configured group session timeout.
 * The response is delivered to \p resp_cb (with \p opaque) on \p replyq.
 */
void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
                                const rd_kafkap_str_t *group_id,
                                int32_t generation_id,
                                const rd_kafkap_str_t *member_id,
                                rd_kafka_replyq_t replyq,
                                rd_kafka_resp_cb_t *resp_cb,
                                void *opaque) {
        rd_kafka_buf_t *rkbuf;

        rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
                   "Heartbeat for group \"%s\" generation id %"PRId32,
                   group_id->str, generation_id);

        rkbuf = rd_kafka_buf_new_request(
                rkb, RD_KAFKAP_Heartbeat, 1,
                RD_KAFKAP_STR_SIZE(group_id) +
                4 /* GenerationId */ +
                RD_KAFKAP_STR_SIZE(member_id));

        /* GroupId */
        rd_kafka_buf_write_kstr(rkbuf, group_id);
        /* GenerationId */
        rd_kafka_buf_write_i32(rkbuf, generation_id);
        /* MemberId */
        rd_kafka_buf_write_kstr(rkbuf, member_id);

        /* Give up on the request once the session timeout has elapsed. */
        rd_kafka_buf_set_abs_timeout(
                rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}




/**
 * @brief Build an (empty-bodied) ListGroupsRequest and enqueue it on \p rkb.
 *
 * The response is delivered to \p resp_cb (with \p opaque) on \p replyq.
 */
void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
                                 rd_kafka_replyq_t replyq,
                                 rd_kafka_resp_cb_t *resp_cb,
                                 void *opaque) {
        /* No payload is written, hence zero segments and zero size. */
        rd_kafka_buf_t *rkbuf =
                rd_kafka_buf_new_request(rkb, RD_KAFKAP_ListGroups, 0, 0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}


/**
 * @brief Build a DescribeGroupsRequest for the \p group_cnt group names in
 *        \p groups and enqueue it on \p rkb.
 *
 * The group names are written last-to-first.
 *
 * The response is delivered to \p resp_cb (with \p opaque) on \p replyq.
 */
void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
                                     const char **groups, int group_cnt,
                                     rd_kafka_replyq_t replyq,
                                     rd_kafka_resp_cb_t *resp_cb,
                                     void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int i;

        /* Size estimate: 32 bytes per group name */
        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeGroups,
                                         1, 32*group_cnt);

        /* Group array count */
        rd_kafka_buf_write_i32(rkbuf, group_cnt);

        /* Group names, from the end of the array towards the start */
        for (i = group_cnt ; i > 0 ; i--)
                rd_kafka_buf_write_str(rkbuf, groups[i - 1], -1);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}




/**
 * @brief Generic handler for Metadata responses
 *
 * On success the parsed metadata is handed to the requester through the
 * replyq of \p opaque (an rko, possibly NULL) when one is set, otherwise
 * it is freed. On failure the request is retried when the error actions
 * allow it, else a warning is logged.
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_handle_Metadata (rd_kafka_t *rk,
                                      rd_kafka_broker_t *rkb,
                                      rd_kafka_resp_err_t err,
                                      rd_kafka_buf_t *rkbuf,
                                      rd_kafka_buf_t *request,
                                      void *opaque) {
        rd_kafka_op_t *rko = opaque; /* Possibly NULL */
        struct rd_kafka_metadata *md = NULL;
        const rd_list_t *topics = request->rkbuf_u.Metadata.topics;
        char actstr[64];
        int actions;

        rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
                        thrd_is_current(rk->rk_thread));

        /* Avoid metadata updates when we're terminating. */
        if (rd_kafka_terminating(rkb->rkb_rk) ||
            err == RD_KAFKA_RESP_ERR__DESTROY)
                goto done;

        if (!err) {
                if (topics) {
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "===== Received metadata "
                                   "(for %d requested topics): %s =====",
                                   rd_list_cnt(topics),
                                   request->rkbuf_u.Metadata.reason);
                } else {
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "===== Received metadata: %s =====",
                                   request->rkbuf_u.Metadata.reason);
                }

                err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md);
        }

        if (!err) {
                if (rko && rko->rko_replyq.q) {
                        /* Reply to metadata requester, passing on the
                         * metadata. The requesting rko is reused for the
                         * reply, so it must not be destroyed below. */
                        rko->rko_err = err;
                        rko->rko_u.metadata.md = md;

                        rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
                        rko = NULL;
                } else if (md) {
                        /* Nobody is waiting for the result. */
                        rd_free(md);
                }

                goto done;
        }

        /* Request or parse failure */
        actions = rd_kafka_err_action(
                rkb, err, rkbuf, request,

                RD_KAFKA_ERR_ACTION_RETRY,
                RD_KAFKA_RESP_ERR__PARTIAL,

                RD_KAFKA_ERR_ACTION_END);

        if (!(actions & RD_KAFKA_ERR_ACTION_RETRY)) {
                rd_rkb_log(rkb, LOG_WARNING, "METADATA",
                           "Metadata request failed: %s: %s (%dms): %s",
                           request->rkbuf_u.Metadata.reason,
                           rd_kafka_err2str(err),
                           (int)(request->rkbuf_ts_sent/1000),
                           rd_flags2str(actstr, sizeof(actstr),
                                        rd_kafka_actions_descs,
                                        actions));
        } else if (rd_kafka_buf_retry(rkb, request)) {
                /* Retry scheduled: the request (and its opaque rko)
                 * lives on, nothing to clean up here. */
                return;
        }

        /* FALLTHRU */

 done:
        if (rko)
                rd_kafka_op_destroy(rko);
}


/**
 * @brief Construct a MetadataRequest and enqueue it on \p rkb
 *        (via rd_kafka_broker_buf_enq_replyq()).
 *
 * \p topics is a list of topic names (char *) to request.
 *
 * !topics          - only request brokers (if supported by broker, else
 *                    all topics)
 *  topics.cnt==0   - all topics in cluster are requested
 *  topics.cnt >0   - only specified topics are requested
 *
 * @param reason    - metadata request reason (NULL is treated as "").
 *                    A copy is stored on the request buffer.
 * @param rko       - (optional) rko with replyq for handling response;
 *                    passed as opaque to rd_kafka_handle_Metadata().
 *
 * De-duplication of full requests: a brokers-only request (always), or an
 * all-topics request made without an \p rko, is tracked by an in-transit
 * counter in the metadata cache. If that counter is already non-zero the
 * new request is dropped and RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS is
 * returned, unless \p rko is non-NULL and has rko_u.metadata.force set.
 *
 * @returns RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS if the request was skipped
 *          (the request buffer is destroyed; \p rko is not touched here),
 *          otherwise RD_KAFKA_RESP_ERR_NO_ERROR after enqueuing.
 */
rd_kafka_resp_err_t
rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
                          const rd_list_t *topics, const char *reason,
                          rd_kafka_op_t *rko) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        int features; /* Written by the ApiVersion lookup, not used here */
        int topic_cnt = topics ? rd_list_cnt(topics) : 0;
        /* Points to the in-transit counter to bump for full requests,
         * or NULL if this request is not subject to de-duplication. */
        int *full_incr = NULL;

        /* Presumably selects the highest Metadata version in [0..2] that
         * the broker supports -- verify against the helper. */
        ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb,
                                                          RD_KAFKAP_Metadata,
                                                          0, 2,
                                                          &features);

        /* Size estimate: array count + 50 bytes per topic name */
        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Metadata, 1,
                                         4 + (50 * topic_cnt));

        if (!reason)
                reason = "";

        rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason);

        if (!topics && ApiVersion >= 1) {
                /* a null(0) array (in the protocol) represents no topics */
                rd_kafka_buf_write_i32(rkbuf, 0);
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for brokers only: %s", reason);
                full_incr = &rkb->rkb_rk->rk_metadata_cache.
                        rkmc_full_brokers_sent;

        } else {
                /* All-topics requests are only de-duplicated when there
                 * is no rko waiting for the result. */
                if (topic_cnt == 0 && !rko)
                        full_incr = &rkb->rkb_rk->rk_metadata_cache.
                                rkmc_full_topics_sent;

                if (topic_cnt == 0 && ApiVersion >= 1)
                        rd_kafka_buf_write_i32(rkbuf, -1); /* Null: all topics*/
                else
                        rd_kafka_buf_write_i32(rkbuf, topic_cnt);

                if (topic_cnt == 0) {
                        rkbuf->rkbuf_u.Metadata.all_topics = 1;
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Request metadata for all topics: "
                                   "%s", reason);
                } else
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Request metadata for %d topic(s): "
                                   "%s", topic_cnt, reason);
        }

        if (full_incr) {
                /* Avoid multiple outstanding full requests
                 * (since they are redundant and side-effect-less).
                 * Forced requests (app using metadata() API) are passed
                 * through regardless. */

                mtx_lock(&rkb->rkb_rk->rk_metadata_cache.
                         rkmc_full_lock);
                if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) {
                        /* A full request is already outstanding and this
                         * one is not forced: drop it. */
                        mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
                                   rkmc_full_lock);
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Skipping metadata request: %s: "
                                   "full request already in-transit",
                                   reason);
                        rd_kafka_buf_destroy(rkbuf);
                        return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
                }

                (*full_incr)++;
                mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.
                           rkmc_full_lock);
                /* Record the counter and its lock on the buffer;
                 * presumably used to decrement the counter when the
                 * buffer is done -- verify against the buffer code. */
                rkbuf->rkbuf_u.Metadata.decr = full_incr;
                rkbuf->rkbuf_u.Metadata.decr_lock = &rkb->rkb_rk->
                        rk_metadata_cache.rkmc_full_lock;
        }


        if (topic_cnt > 0) {
                char *topic;
                int i;

                /* Maintain a copy of the topics list so we can purge
                 * hints from the metadata cache on error. */
                rkbuf->rkbuf_u.Metadata.topics =
                        rd_list_copy(topics, rd_list_string_copy, NULL);

                /* Topic names follow the array count written above. */
                RD_LIST_FOREACH(topic, topics, i)
                        rd_kafka_buf_write_str(rkbuf, topic, -1);

        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        /* Metadata requests are part of the important control plane
         * and should go before other requests (Produce, Fetch, etc). */
        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
                                       /* Handle response thru rk_ops,
                                        * but forward parsed result to
                                        * rko's replyq when done. */
                                       RD_KAFKA_REPLYQ(rkb->rkb_rk->
                                                       rk_ops, 0),
                                       rd_kafka_handle_Metadata, rko);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}









/**
 * @brief Parses and handles ApiVersion reply.
 *
 * @param apis will be allocated (with rd_malloc()), populated and sorted
 *             (using rd_kafka_ApiVersion_key_cmp) with the broker's
 *             supported APIs. It is not freed here on success, so the
 *             caller is expected to release it with rd_free().
 *             Always NULL on error.
 * @param api_cnt will be set to the number of elements in \p *apis
 *                (0 on error).
 *
 * @returns 0 on success, else an error: the request-level \p err, the
 *          ErrorCode from the response, or the buffer parse error.
 */
rd_kafka_resp_err_t
rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
                            rd_kafka_broker_t *rkb,
                            rd_kafka_resp_err_t err,
                            rd_kafka_buf_t *rkbuf,
                            rd_kafka_buf_t *request,
                            struct rd_kafka_ApiVersion **apis,
                            size_t *api_cnt) {
        /* Referenced by the rd_kafka_buf_read_*()/parse_fail() macros */
        const int log_decode_errors = LOG_ERR;
        int32_t ApiArrayCnt;
        int16_t ErrorCode;
        int i;

        /* Give the out-parameters well-defined values on all paths. */
        *apis = NULL;
        *api_cnt = 0;

        if (err)
                goto err;

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        if ((err = ErrorCode))
                goto err;

        /* The count comes off the wire: reject negative values too, or
         * they would turn into a huge size_t for the allocation and
         * the sort below. */
        rd_kafka_buf_read_i32(rkbuf, &ApiArrayCnt);
        if (ApiArrayCnt < 0 || ApiArrayCnt > 1000)
                rd_kafka_buf_parse_fail(rkbuf,
                                        "ApiArrayCnt %"PRId32" out of range",
                                        ApiArrayCnt);

        rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
                   "Broker API support:");

        /* Allocate with the rd_ allocator to pair with rd_free().
         * Always request at least one element so that an empty array
         * does not rely on implementation-defined malloc(0) behaviour. */
        *apis = rd_malloc(sizeof(**apis) *
                          (ApiArrayCnt > 0 ? ApiArrayCnt : 1));

        for (i = 0 ; i < ApiArrayCnt ; i++) {
                struct rd_kafka_ApiVersion *api = &(*apis)[i];

                rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
                rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
                rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);

                rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
                           "  ApiKey %s (%hd) Versions %hd..%hd",
                           rd_kafka_ApiKey2str(api->ApiKey),
                           api->ApiKey, api->MinVer, api->MaxVer);
        }

        *api_cnt = ApiArrayCnt;
        qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);

        goto done;

 err_parse:
        err = rkbuf->rkbuf_err;
 err:
        /* Release a partially populated array and make sure the caller
         * is not left holding a dangling pointer. */
        if (*apis) {
                rd_free(*apis);
                *apis = NULL;
        }

        /* There are no retryable errors. */

 done:
        return err;
}



/**
 * @brief Build an ApiVersionRequest (KIP-35) and enqueue it on \p rkb.
 *
 * @param flash_msg if non-zero the request is flagged
 *                  RD_KAFKA_OP_F_FLASH.
 *
 * If \p replyq has a queue the response is routed through it, otherwise
 * the request is enqueued directly (caller is in the broker thread).
 */
void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
                                 rd_kafka_replyq_t replyq,
                                 rd_kafka_resp_cb_t *resp_cb,
                                 void *opaque, int flash_msg) {
        rd_kafka_buf_t *rkbuf =
                rd_kafka_buf_new_request(rkb, RD_KAFKAP_ApiVersion, 1, 4);

        if (flash_msg)
                rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;

        /* Empty array: request all APIs */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* Non-supporting brokers will tear down the connection when they
         * receive an unknown API request, so don't retry request on
         * failure. */
        rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;

        /* 0.9.0.x brokers will not close the connection on unsupported
         * API requests, so we minimize the timeout for the request.
         * This is a regression on the broker part. */
        rd_kafka_buf_set_abs_timeout(
                rkbuf,
                rkb->rkb_rk->rk_conf.api_version_request_timeout_ms,
                0);

        if (!replyq.q) {
                /* in broker thread */
                rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
                return;
        }

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}


/**
 * @brief Build a SaslHandshakeRequest (KIP-43) for \p mechanism and
 *        enqueue it on \p rkb.
 *
 * @param flash_msg if non-zero the request is flagged
 *                  RD_KAFKA_OP_F_FLASH.
 *
 * If \p replyq has a queue the response is routed through it, otherwise
 * the request is enqueued directly (caller is in the broker thread).
 */
void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
                                    const char *mechanism,
                                    rd_kafka_replyq_t replyq,
                                    rd_kafka_resp_cb_t *resp_cb,
                                    void *opaque, int flash_msg) {
        rd_kafka_buf_t *rkbuf;
        int mechlen = (int)strlen(mechanism);

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake,
                                         1, RD_KAFKAP_STR_SIZE0(mechlen));

        if (flash_msg)
                rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLASH;

        /* Mechanism */
        rd_kafka_buf_write_str(rkbuf, mechanism, mechlen);

        /* Non-supporting brokers will tear down the connection when they
         * receive an unknown API request or where the SASL GSSAPI
         * token type is not recognized, so don't retry request on
         * failure. */
        rkbuf->rkbuf_retries = RD_KAFKA_BUF_NO_RETRIES;

        /* 0.9.0.x brokers will not close the connection on unsupported
         * API requests, so we minimize the timeout of the request.
         * This is a regression on the broker part.
         * Only applied when ApiVersionRequests are disabled and the
         * configured socket timeout exceeds 10s. */
        if (!rkb->rkb_rk->rk_conf.api_version_request) {
                if (rkb->rkb_rk->rk_conf.socket_timeout_ms > 10*1000)
                        rd_kafka_buf_set_abs_timeout(rkbuf,
                                                     10*1000 /*10s*/, 0);
        }

        if (!replyq.q) {
                /* in broker thread */
                rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
                return;
        }

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq,
                                       resp_cb, opaque);
}




/**
 * @brief Parses a Produce reply.
 *
 * Exactly one topic with exactly one partition is expected in the reply,
 * anything else yields RD_KAFKA_RESP_ERR__BAD_MSG.
 *
 * @param offsetp    is set to the Offset field of the reply.
 * @param timestampp is set to -1, or to the reply's timestamp field when
 *                   the request's ApiVersion is >= 2.
 *
 * @returns 0 on success or an error code on failure (the partition's
 *          ErrorCode from the reply, or the buffer parse error).
 * @locality broker thread
 */
static rd_kafka_resp_err_t
rd_kafka_handle_Produce_parse (rd_kafka_broker_t *rkb,
                               rd_kafka_toppar_t *rktp,
                               rd_kafka_buf_t *rkbuf,
                               rd_kafka_buf_t *request,
                               int64_t *offsetp,
                               int64_t *timestampp) {
        /* Referenced by the rd_kafka_buf_read_*() macros */
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int32_t PartitionArrayCnt;
        int32_t Partition;
        int16_t ErrorCode;
        int64_t Offset;
        int req_version;

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        if (TopicArrayCnt != 1)
                return RD_KAFKA_RESP_ERR__BAD_MSG;

        /* Since we only produce to one single topic+partition in each
         * request we assume that the reply only contains one topic+partition
         * and that it is the same that we requested.
         * If not the broker is buggy. */
        rd_kafka_buf_skip_str(rkbuf);
        rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);
        if (PartitionArrayCnt != 1)
                return RD_KAFKA_RESP_ERR__BAD_MSG;

        /* Partition is read only to advance past the field. */
        rd_kafka_buf_read_i32(rkbuf, &Partition);
        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        rd_kafka_buf_read_i64(rkbuf, &Offset);

        *offsetp    = Offset;
        *timestampp = -1;

        req_version = request->rkbuf_reqhdr.ApiVersion;

        /* v2+: Timestamp */
        if (req_version >= 2) {
                rd_kafka_buf_read_i64(rkbuf, timestampp);
        }

        /* v1+: ThrottleTime, propagated as a throttle op */
        if (req_version >= 1) {
                int32_t Throttle_Time;

                rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);

                rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
                                          Throttle_Time);
        }

        return ErrorCode;

 err_parse:
        return rkbuf->rkbuf_err;
}


/**
 * @brief Handle ProduceResponse
 *
 * Parses the reply (when there is one and the request itself did not fail),
 * then either:
 *  - on success: stamps the assigned offset/timestamp on the request's
 *    messages and enqueues them for delivery report, or
 *  - on error: decides on refresh/retry actions, moves retryable messages
 *    back to the partition queue and enqueues whatever remains in the
 *    request for delivery report with the error.
 *
 * @param rk       not used by this function.
 * @param rkb      broker the request was sent on.
 * @param err      request-level error, 0 if none.
 * @param reply    response buffer, only parsed when non-NULL and \p err is 0.
 * @param request  the ProduceRequest buffer; its rkbuf_msgq holds the
 *                 messages that were part of the request.
 * @param opaque   shared toppar pointer passed from ProduceRequest(),
 *                 released through rd_kafka_toppar_destroy() before
 *                 returning.
 *
 * @locality broker thread
 */
static void rd_kafka_handle_Produce (rd_kafka_t *rk,
                                     rd_kafka_broker_t *rkb,
                                     rd_kafka_resp_err_t err,
                                     rd_kafka_buf_t *reply,
                                     rd_kafka_buf_t *request,
                                     void *opaque) {
        shptr_rd_kafka_toppar_t *s_rktp = opaque; /* from ProduceRequest() */
        rd_kafka_toppar_t *rktp = rd_kafka_toppar_s2i(s_rktp);
        /* Only overwritten by a parsed reply: without a reply these keep
         * their initial values and no offset is propagated below. */
        int64_t offset = RD_KAFKA_OFFSET_INVALID;
        int64_t timestamp = -1;

        /* Parse Produce reply (unless the request errored) */
        if (!err && reply)
                err = rd_kafka_handle_Produce_parse(rkb, rktp,
                                                    reply, request,
                                                    &offset, &timestamp);


        if (likely(!err)) {
                rd_rkb_dbg(rkb, MSG, "MSGSET",
                           "%s [%"PRId32"]: MessageSet with %i message(s) "
                           "delivered",
                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                           request->rkbuf_msgq.rkmq_msg_cnt);

        } else {
                /* Error */
                int actions;
                char actstr[64];

                if (err == RD_KAFKA_RESP_ERR__DESTROY)
                        goto done; /* Terminating */

                /* Map the error to a set of actions using the
                 * (action, error) pairs listed below. */
                actions = rd_kafka_err_action(
                        rkb, err, reply, request,

                        RD_KAFKA_ERR_ACTION_REFRESH,
                        RD_KAFKA_RESP_ERR__TRANSPORT,

                        RD_KAFKA_ERR_ACTION_REFRESH,
                        RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,

                        RD_KAFKA_ERR_ACTION_RETRY,
                        RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,

                        RD_KAFKA_ERR_ACTION_RETRY,
                        RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,

                        RD_KAFKA_ERR_ACTION_RETRY,
                        RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,

                        RD_KAFKA_ERR_ACTION_RETRY,
                        RD_KAFKA_RESP_ERR__TIMED_OUT,

                        RD_KAFKA_ERR_ACTION_PERMANENT,
                        RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,

                        RD_KAFKA_ERR_ACTION_END);

                rd_rkb_dbg(rkb, MSG, "MSGSET",
                           "%s [%"PRId32"]: MessageSet with %i message(s) "
                           "encountered error: %s (actions %s)",
                           rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                           request->rkbuf_msgq.rkmq_msg_cnt,
                           rd_kafka_err2str(err),
                           rd_flags2str(actstr, sizeof(actstr),
                                        rd_kafka_actions_descs,
                                        actions));


                if (actions & (RD_KAFKA_ERR_ACTION_REFRESH |
                               RD_KAFKA_ERR_ACTION_RETRY)) {
                        /* Retry */
                        int incr_retry = 1; /* Increase per-message retry cnt */

                        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                                /* Request metadata information update.
                                 * These errors imply that we have stale
                                 * information and the request was
                                 * either rejected or not sent -
                                 * we don't need to increment the retry count
                                 * when we perform a retry since:
                                 *   - it is a temporary error (hopefully)
                                 *   - there is no chance of duplicate delivery
                                 */
                                rd_kafka_toppar_leader_unavailable(
                                        rktp, "produce", err);

                                /* We can't be certain the request wasn't
                                 * sent in case of transport failure,
                                 * so the ERR__TRANSPORT case will need
                                 * the retry count to be increased */
                                if (err != RD_KAFKA_RESP_ERR__TRANSPORT)
                                        incr_retry = 0;
                        }

                        /* If message timed out in queue, not in transit,
                         * we will retry at a later time but not increment
                         * the retry count since there is no risk
                         * of duplicates. */
                        if (err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
                                incr_retry = 0;

                        /* Since requests are specific to a broker
                         * we move the retryable messages from the request
                         * back to the partition queue (prepend) and then
                         * let the new broker construct a new request.
                         * While doing this we also make sure the retry count
                         * for each message is honoured, any messages that
                         * would exceed the retry count will not be
                         * moved but instead fail below. */
                        rd_kafka_toppar_retry_msgq(rktp, &request->rkbuf_msgq,
                                                   incr_retry);

                        if (rd_kafka_msgq_len(&request->rkbuf_msgq) == 0) {
                                /* No need to do anything more with the
                                 * request here since the request no longer
                                 * has any messages associated with it. */
                                goto done;
                        }
                }

                /* Translate request-level timeout error code
                 * to message-level timeout error code.
                 * This must come after the retry handling above, which
                 * compares against the untranslated error codes. */
                if (err == RD_KAFKA_RESP_ERR__TIMED_OUT ||
                    err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
                        err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

                /* Fatal errors: no message transmission retries */
                /* FALLTHRU */
        }

        /* Propagate assigned offset and timestamp back to app.
         * Skipped on error and when no offset was parsed from a reply. */
        if (likely(!err && offset != RD_KAFKA_OFFSET_INVALID)) {
                rd_kafka_msg_t *rkm;
                if (rktp->rktp_rkt->rkt_conf.produce_offset_report) {
                        /* produce.offset.report: each message gets its own
                         * offset, counting up from the parsed offset.
                         * The timestamp (if any) is the same for all. */
                        TAILQ_FOREACH(rkm, &request->rkbuf_msgq.rkmq_msgs,
                                      rkm_link) {
                                rkm->rkm_offset = offset++;
                                if (timestamp != -1) {
                                        rkm->rkm_timestamp = timestamp;
                                        rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
                                }
                        }
                } else {
                        /* Last message in each batch: only the final
                         * message is updated, with the parsed offset
                         * plus the message count minus one. */
                        rkm = TAILQ_LAST(&request->rkbuf_msgq.rkmq_msgs,
                                         rd_kafka_msg_head_s);
                        rkm->rkm_offset = offset +
                                request->rkbuf_msgq.rkmq_msg_cnt - 1;
                        if (timestamp != -1) {
                                rkm->rkm_timestamp = timestamp;
                                rkm->rkm_tstype = RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME;
                        }
                }
        }

        /* Enqueue messages for delivery report.
         * err is 0 on success, otherwise the (possibly translated) error
         * for the messages still remaining in the request's queue. */
        rd_kafka_dr_msgq(rktp->rktp_rkt, &request->rkbuf_msgq, err);

 done:
        rd_kafka_toppar_destroy(s_rktp); /* from ProduceRequest() */
}


/**
 * @brief Build and enqueue a ProduceRequest from the toppar's queued
 *        messages.
 *
 * The request's absolute timeout is derived from the first message in
 * the batch, and rd_kafka_handle_Produce() is registered as the response
 * handler together with a toppar reference taken through
 * rd_kafka_toppar_keep().
 *
 * @returns the number of messages included, or 0 if no request
 *          could be created (error / no messages).
 *
 * @locality broker thread
 */
int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp) {
        rd_kafka_itopic_t *topic = rktp->rktp_rkt;
        rd_kafka_buf_t *reqbuf;
        size_t msgset_size = 0;
        int cnt;
        rd_ts_t ts_now;
        int64_t remaining_ms;
        int timeout_ms;

        /* Pack as many messages as possible from the toppar transmit
         * queue into a single ProduceRequest. */
        reqbuf = rd_kafka_msgset_create_ProduceRequest(rkb, rktp,
                                                       &msgset_size);
        if (unlikely(reqbuf == NULL))
                return 0;

        cnt = reqbuf->rkbuf_msgq.rkmq_msg_cnt;
        rd_dassert(cnt > 0);

        /* Per-partition transmit counters */
        rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt);
        rd_atomic64_add(&rktp->rktp_c.tx_bytes, msgset_size);

        /* With required_acks set to 0 the request is flagged
         * as not having a response. */
        if (topic->rkt_conf.required_acks == 0)
                reqbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;

        /* The first message in the batch dictates the timeout:
         * its remaining time, scaled down by 1000. */
        ts_now = rd_clock();
        remaining_ms = (TAILQ_FIRST(&reqbuf->rkbuf_msgq.rkmq_msgs)->
                        rkm_ts_timeout - ts_now) / 1000;

        /* A message that has already timed out is still given
         * 100 ms to be produced. */
        timeout_ms = unlikely(remaining_ms <= 0) ? 100 : (int)remaining_ms;

        /* This is the absolute timeout (retries included); the effective
         * timeout of this particular request will be capped by
         * socket.timeout.ms */
        rd_kafka_buf_set_abs_timeout(reqbuf, timeout_ms, ts_now);

        rd_kafka_broker_buf_enq_replyq(rkb, reqbuf,
                                       RD_KAFKA_NO_REPLYQ,
                                       rd_kafka_handle_Produce,
                                       /* toppar ref for handle_Produce() */
                                       rd_kafka_toppar_keep(rktp));

        return cnt;
}