|
Packit |
2997f0 |
/*
|
|
Packit |
2997f0 |
* librdkafka - Apache Kafka C library
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Copyright (c) 2012-2013, Magnus Edenhill
|
|
Packit |
2997f0 |
* All rights reserved.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Redistribution and use in source and binary forms, with or without
|
|
Packit |
2997f0 |
* modification, are permitted provided that the following conditions are met:
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* 1. Redistributions of source code must retain the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer.
|
|
Packit |
2997f0 |
* 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
Packit |
2997f0 |
* this list of conditions and the following disclaimer in the documentation
|
|
Packit |
2997f0 |
* and/or other materials provided with the distribution.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
Packit |
2997f0 |
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
Packit |
2997f0 |
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
Packit |
2997f0 |
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
Packit |
2997f0 |
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
Packit |
2997f0 |
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
Packit |
2997f0 |
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
Packit |
2997f0 |
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
Packit |
2997f0 |
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
Packit |
2997f0 |
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
Packit |
2997f0 |
* POSSIBILITY OF SUCH DAMAGE.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#ifndef _RDKAFKA_INT_H_
|
|
Packit |
2997f0 |
#define _RDKAFKA_INT_H_
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#ifndef _MSC_VER
|
|
Packit |
2997f0 |
#define _GNU_SOURCE /* for strndup() */
|
|
Packit |
2997f0 |
#include <syslog.h>
|
|
Packit |
2997f0 |
#else
|
|
Packit |
2997f0 |
typedef int mode_t;
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
#include <fcntl.h>
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include "rdsysqueue.h"
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include "rdkafka.h"
|
|
Packit |
2997f0 |
#include "rd.h"
|
|
Packit |
2997f0 |
#include "rdlog.h"
|
|
Packit |
2997f0 |
#include "rdtime.h"
|
|
Packit |
2997f0 |
#include "rdaddr.h"
|
|
Packit |
2997f0 |
#include "rdinterval.h"
|
|
Packit |
2997f0 |
#include "rdavg.h"
|
|
Packit |
2997f0 |
#include "rdlist.h"
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#if WITH_SSL
|
|
Packit |
2997f0 |
#include <openssl/ssl.h>
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Short type names for structs that are only forward-declared at this
 * point; pointers to them can be used before the full definitions. */
typedef struct rd_kafka_itopic_s rd_kafka_itopic_t;
typedef struct rd_ikafka_s       rd_ikafka_t;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Call rd_kafka_crash() if \p cond evaluates to false.
 *
 * The crash reason is the stringified condition prefixed with "assert: ",
 * reported together with the current file, line and function.
 * \p rk is forwarded to rd_kafka_crash() as-is.
 */
#define rd_kafka_assert(rk, cond) do {                                  \
                if (unlikely(!(cond)))                                  \
                        rd_kafka_crash(__FILE__,__LINE__, __FUNCTION__, \
                                       (rk), "assert: " # cond);        \
        } while (0)
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
void
|
|
Packit |
2997f0 |
RD_NORETURN
|
|
Packit |
2997f0 |
rd_kafka_crash (const char *file, int line, const char *function,
|
|
Packit |
2997f0 |
rd_kafka_t *rk, const char *reason);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Forward declarations: lets the headers included further down refer to
 * these structs by pointer before their full definitions are seen. */
struct rd_kafka_s;
struct rd_kafka_itopic_s;
struct rd_kafka_msg_s;
struct rd_kafka_broker_s;
struct rd_kafka_toppar_s;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Shared-pointer types for toppar and itopic objects, produced by the
 * RD_SHARED_PTR_TYPE() macro (first macro argument left empty). */
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_toppar_s) shptr_rd_kafka_toppar_t;
typedef RD_SHARED_PTR_TYPE(, struct rd_kafka_itopic_s) shptr_rd_kafka_itopic_t;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include "rdkafka_op.h"
|
|
Packit |
2997f0 |
#include "rdkafka_queue.h"
|
|
Packit |
2997f0 |
#include "rdkafka_msg.h"
|
|
Packit |
2997f0 |
#include "rdkafka_proto.h"
|
|
Packit |
2997f0 |
#include "rdkafka_buf.h"
|
|
Packit |
2997f0 |
#include "rdkafka_pattern.h"
|
|
Packit |
2997f0 |
#include "rdkafka_conf.h"
|
|
Packit |
2997f0 |
#include "rdkafka_transport.h"
|
|
Packit |
2997f0 |
#include "rdkafka_timer.h"
|
|
Packit |
2997f0 |
#include "rdkafka_assignor.h"
|
|
Packit |
2997f0 |
#include "rdkafka_metadata.h"
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Protocol level sanity limits: upper bounds for the number of
 * brokers, topics and partitions.
 */
