Blame libevent/bufferevent_ratelim.c

Packit e9ba0d
/*
Packit e9ba0d
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
Packit e9ba0d
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
Packit e9ba0d
 * All rights reserved.
Packit e9ba0d
 *
Packit e9ba0d
 * Redistribution and use in source and binary forms, with or without
Packit e9ba0d
 * modification, are permitted provided that the following conditions
Packit e9ba0d
 * are met:
Packit e9ba0d
 * 1. Redistributions of source code must retain the above copyright
Packit e9ba0d
 *    notice, this list of conditions and the following disclaimer.
Packit e9ba0d
 * 2. Redistributions in binary form must reproduce the above copyright
Packit e9ba0d
 *    notice, this list of conditions and the following disclaimer in the
Packit e9ba0d
 *    documentation and/or other materials provided with the distribution.
Packit e9ba0d
 * 3. The name of the author may not be used to endorse or promote products
Packit e9ba0d
 *    derived from this software without specific prior written permission.
Packit e9ba0d
 *
Packit e9ba0d
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
Packit e9ba0d
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
Packit e9ba0d
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
Packit e9ba0d
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
Packit e9ba0d
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
Packit e9ba0d
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
Packit e9ba0d
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
Packit e9ba0d
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
Packit e9ba0d
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
Packit e9ba0d
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Packit e9ba0d
 */
Packit e9ba0d
Packit e9ba0d
#include <sys/types.h>
Packit e9ba0d
#include <limits.h>
Packit e9ba0d
#include <string.h>
Packit e9ba0d
#include <stdlib.h>
Packit e9ba0d
Packit e9ba0d
#include "event2/event.h"
Packit e9ba0d
#include "event2/event_struct.h"
Packit e9ba0d
#include "event2/util.h"
Packit e9ba0d
#include "event2/bufferevent.h"
Packit e9ba0d
#include "event2/bufferevent_struct.h"
Packit e9ba0d
#include "event2/buffer.h"
Packit e9ba0d
Packit e9ba0d
#include "ratelim-internal.h"
Packit e9ba0d
Packit e9ba0d
#include "bufferevent-internal.h"
Packit e9ba0d
#include "mm-internal.h"
Packit e9ba0d
#include "util-internal.h"
Packit e9ba0d
#include "event-internal.h"
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
ev_token_bucket_init(struct ev_token_bucket *bucket,
Packit e9ba0d
    const struct ev_token_bucket_cfg *cfg,
Packit e9ba0d
    ev_uint32_t current_tick,
Packit e9ba0d
    int reinitialize)
