/* * librdkafka - The Apache Kafka C/C++ library * * Copyright (c) 2015 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "rdkafka_int.h" #include "rdkafka_transport.h" #include "rdkafka_transport_int.h" #include "rdkafka_sasl.h" #include "rdkafka_sasl_int.h" #include "rdstring.h" #ifdef __FreeBSD__ #include /* For WIF.. */ #endif #ifdef __APPLE__ /* Apple has deprecated most of the SASL API for unknown reason, * silence those warnings. */ #pragma clang diagnostic ignored "-Wdeprecated-declarations" #endif #include static mtx_t rd_kafka_sasl_cyrus_kinit_lock; typedef struct rd_kafka_sasl_cyrus_state_s { sasl_conn_t *conn; sasl_callback_t callbacks[16]; } rd_kafka_sasl_cyrus_state_t; /** * Handle received frame from broker. */ static int rd_kafka_sasl_cyrus_recv (struct rd_kafka_transport_s *rktrans, const void *buf, size_t size, char *errstr, size_t errstr_size) { rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state; int r; if (rktrans->rktrans_sasl.complete && size == 0) goto auth_successful; do { sasl_interact_t *interact = NULL; const char *out; unsigned int outlen; r = sasl_client_step(state->conn, size > 0 ? buf : NULL, size, &interact, &out, &outlen); if (r >= 0) { /* Note: outlen may be 0 here for an empty response */ if (rd_kafka_sasl_send(rktrans, out, outlen, errstr, errstr_size) == -1) return -1; } if (r == SASL_INTERACT) rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL", "SASL_INTERACT: %lu %s, %s, %s, %p", interact->id, interact->challenge, interact->prompt, interact->defresult, interact->result); } while (r == SASL_INTERACT); if (r == SASL_CONTINUE) return 0; /* Wait for more data from broker */ else if (r != SASL_OK) { rd_snprintf(errstr, errstr_size, "SASL handshake failed (step): %s", sasl_errdetail(state->conn)); return -1; } /* Authentication successful */ auth_successful: if (rktrans->rktrans_rkb->rkb_rk->rk_conf.debug & RD_KAFKA_DBG_SECURITY) { const char *user, *mech, *authsrc; if (sasl_getprop(state->conn, SASL_USERNAME, (const void **)&user) != SASL_OK) user = "(unknown)"; if (sasl_getprop(state->conn, SASL_MECHNAME, (const void **)&mech) != SASL_OK) mech = "(unknown)"; if (sasl_getprop(state->conn, SASL_AUTHSOURCE, (const void **)&authsrc) != SASL_OK) authsrc = "(unknown)"; rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL", "Authenticated as %s using %s (%s)", user, mech, authsrc); } rd_kafka_sasl_auth_done(rktrans); return 0; } static ssize_t render_callback (const char *key, char *buf, size_t size, void *opaque) { rd_kafka_broker_t *rkb = opaque; if (!strcmp(key, "broker.name")) { char *val, *t; size_t len; rd_kafka_broker_lock(rkb); rd_strdupa(&val, rkb->rkb_nodename); rd_kafka_broker_unlock(rkb); /* Just the broker name, no port */ if ((t = strchr(val, ':'))) len = (size_t)(t-val); else len = strlen(val); if (buf) memcpy(buf, val, RD_MIN(len, size)); return len; } else { rd_kafka_conf_res_t res; size_t destsize = size; /* Try config lookup. */ res = rd_kafka_conf_get(&rkb->rkb_rk->rk_conf, key, buf, &destsize); if (res != RD_KAFKA_CONF_OK) return -1; /* Dont include \0 in returned size */ return (destsize > 0 ? destsize-1 : destsize); } } /** * Execute kinit to refresh ticket. * * Returns 0 on success, -1 on error. * * Locality: any */ static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_broker_t *rkb) { rd_kafka_t *rk = rkb->rkb_rk; int r; char *cmd; char errstr[128]; if (!rk->rk_conf.sasl.kinit_cmd || !strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI")) return 0; /* kinit not configured */ /* Build kinit refresh command line using string rendering and config */ cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd, errstr, sizeof(errstr), render_callback, rkb); if (!cmd) { rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH", "Failed to construct kinit command " "from sasl.kerberos.kinit.cmd template: %s", errstr); return -1; } /* Execute kinit */ rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH", "Refreshing SASL keys with command: %s", cmd); mtx_lock(&rd_kafka_sasl_cyrus_kinit_lock); r = system(cmd); mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock); if (r == -1) { rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH", "SASL key refresh failed: Failed to execute %s", cmd); rd_free(cmd); return -1; } else if (WIFSIGNALED(r)) { rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH", "SASL key refresh failed: %s: received signal %d", cmd, WTERMSIG(r)); rd_free(cmd); return -1; } else if (WIFEXITED(r) && WEXITSTATUS(r) != 0) { rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH", "SASL key refresh failed: %s: exited with code %d", cmd, WEXITSTATUS(r)); rd_free(cmd); return -1; } rd_free(cmd); rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH", "SASL key refreshed"); return 0; } /** * Refresh timer callback * * Locality: kafka main thread */ static void rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb (rd_kafka_timers_t *rkts, void *arg) { rd_kafka_broker_t *rkb = arg; rd_kafka_sasl_cyrus_kinit_refresh(rkb); } /** * * libsasl callbacks * */ static RD_UNUSED int rd_kafka_sasl_cyrus_cb_getopt (void *context, const char *plugin_name, const char *option, const char **result, unsigned *len) { rd_kafka_transport_t *rktrans = context; if (!strcmp(option, "client_mech_list")) *result = "GSSAPI"; if (!strcmp(option, "canon_user_plugin")) *result = "INTERNAL"; if (*result && len) *len = strlen(*result); rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", "CB_GETOPT: plugin %s, option %s: returning %s", plugin_name, option, *result); return SASL_OK; } static int rd_kafka_sasl_cyrus_cb_log (void *context, int level, const char *message){ rd_kafka_transport_t *rktrans = context; if (level >= LOG_DEBUG) rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", "%s", message); else rd_rkb_log(rktrans->rktrans_rkb, level, "LIBSASL", "%s", message); return SASL_OK; } static int rd_kafka_sasl_cyrus_cb_getsimple (void *context, int id, const char **result, unsigned *len) { rd_kafka_transport_t *rktrans = context; switch (id) { case SASL_CB_USER: case SASL_CB_AUTHNAME: *result = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.username; break; default: *result = NULL; break; } if (len) *len = *result ? strlen(*result) : 0; rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", "CB_GETSIMPLE: id 0x%x: returning %s", id, *result); return *result ? SASL_OK : SASL_FAIL; } static int rd_kafka_sasl_cyrus_cb_getsecret (sasl_conn_t *conn, void *context, int id, sasl_secret_t **psecret) { rd_kafka_transport_t *rktrans = context; const char *password; password = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.password; if (!password) { *psecret = NULL; } else { size_t passlen = strlen(password); *psecret = rd_realloc(*psecret, sizeof(**psecret) + passlen); (*psecret)->len = passlen; memcpy((*psecret)->data, password, passlen); } rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", "CB_GETSECRET: id 0x%x: returning %s", id, *psecret ? "(hidden)":"NULL"); return SASL_OK; } static int rd_kafka_sasl_cyrus_cb_chalprompt (void *context, int id, const char *challenge, const char *prompt, const char *defres, const char **result, unsigned *len) { rd_kafka_transport_t *rktrans = context; *result = "min_chalprompt"; *len = strlen(*result); rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", "CB_CHALPROMPT: id 0x%x, challenge %s, prompt %s, " "default %s: returning %s", id, challenge, prompt, defres, *result); return SASL_OK; } static int rd_kafka_sasl_cyrus_cb_getrealm (void *context, int id, const char **availrealms, const char **result) { rd_kafka_transport_t *rktrans = context; *result = *availrealms; rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", "CB_GETREALM: id 0x%x: returning %s", id, *result); return SASL_OK; } static RD_UNUSED int rd_kafka_sasl_cyrus_cb_canon (sasl_conn_t *conn, void *context, const char *in, unsigned inlen, unsigned flags, const char *user_realm, char *out, unsigned out_max, unsigned *out_len) { rd_kafka_transport_t *rktrans = context; if (strstr(rktrans->rktrans_rkb->rkb_rk->rk_conf. sasl.mechanisms, "GSSAPI")) { *out_len = rd_snprintf(out, out_max, "%s", rktrans->rktrans_rkb->rkb_rk-> rk_conf.sasl.principal); } else if (!strcmp(rktrans->rktrans_rkb->rkb_rk->rk_conf. sasl.mechanisms, "PLAIN")) { *out_len = rd_snprintf(out, out_max, "%.*s", inlen, in); } else out = NULL; rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL", "CB_CANON: flags 0x%x, \"%.*s\" @ \"%s\": returning \"%.*s\"", flags, (int)inlen, in, user_realm, (int)(*out_len), out); return out ? SASL_OK : SASL_FAIL; } static void rd_kafka_sasl_cyrus_close (struct rd_kafka_transport_s *rktrans) { rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state; if (!state) return; if (state->conn) sasl_dispose(&state->conn); rd_free(state); } /** * Initialize and start SASL authentication. * * Returns 0 on successful init and -1 on error. * * Locality: broker thread */ static int rd_kafka_sasl_cyrus_client_new (rd_kafka_transport_t *rktrans, const char *hostname, char *errstr, size_t errstr_size) { int r; rd_kafka_sasl_cyrus_state_t *state; rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; rd_kafka_t *rk = rkb->rkb_rk; sasl_callback_t callbacks[16] = { // { SASL_CB_GETOPT, (void *)rd_kafka_sasl_cyrus_cb_getopt, rktrans }, { SASL_CB_LOG, (void *)rd_kafka_sasl_cyrus_cb_log, rktrans }, { SASL_CB_AUTHNAME, (void *)rd_kafka_sasl_cyrus_cb_getsimple, rktrans }, { SASL_CB_PASS, (void *)rd_kafka_sasl_cyrus_cb_getsecret, rktrans }, { SASL_CB_ECHOPROMPT, (void *)rd_kafka_sasl_cyrus_cb_chalprompt, rktrans }, { SASL_CB_GETREALM, (void *)rd_kafka_sasl_cyrus_cb_getrealm, rktrans }, { SASL_CB_CANON_USER, (void *)rd_kafka_sasl_cyrus_cb_canon, rktrans }, { SASL_CB_LIST_END } }; state = rd_calloc(1, sizeof(*state)); rktrans->rktrans_sasl.state = state; /* SASL_CB_USER is needed for PLAIN but breaks GSSAPI */ if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) { int endidx; /* Find end of callbacks array */ for (endidx = 0 ; callbacks[endidx].id != SASL_CB_LIST_END ; endidx++) ; callbacks[endidx].id = SASL_CB_USER; callbacks[endidx].proc = (void *)rd_kafka_sasl_cyrus_cb_getsimple; callbacks[endidx].context = rktrans; endidx++; callbacks[endidx].id = SASL_CB_LIST_END; } memcpy(state->callbacks, callbacks, sizeof(callbacks)); /* Acquire or refresh ticket if kinit is configured */ rd_kafka_sasl_cyrus_kinit_refresh(rkb); r = sasl_client_new(rk->rk_conf.sasl.service_name, hostname, NULL, NULL, /* no local & remote IP checks */ state->callbacks, 0, &state->conn); if (r != SASL_OK) { rd_snprintf(errstr, errstr_size, "%s", sasl_errstring(r, NULL, NULL)); return -1; } if (rk->rk_conf.debug & RD_KAFKA_DBG_SECURITY) { const char *avail_mechs; sasl_listmech(state->conn, NULL, NULL, " ", NULL, &avail_mechs, NULL, NULL); rd_rkb_dbg(rkb, SECURITY, "SASL", "My supported SASL mechanisms: %s", avail_mechs); } do { const char *out; unsigned int outlen; const char *mech = NULL; r = sasl_client_start(state->conn, rk->rk_conf.sasl.mechanisms, NULL, &out, &outlen, &mech); if (r >= 0) if (rd_kafka_sasl_send(rktrans, out, outlen, errstr, errstr_size)) return -1; } while (r == SASL_INTERACT); if (r == SASL_OK) { /* PLAIN is appearantly done here, but we still need to make sure * the PLAIN frame is sent and we get a response back (but we must * not pass the response to libsasl or it will fail). */ rktrans->rktrans_sasl.complete = 1; return 0; } else if (r != SASL_CONTINUE) { rd_snprintf(errstr, errstr_size, "SASL handshake failed (start (%d)): %s", r, sasl_errdetail(state->conn)); return -1; } return 0; } /** * Per handle SASL term. * * Locality: broker thread */ static void rd_kafka_sasl_cyrus_broker_term (rd_kafka_broker_t *rkb) { rd_kafka_t *rk = rkb->rkb_rk; if (!rk->rk_conf.sasl.kinit_cmd) return; rd_kafka_timer_stop(&rk->rk_timers, &rkb->rkb_sasl_kinit_refresh_tmr,1); } /** * Broker SASL init. * * Locality: broker thread */ static void rd_kafka_sasl_cyrus_broker_init (rd_kafka_broker_t *rkb) { rd_kafka_t *rk = rkb->rkb_rk; if (!rk->rk_conf.sasl.kinit_cmd || !strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI")) return; /* kinit not configured, no need to start timer */ rd_kafka_timer_start(&rk->rk_timers, &rkb->rkb_sasl_kinit_refresh_tmr, rk->rk_conf.sasl.relogin_min_time * 1000ll, rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb, rkb); } static int rd_kafka_sasl_cyrus_conf_validate (rd_kafka_t *rk, char *errstr, size_t errstr_size) { if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) return 0; if (rk->rk_conf.sasl.kinit_cmd) { rd_kafka_broker_t rkb; char *cmd; char tmperr[128]; memset(&rkb, 0, sizeof(rkb)); strcpy(rkb.rkb_nodename, "ATestBroker:9092"); rkb.rkb_rk = rk; mtx_init(&rkb.rkb_lock, mtx_plain); cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd, tmperr, sizeof(tmperr), render_callback, &rkb); mtx_destroy(&rkb.rkb_lock); if (!cmd) { rd_snprintf(errstr, errstr_size, "Invalid sasl.kerberos.kinit.cmd value: %s", tmperr); return -1; } rd_free(cmd); } return 0; } /** * Global SASL termination. */ void rd_kafka_sasl_cyrus_global_term (void) { /* NOTE: Should not be called since the application may be using SASL too*/ /* sasl_done(); */ mtx_destroy(&rd_kafka_sasl_cyrus_kinit_lock); } /** * Global SASL init, called once per runtime. */ int rd_kafka_sasl_cyrus_global_init (void) { int r; mtx_init(&rd_kafka_sasl_cyrus_kinit_lock, mtx_plain); r = sasl_client_init(NULL); if (r != SASL_OK) { fprintf(stderr, "librdkafka: sasl_client_init() failed: %s\n", sasl_errstring(r, NULL, NULL)); return -1; } return 0; } const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider = { .name = "Cyrus", .client_new = rd_kafka_sasl_cyrus_client_new, .recv = rd_kafka_sasl_cyrus_recv, .close = rd_kafka_sasl_cyrus_close, .broker_init = rd_kafka_sasl_cyrus_broker_init, .broker_term = rd_kafka_sasl_cyrus_broker_term, .conf_validate = rd_kafka_sasl_cyrus_conf_validate };