#define RD_KAFKAP_BROKERS_MAX     1000
#define RD_KAFKAP_TOPICS_MAX      1000000
#define RD_KAFKAP_PARTITIONS_MAX  10000
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Evaluates to non-zero when OFF is negative.
 * NOTE(review): per the macro name, negative offsets are treated as
 * logical (non-absolute) offsets - confirm against the offset constants. */
#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Kafka handle, internal representation of the application's rd_kafka_t.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Shared-pointer type for rd_ikafka_t, produced by the
 * RD_SHARED_PTR_TYPE() macro with struct tag shptr_rd_ikafka_s. */
typedef RD_SHARED_PTR_TYPE(shptr_rd_ikafka_s, rd_ikafka_t) shptr_rd_ikafka_t;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct rd_kafka_s {
|
|
Packit |
2997f0 |
rd_kafka_q_t *rk_rep; /* kafka -> application reply queue */
|
|
Packit |
2997f0 |
rd_kafka_q_t *rk_ops; /* any -> rdkafka main thread ops */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
TAILQ_HEAD(, rd_kafka_broker_s) rk_brokers;
|
|
Packit |
2997f0 |
rd_list_t rk_broker_by_id; /* Fast id lookups. */
|
|
Packit |
2997f0 |
rd_atomic32_t rk_broker_cnt;
|
|
Packit |
2997f0 |
rd_atomic32_t rk_broker_down_cnt;
|
|
Packit |
2997f0 |
mtx_t rk_internal_rkb_lock;
|
|
Packit |
2997f0 |
rd_kafka_broker_t *rk_internal_rkb;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Broadcasting of broker state changes to wake up
|
|
Packit |
2997f0 |
* functions waiting for a state change. */
|
|
Packit |
2997f0 |
cnd_t rk_broker_state_change_cnd;
|
|
Packit |
2997f0 |
mtx_t rk_broker_state_change_lock;
|
|
Packit |
2997f0 |
int rk_broker_state_change_version;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
TAILQ_HEAD(, rd_kafka_itopic_s) rk_topics;
|
|
Packit |
2997f0 |
int rk_topic_cnt;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct rd_kafka_cgrp_s *rk_cgrp;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_conf_t rk_conf;
|
|
Packit |
2997f0 |
rd_kafka_q_t *rk_logq; /* Log queue if `log.queue` set */
|
|
Packit |
2997f0 |
char rk_name[128];
|
|
Packit |
2997f0 |
rd_kafkap_str_t *rk_client_id;
|
|
Packit |
2997f0 |
rd_kafkap_str_t *rk_group_id; /* Consumer group id */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int rk_flags;
|
|
Packit |
2997f0 |
rd_atomic32_t rk_terminate;
|
|
Packit |
2997f0 |
rwlock_t rk_lock;
|
|
Packit |
2997f0 |
rd_kafka_type_t rk_type;
|
|
Packit |
2997f0 |
struct timeval rk_tv_state_change;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_atomic32_t rk_last_throttle; /* Last throttle_time_ms value
|
|
Packit |
2997f0 |
* from broker. */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Locks: rd_kafka_*lock() */
|
|
Packit |
2997f0 |
rd_ts_t rk_ts_metadata; /* Timestamp of most recent
|
|
Packit |
2997f0 |
* metadata. */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct rd_kafka_metadata *rk_full_metadata; /* Last full metadata. */
|
|
Packit |
2997f0 |
rd_ts_t rk_ts_full_metadata; /* Timesstamp of .. */
|
|
Packit |
2997f0 |
struct rd_kafka_metadata_cache rk_metadata_cache; /* Metadata cache */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
char *rk_clusterid; /* ClusterId from metadata */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Simple consumer count:
|
|
Packit |
2997f0 |
* >0: Running in legacy / Simple Consumer mode,
|
|
Packit |
2997f0 |
* 0: No consumers running
|
|
Packit |
2997f0 |
* <0: Running in High level consumer mode */
|
|
Packit |
2997f0 |
rd_atomic32_t rk_simple_cnt;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Exactly Once Semantics
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
struct {
|
|
Packit |
2997f0 |
rd_kafkap_str_t *TransactionalId;
|
|
Packit |
2997f0 |
int64_t PID;
|
|
Packit |
2997f0 |
int16_t ProducerEpoch;
|
|
Packit |
2997f0 |
} rk_eos;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
const rd_kafkap_bytes_t *rk_null_bytes;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
struct {
|
|
Packit |
2997f0 |
mtx_t lock; /* Protects acces to this struct */
|
|
Packit |
2997f0 |
cnd_t cnd; /* For waking up blocking injectors */
|
|
Packit |
2997f0 |
unsigned int cnt; /* Current message count */
|
|
Packit |
2997f0 |
size_t size; /* Current message size sum */
|
|
Packit |
2997f0 |
unsigned int max_cnt; /* Max limit */
|
|
Packit |
2997f0 |
size_t max_size; /* Max limit */
|
|
Packit |
2997f0 |
} rk_curr_msgs;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_timers_t rk_timers;
|
|
Packit |
2997f0 |
thrd_t rk_thread;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
int rk_initialized;
|
|
Packit |
2997f0 |
};
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Read/write lock helpers operating on the handle's rk_lock. */
#define rd_kafka_rdlock(rk)    rwlock_rdlock(&(rk)->rk_lock)
#define rd_kafka_rdunlock(rk)  rwlock_rdunlock(&(rk)->rk_lock)
#define rd_kafka_wrlock(rk)    rwlock_wrlock(&(rk)->rk_lock)
#define rd_kafka_wrunlock(rk)  rwlock_wrunlock(&(rk)->rk_lock)
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Add \p cnt messages and of total size \p size bytes to the
|
|
Packit |
2997f0 |
* internal bookkeeping of current message counts.
|
|
Packit |
2997f0 |
* If the total message count or size after add would exceed the
|
|
Packit |
2997f0 |
* configured limits \c queue.buffering.max.messages and
|
|
Packit |
2997f0 |
* \c queue.buffering.max.kbytes then depending on the value of
|
|
Packit |
2997f0 |
* \p block the function either blocks until enough space is available
|
|
Packit |
2997f0 |
* if \p block is 1, else immediately returns
|
|
Packit |
2997f0 |
* RD_KAFKA_RESP_ERR__QUEUE_FULL.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* @param rdmtx If non-null and \p block is set and blocking is to ensue,
|
|
Packit |
2997f0 |
* then unlock this mutex for the duration of the blocking
|
|
Packit |
2997f0 |
* and then reacquire with a read-lock.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static RD_INLINE RD_UNUSED rd_kafka_resp_err_t
|
|
Packit |
2997f0 |
rd_kafka_curr_msgs_add (rd_kafka_t *rk, unsigned int cnt, size_t size,
|
|
Packit |
2997f0 |
int block, rwlock_t *rdlock) {
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rk->rk_type != RD_KAFKA_PRODUCER)
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
mtx_lock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
while (unlikely(rk->rk_curr_msgs.cnt + cnt >
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.max_cnt ||
|
|
Packit |
2997f0 |
(unsigned long long)(rk->rk_curr_msgs.size + size) >
|
|
Packit |
2997f0 |
(unsigned long long)rk->rk_curr_msgs.max_size)) {
|
|
Packit |
2997f0 |
if (!block) {
|
|
Packit |
2997f0 |
mtx_unlock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR__QUEUE_FULL;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rdlock)
|
|
Packit |
2997f0 |
rwlock_rdunlock(rdlock);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
cnd_wait(&rk->rk_curr_msgs.cnd, &rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rdlock)
|
|
Packit |
2997f0 |
rwlock_rdlock(rdlock);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.cnt += cnt;
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.size += size;
|
|
Packit |
2997f0 |
mtx_unlock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return RD_KAFKA_RESP_ERR_NO_ERROR;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Subtract \p cnt messages of total size \p size from the
|
|
Packit |
2997f0 |
* current bookkeeping and broadcast a wakeup on the condvar
|
|
Packit |
2997f0 |
* for any waiting & blocking threads.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
static RD_INLINE RD_UNUSED void
|
|
Packit |
2997f0 |
rd_kafka_curr_msgs_sub (rd_kafka_t *rk, unsigned int cnt, size_t size) {
|
|
Packit |
2997f0 |
int broadcast = 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rk->rk_type != RD_KAFKA_PRODUCER)
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
mtx_lock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
rd_kafka_assert(NULL,
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.cnt >= cnt &&
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.size >= size);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* If the subtraction would pass one of the thresholds
|
|
Packit |
2997f0 |
* broadcast a wake-up to any waiting listeners. */
|
|
Packit |
2997f0 |
if ((rk->rk_curr_msgs.cnt >= rk->rk_curr_msgs.max_cnt &&
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.cnt - cnt < rk->rk_curr_msgs.max_cnt) ||
|
|
Packit |
2997f0 |
(rk->rk_curr_msgs.size >= rk->rk_curr_msgs.max_size &&
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.size - size < rk->rk_curr_msgs.max_size))
|
|
Packit |
2997f0 |
broadcast = 1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.cnt -= cnt;
|
|
Packit |
2997f0 |
rk->rk_curr_msgs.size -= size;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (unlikely(broadcast))
|
|
Packit |
2997f0 |
cnd_broadcast(&rk->rk_curr_msgs.cnd);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
mtx_unlock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static RD_INLINE RD_UNUSED void
|
|
Packit |
2997f0 |
rd_kafka_curr_msgs_get (rd_kafka_t *rk, unsigned int *cntp, size_t *sizep) {
|
|
Packit |
2997f0 |
if (rk->rk_type != RD_KAFKA_PRODUCER) {
|
|
Packit |
2997f0 |
*cntp = 0;
|
|
Packit |
2997f0 |
*sizep = 0;
|
|
Packit |
2997f0 |
return;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
mtx_lock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
*cntp = rk->rk_curr_msgs.cnt;
|
|
Packit |
2997f0 |
*sizep = rk->rk_curr_msgs.size;
|
|
Packit |
2997f0 |
mtx_unlock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static RD_INLINE RD_UNUSED int
|
|
Packit |
2997f0 |
rd_kafka_curr_msgs_cnt (rd_kafka_t *rk) {
|
|
Packit |
2997f0 |
int cnt;
|
|
Packit |
2997f0 |
if (rk->rk_type != RD_KAFKA_PRODUCER)
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
mtx_lock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
cnt = rk->rk_curr_msgs.cnt;
|
|
Packit |
2997f0 |
mtx_unlock(&rk->rk_curr_msgs.lock);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return cnt;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Declared here, defined elsewhere.
 * NOTE(review): the name suggests the last step of handle destruction -
 * confirm against the definition. */
void rd_kafka_destroy_final (rd_kafka_t *rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @returns true (non-zero) if the \p rk handle is terminating, i.e. the
 *          current value of its atomic rk_terminate field.
 */
#define rd_kafka_terminating(rk) (rd_atomic32_get(&(rk)->rk_terminate))
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @returns true if the handle's rk_simple_cnt is greater than zero, which
 *          per the field's documentation means legacy / Simple Consumer mode.
 */
#define rd_kafka_is_simple_consumer(rk)                                 \
        (rd_atomic32_get(&(rk)->rk_simple_cnt) > 0)
|
|
Packit |
2997f0 |
/* Declared here, defined elsewhere.
 * NOTE(review): presumably registers a simple consumer on rk
 * (see rk_simple_cnt); return value semantics to be confirmed. */
int rd_kafka_simple_consumer_add (rd_kafka_t *rk);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include "rdkafka_topic.h"
|
|
Packit |
2997f0 |
#include "rdkafka_partition.h"
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * Debug contexts.
 *
 * One bit per context; the *_dbg() macros test these bits against the
 * configuration's `debug` field.
 */
#define RD_KAFKA_DBG_GENERIC        0x1
#define RD_KAFKA_DBG_BROKER         0x2
#define RD_KAFKA_DBG_TOPIC          0x4
#define RD_KAFKA_DBG_METADATA       0x8
#define RD_KAFKA_DBG_FEATURE        0x10
#define RD_KAFKA_DBG_QUEUE          0x20
#define RD_KAFKA_DBG_MSG            0x40
#define RD_KAFKA_DBG_PROTOCOL       0x80
#define RD_KAFKA_DBG_CGRP           0x100
#define RD_KAFKA_DBG_SECURITY       0x200
#define RD_KAFKA_DBG_FETCH          0x400
#define RD_KAFKA_DBG_INTERCEPTOR    0x800
#define RD_KAFKA_DBG_PLUGIN         0x1000
#define RD_KAFKA_DBG_CONSUMER       0x2000
#define RD_KAFKA_DBG_ALL            0xffff
#define RD_KAFKA_DBG_NONE           0x0
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
void rd_kafka_log0(const rd_kafka_conf_t *conf,
|
|
Packit |
2997f0 |
const rd_kafka_t *rk, const char *extra, int level,
|
|
Packit |
2997f0 |
const char *fac, const char *fmt, ...) RD_FORMAT(printf,
|
|
Packit |
2997f0 |
6, 7);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Log a message for handle \p rk at \p level with facility \p fac
 *        through rd_kafka_log0(), using \p rk's own configuration.
 *
 * \p rk is parenthesized in the expansion so that any pointer expression
 * may be passed; it is evaluated twice, so it must be free of side effects.
 */
#define rd_kafka_log(rk,level,fac,...)                                  \
        rd_kafka_log0(&(rk)->rk_conf, (rk), NULL, level, fac, __VA_ARGS__)
|
|
Packit |
2997f0 |
/**
 * @brief Emit a LOG_DEBUG message through rd_kafka_log0() if debug context
 *        \p ctx (the suffix of an RD_KAFKA_DBG_ constant, e.g. BROKER)
 *        is enabled in \p rk's configuration.
 *
 * \p rk is parenthesized in every use so that any pointer expression may
 * be passed; it is evaluated more than once, so it must be free of side
 * effects.
 */
#define rd_kafka_dbg(rk,ctx,fac,...) do {                               \
                if (unlikely((rk)->rk_conf.debug &                      \
                             (RD_KAFKA_DBG_ ## ctx)))                   \
                        rd_kafka_log0(&(rk)->rk_conf, (rk), NULL,       \
                                      LOG_DEBUG, fac, __VA_ARGS__);     \
        } while (0)
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Debug logging that needs only the conf object and no rk handle, for
 * early logging. The message is emitted at LOG_DEBUG if debug context
 * `ctx` is enabled in conf->debug. */
#define rd_kafka_dbg0(conf,ctx,fac,...) do {                            \
                if (unlikely((conf)->debug & (RD_KAFKA_DBG_ ## ctx)))   \
                        rd_kafka_log0(conf, NULL, NULL,                 \
                                      LOG_DEBUG, fac, __VA_ARGS__);     \
        } while (0)
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Log a message for broker \p rkb at \p level with facility \p fac,
 *        passing the broker's log name as the `extra` argument of
 *        rd_kafka_log0().
 *
 * NOTE: The local copy of _logname is needed due to rkb_logname_lock
 *       lock-ordering when logging another broker's name in the message:
 *       the name is copied under the lock and the lock is released before
 *       rd_kafka_log0() is called.
 *
 * \p rkb is parenthesized in every use so that any pointer expression may
 * be passed; it is evaluated more than once, so it must be free of side
 * effects.
 */
#define rd_rkb_log(rkb,level,fac,...) do {                              \
                char _logname[RD_KAFKA_NODENAME_SIZE];                  \
                mtx_lock(&(rkb)->rkb_logname_lock);                     \
                strncpy(_logname, (rkb)->rkb_logname,                   \
                        sizeof(_logname)-1);                            \
                /* strncpy() does not terminate on truncation */        \
                _logname[sizeof(_logname)-1] = '\0';                    \
                mtx_unlock(&(rkb)->rkb_logname_lock);                   \
                rd_kafka_log0(&(rkb)->rkb_rk->rk_conf,                  \
                              (rkb)->rkb_rk, _logname,                  \
                              level, fac, __VA_ARGS__);                 \
        } while (0)
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Emit a LOG_DEBUG message for broker \p rkb through rd_rkb_log()
 *        if debug context \p ctx (suffix of an RD_KAFKA_DBG_ constant) is
 *        enabled in the configuration of the broker's handle (rkb_rk).
 */
#define rd_rkb_dbg(rkb,ctx,fac,...) do {                                \
                if (unlikely((rkb)->rkb_rk->rk_conf.debug &             \
                             (RD_KAFKA_DBG_ ## ctx))) {                 \
                        rd_rkb_log(rkb, LOG_DEBUG, fac, __VA_ARGS__);   \
                }                                                       \
        } while (0)
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Last error code stored by rd_kafka_set_last_error(); declared with
 * RD_TLS (NOTE(review): presumably thread-local storage - confirm the
 * macro definition). */
extern rd_kafka_resp_err_t RD_TLS rd_kafka_last_error_code;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
static RD_UNUSED RD_INLINE
|
|
Packit |
2997f0 |
rd_kafka_resp_err_t rd_kafka_set_last_error (rd_kafka_resp_err_t err,
|
|
Packit |
2997f0 |
int errnox) {
|
|
Packit |
2997f0 |
if (errnox) {
|
|
Packit |
2997f0 |
#ifdef _MSC_VER
|
|
Packit |
2997f0 |
/* This is the correct way to set errno on Windows,
|
|
Packit |
2997f0 |
* but it is still pointless due to different errnos in
|
|
Packit |
2997f0 |
* in different runtimes:
|
|
Packit |
2997f0 |
* https://social.msdn.microsoft.com/Forums/vstudio/en-US/b4500c0d-1b69-40c7-9ef5-08da1025b5bf/setting-errno-from-within-a-dll?forum=vclanguage/
|
|
Packit |
2997f0 |
* errno is thus highly deprecated, and buggy, on Windows
|
|
Packit |
2997f0 |
* when using librdkafka as a dynamically loaded DLL. */
|
|
Packit |
2997f0 |
_set_errno(errnox);
|
|
Packit |
2997f0 |
#else
|
|
Packit |
2997f0 |
errno = errnox;
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
rd_kafka_last_error_code = err;
|
|
Packit |
2997f0 |
return err;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Atomic counter, defined elsewhere.
 * NOTE(review): per its name, the current number of librdkafka threads -
 * confirm at the definition. */
extern rd_atomic32_t rd_kafka_thread_cnt_curr;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* NOTE(review): fmt looks like a printf-style format string, but no
 * RD_FORMAT() attribute is attached, so arguments are not checked by the
 * compiler - confirm and consider adding RD_FORMAT(printf, 1, 2). */
void rd_kafka_set_thread_name (const char *fmt, ...);
|
|
Packit |
2997f0 |
/* NOTE(review): fmt looks like a printf-style format string, but no
 * RD_FORMAT() attribute is attached, so arguments are not checked by the
 * compiler - confirm and consider adding RD_FORMAT(printf, 1, 2). */
void rd_kafka_set_thread_sysname (const char *fmt, ...);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* NOTE(review): presumably returns non-zero if path refers to a
 * directory - confirm against the definition. */
int rd_kafka_path_is_dir (const char *path);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_op_res_t
|
|
Packit |
2997f0 |
rd_kafka_poll_cb (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
|
|
Packit |
2997f0 |
rd_kafka_q_cb_type_t cb_type, void *opaque);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Declared here, defined elsewhere; reports its outcome as an
 * rd_kafka_resp_err_t.
 * NOTE(review): presumably subscribes to the given internal topic
 * object - confirm against the definition. */
rd_kafka_resp_err_t rd_kafka_subscribe_rkt (rd_kafka_itopic_t *rkt);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#endif /* _RDKAFKA_INT_H_ */
|