/* Copyright (C) 2011 the GSS-PROXY contributors, see COPYING for license */

#include "config.h"
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include "gp_proxy.h"

/* number of worker threads started by gp_workers_init() when the
 * configuration does not carry a positive num_workers value */
#define DEFAULT_WORKER_THREADS_NUM 5

/* values stored in gp_query.status:
 *  GP_QUERY_IN  - value a freshly calloc()ed query starts with (0); the
 *                 dispatcher treats a reply still in this state as an error
 *  GP_QUERY_OUT - the request was processed and the query buffer now holds
 *                 the reply to send back
 *  GP_QUERY_ERR - processing failed, the dispatcher drops the connection */
#define GP_QUERY_IN 0
#define GP_QUERY_OUT 1
#define GP_QUERY_ERR 2

/* A single client request travelling between the dispatcher and a worker
 * thread, and later carrying the reply back to the dispatcher. */
struct gp_query {
    /* link used while the query sits on the pool's singly linked
     * wait_list or reply_list */
    struct gp_query *next;

    /* connection the request came from */
    struct gp_conn *conn;
    /* request data on the way in; replaced by the reply data when the
     * call is processed successfully */
    uint8_t *buffer;
    /* length in bytes of 'buffer' */
    size_t buflen;

    /* one of the GP_QUERY_* values */
    int status;
};

/* Per worker thread state. */
struct gp_thread {
    /* links used by LIST_ADD/LIST_DEL while the thread sits on the pool's
     * free_list or busy_list */
    struct gp_thread *prev;
    struct gp_thread *next;
    /* pool this thread belongs to */
    struct gp_workers *pool;
    /* thread id, joined by gp_workers_free() */
    pthread_t tid;

    /* hand-over slot: written by the dispatcher and taken (reset to NULL)
     * by the worker, always with cond_mutex held */
    struct gp_query *query;
    /* protects 'query' and is the mutex paired with cond_wakeup */
    pthread_mutex_t cond_mutex;
    /* signalled when a query has been handed over or on shutdown */
    pthread_cond_t cond_wakeup;
};

/* The worker pool shared by the dispatcher and all worker threads. */
struct gp_workers {
    /* pool lock: taken around reply_list and free/busy list updates and
     * when setting 'shutdown' */
    pthread_mutex_t lock;
    /* global daemon context, passed on to request processing */
    struct gssproxy_ctx *gpctx;
    /* set once by gp_workers_free(); workers stop touching the free/busy
     * lists and exit when they see it */
    bool shutdown;
    /* queries waiting for a free thread; only handled by the dispatcher,
     * hence accessed without the pool lock */
    struct gp_query *wait_list;
    /* processed queries handed back by workers for the dispatcher */
    struct gp_query *reply_list;
    /* threads waiting for work */
    struct gp_thread *free_list;
    /* threads currently assigned a query */
    struct gp_thread *busy_list;
    /* number of worker threads to start */
    int num_threads;
    /* wakeup pipe: workers write one byte to [1] per finished query, the
     * dispatcher watches and reads [0] */
    int sig_pipe[2];
};

/* forward declarations: these are referenced by the dispatcher code before
 * their definitions further down in this file */

/* thread start routine of every worker */
static void *gp_worker_main(void *pvt);
/* processes one query on a worker thread and records the outcome in it */
static void gp_handle_query(struct gp_workers *w, struct gp_query *q);
/* verto I/O callback run by the dispatcher when the wakeup pipe is readable */
static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev);

/** DISPATCHER FUNCTIONS **/

/**
 * Create the worker pool: allocate the pool, start the worker threads,
 * create the wakeup pipe and register its read end with the verto loop.
 *
 * The number of threads is gpctx->config->num_workers when positive,
 * DEFAULT_WORKER_THREADS_NUM otherwise.
 *
 * On success the pool is stored in gpctx->workers.
 * On failure everything created so far is torn down again.
 *
 * @param gpctx  daemon context providing the configuration and verto loop
 * @return 0 on success, a non-zero value (errno style code, or -1 when the
 *         verto event cannot be registered) on failure
 */
