Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012,2013 Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _RDKAFKA_BROKER_H_
#define _RDKAFKA_BROKER_H_

#include "rdkafka_feature.h"


extern const char *rd_kafka_broker_state_names[];
extern const char *rd_kafka_secproto_names[];

struct rd_kafka_broker_s { /* rd_kafka_broker_t */
	TAILQ_ENTRY(rd_kafka_broker_s) rkb_link;

	int32_t             rkb_nodeid;
#define RD_KAFKA_NODEID_UA -1

	rd_sockaddr_list_t *rkb_rsal;
        rd_ts_t             rkb_ts_rsal_last;
        const rd_sockaddr_inx_t  *rkb_addr_last; /* Last used connect address */

	rd_kafka_transport_t *rkb_transport;

	uint32_t            rkb_corrid;
	int                 rkb_connid;    /* Connection id, increased by
					    * one for each connection by
					    * this broker. Used as a safe-guard
					    * to help troubleshooting buffer
					    * problems across disconnects. */

	rd_kafka_q_t       *rkb_ops;

        mtx_t               rkb_lock;

        int                 rkb_blocking_max_ms; /* Maximum IO poll blocking
                                                  * time. */

        /* Toppars handled by this broker */
	TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars;
	int                 rkb_toppar_cnt;

        /* Active toppars that are eligible for:
         *  - (consumer) fetching due to underflow
         *  - (producer) producing
         *
         * The circleq provides round-robin scheduling for both cases.
         */
        CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_active_toppars;
        int                 rkb_active_toppar_cnt;
        rd_kafka_toppar_t  *rkb_active_toppar_next;  /* Next 'first' toppar
                                                      * in fetch list.
                                                      * This is used for
                                                      * round-robin. */


        rd_kafka_cgrp_t    *rkb_cgrp;

	rd_ts_t             rkb_ts_fetch_backoff;
	int                 rkb_fetching;

	enum {
		RD_KAFKA_BROKER_STATE_INIT,
		RD_KAFKA_BROKER_STATE_DOWN,
		RD_KAFKA_BROKER_STATE_CONNECT,
		RD_KAFKA_BROKER_STATE_AUTH,

		/* Any state >= STATE_UP means the Kafka protocol layer
		 * is operational (to some degree). */
		RD_KAFKA_BROKER_STATE_UP,
                RD_KAFKA_BROKER_STATE_UPDATE,
		RD_KAFKA_BROKER_STATE_APIVERSION_QUERY,
		RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE
	} rkb_state;

        rd_ts_t             rkb_ts_state;        /* Timestamp of last
                                                  * state change */
        rd_interval_t       rkb_timeout_scan_intvl;  /* Waitresp timeout scan
                                                      * interval. */

        rd_atomic32_t       rkb_blocking_request_cnt; /* The number of
                                                       * in-flight blocking
                                                       * requests.
                                                       * A blocking request is
                                                       * one that is known to
                                                       * possibly block on the
                                                       * broker for longer than
                                                       * the typical processing
                                                       * time, e.g.:
                                                       * JoinGroup, SyncGroup */

	int                 rkb_features;    /* Protocol features supported
					      * by this broker.
					      * See RD_KAFKA_FEATURE_* in
					      * rdkafka_proto.h */

        struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs
                                                      * (MUST be sorted) */
	size_t                      rkb_ApiVersions_cnt;
	rd_interval_t               rkb_ApiVersion_fail_intvl; /* Controls how long
								* the fallback proto
								* will be used after
								* ApiVersionRequest
								* failure. */

	rd_kafka_confsource_t  rkb_source;
	struct {
		rd_atomic64_t tx_bytes;
		rd_atomic64_t tx;    /* Kafka-messages (not payload msgs) */
		rd_atomic64_t tx_err;
		rd_atomic64_t tx_retries;
		rd_atomic64_t req_timeouts;  /* Accumulated value */

		rd_atomic64_t rx_bytes;
		rd_atomic64_t rx;    /* Kafka messages (not payload msgs) */
		rd_atomic64_t rx_err;
		rd_atomic64_t rx_corrid_err; /* CorrId misses */
		rd_atomic64_t rx_partial;    /* Partial messages received
                                              * and dropped. */
                rd_atomic64_t zbuf_grow;     /* Compression/decompression buffer grows needed */
                rd_atomic64_t buf_grow;      /* rkbuf grows needed */
                rd_atomic64_t wakeups;       /* Poll wakeups */
	} rkb_c;

        int                 rkb_req_timeouts;  /* Current value */

	rd_ts_t             rkb_ts_metadata_poll; /* Next metadata poll time */
	int                 rkb_metadata_fast_poll_cnt; /* Perform fast
							 * metadata polls. */
	thrd_t              rkb_thread;

	rd_refcnt_t         rkb_refcnt;

        rd_kafka_t         *rkb_rk;

	rd_kafka_buf_t     *rkb_recv_buf;

	int                 rkb_max_inflight;   /* Maximum number of in-flight
						 * requests to broker.
						 * Compared to rkb_waitresps length.*/
	rd_kafka_bufq_t     rkb_outbufs;
	rd_kafka_bufq_t     rkb_waitresps;
	rd_kafka_bufq_t     rkb_retrybufs;

	rd_avg_t            rkb_avg_int_latency;/* Current internal latency period*/
	rd_avg_t            rkb_avg_rtt;        /* Current RTT period */
	rd_avg_t            rkb_avg_throttle;   /* Current throttle period */

