|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* librdkafka - Apache Kafka C library
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Copyright (c) 2012,2013 Magnus Edenhill
|
|
Packit |
2997f0 |
* All rights reserved.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Redistribution and use in source and binary forms, with or without
|
|
Packit |
2997f0 |
* modification, are permitted provided that the following conditions are met:
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* 1. Redistributions of source code must retain the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer.
|
|
Packit |
2997f0 |
* 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer in the documentation
|
|
Packit |
2997f0 |
* and/or other materials provided with the distribution.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
Packit |
2997f0 |
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
Packit |
2997f0 |
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
Packit |
2997f0 |
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
Packit |
2997f0 |
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
Packit |
2997f0 |
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
Packit |
2997f0 |
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
Packit |
2997f0 |
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
Packit |
2997f0 |
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
Packit |
2997f0 |
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
Packit |
2997f0 |
* POSSIBILITY OF SUCH DAMAGE.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include "rd.h"
|
|
Packit |
2997f0 |
#include "rdkafka_int.h"
|
|
Packit |
2997f0 |
#include "rdkafka_msg.h"
|
|
Packit |
2997f0 |
#include "rdkafka_topic.h"
|
|
Packit |
2997f0 |
#include "rdkafka_partition.h"
|
|
Packit |
2997f0 |
#include "rdkafka_interceptor.h"
|
|
Packit |
2997f0 |
#include "rdkafka_header.h"
|
|
Packit |
2997f0 |
#include "rdcrc32.h"
|
|
Packit |
2997f0 |
#include "rdmurmur2.h"
|
|
Packit |
2997f0 |
#include "rdrand.h"
|
|
Packit |
2997f0 |
#include "rdtime.h"
|
|
Packit |
2997f0 |
#include "rdsysqueue.h"
|
|
Packit |
2997f0 |
#include "rdunittest.h"
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include <stdarg.h>
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Destroy message \p rkm and release what it holds, as directed
 *        by its flags.
 *
 * - RD_KAFKA_MSG_F_ACCOUNT: the message is subtracted from the
 *   current-message accounting (rd_kafka_curr_msgs_sub()) of \p rk or,
 *   if \p rk is NULL, of the handle owning the message's topic.
 * - Headers attached to the message are destroyed.
 * - The message's topic reference, if any, is dropped.
 * - RD_KAFKA_MSG_F_FREE: a non-NULL payload is freed.
 * - RD_KAFKA_MSG_F_FREE_RKM: the message object itself is freed.
 *
 * @param rk   Client instance, may be NULL if the message has a topic.
 * @param rkm  Message to destroy.
 */
