2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import os, re, struct, time, zlib
9 from bup import git, ssh, vfs
10 from bup.compat import environ, pending_raise, range, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12 debug2, linereader, lines_until_sentinel,
13 mkdirp, nullcontext_if_not, progress, qprogress, DemuxConn)
14 from bup.io import path_msg
15 from bup.vint import write_bvec
21 class ClientError(Exception):
25 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
28 return (len(buf), time.time())
30 # We want to write in reasonably large blocks, but not so large that
31 # they're likely to overflow a router's queue. So our bwlimit timing
32 # has to be pretty granular. Also, if it takes too long from one
33 # transmit to the next, we can't just make up for lost time to bring
34 # the average back up to bwlimit - that will risk overflowing the
35 # outbound queue, which defeats the purpose. So if we fall behind
36 # by more than one block delay, we shouldn't ever try to catch up.
37 for i in range(0,len(buf),4096):
39 next = max(now, bwtime + 1.0*bwcount/bwlimit)
43 bwcount = len(sub) # might be less than 4096
45 return (bwcount, bwtime)
48 _protocol_rs = br'([a-z]+)://'
49 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
50 _port_rs = br'(?::(\d+))?'
52 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
55 def parse_remote(remote):
56 url_match = _url_rx.match(remote)
58 if not url_match.group(1) in (b'ssh', b'bup', b'file'):
59 raise ClientError('unexpected protocol: %s'
60 % url_match.group(1).decode('ascii'))
61 return url_match.group(1,3,4,5)
63 rs = remote.split(b':', 1)
64 if len(rs) == 1 or rs[0] in (b'', b'-'):
65 return b'file', None, None, rs[-1]
67 return b'ssh', rs[0], None, rs[1]
71 def __init__(self, remote, create=False):
73 self._busy = self.conn = None
74 self.sock = self.p = self.pout = self.pin = None
75 is_reverse = environ.get(b'BUP_SERVER_REVERSE')
78 remote = b'%s:' % is_reverse
79 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
80 # The b'None' here matches python2's behavior of b'%s' % None == 'None',
81 # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
82 # but crashes instead when doing b'%s' % None.
83 cachehost = b'None' if self.host is None else self.host
84 cachedir = b'None' if self.dir is None else self.dir
85 self.cachedir = git.repo(b'index-cache/%s'
88 b'%s:%s' % (cachehost, cachedir)))
90 self.pout = os.fdopen(3, 'rb')
91 self.pin = os.fdopen(4, 'wb')
92 self.conn = Conn(self.pout, self.pin)
94 if self.protocol in (b'ssh', b'file'):
96 # FIXME: ssh and file shouldn't use the same module
97 self.p = ssh.connect(self.host, self.port, b'server')
98 self.pout = self.p.stdout
99 self.pin = self.p.stdin
100 self.conn = Conn(self.pout, self.pin)
102 reraise(ClientError('connect: %s' % e))
103 elif self.protocol == b'bup':
104 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
105 self.sock.connect((self.host,
106 1982 if self.port is None else int(self.port)))
107 self.sockw = self.sock.makefile('wb')
108 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
109 self._available_commands = self._get_available_commands()
110 self._require_command(b'init-dir')
111 self._require_command(b'set-dir')
113 self.dir = re.sub(br'[\r\n]', ' ', self.dir)
115 self.conn.write(b'init-dir %s\n' % self.dir)
117 self.conn.write(b'set-dir %s\n' % self.dir)
123 if self.conn and not self._busy:
124 self.conn.write(b'quit\n')
127 if self.sock and self.sockw:
129 self.sock.shutdown(socket.SHUT_WR)
140 raise ClientError('server tunnel returned exit code %d' % rv)
142 self.sock = self.p = self.pin = self.pout = None
150 def __exit__(self, type, value, traceback):
151 with pending_raise(value, rethrow=False):
158 raise ClientError('server exited unexpectedly with code %r'
161 return self.conn.check_ok()
162 except Exception as e:
163 reraise(ClientError(e))
164 # reraise doesn't return
167 def check_busy(self):
169 raise ClientError('already busy with command %r' % self._busy)
171 def ensure_busy(self):
173 raise ClientError('expected to be busy, but not busy?!')
178 def _get_available_commands(self):
182 conn.write(b'help\n')
184 line = self.conn.readline()
185 if not line == b'Commands:\n':
186 raise ClientError('unexpected help header ' + repr(line))
188 line = self.conn.readline()
191 if not line.startswith(b' '):
192 raise ClientError('unexpected help line ' + repr(line))
195 raise ClientError('unexpected help line ' + repr(line))
198 not_ok = self.check_ok()
202 return frozenset(result)
def _require_command(self, name):
    """Raise ClientError unless the server advertises command `name`.

    name is the command as a bytes string (e.g. b'init-dir'); it is
    looked up in self._available_commands.
    """
    if name not in self._available_commands:
        # name is bytes, so it has to be decoded before being
        # interpolated into the str message (bytes has no .encode()
        # on Python 3, which would mask this error with AttributeError).
        raise ClientError('server does not appear to provide %s command'
                          % name.decode('ascii'))
209 def sync_indexes(self):
210 self._require_command(b'list-indexes')
213 mkdirp(self.cachedir)
214 # All cached idxs are extra until proven otherwise
216 for f in os.listdir(self.cachedir):
217 debug1(path_msg(f) + '\n')
218 if f.endswith(b'.idx'):
221 conn.write(b'list-indexes\n')
222 for line in linereader(conn):
225 assert(line.find(b'/') < 0)
226 parts = line.split(b' ')
228 if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
229 # If the server requests that we load an idx and we don't
230 # already have a copy of it, it is needed
232 # Any idx that the server has heard of is proven not extra
236 debug1('client: removing extra indexes: %s\n' % extra)
238 os.unlink(os.path.join(self.cachedir, idx))
239 debug1('client: server requested load of: %s\n' % needed)
242 git.auto_midx(self.cachedir)
244 def sync_index(self, name):
245 self._require_command(b'send-index')
246 #debug1('requesting %r\n' % name)
248 mkdirp(self.cachedir)
249 fn = os.path.join(self.cachedir, name)
250 if os.path.exists(fn):
251 msg = ("won't request existing .idx, try `bup bloom --check %s`"
253 raise ClientError(msg)
254 self.conn.write(b'send-index %s\n' % name)
255 n = struct.unpack('!I', self.conn.read(4))[0]
257 with atomically_replaced_file(fn, 'wb') as f:
259 progress('Receiving index from server: %d/%d\r' % (count, n))
260 for b in chunkyreader(self.conn, n):
263 qprogress('Receiving index from server: %d/%d\r' % (count, n))
264 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
def _make_objcache(self):
    """Build and return a git.PackIdxList for self.cachedir."""
    cache_path = self.cachedir
    return git.PackIdxList(cache_path)
270 def _suggest_packs(self):
273 assert(ob == b'receive-objects-v2')
274 self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
276 for line in linereader(self.conn):
279 debug2('%r\n' % line)
280 if line.startswith(b'index '):
282 debug1('client: received index suggestion: %s\n'
283 % git.shorten_hash(idx).decode('ascii'))
284 suggested.append(idx)
286 assert(line.endswith(b'.idx'))
287 debug1('client: completed writing pack, idx: %s\n'
288 % git.shorten_hash(line).decode('ascii'))
289 suggested.append(line)
294 for idx in suggested:
296 git.auto_midx(self.cachedir)
299 self.conn.write(b'%s\n' % ob)
302 def new_packwriter(self, compression_level=1,
303 max_pack_size=None, max_pack_objects=None):
304 self._require_command(b'receive-objects-v2')
307 self._busy = b'receive-objects-v2'
308 self.conn.write(b'receive-objects-v2\n')
309 return PackWriter_Remote(self.conn,
310 objcache_maker = self._make_objcache,
311 suggest_packs = self._suggest_packs,
313 onclose = self._not_busy,
314 ensure_busy = self.ensure_busy,
315 compression_level=compression_level,
316 max_pack_size=max_pack_size,
317 max_pack_objects=max_pack_objects)
319 def read_ref(self, refname):
320 self._require_command(b'read-ref')
322 self.conn.write(b'read-ref %s\n' % refname)
323 r = self.conn.readline().strip()
326 assert(len(r) == 40) # hexified sha
329 return None # nonexistent ref
331 def update_ref(self, refname, newval, oldval):
332 self._require_command(b'update-ref')
334 self.conn.write(b'update-ref %s\n%s\n%s\n'
335 % (refname, hexlify(newval),
336 hexlify(oldval) if oldval else b''))
340 self._require_command(b'join')
343 # Send 'cat' so we'll work fine with older versions
344 self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
346 sz = struct.unpack('!I', self.conn.read(4))[0]
348 yield self.conn.read(sz)
349 # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
353 raise KeyError(str(e))
355 def cat_batch(self, refs):
356 self._require_command(b'cat-batch')
358 self._busy = b'cat-batch'
360 conn.write(b'cat-batch\n')
361 # FIXME: do we want (only) binary protocol?
364 assert b'\n' not in ref
369 info = conn.readline()
370 if info == b'missing\n':
371 yield None, None, None, None
373 if not (info and info.endswith(b'\n')):
374 raise ClientError('Hit EOF while looking for object info: %r'
376 oidx, oid_t, size = info.split(b' ')
378 cr = chunkyreader(conn, size)
379 yield oidx, oid_t, size, cr
380 detritus = next(cr, None)
382 raise ClientError('unexpected leftover data ' + repr(detritus))
384 not_ok = self.check_ok()
389 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
390 patterns = patterns or tuple()
391 self._require_command(b'refs')
395 conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
396 1 if limit_to_tags else 0))
397 for pattern in patterns:
398 assert b'\n' not in pattern
402 for line in lines_until_sentinel(conn, b'\n', ClientError):
404 oidx, name = line.split(b' ')
406 raise ClientError('Invalid object fingerprint in %r' % line)
408 raise ClientError('Invalid reference name in %r' % line)
409 yield name, unhexlify(oidx)
411 not_ok = self.check_ok()
416 def rev_list(self, refs, parse=None, format=None):
417 """See git.rev_list for the general semantics, but note that with the
418 current interface, the parse function must be able to handle
419 (consume) any blank lines produced by the format because the
420 first one received that it doesn't consume will be interpreted
421 as a terminator for the entire rev-list result.
424 self._require_command(b'rev-list')
426 assert b'\n' not in format
430 assert b'\n' not in ref
432 self._busy = b'rev-list'
434 conn.write(b'rev-list\n')
444 for line in lines_until_sentinel(conn, b'\n', ClientError):
446 assert len(line) == 40
449 for line in lines_until_sentinel(conn, b'\n', ClientError):
450 if not line.startswith(b'commit '):
451 raise ClientError('unexpected line ' + repr(line))
452 cmt_oidx = line[7:].strip()
453 assert len(cmt_oidx) == 40
454 yield cmt_oidx, parse(conn)
456 not_ok = self.check_ok()
461 def resolve(self, path, parent=None, want_meta=True, follow=False):
462 self._require_command(b'resolve')
464 self._busy = b'resolve'
466 conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
467 | (2 if follow else 0)
468 | (4 if parent else 0)))
470 vfs.write_resolution(conn, parent)
471 write_bvec(conn, path)
472 success = ord(conn.read(1))
473 assert success in (0, 1)
475 result = vfs.read_resolution(conn)
477 result = vfs.read_ioerror(conn)
479 not_ok = self.check_ok()
483 if isinstance(result, vfs.IOError):
488 # FIXME: disentangle this (stop inheriting) from PackWriter
489 class PackWriter_Remote(git.PackWriter):
491 def __init__(self, conn, objcache_maker, suggest_packs,
496 max_pack_objects=None):
497 git.PackWriter.__init__(self,
498 objcache_maker=objcache_maker,
499 compression_level=compression_level,
500 max_pack_size=max_pack_size,
501 max_pack_objects=max_pack_objects)
502 self.remote_closed = False
504 self.filename = b'remote socket'
505 self.suggest_packs = suggest_packs
507 self.onclose = onclose
508 self.ensure_busy = ensure_busy
509 self._packopen = False
511 self._bwtime = time.time()
513 # __enter__ and __exit__ are inherited
516 if not self._packopen:
518 self._packopen = True
520 def _end(self, run_midx=True):
521 # Called by other PackWriter methods like breakpoint().
522 # Must not close the connection (self.file)
523 assert(run_midx) # We don't support this via remote yet
524 self.objcache, objcache = None, self.objcache
525 with nullcontext_if_not(objcache):
526 if not (self._packopen and self.file):
528 self.file.write(b'\0\0\0\0')
529 self._packopen = False
530 self.onclose() # Unbusy
531 if objcache is not None:
533 return self.suggest_packs() # Returns last idx received
536 # Called by inherited __exit__
537 self.remote_closed = True
543 assert self.remote_closed
544 super(PackWriter_Remote, self).__del__()
547 raise ClientError("don't know how to abort remote pack writing")
549 def _raw_write(self, datalist, sha):
551 if not self._packopen:
554 data = b''.join(datalist)
557 crc = zlib.crc32(data) & 0xffffffff
558 outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
560 struct.pack('!I', crc),
563 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
564 self.file, outbuf, self._bwcount, self._bwtime)
566 reraise(ClientError(e))
567 self.outbytes += len(data)
570 if self.file.has_input():
572 self.objcache.refresh()