Blame src/rdkafka_int.h

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - Apache Kafka C library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2012-2013, Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 * 
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met: 
Packit 2997f0
 * 
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer. 
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution. 
Packit 2997f0
 * 
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#ifndef _RDKAFKA_INT_H_
Packit 2997f0
#define _RDKAFKA_INT_H_
Packit 2997f0
Packit 2997f0
#ifndef _MSC_VER
Packit 2997f0
#define _GNU_SOURCE  /* for strndup() */
Packit 2997f0
#include <syslog.h>
Packit 2997f0
#else
Packit 2997f0
typedef int mode_t;
Packit 2997f0
#endif
Packit 2997f0
#include <fcntl.h>
Packit 2997f0
Packit 2997f0
Packit 2997f0
#include "rdsysqueue.h"
Packit 2997f0
Packit 2997f0
#include "rdkafka.h"
Packit 2997f0
#include "rd.h"
Packit 2997f0
#include "rdlog.h"
Packit 2997f0
#include "rdtime.h"
Packit 2997f0
#include "rdaddr.h"
Packit 2997f0
#include "rdinterval.h"
Packit 2997f0
#include "rdavg.h"
Packit 2997f0
#include "rdlist.h"
Packit 2997f0
Packit 2997f0
#if WITH_SSL
Packit 2997f0
#include <openssl/ssl.h>
Packit 2997f0
#endif
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
Packit 2997f0
typedef struct rd_ikafka_s rd_ikafka_t;
Packit 2997f0
Packit 2997f0
Packit 2997f0
#define rd_kafka_assert(rk, cond) do {                                  \
Packit 2997f0
                if (unlikely(!(cond)))                                  \
Packit 2997f0
                        rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
Packit 2997f0
                                       (rk), "assert: " # cond);        \
Packit 2997f0
        } while (0)
Packit 2997f0
Packit 2997f0
Packit 2997f0
void
Packit 2997f0
RD_NORETURN
Packit 2997f0
rd_kafka_crash (const char *file, int line, const char *function,
Packit 2997f0
                rd_kafka_t *rk, const char *reason);
Packit 2997f0
Packit 2997f0
Packit 2997f0
/* Forward declarations */
Packit 2997f0
struct rd_kafka_s;
Packit 2997f0
struct rd_kafka_itopic_s;
Packit 2997f0
struct rd_kafka_msg_s;
Packit 2997f0
struct rd_kafka_broker_s;
Packit 2997f0
struct rd_kafka_toppar_s;
Packit 2997f0
Packit 2997f0
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
Packit 2997f0
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
#include "rdkafka_op.h"
Packit 2997f0
#include "rdkafka_queue.h"
Packit 2997f0
#include "rdkafka_msg.h"
Packit 2997f0
#include "rdkafka_proto.h"
Packit 2997f0
#include "rdkafka_buf.h"
Packit 2997f0
#include "rdkafka_pattern.h"
Packit 2997f0
#include "rdkafka_conf.h"
Packit 2997f0
#include "rdkafka_transport.h"
Packit 2997f0
#include "rdkafka_timer.h"
Packit 2997f0
#include "rdkafka_assignor.h"
Packit 2997f0
#include "rdkafka_metadata.h"
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Protocol level sanity
Packit 2997f0
 */
