/*
 * libevent/bufferevent_ratelim.c — libevent sources bundled in the
 * netatalk tree (imported from upstream libevent).
 */
1 /*
2  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. The name of the author may not be used to endorse or promote products
15  *    derived from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28
29 #include <sys/types.h>
30 #include <limits.h>
31 #include <string.h>
32 #include <stdlib.h>
33
34 #include "event2/event.h"
35 #include "event2/event_struct.h"
36 #include "event2/util.h"
37 #include "event2/bufferevent.h"
38 #include "event2/bufferevent_struct.h"
39 #include "event2/buffer.h"
40
41 #include "ratelim-internal.h"
42
43 #include "bufferevent-internal.h"
44 #include "mm-internal.h"
45 #include "util-internal.h"
46 #include "event-internal.h"
47
48 int
49 ev_token_bucket_init(struct ev_token_bucket *bucket,
50     const struct ev_token_bucket_cfg *cfg,
51     ev_uint32_t current_tick,
52     int reinitialize)
53 {
54         if (reinitialize) {
55                 /* on reinitialization, we only clip downwards, since we've
56                    already used who-knows-how-much bandwidth this tick.  We
57                    leave "last_updated" as it is; the next update will add the
58                    appropriate amount of bandwidth to the bucket.
59                 */
60                 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
61                         bucket->read_limit = cfg->read_maximum;
62                 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
63                         bucket->write_limit = cfg->write_maximum;
64         } else {
65                 bucket->read_limit = cfg->read_rate;
66                 bucket->write_limit = cfg->write_rate;
67                 bucket->last_updated = current_tick;
68         }
69         return 0;
70 }
71
/** Refill 'bucket' from 'cfg' for every tick elapsed since the bucket's
 *  last_updated tick.  Returns 1 if the bucket changed, 0 otherwise. */
