Blame modules/http2/h2_workers.c

Packit 90a5c9
/* Licensed to the Apache Software Foundation (ASF) under one or more
Packit 90a5c9
 * contributor license agreements.  See the NOTICE file distributed with
Packit 90a5c9
 * this work for additional information regarding copyright ownership.
Packit 90a5c9
 * The ASF licenses this file to You under the Apache License, Version 2.0
Packit 90a5c9
 * (the "License"); you may not use this file except in compliance with
Packit 90a5c9
 * the License.  You may obtain a copy of the License at
Packit 90a5c9
 *
Packit 90a5c9
 *     http://www.apache.org/licenses/LICENSE-2.0
Packit 90a5c9
 *
Packit 90a5c9
 * Unless required by applicable law or agreed to in writing, software
Packit 90a5c9
 * distributed under the License is distributed on an "AS IS" BASIS,
Packit 90a5c9
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Packit 90a5c9
 * See the License for the specific language governing permissions and
Packit 90a5c9
 * limitations under the License.
Packit 90a5c9
 */
Packit 90a5c9
Packit 90a5c9
#include <assert.h>
Packit 90a5c9
#include <apr_atomic.h>
Packit 90a5c9
#include <apr_thread_mutex.h>
Packit 90a5c9
#include <apr_thread_cond.h>
Packit 90a5c9
Packit 90a5c9
#include <mpm_common.h>
Packit 90a5c9
#include <httpd.h>
Packit 90a5c9
#include <http_core.h>
Packit 90a5c9
#include <http_log.h>
Packit 90a5c9
Packit 90a5c9
#include "h2.h"
Packit 90a5c9
#include "h2_private.h"
Packit 90a5c9
#include "h2_mplx.h"
Packit 90a5c9
#include "h2_task.h"
Packit 90a5c9
#include "h2_workers.h"
Packit 90a5c9
#include "h2_util.h"
Packit 90a5c9
Packit 90a5c9
typedef struct h2_slot h2_slot;
Packit 90a5c9
struct h2_slot {
Packit 90a5c9
    int id;
Packit 90a5c9
    h2_slot *next;
Packit 90a5c9
    h2_workers *workers;
Packit 90a5c9
    int aborted;
Packit 90a5c9
    int sticks;
Packit 90a5c9
    h2_task *task;
Packit 90a5c9
    apr_thread_t *thread;
Packit 90a5c9
    apr_thread_mutex_t *lock;
Packit 90a5c9
    apr_thread_cond_t *not_idle;
Packit 90a5c9
};
Packit 90a5c9
Packit 90a5c9
/**
 * Detach and return the first slot of the list at *phead.
 *
 * The head is exchanged with apr_atomic_casptr(); when the exchange fails
 * because the head changed meanwhile, the attempt is repeated.
 *
 * NOTE(review): this is a plain compare-and-swap on the head pointer;
 * confirm that concurrent pop/push on the same list cannot run into ABA.
 *
 * @param phead address of the list head
 * @return the detached slot with its next link cleared, or NULL when the
 *         list is empty
 */
static h2_slot *pop_slot(h2_slot **phead)
{
    h2_slot *head;

    while ((head = *phead) != NULL) {
        if (apr_atomic_casptr((void*)phead, head->next, head) == head) {
            /* the slot is ours now, cut it loose from its successor */
            head->next = NULL;
            return head;
        }
        /* head changed under us, look again */
    }
    return NULL;
}
Packit 90a5c9
Packit 90a5c9
/**
 * Put a slot in front of the list at *phead.
 *
 * The head is exchanged with apr_atomic_casptr(); the attempt is repeated
 * until the exchange succeeds.
 *
 * @param phead address of the list head
 * @param slot  the slot to add; its next link must be NULL on entry
 */
