2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import errno, os, re, struct, sys, time, zlib
9 from bup import git, ssh, vfs
10 from bup.compat import environ, range, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12 debug2, linereader, lines_until_sentinel,
13 mkdirp, progress, qprogress, DemuxConn, atoi)
14 from bup.io import path_msg
15 from bup.vint import read_bvec, read_vuint, write_bvec
# Generic exception for any failure talking to the bup server: connection
# problems, protocol violations, or unexpected replies.
class ClientError(Exception):
    # NOTE(review): class body (presumably just `pass`) is elided from this
    # view of the file.
# Write `buf` to `f`, throttled to a bandwidth limit; returns the updated
# (bytes_in_last_chunk, timestamp) pacing state for the next call.
# NOTE(review): several lines are elided from this view (the unthrottled
# fast path, and the per-chunk now/sleep/write bookkeeping); `bwlimit`,
# `now`, and `sub` are bound on elided lines -- confirm against full source.
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    # [review: elided guard -- presumably the no-bwlimit fast path]
        return (len(buf), time.time())
    # We want to write in reasonably large blocks, but not so large that
    # they're likely to overflow a router's queue. So our bwlimit timing
    # has to be pretty granular. Also, if it takes too long from one
    # transmit to the next, we can't just make up for lost time to bring
    # the average back up to bwlimit - that will risk overflowing the
    # outbound queue, which defeats the purpose. So if we fall behind
    # by more than one block delay, we shouldn't ever try to catch up.
    for i in range(0,len(buf),4096):
        # [review: lines elided -- now = time.time(), sleep, chunk write]
        # Earliest moment the next chunk may be transmitted.
        next = max(now, bwtime + 1.0*bwcount/bwlimit)
        bwcount = len(sub) # might be less than 4096
    return (bwcount, bwtime)
