2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import errno, os, re, struct, sys, time, zlib
8 from bup import git, ssh, vfs
9 from bup.compat import environ, range, reraise
10 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
11 debug2, linereader, lines_until_sentinel,
12 mkdirp, progress, qprogress, DemuxConn, atoi)
13 from bup.io import path_msg
14 from bup.vint import read_bvec, read_vuint, write_bvec
# Exception raised for client-side and server-protocol failures throughout
# this module.
# NOTE(review): the class body (original line 21, presumably 'pass') is not
# visible in this listing.
20 class ClientError(Exception):
# Write buf to f in 4096-byte chunks, pacing the writes so average
# throughput stays at the (module-global) bwlimit bytes/sec.
# Returns the updated (bwcount, bwtime) accounting pair for the next call.
# NOTE(review): several original lines (25-26, 28, 37-41, 43) are elided in
# this listing, including the bwlimit guard and the actual write/sleep calls.
24 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
# Fast path -- presumably taken when bwlimit is unset: write everything at
# once and reset the accounting clock. TODO confirm against the elided guard.
27 return (len(buf), time.time())
29 # We want to write in reasonably large blocks, but not so large that
30 # they're likely to overflow a router's queue. So our bwlimit timing
31 # has to be pretty granular. Also, if it takes too long from one
32 # transmit to the next, we can't just make up for lost time to bring
33 # the average back up to bwlimit - that will risk overflowing the
34 # outbound queue, which defeats the purpose. So if we fall behind
35 # by more than one block delay, we shouldn't ever try to catch up.
36 for i in range(0,len(buf),4096):
# Earliest time the next chunk may be sent without exceeding bwlimit.
38 next = max(now, bwtime + 1.0*bwcount/bwlimit)
42 bwcount = len(sub) # might be less than 4096
44 return (bwcount, bwtime)
# Regex fragments for parsing remote specs of the form
# protocol://host[:port]/path; the named group <sb> ("square bracket")
# marks a bracketed IPv6 host so the conditional (?(sb)...) alternatives
# apply bracket-specific matching.
47 _protocol_rs = br'([a-z]+)://'
48 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
49 _port_rs = br'(?::(\d+))?'
# NOTE(review): _path_rs is referenced here but its definition (original
# line 50) is missing from this listing.
51 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
54 def parse_remote(remote):
# Parse a remote spec into a (protocol, host, port, path) tuple of
# bytes/None. Accepts ssh://, bup://, and file:// URLs, plain local
# paths (protocol b'file'), and rsync-style host:path (protocol b'ssh').
55 url_match = _url_rx.match(remote)
57 if not url_match.group(1) in (b'ssh', b'bup', b'file'):
58 raise ClientError('unexpected protocol: %s'
59 % url_match.group(1).decode('ascii'))
60 return url_match.group(1,3,4,5)
# Not a URL: fall back to [host:]path syntax; a leading b'' or b'-'
# host component means a local path.
62 rs = remote.split(b':', 1)
63 if len(rs) == 1 or rs[0] in (b'', b'-'):
64 return b'file', None, None, rs[-1]
66 return b'ssh', rs[0], None, rs[1]
70 def __init__(self, remote, create=False):
# Connect to the bup server named by 'remote' (see parse_remote) over
# reverse-mode fds, ssh, or a bup:// TCP socket, verify the commands it
# supports, and select (or, with create=True, initialize) its repo dir.
71 self._busy = self.conn = None
72 self.sock = self.p = self.pout = self.pin = None
# In reverse mode the real server endpoint comes from the environment.
73 is_reverse = environ.get(b'BUP_SERVER_REVERSE')
76 remote = b'%s:' % is_reverse
77 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
78 # The b'None' here matches python2's behavior of b'%s' % None == 'None',
79 # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
80 # but crashes instead when doing b'%s' % None.
81 cachehost = b'None' if self.host is None else self.host
82 cachedir = b'None' if self.dir is None else self.dir
83 self.cachedir = git.repo(b'index-cache/%s'
86 b'%s:%s' % (cachehost, cachedir)))
# Reverse mode: fds 3/4 were pre-wired to the server by the parent.
88 self.pout = os.fdopen(3, 'rb')
89 self.pin = os.fdopen(4, 'wb')
90 self.conn = Conn(self.pout, self.pin)
92 if self.protocol in (b'ssh', b'file'):
94 # FIXME: ssh and file shouldn't use the same module
95 self.p = ssh.connect(self.host, self.port, b'server')
96 self.pout = self.p.stdout
97 self.pin = self.p.stdin
98 self.conn = Conn(self.pout, self.pin)
100 reraise(ClientError('connect: %s' % e))
101 elif self.protocol == b'bup':
# NOTE(review): 'socket' is used here but no 'import socket' is visible
# in this listing -- presumably elided along with other lines.
102 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
103 self.sock.connect((self.host, atoi(self.port) or 1982))
104 self.sockw = self.sock.makefile('wb')
105 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
106 self._available_commands = self._get_available_commands()
107 self._require_command(b'init-dir')
108 self._require_command(b'set-dir')
# Strip CR/LF so the directory fits on a single protocol line.
110 self.dir = re.sub(br'[\r\n]', ' ', self.dir)
112 self.conn.write(b'init-dir %s\n' % self.dir)
114 self.conn.write(b'set-dir %s\n' % self.dir)
# Fragment of __del__ (surrounding original lines elided): a broken pipe
# while closing during teardown appears to be tolerated -- TODO confirm
# the elided handler body.
122 if e.errno == errno.EPIPE:
# Fragment of close() (def line and several statements elided): politely
# tell the server to quit when idle, shut down the socket/child tunnel,
# and raise ClientError on an abnormal exit status.
128 if self.conn and not self._busy:
129 self.conn.write(b'quit\n')
132 if self.sock and self.sockw:
134 self.sock.shutdown(socket.SHUT_WR)
# Non-zero exit code from the ssh/server tunnel is reported as an error.
145 raise ClientError('server tunnel returned exit code %d' % rv)
# Drop all connection references so close() is idempotent.
147 self.sock = self.p = self.pin = self.pout = None
153 raise ClientError('server exited unexpectedly with code %r'
# Fragment of check_ok() (def line elided): relay the connection's ok/error
# status, converting any connection-level exception into a ClientError.
156 return self.conn.check_ok()
157 except Exception as e:
158 reraise(ClientError(e))
160 def check_busy(self):
# Refuse to start a new command while another is in flight (the guard
# condition on original line 161 is elided from this listing).
162 raise ClientError('already busy with command %r' % self._busy)
164 def ensure_busy(self):
# Sanity check (used via PackWriter_Remote): a command must currently be
# in flight (the guard on original line 165 is elided from this listing).
166 raise ClientError('expected to be busy, but not busy?!')
171 def _get_available_commands(self):
# Ask the server for its command list via 'help' and return the parsed
# command names as a frozenset of bytes.
175 conn.write(b'help\n')
177 line = self.conn.readline()
178 if not line == b'Commands:\n':
179 raise ClientError('unexpected help header ' + repr(line))
181 line = self.conn.readline()
# Command lines are indented with spaces; anything else is malformed.
184 if not line.startswith(b' '):
185 raise ClientError('unexpected help line ' + repr(line))
188 raise ClientError('unexpected help line ' + repr(line))
191 not_ok = self.check_ok()
195 return frozenset(result)
def _require_command(self, name):
    """Raise ClientError if the server does not support command 'name'.

    'name' is the bytes command name exactly as it appears in the
    server's help output (e.g. b'init-dir').
    """
    if name not in self._available_commands:
        # 'name' is bytes (callers pass e.g. b'init-dir'), so it must be
        # *decoded* for interpolation into the str message; the original
        # name.encode('ascii') would itself raise AttributeError under
        # Python 3 because bytes objects have no .encode() method.
        raise ClientError('server does not appear to provide %s command'
                          % name.decode('ascii'))
202 def sync_indexes(self):
# Bring the local index cache in sync with the server: list the server's
# .idx files, drop cached ones the server no longer has, and record which
# server-requested ones still need downloading.
203 self._require_command(b'list-indexes')
206 mkdirp(self.cachedir)
207 # All cached idxs are extra until proven otherwise
209 for f in os.listdir(self.cachedir):
210 debug1(path_msg(f) + '\n')
211 if f.endswith(b'.idx'):
214 conn.write(b'list-indexes\n')
215 for line in linereader(conn):
# Index names must never contain path separators.
218 assert(line.find(b'/') < 0)
219 parts = line.split(b' ')
221 if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
222 # If the server requests that we load an idx and we don't
223 # already have a copy of it, it is needed
225 # Any idx that the server has heard of is proven not extra
229 debug1('client: removing extra indexes: %s\n' % extra)
231 os.unlink(os.path.join(self.cachedir, idx))
232 debug1('client: server requested load of: %s\n' % needed)
# Rebuild the midx over whatever indexes remain/arrived.
235 git.auto_midx(self.cachedir)
237 def sync_index(self, name):
# Download a single .idx file from the server into the local cache,
# writing it atomically so a partial transfer never becomes visible.
238 self._require_command(b'send-index')
239 #debug1('requesting %r\n' % name)
241 mkdirp(self.cachedir)
242 fn = os.path.join(self.cachedir, name)
243 if os.path.exists(fn):
244 msg = ("won't request existing .idx, try `bup bloom --check %s`"
246 raise ClientError(msg)
247 self.conn.write(b'send-index %s\n' % name)
# The server first sends the total index size as a network-order uint32.
248 n = struct.unpack('!I', self.conn.read(4))[0]
250 with atomically_replaced_file(fn, 'wb') as f:
252 progress('Receiving index from server: %d/%d\r' % (count, n))
253 for b in chunkyreader(self.conn, n):
256 qprogress('Receiving index from server: %d/%d\r' % (count, n))
257 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
260 def _make_objcache(self):
# Object cache for the pack writer: all pack indexes in the local
# index-cache directory for this remote.
261 return git.PackIdxList(self.cachedir)
263 def _suggest_packs(self):
# Handle server index suggestions (sent when it sees duplicate objects):
# temporarily suspend an in-flight receive-objects-v2, download each
# suggested .idx into the cache, then resume the upload.
266 assert(ob == b'receive-objects-v2')
267 self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
269 for line in linereader(self.conn):
272 debug2('%r\n' % line)
273 if line.startswith(b'index '):
275 debug1('client: received index suggestion: %s\n'
276 % git.shorten_hash(idx).decode('ascii'))
277 suggested.append(idx)
279 assert(line.endswith(b'.idx'))
280 debug1('client: completed writing pack, idx: %s\n'
281 % git.shorten_hash(line).decode('ascii'))
282 suggested.append(line)
287 for idx in suggested:
289 git.auto_midx(self.cachedir)
# Resume the previously suspended command, if any.
292 self.conn.write(b'%s\n' % ob)
295 def new_packwriter(self, compression_level=1,
296 max_pack_size=None, max_pack_objects=None):
# Start a remote receive-objects-v2 session and return a
# PackWriter_Remote that streams pack objects over this connection.
297 self._require_command(b'receive-objects-v2')
300 self._busy = b'receive-objects-v2'
301 self.conn.write(b'receive-objects-v2\n')
302 return PackWriter_Remote(self.conn,
303 objcache_maker = self._make_objcache,
304 suggest_packs = self._suggest_packs,
306 onclose = self._not_busy,
307 ensure_busy = self.ensure_busy,
308 compression_level=compression_level,
309 max_pack_size=max_pack_size,
310 max_pack_objects=max_pack_objects)
312 def read_ref(self, refname):
# Return the binary oid for refname, or None if the ref doesn't exist.
313 self._require_command(b'read-ref')
315 self.conn.write(b'read-ref %s\n' % refname)
316 r = self.conn.readline().strip()
319 assert(len(r) == 40) # hexified sha
322 return None # nonexistent ref
324 def update_ref(self, refname, newval, oldval):
# Ask the server to update refname from oldval to newval (binary oids,
# hexlified on the wire); a falsy oldval is sent as the empty string,
# presumably meaning "create" -- TODO confirm against the server side.
325 self._require_command(b'update-ref')
327 self.conn.write(b'update-ref %s\n%s\n%s\n'
328 % (refname, hexlify(newval),
329 hexlify(oldval) if oldval else b''))
# Fragment of join(id) (its def on original line 332 is elided): stream
# the assembled content of the object named by 'id' as byte chunks.
333 self._require_command(b'join')
336 # Send 'cat' so we'll work fine with older versions
337 self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
# Each chunk is preceded by its length as a network-order uint32.
339 sz = struct.unpack('!I', self.conn.read(4))[0]
341 yield self.conn.read(sz)
342 # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
346 raise KeyError(str(e))
348 def cat_batch(self, refs):
# Request many objects in one round trip; for each ref, yield
# (oidx, type, size, chunk-reader), or (None, None, None, None) when
# the server reports the object missing.
349 self._require_command(b'cat-batch')
351 self._busy = b'cat-batch'
353 conn.write(b'cat-batch\n')
354 # FIXME: do we want (only) binary protocol?
# Refs travel one per line, so embedded newlines are forbidden.
357 assert b'\n' not in ref
362 info = conn.readline()
363 if info == b'missing\n':
364 yield None, None, None, None
366 if not (info and info.endswith(b'\n')):
367 raise ClientError('Hit EOF while looking for object info: %r'
369 oidx, oid_t, size = info.split(b' ')
371 cr = chunkyreader(conn, size)
372 yield oidx, oid_t, size, cr
# The consumer must fully drain cr before we move to the next object;
# any leftover bytes indicate a protocol desync.
373 detritus = next(cr, None)
375 raise ClientError('unexpected leftover data ' + repr(detritus))
377 not_ok = self.check_ok()
382 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
# Yield (name, binary-oid) for each server ref matching the given
# patterns, optionally limited to heads and/or tags.
383 patterns = patterns or tuple()
384 self._require_command(b'refs')
# The two limit flags are sent as 0/1 integers on the command line.
388 conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
389 1 if limit_to_tags else 0))
390 for pattern in patterns:
391 assert b'\n' not in pattern
# Responses are '<hex-oid> <name>' lines up to a blank-line sentinel.
395 for line in lines_until_sentinel(conn, b'\n', ClientError):
397 oidx, name = line.split(b' ')
399 raise ClientError('Invalid object fingerprint in %r' % line)
401 raise ClientError('Invalid reference name in %r' % line)
402 yield name, unhexlify(oidx)
404 not_ok = self.check_ok()
409 def rev_list(self, refs, count=None, parse=None, format=None):
410 """See git.rev_list for the general semantics, but note that with the
411 current interface, the parse function must be able to handle
412 (consume) any blank lines produced by the format because the
413 first one received that it doesn't consume will be interpreted
414 as a terminator for the entire rev-list result.
417 self._require_command(b'rev-list')
# NOTE(review): Integral is referenced here but its import (presumably
# from numbers) is not visible in this listing.
418 assert (count is None) or (isinstance(count, Integral))
420 assert b'\n' not in format
424 assert b'\n' not in ref
426 self._busy = b'rev-list'
428 conn.write(b'rev-list\n')
429 if count is not None:
430 conn.write(b'%d' % count)
# Without a format/parse pair, each result line is a bare 40-char oidx.
440 for line in lines_until_sentinel(conn, b'\n', ClientError):
442 assert len(line) == 40
# With a parse function, each record begins with a 'commit <oidx>' line
# and parse(conn) consumes the rest of that record.
445 for line in lines_until_sentinel(conn, b'\n', ClientError):
446 if not line.startswith(b'commit '):
447 raise ClientError('unexpected line ' + repr(line))
448 cmt_oidx = line[7:].strip()
449 assert len(cmt_oidx) == 40
450 yield cmt_oidx, parse(conn)
452 not_ok = self.check_ok()
457 def resolve(self, path, parent=None, want_meta=True, follow=False):
# Remote counterpart of vfs path resolution: send the options, parent
# resolution, and path; receive either a resolution or a vfs.IOError.
458 self._require_command(b'resolve')
460 self._busy = b'resolve'
# Pack the three boolean options into a single bitmask argument.
462 conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
463 | (2 if follow else 0)
464 | (4 if parent else 0)))
466 vfs.write_resolution(conn, parent)
467 write_bvec(conn, path)
# One status byte: 1 = resolution follows, 0 = ioerror follows.
468 success = ord(conn.read(1))
469 assert success in (0, 1)
471 result = vfs.read_resolution(conn)
473 result = vfs.read_ioerror(conn)
475 not_ok = self.check_ok()
479 if isinstance(result, vfs.IOError):
484 class PackWriter_Remote(git.PackWriter):
# A git.PackWriter that streams its pack data over the server connection
# instead of writing a local packfile. (Several method bodies are elided
# in this listing.)
485 def __init__(self, conn, objcache_maker, suggest_packs,
490 max_pack_objects=None):
491 git.PackWriter.__init__(self,
492 objcache_maker=objcache_maker,
493 compression_level=compression_level,
494 max_pack_size=max_pack_size,
495 max_pack_objects=max_pack_objects)
# Placeholder name; there is no local pack file for a remote writer.
497 self.filename = b'remote socket'
498 self.suggest_packs = suggest_packs
500 self.onclose = onclose
501 self.ensure_busy = ensure_busy
502 self._packopen = False
# Bandwidth accounting consumed by _raw_write_bwlimit.
504 self._bwtime = time.time()
# Fragment of _open: lazily mark the remote pack stream open on first use.
507 if not self._packopen:
509 self._packopen = True
511 def _end(self, run_midx=True):
# Finish the remote pack: send the 4-byte zero terminator, mark the
# client not-busy, and return the last idx name the server reported.
512 assert(run_midx) # We don't support this via remote yet
513 if self._packopen and self.file:
514 self.file.write(b'\0\0\0\0')
515 self._packopen = False
516 self.onclose() # Unbusy
518 return self.suggest_packs() # Returns last idx received
# Aborting a partially-sent remote pack is not supported.
526 raise ClientError("don't know how to abort remote pack writing")
528 def _raw_write(self, datalist, sha):
# Send one object: a length header, the sha, and a crc32, followed by
# the (pre-deflated) data, honoring the bandwidth limit.
530 if not self._packopen:
533 data = b''.join(datalist)
# Mask to keep crc32 unsigned across py2/py3.
536 crc = zlib.crc32(data) & 0xffffffff
# Length field covers data + 20-byte sha + 4-byte crc.
537 outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
539 struct.pack('!I', crc),
542 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
543 self.file, outbuf, self._bwcount, self._bwtime)
545 reraise(ClientError(e))
546 self.outbytes += len(data)
# If the server pushed back index suggestions, fetch them and refresh
# the local object cache so future dup checks see the new indexes.
549 if self.file.has_input():
551 self.objcache.refresh()