/* Blob Blame History Raw -- web viewer navigation residue, not part of the source */
/*
 * librdkafka - The Apache Kafka C/C++ library
 *
 * Copyright (c) 2015 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "rdkafka_int.h"
#include "rdkafka_transport.h"
#include "rdkafka_transport_int.h"
#include "rdkafka_sasl.h"
#include "rdkafka_sasl_int.h"
#include "rdstring.h"

#ifdef __FreeBSD__
#include <sys/wait.h>  /* For WIF.. */
#endif

#ifdef __APPLE__
/* Apple has deprecated most of the SASL API for unknown reason,
 * silence those warnings. */
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif

#include <sasl/sasl.h>

/* Mutex held around the system() call that executes the kinit command in
 * rd_kafka_sasl_cyrus_kinit_refresh(). Initialized in
 * rd_kafka_sasl_cyrus_global_init() and destroyed in
 * rd_kafka_sasl_cyrus_global_term(). */
static mtx_t rd_kafka_sasl_cyrus_kinit_lock;

/**
 * Cyrus SASL state, allocated in rd_kafka_sasl_cyrus_client_new() and
 * stored in rktrans->rktrans_sasl.state.
 */
typedef struct rd_kafka_sasl_cyrus_state_s {
        sasl_conn_t *conn; /* Set by sasl_client_new(), released by sasl_dispose() */
        sasl_callback_t callbacks[16]; /* Callback table passed to sasl_client_new() */
} rd_kafka_sasl_cyrus_state_t;



/**
 * Handle received frame from broker.
 *
 * The frame payload is passed to sasl_client_step() and any response
 * produced by libsasl is sent back to the broker with rd_kafka_sasl_send().
 * If the handshake was already flagged as complete
 * (rktrans_sasl.complete, set by rd_kafka_sasl_cyrus_client_new()) and the
 * frame is empty, libsasl is bypassed and authentication is finalized.
 *
 * On error a human readable description is written to errstr.
 *
 * Returns 0 on success or when more data is awaited from the broker,
 * -1 on error.
 */
static int rd_kafka_sasl_cyrus_recv (struct rd_kafka_transport_s *rktrans,
                                     const void *buf, size_t size,
                                     char *errstr, size_t errstr_size) {
        rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state;
        int r;

        if (rktrans->rktrans_sasl.complete && size == 0)
                goto auth_successful;

        /* NOTE(review): nothing in this loop fills in the interact prompts,
         * so a persistent SASL_INTERACT would repeat the same step over and
         * over. Presumably it never occurs since all required data is
         * supplied through callbacks - confirm. */
        do {
                sasl_interact_t *interact = NULL;
                /* Initialized so that no indeterminate value can ever be
                 * handed to rd_kafka_sasl_send() below. */
                const char *out = NULL;
                unsigned int outlen = 0;

                r = sasl_client_step(state->conn,
                                     size > 0 ? buf : NULL, size,
                                     &interact,
                                     &out, &outlen);

                if (r >= 0) {
                        /* Note: outlen may be 0 here for an empty response */
                        if (rd_kafka_sasl_send(rktrans, out, outlen,
                                               errstr, errstr_size) == -1)
                                return -1;
                }

                /* The prompt strings are optional: never pass NULL to %s. */
                if (r == SASL_INTERACT && interact) {
                        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                                   "SASL_INTERACT: %lu %s, %s, %s, %p",
                                   interact->id,
                                   interact->challenge ?
                                   interact->challenge : "(null)",
                                   interact->prompt ?
                                   interact->prompt : "(null)",
                                   interact->defresult ?
                                   interact->defresult : "(null)",
                                   interact->result);
                }

        } while (r == SASL_INTERACT);

        if (r == SASL_CONTINUE)
                return 0;  /* Wait for more data from broker */
        else if (r != SASL_OK) {
                rd_snprintf(errstr, errstr_size,
                            "SASL handshake failed (step): %s",
                            sasl_errdetail(state->conn));
                return -1;
        }

        /* Authentication successful */
