]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
5873a8f33d32b8c4996c9dd624fbc38a496270ef
[bup.git] / lib / bup / client.py
1
2 from __future__ import print_function
3
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import os, re, struct, time, zlib
7 import socket
8
9 from bup import git, ssh, vfs
10 from bup.compat import environ, pending_raise, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12                          debug2, linereader, lines_until_sentinel,
13                          mkdirp, nullcontext_if_not, progress, qprogress, DemuxConn)
14 from bup.io import path_msg
15 from bup.vint import write_bvec
16
17
# Upload bandwidth limit in bytes/sec; None (or 0) means unlimited.
# Read by _raw_write_bwlimit(); set elsewhere (e.g. by command-line handling).
bwlimit = None
19
20
class ClientError(Exception):
    """Raised for client-side failures while talking to a bup server
    (connection problems, protocol violations, unexpected replies)."""
    pass
23
24
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, throttled to the module-level bwlimit (bytes/sec).

    bwcount/bwtime are the size and timestamp of the previous write;
    returns the updated (bwcount, bwtime) pair for the next call.
    When bwlimit is unset, writes everything immediately.
    """
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    # Send in ~4k pieces so the pacing stays granular: large bursts risk
    # overflowing a router's queue.  And if we've fallen behind schedule,
    # don't try to "catch up" -- bursting to restore the average rate
    # defeats the purpose of the limit -- just resume at the limit.
    for ofs in range(0, len(buf), 4096):
        piece = buf[ofs:ofs + 4096]
        now = time.time()
        due = max(now, bwtime + 1.0 * bwcount / bwlimit)
        time.sleep(due - now)
        f.write(piece)
        bwcount = len(piece)  # final piece may be shorter than 4096
        bwtime = due
    return (bwcount, bwtime)
46
47
# Pieces of the remote-URL grammar: proto://host[:port][/path], where an
# IPv6 host may be bracketed ([::1]).  Matched case-insensitively.
_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
    """Split a remote spec into a (protocol, host, port, path) tuple of
    bytes (missing pieces are None).

    Accepts proto://... URLs (ssh, bup, or file only -- anything else
    raises ClientError), ssh-style b'host:path' specs, and bare local
    paths (protocol b'file').
    """
    m = _url_rx.match(remote)
    if m:
        proto = m.group(1)
        if proto not in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % proto.decode('ascii'))
        return m.group(1, 3, 4, 5)
    # Not a URL: b'host:path' means ssh; a bare path (or an empty/'-'
    # host) means a local repository.
    pieces = remote.split(b':', 1)
    if len(pieces) == 1 or pieces[0] in (b'', b'-'):
        return b'file', None, None, pieces[-1]
    return b'ssh', pieces[0], None, pieces[1]
68
69
70 class Client:
71     def __init__(self, remote, create=False):
72         self.closed = False
73         self._busy = self.conn = None
74         self.sock = self.p = self.pout = self.pin = None
75         try:
76             is_reverse = environ.get(b'BUP_SERVER_REVERSE')
77             if is_reverse:
78                 assert(not remote)
79                 remote = b'%s:' % is_reverse
80             (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
81             # The b'None' here matches python2's behavior of b'%s' % None == 'None',
82             # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
83             # but crashes instead when doing b'%s' % None.
84             cachehost = b'None' if self.host is None else self.host
85             cachedir = b'None' if self.dir is None else self.dir
86             self.cachedir = git.repo(b'index-cache/%s'
87                                      % re.sub(br'[^@\w]',
88                                               b'_',
89                                               b'%s:%s' % (cachehost, cachedir)))
90             if is_reverse:
91                 self.pout = os.fdopen(3, 'rb')
92                 self.pin = os.fdopen(4, 'wb')
93                 self.conn = Conn(self.pout, self.pin)
94             else:
95                 if self.protocol in (b'ssh', b'file'):
96                     try:
97                         # FIXME: ssh and file shouldn't use the same module
98                         self.p = ssh.connect(self.host, self.port, b'server')
99                         self.pout = self.p.stdout
100                         self.pin = self.p.stdin
101                         self.conn = Conn(self.pout, self.pin)
102                     except OSError as e:
103                         reraise(ClientError('connect: %s' % e))
104                 elif self.protocol == b'bup':
105                     self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
106                     self.sock.connect((self.host,
107                                        1982 if self.port is None else int(self.port)))
108                     self.sockw = self.sock.makefile('wb')
109                     self.conn = DemuxConn(self.sock.fileno(), self.sockw)
110             self._available_commands = self._get_available_commands()
111             self._require_command(b'init-dir')
112             self._require_command(b'set-dir')
113             if self.dir:
114                 self.dir = re.sub(br'[\r\n]', ' ', self.dir)
115                 if create:
116                     self.conn.write(b'init-dir %s\n' % self.dir)
117                 else:
118                     self.conn.write(b'set-dir %s\n' % self.dir)
119                 self.check_ok()
120             self.sync_indexes()
121         except BaseException as ex:
122             with pending_raise(ex):
123                 self.close()
124
125     def close(self):
126         if self.closed:
127             return
128         self.closed = True
129         try:
130             if self.conn and not self._busy:
131                 self.conn.write(b'quit\n')
132         finally:
133             try:
134                 if self.pin:
135                     self.pin.close()
136             finally:
137                 try:
138                     self.pin = None
139                     if self.sock and self.sockw:
140                         self.sockw.close()
141                         self.sock.shutdown(socket.SHUT_WR)
142                 finally:
143                     try:
144                         if self.conn:
145                             self.conn.close()
146                     finally:
147                         try:
148                             self.conn = None
149                             if self.pout:
150                                 self.pout.close()
151                         finally:
152                             try:
153                                 self.pout = None
154                                 if self.sock:
155                                     self.sock.close()
156                             finally:
157                                 self.sock = None
158                                 if self.p:
159                                     self.p.wait()
160                                     rv = self.p.wait()
161                                     if rv:
162                                         raise ClientError('server tunnel returned exit code %d' % rv)
163                                 self.p = None
164
    def __del__(self):
        # Clients must be closed explicitly (or via 'with'); we refuse to
        # rely on the GC to tear down the connection.
        assert self.closed
167
    def __enter__(self):
        """Support 'with Client(...) as c:'; close() runs on exit."""
        return self
170
    def __exit__(self, type, value, traceback):
        # Close even if the body raised; pending_raise keeps the body's
        # exception primary if close() also fails.
        with pending_raise(value, rethrow=False):
            self.close()
174
    def check_ok(self):
        """Check that the server is alive and the last command succeeded.

        Returns whatever conn.check_ok() returns; raises ClientError if
        the tunnel process has exited or the connection reports an error.
        """
        if self.p:
            rv = self.p.poll()
            if rv != None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))
            # reraise doesn't return
            return None
187
188     def check_busy(self):
189         if self._busy:
190             raise ClientError('already busy with command %r' % self._busy)
191
192     def ensure_busy(self):
193         if not self._busy:
194             raise ClientError('expected to be busy, but not busy?!')
195
    def _not_busy(self):
        # Mark the current protocol command as finished.
        self._busy = None
198
    def _get_available_commands(self):
        """Return a frozenset of command names (bytes) the server supports.

        Issues 'help' and parses the reply: a b'Commands:' header, then
        one four-space-indented command per line, ended by a blank line.
        Raises ClientError on any deviation from that shape.
        """
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if not line == b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)
224
225     def _require_command(self, name):
226         if name not in self._available_commands:
227             raise ClientError('server does not appear to provide %s command'
228                               % name.encode('ascii'))
229
    def sync_indexes(self):
        """Bring the local index cache in line with the server's pack indexes.

        Deletes cached .idx files the server no longer knows, downloads
        the ones the server asks us to load, then refreshes midx files.
        """
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
        # Each reply line is b'NAME' or b'NAME load'; blank line ends the list.
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
264
    def sync_index(self, name):
        """Download pack index `name` from the server into the cache dir.

        Refuses to overwrite an existing .idx -- that suggests cache
        corruption better diagnosed with `bup bloom --check`.  The file
        is written atomically (temp file + rename).
        """
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        # Reply: a network-order uint32 byte count, then the index data.
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()
287
    def _make_objcache(self):
        # The object cache is backed by the local cache of the server's
        # pack indexes (kept current by sync_index/_suggest_packs).
        return git.PackIdxList(self.cachedir)
290
    def _suggest_packs(self):
        """Download any pack indexes the server has suggested.

        May be called in the middle of receive-objects-v2; in that case
        the upload is suspended (0xffffffff marker) and resumed after
        the indexes are fetched.  Returns the last idx name received,
        or None if there were no suggestions.
        """
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        # Suggestions arrive one per line until a blank line; either
        # b'index NAME' or a bare b'*.idx' completion notice.
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            # Resume the suspended command.
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx
322
    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        """Return a PackWriter_Remote that streams objects to this server.

        The writer's callbacks keep this client's busy-state in sync
        with the receive-objects-v2 command lifecycle.
        """
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onopen = _set_busy,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)
339
340     def read_ref(self, refname):
341         self._require_command(b'read-ref')
342         self.check_busy()
343         self.conn.write(b'read-ref %s\n' % refname)
344         r = self.conn.readline().strip()
345         self.check_ok()
346         if r:
347             assert(len(r) == 40)   # hexified sha
348             return unhexlify(r)
349         else:
350             return None   # nonexistent ref
351
352     def update_ref(self, refname, newval, oldval):
353         self._require_command(b'update-ref')
354         self.check_busy()
355         self.conn.write(b'update-ref %s\n%s\n%s\n'
356                         % (refname, hexlify(newval),
357                            hexlify(oldval) if oldval else b''))
358         self.check_ok()
359
    def join(self, id):
        """Generate the joined content of the object named by `id`
        (like git cat); raises KeyError if the server reports failure."""
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        while 1:
            # Each chunk is preceded by a network-order uint32 size;
            # a zero size ends the stream.
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))
375
    def cat_batch(self, refs):
        """For each ref, yield (hex oid, type, size, content-reader).

        Yields (None, None, None, None) for refs the server reports as
        missing.  Each yielded content reader must be fully consumed
        before advancing the generator to the next item.
        """
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')  # blank line ends the request list
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            # The caller should have drained cr; any leftovers indicate a
            # protocol desync.
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
409
    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        """Yield (name, binary oid) for each server ref matching patterns.

        limit_to_heads / limit_to_tags restrict the listing to
        refs/heads or refs/tags as interpreted by the server.
        """
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')  # blank line ends the pattern list
        # Replies are b'HEXOID NAME' lines, ended by a blank sentinel line.
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
436
    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        Without a format, yields 40-byte hex commit oids; with one,
        yields (hex oid, parse(conn)) pairs.
        """
        self._require_command(b'rev-list')
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
        conn.write(b'rev-list\n')
        # NOTE(review): the protocol appears to expect an empty field here
        # before the format line -- confirm against bup's server-side
        # rev-list handler.
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')  # blank line ends the ref list
        if not format:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
