3 # Copyright 2009 Facebook
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
17 """Blocking and non-blocking HTTP client implementations using pycurl."""
class HTTPClient(object):
    """A blocking HTTP client backed with pycurl.

    Typical usage looks like this:

        http_client = httpclient.HTTPClient()
        response = http_client.fetch("http://www.google.com/")
        except httpclient.HTTPError, e:

    fetch() can take a string URL or an HTTPRequest instance, which offers
    more options, like executing POST/PUT/DELETE requests.
    """

    def __init__(self, max_simultaneous_connections=None):
        # A single curl handle, reused across fetch() calls.
        self._curl = _curl_create(max_simultaneous_connections)

    def fetch(self, request, **kwargs):
        """Executes an HTTPRequest, returning an HTTPResponse.

        If an error occurs during the fetch, we raise an HTTPError.
        """
        # Accept a bare URL string for convenience; in that case kwargs
        # are forwarded to the HTTPRequest constructor.
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        buffer = cStringIO.StringIO()
        headers = httputil.HTTPHeaders()
        _curl_setup_request(self._curl, request, buffer, headers)
        # NOTE(review): the "try:" header and the blocking
        # "self._curl.perform()" call appear to be missing from this copy.
        code = self._curl.getinfo(pycurl.HTTP_CODE)
        effective_url = self._curl.getinfo(pycurl.EFFECTIVE_URL)
        response = HTTPResponse(
            request=request, code=code, headers=headers,
            buffer=buffer, effective_url=effective_url)
        # Any non-2xx status is reported as an HTTPError.
        if code < 200 or code >= 300:
            raise HTTPError(code, response=response)
        # NOTE(review): "return response" and the curl-error handling body
        # appear to be missing around the orphaned handler below.
        except pycurl.error, e:
