Blame src/gp_workers.c

Packit Service 9f2c4a
/* Copyright (C) 2011 the GSS-PROXY contributors, see COPYING for license */
Packit Service 9f2c4a
Packit Service 9f2c4a
#include "config.h"
Packit Service 9f2c4a
#include <pthread.h>
Packit Service 9f2c4a
#include <stdint.h>
Packit Service 9f2c4a
#include <stdlib.h>
Packit Service 9f2c4a
#include <unistd.h>
Packit Service 9f2c4a
#include <string.h>
Packit Service 9f2c4a
#include <fcntl.h>
Packit Service 9f2c4a
#include <errno.h>
Packit Service 9f2c4a
#include "gp_proxy.h"
Packit Service 9f2c4a
Packit Service 9f2c4a
#define DEFAULT_WORKER_THREADS_NUM 5
Packit Service 9f2c4a
Packit Service 9f2c4a
#define GP_QUERY_IN 0
Packit Service 9f2c4a
#define GP_QUERY_OUT 1
Packit Service 9f2c4a
#define GP_QUERY_ERR 2
Packit Service 9f2c4a
Packit Service 9f2c4a
struct gp_query {
Packit Service 9f2c4a
    struct gp_query *next;
Packit Service 9f2c4a
Packit Service 9f2c4a
    struct gp_conn *conn;
Packit Service 9f2c4a
    uint8_t *buffer;
Packit Service 9f2c4a
    size_t buflen;
Packit Service 9f2c4a
Packit Service 9f2c4a
    int status;
Packit Service 9f2c4a
};
Packit Service 9f2c4a
Packit Service 9f2c4a
struct gp_thread {
Packit Service 9f2c4a
    struct gp_thread *prev;
Packit Service 9f2c4a
    struct gp_thread *next;
Packit Service 9f2c4a
    struct gp_workers *pool;
Packit Service 9f2c4a
    pthread_t tid;
Packit Service 9f2c4a
Packit Service 9f2c4a
    struct gp_query *query;
Packit Service 9f2c4a
    pthread_mutex_t cond_mutex;
Packit Service 9f2c4a
    pthread_cond_t cond_wakeup;
Packit Service 9f2c4a
};
Packit Service 9f2c4a
Packit Service 9f2c4a
struct gp_workers {
Packit Service 9f2c4a
    pthread_mutex_t lock;
Packit Service 9f2c4a
    struct gssproxy_ctx *gpctx;
Packit Service 9f2c4a
    bool shutdown;
Packit Service 9f2c4a
    struct gp_query *wait_list;
Packit Service 9f2c4a
    struct gp_query *reply_list;
Packit Service 9f2c4a
    struct gp_thread *free_list;
Packit Service 9f2c4a
    struct gp_thread *busy_list;
Packit Service 9f2c4a
    int num_threads;
Packit Service 9f2c4a
    int sig_pipe[2];
Packit Service 9f2c4a
};
Packit Service 9f2c4a
Packit Service 9f2c4a
static void *gp_worker_main(void *pvt);
Packit Service 9f2c4a
static void gp_handle_query(struct gp_workers *w, struct gp_query *q);
Packit Service 9f2c4a
static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev);
Packit Service 9f2c4a
Packit Service 9f2c4a
/** DISPATCHER FUNCTIONS **/
Packit Service 9f2c4a
Packit Service 9f2c4a
int gp_workers_init(struct gssproxy_ctx *gpctx)
Packit Service 9f2c4a
{
Packit Service 9f2c4a
    struct gp_workers *w;
Packit Service 9f2c4a
    struct gp_thread *t;
Packit Service 9f2c4a
    pthread_attr_t attr;
Packit Service 9f2c4a
    verto_ev *ev;
Packit Service 9f2c4a
    int vflags;
Packit Service 9f2c4a
    int ret;
Packit Service 9f2c4a
    int i;
Packit Service 9f2c4a
Packit Service 9f2c4a
    w = calloc(1, sizeof(struct gp_workers));
Packit Service 9f2c4a
    if (!w) {
Packit Service 9f2c4a
        return ENOMEM;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
    w->gpctx = gpctx;
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* init global queue mutex */
Packit Service 9f2c4a
    ret = pthread_mutex_init(&w->lock, NULL);
Packit Service 9f2c4a
    if (ret) {
Packit Service 9f2c4a
        free(w);
Packit Service 9f2c4a
        return ENOMEM;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    if (gpctx->config->num_workers > 0) {
Packit Service 9f2c4a
        w->num_threads = gpctx->config->num_workers;
Packit Service 9f2c4a
    } else {
Packit Service 9f2c4a
        w->num_threads = DEFAULT_WORKER_THREADS_NUM;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* make thread joinable (portability) */
Packit Service 9f2c4a
    pthread_attr_init(&attr);
Packit Service 9f2c4a
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* init all workers */
Packit Service 9f2c4a
    for (i = 0; i < w->num_threads; i++) {
Packit Service 9f2c4a
        t = calloc(1, sizeof(struct gp_thread));
Packit Service 9f2c4a
        if (!t) {
Packit Service 9f2c4a
            ret = -1;
Packit Service 9f2c4a
            goto done;
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
        t->pool = w;
Packit Service 9f2c4a
        ret = pthread_cond_init(&t->cond_wakeup, NULL);
Packit Service 9f2c4a
        if (ret) {
Packit Service 9f2c4a
            free(t);
Packit Service 9f2c4a
            goto done;
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
        ret = pthread_mutex_init(&t->cond_mutex, NULL);
Packit Service 9f2c4a
        if (ret) {
Packit Service 9f2c4a
            free(t);
Packit Service 9f2c4a
            goto done;
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
        ret = pthread_create(&t->tid, &attr, gp_worker_main, t);
Packit Service 9f2c4a
        if (ret) {
Packit Service 9f2c4a
            free(t);
Packit Service 9f2c4a
            goto done;
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
        LIST_ADD(w->free_list, t);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* add wakeup pipe, so that threads can hand back replies to the
Packit Service 9f2c4a
     * dispatcher */
Packit Service 9f2c4a
    ret = pipe2(w->sig_pipe, O_NONBLOCK | O_CLOEXEC);
Packit Service 9f2c4a
    if (ret == -1) {
Packit Service 9f2c4a
        goto done;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    vflags = VERTO_EV_FLAG_PERSIST | VERTO_EV_FLAG_IO_READ;
Packit Service 9f2c4a
    ev = verto_add_io(gpctx->vctx, vflags, gp_handle_reply, w->sig_pipe[0]);
Packit Service 9f2c4a
    if (!ev) {
Packit Service 9f2c4a
        ret = -1;
Packit Service 9f2c4a
        goto done;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
    verto_set_private(ev, w, NULL);
Packit Service 9f2c4a
Packit Service 9f2c4a
    gpctx->workers = w;
Packit Service 9f2c4a
    ret = 0;
Packit Service 9f2c4a
Packit Service 9f2c4a
done:
Packit Service 9f2c4a
    if (ret) {
Packit Service 9f2c4a
        gp_workers_free(w);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
    return ret;
Packit Service 9f2c4a
}
Packit Service 9f2c4a
Packit Service 9f2c4a
void gp_workers_free(struct gp_workers *w)
Packit Service 9f2c4a
{
Packit Service 9f2c4a
    struct gp_thread *t;
Packit Service 9f2c4a
    void *retval;
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* ======> POOL LOCK */
Packit Service 9f2c4a
    pthread_mutex_lock(&w->lock);
Packit Service 9f2c4a
Packit Service 9f2c4a
    w->shutdown = true;
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* <====== POOL LOCK */
Packit Service 9f2c4a
    pthread_mutex_unlock(&w->lock);
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* we do not run the following operations within
Packit Service 9f2c4a
     * the lock, or deadlocks may arise for threads
Packit Service 9f2c4a
     * that are just finishing doing some work */
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* we guarantee nobody is touching these lists by
Packit Service 9f2c4a
     * preventing workers from touching the free/busy
Packit Service 9f2c4a
     * lists when a 'shutdown' is in progress */
Packit Service 9f2c4a
Packit Service 9f2c4a
    while (w->free_list) {
Packit Service 9f2c4a
        /* pick threads one by one */
Packit Service 9f2c4a
        t = w->free_list;
Packit Service 9f2c4a
        LIST_DEL(w->free_list, t);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* wake up threads, then join them */
Packit Service 9f2c4a
        /* ======> COND_MUTEX */
Packit Service 9f2c4a
        pthread_mutex_lock(&t->cond_mutex);
Packit Service 9f2c4a
        pthread_cond_signal(&t->cond_wakeup);
Packit Service 9f2c4a
        /* <====== COND_MUTEX */
Packit Service 9f2c4a
        pthread_mutex_unlock(&t->cond_mutex);
Packit Service 9f2c4a
Packit Service 9f2c4a
        pthread_join(t->tid, &retval);
Packit Service 9f2c4a
Packit Service 9f2c4a
        pthread_mutex_destroy(&t->cond_mutex);
Packit Service 9f2c4a
        pthread_cond_destroy(&t->cond_wakeup);
Packit Service 9f2c4a
        free(t);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* do the same with the busy list */
Packit Service 9f2c4a
    while (w->busy_list) {
Packit Service 9f2c4a
        /* pick threads one by one */
Packit Service 9f2c4a
        t = w->busy_list;
Packit Service 9f2c4a
        LIST_DEL(w->free_list, t);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* wake up threads, then join them */
Packit Service 9f2c4a
        /* ======> COND_MUTEX */
Packit Service 9f2c4a
        pthread_mutex_lock(&t->cond_mutex);
Packit Service 9f2c4a
        pthread_cond_signal(&t->cond_wakeup);
Packit Service 9f2c4a
        /* <====== COND_MUTEX */
Packit Service 9f2c4a
        pthread_mutex_unlock(&t->cond_mutex);
Packit Service 9f2c4a
Packit Service 9f2c4a
        pthread_join(t->tid, &retval);
Packit Service 9f2c4a
Packit Service 9f2c4a
        pthread_mutex_destroy(&t->cond_mutex);
Packit Service 9f2c4a
        pthread_cond_destroy(&t->cond_wakeup);
Packit Service 9f2c4a
        free(t);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    close(w->sig_pipe[0]);
Packit Service 9f2c4a
    close(w->sig_pipe[1]);
Packit Service 9f2c4a
Packit Service 9f2c4a
    pthread_mutex_destroy(&w->lock);
Packit Service 9f2c4a
Packit Service 9f2c4a
    free(w);
Packit Service 9f2c4a
}
Packit Service 9f2c4a
Packit Service 9f2c4a
static void gp_query_assign(struct gp_workers *w, struct gp_query *q)
Packit Service 9f2c4a
{
Packit Service 9f2c4a
    struct gp_thread *t = NULL;
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* then either find a free thread or queue in the wait list */
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* ======> POOL LOCK */
Packit Service 9f2c4a
    pthread_mutex_lock(&w->lock);
Packit Service 9f2c4a
    if (w->free_list) {
Packit Service 9f2c4a
        t = w->free_list;
Packit Service 9f2c4a
        LIST_DEL(w->free_list, t);
Packit Service 9f2c4a
        LIST_ADD(w->busy_list, t);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
    /* <====== POOL LOCK */
Packit Service 9f2c4a
    pthread_mutex_unlock(&w->lock);
Packit Service 9f2c4a
Packit Service 9f2c4a
    if (t) {
Packit Service 9f2c4a
        /* found free thread, assign work */
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* ======> COND_MUTEX */
Packit Service 9f2c4a
        pthread_mutex_lock(&t->cond_mutex);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* hand over the query */
Packit Service 9f2c4a
        t->query = q;
Packit Service 9f2c4a
        pthread_cond_signal(&t->cond_wakeup);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* <====== COND_MUTEX */
Packit Service 9f2c4a
        pthread_mutex_unlock(&t->cond_mutex);
Packit Service 9f2c4a
Packit Service 9f2c4a
    } else {
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* all threads are busy, store in wait list */
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* only the dispatcher handles wait_list
Packit Service 9f2c4a
        *  so we do not need to lock around it */
Packit Service 9f2c4a
        q->next = w->wait_list;
Packit Service 9f2c4a
        w->wait_list = q;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
}
Packit Service 9f2c4a
Packit Service 9f2c4a
static void gp_query_free(struct gp_query *q, bool free_buffer)
Packit Service 9f2c4a
{
Packit Service 9f2c4a
    if (!q) {
Packit Service 9f2c4a
        return;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    if (free_buffer) {
Packit Service 9f2c4a
        free(q->buffer);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    free(q);
Packit Service 9f2c4a
}
Packit Service 9f2c4a
Packit Service 9f2c4a
int gp_query_new(struct gp_workers *w, struct gp_conn *conn,
Packit Service 9f2c4a
                 uint8_t *buffer, size_t buflen)
Packit Service 9f2c4a
{
Packit Service 9f2c4a
    struct gp_query *q;
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* create query struct */
Packit Service 9f2c4a
    q = calloc(1, sizeof(struct gp_query));
Packit Service 9f2c4a
    if (!q) {
Packit Service 9f2c4a
        return ENOMEM;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    q->conn = conn;
Packit Service 9f2c4a
    q->buffer = buffer;
Packit Service 9f2c4a
    q->buflen = buflen;
Packit Service 9f2c4a
Packit Service 9f2c4a
    gp_query_assign(w, q);
Packit Service 9f2c4a
Packit Service 9f2c4a
    return 0;
Packit Service 9f2c4a
}
Packit Service 9f2c4a
Packit Service 9f2c4a
static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev)
Packit Service 9f2c4a
{
Packit Service 9f2c4a
    struct gp_workers *w;
Packit Service 9f2c4a
    struct gp_query *q = NULL;
Packit Service 9f2c4a
    char dummy;
Packit Service 9f2c4a
    int ret;
Packit Service 9f2c4a
Packit Service 9f2c4a
    w = verto_get_private(ev);
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* first read out the dummy so the pipe doesn't get clogged */
Packit Service 9f2c4a
    ret = read(w->sig_pipe[0], &dummy, 1);
Packit Service 9f2c4a
    if (ret) {
Packit Service 9f2c4a
        /* ignore errors */
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* grab a query reply if any */
Packit Service 9f2c4a
    if (w->reply_list) {
Packit Service 9f2c4a
        /* ======> POOL LOCK */
Packit Service 9f2c4a
        pthread_mutex_lock(&w->lock);
Packit Service 9f2c4a
Packit Service 9f2c4a
        if (w->reply_list != NULL) {
Packit Service 9f2c4a
            q = w->reply_list;
Packit Service 9f2c4a
            w->reply_list = q->next;
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* <====== POOL LOCK */
Packit Service 9f2c4a
        pthread_mutex_unlock(&w->lock);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    if (q) {
Packit Service 9f2c4a
        switch (q->status) {
Packit Service 9f2c4a
        case GP_QUERY_IN:
Packit Service 9f2c4a
            /* ?! fallback and kill client conn */
Packit Service 9f2c4a
        case GP_QUERY_ERR:
Packit Service 9f2c4a
            GPDEBUGN(3, "[status] Handling query error, terminating CID %d.\n",
Packit Service 9f2c4a
                     gp_conn_get_cid(q->conn));
Packit Service 9f2c4a
            gp_conn_free(q->conn);
Packit Service 9f2c4a
            gp_query_free(q, true);
Packit Service 9f2c4a
            break;
Packit Service 9f2c4a
Packit Service 9f2c4a
        case GP_QUERY_OUT:
Packit Service 9f2c4a
            GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n", q->buffer, q->buflen);
Packit Service 9f2c4a
            gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
Packit Service 9f2c4a
            gp_query_free(q, false);
Packit Service 9f2c4a
            break;
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* while we are at it, check if there is anything in the wait list
Packit Service 9f2c4a
     * we need to process, as one thread just got free :-) */
Packit Service 9f2c4a
Packit Service 9f2c4a
    q = NULL;
Packit Service 9f2c4a
Packit Service 9f2c4a
    if (w->wait_list) {
Packit Service 9f2c4a
        /* only the dispatcher handles wait_list
Packit Service 9f2c4a
        *  so we do not need to lock around it */
Packit Service 9f2c4a
        if (w->wait_list) {
Packit Service 9f2c4a
            q = w->wait_list;
Packit Service 9f2c4a
            w->wait_list = q->next;
Packit Service 9f2c4a
            q->next = NULL;
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    if (q) {
Packit Service 9f2c4a
        gp_query_assign(w, q);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
}
Packit Service 9f2c4a
Packit Service 9f2c4a
Packit Service 9f2c4a
/** WORKER THREADS **/
Packit Service 9f2c4a
Packit Service 9f2c4a
static void *gp_worker_main(void *pvt)
Packit Service 9f2c4a
{
Packit Service 9f2c4a
    struct gp_thread *t = (struct gp_thread *)pvt;
Packit Service 9f2c4a
    struct gp_query *q = NULL;
Packit Service 9f2c4a
    char dummy = 0;
Packit Service 9f2c4a
    int ret;
Packit Service 9f2c4a
Packit Service 9f2c4a
    while (!t->pool->shutdown) {
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* initialize debug client id to 0 until work is scheduled */
Packit Service 9f2c4a
        gp_debug_set_conn_id(0);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* ======> COND_MUTEX */
Packit Service 9f2c4a
        pthread_mutex_lock(&t->cond_mutex);
Packit Service 9f2c4a
        while (t->query == NULL) {
Packit Service 9f2c4a
            /* wait for next query */
Packit Service 9f2c4a
            pthread_cond_wait(&t->cond_wakeup, &t->cond_mutex);
Packit Service 9f2c4a
            if (t->pool->shutdown) {
Packit Service 9f2c4a
                pthread_exit(NULL);
Packit Service 9f2c4a
            }
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* grab the query off the shared pointer */
Packit Service 9f2c4a
        q = t->query;
Packit Service 9f2c4a
        t->query = NULL;
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* <====== COND_MUTEX */
Packit Service 9f2c4a
        pthread_mutex_unlock(&t->cond_mutex);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* set client id before hndling requests */
Packit Service 9f2c4a
        gp_debug_set_conn_id(gp_conn_get_cid(q->conn));
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* handle the client request */
Packit Service 9f2c4a
        GPDEBUGN(3, "[status] Handling query input: %p (%zu)\n", q->buffer,
Packit Service 9f2c4a
                 q->buflen);
Packit Service 9f2c4a
        gp_handle_query(t->pool, q);
Packit Service 9f2c4a
        GPDEBUGN(3 ,"[status] Handling query output: %p (%zu)\n", q->buffer,
Packit Service 9f2c4a
                 q->buflen);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* now get lock on main queue, to play with the reply list */
Packit Service 9f2c4a
        /* ======> POOL LOCK */
Packit Service 9f2c4a
        pthread_mutex_lock(&t->pool->lock);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* put back query so that dispatcher can send reply */
Packit Service 9f2c4a
        q->next = t->pool->reply_list;
Packit Service 9f2c4a
        t->pool->reply_list = q;
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* add us back to the free list but only if we are not
Packit Service 9f2c4a
         * shutting down */
Packit Service 9f2c4a
        if (!t->pool->shutdown) {
Packit Service 9f2c4a
            LIST_DEL(t->pool->busy_list, t);
Packit Service 9f2c4a
            LIST_ADD(t->pool->free_list, t);
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* <====== POOL LOCK */
Packit Service 9f2c4a
        pthread_mutex_unlock(&t->pool->lock);
Packit Service 9f2c4a
Packit Service 9f2c4a
        /* and wake up dispatcher so it will handle it */
Packit Service 9f2c4a
        ret = write(t->pool->sig_pipe[1], &dummy, 1);
Packit Service 9f2c4a
        if (ret == -1) {
Packit Service 9f2c4a
            GPERROR("Failed to signal dispatcher!");
Packit Service 9f2c4a
        }
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    pthread_exit(NULL);
Packit Service 9f2c4a
}
Packit Service 9f2c4a
Packit Service 9f2c4a
static void gp_handle_query(struct gp_workers *w, struct gp_query *q)
Packit Service 9f2c4a
{
Packit Service 9f2c4a
    struct gp_call_ctx gpcall = { 0 };
Packit Service 9f2c4a
    uint8_t *buffer;
Packit Service 9f2c4a
    size_t buflen;
Packit Service 9f2c4a
    int ret;
Packit Service 9f2c4a
Packit Service 9f2c4a
    /* find service */
Packit Service 9f2c4a
    gpcall.gpctx = w->gpctx;
Packit Service 9f2c4a
    gpcall.service = gp_creds_match_conn(w->gpctx, q->conn);
Packit Service 9f2c4a
    if (!gpcall.service) {
Packit Service 9f2c4a
        q->status = GP_QUERY_ERR;
Packit Service 9f2c4a
        return;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
    gpcall.connection = q->conn;
Packit Service 9f2c4a
Packit Service 9f2c4a
    ret = gp_rpc_process_call(&gpcall,
Packit Service 9f2c4a
                              q->buffer, q->buflen,
Packit Service 9f2c4a
                              &buffer, &buflen);
Packit Service 9f2c4a
    if (ret) {
Packit Service 9f2c4a
        q->status = GP_QUERY_ERR;
Packit Service 9f2c4a
    } else {
Packit Service 9f2c4a
        q->status = GP_QUERY_OUT;
Packit Service 9f2c4a
        free(q->buffer);
Packit Service 9f2c4a
        q->buffer = buffer;
Packit Service 9f2c4a
        q->buflen = buflen;
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
Packit Service 9f2c4a
    if (gpcall.destroy_callback) {
Packit Service 9f2c4a
        gpcall.destroy_callback(gpcall.destroy_callback_data);
Packit Service 9f2c4a
    }
Packit Service 9f2c4a
}
Packit Service 9f2c4a