auth_successful:
        /* The property lookups are only needed for the debug message. */
        if (rktrans->rktrans_rkb->rkb_rk->rk_conf.debug &
            RD_KAFKA_DBG_SECURITY) {
                const char *user, *mech, *authsrc;

                if (sasl_getprop(state->conn, SASL_USERNAME,
                                 (const void **)&user) != SASL_OK)
                        user = "(unknown)";

                if (sasl_getprop(state->conn, SASL_MECHNAME,
                                 (const void **)&mech) != SASL_OK)
                        mech = "(unknown)";

                if (sasl_getprop(state->conn, SASL_AUTHSOURCE,
                                 (const void **)&authsrc) != SASL_OK)
                        authsrc = "(unknown)";

                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
                           "Authenticated as %s using %s (%s)",
                           user, mech, authsrc);
        }

        rd_kafka_sasl_auth_done(rktrans);

        return 0;
}




/**
 * rd_string_render() callback: resolve template variable `key`.
 *
 * "broker.name" yields the broker's node name without the ":port" suffix,
 * any other key is looked up as a configuration property.
 *
 * If buf is non-NULL up to `size` bytes of the value are written to it.
 * opaque is the rd_kafka_broker_t the template is rendered for.
 *
 * Returns the length of the value (excluding any terminating nul),
 * or -1 if the key is not a known configuration property.
 */
static ssize_t render_callback (const char *key, char *buf,
                                size_t size, void *opaque) {
        rd_kafka_broker_t *rkb = opaque;
        char *nodename, *colon;
        size_t namelen;

        if (strcmp(key, "broker.name") != 0) {
                /* Not a built-in variable: try config lookup. */
                size_t destsize = size;

                if (rd_kafka_conf_get(&rkb->rkb_rk->rk_conf, key,
                                      buf, &destsize) != RD_KAFKA_CONF_OK)
                        return -1;

                /* Don't include \0 in returned size */
                if (destsize == 0)
                        return destsize;
                return destsize - 1;
        }

        /* Take a private copy of the node name while holding the lock */
        rd_kafka_broker_lock(rkb);
        rd_strdupa(&nodename, rkb->rkb_nodename);
        rd_kafka_broker_unlock(rkb);

        /* Just the broker name, no port */
        colon = strchr(nodename, ':');
        namelen = colon ? (size_t)(colon - nodename) : strlen(nodename);

        if (buf)
                memcpy(buf, nodename, RD_MIN(namelen, size));

        return namelen;
}


/**
 * Run the configured kinit command to acquire or refresh the ticket.
 *
 * The command line is rendered from the sasl.kerberos.kinit.cmd template
 * and executed with system() while holding the kinit lock.
 * Nothing is done (and 0 is returned) if no kinit command is configured
 * or the configured mechanisms do not include GSSAPI.
 *
 * Returns 0 on success, -1 on error.
 *
 * Locality: any
 */
static int rd_kafka_sasl_cyrus_kinit_refresh (rd_kafka_broker_t *rkb) {
        rd_kafka_t *rk = rkb->rkb_rk;
        char errstr[128];
        char *cmd;
        int status;
        int ret = -1;

        if (!(rk->rk_conf.sasl.kinit_cmd &&
              strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI")))
                return 0; /* kinit not configured */

        /* Build kinit refresh command line using string rendering and config */
        cmd = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
                               errstr, sizeof(errstr),
                               render_callback, rkb);
        if (!cmd) {
                rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
                           "Failed to construct kinit command "
                           "from sasl.kerberos.kinit.cmd template: %s",
                           errstr);
                return -1;
        }

        /* Execute kinit */
        rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH",
                   "Refreshing SASL keys with command: %s", cmd);

        mtx_lock(&rd_kafka_sasl_cyrus_kinit_lock);
        status = system(cmd);
        mtx_unlock(&rd_kafka_sasl_cyrus_kinit_lock);

        /* Classify the outcome; cmd is released on the common exit path. */
        if (status == -1) {
                rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
                           "SASL key refresh failed: Failed to execute %s",
                           cmd);
        } else if (WIFSIGNALED(status)) {
                rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
                           "SASL key refresh failed: %s: received signal %d",
                           cmd, WTERMSIG(status));
        } else if (WIFEXITED(status) && WEXITSTATUS(status) != 0) {
                rd_rkb_log(rkb, LOG_ERR, "SASLREFRESH",
                           "SASL key refresh failed: %s: exited with code %d",
                           cmd, WEXITSTATUS(status));
        } else {
                ret = 0;
        }

        rd_free(cmd);

        if (ret == 0) {
                rd_rkb_dbg(rkb, SECURITY, "SASLREFRESH", "SASL key refreshed");
        }

        return ret;
}


