Blame modules/http2/h2_bucket_beam.c

Packit 90a5c9
/* Licensed to the Apache Software Foundation (ASF) under one or more
Packit 90a5c9
 * contributor license agreements.  See the NOTICE file distributed with
Packit 90a5c9
 * this work for additional information regarding copyright ownership.
Packit 90a5c9
 * The ASF licenses this file to You under the Apache License, Version 2.0
Packit 90a5c9
 * (the "License"); you may not use this file except in compliance with
Packit 90a5c9
 * the License.  You may obtain a copy of the License at
Packit 90a5c9
 *
Packit 90a5c9
 *     http://www.apache.org/licenses/LICENSE-2.0
Packit 90a5c9
 *
Packit 90a5c9
 * Unless required by applicable law or agreed to in writing, software
Packit 90a5c9
 * distributed under the License is distributed on an "AS IS" BASIS,
Packit 90a5c9
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Packit 90a5c9
 * See the License for the specific language governing permissions and
Packit 90a5c9
 * limitations under the License.
Packit 90a5c9
 */
Packit 90a5c9
 
Packit 90a5c9
#include <apr_lib.h>
Packit 90a5c9
#include <apr_atomic.h>
Packit 90a5c9
#include <apr_strings.h>
Packit 90a5c9
#include <apr_time.h>
Packit 90a5c9
#include <apr_buckets.h>
Packit 90a5c9
#include <apr_thread_mutex.h>
Packit 90a5c9
#include <apr_thread_cond.h>
Packit 90a5c9
Packit 90a5c9
#include <httpd.h>
Packit 90a5c9
#include <http_protocol.h>
Packit 90a5c9
#include <http_log.h>
Packit 90a5c9
Packit 90a5c9
#include "h2_private.h"
Packit 90a5c9
#include "h2_util.h"
Packit 90a5c9
#include "h2_bucket_beam.h"
Packit 90a5c9
Packit 90a5c9
static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy);
Packit 90a5c9
Packit 90a5c9
#define H2_BPROXY_NEXT(e)             APR_RING_NEXT((e), link)
Packit 90a5c9
#define H2_BPROXY_PREV(e)             APR_RING_PREV((e), link)
Packit 90a5c9
#define H2_BPROXY_REMOVE(e)           APR_RING_REMOVE((e), link)
Packit 90a5c9
Packit 90a5c9
#define H2_BPROXY_LIST_INIT(b)        APR_RING_INIT(&(b)->list, h2_beam_proxy, link);
Packit 90a5c9
#define H2_BPROXY_LIST_SENTINEL(b)    APR_RING_SENTINEL(&(b)->list, h2_beam_proxy, link)
Packit 90a5c9
#define H2_BPROXY_LIST_EMPTY(b)       APR_RING_EMPTY(&(b)->list, h2_beam_proxy, link)
Packit 90a5c9
#define H2_BPROXY_LIST_FIRST(b)       APR_RING_FIRST(&(b)->list)
Packit 90a5c9
#define H2_BPROXY_LIST_LAST(b)	      APR_RING_LAST(&(b)->list)
Packit 90a5c9
#define H2_PROXY_BLIST_INSERT_HEAD(b, e) do {				\
Packit 90a5c9
	h2_beam_proxy *ap__b = (e);                                        \
Packit 90a5c9
	APR_RING_INSERT_HEAD(&(b)->list, ap__b, h2_beam_proxy, link);	\
Packit 90a5c9
    } while (0)
Packit 90a5c9
#define H2_BPROXY_LIST_INSERT_TAIL(b, e) do {				\
Packit 90a5c9
	h2_beam_proxy *ap__b = (e);					\
Packit 90a5c9
	APR_RING_INSERT_TAIL(&(b)->list, ap__b, h2_beam_proxy, link);	\
Packit 90a5c9
    } while (0)
Packit 90a5c9
#define H2_BPROXY_LIST_CONCAT(a, b) do {					\
Packit 90a5c9
        APR_RING_CONCAT(&(a)->list, &(b)->list, h2_beam_proxy, link);	\
Packit 90a5c9
    } while (0)
Packit 90a5c9
#define H2_BPROXY_LIST_PREPEND(a, b) do {					\
Packit 90a5c9
        APR_RING_PREPEND(&(a)->list, &(b)->list, h2_beam_proxy, link);	\
Packit 90a5c9
    } while (0)
Packit 90a5c9
Packit 90a5c9
Packit 90a5c9
/*******************************************************************************
Packit 90a5c9
 * beam bucket with reference to beam and bucket it represents
Packit 90a5c9
 ******************************************************************************/
Packit 90a5c9
Packit 90a5c9
const apr_bucket_type_t h2_bucket_type_beam;
Packit 90a5c9
Packit 90a5c9
#define H2_BUCKET_IS_BEAM(e)     (e->type == &h2_bucket_type_beam)
Packit 90a5c9
Packit 90a5c9
struct h2_beam_proxy {
Packit 90a5c9
    apr_bucket_refcount refcount;
Packit 90a5c9
    APR_RING_ENTRY(h2_beam_proxy) link;
Packit 90a5c9
    h2_bucket_beam *beam;
Packit 90a5c9
    apr_bucket *bsender;
Packit 90a5c9
    apr_size_t n;
Packit 90a5c9
};
Packit 90a5c9
Packit 90a5c9
static const char Dummy = '\0';
Packit 90a5c9
Packit 90a5c9
static apr_status_t beam_bucket_read(apr_bucket *b, const char **str, 
Packit 90a5c9
                                     apr_size_t *len, apr_read_type_e block)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_proxy *d = b->data;
Packit 90a5c9
    if (d->bsender) {
Packit 90a5c9
        const char *data;
Packit 90a5c9
        apr_status_t status = apr_bucket_read(d->bsender, &data, len, block);
Packit 90a5c9
        if (status == APR_SUCCESS) {
Packit 90a5c9
            *str = data + b->start;
Packit 90a5c9
            *len = b->length;
Packit 90a5c9
        }
Packit 90a5c9
        return status;
Packit 90a5c9
    }
Packit 90a5c9
    *str = &Dummy;
Packit 90a5c9
    *len = 0;
Packit 90a5c9
    return APR_ECONNRESET;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void beam_bucket_destroy(void *data)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_proxy *d = data;
