/* Copyright (C) 2011 the GSS-PROXY contributors, see COPYING for license */ #include "config.h" #include #include #include #include #include #include #include #include "gp_proxy.h" #define DEFAULT_WORKER_THREADS_NUM 5 #define GP_QUERY_IN 0 #define GP_QUERY_OUT 1 #define GP_QUERY_ERR 2 struct gp_query { struct gp_query *next; struct gp_conn *conn; uint8_t *buffer; size_t buflen; int status; }; struct gp_thread { struct gp_thread *prev; struct gp_thread *next; struct gp_workers *pool; pthread_t tid; struct gp_query *query; pthread_mutex_t cond_mutex; pthread_cond_t cond_wakeup; }; struct gp_workers { pthread_mutex_t lock; struct gssproxy_ctx *gpctx; bool shutdown; struct gp_query *wait_list; struct gp_query *reply_list; struct gp_thread *free_list; struct gp_thread *busy_list; int num_threads; int sig_pipe[2]; }; static void *gp_worker_main(void *pvt); static void gp_handle_query(struct gp_workers *w, struct gp_query *q); static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev); /** DISPATCHER FUNCTIONS **/ int gp_workers_init(struct gssproxy_ctx *gpctx) { struct gp_workers *w; struct gp_thread *t; pthread_attr_t attr; verto_ev *ev; int vflags; int ret; int i; w = calloc(1, sizeof(struct gp_workers)); if (!w) { return ENOMEM; } w->gpctx = gpctx; /* init global queue mutex */ ret = pthread_mutex_init(&w->lock, NULL); if (ret) { free(w); return ENOMEM; } if (gpctx->config->num_workers > 0) { w->num_threads = gpctx->config->num_workers; } else { w->num_threads = DEFAULT_WORKER_THREADS_NUM; } /* make thread joinable (portability) */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); /* init all workers */ for (i = 0; i < w->num_threads; i++) { t = calloc(1, sizeof(struct gp_thread)); if (!t) { ret = -1; goto done; } t->pool = w; ret = pthread_cond_init(&t->cond_wakeup, NULL); if (ret) { free(t); goto done; } ret = pthread_mutex_init(&t->cond_mutex, NULL); if (ret) { free(t); goto done; } ret = pthread_create(&t->tid, &attr, gp_worker_main, t); if (ret) { free(t); goto done; } LIST_ADD(w->free_list, t); } /* add wakeup pipe, so that threads can hand back replies to the * dispatcher */ ret = pipe2(w->sig_pipe, O_NONBLOCK | O_CLOEXEC); if (ret == -1) { goto done; } vflags = VERTO_EV_FLAG_PERSIST | VERTO_EV_FLAG_IO_READ; ev = verto_add_io(gpctx->vctx, vflags, gp_handle_reply, w->sig_pipe[0]); if (!ev) { ret = -1; goto done; } verto_set_private(ev, w, NULL); gpctx->workers = w; ret = 0; done: if (ret) { gp_workers_free(w); } return ret; } void gp_workers_free(struct gp_workers *w) { struct gp_thread *t; void *retval; /* ======> POOL LOCK */ pthread_mutex_lock(&w->lock); w->shutdown = true; /* <====== POOL LOCK */ pthread_mutex_unlock(&w->lock); /* we do not run the following operations within * the lock, or deadlocks may arise for threads * that are just finishing doing some work */ /* we guarantee nobody is touching these lists by * preventing workers from touching the free/busy * lists when a 'shutdown' is in progress */ while (w->free_list) { /* pick threads one by one */ t = w->free_list; LIST_DEL(w->free_list, t); /* wake up threads, then join them */ /* ======> COND_MUTEX */ pthread_mutex_lock(&t->cond_mutex); pthread_cond_signal(&t->cond_wakeup); /* <====== COND_MUTEX */ pthread_mutex_unlock(&t->cond_mutex); pthread_join(t->tid, &retval); pthread_mutex_destroy(&t->cond_mutex); pthread_cond_destroy(&t->cond_wakeup); free(t); } /* do the same with the busy list */ while (w->busy_list) { /* pick threads one by one */ t = w->busy_list; LIST_DEL(w->free_list, t); /* wake up threads, then join them */ /* ======> COND_MUTEX */ pthread_mutex_lock(&t->cond_mutex); pthread_cond_signal(&t->cond_wakeup); /* <====== COND_MUTEX */ pthread_mutex_unlock(&t->cond_mutex); pthread_join(t->tid, &retval); pthread_mutex_destroy(&t->cond_mutex); pthread_cond_destroy(&t->cond_wakeup); free(t); } close(w->sig_pipe[0]); close(w->sig_pipe[1]); pthread_mutex_destroy(&w->lock); free(w); } static void gp_query_assign(struct gp_workers *w, struct gp_query *q) { struct gp_thread *t = NULL; /* then either find a free thread or queue in the wait list */ /* ======> POOL LOCK */ pthread_mutex_lock(&w->lock); if (w->free_list) { t = w->free_list; LIST_DEL(w->free_list, t); LIST_ADD(w->busy_list, t); } /* <====== POOL LOCK */ pthread_mutex_unlock(&w->lock); if (t) { /* found free thread, assign work */ /* ======> COND_MUTEX */ pthread_mutex_lock(&t->cond_mutex); /* hand over the query */ t->query = q; pthread_cond_signal(&t->cond_wakeup); /* <====== COND_MUTEX */ pthread_mutex_unlock(&t->cond_mutex); } else { /* all threads are busy, store in wait list */ /* only the dispatcher handles wait_list * so we do not need to lock around it */ q->next = w->wait_list; w->wait_list = q; } } static void gp_query_free(struct gp_query *q, bool free_buffer) { if (!q) { return; } if (free_buffer) { free(q->buffer); } free(q); } int gp_query_new(struct gp_workers *w, struct gp_conn *conn, uint8_t *buffer, size_t buflen) { struct gp_query *q; /* create query struct */ q = calloc(1, sizeof(struct gp_query)); if (!q) { return ENOMEM; } q->conn = conn; q->buffer = buffer; q->buflen = buflen; gp_query_assign(w, q); return 0; } static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev) { struct gp_workers *w; struct gp_query *q = NULL; char dummy; int ret; w = verto_get_private(ev); /* first read out the dummy so the pipe doesn't get clogged */ ret = read(w->sig_pipe[0], &dummy, 1); if (ret) { /* ignore errors */ } /* grab a query reply if any */ if (w->reply_list) { /* ======> POOL LOCK */ pthread_mutex_lock(&w->lock); if (w->reply_list != NULL) { q = w->reply_list; w->reply_list = q->next; } /* <====== POOL LOCK */ pthread_mutex_unlock(&w->lock); } if (q) { switch (q->status) { case GP_QUERY_IN: /* ?! fallback and kill client conn */ case GP_QUERY_ERR: GPDEBUGN(3, "[status] Handling query error, terminating CID %d.\n", gp_conn_get_cid(q->conn)); gp_conn_free(q->conn); gp_query_free(q, true); break; case GP_QUERY_OUT: GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n", q->buffer, q->buflen); gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen); gp_query_free(q, false); break; } } /* while we are at it, check if there is anything in the wait list * we need to process, as one thread just got free :-) */ q = NULL; if (w->wait_list) { /* only the dispatcher handles wait_list * so we do not need to lock around it */ if (w->wait_list) { q = w->wait_list; w->wait_list = q->next; q->next = NULL; } } if (q) { gp_query_assign(w, q); } } /** WORKER THREADS **/ static void *gp_worker_main(void *pvt) { struct gp_thread *t = (struct gp_thread *)pvt; struct gp_query *q = NULL; char dummy = 0; int ret; while (!t->pool->shutdown) { /* initialize debug client id to 0 until work is scheduled */ gp_debug_set_conn_id(0); /* ======> COND_MUTEX */ pthread_mutex_lock(&t->cond_mutex); while (t->query == NULL) { /* wait for next query */ pthread_cond_wait(&t->cond_wakeup, &t->cond_mutex); if (t->pool->shutdown) { pthread_exit(NULL); } } /* grab the query off the shared pointer */ q = t->query; t->query = NULL; /* <====== COND_MUTEX */ pthread_mutex_unlock(&t->cond_mutex); /* set client id before hndling requests */ gp_debug_set_conn_id(gp_conn_get_cid(q->conn)); /* handle the client request */ GPDEBUGN(3, "[status] Handling query input: %p (%zu)\n", q->buffer, q->buflen); gp_handle_query(t->pool, q); GPDEBUGN(3 ,"[status] Handling query output: %p (%zu)\n", q->buffer, q->buflen); /* now get lock on main queue, to play with the reply list */ /* ======> POOL LOCK */ pthread_mutex_lock(&t->pool->lock); /* put back query so that dispatcher can send reply */ q->next = t->pool->reply_list; t->pool->reply_list = q; /* add us back to the free list but only if we are not * shutting down */ if (!t->pool->shutdown) { LIST_DEL(t->pool->busy_list, t); LIST_ADD(t->pool->free_list, t); } /* <====== POOL LOCK */ pthread_mutex_unlock(&t->pool->lock); /* and wake up dispatcher so it will handle it */ ret = write(t->pool->sig_pipe[1], &dummy, 1); if (ret == -1) { GPERROR("Failed to signal dispatcher!"); } } pthread_exit(NULL); } static void gp_handle_query(struct gp_workers *w, struct gp_query *q) { struct gp_call_ctx gpcall = { 0 }; uint8_t *buffer; size_t buflen; int ret; /* find service */ gpcall.gpctx = w->gpctx; gpcall.service = gp_creds_match_conn(w->gpctx, q->conn); if (!gpcall.service) { q->status = GP_QUERY_ERR; return; } gpcall.connection = q->conn; ret = gp_rpc_process_call(&gpcall, q->buffer, q->buflen, &buffer, &buflen); if (ret) { q->status = GP_QUERY_ERR; } else { q->status = GP_QUERY_OUT; free(q->buffer); q->buffer = buffer; q->buflen = buflen; } if (gpcall.destroy_callback) { gpcall.destroy_callback(gpcall.destroy_callback_data); } }