int gp_workers_init(struct gssproxy_ctx *gpctx)
{
    struct gp_workers *w;
    struct gp_thread *t;
    pthread_attr_t attr;
    verto_ev *ev;
    int vflags;
    int ret;
    int i;

    w = calloc(1, sizeof(struct gp_workers));
    if (!w) {
        return ENOMEM;
    }
    w->gpctx = gpctx;

    /* calloc() left both pipe descriptors at 0, which is a valid fd
     * (stdin). Mark them as unused so that an early failure does not make
     * the cleanup code close a descriptor this pool never opened. */
    w->sig_pipe[0] = -1;
    w->sig_pipe[1] = -1;

    /* init global queue mutex */
    ret = pthread_mutex_init(&w->lock, NULL);
    if (ret) {
        free(w);
        return ENOMEM;
    }

    if (gpctx->config->num_workers > 0) {
        w->num_threads = gpctx->config->num_workers;
    } else {
        w->num_threads = DEFAULT_WORKER_THREADS_NUM;
    }

    /* make thread joinable (portability) */
    ret = pthread_attr_init(&attr);
    if (ret) {
        pthread_mutex_destroy(&w->lock);
        free(w);
        return ret;
    }
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    /* init all workers */
    for (i = 0; i < w->num_threads; i++) {
        t = calloc(1, sizeof(struct gp_thread));
        if (!t) {
            ret = ENOMEM;
            goto done;
        }
        t->pool = w;
        ret = pthread_cond_init(&t->cond_wakeup, NULL);
        if (ret) {
            free(t);
            goto done;
        }
        ret = pthread_mutex_init(&t->cond_mutex, NULL);
        if (ret) {
            pthread_cond_destroy(&t->cond_wakeup);
            free(t);
            goto done;
        }
        ret = pthread_create(&t->tid, &attr, gp_worker_main, t);
        if (ret) {
            /* the thread never started, so it is not on any list and
             * must be released here */
            pthread_mutex_destroy(&t->cond_mutex);
            pthread_cond_destroy(&t->cond_wakeup);
            free(t);
            goto done;
        }
        LIST_ADD(w->free_list, t);
    }

    /* add wakeup pipe, so that threads can hand back replies to the
     * dispatcher */
    ret = pipe2(w->sig_pipe, O_NONBLOCK | O_CLOEXEC);
    if (ret == -1) {
        ret = errno;
        goto done;
    }

    vflags = VERTO_EV_FLAG_PERSIST | VERTO_EV_FLAG_IO_READ;
    ev = verto_add_io(gpctx->vctx, vflags, gp_handle_reply, w->sig_pipe[0]);
    if (!ev) {
        ret = -1;
        goto done;
    }
    verto_set_private(ev, w, NULL);

    gpctx->workers = w;
    ret = 0;

done:
    /* the attribute object is only needed while creating threads */
    pthread_attr_destroy(&attr);
    if (ret) {
        gp_workers_free(w);
    }
    return ret;
}

/* Wake up one worker that has already been unlinked from its list, wait
 * for it to exit and release everything it owns. */
static void gp_thread_reap(struct gp_thread *t)
{
    /* wake up the thread, then join it */
    /* ======> COND_MUTEX */
    pthread_mutex_lock(&t->cond_mutex);
    pthread_cond_signal(&t->cond_wakeup);
    /* <====== COND_MUTEX */
    pthread_mutex_unlock(&t->cond_mutex);

    pthread_join(t->tid, NULL);

    pthread_mutex_destroy(&t->cond_mutex);
    pthread_cond_destroy(&t->cond_wakeup);
    free(t);
}

/**
 * Shut the worker pool down and release it.
 *
 * Flags the pool as shutting down, then wakes and joins every worker on
 * the free list and on the busy list, closes the wakeup pipe and frees the
 * pool itself. Does nothing when 'w' is NULL.
 *
 * @param w  pool to destroy; must not be used after this call
 */
void gp_workers_free(struct gp_workers *w)
{
    struct gp_thread *t;

    if (!w) {
        return;
    }

    /* ======> POOL LOCK */
    pthread_mutex_lock(&w->lock);

    w->shutdown = true;

    /* <====== POOL LOCK */
    pthread_mutex_unlock(&w->lock);

    /* we do not run the following operations within
     * the lock, or deadlocks may arise for threads
     * that are just finishing doing some work */

    /* we guarantee nobody is touching these lists by
     * preventing workers from touching the free/busy
     * lists when a 'shutdown' is in progress */

    while (w->free_list) {
        /* pick threads one by one */
        t = w->free_list;
        LIST_DEL(w->free_list, t);
        gp_thread_reap(t);
    }

    /* do the same with the busy list; the thread must be unlinked from
     * busy_list itself so the head pointer stops referring to memory that
     * is about to be freed */
    while (w->busy_list) {
        /* pick threads one by one */
        t = w->busy_list;
        LIST_DEL(w->busy_list, t);
        gp_thread_reap(t);
    }

    close(w->sig_pipe[0]);
    close(w->sig_pipe[1]);

    pthread_mutex_destroy(&w->lock);

    free(w);
}

