]> arthur.barton.de Git - bup.git/blob - lib/tornado/iostream.py
Always publish (l)utimes in helpers when available and fix type conversions.
[bup.git] / lib / tornado / iostream.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2009 Facebook
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 """A utility class to write to and read from a non-blocking socket."""
18
19 import errno
20 import ioloop
21 import logging
22 import socket
23
class IOStream(object):
    """A utility class to write to and read from a non-blocking socket.

    We support three methods: write(), read_until(), and read_bytes().
    All of the methods take callbacks (since writing and reading are
    non-blocking and asynchronous). read_until() reads the socket until
    a given delimiter, and read_bytes() reads until a specified number
    of bytes have been read from the socket.

    Data is handled as byte strings: delimiters passed to read_until()
    and data passed to write() must be byte strings, and read callbacks
    receive byte strings. (On Python 2 a byte string is a plain str.)

    User callbacks are always invoked through _run_callback(), so an
    exception escaping a callback closes the stream before propagating.

    A very simple (and broken) HTTP client using this class:

        import ioloop
        import iostream
        import socket

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("friendfeed.com", 80))
        stream = IOStream(s)

        def on_headers(data):
            headers = {}
            for line in data.split(b"\r\n"):
               parts = line.split(b":")
               if len(parts) == 2:
                   headers[parts[0].strip()] = parts[1].strip()
            stream.read_bytes(int(headers[b"Content-Length"]), on_body)

        def on_body(data):
            print(data)
            stream.close()
            ioloop.IOLoop.instance().stop()

        stream.write(b"GET / HTTP/1.0\r\n\r\n")
        stream.read_until(b"\r\n\r\n", on_headers)
        ioloop.IOLoop.instance().start()

    """
    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        """Wrap the given socket and register it with the IO loop.

        socket: a connected socket; it is switched to non-blocking mode.
        io_loop: the loop to register with; defaults to
            ioloop.IOLoop.instance().
        max_buffer_size: the stream is closed once the read buffer
            reaches this many bytes (default 100 MB).
        read_chunk_size: maximum number of bytes requested per recv().
        """
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        # b"" is the same object type as "" on Python 2; on Python 3 it
        # lets the bytes returned by recv() be appended to the buffer.
        self._read_buffer = b""
        self._write_buffer = b""
        # At most one of _read_delimiter / _read_bytes is set while a
        # read is pending; _read_callback is the callback for that read.
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        # Start out listening for errors only; READ and WRITE interest
        # are added on demand by _add_io_state().
        self._state = self.io_loop.ERROR
        self.io_loop.add_handler(
            self.socket.fileno(), self._handle_events, self._state)

    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter.

        The callback receives everything up to and including the
        delimiter. If the delimiter is already in the read buffer the
        callback runs immediately; otherwise IOError is raised if the
        stream is closed.
        """
        assert not self._read_callback, "Already reading"
        loc = self._read_buffer.find(delimiter)
        if loc != -1:
            self._run_callback(callback, self._consume(loc + len(delimiter)))
            return
        self._check_closed()
        self._read_delimiter = delimiter
        self._read_callback = callback
        self._add_io_state(self.io_loop.READ)

    def read_bytes(self, num_bytes, callback):
        """Call callback when we read the given number of bytes.

        If enough data is already buffered the callback runs
        immediately; otherwise IOError is raised if the stream is
        closed.
        """
        assert not self._read_callback, "Already reading"
        if len(self._read_buffer) >= num_bytes:
            # Go through _run_callback (as read_until does) so that an
            # exception in the callback closes the stream here too.
            self._run_callback(callback, self._consume(num_bytes))
            return
        self._check_closed()
        self._read_bytes = num_bytes
        self._read_callback = callback
        self._add_io_state(self.io_loop.READ)

    def write(self, data, callback=None):
        """Write the given data to this stream.

        If callback is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.

        Raises IOError if the stream is closed.
        """
        self._check_closed()
        self._write_buffer += data
        self._add_io_state(self.io_loop.WRITE)
        self._write_callback = callback

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = callback

    def close(self):
        """Close this stream.

        Unregisters the socket from the IO loop, closes it, and runs the
        close callback if one was set. Calling close() again is a no-op.
        """
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket.fileno())
            self.socket.close()
            # socket is None from here on; that is how closed() and
            # _check_closed() detect a closed stream.
            self.socket = None
            if self._close_callback:
                self._run_callback(self._close_callback)

    def reading(self):
        """Returns true if we are currently reading from the stream."""
        return self._read_callback is not None

    def writing(self):
        """Returns true if we are currently writing to the stream."""
        return len(self._write_buffer) > 0

    def closed(self):
        """Returns true if the stream has been closed."""
        return self.socket is None

    def _handle_events(self, fd, events):
        """IO loop handler: dispatch events, then refresh our interest set."""
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        if events & self.io_loop.READ:
            self._handle_read()
        # Any handler (or a user callback it ran) may have closed the
        # stream, so re-check before touching the socket again.
        if not self.socket:
            return
        if events & self.io_loop.WRITE:
            self._handle_write()
        if not self.socket:
            return
        if events & self.io_loop.ERROR:
            self.close()
            return
        # Recompute which events we still care about and tell the loop
        # only if that set changed.
        state = self.io_loop.ERROR
        if self._read_delimiter or self._read_bytes:
            state |= self.io_loop.READ
        if self._write_buffer:
            state |= self.io_loop.WRITE
        if state != self._state:
            self._state = state
            self.io_loop.update_handler(self.socket.fileno(), self._state)

    def _run_callback(self, callback, *args, **kwargs):
        """Invoke a user callback, closing the stream if it raises."""
        try:
            callback(*args, **kwargs)
        except:
            # Close the socket on an uncaught exception from a user callback
            # (It would eventually get closed when the socket object is
            # gc'd, but we don't want to rely on gc happening before we
            # run out of file descriptors)
            self.close()
            # Re-raise the exception so that IOLoop.handle_callback_exception
            # can see it and log the error
            raise

    def _handle_read(self):
        """Read one chunk and fire the pending read callback if satisfied."""
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error as e:
            # e.args[0] is the errno; indexing the exception object
            # itself (e[0]) only works on Python 2.
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return
            else:
                logging.warning("Read error on %d: %s",
                                self.socket.fileno(), e)
                self.close()
                return
        if not chunk:
            # recv() returned no data: treat it as the peer closing.
            self.close()
            return
        self._read_buffer += chunk
        if len(self._read_buffer) >= self.max_buffer_size:
            logging.error("Reached maximum read buffer size")
            self.close()
            return
        if self._read_bytes:
            if len(self._read_buffer) >= self._read_bytes:
                # Clear the pending-read state before running the
                # callback so the callback may start a new read.
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
        elif self._read_delimiter:
            loc = self._read_buffer.find(self._read_delimiter)
            if loc != -1:
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                self._run_callback(callback,
                                   self._consume(loc + delimiter_len))

    def _handle_write(self):
        """Send as much buffered data as the socket accepts right now."""
        while self._write_buffer:
            try:
                num_bytes = self.socket.send(self._write_buffer)
                self._write_buffer = self._write_buffer[num_bytes:]
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    # Socket is full; keep the rest buffered for the
                    # next WRITE event.
                    break
                else:
                    logging.warning("Write error on %d: %s",
                                    self.socket.fileno(), e)
                    self.close()
                    return
        if not self._write_buffer and self._write_callback:
            callback = self._write_callback
            self._write_callback = None
            self._run_callback(callback)

    def _consume(self, loc):
        """Remove and return the first loc bytes of the read buffer."""
        result = self._read_buffer[:loc]
        self._read_buffer = self._read_buffer[loc:]
        return result

    def _check_closed(self):
        """Raise IOError if the stream has been closed."""
        if not self.socket:
            raise IOError("Stream is closed")

    def _add_io_state(self, state):
        """Start listening for the given event type if we are not already."""
        if not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.socket.fileno(), self._state)