Packit 90a5c9
Packit 90a5c9
    if (apr_bucket_shared_destroy(d)) {
Packit 90a5c9
        /* When the beam gets destroyed before this bucket, it will
Packit 90a5c9
         * NULLify its reference here. This is not protected by a mutex,
Packit 90a5c9
         * so it will not help with race conditions.
Packit 90a5c9
         * But it lets us shut down memory pool with circulare beam
Packit 90a5c9
         * references. */
Packit 90a5c9
        if (d->beam) {
Packit 90a5c9
            h2_beam_emitted(d->beam, d);
Packit 90a5c9
        }
Packit 90a5c9
        apr_bucket_free(d);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_bucket * h2_beam_bucket_make(apr_bucket *b, 
Packit 90a5c9
                                        h2_bucket_beam *beam,
Packit 90a5c9
                                        apr_bucket *bsender, apr_size_t n)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_proxy *d;
Packit 90a5c9
Packit 90a5c9
    d = apr_bucket_alloc(sizeof(*d), b->list);
Packit 90a5c9
    H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
Packit 90a5c9
    d->beam = beam;
Packit 90a5c9
    d->bsender = bsender;
Packit 90a5c9
    d->n = n;
Packit 90a5c9
    
Packit 90a5c9
    b = apr_bucket_shared_make(b, d, 0, bsender? bsender->length : 0);
Packit 90a5c9
    b->type = &h2_bucket_type_beam;
Packit 90a5c9
Packit 90a5c9
    return b;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
Packit 90a5c9
                                         apr_bucket *bsender,
Packit 90a5c9
                                         apr_bucket_alloc_t *list,
Packit 90a5c9
                                         apr_size_t n)
