]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
19d52cb968e25145d194a7359ce01a1bb9ce654a
[bup.git] / lib / bup / client.py
1
from __future__ import absolute_import
from numbers import Integral
import errno, os, re, struct, sys, time, zlib
import socket

from bup import git, ssh, vfs
from bup.compat import range
from bup.helpers import (Conn, DemuxConn, atoi, atomically_replaced_file,
                         chunkyreader, debug1, debug2, linereader,
                         lines_until_sentinel, mkdirp, progress, qprogress)
from bup.vint import read_bvec, read_vuint, write_bvec
11
12
# Upload throttle used by _raw_write_bwlimit(), in bytes per second.
# A false value (None or 0) disables throttling; callers may assign it.
bwlimit = None


class ClientError(Exception):
    """Error raised by the bup client for connection, protocol, and
    server-reported failures."""
18
19
20 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
21     if not bwlimit:
22         f.write(buf)
23         return (len(buf), time.time())
24     else:
25         # We want to write in reasonably large blocks, but not so large that
26         # they're likely to overflow a router's queue.  So our bwlimit timing
27         # has to be pretty granular.  Also, if it takes too long from one
28         # transmit to the next, we can't just make up for lost time to bring
29         # the average back up to bwlimit - that will risk overflowing the
30         # outbound queue, which defeats the purpose.  So if we fall behind
31         # by more than one block delay, we shouldn't ever try to catch up.
32         for i in range(0,len(buf),4096):
33             now = time.time()
34             next = max(now, bwtime + 1.0*bwcount/bwlimit)
35             time.sleep(next-now)
36             sub = buf[i:i+4096]
37             f.write(sub)
38             bwcount = len(sub)  # might be less than 4096
39             bwtime = next
40         return (bwcount, bwtime)
41
42
def parse_remote(remote):
    """Split a bup remote specification into its parts.

    Accepts URLs of the form ``PROTO://[HOST][:PORT][/PATH]``, where PROTO
    is ssh, bup or file (matched case-insensitively) and HOST may be a
    bracketed IPv6 address.  It also accepts the scp-like shorthand
    ``HOST:PATH``.  A spec with no colon, or whose host part is empty or
    '-', names a local path.

    Returns a (protocol, host, port, path) tuple.  The protocol is always
    lower-case, and parts absent from a URL are None.  Raises ClientError
    for an unsupported URL protocol.
    """
    protocol = r'([a-z]+)://'
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        # re.I already lets e.g. "SSH://" match, and URL schemes are
        # case-insensitive, so normalize before checking and returning.
        proto = url_match.group(1).lower()
        if proto not in ('ssh', 'bup', 'file'):
            raise ClientError('unexpected protocol: %s' % url_match.group(1))
        return (proto,) + url_match.group(3, 4, 5)
    rs = remote.split(':', 1)
    if len(rs) == 1 or rs[0] in ('', '-'):
        return 'file', None, None, rs[-1]
    return 'ssh', rs[0], None, rs[1]