static void push_slot(h2_slot **phead, h2_slot *slot)
{
    h2_slot *head;

    ap_assert(!slot->next);
    do {
        /* link to the head we currently see, then try to replace it */
        head = *phead;
        slot->next = head;
    } while (apr_atomic_casptr((void*)phead, slot, head) != head);
}
Packit 90a5c9
Packit 90a5c9
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
Packit 90a5c9
Packit 90a5c9
static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) 
Packit 90a5c9
{
Packit 90a5c9
    apr_status_t status;
Packit 90a5c9
    
Packit 90a5c9
    slot->workers = workers;
Packit 90a5c9
    slot->aborted = 0;
Packit 90a5c9
    slot->task = NULL;
Packit 90a5c9
Packit 90a5c9
    if (!slot->lock) {
Packit 90a5c9
        status = apr_thread_mutex_create(&slot->lock,
Packit 90a5c9
                                         APR_THREAD_MUTEX_DEFAULT,
Packit 90a5c9
                                         workers->pool);
Packit 90a5c9
        if (status != APR_SUCCESS) {
Packit 90a5c9
            push_slot(&workers->free, slot);
Packit 90a5c9
            return status;
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
Packit 90a5c9
    if (!slot->not_idle) {
Packit 90a5c9
        status = apr_thread_cond_create(&slot->not_idle, workers->pool);
Packit 90a5c9
        if (status != APR_SUCCESS) {
Packit 90a5c9
            push_slot(&workers->free, slot);
Packit 90a5c9
            return status;
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
Packit 90a5c9
                 "h2_workers: new thread for slot %d", slot->id); 
Packit 90a5c9
    /* thread will either immediately start work or add itself
Packit 90a5c9
     * to the idle queue */
Packit 90a5c9
    apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, 
Packit 90a5c9
                      workers->pool);
Packit 90a5c9
    if (!slot->thread) {
Packit 90a5c9
        push_slot(&workers->free, slot);
Packit 90a5c9
        return APR_ENOMEM;
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    apr_atomic_inc32(&workers->worker_count);
Packit 90a5c9
    return APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
/**
 * Start one more worker, if a free slot is available.
 *
 * @param workers the workers instance
 * @return the result of activate_slot() for the slot taken from the free
 *         list, or APR_EAGAIN when the free list is empty
 */
static apr_status_t add_worker(h2_workers *workers)
{
    h2_slot *spare = pop_slot(&workers->free);

    return spare ? activate_slot(workers, spare) : APR_EAGAIN;
}
Packit 90a5c9
Packit 90a5c9
/**
 * Get one worker going: signal a slot from the idle list, or, when no
 * slot is idle and the workers are flagged dynamic, try to add a worker.
 *
 * @param workers the workers instance
 */
static void wake_idle_worker(h2_workers *workers)
{
    h2_slot *idle = pop_slot(&workers->idle);

    if (!idle) {
        if (workers->dynamic) {
            add_worker(workers);
        }
        return;
    }

    /* signal under the slot's lock, the same lock the slot holds when
     * it puts itself on the idle list and starts waiting */
    apr_thread_mutex_lock(idle->lock);
    apr_thread_cond_signal(idle->not_idle);
    apr_thread_mutex_unlock(idle->lock);
}
Packit 90a5c9
Packit 90a5c9
/**
 * Reap all slots on the zombies list: join the slot's thread (if any),
 * decrement the worker count and hand the slot to the free list.
 *
 * @param workers the workers instance
 */
static void cleanup_zombies(h2_workers *workers)
{
    h2_slot *dead;
    apr_status_t exit_status;

    for (dead = pop_slot(&workers->zombies);
         dead != NULL;
         dead = pop_slot(&workers->zombies)) {
        if (dead->thread != NULL) {
            apr_thread_join(&exit_status, dead->thread);
            dead->thread = NULL;
        }
        apr_atomic_dec32(&workers->worker_count);
        dead->next = NULL;
        push_slot(&workers->free, dead);
    }
}
Packit 90a5c9
Packit 90a5c9
/**
 * Ask a mplx for a task and store it in slot->task.
 *
 * @param slot the slot that wants work
 * @param m    the mplx to pop a task from
 * @return APR_EOF when no task was handed out (sticks is reset to 0),
 *         otherwise the status of h2_mplx_pop_task()
 */
static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
{
    apr_status_t rv = h2_mplx_pop_task(m, &slot->task);

    if (!slot->task) {
        slot->sticks = 0;
        return APR_EOF;
    }

    /* We got a task for the worker to execute. Make the worker sticky,
     * i.e. let it ask the task's h2_mplx for more work a number of
     * times (see slot_run()) before it comes back to the mplx queue. */
    slot->sticks = slot->workers->max_workers;
    return rv;
}
Packit 90a5c9
Packit 90a5c9
static h2_fifo_op_t mplx_peek(void *head, void *ctx)
Packit 90a5c9
{
Packit 90a5c9
    h2_mplx *m = head;
Packit 90a5c9
    h2_slot *slot = ctx;
Packit 90a5c9
    
Packit 90a5c9
    if (slot_pull_task(slot, m) == APR_EAGAIN) {
Packit 90a5c9
        wake_idle_worker(slot->workers);
Packit 90a5c9
        return H2_FIFO_OP_REPUSH;
Packit 90a5c9
    } 
Packit 90a5c9
    return H2_FIFO_OP_PULL;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
/**
Packit 90a5c9
 * Get the next task for the given worker. Will block until a task arrives
Packit 90a5c9
 * or the max_wait timer expires and more than min workers exist.
Packit 90a5c9
 */
Packit 90a5c9
static apr_status_t get_next(h2_slot *slot)
Packit 90a5c9
{
Packit 90a5c9
    h2_workers *workers = slot->workers;
Packit 90a5c9
    apr_status_t status;
Packit 90a5c9
    
Packit 90a5c9
    slot->task = NULL;
Packit 90a5c9
    while (!slot->aborted) {
Packit 90a5c9
        if (!slot->task) {
Packit 90a5c9
            status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
Packit 90a5c9
            if (status == APR_EOF) {
Packit 90a5c9
                return status;
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
        
Packit 90a5c9
        if (slot->task) {
Packit 90a5c9
            return APR_SUCCESS;
Packit 90a5c9
        }
Packit 90a5c9
        
Packit 90a5c9
        cleanup_zombies(workers);
Packit 90a5c9
Packit 90a5c9
        apr_thread_mutex_lock(slot->lock);
Packit 90a5c9
        push_slot(&workers->idle, slot);
Packit 90a5c9
        apr_thread_cond_wait(slot->not_idle, slot->lock);
Packit 90a5c9
        apr_thread_mutex_unlock(slot->lock);
Packit 90a5c9
    }
Packit 90a5c9
    return APR_EOF;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
/**
 * Retire a slot by putting it on the zombies list of its workers, from
 * where cleanup_zombies() picks it up.
 *
 * @param slot the slot whose thread is about to end
 */
static void slot_done(h2_slot *slot)
{
    h2_workers *owner = slot->workers;

    push_slot(&owner->zombies, slot);
}
Packit 90a5c9
Packit 90a5c9
Packit 90a5c9
/**
 * Thread entry point of a worker slot: fetch tasks and execute them
 * until the slot is aborted, then retire the slot.
 *
 * @param thread the thread running this function
 * @param wctx   the h2_slot served by this thread
 * @return always NULL
 */
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
    h2_slot *slot = wctx;
    h2_task *task;

    while (!slot->aborted) {
        /* Get a h2_task from the mplxs queue. */
        get_next(slot);

        while ((task = slot->task) != NULL) {
            h2_task_do(task, thread, slot->id);

            /* Report the task as done. While stickiness is left (it is
             * only consumed when the slot is not aborted), offer the mplx
             * the opportunity to give us back a new task right away.
             */
            if (slot->aborted || (--slot->sticks <= 0)) {
                h2_mplx_task_done(task->mplx, task, NULL);
                slot->task = NULL;
            }
            else {
                h2_mplx_task_done(task->mplx, task, &slot->task);
            }
        }
    }

    slot_done(slot);
    return NULL;
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t workers_pool_cleanup(void *data)
Packit 90a5c9
{
Packit 90a5c9
    h2_workers *workers = data;
Packit 90a5c9
    h2_slot *slot;
Packit 90a5c9
    
Packit 90a5c9
    if (!workers->aborted) {
Packit 90a5c9
        workers->aborted = 1;
Packit 90a5c9
        /* abort all idle slots */
Packit 90a5c9
        for (;;) {
Packit 90a5c9
            slot = pop_slot(&workers->idle);
Packit 90a5c9
            if (slot) {
Packit 90a5c9
                apr_thread_mutex_lock(slot->lock);
Packit 90a5c9
                slot->aborted = 1;
Packit 90a5c9
                apr_thread_cond_signal(slot->not_idle);
Packit 90a5c9
                apr_thread_mutex_unlock(slot->lock);
Packit 90a5c9
            }
Packit 90a5c9
            else {
Packit 90a5c9
                break;
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
Packit 90a5c9
        h2_fifo_term(workers->mplxs);
Packit 90a5c9
        h2_fifo_interrupt(workers->mplxs);
Packit 90a5c9
Packit 90a5c9
        cleanup_zombies(workers);
Packit 90a5c9
    }
Packit 90a5c9
    return APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
/**
 * Create a workers instance with its own sub-pool of `server_pool` and
 * start its worker threads.
 *
 * One slot is set up per `max_workers` and an activation is attempted for
 * each of them. Slots whose activation fails end up on the free list (see
 * activate_slot()); creation only fails when not a single requested worker
 * could be started. On failure the private pool is destroyed again.
 *
 * @param s           the server, used for logging
 * @param server_pool parent of the pool created for the workers
 * @param min_workers stored in the instance; not used for activation yet
 * @param max_workers number of slots and of workers to start; must not
 *                    be negative
 * @param idle_secs   stored as max_idle_secs; values <= 0 select 10
 * @return the new workers instance or NULL on failure
 */
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
                              int min_workers, int max_workers,
                              int idle_secs)
{
    apr_status_t status;
    h2_workers *workers;
    apr_pool_t *pool = NULL;
    int i, n;
    int started = 0;

    ap_assert(s);
    ap_assert(server_pool);

    if (max_workers < 0) {
        /* would turn into a bogus allocation size for the slots below */
        return NULL;
    }

    /* let's have our own pool that will be parent to all h2_worker
     * instances we create. This happens in various threads, but always
     * guarded by our lock. Without this pool, all subpool creations would
     * happen on the pool handed to us, which we do not guard.
     */
    status = apr_pool_create(&pool, server_pool);
    if (status != APR_SUCCESS || !pool) {
        return NULL;
    }
    apr_pool_tag(pool, "h2_workers");
    workers = apr_pcalloc(pool, sizeof(h2_workers));
    if (!workers) {
        goto fail;
    }

    workers->s = s;
    workers->pool = pool;
    workers->min_workers = min_workers;
    workers->max_workers = max_workers;
    workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;

    /* FIXME: the fifo set we use here has limited capacity. Once the
     * set is full, connections with new requests do a wait. Unfortunately,
     * we have optimizations in place there that makes such waiting "unfair"
     * in the sense that it may take connections a looong time to get scheduled.
     *
     * Need to rewrite this to use one of our double-linked lists and a mutex
     * to have unlimited capacity and fair scheduling.
     *
     * For now, we just make enough room to have many connections inside one
     * process.
     */
    status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024);
    if (status != APR_SUCCESS) {
        goto fail;
    }

    status = apr_threadattr_create(&workers->thread_attr, workers->pool);
    if (status != APR_SUCCESS) {
        goto fail;
    }

    if (ap_thread_stacksize != 0) {
        apr_threadattr_stacksize_set(workers->thread_attr,
                                     ap_thread_stacksize);
        ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
                     "h2_workers: using stacksize=%ld",
                     (long)ap_thread_stacksize);
    }

    status = apr_thread_mutex_create(&workers->lock,
                                     APR_THREAD_MUTEX_DEFAULT,
                                     workers->pool);
    if (status != APR_SUCCESS) {
        goto fail;
    }

    n = workers->nslots = workers->max_workers;
    workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
    if (workers->slots == NULL) {
        /* bail out before the slot array gets touched */
        workers->nslots = 0;
        goto fail;
    }
    for (i = 0; i < n; ++i) {
        workers->slots[i].id = i;
    }

    /* Register the cleanup before the first thread is started, so that
     * running threads get aborted on pool destruction no matter how the
     * activations below turn out. */
    apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);

    /* we activate all for now, TODO: support min_workers again.
     * do this in reverse for vanity reasons so slot 0 will most
     * likely be at head of idle queue. */
    status = APR_SUCCESS;
    for (i = n-1; i >= 0; --i) {
        apr_status_t rv = activate_slot(workers, &workers->slots[i]);
        if (rv == APR_SUCCESS) {
            ++started;
        }
        else {
            if (status == APR_SUCCESS) {
                status = rv;    /* remember the first failure */
            }
            ap_log_error(APLOG_MARK, APLOG_TRACE1, rv, s,
                         "h2_workers: unable to activate slot %d", i);
        }
    }
    /* the rest of the slots go on the free list */
    for (i = n; i < workers->nslots; ++i) {
        push_slot(&workers->free, &workers->slots[i]);
    }
    workers->dynamic = (workers->worker_count < workers->max_workers);

    if (started == 0 && status != APR_SUCCESS) {
        /* workers were requested, but none could be started */
        goto fail;
    }
    return workers;

fail:
    /* nothing is running on this pool, so it is safe to give it back */
    apr_pool_destroy(pool);
    return NULL;
}
Packit 90a5c9
Packit 90a5c9
/**
 * Register a mplx with the workers: push it onto the mplx queue and wake
 * (or add) a worker to look at it.
 *
 * @param workers the workers instance
 * @param m       the mplx that has work to do
 * @return the status of the push onto the queue
 */
apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
{
    apr_status_t rv;

    rv = h2_fifo_push(workers->mplxs, m);
    wake_idle_worker(workers);
    return rv;
}
Packit 90a5c9
Packit 90a5c9
/**
 * Remove a mplx from the queue of the workers.
 *
 * @param workers the workers instance
 * @param m       the mplx to remove
 * @return the status of the removal from the queue
 */
apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
{
    apr_status_t rv = h2_fifo_remove(workers->mplxs, m);

    return rv;
}