/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2016, Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


#include "rdkafka_int.h"
#include "rdkafka_feature.h"

#include <stdlib.h>

/* Human-readable names of the RD_KAFKA_FEATURE_.. flags, in bit order:
 * rd_kafka_features2str() maps bit i to rd_kafka_feature_names[i]. */
static const char *rd_kafka_feature_names[] = {
	"MsgVer1",
	"ApiVersion",
	"BrokerBalancedConsumer",
	"ThrottleTime",
	"Sasl",
	"SaslHandshake",
	"BrokerGroupCoordinator",
	"LZ4",
        "OffsetTime",
        "MsgVer2",
	NULL
};


static const struct rd_kafka_feature_map {
	/* RD_KAFKA_FEATURE_... */
	int feature;

	/* Depends on the following ApiVersions overlapping with
	 * what the broker supports: */
	struct rd_kafka_ApiVersion depends[RD_KAFKAP__NUM];

} rd_kafka_feature_map[] = {
	/**
	 * @brief List of features and the ApiVersions they depend on.
	 *
	 * The dependency list consists of the ApiKey followed by this
	 * client's supported minimum and maximum API versions.
	 * As long as these version ranges overlap with the
	 * broker-supported API versions, the feature will be enabled.
	 */
	{

		/* @brief >=0.10.0: Message.MagicByte version 1:
		 * Relative offsets (KIP-31) and message timestamps (KIP-32). */
		.feature = RD_KAFKA_FEATURE_MSGVER1,
		.depends = {
			{ RD_KAFKAP_Produce, 2, 2 },
			{ RD_KAFKAP_Fetch, 2, 2 },
			{ -1 },
		},
	},
        {
                /* @brief >=0.11.0: Message.MagicByte version 2 */
                .feature = RD_KAFKA_FEATURE_MSGVER2,
                .depends = {
                        { RD_KAFKAP_Produce, 3, 3 },
                        { RD_KAFKAP_Fetch, 4, 4 },
                        { -1 },
                },
        },
	{
		
		/* @brief >=0.10.0: ApiVersionQuery support.
		 * @remark This is a bit of a chicken-and-egg problem: it needs to
		 *         be set by feature_check() to avoid the feature being
		 *         cleared even when the broker supports it. */
		.feature = RD_KAFKA_FEATURE_APIVERSION,
		.depends = {
			{ RD_KAFKAP_ApiVersion, 0, 0 },
			{ -1 },
		},
	},
	{
		/* @brief >=0.8.2.0: Broker-based Group coordinator */
		.feature = RD_KAFKA_FEATURE_BROKER_GROUP_COORD,
		.depends = {
			{ RD_KAFKAP_GroupCoordinator, 0, 0 },
			{ -1 },
		},
	},
	{
		/* @brief >=0.9.0: Broker-based balanced consumer groups. */
		.feature = RD_KAFKA_FEATURE_BROKER_BALANCED_CONSUMER,
		.depends = {
			{ RD_KAFKAP_GroupCoordinator, 0, 0 },
			{ RD_KAFKAP_OffsetCommit, 1, 2 },
			{ RD_KAFKAP_OffsetFetch, 1, 1 },
			{ RD_KAFKAP_JoinGroup, 0, 0 },
			{ RD_KAFKAP_SyncGroup, 0, 0 },
			{ RD_KAFKAP_Heartbeat, 0, 0 },
			{ RD_KAFKAP_LeaveGroup, 0, 0 },
			{ -1 },
		},
	},
	{
		/* @brief >=0.9.0: ThrottleTime */
		.feature = RD_KAFKA_FEATURE_THROTTLETIME,
		.depends = {
			{ RD_KAFKAP_Produce, 1, 2 },
			{ RD_KAFKAP_Fetch, 1, 2 },
			{ -1 },
		},

        },
        {
                /* @brief >=0.9.0: SASL (GSSAPI) authentication.
                 * Since SASL itself does not use the Kafka protocol
                 * we must use another API to map us to the proper
                 * broker version support:
                 * JoinGroup was released along with SASL in 0.9.0. */
                .feature = RD_KAFKA_FEATURE_SASL_GSSAPI,
                .depends = {
                        { RD_KAFKAP_JoinGroup, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.10.0: SASL mechanism handshake (KIP-43)
                 *                  to automatically support mechanisms
                 *                  other than GSSAPI, such as PLAIN. */
                .feature = RD_KAFKA_FEATURE_SASL_HANDSHAKE,
                .depends = {
                        { RD_KAFKAP_SaslHandshake, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.8.2: LZ4 compression.
                 * Since LZ4 initially did not rely on a specific API
                 * type or version (it does in >=0.10.0)
                 * we must use another API to map us to the
                 * proper broker version support:
                 * GroupCoordinator was released in 0.8.2. */
                .feature = RD_KAFKA_FEATURE_LZ4,
                .depends = {
                        { RD_KAFKAP_GroupCoordinator, 0, 0 },
                        { -1 },
                },
        },
        {
                /* @brief >=0.10.1.0: Offset v1 (KIP-79)
                 * Time-based offset requests */
                .feature = RD_KAFKA_FEATURE_OFFSET_TIME,
                .depends = {
                        { RD_KAFKAP_Offset, 1, 1 },
                        { -1 },
                }
        },
        { .feature = 0 }, /* sentinel */
};
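
/**
 * Illustrative sketch only (not compiled): what a new entry in
 * rd_kafka_feature_map would look like. The feature flag and version
 * ranges below are hypothetical; a real entry must use an existing
 * RD_KAFKA_FEATURE_.. flag and the ApiKey version ranges this client
 * actually implements.
 *
 * @code
 *      {
 *              // Enabled only if the broker's advertised Offset range
 *              // overlaps 1..2 AND its Fetch range overlaps 3..4.
 *              .feature = RD_KAFKA_FEATURE_EXAMPLE,  // hypothetical flag
 *              .depends = {
 *                      { RD_KAFKAP_Offset, 1, 2 },
 *                      { RD_KAFKAP_Fetch, 3, 4 },
 *                      { -1 },                       // terminator
 *              },
 *      },
 * @endcode
 */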



/**
 * @brief In the absence of KIP-35 (ApiVersionRequest) support in earlier
 *        broker versions we provide hardcoded ApiVersion lists that
 *        correspond to those older broker versions.
 */

/* >= 0.10.0.0: dummy for all future versions that support ApiVersionRequest */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_Queryable[] = {
	{ RD_KAFKAP_ApiVersion, 0, 0 }
};


/* =~ 0.9.0 */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_9_0[] = {
	{ RD_KAFKAP_Produce, 0, 1 },
	{ RD_KAFKAP_Fetch, 0, 1 },
	{ RD_KAFKAP_Offset, 0, 0 },
	{ RD_KAFKAP_Metadata, 0, 0 },
	{ RD_KAFKAP_OffsetCommit, 0, 2 },
	{ RD_KAFKAP_OffsetFetch, 0, 1 },
	{ RD_KAFKAP_GroupCoordinator, 0, 0 },
	{ RD_KAFKAP_JoinGroup, 0, 0 },
	{ RD_KAFKAP_Heartbeat, 0, 0 },
	{ RD_KAFKAP_LeaveGroup, 0, 0 },
	{ RD_KAFKAP_SyncGroup, 0, 0 },
	{ RD_KAFKAP_DescribeGroups, 0, 0 },
	{ RD_KAFKAP_ListGroups, 0, 0 }
};

/* =~ 0.8.2 */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_2[] = {
	{ RD_KAFKAP_Produce, 0, 0 },
	{ RD_KAFKAP_Fetch, 0, 0 },
	{ RD_KAFKAP_Offset, 0, 0 },
	{ RD_KAFKAP_Metadata, 0, 0 },
	{ RD_KAFKAP_OffsetCommit, 0, 1 },
	{ RD_KAFKAP_OffsetFetch, 0, 1 },
	{ RD_KAFKAP_GroupCoordinator, 0, 0 }
};

/* =~ 0.8.1 */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_1[] = {
	{ RD_KAFKAP_Produce, 0, 0 },
	{ RD_KAFKAP_Fetch, 0, 0 },
	{ RD_KAFKAP_Offset, 0, 0 },
	{ RD_KAFKAP_Metadata, 0, 0 },
	{ RD_KAFKAP_OffsetCommit, 0, 1 },
	{ RD_KAFKAP_OffsetFetch, 0, 0 }
};

/* =~ 0.8.0 */
static struct rd_kafka_ApiVersion rd_kafka_ApiVersion_0_8_0[] = {
	{ RD_KAFKAP_Produce, 0, 0 },
	{ RD_KAFKAP_Fetch, 0, 0 },
	{ RD_KAFKAP_Offset, 0, 0 },
	{ RD_KAFKAP_Metadata, 0, 0 }
};


/**
 * @brief Returns the ApiVersion list for legacy broker versions that do not
 *        support the ApiVersionQuery request, e.g., brokers <0.10.0.
 *
 * @param broker_version Broker version to match (longest prefix matching).
 * @param fallback If no match is found, use the ApiVersion list matching this
 *                 version string instead (the function still returns 0).
 *                 May be NULL.
 *
 * @returns 1 if \p broker_version was recognized: \p *apisp will point to
 *          the ApiVersion list and \p *api_cntp will be set to its element
 *          count.
 *          0 if \p broker_version was not recognized, in which case \p *apisp
 *          is NULL unless a \p fallback list was applied.
 *
 */
int rd_kafka_get_legacy_ApiVersions (const char *broker_version,
				     struct rd_kafka_ApiVersion **apisp,
				     size_t *api_cntp, const char *fallback) {
	static const struct {
		const char *pfx;
		struct rd_kafka_ApiVersion *apis;
		size_t api_cnt;
	} vermap[] = {
#define _VERMAP(PFX,APIS) { PFX, APIS, RD_ARRAYSIZE(APIS) }
		_VERMAP("0.9.0", rd_kafka_ApiVersion_0_9_0),
		_VERMAP("0.8.2", rd_kafka_ApiVersion_0_8_2),
		_VERMAP("0.8.1", rd_kafka_ApiVersion_0_8_1),
		_VERMAP("0.8.0", rd_kafka_ApiVersion_0_8_0),
		{ "0.7.", NULL }, /* Unsupported */
		{ "0.6.", NULL }, /* Unsupported */
		_VERMAP("", rd_kafka_ApiVersion_Queryable),
		{ NULL }
	};
	int i;
	int fallback_i = -1;
        int ret = 0;

        *apisp = NULL;
        *api_cntp = 0;

	for (i = 0 ; vermap[i].pfx ; i++) {
		if (!strncmp(vermap[i].pfx, broker_version, strlen(vermap[i].pfx))) {
			if (!vermap[i].apis)
				return 0;
			*apisp = vermap[i].apis;
			*api_cntp = vermap[i].api_cnt;
                        ret = 1;
                        break;
		} else if (fallback && !strcmp(vermap[i].pfx, fallback))
			fallback_i = i;
	}

	if (!*apisp && fallback) {
		rd_kafka_assert(NULL, fallback_i != -1);
		*apisp    = vermap[fallback_i].apis;
		*api_cntp = vermap[fallback_i].api_cnt;
	}

        return ret;
}
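
/**
 * Usage sketch (the version strings below are examples only):
 *
 * @code
 *      struct rd_kafka_ApiVersion *apis;
 *      size_t api_cnt;
 *
 *      // "0.8.2.1" matches the "0.8.2" prefix: returns 1 and points apis
 *      // at rd_kafka_ApiVersion_0_8_2.
 *      rd_kafka_get_legacy_ApiVersions("0.8.2.1", &apis, &api_cnt, NULL);
 *
 *      // Versions not in the table (e.g. "0.10.1.0") fall through to the
 *      // "" catch-all and yield rd_kafka_ApiVersion_Queryable, i.e.
 *      // "query the broker with an ApiVersionRequest instead".
 *      rd_kafka_get_legacy_ApiVersions("0.10.1.0", &apis, &api_cnt, NULL);
 * @endcode
 */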


/**
 * @returns 1 if the provided broker version (probably)
 *          supports api.version.request.
 */
int rd_kafka_ApiVersion_is_queryable (const char *broker_version) {
	struct rd_kafka_ApiVersion *apis;
	size_t api_cnt;


	if (!rd_kafka_get_legacy_ApiVersions(broker_version,
					     &apis, &api_cnt, NULL))
		return 0;

	return apis == rd_kafka_ApiVersion_Queryable;
}
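
/**
 * Usage sketch (version strings are examples): "0.9.0.1" maps to the
 * hardcoded 0.9.0 list and is thus not queryable, while "0.10.0.0" falls
 * through to rd_kafka_ApiVersion_Queryable:
 *
 * @code
 *      rd_kafka_ApiVersion_is_queryable("0.9.0.1");   // 0
 *      rd_kafka_ApiVersion_is_queryable("0.10.0.0");  // 1
 * @endcode
 */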




	
/**
 * @brief Check if \p match's version range overlaps with \p apis.
 *
 * @returns 1 if the version ranges overlap, else 0.
 * @remark \p apis must be sorted using rd_kafka_ApiVersion_key_cmp().
 */
static RD_INLINE int
rd_kafka_ApiVersion_check (const struct rd_kafka_ApiVersion *apis, size_t api_cnt,
			   const struct rd_kafka_ApiVersion *match) {
	const struct rd_kafka_ApiVersion *api;

	api = bsearch(match, apis, api_cnt, sizeof(*apis),
		      rd_kafka_ApiVersion_key_cmp);
	if (unlikely(!api))
		return 0;

	return match->MinVer <= api->MaxVer && api->MinVer <= match->MaxVer;
}
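
/**
 * Worked example of the overlap test (the broker-advertised ranges below
 * are illustrative, not taken from any particular broker release):
 *
 * @code
 *      // Broker-advertised ranges, sorted by ApiKey as the bsearch() above
 *      // requires (Produce has a lower ApiKey than Fetch):
 *      const struct rd_kafka_ApiVersion brokers[] = {
 *              { RD_KAFKAP_Produce, 0, 2 },
 *              { RD_KAFKAP_Fetch,   0, 3 },
 *      };
 *      const struct rd_kafka_ApiVersion want_fetch   = { RD_KAFKAP_Fetch,   2, 2 };
 *      const struct rd_kafka_ApiVersion want_produce = { RD_KAFKAP_Produce, 3, 3 };
 *
 *      // 2..2 overlaps 0..3: returns 1.
 *      rd_kafka_ApiVersion_check(brokers, 2, &want_fetch);
 *      // 3..3 does not overlap 0..2 (MinVer 3 > MaxVer 2): returns 0, so
 *      // any feature depending on Produce v3 would be disabled.
 *      rd_kafka_ApiVersion_check(brokers, 2, &want_produce);
 * @endcode
 */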


/**
 * @brief Compare broker's supported API versions to our feature request map
 *        and enable/disable features accordingly.
 *
 * @param broker_apis Broker's supported APIs. If NULL the
 *        \p broker.version.fallback configuration property will specify a
 *        default legacy version to use.
 * @param broker_api_cnt Number of elements in \p broker_apis
 *
 * @returns the supported features (bitmask) to enable.
 */
int rd_kafka_features_check (rd_kafka_broker_t *rkb,
			     struct rd_kafka_ApiVersion *broker_apis,
			     size_t broker_api_cnt) {
	int features = 0;
	int i;

	/* Scan through features. */
	for (i = 0 ; rd_kafka_feature_map[i].feature != 0 ; i++) {
		const struct rd_kafka_ApiVersion *match;
		int fails = 0;

		/* For each feature, check that all of its API dependencies
		 * can be fulfilled. */

		for (match = &rd_kafka_feature_map[i].depends[0] ;
		     match->ApiKey != -1 ; match++) {
			int r;
			
			r = rd_kafka_ApiVersion_check(broker_apis, broker_api_cnt,
						      match);

			rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
				   " Feature %s: %s (%hd..%hd) "
				   "%ssupported by broker",
				   rd_kafka_features2str(rd_kafka_feature_map[i].
							feature),
				   rd_kafka_ApiKey2str(match->ApiKey),
				   match->MinVer, match->MaxVer,
				   r ? "" : "NOT ");

			fails += !r;
		}

		rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
			   "%s feature %s",
			   fails ? "Disabling" : "Enabling",
			   rd_kafka_features2str(rd_kafka_feature_map[i].feature));


		if (!fails)
			features |= rd_kafka_feature_map[i].feature;
	}

	return features;
}
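
/**
 * Usage sketch (assumes \p rkb is a valid broker handle and that broker_apis
 * holds the broker's advertised ApiVersions, sorted e.g. by
 * rd_kafka_ApiVersions_copy() below):
 *
 * @code
 *      int features = rd_kafka_features_check(rkb, broker_apis,
 *                                             broker_api_cnt);
 *
 *      if (features & RD_KAFKA_FEATURE_MSGVER2)
 *              ;  // MagicByte v2 message sets may be used with this broker
 * @endcode
 */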



/**
 * @brief Make an allocated and sorted copy of \p src.
 */
void
rd_kafka_ApiVersions_copy (const struct rd_kafka_ApiVersion *src,
                           size_t src_cnt,
                           struct rd_kafka_ApiVersion **dstp,
                           size_t *dst_cntp) {
        *dstp = rd_memdup(src, sizeof(*src) * src_cnt);
        *dst_cntp = src_cnt;
        qsort(*dstp, *dst_cntp, sizeof(**dstp), rd_kafka_ApiVersion_key_cmp);
}
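
/**
 * Usage sketch: the copy is sorted so that rd_kafka_ApiVersion_check() can
 * bsearch() it later. The source array is assumed to come from a parsed
 * ApiVersionResponse; the copy is allocated with rd_memdup() and should be
 * freed with rd_free() when no longer needed.
 *
 * @code
 *      struct rd_kafka_ApiVersion *sorted;
 *      size_t sorted_cnt;
 *
 *      rd_kafka_ApiVersions_copy(broker_apis, broker_api_cnt,
 *                                &sorted, &sorted_cnt);
 *      // ... e.g. pass sorted/sorted_cnt to rd_kafka_features_check() ...
 *      rd_free(sorted);
 * @endcode
 */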






/**
 * @returns a human-readable feature flag string.
 */
const char *rd_kafka_features2str (int features) {
	static RD_TLS char ret[4][128];
	size_t of = 0;
	static RD_TLS int reti = 0;
	int i;

	reti = (reti + 1) % 4;

	*ret[reti] = '\0';
	for (i = 0 ; rd_kafka_feature_names[i] ; i++) {
		int r;
		if (!(features & (1 << i)))
			continue;

		r = rd_snprintf(ret[reti]+of, sizeof(ret[reti])-of, "%s%s",
				of == 0 ? "" : ",",
				rd_kafka_feature_names[i]);
		if ((size_t)r >= sizeof(ret[reti])-of) {
			/* Out of space */
			memcpy(&ret[reti][sizeof(ret[reti])-3], "..", 3);
			break;
		}

		of += r;
	}

	return ret[reti];
}
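
/**
 * Usage sketch: the returned string lives in one of four rotating
 * thread-local buffers, so it must not be freed and is only valid until a
 * few subsequent calls from the same thread. Assuming the feature flag bits
 * map to the name table at the top of this file:
 *
 * @code
 *      int features = RD_KAFKA_FEATURE_MSGVER1 | RD_KAFKA_FEATURE_LZ4;
 *      // Yields a string such as "MsgVer1,LZ4".
 *      const char *str = rd_kafka_features2str(features);
 * @endcode
 */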