Blame buckets/bwtp_buckets.c

Packit 3adb1e
/* ====================================================================
Packit 3adb1e
 *    Licensed to the Apache Software Foundation (ASF) under one
Packit 3adb1e
 *    or more contributor license agreements.  See the NOTICE file
Packit 3adb1e
 *    distributed with this work for additional information
Packit 3adb1e
 *    regarding copyright ownership.  The ASF licenses this file
Packit 3adb1e
 *    to you under the Apache License, Version 2.0 (the
Packit 3adb1e
 *    "License"); you may not use this file except in compliance
Packit 3adb1e
 *    with the License.  You may obtain a copy of the License at
Packit 3adb1e
 *
Packit 3adb1e
 *      http://www.apache.org/licenses/LICENSE-2.0
Packit 3adb1e
 *
Packit 3adb1e
 *    Unless required by applicable law or agreed to in writing,
Packit 3adb1e
 *    software distributed under the License is distributed on an
Packit 3adb1e
 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Packit 3adb1e
 *    KIND, either express or implied.  See the License for the
Packit 3adb1e
 *    specific language governing permissions and limitations
Packit 3adb1e
 *    under the License.
Packit 3adb1e
 * ====================================================================
Packit 3adb1e
 */
Packit 3adb1e
Packit 3adb1e
#include <apr_pools.h>
Packit 3adb1e
#include <apr_strings.h>
Packit 3adb1e
#include <apr_lib.h>
Packit 3adb1e
#include <apr_date.h>
Packit 3adb1e
Packit 3adb1e
#include "serf.h"
Packit 3adb1e
#include "serf_bucket_util.h"
Packit 3adb1e
#include "serf_bucket_types.h"
Packit 3adb1e
Packit 3adb1e
#include <stdlib.h>
Packit 3adb1e
Packit 3adb1e
/* This is an implementation of Bidirectional Web Transfer Protocol (BWTP)
Packit 3adb1e
 * See:
Packit 3adb1e
 *   http://bwtp.wikidot.com/
Packit 3adb1e
 */
Packit 3adb1e
Packit 3adb1e
typedef struct {
Packit 3adb1e
    int channel;
Packit 3adb1e
    int open;
Packit 3adb1e
    int type; /* 0 = header, 1 = message */ /* TODO enum? */
Packit 3adb1e
    const char *phrase;
Packit 3adb1e
    serf_bucket_t *headers;
Packit 3adb1e
Packit 3adb1e
    char req_line[1000];
Packit 3adb1e
} frame_context_t;
Packit 3adb1e
Packit 3adb1e
typedef struct {
Packit 3adb1e
    serf_bucket_t *stream;
Packit 3adb1e
    serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
Packit 3adb1e
    serf_bucket_t *headers;     /* holds parsed headers */
Packit 3adb1e
Packit 3adb1e
    enum {
Packit 3adb1e
        STATE_STATUS_LINE,      /* reading status line */
Packit 3adb1e
        STATE_HEADERS,          /* reading headers */
Packit 3adb1e
        STATE_BODY,             /* reading body */
Packit 3adb1e
        STATE_DONE              /* we've sent EOF */
Packit 3adb1e
    } state;
Packit 3adb1e
Packit 3adb1e
    /* Buffer for accumulating a line from the response. */
Packit 3adb1e
    serf_linebuf_t linebuf;
Packit 3adb1e
Packit 3adb1e
    int type; /* 0 = header, 1 = message */ /* TODO enum? */
Packit 3adb1e
    int channel;
Packit 3adb1e
    char *phrase;
Packit 3adb1e
    apr_size_t length;
Packit 3adb1e
} incoming_context_t;
Packit 3adb1e
Packit 3adb1e
Packit 3adb1e
serf_bucket_t *serf_bucket_bwtp_channel_close(
Packit 3adb1e
    int channel,
Packit 3adb1e
    serf_bucket_alloc_t *allocator)
