|
Packit Service |
9f2c4a |
/* Copyright (C) 2011 the GSS-PROXY contributors, see COPYING for license */
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
#include "config.h"
|
|
Packit Service |
9f2c4a |
#include <pthread.h>
|
|
Packit Service |
9f2c4a |
#include <stdint.h>
|
|
Packit Service |
9f2c4a |
#include <stdlib.h>
|
|
Packit Service |
9f2c4a |
#include <unistd.h>
|
|
Packit Service |
9f2c4a |
#include <string.h>
|
|
Packit Service |
9f2c4a |
#include <fcntl.h>
|
|
Packit Service |
9f2c4a |
#include <errno.h>
|
|
Packit Service |
9f2c4a |
#include "gp_proxy.h"
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* Pool size used when the configuration does not supply a positive
 * num_workers value (see gp_workers_init()). */
#define DEFAULT_WORKER_THREADS_NUM 5

/* Values stored in gp_query.status */
#define GP_QUERY_IN 0   /* value a freshly calloc'ed query starts with */
#define GP_QUERY_OUT 1  /* set by gp_handle_query() when a reply was produced */
#define GP_QUERY_ERR 2  /* set by gp_handle_query() when processing failed */
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* One client request as it moves between the dispatcher and a worker. */
struct gp_query {
    /* singly linked list pointer, used for the pool's wait and reply lists */
    struct gp_query *next;

    /* connection the request belongs to */
    struct gp_conn *conn;
    /* request bytes on input; swapped for the reply bytes by
     * gp_handle_query() when processing succeeds */
    uint8_t *buffer;
    /* number of bytes in buffer */
    size_t buflen;

    /* one of GP_QUERY_IN, GP_QUERY_OUT, GP_QUERY_ERR */
    int status;
};
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* Bookkeeping for a single worker thread of the pool. */
struct gp_thread {
    /* list linkage, manipulated through LIST_ADD()/LIST_DEL() */
    struct gp_thread *prev;
    struct gp_thread *next;
    /* pool this worker was created for */
    struct gp_workers *pool;
    /* thread id filled in by pthread_create() */
    pthread_t tid;

