/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012,2013 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #ifndef _RDKAFKA_BROKER_H_ #define _RDKAFKA_BROKER_H_ #include "rdkafka_feature.h" extern const char *rd_kafka_broker_state_names[]; extern const char *rd_kafka_secproto_names[]; struct rd_kafka_broker_s { /* rd_kafka_broker_t */ TAILQ_ENTRY(rd_kafka_broker_s) rkb_link; int32_t rkb_nodeid; #define RD_KAFKA_NODEID_UA -1 rd_sockaddr_list_t *rkb_rsal; rd_ts_t rkb_ts_rsal_last; const rd_sockaddr_inx_t *rkb_addr_last; /* Last used connect address */ rd_kafka_transport_t *rkb_transport; uint32_t rkb_corrid; int rkb_connid; /* Connection id, increased by * one for each connection by * this broker. Used as a safe-guard * to help troubleshooting buffer * problems across disconnects. */ rd_kafka_q_t *rkb_ops; mtx_t rkb_lock; int rkb_blocking_max_ms; /* Maximum IO poll blocking * time. */ /* Toppars handled by this broker */ TAILQ_HEAD(, rd_kafka_toppar_s) rkb_toppars; int rkb_toppar_cnt; /* Active toppars that are eligible for: * - (consumer) fetching due to underflow * - (producer) producing * * The circleq provides round-robin scheduling for both cases. */ CIRCLEQ_HEAD(, rd_kafka_toppar_s) rkb_active_toppars; int rkb_active_toppar_cnt; rd_kafka_toppar_t *rkb_active_toppar_next; /* Next 'first' toppar * in fetch list. * This is used for * round-robin. */ rd_kafka_cgrp_t *rkb_cgrp; rd_ts_t rkb_ts_fetch_backoff; int rkb_fetching; enum { RD_KAFKA_BROKER_STATE_INIT, RD_KAFKA_BROKER_STATE_DOWN, RD_KAFKA_BROKER_STATE_CONNECT, RD_KAFKA_BROKER_STATE_AUTH, /* Any state >= STATE_UP means the Kafka protocol layer * is operational (to some degree). */ RD_KAFKA_BROKER_STATE_UP, RD_KAFKA_BROKER_STATE_UPDATE, RD_KAFKA_BROKER_STATE_APIVERSION_QUERY, RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE } rkb_state; rd_ts_t rkb_ts_state; /* Timestamp of last * state change */ rd_interval_t rkb_timeout_scan_intvl; /* Waitresp timeout scan * interval. */ rd_atomic32_t rkb_blocking_request_cnt; /* The number of * in-flight blocking * requests. * A blocking request is * one that is known to * possibly block on the * broker for longer than * the typical processing * time, e.g.: * JoinGroup, SyncGroup */ int rkb_features; /* Protocol features supported * by this broker. * See RD_KAFKA_FEATURE_* in * rdkafka_proto.h */ struct rd_kafka_ApiVersion *rkb_ApiVersions; /* Broker's supported APIs * (MUST be sorted) */ size_t rkb_ApiVersions_cnt; rd_interval_t rkb_ApiVersion_fail_intvl; /* Controls how long * the fallback proto * will be used after * ApiVersionRequest * failure. */ rd_kafka_confsource_t rkb_source; struct { rd_atomic64_t tx_bytes; rd_atomic64_t tx; /* Kafka-messages (not payload msgs) */ rd_atomic64_t tx_err; rd_atomic64_t tx_retries; rd_atomic64_t req_timeouts; /* Accumulated value */ rd_atomic64_t rx_bytes; rd_atomic64_t rx; /* Kafka messages (not payload msgs) */ rd_atomic64_t rx_err; rd_atomic64_t rx_corrid_err; /* CorrId misses */ rd_atomic64_t rx_partial; /* Partial messages received * and dropped. */ rd_atomic64_t zbuf_grow; /* Compression/decompression buffer grows needed */ rd_atomic64_t buf_grow; /* rkbuf grows needed */ rd_atomic64_t wakeups; /* Poll wakeups */ } rkb_c; int rkb_req_timeouts; /* Current value */ rd_ts_t rkb_ts_metadata_poll; /* Next metadata poll time */ int rkb_metadata_fast_poll_cnt; /* Perform fast * metadata polls. */ thrd_t rkb_thread; rd_refcnt_t rkb_refcnt; rd_kafka_t *rkb_rk; rd_kafka_buf_t *rkb_recv_buf; int rkb_max_inflight; /* Maximum number of in-flight * requests to broker. * Compared to rkb_waitresps length.*/ rd_kafka_bufq_t rkb_outbufs; rd_kafka_bufq_t rkb_waitresps; rd_kafka_bufq_t rkb_retrybufs; rd_avg_t rkb_avg_int_latency;/* Current internal latency period*/ rd_avg_t rkb_avg_rtt; /* Current RTT period */ rd_avg_t rkb_avg_throttle; /* Current throttle period */ /* These are all protected by rkb_lock */ char rkb_name[RD_KAFKA_NODENAME_SIZE]; /* Displ name */ char rkb_nodename[RD_KAFKA_NODENAME_SIZE]; /* host:port*/ uint16_t rkb_port; /* TCP port */ char *rkb_origname; /* Original * host name */ /* Logging name is a copy of rkb_name, protected by its own mutex */ char *rkb_logname; mtx_t rkb_logname_lock; int rkb_wakeup_fd[2]; /* Wake-up fds (r/w) to wake * up from IO-wait when * queues have content. */ int rkb_toppar_wakeup_fd; /* Toppar msgq wakeup fd, * this is rkb_wakeup_fd[1] * if enabled. */ rd_interval_t rkb_connect_intvl; /* Reconnect throttling */ rd_kafka_secproto_t rkb_proto; int rkb_down_reported; /* Down event reported */ #if WITH_SASL_CYRUS rd_kafka_timer_t rkb_sasl_kinit_refresh_tmr; #endif struct { char msg[512]; int err; /* errno */ } rkb_err; }; #define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt) #define rd_kafka_broker_lock(rkb) mtx_lock(&(rkb)->rkb_lock) #define rd_kafka_broker_unlock(rkb) mtx_unlock(&(rkb)->rkb_lock) /** * @brief Broker comparator */ static RD_UNUSED RD_INLINE int rd_kafka_broker_cmp (const void *_a, const void *_b) { const rd_kafka_broker_t *a = _a, *b = _b; return (int)(a - b); } /** * @returns true if broker supports \p features, else false. */ static RD_UNUSED int rd_kafka_broker_supports (rd_kafka_broker_t *rkb, int features) { int r; rd_kafka_broker_lock(rkb); r = (rkb->rkb_features & features) == features; rd_kafka_broker_unlock(rkb); return r; } int16_t rd_kafka_broker_ApiVersion_supported (rd_kafka_broker_t *rkb, int16_t ApiKey, int16_t minver, int16_t maxver, int *featuresp); int rd_kafka_broker_get_state (rd_kafka_broker_t *rkb); rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid (rd_kafka_t *rk, int32_t nodeid); rd_kafka_broker_t *rd_kafka_broker_find_by_nodeid0 (rd_kafka_t *rk, int32_t nodeid, int state); #define rd_kafka_broker_find_by_nodeid(rk,nodeid) \ rd_kafka_broker_find_by_nodeid0(rk,nodeid,-1) /** * Filter out brokers that are currently in a blocking request. */ static RD_INLINE RD_UNUSED int rd_kafka_broker_filter_non_blocking (rd_kafka_broker_t *rkb, void *opaque) { return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0; } /** * Filter out brokers that cant do GroupCoordinator requests right now. */ static RD_INLINE RD_UNUSED int rd_kafka_broker_filter_can_group_query (rd_kafka_broker_t *rkb, void *opaque) { return rd_atomic32_get(&rkb->rkb_blocking_request_cnt) > 0 || !(rkb->rkb_features & RD_KAFKA_FEATURE_BROKER_GROUP_COORD); } rd_kafka_broker_t *rd_kafka_broker_any (rd_kafka_t *rk, int state, int (*filter) (rd_kafka_broker_t *rkb, void *opaque), void *opaque); rd_kafka_broker_t *rd_kafka_broker_any_usable (rd_kafka_t *rk, int timeout_ms, int do_lock); rd_kafka_broker_t *rd_kafka_broker_prefer (rd_kafka_t *rk, int32_t broker_id, int state); int rd_kafka_brokers_add0 (rd_kafka_t *rk, const char *brokerlist); void rd_kafka_broker_set_state (rd_kafka_broker_t *rkb, int state); void rd_kafka_broker_fail (rd_kafka_broker_t *rkb, int level, rd_kafka_resp_err_t err, const char *fmt, ...); void rd_kafka_broker_destroy_final (rd_kafka_broker_t *rkb); #define rd_kafka_broker_destroy(rkb) \ rd_refcnt_destroywrapper(&(rkb)->rkb_refcnt, \ rd_kafka_broker_destroy_final(rkb)) void rd_kafka_broker_update (rd_kafka_t *rk, rd_kafka_secproto_t proto, const struct rd_kafka_metadata_broker *mdb); rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, rd_kafka_confsource_t source, rd_kafka_secproto_t proto, const char *name, uint16_t port, int32_t nodeid); void rd_kafka_broker_connect_up (rd_kafka_broker_t *rkb); void rd_kafka_broker_connect_done (rd_kafka_broker_t *rkb, const char *errstr); int rd_kafka_send (rd_kafka_broker_t *rkb); int rd_kafka_recv (rd_kafka_broker_t *rkb); void rd_kafka_dr_msgq (rd_kafka_itopic_t *rkt, rd_kafka_msgq_t *rkmq, rd_kafka_resp_err_t err); void rd_kafka_broker_buf_enq1 (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf, rd_kafka_resp_cb_t *resp_cb, void *opaque); void rd_kafka_broker_buf_enq_replyq (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *opaque); void rd_kafka_broker_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf); rd_kafka_broker_t *rd_kafka_broker_internal (rd_kafka_t *rk); void msghdr_print (rd_kafka_t *rk, const char *what, const struct msghdr *msg, int hexdump); const char *rd_kafka_broker_name (rd_kafka_broker_t *rkb); void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb); int rd_kafka_brokers_get_state_version (rd_kafka_t *rk); int rd_kafka_brokers_wait_state_change (rd_kafka_t *rk, int stored_version, int timeout_ms); void rd_kafka_brokers_broadcast_state_change (rd_kafka_t *rk); /** * Updates the current toppar active round-robin next pointer. */ static RD_INLINE RD_UNUSED void rd_kafka_broker_active_toppar_next (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *sugg_next) { if (CIRCLEQ_EMPTY(&rkb->rkb_active_toppars) || (void *)sugg_next == CIRCLEQ_ENDC(&rkb->rkb_active_toppars)) rkb->rkb_active_toppar_next = NULL; else if (sugg_next) rkb->rkb_active_toppar_next = sugg_next; else rkb->rkb_active_toppar_next = CIRCLEQ_FIRST(&rkb->rkb_active_toppars); } void rd_kafka_broker_active_toppar_add (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp); void rd_kafka_broker_active_toppar_del (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp); #endif /* _RDKAFKA_BROKER_H_ */