2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import os, re, struct, time, zlib
9 from bup import git, ssh, vfs
10 from bup.compat import environ, pending_raise, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12 debug2, linereader, lines_until_sentinel,
13 mkdirp, nullcontext_if_not, progress, qprogress, DemuxConn)
14 from bup.io import path_msg
15 from bup.vint import write_bvec
21 class ClientError(Exception):
25 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    # Write buf to f, throttling to the (module-level) bwlimit when set.
    # Returns an updated (bwcount, bwtime) pair that the caller threads
    # back into the next call to keep the rate accounting continuous.
    # NOTE(review): this listing is missing lines here (the unlimited-rate
    # early-exit guard, and the per-chunk write/sleep body of the loop
    # below) -- confirm against the complete source before editing.
28 return (len(buf), time.time())
30 # We want to write in reasonably large blocks, but not so large that
31 # they're likely to overflow a router's queue. So our bwlimit timing
32 # has to be pretty granular. Also, if it takes too long from one
33 # transmit to the next, we can't just make up for lost time to bring
34 # the average back up to bwlimit - that will risk overflowing the
35 # outbound queue, which defeats the purpose. So if we fall behind
36 # by more than one block delay, we shouldn't ever try to catch up.
    # Iterate in 4096-byte chunks; each chunk is individually paced.
37 for i in range(0,len(buf),4096):
    # Earliest time the next chunk may go out given bytes already sent.
39 next = max(now, bwtime + 1.0*bwcount/bwlimit)
43 bwcount = len(sub) # might be less than 4096
45 return (bwcount, bwtime)
48 _protocol_rs = br'([a-z]+)://'
49 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
50 _port_rs = br'(?::(\d+))?'
52 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
55 def parse_remote(remote):
    # Parse a remote spec into (protocol, host, port, path).
    # Accepts URL forms (ssh://, bup://, file://) via _url_rx, plus the
    # rsync-style b'host:path' shorthand and bare local paths.
    # NOTE(review): listing appears to be missing the `if url_match:`
    # guard between the match and its use -- verify before editing.
56 url_match = _url_rx.match(remote)
58 if not url_match.group(1) in (b'ssh', b'bup', b'file'):
59 raise ClientError('unexpected protocol: %s'
60 % url_match.group(1).decode('ascii'))
    # groups 1,3,4,5 are protocol, host, port, path per _url_rx.
61 return url_match.group(1,3,4,5)
    # Fallback: split b'host:path' shorthand at the first colon.
63 rs = remote.split(b':', 1)
    # No colon, or an empty/b'-' host, means a local (file) path.
64 if len(rs) == 1 or rs[0] in (b'', b'-'):
65 return b'file', None, None, rs[-1]
67 return b'ssh', rs[0], None, rs[1]
71 def __init__(self, remote, create=False):
    # Connect to a bup server at `remote` (see parse_remote for formats)
    # and set up self.conn plus the local index cache directory.
    # NOTE(review): this listing has gaps (e.g. the BUP_SERVER_REVERSE
    # branch structure, try/except around ssh.connect, and the
    # create/set-dir conditionals are partially missing).
73 self._busy = self.conn = None
74 self.sock = self.p = self.pout = self.pin = None
76 is_reverse = environ.get(b'BUP_SERVER_REVERSE')
79 remote = b'%s:' % is_reverse
80 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
81 # The b'None' here matches python2's behavior of b'%s' % None == 'None',
82 # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
83 # but crashes instead when doing b'%s' % None.
84 cachehost = b'None' if self.host is None else self.host
85 cachedir = b'None' if self.dir is None else self.dir
    # Per-remote index cache lives under the local repo's index-cache/.
86 self.cachedir = git.repo(b'index-cache/%s'
89 b'%s:%s' % (cachehost, cachedir)))
    # Reverse mode: server already connected us via fds 3 (read)/4 (write).