/**
 * Timer callback for the periodic kinit refresh.
 *
 * arg is the rd_kafka_broker_t the timer was started for;
 * the outcome of the refresh is not propagated.
 *
 * Locality: kafka main thread
 */
static void rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb (rd_kafka_timers_t *rkts,
                                                      void *arg) {
        rd_kafka_sasl_cyrus_kinit_refresh((rd_kafka_broker_t *)arg);
}



/**
 *
 * libsasl callbacks
 *
 */

/**
 * SASL_CB_GETOPT callback: provide libsasl option values.
 *
 * Recognized options are "client_mech_list" ("GSSAPI") and
 * "canon_user_plugin" ("INTERNAL"). For any other option *result is set
 * to NULL (and *len to 0) rather than being left untouched, so the caller
 * never sees an indeterminate pointer.
 *
 * context is the rd_kafka_transport_t registered with the callback.
 *
 * Always returns SASL_OK.
 */
static RD_UNUSED int
rd_kafka_sasl_cyrus_cb_getopt (void *context, const char *plugin_name,
                         const char *option,
                         const char **result, unsigned *len) {
        rd_kafka_transport_t *rktrans = context;
        const char *value = NULL;

        if (!strcmp(option, "client_mech_list"))
                value = "GSSAPI";
        else if (!strcmp(option, "canon_user_plugin"))
                value = "INTERNAL";

        *result = value;

        if (len)
                *len = value ? (unsigned)strlen(value) : 0;

        /* plugin_name and value may be NULL: never pass NULL to %s. */
        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_GETOPT: plugin %s, option %s: returning %s",
                   plugin_name ? plugin_name : "(null)", option,
                   value ? value : "(null)");

        return SASL_OK;
}

static int rd_kafka_sasl_cyrus_cb_log (void *context, int level, const char *message){
        rd_kafka_transport_t *rktrans = context;

        if (level >= LOG_DEBUG)
                rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                           "%s", message);
        else
                rd_rkb_log(rktrans->rktrans_rkb, level, "LIBSASL",
                           "%s", message);
        return SASL_OK;
}


/**
 * SASL_CB_USER / SASL_CB_AUTHNAME callback: provide the configured
 * sasl.username.
 *
 * For any other id, or if no username is configured, *result is set to
 * NULL and *len (if provided) to 0.
 *
 * context is the rd_kafka_transport_t registered with the callback.
 *
 * Returns SASL_OK if a value was provided, else SASL_FAIL.
 */
static int rd_kafka_sasl_cyrus_cb_getsimple (void *context, int id,
                                       const char **result, unsigned *len) {
        rd_kafka_transport_t *rktrans = context;
        const char *value;

        switch (id)
        {
        case SASL_CB_USER:
        case SASL_CB_AUTHNAME:
                value = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.username;
                break;

        default:
                value = NULL;
                break;
        }

        *result = value;

        if (len)
                *len = value ? (unsigned)strlen(value) : 0;

        /* value is NULL on the failure path: never pass NULL to %s. */
        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_GETSIMPLE: id 0x%x: returning %s",
                   id, value ? value : "(null)");

        return value ? SASL_OK : SASL_FAIL;
}