Packit 2997f0
#define RD_KAFKAP_BROKERS_MAX     1000
Packit 2997f0
#define RD_KAFKAP_TOPICS_MAX      1000000
Packit 2997f0
#define RD_KAFKAP_PARTITIONS_MAX  10000
Packit 2997f0
Packit 2997f0
Packit 2997f0
#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF)  ((OFF) < 0)
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Kafka handle, internal representation of the application's rd_kafka_t.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;
Packit 2997f0
Packit 2997f0
struct rd_kafka_s {
Packit 2997f0
	rd_kafka_q_t *rk_rep;   /* kafka -> application reply queue */
Packit 2997f0
	rd_kafka_q_t *rk_ops;   /* any -> rdkafka main thread ops */
Packit 2997f0
Packit 2997f0
	TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
Packit 2997f0
        rd_list_t                  rk_broker_by_id; /* Fast id lookups. */
Packit 2997f0
	rd_atomic32_t              rk_broker_cnt;
Packit 2997f0
	rd_atomic32_t              rk_broker_down_cnt;
Packit 2997f0
        mtx_t                      rk_internal_rkb_lock;
Packit 2997f0
	rd_kafka_broker_t         *rk_internal_rkb;
Packit 2997f0
Packit 2997f0
	/* Broadcasting of broker state changes to wake up
Packit 2997f0
	 * functions waiting for a state change. */
Packit 2997f0
	cnd_t                      rk_broker_state_change_cnd;
Packit 2997f0
	mtx_t                      rk_broker_state_change_lock;
Packit 2997f0
	int                        rk_broker_state_change_version;
Packit 2997f0
Packit 2997f0
Packit 2997f0
	TAILQ_HEAD(, rd_kafka_itopic_s)  rk_topics;
Packit 2997f0
	int              rk_topic_cnt;
Packit 2997f0
Packit 2997f0
        struct rd_kafka_cgrp_s *rk_cgrp;
Packit 2997f0
Packit 2997f0
        rd_kafka_conf_t  rk_conf;
Packit 2997f0
        rd_kafka_q_t    *rk_logq;          /* Log queue if `log.queue` set */
Packit 2997f0
        char             rk_name[128];
Packit 2997f0
	rd_kafkap_str_t *rk_client_id;
Packit 2997f0
        rd_kafkap_str_t *rk_group_id;    /* Consumer group id */
Packit 2997f0
Packit 2997f0
	int              rk_flags;
Packit 2997f0
	rd_atomic32_t    rk_terminate;
Packit 2997f0
	rwlock_t         rk_lock;
Packit 2997f0
	rd_kafka_type_t  rk_type;
Packit 2997f0
	struct timeval   rk_tv_state_change;
Packit 2997f0
Packit 2997f0
	rd_atomic32_t    rk_last_throttle;  /* Last throttle_time_ms value
Packit 2997f0
					     * from broker. */
Packit 2997f0
Packit 2997f0
        /* Locks: rd_kafka_*lock() */
Packit 2997f0
        rd_ts_t          rk_ts_metadata;    /* Timestamp of most recent
Packit 2997f0
                                             * metadata. */
Packit 2997f0
Packit 2997f0
	struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
Packit 2997f0
	rd_ts_t          rk_ts_full_metadata;       /* Timesstamp of .. */
Packit 2997f0
        struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */
Packit 2997f0
Packit 2997f0
        char            *rk_clusterid;      /* ClusterId from metadata */
Packit 2997f0
Packit 2997f0
        /* Simple consumer count:
Packit 2997f0
         *  >0: Running in legacy / Simple Consumer mode,
Packit 2997f0
         *   0: No consumers running
Packit 2997f0
         *  <0: Running in High level consumer mode */
Packit 2997f0
        rd_atomic32_t    rk_simple_cnt;
Packit 2997f0
Packit 2997f0
        /**
Packit 2997f0
         * Exactly Once Semantics
Packit 2997f0
         */
Packit 2997f0
        struct {
Packit 2997f0
                rd_kafkap_str_t *TransactionalId;
Packit 2997f0
                int64_t          PID;
Packit 2997f0
                int16_t          ProducerEpoch;
Packit 2997f0
        } rk_eos;
Packit 2997f0
Packit 2997f0
	const rd_kafkap_bytes_t *rk_null_bytes;
Packit 2997f0
Packit 2997f0
	struct {
Packit 2997f0
		mtx_t lock;       /* Protects acces to this struct */
Packit 2997f0
		cnd_t cnd;        /* For waking up blocking injectors */
Packit 2997f0
		unsigned int cnt; /* Current message count */
Packit 2997f0
		size_t size;      /* Current message size sum */
Packit 2997f0
	        unsigned int max_cnt; /* Max limit */
Packit 2997f0
		size_t max_size; /* Max limit */
Packit 2997f0
	} rk_curr_msgs;
Packit 2997f0
Packit 2997f0
        rd_kafka_timers_t rk_timers;
Packit 2997f0
	thrd_t rk_thread;
Packit 2997f0
Packit 2997f0
        int rk_initialized;
Packit 2997f0
};
Packit 2997f0
Packit 2997f0
#define rd_kafka_wrlock(rk)    rwlock_wrlock(&(rk)->rk_lock)
Packit 2997f0
#define rd_kafka_rdlock(rk)    rwlock_rdlock(&(rk)->rk_lock)
Packit 2997f0
#define rd_kafka_rdunlock(rk)    rwlock_rdunlock(&(rk)->rk_lock)
Packit 2997f0
#define rd_kafka_wrunlock(rk)    rwlock_wrunlock(&(rk)->rk_lock)
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Add \p cnt messages and of total size \p size bytes to the
Packit 2997f0
 *        internal bookkeeping of current message counts.