void rd_kafka_msg_destroy (rd_kafka_t *rk, rd_kafka_msg_t *rkm) {
        rd_kafka_topic_t *app_rkt = rkm->rkm_rkmessage.rkt;
        const int flags = rkm->rkm_flags;

        if (flags & RD_KAFKA_MSG_F_ACCOUNT) {
                rd_kafka_t *owner;

                rd_dassert(rk || app_rkt);

                /* Fall back on the topic's handle when no rk was given. */
                if (rk)
                        owner = rk;
                else
                        owner = rd_kafka_topic_a2i(app_rkt)->rkt_rk;

                rd_kafka_curr_msgs_sub(owner, 1, rkm->rkm_len);
        }

        if (rkm->rkm_headers)
                rd_kafka_headers_destroy(rkm->rkm_headers);

        if (likely(app_rkt != NULL))
                rd_kafka_topic_destroy0(rd_kafka_topic_a2s(app_rkt));

        if ((flags & RD_KAFKA_MSG_F_FREE) && rkm->rkm_payload)
                rd_free(rkm->rkm_payload);

        /* Must be last: rkm is invalid after this point. */
        if (flags & RD_KAFKA_MSG_F_FREE_RKM)
                rd_free(rkm);
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Create a new Producer message, copying the payload as
|
|
Packit |
2997f0 |
* indicated by msgflags.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* @returns the new message
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rd_kafka_msg_new00 (rd_kafka_itopic_t *rkt,
|
|
Packit |
2997f0 |
int32_t partition,
|
|
Packit |
2997f0 |
int msgflags,
|
|
Packit |
2997f0 |
char *payload, size_t len,
|
|
Packit |
2997f0 |
const void *key, size_t keylen,
|
|
Packit |
2997f0 |
void *msg_opaque) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
size_t mlen = sizeof(*rkm);
|
|
Packit |
2997f0 |
char *p;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* If we are to make a copy of the payload, allocate space for it too */
|
|
Packit |
2997f0 |
if (msgflags & RD_KAFKA_MSG_F_COPY) {
|
|
Packit |
2997f0 |
msgflags &= ~RD_KAFKA_MSG_F_FREE;
|
|
Packit |
2997f0 |
mlen += len;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
mlen += keylen;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Note: using rd_malloc here, not rd_calloc, so make sure all fields
|
|
Packit |
2997f0 |
* are properly set up. */
|
|
Packit |
2997f0 |
rkm = rd_malloc(mlen);
|
|
Packit |
2997f0 |
rkm->rkm_err = 0;
|
|
Packit |
2997f0 |
rkm->rkm_flags = (RD_KAFKA_MSG_F_PRODUCER |
|
|
Packit |
2997f0 |
RD_KAFKA_MSG_F_FREE_RKM | msgflags);
|
|
Packit |
2997f0 |
rkm->rkm_len = len;
|
|
Packit |
2997f0 |
rkm->rkm_opaque = msg_opaque;
|
|
Packit |
2997f0 |
rkm->rkm_rkmessage.rkt = rd_kafka_topic_keep_a(rkt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm->rkm_partition = partition;
|
|
Packit |
2997f0 |
rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID;
|
|
Packit |
2997f0 |
rkm->rkm_timestamp = 0;
|
|
Packit |
2997f0 |
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
|
|
Packit |
2997f0 |
rkm->rkm_headers = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
p = (char *)(rkm+1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (payload && msgflags & RD_KAFKA_MSG_F_COPY) {
|
|
Packit |
2997f0 |
/* Copy payload to space following the ..msg_t */
|
|
Packit |
2997f0 |
rkm->rkm_payload = p;
|
|
Packit |
2997f0 |
memcpy(rkm->rkm_payload, payload, len);
|
|
Packit |
2997f0 |
p += len;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
/* Just point to the provided payload. */
|
|
Packit |
2997f0 |
rkm->rkm_payload = payload;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (key) {
|
|
Packit |
2997f0 |
rkm->rkm_key = p;
|
|
Packit |
2997f0 |
rkm->rkm_key_len = keylen;
|
|
Packit |
2997f0 |
memcpy(rkm->rkm_key, key, keylen);
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
rkm->rkm_key = NULL;
|
|
Packit |
2997f0 |
rkm->rkm_key_len = 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return rkm;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Create a new Producer message.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* @remark Must only be used by producer code.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Returns 0 on success or -1 on error.
|
|
Packit |
2997f0 |
* Both errno and 'errp' are set appropriately.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_itopic_t *rkt,
|
|
Packit |
2997f0 |
int32_t force_partition,
|
|
Packit |
2997f0 |
int msgflags,
|
|
Packit |
2997f0 |
char *payload, size_t len,
|
|
Packit |
2997f0 |
const void *key, size_t keylen,
|
|
Packit |
2997f0 |
void *msg_opaque,
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t *errp,
|
|
Packit |
2997f0 |
int *errnop,
|
|
Packit |
2997f0 |
rd_kafka_headers_t *hdrs,
|
|
Packit |
2997f0 |
int64_t timestamp,
|
|
Packit |
2997f0 |
rd_ts_t now) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
size_t hdrs_size = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (unlikely(!payload))
|
|
Packit |
2997f0 |
len = 0;
|
|
Packit |
2997f0 |
if (!key)
|
|
Packit |
2997f0 |
keylen = 0;
|
|
Packit |
2997f0 |
if (hdrs)
|
|
Packit |
2997f0 |
hdrs_size = rd_kafka_headers_serialized_size(hdrs);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (unlikely(len + keylen + hdrs_size >
|
|
Packit |
2997f0 |
(size_t)rkt->rkt_rk->rk_conf.max_msg_size ||
|
|
Packit |
2997f0 |
keylen > INT32_MAX)) {
|
|
Packit |
2997f0 |
*errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
|
|
Packit |
2997f0 |
if (errnop)
|
|
Packit |
2997f0 |
*errnop = EMSGSIZE;
|
|
Packit |
2997f0 |
return NULL;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (msgflags & RD_KAFKA_MSG_F_BLOCK)
|
|
Packit |
2997f0 |
*errp = rd_kafka_curr_msgs_add(
|
|
Packit |
2997f0 |
rkt->rkt_rk, 1, len, 1/*block*/,
|
|
Packit |
2997f0 |
(msgflags & RD_KAFKA_MSG_F_RKT_RDLOCKED) ?
|
|
Packit |
2997f0 |
&rkt->rkt_lock : NULL);
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
*errp = rd_kafka_curr_msgs_add(rkt->rkt_rk, 1, len, 0, NULL);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (unlikely(*errp)) {
|
|
Packit |
2997f0 |
if (errnop)
|
|
Packit |
2997f0 |
*errnop = ENOBUFS;
|
|
Packit |
2997f0 |
return NULL;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm = rd_kafka_msg_new00(rkt, force_partition,
|
|
Packit |
2997f0 |
msgflags|RD_KAFKA_MSG_F_ACCOUNT /* curr_msgs_add() */,
|
|
Packit |
2997f0 |
payload, len, key, keylen, msg_opaque);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
memset(&rkm->rkm_u.producer, 0, sizeof(rkm->rkm_u.producer));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (timestamp)
|
|
Packit |
2997f0 |
rkm->rkm_timestamp = timestamp;
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
rkm->rkm_timestamp = rd_uclock()/1000;
|
|
Packit |
2997f0 |
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (hdrs) {
|
|
Packit |
2997f0 |
rd_dassert(!rkm->rkm_headers);
|
|
Packit |
2997f0 |
rkm->rkm_headers = hdrs;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm->rkm_ts_enq = now;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkt->rkt_conf.message_timeout_ms == 0) {
|
|
Packit |
2997f0 |
rkm->rkm_ts_timeout = INT64_MAX;
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
rkm->rkm_ts_timeout = now +
|
|
Packit |
2997f0 |
rkt->rkt_conf.message_timeout_ms * 1000;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Call interceptor chain for on_send */
|
|
Packit |
2997f0 |
rd_kafka_interceptors_on_send(rkt->rkt_rk, &rkm->rkm_rkmessage);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return rkm;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Produce: creates a new message, runs the partitioner and enqueues
|
|
Packit |
2997f0 |
* into on the selected partition.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* @returns 0 on success or -1 on error.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* If the function returns -1 and RD_KAFKA_MSG_F_FREE was specified, then
|
|
Packit |
2997f0 |
* the memory associated with the payload is still the caller's
|
|
Packit |
2997f0 |
* responsibility.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* @locks none
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
int rd_kafka_msg_new (rd_kafka_itopic_t *rkt, int32_t force_partition,
|
|
Packit |
2997f0 |
int msgflags,
|
|
Packit |
2997f0 |
char *payload, size_t len,
|
|
Packit |
2997f0 |
const void *key, size_t keylen,
|
|
Packit |
2997f0 |
void *msg_opaque) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
int errnox;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create message */
|
|
Packit |
2997f0 |
rkm = rd_kafka_msg_new0(rkt, force_partition, msgflags,
|
|
Packit |
2997f0 |
payload, len, key, keylen, msg_opaque,
|
|
Packit |
2997f0 |
&err, &errnox, NULL, 0, rd_clock());
|
|
Packit |
2997f0 |
if (unlikely(!rkm)) {
|
|
Packit |
2997f0 |
/* errno is already set by msg_new() */
|
|
Packit |
2997f0 |
rd_kafka_set_last_error(err, errnox);
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Partition the message */
|
|
Packit |
2997f0 |
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
|
|
Packit |
2997f0 |
if (likely(!err)) {
|
|
Packit |
2997f0 |
rd_kafka_set_last_error(0, 0);
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Interceptor: unroll failing messages by triggering on_ack.. */
|
|
Packit |
2997f0 |
rkm->rkm_err = err;
|
|
Packit |
2997f0 |
rd_kafka_interceptors_on_acknowledgement(rkt->rkt_rk,
|
|
Packit |
2997f0 |
&rkm->rkm_rkmessage);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Handle partitioner failures: it only fails when the application
|
|
Packit |
2997f0 |
* attempts to force a destination partition that does not exist
|
|
Packit |
2997f0 |
* in the cluster. Note we must clear the RD_KAFKA_MSG_F_FREE
|
|
Packit |
2997f0 |
* flag since our contract says we don't free the payload on
|
|
Packit |
2997f0 |
* failure. */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;
|
|
Packit |
2997f0 |
rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Translate error codes to errnos. */
|
|
Packit |
2997f0 |
if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
|
|
Packit |
2997f0 |
rd_kafka_set_last_error(err, ESRCH);
|
|
Packit |
2997f0 |
else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
|
|
Packit |
2997f0 |
rd_kafka_set_last_error(err, ENOENT);
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
rd_kafka_set_last_error(err, EINVAL); /* NOTREACHED */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Produce a single message described by a variable argument list.
 *
 * The argument list is a sequence of RD_KAFKA_VTYPE_.. tags, each followed
 * by the value(s) for that tag, and terminated by RD_KAFKA_VTYPE_END:
 *  - TOPIC (const char *) or RKT (rd_kafka_topic_t *): destination topic,
 *    one of them is required. If given more than once the last one wins.
 *  - PARTITION (int32_t): defaults to RD_KAFKA_PARTITION_UA.
 *  - VALUE (void *, size_t) and KEY (void *, size_t).
 *  - OPAQUE (void *), MSGFLAGS (int), TIMESTAMP (int64_t, 0 = current time).
 *  - HEADER (const char *name, const void *value, ssize_t size): adds one
 *    header to a list created and owned by this function.
 *  - HEADERS (rd_kafka_headers_t *): application provided header list.
 *    HEADER and HEADERS are mutually exclusive.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, else an error code:
 *          RD_KAFKA_RESP_ERR__INVALID_ARG if no topic was provided or an
 *          unknown tag was encountered, RD_KAFKA_RESP_ERR__CONFLICT if both
 *          HEADER and HEADERS were used, or the error from message
 *          creation or partitioning.
 *
 * @remark On failure an application provided HEADERS list and a payload
 *         passed with RD_KAFKA_MSG_F_FREE remain owned by the application.
 */
rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...) {
        va_list ap;
        rd_kafka_msg_t s_rkm = {
                /* Message defaults */
                .rkm_partition = RD_KAFKA_PARTITION_UA,
                .rkm_timestamp = 0, /* current time */
        };
        rd_kafka_msg_t *rkm = &s_rkm;
        rd_kafka_vtype_t vtype;
        rd_kafka_topic_t *app_rkt;
        shptr_rd_kafka_itopic_t *s_rkt = NULL;
        shptr_rd_kafka_itopic_t *prev_s_rkt;
        rd_kafka_itopic_t *rkt;
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
        rd_kafka_headers_t *hdrs = NULL;
        rd_kafka_headers_t *app_hdrs = NULL; /* App-provided headers list */

        /* Collect the arguments in the stack-allocated template message */
        va_start(ap, rk);
        while (!err &&
               (vtype = va_arg(ap, rd_kafka_vtype_t)) != RD_KAFKA_VTYPE_END) {
                switch (vtype)
                {
                case RD_KAFKA_VTYPE_TOPIC:
                        prev_s_rkt = s_rkt;
                        s_rkt = rd_kafka_topic_new0(rk,
                                                    va_arg(ap, const char *),
                                                    NULL, NULL, 1);
                        /* A topic given earlier in the list is replaced:
                         * drop its reference so it is not leaked. */
                        if (unlikely(prev_s_rkt != NULL))
                                rd_kafka_topic_destroy0(prev_s_rkt);
                        break;

                case RD_KAFKA_VTYPE_RKT:
                        app_rkt = va_arg(ap, rd_kafka_topic_t *);
                        prev_s_rkt = s_rkt;
                        s_rkt = rd_kafka_topic_keep(
                                rd_kafka_topic_a2i(app_rkt));
                        /* Same as for VTYPE_TOPIC: release replaced topic */
                        if (unlikely(prev_s_rkt != NULL))
                                rd_kafka_topic_destroy0(prev_s_rkt);
                        break;

                case RD_KAFKA_VTYPE_PARTITION:
                        rkm->rkm_partition = va_arg(ap, int32_t);
                        break;

                case RD_KAFKA_VTYPE_VALUE:
                        rkm->rkm_payload = va_arg(ap, void *);
                        rkm->rkm_len = va_arg(ap, size_t);
                        break;

                case RD_KAFKA_VTYPE_KEY:
                        rkm->rkm_key = va_arg(ap, void *);
                        rkm->rkm_key_len = va_arg(ap, size_t);
                        break;

                case RD_KAFKA_VTYPE_OPAQUE:
                        rkm->rkm_opaque = va_arg(ap, void *);
                        break;

                case RD_KAFKA_VTYPE_MSGFLAGS:
                        rkm->rkm_flags = va_arg(ap, int);
                        break;

                case RD_KAFKA_VTYPE_TIMESTAMP:
                        rkm->rkm_timestamp = va_arg(ap, int64_t);
                        break;

                case RD_KAFKA_VTYPE_HEADER:
                {
                        const char *name;
                        const void *value;
                        ssize_t size;

                        /* Can't mix single headers with a headers list */
                        if (unlikely(app_hdrs != NULL)) {
                                err = RD_KAFKA_RESP_ERR__CONFLICT;
                                break;
                        }

                        /* Create the library-owned list on first header */
                        if (unlikely(!hdrs))
                                hdrs = rd_kafka_headers_new(8);

                        name = va_arg(ap, const char *);
                        value = va_arg(ap, const void *);
                        size = va_arg(ap, ssize_t);

                        err = rd_kafka_header_add(hdrs, name, -1, value, size);
                }
                break;

                case RD_KAFKA_VTYPE_HEADERS:
                        /* Can't mix a headers list with single headers */
                        if (unlikely(hdrs != NULL)) {
                                err = RD_KAFKA_RESP_ERR__CONFLICT;
                                break;
                        }
                        app_hdrs = va_arg(ap, rd_kafka_headers_t *);
                        break;

                default:
                        err = RD_KAFKA_RESP_ERR__INVALID_ARG;
                        break;
                }
        }

        va_end(ap);

        if (unlikely(!s_rkt)) {
                /* No (valid) topic: the headers list created for
                 * VTYPE_HEADER arguments would otherwise be leaked. */
                if (hdrs)
                        rd_kafka_headers_destroy(hdrs);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        rkt = rd_kafka_topic_s2i(s_rkt);

        /* Create the real message from the template */
        if (likely(!err))
                rkm = rd_kafka_msg_new0(rkt,
                                        rkm->rkm_partition,
                                        rkm->rkm_flags,
                                        rkm->rkm_payload, rkm->rkm_len,
                                        rkm->rkm_key, rkm->rkm_key_len,
                                        rkm->rkm_opaque,
                                        &err, NULL,
                                        app_hdrs ? app_hdrs : hdrs,
                                        rkm->rkm_timestamp,
                                        rd_clock());

        if (unlikely(err)) {
                /* Argument parsing or message creation failed:
                 * no message exists, release what was acquired here. */
                rd_kafka_topic_destroy0(s_rkt);
                if (hdrs)
                        rd_kafka_headers_destroy(hdrs);
                return err;
        }

        /* Partition the message */
        err = rd_kafka_msg_partitioner(rkt, rkm, 1);
        if (unlikely(err)) {
                /* Handle partitioner failures: it only fails when
                 * the application attempts to force a destination
                 * partition that does not exist in the cluster. */

                /* Interceptors: Unroll on_send by on_ack.. */
                rkm->rkm_err = err;
                rd_kafka_interceptors_on_acknowledgement(rk,
                                                         &rkm->rkm_rkmessage);

                /* Note we must clear the RD_KAFKA_MSG_F_FREE
                 * flag since our contract says we don't free the payload on
                 * failure. */
                rkm->rkm_flags &= ~RD_KAFKA_MSG_F_FREE;

                /* Deassociate application owned headers from message
                 * since headers remain in application ownership
                 * when producev() fails */
                if (app_hdrs && app_hdrs == rkm->rkm_headers)
                        rkm->rkm_headers = NULL;

                rd_kafka_msg_destroy(rk, rkm);
        }

        rd_kafka_topic_destroy0(s_rkt);

        return err;
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Produce a batch of messages.
|
|
Packit |
2997f0 |
* Returns the number of messages succesfully queued for producing.
|
|
Packit |
2997f0 |
* Each message's .err will be set accordingly.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
int rd_kafka_produce_batch (rd_kafka_topic_t *app_rkt, int32_t partition,
|
|
Packit |
2997f0 |
int msgflags,
|
|
Packit |
2997f0 |
rd_kafka_message_t *rkmessages, int message_cnt) {
|
|
Packit |
2997f0 |
rd_kafka_msgq_t tmpq = RD_KAFKA_MSGQ_INITIALIZER(tmpq);
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
int64_t utc_now = rd_uclock() / 1000;
|
|
Packit |
2997f0 |
rd_ts_t now = rd_clock();
|
|
Packit |
2997f0 |
int good = 0;
|
|
Packit |
2997f0 |
int multiple_partitions = (partition == RD_KAFKA_PARTITION_UA ||
|
|
Packit |
2997f0 |
(msgflags & RD_KAFKA_MSG_F_PARTITION));
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t all_err = 0;
|
|
Packit |
2997f0 |
rd_kafka_itopic_t *rkt = rd_kafka_topic_a2i(app_rkt);
|
|
Packit |
2997f0 |
rd_kafka_toppar_t *rktp = NULL;
|
|
Packit |
2997f0 |
shptr_rd_kafka_toppar_t *s_rktp = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* For multiple partitions; hold lock for entire run,
|
|
Packit |
2997f0 |
* for one partition: only acquire for now. */
|
|
Packit |
2997f0 |
rd_kafka_topic_rdlock(rkt);
|
|
Packit |
2997f0 |
if (!multiple_partitions) {
|
|
Packit |
2997f0 |
s_rktp = rd_kafka_toppar_get_avail(rkt, partition,
|
|
Packit |
2997f0 |
1/*ua on miss*/, &all_err);
|
|
Packit |
2997f0 |
rktp = rd_kafka_toppar_s2i(s_rktp);
|
|
Packit |
2997f0 |
rd_kafka_topic_rdunlock(rkt);
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
/* Indicate to lower-level msg_new..() that rkt is locked
|
|
Packit |
2997f0 |
* so that they may unlock it momentarily if blocking. */
|
|
Packit |
2997f0 |
msgflags |= RD_KAFKA_MSG_F_RKT_RDLOCKED;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = 0 ; i < message_cnt ; i++) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Propagate error for all messages. */
|
|
Packit |
2997f0 |
if (unlikely(all_err)) {
|
|
Packit |
2997f0 |
rkmessages[i].err = all_err;
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Create message */
|
|
Packit |
2997f0 |
rkm = rd_kafka_msg_new0(rkt,
|
|
Packit |
2997f0 |
(msgflags & RD_KAFKA_MSG_F_PARTITION) ?
|
|
Packit |
2997f0 |
rkmessages[i].partition : partition,
|
|
Packit |
2997f0 |
msgflags,
|
|
Packit |
2997f0 |
rkmessages[i].payload,
|
|
Packit |
2997f0 |
rkmessages[i].len,
|
|
Packit |
2997f0 |
rkmessages[i].key,
|
|
Packit |
2997f0 |
rkmessages[i].key_len,
|
|
Packit |
2997f0 |
rkmessages[i]._private,
|
|
Packit |
2997f0 |
&rkmessages[i].err, NULL,
|
|
Packit |
2997f0 |
NULL, utc_now, now);
|
|
Packit |
2997f0 |
if (unlikely(!rkm)) {
|
|
Packit |
2997f0 |
if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
|
|
Packit |
2997f0 |
all_err = rkmessages[i].err;
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Three cases here:
|
|
Packit |
2997f0 |
* partition==UA: run the partitioner (slow)
|
|
Packit |
2997f0 |
* RD_KAFKA_MSG_F_PARTITION: produce message to specified
|
|
Packit |
2997f0 |
* partition
|
|
Packit |
2997f0 |
* fixed partition: simply concatenate the queue
|
|
Packit |
2997f0 |
* to partit */
|
|
Packit |
2997f0 |
if (multiple_partitions) {
|
|
Packit |
2997f0 |
if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
|
|
Packit |
2997f0 |
/* Partition the message */
|
|
Packit |
2997f0 |
rkmessages[i].err =
|
|
Packit |
2997f0 |
rd_kafka_msg_partitioner(
|
|
Packit |
2997f0 |
rkt, rkm, 0/*already locked*/);
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (s_rktp == NULL ||
|
|
Packit |
2997f0 |
rkm->rkm_partition !=
|
|
Packit |
2997f0 |
rd_kafka_toppar_s2i(s_rktp)->
|
|
Packit |
2997f0 |
rktp_partition) {
|
|
Packit |
2997f0 |
if (s_rktp != NULL)
|
|
Packit |
2997f0 |
rd_kafka_toppar_destroy(s_rktp);
|
|
Packit |
2997f0 |
s_rktp = rd_kafka_toppar_get_avail(
|
|
Packit |
2997f0 |
rkt, rkm->rkm_partition,
|
|
Packit |
2997f0 |
1/*ua on miss*/, &all_err);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
rktp = rd_kafka_toppar_s2i(s_rktp);
|
|
Packit |
2997f0 |
rd_kafka_toppar_enq_msg(rktp, rkm);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (unlikely(rkmessages[i].err)) {
|
|
Packit |
2997f0 |
/* Interceptors: Unroll on_send by on_ack.. */
|
|
Packit |
2997f0 |
rd_kafka_interceptors_on_acknowledgement(
|
|
Packit |
2997f0 |
rkt->rkt_rk, &rkmessages[i]);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_msg_destroy(rkt->rkt_rk, rkm);
|
|
Packit |
2997f0 |
continue;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
/* Single destination partition. */
|
|
Packit |
2997f0 |
rd_kafka_toppar_enq_msg(rktp, rkm);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
Packit |
2997f0 |
good++;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (multiple_partitions)
|
|
Packit |
2997f0 |
rd_kafka_topic_rdunlock(rkt);
|
|
Packit |
2997f0 |
if (s_rktp != NULL)
|
|
Packit |
2997f0 |
rd_kafka_toppar_destroy(s_rktp);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return good;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Scan 'rkmq' for messages that have timed out and remove them from
|
|
Packit |
2997f0 |
* 'rkmq' and add to 'timedout'.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Returns the number of messages timed out.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
|
|
Packit |
2997f0 |
rd_kafka_msgq_t *timedout,
|
|
Packit |
2997f0 |
rd_ts_t now) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm, *tmp;
|
|
Packit |
2997f0 |
int cnt = timedout->rkmq_msg_cnt;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Assume messages are added in time sequencial order */
|
|
Packit |
2997f0 |
TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp) {
|
|
Packit |
2997f0 |
/* FIXME: this is no longer true */
|
|
Packit |
2997f0 |
if (likely(rkm->rkm_ts_timeout > now))
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_msgq_deq(rkmq, rkm, 1);
|
|
Packit |
2997f0 |
rd_kafka_msgq_enq(timedout, rkm);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return timedout->rkmq_msg_cnt - cnt;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static RD_INLINE int
|
|
Packit |
2997f0 |
rd_kafka_msgq_enq_sorted0 (rd_kafka_msgq_t *rkmq,
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm,
|
|
Packit |
2997f0 |
int (*order_cmp) (const void *, const void *)) {
|
|
Packit |
2997f0 |
TAILQ_INSERT_SORTED(&rkmq->rkmq_msgs, rkm, rd_kafka_msg_t *,
|
|
Packit |
2997f0 |
rkm_link, order_cmp);
|
|
Packit |
2997f0 |
rkmq->rkmq_msg_bytes += rkm->rkm_len+rkm->rkm_key_len;
|
|
Packit |
2997f0 |
return ++rkmq->rkmq_msg_cnt;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int rd_kafka_msgq_enq_sorted (const rd_kafka_itopic_t *rkt,
|
|
Packit |
2997f0 |
rd_kafka_msgq_t *rkmq,
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm) {
|
|
Packit |
2997f0 |
rd_dassert(rkm->rkm_u.producer.msgseq != 0);
|
|
Packit |
2997f0 |
return rd_kafka_msgq_enq_sorted0(rkmq, rkm,
|
|
Packit |
2997f0 |
rkt->rkt_conf.msg_order_cmp);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Find the insert position (i.e., the previous element)
|
|
Packit |
2997f0 |
* for message \p rkm.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* @returns the insert position element, or NULL if \p rkm should be
|
|
Packit |
2997f0 |
* added at head of queue.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rd_kafka_msgq_find_pos (const rd_kafka_msgq_t *rkmq,
|
|
Packit |
2997f0 |
const rd_kafka_msg_t *rkm,
|
|
Packit |
2997f0 |
int (*cmp) (const void *,
|
|
Packit |
2997f0 |
const void *)) {
|
|
Packit |
2997f0 |
const rd_kafka_msg_t *curr, *last = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
TAILQ_FOREACH(curr, &rkmq->rkmq_msgs, rkm_link) {
|
|
Packit |
2997f0 |
if (cmp(rkm, curr) < 0)
|
|
Packit |
2997f0 |
return (rd_kafka_msg_t *)last;
|
|
Packit |
2997f0 |
last = curr;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return (rd_kafka_msg_t *)last;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Random partitioner: picks a random partition in the range
 *        0..\p partition_cnt-1.
 *
 * If the first pick is not reported as available by
 * rd_kafka_topic_partition_available() one more random pick is made and
 * returned without any further availability check.
 *
 * @returns the selected partition.
 */
int32_t rd_kafka_msg_partitioner_random (const rd_kafka_topic_t *rkt,
                                         const void *key, size_t keylen,
                                         int32_t partition_cnt,
                                         void *rkt_opaque,
                                         void *msg_opaque) {
        int32_t pick = rd_jitter(0, partition_cnt-1);

        /* Give it one more try if the first pick is unavailable */
        if (unlikely(!rd_kafka_topic_partition_available(rkt, pick)))
                pick = rd_jitter(0, partition_cnt-1);

        return pick;
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Consistent partitioner: maps the CRC32 checksum of the key onto
 *        the range 0..\p partition_cnt-1, so the same key always yields
 *        the same partition for a given partition count.
 *
 * @returns the selected partition.
 */
int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
                                             const void *key, size_t keylen,
                                             int32_t partition_cnt,
                                             void *rkt_opaque,
                                             void *msg_opaque) {
        int32_t partition = rd_crc32(key, keylen) % partition_cnt;

        return partition;
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Consistent-random partitioner: messages with a non-empty key
 *        (\p keylen != 0) are partitioned by the consistent partitioner,
 *        messages with an empty key by the random partitioner.
 *
 * @returns the selected partition.
 */
int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
                                                    const void *key, size_t keylen,
                                                    int32_t partition_cnt,
                                                    void *rkt_opaque,
                                                    void *msg_opaque) {
        /* Keyed message: consistent mapping of key to partition */
        if (keylen != 0)
                return rd_kafka_msg_partitioner_consistent(rkt,
                                                           key,
                                                           keylen,
                                                           partition_cnt,
                                                           rkt_opaque,
                                                           msg_opaque);

        /* Empty key: spread randomly */
        return rd_kafka_msg_partitioner_random(rkt,
                                               key,
                                               keylen,
                                               partition_cnt,
                                               rkt_opaque,
                                               msg_opaque);
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int32_t
|
|
Packit |
2997f0 |
rd_kafka_msg_partitioner_murmur2 (const rd_kafka_topic_t *rkt,
|
|
Packit |
2997f0 |
const void *key, size_t keylen,
|
|
Packit |
2997f0 |
int32_t partition_cnt,
|
|
Packit |
2997f0 |
void *rkt_opaque,
|
|
Packit |
2997f0 |
void *msg_opaque) {
|
|
Packit |
2997f0 |
return rd_murmur2(key, keylen) % partition_cnt;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Murmur2-random partitioner: messages with a key (\p key is
 *        non-NULL) are mapped onto 0..\p partition_cnt-1 by the murmur2
 *        hash of the key, messages without a key are partitioned by the
 *        random partitioner.
 *
 * @returns the selected partition.
 */
int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
                                                 const void *key, size_t keylen,
                                                 int32_t partition_cnt,
                                                 void *rkt_opaque,
                                                 void *msg_opaque) {
        /* Keyed message: hash the key */
        if (key)
                return rd_murmur2(key, keylen) % partition_cnt;

        /* NULL key: spread randomly */
        return rd_kafka_msg_partitioner_random(rkt,
                                               key,
                                               keylen,
                                               partition_cnt,
                                               rkt_opaque,
                                               msg_opaque);
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Assigns a message to a topic partition using a partitioner.
|
|
Packit |
2997f0 |
* Returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if
|
|
Packit |
2997f0 |
* partitioning failed, or 0 on success.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
int rd_kafka_msg_partitioner (rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,
|
|
Packit |
2997f0 |
int do_lock) {
|
|
Packit |
2997f0 |
int32_t partition;
|
|
Packit |
2997f0 |
rd_kafka_toppar_t *rktp_new;
|
|
Packit |
2997f0 |
shptr_rd_kafka_toppar_t *s_rktp_new;
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (do_lock)
|
|
Packit |
2997f0 |
rd_kafka_topic_rdlock(rkt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
switch (rkt->rkt_state)
|
|
Packit |
2997f0 |
{
|
|
Packit |
2997f0 |
case RD_KAFKA_TOPIC_S_UNKNOWN:
|
|
Packit |
2997f0 |
/* No metadata received from cluster yet.
|
|
Packit |
2997f0 |
* Put message in UA partition and re-run partitioner when
|
|
Packit |
2997f0 |
* cluster comes up. */
|
|
Packit |
2997f0 |
partition = RD_KAFKA_PARTITION_UA;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
case RD_KAFKA_TOPIC_S_NOTEXISTS:
|
|
Packit |
2997f0 |
/* Topic not found in cluster.
|
|
Packit |
2997f0 |
* Fail message immediately. */
|
|
Packit |
2997f0 |
err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
|
|
Packit |
2997f0 |
if (do_lock)
|
|
Packit |
2997f0 |
rd_kafka_topic_rdunlock(rkt);
|
|
Packit |
2997f0 |
return err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
case RD_KAFKA_TOPIC_S_EXISTS:
|
|
Packit |
2997f0 |
/* Topic exists in cluster. */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Topic exists but has no partitions.
|
|
Packit |
2997f0 |
* This is usually an transient state following the
|
|
Packit |
2997f0 |
* auto-creation of a topic. */
|
|
Packit |
2997f0 |
if (unlikely(rkt->rkt_partition_cnt == 0)) {
|
|
Packit |
2997f0 |
partition = RD_KAFKA_PARTITION_UA;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Partition not assigned, run partitioner. */
|
|
Packit |
2997f0 |
if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
|
|
Packit |
2997f0 |
rd_kafka_topic_t *app_rkt;
|
|
Packit |
2997f0 |
/* Provide a temporary app_rkt instance to protect
|
|
Packit |
2997f0 |
* from the case where the application decided to
|
|
Packit |
2997f0 |
* destroy its topic object prior to delivery completion
|
|
Packit |
2997f0 |
* (issue #502). */
|
|
Packit |
2997f0 |
app_rkt = rd_kafka_topic_keep_a(rkt);
|
|
Packit |
2997f0 |
partition = rkt->rkt_conf.
|
|
Packit |
2997f0 |
partitioner(app_rkt,
|
|
Packit |
2997f0 |
rkm->rkm_key,
|
|
Packit |
2997f0 |
rkm->rkm_key_len,
|
|
Packit |
2997f0 |
rkt->rkt_partition_cnt,
|
|
Packit |
2997f0 |
rkt->rkt_conf.opaque,
|
|
Packit |
2997f0 |
rkm->rkm_opaque);
|
|
Packit |
2997f0 |
rd_kafka_topic_destroy0(
|
|
Packit |
2997f0 |
rd_kafka_topic_a2s(app_rkt));
|
|
Packit |
2997f0 |
} else
|
|
Packit |
2997f0 |
partition = rkm->rkm_partition;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Check that partition exists. */
|
|
Packit |
2997f0 |
if (partition >= rkt->rkt_partition_cnt) {
|
|
Packit |
2997f0 |
err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
|
|
Packit |
2997f0 |
if (do_lock)
|
|
Packit |
2997f0 |
rd_kafka_topic_rdunlock(rkt);
|
|
Packit |
2997f0 |
return err;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
default:
|
|
Packit |
2997f0 |
rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Get new partition */
|
|
Packit |
2997f0 |
s_rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (unlikely(!s_rktp_new)) {
|
|
Packit |
2997f0 |
/* Unknown topic or partition */
|
|
Packit |
2997f0 |
if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
|
|
Packit |
2997f0 |
err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (do_lock)
|
|
Packit |
2997f0 |
rd_kafka_topic_rdunlock(rkt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return err;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rktp_new = rd_kafka_toppar_s2i(s_rktp_new);
|
|
Packit |
2997f0 |
rd_atomic64_add(&rktp_new->rktp_c.msgs, 1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Update message partition */
|
|
Packit |
2997f0 |
if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
|
|
Packit |
2997f0 |
rkm->rkm_partition = partition;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Partition is available: enqueue msg on partition's queue */
|
|
Packit |
2997f0 |
rd_kafka_toppar_enq_msg(rktp_new, rkm);
|
|
Packit |
2997f0 |
if (do_lock)
|
|
Packit |
2997f0 |
rd_kafka_topic_rdunlock(rkt);
|
|
Packit |
2997f0 |
rd_kafka_toppar_destroy(s_rktp_new); /* from _get() */
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @name Public message type (rd_kafka_message_t)
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
/**
 * @brief Destroy a message object.
 *
 * If the message carries an op in its \c _private field that op is
 * destroyed, otherwise the underlying rd_kafka_msg_t is destroyed.
 */
void rd_kafka_message_destroy (rd_kafka_message_t *rkmessage) {
        rd_kafka_op_t *rko = (rd_kafka_op_t *)rkmessage->_private;

        if (unlikely(rko == NULL)) {
                /* No op attached: destroy the message itself. */
                rd_kafka_msg_destroy(NULL, rd_kafka_message2msg(rkmessage));
                return;
        }

        rd_kafka_op_destroy(rko);
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_message_t *rd_kafka_message_new (void) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm = rd_calloc(1, sizeof(*rkm));
|
|
Packit |
2997f0 |
return (rd_kafka_message_t *)rkm;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Set up a rkmessage from an rko for passing to the application.
|
|
Packit |
2997f0 |
* @remark Will trigger on_consume() interceptors if any.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static rd_kafka_message_t *
|
|
Packit |
2997f0 |
rd_kafka_message_setup (rd_kafka_op_t *rko, rd_kafka_message_t *rkmessage) {
|
|
Packit |
2997f0 |
rd_kafka_itopic_t *rkt;
|
|
Packit |
2997f0 |
rd_kafka_toppar_t *rktp = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rko->rko_type == RD_KAFKA_OP_DR) {
|
|
Packit |
2997f0 |
rkt = rd_kafka_topic_s2i(rko->rko_u.dr.s_rkt);
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (rko->rko_rktp) {
|
|
Packit |
2997f0 |
rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
|
|
Packit |
2997f0 |
rkt = rktp->rktp_rkt;
|
|
Packit |
2997f0 |
} else
|
|
Packit |
2997f0 |
rkt = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkmessage->_private = rko;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rkmessage->rkt && rkt)
|
|
Packit |
2997f0 |
rkmessage->rkt = rd_kafka_topic_keep_a(rkt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rktp)
|
|
Packit |
2997f0 |
rkmessage->partition = rktp->rktp_partition;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rkmessage->err)
|
|
Packit |
2997f0 |
rkmessage->err = rko->rko_err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Call on_consume interceptors */
|
|
Packit |
2997f0 |
switch (rko->rko_type)
|
|
Packit |
2997f0 |
{
|
|
Packit |
2997f0 |
case RD_KAFKA_OP_FETCH:
|
|
Packit |
2997f0 |
if (!rkmessage->err && rkt)
|
|
Packit |
2997f0 |
rd_kafka_interceptors_on_consume(rkt->rkt_rk,
|
|
Packit |
2997f0 |
rkmessage);
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
default:
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return rkmessage;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Get rkmessage from rkm (for EVENT_DR)
|
|
Packit |
2997f0 |
* @remark Must only be called just prior to passing a dr to the application.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
rd_kafka_message_t *rd_kafka_message_get_from_rkm (rd_kafka_op_t *rko,
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm) {
|
|
Packit |
2997f0 |
return rd_kafka_message_setup(rko, &rkm->rkm_rkmessage);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Convert rko to rkmessage
|
|
Packit |
2997f0 |
* @remark Must only be called just prior to passing a consumed message
|
|
Packit |
2997f0 |
* or event to the application.
|
|
Packit |
2997f0 |
* @remark Will trigger on_consume() interceptors, if any.
|
|
Packit |
2997f0 |
* @returns a rkmessage (bound to the rko).
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
rd_kafka_message_t *rd_kafka_message_get (rd_kafka_op_t *rko) {
|
|
Packit |
2997f0 |
rd_kafka_message_t *rkmessage;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!rko)
|
|
Packit |
2997f0 |
return rd_kafka_message_new(); /* empty */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
switch (rko->rko_type)
|
|
Packit |
2997f0 |
{
|
|
Packit |
2997f0 |
case RD_KAFKA_OP_FETCH:
|
|
Packit |
2997f0 |
/* Use embedded rkmessage */
|
|
Packit |
2997f0 |
rkmessage = &rko->rko_u.fetch.rkm.rkm_rkmessage;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
case RD_KAFKA_OP_ERR:
|
|
Packit |
2997f0 |
case RD_KAFKA_OP_CONSUMER_ERR:
|
|
Packit |
2997f0 |
rkmessage = &rko->rko_u.err.rkm.rkm_rkmessage;
|
|
Packit |
2997f0 |
rkmessage->payload = rko->rko_u.err.errstr;
|
|
Packit |
2997f0 |
rkmessage->offset = rko->rko_u.err.offset;
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
default:
|
|
Packit |
2997f0 |
rd_kafka_assert(NULL, !*"unhandled optype");
|
|
Packit |
2997f0 |
RD_NOTREACHED();
|
|
Packit |
2997f0 |
return NULL;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return rd_kafka_message_setup(rko, rkmessage);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
|
|
Packit |
2997f0 |
rd_kafka_timestamp_type_t *tstype) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkmessage->err) {
|
|
Packit |
2997f0 |
if (tstype)
|
|
Packit |
2997f0 |
*tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (tstype)
|
|
Packit |
2997f0 |
*tstype = rkm->rkm_tstype;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return rkm->rkm_timestamp;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (unlikely(!rkm->rkm_ts_enq))
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return rd_clock() - rkm->rkm_ts_enq;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Parse serialized message headers and populate
|
|
Packit |
2997f0 |
* rkm->rkm_headers (which must be NULL).
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static rd_kafka_resp_err_t rd_kafka_msg_headers_parse (rd_kafka_msg_t *rkm) {
|
|
Packit |
2997f0 |
rd_kafka_buf_t *rkbuf;
|
|
Packit |
2997f0 |
int64_t HeaderCount;
|
|
Packit |
2997f0 |
const int log_decode_errors = 0;
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
rd_kafka_headers_t *hdrs = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_dassert(!rkm->rkm_headers);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs) == 0)
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR__NOENT;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkbuf = rd_kafka_buf_new_shadow(rkm->rkm_u.consumer.binhdrs.data,
|
|
Packit |
2997f0 |
RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.
|
|
Packit |
2997f0 |
consumer.binhdrs),
|
|
Packit |
2997f0 |
NULL);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_buf_read_varint(rkbuf, &HeaderCount);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (HeaderCount <= 0) {
|
|
Packit |
2997f0 |
rd_kafka_buf_destroy(rkbuf);
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR__NOENT;
|
|
Packit |
2997f0 |
} else if (unlikely(HeaderCount > 100000)) {
|
|
Packit |
2997f0 |
rd_kafka_buf_destroy(rkbuf);
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR__BAD_MSG;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
hdrs = rd_kafka_headers_new((size_t)HeaderCount);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = 0 ; (int64_t)i < HeaderCount ; i++) {
|
|
Packit |
2997f0 |
int64_t KeyLen, ValueLen;
|
|
Packit |
2997f0 |
const char *Key, *Value;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_buf_read_varint(rkbuf, &KeyLen);
|
|
Packit |
2997f0 |
rd_kafka_buf_read_ptr(rkbuf, &Key, (size_t)KeyLen);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_buf_read_varint(rkbuf, &ValueLen);
|
|
Packit |
2997f0 |
if (unlikely(ValueLen == -1))
|
|
Packit |
2997f0 |
Value = NULL;
|
|
Packit |
2997f0 |
else
|
|
Packit |
2997f0 |
rd_kafka_buf_read_ptr(rkbuf, &Value, (size_t)ValueLen);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_header_add(hdrs, Key, (ssize_t)KeyLen,
|
|
Packit |
2997f0 |
Value, (ssize_t)ValueLen);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm->rkm_headers = hdrs;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_buf_destroy(rkbuf);
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
err_parse:
|
|
Packit |
2997f0 |
err = rkbuf->rkbuf_err;
|
|
Packit |
2997f0 |
rd_kafka_buf_destroy(rkbuf);
|
|
Packit |
2997f0 |
if (hdrs)
|
|
Packit |
2997f0 |
rd_kafka_headers_destroy(hdrs);
|
|
Packit |
2997f0 |
return err;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t
|
|
Packit |
2997f0 |
rd_kafka_message_headers (const rd_kafka_message_t *rkmessage,
|
|
Packit |
2997f0 |
rd_kafka_headers_t **hdrsp) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkm->rkm_headers) {
|
|
Packit |
2997f0 |
*hdrsp = rkm->rkm_headers;
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Producer (rkm_headers will be set if there were any headers) */
|
|
Packit |
2997f0 |
if (rkm->rkm_flags & RD_KAFKA_MSG_F_PRODUCER)
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR__NOENT;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Consumer */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* No previously parsed headers, check if the underlying
|
|
Packit |
2997f0 |
* protocol message had headers and if so, parse them. */
|
|
Packit |
2997f0 |
if (unlikely(!RD_KAFKAP_BYTES_LEN(&rkm->rkm_u.consumer.binhdrs)))
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR__NOENT;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
err = rd_kafka_msg_headers_parse(rkm);
|
|
Packit |
2997f0 |
if (unlikely(err))
|
|
Packit |
2997f0 |
return err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
*hdrsp = rkm->rkm_headers;
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t
|
|
Packit |
2997f0 |
rd_kafka_message_detach_headers (rd_kafka_message_t *rkmessage,
|
|
Packit |
2997f0 |
rd_kafka_headers_t **hdrsp) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
err = rd_kafka_message_headers(rkmessage, hdrsp);
|
|
Packit |
2997f0 |
if (err)
|
|
Packit |
2997f0 |
return err;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
|
|
Packit |
2997f0 |
rkm->rkm_headers = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
void rd_kafka_message_set_headers (rd_kafka_message_t *rkmessage,
|
|
Packit |
2997f0 |
rd_kafka_headers_t *hdrs) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm = rd_kafka_message2msg((rd_kafka_message_t *)rkmessage);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rkm->rkm_headers) {
|
|
Packit |
2997f0 |
assert(rkm->rkm_headers != hdrs);
|
|
Packit |
2997f0 |
rd_kafka_headers_destroy(rkm->rkm_headers);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm->rkm_headers = hdrs;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Print the contents of message queue \p rkmq to \p fp.
 *
 * A summary line (prefixed with \p what) with the queue's message count
 * and size is followed by one line per message with its partition, offset,
 * producer msgseq and payload.
 */
void rd_kafka_msgq_dump (FILE *fp, const char *what, rd_kafka_msgq_t *rkmq) {
        rd_kafka_msg_t *rkm;

        fprintf(fp, "%s msgq_dump (%d messages, %"PRIusz" bytes):\n",
                what, rd_kafka_msgq_len(rkmq), rd_kafka_msgq_size(rkmq));

        TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
                /* The payload is printed with an explicit length limit */
                const char *payload = (const char *)rkm->rkm_payload;
                int payload_len = (int)rkm->rkm_len;

                fprintf(fp, " [%"PRId32"]@%"PRId64
                        ": rkm msgseq %"PRIu64": \"%.*s\"\n",
                        rkm->rkm_partition, rkm->rkm_offset,
                        rkm->rkm_u.producer.msgseq,
                        payload_len, payload);
        }
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @name Unit tests
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Unittest: message allocator
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static rd_kafka_msg_t *ut_rd_kafka_msg_new (void) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rkm = rd_calloc(1, sizeof(*rkm));
|
|
Packit |
2997f0 |
rkm->rkm_flags = RD_KAFKA_MSG_F_FREE_RKM;
|
|
Packit |
2997f0 |
rkm->rkm_offset = RD_KAFKA_OFFSET_INVALID;
|
|
Packit |
2997f0 |
rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return rkm;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Unittest: destroy all messages in queue
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static void ut_rd_kafka_msgq_purge (rd_kafka_msgq_t *rkmq) {
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm, *tmp;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
TAILQ_FOREACH_SAFE(rkm, &rkmq->rkmq_msgs, rkm_link, tmp)
|
|
Packit |
2997f0 |
rd_kafka_msg_destroy(NULL, rkm);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_msgq_init(rkmq);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static int ut_verify_msgq_order (const char *what,
|
|
Packit |
2997f0 |
const rd_kafka_msgq_t *rkmq,
|
|
Packit |
2997f0 |
int first, int last) {
|
|
Packit |
2997f0 |
const rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
uint64_t expected = first;
|
|
Packit |
2997f0 |
int incr = first < last ? +1 : -1;
|
|
Packit |
2997f0 |
int fails = 0;
|
|
Packit |
2997f0 |
int cnt = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
TAILQ_FOREACH(rkm, &rkmq->rkmq_msgs, rkm_link) {
|
|
Packit |
2997f0 |
if (rkm->rkm_u.producer.msgseq != expected) {
|
|
Packit |
2997f0 |
RD_UT_SAY("%s: expected msgseq %"PRIu64
|
|
Packit |
2997f0 |
" not %"PRIu64" at index #%d",
|
|
Packit |
2997f0 |
what, expected,
|
|
Packit |
2997f0 |
rkm->rkm_u.producer.msgseq, cnt);
|
|
Packit |
2997f0 |
fails++;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
cnt++;
|
|
Packit |
2997f0 |
expected += incr;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
RD_UT_ASSERT(!fails, "See %d previous failure(s)", fails);
|
|
Packit |
2997f0 |
return fails;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Verify ordering comparator for message queues.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static int unittest_msgq_order (const char *what, int fifo,
|
|
Packit |
2997f0 |
int (*cmp) (const void *, const void *)) {
|
|
Packit |
2997f0 |
rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
|
|
Packit |
2997f0 |
rd_kafka_msg_t *rkm;
|
|
Packit |
2997f0 |
rd_kafka_msgq_t sendq;
|
|
Packit |
2997f0 |
int i;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
RD_UT_SAY("%s: testing in %s mode", what, fifo? "FIFO" : "LIFO");
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
for (i = 1 ; i <= 6 ; i++) {
|
|
Packit |
2997f0 |
rkm = ut_rd_kafka_msg_new();
|
|
Packit |
2997f0 |
rkm->rkm_u.producer.msgseq = i;
|
|
Packit |
2997f0 |
rd_kafka_msgq_enq_sorted0(&rkmq, rkm, cmp);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (fifo) {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("added", &rkmq, 1, 6))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("added", &rkmq, 6, 1))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Move 3 messages to "send" queue which we then re-insert
|
|
Packit |
2997f0 |
* in the original queue (i.e., "retry"). */
|
|
Packit |
2997f0 |
rd_kafka_msgq_init(&sendq);
|
|
Packit |
2997f0 |
while (rd_kafka_msgq_len(&sendq) < 3)
|
|
Packit |
2997f0 |
rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (fifo) {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("send removed", &rkmq, 4, 6))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("sendq", &sendq, 1, 3))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("send removed", &rkmq, 3, 1))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("sendq", &sendq, 6, 4))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Retry the messages, which moves them back to sendq
|
|
Packit |
2997f0 |
* maintaining the original order */
|
|
Packit |
2997f0 |
rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, cmp);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
RD_UT_ASSERT(rd_kafka_msgq_len(&sendq) == 0,
|
|
Packit |
2997f0 |
"sendq FIFO should be empty, not contain %d messages",
|
|
Packit |
2997f0 |
rd_kafka_msgq_len(&sendq));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (fifo) {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("readded", &rkmq, 1, 6))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("readded", &rkmq, 6, 1))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Move 4 first messages to to "send" queue, then
|
|
Packit |
2997f0 |
* retry them with max_retries=1 which should now fail for
|
|
Packit |
2997f0 |
* the 3 first messages that were already retried. */
|
|
Packit |
2997f0 |
rd_kafka_msgq_init(&sendq);
|
|
Packit |
2997f0 |
while (rd_kafka_msgq_len(&sendq) < 4)
|
|
Packit |
2997f0 |
rd_kafka_msgq_enq(&sendq, rd_kafka_msgq_pop(&rkmq));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (fifo) {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("send removed #2", &rkmq, 5, 6))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("sendq #2", &sendq, 1, 4))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("send removed #2", &rkmq, 2, 1))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("sendq #2", &sendq, 6, 3))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Retry the messages, which should now keep the 3 first messages
|
|
Packit |
2997f0 |
* on sendq (no more retries) and just number 4 moved back. */
|
|
Packit |
2997f0 |
rd_kafka_retry_msgq(&rkmq, &sendq, 1, 1, 0, cmp);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (fifo) {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("readded #2", &rkmq, 4, 6))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("no more retries", &sendq, 1, 3))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("readded #2", &rkmq, 3, 1))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (ut_verify_msgq_order("no more retries", &sendq, 6, 4))
|
|
Packit |
2997f0 |
return 1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
ut_rd_kafka_msgq_purge(&sendq);
|
|
Packit |
2997f0 |
ut_rd_kafka_msgq_purge(&rkmq);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int unittest_msg (void) {
|
|
Packit |
2997f0 |
int fails = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
fails += unittest_msgq_order("FIFO", 1, rd_kafka_msg_cmp_msgseq);
|
|
Packit |
2997f0 |
fails += unittest_msgq_order("LIFO", 0, rd_kafka_msg_cmp_msgseq_lifo);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return fails;
|
|
Packit |
2997f0 |
}
|