# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""A utility class to write to and read from a non-blocking socket."""
class IOStream(object):
    r"""A utility class to write to and read from a non-blocking socket.

    We support three methods: write(), read_until(), and read_bytes().
    All of the methods take callbacks (since writing and reading are
    non-blocking and asynchronous). read_until() reads the socket until
    a given delimiter, and read_bytes() reads until a specified number
    of bytes have been read from the socket.

    A very simple (and broken) HTTP client using this class:

        import ioloop
        import socket

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        s.connect(("friendfeed.com", 80))
        stream = IOStream(s)

        def on_headers(data):
            headers = {}
            for line in data.split("\r\n"):
                parts = line.split(":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            stream.read_bytes(int(headers["Content-Length"]), on_body)

        def on_body(data):
            print(data)
            stream.close()
            ioloop.IOLoop.instance().stop()

        stream.write("GET / HTTP/1.0\r\n\r\n")
        stream.read_until("\r\n\r\n", on_headers)
        ioloop.IOLoop.instance().start()

    """

    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        """Wrap ``socket`` and register it with the IOLoop.

        socket: the socket to drive; it is switched to non-blocking mode.
        io_loop: the loop to register with; defaults to
            ``ioloop.IOLoop.instance()`` when not given.
        max_buffer_size: the stream is closed once the unread buffer
            reaches this many bytes.
        read_chunk_size: maximum size requested from each ``recv()`` call.
        """
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        self._read_buffer = ""
        self._write_buffer = ""
        # At most one pending read: either a delimiter or a byte count is
        # set together with _read_callback.
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        # The event mask currently registered with the loop. ERROR is always
        # included; READ/WRITE are added only while there is work pending.
        self._state = self.io_loop.ERROR
        self.io_loop.add_handler(
            self.socket.fileno(), self._handle_events, self._state)

    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter.

        The callback receives the buffered data up to and including the
        delimiter. If the delimiter is already buffered the callback runs
        immediately; otherwise the read is registered with the loop.

        Raises IOError if the stream is closed and the delimiter is not
        already in the buffer.
        """
        assert not self._read_callback, "Already reading"
        loc = self._read_buffer.find(delimiter)
        if loc != -1:
            self._run_callback(callback, self._consume(loc + len(delimiter)))
            return
        self._check_closed()
        self._read_delimiter = delimiter
        self._read_callback = callback
        self._add_io_state(self.io_loop.READ)

    def read_bytes(self, num_bytes, callback):
        """Call callback when we read the given number of bytes.

        If enough data is already buffered the callback runs immediately;
        otherwise the read is registered with the loop.

        Raises IOError if the stream is closed and the buffer does not
        already hold ``num_bytes`` bytes.
        """
        assert not self._read_callback, "Already reading"
        if len(self._read_buffer) >= num_bytes:
            # Go through _run_callback (as read_until does) so that an
            # exception raised by the user callback closes the socket.
            self._run_callback(callback, self._consume(num_bytes))
            return
        self._check_closed()
        self._read_bytes = num_bytes
        self._read_callback = callback
        self._add_io_state(self.io_loop.READ)

    def write(self, data, callback=None):
        """Write the given data to this stream.

        If callback is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.

        Raises IOError if the stream is closed.
        """
        self._check_closed()
        self._write_buffer += data
        self._add_io_state(self.io_loop.WRITE)
        self._write_callback = callback

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = callback

    def close(self):
        """Close this stream.

        Unregisters the socket from the loop, closes it and runs the close
        callback, if any. Calling close() on a closed stream does nothing.
        """
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket.fileno())
            self.socket.close()
            self.socket = None
            if self._close_callback:
                self._run_callback(self._close_callback)

    def reading(self):
        """Returns true if we are currently reading from the stream."""
        return self._read_callback is not None

    def writing(self):
        """Returns true if we are currently writing to the stream."""
        return len(self._write_buffer) > 0

    def closed(self):
        """Returns true if the stream has been closed."""
        return self.socket is None

    def _handle_events(self, fd, events):
        """Dispatch loop events for ``fd`` and refresh the registered mask."""
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        # Each handler may close the stream, so re-check before continuing.
        if events & self.io_loop.READ:
            self._handle_read()
        if not self.socket:
            return
        if events & self.io_loop.WRITE:
            self._handle_write()
        if not self.socket:
            return
        if events & self.io_loop.ERROR:
            self.close()
            return
        # Listen only for what is still pending, so an idle stream does not
        # keep waking the loop.
        state = self.io_loop.ERROR
        if self._read_delimiter or self._read_bytes:
            state |= self.io_loop.READ
        if self._write_buffer:
            state |= self.io_loop.WRITE
        if state != self._state:
            self._state = state
            self.io_loop.update_handler(self.socket.fileno(), self._state)

    def _run_callback(self, callback, *args, **kwargs):
        """Run a user callback, closing the stream if it raises."""
        try:
            callback(*args, **kwargs)
        except:
            # Close the socket on an uncaught exception from a user callback
            # (It would eventually get closed when the socket object is
            # gc'd, but we don't want to rely on gc happening before we
            # run out of file descriptors)
            self.close()
            # Re-raise the exception so that IOLoop.handle_callback_exception
            # can see it and log the error
            raise

    def _handle_read(self):
        """Read one chunk from the socket and satisfy the pending read."""
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error as e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                # Nothing to read right now; wait for the next READ event.
                return
            else:
                logging.warning("Read error on %d: %s",
                                self.socket.fileno(), e)
                self.close()
                return
        if not chunk:
            # An empty read is treated as end of stream.
            self.close()
            return
        self._read_buffer += chunk
        if len(self._read_buffer) >= self.max_buffer_size:
            logging.error("Reached maximum read buffer size")
            self.close()
            return
        if self._read_bytes:
            if len(self._read_buffer) >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                # Clear the pending read before running the callback so the
                # callback can start a new read.
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
        elif self._read_delimiter:
            loc = self._read_buffer.find(self._read_delimiter)
            if loc != -1:
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                self._run_callback(callback,
                                   self._consume(loc + delimiter_len))

    def _handle_write(self):
        """Send as much buffered data as the socket accepts."""
        while self._write_buffer:
            try:
                num_bytes = self.socket.send(self._write_buffer)
                self._write_buffer = self._write_buffer[num_bytes:]
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    # Socket buffer is full; resume on the next WRITE event.
                    break
                else:
                    logging.warning("Write error on %d: %s",
                                    self.socket.fileno(), e)
                    self.close()
                    return
        if not self._write_buffer and self._write_callback:
            callback = self._write_callback
            self._write_callback = None
            self._run_callback(callback)

    def _consume(self, loc):
        """Remove and return the first ``loc`` bytes of the read buffer."""
        result = self._read_buffer[:loc]
        self._read_buffer = self._read_buffer[loc:]
        return result

    def _check_closed(self):
        """Raise IOError if the stream has been closed."""
        if not self.socket:
            raise IOError("Stream is closed")

    def _add_io_state(self, state):
        """Add ``state`` to the registered event mask if it is not set."""
        if not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.socket.fileno(), self._state)