]> arthur.barton.de Git - netatalk.git/blob - libevent/test/test-ratelim.c
Merge master
[netatalk.git] / libevent / test / test-ratelim.c
1 /*
2  * Copyright (c) 2009-2010 Niels Provos and Nick Mathewson
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  * 3. The name of the author may not be used to endorse or promote products
13  *    derived from this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <assert.h>
31 #include <math.h>
32
33 #ifdef WIN32
34 #include <winsock2.h>
35 #include <ws2tcpip.h>
36 #else
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 # ifdef _XOPEN_SOURCE_EXTENDED
40 #  include <arpa/inet.h>
41 # endif
42 #endif
43 #include <signal.h>
44
45 #include "event2/bufferevent.h"
46 #include "event2/buffer.h"
47 #include "event2/event.h"
48 #include "event2/util.h"
49 #include "event2/listener.h"
50 #include "event2/thread.h"
51
52 #include "../util-internal.h"
53
54 static int cfg_verbose = 0;
55 static int cfg_help = 0;
56
57 static int cfg_n_connections = 30;
58 static int cfg_duration = 5;
59 static int cfg_connlimit = 0;
60 static int cfg_grouplimit = 0;
61 static int cfg_tick_msec = 1000;
62 static int cfg_min_share = -1;
63
64 static int cfg_connlimit_tolerance = -1;
65 static int cfg_grouplimit_tolerance = -1;
66 static int cfg_stddev_tolerance = -1;
67
68 static struct timeval cfg_tick = { 0, 500*1000 };
69
70 static struct ev_token_bucket_cfg *conn_bucket_cfg = NULL;
71 static struct ev_token_bucket_cfg *group_bucket_cfg = NULL;
72 struct bufferevent_rate_limit_group *ratelim_group = NULL;
73 static double seconds_per_tick = 0.0;
74
75 struct client_state {
76         size_t queued;
77         ev_uint64_t received;
78 };
79
80 static int n_echo_conns_open = 0;
81
82 static void
83 loud_writecb(struct bufferevent *bev, void *ctx)
84 {
85         struct client_state *cs = ctx;
86         struct evbuffer *output = bufferevent_get_output(bev);
87         char buf[1024];
88 #ifdef WIN32
89         int r = rand() % 256;
90 #else
91         int r = random() % 256;
92 #endif
93         memset(buf, r, sizeof(buf));
94         while (evbuffer_get_length(output) < 8192) {
95                 evbuffer_add(output, buf, sizeof(buf));
96                 cs->queued += sizeof(buf);
97         }
98 }
99
100 static void
101 discard_readcb(struct bufferevent *bev, void *ctx)
102 {
103         struct client_state *cs = ctx;
104         struct evbuffer *input = bufferevent_get_input(bev);
105         size_t len = evbuffer_get_length(input);
106         evbuffer_drain(input, len);
107         cs->received += len;
108 }
109
110 static void
111 write_on_connectedcb(struct bufferevent *bev, short what, void *ctx)
112 {
113         if (what & BEV_EVENT_CONNECTED) {
114                 loud_writecb(bev, ctx);
115                 /* XXXX this shouldn't be needed. */
116                 bufferevent_enable(bev, EV_READ|EV_WRITE);
117         }
118 }
119
120 static void
121 echo_readcb(struct bufferevent *bev, void *ctx)
122 {
123         struct evbuffer *input = bufferevent_get_input(bev);
124         struct evbuffer *output = bufferevent_get_output(bev);
125
126         evbuffer_add_buffer(output, input);
127         if (evbuffer_get_length(output) > 1024000)
128                 bufferevent_disable(bev, EV_READ);
129 }
130
131 static void
132 echo_writecb(struct bufferevent *bev, void *ctx)
133 {
134         struct evbuffer *output = bufferevent_get_output(bev);
135         if (evbuffer_get_length(output) < 512000)
136                 bufferevent_enable(bev, EV_READ);
137 }
138
139 static void
140 echo_eventcb(struct bufferevent *bev, short what, void *ctx)
141 {
142         if (what & (BEV_EVENT_EOF|BEV_EVENT_ERROR)) {
143                 --n_echo_conns_open;
144                 bufferevent_free(bev);
145         }
146 }
147
148 static void
149 echo_listenercb(struct evconnlistener *listener, evutil_socket_t newsock,
150     struct sockaddr *sourceaddr, int socklen, void *ctx)
151 {
152         struct event_base *base = ctx;
153         int flags = BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE;
154         struct bufferevent *bev;
155
156         bev = bufferevent_socket_new(base, newsock, flags);
157         bufferevent_setcb(bev, echo_readcb, echo_writecb, echo_eventcb, NULL);
158         if (conn_bucket_cfg)
159                 bufferevent_set_rate_limit(bev, conn_bucket_cfg);
160         if (ratelim_group)
161                 bufferevent_add_to_rate_limit_group(bev, ratelim_group);
162         ++n_echo_conns_open;
163         bufferevent_enable(bev, EV_READ|EV_WRITE);
164 }
165
166 static int
167 test_ratelimiting(void)
168 {
169         struct event_base *base;
170         struct sockaddr_in sin;
171         struct evconnlistener *listener;
172
173         struct sockaddr_storage ss;
174         ev_socklen_t slen;
175
176         struct bufferevent **bevs;
177         struct client_state *states;
178         struct bufferevent_rate_limit_group *group = NULL;
179
180         int i;
181
182         struct timeval tv;
183
184         ev_uint64_t total_received;
185         double total_sq_persec, total_persec;
186         double variance;
187         double expected_total_persec = -1.0, expected_avg_persec = -1.0;
188         int ok = 1;
189
190         memset(&sin, 0, sizeof(sin));
191         sin.sin_family = AF_INET;
192         sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
193         sin.sin_port = 0; /* unspecified port */
194
195         if (0)
196                 event_enable_debug_mode();
197
198         base = event_base_new();
199
200         listener = evconnlistener_new_bind(base, echo_listenercb, base,
201             LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
202             (struct sockaddr *)&sin, sizeof(sin));
203
204         slen = sizeof(ss);
205         if (getsockname(evconnlistener_get_fd(listener), (struct sockaddr *)&ss,
206                 &slen) < 0) {
207                 perror("getsockname");
208                 return 1;
209         }
210
211         if (cfg_connlimit > 0) {
212                 conn_bucket_cfg = ev_token_bucket_cfg_new(
213                         cfg_connlimit, cfg_connlimit * 4,
214                         cfg_connlimit, cfg_connlimit * 4,
215                         &cfg_tick);
216                 assert(conn_bucket_cfg);
217         }
218
219         if (cfg_grouplimit > 0) {
220                 group_bucket_cfg = ev_token_bucket_cfg_new(
221                         cfg_grouplimit, cfg_grouplimit * 4,
222                         cfg_grouplimit, cfg_grouplimit * 4,
223                         &cfg_tick);
224                 group = ratelim_group = bufferevent_rate_limit_group_new(
225                         base, group_bucket_cfg);
226                 expected_total_persec = cfg_grouplimit;
227                 expected_avg_persec = cfg_grouplimit / cfg_n_connections;
228                 if (cfg_connlimit > 0 && expected_avg_persec > cfg_connlimit)
229                         expected_avg_persec = cfg_connlimit;
230                 if (cfg_min_share >= 0)
231                         bufferevent_rate_limit_group_set_min_share(
232                                 ratelim_group, cfg_min_share);
233         }
234
235         if (expected_avg_persec < 0 && cfg_connlimit > 0)
236                 expected_avg_persec = cfg_connlimit;
237
238         if (expected_avg_persec > 0)
239                 expected_avg_persec /= seconds_per_tick;
240         if (expected_total_persec > 0)
241                 expected_total_persec /= seconds_per_tick;
242
243         bevs = calloc(cfg_n_connections, sizeof(struct bufferevent *));
244         states = calloc(cfg_n_connections, sizeof(struct client_state));
245
246         for (i = 0; i < cfg_n_connections; ++i) {
247                 bevs[i] = bufferevent_socket_new(base, -1,
248                     BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);
249                 assert(bevs[i]);
250                 bufferevent_setcb(bevs[i], discard_readcb, loud_writecb,
251                     write_on_connectedcb, &states[i]);
252                 bufferevent_enable(bevs[i], EV_READ|EV_WRITE);
253                 bufferevent_socket_connect(bevs[i], (struct sockaddr *)&ss,
254                     slen);
255         }
256
257         tv.tv_sec = cfg_duration - 1;
258         tv.tv_usec = 995000;
259
260         event_base_loopexit(base, &tv);
261
262         event_base_dispatch(base);
263
264         ratelim_group = NULL; /* So no more responders get added */
265
266         for (i = 0; i < cfg_n_connections; ++i) {
267                 bufferevent_free(bevs[i]);
268         }
269         evconnlistener_free(listener);
270
271         /* Make sure no new echo_conns get added to the group. */
272         ratelim_group = NULL;
273
274         /* This should get _everybody_ freed */
275         while (n_echo_conns_open) {
276                 printf("waiting for %d conns\n", n_echo_conns_open);
277                 tv.tv_sec = 0;
278                 tv.tv_usec = 300000;
279                 event_base_loopexit(base, &tv);
280                 event_base_dispatch(base);
281         }
282
283         if (group)
284                 bufferevent_rate_limit_group_free(group);
285
286         total_received = 0;
287         total_persec = 0.0;
288         total_sq_persec = 0.0;
289         for (i=0; i < cfg_n_connections; ++i) {
290                 double persec = states[i].received;
291                 persec /= cfg_duration;
292                 total_received += states[i].received;
293                 total_persec += persec;
294                 total_sq_persec += persec*persec;
295                 printf("%d: %f per second\n", i+1, persec);
296         }
297         printf("   total: %f per second\n",
298             ((double)total_received)/cfg_duration);
299         if (expected_total_persec > 0) {
300                 double diff = expected_total_persec -
301                     ((double)total_received/cfg_duration);
302                 printf("  [Off by %lf]\n", diff);
303                 if (cfg_grouplimit_tolerance > 0 &&
304                     fabs(diff) > cfg_grouplimit_tolerance) {
305                         fprintf(stderr, "Group bandwidth out of bounds\n");
306                         ok = 0;
307                 }
308         }
309
310         printf(" average: %f per second\n",
311             (((double)total_received)/cfg_duration)/cfg_n_connections);
312         if (expected_avg_persec > 0) {
313                 double diff = expected_avg_persec - (((double)total_received)/cfg_duration)/cfg_n_connections;
314                 printf("  [Off by %lf]\n", diff);
315                 if (cfg_connlimit_tolerance > 0 &&
316                     fabs(diff) > cfg_connlimit_tolerance) {
317                         fprintf(stderr, "Connection bandwidth out of bounds\n");
318                         ok = 0;
319                 }
320         }
321
322         variance = total_sq_persec/cfg_n_connections - total_persec*total_persec/(cfg_n_connections*cfg_n_connections);
323
324         printf("  stddev: %f per second\n", sqrt(variance));
325         if (cfg_stddev_tolerance > 0 &&
326             sqrt(variance) > cfg_stddev_tolerance) {
327                 fprintf(stderr, "Connection variance out of bounds\n");
328                 ok = 0;
329         }
330
331         event_base_free(base);
332         free(bevs);
333         free(states);
334
335         return ok ? 0 : 1;
336 }
337
338 static struct option {
339         const char *name; int *ptr; int min; int isbool;
340 } options[] = {
341         { "-v", &cfg_verbose, 0, 1 },
342         { "-h", &cfg_help, 0, 1 },
343         { "-n", &cfg_n_connections, 1, 0 },
344         { "-d", &cfg_duration, 1, 0 },
345         { "-c", &cfg_connlimit, 0, 0 },
346         { "-g", &cfg_grouplimit, 0, 0 },
347         { "-t", &cfg_tick_msec, 10, 0 },
348         { "--min-share", &cfg_min_share, 0, 0 },
349         { "--check-connlimit", &cfg_connlimit_tolerance, 0, 0 },
350         { "--check-grouplimit", &cfg_grouplimit_tolerance, 0, 0 },
351         { "--check-stddev", &cfg_stddev_tolerance, 0, 0 },
352         { NULL, NULL, -1, 0 },
353 };
354
355 static int
356 handle_option(int argc, char **argv, int *i, const struct option *opt)
357 {
358         long val;
359         char *endptr = NULL;
360         if (opt->isbool) {
361                 *opt->ptr = 1;
362                 return 0;
363         }
364         if (*i + 1 == argc) {
365                 fprintf(stderr, "Too few arguments to '%s'\n",argv[*i]);
366                 return -1;
367         }
368         val = strtol(argv[*i+1], &endptr, 10);
369         if (*argv[*i+1] == '\0' || !endptr || *endptr != '\0') {
370                 fprintf(stderr, "Couldn't parse numeric value '%s'\n",
371                     argv[*i+1]);
372                 return -1;
373         }
374         if (val < opt->min || val > 0x7fffffff) {
375                 fprintf(stderr, "Value '%s' is out-of-range'\n",
376                     argv[*i+1]);
377                 return -1;
378         }
379         *opt->ptr = (int)val;
380         ++*i;
381         return 0;
382 }
383
384 static void
385 usage(void)
386 {
387         fprintf(stderr,
388 "test-ratelim [-v] [-n INT] [-d INT] [-c INT] [-g INT] [-t INT]\n\n"
389 "Pushes bytes through a number of possibly rate-limited connections, and\n"
390 "displays average throughput.\n\n"
391 "  -n INT: Number of connections to open (default: 30)\n"
392 "  -d INT: Duration of the test in seconds (default: 5 sec)\n");
393         fprintf(stderr,
394 "  -c INT: Connection-rate limit applied to each connection in bytes per second\n"
395 "          (default: None.)\n"
396 "  -g INT: Group-rate limit applied to sum of all usage in bytes per second\n"
397 "          (default: None.)\n"
398 "  -t INT: Granularity of timing, in milliseconds (default: 1000 msec)\n");
399 }
400
401 int
402 main(int argc, char **argv)
403 {
404         int i,j;
405         double ratio;
406
407 #ifdef WIN32
408         WORD wVersionRequested = MAKEWORD(2,2);
409         WSADATA wsaData;
410         int err;
411
412         err = WSAStartup(wVersionRequested, &wsaData);
413 #endif
414
415 #ifndef WIN32
416         if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
417                 return 1;
418 #endif
419         for (i = 1; i < argc; ++i) {
420                 for (j = 0; options[j].name; ++j) {
421                         if (!strcmp(argv[i],options[j].name)) {
422                                 if (handle_option(argc,argv,&i,&options[j])<0)
423                                         return 1;
424                                 goto again;
425                         }
426                 }
427                 fprintf(stderr, "Unknown option '%s'\n", argv[i]);
428                 usage();
429                 return 1;
430         again:
431                 ;
432         }
433         if (cfg_help) {
434                 usage();
435                 return 0;
436         }
437
438         cfg_tick.tv_sec = cfg_tick_msec / 1000;
439         cfg_tick.tv_usec = (cfg_tick_msec % 1000)*1000;
440
441         seconds_per_tick = ratio = cfg_tick_msec / 1000.0;
442
443         cfg_connlimit *= ratio;
444         cfg_grouplimit *= ratio;
445
446         {
447                 struct timeval tv;
448                 evutil_gettimeofday(&tv, NULL);
449 #ifdef WIN32
450                 srand(tv.tv_usec);
451 #else
452                 srandom(tv.tv_usec);
453 #endif
454         }
455
456 #ifndef _EVENT_DISABLE_THREAD_SUPPORT
457         evthread_enable_lock_debuging();
458 #endif
459
460         return test_ratelimiting();
461 }