class AsyncHTTPClient(object):
    """An non-blocking HTTP client backed with pycurl.

    Example usage (parts of this docstring appear to have been lost in
    this copy)::

        def handle_request(response):
            print "Error:", response.error
            ioloop.IOLoop.instance().stop()

        http_client = httpclient.AsyncHTTPClient()
        http_client.fetch("http://www.google.com/", handle_request)
        ioloop.IOLoop.instance().start()

    fetch() can take a string URL or an HTTPRequest instance, which offers
    more options, like executing POST/PUT/DELETE requests.

    The keyword argument max_clients to the AsyncHTTPClient constructor
    determines the maximum number of simultaneous fetch() operations that
    can execute in parallel on each IOLoop.
    """

    # One shared client instance per IOLoop (enforced in __new__).
    _ASYNC_CLIENTS = weakref.WeakKeyDictionary()

    def __new__(cls, io_loop=None, max_clients=10,
                max_simultaneous_connections=None):
        # There is one client per IOLoop since they share curl instances
        io_loop = io_loop or ioloop.IOLoop.instance()
        if io_loop in cls._ASYNC_CLIENTS:
            return cls._ASYNC_CLIENTS[io_loop]
        instance = super(AsyncHTTPClient, cls).__new__(cls)
        instance.io_loop = io_loop
        instance._multi = pycurl.CurlMulti()
        # Pool of reusable curl handles; _free_list tracks the idle ones.
        instance._curls = [_curl_create(max_simultaneous_connections)
                           for i in xrange(max_clients)]
        instance._free_list = instance._curls[:]
        instance._requests = collections.deque()
        # NOTE(review): an "instance._fds = {}" initializer appears to be
        # missing (self._fds is read in _perform below).
        instance._events = {}
        instance._added_perform_callback = False
        instance._timeout = None
        instance._closed = False
        cls._ASYNC_CLIENTS[io_loop] = instance
        # NOTE(review): a "return instance" statement appears to be
        # missing here in this copy.

    # NOTE(review): the "def close(self):" header appears to be missing
    # above the docstring below.
        """Destroys this http client, freeing any file descriptors used.
        Not needed in normal use, but may be helpful in unittests that
        create and destroy http clients. No other methods may be called
        on the AsyncHTTPClient after close().
        """
        del AsyncHTTPClient._ASYNC_CLIENTS[self.io_loop]
        for curl in self._curls:
        # NOTE(review): the loop body (presumably curl.close()) appears
        # to be missing from this copy.

    def fetch(self, request, callback, **kwargs):
        """Executes an HTTPRequest, calling callback with an HTTPResponse.

        If an error occurs during the fetch, the HTTPResponse given to the
        callback has a non-None error attribute that contains the exception
        encountered during the request. You can call response.reraise() to
        throw the exception (if any) in the callback.
        """
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        # Queue the request; _perform starts it once a handle is free.
        self._requests.append((request, callback))
        self._add_perform_callback()

    def _add_perform_callback(self):
        # Coalesce multiple wake-ups into at most one pending _perform.
        if not self._added_perform_callback:
            self.io_loop.add_callback(self._perform)
            self._added_perform_callback = True

    def _handle_events(self, fd, events):
        # Record which events fired so _perform can act on them.
        self._events[fd] = events
        self._add_perform_callback()

    def _handle_timeout(self):
        # NOTE(review): the body of _handle_timeout and the
        # "def _perform(self):" header appear to be missing from this
        # copy; the statements below belong to _perform.
        self._added_perform_callback = False

        # Drive libcurl until it stops asking to be called again.
        ret, num_handles = self._multi.perform()
        if ret != pycurl.E_CALL_MULTI_PERFORM:
        # NOTE(review): the loop/exit statements around perform() appear
        # to be missing from this copy.

        # Update the set of active file descriptors. It is important
        # that this happen immediately after perform() because
        # fds that have been removed from fdset are free to be reused
        # NOTE(review): an "fds = {}" initializer and the
        # "for fd in readable:" / "for fd in writable:" loop headers
        # appear to be missing around the lines below.
        (readable, writable, exceptable) = self._multi.fdset()
        fds[fd] = fds.get(fd, 0) | 0x1 | 0x2
        fds[fd] = fds.get(fd, 0) | 0x4
        for fd in exceptable:
            fds[fd] = fds.get(fd, 0) | 0x8 | 0x10

        if fds and max(fds.iterkeys()) > 900:
            # Libcurl has a bug in which it behaves unpredictably with
            # file descriptors greater than 1024. (This is because
            # even though it uses poll() instead of select(), it still
            # uses FD_SET internally) Since curl opens its own file
            # descriptors we can't catch this problem when it happens,
            # and the best we can do is detect that it's about to
            # happen. Exiting is a lousy way to handle this error,
            # but there's not much we can do at this point. Exiting
            # (and getting restarted by whatever monitoring process
            # is handling crashed tornado processes) will at least
            # get things working again and hopefully bring the issue
            # to someone's attention.
            # If you run into this issue, you either have a file descriptor
            # leak or need to run more tornado processes (so that none
            # of them are handling more than 1000 simultaneous connections)
            print >> sys.stderr, "ERROR: File descriptor too high for libcurl. Exiting."
            logging.error("File descriptor too high for libcurl. Exiting.")
            # NOTE(review): the process-exit call appears to be missing.

        # NOTE(review): the loop over stale fds and a "try:" header appear
        # to be missing around the removal below.
            self.io_loop.remove_handler(fd)
            except (OSError, IOError), e:
                if e[0] != errno.ENOENT:
                # NOTE(review): the re-raise appears to be missing.

        # Register/refresh IOLoop handlers for the fds curl cares about.
        for fd, events in fds.iteritems():
            old_events = self._fds.get(fd, None)
            if old_events is None:
                self.io_loop.add_handler(fd, self._handle_events, events)
            elif old_events != events:
                # NOTE(review): a "try:" header appears to be missing here.
                self.io_loop.update_handler(fd, events)
                except (OSError, IOError), e:
                    if e[0] == errno.ENOENT:
                        self.io_loop.add_handler(fd, self._handle_events,
                        # NOTE(review): trailing arguments and re-raise
                        # appear to be missing.

        # Handle completed fetches
        # NOTE(review): the info_read() loop header and the ok_list loop
        # appear to be missing around the lines below.
        num_q, ok_list, err_list = self._multi.info_read()
        for curl, errnum, errmsg in err_list:
            self._finish(curl, errnum, errmsg)

        # Start fetching new URLs
        while self._free_list and self._requests:
            curl = self._free_list.pop()
            (request, callback) = self._requests.popleft()
            # NOTE(review): the "curl.info = {" opener and the "request"
            # entry / closing brace appear to be missing around this
            # dict literal.
                "headers": httputil.HTTPHeaders(),
                "buffer": cStringIO.StringIO(),
                "callback": callback,
                "start_time": time.time(),
            _curl_setup_request(curl, request, curl.info["buffer"],
                                curl.info["headers"])
            self._multi.add_handle(curl)

        # Reschedule a poll timeout while transfers are still in flight.
        if not started and not completed:
            if self._timeout is not None:
                self.io_loop.remove_timeout(self._timeout)
            self._timeout = self.io_loop.add_timeout(
                time.time() + 0.2, self._handle_timeout)

    def _finish(self, curl, curl_error=None, curl_message=None):
        # NOTE(review): "info = curl.info" / "curl.info = None" lines
        # appear to be missing here (info is read below).
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = info["buffer"]
        # NOTE(review): the "if curl_error:" guard (and its else) appear
        # to be missing around the error construction below.
            error = CurlError(curl_error, curl_message)
        code = curl.getinfo(pycurl.HTTP_CODE)
        effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
        # NOTE(review): a "try:" header appears to be missing here;
        # exceptions raised by the user callback are logged below.
            info["callback"](HTTPResponse(
                request=info["request"], code=code, headers=info["headers"],
                buffer=buffer, effective_url=effective_url, error=error,
                request_time=time.time() - info["start_time"]))
        except (KeyboardInterrupt, SystemExit):
            # NOTE(review): the re-raise and generic exception handler
            # appear to be missing around the logging call below.
            logging.error("Exception in callback %r", info["callback"],
class AsyncHTTPClient2(object):
    """Alternate implementation of AsyncHTTPClient.

    This class has the same interface as AsyncHTTPClient (so see that class
    for usage documentation) but is implemented with a different set of
    libcurl APIs (curl_multi_socket_action instead of fdset/perform).
    This implementation will likely become the default in the future, but
    for now should be considered somewhat experimental.

    The main advantage of this class over the original implementation is
    that it is immune to the fd > 1024 bug, so applications with a large
    number of simultaneous requests (e.g. long-polling) may prefer this
    version.

    Known issue:
    * Timeouts connecting to localhost
      In some situations, this implementation will return a connection
      timeout when the old implementation would be able to connect. This
      has only been observed when connecting to localhost when using
      the kqueue-based IOLoop (mac/bsd), but it may also occur on epoll (linux)
      and, in principle, for non-localhost sites.
      While the bug is unrelated to IPv6, disabling IPv6 will avoid the
      most common manifestations of the bug, so this class disables IPv6 when
      it detects an affected version of libcurl.
      The underlying cause is a libcurl bug in versions up to and including
      7.21.0 (it will be fixed in the not-yet-released 7.21.1)
      http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
    """

    # One shared client instance per IOLoop (enforced in __new__).
    _ASYNC_CLIENTS = weakref.WeakKeyDictionary()

    def __new__(cls, io_loop=None, max_clients=10,
                max_simultaneous_connections=None):
        # There is one client per IOLoop since they share curl instances
        io_loop = io_loop or ioloop.IOLoop.instance()
        if io_loop in cls._ASYNC_CLIENTS:
            return cls._ASYNC_CLIENTS[io_loop]
        instance = super(AsyncHTTPClient2, cls).__new__(cls)
        instance.io_loop = io_loop
        instance._multi = pycurl.CurlMulti()
        # Let libcurl drive our timeout scheduling and fd registration.
        instance._multi.setopt(pycurl.M_TIMERFUNCTION,
                               instance._set_timeout)
        instance._multi.setopt(pycurl.M_SOCKETFUNCTION,
                               instance._handle_socket)
        instance._curls = [_curl_create(max_simultaneous_connections)
                           for i in xrange(max_clients)]
        instance._free_list = instance._curls[:]
        instance._requests = collections.deque()
        # NOTE(review): an "instance._fds = {}" initializer appears to be
        # missing (self._fds is read in _handle_socket below).
        instance._timeout = None
        cls._ASYNC_CLIENTS[io_loop] = instance
        # NOTE(review): a "return instance" statement appears to be
        # missing here in this copy.

    # NOTE(review): the "def close(self):" header appears to be missing
    # above the docstring below.
        """Destroys this http client, freeing any file descriptors used.
        Not needed in normal use, but may be helpful in unittests that
        create and destroy http clients. No other methods may be called
        on the AsyncHTTPClient after close().
        """
        del AsyncHTTPClient2._ASYNC_CLIENTS[self.io_loop]
        for curl in self._curls:
        # NOTE(review): the loop body (presumably curl.close()) appears
        # to be missing from this copy.

    def fetch(self, request, callback, **kwargs):
        """Executes an HTTPRequest, calling callback with an HTTPResponse.

        If an error occurs during the fetch, the HTTPResponse given to the
        callback has a non-None error attribute that contains the exception
        encountered during the request. You can call response.reraise() to
        throw the exception (if any) in the callback.
        """
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        # Queue the request; it starts as soon as a handle is free.
        self._requests.append((request, callback))
        self._process_queue()

    def _handle_socket(self, event, fd, multi, data):
        """Called by libcurl when it wants to change the file descriptors
        it cares about.
        """
        # NOTE(review): the "event_map = {" opener appears to be missing
        # above the mapping entries below.
            pycurl.POLL_NONE: ioloop.IOLoop.NONE,
            pycurl.POLL_IN: ioloop.IOLoop.READ,
            pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
            pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
        if event == pycurl.POLL_REMOVE:
            self.io_loop.remove_handler(fd)
            # NOTE(review): the "del self._fds[fd]" line and the "else:"
            # branch header appear to be missing here.
            ioloop_event = event_map[event]
            if fd not in self._fds:
                self._fds[fd] = ioloop_event
                self.io_loop.add_handler(fd, self._handle_events,
                # NOTE(review): the handler's event argument and the
                # "else:" branch appear to be missing here.
                self._fds[fd] = ioloop_event
                self.io_loop.update_handler(fd, ioloop_event)

    def _set_timeout(self, msecs):
        """Called by libcurl to schedule a timeout."""
        # Replace any previously scheduled timeout with the new one.
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        self._timeout = self.io_loop.add_timeout(
            time.time() + msecs/1000.0, self._handle_timeout)

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        # Translate IOLoop event bits into libcurl CSELECT flags.
        # NOTE(review): an "action = 0" initializer appears to be missing.
        if events & ioloop.IOLoop.READ: action |= pycurl.CSELECT_IN
        if events & ioloop.IOLoop.WRITE: action |= pycurl.CSELECT_OUT
        # NOTE(review): the "while True:"/"try:" wrapper around
        # socket_action appears to be missing here.
        ret, num_handles = self._multi.socket_action(fd, action)
        if ret != pycurl.E_CALL_MULTI_PERFORM:
        # NOTE(review): the loop-exit statement appears to be missing.
        self._finish_pending_requests()

    def _handle_timeout(self):
        """Called by IOLoop when the requested timeout has passed."""
        # NOTE(review): "self._timeout = None" and the retry loop around
        # socket_action appear to be missing here.
            ret, num_handles = self._multi.socket_action(
                pycurl.SOCKET_TIMEOUT, 0)
            if ret != pycurl.E_CALL_MULTI_PERFORM:
        # NOTE(review): the loop-exit statement appears to be missing.
        self._finish_pending_requests()

        # In theory, we shouldn't have to do this because curl will
        # call _set_timeout whenever the timeout changes. However,
        # sometimes after _handle_timeout we will need to reschedule
        # immediately even though nothing has changed from curl's
        # perspective. This is because when socket_action is
        # called with SOCKET_TIMEOUT, libcurl decides internally which
        # timeouts need to be processed by using a monotonic clock
        # (where available) while tornado uses python's time.time()
        # to decide when timeouts have occurred. When those clocks
        # disagree on elapsed time (as they will whenever there is an
        # NTP adjustment), tornado might call _handle_timeout before
        # libcurl is ready. After each timeout, resync the scheduled
        # timeout with libcurl's current state.
        new_timeout = self._multi.timeout()
        if new_timeout != -1:
            self._set_timeout(new_timeout)

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        # NOTE(review): the "while True:" loop header and the ok_list
        # completion loop appear to be missing around the lines below.
        num_q, ok_list, err_list = self._multi.info_read()
        for curl, errnum, errmsg in err_list:
            self._finish(curl, errnum, errmsg)
        self._process_queue()

    def _process_queue(self):
        # Start new fetches while idle curl handles and queued requests
        # are both available.
        while self._free_list and self._requests:
            curl = self._free_list.pop()
            (request, callback) = self._requests.popleft()
            # NOTE(review): the "curl.info = {" opener and the "request"
            # entry / closing brace appear to be missing around this
            # dict literal.
                "headers": httputil.HTTPHeaders(),
                "buffer": cStringIO.StringIO(),
                "callback": callback,
                "start_time": time.time(),
            # Disable IPv6 to mitigate the effects of this bug
            # on curl versions <= 7.21.0
            # http://sourceforge.net/tracker/?func=detail&aid=3017819&group_id=976&atid=100976
            if pycurl.version_info()[2] <= 0x71500: # 7.21.0
                curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
            _curl_setup_request(curl, request, curl.info["buffer"],
                                curl.info["headers"])
            self._multi.add_handle(curl)

    def _finish(self, curl, curl_error=None, curl_message=None):
        # NOTE(review): "info = curl.info" / "curl.info = None" lines
        # appear to be missing here (info is read below).
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = info["buffer"]
        # NOTE(review): the "if curl_error:" guard (and its else) appear
        # to be missing around the error construction below.
            error = CurlError(curl_error, curl_message)
        code = curl.getinfo(pycurl.HTTP_CODE)
        effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
        # NOTE(review): a "try:" header appears to be missing here;
        # exceptions raised by the user callback are logged below.
            info["callback"](HTTPResponse(
                request=info["request"], code=code, headers=info["headers"],
                buffer=buffer, effective_url=effective_url, error=error,
                request_time=time.time() - info["start_time"]))
        except (KeyboardInterrupt, SystemExit):
            # NOTE(review): the re-raise and generic exception handler
            # appear to be missing around the logging call below.
            logging.error("Exception in callback %r", info["callback"],
class HTTPRequest(object):
    # Container for all per-request options accepted by the HTTP clients
    # in this module (URL, method, headers, timeouts, callbacks, ...).
    def __init__(self, url, method="GET", headers=None, body=None,
                 auth_username=None, auth_password=None,
                 connect_timeout=20.0, request_timeout=20.0,
                 if_modified_since=None, follow_redirects=True,
                 max_redirects=5, user_agent=None, use_gzip=True,
                 network_interface=None, streaming_callback=None,
                 header_callback=None, prepare_curl_callback=None,
                 allow_nonstandard_methods=False):
        # NOTE(review): an "if headers is None:" guard appears to be
        # missing above this line in this copy.
        headers = httputil.HTTPHeaders()
        if if_modified_since:
            # Render the datetime as an RFC 2822 GMT date for the
            # If-Modified-Since header.
            timestamp = calendar.timegm(if_modified_since.utctimetuple())
            headers["If-Modified-Since"] = email.utils.formatdate(
                timestamp, localtime=False, usegmt=True)
        # Send an explicit empty Pragma unless the caller set one.
        if "Pragma" not in headers:
            headers["Pragma"] = ""
        self.url = _utf8(url)
        self.headers = headers
        # NOTE(review): assignments for self.method and self.body appear
        # to be missing from this copy.
        self.auth_username = _utf8(auth_username)
        self.auth_password = _utf8(auth_password)
        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self.follow_redirects = follow_redirects
        self.max_redirects = max_redirects
        self.user_agent = user_agent
        self.use_gzip = use_gzip
        self.network_interface = network_interface
        self.streaming_callback = streaming_callback
        self.header_callback = header_callback
        self.prepare_curl_callback = prepare_curl_callback
        self.allow_nonstandard_methods = allow_nonstandard_methods
class HTTPResponse(object):
    # Result object returned by HTTPClient.fetch() and passed to
    # AsyncHTTPClient callbacks.
    # NOTE(review): the mutable default "headers={}" is shared across all
    # calls that omit headers; confirm no caller mutates it (a None
    # default would be safer).
    def __init__(self, request, code, headers={}, buffer=None, effective_url=None,
                 error=None, request_time=None):
        self.request = request
        self.headers = headers
        # NOTE(review): assignments for self.code and self.buffer appear
        # to be missing from this copy (both are read below).
        if effective_url is None:
            # Fall back to the requested URL when curl reported none.
            self.effective_url = request.url
            # NOTE(review): an "else:" line appears to be missing above.
            self.effective_url = effective_url
        # Synthesize an HTTPError for non-2xx responses.
        if self.code < 200 or self.code >= 300:
            self.error = HTTPError(self.code, response=self)
        self.request_time = request_time

    # NOTE(review): the "def _get_body(self):" header appears to be
    # missing above; the statements below lazily materialize the body
    # from the response buffer.
        if self.buffer is None:
        elif self._body is None:
            self._body = self.buffer.getvalue()
    body = property(_get_body)

    # NOTE(review): the "def __repr__(self):" header appears to be
    # missing above the two lines below.
        args = ",".join("%s=%r" % i for i in self.__dict__.iteritems())
        return "%s(%s)" % (self.__class__.__name__, args)

    # NOTE(review): the "def __del__(self):" header appears to be missing
    # above; presumably it closes the buffer — confirm against upstream.
        if self.buffer is not None:
class HTTPError(Exception):
    """Exception thrown for an unsuccessful HTTP request.

    Attributes:
      code - HTTP error integer error code, e.g. 404. Error code 599 is
          used when no HTTP response was received, e.g. for a timeout.
      response - HTTPResponse object, if any.

    Note that if follow_redirects is False, redirects become HTTPErrors,
    and you can look at error.response.headers['Location'] to see the
    destination of the redirect.
    """
    def __init__(self, code, message=None, response=None):
        # Fix: self.code was read below but never assigned in this copy.
        self.code = code
        # Fall back to the standard reason phrase when the caller did not
        # supply an explicit message.
        message = message or httplib.responses.get(code, "Unknown")
        self.response = response
        Exception.__init__(self, "HTTP %d: %s" % (self.code, message))
class CurlError(HTTPError):
    """HTTPError (code 599) raised for errors reported by libcurl itself.

    Attributes:
      errno - the libcurl/pycurl error number for the failure.
    """
    def __init__(self, errno, message):
        HTTPError.__init__(self, 599, message)
        # Fix: the errno parameter was silently discarded in this copy;
        # preserve it so callers can distinguish failure kinds (e.g.
        # connect timeout vs. DNS error).
        self.errno = errno
def _curl_create(max_simultaneous_connections=None):
    """Create and configure a pycurl.Curl handle.

    max_simultaneous_connections bounds libcurl's connection cache for
    this handle (defaults to 5 when None).
    """
    # Fix: this copy used `curl` without creating it and never returned
    # the handle; create and return it.
    curl = pycurl.Curl()
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        # Route libcurl's verbose trace through our debug callback.
        curl.setopt(pycurl.VERBOSE, 1)
        curl.setopt(pycurl.DEBUGFUNCTION, _curl_debug)
    curl.setopt(pycurl.MAXCONNECTS, max_simultaneous_connections or 5)
    return curl
def _curl_setup_request(curl, request, buffer, headers):
    # Translate an HTTPRequest into setopt() calls on the given curl
    # handle, directing the response body into `buffer` and parsed
    # response headers into `headers`.
    curl.setopt(pycurl.URL, request.url)
    # Request headers may be either a regular dict or HTTPHeaders object
    if isinstance(request.headers, httputil.HTTPHeaders):
        curl.setopt(pycurl.HTTPHEADER,
                    [_utf8("%s: %s" % i) for i in request.headers.get_all()])
        # NOTE(review): an "else:" line for the plain-dict case appears
        # to be missing above the following call.
        curl.setopt(pycurl.HTTPHEADER,
                    [_utf8("%s: %s" % i) for i in request.headers.iteritems()])
    if request.header_callback:
        curl.setopt(pycurl.HEADERFUNCTION, request.header_callback)
        # NOTE(review): an "else:" line appears to be missing above.
        curl.setopt(pycurl.HEADERFUNCTION,
                    lambda line: _curl_header_callback(headers, line))
    if request.streaming_callback:
        curl.setopt(pycurl.WRITEFUNCTION, request.streaming_callback)
        # NOTE(review): an "else:" line appears to be missing above.
        curl.setopt(pycurl.WRITEFUNCTION, buffer.write)
    curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
    curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
    # libcurl timeouts are whole seconds.
    curl.setopt(pycurl.CONNECTTIMEOUT, int(request.connect_timeout))
    curl.setopt(pycurl.TIMEOUT, int(request.request_timeout))
    if request.user_agent:
        curl.setopt(pycurl.USERAGENT, _utf8(request.user_agent))
        # NOTE(review): an "else:" line appears to be missing above.
        curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
    if request.network_interface:
        curl.setopt(pycurl.INTERFACE, request.network_interface)
    # NOTE(review): an "if request.use_gzip:" guard and its "else:"
    # appear to be missing around the two ENCODING calls below.
    curl.setopt(pycurl.ENCODING, "gzip,deflate")
    curl.setopt(pycurl.ENCODING, "none")

    # Set the request method through curl's retarded interface which makes
    # up names for almost every single method
    # NOTE(review): the "curl_options = {" opener, the "POST" entry and
    # the closing brace appear to be missing around the entries below.
        "GET": pycurl.HTTPGET,
        "PUT": pycurl.UPLOAD,
        "HEAD": pycurl.NOBODY,
    custom_methods = set(["DELETE"])
    # Clear any method option left over from a previous request on this
    # reused handle.
    for o in curl_options.values():
        curl.setopt(o, False)
    if request.method in curl_options:
        curl.unsetopt(pycurl.CUSTOMREQUEST)
        curl.setopt(curl_options[request.method], True)
    elif request.allow_nonstandard_methods or request.method in custom_methods:
        curl.setopt(pycurl.CUSTOMREQUEST, request.method)
    # NOTE(review): an "else:" line appears to be missing above the raise.
        raise KeyError('unknown method ' + request.method)

    # Handle curl's cryptic options for every individual HTTP method
    if request.method in ("POST", "PUT"):
        # Serve the body to libcurl from an in-memory file object.
        request_buffer = cStringIO.StringIO(escape.utf8(request.body))
        curl.setopt(pycurl.READFUNCTION, request_buffer.read)
        if request.method == "POST":
            # NOTE(review): the "def ioctl(cmd):" header appears to be
            # missing above (it lets libcurl rewind the body on redirect).
            if cmd == curl.IOCMD_RESTARTREAD:
                request_buffer.seek(0)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            curl.setopt(pycurl.POSTFIELDSIZE, len(request.body))
        # NOTE(review): an "else:" (PUT) line appears to be missing above.
            curl.setopt(pycurl.INFILESIZE, len(request.body))

    if request.auth_username and request.auth_password:
        userpwd = "%s:%s" % (request.auth_username, request.auth_password)
        curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
        curl.setopt(pycurl.USERPWD, userpwd)
        logging.info("%s %s (username: %r)", request.method, request.url,
                     request.auth_username)
    # NOTE(review): an "else:" line appears to be missing above; clear
    # credentials left on this reused handle by a previous request.
        curl.unsetopt(pycurl.USERPWD)
        logging.info("%s %s", request.method, request.url)
    # Give the caller a last chance to customize the curl handle.
    if request.prepare_curl_callback is not None:
        request.prepare_curl_callback(curl)
def _curl_header_callback(headers, header_line):
    # Feed one raw response-header line from libcurl into the HTTPHeaders
    # accumulator, skipping the status line and the blank terminator.
    # NOTE(review): the bodies of the two guards below (presumably bare
    # "return" statements) appear to be missing from this copy.
    if header_line.startswith("HTTP/"):
    if header_line == "\r\n":
    headers.parse_line(header_line)
def _curl_debug(debug_type, debug_msg):
    # Log libcurl's verbose trace output (installed as DEBUGFUNCTION in
    # _curl_create).  Type 0 is informational text, 1/2 are header data
    # in/out, 4 is raw data.
    debug_types = ('I', '<', '>', '<', '>')
    # NOTE(review): a leading "if debug_type == 0:" guard appears to be
    # missing above this line in this copy.
    logging.debug('%s', debug_msg.strip())
    elif debug_type in (1, 2):
        # Header traffic: log line by line with a direction marker.
        for line in debug_msg.splitlines():
            logging.debug('%s %s', debug_types[debug_type], line)
    elif debug_type == 4:
        logging.debug('%s %r', debug_types[debug_type], debug_msg)
747 if isinstance(value, unicode):
748 return value.encode("utf-8")
749 assert isinstance(value, str)