Packit 90a5c9
{
Packit 90a5c9
    apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
Packit 90a5c9
Packit 90a5c9
    APR_BUCKET_INIT(b);
Packit 90a5c9
    b->free = apr_bucket_free;
Packit 90a5c9
    b->list = list;
Packit 90a5c9
    return h2_beam_bucket_make(b, beam, bsender, n);
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
const apr_bucket_type_t h2_bucket_type_beam = {
Packit 90a5c9
    "BEAM", 5, APR_BUCKET_DATA,
Packit 90a5c9
    beam_bucket_destroy,
Packit 90a5c9
    beam_bucket_read,
Packit 90a5c9
    apr_bucket_setaside_noop,
Packit 90a5c9
    apr_bucket_shared_split,
Packit 90a5c9
    apr_bucket_shared_copy
Packit 90a5c9
};
Packit 90a5c9
Packit 90a5c9
/*******************************************************************************
Packit 90a5c9
 * h2_blist, a brigade without allocations
Packit 90a5c9
 ******************************************************************************/
Packit 90a5c9
Packit 90a5c9
static apr_array_header_t *beamers;
Packit 90a5c9
Packit 90a5c9
static apr_status_t cleanup_beamers(void *dummy)
Packit 90a5c9
{
Packit 90a5c9
    (void)dummy;
Packit 90a5c9
    beamers = NULL;
Packit 90a5c9
    return APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_register_bucket_beamer(h2_bucket_beamer *beamer)
Packit 90a5c9
{
Packit 90a5c9
    if (!beamers) {
Packit 90a5c9
        apr_pool_cleanup_register(apr_hook_global_pool, NULL,
Packit 90a5c9
                                  cleanup_beamers, apr_pool_cleanup_null);
Packit 90a5c9
        beamers = apr_array_make(apr_hook_global_pool, 10, 
Packit 90a5c9
                                 sizeof(h2_bucket_beamer*));
Packit 90a5c9
    }
Packit 90a5c9
    APR_ARRAY_PUSH(beamers, h2_bucket_beamer*) = beamer;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam, 
Packit 90a5c9
                                  apr_bucket_brigade *dest,
Packit 90a5c9
                                  const apr_bucket *src)
Packit 90a5c9
{
Packit 90a5c9
    apr_bucket *b = NULL;
Packit 90a5c9
    int i;
Packit 90a5c9
    if (beamers) {
Packit 90a5c9
        for (i = 0; i < beamers->nelts && b == NULL; ++i) {
Packit 90a5c9
            h2_bucket_beamer *beamer;
Packit 90a5c9
            
Packit 90a5c9
            beamer = APR_ARRAY_IDX(beamers, i, h2_bucket_beamer*);
Packit 90a5c9
            b = beamer(beam, dest, src);
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    return b;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
Packit 90a5c9
/*******************************************************************************
Packit 90a5c9
 * bucket beam that can transport buckets across threads
Packit 90a5c9
 ******************************************************************************/
Packit 90a5c9
Packit 90a5c9
static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
Packit 90a5c9
{
Packit 90a5c9
    apr_thread_mutex_unlock(lock);
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
Packit 90a5c9
{
Packit 90a5c9
    h2_bucket_beam *beam = ctx;
Packit 90a5c9
    pbl->mutex = beam->lock;
Packit 90a5c9
    pbl->leave = mutex_leave;
Packit 90a5c9
    return apr_thread_mutex_lock(pbl->mutex);
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
Packit 90a5c9
{
Packit 90a5c9
    return mutex_enter(beam, pbl);
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
Packit 90a5c9
{
Packit 90a5c9
    if (pbl->leave) {
Packit 90a5c9
        pbl->leave(pbl->leave_ctx, pbl->mutex);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_off_t bucket_mem_used(apr_bucket *b)
Packit 90a5c9
{
Packit 90a5c9
    if (APR_BUCKET_IS_FILE(b)) {
Packit 90a5c9
        return 0;
Packit 90a5c9
    }
Packit 90a5c9
    else {
Packit 90a5c9
        /* should all have determinate length */
Packit 90a5c9
        return b->length;
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
Packit 90a5c9
{
Packit 90a5c9
    int rv = 0;
Packit 90a5c9
    apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
Packit 90a5c9
    h2_beam_io_callback *cb = beam->cons_io_cb;
Packit 90a5c9
     
Packit 90a5c9
    if (len > 0) {
Packit 90a5c9
        if (cb) {
Packit 90a5c9
            void *ctx = beam->cons_ctx;
Packit 90a5c9
            
Packit 90a5c9
            if (pbl) leave_yellow(beam, pbl);
Packit 90a5c9
            cb(ctx, beam, len);
Packit 90a5c9
            if (pbl) enter_yellow(beam, pbl);
Packit 90a5c9
            rv = 1;
Packit 90a5c9
        }
Packit 90a5c9
        beam->cons_bytes_reported += len;
Packit 90a5c9
    }
Packit 90a5c9
    return rv;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
Packit 90a5c9
{
Packit 90a5c9
    apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
Packit 90a5c9
    if (force || len > 0) {
Packit 90a5c9
        h2_beam_io_callback *cb = beam->prod_io_cb; 
Packit 90a5c9
        if (cb) {
Packit 90a5c9
            void *ctx = beam->prod_ctx;
Packit 90a5c9
            
Packit 90a5c9
            leave_yellow(beam, pbl);
Packit 90a5c9
            cb(ctx, beam, len);
Packit 90a5c9
            enter_yellow(beam, pbl);
Packit 90a5c9
        }
Packit 90a5c9
        beam->prod_bytes_reported += len;
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_size_t calc_buffered(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    apr_size_t len = 0;
Packit 90a5c9
    apr_bucket *b;
Packit 90a5c9
    for (b = H2_BLIST_FIRST(&beam->send_list); 
Packit 90a5c9
         b != H2_BLIST_SENTINEL(&beam->send_list);
Packit 90a5c9
         b = APR_BUCKET_NEXT(b)) {
Packit 90a5c9
        if (b->length == ((apr_size_t)-1)) {
Packit 90a5c9
            /* do not count */
Packit 90a5c9
        }
Packit 90a5c9
        else if (APR_BUCKET_IS_FILE(b)) {
Packit 90a5c9
            /* if unread, has no real mem footprint. */
Packit 90a5c9
        }
Packit 90a5c9
        else {
Packit 90a5c9
            len += b->length;
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    return len;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void r_purge_sent(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    apr_bucket *b;
Packit 90a5c9
    /* delete all sender buckets in purge brigade, needs to be called
Packit 90a5c9
     * from sender thread only */
Packit 90a5c9
    while (!H2_BLIST_EMPTY(&beam->purge_list)) {
Packit 90a5c9
        b = H2_BLIST_FIRST(&beam->purge_list);
Packit 90a5c9
        apr_bucket_delete(b);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_size_t calc_space_left(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    if (beam->max_buf_size > 0) {
Packit 90a5c9
        apr_off_t len = calc_buffered(beam);
Packit 90a5c9
        return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
Packit 90a5c9
    }
Packit 90a5c9
    return APR_SIZE_MAX;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static int buffer_is_empty(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    return ((!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
Packit 90a5c9
            && H2_BLIST_EMPTY(&beam->send_list));
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block,  
Packit 90a5c9
                               apr_thread_mutex_t *lock)
Packit 90a5c9
{
Packit 90a5c9
    apr_status_t rv = APR_SUCCESS;
Packit 90a5c9
    
Packit 90a5c9
    while (!buffer_is_empty(beam) && APR_SUCCESS == rv) {
Packit 90a5c9
        if (APR_BLOCK_READ != block || !lock) {
Packit 90a5c9
            rv = APR_EAGAIN;
Packit 90a5c9
        }
Packit 90a5c9
        else if (beam->timeout > 0) {
Packit 90a5c9
            rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
Packit 90a5c9
        }
Packit 90a5c9
        else {
Packit 90a5c9
            rv = apr_thread_cond_wait(beam->change, lock);
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    return rv;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block,  
Packit 90a5c9
                                   apr_thread_mutex_t *lock)
Packit 90a5c9
{
Packit 90a5c9
    apr_status_t rv = APR_SUCCESS;
Packit 90a5c9
    
Packit 90a5c9
    while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
Packit 90a5c9
        if (beam->aborted) {
Packit 90a5c9
            rv = APR_ECONNABORTED;
Packit 90a5c9
        }
Packit 90a5c9
        else if (beam->closed) {
Packit 90a5c9
            rv = APR_EOF;
Packit 90a5c9
        }
Packit 90a5c9
        else if (APR_BLOCK_READ != block || !lock) {
Packit 90a5c9
            rv = APR_EAGAIN;
Packit 90a5c9
        }
Packit 90a5c9
        else if (beam->timeout > 0) {
Packit 90a5c9
            rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
Packit 90a5c9
        }
Packit 90a5c9
        else {
Packit 90a5c9
            rv = apr_thread_cond_wait(beam->change, lock);
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    return rv;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block, 
Packit 90a5c9
                                  apr_size_t *pspace_left, h2_beam_lock *bl)
Packit 90a5c9
{
Packit 90a5c9
    apr_status_t rv = APR_SUCCESS;
Packit 90a5c9
    apr_size_t left;
Packit 90a5c9
    
Packit 90a5c9
    while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
Packit 90a5c9
        if (beam->aborted) {
Packit 90a5c9
            rv = APR_ECONNABORTED;
Packit 90a5c9
        }
Packit 90a5c9
        else if (block != APR_BLOCK_READ || !bl->mutex) {
Packit 90a5c9
            rv = APR_EAGAIN;
Packit 90a5c9
        }
Packit 90a5c9
        else {
Packit 90a5c9
            if (beam->timeout > 0) {
Packit 90a5c9
                rv = apr_thread_cond_timedwait(beam->change, bl->mutex, beam->timeout);
Packit 90a5c9
            }
Packit 90a5c9
            else {
Packit 90a5c9
                rv = apr_thread_cond_wait(beam->change, bl->mutex);
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    *pspace_left = left;
Packit 90a5c9
    return rv;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    apr_bucket *b, *next;
Packit 90a5c9
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        /* even when beam buckets are split, only the one where
Packit 90a5c9
         * refcount drops to 0 will call us */
Packit 90a5c9
        H2_BPROXY_REMOVE(proxy);
Packit 90a5c9
        /* invoked from receiver thread, the last beam bucket for the send
Packit 90a5c9
         * bucket is about to be destroyed.
Packit 90a5c9
         * remove it from the hold, where it should be now */
Packit 90a5c9
        if (proxy->bsender) {
Packit 90a5c9
            for (b = H2_BLIST_FIRST(&beam->hold_list); 
Packit 90a5c9
                 b != H2_BLIST_SENTINEL(&beam->hold_list);
Packit 90a5c9
                 b = APR_BUCKET_NEXT(b)) {
Packit 90a5c9
                 if (b == proxy->bsender) {
Packit 90a5c9
                    break;
Packit 90a5c9
                 }
Packit 90a5c9
            }
Packit 90a5c9
            if (b != H2_BLIST_SENTINEL(&beam->hold_list)) {
Packit 90a5c9
                /* bucket is in hold as it should be, mark this one
Packit 90a5c9
                 * and all before it for purging. We might have placed meta
Packit 90a5c9
                 * buckets without a receiver proxy into the hold before it 
Packit 90a5c9
                 * and schedule them for purging now */
Packit 90a5c9
                for (b = H2_BLIST_FIRST(&beam->hold_list); 
Packit 90a5c9
                     b != H2_BLIST_SENTINEL(&beam->hold_list);
Packit 90a5c9
                     b = next) {
Packit 90a5c9
                    next = APR_BUCKET_NEXT(b);
Packit 90a5c9
                    if (b == proxy->bsender) {
Packit 90a5c9
                        APR_BUCKET_REMOVE(b);
Packit 90a5c9
                        H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
Packit 90a5c9
                        break;
Packit 90a5c9
                    }
Packit 90a5c9
                    else if (APR_BUCKET_IS_METADATA(b)) {
Packit 90a5c9
                        APR_BUCKET_REMOVE(b);
Packit 90a5c9
                        H2_BLIST_INSERT_TAIL(&beam->purge_list, b);
Packit 90a5c9
                    }
Packit 90a5c9
                    else {
Packit 90a5c9
                        /* another data bucket before this one in hold. this
Packit 90a5c9
                         * is normal since DATA buckets need not be destroyed
Packit 90a5c9
                         * in order */
Packit 90a5c9
                    }
Packit 90a5c9
                }
Packit 90a5c9
                
Packit 90a5c9
                proxy->bsender = NULL;
Packit 90a5c9
            }
Packit 90a5c9
            else {
Packit 90a5c9
                /* it should be there unless we screwed up */
Packit 90a5c9
                ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool, 
Packit 90a5c9
                              APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not "
Packit 90a5c9
                              "in hold, n=%d", beam->id, beam->tag, 
Packit 90a5c9
                              (int)proxy->n);
Packit 90a5c9
                ap_assert(!proxy->bsender);
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
        /* notify anyone waiting on space to become available */
Packit 90a5c9
        if (!bl.mutex) {
Packit 90a5c9
            r_purge_sent(beam);
Packit 90a5c9
        }
Packit 90a5c9
        else {
Packit 90a5c9
            apr_thread_cond_broadcast(beam->change);
Packit 90a5c9
        }
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void h2_blist_cleanup(h2_blist *bl)
Packit 90a5c9
{
Packit 90a5c9
    apr_bucket *e;
Packit 90a5c9
Packit 90a5c9
    while (!H2_BLIST_EMPTY(bl)) {
Packit 90a5c9
        e = H2_BLIST_FIRST(bl);
Packit 90a5c9
        apr_bucket_delete(e);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t beam_close(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    if (!beam->closed) {
Packit 90a5c9
        beam->closed = 1;
Packit 90a5c9
        apr_thread_cond_broadcast(beam->change);
Packit 90a5c9
    }
Packit 90a5c9
    return APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
int h2_beam_is_closed(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    return beam->closed;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, 
Packit 90a5c9
                         apr_status_t (*cleanup)(void *))
Packit 90a5c9
{
Packit 90a5c9
    if (pool && pool != beam->pool) {
Packit 90a5c9
        apr_pool_pre_cleanup_register(pool, beam, cleanup);
Packit 90a5c9
        return 1;
Packit 90a5c9
    }
Packit 90a5c9
    return 0;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool,
Packit 90a5c9
                     apr_status_t (*cleanup)(void *)) {
Packit 90a5c9
    if (pool && pool != beam->pool) {
Packit 90a5c9
        apr_pool_cleanup_kill(pool, beam, cleanup);
Packit 90a5c9
        return 1;
Packit 90a5c9
    }
Packit 90a5c9
    return 0;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t beam_recv_cleanup(void *data)
Packit 90a5c9
{
Packit 90a5c9
    h2_bucket_beam *beam = data;
Packit 90a5c9
    /* receiver pool has gone away, clear references */
Packit 90a5c9
    beam->recv_buffer = NULL;
Packit 90a5c9
    beam->recv_pool = NULL;
Packit 90a5c9
    return APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t beam_send_cleanup(void *data)
Packit 90a5c9
{
Packit 90a5c9
    h2_bucket_beam *beam = data;
Packit 90a5c9
    /* sender is going away, clear up all references to its memory */
Packit 90a5c9
    r_purge_sent(beam);
Packit 90a5c9
    h2_blist_cleanup(&beam->send_list);
Packit 90a5c9
    report_consumption(beam, NULL);
Packit 90a5c9
    while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
Packit 90a5c9
        h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
Packit 90a5c9
        H2_BPROXY_REMOVE(proxy);
Packit 90a5c9
        proxy->beam = NULL;
Packit 90a5c9
        proxy->bsender = NULL;
Packit 90a5c9
    }
Packit 90a5c9
    h2_blist_cleanup(&beam->purge_list);
Packit 90a5c9
    h2_blist_cleanup(&beam->hold_list);
Packit 90a5c9
    beam->send_pool = NULL;
Packit 90a5c9
    return APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) 
Packit 90a5c9
{
Packit 90a5c9
    if (beam->send_pool != pool) {
Packit 90a5c9
        if (beam->send_pool && beam->send_pool != beam->pool) {
Packit 90a5c9
            pool_kill(beam, beam->send_pool, beam_send_cleanup);
Packit 90a5c9
            beam_send_cleanup(beam);
Packit 90a5c9
        }
Packit 90a5c9
        beam->send_pool = pool;
Packit 90a5c9
        pool_register(beam, beam->send_pool, beam_send_cleanup);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
Packit 90a5c9
{
Packit 90a5c9
    if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
Packit 90a5c9
        apr_bucket_brigade *bb = beam->recv_buffer;
Packit 90a5c9
        apr_off_t bblen = 0;
Packit 90a5c9
        
Packit 90a5c9
        beam->recv_buffer = NULL;
Packit 90a5c9
        apr_brigade_length(bb, 0, &bblen);
Packit 90a5c9
        beam->received_bytes += bblen;
Packit 90a5c9
        
Packit 90a5c9
        /* need to do this unlocked since bucket destroy might 
Packit 90a5c9
         * call this beam again. */
Packit 90a5c9
        if (bl) leave_yellow(beam, bl);
Packit 90a5c9
        apr_brigade_destroy(bb);
Packit 90a5c9
        if (bl) enter_yellow(beam, bl);
Packit 90a5c9
        
Packit 90a5c9
        apr_thread_cond_broadcast(beam->change);
Packit 90a5c9
        if (beam->cons_ev_cb) { 
Packit 90a5c9
            beam->cons_ev_cb(beam->cons_ctx, beam);
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool)
Packit 90a5c9
{
Packit 90a5c9
    apr_status_t status = APR_SUCCESS;
Packit 90a5c9
    int safe_send = (beam->owner == H2_BEAM_OWNER_SEND);
Packit 90a5c9
    int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV);
Packit 90a5c9
    
Packit 90a5c9
    /* 
Packit 90a5c9
     * Owner of the beam is going away, depending on which side it owns,
Packit 90a5c9
     * cleanup strategies will differ.
Packit 90a5c9
     *
Packit 90a5c9
     * In general, receiver holds references to memory from sender. 
Packit 90a5c9
     * Clean up receiver first, if safe, then cleanup sender, if safe.
Packit 90a5c9
     */
Packit 90a5c9
     
Packit 90a5c9
     /* When called from pool destroy, io callbacks are disabled */
Packit 90a5c9
     if (from_pool) {
Packit 90a5c9
         beam->cons_io_cb = NULL;
Packit 90a5c9
     }
Packit 90a5c9
     
Packit 90a5c9
    /* When modify send is not safe, this means we still have multi-thread
Packit 90a5c9
     * protection and the owner is receiving the buckets. If the sending
Packit 90a5c9
     * side has not gone away, this means we could have dangling buckets
Packit 90a5c9
     * in our lists that never get destroyed. This should not happen. */
Packit 90a5c9
    ap_assert(safe_send || !beam->send_pool);
Packit 90a5c9
    if (!H2_BLIST_EMPTY(&beam->send_list)) {
Packit 90a5c9
        ap_assert(beam->send_pool);
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    if (safe_recv) {
Packit 90a5c9
        if (beam->recv_pool) {
Packit 90a5c9
            pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
Packit 90a5c9
            beam->recv_pool = NULL;
Packit 90a5c9
        }
Packit 90a5c9
        recv_buffer_cleanup(beam, NULL);
Packit 90a5c9
    }
Packit 90a5c9
    else {
Packit 90a5c9
        beam->recv_buffer = NULL;
Packit 90a5c9
        beam->recv_pool = NULL;
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    if (safe_send && beam->send_pool) {
Packit 90a5c9
        pool_kill(beam, beam->send_pool, beam_send_cleanup);
Packit 90a5c9
        status = beam_send_cleanup(beam);
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    if (safe_recv) {
Packit 90a5c9
        ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies));
Packit 90a5c9
        ap_assert(H2_BLIST_EMPTY(&beam->send_list));
Packit 90a5c9
        ap_assert(H2_BLIST_EMPTY(&beam->hold_list));
Packit 90a5c9
        ap_assert(H2_BLIST_EMPTY(&beam->purge_list));
Packit 90a5c9
    }
Packit 90a5c9
    return status;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t beam_pool_cleanup(void *data)
Packit 90a5c9
{
Packit 90a5c9
    return beam_cleanup(data, 1);
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup);
Packit 90a5c9
    return beam_cleanup(beam, 0);
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, 
Packit 90a5c9
                            int id, const char *tag, 
Packit 90a5c9
                            h2_beam_owner_t owner,
Packit 90a5c9
                            apr_size_t max_buf_size,
Packit 90a5c9
                            apr_interval_time_t timeout)
Packit 90a5c9
{
Packit 90a5c9
    h2_bucket_beam *beam;
Packit 90a5c9
    apr_status_t rv = APR_SUCCESS;
Packit 90a5c9
    
Packit 90a5c9
    beam = apr_pcalloc(pool, sizeof(*beam));
Packit 90a5c9
    if (!beam) {
Packit 90a5c9
        return APR_ENOMEM;
Packit 90a5c9
    }
Packit 90a5c9
Packit 90a5c9
    beam->id = id;
Packit 90a5c9
    beam->tag = tag;
Packit 90a5c9
    beam->pool = pool;
Packit 90a5c9
    beam->owner = owner;
Packit 90a5c9
    H2_BLIST_INIT(&beam->send_list);
Packit 90a5c9
    H2_BLIST_INIT(&beam->hold_list);
Packit 90a5c9
    H2_BLIST_INIT(&beam->purge_list);
Packit 90a5c9
    H2_BPROXY_LIST_INIT(&beam->proxies);
Packit 90a5c9
    beam->tx_mem_limits = 1;
Packit 90a5c9
    beam->max_buf_size = max_buf_size;
Packit 90a5c9
    beam->timeout = timeout;
Packit 90a5c9
Packit 90a5c9
    rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
Packit 90a5c9
    if (APR_SUCCESS == rv) {
Packit 90a5c9
        rv = apr_thread_cond_create(&beam->change, pool);
Packit 90a5c9
        if (APR_SUCCESS == rv) {
Packit 90a5c9
            apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup);
Packit 90a5c9
            *pbeam = beam;
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    return rv;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        beam->max_buf_size = buffer_size;
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    apr_size_t buffer_size = 0;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        buffer_size = beam->max_buf_size;
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return buffer_size;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        beam->timeout = timeout;
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    apr_interval_time_t timeout = 0;
Packit 90a5c9
    
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        timeout = beam->timeout;
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return timeout;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_beam_abort(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        beam->aborted = 1;
Packit 90a5c9
        r_purge_sent(beam);
Packit 90a5c9
        h2_blist_cleanup(&beam->send_list);
Packit 90a5c9
        report_consumption(beam, &bl);
Packit 90a5c9
        apr_thread_cond_broadcast(beam->change);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_status_t h2_beam_close(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        r_purge_sent(beam);
Packit 90a5c9
        beam_close(beam);
Packit 90a5c9
        report_consumption(beam, &bl);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_status_t h2_beam_leave(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        recv_buffer_cleanup(beam, &bl);
Packit 90a5c9
        beam->aborted = 1;
Packit 90a5c9
        beam_close(beam);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
Packit 90a5c9
{
Packit 90a5c9
    apr_status_t status;
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
Packit 90a5c9
        status = wait_empty(beam, block, bl.mutex);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return status;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static void move_to_hold(h2_bucket_beam *beam, 
Packit 90a5c9
                         apr_bucket_brigade *sender_bb)
Packit 90a5c9
{
Packit 90a5c9
    apr_bucket *b;
Packit 90a5c9
    while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) {
Packit 90a5c9
        b = APR_BRIGADE_FIRST(sender_bb);
Packit 90a5c9
        APR_BUCKET_REMOVE(b);
Packit 90a5c9
        H2_BLIST_INSERT_TAIL(&beam->send_list, b);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
static apr_status_t append_bucket(h2_bucket_beam *beam, 
Packit 90a5c9
                                  apr_bucket *b,
Packit 90a5c9
                                  apr_read_type_e block,
Packit 90a5c9
                                  apr_size_t *pspace_left,
Packit 90a5c9
                                  h2_beam_lock *pbl)
Packit 90a5c9
{
Packit 90a5c9
    const char *data;
Packit 90a5c9
    apr_size_t len;
Packit 90a5c9
    apr_status_t status;
Packit 90a5c9
    int can_beam = 0, check_len;
Packit 90a5c9
    
Packit 90a5c9
    if (beam->aborted) {
Packit 90a5c9
        return APR_ECONNABORTED;
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    if (APR_BUCKET_IS_METADATA(b)) {
Packit 90a5c9
        if (APR_BUCKET_IS_EOS(b)) {
Packit 90a5c9
            beam->closed = 1;
Packit 90a5c9
        }
Packit 90a5c9
        APR_BUCKET_REMOVE(b);
Packit 90a5c9
        H2_BLIST_INSERT_TAIL(&beam->send_list, b);
Packit 90a5c9
        return APR_SUCCESS;
Packit 90a5c9
    }
Packit 90a5c9
    else if (APR_BUCKET_IS_FILE(b)) {
Packit 90a5c9
        /* For file buckets the problem is their internal readpool that
Packit 90a5c9
         * is used on the first read to allocate buffer/mmap.
Packit 90a5c9
         * Since setting aside a file bucket will de-register the
Packit 90a5c9
         * file cleanup function from the previous pool, we need to
Packit 90a5c9
         * call that only from the sender thread.
Packit 90a5c9
         *
Packit 90a5c9
         * Currently, we do not handle file bucket with refcount > 1 as
Packit 90a5c9
         * the beam is then not in complete control of the file's lifetime.
Packit 90a5c9
         * Which results in the bug that a file get closed by the receiver
Packit 90a5c9
         * while the sender or the beam still have buckets using it. 
Packit 90a5c9
         * 
Packit 90a5c9
         * Additionally, we allow callbacks to prevent beaming file
Packit 90a5c9
         * handles across. The use case for this is to limit the number 
Packit 90a5c9
         * of open file handles and rather use a less efficient beam
Packit 90a5c9
         * transport. */
Packit 90a5c9
        apr_bucket_file *bf = b->data;
Packit 90a5c9
        apr_file_t *fd = bf->fd;
Packit 90a5c9
        can_beam = (bf->refcount.refcount == 1);
Packit 90a5c9
        if (can_beam && beam->can_beam_fn) {
Packit 90a5c9
            can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
Packit 90a5c9
        }
Packit 90a5c9
        check_len = !can_beam;
Packit 90a5c9
    }
Packit 90a5c9
    else {
Packit 90a5c9
        if (b->length == ((apr_size_t)-1)) {
Packit 90a5c9
            const char *data;
Packit 90a5c9
            status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
Packit 90a5c9
            if (status != APR_SUCCESS) {
Packit 90a5c9
                return status;
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
        check_len = 1;
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    if (check_len) {
Packit 90a5c9
        if (b->length > *pspace_left) {
Packit 90a5c9
            apr_bucket_split(b, *pspace_left);
Packit 90a5c9
        }
Packit 90a5c9
        *pspace_left -= b->length;
Packit 90a5c9
    }
Packit 90a5c9
Packit 90a5c9
    /* The fundamental problem is that reading a sender bucket from
Packit 90a5c9
     * a receiver thread is a total NO GO, because the bucket might use
Packit 90a5c9
     * its pool/bucket_alloc from a foreign thread and that will
Packit 90a5c9
     * corrupt. */
Packit 90a5c9
    status = APR_ENOTIMPL;
Packit 90a5c9
    if (APR_BUCKET_IS_TRANSIENT(b)) {
Packit 90a5c9
        /* this takes care of transient buckets and converts them
Packit 90a5c9
         * into heap ones. Other bucket types might or might not be
Packit 90a5c9
         * affected by this. */
Packit 90a5c9
        status = apr_bucket_setaside(b, beam->send_pool);
Packit 90a5c9
    }
Packit 90a5c9
    else if (APR_BUCKET_IS_HEAP(b)) {
Packit 90a5c9
        /* For heap buckets read from a receiver thread is fine. The
Packit 90a5c9
         * data will be there and live until the bucket itself is
Packit 90a5c9
         * destroyed. */
Packit 90a5c9
        status = APR_SUCCESS;
Packit 90a5c9
    }
Packit 90a5c9
    else if (APR_BUCKET_IS_POOL(b)) {
Packit 90a5c9
        /* pool buckets are bastards that register at pool cleanup
Packit 90a5c9
         * to morph themselves into heap buckets. That may happen anytime,
Packit 90a5c9
         * even after the bucket data pointer has been read. So at
Packit 90a5c9
         * any time inside the receiver thread, the pool bucket memory
Packit 90a5c9
         * may disappear. yikes. */
Packit 90a5c9
        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
Packit 90a5c9
        if (status == APR_SUCCESS) {
Packit 90a5c9
            apr_bucket_heap_make(b, data, len, NULL);
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    else if (APR_BUCKET_IS_FILE(b) && can_beam) {
Packit 90a5c9
        status = apr_bucket_setaside(b, beam->send_pool);
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    if (status == APR_ENOTIMPL) {
Packit 90a5c9
        /* we have no knowledge about the internals of this bucket,
Packit 90a5c9
         * but hope that after read, its data stays immutable for the
Packit 90a5c9
         * lifetime of the bucket. (see pool bucket handling above for
Packit 90a5c9
         * a counter example).
Packit 90a5c9
         * We do the read while in the sender thread, so that the bucket may
Packit 90a5c9
         * use pools/allocators safely. */
Packit 90a5c9
        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
Packit 90a5c9
        if (status == APR_SUCCESS) {
Packit 90a5c9
            status = apr_bucket_setaside(b, beam->send_pool);
Packit 90a5c9
        }
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    if (status != APR_SUCCESS && status != APR_ENOTIMPL) {
Packit 90a5c9
        return status;
Packit 90a5c9
    }
Packit 90a5c9
    
Packit 90a5c9
    APR_BUCKET_REMOVE(b);
Packit 90a5c9
    H2_BLIST_INSERT_TAIL(&beam->send_list, b);
Packit 90a5c9
    beam->sent_bytes += b->length;
Packit 90a5c9
Packit 90a5c9
    return APR_SUCCESS;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    /* Called from the sender thread to add buckets to the beam */
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        r_purge_sent(beam);
Packit 90a5c9
        beam_set_send_pool(beam, p);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_status_t h2_beam_send(h2_bucket_beam *beam, 
Packit 90a5c9
                          apr_bucket_brigade *sender_bb, 
Packit 90a5c9
                          apr_read_type_e block)
Packit 90a5c9
{
Packit 90a5c9
    apr_bucket *b;
Packit 90a5c9
    apr_status_t rv = APR_SUCCESS;
Packit 90a5c9
    apr_size_t space_left = 0;
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
Packit 90a5c9
    /* Called from the sender thread to add buckets to the beam */
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        ap_assert(beam->send_pool);
Packit 90a5c9
        r_purge_sent(beam);
Packit 90a5c9
        
Packit 90a5c9
        if (beam->aborted) {
Packit 90a5c9
            move_to_hold(beam, sender_bb);
Packit 90a5c9
            rv = APR_ECONNABORTED;
Packit 90a5c9
        }
Packit 90a5c9
        else if (sender_bb) {
Packit 90a5c9
            int force_report = !APR_BRIGADE_EMPTY(sender_bb);
Packit 90a5c9
            
Packit 90a5c9
            space_left = calc_space_left(beam);
Packit 90a5c9
            while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
Packit 90a5c9
                if (space_left <= 0) {
Packit 90a5c9
                    report_prod_io(beam, force_report, &bl);
Packit 90a5c9
                    r_purge_sent(beam);
Packit 90a5c9
                    rv = wait_not_full(beam, block, &space_left, &bl);
Packit 90a5c9
                    if (APR_SUCCESS != rv) {
Packit 90a5c9
                        break;
Packit 90a5c9
                    }
Packit 90a5c9
                }
Packit 90a5c9
                b = APR_BRIGADE_FIRST(sender_bb);
Packit 90a5c9
                rv = append_bucket(beam, b, block, &space_left, &bl);
Packit 90a5c9
            }
Packit 90a5c9
            
Packit 90a5c9
            report_prod_io(beam, force_report, &bl);
Packit 90a5c9
            apr_thread_cond_broadcast(beam->change);
Packit 90a5c9
        }
Packit 90a5c9
        report_consumption(beam, &bl);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return rv;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_status_t h2_beam_receive(h2_bucket_beam *beam, 
Packit 90a5c9
                             apr_bucket_brigade *bb, 
Packit 90a5c9
                             apr_read_type_e block,
Packit 90a5c9
                             apr_off_t readbytes)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    apr_bucket *bsender, *brecv, *ng;
Packit 90a5c9
    int transferred = 0;
Packit 90a5c9
    apr_status_t status = APR_SUCCESS;
Packit 90a5c9
    apr_off_t remain;
Packit 90a5c9
    int transferred_buckets = 0;
Packit 90a5c9
    
Packit 90a5c9
    /* Called from the receiver thread to take buckets from the beam */
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        if (readbytes <= 0) {
Packit 90a5c9
            readbytes = APR_SIZE_MAX;
Packit 90a5c9
        }
Packit 90a5c9
        remain = readbytes;
Packit 90a5c9
        
Packit 90a5c9
transfer:
Packit 90a5c9
        if (beam->aborted) {
Packit 90a5c9
            recv_buffer_cleanup(beam, &bl);
Packit 90a5c9
            status = APR_ECONNABORTED;
Packit 90a5c9
            goto leave;
Packit 90a5c9
        }
Packit 90a5c9
Packit 90a5c9
        /* transfer enough buckets from our receiver brigade, if we have one */
Packit 90a5c9
        while (remain >= 0 
Packit 90a5c9
               && beam->recv_buffer 
Packit 90a5c9
               && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
Packit 90a5c9
               
Packit 90a5c9
            brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
Packit 90a5c9
            if (brecv->length > 0 && remain <= 0) {
Packit 90a5c9
                break;
Packit 90a5c9
            }            
Packit 90a5c9
            APR_BUCKET_REMOVE(brecv);
Packit 90a5c9
            APR_BRIGADE_INSERT_TAIL(bb, brecv);
Packit 90a5c9
            remain -= brecv->length;
Packit 90a5c9
            ++transferred;
Packit 90a5c9
        }
Packit 90a5c9
Packit 90a5c9
        /* transfer from our sender brigade, transforming sender buckets to
Packit 90a5c9
         * receiver ones until we have enough */
Packit 90a5c9
        while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) {
Packit 90a5c9
               
Packit 90a5c9
            brecv = NULL;
Packit 90a5c9
            bsender = H2_BLIST_FIRST(&beam->send_list);            
Packit 90a5c9
            if (bsender->length > 0 && remain <= 0) {
Packit 90a5c9
                break;
Packit 90a5c9
            }
Packit 90a5c9
                        
Packit 90a5c9
            if (APR_BUCKET_IS_METADATA(bsender)) {
Packit 90a5c9
                if (APR_BUCKET_IS_EOS(bsender)) {
Packit 90a5c9
                    brecv = apr_bucket_eos_create(bb->bucket_alloc);
Packit 90a5c9
                    beam->close_sent = 1;
Packit 90a5c9
                }
Packit 90a5c9
                else if (APR_BUCKET_IS_FLUSH(bsender)) {
Packit 90a5c9
                    brecv = apr_bucket_flush_create(bb->bucket_alloc);
Packit 90a5c9
                }
Packit 90a5c9
                else if (AP_BUCKET_IS_ERROR(bsender)) {
Packit 90a5c9
                    ap_bucket_error *eb = (ap_bucket_error *)bsender;
Packit 90a5c9
                    brecv = ap_bucket_error_create(eb->status, eb->data,
Packit 90a5c9
                                                    bb->p, bb->bucket_alloc);
Packit 90a5c9
                }
Packit 90a5c9
            }
Packit 90a5c9
            else if (bsender->length == 0) {
Packit 90a5c9
                APR_BUCKET_REMOVE(bsender);
Packit 90a5c9
                H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
Packit 90a5c9
                continue;
Packit 90a5c9
            }
Packit 90a5c9
            else if (APR_BUCKET_IS_FILE(bsender)) {
Packit 90a5c9
                /* This is set aside into the target brigade pool so that 
Packit 90a5c9
                 * any read operation messes with that pool and not 
Packit 90a5c9
                 * the sender one. */
Packit 90a5c9
                apr_bucket_file *f = (apr_bucket_file *)bsender->data;
Packit 90a5c9
                apr_file_t *fd = f->fd;
Packit 90a5c9
                int setaside = (f->readpool != bb->p);
Packit 90a5c9
                
Packit 90a5c9
                if (setaside) {
Packit 90a5c9
                    status = apr_file_setaside(&fd, fd, bb->p);
Packit 90a5c9
                    if (status != APR_SUCCESS) {
Packit 90a5c9
                        goto leave;
Packit 90a5c9
                    }
Packit 90a5c9
                    ++beam->files_beamed;
Packit 90a5c9
                }
Packit 90a5c9
                ng = apr_brigade_insert_file(bb, fd, bsender->start, bsender->length, 
Packit 90a5c9
                                             bb->p);
Packit 90a5c9
#if APR_HAS_MMAP
Packit 90a5c9
                /* disable mmap handling as this leads to segfaults when
Packit 90a5c9
                 * the underlying file is changed while memory pointer has
Packit 90a5c9
                 * been handed out. See also PR 59348 */
Packit 90a5c9
                apr_bucket_file_enable_mmap(ng, 0);
Packit 90a5c9
#endif
Packit 90a5c9
                APR_BUCKET_REMOVE(bsender);
Packit 90a5c9
                H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
Packit 90a5c9
Packit 90a5c9
                remain -= bsender->length;
Packit 90a5c9
                ++transferred;
Packit 90a5c9
                ++transferred_buckets;
Packit 90a5c9
                continue;
Packit 90a5c9
            }
Packit 90a5c9
            else {
Packit 90a5c9
                /* create a "receiver" standin bucket. we took care about the
Packit 90a5c9
                 * underlying sender bucket and its data when we placed it into
Packit 90a5c9
                 * the sender brigade.
Packit 90a5c9
                 * the beam bucket will notify us on destruction that bsender is
Packit 90a5c9
                 * no longer needed. */
Packit 90a5c9
                brecv = h2_beam_bucket_create(beam, bsender, bb->bucket_alloc,
Packit 90a5c9
                                               beam->buckets_sent++);
Packit 90a5c9
            }
Packit 90a5c9
            
Packit 90a5c9
            /* Place the sender bucket into our hold, to be destroyed when no
Packit 90a5c9
             * receiver bucket references it any more. */
Packit 90a5c9
            APR_BUCKET_REMOVE(bsender);
Packit 90a5c9
            H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
Packit 90a5c9
            
Packit 90a5c9
            beam->received_bytes += bsender->length;
Packit 90a5c9
            ++transferred_buckets;
Packit 90a5c9
            
Packit 90a5c9
            if (brecv) {
Packit 90a5c9
                APR_BRIGADE_INSERT_TAIL(bb, brecv);
Packit 90a5c9
                remain -= brecv->length;
Packit 90a5c9
                ++transferred;
Packit 90a5c9
            }
Packit 90a5c9
            else {
Packit 90a5c9
                /* let outside hook determine how bucket is beamed */
Packit 90a5c9
                leave_yellow(beam, &bl);
Packit 90a5c9
                brecv = h2_beam_bucket(beam, bb, bsender);
Packit 90a5c9
                enter_yellow(beam, &bl);
Packit 90a5c9
                
Packit 90a5c9
                while (brecv && brecv != APR_BRIGADE_SENTINEL(bb)) {
Packit 90a5c9
                    ++transferred;
Packit 90a5c9
                    remain -= brecv->length;
Packit 90a5c9
                    brecv = APR_BUCKET_NEXT(brecv);
Packit 90a5c9
                }
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
Packit 90a5c9
        if (remain < 0) {
Packit 90a5c9
            /* too much, put some back into out recv_buffer */
Packit 90a5c9
            remain = readbytes;
Packit 90a5c9
            for (brecv = APR_BRIGADE_FIRST(bb);
Packit 90a5c9
                 brecv != APR_BRIGADE_SENTINEL(bb);
Packit 90a5c9
                 brecv = APR_BUCKET_NEXT(brecv)) {
Packit 90a5c9
                remain -= (beam->tx_mem_limits? bucket_mem_used(brecv) 
Packit 90a5c9
                           : brecv->length);
Packit 90a5c9
                if (remain < 0) {
Packit 90a5c9
                    apr_bucket_split(brecv, brecv->length+remain);
Packit 90a5c9
                    beam->recv_buffer = apr_brigade_split_ex(bb, 
Packit 90a5c9
                                                             APR_BUCKET_NEXT(brecv), 
Packit 90a5c9
                                                             beam->recv_buffer);
Packit 90a5c9
                    break;
Packit 90a5c9
                }
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
Packit 90a5c9
        if (beam->closed && buffer_is_empty(beam)) {
Packit 90a5c9
            /* beam is closed and we have nothing more to receive */ 
Packit 90a5c9
            if (!beam->close_sent) {
Packit 90a5c9
                apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
Packit 90a5c9
                APR_BRIGADE_INSERT_TAIL(bb, b);
Packit 90a5c9
                beam->close_sent = 1;
Packit 90a5c9
                ++transferred;
Packit 90a5c9
                status = APR_SUCCESS;
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
        
Packit 90a5c9
        if (transferred_buckets > 0) {
Packit 90a5c9
           if (beam->cons_ev_cb) { 
Packit 90a5c9
               beam->cons_ev_cb(beam->cons_ctx, beam);
Packit 90a5c9
            }
Packit 90a5c9
        }
Packit 90a5c9
        
Packit 90a5c9
        if (transferred) {
Packit 90a5c9
            apr_thread_cond_broadcast(beam->change);
Packit 90a5c9
            status = APR_SUCCESS;
Packit 90a5c9
        }
Packit 90a5c9
        else {
Packit 90a5c9
            status = wait_not_empty(beam, block, bl.mutex);
Packit 90a5c9
            if (status != APR_SUCCESS) {
Packit 90a5c9
                goto leave;
Packit 90a5c9
            }
Packit 90a5c9
            goto transfer;
Packit 90a5c9
        }
Packit 90a5c9
leave:        
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return status;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_beam_on_consumed(h2_bucket_beam *beam, 
Packit 90a5c9
                         h2_beam_ev_callback *ev_cb,
Packit 90a5c9
                         h2_beam_io_callback *io_cb, void *ctx)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        beam->cons_ev_cb = ev_cb;
Packit 90a5c9
        beam->cons_io_cb = io_cb;
Packit 90a5c9
        beam->cons_ctx = ctx;
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_beam_on_produced(h2_bucket_beam *beam, 
Packit 90a5c9
                         h2_beam_io_callback *io_cb, void *ctx)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        beam->prod_io_cb = io_cb;
Packit 90a5c9
        beam->prod_ctx = ctx;
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_beam_on_file_beam(h2_bucket_beam *beam, 
Packit 90a5c9
                          h2_beam_can_beam_callback *cb, void *ctx)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        beam->can_beam_fn = cb;
Packit 90a5c9
        beam->can_beam_ctx = ctx;
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
Packit 90a5c9
apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    apr_bucket *b;
Packit 90a5c9
    apr_off_t l = 0;
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        for (b = H2_BLIST_FIRST(&beam->send_list); 
Packit 90a5c9
            b != H2_BLIST_SENTINEL(&beam->send_list);
Packit 90a5c9
            b = APR_BUCKET_NEXT(b)) {
Packit 90a5c9
            /* should all have determinate length */
Packit 90a5c9
            l += b->length;
Packit 90a5c9
        }
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return l;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    apr_bucket *b;
Packit 90a5c9
    apr_off_t l = 0;
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        for (b = H2_BLIST_FIRST(&beam->send_list); 
Packit 90a5c9
            b != H2_BLIST_SENTINEL(&beam->send_list);
Packit 90a5c9
            b = APR_BUCKET_NEXT(b)) {
Packit 90a5c9
            l += bucket_mem_used(b);
Packit 90a5c9
        }
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return l;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
int h2_beam_empty(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    int empty = 1;
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        empty = (H2_BLIST_EMPTY(&beam->send_list) 
Packit 90a5c9
                 && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer)));
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return empty;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
int h2_beam_holds_proxies(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    int has_proxies = 1;
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        has_proxies = !H2_BPROXY_LIST_EMPTY(&beam->proxies);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return has_proxies;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
int h2_beam_was_received(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    int happend = 0;
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        happend = (beam->received_bytes > 0);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return happend;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    apr_size_t n = 0;
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    
Packit 90a5c9
    if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        n = beam->files_beamed;
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return n;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
Packit 90a5c9
{
Packit 90a5c9
    return 0;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
int h2_beam_report_consumption(h2_bucket_beam *beam)
Packit 90a5c9
{
Packit 90a5c9
    h2_beam_lock bl;
Packit 90a5c9
    int rv = 0;
Packit 90a5c9
    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Packit 90a5c9
        rv = report_consumption(beam, &bl);
Packit 90a5c9
        leave_yellow(beam, &bl);
Packit 90a5c9
    }
Packit 90a5c9
    return rv;
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9
void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
Packit 90a5c9
{
Packit 90a5c9
    if (beam && APLOG_C_IS_LEVEL(c,level)) {
Packit 90a5c9
        ap_log_cerror(APLOG_MARK, level, 0, c, 
Packit 90a5c9
                      "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s", 
Packit 90a5c9
                      (c->master? c->master->id : c->id), beam->id, beam->tag, 
Packit 90a5c9
                      beam->closed, beam->aborted, h2_beam_empty(beam), 
Packit 90a5c9
                      (long)h2_beam_get_buffered(beam), msg);
Packit 90a5c9
    }
Packit 90a5c9
}
Packit 90a5c9
Packit 90a5c9