
from __future__ import print_function
from __future__ import absolute_import
from binascii import hexlify, unhexlify
import os, re, struct, time, zlib
import socket

from bup import git, ssh, vfs
from bup.compat import environ, pending_raise, range, reraise
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, nullcontext_if_not, progress, qprogress, DemuxConn)
from bup.io import path_msg
from bup.vint import write_bvec


bwlimit = None


class ClientError(Exception):
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        for i in range(0, len(buf), 4096):
            now = time.time()
            next = max(now, bwtime + 1.0*bwcount/bwlimit)
            time.sleep(next-now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = next
        return (bwcount, bwtime)

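
# Illustrative sketch (not part of the original module): with the global
# bwlimit set, _raw_write_bwlimit() paces output in 4096-byte blocks so the
# long-run average stays near bwlimit bytes per second.  The limit, sink,
# and buffer below are hypothetical.
def _example_bwlimit_pacing():
    global bwlimit
    import io
    bwlimit = 64 * 1024  # pretend the user asked for 64kB/s
    sink = io.BytesIO()
    bwcount, bwtime = 0, time.time()
    # 16kB is written as four blocks, each delayed so that roughly
    # bwcount/bwlimit seconds elapse between transmissions.
    bwcount, bwtime = _raw_write_bwlimit(sink, b'\0' * 16384, bwcount, bwtime)
    bwlimit = None  # restore the default (unlimited)
    return bwcount, bwtime
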

_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
    url_match = _url_rx.match(remote)
    if url_match:
        if url_match.group(1) not in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(b':', 1)
        if len(rs) == 1 or rs[0] in (b'', b'-'):
            return b'file', None, None, rs[-1]
        else:
            return b'ssh', rs[0], None, rs[1]

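
# Illustrative sketch (not part of the original module): how parse_remote()
# decomposes a few typical remote specs into (protocol, host, port, path).
# The hosts and paths below are hypothetical.
def _example_parse_remote():
    assert parse_remote(b'bup://example.com:1982/srv/bup') == \
        (b'bup', b'example.com', b'1982', b'/srv/bup')
    assert parse_remote(b'example-host:/srv/bup') == \
        (b'ssh', b'example-host', None, b'/srv/bup')
    assert parse_remote(b'/srv/bup') == (b'file', None, None, b'/srv/bup')
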

class Client:
    def __init__(self, remote, create=False):
        self.closed = False
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior, where b'%s' % None
        # yields b'None'.  Python 3 (as of 3.7.5) does the same for str
        # ('%s' % None), but raises a TypeError for b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
                                 % re.sub(br'[^@\w]',
                                          b'_',
                                          b'%s:%s' % (cachehost, cachedir)))
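        # For example (hypothetical remote): b'example-host:/srv/bup' yields
        # cachehost=b'example-host', cachedir=b'/srv/bup', and an index cache
        # under git.repo(b'index-cache/example_host__srv_bup').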
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in (b'ssh', b'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, b'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    reraise(ClientError('connect: %s' % e))
            elif self.protocol == b'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host,
                                   1982 if self.port is None else int(self.port)))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        if self.dir:
            self.dir = re.sub(br'[\r\n]', b' ', self.dir)
            if create:
                self.conn.write(b'init-dir %s\n' % self.dir)
            else:
                self.conn.write(b'set-dir %s\n' % self.dir)
            try:
                self.check_ok()
            except BaseException as ex:
                with pending_raise(ex):
                    self.close()
        self.sync_indexes()

    def close(self):
        self.closed = True
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def __del__(self):
        assert self.closed

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        with pending_raise(value, rethrow=False):
            self.close()

    def check_ok(self):
        if self.p:
            rv = self.p.poll()
            if rv is not None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))
            # reraise doesn't return
            return None

    def check_busy(self):
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

    def _get_available_commands(self):
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if line != b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

    def _require_command(self, name):
        if name not in self._available_commands:
            raise ClientError('server does not appear to provide %s command'
                              % name.decode('ascii'))

    def sync_indexes(self):
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
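        # The server replies with one index name per line, optionally marked
        # with ' load' when it wants us to fetch a copy, and ends the list
        # with a blank line, e.g. (hypothetical names):
        #   pack-0123abcd.idx load
        #   pack-4567ef89.idx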
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

    def sync_index(self, name):
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

    def _suggest_packs(self):
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx

    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker=self._make_objcache,
                                 suggest_packs=self._suggest_packs,
                                 onopen=_set_busy,
                                 onclose=self._not_busy,
                                 ensure_busy=self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
        self._require_command(b'read-ref')
        self.check_busy()
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)  # hexified sha
            return unhexlify(r)
        else:
            return None  # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        self._require_command(b'update-ref')
        self.check_busy()
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        self.check_ok()

    def join(self, id):
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        while True:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

    def cat_batch(self, refs):
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
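        # For each requested ref the server replies with either b'missing\n'
        # or a header line of the form b'OIDX TYPE SIZE\n' (40-hex object id,
        # object type, byte count) followed by exactly SIZE bytes of data.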
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')
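        # The server answers with one b'OIDX NAME\n' line (40-hex object id,
        # then the ref name) per matching ref, terminated by a blank line.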
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must consume any blank lines
        produced by the format, because the first blank line it leaves
        unconsumed will be interpreted as the terminator for the entire
        rev-list result.

        """
        self._require_command(b'rev-list')
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
        conn.write(b'rev-list\n')
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
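        # Request framing, as written above: the command line, a line this
        # client always leaves empty, the optional format on its own line,
        # then one ref per line, with a blank line ending the list.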
        if not format:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def resolve(self, path, parent=None, want_meta=True, follow=False):
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
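        # The argument is a bit mask: 1 = want_meta, 2 = follow, and 4 means
        # a parent resolution follows.  E.g. want_meta plus follow sends
        # b'resolve 3\n'.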
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result

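
# Illustrative sketch (not part of the original module): typical use of
# Client as a context manager.  The remote spec and ref name are hypothetical.
def _example_client_usage():
    with Client(b'example-host:/srv/bup') as client:
        head = client.read_ref(b'refs/heads/example')
        with client.new_packwriter() as pw:
            # Objects would be stored here through the interface inherited
            # from git.PackWriter (e.g. pw.new_blob(...)).
            pass
    return head
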

# FIXME: disentangle this (stop inheriting) from PackWriter
class PackWriter_Remote(git.PackWriter):

    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.remote_closed = False
        self.file = conn
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    # __enter__ and __exit__ are inherited

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        # Called by other PackWriter methods like breakpoint().
        # Must not close the connection (self.file)
        assert(run_midx)  # We don't support this via remote yet
        self.objcache, objcache = None, self.objcache
        with nullcontext_if_not(objcache):
            if not (self._packopen and self.file):
                return None
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose()  # Unbusy
            if objcache is not None:
                objcache.close()
            return self.suggest_packs()  # Returns last idx received

    def close(self):
        # Called by inherited __exit__
        self.remote_closed = True
        id = self._end()
        self.file = None
        super(PackWriter_Remote, self).close()
        return id

    def __del__(self):
        assert self.remote_closed
        super(PackWriter_Remote, self).__del__()

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc
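

# Illustrative sketch (not part of the original module): the frame layout
# produced by PackWriter_Remote._raw_write() above.  The payload below is a
# made-up stand-in for a deflated pack entry.
def _example_receive_objects_frame():
    from hashlib import sha1
    data = zlib.compress(b'hypothetical object payload')
    sha = sha1(b'hypothetical object id source').digest()  # any 20-byte id
    crc = zlib.crc32(data) & 0xffffffff
    frame = b''.join((struct.pack('!I', len(data) + 20 + 4),  # payload length
                      sha,                                    # 20-byte sha
                      struct.pack('!I', crc),                 # crc32 of data
                      data))
    assert len(frame) == 4 + 20 + 4 + len(data)
    return frame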