/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2015, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #ifndef _RDKAFKA_OP_H_ #define _RDKAFKA_OP_H_ #include "rdkafka_msg.h" /* Forward declarations */ typedef struct rd_kafka_q_s rd_kafka_q_t; typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; typedef struct rd_kafka_op_s rd_kafka_op_t; /* One-off reply queue + reply version. * All APIs that take a rd_kafka_replyq_t makes a copy of the * struct as-is and grabs hold of the existing .q refcount. * Think of replyq as a (Q,VERSION) tuple. */ typedef struct rd_kafka_replyq_s { rd_kafka_q_t *q; int32_t version; #if ENABLE_DEVEL char *_id; /* Devel id used for debugging reference leaks. * Is a strdup() of the caller's function name, * which makes for easy debugging with valgrind. */ #endif } rd_kafka_replyq_t; /** * Flags used by: * - rd_kafka_op_t.rko_flags * - rd_kafka_buf_t.rkbuf_flags */ #define RD_KAFKA_OP_F_FREE 0x1 /* rd_free payload when done with it */ #define RD_KAFKA_OP_F_FLASH 0x2 /* Internal: insert at head of queue */ #define RD_KAFKA_OP_F_NO_RESPONSE 0x4 /* rkbuf: Not expecting a response */ #define RD_KAFKA_OP_F_CRC 0x8 /* rkbuf: Perform CRC calculation */ #define RD_KAFKA_OP_F_BLOCKING 0x10 /* rkbuf: blocking protocol request */ #define RD_KAFKA_OP_F_REPROCESS 0x20 /* cgrp: Reprocess at a later time. */ typedef enum { RD_KAFKA_OP_NONE, /* No specific type, use OP_CB */ RD_KAFKA_OP_FETCH, /* Kafka thread -> Application */ RD_KAFKA_OP_ERR, /* Kafka thread -> Application */ RD_KAFKA_OP_CONSUMER_ERR, /* Kafka thread -> Application */ RD_KAFKA_OP_DR, /* Kafka thread -> Application * Produce message delivery report */ RD_KAFKA_OP_STATS, /* Kafka thread -> Application */ RD_KAFKA_OP_OFFSET_COMMIT, /* any -> toppar's Broker thread */ RD_KAFKA_OP_NODE_UPDATE, /* any -> Broker thread: node update */ RD_KAFKA_OP_XMIT_BUF, /* transmit buffer: any -> broker thread */ RD_KAFKA_OP_RECV_BUF, /* received response buffer: broker thr -> any */ RD_KAFKA_OP_XMIT_RETRY, /* retry buffer xmit: any -> broker thread */ RD_KAFKA_OP_FETCH_START, /* Application -> toppar's handler thread */ RD_KAFKA_OP_FETCH_STOP, /* Application -> toppar's handler thread */ RD_KAFKA_OP_SEEK, /* Application -> toppar's handler thread */ RD_KAFKA_OP_PAUSE, /* Application -> toppar's handler thread */ RD_KAFKA_OP_OFFSET_FETCH, /* Broker -> broker thread: fetch offsets * for topic. */ RD_KAFKA_OP_PARTITION_JOIN, /* * -> cgrp op: add toppar to cgrp * * -> broker op: add toppar to broker */ RD_KAFKA_OP_PARTITION_LEAVE, /* * -> cgrp op: remove toppar from cgrp * * -> broker op: remove toppar from rkb*/ RD_KAFKA_OP_REBALANCE, /* broker thread -> app: * group rebalance */ RD_KAFKA_OP_TERMINATE, /* For generic use */ RD_KAFKA_OP_COORD_QUERY, /* Query for coordinator */ RD_KAFKA_OP_SUBSCRIBE, /* New subscription */ RD_KAFKA_OP_ASSIGN, /* New assignment */ RD_KAFKA_OP_GET_SUBSCRIPTION,/* Get current subscription. * Reuses u.subscribe */ RD_KAFKA_OP_GET_ASSIGNMENT, /* Get current assignment. * Reuses u.assign */ RD_KAFKA_OP_THROTTLE, /* Throttle info */ RD_KAFKA_OP_NAME, /* Request name */ RD_KAFKA_OP_OFFSET_RESET, /* Offset reset */ RD_KAFKA_OP_METADATA, /* Metadata response */ RD_KAFKA_OP_LOG, /* Log */ RD_KAFKA_OP_WAKEUP, /* Wake-up signaling */ RD_KAFKA_OP__END } rd_kafka_op_type_t; /* Flags used with op_type_t */ #define RD_KAFKA_OP_CB (1 << 30) /* Callback op. */ #define RD_KAFKA_OP_REPLY (1 << 31) /* Reply op. */ #define RD_KAFKA_OP_FLAGMASK (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY) /** * @brief Op/queue priority levels. * @remark Since priority levels alter the FIFO order, pay extra attention * to preserve ordering as deemed necessary. * @remark Priority should only be set on ops destined for application * facing queues (rk_rep, rkcg_q, etc). */ typedef enum { RD_KAFKA_PRIO_NORMAL = 0, /* Normal bulk, messages, DRs, etc. */ RD_KAFKA_PRIO_MEDIUM, /* Prioritize in front of bulk, * still at some scale. e.g. logs, .. */ RD_KAFKA_PRIO_HIGH, /* Small scale high priority */ RD_KAFKA_PRIO_FLASH /* Micro scale, immediate delivery. */ } rd_kafka_op_prio_t; /** * @brief Op handler result * * @remark When returning YIELD from a handler the handler will * need to have made sure to either re-enqueue the op or destroy it * since the caller will not touch the op anymore. */ typedef enum { RD_KAFKA_OP_RES_PASS, /* Not handled, pass to caller */ RD_KAFKA_OP_RES_HANDLED, /* Op was handled (through callbacks) */ RD_KAFKA_OP_RES_YIELD /* Callback called yield */ } rd_kafka_op_res_t; /** * @brief Queue serve callback call type */ typedef enum { RD_KAFKA_Q_CB_INVALID, /* dont use */ RD_KAFKA_Q_CB_CALLBACK,/* trigger callback based on op */ RD_KAFKA_Q_CB_RETURN, /* return op rather than trigger callback * (if possible)*/ RD_KAFKA_Q_CB_FORCE_RETURN, /* return op, regardless of callback. */ RD_KAFKA_Q_CB_EVENT /* like _Q_CB_RETURN but return event_t:ed op */ } rd_kafka_q_cb_type_t; /** * @brief Queue serve callback * @remark See rd_kafka_op_res_t docs for return semantics. */ typedef rd_kafka_op_res_t (rd_kafka_q_serve_cb_t) (rd_kafka_t *rk, struct rd_kafka_q_s *rkq, struct rd_kafka_op_s *rko, rd_kafka_q_cb_type_t cb_type, void *opaque) RD_WARN_UNUSED_RESULT; /** * @brief Op callback type */ typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk, rd_kafka_q_t *rkq, struct rd_kafka_op_s *rko) RD_WARN_UNUSED_RESULT; #define RD_KAFKA_OP_TYPE_ASSERT(rko,type) \ rd_kafka_assert(NULL, (rko)->rko_type == (type) && # type) struct rd_kafka_op_s { TAILQ_ENTRY(rd_kafka_op_s) rko_link; rd_kafka_op_type_t rko_type; /* Internal op type */ rd_kafka_event_type_t rko_evtype; int rko_flags; /* See RD_KAFKA_OP_F_... above */ int32_t rko_version; rd_kafka_resp_err_t rko_err; int32_t rko_len; /* Depends on type, typically the * message length. */ rd_kafka_op_prio_t rko_prio; /* In-queue priority. * Higher value means higher prio. */ shptr_rd_kafka_toppar_t *rko_rktp; /* * Generic fields */ /* Indicates request: enqueue reply on rko_replyq.q with .version. * .q is refcounted. */ rd_kafka_replyq_t rko_replyq; /* Original queue's op serve callback and opaque, if any. * Mainly used for forwarded queues to use the original queue's * serve function from the forwarded position. */ rd_kafka_q_serve_cb_t *rko_serve; void *rko_serve_opaque; rd_kafka_t *rko_rk; #if ENABLE_DEVEL const char *rko_source; /**< Where op was created */ #endif /* RD_KAFKA_OP_CB */ rd_kafka_op_cb_t *rko_op_cb; union { struct { rd_kafka_buf_t *rkbuf; rd_kafka_msg_t rkm; int evidx; } fetch; struct { rd_kafka_topic_partition_list_t *partitions; int do_free; /* free .partitions on destroy() */ } offset_fetch; struct { rd_kafka_topic_partition_list_t *partitions; void (*cb) (rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque); void *opaque; int silent_empty; /**< Fail silently if there are no * offsets to commit. */ rd_ts_t ts_timeout; char *reason; } offset_commit; struct { rd_kafka_topic_partition_list_t *topics; } subscribe; /* also used for GET_SUBSCRIPTION */ struct { rd_kafka_topic_partition_list_t *partitions; } assign; /* also used for GET_ASSIGNMENT */ struct { rd_kafka_topic_partition_list_t *partitions; } rebalance; struct { char *str; } name; struct { int64_t offset; char *errstr; rd_kafka_msg_t rkm; } err; /* used for ERR and CONSUMER_ERR */ struct { int throttle_time; int32_t nodeid; char *nodename; } throttle; struct { char *json; size_t json_len; } stats; struct { rd_kafka_buf_t *rkbuf; } xbuf; /* XMIT_BUF and RECV_BUF */ /* RD_KAFKA_OP_METADATA */ struct { rd_kafka_metadata_t *md; int force; /* force request regardless of outstanding * metadata requests. */ } metadata; struct { shptr_rd_kafka_itopic_t *s_rkt; rd_kafka_msgq_t msgq; rd_kafka_msgq_t msgq2; int do_purge2; } dr; struct { int32_t nodeid; char nodename[RD_KAFKA_NODENAME_SIZE]; } node; struct { int64_t offset; char *reason; } offset_reset; struct { int64_t offset; struct rd_kafka_cgrp_s *rkcg; } fetch_start; /* reused for SEEK */ struct { int pause; int flag; } pause; struct { char fac[64]; int level; char *str; } log; } rko_u; }; TAILQ_HEAD(rd_kafka_op_head_s, rd_kafka_op_s); const char *rd_kafka_op2str (rd_kafka_op_type_t type); void rd_kafka_op_destroy (rd_kafka_op_t *rko); rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type); #if ENABLE_DEVEL #define _STRINGIFYX(A) #A #define _STRINGIFY(A) _STRINGIFYX(A) #define rd_kafka_op_new(type) \ rd_kafka_op_new0(__FILE__ ":" _STRINGIFY(__LINE__), type) #else #define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type) #endif rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig, rd_kafka_resp_err_t err); rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk, rd_kafka_op_type_t type, rd_kafka_op_cb_t *cb); int rd_kafka_op_reply (rd_kafka_op_t *rko, rd_kafka_resp_err_t err); #define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio) #define rd_kafka_op_err(rk,err,...) do { \ if (!(rk)->rk_conf.error_cb) { \ rd_kafka_log(rk, LOG_ERR, "ERROR", __VA_ARGS__); \ break; \ } \ rd_kafka_q_op_err((rk)->rk_rep, RD_KAFKA_OP_ERR, err, 0, \ NULL, 0, __VA_ARGS__); \ } while (0) void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_op_type_t optype, rd_kafka_resp_err_t err, int32_t version, rd_kafka_toppar_t *rktp, int64_t offset, const char *fmt, ...); rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq, rd_kafka_op_t *rko, int timeout_ms); rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type); rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko); rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) RD_WARN_UNUSED_RESULT; rd_kafka_op_t * rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp, rd_kafka_toppar_t *rktp, int32_t version, rd_kafka_buf_t *rkbuf, int64_t offset, size_t key_len, const void *key, size_t val_len, const void *val); void rd_kafka_op_throttle_time (struct rd_kafka_broker_s *rkb, rd_kafka_q_t *rkq, int throttle_time); rd_kafka_op_res_t rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type, void *opaque, rd_kafka_q_serve_cb_t *callback) RD_WARN_UNUSED_RESULT; extern rd_atomic32_t rd_kafka_op_cnt; void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko); void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko, const rd_kafka_message_t *rkmessage); #endif /* _RDKAFKA_OP_H_ */