481
    def resolve(self, path, parent=None, want_meta=True, follow=False):
        """Resolve path on the server; see vfs.resolve for semantics.

        Returns the server's resolution; re-raises the server-side
        vfs.IOError if resolution failed.
        """
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        # Options are sent as a bit mask: 1=want_meta, 2=follow,
        # 4=a parent resolution follows.
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        # One status byte: 1 = a resolution follows, 0 = an IOError follows.
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result
507
508
509 # FIXME: disentangle this (stop inheriting) from PackWriter
510 class PackWriter_Remote(git.PackWriter):
511
    def __new__(cls, *args, **kwargs):
        # Set remote_closed before __init__ can fail, so __del__'s
        # assertion is safe even on partially-constructed instances.
        result = super().__new__(cls)
        result.remote_closed = True  # supports __del__
        return result
516
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        """A PackWriter that sends objects over `conn` instead of to disk.

        The callbacks tie the pack stream to the owning Client:
        onopen/onclose bracket the receive-objects-v2 command,
        suggest_packs fetches server index suggestions, and ensure_busy
        asserts the command is still in flight before each write.
        """
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.remote_closed = False
        self.file = conn
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        # Bandwidth-limit accounting for _raw_write_bwlimit().
        self._bwcount = 0
        self._bwtime = time.time()