Packit e9ba0d
{
Packit e9ba0d
	if (reinitialize) {
Packit e9ba0d
		/* on reinitialization, we only clip downwards, since we've
Packit e9ba0d
		   already used who-knows-how-much bandwidth this tick.  We
Packit e9ba0d
		   leave "last_updated" as it is; the next update will add the
Packit e9ba0d
		   appropriate amount of bandwidth to the bucket.
Packit e9ba0d
		*/
Packit e9ba0d
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
Packit e9ba0d
			bucket->read_limit = cfg->read_maximum;
Packit e9ba0d
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
Packit e9ba0d
			bucket->write_limit = cfg->write_maximum;
Packit e9ba0d
	} else {
Packit e9ba0d
		bucket->read_limit = cfg->read_rate;
Packit e9ba0d
		bucket->write_limit = cfg->write_rate;
Packit e9ba0d
		bucket->last_updated = current_tick;
Packit e9ba0d
	}
Packit e9ba0d
	return 0;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
ev_token_bucket_update(struct ev_token_bucket *bucket,
Packit e9ba0d
    const struct ev_token_bucket_cfg *cfg,
Packit e9ba0d
    ev_uint32_t current_tick)
Packit e9ba0d
{
Packit e9ba0d
	/* It's okay if the tick number overflows, since we'll just
Packit e9ba0d
	 * wrap around when we do the unsigned substraction. */
Packit e9ba0d
	unsigned n_ticks = current_tick - bucket->last_updated;
Packit e9ba0d
Packit e9ba0d
	/* Make sure some ticks actually happened, and that time didn't
Packit e9ba0d
	 * roll back. */
Packit e9ba0d
	if (n_ticks == 0 || n_ticks > INT_MAX)
Packit e9ba0d
		return 0;
Packit e9ba0d
Packit e9ba0d
	/* Naively, we would say
Packit e9ba0d
		bucket->limit += n_ticks * cfg->rate;
Packit e9ba0d
Packit e9ba0d
		if (bucket->limit > cfg->maximum)
Packit e9ba0d
			bucket->limit = cfg->maximum;
Packit e9ba0d
Packit e9ba0d
	   But we're worried about overflow, so we do it like this:
Packit e9ba0d
	*/
Packit e9ba0d
Packit e9ba0d
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
Packit e9ba0d
		bucket->read_limit = cfg->read_maximum;
Packit e9ba0d
	else
Packit e9ba0d
		bucket->read_limit += n_ticks * cfg->read_rate;
Packit e9ba0d
Packit e9ba0d
Packit e9ba0d
	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
Packit e9ba0d
		bucket->write_limit = cfg->write_maximum;
Packit e9ba0d
	else
Packit e9ba0d
		bucket->write_limit += n_ticks * cfg->write_rate;
Packit e9ba0d
Packit e9ba0d
Packit e9ba0d
	bucket->last_updated = current_tick;
Packit e9ba0d
Packit e9ba0d
	return 1;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
static inline void
Packit e9ba0d
bufferevent_update_buckets(struct bufferevent_private *bev)
Packit e9ba0d
{
Packit e9ba0d
	/* Must hold lock on bev. */
Packit e9ba0d
	struct timeval now;
Packit e9ba0d
	unsigned tick;
Packit e9ba0d
	event_base_gettimeofday_cached(bev->bev.ev_base, &now;;
Packit e9ba0d
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
Packit e9ba0d
	if (tick != bev->rate_limiting->limit.last_updated)
Packit e9ba0d
		ev_token_bucket_update(&bev->rate_limiting->limit,
Packit e9ba0d
		    bev->rate_limiting->cfg, tick);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
ev_uint32_t
Packit e9ba0d
ev_token_bucket_get_tick(const struct timeval *tv,
Packit e9ba0d
    const struct ev_token_bucket_cfg *cfg)
Packit e9ba0d
{
Packit e9ba0d
	/* This computation uses two multiplies and a divide.  We could do
Packit e9ba0d
	 * fewer if we knew that the tick length was an integer number of
Packit e9ba0d
	 * seconds, or if we knew it divided evenly into a second.  We should
Packit e9ba0d
	 * investigate that more.
Packit e9ba0d
	 */
Packit e9ba0d
Packit e9ba0d
	/* We cast to an ev_uint64_t first, since we don't want to overflow
Packit e9ba0d
	 * before we do the final divide. */
Packit e9ba0d
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
Packit e9ba0d
	return (unsigned)(msec / cfg->msec_per_tick);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
struct ev_token_bucket_cfg *
Packit e9ba0d
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
Packit e9ba0d
    size_t write_rate, size_t write_burst,
Packit e9ba0d
    const struct timeval *tick_len)
Packit e9ba0d
{
Packit e9ba0d
	struct ev_token_bucket_cfg *r;
Packit e9ba0d
	struct timeval g;
Packit e9ba0d
	if (! tick_len) {
Packit e9ba0d
		g.tv_sec = 1;
Packit e9ba0d
		g.tv_usec = 0;
Packit e9ba0d
		tick_len = &g;
Packit e9ba0d
	}
Packit e9ba0d
	if (read_rate > read_burst || write_rate > write_burst ||
Packit e9ba0d
	    read_rate < 1 || write_rate < 1)
Packit e9ba0d
		return NULL;
Packit e9ba0d
	if (read_rate > EV_RATE_LIMIT_MAX ||
Packit e9ba0d
	    write_rate > EV_RATE_LIMIT_MAX ||
Packit e9ba0d
	    read_burst > EV_RATE_LIMIT_MAX ||
Packit e9ba0d
	    write_burst > EV_RATE_LIMIT_MAX)
Packit e9ba0d
		return NULL;
Packit e9ba0d
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
Packit e9ba0d
	if (!r)
Packit e9ba0d
		return NULL;
Packit e9ba0d
	r->read_rate = read_rate;
Packit e9ba0d
	r->write_rate = write_rate;
Packit e9ba0d
	r->read_maximum = read_burst;
Packit e9ba0d
	r->write_maximum = write_burst;
Packit e9ba0d
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
Packit e9ba0d
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
Packit e9ba0d
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
void
Packit e9ba0d
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
Packit e9ba0d
{
Packit e9ba0d
	mm_free(cfg);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/* No matter how big our bucket gets, don't try to read more than this
Packit e9ba0d
 * much in a single read operation. */
Packit e9ba0d
#define MAX_TO_READ_EVER 16384
Packit e9ba0d
/* No matter how big our bucket gets, don't try to write more than this
Packit e9ba0d
 * much in a single write operation. */
Packit e9ba0d
#define MAX_TO_WRITE_EVER 16384
Packit e9ba0d
Packit e9ba0d
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
Packit e9ba0d
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
Packit e9ba0d
Packit e9ba0d
static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
Packit e9ba0d
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
Packit e9ba0d
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
Packit e9ba0d
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
Packit e9ba0d
Packit e9ba0d
/** Helper: figure out the maximum amount we should write if is_write, or
Packit e9ba0d
    the maximum amount we should read if is_read.  Return that maximum, or
Packit e9ba0d
    0 if our bucket is wholly exhausted.
Packit e9ba0d
 */
Packit e9ba0d
static inline ev_ssize_t
Packit e9ba0d
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
Packit e9ba0d
{
Packit e9ba0d
	/* needs lock on bev. */
Packit e9ba0d
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
Packit e9ba0d
Packit e9ba0d
#define LIM(x)						\
Packit e9ba0d
	(is_write ? (x).write_limit : (x).read_limit)
Packit e9ba0d
Packit e9ba0d
#define GROUP_SUSPENDED(g)			\
Packit e9ba0d
	(is_write ? (g)->write_suspended : (g)->read_suspended)
Packit e9ba0d
Packit e9ba0d
	/* Sets max_so_far to MIN(x, max_so_far) */
Packit e9ba0d
#define CLAMPTO(x)				\
Packit e9ba0d
	do {					\
Packit e9ba0d
		if (max_so_far > (x))		\
Packit e9ba0d
			max_so_far = (x);	\
Packit e9ba0d
	} while (0);
Packit e9ba0d
Packit e9ba0d
	if (!bev->rate_limiting)
Packit e9ba0d
		return max_so_far;
Packit e9ba0d
Packit e9ba0d
	/* If rate-limiting is enabled at all, update the appropriate
Packit e9ba0d
	   bucket, and take the smaller of our rate limit and the group
Packit e9ba0d
	   rate limit.
Packit e9ba0d
	 */
Packit e9ba0d
Packit e9ba0d
	if (bev->rate_limiting->cfg) {
Packit e9ba0d
		bufferevent_update_buckets(bev);
Packit e9ba0d
		max_so_far = LIM(bev->rate_limiting->limit);
Packit e9ba0d
	}
Packit e9ba0d
	if (bev->rate_limiting->group) {
Packit e9ba0d
		struct bufferevent_rate_limit_group *g =
Packit e9ba0d
		    bev->rate_limiting->group;
Packit e9ba0d
		ev_ssize_t share;
Packit e9ba0d
		LOCK_GROUP(g);
Packit e9ba0d
		if (GROUP_SUSPENDED(g)) {
Packit e9ba0d
			/* We can get here if we failed to lock this
Packit e9ba0d
			 * particular bufferevent while suspending the whole
Packit e9ba0d
			 * group. */
Packit e9ba0d
			if (is_write)
Packit e9ba0d
				bufferevent_suspend_write(&bev->bev,
Packit e9ba0d
				    BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
			else
Packit e9ba0d
				bufferevent_suspend_read(&bev->bev,
Packit e9ba0d
				    BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
			share = 0;
Packit e9ba0d
		} else {
Packit e9ba0d
			/* XXXX probably we should divide among the active
Packit e9ba0d
			 * members, not the total members. */
Packit e9ba0d
			share = LIM(g->rate_limit) / g->n_members;
Packit e9ba0d
			if (share < g->min_share)
Packit e9ba0d
				share = g->min_share;
Packit e9ba0d
		}
Packit e9ba0d
		UNLOCK_GROUP(g);
Packit e9ba0d
		CLAMPTO(share);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	if (max_so_far < 0)
Packit e9ba0d
		max_so_far = 0;
Packit e9ba0d
	return max_so_far;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
ev_ssize_t
Packit e9ba0d
_bufferevent_get_read_max(struct bufferevent_private *bev)
Packit e9ba0d
{
Packit e9ba0d
	return _bufferevent_get_rlim_max(bev, 0);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
ev_ssize_t
Packit e9ba0d
_bufferevent_get_write_max(struct bufferevent_private *bev)
Packit e9ba0d
{
Packit e9ba0d
	return _bufferevent_get_rlim_max(bev, 1);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
Packit e9ba0d
{
Packit e9ba0d
	/* XXXXX Make sure all users of this function check its return value */
Packit e9ba0d
	int r = 0;
Packit e9ba0d
	/* need to hold lock on bev */
Packit e9ba0d
	if (!bev->rate_limiting)
Packit e9ba0d
		return 0;
Packit e9ba0d
Packit e9ba0d
	if (bev->rate_limiting->cfg) {
Packit e9ba0d
		bev->rate_limiting->limit.read_limit -= bytes;
Packit e9ba0d
		if (bev->rate_limiting->limit.read_limit <= 0) {
Packit e9ba0d
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
Packit e9ba0d
			if (event_add(&bev->rate_limiting->refill_bucket_event,
Packit e9ba0d
				&bev->rate_limiting->cfg->tick_timeout) < 0)
Packit e9ba0d
				r = -1;
Packit e9ba0d
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
Packit e9ba0d
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
Packit e9ba0d
				event_del(&bev->rate_limiting->refill_bucket_event);
Packit e9ba0d
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
Packit e9ba0d
		}
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	if (bev->rate_limiting->group) {
Packit e9ba0d
		LOCK_GROUP(bev->rate_limiting->group);
Packit e9ba0d
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
Packit e9ba0d
		bev->rate_limiting->group->total_read += bytes;
Packit e9ba0d
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
Packit e9ba0d
			_bev_group_suspend_reading(bev->rate_limiting->group);
Packit e9ba0d
		} else if (bev->rate_limiting->group->read_suspended) {
Packit e9ba0d
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
Packit e9ba0d
		}
Packit e9ba0d
		UNLOCK_GROUP(bev->rate_limiting->group);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
Packit e9ba0d
{
Packit e9ba0d
	/* XXXXX Make sure all users of this function check its return value */
Packit e9ba0d
	int r = 0;
Packit e9ba0d
	/* need to hold lock */
Packit e9ba0d
	if (!bev->rate_limiting)
Packit e9ba0d
		return 0;
Packit e9ba0d
Packit e9ba0d
	if (bev->rate_limiting->cfg) {
Packit e9ba0d
		bev->rate_limiting->limit.write_limit -= bytes;
Packit e9ba0d
		if (bev->rate_limiting->limit.write_limit <= 0) {
Packit e9ba0d
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
Packit e9ba0d
			if (event_add(&bev->rate_limiting->refill_bucket_event,
Packit e9ba0d
				&bev->rate_limiting->cfg->tick_timeout) < 0)
Packit e9ba0d
				r = -1;
Packit e9ba0d
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
Packit e9ba0d
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
Packit e9ba0d
				event_del(&bev->rate_limiting->refill_bucket_event);
Packit e9ba0d
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
Packit e9ba0d
		}
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	if (bev->rate_limiting->group) {
Packit e9ba0d
		LOCK_GROUP(bev->rate_limiting->group);
Packit e9ba0d
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
Packit e9ba0d
		bev->rate_limiting->group->total_written += bytes;
Packit e9ba0d
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
Packit e9ba0d
			_bev_group_suspend_writing(bev->rate_limiting->group);
Packit e9ba0d
		} else if (bev->rate_limiting->group->write_suspended) {
Packit e9ba0d
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
Packit e9ba0d
		}
Packit e9ba0d
		UNLOCK_GROUP(bev->rate_limiting->group);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/** Stop reading on every bufferevent in g */
Packit e9ba0d
static int
Packit e9ba0d
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
Packit e9ba0d
{
Packit e9ba0d
	/* Needs group lock */
Packit e9ba0d
	struct bufferevent_private *bev;
Packit e9ba0d
	g->read_suspended = 1;
Packit e9ba0d
	g->pending_unsuspend_read = 0;
Packit e9ba0d
Packit e9ba0d
	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
Packit e9ba0d
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
Packit e9ba0d
	   the bufferevent locks.  If we are unable to lock any individual
Packit e9ba0d
	   bufferevent, it will find out later when it looks at its limit
Packit e9ba0d
	   and sees that its group is suspended.
Packit e9ba0d
	*/
Packit e9ba0d
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
Packit e9ba0d
		if (EVLOCK_TRY_LOCK(bev->lock)) {
Packit e9ba0d
			bufferevent_suspend_read(&bev->bev,
Packit e9ba0d
			    BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
			EVLOCK_UNLOCK(bev->lock, 0);
Packit e9ba0d
		}
Packit e9ba0d
	}
Packit e9ba0d
	return 0;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/** Stop writing on every bufferevent in g */
Packit e9ba0d
static int
Packit e9ba0d
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
Packit e9ba0d
{
Packit e9ba0d
	/* Needs group lock */
Packit e9ba0d
	struct bufferevent_private *bev;
Packit e9ba0d
	g->write_suspended = 1;
Packit e9ba0d
	g->pending_unsuspend_write = 0;
Packit e9ba0d
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
Packit e9ba0d
		if (EVLOCK_TRY_LOCK(bev->lock)) {
Packit e9ba0d
			bufferevent_suspend_write(&bev->bev,
Packit e9ba0d
			    BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
			EVLOCK_UNLOCK(bev->lock, 0);
Packit e9ba0d
		}
Packit e9ba0d
	}
Packit e9ba0d
	return 0;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/** Timer callback invoked on a single bufferevent with one or more exhausted
Packit e9ba0d
    buckets when they are ready to refill. */
Packit e9ba0d
static void
Packit e9ba0d
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
Packit e9ba0d
{
Packit e9ba0d
	unsigned tick;
Packit e9ba0d
	struct timeval now;
Packit e9ba0d
	struct bufferevent_private *bev = arg;
Packit e9ba0d
	int again = 0;
Packit e9ba0d
	BEV_LOCK(&bev->bev);
Packit e9ba0d
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
Packit e9ba0d
		BEV_UNLOCK(&bev->bev);
Packit e9ba0d
		return;
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	/* First, update the bucket */
Packit e9ba0d
	event_base_gettimeofday_cached(bev->bev.ev_base, &now;;
Packit e9ba0d
	tick = ev_token_bucket_get_tick(&now,
Packit e9ba0d
	    bev->rate_limiting->cfg);
Packit e9ba0d
	ev_token_bucket_update(&bev->rate_limiting->limit,
Packit e9ba0d
	    bev->rate_limiting->cfg,
Packit e9ba0d
	    tick);
Packit e9ba0d
Packit e9ba0d
	/* Now unsuspend any read/write operations as appropriate. */
Packit e9ba0d
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
Packit e9ba0d
		if (bev->rate_limiting->limit.read_limit > 0)
Packit e9ba0d
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
Packit e9ba0d
		else
Packit e9ba0d
			again = 1;
Packit e9ba0d
	}
Packit e9ba0d
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
Packit e9ba0d
		if (bev->rate_limiting->limit.write_limit > 0)
Packit e9ba0d
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
Packit e9ba0d
		else
Packit e9ba0d
			again = 1;
Packit e9ba0d
	}
Packit e9ba0d
	if (again) {
Packit e9ba0d
		/* One or more of the buckets may need another refill if they
Packit e9ba0d
		   started negative.
Packit e9ba0d
Packit e9ba0d
		   XXXX if we need to be quiet for more ticks, we should
Packit e9ba0d
		   maybe figure out what timeout we really want.
Packit e9ba0d
		*/
Packit e9ba0d
		/* XXXX Handle event_add failure somehow */
Packit e9ba0d
		event_add(&bev->rate_limiting->refill_bucket_event,
Packit e9ba0d
		    &bev->rate_limiting->cfg->tick_timeout);
Packit e9ba0d
	}
Packit e9ba0d
	BEV_UNLOCK(&bev->bev);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/** Helper: grab a random element from a bufferevent group. */
Packit e9ba0d
static struct bufferevent_private *
Packit e9ba0d
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
Packit e9ba0d
{
Packit e9ba0d
	int which;
Packit e9ba0d
	struct bufferevent_private *bev;
Packit e9ba0d
Packit e9ba0d
	/* requires group lock */
Packit e9ba0d
Packit e9ba0d
	if (!group->n_members)
Packit e9ba0d
		return NULL;
Packit e9ba0d
Packit e9ba0d
	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
Packit e9ba0d
Packit e9ba0d
	which = _evutil_weakrand() % group->n_members;
Packit e9ba0d
Packit e9ba0d
	bev = TAILQ_FIRST(&group->members);
Packit e9ba0d
	while (which--)
Packit e9ba0d
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
Packit e9ba0d
Packit e9ba0d
	return bev;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/** Iterate over the elements of a rate-limiting group 'g' with a random
Packit e9ba0d
    starting point, assigning each to the variable 'bev', and executing the
Packit e9ba0d
    block 'block'.
Packit e9ba0d
Packit e9ba0d
    We do this in a half-baked effort to get fairness among group members.
Packit e9ba0d
    XXX Round-robin or some kind of priority queue would be even more fair.
Packit e9ba0d
 */
Packit e9ba0d
#define FOREACH_RANDOM_ORDER(block)			\
Packit e9ba0d
	do {						\
Packit e9ba0d
		first = _bev_group_random_element(g);	\
Packit e9ba0d
		for (bev = first; bev != TAILQ_END(&g->members); \
Packit e9ba0d
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
Packit e9ba0d
			block ;					 \
Packit e9ba0d
		}						 \
Packit e9ba0d
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
Packit e9ba0d
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
Packit e9ba0d
			block ;						\
Packit e9ba0d
		}							\
Packit e9ba0d
	} while (0)
Packit e9ba0d
Packit e9ba0d
static void
Packit e9ba0d
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
Packit e9ba0d
{
Packit e9ba0d
	int again = 0;
Packit e9ba0d
	struct bufferevent_private *bev, *first;
Packit e9ba0d
Packit e9ba0d
	g->read_suspended = 0;
Packit e9ba0d
	FOREACH_RANDOM_ORDER({
Packit e9ba0d
		if (EVLOCK_TRY_LOCK(bev->lock)) {
Packit e9ba0d
			bufferevent_unsuspend_read(&bev->bev,
Packit e9ba0d
			    BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
			EVLOCK_UNLOCK(bev->lock, 0);
Packit e9ba0d
		} else {
Packit e9ba0d
			again = 1;
Packit e9ba0d
		}
Packit e9ba0d
	});
Packit e9ba0d
	g->pending_unsuspend_read = again;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
static void
Packit e9ba0d
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
Packit e9ba0d
{
Packit e9ba0d
	int again = 0;
Packit e9ba0d
	struct bufferevent_private *bev, *first;
Packit e9ba0d
	g->write_suspended = 0;
Packit e9ba0d
Packit e9ba0d
	FOREACH_RANDOM_ORDER({
Packit e9ba0d
		if (EVLOCK_TRY_LOCK(bev->lock)) {
Packit e9ba0d
			bufferevent_unsuspend_write(&bev->bev,
Packit e9ba0d
			    BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
			EVLOCK_UNLOCK(bev->lock, 0);
Packit e9ba0d
		} else {
Packit e9ba0d
			again = 1;
Packit e9ba0d
		}
Packit e9ba0d
	});
Packit e9ba0d
	g->pending_unsuspend_write = again;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/** Callback invoked every tick to add more elements to the group bucket
Packit e9ba0d
    and unsuspend group members as needed.
Packit e9ba0d
 */
Packit e9ba0d
static void
Packit e9ba0d
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
Packit e9ba0d
{
Packit e9ba0d
	struct bufferevent_rate_limit_group *g = arg;
Packit e9ba0d
	unsigned tick;
Packit e9ba0d
	struct timeval now;
Packit e9ba0d
Packit e9ba0d
	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now;;
Packit e9ba0d
Packit e9ba0d
	LOCK_GROUP(g);
Packit e9ba0d
Packit e9ba0d
	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
Packit e9ba0d
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
Packit e9ba0d
Packit e9ba0d
	if (g->pending_unsuspend_read ||
Packit e9ba0d
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
Packit e9ba0d
		_bev_group_unsuspend_reading(g);
Packit e9ba0d
	}
Packit e9ba0d
	if (g->pending_unsuspend_write ||
Packit e9ba0d
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
Packit e9ba0d
		_bev_group_unsuspend_writing(g);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	/* XXXX Rather than waiting to the next tick to unsuspend stuff
Packit e9ba0d
	 * with pending_unsuspend_write/read, we should do it on the
Packit e9ba0d
	 * next iteration of the mainloop.
Packit e9ba0d
	 */
Packit e9ba0d
Packit e9ba0d
	UNLOCK_GROUP(g);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_set_rate_limit(struct bufferevent *bev,
Packit e9ba0d
    struct ev_token_bucket_cfg *cfg)
Packit e9ba0d
{
Packit e9ba0d
	struct bufferevent_private *bevp =
Packit e9ba0d
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
Packit e9ba0d
	int r = -1;
Packit e9ba0d
	struct bufferevent_rate_limit *rlim;
Packit e9ba0d
	struct timeval now;
Packit e9ba0d
	ev_uint32_t tick;
Packit e9ba0d
	int reinit = 0, suspended = 0;
Packit e9ba0d
	/* XXX reference-count cfg */
Packit e9ba0d
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
Packit e9ba0d
	if (cfg == NULL) {
Packit e9ba0d
		if (bevp->rate_limiting) {
Packit e9ba0d
			rlim = bevp->rate_limiting;
Packit e9ba0d
			rlim->cfg = NULL;
Packit e9ba0d
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
Packit e9ba0d
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
Packit e9ba0d
			if (event_initialized(&rlim->refill_bucket_event))
Packit e9ba0d
				event_del(&rlim->refill_bucket_event);
Packit e9ba0d
		}
Packit e9ba0d
		r = 0;
Packit e9ba0d
		goto done;
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	event_base_gettimeofday_cached(bev->ev_base, &now;;
Packit e9ba0d
	tick = ev_token_bucket_get_tick(&now, cfg);
Packit e9ba0d
Packit e9ba0d
	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
Packit e9ba0d
		/* no-op */
Packit e9ba0d
		r = 0;
Packit e9ba0d
		goto done;
Packit e9ba0d
	}
Packit e9ba0d
	if (bevp->rate_limiting == NULL) {
Packit e9ba0d
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
Packit e9ba0d
		if (!rlim)
Packit e9ba0d
			goto done;
Packit e9ba0d
		bevp->rate_limiting = rlim;
Packit e9ba0d
	} else {
Packit e9ba0d
		rlim = bevp->rate_limiting;
Packit e9ba0d
	}
Packit e9ba0d
	reinit = rlim->cfg != NULL;
Packit e9ba0d
Packit e9ba0d
	rlim->cfg = cfg;
Packit e9ba0d
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);
Packit e9ba0d
Packit e9ba0d
	if (reinit) {
Packit e9ba0d
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
Packit e9ba0d
		event_del(&rlim->refill_bucket_event);
Packit e9ba0d
	}
Packit e9ba0d
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
Packit e9ba0d
	    _bev_refill_callback, bevp);
Packit e9ba0d
Packit e9ba0d
	if (rlim->limit.read_limit > 0) {
Packit e9ba0d
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
Packit e9ba0d
	} else {
Packit e9ba0d
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
Packit e9ba0d
		suspended=1;
Packit e9ba0d
	}
Packit e9ba0d
	if (rlim->limit.write_limit > 0) {
Packit e9ba0d
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
Packit e9ba0d
	} else {
Packit e9ba0d
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
Packit e9ba0d
		suspended = 1;
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	if (suspended)
Packit e9ba0d
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
Packit e9ba0d
Packit e9ba0d
	r = 0;
Packit e9ba0d
Packit e9ba0d
done:
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
struct bufferevent_rate_limit_group *
Packit e9ba0d
bufferevent_rate_limit_group_new(struct event_base *base,
Packit e9ba0d
    const struct ev_token_bucket_cfg *cfg)
Packit e9ba0d
{
Packit e9ba0d
	struct bufferevent_rate_limit_group *g;
Packit e9ba0d
	struct timeval now;
Packit e9ba0d
	ev_uint32_t tick;
Packit e9ba0d
Packit e9ba0d
	event_base_gettimeofday_cached(base, &now;;
Packit e9ba0d
	tick = ev_token_bucket_get_tick(&now, cfg);
Packit e9ba0d
Packit e9ba0d
	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
Packit e9ba0d
	if (!g)
Packit e9ba0d
		return NULL;
Packit e9ba0d
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
Packit e9ba0d
	TAILQ_INIT(&g->members);
Packit e9ba0d
Packit e9ba0d
	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
Packit e9ba0d
Packit e9ba0d
	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
Packit e9ba0d
	    _bev_group_refill_callback, g);
Packit e9ba0d
	/*XXXX handle event_add failure */
Packit e9ba0d
	event_add(&g->master_refill_event, &cfg->tick_timeout);
Packit e9ba0d
Packit e9ba0d
	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
Packit e9ba0d
Packit e9ba0d
	bufferevent_rate_limit_group_set_min_share(g, 64);
Packit e9ba0d
Packit e9ba0d
	return g;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_rate_limit_group_set_cfg(
Packit e9ba0d
	struct bufferevent_rate_limit_group *g,
Packit e9ba0d
	const struct ev_token_bucket_cfg *cfg)
Packit e9ba0d
{
Packit e9ba0d
	int same_tick;
Packit e9ba0d
	if (!g || !cfg)
Packit e9ba0d
		return -1;
Packit e9ba0d
Packit e9ba0d
	LOCK_GROUP(g);
Packit e9ba0d
	same_tick = evutil_timercmp(
Packit e9ba0d
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
Packit e9ba0d
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
Packit e9ba0d
Packit e9ba0d
	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
Packit e9ba0d
		g->rate_limit.read_limit = cfg->read_maximum;
Packit e9ba0d
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
Packit e9ba0d
		g->rate_limit.write_limit = cfg->write_maximum;
Packit e9ba0d
Packit e9ba0d
	if (!same_tick) {
Packit e9ba0d
		/* This can cause a hiccup in the schedule */
Packit e9ba0d
		event_add(&g->master_refill_event, &cfg->tick_timeout);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	/* The new limits might force us to adjust min_share differently. */
Packit e9ba0d
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
Packit e9ba0d
Packit e9ba0d
	UNLOCK_GROUP(g);
Packit e9ba0d
	return 0;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_rate_limit_group_set_min_share(
Packit e9ba0d
	struct bufferevent_rate_limit_group *g,
Packit e9ba0d
	size_t share)
Packit e9ba0d
{
Packit e9ba0d
	if (share > EV_SSIZE_MAX)
Packit e9ba0d
		return -1;
Packit e9ba0d
Packit e9ba0d
	g->configured_min_share = share;
Packit e9ba0d
Packit e9ba0d
	/* Can't set share to less than the one-tick maximum.  IOW, at steady
Packit e9ba0d
	 * state, at least one connection can go per tick. */
Packit e9ba0d
	if (share > g->rate_limit_cfg.read_rate)
Packit e9ba0d
		share = g->rate_limit_cfg.read_rate;
Packit e9ba0d
	if (share > g->rate_limit_cfg.write_rate)
Packit e9ba0d
		share = g->rate_limit_cfg.write_rate;
Packit e9ba0d
Packit e9ba0d
	g->min_share = share;
Packit e9ba0d
	return 0;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
void
Packit e9ba0d
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
Packit e9ba0d
{
Packit e9ba0d
	LOCK_GROUP(g);
Packit e9ba0d
	EVUTIL_ASSERT(0 == g->n_members);
Packit e9ba0d
	event_del(&g->master_refill_event);
Packit e9ba0d
	UNLOCK_GROUP(g);
Packit e9ba0d
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
Packit e9ba0d
	mm_free(g);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
Packit e9ba0d
    struct bufferevent_rate_limit_group *g)
Packit e9ba0d
{
Packit e9ba0d
	int wsuspend, rsuspend;
Packit e9ba0d
	struct bufferevent_private *bevp =
Packit e9ba0d
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
Packit e9ba0d
	if (!bevp->rate_limiting) {
Packit e9ba0d
		struct bufferevent_rate_limit *rlim;
Packit e9ba0d
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
Packit e9ba0d
		if (!rlim) {
Packit e9ba0d
			BEV_UNLOCK(bev);
Packit e9ba0d
			return -1;
Packit e9ba0d
		}
Packit e9ba0d
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
Packit e9ba0d
		    _bev_refill_callback, bevp);
Packit e9ba0d
		bevp->rate_limiting = rlim;
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	if (bevp->rate_limiting->group == g) {
Packit e9ba0d
		BEV_UNLOCK(bev);
Packit e9ba0d
		return 0;
Packit e9ba0d
	}
Packit e9ba0d
	if (bevp->rate_limiting->group)
Packit e9ba0d
		bufferevent_remove_from_rate_limit_group(bev);
Packit e9ba0d
Packit e9ba0d
	LOCK_GROUP(g);
Packit e9ba0d
	bevp->rate_limiting->group = g;
Packit e9ba0d
	++g->n_members;
Packit e9ba0d
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);
Packit e9ba0d
Packit e9ba0d
	rsuspend = g->read_suspended;
Packit e9ba0d
	wsuspend = g->write_suspended;
Packit e9ba0d
Packit e9ba0d
	UNLOCK_GROUP(g);
Packit e9ba0d
Packit e9ba0d
	if (rsuspend)
Packit e9ba0d
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
	if (wsuspend)
Packit e9ba0d
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return 0;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
Packit e9ba0d
{
Packit e9ba0d
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
Packit e9ba0d
    int unsuspend)
Packit e9ba0d
{
Packit e9ba0d
	struct bufferevent_private *bevp =
Packit e9ba0d
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
Packit e9ba0d
		struct bufferevent_rate_limit_group *g =
Packit e9ba0d
		    bevp->rate_limiting->group;
Packit e9ba0d
		LOCK_GROUP(g);
Packit e9ba0d
		bevp->rate_limiting->group = NULL;
Packit e9ba0d
		--g->n_members;
Packit e9ba0d
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
Packit e9ba0d
		UNLOCK_GROUP(g);
Packit e9ba0d
	}
Packit e9ba0d
	if (unsuspend) {
Packit e9ba0d
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
Packit e9ba0d
	}
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return 0;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/* ===
Packit e9ba0d
 * API functions to expose rate limits.
Packit e9ba0d
 *
Packit e9ba0d
 * Don't use these from inside Libevent; they're meant to be for use by
Packit e9ba0d
 * the program.
Packit e9ba0d
 * === */
Packit e9ba0d
Packit e9ba0d
/* Mostly you don't want to use this function from inside libevent;
Packit e9ba0d
 * _bufferevent_get_read_max() is more likely what you want*/
Packit e9ba0d
ev_ssize_t
Packit e9ba0d
bufferevent_get_read_limit(struct bufferevent *bev)
Packit e9ba0d
{
Packit e9ba0d
	ev_ssize_t r;
Packit e9ba0d
	struct bufferevent_private *bevp;
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
	bevp = BEV_UPCAST(bev);
Packit e9ba0d
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
Packit e9ba0d
		bufferevent_update_buckets(bevp);
Packit e9ba0d
		r = bevp->rate_limiting->limit.read_limit;
Packit e9ba0d
	} else {
Packit e9ba0d
		r = EV_SSIZE_MAX;
Packit e9ba0d
	}
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/* Mostly you don't want to use this function from inside libevent;
Packit e9ba0d
 * _bufferevent_get_write_max() is more likely what you want*/
Packit e9ba0d
ev_ssize_t
Packit e9ba0d
bufferevent_get_write_limit(struct bufferevent *bev)
Packit e9ba0d
{
Packit e9ba0d
	ev_ssize_t r;
Packit e9ba0d
	struct bufferevent_private *bevp;
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
	bevp = BEV_UPCAST(bev);
Packit e9ba0d
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
Packit e9ba0d
		bufferevent_update_buckets(bevp);
Packit e9ba0d
		r = bevp->rate_limiting->limit.write_limit;
Packit e9ba0d
	} else {
Packit e9ba0d
		r = EV_SSIZE_MAX;
Packit e9ba0d
	}
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
ev_ssize_t
Packit e9ba0d
bufferevent_get_max_to_read(struct bufferevent *bev)
Packit e9ba0d
{
Packit e9ba0d
	ev_ssize_t r;
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
ev_ssize_t
Packit e9ba0d
bufferevent_get_max_to_write(struct bufferevent *bev)
Packit e9ba0d
{
Packit e9ba0d
	ev_ssize_t r;
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
Packit e9ba0d
/* Mostly you don't want to use this function from inside libevent;
Packit e9ba0d
 * _bufferevent_get_read_max() is more likely what you want*/
Packit e9ba0d
ev_ssize_t
Packit e9ba0d
bufferevent_rate_limit_group_get_read_limit(
Packit e9ba0d
	struct bufferevent_rate_limit_group *grp)
Packit e9ba0d
{
Packit e9ba0d
	ev_ssize_t r;
Packit e9ba0d
	LOCK_GROUP(grp);
Packit e9ba0d
	r = grp->rate_limit.read_limit;
Packit e9ba0d
	UNLOCK_GROUP(grp);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
/* Mostly you don't want to use this function from inside libevent;
Packit e9ba0d
 * _bufferevent_get_write_max() is more likely what you want. */
Packit e9ba0d
ev_ssize_t
Packit e9ba0d
bufferevent_rate_limit_group_get_write_limit(
Packit e9ba0d
	struct bufferevent_rate_limit_group *grp)
Packit e9ba0d
{
Packit e9ba0d
	ev_ssize_t r;
Packit e9ba0d
	LOCK_GROUP(grp);
Packit e9ba0d
	r = grp->rate_limit.write_limit;
Packit e9ba0d
	UNLOCK_GROUP(grp);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
Packit e9ba0d
{
Packit e9ba0d
	int r = 0;
Packit e9ba0d
	ev_ssize_t old_limit, new_limit;
Packit e9ba0d
	struct bufferevent_private *bevp;
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
	bevp = BEV_UPCAST(bev);
Packit e9ba0d
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
Packit e9ba0d
	old_limit = bevp->rate_limiting->limit.read_limit;
Packit e9ba0d
Packit e9ba0d
	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
Packit e9ba0d
	if (old_limit > 0 && new_limit <= 0) {
Packit e9ba0d
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
Packit e9ba0d
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
Packit e9ba0d
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
Packit e9ba0d
			r = -1;
Packit e9ba0d
	} else if (old_limit <= 0 && new_limit > 0) {
Packit e9ba0d
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
Packit e9ba0d
			event_del(&bevp->rate_limiting->refill_bucket_event);
Packit e9ba0d
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
Packit e9ba0d
{
Packit e9ba0d
	/* XXXX this is mostly copy-and-paste from
Packit e9ba0d
	 * bufferevent_decrement_read_limit */
Packit e9ba0d
	int r = 0;
Packit e9ba0d
	ev_ssize_t old_limit, new_limit;
Packit e9ba0d
	struct bufferevent_private *bevp;
Packit e9ba0d
	BEV_LOCK(bev);
Packit e9ba0d
	bevp = BEV_UPCAST(bev);
Packit e9ba0d
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
Packit e9ba0d
	old_limit = bevp->rate_limiting->limit.write_limit;
Packit e9ba0d
Packit e9ba0d
	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
Packit e9ba0d
	if (old_limit > 0 && new_limit <= 0) {
Packit e9ba0d
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
Packit e9ba0d
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
Packit e9ba0d
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
Packit e9ba0d
			r = -1;
Packit e9ba0d
	} else if (old_limit <= 0 && new_limit > 0) {
Packit e9ba0d
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
Packit e9ba0d
			event_del(&bevp->rate_limiting->refill_bucket_event);
Packit e9ba0d
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	BEV_UNLOCK(bev);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_rate_limit_group_decrement_read(
Packit e9ba0d
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
Packit e9ba0d
{
Packit e9ba0d
	int r = 0;
Packit e9ba0d
	ev_ssize_t old_limit, new_limit;
Packit e9ba0d
	LOCK_GROUP(grp);
Packit e9ba0d
	old_limit = grp->rate_limit.read_limit;
Packit e9ba0d
	new_limit = (grp->rate_limit.read_limit -= decr);
Packit e9ba0d
Packit e9ba0d
	if (old_limit > 0 && new_limit <= 0) {
Packit e9ba0d
		_bev_group_suspend_reading(grp);
Packit e9ba0d
	} else if (old_limit <= 0 && new_limit > 0) {
Packit e9ba0d
		_bev_group_unsuspend_reading(grp);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	UNLOCK_GROUP(grp);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
int
Packit e9ba0d
bufferevent_rate_limit_group_decrement_write(
Packit e9ba0d
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
Packit e9ba0d
{
Packit e9ba0d
	int r = 0;
Packit e9ba0d
	ev_ssize_t old_limit, new_limit;
Packit e9ba0d
	LOCK_GROUP(grp);
Packit e9ba0d
	old_limit = grp->rate_limit.write_limit;
Packit e9ba0d
	new_limit = (grp->rate_limit.write_limit -= decr);
Packit e9ba0d
Packit e9ba0d
	if (old_limit > 0 && new_limit <= 0) {
Packit e9ba0d
		_bev_group_suspend_writing(grp);
Packit e9ba0d
	} else if (old_limit <= 0 && new_limit > 0) {
Packit e9ba0d
		_bev_group_unsuspend_writing(grp);
Packit e9ba0d
	}
Packit e9ba0d
Packit e9ba0d
	UNLOCK_GROUP(grp);
Packit e9ba0d
	return r;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
void
Packit e9ba0d
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
Packit e9ba0d
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
Packit e9ba0d
{
Packit e9ba0d
	EVUTIL_ASSERT(grp != NULL);
Packit e9ba0d
	if (total_read_out)
Packit e9ba0d
		*total_read_out = grp->total_read;
Packit e9ba0d
	if (total_written_out)
Packit e9ba0d
		*total_written_out = grp->total_written;
Packit e9ba0d
}
Packit e9ba0d
Packit e9ba0d
void
Packit e9ba0d
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
Packit e9ba0d
{
Packit e9ba0d
	grp->total_read = grp->total_written = 0;
Packit e9ba0d
}