int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	/* Rearranged as a division: if adding n_ticks * rate would reach or
	 * pass the maximum, clamp to the maximum without ever computing the
	 * (possibly overflowing) product. */
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
111
112 static inline void
113 bufferevent_update_buckets(struct bufferevent_private *bev)
114 {
115         /* Must hold lock on bev. */
116         struct timeval now;
117         unsigned tick;
118         event_base_gettimeofday_cached(bev->bev.ev_base, &now);
119         tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
120         if (tick != bev->rate_limiting->limit.last_updated)
121                 ev_token_bucket_update(&bev->rate_limiting->limit,
122                     bev->rate_limiting->cfg, tick);
123 }
124
125 ev_uint32_t
126 ev_token_bucket_get_tick(const struct timeval *tv,
127     const struct ev_token_bucket_cfg *cfg)
128 {
129         /* This computation uses two multiplies and a divide.  We could do
130          * fewer if we knew that the tick length was an integer number of
131          * seconds, or if we knew it divided evenly into a second.  We should
132          * investigate that more.
133          */
134
135         /* We cast to an ev_uint64_t first, since we don't want to overflow
136          * before we do the final divide. */
137         ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
138         return (unsigned)(msec / cfg->msec_per_tick);
139 }
140
141 struct ev_token_bucket_cfg *
142 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
143     size_t write_rate, size_t write_burst,
144     const struct timeval *tick_len)
145 {
146         struct ev_token_bucket_cfg *r;
147         struct timeval g;
148         if (! tick_len) {
149                 g.tv_sec = 1;
150                 g.tv_usec = 0;
151                 tick_len = &g;
152         }
153         if (read_rate > read_burst || write_rate > write_burst ||
154             read_rate < 1 || write_rate < 1)
155                 return NULL;
156         if (read_rate > EV_RATE_LIMIT_MAX ||
157             write_rate > EV_RATE_LIMIT_MAX ||
158             read_burst > EV_RATE_LIMIT_MAX ||
159             write_burst > EV_RATE_LIMIT_MAX)
160                 return NULL;
161         r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
162         if (!r)
163                 return NULL;
164         r->read_rate = read_rate;
165         r->write_rate = write_rate;
166         r->read_maximum = read_burst;
167         r->write_maximum = write_burst;
168         memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
169         r->msec_per_tick = (tick_len->tv_sec * 1000) +
170             (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
171         return r;
172 }
173
/** Free a configuration made by ev_token_bucket_cfg_new().
 *  NOTE(review): there is no reference counting here (see the "XXX
 *  reference-count cfg" note in bufferevent_set_rate_limit); the caller
 *  must ensure no bufferevent still points at cfg before freeing it. */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
179
180 /* No matter how big our bucket gets, don't try to read more than this
181  * much in a single read operation. */
182 #define MAX_TO_READ_EVER 16384
183 /* No matter how big our bucket gets, don't try to write more than this
184  * much in a single write operation. */
185 #define MAX_TO_WRITE_EVER 16384
186
187 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
188 #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
189
190 static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
191 static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
192 static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
193 static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
194
/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

/* The read- or write-limit of bucket x, depending on is_write. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

/* Nonzero iff group g is suspended in the direction we care about. */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	/* No rate limiting at all: just the global per-operation cap. */
	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* A bucket driven negative means "wholly exhausted": report 0. */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
261
/** Return the maximum number of bytes bev may read now (0 if its
 *  bucket is exhausted).  Needs lock on bev. */
ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}
267
/** Return the maximum number of bytes bev may write now (0 if its
 *  bucket is exhausted).  Needs lock on bev. */
ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}
273
274 int
275 _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
276 {
277         /* XXXXX Make sure all users of this function check its return value */
278         int r = 0;
279         /* need to hold lock on bev */
280         if (!bev->rate_limiting)
281                 return 0;
282
283         if (bev->rate_limiting->cfg) {
284                 bev->rate_limiting->limit.read_limit -= bytes;
285                 if (bev->rate_limiting->limit.read_limit <= 0) {
286                         bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
287                         if (event_add(&bev->rate_limiting->refill_bucket_event,
288                                 &bev->rate_limiting->cfg->tick_timeout) < 0)
289                                 r = -1;
290                 } else if (bev->read_suspended & BEV_SUSPEND_BW) {
291                         if (!(bev->write_suspended & BEV_SUSPEND_BW))
292                                 event_del(&bev->rate_limiting->refill_bucket_event);
293                         bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
294                 }
295         }
296
297         if (bev->rate_limiting->group) {
298                 LOCK_GROUP(bev->rate_limiting->group);
299                 bev->rate_limiting->group->rate_limit.read_limit -= bytes;
300                 bev->rate_limiting->group->total_read += bytes;
301                 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
302                         _bev_group_suspend_reading(bev->rate_limiting->group);
303                 } else if (bev->rate_limiting->group->read_suspended) {
304                         _bev_group_unsuspend_reading(bev->rate_limiting->group);
305                 }
306                 UNLOCK_GROUP(bev->rate_limiting->group);
307         }
308
309         return r;
310 }
311
312 int
313 _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
314 {
315         /* XXXXX Make sure all users of this function check its return value */
316         int r = 0;
317         /* need to hold lock */
318         if (!bev->rate_limiting)
319                 return 0;
320
321         if (bev->rate_limiting->cfg) {
322                 bev->rate_limiting->limit.write_limit -= bytes;
323                 if (bev->rate_limiting->limit.write_limit <= 0) {
324                         bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
325                         if (event_add(&bev->rate_limiting->refill_bucket_event,
326                                 &bev->rate_limiting->cfg->tick_timeout) < 0)
327                                 r = -1;
328                 } else if (bev->write_suspended & BEV_SUSPEND_BW) {
329                         if (!(bev->read_suspended & BEV_SUSPEND_BW))
330                                 event_del(&bev->rate_limiting->refill_bucket_event);
331                         bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
332                 }
333         }
334
335         if (bev->rate_limiting->group) {
336                 LOCK_GROUP(bev->rate_limiting->group);
337                 bev->rate_limiting->group->rate_limit.write_limit -= bytes;
338                 bev->rate_limiting->group->total_written += bytes;
339                 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
340                         _bev_group_suspend_writing(bev->rate_limiting->group);
341                 } else if (bev->rate_limiting->group->write_suspended) {
342                         _bev_group_unsuspend_writing(bev->rate_limiting->group);
343                 }
344                 UNLOCK_GROUP(bev->rate_limiting->group);
345         }
346
347         return r;
348 }
349
/** Stop reading on every bufferevent in <b>g</b>.  Always returns 0. */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	*/
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
374
/** Stop writing on every bufferevent in <b>g</b>.  Always returns 0.
    Uses EVLOCK_TRY_LOCK rather than BEV_LOCK for the same deadlock-
    avoidance reason as _bev_group_suspend_reading above. */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
