arthur.barton.de Git - bup.git/blob - lib/bup/client.py
client: import DemuxConn
[bup.git] / lib / bup / client.py
1
from __future__ import absolute_import
from numbers import Integral
import errno, os, re, struct, sys, time, zlib
import socket

from bup import git, ssh, vfs
from bup.compat import range
from bup.helpers import (Conn, DemuxConn,
                         atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress)
from bup.vint import read_bvec, read_vuint, write_bvec
12
13
# Outbound bandwidth limit used by _raw_write_bwlimit(), in bytes per
# second (it divides a byte count to get seconds).  A false value (None or
# 0) disables throttling.  Nothing visible here assigns it; presumably
# callers set it before writing -- confirm.
bwlimit = None
15
16
class ClientError(Exception):
    """Error raised by the bup client for connection, protocol, and
    server-reported failures."""
19
20
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, throttled to the module-level bwlimit.

    bwcount/bwtime describe the previously written block: its size in
    bytes and the time it was scheduled to go out.  Returns the updated
    (bwcount, bwtime) pair to pass to the next call.  When bwlimit is
    false, buf is written in a single call and (len(buf), now) is returned.
    """
    if bwlimit:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        step = 4096
        for start in range(0, len(buf), step):
            now = time.time()
            # Earliest moment the previous block is "paid for".
            due = max(now, bwtime + 1.0 * bwcount / bwlimit)
            time.sleep(due - now)
            chunk = buf[start:start + step]
            f.write(chunk)
            bwcount = len(chunk)  # might be less than 4096
            bwtime = due
        return (bwcount, bwtime)
    f.write(buf)
    return (len(buf), time.time())
42
43
def parse_remote(remote):
    """Split a remote location string into (protocol, host, port, path).

    URL forms ``ssh://``, ``bup://`` and ``file://`` are recognized.  The
    scheme is matched case-insensitively and always returned in lowercase.
    The host may be a bracketed IPv6 literal such as ``[::1]`` (returned
    without the brackets).  host, port (a digit string) and path are None
    when absent.

    Any other string is treated in the traditional ``host:path`` style.  If
    there is no colon, or the part before the first colon is empty or '-',
    the result is a local ('file', None, None, path); otherwise it is
    ('ssh', host, None, path).

    Raises ClientError for a URL whose scheme is not ssh, bup or file.
    """
    protocol = r'([a-z]+)://'
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        # re.I lets e.g. "SSH://" match; URI schemes are case-insensitive,
        # so normalize rather than reject it below.
        proto = url_match.group(1).lower()
        if proto not in ('ssh', 'bup', 'file'):
            raise ClientError('unexpected protocol: %s' % url_match.group(1))
        return (proto,) + url_match.group(3, 4, 5)
    else:
        rs = remote.split(':', 1)
        if len(rs) == 1 or rs[0] in ('', '-'):
            return 'file', None, None, rs[-1]
        else:
            return 'ssh', rs[0], None, rs[1]
61
62
63 class Client:
64     def __init__(self, remote, create=False):
65         self._busy = self.conn = None
66         self.sock = self.p = self.pout = self.pin = None
67         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
68         if is_reverse:
69             assert(not remote)
70             remote = '%s:' % is_reverse
71         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
72         self.cachedir = git.repo('index-cache/%s'
73                                  % re.sub(r'[^@\w]', '_', 
74                                           "%s:%s" % (self.host, self.dir)))
75         if is_reverse:
76             self.pout = os.fdopen(3, 'rb')
77             self.pin = os.fdopen(4, 'wb')
78             self.conn = Conn(self.pout, self.pin)
79         else:
80             if self.protocol in ('ssh', 'file'):
81                 try:
82                     # FIXME: ssh and file shouldn't use the same module
83                     self.p = ssh.connect(self.host, self.port, 'server')
84                     self.pout = self.p.stdout
85                     self.pin = self.p.stdin
86                     self.conn = Conn(self.pout, self.pin)
87                 except OSError as e:
88                     raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
89             elif self.protocol == 'bup':
90                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
91                 self.sock.connect((self.host, atoi(self.port) or 1982))
92                 self.sockw = self.sock.makefile('wb')
93                 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
94         self._available_commands = self._get_available_commands()
95         self._require_command('init-dir')
96         self._require_command('set-dir')
97         if self.dir:
98             self.dir = re.sub(r'[\r\n]', ' ', self.dir)
99             if create:
100                 self.conn.write('init-dir %s\n' % self.dir)
101             else:
102                 self.conn.write('set-dir %s\n' % self.dir)
103             self.check_ok()
104         self.sync_indexes()
105
106     def __del__(self):
107         try:
108             self.close()
109         except IOError as e:
110             if e.errno == errno.EPIPE:
111                 pass
112             else:
113                 raise
114
115     def close(self):
116         if self.conn and not self._busy:
117             self.conn.write('quit\n')
118         if self.pin:
119             self.pin.close()
120         if self.sock and self.sockw:
121             self.sockw.close()
122             self.sock.shutdown(socket.SHUT_WR)
123         if self.conn:
124             self.conn.close()
125         if self.pout:
126             self.pout.close()
127         if self.sock:
128             self.sock.close()
129         if self.p:
130             self.p.wait()
131             rv = self.p.wait()
132             if rv:
133                 raise ClientError('server tunnel returned exit code %d' % rv)
134         self.conn = None
135         self.sock = self.p = self.pin = self.pout = None
136
137     def check_ok(self):
138         if self.p:
139             rv = self.p.poll()
140             if rv != None:
141                 raise ClientError('server exited unexpectedly with code %r'
142                                   % rv)
143         try:
144             return self.conn.check_ok()
145         except Exception as e:
146             raise ClientError, e, sys.exc_info()[2]
147
148     def check_busy(self):
149         if self._busy:
150             raise ClientError('already busy with command %r' % self._busy)
151         
152     def ensure_busy(self):
153         if not self._busy:
154             raise ClientError('expected to be busy, but not busy?!')
155         
156     def _not_busy(self):
157         self._busy = None
158
159     def _get_available_commands(self):
160         self.check_busy()
161         self._busy = 'help'
162         conn = self.conn
163         conn.write('help\n')
164         result = set()
165         line = self.conn.readline()
166         if not line == 'Commands:\n':
167             raise ClientError('unexpected help header ' + repr(line))
168         while True:
169             line = self.conn.readline()
170             if line == '\n':
171                 break
172             if not line.startswith('    '):
173                 raise ClientError('unexpected help line ' + repr(line))
174             cmd = line.strip()
175             if not cmd:
176                 raise ClientError('unexpected help line ' + repr(line))
177             result.add(cmd)
178         # FIXME: confusing
179         not_ok = self.check_ok()
180         if not_ok:
181             raise not_ok
182         self._not_busy()
183         return frozenset(result)
184
185     def _require_command(self, name):
186         if name not in self._available_commands:
187             raise ClientError('server does not appear to provide %s command'
188                               % name)
189
190     def sync_indexes(self):
191         self._require_command('list-indexes')
192         self.check_busy()
193         conn = self.conn
194         mkdirp(self.cachedir)
195         # All cached idxs are extra until proven otherwise
196         extra = set()
197         for f in os.listdir(self.cachedir):
198             debug1('%s\n' % f)
199             if f.endswith('.idx'):
200                 extra.add(f)
201         needed = set()
202         conn.write('list-indexes\n')
203         for line in linereader(conn):
204             if not line:
205                 break
206             assert(line.find('/') < 0)
207             parts = line.split(' ')
208             idx = parts[0]
209             if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
210                 # If the server requests that we load an idx and we don't
211                 # already have a copy of it, it is needed
212                 needed.add(idx)
213             # Any idx that the server has heard of is proven not extra
214             extra.discard(idx)
215
216         self.check_ok()
217         debug1('client: removing extra indexes: %s\n' % extra)
218         for idx in extra:
219             os.unlink(os.path.join(self.cachedir, idx))
220         debug1('client: server requested load of: %s\n' % needed)
221         for idx in needed:
222             self.sync_index(idx)
223         git.auto_midx(self.cachedir)
224
225     def sync_index(self, name):
226         self._require_command('send-index')
227         #debug1('requesting %r\n' % name)
228         self.check_busy()
229         mkdirp(self.cachedir)
230         fn = os.path.join(self.cachedir, name)
231         if os.path.exists(fn):
232             msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
233             raise ClientError(msg)
234         self.conn.write('send-index %s\n' % name)
235         n = struct.unpack('!I', self.conn.read(4))[0]
236         assert(n)
237         with atomically_replaced_file(fn, 'w') as f:
238             count = 0
239             progress('Receiving index from server: %d/%d\r' % (count, n))
240             for b in chunkyreader(self.conn, n):
241                 f.write(b)
242                 count += len(b)
243                 qprogress('Receiving index from server: %d/%d\r' % (count, n))
244             progress('Receiving index from server: %d/%d, done.\n' % (count, n))
245             self.check_ok()
246
247     def _make_objcache(self):
248         return git.PackIdxList(self.cachedir)
249
250     def _suggest_packs(self):
251         ob = self._busy
252         if ob:
253             assert(ob == 'receive-objects-v2')
254             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
255         suggested = []
256         for line in linereader(self.conn):
257             if not line:
258                 break
259             debug2('%s\n' % line)
260             if line.startswith('index '):
261                 idx = line[6:]
262                 debug1('client: received index suggestion: %s\n'
263                        % git.shorten_hash(idx))
264                 suggested.append(idx)
265             else:
266                 assert(line.endswith('.idx'))
267                 debug1('client: completed writing pack, idx: %s\n'
268                        % git.shorten_hash(line))
269                 suggested.append(line)
270         self.check_ok()
271         if ob:
272             self._busy = None
273         idx = None
274         for idx in suggested:
275             self.sync_index(idx)
276         git.auto_midx(self.cachedir)
277         if ob:
278             self._busy = ob
279             self.conn.write('%s\n' % ob)
280         return idx
281
282     def new_packwriter(self, compression_level=1,
283                        max_pack_size=None, max_pack_objects=None):
284         self._require_command('receive-objects-v2')
285         self.check_busy()
286         def _set_busy():
287             self._busy = 'receive-objects-v2'
288             self.conn.write('receive-objects-v2\n')
289         return PackWriter_Remote(self.conn,
290                                  objcache_maker = self._make_objcache,
291                                  suggest_packs = self._suggest_packs,
292                                  onopen = _set_busy,
293                                  onclose = self._not_busy,
294                                  ensure_busy = self.ensure_busy,
295                                  compression_level=compression_level,
296                                  max_pack_size=max_pack_size,
297                                  max_pack_objects=max_pack_objects)
298
299     def read_ref(self, refname):
300         self._require_command('read-ref')
301         self.check_busy()
302         self.conn.write('read-ref %s\n' % refname)
303         r = self.conn.readline().strip()
304         self.check_ok()
305         if r:
306             assert(len(r) == 40)   # hexified sha
307             return r.decode('hex')
308         else:
309             return None   # nonexistent ref
310
311     def update_ref(self, refname, newval, oldval):
312         self._require_command('update-ref')
313         self.check_busy()
314         self.conn.write('update-ref %s\n%s\n%s\n' 
315                         % (refname, newval.encode('hex'),
316                            (oldval or '').encode('hex')))
317         self.check_ok()
318
319     def join(self, id):
320         self._require_command('join')
321         self.check_busy()
322         self._busy = 'join'
323         # Send 'cat' so we'll work fine with older versions
324         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
325         while 1:
326             sz = struct.unpack('!I', self.conn.read(4))[0]
327             if not sz: break
328             yield self.conn.read(sz)
329         # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
330         e = self.check_ok()
331         self._not_busy()
332         if e:
333             raise KeyError(str(e))
334
335     def cat_batch(self, refs):
336         self._require_command('cat-batch')
337         self.check_busy()
338         self._busy = 'cat-batch'
339         conn = self.conn
340         conn.write('cat-batch\n')
341         # FIXME: do we want (only) binary protocol?
342         for ref in refs:
343             assert ref
344             assert '\n' not in ref
345             conn.write(ref)
346             conn.write('\n')
347         conn.write('\n')
348         for ref in refs:
349             info = conn.readline()
350             if info == 'missing\n':
351                 yield None, None, None, None
352                 continue
353             if not (info and info.endswith('\n')):
354                 raise ClientError('Hit EOF while looking for object info: %r'
355                                   % info)
356             oidx, oid_t, size = info.split(' ')
357             size = int(size)
358             cr = chunkyreader(conn, size)
359             yield oidx, oid_t, size, cr
360             detritus = next(cr, None)
361             if detritus:
362                 raise ClientError('unexpected leftover data ' + repr(detritus))
363         # FIXME: confusing
364         not_ok = self.check_ok()
365         if not_ok:
366             raise not_ok
367         self._not_busy()
368
369     def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
370         patterns = patterns or tuple()
371         self._require_command('refs')
372         self.check_busy()
373         self._busy = 'refs'
374         conn = self.conn
375         conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
376                                      1 if limit_to_tags else 0))
377         for pattern in patterns:
378             assert '\n' not in pattern
379             conn.write(pattern)
380             conn.write('\n')
381         conn.write('\n')
382         for line in lines_until_sentinel(conn, '\n', ClientError):
383             line = line[:-1]
384             oidx, name = line.split(' ')
385             if len(oidx) != 40:
386                 raise ClientError('Invalid object fingerprint in %r' % line)
387             if not name:
388                 raise ClientError('Invalid reference name in %r' % line)
389             yield name, oidx.decode('hex')
390         # FIXME: confusing
391         not_ok = self.check_ok()
392         if not_ok:
393             raise not_ok
394         self._not_busy()
395
396     def rev_list(self, refs, count=None, parse=None, format=None):
397         """See git.rev_list for the general semantics, but note that with the
398         current interface, the parse function must be able to handle
399         (consume) any blank lines produced by the format because the
400         first one received that it doesn't consume will be interpreted
401         as a terminator for the entire rev-list result.
402
403         """
404         self._require_command('rev-list')
405         assert (count is None) or (isinstance(count, Integral))
406         if format:
407             assert '\n' not in format
408             assert parse
409         for ref in refs:
410             assert ref
411             assert '\n' not in ref
412         self.check_busy()
413         self._busy = 'rev-list'
414         conn = self.conn
415         conn.write('rev-list\n')
416         if count is not None:
417             conn.write(str(count))
418         conn.write('\n')
419         if format:
420             conn.write(format)
421         conn.write('\n')
422         for ref in refs:
423             conn.write(ref)
424             conn.write('\n')
425         conn.write('\n')
426         if not format:
427             for line in lines_until_sentinel(conn, '\n', ClientError):
428                 line = line.strip()
429                 assert len(line) == 40
430                 yield line
431         else:
432             for line in lines_until_sentinel(conn, '\n', ClientError):
433                 if not line.startswith('commit '):
434                     raise ClientError('unexpected line ' + repr(line))
435                 cmt_oidx = line[7:].strip()
436                 assert len(cmt_oidx) == 40
437                 yield cmt_oidx, parse(conn)
438         # FIXME: confusing
439         not_ok = self.check_ok()
440         if not_ok:
441             raise not_ok
442         self._not_busy()
443
444     def resolve(self, path, parent=None, want_meta=True, follow=False):
445         self._require_command('resolve')
446         self.check_busy()
447         self._busy = 'resolve'
448         conn = self.conn
449         conn.write('resolve %d\n' % ((1 if want_meta else 0)
450                                      | (2 if follow else 0)
451                                      | (4 if parent else 0)))
452         if parent:
453             vfs.write_resolution(conn, parent)
454         write_bvec(conn, path)
455         success = ord(conn.read(1))
456         assert success in (0, 1)
457         if success:
458             result = vfs.read_resolution(conn)
459         else:
460             result = vfs.read_ioerror(conn)
461         # FIXME: confusing
462         not_ok = self.check_ok()
463         if not_ok:
464             raise not_ok
465         self._not_busy()
466         if isinstance(result, vfs.IOError):
467             raise result
468         return result
469
470
471 class PackWriter_Remote(git.PackWriter):
472     def __init__(self, conn, objcache_maker, suggest_packs,
473                  onopen, onclose,
474                  ensure_busy,
475                  compression_level=1,
476                  max_pack_size=None,
477                  max_pack_objects=None):
478         git.PackWriter.__init__(self,
479                                 objcache_maker=objcache_maker,
480                                 compression_level=compression_level,
481                                 max_pack_size=max_pack_size,
482                                 max_pack_objects=max_pack_objects)
483         self.file = conn
484         self.filename = 'remote socket'
485         self.suggest_packs = suggest_packs
486         self.onopen = onopen
487         self.onclose = onclose
488         self.ensure_busy = ensure_busy
489         self._packopen = False
490         self._bwcount = 0
491         self._bwtime = time.time()
492
493     def _open(self):
494         if not self._packopen:
495             self.onopen()
496             self._packopen = True
497
498     def _end(self, run_midx=True):
499         assert(run_midx)  # We don't support this via remote yet
500         if self._packopen and self.file:
501             self.file.write('\0\0\0\0')
502             self._packopen = False
503             self.onclose() # Unbusy
504             self.objcache = None
505             return self.suggest_packs() # Returns last idx received
506
507     def close(self):
508         id = self._end()
509         self.file = None
510         return id
511
512     def abort(self):
513         raise ClientError("don't know how to abort remote pack writing")
514
515     def _raw_write(self, datalist, sha):
516         assert(self.file)
517         if not self._packopen:
518             self._open()
519         self.ensure_busy()
520         data = ''.join(datalist)
521         assert(data)
522         assert(sha)
523         crc = zlib.crc32(data) & 0xffffffff
524         outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
525                           sha,
526                           struct.pack('!I', crc),
527                           data))
528         try:
529             (self._bwcount, self._bwtime) = _raw_write_bwlimit(
530                     self.file, outbuf, self._bwcount, self._bwtime)
531         except IOError as e:
532             raise ClientError, e, sys.exc_info()[2]
533         self.outbytes += len(data)
534         self.count += 1
535
536         if self.file.has_input():
537             self.suggest_packs()
538             self.objcache.refresh()
539
540         return sha, crc