/*
Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
/**
*
* Basic token bucket implementation for rate limiting. As of now interfaces
* to throttle disk read request, directory entry scan and hash calculation
* are available. To throttle a particular request (operation), the call needs
* to be wrapped in-between throttling APIs, for e.g.
*
* TBF_THROTTLE_BEGIN (...); <-- induces "delays" if required
* {
* call (...);
* }
* TBF_THROTTLE_END (...); <-- not used atm, maybe needed later
*
*/
#include "glusterfs/mem-pool.h"
#include "glusterfs/throttle-tbf.h"
typedef struct tbf_throttle {
char done;
pthread_mutex_t mutex;
pthread_cond_t cond;
unsigned long tokens;
struct list_head list;
} tbf_throttle_t;
static tbf_throttle_t *
tbf_init_throttle(unsigned long tokens_required)
{
tbf_throttle_t *throttle = NULL;
throttle = GF_CALLOC(1, sizeof(*throttle), gf_common_mt_tbf_throttle_t);
if (!throttle)
return NULL;
throttle->done = 0;
throttle->tokens = tokens_required;
INIT_LIST_HEAD(&throttle->list);
(void)pthread_mutex_init(&throttle->mutex, NULL);
(void)pthread_cond_init(&throttle->cond, NULL);
return throttle;
}
void
_tbf_dispatch_queued(tbf_bucket_t *bucket)
{
gf_boolean_t xcont = _gf_false;
tbf_throttle_t *tmp = NULL;
tbf_throttle_t *throttle = NULL;
list_for_each_entry_safe(throttle, tmp, &bucket->queued, list)
{
pthread_mutex_lock(&throttle->mutex);
{
if (bucket->tokens < throttle->tokens) {
xcont = _gf_true;
goto unblock;
}
/* this request can now be serviced */
throttle->done = 1;
list_del_init(&throttle->list);
bucket->tokens -= throttle->tokens;
pthread_cond_signal(&throttle->cond);
}
unblock:
pthread_mutex_unlock(&throttle->mutex);
if (xcont)
break;
}
}
void *
tbf_tokengenerator(void *arg)
{
unsigned long tokenrate = 0;
unsigned long maxtokens = 0;
unsigned long token_gen_interval = 0;
tbf_bucket_t *bucket = arg;
tokenrate = bucket->tokenrate;
maxtokens = bucket->maxtokens;
token_gen_interval = bucket->token_gen_interval;
while (1) {
usleep(token_gen_interval);
LOCK(&bucket->lock);
{
bucket->tokens += tokenrate;
if (bucket->tokens > maxtokens)
bucket->tokens = maxtokens;
if (!list_empty(&bucket->queued))
_tbf_dispatch_queued(bucket);
}
UNLOCK(&bucket->lock);
}
return NULL;
}
/**
* There is lazy synchronization between this routine (when invoked
* under tbf_mod() context) and tbf_throttle(). *bucket is
* updated _after_ all the required variables are initialized.
*/
static int32_t
tbf_init_bucket(tbf_t *tbf, tbf_opspec_t *spec)
{
int ret = 0;
tbf_bucket_t *curr = NULL;
tbf_bucket_t **bucket = NULL;
GF_ASSERT(spec->op >= TBF_OP_MIN);
GF_ASSERT(spec->op <= TBF_OP_MAX);
/* no rate? no throttling. */
if (!spec->rate)
return 0;
bucket = tbf->bucket + spec->op;
curr = GF_CALLOC(1, sizeof(*curr), gf_common_mt_tbf_bucket_t);
if (!curr)
goto error_return;
LOCK_INIT(&curr->lock);
INIT_LIST_HEAD(&curr->queued);
curr->tokens = 0;
curr->tokenrate = spec->rate;
curr->maxtokens = spec->maxlimit;
curr->token_gen_interval = spec->token_gen_interval;
ret = gf_thread_create(&curr->tokener, NULL, tbf_tokengenerator, curr,
"tbfclock");
if (ret != 0)
goto freemem;
*bucket = curr;
return 0;
freemem:
LOCK_DESTROY(&curr->lock);
GF_FREE(curr);
error_return:
return -1;
}
#define TBF_ALLOC_SIZE (sizeof(tbf_t) + (TBF_OP_MAX * sizeof(tbf_bucket_t)))
tbf_t *
tbf_init(tbf_opspec_t *tbfspec, unsigned int count)
{
int32_t i = 0;
int32_t ret = 0;
tbf_t *tbf = NULL;
tbf_opspec_t *opspec = NULL;
tbf = GF_CALLOC(1, TBF_ALLOC_SIZE, gf_common_mt_tbf_t);
if (!tbf)
goto error_return;
tbf->bucket = (tbf_bucket_t **)((char *)tbf + sizeof(*tbf));
for (i = 0; i < TBF_OP_MAX; i++) {
*(tbf->bucket + i) = NULL;
}
for (i = 0; i < count; i++) {
opspec = tbfspec + i;
ret = tbf_init_bucket(tbf, opspec);
if (ret)
break;
}
if (ret)
goto error_return;
return tbf;
error_return:
return NULL;
}
static void
tbf_mod_bucket(tbf_bucket_t *bucket, tbf_opspec_t *spec)
{
LOCK(&bucket->lock);
{
bucket->tokens = 0;
bucket->tokenrate = spec->rate;
bucket->maxtokens = spec->maxlimit;
}
UNLOCK(&bucket->lock);
/* next token tick would unqueue pending operations */
}
int
tbf_mod(tbf_t *tbf, tbf_opspec_t *tbfspec)
{
int ret = 0;
tbf_bucket_t *bucket = NULL;
tbf_ops_t op = TBF_OP_MIN;
if (!tbf || !tbfspec)
return -1;
op = tbfspec->op;
GF_ASSERT(op >= TBF_OP_MIN);
GF_ASSERT(op <= TBF_OP_MAX);
bucket = *(tbf->bucket + op);
if (bucket) {
tbf_mod_bucket(bucket, tbfspec);
} else {
ret = tbf_init_bucket(tbf, tbfspec);
}
return ret;
}
void
tbf_throttle(tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested)
{
char waitq = 0;
tbf_bucket_t *bucket = NULL;
tbf_throttle_t *throttle = NULL;
GF_ASSERT(op >= TBF_OP_MIN);
GF_ASSERT(op <= TBF_OP_MAX);
bucket = *(tbf->bucket + op);
if (!bucket)
return;
LOCK(&bucket->lock);
{
/**
* if there are enough tokens in the bucket there is no need
* to throttle the request: therefore, consume the required
* number of tokens and continue.
*/
if (tokens_requested <= bucket->tokens) {
bucket->tokens -= tokens_requested;
} else {
throttle = tbf_init_throttle(tokens_requested);
if (!throttle) /* let it slip through for now.. */
goto unblock;
waitq = 1;
pthread_mutex_lock(&throttle->mutex);
list_add_tail(&throttle->list, &bucket->queued);
}
}
unblock:
UNLOCK(&bucket->lock);
if (waitq) {
while (!throttle->done) {
pthread_cond_wait(&throttle->cond, &throttle->mutex);
}
pthread_mutex_unlock(&throttle->mutex);
pthread_mutex_destroy(&throttle->mutex);
pthread_cond_destroy(&throttle->cond);
GF_FREE(throttle);
}
}