|
Packit |
90a5c9 |
/* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
Packit |
90a5c9 |
* contributor license agreements. See the NOTICE file distributed with
|
|
Packit |
90a5c9 |
* this work for additional information regarding copyright ownership.
|
|
Packit |
90a5c9 |
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
Packit |
90a5c9 |
* (the "License"); you may not use this file except in compliance with
|
|
Packit |
90a5c9 |
* the License. You may obtain a copy of the License at
|
|
Packit |
90a5c9 |
*
|
|
Packit |
90a5c9 |
* http://www.apache.org/licenses/LICENSE-2.0
|
|
Packit |
90a5c9 |
*
|
|
Packit |
90a5c9 |
* Unless required by applicable law or agreed to in writing, software
|
|
Packit |
90a5c9 |
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
Packit |
90a5c9 |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
Packit |
90a5c9 |
* See the License for the specific language governing permissions and
|
|
Packit |
90a5c9 |
* limitations under the License.
|
|
Packit |
90a5c9 |
*/
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
#include <assert.h>
|
|
Packit |
90a5c9 |
#include <stddef.h>
|
|
Packit |
90a5c9 |
#include <stdlib.h>
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
#include <apr_atomic.h>
|
|
Packit |
90a5c9 |
#include <apr_thread_mutex.h>
|
|
Packit |
90a5c9 |
#include <apr_thread_cond.h>
|
|
Packit |
90a5c9 |
#include <apr_strings.h>
|
|
Packit |
90a5c9 |
#include <apr_time.h>
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
#include <httpd.h>
|
|
Packit |
90a5c9 |
#include <http_core.h>
|
|
Packit |
90a5c9 |
#include <http_log.h>
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
#include <mpm_common.h>
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
#include "mod_http2.h"
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
#include "h2.h"
|
|
Packit |
90a5c9 |
#include "h2_private.h"
|
|
Packit |
90a5c9 |
#include "h2_bucket_beam.h"
|
|
Packit |
90a5c9 |
#include "h2_config.h"
|
|
Packit |
90a5c9 |
#include "h2_conn.h"
|
|
Packit |
90a5c9 |
#include "h2_ctx.h"
|
|
Packit |
90a5c9 |
#include "h2_h2.h"
|
|
Packit |
90a5c9 |
#include "h2_mplx.h"
|
|
Packit |
90a5c9 |
#include "h2_ngn_shed.h"
|
|
Packit |
90a5c9 |
#include "h2_request.h"
|
|
Packit |
90a5c9 |
#include "h2_stream.h"
|
|
Packit |
90a5c9 |
#include "h2_session.h"
|
|
Packit |
90a5c9 |
#include "h2_task.h"
|
|
Packit |
90a5c9 |
#include "h2_workers.h"
|
|
Packit |
90a5c9 |
#include "h2_util.h"
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/* utility for iterating over ihash stream sets */
|
|
Packit |
90a5c9 |
typedef struct {
|
|
Packit |
90a5c9 |
h2_mplx *m;
|
|
Packit |
90a5c9 |
h2_stream *stream;
|
|
Packit |
90a5c9 |
apr_time_t now;
|
|
Packit |
90a5c9 |
} stream_iter_ctx;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Initialization entry point of the multiplexer module.
 *
 * Currently a no-op: neither argument is used.
 *
 * @param pool unused
 * @param s    unused
 * @return always APR_SUCCESS
 */
apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
    (void)pool;
    (void)s;
    return APR_SUCCESS;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/*
 * Locking helpers around the multiplexer mutex, (m)->lock.
 *
 * H2_MPLX_ENTER         lock; on failure *return the APR status from the
 *                       enclosing function* (so that function must be able
 *                       to return an apr_status_t-compatible value).
 * H2_MPLX_LEAVE         unlock.
 * H2_MPLX_ENTER_ALWAYS  lock, ignoring the result (usable in functions that
 *                       do not return a status).
 * H2_MPLX_ENTER_MAYBE   lock only when 'dolock' is non-zero.
 * H2_MPLX_LEAVE_MAYBE   unlock only when 'dolock' is non-zero.
 *
 * The conditional variants take a parameter named 'dolock' rather than
 * 'lock': a macro parameter called 'lock' would also replace the member name
 * in '->lock', so the macro would only expand correctly when the caller's
 * argument happened to be spelled 'lock'. They are wrapped in
 * do { } while(0) so that they behave as a single statement, e.g. in front
 * of an 'else'. All macro arguments are parenthesized.
 */
#define H2_MPLX_ENTER(m)    \
    do { apr_status_t h2_mplx_enter_rv; \
         if ((h2_mplx_enter_rv = apr_thread_mutex_lock((m)->lock)) != APR_SUCCESS) {\
             return h2_mplx_enter_rv;\
         } } while(0)

#define H2_MPLX_LEAVE(m)    \
    apr_thread_mutex_unlock((m)->lock)

#define H2_MPLX_ENTER_ALWAYS(m)    \
    apr_thread_mutex_lock((m)->lock)

#define H2_MPLX_ENTER_MAYBE(m, dolock)    \
    do { if (dolock) { apr_thread_mutex_lock((m)->lock); } } while(0)

