arthur.barton.de Git - bup.git/blob - lib/bup/client.py
client: ensure server provides requested command
[bup.git] / lib / bup / client.py
1
import errno, os, re, socket, struct, sys, time, zlib

from bup import git, ssh
from bup.helpers import (Conn, DemuxConn, atomically_replaced_file,
                         chunkyreader, debug1, debug2, linereader, mkdirp,
                         progress, qprogress)
7
8
# Outbound rate cap consulted by _raw_write_bwlimit(), expressed as
# len(buffer) units per second.  A falsy value (the default) disables
# throttling entirely.
bwlimit = None
10
11
class ClientError(Exception):
    """Error raised by this module when talking to a bup server fails."""
14
15
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, pacing the output when the module-level bwlimit is set.

    bwcount and bwtime are the size and scheduled time of the previous
    throttled write; the updated pair is returned so the caller can pass it
    back in on the next call.  With no bwlimit, buf is written in one go and
    (len(buf), current time) is returned.
    """
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())

    # We want to write in reasonably large blocks, but not so large that
    # they're likely to overflow a router's queue.  So our bwlimit timing
    # has to be pretty granular.  Also, if it takes too long from one
    # transmit to the next, we can't just make up for lost time to bring
    # the average back up to bwlimit - that will risk overflowing the
    # outbound queue, which defeats the purpose.  So if we fall behind
    # by more than one block delay, we shouldn't ever try to catch up.
    for offset in xrange(0, len(buf), 4096):
        started = time.time()
        # Earliest moment the next block may go out; never in the past.
        due = max(started, bwtime + 1.0*bwcount/bwlimit)
        time.sleep(due - started)
        piece = buf[offset:offset+4096]
        f.write(piece)
        # The final piece might be less than 4096 long.
        bwcount, bwtime = len(piece), due
    return (bwcount, bwtime)
37
38
def parse_remote(remote):
    """Split a remote specification into (protocol, host, port, path).

    Two forms are understood:

    * URL style, ``proto://[host[:port]][/path]``, where proto must be one
      of ssh, bup or file (matched case-insensitively and returned in lower
      case) and host may be a bracketed hex/colon literal such as
      ``[ff:fe::1]``.  port is returned as a string of digits.
    * Short style, ``host:path``.  With no colon, or when the part before the
      first colon is empty or ``-``, the result is a 'file' remote;
      otherwise it is an 'ssh' remote.

    Components that are absent are returned as None.

    Raises ClientError if a URL-style remote names any other protocol.
    """
    protocol = r'([a-z]+)://'
    # (?P<sb>...) remembers an opening '[' so the host pattern and the
    # closing ']' are only required for bracketed hosts.
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        # The pattern is matched with re.I, so normalise the scheme before
        # comparing; otherwise e.g. 'SSH://' would match yet be rejected.
        scheme = url_match.group(1).lower()
        if scheme not in ('ssh', 'bup', 'file'):
            raise ClientError('unexpected protocol: %s' % url_match.group(1))
        # Group 2 is the optional '[' marker, hence 3, 4, 5.
        return (scheme,) + url_match.group(3, 4, 5)

    rs = remote.split(':', 1)
    if len(rs) == 1 or rs[0] in ('', '-'):
        return 'file', None, None, rs[-1]
    return 'ssh', rs[0], None, rs[1]
56
57
58 class Client:
59     def __init__(self, remote, create=False):
60         self._busy = self.conn = None
61         self.sock = self.p = self.pout = self.pin = None
62         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
63         if is_reverse:
64             assert(not remote)
65             remote = '%s:' % is_reverse
66         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
67         self.cachedir = git.repo('index-cache/%s'
68                                  % re.sub(r'[^@\w]', '_', 
69                                           "%s:%s" % (self.host, self.dir)))
70         if is_reverse:
71             self.pout = os.fdopen(3, 'rb')
72             self.pin = os.fdopen(4, 'wb')
73             self.conn = Conn(self.pout, self.pin)
74         else:
75             if self.protocol in ('ssh', 'file'):
76                 try:
77                     # FIXME: ssh and file shouldn't use the same module
78                     self.p = ssh.connect(self.host, self.port, 'server')
79                     self.pout = self.p.stdout
80                     self.pin = self.p.stdin
81                     self.conn = Conn(self.pout, self.pin)
82                 except OSError as e:
83                     raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
84             elif self.protocol == 'bup':
85                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
86                 self.sock.connect((self.host, atoi(self.port) or 1982))
87                 self.sockw = self.sock.makefile('wb')
88                 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
89         self._available_commands = self._get_available_commands()
90         self._require_command('init-dir')
91         self._require_command('set-dir')
92         if self.dir:
93             self.dir = re.sub(r'[\r\n]', ' ', self.dir)
94             if create:
95                 self.conn.write('init-dir %s\n' % self.dir)
96             else:
97                 self.conn.write('set-dir %s\n' % self.dir)
98             self.check_ok()
99         self.sync_indexes()
100
101     def __del__(self):
102         try:
103             self.close()
104         except IOError as e:
105             if e.errno == errno.EPIPE:
106                 pass
107             else:
108                 raise
109
110     def close(self):
111         if self.conn and not self._busy:
112             self.conn.write('quit\n')
113         if self.pin:
114             self.pin.close()
115         if self.sock and self.sockw:
116             self.sockw.close()
117             self.sock.shutdown(socket.SHUT_WR)
118         if self.conn:
119             self.conn.close()
120         if self.pout:
121             self.pout.close()
122         if self.sock:
123             self.sock.close()
124         if self.p:
125             self.p.wait()
126             rv = self.p.wait()
127             if rv:
128                 raise ClientError('server tunnel returned exit code %d' % rv)
129         self.conn = None
130         self.sock = self.p = self.pin = self.pout = None
131
132     def check_ok(self):
133         if self.p:
134             rv = self.p.poll()
135             if rv != None:
136                 raise ClientError('server exited unexpectedly with code %r'
137                                   % rv)
138         try:
139             return self.conn.check_ok()
140         except Exception as e:
141             raise ClientError, e, sys.exc_info()[2]
142
143     def check_busy(self):
144         if self._busy:
145             raise ClientError('already busy with command %r' % self._busy)
146         
147     def ensure_busy(self):
148         if not self._busy:
149             raise ClientError('expected to be busy, but not busy?!')
150         
151     def _not_busy(self):
152         self._busy = None
153
154     def _get_available_commands(self):
155         self.check_busy()
156         self._busy = 'help'
157         conn = self.conn
158         conn.write('help\n')
159         result = set()
160         line = self.conn.readline()
161         if not line == 'Commands:\n':
162             raise ClientError('unexpected help header ' + repr(line))
163         while True:
164             line = self.conn.readline()
165             if line == '\n':
166                 break
167             if not line.startswith('    '):
168                 raise ClientError('unexpected help line ' + repr(line))
169             cmd = line.strip()
170             if not cmd:
171                 raise ClientError('unexpected help line ' + repr(line))
172             result.add(cmd)
173         # FIXME: confusing
174         not_ok = self.check_ok()
175         if not_ok:
176             raise not_ok
177         self._not_busy()
178         return frozenset(result)
179
180     def _require_command(self, name):
181         if name not in self._available_commands:
182             raise ClientError('server does not appear to provide %s command'
183                               % name)
184
185     def sync_indexes(self):
186         self._require_command('list-indexes')
187         self.check_busy()
188         conn = self.conn
189         mkdirp(self.cachedir)
190         # All cached idxs are extra until proven otherwise
191         extra = set()
192         for f in os.listdir(self.cachedir):
193             debug1('%s\n' % f)
194             if f.endswith('.idx'):
195                 extra.add(f)
196         needed = set()
197         conn.write('list-indexes\n')
198         for line in linereader(conn):
199             if not line:
200                 break
201             assert(line.find('/') < 0)
202             parts = line.split(' ')
203             idx = parts[0]
204             if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
205                 # If the server requests that we load an idx and we don't
206                 # already have a copy of it, it is needed
207                 needed.add(idx)
208             # Any idx that the server has heard of is proven not extra
209             extra.discard(idx)
210
211         self.check_ok()
212         debug1('client: removing extra indexes: %s\n' % extra)
213         for idx in extra:
214             os.unlink(os.path.join(self.cachedir, idx))
215         debug1('client: server requested load of: %s\n' % needed)
216         for idx in needed:
217             self.sync_index(idx)
218         git.auto_midx(self.cachedir)
219
220     def sync_index(self, name):
221         self._require_command('send-index')
222         #debug1('requesting %r\n' % name)
223         self.check_busy()
224         mkdirp(self.cachedir)
225         fn = os.path.join(self.cachedir, name)
226         if os.path.exists(fn):
227             msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
228             raise ClientError(msg)
229         self.conn.write('send-index %s\n' % name)
230         n = struct.unpack('!I', self.conn.read(4))[0]
231         assert(n)
232         with atomically_replaced_file(fn, 'w') as f:
233             count = 0
234             progress('Receiving index from server: %d/%d\r' % (count, n))
235             for b in chunkyreader(self.conn, n):
236                 f.write(b)
237                 count += len(b)
238                 qprogress('Receiving index from server: %d/%d\r' % (count, n))
239             progress('Receiving index from server: %d/%d, done.\n' % (count, n))
240             self.check_ok()
241
242     def _make_objcache(self):
243         return git.PackIdxList(self.cachedir)
244
245     def _suggest_packs(self):
246         ob = self._busy
247         if ob:
248             assert(ob == 'receive-objects-v2')
249             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
250         suggested = []
251         for line in linereader(self.conn):
252             if not line:
253                 break
254             debug2('%s\n' % line)
255             if line.startswith('index '):
256                 idx = line[6:]
257                 debug1('client: received index suggestion: %s\n'
258                        % git.shorten_hash(idx))
259                 suggested.append(idx)
260             else:
261                 assert(line.endswith('.idx'))
262                 debug1('client: completed writing pack, idx: %s\n'
263                        % git.shorten_hash(line))
264                 suggested.append(line)
265         self.check_ok()
266         if ob:
267             self._busy = None
268         idx = None
269         for idx in suggested:
270             self.sync_index(idx)
271         git.auto_midx(self.cachedir)
272         if ob:
273             self._busy = ob
274             self.conn.write('%s\n' % ob)
275         return idx
276
277     def new_packwriter(self, compression_level=1,
278                        max_pack_size=None, max_pack_objects=None):
279         self._require_command('receive-objects-v2')
280         self.check_busy()
281         def _set_busy():
282             self._busy = 'receive-objects-v2'
283             self.conn.write('receive-objects-v2\n')
284         return PackWriter_Remote(self.conn,
285                                  objcache_maker = self._make_objcache,
286                                  suggest_packs = self._suggest_packs,
287                                  onopen = _set_busy,
288                                  onclose = self._not_busy,
289                                  ensure_busy = self.ensure_busy,
290                                  compression_level=compression_level,
291                                  max_pack_size=max_pack_size,
292                                  max_pack_objects=max_pack_objects)
293
294     def read_ref(self, refname):
295         self._require_command('read-ref')
296         self.check_busy()
297         self.conn.write('read-ref %s\n' % refname)
298         r = self.conn.readline().strip()
299         self.check_ok()
300         if r:
301             assert(len(r) == 40)   # hexified sha
302             return r.decode('hex')
303         else:
304             return None   # nonexistent ref
305
306     def update_ref(self, refname, newval, oldval):
307         self._require_command('update-ref')
308         self.check_busy()
309         self.conn.write('update-ref %s\n%s\n%s\n' 
310                         % (refname, newval.encode('hex'),
311                            (oldval or '').encode('hex')))
312         self.check_ok()
313
314     def cat(self, id):
315         self._require_command('cat')
316         self.check_busy()
317         self._busy = 'cat'
318         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
319         while 1:
320             sz = struct.unpack('!I', self.conn.read(4))[0]
321             if not sz: break
322             yield self.conn.read(sz)
323         e = self.check_ok()
324         self._not_busy()
325         if e:
326             raise KeyError(str(e))
327
328
329 class PackWriter_Remote(git.PackWriter):
330     def __init__(self, conn, objcache_maker, suggest_packs,
331                  onopen, onclose,
332                  ensure_busy,
333                  compression_level=1,
334                  max_pack_size=None,
335                  max_pack_objects=None):
336         git.PackWriter.__init__(self,
337                                 objcache_maker=objcache_maker,
338                                 compression_level=compression_level,
339                                 max_pack_size=max_pack_size,
340                                 max_pack_objects=max_pack_objects)
341         self.file = conn
342         self.filename = 'remote socket'
343         self.suggest_packs = suggest_packs
344         self.onopen = onopen
345         self.onclose = onclose
346         self.ensure_busy = ensure_busy
347         self._packopen = False
348         self._bwcount = 0
349         self._bwtime = time.time()
350
351     def _open(self):
352         if not self._packopen:
353             self.onopen()
354             self._packopen = True
355
356     def _end(self, run_midx=True):
357         assert(run_midx)  # We don't support this via remote yet
358         if self._packopen and self.file:
359             self.file.write('\0\0\0\0')
360             self._packopen = False
361             self.onclose() # Unbusy
362             self.objcache = None
363             return self.suggest_packs() # Returns last idx received
364
365     def close(self):
366         id = self._end()
367         self.file = None
368         return id
369
370     def abort(self):
371         raise ClientError("don't know how to abort remote pack writing")
372
373     def _raw_write(self, datalist, sha):
374         assert(self.file)
375         if not self._packopen:
376             self._open()
377         self.ensure_busy()
378         data = ''.join(datalist)
379         assert(data)
380         assert(sha)
381         crc = zlib.crc32(data) & 0xffffffff
382         outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
383                           sha,
384                           struct.pack('!I', crc),
385                           data))
386         try:
387             (self._bwcount, self._bwtime) = _raw_write_bwlimit(
388                     self.file, outbuf, self._bwcount, self._bwtime)
389         except IOError as e:
390             raise ClientError, e, sys.exc_info()[2]
391         self.outbytes += len(data)
392         self.count += 1
393
394         if self.file.has_input():
395             self.suggest_packs()
396             self.objcache.refresh()
397
398         return sha, crc