91 self.pout = os.fdopen(3, 'rb')
92 self.pin = os.fdopen(4, 'wb')
93 self.conn = Conn(self.pout, self.pin)
95 if self.protocol in (b'ssh', b'file'):
97 # FIXME: ssh and file shouldn't use the same module
98 self.p = ssh.connect(self.host, self.port, b'server')
99 self.pout = self.p.stdout
100 self.pin = self.p.stdin
101 self.conn = Conn(self.pout, self.pin)
103 reraise(ClientError('connect: %s' % e))
104 elif self.protocol == b'bup':
    # Plain TCP to a `bup daemon`; default port 1982.
105 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
106 self.sock.connect((self.host,
107 1982 if self.port is None else int(self.port)))
108 self.sockw = self.sock.makefile('wb')
109 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
110 self._available_commands = self._get_available_commands()
111 self._require_command(b'init-dir')
112 self._require_command(b'set-dir')
    # NOTE(review): the replacement ' ' is a str while the pattern and
    # subject are bytes -- on python3 this re.sub raises TypeError;
    # it should presumably be b' '. Confirm against the full source.
114 self.dir = re.sub(br'[\r\n]', ' ', self.dir)
116 self.conn.write(b'init-dir %s\n' % self.dir)
118 self.conn.write(b'set-dir %s\n' % self.dir)
121 except BaseException as ex:
    # On any setup failure, close the half-open connection but keep the
    # original exception as the one that propagates.
122 with pending_raise(ex):
130 if self.conn and not self._busy:
131 self.conn.write(b'quit\n')
139 if self.sock and self.sockw:
141 self.sock.shutdown(socket.SHUT_WR)
162 raise ClientError('server tunnel returned exit code %d' % rv)
def __exit__(self, type, value, traceback):
    """Context-manager exit: close the client connection.

    Uses pending_raise(value, rethrow=False) so that an exception already
    in flight (value) takes precedence over anything close() raises.
    """
    # The listing left the `with` body empty (a syntax error); the only
    # sensible body, matching the pending_raise pattern used in __init__,
    # is the close() call restored here.
    with pending_raise(value, rethrow=False):
        self.close()
