Blame src/rdkafka_msg.c

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2012,2013 Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 * 
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met: 
Packit 2997f0
 * 
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer. 
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution. 
Packit 2997f0
 * 
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#include "rd.h"
Packit 2997f0
#include "rdkafka_int.h"
Packit 2997f0
#include "rdkafka_msg.h"
Packit 2997f0
#include "rdkafka_topic.h"
Packit 2997f0
#include "rdkafka_partition.h"
Packit 2997f0
#include "rdkafka_interceptor.h"
Packit 2997f0
#include "rdkafka_header.h"
Packit 2997f0
#include "rdcrc32.h"
Packit 2997f0
#include "rdmurmur2.h"
Packit 2997f0
#include "rdrand.h"
Packit 2997f0
#include "rdtime.h"
Packit 2997f0
#include "rdsysqueue.h"
Packit 2997f0
#include "rdunittest.h"
Packit 2997f0
Packit 2997f0
#include <stdarg.h>
Packit 2997f0
Packit 2997f0
void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
Packit 2997f0
Packit 2997f0
	if (rkm->rkm_flags & RD_KAFKA_MSG_F_ACCOUNT) {
Packit 2997f0
		rd_dassert(rk || rkm->rkm_rkmessage.rkt);
Packit 2997f0
		rd_kafka_curr_msgs_sub(
Packit 2997f0
			rk ? rk :
Packit 2997f0
			rd_kafka_topic_a2i(rkm->rkm_rkmessage.rkt)->rkt_rk,
Packit 2997f0
			1, rkm->rkm_len);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
        if (rkm->rkm_headers)
Packit 2997f0
                rd_kafka_headers_destroy(rkm->rkm_headers);
Packit 2997f0
Packit 2997f0
	if (likely(rkm->rkm_rkmessage.rkt != NULL))
Packit 2997f0
		rd_kafka_topic_destroy0(
Packit 2997f0
                        rd_kafka_topic_a2s(rkm->rkm_rkmessage.rkt));
Packit 2997f0
Packit 2997f0
	if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE && rkm->rkm_payload)
Packit 2997f0
		rd_free(rkm->rkm_payload);
Packit 2997f0
Packit 2997f0
	if (rkm->rkm_flags & RD_KAFKA_MSG_F_FREE_RKM)
Packit 2997f0
		rd_free(rkm);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Create a new Producer message, copying the payload as
Packit 2997f0
 *        indicated by msgflags.
Packit 2997f0
 *
Packit 2997f0
 * @returns the new message
Packit 2997f0
 */