/**
 * SASL_CB_PASS callback: provide the configured sasl.password.
 *
 * *psecret is (re)allocated to hold the password; its len field is the
 * password length excluding the terminating nul. If no password is
 * configured *psecret is set to NULL.
 *
 * context is the rd_kafka_transport_t registered with the callback.
 *
 * Always returns SASL_OK.
 */
static int rd_kafka_sasl_cyrus_cb_getsecret (sasl_conn_t *conn, void *context,
                                       int id, sasl_secret_t **psecret) {
        rd_kafka_transport_t *rktrans = context;
        const char *password;

        password = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.password;

        if (!password) {
                *psecret = NULL;
        } else {
                size_t passlen = strlen(password);
                *psecret = rd_realloc(*psecret, sizeof(**psecret) + passlen);
                (*psecret)->len = passlen;
                /* sizeof(**psecret) already includes at least one data
                 * byte, so there is room for the terminating nul: copy it
                 * too instead of leaving the trailing byte uninitialized
                 * for consumers that treat data as a C string. */
                memcpy((*psecret)->data, password, passlen + 1);
        }

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_GETSECRET: id 0x%x: returning %s",
                   id, *psecret ? "(hidden)":"NULL");

        return SASL_OK;
}

/**
 * SASL_CB_ECHOPROMPT callback: answer a challenge prompt.
 *
 * Always answers with the fixed string "min_chalprompt".
 * len is optional and is only written when non-NULL, matching the other
 * callbacks in this file.
 *
 * context is the rd_kafka_transport_t registered with the callback.
 *
 * Always returns SASL_OK.
 */
static int rd_kafka_sasl_cyrus_cb_chalprompt (void *context, int id,
                                        const char *challenge,
                                        const char *prompt,
                                        const char *defres,
                                        const char **result, unsigned *len) {
        rd_kafka_transport_t *rktrans = context;

        *result = "min_chalprompt";
        if (len)
                *len = (unsigned)strlen(*result);

        /* challenge, prompt and defres may be NULL: never pass NULL to %s */
        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_CHALPROMPT: id 0x%x, challenge %s, prompt %s, "
                   "default %s: returning %s",
                   id,
                   challenge ? challenge : "(null)",
                   prompt ? prompt : "(null)",
                   defres ? defres : "(null)",
                   *result);

        return SASL_OK;
}

/**
 * SASL_CB_GETREALM callback: select a realm.
 *
 * Picks the first entry of availrealms. If no list is provided, or the
 * list is empty, *result is set to NULL.
 *
 * context is the rd_kafka_transport_t registered with the callback.
 *
 * Always returns SASL_OK.
 */
static int rd_kafka_sasl_cyrus_cb_getrealm (void *context, int id,
                                      const char **availrealms,
                                      const char **result) {
        rd_kafka_transport_t *rktrans = context;

        /* Guard against a missing realm list before dereferencing it. */
        *result = availrealms ? *availrealms : NULL;

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_GETREALM: id 0x%x: returning %s",
                   id, *result ? *result : "(null)");

        return SASL_OK;
}


/**
 * SASL_CB_CANON_USER callback: canonicalize the user name.
 *
 * If the configured mechanisms include GSSAPI the configured principal is
 * written to out; for the PLAIN mechanism the input name is copied as-is.
 * Any other mechanism configuration is rejected.
 *
 * *out_len is always set: to the length of the name written to out on
 * success, else to 0.
 *
 * context is the rd_kafka_transport_t registered with the callback.
 *
 * Returns SASL_OK on success, SASL_BUFOVER if the name does not fit in
 * out_max bytes, or SASL_FAIL if there is no rule for the mechanism.
 */