Packit 2997f0
 *        If the total message count or size after add would exceed the
Packit 2997f0
 *        configured limits \c queue.buffering.max.messages and
Packit 2997f0
 *        \c queue.buffering.max.kbytes then depending on the value of
Packit 2997f0
 *        \p block the function either blocks until enough space is available
Packit 2997f0
 *        if \p block is 1, else immediately returns
Packit 2997f0
 *        RD_KAFKA_RESP_ERR__QUEUE_FULL.
Packit 2997f0
 *
Packit 2997f0
 * @param rdmtx If non-null and \p block is set and blocking is to ensue,
Packit 2997f0
 *              then unlock this mutex for the duration of the blocking
Packit 2997f0
 *              and then reacquire with a read-lock.
Packit 2997f0
 */
Packit 2997f0
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
Packit 2997f0
rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
Packit 2997f0
			int block, rwlock_t *rdlock) {
Packit 2997f0
Packit 2997f0
	if (rk->rk_type != RD_KAFKA_PRODUCER)
Packit 2997f0
		return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
Packit 2997f0
	mtx_lock(&rk->rk_curr_msgs.lock);
Packit 2997f0
	while (unlikely(rk->rk_curr_msgs.cnt + cnt >
Packit 2997f0
			rk->rk_curr_msgs.max_cnt ||
Packit 2997f0
			(unsigned long long)(rk->rk_curr_msgs.size + size) >
Packit 2997f0
			(unsigned long long)rk->rk_curr_msgs.max_size)) {
Packit 2997f0
		if (!block) {
Packit 2997f0
			mtx_unlock(&rk->rk_curr_msgs.lock);
Packit 2997f0
			return RD_KAFKA_RESP_ERR__QUEUE_FULL;
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
                if (rdlock)
Packit 2997f0
                        rwlock_rdunlock(rdlock);
Packit 2997f0
Packit 2997f0
		cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
Packit 2997f0
Packit 2997f0
                if (rdlock)
Packit 2997f0
                        rwlock_rdlock(rdlock);
Packit 2997f0
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	rk->rk_curr_msgs.cnt  += cnt;
Packit 2997f0
	rk->rk_curr_msgs.size += size;
Packit 2997f0
	mtx_unlock(&rk->rk_curr_msgs.lock);
Packit 2997f0
Packit 2997f0
	return RD_KAFKA_RESP_ERR_NO_ERROR;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Subtract \p cnt messages of total size \p size from the
Packit 2997f0
 *        current bookkeeping and broadcast a wakeup on the condvar
Packit 2997f0
 *        for any waiting & blocking threads.
Packit 2997f0
 */
Packit 2997f0
static RD_INLINE RD_UNUSED void
Packit 2997f0
rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
Packit 2997f0
        int broadcast = 0;
Packit 2997f0
Packit 2997f0
	if (rk->rk_type != RD_KAFKA_PRODUCER)
Packit 2997f0
		return;
Packit 2997f0
Packit 2997f0
	mtx_lock(&rk->rk_curr_msgs.lock);
Packit 2997f0
	rd_kafka_assert(NULL,
Packit 2997f0
			rk->rk_curr_msgs.cnt >= cnt &&
Packit 2997f0
			rk->rk_curr_msgs.size >= size);
Packit 2997f0
Packit 2997f0
        /* If the subtraction would pass one of the thresholds
Packit 2997f0
         * broadcast a wake-up to any waiting listeners. */
Packit 2997f0
        if ((rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
Packit 2997f0
             rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
Packit 2997f0
            (rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
Packit 2997f0
             rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
Packit 2997f0
                broadcast = 1;
Packit 2997f0
Packit 2997f0
	rk->rk_curr_msgs.cnt  -= cnt;
Packit 2997f0
	rk->rk_curr_msgs.size -= size;
Packit 2997f0
Packit 2997f0
        if (unlikely(broadcast))
Packit 2997f0
                cnd_broadcast(&rk->rk_curr_msgs.cnd);
Packit 2997f0
Packit 2997f0
	mtx_unlock(&rk->rk_curr_msgs.lock);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
static RD_INLINE RD_UNUSED void
Packit 2997f0
rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
Packit 2997f0
	if (rk->rk_type != RD_KAFKA_PRODUCER) {
Packit 2997f0
		*cntp = 0;
Packit 2997f0
		*sizep = 0;
Packit 2997f0
		return;
Packit 2997f0
	}
Packit 2997f0
Packit 2997f0
	mtx_lock(&rk->rk_curr_msgs.lock);
Packit 2997f0
	*cntp = rk->rk_curr_msgs.cnt;
Packit 2997f0
	*sizep = rk->rk_curr_msgs.size;
Packit 2997f0
	mtx_unlock(&rk->rk_curr_msgs.lock);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
static RD_INLINE RD_UNUSED int
Packit 2997f0
rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
Packit 2997f0
	int cnt;
Packit 2997f0
	if (rk->rk_type != RD_KAFKA_PRODUCER)
Packit 2997f0
		return 0;
Packit 2997f0
Packit 2997f0
	mtx_lock(&rk->rk_curr_msgs.lock);
Packit 2997f0
	cnt = rk->rk_curr_msgs.cnt;
Packit 2997f0
	mtx_unlock(&rk->rk_curr_msgs.lock);
Packit 2997f0
Packit 2997f0
	return cnt;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
void rd_kafka_destroy_final (rd_kafka_t *rk);
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Returns true if 'rk' handle is terminating.
Packit 2997f0
 */
Packit 2997f0
#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate))
Packit 2997f0
Packit 2997f0
#define rd_kafka_is_simple_consumer(rk) \
Packit 2997f0
        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
Packit 2997f0
int rd_kafka_simple_consumer_add (rd_kafka_t *rk);
Packit 2997f0
Packit 2997f0
Packit 2997f0
#include "rdkafka_topic.h"
Packit 2997f0
#include "rdkafka_partition.h"
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Debug contexts
Packit 2997f0
 */
Packit 2997f0
#define RD_KAFKA_DBG_GENERIC        0x1
Packit 2997f0
#define RD_KAFKA_DBG_BROKER         0x2
Packit 2997f0
#define RD_KAFKA_DBG_TOPIC          0x4
Packit 2997f0
#define RD_KAFKA_DBG_METADATA       0x8
Packit 2997f0
#define RD_KAFKA_DBG_FEATURE        0x10
Packit 2997f0
#define RD_KAFKA_DBG_QUEUE          0x20
Packit 2997f0
#define RD_KAFKA_DBG_MSG            0x40
Packit 2997f0
#define RD_KAFKA_DBG_PROTOCOL       0x80
Packit 2997f0
#define RD_KAFKA_DBG_CGRP           0x100
Packit 2997f0
#define RD_KAFKA_DBG_SECURITY       0x200
Packit 2997f0
#define RD_KAFKA_DBG_FETCH          0x400
Packit 2997f0
#define RD_KAFKA_DBG_INTERCEPTOR    0x800
Packit 2997f0
#define RD_KAFKA_DBG_PLUGIN         0x1000
Packit 2997f0
#define RD_KAFKA_DBG_CONSUMER       0x2000
Packit 2997f0
#define RD_KAFKA_DBG_ALL            0xffff
Packit 2997f0
#define RD_KAFKA_DBG_NONE           0x0
Packit 2997f0
Packit 2997f0
void rd_kafka_log0(const rd_kafka_conf_t *conf,
Packit 2997f0
                   const rd_kafka_t *rk, const char *extra, int level,
Packit 2997f0
                   const char *fac, const char *fmt, ...) RD_FORMAT(printf,
Packit 2997f0
                                                                    6, 7);
Packit 2997f0
Packit 2997f0
#define rd_kafka_log(rk,level,fac,...) \
Packit 2997f0
        rd_kafka_log0(&rk->rk_conf, rk, NULL, level, fac, __VA_ARGS__)
Packit 2997f0
#define rd_kafka_dbg(rk,ctx,fac,...) do {                               \
Packit 2997f0
                if (unlikely((rk)->rk_conf.debug & (RD_KAFKA_DBG_ ## ctx))) \
Packit 2997f0
                        rd_kafka_log0(&rk->rk_conf,rk,NULL,             \
Packit 2997f0
                                      LOG_DEBUG,fac,__VA_ARGS__);       \
Packit 2997f0
        } while (0)
Packit 2997f0
Packit 2997f0
/* dbg() not requiring an rk, just the conf object, for early logging */
Packit 2997f0
#define rd_kafka_dbg0(conf,ctx,fac,...) do {                            \
Packit 2997f0
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx)))   \
Packit 2997f0
                        rd_kafka_log0(conf,NULL,NULL,                   \
Packit 2997f0
                                      LOG_DEBUG,fac,__VA_ARGS__);       \
Packit 2997f0
        } while (0)
Packit 2997f0
Packit 2997f0
/* NOTE: The local copy of _logname is needed due rkb_logname_lock lock-ordering
Packit 2997f0
 *       when logging another broker's name in the message. */
Packit 2997f0
#define rd_rkb_log(rkb,level,fac,...) do {				\
Packit 2997f0
		char _logname[RD_KAFKA_NODENAME_SIZE];			\
Packit 2997f0
                mtx_lock(&(rkb)->rkb_logname_lock);                     \
Packit 2997f0
		strncpy(_logname, rkb->rkb_logname, sizeof(_logname)-1); \
Packit 2997f0
		_logname[RD_KAFKA_NODENAME_SIZE-1] = '\0';		\
Packit 2997f0
                mtx_unlock(&(rkb)->rkb_logname_lock);                   \
Packit 2997f0
		rd_kafka_log0(&(rkb)->rkb_rk->rk_conf, \
Packit 2997f0
                              (rkb)->rkb_rk, _logname,                  \
Packit 2997f0
                              level, fac, __VA_ARGS__);                 \
Packit 2997f0
        } while (0)
Packit 2997f0
Packit 2997f0
#define rd_rkb_dbg(rkb,ctx,fac,...) do {				\
Packit 2997f0
		if (unlikely((rkb)->rkb_rk->rk_conf.debug &		\
Packit 2997f0
			     (RD_KAFKA_DBG_ ## ctx))) {			\
Packit 2997f0
			rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__);	\
Packit 2997f0
                }                                                       \
Packit 2997f0
	} while (0)
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
Packit 2997f0
Packit 2997f0
static RD_UNUSED RD_INLINE
Packit 2997f0
rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
Packit 2997f0
					     int errnox) {
Packit 2997f0
        if (errnox) {
Packit 2997f0
#ifdef _MSC_VER
Packit 2997f0
                /* This is the correct way to set errno on Windows,
Packit 2997f0
                 * but it is still pointless due to different errnos in
Packit 2997f0
                 * in different runtimes:
Packit 2997f0
                 * https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
Packit 2997f0
                 * errno is thus highly deprecated, and buggy, on Windows
Packit 2997f0
                 * when using librdkafka as a dynamically loaded DLL. */
Packit 2997f0
                _set_errno(errnox);
Packit 2997f0
#else
Packit 2997f0
                errno = errnox;
Packit 2997f0
#endif
Packit 2997f0
        }
Packit 2997f0
	rd_kafka_last_error_code = err;
Packit 2997f0
	return err;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
extern rd_atomic32_t rd_kafka_thread_cnt_curr;
Packit 2997f0
Packit 2997f0
void rd_kafka_set_thread_name (const char *fmt, ...);
Packit 2997f0
void rd_kafka_set_thread_sysname (const char *fmt, ...);
Packit 2997f0
Packit 2997f0
int rd_kafka_path_is_dir (const char *path);
Packit 2997f0
Packit 2997f0
rd_kafka_op_res_t
Packit 2997f0
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
Packit 2997f0
                  rd_kafka_q_cb_type_t cb_type, void *opaque);
Packit 2997f0
Packit 2997f0
rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);
Packit 2997f0
Packit 2997f0
#endif /* _RDKAFKA_INT_H_ */