/*
 * librdkafka - The Apache Kafka C/C++ library
 *
 * Copyright (c) 2015 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
#include "rdkafka_int.h"
|
|
Packit |
2997f0 |
#include "rdkafka_transport.h"
|
|
Packit |
2997f0 |
#include "rdkafka_transport_int.h"
|
|
Packit |
2997f0 |
#include "rdkafka_sasl.h"
|
|
Packit |
2997f0 |
#include "rdkafka_sasl_int.h"
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Send auth message with framing.
|
|
Packit |
2997f0 |
* This is a blocking call.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
int rd_kafka_sasl_send (rd_kafka_transport_t *rktrans,
|
|
Packit |
2997f0 |
const void *payload, int len,
|
|
Packit |
2997f0 |
char *errstr, size_t errstr_size) {
|
|
Packit |
2997f0 |
rd_buf_t buf;
|
|
Packit |
2997f0 |
rd_slice_t slice;
|
|
Packit |
2997f0 |
int32_t hdr;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
|
|
Packit |
2997f0 |
"Send SASL frame to broker (%d bytes)", len);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_buf_init(&buf, 1+1, sizeof(hdr));
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
hdr = htobe32(len);
|
|
Packit |
2997f0 |
rd_buf_write(&buf, &hdr, sizeof(hdr));
|
|
Packit |
2997f0 |
if (payload)
|
|
Packit |
2997f0 |
rd_buf_push(&buf, payload, len, NULL);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_slice_init_full(&slice, &buf;;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Simulate blocking behaviour on non-blocking socket..
|
|
Packit |
2997f0 |
* FIXME: This isn't optimal but is highly unlikely to stall since
|
|
Packit |
2997f0 |
* the socket buffer will most likely not be exceeded. */
|
|
Packit |
2997f0 |
do {
|
|
Packit |
2997f0 |
int r;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
r = (int)rd_kafka_transport_send(rktrans, &slice,
|
|
Packit |
2997f0 |
errstr, errstr_size);
|
|
Packit |
2997f0 |
if (r == -1) {
|
|
Packit |
2997f0 |
rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
|
|
Packit |
2997f0 |
"SASL send failed: %s", errstr);
|
|
Packit |
2997f0 |
rd_buf_destroy(&buf;;
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (rd_slice_remains(&slice) == 0)
|
|
Packit |
2997f0 |
break;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Avoid busy-looping */
|
|
Packit |
2997f0 |
rd_usleep(10*1000, NULL);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} while (1);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_buf_destroy(&buf;;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Authentication succesful
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Transition to next connect state.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
void rd_kafka_sasl_auth_done (rd_kafka_transport_t *rktrans) {
|
|
Packit |
2997f0 |
/* Authenticated */
|
|
Packit |
2997f0 |
rd_kafka_broker_connect_up(rktrans->rktrans_rkb);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Handle a transport IO event while SASL authentication is ongoing.
 *
 * Only POLLIN is acted on: a framed message is read from the transport and
 * its payload (everything after the 4-byte framing header) is passed to the
 * configured SASL provider's recv() callback.
 *
 * @param rktrans      Transport the event occurred on.
 * @param events       Poll event bitmask.
 * @param errstr       Error string buffer passed to the receive call and to
 *                     the provider callback.
 * @param errstr_size  Size of \p errstr.
 *
 * @returns -1 if receiving the frame failed, 0 if there was nothing to do
 *          (POLLIN not set, or the frame is not fully received yet),
 *          otherwise the return value of the provider's recv() callback.
 */
int rd_kafka_sasl_io_event (rd_kafka_transport_t *rktrans, int events,
                            char *errstr, size_t errstr_size) {
        rd_kafka_buf_t *rkbuf;
        int r;
        const void *buf;
        size_t len;

        if (!(events & POLLIN))
                return 0;

        r = rd_kafka_transport_framed_recv(rktrans, &rkbuf,
                                           errstr, errstr_size);
        if (r == -1) {
                /* Replace the bare "Disconnected" error with a hint that
                 * names the configured mechanism. */
                if (!strcmp(errstr, "Disconnected"))
                        rd_snprintf(errstr, errstr_size,
                                    "Disconnected: check client %s credentials "
                                    "and broker logs",
                                    rktrans->rktrans_rkb->rkb_rk->rk_conf.
                                    sasl.mechanisms);
                return -1;
        } else if (r == 0) /* not fully received yet */
                return 0;

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                   "Received SASL frame from broker (%"PRIusz" bytes)",
                   rkbuf ? rkbuf->rkbuf_totlen : 0);

        if (rkbuf) {
                rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
                /* Seek past framing header */
                rd_slice_seek(&rkbuf->rkbuf_reader, 4);
                len = rd_slice_remains(&rkbuf->rkbuf_reader);
                buf = rd_slice_ensure_contig(&rkbuf->rkbuf_reader, len);
        } else {
                buf = NULL;
                len = 0;
        }

        r = rktrans->rktrans_rkb->rkb_rk->
                rk_conf.sasl.provider->recv(rktrans, buf, len,
                                            errstr, errstr_size);

        /* rkbuf is treated as possibly NULL above, so only destroy a
         * buffer that was actually returned. */
        if (rkbuf) {
                rd_kafka_buf_destroy(rkbuf);
        }

        return r;
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Close SASL session (from transport code)
|
|
Packit |
2997f0 |
* @remark May be called on non-SASL transports (no-op)
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
void rd_kafka_sasl_close (rd_kafka_transport_t *rktrans) {
|
|
Packit |
2997f0 |
const struct rd_kafka_sasl_provider *provider =
|
|
Packit |
2997f0 |
rktrans->rktrans_rkb->rkb_rk->rk_conf.
|
|
Packit |
2997f0 |
sasl.provider;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (provider && provider->close)
|
|
Packit |
2997f0 |
provider->close(rktrans);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Initialize and start SASL authentication.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Returns 0 on successful init and -1 on error.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Locality: broker thread
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
int rd_kafka_sasl_client_new (rd_kafka_transport_t *rktrans,
|
|
Packit |
2997f0 |
char *errstr, size_t errstr_size) {
|
|
Packit |
2997f0 |
int r;
|
|
Packit |
2997f0 |
rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
|
|
Packit |
2997f0 |
rd_kafka_t *rk = rkb->rkb_rk;
|
|
Packit |
2997f0 |
char *hostname, *t;
|
|
Packit |
2997f0 |
const struct rd_kafka_sasl_provider *provider =
|
|
Packit |
2997f0 |
rk->rk_conf.sasl.provider;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Verify broker support:
|
|
Packit |
2997f0 |
* - RD_KAFKA_FEATURE_SASL_GSSAPI - GSSAPI supported
|
|
Packit |
2997f0 |
* - RD_KAFKA_FEATURE_SASL_HANDSHAKE - GSSAPI, PLAIN and possibly
|
|
Packit |
2997f0 |
* other mechanisms supported. */
|
|
Packit |
2997f0 |
if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
|
|
Packit |
2997f0 |
if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_GSSAPI)) {
|
|
Packit |
2997f0 |
rd_snprintf(errstr, errstr_size,
|
|
Packit |
2997f0 |
"SASL GSSAPI authentication not supported "
|
|
Packit |
2997f0 |
"by broker");
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
} else if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) {
|
|
Packit |
2997f0 |
rd_snprintf(errstr, errstr_size,
|
|
Packit |
2997f0 |
"SASL Handshake not supported by broker "
|
|
Packit |
2997f0 |
"(required by mechanism %s)%s",
|
|
Packit |
2997f0 |
rk->rk_conf.sasl.mechanisms,
|
|
Packit |
2997f0 |
rk->rk_conf.api_version_request ? "" :
|
|
Packit |
2997f0 |
": try api.version.request=true");
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_strdupa(&hostname, rktrans->rktrans_rkb->rkb_nodename);
|
|
Packit |
2997f0 |
if ((t = strchr(hostname, ':')))
|
|
Packit |
2997f0 |
*t = '\0'; /* remove ":port" */
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_rkb_dbg(rkb, SECURITY, "SASL",
|
|
Packit |
2997f0 |
"Initializing SASL client: service name %s, "
|
|
Packit |
2997f0 |
"hostname %s, mechanisms %s, provider %s",
|
|
Packit |
2997f0 |
rk->rk_conf.sasl.service_name, hostname,
|
|
Packit |
2997f0 |
rk->rk_conf.sasl.mechanisms,
|
|
Packit |
2997f0 |
provider->name);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
r = provider->client_new(rktrans, hostname, errstr, errstr_size);
|
|
Packit |
2997f0 |
if (r != -1)
|
|
Packit |
2997f0 |
rd_kafka_transport_poll_set(rktrans, POLLIN);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return r;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Per handle SASL term.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Locality: broker thread
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
void rd_kafka_sasl_broker_term (rd_kafka_broker_t *rkb) {
|
|
Packit |
2997f0 |
const struct rd_kafka_sasl_provider *provider =
|
|
Packit |
2997f0 |
rkb->rkb_rk->rk_conf.sasl.provider;
|
|
Packit |
2997f0 |
if (provider->broker_term)
|
|
Packit |
2997f0 |
provider->broker_term(rkb);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* Broker SASL init.
|
|
Packit |
2997f0 |
*
|
|
Packit |
2997f0 |
* Locality: broker thread
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
void rd_kafka_sasl_broker_init (rd_kafka_broker_t *rkb) {
|
|
Packit |
2997f0 |
const struct rd_kafka_sasl_provider *provider =
|
|
Packit |
2997f0 |
rkb->rkb_rk->rk_conf.sasl.provider;
|
|
Packit |
2997f0 |
if (provider->broker_init)
|
|
Packit |
2997f0 |
provider->broker_init(rkb);
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
|
|
Packit |
2997f0 |
* @brief Select SASL provider for configured mechanism (singularis)
|
|
Packit |
2997f0 |
* @returns 0 on success or -1 on failure.
|
|
Packit |
2997f0 |
*/
|
|
Packit |
2997f0 |
int rd_kafka_sasl_select_provider (rd_kafka_t *rk,
|
|
Packit |
2997f0 |
char *errstr, size_t errstr_size) {
|
|
Packit |
2997f0 |
const struct rd_kafka_sasl_provider *provider = NULL;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
|
|
Packit |
2997f0 |
/* GSSAPI / Kerberos */
|
|
Packit |
2997f0 |
#ifdef _MSC_VER
|
|
Packit |
2997f0 |
provider = &rd_kafka_sasl_win32_provider;
|
|
Packit |
2997f0 |
#elif WITH_SASL_CYRUS
|
|
Packit |
2997f0 |
provider = &rd_kafka_sasl_cyrus_provider;
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
|
|
Packit |
2997f0 |
/* SASL PLAIN */
|
|
Packit |
2997f0 |
provider = &rd_kafka_sasl_plain_provider;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else if (!strncmp(rk->rk_conf.sasl.mechanisms, "SCRAM-SHA-",
|
|
Packit |
2997f0 |
strlen("SCRAM-SHA-"))) {
|
|
Packit |
2997f0 |
/* SASL SCRAM */
|
|
Packit |
2997f0 |
#if WITH_SASL_SCRAM
|
|
Packit |
2997f0 |
provider = &rd_kafka_sasl_scram_provider;
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
} else {
|
|
Packit |
2997f0 |
/* Unsupported mechanism */
|
|
Packit |
2997f0 |
rd_snprintf(errstr, errstr_size,
|
|
Packit |
2997f0 |
"Unsupported SASL mechanism: %s",
|
|
Packit |
2997f0 |
rk->rk_conf.sasl.mechanisms);
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
if (!provider) {
|
|
Packit |
2997f0 |
rd_snprintf(errstr, errstr_size,
|
|
Packit |
2997f0 |
"No provider for SASL mechanism %s"
|
|
Packit |
2997f0 |
": recompile librdkafka with "
|
|
Packit |
2997f0 |
#ifndef _MSC_VER
|
|
Packit |
2997f0 |
"libsasl2 or "
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
"openssl support. "
|
|
Packit |
2997f0 |
"Current build options:"
|
|
Packit |
2997f0 |
" PLAIN"
|
|
Packit |
2997f0 |
#ifdef _MSC_VER
|
|
Packit |
2997f0 |
" WindowsSSPI(GSSAPI)"
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
#if WITH_SASL_CYRUS
|
|
Packit |
2997f0 |
" SASL_CYRUS"
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
#if WITH_SASL_SCRAM
|
|
Packit |
2997f0 |
" SASL_SCRAM"
|
|
Packit |
2997f0 |
#endif
|
|
Packit |
2997f0 |
,
|
|
Packit |
2997f0 |
rk->rk_conf.sasl.mechanisms);
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rd_kafka_dbg(rk, SECURITY, "SASL",
|
|
Packit |
2997f0 |
"Selected provider %s for SASL mechanism %s",
|
|
Packit |
2997f0 |
provider->name, rk->rk_conf.sasl.mechanisms);
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/* Validate SASL config */
|
|
Packit |
2997f0 |
if (provider->conf_validate &&
|
|
Packit |
2997f0 |
provider->conf_validate(rk, errstr, errstr_size) == -1)
|
|
Packit |
2997f0 |
return -1;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
rk->rk_conf.sasl.provider = provider;
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
return 0;
|
|
Packit |
2997f0 |
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Global SASL termination.
 *
 * Calls rd_kafka_sasl_cyrus_global_term() when built with WITH_SASL_CYRUS;
 * otherwise does nothing.
 */
void rd_kafka_sasl_global_term (void) {
#if WITH_SASL_CYRUS
        rd_kafka_sasl_cyrus_global_term();
#endif
}
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
|
|
Packit |
2997f0 |
/**
 * @brief Global SASL init, called once per runtime.
 *
 * @returns the return value of rd_kafka_sasl_cyrus_global_init() when built
 *          with WITH_SASL_CYRUS, otherwise 0.
 */
int rd_kafka_sasl_global_init (void) {
#if WITH_SASL_CYRUS
        return rd_kafka_sasl_cyrus_global_init();
#else
        return 0;
#endif
}
|
|
Packit |
2997f0 |
|