Packit 2997f0
static
Packit 2997f0
rd_kafka_msg_t *rd_kafka_msg_new00 (rd_kafka_itopic_t *rkt,
Packit 2997f0
				    int32_t partition,
Packit 2997f0
				    int msgflags,
Packit 2997f0
				    char *payload, size_t len,
Packit 2997f0
				    const void *key, size_t keylen,
Packit 2997f0
				    void *msg_opaque) {
Packit 2997f0
	rd_kafka_msg_t *rkm;
Packit 2997f0
	size_t mlen = sizeof(*rkm);
Packit 2997f0
	char *p;
Packit 2997f0
Packit 2997f0
	/* If we are to make a copy of the payload, allocate space for it too */
Packit 2997f0
	if (msgflags & RD_KAFKA_MSG_F_COPY) {
Packit 2997f0
		msgflags &= ~RD_KAFKA_MSG_F_FREE;
Packit 2997f0
		mlen += len;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	mlen += keylen;
Packit 2997f0
Packit 2997f0
	/* Note: using rd_malloc here, not rd_calloc, so make sure all fields
Packit 2997f0
	 *       are properly set up. */
Packit 2997f0
	rkm                 = rd_malloc(mlen);
Packit 2997f0
	rkm->rkm_err        = 0;
Packit 2997f0
	rkm->rkm_flags      = (RD_KAFKA_MSG_F_PRODUCER |
Packit 2997f0
                               RD_KAFKA_MSG_F_FREE_RKM | msgflags);
Packit 2997f0
	rkm->rkm_len        = len;
Packit 2997f0
	rkm->rkm_opaque     = msg_opaque;
Packit 2997f0
	rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep_a(rkt);
Packit 2997f0
Packit 2997f0
	rkm->rkm_partition  = partition;
Packit 2997f0
        rkm->rkm_offset     = RD_KAFKA_OFFSET_INVALID;
Packit 2997f0
	rkm->rkm_timestamp  = 0;
Packit 2997f0
	rkm->rkm_tstype     = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
Packit 2997f0
        rkm->rkm_headers    = NULL;
Packit 2997f0
Packit 2997f0
	p = (char *)(rkm+1);
Packit 2997f0
Packit 2997f0
	if (payload && msgflags & RD_KAFKA_MSG_F_COPY) {
Packit 2997f0
		/* Copy payload to space following the ..msg_t */
Packit 2997f0
		rkm->rkm_payload = p;
Packit 2997f0
		memcpy(rkm->rkm_payload, payload, len);
Packit 2997f0
		p += len;
Packit 2997f0
Packit 2997f0
	} else {
Packit 2997f0
		/* Just point to the provided payload. */
Packit 2997f0
		rkm->rkm_payload = payload;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	if (key) {
Packit 2997f0
		rkm->rkm_key     = p;
Packit 2997f0
		rkm->rkm_key_len = keylen;
Packit 2997f0
		memcpy(rkm->rkm_key, key, keylen);
Packit 2997f0
	} else {
Packit 2997f0
		rkm->rkm_key = NULL;
Packit 2997f0
		rkm->rkm_key_len = 0;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
Packit 2997f0
        return rkm;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Create a new Producer message.
Packit 2997f0
 *
Packit 2997f0
 * @remark Must only be used by producer code.
Packit 2997f0
 *
Packit 2997f0
 * Returns 0 on success or -1 on error.
Packit 2997f0
 * Both errno and 'errp' are set appropriately.
Packit 2997f0
 */
Packit 2997f0
static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
Packit 2997f0
                                          int32_t force_partition,
Packit 2997f0
                                          int msgflags,
Packit 2997f0
                                          char *payload, size_t len,
Packit 2997f0
                                          const void *key, size_t keylen,
Packit 2997f0
                                          void *msg_opaque,
Packit 2997f0
                                          rd_kafka_resp_err_t *errp,
Packit 2997f0
                                          int *errnop,
Packit 2997f0
                                          rd_kafka_headers_t *hdrs,
Packit 2997f0
                                          int64_t timestamp,
Packit 2997f0
                                          rd_ts_t now) {
Packit 2997f0
	rd_kafka_msg_t *rkm;
Packit 2997f0
        size_t hdrs_size = 0;
Packit 2997f0
Packit 2997f0
	if (unlikely(!payload))
Packit 2997f0
		len = 0;
Packit 2997f0
	if (!key)
Packit 2997f0
		keylen = 0;
Packit 2997f0
        if (hdrs)
Packit 2997f0
                hdrs_size = rd_kafka_headers_serialized_size(hdrs);
Packit 2997f0
Packit 2997f0
	if (unlikely(len + keylen + hdrs_size >
Packit 2997f0
		     (size_t)rkt->rkt_rk->rk_conf.max_msg_size ||
Packit 2997f0
		     keylen > INT32_MAX)) {
Packit 2997f0
		*errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
Packit 2997f0
		if (errnop)
Packit 2997f0
			*errnop = EMSGSIZE;
Packit 2997f0
		return NULL;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
        if (msgflags & RD_KAFKA_MSG_F_BLOCK)
Packit 2997f0
                *errp = rd_kafka_curr_msgs_add(
Packit 2997f0
                        rkt->rkt_rk, 1, len, 1/*block*/,
Packit 2997f0
                        (msgflags & RD_KAFKA_MSG_F_RKT_RDLOCKED) ?
Packit 2997f0
                        &rkt->rkt_lock : NULL);
Packit 2997f0
        else
Packit 2997f0
                *errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len, 0, NULL);
Packit 2997f0
Packit 2997f0
        if (unlikely(*errp)) {
Packit 2997f0
		if (errnop)
Packit 2997f0
			*errnop = ENOBUFS;
Packit 2997f0
		return NULL;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
Packit 2997f0
	rkm = rd_kafka_msg_new00(rkt, force_partition,
Packit 2997f0
				 msgflags|RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */,
Packit 2997f0
				 payload, len, key, keylen, msg_opaque);
Packit 2997f0
Packit 2997f0
        memset(&rkm->rkm_u.producer, 0, sizeof(rkm->rkm_u.producer));
Packit 2997f0
Packit 2997f0
        if (timestamp)
Packit 2997f0
                rkm->rkm_timestamp  = timestamp;
Packit 2997f0
        else
Packit 2997f0
                rkm->rkm_timestamp = rd_uclock()/1000;
Packit 2997f0
        rkm->rkm_tstype     = RD_KAFKA_TIMESTAMP_CREATE_TIME;
Packit 2997f0
Packit 2997f0
        if (hdrs) {
Packit 2997f0
                rd_dassert(!rkm->rkm_headers);
Packit 2997f0
                rkm->rkm_headers = hdrs;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rkm->rkm_ts_enq = now;
Packit 2997f0
Packit 2997f0
	if (rkt->rkt_conf.message_timeout_ms == 0) {
Packit 2997f0
		rkm->rkm_ts_timeout = INT64_MAX;
Packit 2997f0
	} else {
Packit 2997f0
		rkm->rkm_ts_timeout = now +
Packit 2997f0
			rkt->rkt_conf.message_timeout_ms * 1000;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
        /* Call interceptor chain for on_send */
Packit 2997f0
        rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);
Packit 2997f0
Packit 2997f0
        return rkm;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Produce: creates a new message, runs the partitioner and enqueues
Packit 2997f0
 *        into on the selected partition.
Packit 2997f0
 *
Packit 2997f0
 * @returns 0 on success or -1 on error.
Packit 2997f0
 *
Packit 2997f0
 * If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then
Packit 2997f0
 * the memory associated with the payload is still the caller's
Packit 2997f0
 * responsibility.
Packit 2997f0
 *
Packit 2997f0
 * @locks none
Packit 2997f0
 */
Packit 2997f0
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
Packit 2997f0
		      int msgflags,
Packit 2997f0
		      char *payload, size_t len,
Packit 2997f0
		      const void *key, size_t keylen,
Packit 2997f0
		      void *msg_opaque) {
Packit 2997f0
	rd_kafka_msg_t *rkm;
Packit 2997f0
	rd_kafka_resp_err_t err;
Packit 2997f0
	int errnox;
Packit 2997f0
Packit 2997f0
        /* Create message */
Packit 2997f0
        rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,
Packit 2997f0
                                payload, len, key, keylen, msg_opaque,
Packit 2997f0
                                &err, &errnox, NULL, 0, rd_clock());
Packit 2997f0
        if (unlikely(!rkm)) {
Packit 2997f0
                /* errno is already set by msg_new() */
Packit 2997f0
		rd_kafka_set_last_error(err, errnox);
Packit 2997f0
                return -1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
Packit 2997f0
        /* Partition the message */
Packit 2997f0
	err = rd_kafka_msg_partitioner(rkt, rkm, 1);
Packit 2997f0
	if (likely(!err)) {
Packit 2997f0
		rd_kafka_set_last_error(0, 0);
Packit 2997f0
		return 0;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
        /* Interceptor: unroll failing messages by triggering on_ack.. */
Packit 2997f0
        rkm->rkm_err = err;
Packit 2997f0
        rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
Packit 2997f0
                                                 &rkm->rkm_rkmessage);
Packit 2997f0
Packit 2997f0
	/* Handle partitioner failures: it only fails when the application
Packit 2997f0
	 * attempts to force a destination partition that does not exist
Packit 2997f0
	 * in the cluster.  Note we must clear the RD_KAFKA_MSG_F_FREE
Packit 2997f0
	 * flag since our contract says we don't free the payload on
Packit 2997f0
	 * failure. */
Packit 2997f0
Packit 2997f0
	rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
Packit 2997f0
	rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
Packit 2997f0
Packit 2997f0
	/* Translate error codes to errnos. */
Packit 2997f0
	if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
Packit 2997f0
		rd_kafka_set_last_error(err, ESRCH);
Packit 2997f0
	else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
Packit 2997f0
		rd_kafka_set_last_error(err, ENOENT);
Packit 2997f0
	else
Packit 2997f0
		rd_kafka_set_last_error(err, EINVAL); /* NOTREACHED */
Packit 2997f0
Packit 2997f0
	return -1;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...) {
Packit 2997f0
        va_list ap;
Packit 2997f0
        rd_kafka_msg_t s_rkm = {
Packit 2997f0
                /* Message defaults */
Packit 2997f0
                .rkm_partition = RD_KAFKA_PARTITION_UA,
Packit 2997f0
                .rkm_timestamp = 0, /* current time */
Packit 2997f0
        };
Packit 2997f0
        rd_kafka_msg_t *rkm = &s_rkm;
Packit 2997f0
        rd_kafka_vtype_t vtype;
Packit 2997f0
        rd_kafka_topic_t *app_rkt;
Packit 2997f0
        shptr_rd_kafka_itopic_t *s_rkt = NULL;
Packit 2997f0
        rd_kafka_itopic_t *rkt;
Packit 2997f0
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
        rd_kafka_headers_t *hdrs = NULL;
Packit 2997f0
        rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */
Packit 2997f0
Packit 2997f0
        va_start(ap, rk);
Packit 2997f0
        while (!err &&
Packit 2997f0
               (vtype = va_arg(ap, rd_kafka_vtype_t)) != RD_KAFKA_VTYPE_END) {
Packit 2997f0
                switch (vtype)
Packit 2997f0
                {
Packit 2997f0
                case RD_KAFKA_VTYPE_TOPIC:
Packit 2997f0
                        s_rkt = rd_kafka_topic_new0(rk,
Packit 2997f0
                                                    va_arg(ap, const char *),
Packit 2997f0
                                                    NULL, NULL, 1);
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_RKT:
Packit 2997f0
                        app_rkt = va_arg(ap, rd_kafka_topic_t *);
Packit 2997f0
                        s_rkt = rd_kafka_topic_keep(
Packit 2997f0
                                rd_kafka_topic_a2i(app_rkt));
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_PARTITION:
Packit 2997f0
                        rkm->rkm_partition = va_arg(ap, int32_t);
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_VALUE:
Packit 2997f0
                        rkm->rkm_payload = va_arg(ap, void *);
Packit 2997f0
                        rkm->rkm_len = va_arg(ap, size_t);
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_KEY:
Packit 2997f0
                        rkm->rkm_key = va_arg(ap, void *);
Packit 2997f0
                        rkm->rkm_key_len = va_arg(ap, size_t);
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_OPAQUE:
Packit 2997f0
                        rkm->rkm_opaque = va_arg(ap, void *);
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_MSGFLAGS:
Packit 2997f0
                        rkm->rkm_flags = va_arg(ap, int);
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_TIMESTAMP:
Packit 2997f0
                        rkm->rkm_timestamp = va_arg(ap, int64_t);
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_HEADER:
Packit 2997f0
                {
Packit 2997f0
                        const char *name;
Packit 2997f0
                        const void *value;
Packit 2997f0
                        ssize_t size;
Packit 2997f0
Packit 2997f0
                        if (unlikely(app_hdrs != NULL)) {
Packit 2997f0
                                err = RD_KAFKA_RESP_ERR__CONFLICT;
Packit 2997f0
                                break;
Packit 2997f0
                        }
Packit 2997f0
Packit 2997f0
                        if (unlikely(!hdrs))
Packit 2997f0
                                hdrs = rd_kafka_headers_new(8);
Packit 2997f0
Packit 2997f0
                        name = va_arg(ap, const char *);
Packit 2997f0
                        value = va_arg(ap, const void *);
Packit 2997f0
                        size = va_arg(ap, ssize_t);
Packit 2997f0
Packit 2997f0
                        err = rd_kafka_header_add(hdrs, name, -1, value, size);
Packit 2997f0
                }
Packit 2997f0
                break;
Packit 2997f0
Packit 2997f0
                case RD_KAFKA_VTYPE_HEADERS:
Packit 2997f0
                        if (unlikely(hdrs != NULL)) {
Packit 2997f0
                                err = RD_KAFKA_RESP_ERR__CONFLICT;
Packit 2997f0
                                break;
Packit 2997f0
                        }
Packit 2997f0
                        app_hdrs = va_arg(ap, rd_kafka_headers_t *);
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
                default:
Packit 2997f0
                        err = RD_KAFKA_RESP_ERR__INVALID_ARG;
Packit 2997f0
                        break;
Packit 2997f0
                }
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        va_end(ap);
Packit 2997f0
Packit 2997f0
        if (unlikely(!s_rkt))
Packit 2997f0
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
Packit 2997f0
Packit 2997f0
        rkt = rd_kafka_topic_s2i(s_rkt);
Packit 2997f0
Packit 2997f0
        if (likely(!err))
Packit 2997f0
                rkm = rd_kafka_msg_new0(rkt,
Packit 2997f0
                                        rkm->rkm_partition,
Packit 2997f0
                                        rkm->rkm_flags,
Packit 2997f0
                                        rkm->rkm_payload, rkm->rkm_len,
Packit 2997f0
                                        rkm->rkm_key, rkm->rkm_key_len,
Packit 2997f0
                                        rkm->rkm_opaque,
Packit 2997f0
                                        &err, NULL,
Packit 2997f0
                                        app_hdrs ? app_hdrs : hdrs,
Packit 2997f0
                                        rkm->rkm_timestamp,
Packit 2997f0
                                        rd_clock());
Packit 2997f0
Packit 2997f0
        if (unlikely(err)) {
Packit 2997f0
                rd_kafka_topic_destroy0(s_rkt);
Packit 2997f0
                if (hdrs)
Packit 2997f0
                        rd_kafka_headers_destroy(hdrs);
Packit 2997f0
                return err;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Partition the message */
Packit 2997f0
        err = rd_kafka_msg_partitioner(rkt, rkm, 1);
Packit 2997f0
        if (unlikely(err)) {
Packit 2997f0
                /* Handle partitioner failures: it only fails when
Packit 2997f0
                 * the application attempts to force a destination
Packit 2997f0
                 * partition that does not exist in the cluster. */
Packit 2997f0
Packit 2997f0
                /* Interceptors: Unroll on_send by on_ack.. */
Packit 2997f0
                rkm->rkm_err = err;
Packit 2997f0
                rd_kafka_interceptors_on_acknowledgement(rk,
Packit 2997f0
                                                         &rkm->rkm_rkmessage);
Packit 2997f0
Packit 2997f0
                /* Note we must clear the RD_KAFKA_MSG_F_FREE
Packit 2997f0
                 * flag since our contract says we don't free the payload on
Packit 2997f0
                 * failure. */
Packit 2997f0
                rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
Packit 2997f0
Packit 2997f0
                /* Deassociate application owned headers from message
Packit 2997f0
                 * since headers remain in application ownership
Packit 2997f0
                 * when producev() fails */
Packit 2997f0
                if (app_hdrs && app_hdrs == rkm->rkm_headers)
Packit 2997f0
                        rkm->rkm_headers = NULL;
Packit 2997f0
Packit 2997f0
                rd_kafka_msg_destroy(rk, rkm);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rd_kafka_topic_destroy0(s_rkt);
Packit 2997f0
Packit 2997f0
        return err;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Produce a batch of messages.
Packit 2997f0
 * Returns the number of messages succesfully queued for producing.
Packit 2997f0
 * Each message's .err will be set accordingly.
Packit 2997f0
 */
Packit 2997f0
int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
Packit 2997f0
                            int msgflags,
Packit 2997f0
                            rd_kafka_message_t *rkmessages, int message_cnt) {
Packit 2997f0
        rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
Packit 2997f0
        int i;
Packit 2997f0
	int64_t utc_now = rd_uclock() / 1000;
Packit 2997f0
        rd_ts_t now = rd_clock();
Packit 2997f0
        int good = 0;
Packit 2997f0
        int multiple_partitions = (partition == RD_KAFKA_PARTITION_UA ||
Packit 2997f0
                                   (msgflags & RD_KAFKA_MSG_F_PARTITION));
Packit 2997f0
        rd_kafka_resp_err_t all_err = 0;
Packit 2997f0
        rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
Packit 2997f0
        rd_kafka_toppar_t *rktp = NULL;
Packit 2997f0
        shptr_rd_kafka_toppar_t *s_rktp = NULL;
Packit 2997f0
Packit 2997f0
        /* For multiple partitions; hold lock for entire run,
Packit 2997f0
         * for one partition: only acquire for now. */
Packit 2997f0
        rd_kafka_topic_rdlock(rkt);
Packit 2997f0
        if (!multiple_partitions) {
Packit 2997f0
                s_rktp = rd_kafka_toppar_get_avail(rkt, partition,
Packit 2997f0
                                                   1/*ua on miss*/, &all_err);
Packit 2997f0
                rktp = rd_kafka_toppar_s2i(s_rktp);
Packit 2997f0
                rd_kafka_topic_rdunlock(rkt);
Packit 2997f0
        } else {
Packit 2997f0
                /* Indicate to lower-level msg_new..() that rkt is locked
Packit 2997f0
                 * so that they may unlock it momentarily if blocking. */
Packit 2997f0
                msgflags |= RD_KAFKA_MSG_F_RKT_RDLOCKED;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        for (i = 0 ; i < message_cnt ; i++) {
Packit 2997f0
                rd_kafka_msg_t *rkm;
Packit 2997f0
Packit 2997f0
                /* Propagate error for all messages. */
Packit 2997f0
                if (unlikely(all_err)) {
Packit 2997f0
                        rkmessages[i].err = all_err;
Packit 2997f0
                        continue;
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                /* Create message */
Packit 2997f0
                rkm = rd_kafka_msg_new0(rkt,
Packit 2997f0
                                        (msgflags & RD_KAFKA_MSG_F_PARTITION) ?
Packit 2997f0
                                        rkmessages[i].partition : partition,
Packit 2997f0
                                        msgflags,
Packit 2997f0
                                        rkmessages[i].payload,
Packit 2997f0
                                        rkmessages[i].len,
Packit 2997f0
                                        rkmessages[i].key,
Packit 2997f0
                                        rkmessages[i].key_len,
Packit 2997f0
                                        rkmessages[i]._private,
Packit 2997f0
                                        &rkmessages[i].err, NULL,
Packit 2997f0
					NULL, utc_now, now);
Packit 2997f0
                if (unlikely(!rkm)) {
Packit 2997f0
			if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
Packit 2997f0
				all_err = rkmessages[i].err;
Packit 2997f0
                        continue;
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
                /* Three cases here:
Packit 2997f0
                 *  partition==UA:            run the partitioner (slow)
Packit 2997f0
                 *  RD_KAFKA_MSG_F_PARTITION: produce message to specified
Packit 2997f0
                 *                            partition
Packit 2997f0
                 *  fixed partition:          simply concatenate the queue
Packit 2997f0
                 *                            to partit */
Packit 2997f0
                if (multiple_partitions) {
Packit 2997f0
                        if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
Packit 2997f0
                                /* Partition the message */
Packit 2997f0
                                rkmessages[i].err =
Packit 2997f0
                                        rd_kafka_msg_partitioner(
Packit 2997f0
                                                rkt, rkm, 0/*already locked*/);
Packit 2997f0
                        } else {
Packit 2997f0
                                if (s_rktp == NULL ||
Packit 2997f0
                                    rkm->rkm_partition !=
Packit 2997f0
                                    rd_kafka_toppar_s2i(s_rktp)->
Packit 2997f0
                                    rktp_partition) {
Packit 2997f0
                                        if (s_rktp != NULL)
Packit 2997f0
                                                rd_kafka_toppar_destroy(s_rktp);
Packit 2997f0
                                        s_rktp = rd_kafka_toppar_get_avail(
Packit 2997f0
                                                rkt, rkm->rkm_partition,
Packit 2997f0
                                                1/*ua on miss*/, &all_err);
Packit 2997f0
                                }
Packit 2997f0
                                rktp = rd_kafka_toppar_s2i(s_rktp);
Packit 2997f0
                                rd_kafka_toppar_enq_msg(rktp, rkm);
Packit 2997f0
                        }
Packit 2997f0
Packit 2997f0
                        if (unlikely(rkmessages[i].err)) {
Packit 2997f0
                                /* Interceptors: Unroll on_send by on_ack.. */
Packit 2997f0
                                rd_kafka_interceptors_on_acknowledgement(
Packit 2997f0
                                        rkt->rkt_rk, &rkmessages[i]);
Packit 2997f0
Packit 2997f0
                                rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
Packit 2997f0
                                continue;
Packit 2997f0
                        }
Packit 2997f0
Packit 2997f0
Packit 2997f0
                } else {
Packit 2997f0
                        /* Single destination partition. */
Packit 2997f0
                        rd_kafka_toppar_enq_msg(rktp, rkm);
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
                good++;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        if (multiple_partitions)
Packit 2997f0
                rd_kafka_topic_rdunlock(rkt);
Packit 2997f0
        if (s_rktp != NULL)
Packit 2997f0
                rd_kafka_toppar_destroy(s_rktp);
Packit 2997f0
Packit 2997f0
        return good;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Scan 'rkmq' for messages that have timed out and remove them from
Packit 2997f0
 * 'rkmq' and add to 'timedout'.
Packit 2997f0
 *
Packit 2997f0
 * Returns the number of messages timed out.
Packit 2997f0
 */
Packit 2997f0
int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
Packit 2997f0
			    rd_kafka_msgq_t *timedout,
Packit 2997f0
			    rd_ts_t now) {
Packit 2997f0
	rd_kafka_msg_t *rkm, *tmp;
Packit 2997f0
	int cnt = timedout->rkmq_msg_cnt;
Packit 2997f0
Packit 2997f0
	/* Assume messages are added in time sequencial order */
Packit 2997f0
	TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
Packit 2997f0
                /* FIXME: this is no longer true */
Packit 2997f0
		if (likely(rkm->rkm_ts_timeout > now))
Packit 2997f0
			break;
Packit 2997f0
Packit 2997f0
		rd_kafka_msgq_deq(rkmq, rkm, 1);
Packit 2997f0
		rd_kafka_msgq_enq(timedout, rkm);
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	return timedout->rkmq_msg_cnt - cnt;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
static RD_INLINE int
Packit 2997f0
rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
Packit 2997f0
                           rd_kafka_msg_t *rkm,
Packit 2997f0
                           int (*order_cmp) (const void *, const void *)) {
Packit 2997f0
        TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *,
Packit 2997f0
                            rkm_link, order_cmp);
Packit 2997f0
        rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
Packit 2997f0
        return ++rkmq->rkmq_msg_cnt;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,
Packit 2997f0
                              rd_kafka_msgq_t *rkmq,
Packit 2997f0
                              rd_kafka_msg_t *rkm) {
Packit 2997f0
        rd_dassert(rkm->rkm_u.producer.msgseq != 0);
Packit 2997f0
        return rd_kafka_msgq_enq_sorted0(rkmq, rkm,
Packit 2997f0
                                         rkt->rkt_conf.msg_order_cmp);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Find the insert position (i.e., the previous element)
Packit 2997f0
 *        for message \p rkm.
Packit 2997f0
 *
Packit 2997f0
 * @returns the insert position element, or NULL if \p rkm should be
Packit 2997f0
 *          added at head of queue.
Packit 2997f0
 */
Packit 2997f0
rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq,
Packit 2997f0
                                        const rd_kafka_msg_t *rkm,
Packit 2997f0
                                        int (*cmp) (const void *,
Packit 2997f0
                                                    const void *)) {
Packit 2997f0
        const rd_kafka_msg_t *curr, *last = NULL;
Packit 2997f0
Packit 2997f0
        TAILQ_FOREACH(curr, &rkmq->rkmq_msgs, rkm_link) {
Packit 2997f0
                if (cmp(rkm, curr) < 0)
Packit 2997f0
                        return (rd_kafka_msg_t *)last;
Packit 2997f0
                last = curr;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        return (rd_kafka_msg_t *)last;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
int32_t rd_kafka_msg_partitioner_random (const rd_kafka_topic_t *rkt,
Packit 2997f0
					 const void *key, size_t keylen,
Packit 2997f0
					 int32_t partition_cnt,
Packit 2997f0
					 void *rkt_opaque,
Packit 2997f0
					 void *msg_opaque) {
Packit 2997f0
	int32_t p = rd_jitter(0, partition_cnt-1);
Packit 2997f0
	if (unlikely(!rd_kafka_topic_partition_available(rkt, p)))
Packit 2997f0
		return rd_jitter(0, partition_cnt-1);
Packit 2997f0
	else
Packit 2997f0
		return p;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
Packit 2997f0
                                             const void *key, size_t keylen,
Packit 2997f0
                                             int32_t partition_cnt,
Packit 2997f0
                                             void *rkt_opaque,
Packit 2997f0
                                             void *msg_opaque) {
Packit 2997f0
    return rd_crc32(key, keylen) % partition_cnt;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
Packit 2997f0
                                             const void *key, size_t keylen,
Packit 2997f0
                                             int32_t partition_cnt,
Packit 2997f0
                                             void *rkt_opaque,
Packit 2997f0
                                             void *msg_opaque) {
Packit 2997f0
    if (keylen == 0)
Packit 2997f0
      return rd_kafka_msg_partitioner_random(rkt,
Packit 2997f0
                                             key,
Packit 2997f0
                                             keylen,
Packit 2997f0
                                             partition_cnt,
Packit 2997f0
                                             rkt_opaque,
Packit 2997f0
                                             msg_opaque);
Packit 2997f0
    else
Packit 2997f0
      return rd_kafka_msg_partitioner_consistent(rkt,
Packit 2997f0
                                                 key,
Packit 2997f0
                                                 keylen,
Packit 2997f0
                                                 partition_cnt,
Packit 2997f0
                                                 rkt_opaque,
Packit 2997f0
                                                 msg_opaque);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
int32_t
Packit 2997f0
rd_kafka_msg_partitioner_murmur2 (const rd_kafka_topic_t *rkt,
Packit 2997f0
                                  const void *key, size_t keylen,
Packit 2997f0
                                  int32_t partition_cnt,
Packit 2997f0
                                  void *rkt_opaque,
Packit 2997f0
                                  void *msg_opaque) {
Packit 2997f0
        return rd_murmur2(key, keylen) % partition_cnt;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
Packit 2997f0
                                                 const void *key, size_t keylen,
Packit 2997f0
                                                 int32_t partition_cnt,
Packit 2997f0
                                                 void *rkt_opaque,
Packit 2997f0
                                                 void *msg_opaque) {
Packit 2997f0
        if (!key)
Packit 2997f0
                return rd_kafka_msg_partitioner_random(rkt,
Packit 2997f0
                                                       key,
Packit 2997f0
                                                       keylen,
Packit 2997f0
                                                       partition_cnt,
Packit 2997f0
                                                       rkt_opaque,
Packit 2997f0
                                                       msg_opaque);
Packit 2997f0
        else
Packit 2997f0
                return rd_murmur2(key, keylen) % partition_cnt;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Assigns a message to a topic partition using a partitioner.
Packit 2997f0
 * Returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if
Packit 2997f0
 * partitioning failed, or 0 on success.
Packit 2997f0
 */
Packit 2997f0
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
Packit 2997f0
			      int do_lock) {
Packit 2997f0
	int32_t partition;
Packit 2997f0
	rd_kafka_toppar_t *rktp_new;
Packit 2997f0
        shptr_rd_kafka_toppar_t *s_rktp_new;
Packit 2997f0
	rd_kafka_resp_err_t err;
Packit 2997f0
Packit 2997f0
	if (do_lock)
Packit 2997f0
		rd_kafka_topic_rdlock(rkt);
Packit 2997f0
Packit 2997f0
        switch (rkt->rkt_state)
Packit 2997f0
        {
Packit 2997f0
        case RD_KAFKA_TOPIC_S_UNKNOWN:
Packit 2997f0
                /* No metadata received from cluster yet.
Packit 2997f0
                 * Put message in UA partition and re-run partitioner when
Packit 2997f0
                 * cluster comes up. */
Packit 2997f0
		partition = RD_KAFKA_PARTITION_UA;
Packit 2997f0
                break;
Packit 2997f0
Packit 2997f0
        case RD_KAFKA_TOPIC_S_NOTEXISTS:
Packit 2997f0
                /* Topic not found in cluster.
Packit 2997f0
                 * Fail message immediately. */
Packit 2997f0
                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
Packit 2997f0
		if (do_lock)
Packit 2997f0
			rd_kafka_topic_rdunlock(rkt);
Packit 2997f0
                return err;
Packit 2997f0
Packit 2997f0
        case RD_KAFKA_TOPIC_S_EXISTS:
Packit 2997f0
                /* Topic exists in cluster. */
Packit 2997f0
Packit 2997f0
                /* Topic exists but has no partitions.
Packit 2997f0
                 * This is usually an transient state following the
Packit 2997f0
                 * auto-creation of a topic. */
Packit 2997f0
                if (unlikely(rkt->rkt_partition_cnt == 0)) {
Packit 2997f0
                        partition = RD_KAFKA_PARTITION_UA;
Packit 2997f0
                        break;
Packit 2997f0
                }
Packit 2997f0
Packit 2997f0
                /* Partition not assigned, run partitioner. */
Packit 2997f0
                if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
Packit 2997f0
                        rd_kafka_topic_t *app_rkt;
Packit 2997f0
                        /* Provide a temporary app_rkt instance to protect
Packit 2997f0
                         * from the case where the application decided to
Packit 2997f0
                         * destroy its topic object prior to delivery completion
Packit 2997f0
                         * (issue #502). */
Packit 2997f0
                        app_rkt = rd_kafka_topic_keep_a(rkt);
Packit 2997f0
                        partition = rkt->rkt_conf.
Packit 2997f0
                                partitioner(app_rkt,
Packit 2997f0
                                            rkm->rkm_key,
Packit 2997f0
					    rkm->rkm_key_len,
Packit 2997f0
                                            rkt->rkt_partition_cnt,
Packit 2997f0
                                            rkt->rkt_conf.opaque,
Packit 2997f0
                                            rkm->rkm_opaque);
Packit 2997f0
                        rd_kafka_topic_destroy0(
Packit 2997f0
                                rd_kafka_topic_a2s(app_rkt));
Packit 2997f0
                } else
Packit 2997f0
                        partition = rkm->rkm_partition;
Packit 2997f0
Packit 2997f0
                /* Check that partition exists. */
Packit 2997f0
                if (partition >= rkt->rkt_partition_cnt) {
Packit 2997f0
                        err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
Packit 2997f0
                        if (do_lock)
Packit 2997f0
                                rd_kafka_topic_rdunlock(rkt);
Packit 2997f0
                        return err;
Packit 2997f0
                }
Packit 2997f0
                break;
Packit 2997f0
Packit 2997f0
        default:
Packit 2997f0
                rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
Packit 2997f0
                break;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
	/* Get new partition */
Packit 2997f0
	s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
Packit 2997f0
Packit 2997f0
	if (unlikely(!s_rktp_new)) {
Packit 2997f0
		/* Unknown topic or partition */
Packit 2997f0
		if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
Packit 2997f0
			err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
Packit 2997f0
		else
Packit 2997f0
			err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
Packit 2997f0
Packit 2997f0
		if (do_lock)
Packit 2997f0
			rd_kafka_topic_rdunlock(rkt);
Packit 2997f0
Packit 2997f0
		return  err;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
        rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
Packit 2997f0
        rd_atomic64_add(&rktp_new->rktp_c.msgs, 1);
Packit 2997f0
Packit 2997f0
        /* Update message partition */
Packit 2997f0
        if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
Packit 2997f0
                rkm->rkm_partition = partition;
Packit 2997f0
Packit 2997f0
	/* Partition is available: enqueue msg on partition's queue */
Packit 2997f0
	rd_kafka_toppar_enq_msg(rktp_new, rkm);
Packit 2997f0
	if (do_lock)
Packit 2997f0
		rd_kafka_topic_rdunlock(rkt);
Packit 2997f0
	rd_kafka_toppar_destroy(s_rktp_new); /* from _get() */
Packit 2997f0
	return 0;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @name Public message type (rd_kafka_message_t)
Packit 2997f0
 */
Packit 2997f0
void rd_kafka_message_destroy (rd_kafka_message_t *rkmessage) {
Packit 2997f0
        rd_kafka_op_t *rko;
Packit 2997f0
Packit 2997f0
        if (likely((rko = (rd_kafka_op_t *)rkmessage->_private) != NULL))
Packit 2997f0
                rd_kafka_op_destroy(rko);
Packit 2997f0
        else {
Packit 2997f0
                rd_kafka_msg_t *rkm = rd_kafka_message2msg(rkmessage);
Packit 2997f0
                rd_kafka_msg_destroy(NULL, rkm);
Packit 2997f0
        }
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
rd_kafka_message_t *rd_kafka_message_new (void) {
Packit 2997f0
        rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm));
Packit 2997f0
        return (rd_kafka_message_t *)rkm;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Set up a rkmessage from an rko for passing to the application.
Packit 2997f0
 * @remark Will trigger on_consume() interceptors if any.
Packit 2997f0
 */
Packit 2997f0
static rd_kafka_message_t *
Packit 2997f0
rd_kafka_message_setup (rd_kafka_op_t *rko, rd_kafka_message_t *rkmessage) {
Packit 2997f0
        rd_kafka_itopic_t *rkt;
Packit 2997f0
        rd_kafka_toppar_t *rktp = NULL;
Packit 2997f0
Packit 2997f0
        if (rko->rko_type == RD_KAFKA_OP_DR) {
Packit 2997f0
                rkt = rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt);
Packit 2997f0
        } else {
Packit 2997f0
                if (rko->rko_rktp) {
Packit 2997f0
                        rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
Packit 2997f0
                        rkt = rktp->rktp_rkt;
Packit 2997f0
                } else
Packit 2997f0
                        rkt = NULL;
Packit 2997f0
Packit 2997f0
                rkmessage->_private = rko;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
Packit 2997f0
        if (!rkmessage->rkt && rkt)
Packit 2997f0
                rkmessage->rkt = rd_kafka_topic_keep_a(rkt);
Packit 2997f0
Packit 2997f0
        if (rktp)
Packit 2997f0
                rkmessage->partition = rktp->rktp_partition;
Packit 2997f0
Packit 2997f0
        if (!rkmessage->err)
Packit 2997f0
                rkmessage->err = rko->rko_err;
Packit 2997f0
Packit 2997f0
        /* Call on_consume interceptors */
Packit 2997f0
        switch (rko->rko_type)
Packit 2997f0
        {
Packit 2997f0
        case RD_KAFKA_OP_FETCH:
Packit 2997f0
                if (!rkmessage->err && rkt)
Packit 2997f0
                        rd_kafka_interceptors_on_consume(rkt->rkt_rk,
Packit 2997f0
                                                         rkmessage);
Packit 2997f0
                break;
Packit 2997f0
Packit 2997f0
        default:
Packit 2997f0
                break;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        return rkmessage;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Get rkmessage from rkm (for EVENT_DR)
Packit 2997f0
 * @remark Must only be called just prior to passing a dr to the application.
Packit 2997f0
 */
Packit 2997f0
rd_kafka_message_t *rd_kafka_message_get_from_rkm (rd_kafka_op_t *rko,
Packit 2997f0
                                                   rd_kafka_msg_t *rkm) {
Packit 2997f0
        return rd_kafka_message_setup(rko, &rkm->rkm_rkmessage);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Convert rko to rkmessage
Packit 2997f0
 * @remark Must only be called just prior to passing a consumed message
Packit 2997f0
 *         or event to the application.
Packit 2997f0
 * @remark Will trigger on_consume() interceptors, if any.
Packit 2997f0
 * @returns a rkmessage (bound to the rko).
Packit 2997f0
 */
Packit 2997f0
rd_kafka_message_t *rd_kafka_message_get (rd_kafka_op_t *rko) {
Packit 2997f0
        rd_kafka_message_t *rkmessage;
Packit 2997f0
Packit 2997f0
        if (!rko)
Packit 2997f0
                return rd_kafka_message_new(); /* empty */
Packit 2997f0
Packit 2997f0
        switch (rko->rko_type)
Packit 2997f0
        {
Packit 2997f0
        case RD_KAFKA_OP_FETCH:
Packit 2997f0
                /* Use embedded rkmessage */
Packit 2997f0
                rkmessage = &rko->rko_u.fetch.rkm.rkm_rkmessage;
Packit 2997f0
                break;
Packit 2997f0
Packit 2997f0
        case RD_KAFKA_OP_ERR:
Packit 2997f0
        case RD_KAFKA_OP_CONSUMER_ERR:
Packit 2997f0
                rkmessage = &rko->rko_u.err.rkm.rkm_rkmessage;
Packit 2997f0
                rkmessage->payload = rko->rko_u.err.errstr;
Packit 2997f0
                rkmessage->offset  = rko->rko_u.err.offset;
Packit 2997f0
                break;
Packit 2997f0
Packit 2997f0
        default:
Packit 2997f0
                rd_kafka_assert(NULL, !*"unhandled optype");
Packit 2997f0
                RD_NOTREACHED();
Packit 2997f0
                return NULL;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        return rd_kafka_message_setup(rko, rkmessage);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
Packit 2997f0
                                    rd_kafka_timestamp_type_t *tstype) {
Packit 2997f0
        rd_kafka_msg_t *rkm;
Packit 2997f0
Packit 2997f0
        if (rkmessage->err) {
Packit 2997f0
                if (tstype)
Packit 2997f0
                        *tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
Packit 2997f0
                return -1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
Packit 2997f0
Packit 2997f0
        if (tstype)
Packit 2997f0
                *tstype = rkm->rkm_tstype;
Packit 2997f0
Packit 2997f0
        return rkm->rkm_timestamp;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage) {
Packit 2997f0
        rd_kafka_msg_t *rkm;
Packit 2997f0
Packit 2997f0
        rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
Packit 2997f0
Packit 2997f0
        if (unlikely(!rkm->rkm_ts_enq))
Packit 2997f0
                return -1;
Packit 2997f0
Packit 2997f0
        return rd_clock() - rkm->rkm_ts_enq;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Parse serialized message headers and populate
Packit 2997f0
 *        rkm->rkm_headers (which must be NULL).
Packit 2997f0
 */
Packit 2997f0
static rd_kafka_resp_err_t rd_kafka_msg_headers_parse (rd_kafka_msg_t *rkm) {
Packit 2997f0
        rd_kafka_buf_t *rkbuf;
Packit 2997f0
        int64_t HeaderCount;
Packit 2997f0
        const int log_decode_errors = 0;
Packit 2997f0
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
Packit 2997f0
        int i;
Packit 2997f0
        rd_kafka_headers_t *hdrs = NULL;
Packit 2997f0
Packit 2997f0
        rd_dassert(!rkm->rkm_headers);
Packit 2997f0
Packit 2997f0
        if (RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs) == 0)
Packit 2997f0
                return RD_KAFKA_RESP_ERR__NOENT;
Packit 2997f0
Packit 2997f0
        rkbuf = rd_kafka_buf_new_shadow(rkm->rkm_u.consumer.binhdrs.data,
Packit 2997f0
                                        RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.
Packit 2997f0
                                                            consumer.binhdrs),
Packit 2997f0
                                        NULL);
Packit 2997f0
Packit 2997f0
        rd_kafka_buf_read_varint(rkbuf, &HeaderCount);
Packit 2997f0
Packit 2997f0
        if (HeaderCount <= 0) {
Packit 2997f0
                rd_kafka_buf_destroy(rkbuf);
Packit 2997f0
                return RD_KAFKA_RESP_ERR__NOENT;
Packit 2997f0
        } else if (unlikely(HeaderCount > 100000)) {
Packit 2997f0
                rd_kafka_buf_destroy(rkbuf);
Packit 2997f0
                return RD_KAFKA_RESP_ERR__BAD_MSG;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        hdrs = rd_kafka_headers_new((size_t)HeaderCount);
Packit 2997f0
Packit 2997f0
        for (i = 0 ; (int64_t)i < HeaderCount ; i++) {
Packit 2997f0
                int64_t KeyLen, ValueLen;
Packit 2997f0
                const char *Key, *Value;
Packit 2997f0
Packit 2997f0
                rd_kafka_buf_read_varint(rkbuf, &KeyLen);
Packit 2997f0
                rd_kafka_buf_read_ptr(rkbuf, &Key, (size_t)KeyLen);
Packit 2997f0
Packit 2997f0
                rd_kafka_buf_read_varint(rkbuf, &ValueLen);
Packit 2997f0
                if (unlikely(ValueLen == -1))
Packit 2997f0
                        Value = NULL;
Packit 2997f0
                else
Packit 2997f0
                        rd_kafka_buf_read_ptr(rkbuf, &Value, (size_t)ValueLen);
Packit 2997f0
Packit 2997f0
                rd_kafka_header_add(hdrs, Key, (ssize_t)KeyLen,
Packit 2997f0
                                    Value, (ssize_t)ValueLen);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rkm->rkm_headers = hdrs;
Packit 2997f0
Packit 2997f0
        rd_kafka_buf_destroy(rkbuf);
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
Packit 2997f0
 err_parse:
Packit 2997f0
        err = rkbuf->rkbuf_err;
Packit 2997f0
        rd_kafka_buf_destroy(rkbuf);
Packit 2997f0
        if (hdrs)
Packit 2997f0
                rd_kafka_headers_destroy(hdrs);
Packit 2997f0
        return err;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
rd_kafka_resp_err_t
Packit 2997f0
rd_kafka_message_headers (const rd_kafka_message_t *rkmessage,
Packit 2997f0
                          rd_kafka_headers_t **hdrsp) {
Packit 2997f0
        rd_kafka_msg_t *rkm;
Packit 2997f0
        rd_kafka_resp_err_t err;
Packit 2997f0
Packit 2997f0
        rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
Packit 2997f0
Packit 2997f0
        if (rkm->rkm_headers) {
Packit 2997f0
                *hdrsp = rkm->rkm_headers;
Packit 2997f0
                return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Producer (rkm_headers will be set if there were any headers) */
Packit 2997f0
        if (rkm->rkm_flags & RD_KAFKA_MSG_F_PRODUCER)
Packit 2997f0
                return RD_KAFKA_RESP_ERR__NOENT;
Packit 2997f0
Packit 2997f0
        /* Consumer */
Packit 2997f0
Packit 2997f0
        /* No previously parsed headers, check if the underlying
Packit 2997f0
         * protocol message had headers and if so, parse them. */
Packit 2997f0
        if (unlikely(!RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs)))
Packit 2997f0
                return RD_KAFKA_RESP_ERR__NOENT;
Packit 2997f0
Packit 2997f0
        err = rd_kafka_msg_headers_parse(rkm);
Packit 2997f0
        if (unlikely(err))
Packit 2997f0
                return err;
Packit 2997f0
Packit 2997f0
        *hdrsp = rkm->rkm_headers;
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
rd_kafka_resp_err_t
Packit 2997f0
rd_kafka_message_detach_headers (rd_kafka_message_t *rkmessage,
Packit 2997f0
                                 rd_kafka_headers_t **hdrsp) {
Packit 2997f0
        rd_kafka_msg_t *rkm;
Packit 2997f0
        rd_kafka_resp_err_t err;
Packit 2997f0
Packit 2997f0
        err = rd_kafka_message_headers(rkmessage, hdrsp);
Packit 2997f0
        if (err)
Packit 2997f0
                return err;
Packit 2997f0
Packit 2997f0
        rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
Packit 2997f0
        rkm->rkm_headers = NULL;
Packit 2997f0
Packit 2997f0
        return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
void rd_kafka_message_set_headers (rd_kafka_message_t *rkmessage,
Packit 2997f0
                                   rd_kafka_headers_t *hdrs) {
Packit 2997f0
        rd_kafka_msg_t *rkm;
Packit 2997f0
Packit 2997f0
        rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
Packit 2997f0
Packit 2997f0
        if (rkm->rkm_headers) {
Packit 2997f0
                assert(rkm->rkm_headers != hdrs);
Packit 2997f0
                rd_kafka_headers_destroy(rkm->rkm_headers);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rkm->rkm_headers = hdrs;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq) {
Packit 2997f0
        rd_kafka_msg_t *rkm;
Packit 2997f0
Packit 2997f0
        fprintf(fp, "%s msgq_dump (%d messages, %"PRIusz" bytes):\n", what,
Packit 2997f0
                rd_kafka_msgq_len(rkmq), rd_kafka_msgq_size(rkmq));
Packit 2997f0
        TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
Packit 2997f0
                fprintf(fp, " [%"PRId32"]@%"PRId64
Packit 2997f0
                        ": rkm msgseq %"PRIu64": \"%.*s\"\n",
Packit 2997f0
                        rkm->rkm_partition, rkm->rkm_offset,
Packit 2997f0
                        rkm->rkm_u.producer.msgseq,
Packit 2997f0
                        (int)rkm->rkm_len, (const char *)rkm->rkm_payload);
Packit 2997f0
        }
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @name Unit tests
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Unittest: message allocator
Packit 2997f0
 */
Packit 2997f0
static rd_kafka_msg_t *ut_rd_kafka_msg_new (void) {
Packit 2997f0
        rd_kafka_msg_t *rkm;
Packit 2997f0
Packit 2997f0
        rkm = rd_calloc(1, sizeof(*rkm));
Packit 2997f0
        rkm->rkm_flags      = RD_KAFKA_MSG_F_FREE_RKM;
Packit 2997f0
        rkm->rkm_offset     = RD_KAFKA_OFFSET_INVALID;
Packit 2997f0
        rkm->rkm_tstype     = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
Packit 2997f0
Packit 2997f0
        return rkm;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Unittest: destroy all messages in queue
Packit 2997f0
 */
Packit 2997f0
static void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq) {
Packit 2997f0
        rd_kafka_msg_t *rkm, *tmp;
Packit 2997f0
Packit 2997f0
        TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp)
Packit 2997f0
                rd_kafka_msg_destroy(NULL, rkm);
Packit 2997f0
Packit 2997f0
Packit 2997f0
        rd_kafka_msgq_init(rkmq);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
static int ut_verify_msgq_order (const char *what,
Packit 2997f0
                                 const rd_kafka_msgq_t *rkmq,
Packit 2997f0
                                 int first, int last) {
Packit 2997f0
        const rd_kafka_msg_t *rkm;
Packit 2997f0
        uint64_t expected = first;
Packit 2997f0
        int incr = first < last ? +1 : -1;
Packit 2997f0
        int fails = 0;
Packit 2997f0
        int cnt = 0;
Packit 2997f0
Packit 2997f0
        TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
Packit 2997f0
                if (rkm->rkm_u.producer.msgseq != expected) {
Packit 2997f0
                        RD_UT_SAY("%s: expected msgseq %"PRIu64
Packit 2997f0
                                  " not %"PRIu64" at index #%d",
Packit 2997f0
                                  what, expected,
Packit 2997f0
                                  rkm->rkm_u.producer.msgseq, cnt);
Packit 2997f0
                        fails++;
Packit 2997f0
                }
Packit 2997f0
                cnt++;
Packit 2997f0
                expected += incr;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails);
Packit 2997f0
        return fails;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Verify ordering comparator for message queues.
Packit 2997f0
 */
Packit 2997f0
static int unittest_msgq_order (const char *what, int fifo,
Packit 2997f0
                          int (*cmp) (const void *, const void *)) {
Packit 2997f0
        rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
Packit 2997f0
        rd_kafka_msg_t *rkm;
Packit 2997f0
        rd_kafka_msgq_t sendq;
Packit 2997f0
        int i;
Packit 2997f0
Packit 2997f0
        RD_UT_SAY("%s: testing in %s mode", what, fifo? "FIFO" : "LIFO");
Packit 2997f0
Packit 2997f0
        for (i = 1 ; i <= 6 ; i++) {
Packit 2997f0
                rkm = ut_rd_kafka_msg_new();
Packit 2997f0
                rkm->rkm_u.producer.msgseq = i;
Packit 2997f0
                rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp);
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        if (fifo) {
Packit 2997f0
                if (ut_verify_msgq_order("added", &rkmq, 1, 6))
Packit 2997f0
                        return 1;
Packit 2997f0
        } else {
Packit 2997f0
                if (ut_verify_msgq_order("added", &rkmq, 6, 1))
Packit 2997f0
                        return 1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Move 3 messages to "send" queue which we then re-insert
Packit 2997f0
         * in the original queue (i.e., "retry"). */
Packit 2997f0
        rd_kafka_msgq_init(&sendq);
Packit 2997f0
        while (rd_kafka_msgq_len(&sendq) < 3)
Packit 2997f0
                rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
Packit 2997f0
Packit 2997f0
        if (fifo) {
Packit 2997f0
                if (ut_verify_msgq_order("send removed", &rkmq, 4, 6))
Packit 2997f0
                        return 1;
Packit 2997f0
Packit 2997f0
                if (ut_verify_msgq_order("sendq", &sendq, 1, 3))
Packit 2997f0
                        return 1;
Packit 2997f0
        } else {
Packit 2997f0
                if (ut_verify_msgq_order("send removed", &rkmq, 3, 1))
Packit 2997f0
                        return 1;
Packit 2997f0
Packit 2997f0
                if (ut_verify_msgq_order("sendq", &sendq, 6, 4))
Packit 2997f0
                        return 1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Retry the messages, which moves them back to sendq
Packit 2997f0
         * maintaining the original order */
Packit 2997f0
        rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, cmp);
Packit 2997f0
Packit 2997f0
        RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
Packit 2997f0
                     "sendq FIFO should be empty, not contain %d messages",
Packit 2997f0
                     rd_kafka_msgq_len(&sendq));
Packit 2997f0
Packit 2997f0
        if (fifo) {
Packit 2997f0
                if (ut_verify_msgq_order("readded", &rkmq, 1, 6))
Packit 2997f0
                        return 1;
Packit 2997f0
        } else {
Packit 2997f0
                if (ut_verify_msgq_order("readded", &rkmq, 6, 1))
Packit 2997f0
                        return 1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Move 4 first messages to to "send" queue, then
Packit 2997f0
         * retry them with max_retries=1 which should now fail for
Packit 2997f0
         * the 3 first messages that were already retried. */
Packit 2997f0
        rd_kafka_msgq_init(&sendq);
Packit 2997f0
        while (rd_kafka_msgq_len(&sendq) < 4)
Packit 2997f0
                rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
Packit 2997f0
Packit 2997f0
        if (fifo) {
Packit 2997f0
                if (ut_verify_msgq_order("send removed #2", &rkmq, 5, 6))
Packit 2997f0
                        return 1;
Packit 2997f0
Packit 2997f0
                if (ut_verify_msgq_order("sendq #2", &sendq, 1, 4))
Packit 2997f0
                        return 1;
Packit 2997f0
        } else {
Packit 2997f0
                if (ut_verify_msgq_order("send removed #2", &rkmq, 2, 1))
Packit 2997f0
                        return 1;
Packit 2997f0
Packit 2997f0
                if (ut_verify_msgq_order("sendq #2", &sendq, 6, 3))
Packit 2997f0
                        return 1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        /* Retry the messages, which should now keep the 3 first messages
Packit 2997f0
         * on sendq (no more retries) and just number 4 moved back. */
Packit 2997f0
        rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, cmp);
Packit 2997f0
Packit 2997f0
        if (fifo) {
Packit 2997f0
                if (ut_verify_msgq_order("readded #2", &rkmq, 4, 6))
Packit 2997f0
                        return 1;
Packit 2997f0
Packit 2997f0
                if (ut_verify_msgq_order("no more retries", &sendq, 1, 3))
Packit 2997f0
                        return 1;
Packit 2997f0
Packit 2997f0
        } else {
Packit 2997f0
                if (ut_verify_msgq_order("readded #2", &rkmq, 3, 1))
Packit 2997f0
                        return 1;
Packit 2997f0
Packit 2997f0
                if (ut_verify_msgq_order("no more retries", &sendq, 6, 4))
Packit 2997f0
                        return 1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        ut_rd_kafka_msgq_purge(&sendq);
Packit 2997f0
        ut_rd_kafka_msgq_purge(&rkmq);
Packit 2997f0
Packit 2997f0
        return 0;
Packit 2997f0
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
int unittest_msg (void) {
Packit 2997f0
        int fails = 0;
Packit 2997f0
Packit 2997f0
        fails += unittest_msgq_order("FIFO", 1, rd_kafka_msg_cmp_msgseq);
Packit 2997f0
        fails += unittest_msgq_order("LIFO", 0, rd_kafka_msg_cmp_msgseq_lifo);
Packit 2997f0
Packit 2997f0
        return fails;
Packit 2997f0
}