538
539     # __enter__ and __exit__ are inherited
540
541     def _open(self):
542         if not self._packopen:
543             self.onopen()
544             self._packopen = True
545
    def _end(self, run_midx=True):
        """End the current object stream and sync any suggested indexes.

        Called by other PackWriter methods like breakpoint().
        Must not close the connection (self.file).
        Returns the last idx received, or None if no pack was open.
        """
        assert(run_midx)  # We don't support this via remote yet
        self.objcache, objcache = None, self.objcache
        with nullcontext_if_not(objcache):
            if not (self._packopen and self.file):
                return None
            # A zero length-prefix terminates receive-objects-v2.
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            if objcache is not None:
                objcache.close()
            return self.suggest_packs() # Returns last idx received
560
    def close(self):
        """Finish the remote pack; returns the last idx name received."""
        # Called by inherited __exit__
        self.remote_closed = True
        id = self._end()
        self.file = None
        super(PackWriter_Remote, self).close()
        return id
568
    def __del__(self):
        # close() must already have run (directly or via __exit__).
        assert self.remote_closed
        super(PackWriter_Remote, self).__del__()
572
    def abort(self):
        """Unsupported for remote pack writing; always raises ClientError."""
        raise ClientError("don't know how to abort remote pack writing")
575
    def _raw_write(self, datalist, sha):
        """Send one object over the wire; returns (sha, crc).

        Frame: uint32 total length (sha + crc + payload), 20-byte sha,
        uint32 crc32, then the payload.  Honors the module bwlimit, and
        services any index suggestions the server sent back meanwhile.
        """
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        # Mask to keep crc32 unsigned/stable across platforms.
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        # The server may have replied with index suggestions; fetch them
        # so our objcache stays current.
        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc