/*
- * Copyright (c) 2007-2010 Niels Provos and Nick Mathewson
+ * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
* Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
+#include "event-internal.h"
int
ev_token_bucket_init(struct ev_token_bucket *bucket,
r->read_maximum = read_burst;
r->write_maximum = write_burst;
memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
- r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000;
+ r->msec_per_tick = (tick_len->tv_sec * 1000) +
+ (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
return r;
}
static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
+static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
+static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
/** Helper: figure out the maximum amount we should write if is_write, or
the maximum amount we should read if is_read. Return that maximum, or
if (event_add(&bev->rate_limiting->refill_bucket_event,
&bev->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
+ } else if (bev->read_suspended & BEV_SUSPEND_BW) {
+ if (!(bev->write_suspended & BEV_SUSPEND_BW))
+ event_del(&bev->rate_limiting->refill_bucket_event);
+ bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
}
}
bev->rate_limiting->group->total_read += bytes;
if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
_bev_group_suspend_reading(bev->rate_limiting->group);
+ } else if (bev->rate_limiting->group->read_suspended) {
+ _bev_group_unsuspend_reading(bev->rate_limiting->group);
}
UNLOCK_GROUP(bev->rate_limiting->group);
}
if (event_add(&bev->rate_limiting->refill_bucket_event,
&bev->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
+ } else if (bev->write_suspended & BEV_SUSPEND_BW) {
+ if (!(bev->read_suspended & BEV_SUSPEND_BW))
+ event_del(&bev->rate_limiting->refill_bucket_event);
+ bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
}
}
bev->rate_limiting->group->total_written += bytes;
if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
_bev_group_suspend_writing(bev->rate_limiting->group);
+ } else if (bev->rate_limiting->group->write_suspended) {
+ _bev_group_unsuspend_writing(bev->rate_limiting->group);
}
UNLOCK_GROUP(bev->rate_limiting->group);
}
event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
LOCK_GROUP(g);
+
tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
- g->min_share = 64;
event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
_bev_group_refill_callback, g);
/*XXXX handle event_add failure */
event_add(&g->master_refill_event, &cfg->tick_timeout);
EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+
+ bufferevent_rate_limit_group_set_min_share(g, 64);
+
return g;
}
event_add(&g->master_refill_event, &cfg->tick_timeout);
}
+ /* The new limits might force us to adjust min_share differently. */
+ bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
+
UNLOCK_GROUP(g);
return 0;
}
if (share > EV_SSIZE_MAX)
return -1;
+ g->configured_min_share = share;
+
+ /* Can't set share to less than the one-tick maximum. IOW, at steady
+ * state, at least one connection can go per tick. */
+ if (share > g->rate_limit_cfg.read_rate)
+ share = g->rate_limit_cfg.read_rate;
+ if (share > g->rate_limit_cfg.write_rate)
+ share = g->rate_limit_cfg.write_rate;
+
g->min_share = share;
return 0;
}