2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import errno, os, re, struct, sys, time, zlib
8 from bup import git, ssh, vfs
9 from bup.compat import environ, range, reraise
10 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
11 debug2, linereader, lines_until_sentinel,
12 mkdirp, progress, qprogress)
13 from bup.io import path_msg
14 from bup.vint import read_bvec, read_vuint, write_bvec
# Module-wide exception type for client-side failures (bad remote specs,
# protocol errors, unexpected server replies).  Raised and wrapped
# throughout this file.
# NOTE(review): the class body (original line 21 and neighbors) is missing
# from this listing — presumably just `pass`; confirm against the full file.
20 class ClientError(Exception):
# Write `buf` to file-like `f`, throttled to a module-level `bwlimit`
# (bytes/sec — its definition is on a line not visible in this excerpt).
# Returns an updated (bwcount, bwtime) accounting pair which the caller
# threads back into the next call (see PackWriter_Remote._raw_write).
# NOTE(review): the original numbering jumps (24, 27, 29, ..., 38, 42), so
# several lines are missing here: the `if not bwlimit:` fast-path guard,
# the per-chunk sleep, the slice (`sub`) and its write, and the
# `bwtime = next` update.  Comments below are hedged accordingly.
24 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
# Fast path: no rate limit configured — write everything at once and
# restart the accounting clock (guard line not visible; TODO confirm).
27 return (len(buf), time.time())
29 # We want to write in reasonably large blocks, but not so large that
30 # they're likely to overflow a router's queue. So our bwlimit timing
31 # has to be pretty granular. Also, if it takes too long from one
32 # transmit to the next, we can't just make up for lost time to bring
33 # the average back up to bwlimit - that will risk overflowing the
34 # outbound queue, which defeats the purpose. So if we fall behind
35 # by more than one block delay, we shouldn't ever try to catch up.
36 for i in range(0,len(buf),4096):
# Earliest time the next chunk may be sent, given how many bytes the
# previous chunk moved; never earlier than "now" (no catching up).
38 next = max(now, bwtime + 1.0*bwcount/bwlimit)
42 bwcount = len(sub) # might be less than 4096
44 return (bwcount, bwtime)
# Byte-regex fragments for parsing remote specs of the form
# protocol://host:port/path (see parse_remote below).
47 _protocol_rs = br'([a-z]+)://'
# Host part: the (?(sb)...) conditionals match a bracketed IPv6 literal
# ([::1]) when an opening '[' was captured, otherwise a plain hostname
# (anything up to ':' or '/').
48 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
# Optional :port suffix.
49 _port_rs = br'(?::(\d+))?'
# NOTE(review): `_path_rs` is used below but its definition (original
# line 50) is missing from this excerpt, as is the tail of this
# re.compile call (closing paren / flags on missing lines).
51 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
# Split a remote spec into a (protocol, host, port, path) tuple of
# bytes/None values.  Two forms are accepted: a URL matched by _url_rx
# (ssh://, bup://, file://), or the traditional "host:path" / bare-path
# shorthand handled by the split() fallback below.
# NOTE(review): original line 56 is missing — almost certainly an
# `if url_match:` guard plus `else:`; as literally shown, the split()
# fallback after the unconditional return would be unreachable.  Confirm
# against the full file before relying on this excerpt.
54 def parse_remote(remote):
55 url_match = _url_rx.match(remote)
# Only these three protocols are supported in URL form.
57 if not url_match.group(1) in (b'ssh', b'bup', b'file'):
58 raise ClientError('unexpected protocol: %s'
59 % url_match.group(1).decode('ascii'))
# Groups: 1=protocol, 3=host, 4=port, 5=path (group 2 is the regex's
# bracket-tracking group).
60 return url_match.group(1,3,4,5)
# Fallback: "host:path" shorthand; no leading host (or '-') means a
# local filesystem repo.
62 rs = remote.split(b':', 1)
63 if len(rs) == 1 or rs[0] in (b'', b'-'):
64 return b'file', None, None, rs[-1]
66 return b'ssh', rs[0], None, rs[1]
# Client.__init__ (the `class Client` header is on a line missing from
# this excerpt): establish a connection to a bup server for `remote`
# and, when the remote names a repo dir, init-dir/set-dir it.
# NOTE(review): many lines are missing (74-75, 79-80, 85, 89, 91, 97,
# 107, 109, 111, 113+), including the `if is_reverse:` / `else:` /
# `try:` scaffolding implied by the fragments below; names `socket`,
# `atoi` and `DemuxConn` are used but their imports are not visible here.
70 def __init__(self, remote, create=False):
71 self._busy = self.conn = None
72 self.sock = self.p = self.pout = self.pin = None
# Reverse mode: bup-on-the-server started us; talk over inherited fds.
73 is_reverse = environ.get(b'BUP_SERVER_REVERSE')
76 remote = b'%s:' % is_reverse
77 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
# Local .idx cache dir is keyed on a sanitized "host:dir" string.
78 self.cachedir = git.repo(b'index-cache/%s'
81 # FIXME: the Nones just
82 # match python 2's behavior
83 b'%s:%s' % (self.host or b'None',
84 self.dir or b'None')))
# Reverse mode: fds 3 (read) and 4 (write) were set up by the caller.
86 self.pout = os.fdopen(3, 'rb')
87 self.pin = os.fdopen(4, 'wb')
88 self.conn = Conn(self.pout, self.pin)
90 if self.protocol in (b'ssh', b'file'):
92 # FIXME: ssh and file shouldn't use the same module
# Spawn `bup server` (over ssh, or locally for file://) and talk to
# its stdio.
93 self.p = ssh.connect(self.host, self.port, b'server')
94 self.pout = self.p.stdout
95 self.pin = self.p.stdin
96 self.conn = Conn(self.pout, self.pin)
98 reraise(ClientError('connect: %s' % e))
# bup:// — plain TCP to a standalone server, default port 1982.
99 elif self.protocol == b'bup':
100 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
101 self.sock.connect((self.host, atoi(self.port) or 1982))
102 self.sockw = self.sock.makefile('wb')
103 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
# Learn which commands this server supports before issuing any.
104 self._available_commands = self._get_available_commands()
105 self._require_command(b'init-dir')
106 self._require_command(b'set-dir')
# Newlines would corrupt the line-oriented protocol; squash them.
# NOTE(review): replacement ' ' is str while the pattern is bytes —
# on Python 3 re.sub would raise TypeError; looks like it should be
# b' '.  Confirm against upstream before changing.
108 self.dir = re.sub(br'[\r\n]', ' ', self.dir)
110 self.conn.write(b'init-dir %s\n' % self.dir)
112 self.conn.write(b'set-dir %s\n' % self.dir)
# Fragments of the connection-teardown method and of check_ok(); both
# `def` lines (and much of the bodies) are missing from this excerpt.
# Teardown: EPIPE on quit is tolerated (server already gone), the socket
# write side is shut down so the server sees EOF, a nonzero tunnel exit
# code is surfaced as ClientError, and all handles are nulled out.
120 if e.errno == errno.EPIPE:
# Only send a polite 'quit' when no streaming command is mid-flight.
126 if self.conn and not self._busy:
127 self.conn.write(b'quit\n')
130 if self.sock and self.sockw:
132 self.sock.shutdown(socket.SHUT_WR)
143 raise ClientError('server tunnel returned exit code %d' % rv)
145 self.sock = self.p = self.pin = self.pout = None
151 raise ClientError('server exited unexpectedly with code %r'
# check_ok(): delegate to the protocol-level ack check, wrapping any
# failure in ClientError so callers see one exception type.
154 return self.conn.check_ok()
155 except Exception as e:
156 reraise(ClientError(e))
# Refuse to start a new command while a streaming one (receive-objects,
# cat-batch, rev-list, ...) is still in flight.
# NOTE(review): the guard line (original 159, presumably
# `if self._busy:`) is missing from this excerpt.
158 def check_busy(self):
160 raise ClientError('already busy with command %r' % self._busy)
# Inverse sanity check of check_busy(): callbacks handed to
# PackWriter_Remote expect a streaming command to be active.
# NOTE(review): the guard line (original 163, presumably
# `if not self._busy:`) is missing from this excerpt.
162 def ensure_busy(self):
164 raise ClientError('expected to be busy, but not busy?!')
# Ask the server for its 'help' output and scrape the indented command
# list into a frozenset of bytes command names.
# NOTE(review): lines are missing (170-172, 174, 178, 180-181, 184-185,
# 187-188, 190-192), including the result accumulation and the loop
# termination; comments below are limited to what's visible.
169 def _get_available_commands(self):
173 conn.write(b'help\n')
175 line = self.conn.readline()
# The reply must start with a literal "Commands:" header line.
176 if not line == b'Commands:\n':
177 raise ClientError('unexpected help header ' + repr(line))
179 line = self.conn.readline()
# Each listed command line is indented with spaces.
182 if not line.startswith(b' '):
183 raise ClientError('unexpected help line ' + repr(line))
186 raise ClientError('unexpected help line ' + repr(line))
189 not_ok = self.check_ok()
193 return frozenset(result)
195 def _require_command(self, name):
196 if name not in self._available_commands:
197 raise ClientError('server does not appear to provide %s command'
198 % name.encode('ascii'))
# Reconcile the local index cache with the server's pack-index list:
# cached .idx files the server no longer mentions are deleted, indexes
# the server marks 'load' that we lack are (on missing lines) fetched
# via sync_index(), and finally the midx is rebuilt.
# NOTE(review): lines are missing (202-203, 206, 210-211, 214-215, 218,
# 222, 224-226, 228, 231-232), including the `extra`/`needed` set
# initialization and the fetch loop.
200 def sync_indexes(self):
201 self._require_command(b'list-indexes')
204 mkdirp(self.cachedir)
205 # All cached idxs are extra until proven otherwise
207 for f in os.listdir(self.cachedir):
208 debug1(path_msg(f) + '\n')
209 if f.endswith(b'.idx'):
212 conn.write(b'list-indexes\n')
# Server replies with one line per index: "<name>[ load]".
213 for line in linereader(conn):
# Index names must be bare filenames (no path components).
216 assert(line.find(b'/') < 0)
217 parts = line.split(b' ')
219 if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
220 # If the server requests that we load an idx and we don't
221 # already have a copy of it, it is needed
223 # Any idx that the server has heard of is proven not extra
227 debug1('client: removing extra indexes: %s\n' % extra)
229 os.unlink(os.path.join(self.cachedir, idx))
230 debug1('client: server requested load of: %s\n' % needed)
233 git.auto_midx(self.cachedir)
# Download one pack index `name` from the server into the local cache.
# The server replies with a 4-byte big-endian length followed by the raw
# .idx bytes, which are written via an atomic replace so a partial
# download never leaves a corrupt cache entry.
# NOTE(review): lines are missing (238, 243, 247, 249, 252-253, 256),
# including the count accumulation inside the read loop.
235 def sync_index(self, name):
236 self._require_command(b'send-index')
237 #debug1('requesting %r\n' % name)
239 mkdirp(self.cachedir)
240 fn = os.path.join(self.cachedir, name)
# Refusing to re-fetch an existing .idx: if it's damaged the user
# should run bloom --check rather than silently overwrite it.
241 if os.path.exists(fn):
242 msg = ("won't request existing .idx, try `bup bloom --check %s`"
244 raise ClientError(msg)
245 self.conn.write(b'send-index %s\n' % name)
# Total payload size, network byte order.
246 n = struct.unpack('!I', self.conn.read(4))[0]
248 with atomically_replaced_file(fn, 'wb') as f:
250 progress('Receiving index from server: %d/%d\r' % (count, n))
251 for b in chunkyreader(self.conn, n):
254 qprogress('Receiving index from server: %d/%d\r' % (count, n))
255 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
def _make_objcache(self):
    """Build the object cache used by the pack writer: a PackIdxList
    covering every pack index in our local index-cache directory."""
    cache_dir = self.cachedir
    return git.PackIdxList(cache_dir)
# Drain pack-index suggestions from the server while a
# receive-objects-v2 command is suspended: the server names .idx files
# we should download (so we stop re-sending objects it already has),
# each of which is fetched (on missing lines) via sync_index(); then the
# midx is rebuilt and the suspended command is resumed.
# NOTE(review): lines are missing (262-263, 266, 268-269, 272, 276,
# 281-284, 286, 288-289, 291), including the `suggested = []` setup,
# the loop exit, and the sync_index calls.
261 def _suggest_packs(self):
264 assert(ob == b'receive-objects-v2')
# A length word of 0xffffffff suspends the object stream so the
# server can talk back to us on the same connection.
265 self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
267 for line in linereader(self.conn):
270 debug2('%r\n' % line)
# "index <name>": server suggests we fetch that .idx.
271 if line.startswith(b'index '):
273 debug1('client: received index suggestion: %s\n'
274 % git.shorten_hash(idx).decode('ascii'))
275 suggested.append(idx)
# Otherwise the line names the idx of a pack the server just
# finished writing on its side.
277 assert(line.endswith(b'.idx'))
278 debug1('client: completed writing pack, idx: %s\n'
279 % git.shorten_hash(line).decode('ascii'))
280 suggested.append(line)
285 for idx in suggested:
287 git.auto_midx(self.cachedir)
# Resume the suspended command by re-sending its name.
290 self.conn.write(b'%s\n' % ob)
# Start a remote receive-objects-v2 stream and return a
# PackWriter_Remote bound to this connection.  The client is marked
# busy until the writer's onclose callback (self._not_busy, defined on
# a line not visible here) fires.
# NOTE(review): lines 296-297 and 303 are missing (presumably the
# check_busy() call and the objcache argument).
293 def new_packwriter(self, compression_level=1,
294 max_pack_size=None, max_pack_objects=None):
295 self._require_command(b'receive-objects-v2')
298 self._busy = b'receive-objects-v2'
299 self.conn.write(b'receive-objects-v2\n')
300 return PackWriter_Remote(self.conn,
301 objcache_maker = self._make_objcache,
302 suggest_packs = self._suggest_packs,
304 onclose = self._not_busy,
305 ensure_busy = self.ensure_busy,
306 compression_level=compression_level,
307 max_pack_size=max_pack_size,
308 max_pack_objects=max_pack_objects)
def read_ref(self, refname):
    """Return the 20-byte binary hash the server-side ref `refname`
    (bytes) currently points to, or None if the ref doesn't exist.

    The server replies with one line: the 40-char hex hash, or an
    empty line for a nonexistent ref.

    NOTE(review): this excerpt was missing interior lines; the
    check_busy()/check_ok() bracketing below mirrors the sibling
    commands in this class — confirm against the full file.
    """
    self._require_command(b'read-ref')
    self.check_busy()
    self.conn.write(b'read-ref %s\n' % refname)
    r = self.conn.readline().strip()
    self.check_ok()
    if r:
        assert(len(r) == 40)  # hexified sha
        # Bug fix: bytes.decode('hex') doesn't exist on Python 3 (the
        # hex codec is str-only there); use binascii.unhexlify, which
        # this module already imports.
        return unhexlify(r)
    return None  # nonexistent ref
# Compare-and-swap update of a server-side ref: `newval`/`oldval` are
# 20-byte binary hashes, hex-encoded for the wire; an empty third line
# means "no previous value" (ref creation).
# NOTE(review): lines 324 and 328 are missing — presumably the
# check_busy() call before the write and check_ok() after it.
322 def update_ref(self, refname, newval, oldval):
323 self._require_command(b'update-ref')
325 self.conn.write(b'update-ref %s\n%s\n%s\n'
326 % (refname, hexlify(newval),
327 hexlify(oldval) if oldval else b''))
# Fragments of the join() generator (its `def` line, original 330, is
# missing from this excerpt): stream the reassembled content of object
# `id` from the server as a sequence of length-prefixed chunks —
# presumably until a zero-size sentinel, but that loop's lines (332-333,
# 336, 338, 341-343) are also missing.
331 self._require_command(b'join')
334 # Send 'cat' so we'll work fine with older versions
# Newlines in `id` would break the line protocol; replace with '_'.
335 self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
# Each chunk is preceded by a 4-byte big-endian size.
337 sz = struct.unpack('!I', self.conn.read(4))[0]
339 yield self.conn.read(sz)
340 # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
344 raise KeyError(str(e))
# Generator: for each ref in `refs`, yield (oidx, type, size, chunk
# reader) — or (None, None, None, None) when the object is missing.
# Each yielded chunkyreader must be fully consumed before advancing,
# since it reads directly from the shared connection.
# NOTE(review): lines are missing (348, 350, 353-354, 356-359, 363, 366,
# 368, 372, 374, 376-379), including the per-ref writes, the sentinel,
# the size int() conversion, and the unbusy/cleanup path.
346 def cat_batch(self, refs):
347 self._require_command(b'cat-batch')
349 self._busy = b'cat-batch'
351 conn.write(b'cat-batch\n')
352 # FIXME: do we want (only) binary protocol?
# Refs travel one per line, so they may not contain newlines.
355 assert b'\n' not in ref
360 info = conn.readline()
361 if info == b'missing\n':
362 yield None, None, None, None
364 if not (info and info.endswith(b'\n')):
365 raise ClientError('Hit EOF while looking for object info: %r'
# Info line: "<hex-oid> <type> <size>".
367 oidx, oid_t, size = info.split(b' ')
369 cr = chunkyreader(conn, size)
370 yield oidx, oid_t, size, cr
# The caller must have drained the reader; anything left over means
# the stream is out of sync.
371 detritus = next(cr, None)
373 raise ClientError('unexpected leftover data ' + repr(detritus))
375 not_ok = self.check_ok()
# Generator: yield (refname, 20-byte binary oid) for server refs
# matching `patterns` (all refs when empty), optionally limited to
# heads and/or tags via the two protocol flag ints.
# NOTE(review): lines are missing (383-385, 390-392, 394, 396, 398,
# 401, 403-406), including the pattern writes, the sentinel after the
# patterns, and the validity checks guarding the two raises below.
380 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
381 patterns = patterns or tuple()
382 self._require_command(b'refs')
386 conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
387 1 if limit_to_tags else 0))
# Patterns travel one per line; newlines would corrupt the protocol.
388 for pattern in patterns:
389 assert b'\n' not in pattern
# Reply lines until a blank-line sentinel: "<hex-oid> <refname>".
393 for line in lines_until_sentinel(conn, b'\n', ClientError):
395 oidx, name = line.split(b' ')
397 raise ClientError('Invalid object fingerprint in %r' % line)
399 raise ClientError('Invalid reference name in %r' % line)
400 yield name, unhexlify(oidx)
402 not_ok = self.check_ok()
# NOTE(review): lines are missing throughout (413-414 — including the
# docstring's closing quotes — 417, 419-421, 423, 425, 429-437, 439,
# 441-442, 449, 451-454), covering the format/parse pairing assertions,
# the ref writes, and the no-format branch's loop body.  `Integral` is
# presumably numbers.Integral; its import is not visible here.
407 def rev_list(self, refs, count=None, parse=None, format=None):
408 """See git.rev_list for the general semantics, but note that with the
409 current interface, the parse function must be able to handle
410 (consume) any blank lines produced by the format because the
411 first one received that it doesn't consume will be interpreted
412 as a terminator for the entire rev-list result.
415 self._require_command(b'rev-list')
416 assert (count is None) or (isinstance(count, Integral))
# The format string travels on a single protocol line.
418 assert b'\n' not in format
422 assert b'\n' not in ref
424 self._busy = b'rev-list'
426 conn.write(b'rev-list\n')
427 if count is not None:
428 conn.write(b'%d' % count)
# No-format branch: each reply line is a bare 40-char hex commit id.
438 for line in lines_until_sentinel(conn, b'\n', ClientError):
440 assert len(line) == 40
# Format branch: "commit <hex-oid>" headers, with `parse` consuming
# the formatted body that follows each one directly from conn.
443 for line in lines_until_sentinel(conn, b'\n', ClientError):
444 if not line.startswith(b'commit '):
445 raise ClientError('unexpected line ' + repr(line))
446 cmt_oidx = line[7:].strip()
447 assert len(cmt_oidx) == 40
448 yield cmt_oidx, parse(conn)
450 not_ok = self.check_ok()
# Remote counterpart of vfs.resolve(): send the path (and optional
# parent resolution) with want_meta/follow/have-parent packed into a
# bitmask, then read back either a resolution or an IOError record
# selected by the 1-byte success flag.
# NOTE(review): lines are missing (457, 459, 463, 468, 470, 472,
# 474-476, 478-481), including the `if parent:` guard before
# write_resolution, the success/failure branch keywords, the unbusy
# cleanup, and the raise/return tail after the isinstance check.
455 def resolve(self, path, parent=None, want_meta=True, follow=False):
456 self._require_command(b'resolve')
458 self._busy = b'resolve'
# Flag bits: 1 = want_meta, 2 = follow symlinks, 4 = parent supplied.
460 conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
461 | (2 if follow else 0)
462 | (4 if parent else 0)))
464 vfs.write_resolution(conn, parent)
465 write_bvec(conn, path)
# 1 byte: 1 = resolution follows, 0 = ioerror record follows.
466 success = ord(conn.read(1))
467 assert success in (0, 1)
469 result = vfs.read_resolution(conn)
471 result = vfs.read_ioerror(conn)
473 not_ok = self.check_ok()
477 if isinstance(result, vfs.IOError):
# A git.PackWriter that streams objects over the client connection
# using the receive-objects-v2 framing: each object is sent as a
# 4-byte big-endian total length, then (per the visible pieces below)
# the sha and a CRC32, with a zero length word terminating the pack.
# NOTE(review): this class is heavily gapped in this excerpt (lines
# 484-487, 494, 497, 501, 503-504, 506, 508, 515, 517-523, 525, 527,
# 529-530, 532-533, 536, 538-539, 542, 545-546, 548 missing), including
# the rest of the __init__ signature, _open()'s def line, abort()'s def
# line, and the payload segments of the outbuf join; it may also
# continue past the end of this view.
482 class PackWriter_Remote(git.PackWriter):
483 def __init__(self, conn, objcache_maker, suggest_packs,
488 max_pack_objects=None):
489 git.PackWriter.__init__(self,
490 objcache_maker=objcache_maker,
491 compression_level=compression_level,
492 max_pack_size=max_pack_size,
493 max_pack_objects=max_pack_objects)
# Placeholder name used in progress/error messages.
495 self.filename = b'remote socket'
496 self.suggest_packs = suggest_packs
498 self.onclose = onclose
499 self.ensure_busy = ensure_busy
# True while a pack is open on the wire (between _open and _end).
500 self._packopen = False
# Bandwidth-limit accounting threaded through _raw_write_bwlimit.
502 self._bwtime = time.time()
505 if not self._packopen:
507 self._packopen = True
509 def _end(self, run_midx=True):
510 assert(run_midx) # We don't support this via remote yet
# A zero length word tells the server the pack is complete.
511 if self._packopen and self.file:
512 self.file.write(b'\0\0\0\0')
513 self._packopen = False
514 self.onclose() # Unbusy
516 return self.suggest_packs() # Returns last idx received
# abort() fragment: remote pack writing can't be rolled back.
524 raise ClientError("don't know how to abort remote pack writing")
526 def _raw_write(self, datalist, sha):
528 if not self._packopen:
531 data = b''.join(datalist)
# Mask to keep crc32 unsigned across Python 2/3.
534 crc = zlib.crc32(data) & 0xffffffff
# Frame: total length (payload + 20-byte sha + 4-byte crc), then —
# per the missing lines 536/538 — presumably sha and data around
# the crc word below.
535 outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
537 struct.pack('!I', crc),
540 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
541 self.file, outbuf, self._bwcount, self._bwtime)
543 reraise(ClientError(e))
544 self.outbytes += len(data)
# If the server has sent suggestions while we streamed, refresh the
# objcache so we stop re-sending objects it already has.
547 if self.file.has_input():
549 self.objcache.refresh()