]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
8c364d19b401ddfa5368716f7c3d722f7c0a0bed
[bup.git] / lib / bup / client.py
1
from __future__ import absolute_import
import errno, os, re, struct, sys, time, zlib
import socket
from numbers import Integral

from bup import git, ssh
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress)
from bup.helpers import DemuxConn, atoi
9
10
# Outbound bandwidth cap consulted by _raw_write_bwlimit(), in bytes per
# second (that function divides a byte count by it to obtain a delay in
# seconds).  A false value (None or 0) disables throttling.
bwlimit = None
12
13
class ClientError(Exception):
    """Error raised by this module's client code (connection setup,
    protocol violations, unexpected server behaviour)."""
16
17
18 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
19     if not bwlimit:
20         f.write(buf)
21         return (len(buf), time.time())
22     else:
23         # We want to write in reasonably large blocks, but not so large that
24         # they're likely to overflow a router's queue.  So our bwlimit timing
25         # has to be pretty granular.  Also, if it takes too long from one
26         # transmit to the next, we can't just make up for lost time to bring
27         # the average back up to bwlimit - that will risk overflowing the
28         # outbound queue, which defeats the purpose.  So if we fall behind
29         # by more than one block delay, we shouldn't ever try to catch up.
30         for i in xrange(0,len(buf),4096):
31             now = time.time()
32             next = max(now, bwtime + 1.0*bwcount/bwlimit)
33             time.sleep(next-now)
34             sub = buf[i:i+4096]
35             f.write(sub)
36             bwcount = len(sub)  # might be less than 4096
37             bwtime = next
38         return (bwcount, bwtime)
39
40
def parse_remote(remote):
    """Split a remote specification into (protocol, host, port, path).

    Two spellings are recognised:

    * URLs of the form proto://[host[:port]][/path].  The protocol must be
      'ssh', 'bup' or 'file'; an IPv6 host is written in square brackets
      and returned without them.  Parts that are missing are None, and the
      port is returned as the matched digit string, not as an int.
    * Anything else is split at the first colon.  'host:path' yields
      ('ssh', host, None, path).  A string without a colon, or with an
      empty or '-' host part, yields ('file', None, None, path).

    Raises ClientError when a URL names any other protocol.
    """
    protocol = r'([a-z]+)://'
    # Group 'sb' remembers an opening square bracket so that the host
    # pattern and the closing bracket can be made conditional on it.
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        if url_match.group(1) not in ('ssh', 'bup', 'file'):
            raise ClientError('unexpected protocol: %s' % url_match.group(1))
        # Group 2 is the optional '[' marker, so it is skipped here.
        return url_match.group(1,3,4,5)
    rs = remote.split(':', 1)
    if len(rs) == 1 or rs[0] in ('', '-'):
        return 'file', None, None, rs[-1]
    return 'ssh', rs[0], None, rs[1]
