# bup client library (lib/bup/client.py)
# Source snapshot near commit: "git: close PackIdxList on init failures"
1
2 from __future__ import print_function
3
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import os, re, struct, time, zlib
7 import socket
8
9 from bup import git, ssh, vfs
10 from bup.compat import environ, pending_raise, range, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12                          debug2, linereader, lines_until_sentinel,
13                          mkdirp, nullcontext_if_not, progress, qprogress, DemuxConn)
14 from bup.io import path_msg
15 from bup.vint import write_bvec
16
17
18 bwlimit = None
19
20
21 class ClientError(Exception):
22     pass
23
24
25 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
26     if not bwlimit:
27         f.write(buf)
28         return (len(buf), time.time())
29     else:
30         # We want to write in reasonably large blocks, but not so large that
31         # they're likely to overflow a router's queue.  So our bwlimit timing
32         # has to be pretty granular.  Also, if it takes too long from one
33         # transmit to the next, we can't just make up for lost time to bring
34         # the average back up to bwlimit - that will risk overflowing the
35         # outbound queue, which defeats the purpose.  So if we fall behind
36         # by more than one block delay, we shouldn't ever try to catch up.
37         for i in range(0,len(buf),4096):
38             now = time.time()
39             next = max(now, bwtime + 1.0*bwcount/bwlimit)
40             time.sleep(next-now)
41             sub = buf[i:i+4096]
42             f.write(sub)
43             bwcount = len(sub)  # might be less than 4096
44             bwtime = next
45         return (bwcount, bwtime)
46
47
48 _protocol_rs = br'([a-z]+)://'
49 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
50 _port_rs = br'(?::(\d+))?'
51 _path_rs = br'(/.*)?'
52 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
53                      re.I)
54
55 def parse_remote(remote):
56     url_match = _url_rx.match(remote)
57     if url_match:
58         if not url_match.group(1) in (b'ssh', b'bup', b'file'):
59             raise ClientError('unexpected protocol: %s'
60                               % url_match.group(1).decode('ascii'))
61         return url_match.group(1,3,4,5)
62     else:
63         rs = remote.split(b':', 1)
64         if len(rs) == 1 or rs[0] in (b'', b'-'):
65             return b'file', None, None, rs[-1]
66         else:
67             return b'ssh', rs[0], None, rs[1]
68
69
70 class Client:
71     def __init__(self, remote, create=False):
72         self.closed = False
73         self._busy = self.conn = None
74         self.sock = self.p = self.pout = self.pin = None
75         is_reverse = environ.get(b'BUP_SERVER_REVERSE')
76         if is_reverse:
77             assert(not remote)
78             remote = b'%s:' % is_reverse
79         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
80         # The b'None' here matches python2's behavior of b'%s' % None == 'None',
81         # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
82         # but crashes instead when doing b'%s' % None.
83         cachehost = b'None' if self.host is None else self.host
84         cachedir = b'None' if self.dir is None else self.dir
85         self.cachedir = git.repo(b'index-cache/%s'
86                                  % re.sub(br'[^@\w]',
87                                           b'_',
88                                           b'%s:%s' % (cachehost, cachedir)))
89         if is_reverse:
90             self.pout = os.fdopen(3, 'rb')
91             self.pin = os.fdopen(4, 'wb')
92             self.conn = Conn(self.pout, self.pin)
93         else:
94             if self.protocol in (b'ssh', b'file'):
95                 try:
96                     # FIXME: ssh and file shouldn't use the same module
97                     self.p = ssh.connect(self.host, self.port, b'server')
98                     self.pout = self.p.stdout
99                     self.pin = self.p.stdin
100                     self.conn = Conn(self.pout, self.pin)
101                 except OSError as e:
102                     reraise(ClientError('connect: %s' % e))
103             elif self.protocol == b'bup':
104                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
105                 self.sock.connect((self.host,
106                                    1982 if self.port is None else int(self.port)))
107                 self.sockw = self.sock.makefile('wb')
108                 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
109         self._available_commands = self._get_available_commands()
110         self._require_command(b'init-dir')
111         self._require_command(b'set-dir')
112         if self.dir:
113             self.dir = re.sub(br'[\r\n]', ' ', self.dir)
114             if create:
115                 self.conn.write(b'init-dir %s\n' % self.dir)
116             else:
117                 self.conn.write(b'set-dir %s\n' % self.dir)
118             self.check_ok()
119         self.sync_indexes()
120
121     def close(self):
122         self.closed = True
123         if self.conn and not self._busy:
124             self.conn.write(b'quit\n')
125         if self.pin:
126             self.pin.close()
127         if self.sock and self.sockw:
128             self.sockw.close()
129             self.sock.shutdown(socket.SHUT_WR)
130         if self.conn:
131             self.conn.close()
132         if self.pout:
133             self.pout.close()
134         if self.sock:
135             self.sock.close()
136         if self.p:
137             self.p.wait()
138             rv = self.p.wait()
139             if rv:
140                 raise ClientError('server tunnel returned exit code %d' % rv)
141         self.conn = None
142         self.sock = self.p = self.pin = self.pout = None
143
144     def __del__(self):
145         assert self.closed
146
147     def __enter__(self):
148         return self
149
150     def __exit__(self, type, value, traceback):
151         with pending_raise(value, rethrow=False):
152             self.close()
153
154     def check_ok(self):
155         if self.p:
156             rv = self.p.poll()
157             if rv != None:
158                 raise ClientError('server exited unexpectedly with code %r'
159                                   % rv)
160         try:
161             return self.conn.check_ok()
162         except Exception as e:
163             reraise(ClientError(e))
164             # reraise doesn't return
165             return None
166
167     def check_busy(self):
168         if self._busy:
169             raise ClientError('already busy with command %r' % self._busy)
170
171     def ensure_busy(self):
172         if not self._busy:
173             raise ClientError('expected to be busy, but not busy?!')
174
175     def _not_busy(self):
176         self._busy = None
177
178     def _get_available_commands(self):
179         self.check_busy()
180         self._busy = b'help'
181         conn = self.conn
182         conn.write(b'help\n')
183         result = set()
184         line = self.conn.readline()
185         if not line == b'Commands:\n':
186             raise ClientError('unexpected help header ' + repr(line))
187         while True:
188             line = self.conn.readline()
189             if line == b'\n':
190                 break
191             if not line.startswith(b'    '):
192                 raise ClientError('unexpected help line ' + repr(line))
193             cmd = line.strip()
194             if not cmd:
195                 raise ClientError('unexpected help line ' + repr(line))
196             result.add(cmd)
197         # FIXME: confusing
198         not_ok = self.check_ok()
199         if not_ok:
200             raise not_ok
201         self._not_busy()
202         return frozenset(result)
203
204     def _require_command(self, name):
205         if name not in self._available_commands:
206             raise ClientError('server does not appear to provide %s command'
207                               % name.encode('ascii'))
208
209     def sync_indexes(self):
210         self._require_command(b'list-indexes')
211         self.check_busy()
212         conn = self.conn
213         mkdirp(self.cachedir)
214         # All cached idxs are extra until proven otherwise
215         extra = set()
216         for f in os.listdir(self.cachedir):
217             debug1(path_msg(f) + '\n')
218             if f.endswith(b'.idx'):
219                 extra.add(f)
220         needed = set()
221         conn.write(b'list-indexes\n')
222         for line in linereader(conn):
223             if not line:
224                 break
225             assert(line.find(b'/') < 0)
226             parts = line.split(b' ')
227             idx = parts[0]
228             if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
229                 # If the server requests that we load an idx and we don't
230                 # already have a copy of it, it is needed
231                 needed.add(idx)
232             # Any idx that the server has heard of is proven not extra
233             extra.discard(idx)
234
235         self.check_ok()
236         debug1('client: removing extra indexes: %s\n' % extra)
237         for idx in extra:
238             os.unlink(os.path.join(self.cachedir, idx))
239         debug1('client: server requested load of: %s\n' % needed)
240         for idx in needed:
241             self.sync_index(idx)
242         git.auto_midx(self.cachedir)
243
244     def sync_index(self, name):
245         self._require_command(b'send-index')
246         #debug1('requesting %r\n' % name)
247         self.check_busy()
248         mkdirp(self.cachedir)
249         fn = os.path.join(self.cachedir, name)
250         if os.path.exists(fn):
251             msg = ("won't request existing .idx, try `bup bloom --check %s`"
252                    % path_msg(fn))
253             raise ClientError(msg)
254         self.conn.write(b'send-index %s\n' % name)
255         n = struct.unpack('!I', self.conn.read(4))[0]
256         assert(n)
257         with atomically_replaced_file(fn, 'wb') as f:
258             count = 0
259             progress('Receiving index from server: %d/%d\r' % (count, n))
260             for b in chunkyreader(self.conn, n):
261                 f.write(b)
262                 count += len(b)
263                 qprogress('Receiving index from server: %d/%d\r' % (count, n))
264             progress('Receiving index from server: %d/%d, done.\n' % (count, n))
265             self.check_ok()
266
267     def _make_objcache(self):
268         return git.PackIdxList(self.cachedir)
269
270     def _suggest_packs(self):
271         ob = self._busy
272         if ob:
273             assert(ob == b'receive-objects-v2')
274             self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
275         suggested = []
276         for line in linereader(self.conn):
277             if not line:
278                 break
279             debug2('%r\n' % line)
280             if line.startswith(b'index '):
281                 idx = line[6:]
282                 debug1('client: received index suggestion: %s\n'
283                        % git.shorten_hash(idx).decode('ascii'))
284                 suggested.append(idx)
285             else:
286                 assert(line.endswith(b'.idx'))
287                 debug1('client: completed writing pack, idx: %s\n'
288                        % git.shorten_hash(line).decode('ascii'))
289                 suggested.append(line)
290         self.check_ok()
291         if ob:
292             self._busy = None
293         idx = None
294         for idx in suggested:
295             self.sync_index(idx)
296         git.auto_midx(self.cachedir)
297         if ob:
298             self._busy = ob
299             self.conn.write(b'%s\n' % ob)
300         return idx
301
302     def new_packwriter(self, compression_level=1,
303                        max_pack_size=None, max_pack_objects=None):
304         self._require_command(b'receive-objects-v2')
305         self.check_busy()
306         def _set_busy():
307             self._busy = b'receive-objects-v2'
308             self.conn.write(b'receive-objects-v2\n')
309         return PackWriter_Remote(self.conn,
310                                  objcache_maker = self._make_objcache,
311                                  suggest_packs = self._suggest_packs,
312                                  onopen = _set_busy,
313                                  onclose = self._not_busy,
314                                  ensure_busy = self.ensure_busy,
315                                  compression_level=compression_level,
316                                  max_pack_size=max_pack_size,
317                                  max_pack_objects=max_pack_objects)
318
319     def read_ref(self, refname):
320         self._require_command(b'read-ref')
321         self.check_busy()
322         self.conn.write(b'read-ref %s\n' % refname)
323         r = self.conn.readline().strip()
324         self.check_ok()
325         if r:
326             assert(len(r) == 40)   # hexified sha
327             return unhexlify(r)
328         else:
329             return None   # nonexistent ref
330
331     def update_ref(self, refname, newval, oldval):
332         self._require_command(b'update-ref')
333         self.check_busy()
334         self.conn.write(b'update-ref %s\n%s\n%s\n'
335                         % (refname, hexlify(newval),
336                            hexlify(oldval) if oldval else b''))
337         self.check_ok()
338
339     def join(self, id):
340         self._require_command(b'join')
341         self.check_busy()
342         self._busy = b'join'
343         # Send 'cat' so we'll work fine with older versions
344         self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
345         while 1:
346             sz = struct.unpack('!I', self.conn.read(4))[0]
347             if not sz: break
348             yield self.conn.read(sz)
349         # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
350         e = self.check_ok()
351         self._not_busy()
352         if e:
353             raise KeyError(str(e))
354
355     def cat_batch(self, refs):
356         self._require_command(b'cat-batch')
357         self.check_busy()
358         self._busy = b'cat-batch'
359         conn = self.conn
360         conn.write(b'cat-batch\n')
361         # FIXME: do we want (only) binary protocol?
362         for ref in refs:
363             assert ref
364             assert b'\n' not in ref
365             conn.write(ref)
366             conn.write(b'\n')
367         conn.write(b'\n')
368         for ref in refs:
369             info = conn.readline()
370             if info == b'missing\n':
371                 yield None, None, None, None
372                 continue
373             if not (info and info.endswith(b'\n')):
374                 raise ClientError('Hit EOF while looking for object info: %r'
375                                   % info)
376             oidx, oid_t, size = info.split(b' ')
377             size = int(size)
378             cr = chunkyreader(conn, size)
379             yield oidx, oid_t, size, cr
380             detritus = next(cr, None)
381             if detritus:
382                 raise ClientError('unexpected leftover data ' + repr(detritus))
383         # FIXME: confusing
384         not_ok = self.check_ok()
385         if not_ok:
386             raise not_ok
387         self._not_busy()
388
389     def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
390         patterns = patterns or tuple()
391         self._require_command(b'refs')
392         self.check_busy()
393         self._busy = b'refs'
394         conn = self.conn
395         conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
396                                       1 if limit_to_tags else 0))
397         for pattern in patterns:
398             assert b'\n' not in pattern
399             conn.write(pattern)
400             conn.write(b'\n')
401         conn.write(b'\n')
402         for line in lines_until_sentinel(conn, b'\n', ClientError):
403             line = line[:-1]
404             oidx, name = line.split(b' ')
405             if len(oidx) != 40:
406                 raise ClientError('Invalid object fingerprint in %r' % line)
407             if not name:
408                 raise ClientError('Invalid reference name in %r' % line)
409             yield name, unhexlify(oidx)
410         # FIXME: confusing
411         not_ok = self.check_ok()
412         if not_ok:
413             raise not_ok
414         self._not_busy()
415
416     def rev_list(self, refs, parse=None, format=None):
417         """See git.rev_list for the general semantics, but note that with the
418         current interface, the parse function must be able to handle
419         (consume) any blank lines produced by the format because the
420         first one received that it doesn't consume will be interpreted
421         as a terminator for the entire rev-list result.
422
423         """
424         self._require_command(b'rev-list')
425         if format:
426             assert b'\n' not in format
427             assert parse
428         for ref in refs:
429             assert ref
430             assert b'\n' not in ref
431         self.check_busy()
432         self._busy = b'rev-list'
433         conn = self.conn
434         conn.write(b'rev-list\n')
435         conn.write(b'\n')
436         if format:
437             conn.write(format)
438         conn.write(b'\n')
439         for ref in refs:
440             conn.write(ref)
441             conn.write(b'\n')
442         conn.write(b'\n')
443         if not format:
444             for line in lines_until_sentinel(conn, b'\n', ClientError):
445                 line = line.strip()
446                 assert len(line) == 40
447                 yield line
448         else:
449             for line in lines_until_sentinel(conn, b'\n', ClientError):
450                 if not line.startswith(b'commit '):
451                     raise ClientError('unexpected line ' + repr(line))
452                 cmt_oidx = line[7:].strip()
453                 assert len(cmt_oidx) == 40
454                 yield cmt_oidx, parse(conn)
455         # FIXME: confusing
456         not_ok = self.check_ok()
457         if not_ok:
458             raise not_ok
459         self._not_busy()
460
461     def resolve(self, path, parent=None, want_meta=True, follow=False):
462         self._require_command(b'resolve')
463         self.check_busy()
464         self._busy = b'resolve'
465         conn = self.conn
466         conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
467                                       | (2 if follow else 0)
468                                       | (4 if parent else 0)))
469         if parent:
470             vfs.write_resolution(conn, parent)
471         write_bvec(conn, path)
472         success = ord(conn.read(1))
473         assert success in (0, 1)
474         if success:
475             result = vfs.read_resolution(conn)
476         else:
477             result = vfs.read_ioerror(conn)
478         # FIXME: confusing
479         not_ok = self.check_ok()
480         if not_ok:
481             raise not_ok
482         self._not_busy()
483         if isinstance(result, vfs.IOError):
484             raise result
485         return result
486
487
488 # FIXME: disentangle this (stop inheriting) from PackWriter
489 class PackWriter_Remote(git.PackWriter):
490
491     def __init__(self, conn, objcache_maker, suggest_packs,
492                  onopen, onclose,
493                  ensure_busy,
494                  compression_level=1,
495                  max_pack_size=None,
496                  max_pack_objects=None):
497         git.PackWriter.__init__(self,
498                                 objcache_maker=objcache_maker,
499                                 compression_level=compression_level,
500                                 max_pack_size=max_pack_size,
501                                 max_pack_objects=max_pack_objects)
502         self.remote_closed = False
503         self.file = conn
504         self.filename = b'remote socket'
505         self.suggest_packs = suggest_packs
506         self.onopen = onopen
507         self.onclose = onclose
508         self.ensure_busy = ensure_busy
509         self._packopen = False
510         self._bwcount = 0
511         self._bwtime = time.time()
512
513     # __enter__ and __exit__ are inherited
514
515     def _open(self):
516         if not self._packopen:
517             self.onopen()
518             self._packopen = True
519
520     def _end(self, run_midx=True):
521         # Called by other PackWriter methods like breakpoint().
522         # Must not close the connection (self.file)
523         assert(run_midx)  # We don't support this via remote yet
524         self.objcache, objcache = None, self.objcache
525         with nullcontext_if_not(objcache):
526             if not (self._packopen and self.file):
527                 return None
528             self.file.write(b'\0\0\0\0')
529             self._packopen = False
530             self.onclose() # Unbusy
531             if objcache is not None:
532                 objcache.close()
533             return self.suggest_packs() # Returns last idx received
534
535     def close(self):
536         # Called by inherited __exit__
537         self.remote_closed = True
538         id = self._end()
539         self.file = None
540         return id
541
542     def __del__(self):
543         assert self.remote_closed
544         super(PackWriter_Remote, self).__del__()
545
546     def abort(self):
547         raise ClientError("don't know how to abort remote pack writing")
548
549     def _raw_write(self, datalist, sha):
550         assert(self.file)
551         if not self._packopen:
552             self._open()
553         self.ensure_busy()
554         data = b''.join(datalist)
555         assert(data)
556         assert(sha)
557         crc = zlib.crc32(data) & 0xffffffff
558         outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
559                            sha,
560                            struct.pack('!I', crc),
561                            data))
562         try:
563             (self._bwcount, self._bwtime) = _raw_write_bwlimit(
564                     self.file, outbuf, self._bwcount, self._bwtime)
565         except IOError as e:
566             reraise(ClientError(e))
567         self.outbytes += len(data)
568         self.count += 1
569
570         if self.file.has_input():
571             self.suggest_packs()
572             self.objcache.refresh()
573
574         return sha, crc