2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import errno, os, re, struct, sys, time, zlib
9 from bup import git, ssh, vfs
10 from bup.compat import environ, range, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12 debug2, linereader, lines_until_sentinel,
13 mkdirp, progress, qprogress, DemuxConn, atoi)
14 from bup.io import path_msg
15 from bup.vint import read_bvec, read_vuint, write_bvec
# Exception type raised throughout this module for protocol/connection
# failures between the bup client and a bup server.
# NOTE(review): the class body (orig line 22, presumably just `pass`) is
# missing from this view.
21 class ClientError(Exception):
# Write buf to file object f, throttled to the module's bandwidth limit,
# and return an updated (bwcount, bwtime) accounting pair for the caller
# to thread through subsequent calls.
# NOTE(review): several original lines are missing from this view
# (26-27, 29, 38, 40-42, 44), including the unthrottled fast-path guard
# and the actual write/sleep statements -- do not infer full behavior
# from the lines shown here.
25 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
# Fast path (guard missing from view): no bwlimit set, so just write
# everything and reset the accounting pair.
28 return (len(buf), time.time())
30 # We want to write in reasonably large blocks, but not so large that
31 # they're likely to overflow a router's queue. So our bwlimit timing
32 # has to be pretty granular. Also, if it takes too long from one
33 # transmit to the next, we can't just make up for lost time to bring
34 # the average back up to bwlimit - that will risk overflowing the
35 # outbound queue, which defeats the purpose. So if we fall behind
36 # by more than one block delay, we shouldn't ever try to catch up.
37 for i in range(0,len(buf),4096):
# next: earliest time at which sending this chunk keeps us under bwlimit.
39 next = max(now, bwtime + 1.0*bwcount/bwlimit)
43 bwcount = len(sub) # might be less than 4096
45 return (bwcount, bwtime)
# Regex fragments for parsing remote specs of the form
# protocol://host:port/path.  The host part accepts a bracketed IPv6
# literal via the conditional-group syntax (?(sb)...) keyed on the
# optional opening bracket group "sb".
48 _protocol_rs = br'([a-z]+)://'
49 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
50 _port_rs = br'(?::(\d+))?'
# NOTE(review): _path_rs (orig line 51) and the rest of this re.compile
# call (orig line 53, flags/closing paren) are missing from this view.
52 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
# Parse a remote spec (bytes) into a (protocol, host, port, path) tuple.
# Accepts explicit URLs (ssh://, bup://, file://) and falls back to
# scp-style "host:path" (-> ssh) or a bare/empty path (-> file).
# Raises ClientError for unrecognized URL protocols.
# NOTE(review): orig lines 57, 62, and 66 are missing from this view
# (presumably the `if url_match:` guard and related branching).
55 def parse_remote(remote):
56 url_match = _url_rx.match(remote)
58 if not url_match.group(1) in (b'ssh', b'bup', b'file'):
59 raise ClientError('unexpected protocol: %s'
60 % url_match.group(1).decode('ascii'))
# groups 1,3,4,5 are protocol, host, port, path.
61 return url_match.group(1,3,4,5)
# Not a URL: split at the first ':' for scp-style host:path.
63 rs = remote.split(b':', 1)
64 if len(rs) == 1 or rs[0] in (b'', b'-'):
65 return b'file', None, None, rs[-1]
67 return b'ssh', rs[0], None, rs[1]
# Client.__init__: connect to the bup server named by `remote` (see
# parse_remote for the accepted forms), optionally asking it to create
# the repository (create=True -> init-dir, else set-dir).
# NOTE(review): many original lines are missing from this view; the
# enclosing `class Client` line is also not visible.
71 def __init__(self, remote, create=False):
72 self._busy = self.conn = None
73 self.sock = self.p = self.pout = self.pin = None
# Reverse mode: the server initiated the connection (bup on fds 3/4).
74 is_reverse = environ.get(b'BUP_SERVER_REVERSE')
77 remote = b'%s:' % is_reverse
78 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
79 # The b'None' here matches python2's behavior of b'%s' % None == 'None',
80 # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
81 # but crashes instead when doing b'%s' % None.
82 cachehost = b'None' if self.host is None else self.host
83 cachedir = b'None' if self.dir is None else self.dir
# Local per-remote cache of the server's pack indexes.
84 self.cachedir = git.repo(b'index-cache/%s'
87 b'%s:%s' % (cachehost, cachedir)))
# Reverse mode uses inherited file descriptors 3 (read) and 4 (write).
89 self.pout = os.fdopen(3, 'rb')
90 self.pin = os.fdopen(4, 'wb')
91 self.conn = Conn(self.pout, self.pin)
93 if self.protocol in (b'ssh', b'file'):
95 # FIXME: ssh and file shouldn't use the same module
96 self.p = ssh.connect(self.host, self.port, b'server')
97 self.pout = self.p.stdout
98 self.pin = self.p.stdin
99 self.conn = Conn(self.pout, self.pin)
101 reraise(ClientError('connect: %s' % e))
102 elif self.protocol == b'bup':
# Plain TCP to a `bup daemon`; 1982 is the default port.
103 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
104 self.sock.connect((self.host, atoi(self.port) or 1982))
105 self.sockw = self.sock.makefile('wb')
106 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
107 self._available_commands = self._get_available_commands()
108 self._require_command(b'init-dir')
109 self._require_command(b'set-dir')
# Strip newlines so the dir can't break the line-oriented protocol.
# NOTE(review): the replacement ' ' is a str while the pattern and
# subject are bytes -- on py3 this re.sub would raise TypeError;
# presumably it should be b' '.  Confirm against upstream.
111 self.dir = re.sub(br'[\r\n]', ' ', self.dir)
113 self.conn.write(b'init-dir %s\n' % self.dir)
115 self.conn.write(b'set-dir %s\n' % self.dir)
# NOTE(review): the lines below belong to later methods (__del__,
# close, check_ok) whose `def` lines are missing from this view.
123 if e.errno == errno.EPIPE:
# close(): politely tell the server to quit unless a command is mid-flight.
129 if self.conn and not self._busy:
130 self.conn.write(b'quit\n')
133 if self.sock and self.sockw:
135 self.sock.shutdown(socket.SHUT_WR)
146 raise ClientError('server tunnel returned exit code %d' % rv)
148 self.sock = self.p = self.pin = self.pout = None
154 raise ClientError('server exited unexpectedly with code %r'
# check_ok(): delegate to the connection, wrapping failures in ClientError.
157 return self.conn.check_ok()
158 except Exception as e:
159 reraise(ClientError(e))
# Raise ClientError if a streaming command is already in progress
# (self._busy holds the command name).  Guard line (orig 162) missing
# from this view.
161 def check_busy(self):
163 raise ClientError('already busy with command %r' % self._busy)
# Inverse of check_busy: raise if no streaming command is in progress.
# Guard line (orig 166) missing from this view.
165 def ensure_busy(self):
167 raise ClientError('expected to be busy, but not busy?!')
# Ask the server for its command list via the 'help' command and return
# the parsed names as a frozenset of bytes.  The reply format is a
# 'Commands:\n' header followed by indented command names.
# NOTE(review): several original lines (173-175, 177, 181, 183-184,
# 187-188, 190-191, 193-195) are missing from this view, including the
# loop structure and result accumulation.
172 def _get_available_commands(self):
176 conn.write(b'help\n')
178 line = self.conn.readline()
179 if not line == b'Commands:\n':
180 raise ClientError('unexpected help header ' + repr(line))
182 line = self.conn.readline()
185 if not line.startswith(b' '):
186 raise ClientError('unexpected help line ' + repr(line))
189 raise ClientError('unexpected help line ' + repr(line))
192 not_ok = self.check_ok()
196 return frozenset(result)
198 def _require_command(self, name):
199 if name not in self._available_commands:
200 raise ClientError('server does not appear to provide %s command'
201 % name.encode('ascii'))
# Synchronize the local index cache with the server: ask for
# 'list-indexes', delete cached .idx files the server no longer has,
# fetch the ones it asks us to load, then rebuild midx files.
# NOTE(review): many original lines are missing from this view
# (205-206, 209, 213-214, 217-218, 221, 225, 227-229, 231, 234-235),
# including the extra/needed set initialization and the fetch loop.
203 def sync_indexes(self):
204 self._require_command(b'list-indexes')
207 mkdirp(self.cachedir)
208 # All cached idxs are extra until proven otherwise
210 for f in os.listdir(self.cachedir):
211 debug1(path_msg(f) + '\n')
212 if f.endswith(b'.idx'):
215 conn.write(b'list-indexes\n')
216 for line in linereader(conn):
# Server must send bare filenames, never paths.
219 assert(line.find(b'/') < 0)
220 parts = line.split(b' ')
222 if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
223 # If the server requests that we load an idx and we don't
224 # already have a copy of it, it is needed
226 # Any idx that the server has heard of is proven not extra
230 debug1('client: removing extra indexes: %s\n' % extra)
232 os.unlink(os.path.join(self.cachedir, idx))
233 debug1('client: server requested load of: %s\n' % needed)
236 git.auto_midx(self.cachedir)
# Download a single pack index `name` (bytes) from the server into the
# local cache via 'send-index'.  The server replies with a 4-byte
# big-endian length followed by the idx contents; the file is written
# atomically so a partial transfer never leaves a truncated .idx.
# NOTE(review): orig lines 241, 246, 250, 252, 255-256, and 259+ are
# missing from this view (progress counter updates, final check_ok).
238 def sync_index(self, name):
239 self._require_command(b'send-index')
240 #debug1('requesting %r\n' % name)
242 mkdirp(self.cachedir)
243 fn = os.path.join(self.cachedir, name)
244 if os.path.exists(fn):
245 msg = ("won't request existing .idx, try `bup bloom --check %s`"
247 raise ClientError(msg)
248 self.conn.write(b'send-index %s\n' % name)
# n: total idx size in bytes, network byte order.
249 n = struct.unpack('!I', self.conn.read(4))[0]
251 with atomically_replaced_file(fn, 'wb') as f:
253 progress('Receiving index from server: %d/%d\r' % (count, n))
254 for b in chunkyreader(self.conn, n):
257 qprogress('Receiving index from server: %d/%d\r' % (count, n))
258 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
def _make_objcache(self):
    """Return a PackIdxList covering this client's local index cache."""
    cache_dir = self.cachedir
    return git.PackIdxList(cache_dir)
# While a receive-objects-v2 upload is suspended, read the server's
# 'index <name>' suggestions, download each suggested idx, rebuild
# midx files, then resume the upload.  Returns the last idx received.
# NOTE(review): orig lines 265-266, 269, 271-272, 275, 279, 284-287,
# 289, 291-292, 294 are missing from this view, including the busy
# bookkeeping and the sync_index calls.
264 def _suggest_packs(self):
# ob: the saved busy command; must be the upload we are interrupting.
267 assert(ob == b'receive-objects-v2')
268 self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
270 for line in linereader(self.conn):
273 debug2('%r\n' % line)
274 if line.startswith(b'index '):
276 debug1('client: received index suggestion: %s\n'
277 % git.shorten_hash(idx).decode('ascii'))
278 suggested.append(idx)
280 assert(line.endswith(b'.idx'))
281 debug1('client: completed writing pack, idx: %s\n'
282 % git.shorten_hash(line).decode('ascii'))
283 suggested.append(line)
288 for idx in suggested:
290 git.auto_midx(self.cachedir)
# Resume the suspended upload by re-sending the command line.
293 self.conn.write(b'%s\n' % ob)
# Start a remote pack upload: mark the connection busy, send
# 'receive-objects-v2', and return a PackWriter_Remote wired to this
# client's objcache/suggestion/busy machinery.
# NOTE(review): orig lines 299-300 (presumably the check_busy guard)
# and 306 are missing from this view.
296 def new_packwriter(self, compression_level=1,
297 max_pack_size=None, max_pack_objects=None):
298 self._require_command(b'receive-objects-v2')
301 self._busy = b'receive-objects-v2'
302 self.conn.write(b'receive-objects-v2\n')
303 return PackWriter_Remote(self.conn,
304 objcache_maker = self._make_objcache,
305 suggest_packs = self._suggest_packs,
307 onclose = self._not_busy,
308 ensure_busy = self.ensure_busy,
309 compression_level=compression_level,
310 max_pack_size=max_pack_size,
311 max_pack_objects=max_pack_objects)
# Ask the server for the value of `refname` (bytes).  Returns the
# binary sha (per the hex assertion below, presumably unhexlified on a
# missing line) or None for a nonexistent ref.
# NOTE(review): orig lines 315, 318-319, 321-322 are missing from this
# view, including the check_ok call and both return branches' guards.
313 def read_ref(self, refname):
314 self._require_command(b'read-ref')
316 self.conn.write(b'read-ref %s\n' % refname)
317 r = self.conn.readline().strip()
320 assert(len(r) == 40) # hexified sha
323 return None # nonexistent ref
# Atomically update `refname` from oldval to newval (both binary shas;
# oldval may be falsy for a new ref, sent as an empty field).
# NOTE(review): orig lines 327, 331-333, 335-336, 339, 341, 344-346
# are missing from this view; the lines from 334 on belong to the
# join() generator, whose `def` line is not visible here.
325 def update_ref(self, refname, newval, oldval):
326 self._require_command(b'update-ref')
328 self.conn.write(b'update-ref %s\n%s\n%s\n'
329 % (refname, hexlify(newval),
330 hexlify(oldval) if oldval else b''))
# --- join(): stream the concatenated content of object `id` ---
334 self._require_command(b'join')
337 # Send 'cat' so we'll work fine with older versions
# Sanitize id so embedded newlines can't inject protocol lines.
338 self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
# Length-prefixed chunks: 4-byte big-endian size, then the data.
340 sz = struct.unpack('!I', self.conn.read(4))[0]
342 yield self.conn.read(sz)
343 # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
347 raise KeyError(str(e))
# Generator: for each ref in `refs`, yield (oidx, type, size, reader)
# where reader is a chunkyreader over the object's content, or four
# Nones if the server reports the object missing.  Each reader must be
# fully consumed before advancing to the next ref.
# NOTE(review): orig lines 351, 353, 356-357, 359-362, 366, 369, 371,
# 375, 377, 379-382 are missing from this view (busy bookkeeping, the
# per-ref request loop, size int conversion, and final check_ok).
349 def cat_batch(self, refs):
350 self._require_command(b'cat-batch')
352 self._busy = b'cat-batch'
354 conn.write(b'cat-batch\n')
355 # FIXME: do we want (only) binary protocol?
# Refs travel one per line; a newline inside one would corrupt the stream.
358 assert b'\n' not in ref
363 info = conn.readline()
364 if info == b'missing\n':
365 yield None, None, None, None
367 if not (info and info.endswith(b'\n')):
368 raise ClientError('Hit EOF while looking for object info: %r'
370 oidx, oid_t, size = info.split(b' ')
372 cr = chunkyreader(conn, size)
373 yield oidx, oid_t, size, cr
# The caller should have drained cr; any leftover data is a protocol error.
374 detritus = next(cr, None)
376 raise ClientError('unexpected leftover data ' + repr(detritus))
378 not_ok = self.check_ok()
# Generator of (name, binary-oid) pairs for the server's refs, with
# optional glob patterns and head/tag restriction flags (sent as 0/1).
# The listing is terminated by a blank-line sentinel.
# NOTE(review): orig lines 386-388, 393-395, 397, 399, 401, 404,
# 406-409 are missing from this view (busy bookkeeping, pattern
# transmission, oidx/name validation guards, final check_ok).
383 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
384 patterns = patterns or tuple()
385 self._require_command(b'refs')
389 conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
390 1 if limit_to_tags else 0))
391 for pattern in patterns:
392 assert b'\n' not in pattern
396 for line in lines_until_sentinel(conn, b'\n', ClientError):
398 oidx, name = line.split(b' ')
400 raise ClientError('Invalid object fingerprint in %r' % line)
402 raise ClientError('Invalid reference name in %r' % line)
403 yield name, unhexlify(oidx)
405 not_ok = self.check_ok()
# Remote counterpart of git.rev_list: stream commit oidx values (and,
# when a format/parse pair is given, parsed commit data) for `refs`.
# NOTE(review): orig lines 416-417, 420, 422-424, 426, 428, 432-440,
# 442, 444-445, 452, 454-457 are missing from this view, including the
# format/ref transmission and busy bookkeeping.
410 def rev_list(self, refs, count=None, parse=None, format=None):
411 """See git.rev_list for the general semantics, but note that with the
412 current interface, the parse function must be able to handle
413 (consume) any blank lines produced by the format because the
414 first one received that it doesn't consume will be interpreted
415 as a terminator for the entire rev-list result.
418 self._require_command(b'rev-list')
419 assert (count is None) or (isinstance(count, Integral))
421 assert b'\n' not in format
425 assert b'\n' not in ref
427 self._busy = b'rev-list'
429 conn.write(b'rev-list\n')
430 if count is not None:
431 conn.write(b'%d' % count)
# No-format branch: each line is a bare 40-char hex commit id.
441 for line in lines_until_sentinel(conn, b'\n', ClientError):
443 assert len(line) == 40
# Format branch: 'commit <oidx>' header, then parse(conn) consumes the body.
446 for line in lines_until_sentinel(conn, b'\n', ClientError):
447 if not line.startswith(b'commit '):
448 raise ClientError('unexpected line ' + repr(line))
449 cmt_oidx = line[7:].strip()
450 assert len(cmt_oidx) == 40
451 yield cmt_oidx, parse(conn)
453 not_ok = self.check_ok()
# Remote VFS path resolution: send the want_meta/follow/parent flags as
# a bitmask, optionally the parent resolution, then the path; read back
# either a resolution or an IOError description depending on the
# 1-byte success flag.
# NOTE(review): orig lines 460, 462, 466, 471, 473, 475, 477-479,
# 481-483 are missing from this view (busy bookkeeping, the branch
# structure around success, and the final raise/return).
458 def resolve(self, path, parent=None, want_meta=True, follow=False):
459 self._require_command(b'resolve')
461 self._busy = b'resolve'
463 conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
464 | (2 if follow else 0)
465 | (4 if parent else 0)))
467 vfs.write_resolution(conn, parent)
468 write_bvec(conn, path)
# Single status byte: 1 = resolution follows, 0 = IOError follows.
469 success = ord(conn.read(1))
470 assert success in (0, 1)
472 result = vfs.read_resolution(conn)
474 result = vfs.read_ioerror(conn)
476 not_ok = self.check_ok()
480 if isinstance(result, vfs.IOError):
# PackWriter that streams objects over a Client connection instead of
# writing a local packfile.  The class body continues beyond this view.
# NOTE(review): orig lines 487-490, 497, 500, 504, 506 are missing
# (remaining __init__ parameters and some attribute assignments,
# e.g. the conn/onopen and _bwcount initializers).
485 class PackWriter_Remote(git.PackWriter):
486 def __init__(self, conn, objcache_maker, suggest_packs,
491 max_pack_objects=None):
492 git.PackWriter.__init__(self,
493 objcache_maker=objcache_maker,
494 compression_level=compression_level,
495 max_pack_size=max_pack_size,
496 max_pack_objects=max_pack_objects)
# Cosmetic name used in errors/progress; there is no local pack file.
498 self.filename = b'remote socket'
499 self.suggest_packs = suggest_packs
501 self.onclose = onclose
502 self.ensure_busy = ensure_busy
# _packopen tracks whether a pack stream is currently open server-side.
503 self._packopen = False
505 self._bwtime = time.time()
# NOTE(review): lines 508/510 are the body of _open() (its `def` line,
# orig 507, is missing from this view), which lazily starts the
# server-side pack stream.
508 if not self._packopen:
510 self._packopen = True
# _end: finish the remote pack.  A zero length-prefix (b'\0\0\0\0')
# tells the server the object stream is complete.
512 def _end(self, run_midx=True):
513 assert(run_midx) # We don't support this via remote yet
514 if self._packopen and self.file:
515 self.file.write(b'\0\0\0\0')
516 self._packopen = False
517 self.onclose() # Unbusy
519 return self.suggest_packs() # Returns last idx received
# NOTE(review): line 527 belongs to abort() (its `def` line is missing
# from this view); remote pack writes cannot be aborted.
527 raise ClientError("don't know how to abort remote pack writing")
# Send one object to the server: a 4-byte big-endian length prefix
# (data + 20-byte sha + 4-byte crc), the payload, and its crc32,
# throttled through _raw_write_bwlimit.
# NOTE(review): orig lines 530, 532-533, 535-536, 539, 541-542, 545,
# 548 are missing from this view; lines 550/552 belong to a following
# method (presumably the objcache-refresh helper) whose `def` line is
# not visible.
529 def _raw_write(self, datalist, sha):
531 if not self._packopen:
534 data = b''.join(datalist)
# Mask to keep crc32 unsigned across Python versions.
537 crc = zlib.crc32(data) & 0xffffffff
538 outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
540 struct.pack('!I', crc),
543 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
544 self.file, outbuf, self._bwcount, self._bwtime)
546 reraise(ClientError(e))
547 self.outbytes += len(data)
# --- following method: refresh objcache when the server has sent input ---
550 if self.file.has_input():
552 self.objcache.refresh()