60
61
62 class Client:
63     def __init__(self, remote, create=False):
64         self._busy = self.conn = None
65         self.sock = self.p = self.pout = self.pin = None
66         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
67         if is_reverse:
68             assert(not remote)
69             remote = '%s:' % is_reverse
70         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
71         self.cachedir = git.repo('index-cache/%s'
72                                  % re.sub(r'[^@\w]', '_', 
73                                           "%s:%s" % (self.host, self.dir)))
74         if is_reverse:
75             self.pout = os.fdopen(3, 'rb')
76             self.pin = os.fdopen(4, 'wb')
77             self.conn = Conn(self.pout, self.pin)
78         else:
79             if self.protocol in ('ssh', 'file'):
80                 try:
81                     # FIXME: ssh and file shouldn't use the same module
82                     self.p = ssh.connect(self.host, self.port, 'server')
83                     self.pout = self.p.stdout
84                     self.pin = self.p.stdin
85                     self.conn = Conn(self.pout, self.pin)
86                 except OSError as e:
87                     raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
88             elif self.protocol == 'bup':
89                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
90                 self.sock.connect((self.host, atoi(self.port) or 1982))
91                 self.sockw = self.sock.makefile('wb')
92                 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
93         self._available_commands = self._get_available_commands()
94         self._require_command('init-dir')
95         self._require_command('set-dir')
96         if self.dir:
97             self.dir = re.sub(r'[\r\n]', ' ', self.dir)
98             if create:
99                 self.conn.write('init-dir %s\n' % self.dir)
100             else:
101                 self.conn.write('set-dir %s\n' % self.dir)
102             self.check_ok()
103         self.sync_indexes()
104
105     def __del__(self):
106         try:
107             self.close()
108         except IOError as e:
109             if e.errno == errno.EPIPE:
110                 pass
111             else:
112                 raise
113
114     def close(self):
115         if self.conn and not self._busy:
116             self.conn.write('quit\n')
117         if self.pin:
118             self.pin.close()
119         if self.sock and self.sockw:
120             self.sockw.close()
121             self.sock.shutdown(socket.SHUT_WR)
122         if self.conn:
123             self.conn.close()
124         if self.pout:
125             self.pout.close()
126         if self.sock:
127             self.sock.close()
128         if self.p:
129             self.p.wait()
130             rv = self.p.wait()
131             if rv:
132                 raise ClientError('server tunnel returned exit code %d' % rv)
133         self.conn = None
134         self.sock = self.p = self.pin = self.pout = None
135
136     def check_ok(self):
137         if self.p:
138             rv = self.p.poll()
139             if rv != None:
140                 raise ClientError('server exited unexpectedly with code %r'
141                                   % rv)
142         try:
143             return self.conn.check_ok()
144         except Exception as e:
145             raise ClientError, e, sys.exc_info()[2]
146
147     def check_busy(self):
148         if self._busy:
149             raise ClientError('already busy with command %r' % self._busy)
150         
151     def ensure_busy(self):
152         if not self._busy:
153             raise ClientError('expected to be busy, but not busy?!')
154         
155     def _not_busy(self):
156         self._busy = None
157
158     def _get_available_commands(self):
159         self.check_busy()
160         self._busy = 'help'
161         conn = self.conn
162         conn.write('help\n')
163         result = set()
164         line = self.conn.readline()
165         if not line == 'Commands:\n':
166             raise ClientError('unexpected help header ' + repr(line))
167         while True:
168             line = self.conn.readline()
169             if line == '\n':
170                 break
171             if not line.startswith('    '):
172                 raise ClientError('unexpected help line ' + repr(line))
173             cmd = line.strip()
174             if not cmd:
175                 raise ClientError('unexpected help line ' + repr(line))
176             result.add(cmd)
177         # FIXME: confusing
178         not_ok = self.check_ok()
179         if not_ok:
180             raise not_ok
181         self._not_busy()
182         return frozenset(result)
183
184     def _require_command(self, name):
185         if name not in self._available_commands:
186             raise ClientError('server does not appear to provide %s command'
187                               % name)
188
189     def sync_indexes(self):
190         self._require_command('list-indexes')
191         self.check_busy()
192         conn = self.conn
193         mkdirp(self.cachedir)
194         # All cached idxs are extra until proven otherwise
195         extra = set()
196         for f in os.listdir(self.cachedir):
197             debug1('%s\n' % f)
198             if f.endswith('.idx'):
199                 extra.add(f)
200         needed = set()
201         conn.write('list-indexes\n')
202         for line in linereader(conn):
203             if not line:
204                 break
205             assert(line.find('/') < 0)
206             parts = line.split(' ')
207             idx = parts[0]
208             if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
209                 # If the server requests that we load an idx and we don't
210                 # already have a copy of it, it is needed
211                 needed.add(idx)
212             # Any idx that the server has heard of is proven not extra
213             extra.discard(idx)
214
215         self.check_ok()
216         debug1('client: removing extra indexes: %s\n' % extra)
217         for idx in extra:
218             os.unlink(os.path.join(self.cachedir, idx))
219         debug1('client: server requested load of: %s\n' % needed)
220         for idx in needed:
221             self.sync_index(idx)
222         git.auto_midx(self.cachedir)
223
224     def sync_index(self, name):
225         self._require_command('send-index')
226         #debug1('requesting %r\n' % name)
227         self.check_busy()
228         mkdirp(self.cachedir)
229         fn = os.path.join(self.cachedir, name)
230         if os.path.exists(fn):
231             msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
232             raise ClientError(msg)
233         self.conn.write('send-index %s\n' % name)
234         n = struct.unpack('!I', self.conn.read(4))[0]
235         assert(n)
236         with atomically_replaced_file(fn, 'w') as f:
237             count = 0
238             progress('Receiving index from server: %d/%d\r' % (count, n))
239             for b in chunkyreader(self.conn, n):
240                 f.write(b)
241                 count += len(b)
242                 qprogress('Receiving index from server: %d/%d\r' % (count, n))
243             progress('Receiving index from server: %d/%d, done.\n' % (count, n))
244             self.check_ok()
245
246     def _make_objcache(self):
247         return git.PackIdxList(self.cachedir)
248
249     def _suggest_packs(self):
250         ob = self._busy
251         if ob:
252             assert(ob == 'receive-objects-v2')
253             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
254         suggested = []
255         for line in linereader(self.conn):
256             if not line:
257                 break
258             debug2('%s\n' % line)
259             if line.startswith('index '):
260                 idx = line[6:]
261                 debug1('client: received index suggestion: %s\n'
262                        % git.shorten_hash(idx))
263                 suggested.append(idx)
264             else:
265                 assert(line.endswith('.idx'))
266                 debug1('client: completed writing pack, idx: %s\n'
267                        % git.shorten_hash(line))
268                 suggested.append(line)
269         self.check_ok()
270         if ob:
271             self._busy = None
272         idx = None
273         for idx in suggested:
274             self.sync_index(idx)
275         git.auto_midx(self.cachedir)
276         if ob:
277             self._busy = ob
278             self.conn.write('%s\n' % ob)
279         return idx
280
281     def new_packwriter(self, compression_level=1,
282                        max_pack_size=None, max_pack_objects=None):
283         self._require_command('receive-objects-v2')
284         self.check_busy()
285         def _set_busy():
286             self._busy = 'receive-objects-v2'
287             self.conn.write('receive-objects-v2\n')
288         return PackWriter_Remote(self.conn,
289                                  objcache_maker = self._make_objcache,
290                                  suggest_packs = self._suggest_packs,
291                                  onopen = _set_busy,
292                                  onclose = self._not_busy,
293                                  ensure_busy = self.ensure_busy,
294                                  compression_level=compression_level,
295                                  max_pack_size=max_pack_size,
296                                  max_pack_objects=max_pack_objects)
297
298     def read_ref(self, refname):
299         self._require_command('read-ref')
300         self.check_busy()
301         self.conn.write('read-ref %s\n' % refname)
302         r = self.conn.readline().strip()
303         self.check_ok()
304         if r:
305             assert(len(r) == 40)   # hexified sha
306             return r.decode('hex')
307         else:
308             return None   # nonexistent ref
309
310     def update_ref(self, refname, newval, oldval):
311         self._require_command('update-ref')
312         self.check_busy()
313         self.conn.write('update-ref %s\n%s\n%s\n' 
314                         % (refname, newval.encode('hex'),
315                            (oldval or '').encode('hex')))
316         self.check_ok()
317
318     def join(self, id):
319         self._require_command('join')
320         self.check_busy()
321         self._busy = 'join'
322         # Send 'cat' so we'll work fine with older versions
323         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
324         while 1:
325             sz = struct.unpack('!I', self.conn.read(4))[0]
326             if not sz: break
327             yield self.conn.read(sz)
328         # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
329         e = self.check_ok()
330         self._not_busy()
331         if e:
332             raise KeyError(str(e))
333
334     def cat_batch(self, refs):
335         self._require_command('cat-batch')
336         self.check_busy()
337         self._busy = 'cat-batch'
338         conn = self.conn
339         conn.write('cat-batch\n')
340         # FIXME: do we want (only) binary protocol?
341         for ref in refs:
342             assert ref
343             assert '\n' not in ref
344             conn.write(ref)
345             conn.write('\n')
346         conn.write('\n')
347         for ref in refs:
348             info = conn.readline()
349             if info == 'missing\n':
350                 yield None, None, None, None
351                 continue
352             if not (info and info.endswith('\n')):
353                 raise ClientError('Hit EOF while looking for object info: %r'
354                                   % info)
355             oidx, oid_t, size = info.split(' ')
356             size = int(size)
357             cr = chunkyreader(conn, size)
358             yield oidx, oid_t, size, cr
359             detritus = next(cr, None)
360             if detritus:
361                 raise ClientError('unexpected leftover data ' + repr(detritus))
362         # FIXME: confusing
363         not_ok = self.check_ok()
364         if not_ok:
365             raise not_ok
366         self._not_busy()
367
368     def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
369         patterns = patterns or tuple()
370         self._require_command('refs')
371         self.check_busy()
372         self._busy = 'refs'
373         conn = self.conn
374         conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
375                                      1 if limit_to_tags else 0))
376         for pattern in patterns:
377             assert '\n' not in pattern
378             conn.write(pattern)
379             conn.write('\n')
380         conn.write('\n')
381         for line in lines_until_sentinel(conn, '\n', ClientError):
382             line = line[:-1]
383             oidx, name = line.split(' ')
384             if len(oidx) != 40:
385                 raise ClientError('Invalid object fingerprint in %r' % line)
386             if not name:
387                 raise ClientError('Invalid reference name in %r' % line)
388             yield name, oidx.decode('hex')
389         # FIXME: confusing
390         not_ok = self.check_ok()
391         if not_ok:
392             raise not_ok
393         self._not_busy()
394
395     def rev_list(self, refs, count=None, parse=None, format=None):
396         """See git.rev_list for the general semantics, but note that with the
397         current interface, the parse function must be able to handle
398         (consume) any blank lines produced by the format because the
399         first one received that it doesn't consume will be interpreted
400         as a terminator for the entire rev-list result.
401
402         """
403         self._require_command('rev-list')
404         assert (count is None) or (isinstance(count, Integral))
405         if format:
406             assert '\n' not in format
407             assert parse
408         for ref in refs:
409             assert ref
410             assert '\n' not in ref
411         self.check_busy()
412         self._busy = 'rev-list'
413         conn = self.conn
414         conn.write('rev-list\n')
415         if count is not None:
416             conn.write(str(count))
417         conn.write('\n')
418         if format:
419             conn.write(format)
420         conn.write('\n')
421         for ref in refs:
422             conn.write(ref)
423             conn.write('\n')
424         conn.write('\n')
425         if not format:
426             for line in lines_until_sentinel(conn, '\n', ClientError):
427                 line = line.strip()
428                 assert len(line) == 40
429                 yield line
430         else:
431             for line in lines_until_sentinel(conn, '\n', ClientError):
432                 if not line.startswith('commit '):
433                     raise ClientError('unexpected line ' + repr(line))
434                 cmt_oidx = line[7:].strip()
435                 assert len(cmt_oidx) == 40
436                 yield cmt_oidx, parse(conn)
437         # FIXME: confusing
438         not_ok = self.check_ok()
439         if not_ok:
440             raise not_ok
441         self._not_busy()
442
443     def resolve(self, path, parent=None, want_meta=True, follow=False):
444         self._require_command('resolve')
445         self.check_busy()
446         self._busy = 'resolve'
447         conn = self.conn
448         conn.write('resolve %d\n' % ((1 if want_meta else 0)
449                                      | (2 if follow else 0)
450                                      | (4 if parent else 0)))
451         if parent:
452             vfs.write_resolution(conn, parent)
453         write_bvec(conn, path)
454         success = ord(conn.read(1))
455         assert success in (0, 1)
456         if success:
457             result = vfs.read_resolution(conn)
458         else:
459             result = vfs.read_ioerror(conn)
460         # FIXME: confusing
461         not_ok = self.check_ok()
462         if not_ok:
463             raise not_ok
464         self._not_busy()
465         if isinstance(result, vfs.IOError):
466             raise result
467         return result
468
469
470 class PackWriter_Remote(git.PackWriter):
471     def __init__(self, conn, objcache_maker, suggest_packs,
472                  onopen, onclose,
473                  ensure_busy,
474                  compression_level=1,
475                  max_pack_size=None,
476                  max_pack_objects=None):
477         git.PackWriter.__init__(self,
478                                 objcache_maker=objcache_maker,
479                                 compression_level=compression_level,
480                                 max_pack_size=max_pack_size,
481                                 max_pack_objects=max_pack_objects)
482         self.file = conn
483         self.filename = 'remote socket'
484         self.suggest_packs = suggest_packs
485         self.onopen = onopen
486         self.onclose = onclose
487         self.ensure_busy = ensure_busy
488         self._packopen = False
489         self._bwcount = 0
490         self._bwtime = time.time()
491
492     def _open(self):
493         if not self._packopen:
494             self.onopen()
495             self._packopen = True
496
497     def _end(self, run_midx=True):
498         assert(run_midx)  # We don't support this via remote yet
499         if self._packopen and self.file:
500             self.file.write('\0\0\0\0')
501             self._packopen = False
502             self.onclose() # Unbusy
503             self.objcache = None
504             return self.suggest_packs() # Returns last idx received
505
506     def close(self):
507         id = self._end()
508         self.file = None
509         return id
510
511     def abort(self):
512         raise ClientError("don't know how to abort remote pack writing")
513
514     def _raw_write(self, datalist, sha):
515         assert(self.file)
516         if not self._packopen:
517             self._open()
518         self.ensure_busy()
519         data = ''.join(datalist)
520         assert(data)
521         assert(sha)
522         crc = zlib.crc32(data) & 0xffffffff
523         outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
524                           sha,
525                           struct.pack('!I', crc),
526                           data))
527         try:
528             (self._bwcount, self._bwtime) = _raw_write_bwlimit(
529                     self.file, outbuf, self._bwcount, self._bwtime)
530         except IOError as e:
531             raise ClientError, e, sys.exc_info()[2]
532         self.outbytes += len(data)
533         self.count += 1
534
535         if self.file.has_input():
536             self.suggest_packs()
537             self.objcache.refresh()
538
539         return sha, crc