/* Hand a query to an idle worker, or park it on the wait list when every
 * worker is busy. Runs on the dispatcher. */
static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
{
    struct gp_thread *worker;

    /* try to move one idle thread over to the busy list */
    /* ======> POOL LOCK */
    pthread_mutex_lock(&w->lock);
    worker = w->free_list;
    if (worker != NULL) {
        LIST_DEL(w->free_list, worker);
        LIST_ADD(w->busy_list, worker);
    }
    /* <====== POOL LOCK */
    pthread_mutex_unlock(&w->lock);

    if (worker == NULL) {
        /* nobody is idle: push the query on the wait list.
         * The wait list is only ever handled by the dispatcher,
         * so no locking is needed here */
        q->next = w->wait_list;
        w->wait_list = q;
        return;
    }

    /* an idle thread was found: pass it the query and wake it up */
    /* ======> COND_MUTEX */
    pthread_mutex_lock(&worker->cond_mutex);
    worker->query = q;
    pthread_cond_signal(&worker->cond_wakeup);
    /* <====== COND_MUTEX */
    pthread_mutex_unlock(&worker->cond_mutex);
}

/* Release a query. The data buffer is released too only when 'free_buffer'
 * is true. A NULL query is accepted and ignored. */
static void gp_query_free(struct gp_query *q, bool free_buffer)
{
    if (q != NULL) {
        if (free_buffer) {
            free(q->buffer);
        }
        free(q);
    }
}

/**
 * Wrap an incoming request into a query and schedule it on the pool.
 *
 * The query records 'buffer' as given (no copy is made) and is then either
 * handed to an idle worker or queued until one becomes available.
 *
 * @param w       worker pool that will process the request
 * @param conn    connection the request arrived on
 * @param buffer  request data
 * @param buflen  length of 'buffer' in bytes
 * @return 0 on success, ENOMEM if the query cannot be allocated
 */
int gp_query_new(struct gp_workers *w, struct gp_conn *conn,
                 uint8_t *buffer, size_t buflen)
{
    /* zeroed allocation: 'next' starts NULL and 'status' starts at 0 */
    struct gp_query *query = calloc(1, sizeof(*query));

    if (query == NULL) {
        return ENOMEM;
    }

    query->conn = conn;
    query->buffer = buffer;
    query->buflen = buflen;

    gp_query_assign(w, query);
    return 0;
}

/**
 * Dispatcher callback, run when the wakeup pipe becomes readable.
 *
 * Consumes one wakeup byte, takes at most one processed query off the
 * reply list and either sends its reply to the client or, on error, drops
 * the client connection. Afterwards one query from the wait list, if any,
 * is offered to the pool again.
 *
 * @param vctx  verto context, used to send the reply
 * @param ev    the pipe event; its private data is the worker pool
 */
static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev)
{
    struct gp_workers *w;
    struct gp_query *q;
    char dummy;
    int ret;

    w = verto_get_private(ev);

    /* first read out the dummy so the pipe doesn't get clogged */
    ret = read(w->sig_pipe[0], &dummy, 1);
    if (ret) {
        /* ignore errors */
    }

    /* grab a query reply if any; reply_list is written by the worker
     * threads, so it is only ever looked at with the pool lock held */
    /* ======> POOL LOCK */
    pthread_mutex_lock(&w->lock);

    q = w->reply_list;
    if (q != NULL) {
        w->reply_list = q->next;
        q->next = NULL;
    }

    /* <====== POOL LOCK */
    pthread_mutex_unlock(&w->lock);

    if (q) {
        switch (q->status) {
        case GP_QUERY_OUT:
            GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n", q->buffer, q->buflen);
            gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
            /* the buffer was passed on to the send path, only the
             * query wrapper is released here */
            gp_query_free(q, false);
            break;

        case GP_QUERY_IN:
            /* ?! fallback and kill client conn */
        case GP_QUERY_ERR:
        default:
            /* any unexpected status is treated as an error too, so the
             * query is never leaked */
            GPDEBUGN(3, "[status] Handling query error, terminating CID %d.\n",
                     gp_conn_get_cid(q->conn));
            gp_conn_free(q->conn);
            gp_query_free(q, true);
            break;
        }
    }

    /* while we are at it, check if there is anything in the wait list
     * we need to process, as one thread just got free :-) */

    /* only the dispatcher handles wait_list
     * so we do not need to lock around it */
    q = w->wait_list;
    if (q) {
        w->wait_list = q->next;
        q->next = NULL;
        gp_query_assign(w, q);
    }
}