392
/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	/* Rate limiting may have been torn down while the timer was
	 * pending; nothing to do then. */
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
442
443 /** Helper: grab a random element from a bufferevent group. */
444 static struct bufferevent_private *
445 _bev_group_random_element(struct bufferevent_rate_limit_group *group)
446 {
447         int which;
448         struct bufferevent_private *bev;
449
450         /* requires group lock */
451
452         if (!group->n_members)
453                 return NULL;
454
455         EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
456
457         which = _evutil_weakrand() % group->n_members;
458
459         bev = TAILQ_FIRST(&group->members);
460         while (which--)
461                 bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
462
463         return bev;
464 }
465
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.

    NOTE(review): this macro expects the caller to have declared the
    locals 'first' and 'bev' and the group pointer 'g'; the first loop
    runs from the random element to the tail, the second wraps around
    from the head back to the starting element.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = _bev_group_random_element(g);	\
		for (bev = first; bev != TAILQ_END(&g->members); \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
	} while (0)
485
/** Resume reading on the members of <b>g</b>, visiting them from a random
    starting point for fairness.  Members whose lock could not be taken are
    left for a later tick via pending_unsuspend_read.  Needs group lock. */
static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			/* Couldn't lock this member; retry next tick. */
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
504
/** Resume writing on the members of <b>g</b>, visiting them from a random
    starting point for fairness.  Members whose lock could not be taken are
    left for a later tick via pending_unsuspend_write.  Needs group lock. */
static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			/* Couldn't lock this member; retry next tick. */
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
523
/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	/* Refill the group bucket for however many ticks have elapsed. */
	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Unsuspend if members were skipped last time (pending_*) or if the
	 * bucket has refilled to at least one min_share worth of budget. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}
557
/** Attach the rate-limit configuration 'cfg' to 'bev', or detach any
 *  existing configuration when cfg is NULL.  Returns 0 on success, -1 on
 *  allocation failure. */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Detach: clear the per-bev config, lift any bandwidth
		 * suspension, and cancel the refill timer.  The rlim struct
		 * itself is kept — it may still carry group membership. */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	/* If there was an old config, the bucket is only clipped downward
	 * rather than reset (see ev_token_bucket_init). */
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	/* NOTE(review): event_add failure is ignored here; on failure a
	 * suspended bev would have no refill pending.  Confirm against
	 * upstream before changing. */
	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
636
637 struct bufferevent_rate_limit_group *
638 bufferevent_rate_limit_group_new(struct event_base *base,
639     const struct ev_token_bucket_cfg *cfg)
640 {
641         struct bufferevent_rate_limit_group *g;
642         struct timeval now;
643         ev_uint32_t tick;
644
645         event_base_gettimeofday_cached(base, &now);
646         tick = ev_token_bucket_get_tick(&now, cfg);
647
648         g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
649         if (!g)
650                 return NULL;
651         memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
652         TAILQ_INIT(&g->members);
653
654         ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
655
656         event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
657             _bev_group_refill_callback, g);
658         /*XXXX handle event_add failure */
659         event_add(&g->master_refill_event, &cfg->tick_timeout);
660
661         EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
662
663         bufferevent_rate_limit_group_set_min_share(g, 64);
664
665         return g;
666 }
667
/** Replace the configuration of group 'g' with a copy of 'cfg'.
 *  Returns 0 on success, -1 if either argument is NULL. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	/* Remember whether the tick length changed; if so, the master
	 * refill timer must be rescheduled below. */
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	/* Clip the current buckets down to the new maxima; tokens are
	 * never added retroactively. */
	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
