Blob Blame History Raw
/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2016 Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rdkafka_event.h"
#include "rd.h"

rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev) {
	return rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE;
}

const char *rd_kafka_event_name (const rd_kafka_event_t *rkev) {
	switch (rkev ? rkev->rko_evtype : RD_KAFKA_EVENT_NONE)
	{
	case RD_KAFKA_EVENT_NONE:
		return "(NONE)";
	case RD_KAFKA_EVENT_DR:
		return "DeliveryReport";
	case RD_KAFKA_EVENT_FETCH:
		return "Fetch";
	case RD_KAFKA_EVENT_LOG:
		return "Log";
	case RD_KAFKA_EVENT_ERROR:
		return "Error";
	case RD_KAFKA_EVENT_REBALANCE:
		return "Rebalance";
	case RD_KAFKA_EVENT_OFFSET_COMMIT:
		return "OffsetCommit";
	case RD_KAFKA_EVENT_STATS:
		return "Stats";
	default:
		return "?unknown?";
	}
}




void rd_kafka_event_destroy (rd_kafka_event_t *rkev) {
	if (unlikely(!rkev))
		return;
	rd_kafka_op_destroy(rkev);
}


/**
 * @returns the next message from the event's message queue.
 * @remark messages will be freed automatically when event is destroyed,
 *         application MUST NOT call rd_kafka_message_destroy()
 */
const rd_kafka_message_t *
rd_kafka_event_message_next (rd_kafka_event_t *rkev) {
	rd_kafka_op_t *rko = rkev;
	rd_kafka_msg_t *rkm;
	rd_kafka_msgq_t *rkmq, *rkmq2;
	rd_kafka_message_t *rkmessage;

	switch (rkev->rko_type)
	{
	case RD_KAFKA_OP_DR:
		rkmq = &rko->rko_u.dr.msgq;
		rkmq2 = &rko->rko_u.dr.msgq2;
		break;

	case RD_KAFKA_OP_FETCH:
		/* Just one message */
		if (rko->rko_u.fetch.evidx++ > 0)
			return NULL;

		rkmessage = rd_kafka_message_get(rko);
		if (unlikely(!rkmessage))
			return NULL;

		/* Store offset */
		rd_kafka_op_offset_store(NULL, rko, rkmessage);

		return rkmessage;


	default:
		return NULL;
	}

	if (unlikely(!(rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))))
		return NULL;

	rd_kafka_msgq_deq(rkmq, rkm, 1);

	/* Put rkm on secondary message queue which will be purged later. */
	rd_kafka_msgq_enq(rkmq2, rkm);

	return rd_kafka_message_get_from_rkm(rko, rkm);
}


size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
				     const rd_kafka_message_t **rkmessages, size_t size) {
	size_t cnt = 0;
	const rd_kafka_message_t *rkmessage;

	while ((rkmessage = rd_kafka_event_message_next(rkev)))
		rkmessages[cnt++] = rkmessage;

	return cnt;
}


size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev) {
	switch (rkev->rko_evtype)
	{
	case RD_KAFKA_EVENT_DR:
                return (size_t)rkev->rko_u.dr.msgq.rkmq_msg_cnt;
	case RD_KAFKA_EVENT_FETCH:
		return 1;
	default:
		return 0;
	}
}


rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev) {
	return rkev->rko_err;
}

const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev) {
	switch (rkev->rko_type)
	{
	case RD_KAFKA_OP_ERR:
	case RD_KAFKA_OP_CONSUMER_ERR:
		if (rkev->rko_u.err.errstr)
			return rkev->rko_u.err.errstr;
		/* FALLTHRU */
	default:
		return rd_kafka_err2str(rkev->rko_err);
	}
}


void *rd_kafka_event_opaque (rd_kafka_event_t *rkev) {
	switch (rkev->rko_type & ~RD_KAFKA_OP_FLAGMASK)
	{
	case RD_KAFKA_OP_OFFSET_COMMIT:
		return rkev->rko_u.offset_commit.opaque;
	default:
		return NULL;
	}
}


int rd_kafka_event_log (rd_kafka_event_t *rkev, const char **fac,
			const char **str, int *level) {
	if (unlikely(rkev->rko_evtype != RD_KAFKA_EVENT_LOG))
		return -1;

	if (likely(fac != NULL))
                *fac = rkev->rko_u.log.fac;
	if (likely(str != NULL))
		*str = rkev->rko_u.log.str;
	if (likely(level != NULL))
		*level = rkev->rko_u.log.level;

	return 0;
}

const char *rd_kafka_event_stats (rd_kafka_event_t *rkev) {
	return rkev->rko_u.stats.json;
}

rd_kafka_topic_partition_list_t *
rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev) {
	switch (rkev->rko_evtype)
	{
	case RD_KAFKA_EVENT_REBALANCE:
		return rkev->rko_u.rebalance.partitions;
	case RD_KAFKA_EVENT_OFFSET_COMMIT:
		return rkev->rko_u.offset_commit.partitions;
	default:
		return NULL;
	}
}


rd_kafka_topic_partition_t *
rd_kafka_event_topic_partition (rd_kafka_event_t *rkev) {
	rd_kafka_topic_partition_t *rktpar;

	if (unlikely(!rkev->rko_rktp))
		return NULL;

	rktpar = rd_kafka_topic_partition_new_from_rktp(
		rd_kafka_toppar_s2i(rkev->rko_rktp));

	switch (rkev->rko_type)
	{
	case RD_KAFKA_OP_ERR:
	case RD_KAFKA_OP_CONSUMER_ERR:
		rktpar->offset = rkev->rko_u.err.offset;
		break;
	default:
		break;
	}

	rktpar->err = rkev->rko_err;

	return rktpar;

}