# Pieces of the remote-URL grammar: protocol://host[:port]path .
_protocol_rs = br'([a-z]+)://'
# Host is either a bracketed IPv6 literal ("sb" = square bracket group) or
# a bare name containing no ':' or '/'.
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
# NOTE(review): _path_rs is defined on a line elided from this view, and
# the trailing argument/close-paren of this re.compile call is elided too.
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
# Split a remote spec into a (protocol, host, port, path) tuple of bytes.
# Accepts proto://host[:port]/path URLs (proto in ssh/bup/file) plus
# scp-style b'host:path' and plain local paths.
# NOTE(review): two lines are elided from this view -- presumably the
# `if url_match:` guard and one more near the tail; confirm indentation
# against the full source.
def parse_remote(remote):
    url_match = _url_rx.match(remote)
    # [review: elided guard -- taken when the URL regex matched]
        if not url_match.group(1) in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        # Groups: 1=protocol, 3=host, 4=port, 5=path.
        return url_match.group(1,3,4,5)
    # Not a URL: fall back to scp-style host:path or a plain path.
    rs = remote.split(b':', 1)
    if len(rs) == 1 or rs[0] in (b'', b'-'):
        return b'file', None, None, rs[-1]
    # [review: line elided]
    return b'ssh', rs[0], None, rs[1]
    def __init__(self, remote, create=False):
        """Connect this client to a bup server.

        Parses `remote`, derives a local index-cache directory, then opens
        the connection via the reverse-connection fds (3/4), an ssh/file
        pipe pair, or a TCP socket, depending on the protocol.
        NOTE(review): many lines are elided from this view; the structure
        below is partially reconstructed and hedged accordingly.
        """
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        # [review: elided guard -- presumably taken when is_reverse is set]
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior of b'%s' % None == 'None',
        # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
        # but crashes instead when doing b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
        # [review: middle of this expression elided]
                                 b'%s:%s' % (cachehost, cachedir)))
        # Reverse mode: the server already connected to us on fds 3/4.
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        if self.protocol in (b'ssh', b'file'):
            # [review: try-block header elided]
                # FIXME: ssh and file shouldn't use the same module
                self.p = ssh.connect(self.host, self.port, b'server')
                self.pout = self.p.stdout
                self.pin = self.p.stdin
                self.conn = Conn(self.pout, self.pin)
            # [review: except clause elided -- `e` is bound there]
                reraise(ClientError('connect: %s' % e))
        elif self.protocol == b'bup':
            # Direct TCP connection to a `bup daemon` (default port 1982).
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host, atoi(self.port) or 1982))
            self.sockw = self.sock.makefile('wb')
            self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        # Strip CR/LF so the one-line wire protocol can't be corrupted.
        # NOTE(review): the replacement ' ' is str while the pattern and
        # subject are bytes -- on py3 this raises TypeError; likely should
        # be b' '.  Cannot fix here (surrounding lines elided); confirm.
        self.dir = re.sub(br'[\r\n]', ' ', self.dir)
        # [review: create-vs-set branch headers elided]
            self.conn.write(b'init-dir %s\n' % self.dir)
            self.conn.write(b'set-dir %s\n' % self.dir)
    # NOTE(review): the lines below are fragments of several methods
    # (apparently close()/__del__()/check_ok()) whose `def` lines are elided
    # from this view; kept verbatim with comments only.
            if e.errno == errno.EPIPE:
        # Ask the server to exit cleanly unless a command is mid-flight.
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.sock and self.sockw:
            # Half-close so the server sees EOF on its input.
            self.sock.shutdown(socket.SHUT_WR)
            raise ClientError('server tunnel returned exit code %d' % rv)
        self.sock = self.p = self.pin = self.pout = None
            raise ClientError('server exited unexpectedly with code %r'
        # check_ok(): delegate to the connection, re-wrapping any failure
        # as a ClientError for callers.
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))
    def check_busy(self):
        """Raise ClientError if a streaming command is already in progress."""
        # [review: the `if self._busy:` guard line is elided from this view]
            raise ClientError('already busy with command %r' % self._busy)
    def ensure_busy(self):
        """Raise ClientError unless a streaming command *is* in progress
        (the inverse of check_busy)."""
        # [review: the `if not self._busy:` guard line is elided]
            raise ClientError('expected to be busy, but not busy?!')
    def _get_available_commands(self):
        """Send `help` to the server and return a frozenset of the command
        names it advertises.
        NOTE(review): several lines are elided from this view (result-set
        accumulation, loop structure, termination); `conn` and `result`
        are bound on elided lines -- presumably conn = self.conn.
        """
        conn.write(b'help\n')
        line = self.conn.readline()
        if not line == b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        # [review: loop header elided -- reads one command name per line]
        line = self.conn.readline()
        # Advertised commands are indented; anything else is a protocol
        # violation.
        if not line.startswith(b' '):
            raise ClientError('unexpected help line ' + repr(line))
        # [review: lines elided]
            raise ClientError('unexpected help line ' + repr(line))
        not_ok = self.check_ok()
        # [review: not_ok handling elided]
        return frozenset(result)