698
699 int
700 bufferevent_rate_limit_group_set_min_share(
701         struct bufferevent_rate_limit_group *g,
702         size_t share)
703 {
704         if (share > EV_SSIZE_MAX)
705                 return -1;
706
707         g->configured_min_share = share;
708
709         /* Can't set share to less than the one-tick maximum.  IOW, at steady
710          * state, at least one connection can go per tick. */
711         if (share > g->rate_limit_cfg.read_rate)
712                 share = g->rate_limit_cfg.read_rate;
713         if (share > g->rate_limit_cfg.write_rate)
714                 share = g->rate_limit_cfg.write_rate;
715
716         g->min_share = share;
717         return 0;
718 }
719
/** Destroy an empty rate-limit group: cancel its refill timer, free its
 *  lock and its memory.  All members must have been removed first. */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	/* Freeing a group that still has members would leave dangling
	 * group pointers in those bufferevents. */
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
730
/** Add 'bev' to rate-limit group 'g', leaving any previous group first.
 *  Returns 0 on success (including the no-op case where bev is already a
 *  member of g), -1 on allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	/* Lazily create the rate_limiting structure: a bev can belong to a
	 * group without having a per-bev cfg of its own. */
	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	/* Already in this group: nothing to do. */
	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	/* Moving between groups: leave the old one first. */
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the group's suspension state so we can apply it to the
	 * new member after dropping the group lock. */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
777
/** Remove 'bev' from its rate-limit group (if any) and lift any
 *  group-imposed suspension.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}
783
/** Remove 'bev' from its rate-limit group (if any).  When 'unsuspend' is
 *  nonzero, also lift any group-imposed read/write suspension; callers
 *  pass 0 to skip that (e.g. during teardown).  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
807
808 /* ===
809  * API functions to expose rate limits.
810  *
811  * Don't use these from inside Libevent; they're meant to be for use by
812  * the program.
813  * === */
814
815 /* Mostly you don't want to use this function from inside libevent;
816  * _bufferevent_get_read_max() is more likely what you want*/
817 ev_ssize_t
818 bufferevent_get_read_limit(struct bufferevent *bev)
819 {
820         ev_ssize_t r;
821         struct bufferevent_private *bevp;
822         BEV_LOCK(bev);
823         bevp = BEV_UPCAST(bev);
824         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
825                 bufferevent_update_buckets(bevp);
826                 r = bevp->rate_limiting->limit.read_limit;
827         } else {
828                 r = EV_SSIZE_MAX;
829         }
830         BEV_UNLOCK(bev);
831         return r;
832 }
833
834 /* Mostly you don't want to use this function from inside libevent;
835  * _bufferevent_get_write_max() is more likely what you want*/
836 ev_ssize_t
837 bufferevent_get_write_limit(struct bufferevent *bev)
838 {
839         ev_ssize_t r;
840         struct bufferevent_private *bevp;
841         BEV_LOCK(bev);
842         bevp = BEV_UPCAST(bev);
843         if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
844                 bufferevent_update_buckets(bevp);
845                 r = bevp->rate_limiting->limit.write_limit;
846         } else {
847                 r = EV_SSIZE_MAX;
848         }
849         BEV_UNLOCK(bev);
850         return r;
851 }
852
853 ev_ssize_t
854 bufferevent_get_max_to_read(struct bufferevent *bev)
855 {
856         ev_ssize_t r;
857         BEV_LOCK(bev);
858         r = _bufferevent_get_read_max(BEV_UPCAST(bev));
859         BEV_UNLOCK(bev);
860         return r;
861 }
862
863 ev_ssize_t
864 bufferevent_get_max_to_write(struct bufferevent *bev)
865 {
866         ev_ssize_t r;
867         BEV_LOCK(bev);
868         r = _bufferevent_get_write_max(BEV_UPCAST(bev));
869         BEV_UNLOCK(bev);
870         return r;
871 }
872
873
874 /* Mostly you don't want to use this function from inside libevent;
875  * _bufferevent_get_read_max() is more likely what you want*/
876 ev_ssize_t
877 bufferevent_rate_limit_group_get_read_limit(
878         struct bufferevent_rate_limit_group *grp)
879 {
880         ev_ssize_t r;
881         LOCK_GROUP(grp);
882         r = grp->rate_limit.read_limit;
883         UNLOCK_GROUP(grp);
884         return r;
885 }
886
887 /* Mostly you don't want to use this function from inside libevent;
888  * _bufferevent_get_write_max() is more likely what you want. */
889 ev_ssize_t
890 bufferevent_rate_limit_group_get_write_limit(
891         struct bufferevent_rate_limit_group *grp)
892 {
893         ev_ssize_t r;
894         LOCK_GROUP(grp);
895         r = grp->rate_limit.write_limit;
896         UNLOCK_GROUP(grp);
897         return r;
898 }
899
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* Charge 'decr' bytes against this bufferevent's read allowance
	 * (a negative 'decr' credits the bucket).  When the allowance
	 * crosses from positive to non-positive, suspend reading and
	 * schedule the refill timer; on the reverse crossing, resume
	 * reading.  Requires a per-bufferevent rate limit to be configured.
	 * Returns 0 on success, -1 if the refill timer could not be added. */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* The refill event is shared between the read and write
		 * buckets: only cancel it if writing is not also waiting
		 * on a bandwidth refill. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
