Blob Blame History Raw
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
#include <apr_time.h>

#include <httpd.h>
#include <http_core.h>
#include <http_log.h>

#include "mod_http2.h"

#include "h2_private.h"
#include "h2.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_mplx.h"
#include "h2_request.h"
#include "h2_task.h"
#include "h2_util.h"
#include "h2_ngn_shed.h"


typedef struct h2_ngn_entry h2_ngn_entry;
struct h2_ngn_entry {
    APR_RING_ENTRY(h2_ngn_entry) link;
    h2_task *task;
    request_rec *r;
};

#define H2_NGN_ENTRY_NEXT(e)	APR_RING_NEXT((e), link)
#define H2_NGN_ENTRY_PREV(e)	APR_RING_PREV((e), link)
#define H2_NGN_ENTRY_REMOVE(e)	APR_RING_REMOVE((e), link)

#define H2_REQ_ENTRIES_SENTINEL(b)	APR_RING_SENTINEL((b), h2_ngn_entry, link)
#define H2_REQ_ENTRIES_EMPTY(b)	APR_RING_EMPTY((b), h2_ngn_entry, link)
#define H2_REQ_ENTRIES_FIRST(b)	APR_RING_FIRST(b)
#define H2_REQ_ENTRIES_LAST(b)	APR_RING_LAST(b)

#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do {				\
h2_ngn_entry *ap__b = (e);                                        \
APR_RING_INSERT_HEAD((b), ap__b, h2_ngn_entry, link);	\
} while (0)

#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do {				\
h2_ngn_entry *ap__b = (e);					\
APR_RING_INSERT_TAIL((b), ap__b, h2_ngn_entry, link);	\
} while (0)

struct h2_req_engine {
    const char *id;        /* identifier */
    const char *type;      /* name of the engine type */
    apr_pool_t *pool;      /* pool for engine specific allocations */
    conn_rec *c;           /* connection this engine is assigned to */
    h2_task *task;         /* the task this engine is based on, running in */
    h2_ngn_shed *shed;

    unsigned int shutdown : 1; /* engine is being shut down */
    unsigned int done : 1;     /* engine has finished */

    APR_RING_HEAD(h2_req_entries, h2_ngn_entry) entries;
    int capacity;     /* maximum concurrent requests */
    int no_assigned;  /* # of assigned requests */
    int no_live;      /* # of live */
    int no_finished;  /* # of finished */
    
    h2_output_consumed *out_consumed;
    void *out_consumed_ctx;
};

const char *h2_req_engine_get_id(h2_req_engine *engine)
{
    return engine->id;
}

int h2_req_engine_is_shutdown(h2_req_engine *engine)
{
    return engine->shutdown;
}

void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, 
                                apr_off_t bytes)
{
    if (engine->out_consumed) {
        engine->out_consumed(engine->out_consumed_ctx, c, bytes);
    }
}

h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
                                int default_capacity, 
                                apr_size_t req_buffer_size)
{
    h2_ngn_shed *shed;
    
    shed = apr_pcalloc(pool, sizeof(*shed));
    shed->c = c;
    shed->pool = pool;
    shed->default_capacity = default_capacity;
    shed->req_buffer_size = req_buffer_size;
    shed->ngns = apr_hash_make(pool);
    
    return shed;
}

void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx)
{
    shed->user_ctx = user_ctx;
}

void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed)
{
    return shed->user_ctx;
}

h2_ngn_shed *h2_ngn_shed_get_shed(h2_req_engine *ngn)
{
    return ngn->shed;
}

void h2_ngn_shed_abort(h2_ngn_shed *shed)
{
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, APLOGNO(03394)
                  "h2_ngn_shed(%ld): abort", shed->c->id);
    shed->aborted = 1;
}

static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r)
{
    h2_ngn_entry *entry = apr_pcalloc(task->pool, sizeof(*entry));
    APR_RING_ELEM_INIT(entry, link);
    entry->task = task;
    entry->r = r;
    H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
    ngn->no_assigned++;
}


apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, 
                                      request_rec *r, 
                                      http2_req_engine_init *einit) 
{
    h2_req_engine *ngn;
    h2_task *task = h2_ctx_rget_task(r);

    ap_assert(task);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                  "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id, 
                  task->id);
    if (task->request->serialize) {
        /* Max compatibility, deny processing of this */
        return APR_EOF;
    }
    
    if (task->assigned) {
        --task->assigned->no_assigned;
        --task->assigned->no_live;
        task->assigned = NULL;
    }
    
    if (task->engine) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, 
                      "h2_ngn_shed(%ld): push task(%s) hosting engine %s " 
                      "already with %d tasks", 
                      shed->c->id, task->id, task->engine->id,
                      task->engine->no_assigned);
        task->assigned = task->engine;
        ngn_add_task(task->engine, task, r);
        return APR_SUCCESS;
    }
    
    ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
    if (ngn && !ngn->shutdown) {
        /* this task will be processed in another thread,
         * freeze any I/O for the time being. */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
                      "h2_ngn_shed(%ld): pushing request %s to %s", 
                      shed->c->id, task->id, ngn->id);
        if (!h2_task_has_thawed(task)) {
            h2_task_freeze(task);
        }
        ngn_add_task(ngn, task, r);
        return APR_SUCCESS;
    }
    
    /* no existing engine or being shut down, start a new one */
    if (einit) {
        apr_status_t status;
        apr_pool_t *pool = task->pool;
        h2_req_engine *newngn;
        
        newngn = apr_pcalloc(pool, sizeof(*ngn));
        newngn->pool = pool;
        newngn->id   = apr_psprintf(pool, "ngn-%s", task->id);
        newngn->type = apr_pstrdup(pool, ngn_type);
        newngn->c    = task->c;
        newngn->shed = shed;
        newngn->capacity = shed->default_capacity;
        newngn->no_assigned = 1;
        newngn->no_live = 1;
        APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
        
        status = einit(newngn, newngn->id, newngn->type, newngn->pool,
                       shed->req_buffer_size, r,
                       &newngn->out_consumed, &newngn->out_consumed_ctx);
        
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395)
                      "h2_ngn_shed(%ld): create engine %s (%s)", 
                      shed->c->id, newngn->id, newngn->type);
        if (status == APR_SUCCESS) {
            newngn->task = task;
            task->engine = newngn;
            task->assigned = newngn;
            apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
        }
        return status;
    }
    return APR_EOF;
}

