Blame src/rdkafka_sasl.c

Packit 2997f0
/*
Packit 2997f0
 * librdkafka - The Apache Kafka C/C++ library
Packit 2997f0
 *
Packit 2997f0
 * Copyright (c) 2015 Magnus Edenhill
Packit 2997f0
 * All rights reserved.
Packit 2997f0
 *
Packit 2997f0
 * Redistribution and use in source and binary forms, with or without
Packit 2997f0
 * modification, are permitted provided that the following conditions are met:
Packit 2997f0
 *
Packit 2997f0
 * 1. Redistributions of source code must retain the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer.
Packit 2997f0
 * 2. Redistributions in binary form must reproduce the above copyright notice,
Packit 2997f0
 *    this list of conditions and the following disclaimer in the documentation
Packit 2997f0
 *    and/or other materials provided with the distribution.
Packit 2997f0
 *
Packit 2997f0
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
Packit 2997f0
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
Packit 2997f0
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
Packit 2997f0
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
Packit 2997f0
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
Packit 2997f0
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
Packit 2997f0
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
Packit 2997f0
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
Packit 2997f0
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
Packit 2997f0
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
Packit 2997f0
 * POSSIBILITY OF SUCH DAMAGE.
Packit 2997f0
 */
Packit 2997f0
Packit 2997f0
#include "rdkafka_int.h"
Packit 2997f0
#include "rdkafka_transport.h"
Packit 2997f0
#include "rdkafka_transport_int.h"
Packit 2997f0
#include "rdkafka_sasl.h"
Packit 2997f0
#include "rdkafka_sasl_int.h"
Packit 2997f0
Packit 2997f0
Packit 2997f0
 /**
Packit 2997f0
 * Send auth message with framing.
Packit 2997f0
 * This is a blocking call.
Packit 2997f0
 */
Packit 2997f0
int rd_kafka_sasl_send (rd_kafka_transport_t *rktrans,
Packit 2997f0
                        const void *payload, int len,
Packit 2997f0
                        char *errstr, size_t errstr_size) {
Packit 2997f0
        rd_buf_t buf;
Packit 2997f0
        rd_slice_t slice;
Packit 2997f0
	int32_t hdr;
Packit 2997f0
Packit 2997f0
	rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
Packit 2997f0
		   "Send SASL frame to broker (%d bytes)", len);
Packit 2997f0
Packit 2997f0
        rd_buf_init(&buf, 1+1, sizeof(hdr));
Packit 2997f0
Packit 2997f0
	hdr = htobe32(len);
Packit 2997f0
        rd_buf_write(&buf, &hdr, sizeof(hdr));
Packit 2997f0
	if (payload)
Packit 2997f0
                rd_buf_push(&buf, payload, len, NULL);