926
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* Charge 'decr' bytes against this bufferevent's write allowance
	 * (a negative 'decr' credits the bucket).  When the allowance
	 * crosses from positive to non-positive, suspend writing and
	 * schedule the refill timer; on the reverse crossing, resume
	 * writing.  Requires a per-bufferevent rate limit to be configured.
	 * Returns 0 on success, -1 if the refill timer could not be added. */
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* The refill event is shared between the read and write
		 * buckets: only cancel it if reading is not also waiting
		 * on a bandwidth refill. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
955
956 int
957 bufferevent_rate_limit_group_decrement_read(
958         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
959 {
960         int r = 0;
961         ev_ssize_t old_limit, new_limit;
962         LOCK_GROUP(grp);
963         old_limit = grp->rate_limit.read_limit;
964         new_limit = (grp->rate_limit.read_limit -= decr);
965
966         if (old_limit > 0 && new_limit <= 0) {
967                 _bev_group_suspend_reading(grp);
968         } else if (old_limit <= 0 && new_limit > 0) {
969                 _bev_group_unsuspend_reading(grp);
970         }
971
972         UNLOCK_GROUP(grp);
973         return r;
974 }
975
976 int
977 bufferevent_rate_limit_group_decrement_write(
978         struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
979 {
980         int r = 0;
981         ev_ssize_t old_limit, new_limit;
982         LOCK_GROUP(grp);
983         old_limit = grp->rate_limit.write_limit;
984         new_limit = (grp->rate_limit.write_limit -= decr);
985
986         if (old_limit > 0 && new_limit <= 0) {
987                 _bev_group_suspend_writing(grp);
988         } else if (old_limit <= 0 && new_limit > 0) {
989                 _bev_group_unsuspend_writing(grp);
990         }
991
992         UNLOCK_GROUP(grp);
993         return r;
994 }
995
996 void
997 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
998     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
999 {
1000         EVUTIL_ASSERT(grp != NULL);
1001         if (total_read_out)
1002                 *total_read_out = grp->total_read;
1003         if (total_written_out)
1004                 *total_written_out = grp->total_written;
1005 }
1006
1007 void
1008 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1009 {
1010         grp->total_read = grp->total_written = 0;
1011 }