static h2_ngn_entry *pop_detached(h2_req_engine *ngn)
{
    h2_ngn_entry *entry;
    for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
         entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
         entry = H2_NGN_ENTRY_NEXT(entry)) {
        if (h2_task_has_thawed(entry->task) 
            || (entry->task->engine == ngn)) {
            /* The task hosting this engine can always be pulled by it.
             * For other task, they need to become detached, e.g. no longer
             * assigned to another worker. */
            H2_NGN_ENTRY_REMOVE(entry);
            return entry;
        }
    }
    return NULL;
}

apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed, 
                                      h2_req_engine *ngn, 
                                      int capacity, 
                                      int want_shutdown,
                                      request_rec **pr)
{   
    h2_ngn_entry *entry;
    
    ap_assert(ngn);
    *pr = NULL;
    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, shed->c, APLOGNO(03396)
                  "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", 
                  shed->c->id, ngn->id, want_shutdown);
    if (shed->aborted) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, APLOGNO(03397)
                      "h2_ngn_shed(%ld): abort while pulling requests %s", 
                      shed->c->id, ngn->id);
        ngn->shutdown = 1;
        return APR_ECONNABORTED;
    }
    
    ngn->capacity = capacity;
    if (H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
        if (want_shutdown) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                          "h2_ngn_shed(%ld): emtpy queue, shutdown engine %s", 
                          shed->c->id, ngn->id);
            ngn->shutdown = 1;
        }
        return ngn->shutdown? APR_EOF : APR_EAGAIN;
    }
    
    if ((entry = pop_detached(ngn))) {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c, APLOGNO(03398)
                      "h2_ngn_shed(%ld): pulled request %s for engine %s", 
                      shed->c->id, entry->task->id, ngn->id);
        ngn->no_live++;
        *pr = entry->r;
        entry->task->assigned = ngn;
        /* task will now run in ngn's own thread. Modules like lua
         * seem to require the correct thread set in the conn_rec.
         * See PR 59542. */
        if (entry->task->c && ngn->c) {
            entry->task->c->current_thread = ngn->c->current_thread;
        }
        if (entry->task->engine == ngn) {
            /* If an engine pushes its own base task, and then pulls
             * it back to itself again, it needs to be thawed.
             */
            h2_task_thaw(entry->task);
        }
        return APR_SUCCESS;
    }
    
    if (1) {
        h2_ngn_entry *entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03399)
                      "h2_ngn_shed(%ld): pull task, nothing, first task %s", 
                      shed->c->id, entry->task->id);
    }
    return APR_EAGAIN;
}
                                 
static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, 
                                  h2_task *task, int waslive, int aborted)
{
    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03400)
                  "h2_ngn_shed(%ld): task %s %s by %s", 
                  shed->c->id, task->id, aborted? "aborted":"done", ngn->id);
    ngn->no_finished++;
    if (waslive) ngn->no_live--;
    ngn->no_assigned--;
    task->assigned = NULL;
    
    return APR_SUCCESS;
}
                                
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, 
                                    struct h2_req_engine *ngn, h2_task *task)
{
    return ngn_done_task(shed, ngn, task, 1, 0);
}
                                
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
{
    if (ngn->done) {
        return;
    }
    
    if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
        h2_ngn_entry *entry;
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                      "h2_ngn_shed(%ld): exit engine %s (%s), "
                      "has still requests queued, shutdown=%d,"
                      "assigned=%ld, live=%ld, finished=%ld", 
                      shed->c->id, ngn->id, ngn->type,
                      ngn->shutdown, 
                      (long)ngn->no_assigned, (long)ngn->no_live,
                      (long)ngn->no_finished);
        for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
             entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
             entry = H2_NGN_ENTRY_NEXT(entry)) {
            h2_task *task = entry->task;
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                          "h2_ngn_shed(%ld): engine %s has queued task %s, "
                          "frozen=%d, aborting",
                          shed->c->id, ngn->id, task->id, task->frozen);
            ngn_done_task(shed, ngn, task, 0, 1);
            task->engine = task->assigned = NULL;
        }
    }
    if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                      "h2_ngn_shed(%ld): exit engine %s (%s), "
                      "assigned=%ld, live=%ld, finished=%ld", 
                      shed->c->id, ngn->id, ngn->type,
                      (long)ngn->no_assigned, (long)ngn->no_live,
                      (long)ngn->no_finished);
    }
    else {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                      "h2_ngn_shed(%ld): exit engine %s", 
                      shed->c->id, ngn->id);
    }
    
    apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL);
    ngn->done = 1;
}

void h2_ngn_shed_destroy(h2_ngn_shed *shed)
{
    ap_assert(apr_hash_count(shed->ngns) == 0);
}