/** WORKER THREADS **/

/**
 * Start routine of every worker thread.
 *
 * Loops until the pool is flagged as shutting down: waits for the
 * dispatcher to hand over a query, processes it, puts it on the pool's
 * reply list, moves itself back to the free list and wakes the dispatcher
 * through the signal pipe.
 *
 * @param pvt  the struct gp_thread describing this worker
 * @return always NULL
 */
static void *gp_worker_main(void *pvt)
{
    struct gp_thread *t = (struct gp_thread *)pvt;
    struct gp_query *q = NULL;
    char dummy = 0;
    int ret;

    while (!t->pool->shutdown) {

        /* initialize debug client id to 0 until work is scheduled */
        gp_debug_set_conn_id(0);

        /* ======> COND_MUTEX */
        pthread_mutex_lock(&t->cond_mutex);

        /* wait for next query; the shutdown flag is tested before going
         * to sleep as well, so a shutdown wakeup sent before we started
         * waiting cannot be missed */
        while (t->query == NULL && !t->pool->shutdown) {
            pthread_cond_wait(&t->cond_wakeup, &t->cond_mutex);
        }

        if (t->pool->shutdown) {
            /* never exit with the mutex held: it is destroyed right
             * after this thread has been joined */
            /* <====== COND_MUTEX */
            pthread_mutex_unlock(&t->cond_mutex);
            break;
        }

        /* grab the query off the shared pointer */
        q = t->query;
        t->query = NULL;

        /* <====== COND_MUTEX */
        pthread_mutex_unlock(&t->cond_mutex);

        /* set client id before handling requests */
        gp_debug_set_conn_id(gp_conn_get_cid(q->conn));

        /* handle the client request */
        GPDEBUGN(3, "[status] Handling query input: %p (%zu)\n", q->buffer,
                 q->buflen);
        gp_handle_query(t->pool, q);
        GPDEBUGN(3 ,"[status] Handling query output: %p (%zu)\n", q->buffer,
                 q->buflen);

        /* now get lock on main queue, to play with the reply list */
        /* ======> POOL LOCK */
        pthread_mutex_lock(&t->pool->lock);

        /* put back query so that dispatcher can send reply */
        q->next = t->pool->reply_list;
        t->pool->reply_list = q;

        /* add us back to the free list but only if we are not
         * shutting down */
        if (!t->pool->shutdown) {
            LIST_DEL(t->pool->busy_list, t);
            LIST_ADD(t->pool->free_list, t);
        }

        /* <====== POOL LOCK */
        pthread_mutex_unlock(&t->pool->lock);

        /* and wake up dispatcher so it will handle it; an interrupted
         * write is retried so the wakeup is not dropped */
        do {
            ret = write(t->pool->sig_pipe[1], &dummy, 1);
        } while (ret == -1 && errno == EINTR);
        if (ret == -1) {
            GPERROR("Failed to signal dispatcher!");
        }
    }

    return NULL;
}

/* Process one query on a worker thread.
 *
 * Looks up the service matching the query's connection and runs the RPC
 * call on the request buffer. On success the request buffer is released
 * and replaced by the reply, and the status becomes GP_QUERY_OUT. When no
 * service matches or the call fails, the status becomes GP_QUERY_ERR and
 * the buffer is left untouched. */
static void gp_handle_query(struct gp_workers *w, struct gp_query *q)
{
    struct gp_call_ctx gpcall = { 0 };
    uint8_t *reply;
    size_t reply_len;
    int rc;

    /* find service */
    gpcall.gpctx = w->gpctx;
    gpcall.service = gp_creds_match_conn(w->gpctx, q->conn);
    if (gpcall.service == NULL) {
        q->status = GP_QUERY_ERR;
        return;
    }
    gpcall.connection = q->conn;

    rc = gp_rpc_process_call(&gpcall, q->buffer, q->buflen,
                             &reply, &reply_len);
    if (rc == 0) {
        /* swap the request data for the reply data */
        free(q->buffer);
        q->buffer = reply;
        q->buflen = reply_len;
        q->status = GP_QUERY_OUT;
    } else {
        q->status = GP_QUERY_ERR;
    }

    /* give the call a chance to clean up after itself */
    if (gpcall.destroy_callback != NULL) {
        gpcall.destroy_callback(gpcall.destroy_callback_data);
    }
}