static RD_UNUSED int
rd_kafka_sasl_cyrus_cb_canon (sasl_conn_t *conn,
                              void *context,
                              const char *in, unsigned inlen,
                              unsigned flags,
                              const char *user_realm,
                              char *out, unsigned out_max,
                              unsigned *out_len) {
        rd_kafka_transport_t *rktrans = context;
        rd_kafka_t *rk = rktrans->rktrans_rkb->rkb_rk;
        int written = -1; /* < 0: nothing was rendered */
        int ret;

        /* Never leave *out_len indeterminate, it is read below. */
        *out_len = 0;

        if (strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
                written = rd_snprintf(out, out_max, "%s",
                                      rk->rk_conf.sasl.principal);
        } else if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
                /* %.*s requires an int precision argument */
                written = rd_snprintf(out, out_max, "%.*s", (int)inlen, in);
        }

        /* Assumes rd_snprintf() follows snprintf() return semantics
         * (length that would have been written) - TODO confirm.
         * A value >= out_max then means the name was truncated, in which
         * case reporting it as the length would exceed what out holds. */
        if (written < 0)
                ret = SASL_FAIL;
        else if ((unsigned)written >= out_max)
                ret = SASL_BUFOVER;
        else {
                *out_len = (unsigned)written;
                ret = SASL_OK;
        }

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "LIBSASL",
                   "CB_CANON: flags 0x%x, \"%.*s\" @ \"%s\": returning \"%.*s\"",
                   flags, (int)inlen, in,
                   user_realm ? user_realm : "(null)",
                   (int)(*out_len), ret == SASL_OK ? out : "");

        return ret;
}


/**
 * Close and free the transport's Cyrus SASL state, if any.
 *
 * Disposes the libsasl connection, frees the state and clears
 * rktrans->rktrans_sasl.state so that a repeated call is a no-op rather
 * than a double free.
 */
static void rd_kafka_sasl_cyrus_close (struct rd_kafka_transport_s *rktrans) {
        rd_kafka_sasl_cyrus_state_t *state = rktrans->rktrans_sasl.state;

        if (!state)
                return;

        if (state->conn)
                sasl_dispose(&state->conn);
        rd_free(state);

        /* Don't leave a dangling pointer behind. */
        rktrans->rktrans_sasl.state = NULL;
}


/**
 * Initialize and start SASL authentication.
 *
 * Allocates the per-transport state (stored in rktrans->rktrans_sasl.state
 * and released by rd_kafka_sasl_cyrus_close()), runs the kinit refresh if
 * configured, creates the libsasl client connection and sends the initial
 * client response produced by sasl_client_start() to the broker.
 *
 * On error a human readable description is written to errstr.
 *
 * Returns 0 on successful init and -1 on error.
 *
 * Locality: broker thread
 */
