]> arthur.barton.de Git - bup.git/blob - lib/tornado/ioloop.py
Always publish (l)utimes in helpers when available and fix type conversions.
[bup.git] / lib / tornado / ioloop.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2009 Facebook
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 """A level-triggered I/O loop for non-blocking sockets."""
18
19 import bisect
20 import errno
21 import os
22 import logging
23 import select
24 import time
25 import traceback
26
27 try:
28     import signal
29 except ImportError:
30     signal = None
31
32 try:
33     import fcntl
34 except ImportError:
35     if os.name == 'nt':
36         import win32_support
37         import win32_support as fcntl
38     else:
39         raise
40
41 class IOLoop(object):
42     """A level-triggered I/O loop.
43
44     We use epoll if it is available, or else we fall back on select(). If
45     you are implementing a system that needs to handle 1000s of simultaneous
46     connections, you should use Linux and either compile our epoll module or
47     use Python 2.6+ to get epoll support.
48
49     Example usage for a simple TCP server:
50
51         import errno
52         import functools
53         import ioloop
54         import socket
55
56         def connection_ready(sock, fd, events):
57             while True:
58                 try:
59                     connection, address = sock.accept()
60                 except socket.error, e:
61                     if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
62                         raise
63                     return
64                 connection.setblocking(0)
65                 handle_connection(connection, address)
66
67         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
68         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
69         sock.setblocking(0)
70         sock.bind(("", port))
71         sock.listen(128)
72
73         io_loop = ioloop.IOLoop.instance()
74         callback = functools.partial(connection_ready, sock)
75         io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
76         io_loop.start()
77
78     """
79     # Constants from the epoll module
80     _EPOLLIN = 0x001
81     _EPOLLPRI = 0x002
82     _EPOLLOUT = 0x004
83     _EPOLLERR = 0x008
84     _EPOLLHUP = 0x010
85     _EPOLLRDHUP = 0x2000
86     _EPOLLONESHOT = (1 << 30)
87     _EPOLLET = (1 << 31)
88
89     # Our events map exactly to the epoll events
90     NONE = 0
91     READ = _EPOLLIN
92     WRITE = _EPOLLOUT
93     ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
94
95     def __init__(self, impl=None):
96         self._impl = impl or _poll()
97         if hasattr(self._impl, 'fileno'):
98             self._set_close_exec(self._impl.fileno())
99         self._handlers = {}
100         self._events = {}
101         self._callbacks = set()
102         self._timeouts = []
103         self._running = False
104         self._stopped = False
105         self._blocking_log_threshold = None
106
107         # Create a pipe that we send bogus data to when we want to wake
108         # the I/O loop when it is idle
109         if os.name != 'nt':
110             r, w = os.pipe()
111             self._set_nonblocking(r)
112             self._set_nonblocking(w)
113             self._set_close_exec(r)
114             self._set_close_exec(w)
115             self._waker_reader = os.fdopen(r, "r", 0)
116             self._waker_writer = os.fdopen(w, "w", 0)
117         else:
118             self._waker_reader = self._waker_writer = win32_support.Pipe()
119             r = self._waker_writer.reader_fd
120         self.add_handler(r, self._read_waker, self.READ)
121
122     @classmethod
123     def instance(cls):
124         """Returns a global IOLoop instance.
125
126         Most single-threaded applications have a single, global IOLoop.
127         Use this method instead of passing around IOLoop instances
128         throughout your code.
129
130         A common pattern for classes that depend on IOLoops is to use
131         a default argument to enable programs with multiple IOLoops
132         but not require the argument for simpler applications:
133
134             class MyClass(object):
135                 def __init__(self, io_loop=None):
136                     self.io_loop = io_loop or IOLoop.instance()
137         """
138         if not hasattr(cls, "_instance"):
139             cls._instance = cls()
140         return cls._instance
141
142     @classmethod
143     def initialized(cls):
144         return hasattr(cls, "_instance")
145
146     def add_handler(self, fd, handler, events):
147         """Registers the given handler to receive the given events for fd."""
148         self._handlers[fd] = handler
149         self._impl.register(fd, events | self.ERROR)
150
151     def update_handler(self, fd, events):
152         """Changes the events we listen for fd."""
153         self._impl.modify(fd, events | self.ERROR)
154
155     def remove_handler(self, fd):
156         """Stop listening for events on fd."""
157         self._handlers.pop(fd, None)
158         self._events.pop(fd, None)
159         try:
160             self._impl.unregister(fd)
161         except (OSError, IOError):
162             logging.debug("Error deleting fd from IOLoop", exc_info=True)
163
164     def set_blocking_log_threshold(self, s):
165         """Logs a stack trace if the ioloop is blocked for more than s seconds.
166         Pass None to disable.  Requires python 2.6 on a unixy platform.
167         """
168         if not hasattr(signal, "setitimer"):
169             logging.error("set_blocking_log_threshold requires a signal module "
170                        "with the setitimer method")
171             return
172         self._blocking_log_threshold = s
173         if s is not None:
174             signal.signal(signal.SIGALRM, self._handle_alarm)
175
176     def _handle_alarm(self, signal, frame):
177         logging.warning('IOLoop blocked for %f seconds in\n%s',
178                      self._blocking_log_threshold,
179                      ''.join(traceback.format_stack(frame)))
180
181     def start(self):
182         """Starts the I/O loop.
183
184         The loop will run until one of the I/O handlers calls stop(), which
185         will make the loop stop after the current event iteration completes.
186         """
187         if self._stopped:
188             self._stopped = False
189             return
190         self._running = True
191         while True:
192             # Never use an infinite timeout here - it can stall epoll
193             poll_timeout = 0.2
194
195             # Prevent IO event starvation by delaying new callbacks
196             # to the next iteration of the event loop.
197             callbacks = list(self._callbacks)
198             for callback in callbacks:
199                 # A callback can add or remove other callbacks
200                 if callback in self._callbacks:
201                     self._callbacks.remove(callback)
202                     self._run_callback(callback)
203
204             if self._callbacks:
205                 poll_timeout = 0.0
206
207             if self._timeouts:
208                 now = time.time()
209                 while self._timeouts and self._timeouts[0].deadline <= now:
210                     timeout = self._timeouts.pop(0)
211                     self._run_callback(timeout.callback)
212                 if self._timeouts:
213                     milliseconds = self._timeouts[0].deadline - now
214                     poll_timeout = min(milliseconds, poll_timeout)
215
216             if not self._running:
217                 break
218
219             if self._blocking_log_threshold is not None:
220                 # clear alarm so it doesn't fire while poll is waiting for
221                 # events.
222                 signal.setitimer(signal.ITIMER_REAL, 0, 0)
223
224             try:
225                 event_pairs = self._impl.poll(poll_timeout)
226             except Exception, e:
227                 # Depending on python version and IOLoop implementation,
228                 # different exception types may be thrown and there are
229                 # two ways EINTR might be signaled:
230                 # * e.errno == errno.EINTR
231                 # * e.args is like (errno.EINTR, 'Interrupted system call')
232                 if (getattr(e, 'errno') == errno.EINTR or
233                     (isinstance(getattr(e, 'args'), tuple) and
234                      len(e.args) == 2 and e.args[0] == errno.EINTR)):
235                     logging.warning("Interrupted system call", exc_info=1)
236                     continue
237                 else:
238                     raise
239
240             if self._blocking_log_threshold is not None:
241                 signal.setitimer(signal.ITIMER_REAL,
242                                  self._blocking_log_threshold, 0)
243
244             # Pop one fd at a time from the set of pending fds and run
245             # its handler. Since that handler may perform actions on
246             # other file descriptors, there may be reentrant calls to
247             # this IOLoop that update self._events
248             self._events.update(event_pairs)
249             while self._events:
250                 fd, events = self._events.popitem()
251                 try:
252                     self._handlers[fd](fd, events)
253                 except (KeyboardInterrupt, SystemExit):
254                     raise
255                 except (OSError, IOError), e:
256                     if e[0] == errno.EPIPE:
257                         # Happens when the client closes the connection
258                         pass
259                     else:
260                         logging.error("Exception in I/O handler for fd %d",
261                                       fd, exc_info=True)
262                 except:
263                     logging.error("Exception in I/O handler for fd %d",
264                                   fd, exc_info=True)
265         # reset the stopped flag so another start/stop pair can be issued
266         self._stopped = False
267         if self._blocking_log_threshold is not None:
268             signal.setitimer(signal.ITIMER_REAL, 0, 0)
269
270     def stop(self):
271         """Stop the loop after the current event loop iteration is complete.
272         If the event loop is not currently running, the next call to start()
273         will return immediately.
274
275         To use asynchronous methods from otherwise-synchronous code (such as
276         unit tests), you can start and stop the event loop like this:
277           ioloop = IOLoop()
278           async_method(ioloop=ioloop, callback=ioloop.stop)
279           ioloop.start()
280         ioloop.start() will return after async_method has run its callback,
281         whether that callback was invoked before or after ioloop.start.
282         """
283         self._running = False
284         self._stopped = True
285         self._wake()
286
287     def running(self):
288         """Returns true if this IOLoop is currently running."""
289         return self._running
290
291     def add_timeout(self, deadline, callback):
292         """Calls the given callback at the time deadline from the I/O loop."""
293         timeout = _Timeout(deadline, callback)
294         bisect.insort(self._timeouts, timeout)
295         return timeout
296
297     def remove_timeout(self, timeout):
298         self._timeouts.remove(timeout)
299
300     def add_callback(self, callback):
301         """Calls the given callback on the next I/O loop iteration."""
302         self._callbacks.add(callback)
303         self._wake()
304
305     def remove_callback(self, callback):
306         """Removes the given callback from the next I/O loop iteration."""
307         self._callbacks.remove(callback)
308
309     def _wake(self):
310         try:
311             self._waker_writer.write("x")
312         except IOError:
313             pass
314
315     def _run_callback(self, callback):
316         try:
317             callback()
318         except (KeyboardInterrupt, SystemExit):
319             raise
320         except:
321             self.handle_callback_exception(callback)
322
323     def handle_callback_exception(self, callback):
324         """This method is called whenever a callback run by the IOLoop
325         throws an exception.
326
327         By default simply logs the exception as an error.  Subclasses
328         may override this method to customize reporting of exceptions.
329
330         The exception itself is not passed explicitly, but is available
331         in sys.exc_info.
332         """
333         logging.error("Exception in callback %r", callback, exc_info=True)
334
335     def _read_waker(self, fd, events):
336         try:
337             while True:
338                 self._waker_reader.read()
339         except IOError:
340             pass
341
342     def _set_nonblocking(self, fd):
343         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
344         fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
345
346     def _set_close_exec(self, fd):
347         flags = fcntl.fcntl(fd, fcntl.F_GETFD)
348         fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
349
350
351 class _Timeout(object):
352     """An IOLoop timeout, a UNIX timestamp and a callback"""
353
354     # Reduce memory overhead when there are lots of pending callbacks
355     __slots__ = ['deadline', 'callback']
356
357     def __init__(self, deadline, callback):
358         self.deadline = deadline
359         self.callback = callback
360
361     def __cmp__(self, other):
362         return cmp((self.deadline, id(self.callback)),
363                    (other.deadline, id(other.callback)))
364
365
366 class PeriodicCallback(object):
367     """Schedules the given callback to be called periodically.
368
369     The callback is called every callback_time milliseconds.
370     """
371     def __init__(self, callback, callback_time, io_loop=None):
372         self.callback = callback
373         self.callback_time = callback_time
374         self.io_loop = io_loop or IOLoop.instance()
375         self._running = True
376
377     def start(self):
378         timeout = time.time() + self.callback_time / 1000.0
379         self.io_loop.add_timeout(timeout, self._run)
380
381     def stop(self):
382         self._running = False
383
384     def _run(self):
385         if not self._running: return
386         try:
387             self.callback()
388         except (KeyboardInterrupt, SystemExit):
389             raise
390         except:
391             logging.error("Error in periodic callback", exc_info=True)
392         self.start()
393
394
395 class _EPoll(object):
396     """An epoll-based event loop using our C module for Python 2.5 systems"""
397     _EPOLL_CTL_ADD = 1
398     _EPOLL_CTL_DEL = 2
399     _EPOLL_CTL_MOD = 3
400
401     def __init__(self):
402         self._epoll_fd = epoll.epoll_create()
403
404     def fileno(self):
405         return self._epoll_fd
406
407     def register(self, fd, events):
408         epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
409
410     def modify(self, fd, events):
411         epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
412
413     def unregister(self, fd):
414         epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
415
416     def poll(self, timeout):
417         return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
418
419
420 class _KQueue(object):
421     """A kqueue-based event loop for BSD/Mac systems."""
422     def __init__(self):
423         self._kqueue = select.kqueue()
424         self._active = {}
425
426     def fileno(self):
427         return self._kqueue.fileno()
428
429     def register(self, fd, events):
430         self._control(fd, events, select.KQ_EV_ADD)
431         self._active[fd] = events
432
433     def modify(self, fd, events):
434         self.unregister(fd)
435         self.register(fd, events)
436
437     def unregister(self, fd):
438         events = self._active.pop(fd)
439         self._control(fd, events, select.KQ_EV_DELETE)
440
441     def _control(self, fd, events, flags):
442         kevents = []
443         if events & IOLoop.WRITE:
444             kevents.append(select.kevent(
445                     fd, filter=select.KQ_FILTER_WRITE, flags=flags))
446         if events & IOLoop.READ or not kevents:
447             # Always read when there is not a write
448             kevents.append(select.kevent(
449                     fd, filter=select.KQ_FILTER_READ, flags=flags))
450         # Even though control() takes a list, it seems to return EINVAL
451         # on Mac OS X (10.6) when there is more than one event in the list.
452         for kevent in kevents:
453             self._kqueue.control([kevent], 0)
454
455     def poll(self, timeout):
456         kevents = self._kqueue.control(None, 1000, timeout)
457         events = {}
458         for kevent in kevents:
459             fd = kevent.ident
460             flags = 0
461             if kevent.filter == select.KQ_FILTER_READ:
462                 events[fd] = events.get(fd, 0) | IOLoop.READ
463             if kevent.filter == select.KQ_FILTER_WRITE:
464                 events[fd] = events.get(fd, 0) | IOLoop.WRITE
465             if kevent.flags & select.KQ_EV_ERROR:
466                 events[fd] = events.get(fd, 0) | IOLoop.ERROR
467         return events.items()
468
469
470 class _Select(object):
471     """A simple, select()-based IOLoop implementation for non-Linux systems"""
472     def __init__(self):
473         self.read_fds = set()
474         self.write_fds = set()
475         self.error_fds = set()
476         self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)
477
478     def register(self, fd, events):
479         if events & IOLoop.READ: self.read_fds.add(fd)
480         if events & IOLoop.WRITE: self.write_fds.add(fd)
481         if events & IOLoop.ERROR: self.error_fds.add(fd)
482
483     def modify(self, fd, events):
484         self.unregister(fd)
485         self.register(fd, events)
486
487     def unregister(self, fd):
488         self.read_fds.discard(fd)
489         self.write_fds.discard(fd)
490         self.error_fds.discard(fd)
491
492     def poll(self, timeout):
493         readable, writeable, errors = select.select(
494             self.read_fds, self.write_fds, self.error_fds, timeout)
495         events = {}
496         for fd in readable:
497             events[fd] = events.get(fd, 0) | IOLoop.READ
498         for fd in writeable:
499             events[fd] = events.get(fd, 0) | IOLoop.WRITE
500         for fd in errors:
501             events[fd] = events.get(fd, 0) | IOLoop.ERROR
502         return events.items()
503
504
505 # Choose a poll implementation. Use epoll if it is available, fall back to
506 # select() for non-Linux platforms
507 if hasattr(select, "epoll"):
508     # Python 2.6+ on Linux
509     _poll = select.epoll
510 elif hasattr(select, "kqueue"):
511     # Python 2.6+ on BSD or Mac
512     _poll = _KQueue
513 else:
514     try:
515         # Linux systems with our C module installed
516         import epoll
517         _poll = _EPoll
518     except:
519         # All other systems
520         import sys
521         if "linux" in sys.platform:
522             logging.warning("epoll module not found; using select()")
523         _poll = _Select