58
59
60 class Client:
61     def __init__(self, remote, create=False):
62         self._busy = self.conn = None
63         self.sock = self.p = self.pout = self.pin = None
64         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
65         if is_reverse:
66             assert(not remote)
67             remote = '%s:' % is_reverse
68         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
69         self.cachedir = git.repo('index-cache/%s'
70                                  % re.sub(r'[^@\w]', '_', 
71                                           "%s:%s" % (self.host, self.dir)))
72         if is_reverse:
73             self.pout = os.fdopen(3, 'rb')
74             self.pin = os.fdopen(4, 'wb')
75             self.conn = Conn(self.pout, self.pin)
76         else:
77             if self.protocol in ('ssh', 'file'):
78                 try:
79                     # FIXME: ssh and file shouldn't use the same module
80                     self.p = ssh.connect(self.host, self.port, 'server')
81                     self.pout = self.p.stdout
82                     self.pin = self.p.stdin
83                     self.conn = Conn(self.pout, self.pin)
84                 except OSError as e:
85                     raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
86             elif self.protocol == 'bup':
87                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
88                 self.sock.connect((self.host, atoi(self.port) or 1982))
89                 self.sockw = self.sock.makefile('wb')
90                 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
91         self._available_commands = self._get_available_commands()
92         self._require_command('init-dir')
93         self._require_command('set-dir')
94         if self.dir:
95             self.dir = re.sub(r'[\r\n]', ' ', self.dir)
96             if create:
97                 self.conn.write('init-dir %s\n' % self.dir)
98             else:
99                 self.conn.write('set-dir %s\n' % self.dir)
100             self.check_ok()
101         self.sync_indexes()
102
103     def __del__(self):
104         try:
105             self.close()
106         except IOError as e:
107             if e.errno == errno.EPIPE:
108                 pass
109             else:
110                 raise
111
112     def close(self):
113         if self.conn and not self._busy:
114             self.conn.write('quit\n')
115         if self.pin:
116             self.pin.close()
117         if self.sock and self.sockw:
118             self.sockw.close()
119             self.sock.shutdown(socket.SHUT_WR)
120         if self.conn:
121             self.conn.close()
122         if self.pout:
123             self.pout.close()
124         if self.sock:
125             self.sock.close()
126         if self.p:
127             self.p.wait()
128             rv = self.p.wait()
129             if rv:
130                 raise ClientError('server tunnel returned exit code %d' % rv)
131         self.conn = None
132         self.sock = self.p = self.pin = self.pout = None
133
134     def check_ok(self):
135         if self.p:
136             rv = self.p.poll()
137             if rv != None:
138                 raise ClientError('server exited unexpectedly with code %r'
139                                   % rv)
140         try:
141             return self.conn.check_ok()
142         except Exception as e:
143             raise ClientError, e, sys.exc_info()[2]
144
145     def check_busy(self):
146         if self._busy:
147             raise ClientError('already busy with command %r' % self._busy)
148         
149     def ensure_busy(self):
150         if not self._busy:
151             raise ClientError('expected to be busy, but not busy?!')
152         
153     def _not_busy(self):
154         self._busy = None
155
156     def _get_available_commands(self):
157         self.check_busy()
158         self._busy = 'help'
159         conn = self.conn
160         conn.write('help\n')
161         result = set()
162         line = self.conn.readline()
163         if not line == 'Commands:\n':
164             raise ClientError('unexpected help header ' + repr(line))
165         while True:
166             line = self.conn.readline()
167             if line == '\n':
168                 break
169             if not line.startswith('    '):
170                 raise ClientError('unexpected help line ' + repr(line))
171             cmd = line.strip()
172             if not cmd:
173                 raise ClientError('unexpected help line ' + repr(line))
174             result.add(cmd)
175         # FIXME: confusing
176         not_ok = self.check_ok()
177         if not_ok:
178             raise not_ok
179         self._not_busy()
180         return frozenset(result)
181
182     def _require_command(self, name):
183         if name not in self._available_commands:
184             raise ClientError('server does not appear to provide %s command'
185                               % name)
186
187     def sync_indexes(self):
188         self._require_command('list-indexes')
189         self.check_busy()
190         conn = self.conn
191         mkdirp(self.cachedir)
192         # All cached idxs are extra until proven otherwise
193         extra = set()
194         for f in os.listdir(self.cachedir):
195             debug1('%s\n' % f)
196             if f.endswith('.idx'):
197                 extra.add(f)
198         needed = set()
199         conn.write('list-indexes\n')
200         for line in linereader(conn):
201             if not line:
202                 break
203             assert(line.find('/') < 0)
204             parts = line.split(' ')
205             idx = parts[0]
206             if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
207                 # If the server requests that we load an idx and we don't
208                 # already have a copy of it, it is needed
209                 needed.add(idx)
210             # Any idx that the server has heard of is proven not extra
211             extra.discard(idx)
212
213         self.check_ok()
214         debug1('client: removing extra indexes: %s\n' % extra)
215         for idx in extra:
216             os.unlink(os.path.join(self.cachedir, idx))
217         debug1('client: server requested load of: %s\n' % needed)
218         for idx in needed:
219             self.sync_index(idx)
220         git.auto_midx(self.cachedir)
221
222     def sync_index(self, name):
223         self._require_command('send-index')
224         #debug1('requesting %r\n' % name)
225         self.check_busy()
226         mkdirp(self.cachedir)
227         fn = os.path.join(self.cachedir, name)
228         if os.path.exists(fn):
229             msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
230             raise ClientError(msg)
231         self.conn.write('send-index %s\n' % name)
232         n = struct.unpack('!I', self.conn.read(4))[0]
233         assert(n)
234         with atomically_replaced_file(fn, 'w') as f:
235             count = 0
236             progress('Receiving index from server: %d/%d\r' % (count, n))
237             for b in chunkyreader(self.conn, n):
238                 f.write(b)
239                 count += len(b)
240                 qprogress('Receiving index from server: %d/%d\r' % (count, n))
241             progress('Receiving index from server: %d/%d, done.\n' % (count, n))
242             self.check_ok()
243
244     def _make_objcache(self):
245         return git.PackIdxList(self.cachedir)
246
247     def _suggest_packs(self):
248         ob = self._busy
249         if ob:
250             assert(ob == 'receive-objects-v2')
251             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
252         suggested = []
253         for line in linereader(self.conn):
254             if not line:
255                 break
256             debug2('%s\n' % line)
257             if line.startswith('index '):
258                 idx = line[6:]
259                 debug1('client: received index suggestion: %s\n'
260                        % git.shorten_hash(idx))
261                 suggested.append(idx)
262             else:
263                 assert(line.endswith('.idx'))
264                 debug1('client: completed writing pack, idx: %s\n'
265                        % git.shorten_hash(line))
266                 suggested.append(line)
267         self.check_ok()
268         if ob:
269             self._busy = None
270         idx = None
271         for idx in suggested:
272             self.sync_index(idx)
273         git.auto_midx(self.cachedir)
274         if ob:
275             self._busy = ob
276             self.conn.write('%s\n' % ob)
277         return idx
278
279     def new_packwriter(self, compression_level=1,
280                        max_pack_size=None, max_pack_objects=None):
281         self._require_command('receive-objects-v2')
282         self.check_busy()
283         def _set_busy():
284             self._busy = 'receive-objects-v2'
285             self.conn.write('receive-objects-v2\n')
286         return PackWriter_Remote(self.conn,
287                                  objcache_maker = self._make_objcache,
288                                  suggest_packs = self._suggest_packs,
289                                  onopen = _set_busy,
290                                  onclose = self._not_busy,
291                                  ensure_busy = self.ensure_busy,
292                                  compression_level=compression_level,
293                                  max_pack_size=max_pack_size,
294                                  max_pack_objects=max_pack_objects)
295
296     def read_ref(self, refname):
297         self._require_command('read-ref')
298         self.check_busy()
299         self.conn.write('read-ref %s\n' % refname)
300         r = self.conn.readline().strip()
301         self.check_ok()
302         if r:
303             assert(len(r) == 40)   # hexified sha
304             return r.decode('hex')
305         else:
306             return None   # nonexistent ref
307
308     def update_ref(self, refname, newval, oldval):
309         self._require_command('update-ref')
310         self.check_busy()
311         self.conn.write('update-ref %s\n%s\n%s\n' 
312                         % (refname, newval.encode('hex'),
313                            (oldval or '').encode('hex')))
314         self.check_ok()
315
316     def join(self, id):
317         self._require_command('join')
318         self.check_busy()
319         self._busy = 'join'
320         # Send 'cat' so we'll work fine with older versions
321         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
322         while 1:
323             sz = struct.unpack('!I', self.conn.read(4))[0]
324             if not sz: break
325             yield self.conn.read(sz)
326         # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
327         e = self.check_ok()
328         self._not_busy()
329         if e:
330             raise KeyError(str(e))
331
332     def cat_batch(self, refs):
333         self._require_command('cat-batch')
334         self.check_busy()
335         self._busy = 'cat-batch'
336         conn = self.conn
337         conn.write('cat-batch\n')
338         # FIXME: do we want (only) binary protocol?
339         for ref in refs:
340             assert ref
341             assert '\n' not in ref
342             conn.write(ref)
343             conn.write('\n')
344         conn.write('\n')
345         for ref in refs:
346             info = conn.readline()
347             if info == 'missing\n':
348                 yield None, None, None, None
349                 continue
350             if not (info and info.endswith('\n')):
351                 raise ClientError('Hit EOF while looking for object info: %r'
352                                   % info)
353             oidx, oid_t, size = info.split(' ')
354             size = int(size)
355             cr = chunkyreader(conn, size)
356             yield oidx, oid_t, size, cr
357             detritus = next(cr, None)
358             if detritus:
359                 raise ClientError('unexpected leftover data ' + repr(detritus))
360         # FIXME: confusing
361         not_ok = self.check_ok()
362         if not_ok:
363             raise not_ok
364         self._not_busy()
365
366     def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
367         patterns = patterns or tuple()
368         self._require_command('refs')
369         self.check_busy()
370         self._busy = 'refs'
371         conn = self.conn
372         conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
373                                      1 if limit_to_tags else 0))
374         for pattern in patterns:
375             assert '\n' not in pattern
376             conn.write(pattern)
377             conn.write('\n')
378         conn.write('\n')
379         for line in lines_until_sentinel(conn, '\n', ClientError):
380             line = line[:-1]
381             oidx, name = line.split(' ')
382             if len(oidx) != 40:
383                 raise ClientError('Invalid object fingerprint in %r' % line)
384             if not name:
385                 raise ClientError('Invalid reference name in %r' % line)
386             yield name, oidx.decode('hex')
387         # FIXME: confusing
388         not_ok = self.check_ok()
389         if not_ok:
390             raise not_ok
391         self._not_busy()
392
393     def rev_list(self, refs, count=None, parse=None, format=None):
394         self._require_command('rev-list')
395         assert (count is None) or (isinstance(count, Integral))
396         if format:
397             assert '\n' not in format
398             assert parse
399         for ref in refs:
400             assert ref
401             assert '\n' not in ref
402         self.check_busy()
403         self._busy = 'rev-list'
404         conn = self.conn
405         conn.write('rev-list\n')
406         if count is not None:
407             conn.write(str(count))
408         conn.write('\n')
409         if format:
410             conn.write(format)
411         conn.write('\n')
412         for ref in refs:
413             conn.write(ref)
414             conn.write('\n')
415         conn.write('\n')
416         if not format:
417             for _ in xrange(len(refs)):
418                 line = conn.readline()
419                 if not line:
420                     raise ClientError('unexpected EOF')
421                 line = line.strip()
422                 assert len(line) == 40
423                 yield line
424         else:
425             for _ in xrange(len(refs)):
426                 line = conn.readline()
427                 if not line:
428                     raise ClientError('unexpected EOF')
429                 if not line.startswith('commit '):
430                     raise ClientError('unexpected line ' + repr(line))
431                 yield line[7:].strip(), parse(conn)
432         # FIXME: confusing
433         not_ok = self.check_ok()
434         if not_ok:
435             raise not_ok
436         self._not_busy()
437
438
439 class PackWriter_Remote(git.PackWriter):
440     def __init__(self, conn, objcache_maker, suggest_packs,
441                  onopen, onclose,
442                  ensure_busy,
443                  compression_level=1,
444                  max_pack_size=None,
445                  max_pack_objects=None):
446         git.PackWriter.__init__(self,
447                                 objcache_maker=objcache_maker,
448                                 compression_level=compression_level,
449                                 max_pack_size=max_pack_size,
450                                 max_pack_objects=max_pack_objects)
451         self.file = conn
452         self.filename = 'remote socket'
453         self.suggest_packs = suggest_packs
454         self.onopen = onopen
455         self.onclose = onclose
456         self.ensure_busy = ensure_busy
457         self._packopen = False
458         self._bwcount = 0
459         self._bwtime = time.time()
460
461     def _open(self):
462         if not self._packopen:
463             self.onopen()
464             self._packopen = True
465
466     def _end(self, run_midx=True):
467         assert(run_midx)  # We don't support this via remote yet
468         if self._packopen and self.file:
469             self.file.write('\0\0\0\0')
470             self._packopen = False
471             self.onclose() # Unbusy
472             self.objcache = None
473             return self.suggest_packs() # Returns last idx received
474
475     def close(self):
476         id = self._end()
477         self.file = None
478         return id
479
480     def abort(self):
481         raise ClientError("don't know how to abort remote pack writing")
482
483     def _raw_write(self, datalist, sha):
484         assert(self.file)
485         if not self._packopen:
486             self._open()
487         self.ensure_busy()
488         data = ''.join(datalist)
489         assert(data)
490         assert(sha)
491         crc = zlib.crc32(data) & 0xffffffff
492         outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
493                           sha,
494                           struct.pack('!I', crc),
495                           data))
496         try:
497             (self._bwcount, self._bwtime) = _raw_write_bwlimit(
498                     self.file, outbuf, self._bwcount, self._bwtime)
499         except IOError as e:
500             raise ClientError, e, sys.exc_info()[2]
501         self.outbytes += len(data)
502         self.count += 1
503
504         if self.file.has_input():
505             self.suggest_packs()
506             self.objcache.refresh()
507
508         return sha, crc