static int rd_kafka_sasl_cyrus_client_new (rd_kafka_transport_t *rktrans,
                                           const char *hostname,
                                           char *errstr, size_t errstr_size) {
        int r;
        rd_kafka_sasl_cyrus_state_t *state;
        rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
        rd_kafka_t *rk = rkb->rkb_rk;
        sasl_callback_t callbacks[16] = {
                // { SASL_CB_GETOPT, (void *)rd_kafka_sasl_cyrus_cb_getopt, rktrans },
                { SASL_CB_LOG, (void *)rd_kafka_sasl_cyrus_cb_log, rktrans },
                { SASL_CB_AUTHNAME, (void *)rd_kafka_sasl_cyrus_cb_getsimple, rktrans },
                { SASL_CB_PASS, (void *)rd_kafka_sasl_cyrus_cb_getsecret, rktrans },
                { SASL_CB_ECHOPROMPT, (void *)rd_kafka_sasl_cyrus_cb_chalprompt, rktrans },
                { SASL_CB_GETREALM, (void *)rd_kafka_sasl_cyrus_cb_getrealm, rktrans },
                { SASL_CB_CANON_USER, (void *)rd_kafka_sasl_cyrus_cb_canon, rktrans },
                { SASL_CB_LIST_END }
        };

        state = rd_calloc(1, sizeof(*state));
        rktrans->rktrans_sasl.state = state;

        /* SASL_CB_USER is needed for PLAIN but breaks GSSAPI */
        if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
                int endidx;
                /* Find end of callbacks array */
                for (endidx = 0 ;
                     callbacks[endidx].id != SASL_CB_LIST_END ; endidx++)
                        ;

                callbacks[endidx].id = SASL_CB_USER;
                callbacks[endidx].proc = (void *)rd_kafka_sasl_cyrus_cb_getsimple;
                callbacks[endidx].context = rktrans;
                endidx++;
                callbacks[endidx].id = SASL_CB_LIST_END;
        }

        /* The table passed to libsasl is the copy kept in the state,
         * not the local array. */
        memcpy(state->callbacks, callbacks, sizeof(callbacks));

        /* Acquire or refresh ticket if kinit is configured.
         * The result is deliberately not checked here. */
        rd_kafka_sasl_cyrus_kinit_refresh(rkb);

        r = sasl_client_new(rk->rk_conf.sasl.service_name, hostname,
                            NULL, NULL, /* no local & remote IP checks */
                            state->callbacks, 0, &state->conn);
        if (r != SASL_OK) {
                rd_snprintf(errstr, errstr_size, "%s",
                            sasl_errstring(r, NULL, NULL));
                return -1;
        }

        if (rk->rk_conf.debug & RD_KAFKA_DBG_SECURITY) {
                const char *avail_mechs = NULL;

                /* Don't print an indeterminate pointer if the mechanism
                 * list could not be retrieved. */
                if (sasl_listmech(state->conn, NULL, NULL, " ", NULL,
                                  &avail_mechs, NULL, NULL) != SASL_OK ||
                    !avail_mechs)
                        avail_mechs = "(unknown)";

                rd_rkb_dbg(rkb, SECURITY, "SASL",
                           "My supported SASL mechanisms: %s", avail_mechs);
        }

        do {
                /* Initialized so that no indeterminate value can ever be
                 * handed to rd_kafka_sasl_send() below. */
                const char *out = NULL;
                unsigned int outlen = 0;
                const char *mech = NULL;

                r = sasl_client_start(state->conn,
                                      rk->rk_conf.sasl.mechanisms,
                                      NULL, &out, &outlen, &mech);

                if (r >= 0)
                        if (rd_kafka_sasl_send(rktrans, out, outlen,
                                               errstr, errstr_size))
                                return -1;
        } while (r == SASL_INTERACT);

        if (r == SASL_OK) {
                /* PLAIN is apparently done here, but we still need to make
                 * sure the PLAIN frame is sent and we get a response back
                 * (but we must not pass the response to libsasl or it will
                 * fail). */
                rktrans->rktrans_sasl.complete = 1;
                return 0;

        } else if (r != SASL_CONTINUE) {
                rd_snprintf(errstr, errstr_size,
                            "SASL handshake failed (start (%d)): %s",
                            r, sasl_errdetail(state->conn));
                return -1;
        }

        return 0;
}







/**
 * Per broker SASL termination: stop the kinit refresh timer.
 *
 * Nothing is done unless a kinit command is configured.
 *
 * Locality: broker thread
 */
static void rd_kafka_sasl_cyrus_broker_term (rd_kafka_broker_t *rkb) {
        rd_kafka_t *rk = rkb->rkb_rk;

        if (rk->rk_conf.sasl.kinit_cmd) {
                rd_kafka_timer_stop(&rk->rk_timers,
                                    &rkb->rkb_sasl_kinit_refresh_tmr, 1);
        }
}

/**
 * Per broker SASL init: start the kinit refresh timer.
 *
 * The timer is only started when a kinit command is configured and the
 * configured mechanisms include GSSAPI. Its interval is
 * sasl.relogin_min_time multiplied by 1000.
 *
 * Locality: broker thread
 */