179 raise ClientError('server exited unexpectedly with code %r'
182 return self.conn.check_ok()
183 except Exception as e:
184 reraise(ClientError(e))
185 # reraise doesn't return
def check_busy(self):
    """Raise ClientError if a streaming command is already in progress.

    self._busy holds the name (bytes) of the active command, or a falsy
    value when idle.
    """
    # The listing dropped the guard line; the error message (which
    # interpolates self._busy) establishes that the raise is conditional
    # on a command being active.
    if self._busy:
        raise ClientError('already busy with command %r' % self._busy)
def ensure_busy(self):
    """Raise ClientError unless a streaming command is in progress.

    Inverse of check_busy(): used by PackWriter_Remote to assert that
    receive-objects-v2 is still active before writing.
    """
    # The listing dropped the guard line; restored as the negation of
    # check_busy()'s condition, matching the error message.
    if not self._busy:
        raise ClientError('expected to be busy, but not busy?!')
199 def _get_available_commands(self):
    # Ask the server for its command list via 'help' and return the
    # parsed names as a frozenset of bytes.
    # NOTE(review): listing is missing lines (result accumulator setup,
    # the readline loop structure, and the per-line name extraction).
203 conn.write(b'help\n')
205 line = self.conn.readline()
    # The help output must begin with this exact header.
206 if not line == b'Commands:\n':
207 raise ClientError('unexpected help header ' + repr(line))
209 line = self.conn.readline()
    # Each command line is indented with spaces.
212 if not line.startswith(b' '):
213 raise ClientError('unexpected help line ' + repr(line))
216 raise ClientError('unexpected help line ' + repr(line))
219 not_ok = self.check_ok()
223 return frozenset(result)
225 def _require_command(self, name):
226 if name not in self._available_commands:
227 raise ClientError('server does not appear to provide %s command'
228 % name.encode('ascii'))
230 def sync_indexes(self):
    # Reconcile the local index cache with the server's 'list-indexes'
    # output: fetch idx files the server asks us to load, and delete
    # cached idx files the server no longer mentions.
    # NOTE(review): listing is missing lines (extra/needed set setup,
    # idx extraction from each line, and the fetch loop for `needed`).
231 self._require_command(b'list-indexes')
234 mkdirp(self.cachedir)
235 # All cached idxs are extra until proven otherwise
237 for f in os.listdir(self.cachedir):
238 debug1(path_msg(f) + '\n')
239 if f.endswith(b'.idx'):
242 conn.write(b'list-indexes\n')
243 for line in linereader(conn):
    # Server must send bare filenames, never paths.
246 assert(line.find(b'/') < 0)
247 parts = line.split(b' ')
249 if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
250 # If the server requests that we load an idx and we don't
251 # already have a copy of it, it is needed
253 # Any idx that the server has heard of is proven not extra
257 debug1('client: removing extra indexes: %s\n' % extra)
259 os.unlink(os.path.join(self.cachedir, idx))
260 debug1('client: server requested load of: %s\n' % needed)
    # Rebuild midx files over the refreshed cache.
263 git.auto_midx(self.cachedir)
265 def sync_index(self, name):
    # Download a single idx file `name` (bytes) from the server into the
    # local index cache, writing it atomically.
    # NOTE(review): listing is missing lines (check_busy/check_ok calls,
    # the count initialization, and the f.write inside the chunk loop).
266 self._require_command(b'send-index')
267 #debug1('requesting %r\n' % name)
269 mkdirp(self.cachedir)
270 fn = os.path.join(self.cachedir, name)
    # Refuse to re-fetch an idx we already have -- likely corruption.
271 if os.path.exists(fn):
272 msg = ("won't request existing .idx, try `bup bloom --check %s`"
274 raise ClientError(msg)
275 self.conn.write(b'send-index %s\n' % name)
    # Server sends the idx size as a 4-byte big-endian length prefix.
276 n = struct.unpack('!I', self.conn.read(4))[0]
278 with atomically_replaced_file(fn, 'wb') as f:
280 progress('Receiving index from server: %d/%d\r' % (count, n))
281 for b in chunkyreader(self.conn, n):
284 qprogress('Receiving index from server: %d/%d\r' % (count, n))
285 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
def _make_objcache(self):
    """Create the object cache backed by the locally cached pack indexes."""
    idx_list = git.PackIdxList(self.cachedir)
    return idx_list
291 def _suggest_packs(self):
    # Suspend an in-progress receive-objects-v2 stream, collect the
    # server's idx suggestions, fetch them, then resume the stream.
    # NOTE(review): listing is missing lines (reading `ob`, the
    # `suggested` list setup, loop-exit conditions, and the per-idx
    # sync_index call).
294 assert(ob == b'receive-objects-v2')
295 self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
297 for line in linereader(self.conn):
300 debug2('%r\n' % line)
301 if line.startswith(b'index '):
303 debug1('client: received index suggestion: %s\n'
304 % git.shorten_hash(idx).decode('ascii'))
305 suggested.append(idx)
    # Otherwise the line names the idx of a completed pack.
307 assert(line.endswith(b'.idx'))
308 debug1('client: completed writing pack, idx: %s\n'
309 % git.shorten_hash(line).decode('ascii'))
310 suggested.append(line)
315 for idx in suggested:
317 git.auto_midx(self.cachedir)
    # Resume the suspended command on the wire.
320 self.conn.write(b'%s\n' % ob)
323 def new_packwriter(self, compression_level=1,
324 max_pack_size=None, max_pack_objects=None):
    # Start a receive-objects-v2 session and return a PackWriter_Remote
    # bound to this connection; marks the client busy until it closes.
    # NOTE(review): listing is missing lines (check_busy call and at
    # least one PackWriter_Remote keyword argument around line 333).
325 self._require_command(b'receive-objects-v2')
328 self._busy = b'receive-objects-v2'
329 self.conn.write(b'receive-objects-v2\n')
330 return PackWriter_Remote(self.conn,
331 objcache_maker = self._make_objcache,
332 suggest_packs = self._suggest_packs,
334 onclose = self._not_busy,
335 ensure_busy = self.ensure_busy,
336 compression_level=compression_level,
337 max_pack_size=max_pack_size,
338 max_pack_objects=max_pack_objects)
340 def read_ref(self, refname):
    # Ask the server for the value of ref `refname` (bytes); returns the
    # binary oid, or None for a nonexistent ref (per the comment below).
    # NOTE(review): listing is missing lines (check_busy/check_ok, the
    # `if r:` branch and the unhexlify return).
341 self._require_command(b'read-ref')
343 self.conn.write(b'read-ref %s\n' % refname)
344 r = self.conn.readline().strip()
347 assert(len(r) == 40) # hexified sha
350 return None # nonexistent ref
352 def update_ref(self, refname, newval, oldval):
    # Atomically update ref `refname` from oldval to newval (binary oids,
    # hex-encoded on the wire; empty string when oldval is unset).
    # NOTE(review): listing is missing lines (check_busy and the
    # trailing check_ok).
353 self._require_command(b'update-ref')
355 self.conn.write(b'update-ref %s\n%s\n%s\n'
356 % (refname, hexlify(newval),
357 hexlify(oldval) if oldval else b''))
361 self._require_command(b'join')
364 # Send 'cat' so we'll work fine with older versions
365 self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
367 sz = struct.unpack('!I', self.conn.read(4))[0]
369 yield self.conn.read(sz)
370 # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
374 raise KeyError(str(e))
376 def cat_batch(self, refs):
    # Generator: for each ref, yield (oidx, type, size, chunk-reader), or
    # (None, None, None, None) when the server reports it missing.
    # NOTE(review): listing is missing lines (the per-ref write loop,
    # the sentinel write, int(size) conversion, and busy bookkeeping).
377 self._require_command(b'cat-batch')
379 self._busy = b'cat-batch'
381 conn.write(b'cat-batch\n')
382 # FIXME: do we want (only) binary protocol?
385 assert b'\n' not in ref
390 info = conn.readline()
391 if info == b'missing\n':
392 yield None, None, None, None
394 if not (info and info.endswith(b'\n')):
395 raise ClientError('Hit EOF while looking for object info: %r'
397 oidx, oid_t, size = info.split(b' ')
399 cr = chunkyreader(conn, size)
400 yield oidx, oid_t, size, cr
    # The consumer must have drained cr before we continue.
401 detritus = next(cr, None)
403 raise ClientError('unexpected leftover data ' + repr(detritus))
405 not_ok = self.check_ok()
410 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
    # Generator over (refname, binary-oid) pairs matching `patterns`
    # (all refs when empty), optionally limited to heads and/or tags.
    # NOTE(review): listing is missing lines (pattern writes, the blank
    # sentinel, oidx/name validation conditions, and busy bookkeeping).
411 patterns = patterns or tuple()
412 self._require_command(b'refs')
    # Flags are sent as two 0/1 integers on the command line.
416 conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
417 1 if limit_to_tags else 0))
418 for pattern in patterns:
419 assert b'\n' not in pattern
423 for line in lines_until_sentinel(conn, b'\n', ClientError):
425 oidx, name = line.split(b' ')
427 raise ClientError('Invalid object fingerprint in %r' % line)
429 raise ClientError('Invalid reference name in %r' % line)
430 yield name, unhexlify(oidx)
432 not_ok = self.check_ok()
    # NOTE(review): the docstring below is unterminated in this listing
    # (its closing quotes are among the missing lines), and the body is
    # missing the format/refs writes and the parse==None branch setup.
437 def rev_list(self, refs, parse=None, format=None):
438 """See git.rev_list for the general semantics, but note that with the
439 current interface, the parse function must be able to handle
440 (consume) any blank lines produced by the format because the
441 first one received that it doesn't consume will be interpreted
442 as a terminator for the entire rev-list result.
445 self._require_command(b'rev-list')
447 assert b'\n' not in format
451 assert b'\n' not in ref
453 self._busy = b'rev-list'
455 conn.write(b'rev-list\n')
    # Without a parse function: each line is a bare 40-char hex oid.
465 for line in lines_until_sentinel(conn, b'\n', ClientError):
467 assert len(line) == 40
    # With a parse function: 'commit <oidx>' header, then parse(conn)
    # consumes the formatted body for that commit.
470 for line in lines_until_sentinel(conn, b'\n', ClientError):
471 if not line.startswith(b'commit '):
472 raise ClientError('unexpected line ' + repr(line))
473 cmt_oidx = line[7:].strip()
474 assert len(cmt_oidx) == 40
475 yield cmt_oidx, parse(conn)
477 not_ok = self.check_ok()
482 def resolve(self, path, parent=None, want_meta=True, follow=False):
    # Resolve a VFS path on the server; returns the resolution, or raises
    # the server-side vfs.IOError on failure (see isinstance check below).
    # NOTE(review): listing is missing lines (check_busy, the
    # success/failure branch keywords around lines 495-498, busy reset,
    # and the final raise/return).
483 self._require_command(b'resolve')
485 self._busy = b'resolve'
    # Pack want_meta/follow/has-parent into a single flags integer.
487 conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
488 | (2 if follow else 0)
489 | (4 if parent else 0)))
491 vfs.write_resolution(conn, parent)
492 write_bvec(conn, path)
    # One status byte: 1 = resolution follows, 0 = IOError follows.
493 success = ord(conn.read(1))
494 assert success in (0, 1)
496 result = vfs.read_resolution(conn)
498 result = vfs.read_ioerror(conn)
500 not_ok = self.check_ok()
504 if isinstance(result, vfs.IOError):
509 # FIXME: disentangle this (stop inheriting) from PackWriter
    # PackWriter that streams objects over the client connection instead
    # of writing a local packfile. NOTE(review): this listing is gappy
    # throughout the class -- several method def lines (_open, close,
    # __del__, abort) and many body lines are missing; verify against the
    # complete source before editing.
510 class PackWriter_Remote(git.PackWriter):
512 def __new__(cls, *args, **kwargs):
513 result = super().__new__(cls)
    # Set before __init__ runs so __del__ is safe even if __init__ fails.
514 result.remote_closed = True # supports __del__
517 def __init__(self, conn, objcache_maker, suggest_packs,
522 max_pack_objects=None):
523 git.PackWriter.__init__(self,
524 objcache_maker=objcache_maker,
525 compression_level=compression_level,
526 max_pack_size=max_pack_size,
527 max_pack_objects=max_pack_objects)
528 self.remote_closed = False
530 self.filename = b'remote socket'
531 self.suggest_packs = suggest_packs
533 self.onclose = onclose
534 self.ensure_busy = ensure_busy
    # True while a pack stream is open on the connection.
535 self._packopen = False
537 self._bwtime = time.time()
539 # __enter__ and __exit__ are inherited
542 if not self._packopen:
544 self._packopen = True
546 def _end(self, run_midx=True):
547 # Called by other PackWriter methods like breakpoint().
548 # Must not close the connection (self.file)
549 assert(run_midx) # We don't support this via remote yet
    # Detach the objcache before finishing so it's closed exactly once.
550 self.objcache, objcache = None, self.objcache
551 with nullcontext_if_not(objcache):
552 if not (self._packopen and self.file):
    # Zero-length chunk terminates the receive-objects-v2 stream.
554 self.file.write(b'\0\0\0\0')
555 self._packopen = False
556 self.onclose() # Unbusy
557 if objcache is not None:
559 return self.suggest_packs() # Returns last idx received
562 # Called by inherited __exit__
563 self.remote_closed = True
566 super(PackWriter_Remote, self).close()
    # __del__ guard: close() must have run before collection.
570 assert self.remote_closed
571 super(PackWriter_Remote, self).__del__()
574 raise ClientError("don't know how to abort remote pack writing")
576 def _raw_write(self, datalist, sha):
578 if not self._packopen:
581 data = b''.join(datalist)
    # Frame: !I total length (payload + 20-byte sha + 4-byte crc),
    # then (per the join below) sha, crc, and the compressed data.
584 crc = zlib.crc32(data) & 0xffffffff
585 outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
587 struct.pack('!I', crc),
    # Throttled write; threads bwcount/bwtime through for rate tracking.
590 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
591 self.file, outbuf, self._bwcount, self._bwtime)
593 reraise(ClientError(e))
594 self.outbytes += len(data)
    # Server spoke while we were sending: drain its idx suggestions and
    # refresh the objcache so we can dedup against the new indexes.
597 if self.file.has_input():
599 self.objcache.refresh()