        /* These are all protected by rkb_lock */
	char                rkb_name[RD_KAFKA_NODENAME_SIZE];  /* Displ name */
	char                rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/
        uint16_t            rkb_port;                          /* TCP port */
        char               *rkb_origname;                      /* Original
                                                                * host name */


        /* Logging name is a copy of rkb_name, protected by its own mutex */
        char               *rkb_logname;
        mtx_t               rkb_logname_lock;

        int                 rkb_wakeup_fd[2];     /* Wake-up fds (r/w) to wake
                                                   * up from IO-wait when
                                                   * queues have content. */
        int                 rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd,
                                                   * this is rkb_wakeup_fd[1]
                                                   * if enabled. */
        rd_interval_t       rkb_connect_intvl;    /* Reconnect throttling */

	rd_kafka_secproto_t rkb_proto;

	int                 rkb_down_reported;    /* Down event reported */
#if WITH_SASL_CYRUS
	rd_kafka_timer_t    rkb_sasl_kinit_refresh_tmr;
#endif


	struct {
		char msg[512];
		int  err;  /* errno */
	} rkb_err;
};

#define rd_kafka_broker_keep(rkb)   rd_refcnt_add(&(rkb)->rkb_refcnt)
#define rd_kafka_broker_lock(rkb)   mtx_lock(&(rkb)->rkb_lock)
#define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock)


/**
 * @brief Broker comparator
 */
static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp (const void *_a,
                                                    const void *_b) {
        const rd_kafka_broker_t *a = _a, *b = _b;
        return (int)(a - b);
}


/**
 * @returns true if broker supports \p features, else false.
 */
static RD_UNUSED
int rd_kafka_broker_supports (rd_kafka_broker_t *rkb, int features) {
	int r;
	rd_kafka_broker_lock(rkb);
	r = (rkb->rkb_features & features) == features;
	rd_kafka_broker_unlock(rkb);
	return r;
}

int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb,
                                              int16_t ApiKey,
                                              int16_t minver, int16_t maxver,
                                              int *featuresp);

int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb);

rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid (rd_kafka_t *rk,
						   int32_t nodeid);
rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk,
                                                    int32_t nodeid,
                                                    int state);
#define rd_kafka_broker_find_by_nodeid(rk,nodeid) \
        rd_kafka_broker_find_by_nodeid0(rk,nodeid,-1)

/**
 * Filter out brokers that are currently in a blocking request.
 */
static RD_INLINE RD_UNUSED int
rd_kafka_broker_filter_non_blocking (rd_kafka_broker_t *rkb, void *opaque) {
        return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0;
}

/**
 * Filter out brokers that cant do GroupCoordinator requests right now.
 */
static RD_INLINE RD_UNUSED int
rd_kafka_broker_filter_can_group_query (rd_kafka_broker_t *rkb, void *opaque) {
        return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0 ||
		!(rkb->rkb_features & RD_KAFKA_FEATURE_BROKER_GROUP_COORD);
}

rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state,
                                        int (*filter) (rd_kafka_broker_t *rkb,
                                                       void *opaque),
                                        void *opaque);

rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk, int timeout_ms,
                                               int do_lock);

rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id, int state);

int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist);
void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state);

void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
			   int level, rd_kafka_resp_err_t err,
			   const char *fmt, ...);

void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb);

#define rd_kafka_broker_destroy(rkb)                                    \
        rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt,                    \
                                 rd_kafka_broker_destroy_final(rkb))


void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto,
                             const struct rd_kafka_metadata_broker *mdb);
rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
					rd_kafka_confsource_t source,
					rd_kafka_secproto_t proto,
					const char *name, uint16_t port,
					int32_t nodeid);

void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb);
void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr);

int rd_kafka_send (rd_kafka_broker_t *rkb);
int rd_kafka_recv (rd_kafka_broker_t *rkb);

void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt,
		       rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err);

void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb,
                               rd_kafka_buf_t *rkbuf,
                               rd_kafka_resp_cb_t *resp_cb,
                               void *opaque);

void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb,
                                     rd_kafka_buf_t *rkbuf,
                                     rd_kafka_replyq_t replyq,
                                     rd_kafka_resp_cb_t *resp_cb,
                                     void *opaque);

void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf);


rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk);

void msghdr_print (rd_kafka_t *rk,
		   const char *what, const struct msghdr *msg,
		   int hexdump);

const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb);
void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb);

int rd_kafka_brokers_get_state_version (rd_kafka_t *rk);
int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version,
					int timeout_ms);
void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk);



/**
 * Updates the current toppar active round-robin next pointer.
 */
static RD_INLINE RD_UNUSED
void rd_kafka_broker_active_toppar_next (rd_kafka_broker_t *rkb,
                                        rd_kafka_toppar_t *sugg_next) {
        if (CIRCLEQ_EMPTY(&rkb->rkb_active_toppars) ||
            (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_active_toppars))
                rkb->rkb_active_toppar_next = NULL;
        else if (sugg_next)
                rkb->rkb_active_toppar_next = sugg_next;
        else
                rkb->rkb_active_toppar_next =
                        CIRCLEQ_FIRST(&rkb->rkb_active_toppars);
}


void rd_kafka_broker_active_toppar_add (rd_kafka_broker_t *rkb,
                                        rd_kafka_toppar_t *rktp);

void rd_kafka_broker_active_toppar_del (rd_kafka_broker_t *rkb,
                                        rd_kafka_toppar_t *rktp);

#endif /* _RDKAFKA_BROKER_H_ */