#define H2_MPLX_LEAVE_MAYBE(m, dolock)    \
    do { if (dolock) { apr_thread_mutex_unlock((m)->lock); } } while(0)
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static void stream_output_consumed(void *ctx,
|
|
Packit |
90a5c9 |
h2_bucket_beam *beam, apr_off_t length)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
h2_stream *stream = ctx;
|
|
Packit |
90a5c9 |
h2_task *task = stream->task;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (length > 0 && task && task->assigned) {
|
|
Packit |
90a5c9 |
h2_req_engine_out_consumed(task->assigned, task->c, length);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Beam event callback for a stream's input beam: flags the multiplexer of
 * the stream's session as having a pending event.
 *
 * @param ctx  the h2_stream the beam belongs to
 * @param beam the reporting beam (unused)
 */
static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
{
    h2_stream *s = ctx;

    (void)beam;
    apr_atomic_set32(&s->session->mplx->event_pending, 1);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Beam "consumed" callback for a stream's input beam: passes the consumed
 * amount on to h2_stream_in_consumed().
 *
 * @param ctx    the stream, handed through unchanged
 * @param beam   the reporting beam (unused)
 * @param length number of bytes consumed
 */
static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
{
    (void)beam;
    h2_stream_in_consumed(ctx, length);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Move a stream whose task (if any) has finished from the "hold" set to the
 * "purge" set.
 *
 * Asserts that the stream has no task or that its worker is done.
 */
static void stream_joined(h2_mplx *m, h2_stream *stream)
{
    int stream_id = stream->id;

    ap_assert(!stream->task || stream->task->worker_done);

    h2_ihash_remove(m->shold, stream_id);
    h2_ihash_add(m->spurge, stream);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Take a stream in state H2_SS_CLEANUP out of active processing.
 *
 * Detaches the beam callbacks, aborts the input beam, leaves the output
 * beam, runs h2_stream_cleanup() and removes the stream from the active
 * set, the schedule queue and the ready queue. The stream is parked in the
 * hold set; if it has no task or its worker is already done it moves on to
 * the purge set right away. Otherwise the task's connection is marked
 * aborted and waiters on m->task_thawed are woken.
 */
static void stream_cleanup(h2_mplx *m, h2_stream *stream)
{
    h2_task *task;

    ap_assert(stream->state == H2_SS_CLEANUP);

    if (stream->input) {
        /* no more consumption reports for this stream */
        h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
        h2_beam_abort(stream->input);
    }
    if (stream->output) {
        /* no more production notifications for this stream */
        h2_beam_on_produced(stream->output, NULL, NULL);
        h2_beam_leave(stream->output);
    }

    h2_stream_cleanup(stream);

    h2_ihash_remove(m->streams, stream->id);
    h2_iq_remove(m->q, stream->id);
    h2_ififo_remove(m->readyq, stream->id);
    h2_ihash_add(m->shold, stream);

    task = stream->task;
    if (task && !task->worker_done) {
        /* a worker is still busy with it: signal the abort and wake
         * anything waiting on task_thawed */
        task->c->aborted = 1;
        apr_thread_cond_broadcast(m->task_thawed);
        return;
    }
    stream_joined(m, stream);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
|
|
Packit |
90a5c9 |
* A h2_mplx needs to be thread-safe *and* if will be called by
|
|
Packit |
90a5c9 |
* the h2_session thread *and* the h2_worker threads. Therefore:
|
|
Packit |
90a5c9 |
* - calls are protected by a mutex lock, m->lock
|
|
Packit |
90a5c9 |
* - the pool needs its own allocator, since apr_allocator_t are
|
|
Packit |
90a5c9 |
* not re-entrant. The separate allocator works without a
|
|
Packit |
90a5c9 |
* separate lock since we already protect h2_mplx itself.
|
|
Packit |
90a5c9 |
* Since HTTP/2 connections can be expected to live longer than
|
|
Packit |
90a5c9 |
* their HTTP/1 cousins, the separate allocator seems to work better
|
|
Packit |
90a5c9 |
* than protecting a shared h2_session one with an own lock.
|
|
Packit |
90a5c9 |
*/
|
|
Packit |
90a5c9 |
h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
|
|
Packit |
90a5c9 |
const h2_config *conf,
|
|
Packit |
90a5c9 |
h2_workers *workers)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
apr_status_t status = APR_SUCCESS;
|
|
Packit |
90a5c9 |
apr_allocator_t *allocator;
|
|
Packit |
90a5c9 |
apr_thread_mutex_t *mutex;
|
|
Packit |
90a5c9 |
h2_mplx *m;
|
|
Packit |
90a5c9 |
h2_ctx *ctx = h2_ctx_get(c, 0);
|
|
Packit |
90a5c9 |
ap_assert(conf);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
m = apr_pcalloc(parent, sizeof(h2_mplx));
|
|
Packit |
90a5c9 |
if (m) {
|
|
Packit |
90a5c9 |
m->id = c->id;
|
|
Packit |
90a5c9 |
m->c = c;
|
|
Packit |
90a5c9 |
m->s = (ctx? h2_ctx_server_get(ctx) : NULL);
|
|
Packit |
90a5c9 |
if (!m->s) {
|
|
Packit |
90a5c9 |
m->s = c->base_server;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/* We create a pool with its own allocator to be used for
|
|
Packit |
90a5c9 |
* processing slave connections. This is the only way to have the
|
|
Packit |
90a5c9 |
* processing independant of its parent pool in the sense that it
|
|
Packit |
90a5c9 |
* can work in another thread. Also, the new allocator needs its own
|
|
Packit |
90a5c9 |
* mutex to synchronize sub-pools.
|
|
Packit |
90a5c9 |
*/
|
|
Packit |
90a5c9 |
status = apr_allocator_create(&allocator);
|
|
Packit |
90a5c9 |
if (status != APR_SUCCESS) {
|
|
Packit |
90a5c9 |
return NULL;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
apr_allocator_max_free_set(allocator, ap_max_mem_free);
|
|
Packit |
90a5c9 |
apr_pool_create_ex(&m->pool, parent, NULL, allocator);
|
|
Packit |
90a5c9 |
if (!m->pool) {
|
|
Packit |
90a5c9 |
apr_allocator_destroy(allocator);
|
|
Packit |
90a5c9 |
return NULL;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
apr_pool_tag(m->pool, "h2_mplx");
|
|
Packit |
90a5c9 |
apr_allocator_owner_set(allocator, m->pool);
|
|
Packit |
90a5c9 |
status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
|
|
Packit |
90a5c9 |
m->pool);
|
|
Packit |
90a5c9 |
if (status != APR_SUCCESS) {
|
|
Packit |
90a5c9 |
apr_pool_destroy(m->pool);
|
|
Packit |
90a5c9 |
return NULL;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
apr_allocator_mutex_set(allocator, mutex);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
|
|
Packit |
90a5c9 |
m->pool);
|
|
Packit |
90a5c9 |
if (status != APR_SUCCESS) {
|
|
Packit |
90a5c9 |
apr_pool_destroy(m->pool);
|
|
Packit |
90a5c9 |
return NULL;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
status = apr_thread_cond_create(&m->task_thawed, m->pool);
|
|
Packit |
90a5c9 |
if (status != APR_SUCCESS) {
|
|
Packit |
90a5c9 |
apr_pool_destroy(m->pool);
|
|
Packit |
90a5c9 |
return NULL;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
|
|
Packit |
90a5c9 |
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
|
|
Packit |
90a5c9 |
m->sredo = h2_ihash_create(m->pool, offsetof(h2_stream,id));
|
|
Packit |
90a5c9 |
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
|
|
Packit |
90a5c9 |
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
|
|
Packit |
90a5c9 |
m->q = h2_iq_create(m->pool, m->max_streams);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
status = h2_ififo_set_create(&m->readyq, m->pool, m->max_streams);
|
|
Packit |
90a5c9 |
if (status != APR_SUCCESS) {
|
|
Packit |
90a5c9 |
apr_pool_destroy(m->pool);
|
|
Packit |
90a5c9 |
return NULL;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
m->workers = workers;
|
|
Packit |
90a5c9 |
m->max_active = workers->max_workers;
|
|
Packit |
90a5c9 |
m->limit_active = 6; /* the original h1 max parallel connections */
|
|
Packit |
90a5c9 |
m->last_limit_change = m->last_idle_block = apr_time_now();
|
|
Packit |
90a5c9 |
m->limit_change_interval = apr_time_from_msec(100);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams,
|
|
Packit |
90a5c9 |
m->stream_max_mem);
|
|
Packit |
90a5c9 |
h2_ngn_shed_set_ctx(m->ngn_shed , m);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
return m;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Stop scheduling: empties the schedule queue so that streams not yet
 * started will not start, and reports m->max_stream_started.
 *
 * NOTE(review): H2_MPLX_ENTER returns the APR status from this function if
 * the lock cannot be taken, so in that case the int returned is an error
 * code, not a stream id -- confirm callers tolerate this.
 *
 * @return the value of m->max_stream_started at the time of the call
 */
int h2_mplx_shutdown(h2_mplx *m)
{
    int last_started;

    H2_MPLX_ENTER(m);

    last_started = m->max_stream_started;
    /* Clear schedule queue, disabling existing streams from starting */
    h2_iq_clear(m->q);

    H2_MPLX_LEAVE(m);
    return last_started;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Have the stream's input beam, if there is one, report its consumption.
 *
 * @return the result of h2_beam_report_consumption(), 0 without input beam
 */
static int input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
    (void)m;
    return stream->input ? h2_beam_report_consumption(stream->input) : 0;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int report_consumption_iter(void *ctx, void *val)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
h2_stream *stream = val;
|
|
Packit |
90a5c9 |
h2_mplx *m = ctx;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
input_consumed_signal(m, stream);
|
|
Packit |
90a5c9 |
if (stream->state == H2_SS_CLOSED_L
|
|
Packit |
90a5c9 |
&& (!stream->task || stream->task->worker_done)) {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
|
|
Packit |
90a5c9 |
H2_STRM_LOG(APLOGNO(10026), stream, "remote close missing"));
|
|
Packit |
90a5c9 |
nghttp2_submit_rst_stream(stream->session->ngh2, NGHTTP2_FLAG_NONE,
|
|
Packit |
90a5c9 |
stream->id, NGHTTP2_NO_ERROR);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
return 1;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Have the task's output beam, if there is one, report its consumption.
 *
 * @return the result of h2_beam_report_consumption(), 0 without output beam
 */
static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
    (void)m;
    return task->output.beam ? h2_beam_report_consumption(task->output.beam)
                             : 0;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int stream_destroy_iter(void *ctx, void *val)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
h2_mplx *m = ctx;
|
|
Packit |
90a5c9 |
h2_stream *stream = val;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
h2_ihash_remove(m->spurge, stream->id);
|
|
Packit |
90a5c9 |
ap_assert(stream->state == H2_SS_CLEANUP);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (stream->input) {
|
|
Packit |
90a5c9 |
/* Process outstanding events before destruction */
|
|
Packit |
90a5c9 |
input_consumed_signal(m, stream);
|
|
Packit |
90a5c9 |
h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
|
|
Packit |
90a5c9 |
h2_beam_destroy(stream->input);
|
|
Packit |
90a5c9 |
stream->input = NULL;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (stream->task) {
|
|
Packit |
90a5c9 |
h2_task *task = stream->task;
|
|
Packit |
90a5c9 |
conn_rec *slave;
|
|
Packit |
90a5c9 |
int reuse_slave = 0;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
stream->task = NULL;
|
|
Packit |
90a5c9 |
slave = task->c;
|
|
Packit |
90a5c9 |
if (slave) {
|
|
Packit |
90a5c9 |
/* On non-serialized requests, the IO logging has not accounted for any
|
|
Packit |
90a5c9 |
* meta data send over the network: response headers and h2 frame headers. we
|
|
Packit |
90a5c9 |
* counted this on the stream and need to add this now.
|
|
Packit |
90a5c9 |
* This is supposed to happen before the EOR bucket triggers the
|
|
Packit |
90a5c9 |
* logging of the transaction. *fingers crossed* */
|
|
Packit |
90a5c9 |
if (task->request && !task->request->serialize && h2_task_logio_add_bytes_out) {
|
|
Packit |
90a5c9 |
apr_off_t unaccounted = stream->out_frame_octets - stream->out_data_octets;
|
|
Packit |
90a5c9 |
if (unaccounted > 0) {
|
|
Packit |
90a5c9 |
h2_task_logio_add_bytes_out(slave, unaccounted);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (m->s->keep_alive_max == 0 || slave->keepalives < m->s->keep_alive_max) {
|
|
Packit |
90a5c9 |
reuse_slave = ((m->spare_slaves->nelts < (m->limit_active * 3 / 2))
|
|
Packit |
90a5c9 |
&& !task->rst_error);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
|
|
Packit |
90a5c9 |
h2_beam_log(task->output.beam, m->c, APLOG_DEBUG,
|
|
Packit |
90a5c9 |
APLOGNO(03385) "h2_task_destroy, reuse slave");
|
|
Packit |
90a5c9 |
h2_task_destroy(task);
|
|
Packit |
90a5c9 |
APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
h2_beam_log(task->output.beam, m->c, APLOG_TRACE1,
|
|
Packit |
90a5c9 |
"h2_task_destroy, destroy slave");
|
|
Packit |
90a5c9 |
h2_slave_destroy(slave);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
h2_stream_destroy(stream);
|
|
Packit |
90a5c9 |
return 0;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Destroy all streams waiting in the purge set.
 *
 * @param m    the multiplexer
 * @param lock non-zero when m->lock must be taken here, 0 when the caller
 *             already holds it
 */
static void purge_streams(h2_mplx *m, int lock)
{
    if (h2_ihash_empty(m->spurge)) {
        return;
    }

    H2_MPLX_ENTER_MAYBE(m, lock);
    for (;;) {
        /* each pass destroys one stream; repeat until empty */
        if (h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
            break;
        }
    }
    H2_MPLX_LEAVE_MAYBE(m, lock);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
typedef struct {
|
|
Packit |
90a5c9 |
h2_mplx_stream_cb *cb;
|
|
Packit |
90a5c9 |
void *ctx;
|
|
Packit |
90a5c9 |
} stream_iter_ctx_t;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int stream_iter_wrap(void *ctx, void *stream)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
stream_iter_ctx_t *x = ctx;
|
|
Packit |
90a5c9 |
return x->cb(stream, x->ctx);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Invoke a callback for the streams in the active set while holding the
 * multiplexer lock.
 *
 * @param m   the multiplexer
 * @param cb  callback to invoke per stream
 * @param ctx opaque context passed to cb
 * @return APR_SUCCESS, or the lock error if m->lock could not be taken
 */
apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
{
    stream_iter_ctx_t wrap;

    wrap.cb = cb;
    wrap.ctx = ctx;

    H2_MPLX_ENTER(m);
    h2_ihash_iter(m->streams, stream_iter_wrap, &wrap);
    H2_MPLX_LEAVE(m);

    return APR_SUCCESS;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int report_stream_iter(void *ctx, void *val) {
|
|
Packit |
90a5c9 |
h2_mplx *m = ctx;
|
|
Packit |
90a5c9 |
h2_stream *stream = val;
|
|
Packit |
90a5c9 |
h2_task *task = stream->task;
|
|
Packit |
90a5c9 |
if (APLOGctrace1(m->c)) {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
|
|
Packit |
90a5c9 |
H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"),
|
|
Packit |
90a5c9 |
!!stream->task, stream->scheduled, h2_stream_is_ready(stream),
|
|
Packit |
90a5c9 |
(long)h2_beam_get_buffered(stream->output));
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
if (task) {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
|
|
Packit |
90a5c9 |
H2_STRM_MSG(stream, "->03198: %s %s %s"
|
|
Packit |
90a5c9 |
"[started=%d/done=%d/frozen=%d]"),
|
|
Packit |
90a5c9 |
task->request->method, task->request->authority,
|
|
Packit |
90a5c9 |
task->request->path, task->worker_started,
|
|
Packit |
90a5c9 |
task->worker_done, task->frozen);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
|
|
Packit |
90a5c9 |
H2_STRM_MSG(stream, "->03198: no task"));
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
return 1;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int unexpected_stream_iter(void *ctx, void *val) {
|
|
Packit |
90a5c9 |
h2_mplx *m = ctx;
|
|
Packit |
90a5c9 |
h2_stream *stream = val;
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
|
|
Packit |
90a5c9 |
H2_STRM_MSG(stream, "unexpected, started=%d, scheduled=%d, ready=%d"),
|
|
Packit |
90a5c9 |
!!stream->task, stream->scheduled, h2_stream_is_ready(stream));
|
|
Packit |
90a5c9 |
return 1;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int stream_cancel_iter(void *ctx, void *val) {
|
|
Packit |
90a5c9 |
h2_mplx *m = ctx;
|
|
Packit |
90a5c9 |
h2_stream *stream = val;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/* disabled input consumed reporting */
|
|
Packit |
90a5c9 |
if (stream->input) {
|
|
Packit |
90a5c9 |
h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
/* take over event monitoring */
|
|
Packit |
90a5c9 |
h2_stream_set_monitor(stream, NULL);
|
|
Packit |
90a5c9 |
/* Reset, should transit to CLOSED state */
|
|
Packit |
90a5c9 |
h2_stream_rst(stream, H2_ERR_NO_ERROR);
|
|
Packit |
90a5c9 |
/* All connection data has been sent, simulate cleanup */
|
|
Packit |
90a5c9 |
h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
|
|
Packit |
90a5c9 |
stream_cleanup(m, stream);
|
|
Packit |
90a5c9 |
return 0;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Shut the multiplexer down and wait until no worker is busy with any of
 * its streams any more.
 *
 * How to shut down a h2 connection:
 * 0. abort and tell the workers that no more tasks will come from us
 * 1. cancel all streams still active
 * 2. abort the ngn_shed, no more streams should be scheduled or active
 * 3. wait on 'wait' until the hold set has drained
 * 4. destroy the ngn_shed
 * 5. report any stream unexpectedly left in the hold set
 *
 * @param m    the multiplexer to release
 * @param wait condition variable used, together with m->lock, to wait for
 *             streams leaving the hold set
 */
void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
    apr_status_t status;
    apr_time_t wait_start;
    int wait_secs = 60;

    /* 0. abort and tell the workers that no more tasks will come from us */
    m->aborted = 1;
    h2_workers_unregister(m->workers, m);

    H2_MPLX_ENTER_ALWAYS(m);

    /* 1. cancel all streams still active */
    while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) {
        /* until empty */
    }

    /* 2. terminate ngn_shed, no more streams
     * should be scheduled or in the active set */
    h2_ngn_shed_abort(m->ngn_shed);
    ap_assert(h2_ihash_empty(m->streams));
    ap_assert(h2_iq_empty(m->q));

    /* 3. while workers are busy on this connection, meaning they
     * are processing tasks from this connection, wait on them finishing
     * in order to wake us and let us check again.
     * Eventually, this has to succeed. */
    m->join_wait = wait;
    wait_start = apr_time_now();
    while (h2_ihash_count(m->shold) > 0) {
        status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));

        if (APR_STATUS_IS_TIMEUP(status)) {
            /* This can happen if we have very long running requests
             * that do not time out on IO.
             * The waited time is measured by the clock: counting loop
             * iterations would report 0 on the first timeout and count
             * every early wake-up as a full interval. */
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
                          "h2_mplx(%ld): waited %d sec for %d tasks",
                          m->id, (int)apr_time_sec(apr_time_now() - wait_start),
                          (int)h2_ihash_count(m->shold));
            h2_ihash_iter(m->shold, report_stream_iter, m);
        }
    }
    ap_assert(m->tasks_active == 0);
    m->join_wait = NULL;

    /* 4. close the h2_req_engine shed */
    h2_ngn_shed_destroy(m->ngn_shed);
    m->ngn_shed = NULL;

    /* 5. With all workers done, all streams should be in spurge */
    if (!h2_ihash_empty(m->shold)) {
        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03516)
                      "h2_mplx(%ld): unexpected %d streams in hold",
                      m->id, (int)h2_ihash_count(m->shold));
        h2_ihash_iter(m->shold, unexpected_stream_iter, m);
    }

    H2_MPLX_LEAVE(m);

    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                  "h2_mplx(%ld): released", m->id);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Locked wrapper around stream_cleanup(): take the given stream out of
 * active processing.
 *
 * @param m      the multiplexer
 * @param stream the stream to clean up
 * @return APR_SUCCESS, or the lock error if m->lock could not be taken
 */
apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
    H2_MPLX_ENTER(m);

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                  H2_STRM_MSG(stream, "cleanup"));
    stream_cleanup(m, stream);

    H2_MPLX_LEAVE(m);

    return APR_SUCCESS;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Look up a stream in the active set by its id, under the multiplexer lock.
 *
 * @param m  the multiplexer
 * @param id the stream id
 * @return the result of the hash lookup
 */
h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
{
    h2_stream *found;

    H2_MPLX_ENTER_ALWAYS(m);
    found = h2_ihash_get(m->streams, id);
    H2_MPLX_LEAVE(m);

    return found;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Beam "produced" callback for a stream's output beam: announces the stream
 * as having data, taking the multiplexer lock for the notification.
 *
 * @param ctx   the h2_stream the beam belongs to
 * @param beam  the reporting beam (unused)
 * @param bytes amount produced (unused)
 */
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    h2_stream *s = ctx;

    (void)beam;
    (void)bytes;
    check_data_for(s->session->mplx, s, 1);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Attach an output beam to the stream with the given id. The caller is
 * expected to hold m->lock (check_data_for() is called without locking).
 *
 * @param m         the multiplexer
 * @param stream_id id of the stream producing output
 * @param beam      the beam carrying the output
 * @return APR_ECONNABORTED if the multiplexer is aborted or the stream is
 *         not active or has no task, APR_SUCCESS otherwise
 */
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
    h2_stream *stream = h2_ihash_get(m->streams, stream_id);

    if (m->aborted || !stream || !stream->task) {
        return APR_ECONNABORTED;
    }

    ap_assert(stream->output == NULL);
    stream->output = beam;

    if (APLOGctrace2(m->c)) {
        h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open");
    }
    else {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_SUCCESS, m->c,
                      "h2_mplx(%s): out open", stream->task->id);
    }

    /* hook the stream up to consumption and production events */
    h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
    h2_beam_on_produced(stream->output, output_produced, stream);
    if (stream->task->output.copy_files) {
        h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
    }

    /* put the stream on the ready queue and wake a waiter, if any */
    check_data_for(m, stream, 0);
    return APR_SUCCESS;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Locked wrapper around out_open(): attach an output beam to a stream.
 *
 * @param m         the multiplexer
 * @param stream_id id of the stream producing output
 * @param beam      the beam carrying the output
 * @return APR_ECONNABORTED when the multiplexer is aborted, the lock error
 *         if m->lock could not be taken, otherwise the result of out_open()
 */
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
    apr_status_t status = APR_ECONNABORTED;

    H2_MPLX_ENTER(m);

    if (!m->aborted) {
        status = out_open(m, stream_id, beam);
    }

    H2_MPLX_LEAVE(m);
    return status;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Close the output of a task: counts a keepalive on the task's connection,
 * closes the task's output beam, reports its consumption and announces the
 * stream as having data.
 *
 * Note that the keepalive counter is incremented before the stream lookup,
 * i.e. also when APR_ECONNABORTED is returned for a missing stream.
 *
 * @param m    the multiplexer
 * @param task the task whose output ends, may be NULL
 * @return APR_ECONNABORTED without task or without active stream for it,
 *         otherwise the result of h2_beam_close()
 */
static apr_status_t out_close(h2_mplx *m, h2_task *task)
{
    apr_status_t close_rv;
    h2_stream *stream;

    if (!task) {
        return APR_ECONNABORTED;
    }
    if (task->c) {
        ++task->c->keepalives;
    }

    stream = h2_ihash_get(m->streams, task->stream_id);
    if (!stream) {
        return APR_ECONNABORTED;
    }

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, APR_SUCCESS, m->c,
                  "h2_mplx(%s): close", task->id);
    close_rv = h2_beam_close(task->output.beam);
    h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
    output_consumed_signal(m, task);
    check_data_for(m, stream, 0);
    return close_rv;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
|
|
Packit |
90a5c9 |
apr_thread_cond_t *iowait)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
apr_status_t status;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_ENTER(m);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (m->aborted) {
|
|
Packit |
90a5c9 |
status = APR_ECONNABORTED;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else if (h2_mplx_has_master_events(m)) {
|
|
Packit |
90a5c9 |
status = APR_SUCCESS;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
purge_streams(m, 0);
|
|
Packit |
90a5c9 |
h2_ihash_iter(m->streams, report_consumption_iter, m);
|
|
Packit |
90a5c9 |
m->added_output = iowait;
|
|
Packit |
90a5c9 |
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
|
|
Packit |
90a5c9 |
if (APLOGctrace2(m->c)) {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
|
|
Packit |
90a5c9 |
"h2_mplx(%ld): trywait on data for %f ms)",
|
|
Packit |
90a5c9 |
m->id, timeout/1000.0);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
m->added_output = NULL;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_LEAVE(m);
|
|
Packit |
90a5c9 |
return status;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Announce that a stream has data: push its id onto the ready queue and,
 * if that succeeded, flag a pending event and signal a waiter registered
 * in m->added_output.
 *
 * @param m      the multiplexer
 * @param stream the stream to announce
 * @param lock   non-zero when m->lock must be taken here for the signal,
 *               0 when the caller already holds it
 */
static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
{
    if (h2_ififo_push(m->readyq, stream->id) != APR_SUCCESS) {
        return;
    }

    apr_atomic_set32(&m->event_pending, 1);

    H2_MPLX_ENTER_MAYBE(m, lock);
    if (m->added_output) {
        apr_thread_cond_signal(m->added_output);
    }
    H2_MPLX_LEAVE_MAYBE(m, lock);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Re-sort the schedule queue with the given comparison.
 *
 * @param m   the multiplexer
 * @param cmp comparison callback for stream priorities
 * @param ctx opaque context passed to cmp
 * @return APR_ECONNABORTED when the multiplexer is aborted, the lock error
 *         if m->lock could not be taken, otherwise APR_SUCCESS
 */
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
    apr_status_t status = APR_ECONNABORTED;

    H2_MPLX_ENTER(m);

    if (!m->aborted) {
        h2_iq_sort(m->q, cmp, ctx);
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                      "h2_mplx(%ld): reprioritize tasks", m->id);
        status = APR_SUCCESS;
    }

    H2_MPLX_LEAVE(m);
    return status;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Register this mplx at its workers when there is queued work.
 *
 * Nothing happens when the mplx is aborted, is already registered, or has
 * an empty stream queue. On successful registration m->is_registered is
 * set; a failed registration is only logged.
 */
static void register_if_needed(h2_mplx *m)
{
    apr_status_t status;

    if (m->aborted || m->is_registered || h2_iq_empty(m->q)) {
        return;
    }

    status = h2_workers_register(m->workers, m);
    if (status != APR_SUCCESS) {
        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c, APLOGNO(10021)
                      "h2_mplx(%ld): register at workers", m->id);
        return;
    }
    m->is_registered = 1;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
|
|
Packit |
90a5c9 |
h2_stream_pri_cmp *cmp, void *ctx)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
apr_status_t status;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_ENTER(m);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (m->aborted) {
|
|
Packit |
90a5c9 |
status = APR_ECONNABORTED;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
status = APR_SUCCESS;
|
|
Packit |
90a5c9 |
h2_ihash_add(m->streams, stream);
|
|
Packit |
90a5c9 |
if (h2_stream_is_ready(stream)) {
|
|
Packit |
90a5c9 |
/* already have a response */
|
|
Packit |
90a5c9 |
check_data_for(m, stream, 0);
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
|
|
Packit |
90a5c9 |
H2_STRM_MSG(stream, "process, add to readyq"));
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
h2_iq_add(m->q, stream->id, cmp, ctx);
|
|
Packit |
90a5c9 |
register_if_needed(m);
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
|
|
Packit |
90a5c9 |
H2_STRM_MSG(stream, "process, added to q"));
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_LEAVE(m);
|
|
Packit |
90a5c9 |
return status;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Pick the next queued stream that may run and return its task.
 *
 * Stream ids are shifted off m->q for as long as the mplx is not aborted
 * and m->tasks_active is below m->limit_active. Ids that no longer resolve
 * in m->streams are skipped. A stream without a task gets one created on a
 * slave connection, reused from m->spare_slaves when one is available and
 * freshly created otherwise. A stream that already carries a task (it was
 * re-queued) keeps that task.
 *
 * The slave connection is only acquired when a task actually has to be
 * created. Acquiring it unconditionally would pop (or create) a slave that
 * is never used when the stream already has a task.
 *
 * Both callers in this file invoke this while holding m->lock.
 *
 * @param m the multiplexer
 * @return the task to run (m->tasks_active has been incremented), or NULL
 *         if nothing can be started now or the task could not be created
 */
static h2_task *next_stream_task(h2_mplx *m)
{
    h2_stream *stream;
    int sid;

    while (!m->aborted && (m->tasks_active < m->limit_active)
           && (sid = h2_iq_shift(m->q)) > 0) {

        stream = h2_ihash_get(m->streams, sid);
        if (!stream) {
            /* stream is gone in the meantime, try the next id */
            continue;
        }

        if (!stream->task) {
            conn_rec *slave, **pslave;

            pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
            if (pslave) {
                /* recycle a spare connection, clearing its abort flag */
                slave = *pslave;
                slave->aborted = 0;
            }
            else {
                slave = h2_slave_create(m->c, stream->id, m->pool);
            }

            if (sid > m->max_stream_started) {
                m->max_stream_started = sid;
            }
            if (stream->input) {
                h2_beam_on_consumed(stream->input, stream_input_ev,
                                    stream_input_consumed, stream);
            }

            stream->task = h2_task_create(slave, stream->id,
                                          stream->request, m, stream->input,
                                          stream->session->s->timeout,
                                          m->stream_max_mem);
            if (!stream->task) {
                ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave,
                              H2_STRM_LOG(APLOGNO(02941), stream,
                              "create task"));
                return NULL;
            }
        }

        ++m->tasks_active;
        return stream->task;
    }
    return NULL;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Fetch the next task of this mplx for a worker.
 *
 * @param m     the multiplexer, must not be NULL and must have a lock
 * @param ptask receives the task to run, or NULL when there is none
 * @return the mutex error if m->lock could not be taken;
 *         APR_EOF if the mplx is aborted;
 *         APR_EAGAIN if a task was handed out and more streams are queued;
 *         APR_SUCCESS otherwise.
 *         For every result but APR_EAGAIN (and a failed lock),
 *         m->is_registered is cleared.
 */
apr_status_t h2_mplx_pop_task(h2_mplx *m, h2_task **ptask)
{
    apr_status_t rv;

    *ptask = NULL;
    ap_assert(m);
    ap_assert(m->lock);

    rv = apr_thread_mutex_lock(m->lock);
    if (rv != APR_SUCCESS) {
        return rv;
    }

    if (!m->aborted) {
        *ptask = next_stream_task(m);
        if (*ptask != NULL && !h2_iq_empty(m->q)) {
            rv = APR_EAGAIN;
        }
        else {
            rv = APR_SUCCESS;
        }
    }
    else {
        rv = APR_EOF;
    }

    if (rv != APR_EAGAIN) {
        m->is_registered = 0; /* h2_workers will discard this mplx */
    }
    H2_MPLX_LEAVE(m);
    return rv;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
h2_stream *stream;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (task->frozen) {
|
|
Packit |
90a5c9 |
/* this task was handed over to an engine for processing
|
|
Packit |
90a5c9 |
* and the original worker has finished. That means the
|
|
Packit |
90a5c9 |
* engine may start processing now. */
|
|
Packit |
90a5c9 |
h2_task_thaw(task);
|
|
Packit |
90a5c9 |
apr_thread_cond_broadcast(m->task_thawed);
|
|
Packit |
90a5c9 |
return;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
|
|
Packit |
90a5c9 |
"h2_mplx(%ld): task(%s) done", m->id, task->id);
|
|
Packit |
90a5c9 |
out_close(m, task);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (ngn) {
|
|
Packit |
90a5c9 |
apr_off_t bytes = 0;
|
|
Packit |
90a5c9 |
h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
|
|
Packit |
90a5c9 |
bytes += h2_beam_get_buffered(task->output.beam);
|
|
Packit |
90a5c9 |
if (bytes > 0) {
|
|
Packit |
90a5c9 |
/* we need to report consumed and current buffered output
|
|
Packit |
90a5c9 |
* to the engine. The request will be streamed out or cancelled,
|
|
Packit |
90a5c9 |
* no more data is coming from it and the engine should update
|
|
Packit |
90a5c9 |
* its calculations before we destroy this information. */
|
|
Packit |
90a5c9 |
h2_req_engine_out_consumed(ngn, task->c, bytes);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (task->engine) {
|
|
Packit |
90a5c9 |
if (!m->aborted && !task->c->aborted
|
|
Packit |
90a5c9 |
&& !h2_req_engine_is_shutdown(task->engine)) {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022)
|
|
Packit |
90a5c9 |
"h2_mplx(%ld): task(%s) has not-shutdown "
|
|
Packit |
90a5c9 |
"engine(%s)", m->id, task->id,
|
|
Packit |
90a5c9 |
h2_req_engine_get_id(task->engine));
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
task->worker_done = 1;
|
|
Packit |
90a5c9 |
task->done_at = apr_time_now();
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
|
|
Packit |
90a5c9 |
"h2_mplx(%s): request done, %f ms elapsed", task->id,
|
|
Packit |
90a5c9 |
(task->done_at - task->started_at) / 1000.0);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (task->started_at > m->last_idle_block) {
|
|
Packit |
90a5c9 |
/* this task finished without causing an 'idle block', e.g.
|
|
Packit |
90a5c9 |
* a block by flow control.
|
|
Packit |
90a5c9 |
*/
|
|
Packit |
90a5c9 |
if (task->done_at- m->last_limit_change >= m->limit_change_interval
|
|
Packit |
90a5c9 |
&& m->limit_active < m->max_active) {
|
|
Packit |
90a5c9 |
/* Well behaving stream, allow it more workers */
|
|
Packit |
90a5c9 |
m->limit_active = H2MIN(m->limit_active * 2,
|
|
Packit |
90a5c9 |
m->max_active);
|
|
Packit |
90a5c9 |
m->last_limit_change = task->done_at;
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
|
|
Packit |
90a5c9 |
"h2_mplx(%ld): increase worker limit to %d",
|
|
Packit |
90a5c9 |
m->id, m->limit_active);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
stream = h2_ihash_get(m->streams, task->stream_id);
|
|
Packit |
90a5c9 |
if (stream) {
|
|
Packit |
90a5c9 |
/* stream not done yet. */
|
|
Packit |
90a5c9 |
if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) {
|
|
Packit |
90a5c9 |
/* reset and schedule again */
|
|
Packit |
90a5c9 |
h2_task_redo(task);
|
|
Packit |
90a5c9 |
h2_ihash_remove(m->sredo, stream->id);
|
|
Packit |
90a5c9 |
h2_iq_add(m->q, stream->id, NULL, NULL);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
/* stream not cleaned up, stay around */
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
|
|
Packit |
90a5c9 |
H2_STRM_MSG(stream, "task_done, stream open"));
|
|
Packit |
90a5c9 |
if (stream->input) {
|
|
Packit |
90a5c9 |
h2_beam_leave(stream->input);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/* more data will not arrive, resume the stream */
|
|
Packit |
90a5c9 |
check_data_for(m, stream, 0);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
|
|
Packit |
90a5c9 |
/* stream is done, was just waiting for this. */
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
|
|
Packit |
90a5c9 |
H2_STRM_MSG(stream, "task_done, in hold"));
|
|
Packit |
90a5c9 |
if (stream->input) {
|
|
Packit |
90a5c9 |
h2_beam_leave(stream->input);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
stream_joined(m, stream);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
|
|
Packit |
90a5c9 |
H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge"));
|
|
Packit |
90a5c9 |
ap_assert("stream should not be in spurge" == NULL);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03518)
|
|
Packit |
90a5c9 |
"h2_mplx(%s): task_done, stream not found",
|
|
Packit |
90a5c9 |
task->id);
|
|
Packit |
90a5c9 |
ap_assert("stream should still be available" == NULL);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Report a task as finished by its worker and optionally fetch another.
 *
 * Under the mplx lock: runs the task_done() book-keeping (without an
 * engine), decrements m->tasks_active, signals m->join_wait when one is
 * installed, hands out the next task when the caller asked for one and
 * re-registers the mplx at the workers if needed.
 *
 * @param m     the multiplexer
 * @param task  the finished task
 * @param ptask if not NULL, receives the next task to run (may be NULL)
 */
void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
    H2_MPLX_ENTER_ALWAYS(m);

    task_done(m, task, NULL);
    m->tasks_active -= 1;

    if (m->join_wait != NULL) {
        apr_thread_cond_signal(m->join_wait);
    }

    if (ptask != NULL) {
        /* caller wants another task */
        *ptask = next_stream_task(m);
    }

    register_if_needed(m);

    H2_MPLX_LEAVE(m);
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/*******************************************************************************
 * h2_mplx DoS protection
 ******************************************************************************/
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int latest_repeatable_unsubmitted_iter(void *data, void *val)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
stream_iter_ctx *ctx = data;
|
|
Packit |
90a5c9 |
h2_stream *stream = val;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (stream->task && !stream->task->worker_done
|
|
Packit |
90a5c9 |
&& h2_task_can_redo(stream->task)
|
|
Packit |
90a5c9 |
&& !h2_ihash_get(ctx->m->sredo, stream->id)) {
|
|
Packit |
90a5c9 |
if (!h2_stream_is_ready(stream)) {
|
|
Packit |
90a5c9 |
/* this task occupies a worker, the response has not been submitted
|
|
Packit |
90a5c9 |
* yet, not been cancelled and it is a repeatable request
|
|
Packit |
90a5c9 |
* -> it can be re-scheduled later */
|
|
Packit |
90a5c9 |
if (!ctx->stream
|
|
Packit |
90a5c9 |
|| (ctx->stream->task->started_at < stream->task->started_at)) {
|
|
Packit |
90a5c9 |
/* we did not have one or this one was started later */
|
|
Packit |
90a5c9 |
ctx->stream = stream;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
return 1;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Find, among m->streams, the stream selected by
 * latest_repeatable_unsubmitted_iter(): the latest started one that can
 * be cancelled and re-scheduled.
 *
 * @param m the multiplexer
 * @return the selected stream or NULL if there is none
 */
static h2_stream *get_latest_repeatable_unsubmitted_stream(h2_mplx *m)
{
    stream_iter_ctx ctx;

    ctx.m = m;
    ctx.stream = NULL;
    /* the call's closing parenthesis was garbled to "&ctx;;" in the text */
    h2_ihash_iter(m->streams, latest_repeatable_unsubmitted_iter, &ctx);
    return ctx.stream;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int timed_out_busy_iter(void *data, void *val)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
stream_iter_ctx *ctx = data;
|
|
Packit |
90a5c9 |
h2_stream *stream = val;
|
|
Packit |
90a5c9 |
if (stream->task && !stream->task->worker_done
|
|
Packit |
90a5c9 |
&& (ctx->now - stream->task->started_at) > stream->task->timeout) {
|
|
Packit |
90a5c9 |
/* timed out stream occupying a worker, found */
|
|
Packit |
90a5c9 |
ctx->stream = stream;
|
|
Packit |
90a5c9 |
return 0;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
return 1;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Look through m->streams for a stream with a busy task that has exceeded
 * its timeout, measured against the current time.
 *
 * @param m the multiplexer
 * @return such a stream, or NULL if none was found
 */
static h2_stream *get_timed_out_busy_stream(h2_mplx *m)
{
    stream_iter_ctx ctx;

    ctx.m = m;
    ctx.stream = NULL;
    ctx.now = apr_time_now();
    /* the call's closing parenthesis was garbled to "&ctx;;" in the text */
    h2_ihash_iter(m->streams, timed_out_busy_iter, &ctx);
    return ctx.stream;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Try to get rid of streams that occupy workers beyond m->limit_active.
 *
 * Repeatable requests whose response has not been submitted are reset
 * with H2_ERR_CANCEL and recorded in m->sredo so they can be scheduled
 * again later, latest started first, until the excess is covered or no
 * candidate is left. If afterwards there are still too many busy tasks
 * and one of them has timed out, the connection is failed.
 *
 * h2_ihash_count() is stored in an apr_size_t and cast to int elsewhere in
 * this file, so its result is cast to int in both computations here. This
 * keeps the arithmetic signed, so a negative difference is not turned into
 * a huge unsigned value that would always exceed the limit.
 *
 * @param m the multiplexer
 * @return APR_TIMEUP if too many workers stay busy and a busy stream has
 *         timed out, APR_SUCCESS otherwise
 */
static apr_status_t unschedule_slow_tasks(h2_mplx *m)
{
    h2_stream *stream;
    int n;

    /* Try to get rid of streams that occupy workers. Look for safe requests
     * that are repeatable. If none found, fail the connection.
     */
    n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo));
    while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) {
        h2_task_rst(stream->task, H2_ERR_CANCEL);
        h2_ihash_add(m->sredo, stream);
        --n;
    }

    if ((m->tasks_active - (int)h2_ihash_count(m->sredo)) > m->limit_active) {
        stream = get_timed_out_busy_stream(m);
        if (stream) {
            /* Too many busy workers, unable to cancel enough streams
             * and with a busy, timed out stream, we tell the client
             * to go away... */
            return APR_TIMEUP;
        }
    }
    return APR_SUCCESS;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Called when the session considers itself idle; lets the mplx react.
 *
 * With streams present there are three cases:
 *  - tasks are active: this is an 'idle block'. The time is recorded, the
 *    worker limit may be stepped down and surplus tasks unscheduled.
 *  - no tasks active but streams queued: APR_EAGAIN is returned.
 *  - neither: one stream is taken from m->streams, put back, and if it
 *    has output that was not checked before, it is announced as ready and
 *    APR_EAGAIN is returned.
 * In all cases the mplx registers at the workers if that is needed.
 *
 * @param m the multiplexer
 * @return APR_SUCCESS, APR_EAGAIN as described above, or the result of
 *         unschedule_slow_tasks()
 */
apr_status_t h2_mplx_idle(h2_mplx *m)
{
    /* limits the worker count steps down to, highest first */
    static const int limit_steps[] = { 16, 8, 4, 2 };
    apr_status_t status = APR_SUCCESS;
    apr_time_t now;
    apr_size_t scount;

    H2_MPLX_ENTER(m);

    scount = h2_ihash_count(m->streams);
    if (scount == 0) {
        /* no streams at all, nothing to examine */
    }
    else if (m->tasks_active) {
        /* If we have streams in connection state 'IDLE', meaning
         * all streams are ready to sent data out, but lack
         * WINDOW_UPDATEs.
         *
         * This is ok, unless we have streams that still occupy
         * h2 workers. As worker threads are a scarce resource,
         * we need to take measures that we do not get DoSed.
         *
         * This is what we call an 'idle block'. Limit the amount
         * of busy workers we allow for this connection until it
         * well behaves.
         */
        now = apr_time_now();
        m->last_idle_block = now;
        if (m->limit_active > 2
            && now - m->last_limit_change >= m->limit_change_interval) {
            size_t i;

            /* drop to the highest step that is below the current limit;
             * since the limit is > 2 here, one of the steps applies */
            for (i = 0; i < sizeof(limit_steps) / sizeof(limit_steps[0]); ++i) {
                if (m->limit_active > limit_steps[i]) {
                    m->limit_active = limit_steps[i];
                    break;
                }
            }
            m->last_limit_change = now;
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                          "h2_mplx(%ld): decrease worker limit to %d",
                          m->id, m->limit_active);
        }

        if (m->tasks_active > m->limit_active) {
            status = unschedule_slow_tasks(m);
        }
    }
    else if (!h2_iq_empty(m->q)) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                      "h2_mplx(%ld): idle, but %d streams to process",
                      m->id, (int)h2_iq_count(m->q));
        status = APR_EAGAIN;
    }
    else {
        /* idle, have streams, but no tasks active. what are we waiting for?
         * WINDOW_UPDATEs from client? */
        h2_stream *stream = NULL;

        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                      "h2_mplx(%ld): idle, no tasks ongoing, %d streams",
                      m->id, (int)h2_ihash_count(m->streams));
        h2_ihash_shift(m->streams, (void**)&stream, 1);
        if (stream) {
            /* the shift took it out of the hash, put it back */
            h2_ihash_add(m->streams, stream);
            if (stream->output && !stream->out_checked) {
                /* FIXME: this looks like a race between the session thinking
                 * it is idle and the EOF on a stream not being sent.
                 * Signal to caller to leave IDLE state.
                 */
                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                              H2_STRM_MSG(stream, "output closed=%d, mplx idle"
                                          ", out has %ld bytes buffered"),
                              h2_beam_is_closed(stream->output),
                              (long)h2_beam_get_buffered(stream->output));
                /* NOTE(review): the stream was already re-added above;
                 * this second add looks redundant, confirm before removing */
                h2_ihash_add(m->streams, stream);
                check_data_for(m, stream, 0);
                stream->out_checked = 1;
                status = APR_EAGAIN;
            }
        }
    }

    register_if_needed(m);

    H2_MPLX_LEAVE(m);
    return status;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/*******************************************************************************
 * HTTP/2 request engines
 ******************************************************************************/
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
typedef struct {
|
|
Packit |
90a5c9 |
h2_mplx * m;
|
|
Packit |
90a5c9 |
h2_req_engine *ngn;
|
|
Packit |
90a5c9 |
int streams_updated;
|
|
Packit |
90a5c9 |
} ngn_update_ctx;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
static int ngn_update_window(void *ctx, void *val)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
ngn_update_ctx *uctx = ctx;
|
|
Packit |
90a5c9 |
h2_stream *stream = val;
|
|
Packit |
90a5c9 |
if (stream->task && stream->task->assigned == uctx->ngn
|
|
Packit |
90a5c9 |
&& output_consumed_signal(uctx->m, stream->task)) {
|
|
Packit |
90a5c9 |
++uctx->streams_updated;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
return 1;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Signal consumed output for all streams in m->streams whose task is
 * assigned to the given engine.
 *
 * @param m   the multiplexer
 * @param ngn the engine whose tasks are to be updated
 * @return APR_SUCCESS if at least one stream was updated, APR_EAGAIN
 *         otherwise
 */
static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
{
    ngn_update_ctx ctx;

    ctx.m = m;
    ctx.ngn = ngn;
    ctx.streams_updated = 0;
    /* the call's closing parenthesis was garbled to "&ctx;;" in the text */
    h2_ihash_iter(m->streams, ngn_update_window, &ctx);

    return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
|
|
Packit |
90a5c9 |
request_rec *r,
|
|
Packit |
90a5c9 |
http2_req_engine_init *einit)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
apr_status_t status;
|
|
Packit |
90a5c9 |
h2_mplx *m;
|
|
Packit |
90a5c9 |
h2_task *task;
|
|
Packit |
90a5c9 |
h2_stream *stream;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
task = h2_ctx_rget_task(r);
|
|
Packit |
90a5c9 |
if (!task) {
|
|
Packit |
90a5c9 |
return APR_ECONNABORTED;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
m = task->mplx;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_ENTER(m);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
stream = h2_ihash_get(m->streams, task->stream_id);
|
|
Packit |
90a5c9 |
if (stream) {
|
|
Packit |
90a5c9 |
status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
status = APR_ECONNABORTED;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_LEAVE(m);
|
|
Packit |
90a5c9 |
return status;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
|
|
Packit |
90a5c9 |
apr_read_type_e block,
|
|
Packit |
90a5c9 |
int capacity,
|
|
Packit |
90a5c9 |
request_rec **pr)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
|
|
Packit |
90a5c9 |
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
|
|
Packit |
90a5c9 |
apr_status_t status;
|
|
Packit |
90a5c9 |
int want_shutdown;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_ENTER(m);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
want_shutdown = (block == APR_BLOCK_READ);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/* Take this opportunity to update output consummation
|
|
Packit |
90a5c9 |
* for this engine */
|
|
Packit |
90a5c9 |
ngn_out_update_windows(m, ngn);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (want_shutdown && !h2_iq_empty(m->q)) {
|
|
Packit |
90a5c9 |
/* For a blocking read, check first if requests are to be
|
|
Packit |
90a5c9 |
* had and, if not, wait a short while before doing the
|
|
Packit |
90a5c9 |
* blocking, and if unsuccessful, terminating read.
|
|
Packit |
90a5c9 |
*/
|
|
Packit |
90a5c9 |
status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
|
|
Packit |
90a5c9 |
if (APR_STATUS_IS_EAGAIN(status)) {
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
|
|
Packit |
90a5c9 |
"h2_mplx(%ld): start block engine pull", m->id);
|
|
Packit |
90a5c9 |
apr_thread_cond_timedwait(m->task_thawed, m->lock,
|
|
Packit |
90a5c9 |
apr_time_from_msec(20));
|
|
Packit |
90a5c9 |
status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
status = h2_ngn_shed_pull_request(shed, ngn, capacity,
|
|
Packit |
90a5c9 |
want_shutdown, pr);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_LEAVE(m);
|
|
Packit |
90a5c9 |
return status;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
|
|
Packit |
90a5c9 |
apr_status_t status)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
h2_task *task = h2_ctx_cget_task(r_conn);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (task) {
|
|
Packit |
90a5c9 |
h2_mplx *m = task->mplx;
|
|
Packit |
90a5c9 |
h2_stream *stream;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_ENTER_ALWAYS(m);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
stream = h2_ihash_get(m->streams, task->stream_id);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
ngn_out_update_windows(m, ngn);
|
|
Packit |
90a5c9 |
h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (status != APR_SUCCESS && stream
|
|
Packit |
90a5c9 |
&& h2_task_can_redo(task)
|
|
Packit |
90a5c9 |
&& !h2_ihash_get(m->sredo, stream->id)) {
|
|
Packit |
90a5c9 |
h2_ihash_add(m->sredo, stream);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
if (task->engine) {
|
|
Packit |
90a5c9 |
/* cannot report that as done until engine returns */
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
else {
|
|
Packit |
90a5c9 |
task_done(m, task, ngn);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
H2_MPLX_LEAVE(m);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/*******************************************************************************
 * mplx master events dispatching
 ******************************************************************************/
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Tell whether events are pending for the master connection.
 *
 * @param m the multiplexer
 * @return non-zero if the atomically read m->event_pending is above 0
 */
int h2_mplx_has_master_events(h2_mplx *m)
{
    apr_uint32_t pending = apr_atomic_read32(&m->event_pending);

    return pending > 0;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
|
|
Packit |
90a5c9 |
stream_ev_callback *on_resume,
|
|
Packit |
90a5c9 |
void *on_ctx)
|
|
Packit |
90a5c9 |
{
|
|
Packit |
90a5c9 |
h2_stream *stream;
|
|
Packit |
90a5c9 |
int n, id;
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
|
|
Packit |
90a5c9 |
"h2_mplx(%ld): dispatch events", m->id);
|
|
Packit |
90a5c9 |
apr_atomic_set32(&m->event_pending, 0);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/* update input windows for streams */
|
|
Packit |
90a5c9 |
h2_ihash_iter(m->streams, report_consumption_iter, m);
|
|
Packit |
90a5c9 |
purge_streams(m, 1);
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
n = h2_ififo_count(m->readyq);
|
|
Packit |
90a5c9 |
while (n > 0
|
|
Packit |
90a5c9 |
&& (h2_ififo_try_pull(m->readyq, &id) == APR_SUCCESS)) {
|
|
Packit |
90a5c9 |
--n;
|
|
Packit |
90a5c9 |
stream = h2_ihash_get(m->streams, id);
|
|
Packit |
90a5c9 |
if (stream) {
|
|
Packit |
90a5c9 |
on_resume(on_ctx, stream);
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
return APR_SUCCESS;
|
|
Packit |
90a5c9 |
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Announce the stream as ready so that it gets visited again when master
 * events are dispatched.
 *
 * Delegates to check_data_for() with lock=1, as this entry point does not
 * enter the mplx lock itself.
 *
 * @param m      the multiplexer
 * @param stream the stream to keep active
 * @return always APR_SUCCESS
 */
apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
{
    const int take_lock = 1;

    check_data_for(m, stream, take_lock);
    return APR_SUCCESS;
}
|
|
Packit |
90a5c9 |
|
|
Packit |
90a5c9 |
/**
 * Tell whether the mplx is waiting for data to arrive.
 *
 * It is not waiting when it has no streams at all, or when no tasks are
 * active, the ready queue is empty and no streams are queued for
 * processing.
 *
 * @param m the multiplexer
 * @return 1 if waiting, 0 otherwise
 */
int h2_mplx_awaits_data(h2_mplx *m)
{
    int waiting;

    H2_MPLX_ENTER_ALWAYS(m);

    if (h2_ihash_empty(m->streams)
        || (!m->tasks_active && !h2_ififo_count(m->readyq)
            && h2_iq_empty(m->q))) {
        waiting = 0;
    }
    else {
        waiting = 1;
    }

    H2_MPLX_LEAVE(m);
    return waiting;
}
|