/*
Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <math.h>
#include "glusterfs/mem-types.h"
#include "glusterfs/mem-pool.h"
#include "glusterfs/rot-buffs.h"
/**
* Producer-consumer built on top of rotational buffers.
*
* This favours writers (producers) and keeps the critical section
* lightweight. A buffer switch happens when a consumer wants to
* consume data; this is the slow path, which waits for pending
* writes to finish.
*
* TODO: do away with opaques (use arrays with indexing).
*/
#define ROT_BUFF_DEFAULT_COUNT 2
#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */
#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE)
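/**
* Each rbuf_iovec_t is one "melded" allocation: the header followed
* immediately by its payload (see rlist_add_new_vec()). A rough
* picture, for illustration only:
*
*   +---------------------+-------------------------------+
*   |     rbuf_iovec_t    |            payload            |
*   |  (RBUF_IOVEC_SIZE)  |     (ROT_BUFF_ALLOC_SIZE)     |
*   +---------------------+-------------------------------+
*                         ^-- iov.iov_base
*
* One block per vector keeps header and data together and makes
* freeing a vector a single GF_FREE().
*/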
/**
* the iovec list is not shrunk (deallocated) if the total vector
* count falls within this range. this is the fast path and should
* satisfy most workloads. beyond the high watermark, the list is
* shrunk gradually (see rlist_shrink_vector()).
*/
#define RVEC_LOW_WATERMARK_COUNT 1
#define RVEC_HIGH_WATERMARK_COUNT (1 << 4)
static inline rbuf_list_t *
rbuf_current_buffer(rbuf_t *rbuf)
{
return rbuf->current;
}
static void
rlist_mark_waiting(rbuf_list_t *rlist)
{
LOCK(&rlist->c_lock);
{
rlist->awaiting = _gf_true;
}
UNLOCK(&rlist->c_lock);
}
static int
__rlist_has_waiter(rbuf_list_t *rlist)
{
return (rlist->awaiting == _gf_true);
}
static void *
rbuf_alloc_rvec(void)
{
return GF_CALLOC(1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t);
}
static void
rlist_reset_vector_usage(rbuf_list_t *rlist)
{
rlist->used = 1; /* the first vector always remains in use */
}
static void
rlist_increment_vector_usage(rbuf_list_t *rlist)
{
rlist->used++;
}
static void
rlist_increment_total_usage(rbuf_list_t *rlist)
{
rlist->total++;
}
static int
rvec_in_watermark_range(rbuf_list_t *rlist)
{
return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) &&
(rlist->total <= RVEC_HIGH_WATERMARK_COUNT));
}
static void
rbuf_reset_rvec(rbuf_iovec_t *rvec)
{
GF_VALIDATE_OR_GOTO("libglusterfs", rvec, err);
/* iov_base is _never_ modified */
rvec->iov.iov_len = 0;
err:
return;
}
/* TODO: alloc multiple rbuf_iovec_t */
static int
rlist_add_new_vec(rbuf_list_t *rlist)
{
rbuf_iovec_t *rvec = NULL;
rvec = (rbuf_iovec_t *)rbuf_alloc_rvec();
if (!rvec)
return -1;
INIT_LIST_HEAD(&rvec->list);
rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE;
rvec->iov.iov_len = 0;
list_add_tail(&rvec->list, &rlist->veclist);
rlist->rvec = rvec; /* cache the latest */
rlist_increment_vector_usage(rlist);
rlist_increment_total_usage(rlist);
return 0;
}
static void
rlist_free_rvec(rbuf_iovec_t *rvec)
{
if (!rvec)
return;
list_del(&rvec->list);
GF_FREE(rvec);
}
static void
rlist_purge_all_rvec(rbuf_list_t *rlist)
{
rbuf_iovec_t *rvec = NULL;
if (!rlist)
return;
while (!list_empty(&rlist->veclist)) {
rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
rlist_free_rvec(rvec);
}
}
static void
rlist_shrink_rvec(rbuf_list_t *rlist, unsigned long long shrink)
{
rbuf_iovec_t *rvec = NULL;
while (!list_empty(&rlist->veclist) && (shrink-- > 0)) {
rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
rlist_free_rvec(rvec);
}
}
static void
rbuf_purge_rlist(rbuf_t *rbuf)
{
rbuf_list_t *rlist = NULL;
while (!list_empty(&rbuf->freelist)) {
rlist = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
list_del(&rlist->list);
rlist_purge_all_rvec(rlist);
LOCK_DESTROY(&rlist->c_lock);
(void)pthread_mutex_destroy(&rlist->b_lock);
(void)pthread_cond_destroy(&rlist->b_cond);
GF_FREE(rlist);
}
}
rbuf_t *
rbuf_init(int bufcount)
{
int j = 0;
int ret = 0;
rbuf_t *rbuf = NULL;
rbuf_list_t *rlist = NULL;
if (bufcount <= 0)
bufcount = ROT_BUFF_DEFAULT_COUNT;
rbuf = GF_CALLOC(1, sizeof(rbuf_t), gf_common_mt_rbuf_t);
if (!rbuf)
goto error_return;
LOCK_INIT(&rbuf->lock);
INIT_LIST_HEAD(&rbuf->freelist);
/* this could have been one big calloc(), but it runs just once at init.. */
for (j = 0; j < bufcount; j++) {
rlist = GF_CALLOC(1, sizeof(rbuf_list_t), gf_common_mt_rlist_t);
if (!rlist) {
ret = -1;
break;
}
INIT_LIST_HEAD(&rlist->list);
INIT_LIST_HEAD(&rlist->veclist);
rlist->pending = rlist->completed = 0;
ret = rlist_add_new_vec(rlist);
if (ret) {
GF_FREE(rlist); /* not yet on the freelist, free it here */
break;
}
LOCK_INIT(&rlist->c_lock);
rlist->awaiting = _gf_false;
ret = pthread_mutex_init(&rlist->b_lock, 0);
if (ret != 0) {
rlist_purge_all_rvec(rlist); /* undo rlist_add_new_vec() */
LOCK_DESTROY(&rlist->c_lock);
GF_FREE(rlist);
break;
}
ret = pthread_cond_init(&rlist->b_cond, 0);
if (ret != 0) {
(void)pthread_mutex_destroy(&rlist->b_lock);
rlist_purge_all_rvec(rlist);
LOCK_DESTROY(&rlist->c_lock);
GF_FREE(rlist);
break;
}
list_add_tail(&rlist->list, &rbuf->freelist);
}
if (ret != 0)
goto dealloc_rlist;
/* cache currently used buffer: first in the list */
rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
return rbuf;
dealloc_rlist:
rbuf_purge_rlist(rbuf);
LOCK_DESTROY(&rbuf->lock);
GF_FREE(rbuf);
error_return:
return NULL;
}
void
rbuf_dtor(rbuf_t *rbuf)
{
if (!rbuf)
return;
rbuf->current = NULL;
rbuf_purge_rlist(rbuf);
LOCK_DESTROY(&rbuf->lock);
GF_FREE(rbuf);
}
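/**
* Minimal lifecycle sketch (illustrative; error handling elided):
*
*   rbuf_t *rbuf = rbuf_init(0);   // 0 => ROT_BUFF_DEFAULT_COUNT lists
*   if (rbuf) {
*       ... producers/consumers operate on rbuf ...
*       rbuf_dtor(rbuf);
*   }
*
* Note: rbuf_dtor() purges only the freelist, so a buffer handed out
* via rbuf_get_buffer() should be returned through
* rbuf_wait_for_completion() before teardown.
*/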
static char *
rbuf_adjust_write_area(struct iovec *iov, size_t bytes)
{
char *wbuf = NULL;
wbuf = iov->iov_base + iov->iov_len;
iov->iov_len += bytes;
return wbuf;
}
static char *
rbuf_alloc_write_area(rbuf_list_t *rlist, size_t bytes)
{
int ret = 0;
struct iovec *iov = NULL;
/* check for available space in _current_ IO buffer */
iov = &rlist->rvec->iov;
if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE)
return rbuf_adjust_write_area(iov, bytes); /* fast path */
/* not enough bytes, try next available buffers */
if (list_is_last(&rlist->rvec->list, &rlist->veclist)) {
/* OH! consumed all vector buffers */
GF_ASSERT(rlist->used == rlist->total);
ret = rlist_add_new_vec(rlist);
if (ret)
goto error_return;
} else {
/* not the end, have available rbuf_iovec's */
rlist->rvec = list_next_entry(rlist->rvec, list);
rlist->used++;
rbuf_reset_rvec(rlist->rvec);
}
iov = &rlist->rvec->iov;
return rbuf_adjust_write_area(iov, bytes);
error_return:
return NULL;
}
char *
rbuf_reserve_write_area(rbuf_t *rbuf, size_t bytes, void **opaque)
{
char *wbuf = NULL;
rbuf_list_t *rlist = NULL;
if (!rbuf || (bytes == 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque)
return NULL;
LOCK(&rbuf->lock);
{
rlist = rbuf_current_buffer(rbuf);
wbuf = rbuf_alloc_write_area(rlist, bytes);
if (!wbuf)
goto unblock;
rlist->pending++;
}
unblock:
UNLOCK(&rbuf->lock);
if (wbuf)
*opaque = rlist;
return wbuf;
}
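/**
* Illustrative producer-side usage (a sketch; `rbuf', `payload' and
* `len' are hypothetical caller-side names):
*
*   void *opaque = NULL;
*   char *wbuf = rbuf_reserve_write_area(rbuf, len, &opaque);
*   if (wbuf) {
*       memcpy(wbuf, payload, len);  // fill outside the lock
*       (void)rbuf_write_complete(opaque);
*   }
*
* Only the reservation runs under rbuf->lock; the (possibly large)
* copy happens outside the critical section, which is what keeps
* the writer path lightweight.
*/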
static void
rbuf_notify_waiter(rbuf_list_t *rlist)
{
pthread_mutex_lock(&rlist->b_lock);
{
pthread_cond_signal(&rlist->b_cond);
}
pthread_mutex_unlock(&rlist->b_lock);
}
int
rbuf_write_complete(void *opaque)
{
rbuf_list_t *rlist = NULL;
gf_boolean_t notify = _gf_false;
if (!opaque)
return -1;
rlist = opaque;
LOCK(&rlist->c_lock);
{
rlist->completed++;
/**
* it's safe to test ->pending without rbuf->lock *only* if
* there's a waiter: the list is already out of rotation, so no
* new writes can arrive.
*/
if (__rlist_has_waiter(rlist) && (rlist->completed == rlist->pending))
notify = _gf_true;
}
UNLOCK(&rlist->c_lock);
if (notify)
rbuf_notify_waiter(rlist);
return 0;
}
int
rbuf_get_buffer(rbuf_t *rbuf, void **opaque, sequence_fn *seqfn, void *mydata)
{
int retval = RBUF_CONSUMABLE;
rbuf_list_t *rlist = NULL;
if (!rbuf || !opaque)
return -1;
LOCK(&rbuf->lock);
{
rlist = rbuf_current_buffer(rbuf);
if (!rlist->pending) {
retval = RBUF_EMPTY;
goto unblock;
}
if (list_is_singular(&rbuf->freelist)) {
/**
* removal would lead to writer starvation, disallow
* switching.
*/
retval = RBUF_WOULD_STARVE;
goto unblock;
}
list_del_init(&rlist->list);
if (seqfn)
seqfn(rlist, mydata);
rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
}
unblock:
UNLOCK(&rbuf->lock);
if (retval == RBUF_CONSUMABLE)
*opaque = rlist; /* caller _owns_ the buffer */
return retval;
}
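/**
* Illustrative consumer-side usage (a sketch; `rbuf', `dispatch' and
* `arg' are hypothetical caller-side names):
*
*   void *opaque = NULL;
*   if (rbuf_get_buffer(rbuf, &opaque, NULL, NULL) == RBUF_CONSUMABLE)
*       (void)rbuf_wait_for_completion(rbuf, opaque, dispatch, arg);
*
* where dispatch(rbuf_list_t *rlist, void *arg) would typically walk
* rlist->veclist and consume iov_len bytes from each of the
* rlist->used vectors. RBUF_EMPTY and RBUF_WOULD_STARVE mean
* "nothing to do, retry later".
*/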
/**
* Wait for pending writers to complete. Called with ->b_lock held;
* the dispatcher routine (for buffer consumption) is invoked
* afterwards by rbuf_wait_for_completion().
*/
static void
__rbuf_wait_for_writers(rbuf_list_t *rlist)
{
while (rlist->completed != rlist->pending)
pthread_cond_wait(&rlist->b_cond, &rlist->b_lock);
}
#ifndef M_E
#define M_E 2.7182818284590452354 /* Euler's number, as in math.h */
#endif
static void
rlist_shrink_vector(rbuf_list_t *rlist)
{
unsigned long long shrink = 0;
/**
* fast path: don't bother to deallocate if vectors are hardly
* used.
*/
if (rvec_in_watermark_range(rlist))
return;
/**
* Calculate the shrink count based on total allocated vectors.
* Note that the calculation sticks to rlist->total irrespective
* of the actual usage count (rlist->used). Later, ->used could
* be used to apply slack to the calculation based on how much
* it lags from ->total. For now, let's stick to slow decay.
*/
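/**
* e.g. (illustrative numbers): with total == 32 vectors,
* shrink = 32 - 32 * e^-0.2 ~= 5.8, truncated to 5, leaving 27;
* repeated cycles decay the pool gradually instead of freeing
* everything at once.
*/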
shrink = rlist->total - (rlist->total * pow(M_E, -0.2));
rlist_shrink_rvec(rlist, shrink);
rlist->total -= shrink;
}
int
rbuf_wait_for_completion(rbuf_t *rbuf, void *opaque,
void (*fn)(rbuf_list_t *, void *), void *arg)
{
rbuf_list_t *rlist = NULL;
if (!rbuf || !opaque)
return -1;
rlist = opaque;
pthread_mutex_lock(&rlist->b_lock);
{
rlist_mark_waiting(rlist);
__rbuf_wait_for_writers(rlist);
}
pthread_mutex_unlock(&rlist->b_lock);
/**
* from here on there is no need for locking until the rlist is
* put back into rotation.
*/
fn(rlist, arg); /* invoke dispatcher */
rlist->awaiting = _gf_false;
rlist->pending = rlist->completed = 0;
rlist_shrink_vector(rlist);
rlist_reset_vector_usage(rlist);
rlist->rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
rbuf_reset_rvec(rlist->rvec);
LOCK(&rbuf->lock);
{
list_add_tail(&rlist->list, &rbuf->freelist);
}
UNLOCK(&rbuf->lock);
return 0;
}