3 # Copyright 2009 Facebook
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
17 """A level-triggered I/O loop for non-blocking sockets."""
37 import win32_support as fcntl
42 """A level-triggered I/O loop.
44 We use epoll if it is available, or else we fall back on select(). If
45 you are implementing a system that needs to handle 1000s of simultaneous
46 connections, you should use Linux and either compile our epoll module or
47 use Python 2.6+ to get epoll support.
49 Example usage for a simple TCP server:
56 def connection_ready(sock, fd, events):
59 connection, address = sock.accept()
60 except socket.error, e:
61 if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
64 connection.setblocking(0)
65 handle_connection(connection, address)
67 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
68 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
73 io_loop = ioloop.IOLoop.instance()
74 callback = functools.partial(connection_ready, sock)
75 io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
79 # Constants from the epoll module
86 _EPOLLONESHOT = (1 << 30)
89 # Our events map exactly to the epoll events
93 ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
95 def __init__(self, impl=None):
96 self._impl = impl or _poll()
97 if hasattr(self._impl, 'fileno'):
98 self._set_close_exec(self._impl.fileno())
101 self._callbacks = set()
103 self._running = False
104 self._stopped = False
105 self._blocking_log_threshold = None
107 # Create a pipe that we send bogus data to when we want to wake
108 # the I/O loop when it is idle
111 self._set_nonblocking(r)
112 self._set_nonblocking(w)
113 self._set_close_exec(r)
114 self._set_close_exec(w)
115 self._waker_reader = os.fdopen(r, "r", 0)
116 self._waker_writer = os.fdopen(w, "w", 0)
118 self._waker_reader = self._waker_writer = win32_support.Pipe()
119 r = self._waker_writer.reader_fd
120 self.add_handler(r, self._read_waker, self.READ)
124 """Returns a global IOLoop instance.
126 Most single-threaded applications have a single, global IOLoop.
127 Use this method instead of passing around IOLoop instances
128 throughout your code.
130 A common pattern for classes that depend on IOLoops is to use
131 a default argument to enable programs with multiple IOLoops
132 but not require the argument for simpler applications:
134 class MyClass(object):
135 def __init__(self, io_loop=None):
136 self.io_loop = io_loop or IOLoop.instance()
138 if not hasattr(cls, "_instance"):
139 cls._instance = cls()
143 def initialized(cls):
144 return hasattr(cls, "_instance")
146 def add_handler(self, fd, handler, events):
147 """Registers the given handler to receive the given events for fd."""
148 self._handlers[fd] = handler
149 self._impl.register(fd, events | self.ERROR)
151 def update_handler(self, fd, events):
152 """Changes the events we listen for fd."""
153 self._impl.modify(fd, events | self.ERROR)
155 def remove_handler(self, fd):
156 """Stop listening for events on fd."""
157 self._handlers.pop(fd, None)
158 self._events.pop(fd, None)
160 self._impl.unregister(fd)
161 except (OSError, IOError):
162 logging.debug("Error deleting fd from IOLoop", exc_info=True)
164 def set_blocking_log_threshold(self, s):
165 """Logs a stack trace if the ioloop is blocked for more than s seconds.
166 Pass None to disable. Requires python 2.6 on a unixy platform.
168 if not hasattr(signal, "setitimer"):
169 logging.error("set_blocking_log_threshold requires a signal module "
170 "with the setitimer method")
172 self._blocking_log_threshold = s
174 signal.signal(signal.SIGALRM, self._handle_alarm)
176 def _handle_alarm(self, signal, frame):
177 logging.warning('IOLoop blocked for %f seconds in\n%s',
178 self._blocking_log_threshold,
179 ''.join(traceback.format_stack(frame)))
182 """Starts the I/O loop.
184 The loop will run until one of the I/O handlers calls stop(), which
185 will make the loop stop after the current event iteration completes.
188 self._stopped = False
192 # Never use an infinite timeout here - it can stall epoll
195 # Prevent IO event starvation by delaying new callbacks
196 # to the next iteration of the event loop.
197 callbacks = list(self._callbacks)
198 for callback in callbacks:
199 # A callback can add or remove other callbacks
200 if callback in self._callbacks:
201 self._callbacks.remove(callback)
202 self._run_callback(callback)
209 while self._timeouts and self._timeouts[0].deadline <= now:
210 timeout = self._timeouts.pop(0)
211 self._run_callback(timeout.callback)
213 milliseconds = self._timeouts[0].deadline - now
214 poll_timeout = min(milliseconds, poll_timeout)
216 if not self._running:
219 if self._blocking_log_threshold is not None:
220 # clear alarm so it doesn't fire while poll is waiting for
222 signal.setitimer(signal.ITIMER_REAL, 0, 0)
225 event_pairs = self._impl.poll(poll_timeout)
227 # Depending on python version and IOLoop implementation,
228 # different exception types may be thrown and there are
229 # two ways EINTR might be signaled:
230 # * e.errno == errno.EINTR
231 # * e.args is like (errno.EINTR, 'Interrupted system call')
232 if (getattr(e, 'errno') == errno.EINTR or
233 (isinstance(getattr(e, 'args'), tuple) and
234 len(e.args) == 2 and e.args[0] == errno.EINTR)):
235 logging.warning("Interrupted system call", exc_info=1)
240 if self._blocking_log_threshold is not None:
241 signal.setitimer(signal.ITIMER_REAL,
242 self._blocking_log_threshold, 0)
244 # Pop one fd at a time from the set of pending fds and run
245 # its handler. Since that handler may perform actions on
246 # other file descriptors, there may be reentrant calls to
247 # this IOLoop that update self._events
248 self._events.update(event_pairs)
250 fd, events = self._events.popitem()
252 self._handlers[fd](fd, events)
253 except (KeyboardInterrupt, SystemExit):
255 except (OSError, IOError), e:
256 if e[0] == errno.EPIPE:
257 # Happens when the client closes the connection
260 logging.error("Exception in I/O handler for fd %d",
263 logging.error("Exception in I/O handler for fd %d",
265 # reset the stopped flag so another start/stop pair can be issued
266 self._stopped = False
267 if self._blocking_log_threshold is not None:
268 signal.setitimer(signal.ITIMER_REAL, 0, 0)
271 """Stop the loop after the current event loop iteration is complete.
272 If the event loop is not currently running, the next call to start()
273 will return immediately.
275 To use asynchronous methods from otherwise-synchronous code (such as
276 unit tests), you can start and stop the event loop like this:
278 async_method(ioloop=ioloop, callback=ioloop.stop)
280 ioloop.start() will return after async_method has run its callback,
281 whether that callback was invoked before or after ioloop.start.
283 self._running = False
288 """Returns true if this IOLoop is currently running."""
291 def add_timeout(self, deadline, callback):
292 """Calls the given callback at the time deadline from the I/O loop."""
293 timeout = _Timeout(deadline, callback)
294 bisect.insort(self._timeouts, timeout)
297 def remove_timeout(self, timeout):
298 self._timeouts.remove(timeout)
300 def add_callback(self, callback):
301 """Calls the given callback on the next I/O loop iteration."""
302 self._callbacks.add(callback)
305 def remove_callback(self, callback):
306 """Removes the given callback from the next I/O loop iteration."""
307 self._callbacks.remove(callback)
311 self._waker_writer.write("x")
315 def _run_callback(self, callback):
318 except (KeyboardInterrupt, SystemExit):
321 self.handle_callback_exception(callback)
323 def handle_callback_exception(self, callback):
324 """This method is called whenever a callback run by the IOLoop
327 By default simply logs the exception as an error. Subclasses
328 may override this method to customize reporting of exceptions.
330 The exception itself is not passed explicitly, but is available
333 logging.error("Exception in callback %r", callback, exc_info=True)
335 def _read_waker(self, fd, events):
338 self._waker_reader.read()
342 def _set_nonblocking(self, fd):
343 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
344 fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
346 def _set_close_exec(self, fd):
347 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
348 fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
351 class _Timeout(object):
352 """An IOLoop timeout, a UNIX timestamp and a callback"""
354 # Reduce memory overhead when there are lots of pending callbacks
355 __slots__ = ['deadline', 'callback']
357 def __init__(self, deadline, callback):
358 self.deadline = deadline
359 self.callback = callback
361 def __cmp__(self, other):
362 return cmp((self.deadline, id(self.callback)),
363 (other.deadline, id(other.callback)))
366 class PeriodicCallback(object):
367 """Schedules the given callback to be called periodically.
369 The callback is called every callback_time milliseconds.
371 def __init__(self, callback, callback_time, io_loop=None):
372 self.callback = callback
373 self.callback_time = callback_time
374 self.io_loop = io_loop or IOLoop.instance()
378 timeout = time.time() + self.callback_time / 1000.0
379 self.io_loop.add_timeout(timeout, self._run)
382 self._running = False
385 if not self._running: return
388 except (KeyboardInterrupt, SystemExit):
391 logging.error("Error in periodic callback", exc_info=True)
395 class _EPoll(object):
396 """An epoll-based event loop using our C module for Python 2.5 systems"""
402 self._epoll_fd = epoll.epoll_create()
405 return self._epoll_fd
407 def register(self, fd, events):
408 epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
410 def modify(self, fd, events):
411 epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
413 def unregister(self, fd):
414 epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
416 def poll(self, timeout):
417 return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
420 class _KQueue(object):
421 """A kqueue-based event loop for BSD/Mac systems."""
423 self._kqueue = select.kqueue()
427 return self._kqueue.fileno()
429 def register(self, fd, events):
430 self._control(fd, events, select.KQ_EV_ADD)
431 self._active[fd] = events
433 def modify(self, fd, events):
435 self.register(fd, events)
437 def unregister(self, fd):
438 events = self._active.pop(fd)
439 self._control(fd, events, select.KQ_EV_DELETE)
441 def _control(self, fd, events, flags):
443 if events & IOLoop.WRITE:
444 kevents.append(select.kevent(
445 fd, filter=select.KQ_FILTER_WRITE, flags=flags))
446 if events & IOLoop.READ or not kevents:
447 # Always read when there is not a write
448 kevents.append(select.kevent(
449 fd, filter=select.KQ_FILTER_READ, flags=flags))
450 # Even though control() takes a list, it seems to return EINVAL
451 # on Mac OS X (10.6) when there is more than one event in the list.
452 for kevent in kevents:
453 self._kqueue.control([kevent], 0)
455 def poll(self, timeout):
456 kevents = self._kqueue.control(None, 1000, timeout)
458 for kevent in kevents:
461 if kevent.filter == select.KQ_FILTER_READ:
462 events[fd] = events.get(fd, 0) | IOLoop.READ
463 if kevent.filter == select.KQ_FILTER_WRITE:
464 events[fd] = events.get(fd, 0) | IOLoop.WRITE
465 if kevent.flags & select.KQ_EV_ERROR:
466 events[fd] = events.get(fd, 0) | IOLoop.ERROR
467 return events.items()
470 class _Select(object):
471 """A simple, select()-based IOLoop implementation for non-Linux systems"""
473 self.read_fds = set()
474 self.write_fds = set()
475 self.error_fds = set()
476 self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)
478 def register(self, fd, events):
479 if events & IOLoop.READ: self.read_fds.add(fd)
480 if events & IOLoop.WRITE: self.write_fds.add(fd)
481 if events & IOLoop.ERROR: self.error_fds.add(fd)
483 def modify(self, fd, events):
485 self.register(fd, events)
487 def unregister(self, fd):
488 self.read_fds.discard(fd)
489 self.write_fds.discard(fd)
490 self.error_fds.discard(fd)
492 def poll(self, timeout):
493 readable, writeable, errors = select.select(
494 self.read_fds, self.write_fds, self.error_fds, timeout)
497 events[fd] = events.get(fd, 0) | IOLoop.READ
499 events[fd] = events.get(fd, 0) | IOLoop.WRITE
501 events[fd] = events.get(fd, 0) | IOLoop.ERROR
502 return events.items()
505 # Choose a poll implementation. Use epoll if it is available, fall back to
506 # select() for non-Linux platforms
507 if hasattr(select, "epoll"):
508 # Python 2.6+ on Linux
510 elif hasattr(select, "kqueue"):
511 # Python 2.6+ on BSD or Mac
515 # Linux systems with our C module installed
521 if "linux" in sys.platform:
522 logging.warning("epoll module not found; using select()")