2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import os, re, struct, time, zlib
9 from bup import git, ssh, vfs
10 from bup.compat import environ, pending_raise, range, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12 debug2, linereader, lines_until_sentinel,
13 mkdirp, nullcontext_if_not, progress, qprogress, DemuxConn)
14 from bup.io import path_msg
15 from bup.vint import write_bvec
# Exception type this module raises for connection and protocol failures
# (e.g. 'connect: %s', 'unexpected help line ...'); the class body is not
# part of this excerpt.
21 class ClientError(Exception):
# Write `buf` to the file-like `f`, pacing 4096-byte blocks against
# `bwlimit` (presumably bytes per second).  `bwcount`/`bwtime` carry the
# pacing state between calls; the updated (bwcount, bwtime) is returned.
# NOTE(review): `bwlimit` is not a parameter; it is presumably a
# module-level setting defined outside this excerpt -- confirm.
25 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
# Unpaced path (its guarding condition is on an omitted line): report the
# whole buffer as the count and the current time as the timestamp.
28 return (len(buf), time.time())
30 # We want to write in reasonably large blocks, but not so large that
31 # they're likely to overflow a router's queue. So our bwlimit timing
32 # has to be pretty granular. Also, if it takes too long from one
33 # transmit to the next, we can't just make up for lost time to bring
34 # the average back up to bwlimit - that will risk overflowing the
35 # outbound queue, which defeats the purpose. So if we fall behind
36 # by more than one block delay, we shouldn't ever try to catch up.
37 for i in range(0,len(buf),4096):
# Earliest send time for this block: not before `now` (bound on an omitted
# line), and not before the previous block's size has "drained" at bwlimit.
# NOTE(review): the local name `next` shadows the builtin of that name.
39 next = max(now, bwtime + 1.0*bwcount/bwlimit)
43 bwcount = len(sub) # might be less than 4096
45 return (bwcount, bwtime)
# Pieces of the remote-URL grammar combined into _url_rx:
#   b'scheme://', then an optional host + optional ':port', then the path
#   (_path_rs, defined on a line omitted from this excerpt).
48 _protocol_rs = br'([a-z]+)://'
# (?P<sb>\[)? remembers an opening '['; the (?(sb)...) conditionals then
# accept only hex digits/colons (an IPv6 literal) and require the ']'.
# Without '[', the host is any run of characters other than ':' and '/'.
49 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
50 _port_rs = br'(?::(\d+))?'
52 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
# Split a bytes remote spec into (protocol, host, port, path).
# URL form b'proto://[host[:port]]path': only ssh, bup and file are
# accepted, anything else raises ClientError.  The colon form below is
# presumably the fallback when _url_rx does not match (the guarding
# lines are omitted): b'host:path' means ssh, while a spec with no ':'
# or with an empty or '-' host part is a local (file) path.
55 def parse_remote(remote):
56 url_match = _url_rx.match(remote)
58 if not url_match.group(1) in (b'ssh', b'bup', b'file'):
59 raise ClientError('unexpected protocol: %s'
60 % url_match.group(1).decode('ascii'))
# Group 2 is only the internal '[' marker of _host_rs, so it is skipped:
# 1 = protocol, 3 = host, 4 = port, 5 = path.
61 return url_match.group(1,3,4,5)
63 rs = remote.split(b':', 1)
64 if len(rs) == 1 or rs[0] in (b'', b'-'):
65 return b'file', None, None, rs[-1]
67 return b'ssh', rs[0], None, rs[1]
# Connect to `remote` (a bytes spec understood by parse_remote) and select
# the repository directory on the server.  Per the visible lines, either
# 'init-dir' or 'set-dir' is sent -- presumably init-dir when `create` is
# true; the selecting condition is on an omitted line.
71 def __init__(self, remote, create=False):
73 self._busy = self.conn = None
74 self.sock = self.p = self.pout = self.pin = None
# When BUP_SERVER_REVERSE is set, the remote spec is rebuilt as
# b'<value>:' (the guarding condition is on an omitted line).
75 is_reverse = environ.get(b'BUP_SERVER_REVERSE')
78 remote = b'%s:' % is_reverse
79 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
80 # The b'None' here matches python2's behavior of b'%s' % None == 'None',
81 # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
82 # but crashes instead when doing b'%s' % None.
83 cachehost = b'None' if self.host is None else self.host
84 cachedir = b'None' if self.dir is None else self.dir
# Local per-remote index cache, keyed by host and directory (the middle
# of this call is on omitted lines).
85 self.cachedir = git.repo(b'index-cache/%s'
88 b'%s:%s' % (cachehost, cachedir)))
# Talk to the server over inherited file descriptors 3 (read) and 4
# (write); presumably the reverse-connection case -- confirm.
90 self.pout = os.fdopen(3, 'rb')
91 self.pin = os.fdopen(4, 'wb')
92 self.conn = Conn(self.pout, self.pin)
94 if self.protocol in (b'ssh', b'file'):
96 # FIXME: ssh and file shouldn't use the same module
97 self.p = ssh.connect(self.host, self.port, b'server')
98 self.pout = self.p.stdout
99 self.pin = self.p.stdin
100 self.conn = Conn(self.pout, self.pin)
# Connection failures are rewrapped as ClientError.
102 reraise(ClientError('connect: %s' % e))
103 elif self.protocol == b'bup':
# NOTE(review): `socket` is not among the imports visible in this
# excerpt -- confirm it is imported on an omitted line.
# NOTE(review): AF_INET handles IPv4 only, yet _host_rs accepts bracketed
# IPv6 literals; such hosts cannot connect through this path.
104 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 1982 is the port used when the URL does not give one.
105 self.sock.connect((self.host,
106 1982 if self.port is None else int(self.port)))
107 self.sockw = self.sock.makefile('wb')
108 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
# Fail early unless the server offers both directory-selection commands.
109 self._available_commands = self._get_available_commands()
110 self._require_command(b'init-dir')
111 self._require_command(b'set-dir')
# Strip CR/LF so the directory cannot break the line-based protocol.
# NOTE(review): a bytes pattern with a str replacement (' ') raises
# TypeError on Python 3; this should likely be b' '.
113 self.dir = re.sub(br'[\r\n]', ' ', self.dir)
115 self.conn.write(b'init-dir %s\n' % self.dir)
117 self.conn.write(b'set-dir %s\n' % self.dir)
# Any failure during setup is handled under pending_raise (the cleanup
# done inside it is on omitted lines).
120 except BaseException as ex:
121 with pending_raise(ex):
# Fragment of the connection-teardown method; its `def` line and several
# body lines are not in this excerpt.
# Ask the server to quit, but only when no command is in progress.
127 if self.conn and not self._busy:
128 self.conn.write(b'quit\n')
# bup:// socket connections: shut down the socket's write side.
131 if self.sock and self.sockw:
133 self.sock.shutdown(socket.SHUT_WR)
# `rv` is presumably the tunnel subprocess's exit status, computed on an
# omitted line -- confirm.
144 raise ClientError('server tunnel returned exit code %d' % rv)
# Drop all transport handles.
146 self.sock = self.p = self.pin = self.pout = None
# Context-manager exit.  Cleanup runs under
# pending_raise(value, rethrow=False); presumably this keeps an in-flight
# exception `value` from being lost if cleanup fails -- see
# bup.compat.pending_raise.  The rest of the body is omitted here.
154 def __exit__(self, type, value, traceback):
155 with pending_raise(value, rethrow=False):
# Fragment, presumably of Client.check_ok (called elsewhere as
# self.check_ok()); its `def` and guard lines are omitted.  A dead server
# process is reported as ClientError.  Otherwise the call delegates to
# self.conn.check_ok(), and any exception is rewrapped as ClientError.
162 raise ClientError('server exited unexpectedly with code %r'
165 return self.conn.check_ok()
166 except Exception as e:
167 reraise(ClientError(e))
168 # reraise doesn't return
171 def check_busy(self):
# Reject a new command while another one (named by self._busy) is still
# pending; the guarding test is on an omitted line.
173 raise ClientError('already busy with command %r' % self._busy)
175 def ensure_busy(self):
# Counterpart of check_busy: complain when no command is in progress (the
# guarding test is on an omitted line).
177 raise ClientError('expected to be busy, but not busy?!')
# Query the server's command list with 'help' and return `result` as a
# frozenset.  The reply must start with b'Commands:\n', and listed lines
# must begin with a space; otherwise ClientError is raised.  `conn` and
# `result` are bound on lines omitted from this excerpt.
182 def _get_available_commands(self):
186 conn.write(b'help\n')
188 line = self.conn.readline()
189 if not line == b'Commands:\n':
190 raise ClientError('unexpected help header ' + repr(line))
192 line = self.conn.readline()
195 if not line.startswith(b' '):
196 raise ClientError('unexpected help line ' + repr(line))
199 raise ClientError('unexpected help line ' + repr(line))
# Check the server's final status after the listing.
202 not_ok = self.check_ok()
206 return frozenset(result)
def _require_command(self, name):
    """Raise ClientError unless the server advertised command *name*.

    *name* is a bytes command name (e.g. b'init-dir'), looked up in
    self._available_commands, the server's advertised command set.

    The name is *decoded* for the error message.  The previous
    ``name.encode('ascii')`` raised AttributeError on Python 3, because
    bytes has no encode(), and that hid the intended ClientError.
    """
    if name not in self._available_commands:
        raise ClientError('server does not appear to provide %s command'
                          % name.decode('ascii'))
# Reconcile the local index cache with the server's 'list-indexes' reply.
# Cached .idx files start out as "extra"; any the server mentions are
# kept.  Ones it marks 'load' that are not cached are collected as needed,
# presumably to be fetched with sync_index.  The remaining extras are
# deleted, and the cache's midx is regenerated with git.auto_midx.  Lines
# binding `conn`, `extra`, `needed` and `idx` are omitted from this excerpt.
213 def sync_indexes(self):
214 self._require_command(b'list-indexes')
217 mkdirp(self.cachedir)
218 # All cached idxs are extra until proven otherwise
220 for f in os.listdir(self.cachedir):
221 debug1(path_msg(f) + '\n')
222 if f.endswith(b'.idx'):
225 conn.write(b'list-indexes\n')
226 for line in linereader(conn):
# Server-supplied names become file names under self.cachedir, so reject
# path separators.  NOTE(review): assert is skipped under `python -O`;
# raising ClientError would be a sturdier check on server input.
229 assert(line.find(b'/') < 0)
230 parts = line.split(b' ')
232 if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
233 # If the server requests that we load an idx and we don't
234 # already have a copy of it, it is needed
236 # Any idx that the server has heard of is proven not extra
240 debug1('client: removing extra indexes: %s\n' % extra)
242 os.unlink(os.path.join(self.cachedir, idx))
243 debug1('client: server requested load of: %s\n' % needed)
246 git.auto_midx(self.cachedir)
# Fetch index `name` from the server into self.cachedir.  Raises
# ClientError rather than overwrite an existing file.  The reply is a
# 4-byte big-endian length ('!I') followed by that many bytes.  They are
# written through atomically_replaced_file while progress is reported.
# `count` is maintained on omitted lines.
248 def sync_index(self, name):
249 self._require_command(b'send-index')
250 #debug1('requesting %r\n' % name)
252 mkdirp(self.cachedir)
253 fn = os.path.join(self.cachedir, name)
254 if os.path.exists(fn):
255 msg = ("won't request existing .idx, try `bup bloom --check %s`"
257 raise ClientError(msg)
258 self.conn.write(b'send-index %s\n' % name)
259 n = struct.unpack('!I', self.conn.read(4))[0]
261 with atomically_replaced_file(fn, 'wb') as f:
263 progress('Receiving index from server: %d/%d\r' % (count, n))
264 for b in chunkyreader(self.conn, n):
267 qprogress('Receiving index from server: %d/%d\r' % (count, n))
268 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
def _make_objcache(self):
    """Build an object cache over this client's local index cache.

    Returns a git.PackIdxList rooted at self.cachedir.  new_packwriter
    hands this method to PackWriter_Remote as its objcache_maker.
    """
    index_cache_dir = self.cachedir
    return git.PackIdxList(index_cache_dir)
# Interrupt an in-progress receive-objects-v2 upload to process the
# server's suggestions.  After the 0xffffffff suspend marker is sent, each
# reply line is either an 'index <name>' suggestion or, per the assert, a
# bare '*.idx' name of a pack the server finished writing.  Both kinds are
# collected in `suggested`, the midx is refreshed, and the busy command
# name `ob` is written back, presumably to resume the upload.  `ob`,
# `suggested` and `idx` are bound on omitted lines.
274 def _suggest_packs(self):
277 assert(ob == b'receive-objects-v2')
278 self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
280 for line in linereader(self.conn):
283 debug2('%r\n' % line)
284 if line.startswith(b'index '):
286 debug1('client: received index suggestion: %s\n'
287 % git.shorten_hash(idx).decode('ascii'))
288 suggested.append(idx)
290 assert(line.endswith(b'.idx'))
291 debug1('client: completed writing pack, idx: %s\n'
292 % git.shorten_hash(line).decode('ascii'))
293 suggested.append(line)
298 for idx in suggested:
300 git.auto_midx(self.cachedir)
303 self.conn.write(b'%s\n' % ob)
# Start a receive-objects-v2 upload, mark this client busy with it, and
# return a PackWriter_Remote that writes over self.conn.  The writer calls
# back into this client for its object cache, pack suggestions and busy
# checks, and calls _not_busy (as onclose) when a pack ends.
306 def new_packwriter(self, compression_level=1,
307 max_pack_size=None, max_pack_objects=None):
308 self._require_command(b'receive-objects-v2')
311 self._busy = b'receive-objects-v2'
312 self.conn.write(b'receive-objects-v2\n')
313 return PackWriter_Remote(self.conn,
314 objcache_maker = self._make_objcache,
315 suggest_packs = self._suggest_packs,
317 onclose = self._not_busy,
318 ensure_busy = self.ensure_busy,
319 compression_level=compression_level,
320 max_pack_size=max_pack_size,
321 max_pack_objects=max_pack_objects)
# Ask the server for the value of `refname`.  A non-empty reply must be a
# 40-character hex id (its conversion is on an omitted line).  The visible
# fallback returns None for a nonexistent ref.
323 def read_ref(self, refname):
324 self._require_command(b'read-ref')
326 self.conn.write(b'read-ref %s\n' % refname)
327 r = self.conn.readline().strip()
330 assert(len(r) == 40) # hexified sha
333 return None # nonexistent ref
# Set `refname` to `newval` on the server.  The new and old ids are sent
# hex-encoded, each on its own line; the old-value line is empty when
# `oldval` is falsy.
335 def update_ref(self, refname, newval, oldval):
336 self._require_command(b'update-ref')
338 self.conn.write(b'update-ref %s\n%s\n%s\n'
339 % (refname, hexlify(newval),
340 hexlify(oldval) if oldval else b''))
# Fragment of the object-streaming method whose `def` line is omitted; it
# requires the server's 'join' command.  Objects are requested with the
# older 'cat' command, with CR/LF in `id` replaced by '_'.  Each chunk
# arrives as a 4-byte big-endian size followed by that many bytes, and is
# yielded as read.  A server-side failure surfaces as KeyError.
344 self._require_command(b'join')
347 # Send 'cat' so we'll work fine with older versions
348 self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
350 sz = struct.unpack('!I', self.conn.read(4))[0]
352 yield self.conn.read(sz)
353 # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
357 raise KeyError(str(e))
# Stream objects for `refs` using the 'cat-batch' command; the client is
# marked busy meanwhile.  Refs must not contain '\n' (they are presumably
# sent one per line).  For each ref this yields (None, None, None, None)
# when the server reports 'missing', and (oidx, type, size, reader)
# otherwise.  The caller must drain each reader before advancing, because
# leftover data raises ClientError.
359 def cat_batch(self, refs):
360 self._require_command(b'cat-batch')
362 self._busy = b'cat-batch'
364 conn.write(b'cat-batch\n')
365 # FIXME: do we want (only) binary protocol?
368 assert b'\n' not in ref
373 info = conn.readline()
374 if info == b'missing\n':
375 yield None, None, None, None
377 if not (info and info.endswith(b'\n')):
378 raise ClientError('Hit EOF while looking for object info: %r'
380 oidx, oid_t, size = info.split(b' ')
382 cr = chunkyreader(conn, size)
383 yield oidx, oid_t, size, cr
384 detritus = next(cr, None)
386 raise ClientError('unexpected leftover data ' + repr(detritus))
388 not_ok = self.check_ok()
# Yield (name, oid) for the server's refs matching `patterns`, optionally
# limited to heads and/or tags (sent as 0/1 flags).  Patterns must not
# contain '\n'.  Each reply line is b'<hex oid> <name>', read until a
# blank-line sentinel.  Malformed ids or names raise ClientError, and the
# yielded oid is the unhexlified (binary) id.
393 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
394 patterns = patterns or tuple()
395 self._require_command(b'refs')
399 conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
400 1 if limit_to_tags else 0))
401 for pattern in patterns:
402 assert b'\n' not in pattern
406 for line in lines_until_sentinel(conn, b'\n', ClientError):
408 oidx, name = line.split(b' ')
410 raise ClientError('Invalid object fingerprint in %r' % line)
412 raise ClientError('Invalid reference name in %r' % line)
413 yield name, unhexlify(oidx)
415 not_ok = self.check_ok()
# Run 'rev-list' on the server for `refs`; the client is marked busy
# meanwhile.  `format` and refs must not contain '\n'.  One reply shape is
# bare 40-character hex ids; the other is b'commit <oidx>' lines, each
# followed by data consumed by parse(conn), yielding (oidx, parse result).
# Presumably the first shape is used when no `format` is given; the
# branch conditions are on omitted lines.
420 def rev_list(self, refs, parse=None, format=None):
421 """See git.rev_list for the general semantics, but note that with the
422 current interface, the parse function must be able to handle
423 (consume) any blank lines produced by the format because the
424 first one received that it doesn't consume will be interpreted
425 as a terminator for the entire rev-list result.
428 self._require_command(b'rev-list')
430 assert b'\n' not in format
434 assert b'\n' not in ref
436 self._busy = b'rev-list'
438 conn.write(b'rev-list\n')
448 for line in lines_until_sentinel(conn, b'\n', ClientError):
450 assert len(line) == 40
453 for line in lines_until_sentinel(conn, b'\n', ClientError):
454 if not line.startswith(b'commit '):
455 raise ClientError('unexpected line ' + repr(line))
456 cmt_oidx = line[7:].strip()
457 assert len(cmt_oidx) == 40
458 yield cmt_oidx, parse(conn)
460 not_ok = self.check_ok()
# Resolve `path` on the server via the 'resolve' command; the client is
# marked busy meanwhile.  The flag bits are 1 = want_meta, 2 = follow,
# and 4 = a `parent` resolution is sent before the path.  The server
# answers with a status byte (0 or 1), then either a resolution or a
# serialized IOError, presumably selected by that byte.  An IOError
# result is checked for afterwards (the handling is on omitted lines).
465 def resolve(self, path, parent=None, want_meta=True, follow=False):
466 self._require_command(b'resolve')
468 self._busy = b'resolve'
470 conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
471 | (2 if follow else 0)
472 | (4 if parent else 0)))
474 vfs.write_resolution(conn, parent)
475 write_bvec(conn, path)
# NOTE(review): at EOF read(1) returns b'' and ord() raises TypeError
# instead of a ClientError.
476 success = ord(conn.read(1))
477 assert success in (0, 1)
479 result = vfs.read_resolution(conn)
481 result = vfs.read_ioerror(conn)
483 not_ok = self.check_ok()
487 if isinstance(result, vfs.IOError):
492 # FIXME: disentangle this (stop inheriting) from PackWriter
# git.PackWriter variant that streams objects to a bup server over an
# existing connection instead of writing a local pack file.  Only part of
# the class is visible in this excerpt; it continues past the last line.
493 class PackWriter_Remote(git.PackWriter):
495 def __init__(self, conn, objcache_maker, suggest_packs,
500 max_pack_objects=None):
501 git.PackWriter.__init__(self,
502 objcache_maker=objcache_maker,
503 compression_level=compression_level,
504 max_pack_size=max_pack_size,
505 max_pack_objects=max_pack_objects)
506 self.remote_closed = False
508 self.filename = b'remote socket'
509 self.suggest_packs = suggest_packs
511 self.onclose = onclose
512 self.ensure_busy = ensure_busy
513 self._packopen = False
# Pacing state handed to _raw_write_bwlimit on every write.
515 self._bwtime = time.time()
517 # __enter__ and __exit__ are inherited
520 if not self._packopen:
522 self._packopen = True
# Finish the current remote pack.  If a pack is open, write four zero
# bytes (a zero length header, presumably the end-of-pack marker), then
# clear the busy state via onclose.  Returns self.suggest_packs().
524 def _end(self, run_midx=True):
525 # Called by other PackWriter methods like breakpoint().
526 # Must not close the connection (self.file)
527 assert(run_midx) # We don't support this via remote yet
528 self.objcache, objcache = None, self.objcache
529 with nullcontext_if_not(objcache):
530 if not (self._packopen and self.file):
532 self.file.write(b'\0\0\0\0')
533 self._packopen = False
534 self.onclose() # Unbusy
535 if objcache is not None:
537 return self.suggest_packs() # Returns last idx received
540 # Called by inherited __exit__
541 self.remote_closed = True
544 super(PackWriter_Remote, self).close()
548 assert self.remote_closed
549 super(PackWriter_Remote, self).__del__()
552 raise ClientError("don't know how to abort remote pack writing")
# Per-object frame: a 4-byte big-endian length (payload + 20 + 4), then
# presumably the 20-byte sha, the 4-byte CRC32 of `data`, and the data
# itself (some tuple elements are on omitted lines).  The frame is sent
# through the bandwidth limiter.
554 def _raw_write(self, datalist, sha):
556 if not self._packopen:
559 data = b''.join(datalist)
562 crc = zlib.crc32(data) & 0xffffffff
563 outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
565 struct.pack('!I', crc),
568 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
569 self.file, outbuf, self._bwcount, self._bwtime)
571 reraise(ClientError(e))
572 self.outbytes += len(data)
# If the server has sent something mid-upload, the object cache is
# refreshed (intervening lines are omitted).
575 if self.file.has_input():
577 self.objcache.refresh()