198 def _require_command(self, name):
199 if name not in self._available_commands:
200 raise ClientError('server does not appear to provide %s command'
201 % name.encode('ascii'))
    def sync_indexes(self):
        """Reconcile the local index cache with the server's index list:
        remove cached .idx files the server no longer mentions, and fetch
        the ones it asks us to load.
        NOTE(review): several lines are elided from this view (extra/needed
        set setup, loop bodies, the actual download of needed indexes);
        `conn`, `extra`, `idx`, and `needed` are bound on elided lines.
        """
        self._require_command(b'list-indexes')
        # [review: lines elided]
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                # [review: body elided -- presumably extra.add(f)]
        conn.write(b'list-indexes\n')
        for line in linereader(conn):
            # [review: lines elided]
            # Index names must be bare filenames, never paths.
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
            # Any idx that the server has heard of is proven not extra
        debug1('client: removing extra indexes: %s\n' % extra)
        # [review: loop header elided]
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        # [review: lines elided -- fetching each needed index]
        git.auto_midx(self.cachedir)
    def sync_index(self, name):
        """Fetch one .idx file `name` from the server into the cache dir,
        writing it atomically; refuses to re-fetch an existing idx.
        NOTE(review): a few lines are elided from this view (busy check,
        message continuation, `count` bookkeeping, per-chunk write).
        """
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        # [review: busy-check line elided]
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
            # [review: continuation of the message expression elided]
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        # Server sends a network-order uint32 total byte count first.
        n = struct.unpack('!I', self.conn.read(4))[0]
        # [review: count initialization elided]
        with atomically_replaced_file(fn, 'wb') as f:
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                # [review: f.write(b) / count update elided]
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
261 def _make_objcache(self):
262 return git.PackIdxList(self.cachedir)
    def _suggest_packs(self):
        """Drain index suggestions from the server while the streaming
        command is suspended, sync each suggested index, then resume.
        NOTE(review): several lines are elided from this view; `ob` and
        `suggested` are bound on elided lines -- `ob` is presumably the
        saved busy-command name.
        """
        assert(ob == b'receive-objects-v2')
        self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
        # [review: suggested-list initialization elided]
        for line in linereader(self.conn):
            # [review: lines elided]
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                # [review: idx extraction from the line elided]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            # [review: alternate-branch header elided]
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        # [review: lines elided]
        for idx in suggested:
            # [review: body elided -- presumably self.sync_index(idx)]
        git.auto_midx(self.cachedir)
        # [review: lines elided]
        # Resume the suspended streaming command.
        self.conn.write(b'%s\n' % ob)
    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        """Start receive-objects-v2 on the server and return a
        PackWriter_Remote that streams objects over this connection; the
        client stays busy until the writer is closed.
        NOTE(review): a few lines are elided (busy check, one keyword
        argument of the constructor call).
        """
        self._require_command(b'receive-objects-v2')
        # [review: busy-check lines elided]
        self._busy = b'receive-objects-v2'
        self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 # [review: one keyword argument elided]
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)
    def read_ref(self, refname):
        """Return the server's value for `refname` (presumably unhexlified
        on an elided line -- confirm), or None if the ref doesn't exist.
        NOTE(review): several lines are elided from this view.
        """
        self._require_command(b'read-ref')
        # [review: busy-check elided]
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        # [review: check_ok / branch structure elided]
            assert(len(r) == 40)   # hexified sha
        # [review: return of the found value elided]
            return None    # nonexistent ref
    def update_ref(self, refname, newval, oldval):
        """Ask the server to update `refname` from `oldval` to `newval`
        (binary oids, hexlified for the wire; a falsy `oldval` is sent as
        the empty string).
        NOTE(review): the busy check and trailing check_ok are elided.
        """
        self._require_command(b'update-ref')
        # [review: line elided]
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        # NOTE(review): fragment of (apparently) join(); its `def` line and
        # the loop reading size-prefixed chunks are elided from this view.
        self._require_command(b'join')
        # [review: lines elided]
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        # [review: loop header elided -- reads chunks until a zero size]
        # Each chunk is preceded by a network-order uint32 length.
        sz = struct.unpack('!I', self.conn.read(4))[0]
        yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        raise KeyError(str(e))
    def cat_batch(self, refs):
        """Request several refs at once; for each, yield either
        (None, None, None, None) when missing, or (oidx, type, size,
        chunk-reader) for the object's content.
        NOTE(review): several lines are elided from this view (busy check,
        the per-ref write loop, the reply loop header, size conversion --
        `size` is presumably int()-converted on an elided line; confirm).
        """
        self._require_command(b'cat-batch')
        # [review: lines elided]
        self._busy = b'cat-batch'
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        # [review: per-ref loop header elided]
            assert b'\n' not in ref
        # [review: reply loop header elided]
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
            # [review: continuation elided]
            oidx, oid_t, size = info.split(b' ')
            # [review: size conversion elided]
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            # The caller must drain `cr`; anything left is a protocol error.
            detritus = next(cr, None)
            raise ClientError('unexpected leftover data ' + repr(detritus))
        not_ok = self.check_ok()
        # [review: not_ok handling / _not_busy elided]
    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        """Yield (name, binary-oid) pairs for server refs matching
        `patterns` (all refs when empty), optionally limited to heads
        and/or tags.
        NOTE(review): several lines are elided from this view (busy
        bookkeeping, per-pattern writes, line trimming, the validity
        guards around the two raises).
        """
        patterns = patterns or tuple()
        self._require_command(b'refs')
        # [review: lines elided -- presumably conn = self.conn]
        # Two flag ints: limit_to_heads, limit_to_tags.
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            # [review: pattern write lines elided]
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            # [review: trailing-newline strip elided]
            oidx, name = line.split(b' ')
            # [review: oidx-length guard elided]
                raise ClientError('Invalid object fingerprint in %r' % line)
            # [review: empty-name guard elided]
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        not_ok = self.check_ok()
        # [review: not_ok handling / _not_busy elided]
    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        NOTE(review): parts of this method are elided from this view
        (argument validation, the writes of format/refs to the server,
        the parse-vs-plain branch headers, not_ok handling).
        """
        self._require_command(b'rev-list')
        # [review: lines elided]
        assert b'\n' not in format
        # [review: per-ref loop header elided]
        assert b'\n' not in ref
        self._busy = b'rev-list'
        # [review: conn binding elided -- presumably conn = self.conn]
        conn.write(b'rev-list\n')
        # [review: format/ref writes elided]
        # Plain variant: one 40-char hex oid per line.
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            # [review: strip elided]
            assert len(line) == 40
            # [review: yield elided]
        # Parsed variant: b'commit <oidx>' header, then whatever `parse`
        # consumes from the stream.
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            if not line.startswith(b'commit '):
                raise ClientError('unexpected line ' + repr(line))
            cmt_oidx = line[7:].strip()
            assert len(cmt_oidx) == 40
            yield cmt_oidx, parse(conn)
        not_ok = self.check_ok()
        # [review: not_ok handling / _not_busy elided]
    def resolve(self, path, parent=None, want_meta=True, follow=False):
        """Resolve `path` on the server, returning the vfs resolution; a
        server-reported vfs.IOError is presumably re-raised on an elided
        line -- confirm against the full source.
        NOTE(review): several lines are elided from this view (busy check,
        conn binding, the success/failure branch headers).
        """
        self._require_command(b'resolve')
        # [review: lines elided]
        self._busy = b'resolve'
        # Flags byte: bit 0 = want_meta, bit 1 = follow, bit 2 = parent given.
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        # [review: `if parent:` guard elided]
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        # One status byte: 1 = resolution follows, 0 = ioerror follows.
        success = ord(conn.read(1))
        assert success in (0, 1)
        # [review: branch headers elided]
            result = vfs.read_resolution(conn)
            result = vfs.read_ioerror(conn)
        not_ok = self.check_ok()
        # [review: not_ok handling / _not_busy elided]
        if isinstance(result, vfs.IOError):
            # [review: body elided -- presumably `raise result`]
class PackWriter_Remote(git.PackWriter):
    """PackWriter variant that streams pack data over the client
    connection instead of writing a local pack file.
    NOTE(review): many lines of this class are elided from this view;
    onopen/onclose/ensure_busy and some parameters are bound on elided
    lines.
    """
    def __init__(self, conn, objcache_maker, suggest_packs,
                 # [review: intervening parameters elided -- onopen/onclose/
                 #  ensure_busy/compression_level/max_pack_size, presumably]
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        # [review: conn/onopen assignments elided]
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        # [review: self._bwcount initialization elided]
        self._bwtime = time.time()

    # [review: _open() def line elided]
        if not self._packopen:
            # [review: lines elided]
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx) # We don't support this via remote yet
        if self._packopen and self.file:
            # Four zero bytes terminate the object stream for the server.
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            # [review: line elided]
            return self.suggest_packs() # Returns last idx received

    # [review: abort() def line elided]
        raise ClientError("don't know how to abort remote pack writing")
526 def _raw_write(self, datalist, sha):
528 if not self._packopen:
531 data = b''.join(datalist)
534 crc = zlib.crc32(data) & 0xffffffff
535 outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
537 struct.pack('!I', crc),
540 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
541 self.file, outbuf, self._bwcount, self._bwtime)
543 reraise(ClientError(e))
544 self.outbytes += len(data)
547 if self.file.has_input():
549 self.objcache.refresh()