Packit 3adb1e
{
Packit 3adb1e
    frame_context_t *ctx;
Packit 3adb1e
Packit 3adb1e
    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
Packit 3adb1e
    ctx->type = 0;
Packit 3adb1e
    ctx->open = 0;
Packit 3adb1e
    ctx->channel = channel;
Packit 3adb1e
    ctx->phrase = "CLOSED";
Packit 3adb1e
    ctx->headers = serf_bucket_headers_create(allocator);
Packit 3adb1e
Packit 3adb1e
    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
serf_bucket_t *serf_bucket_bwtp_channel_open(
Packit 3adb1e
    int channel,
Packit 3adb1e
    const char *uri,
Packit 3adb1e
    serf_bucket_alloc_t *allocator)
Packit 3adb1e
{
Packit 3adb1e
    frame_context_t *ctx;
Packit 3adb1e
Packit 3adb1e
    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
Packit 3adb1e
    ctx->type = 0;
Packit 3adb1e
    ctx->open = 1;
Packit 3adb1e
    ctx->channel = channel;
Packit 3adb1e
    ctx->phrase = uri;
Packit 3adb1e
    ctx->headers = serf_bucket_headers_create(allocator);
Packit 3adb1e
Packit 3adb1e
    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
serf_bucket_t *serf_bucket_bwtp_header_create(
Packit 3adb1e
    int channel,
Packit 3adb1e
    const char *phrase,
Packit 3adb1e
    serf_bucket_alloc_t *allocator)
Packit 3adb1e
{
Packit 3adb1e
    frame_context_t *ctx;
Packit 3adb1e
Packit 3adb1e
    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
Packit 3adb1e
    ctx->type = 0;
Packit 3adb1e
    ctx->open = 0;
Packit 3adb1e
    ctx->channel = channel;
Packit 3adb1e
    ctx->phrase = phrase;
Packit 3adb1e
    ctx->headers = serf_bucket_headers_create(allocator);
Packit 3adb1e
Packit 3adb1e
    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
serf_bucket_t *serf_bucket_bwtp_message_create(
Packit 3adb1e
    int channel,
Packit 3adb1e
    serf_bucket_t *body,
Packit 3adb1e
    serf_bucket_alloc_t *allocator)
Packit 3adb1e
{
Packit 3adb1e
    frame_context_t *ctx;
Packit 3adb1e
Packit 3adb1e
    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
Packit 3adb1e
    ctx->type = 1;
Packit 3adb1e
    ctx->open = 0;
Packit 3adb1e
    ctx->channel = channel;
Packit 3adb1e
    ctx->phrase = "MESSAGE";
Packit 3adb1e
    ctx->headers = serf_bucket_headers_create(allocator);
Packit 3adb1e
Packit 3adb1e
    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
int serf_bucket_bwtp_frame_get_channel(
Packit 3adb1e
    serf_bucket_t *bucket)
Packit 3adb1e
{
Packit 3adb1e
    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
Packit 3adb1e
        frame_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
        return ctx->channel;
Packit 3adb1e
    }
Packit 3adb1e
    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
Packit 3adb1e
        incoming_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
        return ctx->channel;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    return -1;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
int serf_bucket_bwtp_frame_get_type(
Packit 3adb1e
    serf_bucket_t *bucket)
Packit 3adb1e
{
Packit 3adb1e
    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
Packit 3adb1e
        frame_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
        return ctx->type;
Packit 3adb1e
    }
Packit 3adb1e
    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
Packit 3adb1e
        incoming_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
        return ctx->type;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    return -1;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
const char *serf_bucket_bwtp_frame_get_phrase(
Packit 3adb1e
    serf_bucket_t *bucket)
Packit 3adb1e
{
Packit 3adb1e
    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
Packit 3adb1e
        frame_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
        return ctx->phrase;
Packit 3adb1e
    }
Packit 3adb1e
    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
Packit 3adb1e
        incoming_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
        return ctx->phrase;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    return NULL;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
Packit 3adb1e
    serf_bucket_t *bucket)
Packit 3adb1e
{
Packit 3adb1e
    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
Packit 3adb1e
        frame_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
        return ctx->headers;
Packit 3adb1e
    }
Packit 3adb1e
    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
Packit 3adb1e
        incoming_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
        return ctx->headers;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    return NULL;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static int count_size(void *baton, const char *key, const char *value)
Packit 3adb1e
{
Packit 3adb1e
    apr_size_t *c = baton;
Packit 3adb1e
    /* TODO Deal with folding.  Yikes. */
Packit 3adb1e
Packit 3adb1e
    /* Add in ": " and CRLF - so an extra four bytes. */
Packit 3adb1e
    *c += strlen(key) + strlen(value) + 4;
Packit 3adb1e
Packit 3adb1e
    return 0;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_size_t calc_header_size(serf_bucket_t *hdrs)
Packit 3adb1e
{
Packit 3adb1e
    apr_size_t size = 0;
Packit 3adb1e
Packit 3adb1e
    serf_bucket_headers_do(hdrs, count_size, &size);
Packit 3adb1e
Packit 3adb1e
    return size;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static void serialize_data(serf_bucket_t *bucket)
Packit 3adb1e
{
Packit 3adb1e
    frame_context_t *ctx = bucket->data;
Packit 3adb1e
    serf_bucket_t *new_bucket;
Packit 3adb1e
    apr_size_t req_len;
Packit 3adb1e
Packit 3adb1e
    /* Serialize the request-line and headers into one mother string,
Packit 3adb1e
     * and wrap a bucket around it.
Packit 3adb1e
     */
Packit 3adb1e
    req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
Packit 3adb1e
                           "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
Packit 3adb1e
                           (ctx->type ? "BWM" : "BWH"),
Packit 3adb1e
                           ctx->channel, calc_header_size(ctx->headers),
Packit 3adb1e
                           (ctx->open ? "OPEN " : ""),
Packit 3adb1e
                           ctx->phrase);
Packit 3adb1e
    new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
Packit 3adb1e
                                                bucket->allocator);
Packit 3adb1e
Packit 3adb1e
    /* Build up the new bucket structure.
Packit 3adb1e
     *
Packit 3adb1e
     * Note that self needs to become an aggregate bucket so that a
Packit 3adb1e
     * pointer to self still represents the "right" data.
Packit 3adb1e
     */
Packit 3adb1e
    serf_bucket_aggregate_become(bucket);
Packit 3adb1e
Packit 3adb1e
    /* Insert the two buckets. */
Packit 3adb1e
    serf_bucket_aggregate_append(bucket, new_bucket);
Packit 3adb1e
    serf_bucket_aggregate_append(bucket, ctx->headers);
Packit 3adb1e
Packit 3adb1e
    /* Our private context is no longer needed, and is not referred to by
Packit 3adb1e
     * any existing bucket. Toss it.
Packit 3adb1e
     */
Packit 3adb1e
    serf_bucket_mem_free(bucket->allocator, ctx);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
Packit 3adb1e
                                         apr_size_t requested,
Packit 3adb1e
                                         const char **data, apr_size_t *len)
Packit 3adb1e
{
Packit 3adb1e
    /* Seralize our private data into a new aggregate bucket. */
Packit 3adb1e
    serialize_data(bucket);
Packit 3adb1e
Packit 3adb1e
    /* Delegate to the "new" aggregate bucket to do the read. */
Packit 3adb1e
    return serf_bucket_read(bucket, requested, data, len);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
Packit 3adb1e
                                             int acceptable, int *found,
Packit 3adb1e
                                             const char **data, apr_size_t *len)
Packit 3adb1e
{
Packit 3adb1e
    /* Seralize our private data into a new aggregate bucket. */
Packit 3adb1e
    serialize_data(bucket);
Packit 3adb1e
Packit 3adb1e
    /* Delegate to the "new" aggregate bucket to do the readline. */
Packit 3adb1e
    return serf_bucket_readline(bucket, acceptable, found, data, len);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
Packit 3adb1e
                                               apr_size_t requested,
Packit 3adb1e
                                               int vecs_size,
Packit 3adb1e
                                               struct iovec *vecs,
Packit 3adb1e
                                               int *vecs_used)
Packit 3adb1e
{
Packit 3adb1e
    /* Seralize our private data into a new aggregate bucket. */
Packit 3adb1e
    serialize_data(bucket);
Packit 3adb1e
Packit 3adb1e
    /* Delegate to the "new" aggregate bucket to do the read. */
Packit 3adb1e
    return serf_bucket_read_iovec(bucket, requested,
Packit 3adb1e
                                  vecs_size, vecs, vecs_used);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
Packit 3adb1e
                                         const char **data,
Packit 3adb1e
                                         apr_size_t *len)
Packit 3adb1e
{
Packit 3adb1e
    /* Seralize our private data into a new aggregate bucket. */
Packit 3adb1e
    serialize_data(bucket);
Packit 3adb1e
Packit 3adb1e
    /* Delegate to the "new" aggregate bucket to do the peek. */
Packit 3adb1e
    return serf_bucket_peek(bucket, data, len);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
Packit 3adb1e
    "BWTP-FRAME",
Packit 3adb1e
    serf_bwtp_frame_read,
Packit 3adb1e
    serf_bwtp_frame_readline,
Packit 3adb1e
    serf_bwtp_frame_read_iovec,
Packit 3adb1e
    serf_default_read_for_sendfile,
Packit 3adb1e
    serf_default_read_bucket,
Packit 3adb1e
    serf_bwtp_frame_peek,
Packit 3adb1e
    serf_default_destroy_and_data,
Packit 3adb1e
};
Packit 3adb1e
Packit 3adb1e
Packit 3adb1e
serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
Packit 3adb1e
    serf_bucket_t *stream,
Packit 3adb1e
    serf_bucket_alloc_t *allocator)
Packit 3adb1e
{
Packit 3adb1e
    incoming_context_t *ctx;
Packit 3adb1e
Packit 3adb1e
    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
Packit 3adb1e
    ctx->stream = stream;
Packit 3adb1e
    ctx->body = NULL;
Packit 3adb1e
    ctx->headers = serf_bucket_headers_create(allocator);
Packit 3adb1e
    ctx->state = STATE_STATUS_LINE;
Packit 3adb1e
    ctx->length = 0;
Packit 3adb1e
    ctx->channel = -1;
Packit 3adb1e
    ctx->phrase = NULL;
Packit 3adb1e
Packit 3adb1e
    serf_linebuf_init(&ctx->linebuf);
Packit 3adb1e
Packit 3adb1e
    return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
Packit 3adb1e
{
Packit 3adb1e
    incoming_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
    if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
Packit 3adb1e
        serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    serf_bucket_destroy(ctx->stream);
Packit 3adb1e
    if (ctx->body != NULL)
Packit 3adb1e
        serf_bucket_destroy(ctx->body);
Packit 3adb1e
    serf_bucket_destroy(ctx->headers);
Packit 3adb1e
Packit 3adb1e
    serf_default_destroy_and_data(bucket);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
Packit 3adb1e
{
Packit 3adb1e
    return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t parse_status_line(incoming_context_t *ctx,
Packit 3adb1e
                                      serf_bucket_alloc_t *allocator)
Packit 3adb1e
{
Packit 3adb1e
    int res;
Packit 3adb1e
    char *reason; /* ### stupid APR interface makes this non-const */
Packit 3adb1e
Packit 3adb1e
    /* ctx->linebuf.line should be of form: BW* */
Packit 3adb1e
    res = apr_date_checkmask(ctx->linebuf.line, "BW*");
Packit 3adb1e
    if (!res) {
Packit 3adb1e
        /* Not an BWTP response?  Well, at least we won't understand it. */
Packit 3adb1e
        return APR_EGENERAL;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    if (ctx->linebuf.line[2] == 'H') {
Packit 3adb1e
        ctx->type = 0;
Packit 3adb1e
    }
Packit 3adb1e
    else if (ctx->linebuf.line[2] == 'M') {
Packit 3adb1e
        ctx->type = 1;
Packit 3adb1e
    }
Packit 3adb1e
    else {
Packit 3adb1e
        ctx->type = -1;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
Packit 3adb1e
Packit 3adb1e
    /* Skip leading spaces for the reason string. */
Packit 3adb1e
    if (apr_isspace(*reason)) {
Packit 3adb1e
        reason++;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    ctx->length = apr_strtoi64(reason, &reason, 16);
Packit 3adb1e
Packit 3adb1e
    /* Skip leading spaces for the reason string. */
Packit 3adb1e
    if (reason - ctx->linebuf.line < ctx->linebuf.used) {
Packit 3adb1e
        if (apr_isspace(*reason)) {
Packit 3adb1e
            reason++;
Packit 3adb1e
        }
Packit 3adb1e
Packit 3adb1e
        ctx->phrase = serf_bstrmemdup(allocator, reason,
Packit 3adb1e
                                      ctx->linebuf.used
Packit 3adb1e
                                      - (reason - ctx->linebuf.line));
Packit 3adb1e
    } else {
Packit 3adb1e
        ctx->phrase = NULL;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    return APR_SUCCESS;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
/* This code should be replaced with header buckets. */
Packit 3adb1e
static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
Packit 3adb1e
{
Packit 3adb1e
    apr_status_t status;
Packit 3adb1e
Packit 3adb1e
    /* RFC 2616 says that CRLF is the only line ending, but we can easily
Packit 3adb1e
     * accept any kind of line ending.
Packit 3adb1e
     */
Packit 3adb1e
    status = fetch_line(ctx, SERF_NEWLINE_ANY);
Packit 3adb1e
    if (SERF_BUCKET_READ_ERROR(status)) {
Packit 3adb1e
        return status;
Packit 3adb1e
    }
Packit 3adb1e
    /* Something was read. Process it. */
Packit 3adb1e
Packit 3adb1e
    if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
Packit 3adb1e
        const char *end_key;
Packit 3adb1e
        const char *c;
Packit 3adb1e
Packit 3adb1e
        end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
Packit 3adb1e
        if (!c) {
Packit 3adb1e
            /* Bad headers? */
Packit 3adb1e
            return APR_EGENERAL;
Packit 3adb1e
        }
Packit 3adb1e
Packit 3adb1e
        /* Skip over initial : and spaces. */
Packit 3adb1e
        while (apr_isspace(*++c))
Packit 3adb1e
            continue;
Packit 3adb1e
Packit 3adb1e
        /* Always copy the headers (from the linebuf into new mem). */
Packit 3adb1e
        /* ### we should be able to optimize some mem copies */
Packit 3adb1e
        serf_bucket_headers_setx(
Packit 3adb1e
            ctx->headers,
Packit 3adb1e
            ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
Packit 3adb1e
            c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    return status;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
/* Perform one iteration of the state machine.
Packit 3adb1e
 *
Packit 3adb1e
 * Will return when one the following conditions occurred:
Packit 3adb1e
 *  1) a state change
Packit 3adb1e
 *  2) an error
Packit 3adb1e
 *  3) the stream is not ready or at EOF
Packit 3adb1e
 *  4) APR_SUCCESS, meaning the machine can be run again immediately
Packit 3adb1e
 */
Packit 3adb1e
static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
Packit 3adb1e
{
Packit 3adb1e
    apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
Packit 3adb1e
Packit 3adb1e
    switch (ctx->state) {
Packit 3adb1e
    case STATE_STATUS_LINE:
Packit 3adb1e
        /* RFC 2616 says that CRLF is the only line ending, but we can easily
Packit 3adb1e
         * accept any kind of line ending.
Packit 3adb1e
         */
Packit 3adb1e
        status = fetch_line(ctx, SERF_NEWLINE_ANY);
Packit 3adb1e
        if (SERF_BUCKET_READ_ERROR(status))
Packit 3adb1e
            return status;
Packit 3adb1e
Packit 3adb1e
        if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
Packit 3adb1e
            /* The Status-Line is in the line buffer. Process it. */
Packit 3adb1e
            status = parse_status_line(ctx, bkt->allocator);
Packit 3adb1e
            if (status)
Packit 3adb1e
                return status;
Packit 3adb1e
Packit 3adb1e
            if (ctx->length) {
Packit 3adb1e
                ctx->body =
Packit 3adb1e
                    serf_bucket_barrier_create(ctx->stream, bkt->allocator);
Packit 3adb1e
                ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
Packit 3adb1e
                                                     bkt->allocator);
Packit 3adb1e
                if (!ctx->type) {
Packit 3adb1e
                    ctx->state = STATE_HEADERS;
Packit 3adb1e
                } else {
Packit 3adb1e
                    ctx->state = STATE_BODY;
Packit 3adb1e
                }
Packit 3adb1e
            } else {
Packit 3adb1e
                ctx->state = STATE_DONE;
Packit 3adb1e
            }
Packit 3adb1e
        }
Packit 3adb1e
        else {
Packit 3adb1e
            /* The connection closed before we could get the next
Packit 3adb1e
             * response.  Treat the request as lost so that our upper
Packit 3adb1e
             * end knows the server never tried to give us a response.
Packit 3adb1e
             */
Packit 3adb1e
            if (APR_STATUS_IS_EOF(status)) {
Packit 3adb1e
                return SERF_ERROR_REQUEST_LOST;
Packit 3adb1e
            }
Packit 3adb1e
        }
Packit 3adb1e
        break;
Packit 3adb1e
    case STATE_HEADERS:
Packit 3adb1e
        status = fetch_headers(ctx->body, ctx);
Packit 3adb1e
        if (SERF_BUCKET_READ_ERROR(status))
Packit 3adb1e
            return status;
Packit 3adb1e
Packit 3adb1e
        /* If an empty line was read, then we hit the end of the headers.
Packit 3adb1e
         * Move on to the body.
Packit 3adb1e
         */
Packit 3adb1e
        if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
Packit 3adb1e
            /* Advance the state. */
Packit 3adb1e
            ctx->state = STATE_DONE;
Packit 3adb1e
        }
Packit 3adb1e
        break;
Packit 3adb1e
    case STATE_BODY:
Packit 3adb1e
        /* Don't do anything. */
Packit 3adb1e
        break;
Packit 3adb1e
    case STATE_DONE:
Packit 3adb1e
        return APR_EOF;
Packit 3adb1e
    default:
Packit 3adb1e
        /* Not reachable */
Packit 3adb1e
        return APR_EGENERAL;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    return status;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
Packit 3adb1e
{
Packit 3adb1e
    apr_status_t status;
Packit 3adb1e
Packit 3adb1e
    /* Keep reading and moving through states if we aren't at the BODY */
Packit 3adb1e
    while (ctx->state != STATE_BODY) {
Packit 3adb1e
        status = run_machine(bkt, ctx);
Packit 3adb1e
Packit 3adb1e
        /* Anything other than APR_SUCCESS means that we cannot immediately
Packit 3adb1e
         * read again (for now).
Packit 3adb1e
         */
Packit 3adb1e
        if (status)
Packit 3adb1e
            return status;
Packit 3adb1e
    }
Packit 3adb1e
    /* in STATE_BODY */
Packit 3adb1e
Packit 3adb1e
    return APR_SUCCESS;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
Packit 3adb1e
    serf_bucket_t *bucket)
Packit 3adb1e
{
Packit 3adb1e
    incoming_context_t *ctx = bucket->data;
Packit 3adb1e
Packit 3adb1e
    return wait_for_body(bucket, ctx);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
Packit 3adb1e
                                       apr_size_t requested,
Packit 3adb1e
                                       const char **data, apr_size_t *len)
Packit 3adb1e
{
Packit 3adb1e
    incoming_context_t *ctx = bucket->data;
Packit 3adb1e
    apr_status_t rv;
Packit 3adb1e
Packit 3adb1e
    rv = wait_for_body(bucket, ctx);
Packit 3adb1e
    if (rv) {
Packit 3adb1e
        /* It's not possible to have read anything yet! */
Packit 3adb1e
        if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
Packit 3adb1e
            *len = 0;
Packit 3adb1e
        }
Packit 3adb1e
        return rv;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    rv = serf_bucket_read(ctx->body, requested, data, len);
Packit 3adb1e
    if (APR_STATUS_IS_EOF(rv)) {
Packit 3adb1e
        ctx->state = STATE_DONE;
Packit 3adb1e
    }
Packit 3adb1e
    return rv;
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
Packit 3adb1e
                                           int acceptable, int *found,
Packit 3adb1e
                                           const char **data, apr_size_t *len)
Packit 3adb1e
{
Packit 3adb1e
    incoming_context_t *ctx = bucket->data;
Packit 3adb1e
    apr_status_t rv;
Packit 3adb1e
Packit 3adb1e
    rv = wait_for_body(bucket, ctx);
Packit 3adb1e
    if (rv) {
Packit 3adb1e
        return rv;
Packit 3adb1e
    }
Packit 3adb1e
Packit 3adb1e
    /* Delegate to the stream bucket to do the readline. */
Packit 3adb1e
    return serf_bucket_readline(ctx->body, acceptable, found, data, len);
Packit 3adb1e
}
Packit 3adb1e
Packit 3adb1e
/* ### need to implement */
Packit 3adb1e
#define bwtp_incoming_peek NULL
Packit 3adb1e
Packit 3adb1e
const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
Packit 3adb1e
    "BWTP-INCOMING",
Packit 3adb1e
    bwtp_incoming_read,
Packit 3adb1e
    bwtp_incoming_readline,
Packit 3adb1e
    serf_default_read_iovec,
Packit 3adb1e
    serf_default_read_for_sendfile,
Packit 3adb1e
    serf_default_read_bucket,
Packit 3adb1e
    bwtp_incoming_peek,
Packit 3adb1e
    bwtp_incoming_destroy_and_data,
Packit 3adb1e
};