/*
 * Copyright (c) 2007-2010 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
        if (reinitialize) {
                /* On reinitialization, we only clip downwards, since we've
                   already used who-knows-how-much bandwidth this tick.  We
                   leave "last_updated" as it is; the next update will add the
                   appropriate amount of bandwidth to the bucket.
                */
                if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
                        bucket->read_limit = cfg->read_maximum;
                if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
                        bucket->write_limit = cfg->write_maximum;
        } else {
                bucket->read_limit = cfg->read_rate;
                bucket->write_limit = cfg->write_rate;
                bucket->last_updated = current_tick;
        }
        return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
        /* It's okay if the tick number overflows, since we'll just
         * wrap around when we do the unsigned subtraction. */
        unsigned n_ticks = current_tick - bucket->last_updated;

        /* Make sure some ticks actually happened, and that time didn't
         * roll back. */
        if (n_ticks == 0 || n_ticks > INT_MAX)
                return 0;

        /* Naively, we would say
                bucket->limit += n_ticks * cfg->rate;

                if (bucket->limit > cfg->maximum)
                        bucket->limit = cfg->maximum;

           But we're worried about overflow, so we do it like this:
        */
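        /* (A hypothetical concrete case: if read_limit is already near
         * EV_RATE_LIMIT_MAX and n_ticks is large, the naive product
         * n_ticks * cfg->read_rate can exceed what an ev_int64_t holds.
         * Rearranged as a division, the test below clamps to the maximum
         * whenever the refill would overshoot it, and only forms the
         * product when it is known to fit under the maximum.) */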

        if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
                bucket->read_limit = cfg->read_maximum;
        else
                bucket->read_limit += n_ticks * cfg->read_rate;

        if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
                bucket->write_limit = cfg->write_maximum;
        else
                bucket->write_limit += n_ticks * cfg->write_rate;

        bucket->last_updated = current_tick;

        return 1;
}

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
        /* Must hold lock on bev. */
        struct timeval now;
        unsigned tick;
        event_base_gettimeofday_cached(bev->bev.ev_base, &now);
        tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
        if (tick != bev->rate_limiting->limit.last_updated)
                ev_token_bucket_update(&bev->rate_limiting->limit,
                    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
        /* This computation uses two multiplies and a divide.  We could do
         * fewer if we knew that the tick length was an integer number of
         * seconds, or if we knew it divided evenly into a second.  We should
         * investigate that more.
         */

        /* We cast to an ev_uint64_t first, since we don't want to overflow
         * before we do the final divide. */
        ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
        return (unsigned)(msec / cfg->msec_per_tick);
}
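
/* Worked example (hypothetical numbers): with msec_per_tick == 250 (a
 * quarter-second tick), a timeval of { tv_sec = 2, tv_usec = 500000 }
 * becomes msec == 2500, so the current tick is 2500 / 250 == 10. */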

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
        struct ev_token_bucket_cfg *r;
        struct timeval g;
        if (! tick_len) {
                g.tv_sec = 1;
                g.tv_usec = 0;
                tick_len = &g;
        }
        if (read_rate > read_burst || write_rate > write_burst ||
            read_rate < 1 || write_rate < 1)
                return NULL;
        if (read_rate > EV_RATE_LIMIT_MAX ||
            write_rate > EV_RATE_LIMIT_MAX ||
            read_burst > EV_RATE_LIMIT_MAX ||
            write_burst > EV_RATE_LIMIT_MAX)
                return NULL;
        r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
        if (!r)
                return NULL;
        r->read_rate = read_rate;
        r->write_rate = write_rate;
        r->read_maximum = read_burst;
        r->write_maximum = write_burst;
        memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
        r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000;
        return r;
}
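
/* A minimal usage sketch (the names "bev" and the numbers are illustrative
 * only, not part of this file): limit a bufferevent to an average of 4096
 * bytes per tick each way, with bursts of up to 8192 bytes, using the
 * default one-second tick:
 *
 *      struct ev_token_bucket_cfg *cfg =
 *          ev_token_bucket_cfg_new(4096, 8192, 4096, 8192, NULL);
 *      if (cfg)
 *              bufferevent_set_rate_limit(bev, cfg);
 *
 * The cfg must outlive every bufferevent that uses it; see the XXX note
 * about reference-counting in bufferevent_set_rate_limit() below. */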

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
        mm_free(cfg);
}

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read if it is false.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
        /* needs lock on bev. */
        ev_ssize_t max_so_far = is_write ? MAX_TO_WRITE_EVER : MAX_TO_READ_EVER;

#define LIM(x)                                          \
        (is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)                      \
        (is_write ? (g)->write_suspended : (g)->read_suspended)

        /* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)                              \
        do {                                    \
                if (max_so_far > (x))           \
                        max_so_far = (x);       \
        } while (0)

        if (!bev->rate_limiting)
                return max_so_far;

        /* If rate-limiting is enabled at all, update the appropriate
           bucket, and take the smaller of our rate limit and the group
           rate limit.
         */

        if (bev->rate_limiting->cfg) {
                bufferevent_update_buckets(bev);
                max_so_far = LIM(bev->rate_limiting->limit);
        }
        if (bev->rate_limiting->group) {
                struct bufferevent_rate_limit_group *g =
                    bev->rate_limiting->group;
                ev_ssize_t share;
                LOCK_GROUP(g);
                if (GROUP_SUSPENDED(g)) {
                        /* We can get here if we failed to lock this
                         * particular bufferevent while suspending the whole
                         * group. */
                        if (is_write)
                                bufferevent_suspend_write(&bev->bev,
                                    BEV_SUSPEND_BW_GROUP);
                        else
                                bufferevent_suspend_read(&bev->bev,
                                    BEV_SUSPEND_BW_GROUP);
                        share = 0;
                } else {
                        /* XXXX probably we should divide among the active
                         * members, not the total members. */
                        share = LIM(g->rate_limit) / g->n_members;
                        if (share < g->min_share)
                                share = g->min_share;
                }
                UNLOCK_GROUP(g);
                CLAMPTO(share);
        }

        if (max_so_far < 0)
                max_so_far = 0;
        return max_so_far;
}

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
        return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
        return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
        /* XXXXX Make sure all users of this function check its return value */
        int r = 0;
        /* need to hold lock on bev */
        if (!bev->rate_limiting)
                return 0;

        if (bev->rate_limiting->cfg) {
                bev->rate_limiting->limit.read_limit -= bytes;
                if (bev->rate_limiting->limit.read_limit <= 0) {
                        bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
                        if (event_add(&bev->rate_limiting->refill_bucket_event,
                                &bev->rate_limiting->cfg->tick_timeout) < 0)
                                r = -1;
                }
        }

        if (bev->rate_limiting->group) {
                LOCK_GROUP(bev->rate_limiting->group);
                bev->rate_limiting->group->rate_limit.read_limit -= bytes;
                bev->rate_limiting->group->total_read += bytes;
                if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
                        _bev_group_suspend_reading(bev->rate_limiting->group);
                }
                UNLOCK_GROUP(bev->rate_limiting->group);
        }

        return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
        /* XXXXX Make sure all users of this function check its return value */
        int r = 0;
        /* need to hold lock */
        if (!bev->rate_limiting)
                return 0;

        if (bev->rate_limiting->cfg) {
                bev->rate_limiting->limit.write_limit -= bytes;
                if (bev->rate_limiting->limit.write_limit <= 0) {
                        bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
                        if (event_add(&bev->rate_limiting->refill_bucket_event,
                                &bev->rate_limiting->cfg->tick_timeout) < 0)
                                r = -1;
                }
        }

        if (bev->rate_limiting->group) {
                LOCK_GROUP(bev->rate_limiting->group);
                bev->rate_limiting->group->rate_limit.write_limit -= bytes;
                bev->rate_limiting->group->total_written += bytes;
                if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
                        _bev_group_suspend_writing(bev->rate_limiting->group);
                }
                UNLOCK_GROUP(bev->rate_limiting->group);
        }

        return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
        /* Needs group lock */
        struct bufferevent_private *bev;
        g->read_suspended = 1;
        g->pending_unsuspend_read = 0;

        /* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
           to prevent a deadlock.  (Ordinarily, the group lock nests inside
           the bufferevent locks.)  If we are unable to lock any individual
           bufferevent, it will find out later when it looks at its limit
           and sees that its group is suspended.
        */
        TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
                if (EVLOCK_TRY_LOCK(bev->lock)) {
                        bufferevent_suspend_read(&bev->bev,
                            BEV_SUSPEND_BW_GROUP);
                        EVLOCK_UNLOCK(bev->lock, 0);
                }
        }
        return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
        /* Needs group lock */
        struct bufferevent_private *bev;
        g->write_suspended = 1;
        g->pending_unsuspend_write = 0;
        TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
                if (EVLOCK_TRY_LOCK(bev->lock)) {
                        bufferevent_suspend_write(&bev->bev,
                            BEV_SUSPEND_BW_GROUP);
                        EVLOCK_UNLOCK(bev->lock, 0);
                }
        }
        return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
        unsigned tick;
        struct timeval now;
        struct bufferevent_private *bev = arg;
        int again = 0;
        BEV_LOCK(&bev->bev);
        if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
                BEV_UNLOCK(&bev->bev);
                return;
        }

        /* First, update the bucket */
        event_base_gettimeofday_cached(bev->bev.ev_base, &now);
        tick = ev_token_bucket_get_tick(&now,
            bev->rate_limiting->cfg);
        ev_token_bucket_update(&bev->rate_limiting->limit,
            bev->rate_limiting->cfg,
            tick);

        /* Now unsuspend any read/write operations as appropriate. */
        if ((bev->read_suspended & BEV_SUSPEND_BW)) {
                if (bev->rate_limiting->limit.read_limit > 0)
                        bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
                else
                        again = 1;
        }
        if ((bev->write_suspended & BEV_SUSPEND_BW)) {
                if (bev->rate_limiting->limit.write_limit > 0)
                        bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
                else
                        again = 1;
        }
        if (again) {
                /* One or more of the buckets may need another refill if they
                   started negative.

                   XXXX if we need to be quiet for more ticks, we should
                   maybe figure out what timeout we really want.
                */
                /* XXXX Handle event_add failure somehow */
                event_add(&bev->rate_limiting->refill_bucket_event,
                    &bev->rate_limiting->cfg->tick_timeout);
        }
        BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
        int which;
        struct bufferevent_private *bev;

        /* requires group lock */

        if (!group->n_members)
                return NULL;

        EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

        which = _evutil_weakrand() % group->n_members;

        bev = TAILQ_FIRST(&group->members);
        while (which--)
                bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

        return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)                     \
        do {                                            \
                first = _bev_group_random_element(g);   \
                for (bev = first; bev != TAILQ_END(&g->members); \
                    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
                        block ;                                  \
                }                                                \
                for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
                    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
                        block ;                                         \
                }                                                       \
        } while (0)
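
/* For example (hypothetical members): if the group list holds A, B, C, D
 * and _bev_group_random_element() picks C, the first loop visits C and D,
 * and the second loop wraps around to visit A and B, stopping before C.
 * Every member is still visited exactly once; only the start rotates. */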

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
        int again = 0;
        struct bufferevent_private *bev, *first;

        g->read_suspended = 0;
        FOREACH_RANDOM_ORDER({
                if (EVLOCK_TRY_LOCK(bev->lock)) {
                        bufferevent_unsuspend_read(&bev->bev,
                            BEV_SUSPEND_BW_GROUP);
                        EVLOCK_UNLOCK(bev->lock, 0);
                } else {
                        again = 1;
                }
        });
        g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
        int again = 0;
        struct bufferevent_private *bev, *first;
        g->write_suspended = 0;

        FOREACH_RANDOM_ORDER({
                if (EVLOCK_TRY_LOCK(bev->lock)) {
                        bufferevent_unsuspend_write(&bev->bev,
                            BEV_SUSPEND_BW_GROUP);
                        EVLOCK_UNLOCK(bev->lock, 0);
                } else {
                        again = 1;
                }
        });
        g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
        struct bufferevent_rate_limit_group *g = arg;
        unsigned tick;
        struct timeval now;

        event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

        LOCK_GROUP(g);
        tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
        ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

        if (g->pending_unsuspend_read ||
            (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
                _bev_group_unsuspend_reading(g);
        }
        if (g->pending_unsuspend_write ||
            (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
                _bev_group_unsuspend_writing(g);
        }

        /* XXXX Rather than waiting until the next tick to unsuspend stuff
         * with pending_unsuspend_write/read, we should do it on the
         * next iteration of the mainloop.
         */

        UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
        struct bufferevent_private *bevp =
            EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
        int r = -1;
        struct bufferevent_rate_limit *rlim;
        struct timeval now;
        ev_uint32_t tick;
        int reinit = 0, suspended = 0;
        /* XXX reference-count cfg */

        BEV_LOCK(bev);

        if (cfg == NULL) {
                if (bevp->rate_limiting) {
                        rlim = bevp->rate_limiting;
                        rlim->cfg = NULL;
                        bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
                        bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
                        if (event_initialized(&rlim->refill_bucket_event))
                                event_del(&rlim->refill_bucket_event);
                }
                r = 0;
                goto done;
        }

        event_base_gettimeofday_cached(bev->ev_base, &now);
        tick = ev_token_bucket_get_tick(&now, cfg);

        if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
                /* no-op */
                r = 0;
                goto done;
        }
        if (bevp->rate_limiting == NULL) {
                rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
                if (!rlim)
                        goto done;
                bevp->rate_limiting = rlim;
        } else {
                rlim = bevp->rate_limiting;
        }
        reinit = rlim->cfg != NULL;

        rlim->cfg = cfg;
        ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

        if (reinit) {
                EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
                event_del(&rlim->refill_bucket_event);
        }
        evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
            _bev_refill_callback, bevp);

        if (rlim->limit.read_limit > 0) {
                bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
        } else {
                bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
                suspended = 1;
        }
        if (rlim->limit.write_limit > 0) {
                bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
        } else {
                bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
                suspended = 1;
        }

        if (suspended)
                event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

        r = 0;

done:
        BEV_UNLOCK(bev);
        return r;
}
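
/* A minimal sketch (names illustrative) of dropping a previously installed
 * limit via the NULL branch above, then freeing the cfg the caller owns:
 *
 *      bufferevent_set_rate_limit(bev, NULL);
 *      ev_token_bucket_cfg_free(cfg);
 *
 * Since cfgs are not reference-counted (see the XXX above), the free is
 * safe only once no bufferevent or group still points at cfg. */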

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
        struct bufferevent_rate_limit_group *g;
        struct timeval now;
        ev_uint32_t tick;

        event_base_gettimeofday_cached(base, &now);
        tick = ev_token_bucket_get_tick(&now, cfg);

        g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
        if (!g)
                return NULL;
        memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
        TAILQ_INIT(&g->members);

        ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

        g->min_share = 64;
        event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
            _bev_group_refill_callback, g);
        /* XXXX handle event_add failure */
        event_add(&g->master_refill_event, &cfg->tick_timeout);

        EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
        return g;
}
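
/* A minimal usage sketch (the names "base" and "bev" and the numbers are
 * illustrative only): share one 64 KB-per-tick budget among every
 * bufferevent added to a group, with the default one-second tick:
 *
 *      struct ev_token_bucket_cfg *gcfg =
 *          ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, NULL);
 *      struct bufferevent_rate_limit_group *grp =
 *          bufferevent_rate_limit_group_new(base, gcfg);
 *      if (grp)
 *              bufferevent_add_to_rate_limit_group(bev, grp);
 */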

int
bufferevent_rate_limit_group_set_cfg(
        struct bufferevent_rate_limit_group *g,
        const struct ev_token_bucket_cfg *cfg)
{
        int same_tick;
        if (!g || !cfg)
                return -1;

        LOCK_GROUP(g);
        same_tick = evutil_timercmp(
                &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
        memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

        if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
                g->rate_limit.read_limit = cfg->read_maximum;
        if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
                g->rate_limit.write_limit = cfg->write_maximum;

        if (!same_tick) {
                /* This can cause a hiccup in the schedule */
                event_add(&g->master_refill_event, &cfg->tick_timeout);
        }

        UNLOCK_GROUP(g);
        return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
        struct bufferevent_rate_limit_group *g,
        size_t share)
{
        if (share > EV_SSIZE_MAX)
                return -1;

        g->min_share = share;
        return 0;
}
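
/* Worked example (hypothetical numbers) of why min_share matters: with a
 * group budget of 4096 bytes per tick and 128 members, the even split in
 * _bufferevent_get_rlim_max() would be 32 bytes apiece; clamping the share
 * up to the default min_share of 64 avoids doling out uselessly tiny reads
 * and writes, at the cost of briefly overcommitting the group bucket. */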

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
        LOCK_GROUP(g);
        EVUTIL_ASSERT(0 == g->n_members);
        event_del(&g->master_refill_event);
        UNLOCK_GROUP(g);
        EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
        mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
        int wsuspend, rsuspend;
        struct bufferevent_private *bevp =
            EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
        BEV_LOCK(bev);

        if (!bevp->rate_limiting) {
                struct bufferevent_rate_limit *rlim;
                rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
                if (!rlim) {
                        BEV_UNLOCK(bev);
                        return -1;
                }
                evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
                    _bev_refill_callback, bevp);
                bevp->rate_limiting = rlim;
        }

        if (bevp->rate_limiting->group == g) {
                BEV_UNLOCK(bev);
                return 0;
        }
        if (bevp->rate_limiting->group)
                bufferevent_remove_from_rate_limit_group(bev);

        LOCK_GROUP(g);
        bevp->rate_limiting->group = g;
        ++g->n_members;
        TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

        rsuspend = g->read_suspended;
        wsuspend = g->write_suspended;

        UNLOCK_GROUP(g);

        if (rsuspend)
                bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
        if (wsuspend)
                bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

        BEV_UNLOCK(bev);
        return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
        return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
        struct bufferevent_private *bevp =
            EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
        BEV_LOCK(bev);
        if (bevp->rate_limiting && bevp->rate_limiting->group) {
                struct bufferevent_rate_limit_group *g =
                    bevp->rate_limiting->group;
                LOCK_GROUP(g);
                bevp->rate_limiting->group = NULL;
                --g->n_members;
                TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
                UNLOCK_GROUP(g);
        }
        if (unsuspend) {
                bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
                bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
        }
        BEV_UNLOCK(bev);
        return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
        ev_ssize_t r;
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
                bufferevent_update_buckets(bevp);
                r = bevp->rate_limiting->limit.read_limit;
        } else {
                r = EV_SSIZE_MAX;
        }
        BEV_UNLOCK(bev);
        return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
        ev_ssize_t r;
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
                bufferevent_update_buckets(bevp);
                r = bevp->rate_limiting->limit.write_limit;
        } else {
                r = EV_SSIZE_MAX;
        }
        BEV_UNLOCK(bev);
        return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
        ev_ssize_t r;
        BEV_LOCK(bev);
        r = _bufferevent_get_read_max(BEV_UPCAST(bev));
        BEV_UNLOCK(bev);
        return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
        ev_ssize_t r;
        BEV_LOCK(bev);
        r = _bufferevent_get_write_max(BEV_UPCAST(bev));
        BEV_UNLOCK(bev);
        return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
        struct bufferevent_rate_limit_group *grp)
{
        ev_ssize_t r;
        LOCK_GROUP(grp);
        r = grp->rate_limit.read_limit;
        UNLOCK_GROUP(grp);
        return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
        struct bufferevent_rate_limit_group *grp)
{
        ev_ssize_t r;
        LOCK_GROUP(grp);
        r = grp->rate_limit.write_limit;
        UNLOCK_GROUP(grp);
        return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
        int r = 0;
        ev_ssize_t old_limit, new_limit;
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
        old_limit = bevp->rate_limiting->limit.read_limit;

        new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
        if (old_limit > 0 && new_limit <= 0) {
                bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
                if (event_add(&bevp->rate_limiting->refill_bucket_event,
                        &bevp->rate_limiting->cfg->tick_timeout) < 0)
                        r = -1;
        } else if (old_limit <= 0 && new_limit > 0) {
                if (!(bevp->write_suspended & BEV_SUSPEND_BW))
                        event_del(&bevp->rate_limiting->refill_bucket_event);
                bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
        }

        BEV_UNLOCK(bev);
        return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
        /* XXXX this is mostly copy-and-paste from
         * bufferevent_decrement_read_limit */
        int r = 0;
        ev_ssize_t old_limit, new_limit;
        struct bufferevent_private *bevp;
        BEV_LOCK(bev);
        bevp = BEV_UPCAST(bev);
        EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
        old_limit = bevp->rate_limiting->limit.write_limit;

        new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
        if (old_limit > 0 && new_limit <= 0) {
                bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
                if (event_add(&bevp->rate_limiting->refill_bucket_event,
                        &bevp->rate_limiting->cfg->tick_timeout) < 0)
                        r = -1;
        } else if (old_limit <= 0 && new_limit > 0) {
                if (!(bevp->read_suspended & BEV_SUSPEND_BW))
                        event_del(&bevp->rate_limiting->refill_bucket_event);
                bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
        }

        BEV_UNLOCK(bev);
        return r;
}

int
bufferevent_rate_limit_group_decrement_read(
        struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
        int r = 0;
        ev_ssize_t old_limit, new_limit;
        LOCK_GROUP(grp);
        old_limit = grp->rate_limit.read_limit;
        new_limit = (grp->rate_limit.read_limit -= decr);

        if (old_limit > 0 && new_limit <= 0) {
                _bev_group_suspend_reading(grp);
        } else if (old_limit <= 0 && new_limit > 0) {
                _bev_group_unsuspend_reading(grp);
        }

        UNLOCK_GROUP(grp);
        return r;
}

int
bufferevent_rate_limit_group_decrement_write(
        struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
        int r = 0;
        ev_ssize_t old_limit, new_limit;
        LOCK_GROUP(grp);
        old_limit = grp->rate_limit.write_limit;
        new_limit = (grp->rate_limit.write_limit -= decr);

        if (old_limit > 0 && new_limit <= 0) {
                _bev_group_suspend_writing(grp);
        } else if (old_limit <= 0 && new_limit > 0) {
                _bev_group_unsuspend_writing(grp);
        }

        UNLOCK_GROUP(grp);
        return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
        EVUTIL_ASSERT(grp != NULL);
        if (total_read_out)
                *total_read_out = grp->total_read;
        if (total_written_out)
                *total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
        grp->total_read = grp->total_written = 0;
}