/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _RDKAFKA_BUF_H_
#define _RDKAFKA_BUF_H_
#include "rdkafka_int.h"
#include "rdcrc32.h"
#include "rdlist.h"
#include "rdbuf.h"
typedef struct rd_kafka_broker_s rd_kafka_broker_t;
#define RD_KAFKA_HEADERS_IOV_CNT 2
/**
* Temporary buffer with memory aligned writes to accommodate
* effective and platform safe struct writes.
*/
typedef struct rd_tmpabuf_s {
size_t size;
size_t of;
char *buf;
int failed;
int assert_on_fail;
} rd_tmpabuf_t;
/**
* @brief Allocate new tmpabuf with \p size bytes pre-allocated.
*/
static RD_UNUSED void
rd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
tab->buf = rd_malloc(size);
tab->size = size;
tab->of = 0;
tab->failed = 0;
tab->assert_on_fail = assert_on_fail;
}
/**
* @brief Free memory allocated by tmpabuf
*/
static RD_UNUSED void
rd_tmpabuf_destroy (rd_tmpabuf_t *tab) {
rd_free(tab->buf);
}
/**
* @returns 1 if a previous operation failed.
*/
static RD_UNUSED RD_INLINE int
rd_tmpabuf_failed (rd_tmpabuf_t *tab) {
return tab->failed;
}
/**
* @brief Allocate \p size bytes for writing, returning an aligned pointer
* to the memory.
* @returns the allocated pointer (within the tmpabuf) on success or
* NULL if the requested number of bytes + alignment is not available
* in the tmpabuf.
*/
static RD_UNUSED void *
rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab, size_t size) {
void *ptr;
if (unlikely(tab->failed))
return NULL;
if (unlikely(tab->of + size > tab->size)) {
if (tab->assert_on_fail) {
fprintf(stderr,
"%s: %s:%d: requested size %zd + %zd > %zd\n",
__FUNCTION__, func, line, tab->of, size,
tab->size);
assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
}
return NULL;
}
ptr = (void *)(tab->buf + tab->of);
tab->of += RD_ROUNDUP(size, 8);
return ptr;
}
#define rd_tmpabuf_alloc(tab,size) \
rd_tmpabuf_alloc0(__FUNCTION__,__LINE__,tab,size)
/**
* @brief Write \p buf of \p size bytes to tmpabuf memory in an aligned fashion.
*
* @returns the allocated and written-to pointer (within the tmpabuf) on success
* or NULL if the requested number of bytes + alignment is not available
* in the tmpabuf.
*/
static RD_UNUSED void *
rd_tmpabuf_write0 (const char *func, int line,
rd_tmpabuf_t *tab, const void *buf, size_t size) {
void *ptr = rd_tmpabuf_alloc0(func, line, tab, size);
if (ptr)
memcpy(ptr, buf, size);
return ptr;
}
#define rd_tmpabuf_write(tab,buf,size) \
rd_tmpabuf_write0(__FUNCTION__, __LINE__, tab, buf, size)
/**
* @brief Wrapper for rd_tmpabuf_write() that takes a nul-terminated string.
*/
static RD_UNUSED char *
rd_tmpabuf_write_str0 (const char *func, int line,
rd_tmpabuf_t *tab, const char *str) {
return rd_tmpabuf_write0(func, line, tab, str, strlen(str)+1);
}
#define rd_tmpabuf_write_str(tab,str) \
rd_tmpabuf_write_str0(__FUNCTION__, __LINE__, tab, str)
/**
* @name Read buffer interface
*
* Memory reading helper macros to be used when parsing network responses.
*
* Assumptions:
* - an 'err_parse:' goto-label must be available for error bailouts,
* the error code will be set in rkbuf->rkbuf_err
* - local `int log_decode_errors` variable set to the logging level
* to log parse errors (or 0 to turn off logging).
*/
#define rd_kafka_buf_parse_fail(rkbuf,...) do { \
if (log_decode_errors > 0) { \
rd_kafka_assert(NULL, rkbuf->rkbuf_rkb); \
rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
"PROTOERR", \
"Protocol parse failure " \
"at %"PRIusz"/%"PRIusz" (%s:%i) " \
"(incorrect broker.version.fallback?)", \
rd_slice_offset(&rkbuf->rkbuf_reader), \
rd_slice_size(&rkbuf->rkbuf_reader), \
__FUNCTION__, __LINE__); \
rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
"PROTOERR", __VA_ARGS__); \
} \
(rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__BAD_MSG; \
goto err_parse; \
} while (0)
/**
* @name Fail buffer reading due to buffer underflow.
*/
#define rd_kafka_buf_underflow_fail(rkbuf,wantedlen,...) do { \
if (log_decode_errors > 0) { \
rd_kafka_assert(NULL, rkbuf->rkbuf_rkb); \
char __tmpstr[256]; \
rd_snprintf(__tmpstr, sizeof(__tmpstr), \
": " __VA_ARGS__); \
if (strlen(__tmpstr) == 2) __tmpstr[0] = '\0'; \
rd_rkb_log(rkbuf->rkbuf_rkb, log_decode_errors, \
"PROTOUFLOW", \
"Protocol read buffer underflow " \
"at %"PRIusz"/%"PRIusz" (%s:%i): " \
"expected %"PRIusz" bytes > " \
"%"PRIusz" remaining bytes (%s)%s", \
rd_slice_offset(&rkbuf->rkbuf_reader), \
rd_slice_size(&rkbuf->rkbuf_reader), \
__FUNCTION__, __LINE__, \
wantedlen, \
rd_slice_remains(&rkbuf->rkbuf_reader), \
rkbuf->rkbuf_uflow_mitigation ? \
rkbuf->rkbuf_uflow_mitigation : \
"incorrect broker.version.fallback?", \
__tmpstr); \
} \
(rkbuf)->rkbuf_err = RD_KAFKA_RESP_ERR__UNDERFLOW; \
goto err_parse; \
} while (0)
/**
* Returns the number of remaining bytes available to read.
*/
#define rd_kafka_buf_read_remain(rkbuf) \
rd_slice_remains(&(rkbuf)->rkbuf_reader)
/**
* Checks that at least 'len' bytes remain to be read in buffer, else fails.
*/
#define rd_kafka_buf_check_len(rkbuf,len) do { \
size_t __len0 = (size_t)(len); \
if (unlikely(__len0 > rd_kafka_buf_read_remain(rkbuf))) { \
rd_kafka_buf_underflow_fail(rkbuf, __len0); \
} \
} while (0)
/**
* Skip (as in read and ignore) the next 'len' bytes.
*/
#define rd_kafka_buf_skip(rkbuf, len) do { \
size_t __len1 = (size_t)(len); \
if (__len1 && \
!rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
rd_kafka_buf_check_len(rkbuf, __len1); \
} while (0)
/**
* Skip (as in read and ignore) up to fixed position \p pos.
*/
#define rd_kafka_buf_skip_to(rkbuf, pos) do { \
size_t __len1 = (size_t)(pos) - \
rd_slice_offset(&(rkbuf)->rkbuf_reader); \
if (__len1 && \
!rd_slice_read(&(rkbuf)->rkbuf_reader, NULL, __len1)) \
rd_kafka_buf_check_len(rkbuf, __len1); \
} while (0)
/**
* Read 'len' bytes and copy to 'dstptr'
*/
#define rd_kafka_buf_read(rkbuf,dstptr,len) do { \
size_t __len2 = (size_t)(len); \
if (!rd_slice_read(&(rkbuf)->rkbuf_reader, dstptr, __len2)) \
rd_kafka_buf_check_len(rkbuf, __len2); \
} while (0)
/**
* @brief Read \p len bytes at slice offset \p offset and copy to \p dstptr
* without affecting the current reader position.
*/
#define rd_kafka_buf_peek(rkbuf,offset,dstptr,len) do { \
size_t __len2 = (size_t)(len); \
if (!rd_slice_peek(&(rkbuf)->rkbuf_reader, offset, \
dstptr, __len2)) \
rd_kafka_buf_check_len(rkbuf, (offset)+(__len2)); \
} while (0)
/**
* Read a 16,32,64-bit integer and store it in 'dstptr'
*/
#define rd_kafka_buf_read_i64(rkbuf,dstptr) do { \
int64_t _v; \
rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
*(dstptr) = be64toh(_v); \
} while (0)
#define rd_kafka_buf_peek_i64(rkbuf,of,dstptr) do { \
int64_t _v; \
rd_kafka_buf_peek(rkbuf, of, &_v, sizeof(_v)); \
*(dstptr) = be64toh(_v); \
} while (0)
#define rd_kafka_buf_read_i32(rkbuf,dstptr) do { \
int32_t _v; \
rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
*(dstptr) = be32toh(_v); \
} while (0)
/* Same as .._read_i32 but does a direct assignment.
* dst is assumed to be a scalar, not pointer. */
#define rd_kafka_buf_read_i32a(rkbuf, dst) do { \
int32_t _v; \
rd_kafka_buf_read(rkbuf, &_v, 4); \
dst = (int32_t) be32toh(_v); \
} while (0)
#define rd_kafka_buf_read_i16(rkbuf,dstptr) do { \
int16_t _v; \
rd_kafka_buf_read(rkbuf, &_v, sizeof(_v)); \
*(dstptr) = be16toh(_v); \
} while (0)
#define rd_kafka_buf_read_i16a(rkbuf, dst) do { \
int16_t _v; \
rd_kafka_buf_read(rkbuf, &_v, 2); \
dst = (int16_t)be16toh(_v); \
} while (0)
#define rd_kafka_buf_read_i8(rkbuf, dst) rd_kafka_buf_read(rkbuf, dst, 1)
#define rd_kafka_buf_peek_i8(rkbuf,of,dst) rd_kafka_buf_peek(rkbuf,of,dst,1)
/**
* @brief Read varint and store in int64_t \p dst
*/
#define rd_kafka_buf_read_varint(rkbuf,dst) do { \
int64_t _v; \
size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, &_v); \
if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \
rd_kafka_buf_underflow_fail(rkbuf, (size_t)0, \
"varint parsing failed");\
*(dst) = _v; \
} while (0)
/* Read Kafka String representation (2+N).
* The kstr data will be updated to point to the rkbuf. */
#define rd_kafka_buf_read_str(rkbuf, kstr) do { \
int _klen; \
rd_kafka_buf_read_i16a(rkbuf, (kstr)->len); \
_klen = RD_KAFKAP_STR_LEN(kstr); \
if (RD_KAFKAP_STR_LEN0(_klen) == 0) \
(kstr)->str = NULL; \
else if (!((kstr)->str = \
rd_slice_ensure_contig(&rkbuf->rkbuf_reader, \
_klen))) \
rd_kafka_buf_check_len(rkbuf, _klen); \
} while (0)
/* Read Kafka String representation (2+N) and write it to the \p tmpabuf
* with a trailing nul byte. */
#define rd_kafka_buf_read_str_tmpabuf(rkbuf, tmpabuf, dst) do { \
rd_kafkap_str_t _kstr; \
size_t _slen; \
char *_dst; \
rd_kafka_buf_read_str(rkbuf, &_kstr); \
_slen = RD_KAFKAP_STR_LEN(&_kstr); \
if (!(_dst = \
rd_tmpabuf_write(tmpabuf, _kstr.str, _slen+1))) \
rd_kafka_buf_parse_fail( \
rkbuf, \
"Not enough room in tmpabuf: " \
"%"PRIusz"+%"PRIusz \
" > %"PRIusz, \
(tmpabuf)->of, _slen+1, (tmpabuf)->size); \
_dst[_slen] = '\0'; \
dst = (void *)_dst; \
} while (0)
/**
* Skip a string.
*/
#define rd_kafka_buf_skip_str(rkbuf) do { \
int16_t _slen; \
rd_kafka_buf_read_i16(rkbuf, &_slen); \
rd_kafka_buf_skip(rkbuf, RD_KAFKAP_STR_LEN0(_slen)); \
} while (0)
/* Read Kafka Bytes representation (4+N).
* The 'kbytes' will be updated to point to rkbuf data */
#define rd_kafka_buf_read_bytes(rkbuf, kbytes) do { \
int _klen; \
rd_kafka_buf_read_i32a(rkbuf, _klen); \
(kbytes)->len = _klen; \
if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \
(kbytes)->data = NULL; \
(kbytes)->len = 0; \
} else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0) \
(kbytes)->data = ""; \
else if (!((kbytes)->data = \
rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
_klen))) \
rd_kafka_buf_check_len(rkbuf, _klen); \
} while (0)
/**
* @brief Read \p size bytes from buffer, setting \p *ptr to the start
* of the memory region.
*/
#define rd_kafka_buf_read_ptr(rkbuf,ptr,size) do { \
size_t _klen = size; \
if (!(*(ptr) = (void *) \
rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, _klen))) \
rd_kafka_buf_check_len(rkbuf, _klen); \
} while (0)
/**
* @brief Read varint-lengted Kafka Bytes representation
*/
#define rd_kafka_buf_read_bytes_varint(rkbuf,kbytes) do { \
int64_t _len2; \
size_t _r = rd_varint_dec_slice(&(rkbuf)->rkbuf_reader, \
&_len2); \
if (unlikely(RD_UVARINT_UNDERFLOW(_r))) \
rd_kafka_buf_underflow_fail(rkbuf, (size_t)0, \
"varint parsing failed"); \
(kbytes)->len = (int32_t)_len2; \
if (RD_KAFKAP_BYTES_IS_NULL(kbytes)) { \
(kbytes)->data = NULL; \
(kbytes)->len = 0; \
} else if (RD_KAFKAP_BYTES_LEN(kbytes) == 0) \
(kbytes)->data = ""; \
else if (!((kbytes)->data = \
rd_slice_ensure_contig(&(rkbuf)->rkbuf_reader, \
(size_t)_len2))) \
rd_kafka_buf_check_len(rkbuf, _len2); \
} while (0)
/**
* Response handling callback.
*
* NOTE: Callbacks must check for 'err == RD_KAFKA_RESP_ERR__DESTROY'
* which indicates that some entity is terminating (rd_kafka_t, broker,
* toppar, queue, etc) and the callback may not be called in the
* correct thread. In this case the callback must perform just
* the most minimal cleanup and dont trigger any other operations.
*
* NOTE: rkb, reply and request may be NULL, depending on error situation.
*/
typedef void (rd_kafka_resp_cb_t) (rd_kafka_t *rk,
rd_kafka_broker_t *rkb,
rd_kafka_resp_err_t err,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
void *opaque);
struct rd_kafka_buf_s { /* rd_kafka_buf_t */
TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link;
int32_t rkbuf_corrid;
rd_ts_t rkbuf_ts_retry; /* Absolute send retry time */
int rkbuf_flags; /* RD_KAFKA_OP_F */
rd_buf_t rkbuf_buf; /**< Send/Recv byte buffer */
rd_slice_t rkbuf_reader; /**< Buffer slice reader for rkbuf_buf */
int rkbuf_connid; /* broker connection id (used when buffer
* was partially sent). */
size_t rkbuf_totlen; /* recv: total expected length,
* send: not used */
rd_crc32_t rkbuf_crc; /* Current CRC calculation */
struct rd_kafkap_reqhdr rkbuf_reqhdr; /* Request header.
* These fields are encoded
* and written to output buffer
* on buffer finalization. */
struct rd_kafkap_reshdr rkbuf_reshdr; /* Response header.
* Decoded fields are copied
* here from the buffer
* to provide an ease-of-use
* interface to the header */
int32_t rkbuf_expected_size; /* expected size of message */
rd_kafka_replyq_t rkbuf_replyq; /* Enqueue response on replyq */
rd_kafka_replyq_t rkbuf_orig_replyq; /* Original replyq to be used
* for retries from inside
* the rkbuf_cb() callback
* since rkbuf_replyq will
* have been reset. */
rd_kafka_resp_cb_t *rkbuf_cb; /* Response callback */
struct rd_kafka_buf_s *rkbuf_response; /* Response buffer */
struct rd_kafka_broker_s *rkbuf_rkb;
rd_refcnt_t rkbuf_refcnt;
void *rkbuf_opaque;
int rkbuf_retries; /* Retries so far. */
#define RD_KAFKA_BUF_NO_RETRIES 1000000 /* Do not retry */
int rkbuf_features; /* Required feature(s) that must be
* supported by broker. */
rd_ts_t rkbuf_ts_enq;
rd_ts_t rkbuf_ts_sent; /* Initially: Absolute time of transmission,
* after response: RTT. */
/* Request timeouts:
* rkbuf_ts_timeout is the effective absolute request timeout used
* by the timeout scanner to see if a request has timed out.
* It is set when a request is enqueued on the broker transmit
* queue based on the relative or absolute timeout:
*
* rkbuf_rel_timeout is the per-request-transmit relative timeout,
* this value is reused for each sub-sequent retry of a request.
*
* rkbuf_abs_timeout is the absolute request timeout, spanning
* all retries.
* This value is effectively limited by socket.timeout.ms for
* each transmission, but the absolute timeout for a request's
* lifetime is the absolute value.
*
* Use rd_kafka_buf_set_timeout() to set a relative timeout
* that will be reused on retry,
* or rd_kafka_buf_set_abs_timeout() to set a fixed absolute timeout
* for the case where the caller knows the request will be
* semantically outdated when that absolute time expires, such as for
* session.timeout.ms-based requests.
*
* The decision to retry a request is delegated to the rkbuf_cb
* response callback, which should use rd_kafka_err_action()
* and check the return actions for RD_KAFKA_ERR_ACTION_RETRY to be set
* and then call rd_kafka_buf_retry().
* rd_kafka_buf_retry() will enqueue the request on the rkb_retrybufs
* queue with a backoff time of retry.backoff.ms.
* The rkb_retrybufs queue is served by the broker thread's timeout
* scanner.
* @warning rkb_retrybufs is NOT purged on broker down.
*/
rd_ts_t rkbuf_ts_timeout; /* Request timeout (absolute time). */
rd_ts_t rkbuf_abs_timeout;/* Absolute timeout for request, including
* retries.
* Mutually exclusive with rkbuf_rel_timeout*/
int rkbuf_rel_timeout;/* Relative timeout (ms), used for retries.
* Defaults to socket.timeout.ms.
* Mutually exclusive with rkbuf_abs_timeout*/
int64_t rkbuf_offset; /* Used by OffsetCommit */
rd_list_t *rkbuf_rktp_vers; /* Toppar + Op Version map.
* Used by FetchRequest. */
rd_kafka_msgq_t rkbuf_msgq;
rd_kafka_resp_err_t rkbuf_err; /* Buffer parsing error code */
union {
struct {
rd_list_t *topics; /* Requested topics (char *) */
char *reason; /* Textual reason */
rd_kafka_op_t *rko; /* Originating rko with replyq
* (if any) */
int all_topics; /* Full/All topics requested */
int *decr; /* Decrement this integer by one
* when request is complete:
* typically points to metadata
* cache's full_.._sent.
* Will be performed with
* decr_lock held. */
mtx_t *decr_lock;
} Metadata;
} rkbuf_u;
const char *rkbuf_uflow_mitigation; /**< Buffer read underflow
* human readable mitigation
* string (const memory).
* This is used to hint the
* user why the underflow
* might have occurred, which
* depends on request type. */
};
typedef struct rd_kafka_bufq_s {
TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs;
rd_atomic32_t rkbq_cnt;
rd_atomic32_t rkbq_msg_cnt;
} rd_kafka_bufq_t;
#define rd_kafka_bufq_cnt(rkbq) rd_atomic32_get(&(rkbq)->rkbq_cnt)
/**
* @brief Set buffer's request timeout to relative \p timeout_ms measured
* from the time the buffer is sent on the underlying socket.
*
* @param now Reuse current time from existing rd_clock() var, else 0.
*
* The relative timeout value is reused upon request retry.
*/
static RD_INLINE void
rd_kafka_buf_set_timeout (rd_kafka_buf_t *rkbuf, int timeout_ms, rd_ts_t now) {
if (!now)
now = rd_clock();
rkbuf->rkbuf_rel_timeout = timeout_ms;
rkbuf->rkbuf_abs_timeout = 0;
}
/**
* @brief Calculate the effective timeout for a request attempt
*/
void rd_kafka_buf_calc_timeout (const rd_kafka_t *rk, rd_kafka_buf_t *rkbuf,
rd_ts_t now);
/**
* @brief Set buffer's request timeout to relative \p timeout_ms measured
* from \p now.
*
* @param now Reuse current time from existing rd_clock() var, else 0.
*
* The remaining time is used as timeout for request retries.
*/
static RD_INLINE void
rd_kafka_buf_set_abs_timeout (rd_kafka_buf_t *rkbuf, int timeout_ms,
rd_ts_t now) {
if (!now)
now = rd_clock();
rkbuf->rkbuf_rel_timeout = 0;
rkbuf->rkbuf_abs_timeout = now + (timeout_ms * 1000);
}
#define rd_kafka_buf_keep(rkbuf) rd_refcnt_add(&(rkbuf)->rkbuf_refcnt)
#define rd_kafka_buf_destroy(rkbuf) \
rd_refcnt_destroywrapper(&(rkbuf)->rkbuf_refcnt, \
rd_kafka_buf_destroy_final(rkbuf))
void rd_kafka_buf_destroy_final (rd_kafka_buf_t *rkbuf);
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
int allow_crc_calc, void (*free_cb) (void *));
#define rd_kafka_buf_push(rkbuf,buf,len,free_cb) \
rd_kafka_buf_push0(rkbuf,buf,len,1/*allow_crc*/,free_cb)
rd_kafka_buf_t *rd_kafka_buf_new0 (int segcnt, size_t size, int flags);
#define rd_kafka_buf_new(segcnt,size) \
rd_kafka_buf_new0(segcnt,size,0)
rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
int segcnt, size_t size);
rd_kafka_buf_t *rd_kafka_buf_new_shadow (const void *ptr, size_t size,
void (*free_cb) (void *));
void rd_kafka_bufq_enq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
void rd_kafka_bufq_deq (rd_kafka_bufq_t *rkbufq, rd_kafka_buf_t *rkbuf);
void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq);
void rd_kafka_bufq_concat (rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src);
void rd_kafka_bufq_purge (rd_kafka_broker_t *rkb,
rd_kafka_bufq_t *rkbufq,
rd_kafka_resp_err_t err);
void rd_kafka_bufq_connection_reset (rd_kafka_broker_t *rkb,
rd_kafka_bufq_t *rkbufq);
void rd_kafka_bufq_dump (rd_kafka_broker_t *rkb, const char *fac,
rd_kafka_bufq_t *rkbq);
int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);
void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err);
void rd_kafka_buf_callback (rd_kafka_t *rk,
rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err,
rd_kafka_buf_t *response, rd_kafka_buf_t *request);
/**
*
* Write buffer interface
*
*/
/**
* Set request API type version
*/
static RD_UNUSED RD_INLINE void
rd_kafka_buf_ApiVersion_set (rd_kafka_buf_t *rkbuf,
int16_t version, int features) {
rkbuf->rkbuf_reqhdr.ApiVersion = version;
rkbuf->rkbuf_features = features;
}
/**
* @returns the ApiVersion for a request
*/
#define rd_kafka_buf_ApiVersion(rkbuf) ((rkbuf)->rkbuf_reqhdr.ApiVersion)
/**
* Write (copy) data to buffer at current write-buffer position.
* There must be enough space allocated in the rkbuf.
* Returns offset to written destination buffer.
*/
static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
const void *data, size_t len) {
size_t r;
r = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
if (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC)
rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, data, len);
return r;
}
/**
* Write (copy) 'data' to buffer at 'ptr'.
* There must be enough space to fit 'len'.
* This will overwrite the buffer at given location and length.
*
* NOTE: rd_kafka_buf_update() MUST NOT be called when a CRC calculation
* is in progress (between rd_kafka_buf_crc_init() & .._crc_finalize())
*/
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
const void *data, size_t len) {
rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
rd_buf_write_update(&rkbuf->rkbuf_buf, of, data, len);
}
/**
* Write int8_t to buffer.
*/
static RD_INLINE size_t rd_kafka_buf_write_i8 (rd_kafka_buf_t *rkbuf,
int8_t v) {
return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}
/**
* Update int8_t in buffer at offset 'of'.
* 'of' should have been previously returned by `.._buf_write_i8()`.
*/
static RD_INLINE void rd_kafka_buf_update_i8 (rd_kafka_buf_t *rkbuf,
size_t of, int8_t v) {
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}
/**
* Write int16_t to buffer.
* The value will be endian-swapped before write.
*/
static RD_INLINE size_t rd_kafka_buf_write_i16 (rd_kafka_buf_t *rkbuf,
int16_t v) {
v = htobe16(v);
return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}
/**
* Update int16_t in buffer at offset 'of'.
* 'of' should have been previously returned by `.._buf_write_i16()`.
*/
static RD_INLINE void rd_kafka_buf_update_i16 (rd_kafka_buf_t *rkbuf,
size_t of, int16_t v) {
v = htobe16(v);
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}
/**
* Write int32_t to buffer.
* The value will be endian-swapped before write.
*/
static RD_INLINE size_t rd_kafka_buf_write_i32 (rd_kafka_buf_t *rkbuf,
int32_t v) {
v = htobe32(v);
return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}
/**
* Update int32_t in buffer at offset 'of'.
* 'of' should have been previously returned by `.._buf_write_i32()`.
*/
static RD_INLINE void rd_kafka_buf_update_i32 (rd_kafka_buf_t *rkbuf,
size_t of, int32_t v) {
v = htobe32(v);
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}
/**
* Update int32_t in buffer at offset 'of'.
* 'of' should have been previously returned by `.._buf_write_i32()`.
*/
static RD_INLINE void rd_kafka_buf_update_u32 (rd_kafka_buf_t *rkbuf,
size_t of, uint32_t v) {
v = htobe32(v);
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}
/**
* Write int64_t to buffer.
* The value will be endian-swapped before write.
*/
static RD_INLINE size_t rd_kafka_buf_write_i64 (rd_kafka_buf_t *rkbuf, int64_t v) {
v = htobe64(v);
return rd_kafka_buf_write(rkbuf, &v, sizeof(v));
}
/**
* Update int64_t in buffer at address 'ptr'.
* 'of' should have been previously returned by `.._buf_write_i64()`.
*/
static RD_INLINE void rd_kafka_buf_update_i64 (rd_kafka_buf_t *rkbuf,
size_t of, int64_t v) {
v = htobe64(v);
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}
/**
* @brief Write varint-encoded signed value to buffer.
*/
static RD_INLINE size_t
rd_kafka_buf_write_varint (rd_kafka_buf_t *rkbuf, int64_t v) {
char varint[RD_UVARINT_ENC_SIZEOF(v)];
size_t sz;
sz = rd_uvarint_enc_i64(varint, sizeof(varint), v);
return rd_kafka_buf_write(rkbuf, varint, sz);
}
/**
* Write (copy) Kafka string to buffer.
*/
static RD_INLINE size_t rd_kafka_buf_write_kstr (rd_kafka_buf_t *rkbuf,
const rd_kafkap_str_t *kstr) {
return rd_kafka_buf_write(rkbuf, RD_KAFKAP_STR_SER(kstr),
RD_KAFKAP_STR_SIZE(kstr));
}
/**
* Write (copy) char * string to buffer.
*/
static RD_INLINE size_t rd_kafka_buf_write_str (rd_kafka_buf_t *rkbuf,
const char *str, size_t len) {
size_t r;
if (!str)
len = RD_KAFKAP_STR_LEN_NULL;
else if (len == (size_t)-1)
len = strlen(str);
r = rd_kafka_buf_write_i16(rkbuf, (int16_t) len);
if (str)
rd_kafka_buf_write(rkbuf, str, len);
return r;
}
/**
* Push (i.e., no copy) Kafka string to buffer iovec
*/
static RD_INLINE void rd_kafka_buf_push_kstr (rd_kafka_buf_t *rkbuf,
const rd_kafkap_str_t *kstr) {
rd_kafka_buf_push(rkbuf, RD_KAFKAP_STR_SER(kstr),
RD_KAFKAP_STR_SIZE(kstr), NULL);
}
/**
* Write (copy) Kafka bytes to buffer.
*/
static RD_INLINE size_t rd_kafka_buf_write_kbytes (rd_kafka_buf_t *rkbuf,
const rd_kafkap_bytes_t *kbytes){
return rd_kafka_buf_write(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
RD_KAFKAP_BYTES_SIZE(kbytes));
}
/**
* Push (i.e., no copy) Kafka bytes to buffer iovec
*/
static RD_INLINE void rd_kafka_buf_push_kbytes (rd_kafka_buf_t *rkbuf,
const rd_kafkap_bytes_t *kbytes){
rd_kafka_buf_push(rkbuf, RD_KAFKAP_BYTES_SER(kbytes),
RD_KAFKAP_BYTES_SIZE(kbytes), NULL);
}
/**
* Write (copy) binary bytes to buffer as Kafka bytes encapsulate data.
*/
static RD_INLINE size_t rd_kafka_buf_write_bytes (rd_kafka_buf_t *rkbuf,
const void *payload, size_t size) {
size_t r;
if (!payload)
size = RD_KAFKAP_BYTES_LEN_NULL;
r = rd_kafka_buf_write_i32(rkbuf, (int32_t) size);
if (payload)
rd_kafka_buf_write(rkbuf, payload, size);
return r;
}
/**
* Write Kafka Message to buffer
* The number of bytes written is returned in '*outlenp'.
*
* Returns the buffer offset of the first byte.
*/
size_t rd_kafka_buf_write_Message (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf,
int64_t Offset, int8_t MagicByte,
int8_t Attributes, int64_t Timestamp,
const void *key, int32_t key_len,
const void *payload, int32_t len,
int *outlenp);
/**
* Start calculating CRC from now and track it in '*crcp'.
*/
static RD_INLINE RD_UNUSED void rd_kafka_buf_crc_init (rd_kafka_buf_t *rkbuf) {
rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_CRC;
rkbuf->rkbuf_crc = rd_crc32_init();
}
/**
* Finalizes CRC calculation and returns the calculated checksum.
*/
static RD_INLINE RD_UNUSED
rd_crc32_t rd_kafka_buf_crc_finalize (rd_kafka_buf_t *rkbuf) {
rkbuf->rkbuf_flags &= ~RD_KAFKA_OP_F_CRC;
return rd_crc32_finalize(rkbuf->rkbuf_crc);
}
/**
* @brief Check if buffer's replyq.version is outdated.
* @param rkbuf: may be NULL, for convenience.
*
* @returns 1 if this is an outdated buffer, else 0.
*/
static RD_UNUSED RD_INLINE int
rd_kafka_buf_version_outdated (const rd_kafka_buf_t *rkbuf, int version) {
return rkbuf && rkbuf->rkbuf_replyq.version &&
rkbuf->rkbuf_replyq.version < version;
}
#endif /* _RDKAFKA_BUF_H_ */