static void rd_kafka_sasl_cyrus_broker_init (rd_kafka_broker_t *rkb) {
        rd_kafka_t *rk = rkb->rkb_rk;

        if (rk->rk_conf.sasl.kinit_cmd &&
            strstr(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
                rd_kafka_timer_start(&rk->rk_timers,
                                     &rkb->rkb_sasl_kinit_refresh_tmr,
                                     rk->rk_conf.sasl.relogin_min_time * 1000ll,
                                     rd_kafka_sasl_cyrus_kinit_refresh_tmr_cb,
                                     rkb);
        }

        /* else: kinit not configured, no need to start timer */
}



/**
 * Validate the SASL configuration.
 *
 * Only applies when sasl.mechanisms is exactly "GSSAPI" and a kinit
 * command is configured: the sasl.kerberos.kinit.cmd template is then
 * test-rendered against a temporary dummy broker to verify that it is
 * well-formed.
 *
 * Returns 0 if the configuration is acceptable, else -1 with a human
 * readable error written to errstr.
 */
static int rd_kafka_sasl_cyrus_conf_validate (rd_kafka_t *rk,
                                       char *errstr, size_t errstr_size) {
        rd_kafka_broker_t dummy_rkb;
        char render_err[128];
        char *rendered;

        if (strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI") != 0 ||
            !rk->rk_conf.sasl.kinit_cmd)
                return 0;

        /* Minimal broker object: just what render_callback() needs */
        memset(&dummy_rkb, 0, sizeof(dummy_rkb));
        strcpy(dummy_rkb.rkb_nodename, "ATestBroker:9092");
        dummy_rkb.rkb_rk = rk;
        mtx_init(&dummy_rkb.rkb_lock, mtx_plain);

        rendered = rd_string_render(rk->rk_conf.sasl.kinit_cmd,
                                    render_err, sizeof(render_err),
                                    render_callback, &dummy_rkb);

        mtx_destroy(&dummy_rkb.rkb_lock);

        if (rendered) {
                /* Only the render outcome matters, discard the command */
                rd_free(rendered);
                return 0;
        }

        rd_snprintf(errstr, errstr_size,
                    "Invalid sasl.kerberos.kinit.cmd value: %s",
                    render_err);
        return -1;
}


/**
 * Global SASL termination: destroys the kinit lock.
 *
 * sasl_done() is intentionally not called here since the application
 * may be using SASL too.
 */
void rd_kafka_sasl_cyrus_global_term (void) {
        mtx_destroy(&rd_kafka_sasl_cyrus_kinit_lock);
}


/**
 * Global SASL init, called once per runtime.
 *
 * Sets up the kinit lock and initializes the libsasl client side.
 * A libsasl initialization failure is reported on stderr.
 *
 * Returns 0 on success, -1 on error.
 */
int rd_kafka_sasl_cyrus_global_init (void) {
        int sasl_res;

        mtx_init(&rd_kafka_sasl_cyrus_kinit_lock, mtx_plain);

        sasl_res = sasl_client_init(NULL);
        if (sasl_res == SASL_OK)
                return 0;

        fprintf(stderr, "librdkafka: sasl_client_init() failed: %s\n",
                sasl_errstring(sasl_res, NULL, NULL));
        return -1;
}


/**
 * Cyrus SASL provider: binds the static functions in this file to the
 * generic SASL provider interface.
 */
const struct rd_kafka_sasl_provider rd_kafka_sasl_cyrus_provider = {
        .name          = "Cyrus",
        .client_new    = rd_kafka_sasl_cyrus_client_new,    /* start authentication */
        .recv          = rd_kafka_sasl_cyrus_recv,          /* handle frame from broker */
        .close         = rd_kafka_sasl_cyrus_close,         /* free per-transport state */
        .broker_init   = rd_kafka_sasl_cyrus_broker_init,   /* start kinit refresh timer */
        .broker_term   = rd_kafka_sasl_cyrus_broker_term,   /* stop kinit refresh timer */
        .conf_validate = rd_kafka_sasl_cyrus_conf_validate  /* check kinit cmd template */
};