Packit 2997f0
Packit 2997f0
        rd_slice_init_full(&slice, &buf;;
Packit 2997f0
Packit 2997f0
	/* Simulate blocking behaviour on non-blocking socket..
Packit 2997f0
	 * FIXME: This isn't optimal but is highly unlikely to stall since
Packit 2997f0
	 *        the socket buffer will most likely not be exceeded. */
Packit 2997f0
	do {
Packit 2997f0
		int r;
Packit 2997f0
Packit 2997f0
		r = (int)rd_kafka_transport_send(rktrans, &slice,
Packit 2997f0
                                                 errstr, errstr_size);
Packit 2997f0
		if (r == -1) {
Packit 2997f0
			rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
Packit 2997f0
				   "SASL send failed: %s", errstr);
Packit 2997f0
                        rd_buf_destroy(&buf;;
Packit 2997f0
			return -1;
Packit 2997f0
		}
Packit 2997f0
Packit 2997f0
                if (rd_slice_remains(&slice) == 0)
Packit 2997f0
                        break;
Packit 2997f0
Packit 2997f0
		/* Avoid busy-looping */
Packit 2997f0
		rd_usleep(10*1000, NULL);
Packit 2997f0
Packit 2997f0
	} while (1);
Packit 2997f0
Packit 2997f0
        rd_buf_destroy(&buf;;
Packit 2997f0
Packit 2997f0
	return 0;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Authentication succesful
Packit 2997f0
 *
Packit 2997f0
 * Transition to next connect state.
Packit 2997f0
 */
Packit 2997f0
void rd_kafka_sasl_auth_done (rd_kafka_transport_t *rktrans) {
Packit 2997f0
        /* Authenticated */
Packit 2997f0
        rd_kafka_broker_connect_up(rktrans->rktrans_rkb);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,
Packit 2997f0
                            char *errstr, size_t errstr_size) {
Packit 2997f0
        rd_kafka_buf_t *rkbuf;
Packit 2997f0
        int r;
Packit 2997f0
        const void *buf;
Packit 2997f0
        size_t len;
Packit 2997f0
Packit 2997f0
        if (!(events & POLLIN))
Packit 2997f0
                return 0;
Packit 2997f0
Packit 2997f0
        r = rd_kafka_transport_framed_recv(rktrans, &rkbuf,
Packit 2997f0
                                           errstr, errstr_size);
Packit 2997f0
        if (r == -1) {
Packit 2997f0
                if (!strcmp(errstr, "Disconnected"))
Packit 2997f0
                        rd_snprintf(errstr, errstr_size,
Packit 2997f0
                                    "Disconnected: check client %s credentials "
Packit 2997f0
                                    "and broker logs",
Packit 2997f0
                                    rktrans->rktrans_rkb->rkb_rk->rk_conf.
Packit 2997f0
                                    sasl.mechanisms);
Packit 2997f0
                return -1;
Packit 2997f0
        } else if (r == 0) /* not fully received yet */
Packit 2997f0
                return 0;
Packit 2997f0
Packit 2997f0
        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
Packit 2997f0
                   "Received SASL frame from broker (%"PRIusz" bytes)",
Packit 2997f0
                   rkbuf ? rkbuf->rkbuf_totlen : 0);
Packit 2997f0
Packit 2997f0
        if (rkbuf) {
Packit 2997f0
                rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
Packit 2997f0
                /* Seek past framing header */
Packit 2997f0
                rd_slice_seek(&rkbuf->rkbuf_reader, 4);
Packit 2997f0
                len = rd_slice_remains(&rkbuf->rkbuf_reader);
Packit 2997f0
                buf = rd_slice_ensure_contig(&rkbuf->rkbuf_reader, len);
Packit 2997f0
        } else {
Packit 2997f0
                buf = NULL;
Packit 2997f0
                len = 0;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        r = rktrans->rktrans_rkb->rkb_rk->
Packit 2997f0
                rk_conf.sasl.provider->recv(rktrans, buf, len,
Packit 2997f0
                                            errstr, errstr_size);
Packit 2997f0
        rd_kafka_buf_destroy(rkbuf);
Packit 2997f0
Packit 2997f0
        return r;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Close SASL session (from transport code)
Packit 2997f0
 * @remark May be called on non-SASL transports (no-op)
Packit 2997f0
 */
Packit 2997f0
void rd_kafka_sasl_close (rd_kafka_transport_t *rktrans) {
Packit 2997f0
        const struct rd_kafka_sasl_provider *provider =
Packit 2997f0
                rktrans->rktrans_rkb->rkb_rk->rk_conf.
Packit 2997f0
                sasl.provider;
Packit 2997f0
Packit 2997f0
        if (provider && provider->close)
Packit 2997f0
                provider->close(rktrans);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Initialize and start SASL authentication.
Packit 2997f0
 *
Packit 2997f0
 * Returns 0 on successful init and -1 on error.
Packit 2997f0
 *
Packit 2997f0
 * Locality: broker thread
Packit 2997f0
 */
Packit 2997f0
int rd_kafka_sasl_client_new (rd_kafka_transport_t *rktrans,
Packit 2997f0
			      char *errstr, size_t errstr_size) {
Packit 2997f0
	int r;
Packit 2997f0
	rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
Packit 2997f0
	rd_kafka_t *rk = rkb->rkb_rk;
Packit 2997f0
        char *hostname, *t;
Packit 2997f0
        const struct rd_kafka_sasl_provider *provider =
Packit 2997f0
                rk->rk_conf.sasl.provider;
Packit 2997f0
Packit 2997f0
        /* Verify broker support:
Packit 2997f0
         * - RD_KAFKA_FEATURE_SASL_GSSAPI - GSSAPI supported
Packit 2997f0
         * - RD_KAFKA_FEATURE_SASL_HANDSHAKE - GSSAPI, PLAIN and possibly
Packit 2997f0
         *   other mechanisms supported. */
Packit 2997f0
        if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
Packit 2997f0
                if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_GSSAPI)) {
Packit 2997f0
                        rd_snprintf(errstr, errstr_size,
Packit 2997f0
                                    "SASL GSSAPI authentication not supported "
Packit 2997f0
                                    "by broker");
Packit 2997f0
                        return -1;
Packit 2997f0
                }
Packit 2997f0
        } else if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) {
Packit 2997f0
                rd_snprintf(errstr, errstr_size,
Packit 2997f0
                            "SASL Handshake not supported by broker "
Packit 2997f0
                            "(required by mechanism %s)%s",
Packit 2997f0
                            rk->rk_conf.sasl.mechanisms,
Packit 2997f0
                            rk->rk_conf.api_version_request ? "" :
Packit 2997f0
                            ": try api.version.request=true");
Packit 2997f0
                return -1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rd_strdupa(&hostname, rktrans->rktrans_rkb->rkb_nodename);
Packit 2997f0
        if ((t = strchr(hostname, ':')))
Packit 2997f0
                *t = '\0';  /* remove ":port" */
Packit 2997f0
Packit 2997f0
        rd_rkb_dbg(rkb, SECURITY, "SASL",
Packit 2997f0
                   "Initializing SASL client: service name %s, "
Packit 2997f0
                   "hostname %s, mechanisms %s, provider %s",
Packit 2997f0
                   rk->rk_conf.sasl.service_name, hostname,
Packit 2997f0
                   rk->rk_conf.sasl.mechanisms,
Packit 2997f0
                   provider->name);
Packit 2997f0
Packit 2997f0
        r = provider->client_new(rktrans, hostname, errstr, errstr_size);
Packit 2997f0
        if (r != -1)
Packit 2997f0
                rd_kafka_transport_poll_set(rktrans, POLLIN);
Packit 2997f0
Packit 2997f0
        return r;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Per handle SASL term.
Packit 2997f0
 *
Packit 2997f0
 * Locality: broker thread
Packit 2997f0
 */
Packit 2997f0
void rd_kafka_sasl_broker_term (rd_kafka_broker_t *rkb) {
Packit 2997f0
        const struct rd_kafka_sasl_provider *provider =
Packit 2997f0
                rkb->rkb_rk->rk_conf.sasl.provider;
Packit 2997f0
        if (provider->broker_term)
Packit 2997f0
                provider->broker_term(rkb);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Broker SASL init.
Packit 2997f0
 *
Packit 2997f0
 * Locality: broker thread
Packit 2997f0
 */
Packit 2997f0
void rd_kafka_sasl_broker_init (rd_kafka_broker_t *rkb) {
Packit 2997f0
        const struct rd_kafka_sasl_provider *provider =
Packit 2997f0
                rkb->rkb_rk->rk_conf.sasl.provider;
Packit 2997f0
        if (provider->broker_init)
Packit 2997f0
                provider->broker_init(rkb);
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * @brief Select SASL provider for configured mechanism (singularis)
Packit 2997f0
 * @returns 0 on success or -1 on failure.
Packit 2997f0
 */
Packit 2997f0
int rd_kafka_sasl_select_provider (rd_kafka_t *rk,
Packit 2997f0
                                   char *errstr, size_t errstr_size) {
Packit 2997f0
        const struct rd_kafka_sasl_provider *provider = NULL;
Packit 2997f0
Packit 2997f0
        if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
Packit 2997f0
                /* GSSAPI / Kerberos */
Packit 2997f0
#ifdef _MSC_VER
Packit 2997f0
                provider = &rd_kafka_sasl_win32_provider;
Packit 2997f0
#elif WITH_SASL_CYRUS
Packit 2997f0
                provider = &rd_kafka_sasl_cyrus_provider;
Packit 2997f0
#endif
Packit 2997f0
Packit 2997f0
        } else if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
Packit 2997f0
                /* SASL PLAIN */
Packit 2997f0
                provider = &rd_kafka_sasl_plain_provider;
Packit 2997f0
Packit 2997f0
        } else if (!strncmp(rk->rk_conf.sasl.mechanisms, "SCRAM-SHA-",
Packit 2997f0
                            strlen("SCRAM-SHA-"))) {
Packit 2997f0
                /* SASL SCRAM */
Packit 2997f0
#if WITH_SASL_SCRAM
Packit 2997f0
                provider = &rd_kafka_sasl_scram_provider;
Packit 2997f0
#endif
Packit 2997f0
Packit 2997f0
        } else {
Packit 2997f0
                /* Unsupported mechanism */
Packit 2997f0
                rd_snprintf(errstr, errstr_size,
Packit 2997f0
                            "Unsupported SASL mechanism: %s",
Packit 2997f0
                            rk->rk_conf.sasl.mechanisms);
Packit 2997f0
                return -1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        if (!provider) {
Packit 2997f0
                rd_snprintf(errstr, errstr_size,
Packit 2997f0
                            "No provider for SASL mechanism %s"
Packit 2997f0
                            ": recompile librdkafka with "
Packit 2997f0
#ifndef _MSC_VER
Packit 2997f0
                            "libsasl2 or "
Packit 2997f0
#endif
Packit 2997f0
                            "openssl support. "
Packit 2997f0
                            "Current build options:"
Packit 2997f0
                            " PLAIN"
Packit 2997f0
#ifdef _MSC_VER
Packit 2997f0
                            " WindowsSSPI(GSSAPI)"
Packit 2997f0
#endif
Packit 2997f0
#if WITH_SASL_CYRUS
Packit 2997f0
                            " SASL_CYRUS"
Packit 2997f0
#endif
Packit 2997f0
#if WITH_SASL_SCRAM
Packit 2997f0
                            " SASL_SCRAM"
Packit 2997f0
#endif
Packit 2997f0
                            ,
Packit 2997f0
                            rk->rk_conf.sasl.mechanisms);
Packit 2997f0
                return -1;
Packit 2997f0
        }
Packit 2997f0
Packit 2997f0
        rd_kafka_dbg(rk, SECURITY, "SASL",
Packit 2997f0
                     "Selected provider %s for SASL mechanism %s",
Packit 2997f0
                     provider->name, rk->rk_conf.sasl.mechanisms);
Packit 2997f0
Packit 2997f0
        /* Validate SASL config */
Packit 2997f0
        if (provider->conf_validate &&
Packit 2997f0
            provider->conf_validate(rk, errstr, errstr_size) == -1)
Packit 2997f0
                return -1;
Packit 2997f0
Packit 2997f0
        rk->rk_conf.sasl.provider = provider;
Packit 2997f0
Packit 2997f0
        return 0;
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Global SASL termination.
Packit 2997f0
 */
Packit 2997f0
void rd_kafka_sasl_global_term (void) {
Packit 2997f0
#if WITH_SASL_CYRUS
Packit 2997f0
        rd_kafka_sasl_cyrus_global_term();
Packit 2997f0
#endif
Packit 2997f0
}
Packit 2997f0
Packit 2997f0
Packit 2997f0
/**
Packit 2997f0
 * Global SASL init, called once per runtime.
Packit 2997f0
 */
Packit 2997f0
int rd_kafka_sasl_global_init (void) {
Packit 2997f0
#if WITH_SASL_CYRUS
Packit 2997f0
        return rd_kafka_sasl_cyrus_global_init();
Packit 2997f0
#else
Packit 2997f0
        return 0;
Packit 2997f0
#endif
Packit 2997f0
}
Packit 2997f0