/*
  Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <math.h>

#include "glusterfs/mem-types.h"
#include "glusterfs/mem-pool.h"

#include "glusterfs/rot-buffs.h"

/**
 * Producer-consumer built on top of rotational buffers.
 *
 * This favours writers (producers) and keeps the critical section
 * lightweight. A buffer switch happens when a consumer wants to
 * consume data; this is the slow path and waits for pending
 * writes to finish.
 *
 * TODO: do away with opaques (use arrays with indexing).
 */
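
/**
 * Illustrative usage (a minimal sketch, not part of this file; 'record',
 * 'recordlen' and the 'consume' dispatcher are hypothetical and only
 * outline the intended producer/consumer flow):
 *
 *   rbuf_t *rbuf = rbuf_init(ROT_BUFF_DEFAULT_COUNT);
 *
 *   // producer: reserve space in the current buffer, fill it and
 *   // mark the write as complete
 *   void *opaque = NULL;
 *   char *wbuf = rbuf_reserve_write_area(rbuf, recordlen, &opaque);
 *   if (wbuf) {
 *       memcpy(wbuf, record, recordlen);
 *       rbuf_write_complete(opaque);
 *   }
 *
 *   // consumer: detach the current buffer (slow path), then wait for
 *   // pending writers and dispatch it for consumption
 *   if (rbuf_get_buffer(rbuf, &opaque, NULL, NULL) == RBUF_CONSUMABLE)
 *       rbuf_wait_for_completion(rbuf, opaque, consume, NULL);
 *
 *   rbuf_dtor(rbuf);
 */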

#define ROT_BUFF_DEFAULT_COUNT 2
#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */

#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE)

/**
 * The iovec list is not shrunk (deallocated) as long as the total
 * allocation count stays within this range. This is the fast path
 * and should satisfy most workloads. Beyond the high watermark the
 * list is shrunk gradually (see rlist_shrink_vector()).
 */
#define RVEC_LOW_WATERMARK_COUNT 1
#define RVEC_HIGH_WATERMARK_COUNT (1 << 4)
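
/**
 * With ROT_BUFF_ALLOC_SIZE at 1MB, this means up to 16 allocated
 * (possibly idle) vectors, i.e. roughly 16MB per buffer list, are
 * retained before shrinking kicks in.
 */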

static inline rbuf_list_t *
rbuf_current_buffer(rbuf_t *rbuf)
{
    return rbuf->current;
}

static void
rlist_mark_waiting(rbuf_list_t *rlist)
{
    LOCK(&rlist->c_lock);
    {
        rlist->awaiting = _gf_true;
    }
    UNLOCK(&rlist->c_lock);
}

static int
__rlist_has_waiter(rbuf_list_t *rlist)
{
    return (rlist->awaiting == _gf_true);
}

static void *
rbuf_alloc_rvec(void)
{
    return GF_CALLOC(1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t);
}

static void
rlist_reset_vector_usage(rbuf_list_t *rlist)
{
    rlist->used = 1;
}

static void
rlist_increment_vector_usage(rbuf_list_t *rlist)
{
    rlist->used++;
}

static void
rlist_increment_total_usage(rbuf_list_t *rlist)
{
    rlist->total++;
}

static int
rvec_in_watermark_range(rbuf_list_t *rlist)
{
    return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) &&
            (rlist->total <= RVEC_HIGH_WATERMARK_COUNT));
}

static void
rbuf_reset_rvec(rbuf_iovec_t *rvec)
{
    GF_VALIDATE_OR_GOTO("libglusterfs", rvec, err);
    /* iov_base is _never_ modified */
    rvec->iov.iov_len = 0;
err:
    return;
}

/* TODO: alloc multiple rbuf_iovec_t */
static int
rlist_add_new_vec(rbuf_list_t *rlist)
{
    rbuf_iovec_t *rvec = NULL;

    rvec = (rbuf_iovec_t *)rbuf_alloc_rvec();
    if (!rvec)
        return -1;
    INIT_LIST_HEAD(&rvec->list);
    rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE;
    rvec->iov.iov_len = 0;

    list_add_tail(&rvec->list, &rlist->veclist);

    rlist->rvec = rvec; /* cache the latest */

    rlist_increment_vector_usage(rlist);
    rlist_increment_total_usage(rlist);

    return 0;
}

static void
rlist_free_rvec(rbuf_iovec_t *rvec)
{
    if (!rvec)
        return;
    list_del(&rvec->list);
    GF_FREE(rvec);
}

static void
rlist_purge_all_rvec(rbuf_list_t *rlist)
{
    rbuf_iovec_t *rvec = NULL;

    if (!rlist)
        return;
    while (!list_empty(&rlist->veclist)) {
        rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
        rlist_free_rvec(rvec);
    }
}

static void
rlist_shrink_rvec(rbuf_list_t *rlist, unsigned long long shrink)
{
    rbuf_iovec_t *rvec = NULL;

    while (!list_empty(&rlist->veclist) && (shrink-- > 0)) {
        rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
        rlist_free_rvec(rvec);
    }
}

static void
rbuf_purge_rlist(rbuf_t *rbuf)
{
    rbuf_list_t *rlist = NULL;

    while (!list_empty(&rbuf->freelist)) {
        rlist = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
        list_del(&rlist->list);

        rlist_purge_all_rvec(rlist);

        LOCK_DESTROY(&rlist->c_lock);

        (void)pthread_mutex_destroy(&rlist->b_lock);
        (void)pthread_cond_destroy(&rlist->b_cond);

        GF_FREE(rlist);
    }
}

rbuf_t *
rbuf_init(int bufcount)
{
    int j = 0;
    int ret = 0;
    rbuf_t *rbuf = NULL;
    rbuf_list_t *rlist = NULL;

    if (bufcount <= 0)
        bufcount = ROT_BUFF_DEFAULT_COUNT;

    rbuf = GF_CALLOC(1, sizeof(rbuf_t), gf_common_mt_rbuf_t);
    if (!rbuf)
        goto error_return;

    LOCK_INIT(&rbuf->lock);
    INIT_LIST_HEAD(&rbuf->freelist);

    /* this could have been a single large calloc(), but it runs only once */
    for (j = 0; j < bufcount; j++) {
        rlist = GF_CALLOC(1, sizeof(rbuf_list_t), gf_common_mt_rlist_t);
        if (!rlist) {
            ret = -1;
            break;
        }

        INIT_LIST_HEAD(&rlist->list);
        INIT_LIST_HEAD(&rlist->veclist);

        rlist->pending = rlist->completed = 0;

        ret = rlist_add_new_vec(rlist);
        if (ret)
            break;

        LOCK_INIT(&rlist->c_lock);

        rlist->awaiting = _gf_false;
        ret = pthread_mutex_init(&rlist->b_lock, 0);
        if (ret != 0) {
            /* undo what this iteration allocated before bailing out */
            rlist_purge_all_rvec(rlist);
            LOCK_DESTROY(&rlist->c_lock);
            GF_FREE(rlist);
            break;
        }

        ret = pthread_cond_init(&rlist->b_cond, 0);
        if (ret != 0) {
            rlist_purge_all_rvec(rlist);
            LOCK_DESTROY(&rlist->c_lock);
            (void)pthread_mutex_destroy(&rlist->b_lock);
            GF_FREE(rlist);
            break;
        }

        list_add_tail(&rlist->list, &rbuf->freelist);
    }

    if (ret != 0)
        goto dealloc_rlist;

    /* cache currently used buffer: first in the list */
    rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
    return rbuf;

dealloc_rlist:
    rbuf_purge_rlist(rbuf);
    LOCK_DESTROY(&rbuf->lock);
    GF_FREE(rbuf);
error_return:
    return NULL;
}

void
rbuf_dtor(rbuf_t *rbuf)
{
    if (!rbuf)
        return;
    rbuf->current = NULL;
    rbuf_purge_rlist(rbuf);
    LOCK_DESTROY(&rbuf->lock);

    GF_FREE(rbuf);
}

static char *
rbuf_adjust_write_area(struct iovec *iov, size_t bytes)
{
    char *wbuf = NULL;

    wbuf = (char *)iov->iov_base + iov->iov_len;
    iov->iov_len += bytes;
    return wbuf;
}

static char *
rbuf_alloc_write_area(rbuf_list_t *rlist, size_t bytes)
{
    int ret = 0;
    struct iovec *iov = NULL;

    /* check for available space in _current_ IO buffer */
    iov = &rlist->rvec->iov;
    if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE)
        return rbuf_adjust_write_area(iov, bytes); /* fast path */

    /* not enough space, move on to the next available buffer */
    if (list_is_last(&rlist->rvec->list, &rlist->veclist)) {
        /* all vector buffers are consumed, allocate a fresh one */
        GF_ASSERT(rlist->used == rlist->total);
        ret = rlist_add_new_vec(rlist);
        if (ret)
            goto error_return;
    } else {
        /* not the end, have available rbuf_iovec's */
        rlist->rvec = list_next_entry(rlist->rvec, list);
        rlist->used++;
        rbuf_reset_rvec(rlist->rvec);
    }

    iov = &rlist->rvec->iov;
    return rbuf_adjust_write_area(iov, bytes);

error_return:
    return NULL;
}

char *
rbuf_reserve_write_area(rbuf_t *rbuf, size_t bytes, void **opaque)
{
    char *wbuf = NULL;
    rbuf_list_t *rlist = NULL;

    if (!rbuf || (bytes == 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque)
        return NULL;

    LOCK(&rbuf->lock);
    {
        rlist = rbuf_current_buffer(rbuf);
        wbuf = rbuf_alloc_write_area(rlist, bytes);
        if (!wbuf)
            goto unblock;
        rlist->pending++;
    }
unblock:
    UNLOCK(&rbuf->lock);

    if (wbuf)
        *opaque = rlist;
    return wbuf;
}

static void
rbuf_notify_waiter(rbuf_list_t *rlist)
{
    pthread_mutex_lock(&rlist->b_lock);
    {
        pthread_cond_signal(&rlist->b_cond);
    }
    pthread_mutex_unlock(&rlist->b_lock);
}

int
rbuf_write_complete(void *opaque)
{
    rbuf_list_t *rlist = NULL;
    gf_boolean_t notify = _gf_false;

    if (!opaque)
        return -1;

    rlist = opaque;

    LOCK(&rlist->c_lock);
    {
        rlist->completed++;
        /**
         * it's safe to test ->pending without rbuf->lock *only* if
         * there's a waiter as there can be no new incoming writes.
         */
        if (__rlist_has_waiter(rlist) && (rlist->completed == rlist->pending))
            notify = _gf_true;
    }
    UNLOCK(&rlist->c_lock);

    if (notify)
        rbuf_notify_waiter(rlist);

    return 0;
}

int
rbuf_get_buffer(rbuf_t *rbuf, void **opaque, sequence_fn *seqfn, void *mydata)
{
    int retval = RBUF_CONSUMABLE;
    rbuf_list_t *rlist = NULL;

    if (!rbuf || !opaque)
        return -1;

    LOCK(&rbuf->lock);
    {
        rlist = rbuf_current_buffer(rbuf);
        if (!rlist->pending) {
            retval = RBUF_EMPTY;
            goto unblock;
        }

        if (list_is_singular(&rbuf->freelist)) {
            /**
             * removal would lead to writer starvation, disallow
             * switching.
             */
            retval = RBUF_WOULD_STARVE;
            goto unblock;
        }

        list_del_init(&rlist->list);
        if (seqfn)
            seqfn(rlist, mydata);
        rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
    }
unblock:
    UNLOCK(&rbuf->lock);

    if (retval == RBUF_CONSUMABLE)
        *opaque = rlist; /* caller _owns_ the buffer */

    return retval;
}

/**
 * Wait for completion of pending writers and invoke dispatcher
 * routine (for buffer consumption).
 */
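
/**
 * The dispatcher passed to rbuf_wait_for_completion() runs with the
 * buffer detached from rotation, so it may walk the vector list without
 * holding any locks. A minimal sketch of such a dispatcher (the record
 * processing itself is hypothetical; only the first ->used vectors carry
 * data and only ->iov_len bytes of each are valid):
 *
 *   static void
 *   consume(rbuf_list_t *rlist, void *arg)
 *   {
 *       unsigned long long consumed = 0;
 *       rbuf_iovec_t *rvec = NULL;
 *
 *       list_for_each_entry(rvec, &rlist->veclist, list)
 *       {
 *           if (consumed++ == rlist->used)
 *               break;
 *           // process rvec->iov.iov_base (rvec->iov.iov_len bytes)
 *       }
 *   }
 */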

static void
__rbuf_wait_for_writers(rbuf_list_t *rlist)
{
    while (rlist->completed != rlist->pending)
        pthread_cond_wait(&rlist->b_cond, &rlist->b_lock);
}

#ifndef M_E
#define M_E 2.7 /* fallback approximation of Euler's number */
#endif

static void
rlist_shrink_vector(rbuf_list_t *rlist)
{
    unsigned long long shrink = 0;

    /**
     * fast path: don't bother to deallocate if vectors are hardly
     * used.
     */
    if (rvec_in_watermark_range(rlist))
        return;

    /**
     * Calculate the shrink count based on total allocated vectors.
     * Note that the calculation sticks to rlist->total irrespective
     * of the actual usage count (rlist->used). Later, ->used could
     * be used to apply slack to the calculation based on how much
     * it lags from ->total. For now, let's stick to slow decay.
     */
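    /**
     * For example, with ->total == 100: 100 * e^-0.2 ~= 82, so roughly
     * 18 vectors (about 18% of the list) are released per cycle.
     */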
    shrink = rlist->total - (rlist->total * pow(M_E, -0.2));

    rlist_shrink_rvec(rlist, shrink);
    rlist->total -= shrink;
}

int
rbuf_wait_for_completion(rbuf_t *rbuf, void *opaque,
                         void (*fn)(rbuf_list_t *, void *), void *arg)
{
    rbuf_list_t *rlist = NULL;

    if (!rbuf || !opaque)
        return -1;

    rlist = opaque;

    pthread_mutex_lock(&rlist->b_lock);
    {
        rlist_mark_waiting(rlist);
        __rbuf_wait_for_writers(rlist);
    }
    pthread_mutex_unlock(&rlist->b_lock);

    /**
     * from here on, no need of locking until the rlist is put
     * back into rotation.
     */

    fn(rlist, arg); /* invoke dispatcher */

    rlist->awaiting = _gf_false;
    rlist->pending = rlist->completed = 0;

    rlist_shrink_vector(rlist);
    rlist_reset_vector_usage(rlist);

    rlist->rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
    rbuf_reset_rvec(rlist->rvec);

    LOCK(&rbuf->lock);
    {
        list_add_tail(&rlist->list, &rbuf->freelist);
    }
    UNLOCK(&rbuf->lock);

    return 0;
}