    /* hand-over slot: the dispatcher stores the next query here and the
     * worker resets it to NULL when it picks the query up */
    struct gp_query *query;
    /* held while query is read or written and while waiting on cond_wakeup */
    pthread_mutex_t cond_mutex;
    /* signalled when a query is handed over and during pool shutdown */
    pthread_cond_t cond_wakeup;
};
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* State of the whole worker pool, shared by dispatcher and workers. */
struct gp_workers {
    /* the "POOL LOCK": taken around updates of the free, busy and reply
     * lists and around setting the shutdown flag */
    pthread_mutex_t lock;
    /* owning gssproxy context */
    struct gssproxy_ctx *gpctx;
    /* set to true by gp_workers_free() to make workers exit */
    bool shutdown;
    /* queries waiting for a free worker; only touched by the dispatcher */
    struct gp_query *wait_list;
    /* processed queries queued by workers for the dispatcher */
    struct gp_query *reply_list;
    /* workers with no query assigned */
    struct gp_thread *free_list;
    /* workers that have been handed a query */
    struct gp_thread *busy_list;
    /* number of worker threads gp_workers_init() tries to start */
    int num_threads;
    /* wakeup pipe: workers write one byte to [1], the dispatcher's
     * event loop reads it from [0] */
    int sig_pipe[2];
};
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
static void *gp_worker_main(void *pvt);
|
|
Packit Service |
9f2c4a |
static void gp_handle_query(struct gp_workers *w, struct gp_query *q);
|
|
Packit Service |
9f2c4a |
static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/** DISPATCHER FUNCTIONS **/
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
int gp_workers_init(struct gssproxy_ctx *gpctx)
|
|
Packit Service |
9f2c4a |
{
|
|
Packit Service |
9f2c4a |
struct gp_workers *w;
|
|
Packit Service |
9f2c4a |
struct gp_thread *t;
|
|
Packit Service |
9f2c4a |
pthread_attr_t attr;
|
|
Packit Service |
9f2c4a |
verto_ev *ev;
|
|
Packit Service |
9f2c4a |
int vflags;
|
|
Packit Service |
9f2c4a |
int ret;
|
|
Packit Service |
9f2c4a |
int i;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
w = calloc(1, sizeof(struct gp_workers));
|
|
Packit Service |
9f2c4a |
if (!w) {
|
|
Packit Service |
9f2c4a |
return ENOMEM;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
w->gpctx = gpctx;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* init global queue mutex */
|
|
Packit Service |
9f2c4a |
ret = pthread_mutex_init(&w->lock, NULL);
|
|
Packit Service |
9f2c4a |
if (ret) {
|
|
Packit Service |
9f2c4a |
free(w);
|
|
Packit Service |
9f2c4a |
return ENOMEM;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
if (gpctx->config->num_workers > 0) {
|
|
Packit Service |
9f2c4a |
w->num_threads = gpctx->config->num_workers;
|
|
Packit Service |
9f2c4a |
} else {
|
|
Packit Service |
9f2c4a |
w->num_threads = DEFAULT_WORKER_THREADS_NUM;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* make thread joinable (portability) */
|
|
Packit Service |
9f2c4a |
pthread_attr_init(&attr);
|
|
Packit Service |
9f2c4a |
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* init all workers */
|
|
Packit Service |
9f2c4a |
for (i = 0; i < w->num_threads; i++) {
|
|
Packit Service |
9f2c4a |
t = calloc(1, sizeof(struct gp_thread));
|
|
Packit Service |
9f2c4a |
if (!t) {
|
|
Packit Service |
9f2c4a |
ret = -1;
|
|
Packit Service |
9f2c4a |
goto done;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
t->pool = w;
|
|
Packit Service |
9f2c4a |
ret = pthread_cond_init(&t->cond_wakeup, NULL);
|
|
Packit Service |
9f2c4a |
if (ret) {
|
|
Packit Service |
9f2c4a |
free(t);
|
|
Packit Service |
9f2c4a |
goto done;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
ret = pthread_mutex_init(&t->cond_mutex, NULL);
|
|
Packit Service |
9f2c4a |
if (ret) {
|
|
Packit Service |
9f2c4a |
free(t);
|
|
Packit Service |
9f2c4a |
goto done;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
ret = pthread_create(&t->tid, &attr, gp_worker_main, t);
|
|
Packit Service |
9f2c4a |
if (ret) {
|
|
Packit Service |
9f2c4a |
free(t);
|
|
Packit Service |
9f2c4a |
goto done;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
LIST_ADD(w->free_list, t);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* add wakeup pipe, so that threads can hand back replies to the
|
|
Packit Service |
9f2c4a |
* dispatcher */
|
|
Packit Service |
9f2c4a |
ret = pipe2(w->sig_pipe, O_NONBLOCK | O_CLOEXEC);
|
|
Packit Service |
9f2c4a |
if (ret == -1) {
|
|
Packit Service |
9f2c4a |
goto done;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
vflags = VERTO_EV_FLAG_PERSIST | VERTO_EV_FLAG_IO_READ;
|
|
Packit Service |
9f2c4a |
ev = verto_add_io(gpctx->vctx, vflags, gp_handle_reply, w->sig_pipe[0]);
|
|
Packit Service |
9f2c4a |
if (!ev) {
|
|
Packit Service |
9f2c4a |
ret = -1;
|
|
Packit Service |
9f2c4a |
goto done;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
verto_set_private(ev, w, NULL);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
gpctx->workers = w;
|
|
Packit Service |
9f2c4a |
ret = 0;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
done:
|
|
Packit Service |
9f2c4a |
if (ret) {
|
|
Packit Service |
9f2c4a |
gp_workers_free(w);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
return ret;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
void gp_workers_free(struct gp_workers *w)
|
|
Packit Service |
9f2c4a |
{
|
|
Packit Service |
9f2c4a |
struct gp_thread *t;
|
|
Packit Service |
9f2c4a |
void *retval;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* ======> POOL LOCK */
|
|
Packit Service |
9f2c4a |
pthread_mutex_lock(&w->lock);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
w->shutdown = true;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* <====== POOL LOCK */
|
|
Packit Service |
9f2c4a |
pthread_mutex_unlock(&w->lock);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* we do not run the following operations within
|
|
Packit Service |
9f2c4a |
* the lock, or deadlocks may arise for threads
|
|
Packit Service |
9f2c4a |
* that are just finishing doing some work */
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* we guarantee nobody is touching these lists by
|
|
Packit Service |
9f2c4a |
* preventing workers from touching the free/busy
|
|
Packit Service |
9f2c4a |
* lists when a 'shutdown' is in progress */
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
while (w->free_list) {
|
|
Packit Service |
9f2c4a |
/* pick threads one by one */
|
|
Packit Service |
9f2c4a |
t = w->free_list;
|
|
Packit Service |
9f2c4a |
LIST_DEL(w->free_list, t);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* wake up threads, then join them */
|
|
Packit Service |
9f2c4a |
/* ======> COND_MUTEX */
|
|
Packit Service |
9f2c4a |
pthread_mutex_lock(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
pthread_cond_signal(&t->cond_wakeup);
|
|
Packit Service |
9f2c4a |
/* <====== COND_MUTEX */
|
|
Packit Service |
9f2c4a |
pthread_mutex_unlock(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
pthread_join(t->tid, &retval);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
pthread_mutex_destroy(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
pthread_cond_destroy(&t->cond_wakeup);
|
|
Packit Service |
9f2c4a |
free(t);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* do the same with the busy list */
|
|
Packit Service |
9f2c4a |
while (w->busy_list) {
|
|
Packit Service |
9f2c4a |
/* pick threads one by one */
|
|
Packit Service |
9f2c4a |
t = w->busy_list;
|
|
Packit Service |
9f2c4a |
LIST_DEL(w->free_list, t);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* wake up threads, then join them */
|
|
Packit Service |
9f2c4a |
/* ======> COND_MUTEX */
|
|
Packit Service |
9f2c4a |
pthread_mutex_lock(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
pthread_cond_signal(&t->cond_wakeup);
|
|
Packit Service |
9f2c4a |
/* <====== COND_MUTEX */
|
|
Packit Service |
9f2c4a |
pthread_mutex_unlock(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
pthread_join(t->tid, &retval);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
pthread_mutex_destroy(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
pthread_cond_destroy(&t->cond_wakeup);
|
|
Packit Service |
9f2c4a |
free(t);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
close(w->sig_pipe[0]);
|
|
Packit Service |
9f2c4a |
close(w->sig_pipe[1]);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
pthread_mutex_destroy(&w->lock);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
free(w);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
|
|
Packit Service |
9f2c4a |
{
|
|
Packit Service |
9f2c4a |
struct gp_thread *t = NULL;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* then either find a free thread or queue in the wait list */
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* ======> POOL LOCK */
|
|
Packit Service |
9f2c4a |
pthread_mutex_lock(&w->lock);
|
|
Packit Service |
9f2c4a |
if (w->free_list) {
|
|
Packit Service |
9f2c4a |
t = w->free_list;
|
|
Packit Service |
9f2c4a |
LIST_DEL(w->free_list, t);
|
|
Packit Service |
9f2c4a |
LIST_ADD(w->busy_list, t);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
/* <====== POOL LOCK */
|
|
Packit Service |
9f2c4a |
pthread_mutex_unlock(&w->lock);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
if (t) {
|
|
Packit Service |
9f2c4a |
/* found free thread, assign work */
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* ======> COND_MUTEX */
|
|
Packit Service |
9f2c4a |
pthread_mutex_lock(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* hand over the query */
|
|
Packit Service |
9f2c4a |
t->query = q;
|
|
Packit Service |
9f2c4a |
pthread_cond_signal(&t->cond_wakeup);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* <====== COND_MUTEX */
|
|
Packit Service |
9f2c4a |
pthread_mutex_unlock(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
} else {
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* all threads are busy, store in wait list */
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* only the dispatcher handles wait_list
|
|
Packit Service |
9f2c4a |
* so we do not need to lock around it */
|
|
Packit Service |
9f2c4a |
q->next = w->wait_list;
|
|
Packit Service |
9f2c4a |
w->wait_list = q;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
static void gp_query_free(struct gp_query *q, bool free_buffer)
|
|
Packit Service |
9f2c4a |
{
|
|
Packit Service |
9f2c4a |
if (!q) {
|
|
Packit Service |
9f2c4a |
return;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
if (free_buffer) {
|
|
Packit Service |
9f2c4a |
free(q->buffer);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
free(q);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
int gp_query_new(struct gp_workers *w, struct gp_conn *conn,
|
|
Packit Service |
9f2c4a |
uint8_t *buffer, size_t buflen)
|
|
Packit Service |
9f2c4a |
{
|
|
Packit Service |
9f2c4a |
struct gp_query *q;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* create query struct */
|
|
Packit Service |
9f2c4a |
q = calloc(1, sizeof(struct gp_query));
|
|
Packit Service |
9f2c4a |
if (!q) {
|
|
Packit Service |
9f2c4a |
return ENOMEM;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
q->conn = conn;
|
|
Packit Service |
9f2c4a |
q->buffer = buffer;
|
|
Packit Service |
9f2c4a |
q->buflen = buflen;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
gp_query_assign(w, q);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
return 0;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/*
 * Event-loop callback for the read end of the wakeup pipe.
 *
 * Each invocation consumes one byte from the pipe, takes at most one
 * finished query off the reply list and either sends its reply to the
 * client or, for a failed query, drops the client connection. Afterwards
 * at most one query from the wait list is re-submitted to the pool.
 */
static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev)
{
    struct gp_workers *w = verto_get_private(ev);
    struct gp_query *done = NULL;
    struct gp_query *pending;
    char dummy;
    int ret;

    /* consume the wakeup byte first so the pipe does not fill up */
    ret = read(w->sig_pipe[0], &dummy, 1);
    if (ret) {
        /* ignore errors */
    }

    /* pop the head of the reply list, if there is one */
    if (w->reply_list) {
        /* ======> POOL LOCK */
        pthread_mutex_lock(&w->lock);

        done = w->reply_list;
        if (done != NULL) {
            w->reply_list = done->next;
        }

        /* <====== POOL LOCK */
        pthread_mutex_unlock(&w->lock);
    }

    if (done != NULL) {
        if (done->status == GP_QUERY_OUT) {
            /* the buffer is passed on to the socket code, so only the
             * query structure itself is released here */
            GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n", done->buffer, done->buflen);
            gp_socket_send_data(vctx, done->conn, done->buffer, done->buflen);
            gp_query_free(done, false);
        } else if (done->status == GP_QUERY_IN ||
                   done->status == GP_QUERY_ERR) {
            /* a query still marked GP_QUERY_IN is unexpected here and is
             * treated like an error: kill the client connection */
            GPDEBUGN(3, "[status] Handling query error, terminating CID %d.\n",
                     gp_conn_get_cid(done->conn));
            gp_conn_free(done->conn);
            gp_query_free(done, true);
        }
    }

    /* a worker just became available, so give it the next waiting query.
     * Only the dispatcher handles wait_list, so no lock is needed. */
    pending = w->wait_list;
    if (pending != NULL) {
        w->wait_list = pending->next;
        pending->next = NULL;
        gp_query_assign(w, pending);
    }
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/** WORKER THREADS **/
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
static void *gp_worker_main(void *pvt)
|
|
Packit Service |
9f2c4a |
{
|
|
Packit Service |
9f2c4a |
struct gp_thread *t = (struct gp_thread *)pvt;
|
|
Packit Service |
9f2c4a |
struct gp_query *q = NULL;
|
|
Packit Service |
9f2c4a |
char dummy = 0;
|
|
Packit Service |
9f2c4a |
int ret;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
while (!t->pool->shutdown) {
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* initialize debug client id to 0 until work is scheduled */
|
|
Packit Service |
9f2c4a |
gp_debug_set_conn_id(0);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* ======> COND_MUTEX */
|
|
Packit Service |
9f2c4a |
pthread_mutex_lock(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
while (t->query == NULL) {
|
|
Packit Service |
9f2c4a |
/* wait for next query */
|
|
Packit Service |
9f2c4a |
pthread_cond_wait(&t->cond_wakeup, &t->cond_mutex);
|
|
Packit Service |
9f2c4a |
if (t->pool->shutdown) {
|
|
Packit Service |
9f2c4a |
pthread_exit(NULL);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* grab the query off the shared pointer */
|
|
Packit Service |
9f2c4a |
q = t->query;
|
|
Packit Service |
9f2c4a |
t->query = NULL;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* <====== COND_MUTEX */
|
|
Packit Service |
9f2c4a |
pthread_mutex_unlock(&t->cond_mutex);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* set client id before hndling requests */
|
|
Packit Service |
9f2c4a |
gp_debug_set_conn_id(gp_conn_get_cid(q->conn));
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* handle the client request */
|
|
Packit Service |
9f2c4a |
GPDEBUGN(3, "[status] Handling query input: %p (%zu)\n", q->buffer,
|
|
Packit Service |
9f2c4a |
q->buflen);
|
|
Packit Service |
9f2c4a |
gp_handle_query(t->pool, q);
|
|
Packit Service |
9f2c4a |
GPDEBUGN(3 ,"[status] Handling query output: %p (%zu)\n", q->buffer,
|
|
Packit Service |
9f2c4a |
q->buflen);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* now get lock on main queue, to play with the reply list */
|
|
Packit Service |
9f2c4a |
/* ======> POOL LOCK */
|
|
Packit Service |
9f2c4a |
pthread_mutex_lock(&t->pool->lock);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* put back query so that dispatcher can send reply */
|
|
Packit Service |
9f2c4a |
q->next = t->pool->reply_list;
|
|
Packit Service |
9f2c4a |
t->pool->reply_list = q;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* add us back to the free list but only if we are not
|
|
Packit Service |
9f2c4a |
* shutting down */
|
|
Packit Service |
9f2c4a |
if (!t->pool->shutdown) {
|
|
Packit Service |
9f2c4a |
LIST_DEL(t->pool->busy_list, t);
|
|
Packit Service |
9f2c4a |
LIST_ADD(t->pool->free_list, t);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* <====== POOL LOCK */
|
|
Packit Service |
9f2c4a |
pthread_mutex_unlock(&t->pool->lock);
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* and wake up dispatcher so it will handle it */
|
|
Packit Service |
9f2c4a |
ret = write(t->pool->sig_pipe[1], &dummy, 1);
|
|
Packit Service |
9f2c4a |
if (ret == -1) {
|
|
Packit Service |
9f2c4a |
GPERROR("Failed to signal dispatcher!");
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
pthread_exit(NULL);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
static void gp_handle_query(struct gp_workers *w, struct gp_query *q)
|
|
Packit Service |
9f2c4a |
{
|
|
Packit Service |
9f2c4a |
struct gp_call_ctx gpcall = { 0 };
|
|
Packit Service |
9f2c4a |
uint8_t *buffer;
|
|
Packit Service |
9f2c4a |
size_t buflen;
|
|
Packit Service |
9f2c4a |
int ret;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
/* find service */
|
|
Packit Service |
9f2c4a |
gpcall.gpctx = w->gpctx;
|
|
Packit Service |
9f2c4a |
gpcall.service = gp_creds_match_conn(w->gpctx, q->conn);
|
|
Packit Service |
9f2c4a |
if (!gpcall.service) {
|
|
Packit Service |
9f2c4a |
q->status = GP_QUERY_ERR;
|
|
Packit Service |
9f2c4a |
return;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
gpcall.connection = q->conn;
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
ret = gp_rpc_process_call(&gpcall,
|
|
Packit Service |
9f2c4a |
q->buffer, q->buflen,
|
|
Packit Service |
9f2c4a |
&buffer, &buflen);
|
|
Packit Service |
9f2c4a |
if (ret) {
|
|
Packit Service |
9f2c4a |
q->status = GP_QUERY_ERR;
|
|
Packit Service |
9f2c4a |
} else {
|
|
Packit Service |
9f2c4a |
q->status = GP_QUERY_OUT;
|
|
Packit Service |
9f2c4a |
free(q->buffer);
|
|
Packit Service |
9f2c4a |
q->buffer = buffer;
|
|
Packit Service |
9f2c4a |
q->buflen = buflen;
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|
|
Packit Service |
9f2c4a |
if (gpcall.destroy_callback) {
|
|
Packit Service |
9f2c4a |
gpcall.destroy_callback(gpcall.destroy_callback_data);
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
}
|
|
Packit Service |
9f2c4a |
|