2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import os, re, struct, time, zlib
9 from bup import git, ssh, vfs
10 from bup.compat import environ, pending_raise, range, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12 debug2, linereader, lines_until_sentinel,
13 mkdirp, progress, qprogress, DemuxConn)
14 from bup.io import path_msg
15 from bup.vint import write_bvec
class ClientError(Exception):
    """Raised for any failure in the client <-> bup-server conversation."""
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, throttled to the bandwidth limit.

    bwcount/bwtime carry the running byte count and last-transmit time
    between calls; the updated (bwcount, bwtime) pair is returned.
    NOTE(review): several lines of this function (the unlimited
    fast-path guard and the loop's sleep/write statements binding
    'now' and 'sub') are missing from this copy — confirm against the
    complete source.
    """
    # Fast path: presumably taken when no bandwidth limit is configured.
    return (len(buf), time.time())
    # We want to write in reasonably large blocks, but not so large that
    # they're likely to overflow a router's queue. So our bwlimit timing
    # has to be pretty granular. Also, if it takes too long from one
    # transmit to the next, we can't just make up for lost time to bring
    # the average back up to bwlimit - that will risk overflowing the
    # outbound queue, which defeats the purpose. So if we fall behind
    # by more than one block delay, we shouldn't ever try to catch up.
    for i in range(0,len(buf),4096):
        # Earliest moment the next chunk may go out without exceeding
        # the limit ('now' is bound on a line elided from this copy).
        next = max(now, bwtime + 1.0*bwcount/bwlimit)
        bwcount = len(sub)  # might be less than 4096
    return (bwcount, bwtime)
# Pieces of the remote-URL regex: protocol://host[:port]/path
_protocol_rs = br'([a-z]+)://'
# Host may be a bracketed IPv6 literal; the conditional group (?(sb)...)
# keys on whether the opening bracket matched.
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
# NOTE(review): _path_rs is referenced below but its definition (and the
# closing argument of this re.compile call) are elided from this copy.
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
def parse_remote(remote):
    """Parse a remote spec into a (protocol, host, port, path) 4-tuple.

    Accepts URL forms (ssh://, bup://, file://) and the legacy
    [host:]path form.  Components are bytes; absent ones are None.
    NOTE(review): the guard between the URL branch and the legacy
    branch is elided from this copy.
    """
    url_match = _url_rx.match(remote)
    # URL form: only these three schemes are understood.
    if not url_match.group(1) in (b'ssh', b'bup', b'file'):
        raise ClientError('unexpected protocol: %s'
                          % url_match.group(1).decode('ascii'))
    return url_match.group(1,3,4,5)
    # Legacy form: [host:]path; an empty or '-' host means a local path.
    rs = remote.split(b':', 1)
    if len(rs) == 1 or rs[0] in (b'', b'-'):
        return b'file', None, None, rs[-1]
    return b'ssh', rs[0], None, rs[1]
    def __init__(self, remote, create=False):
        """Connect to the bup server named by remote (see parse_remote).

        NOTE(review): several lines are elided from this copy — the
        reverse-mode branch guards, the 'try' around ssh.connect, and
        part of the cachedir expression — so the control flow below is
        discontinuous; verify against the full source.
        """
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        # In "reverse" mode the server initiated the connection;
        # BUP_SERVER_REVERSE carries the peer's name.
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior of b'%s' % None == 'None',
        # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
        # but crashes instead when doing b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        # Per-remote local index cache under the bup repo directory.
        self.cachedir = git.repo(b'index-cache/%s'
                                 b'%s:%s' % (cachehost, cachedir)))
        # Reverse mode: the tunnel already exists on fds 3 (read) / 4 (write).
        self.pout = os.fdopen(3, 'rb')
        self.pin = os.fdopen(4, 'wb')
        self.conn = Conn(self.pout, self.pin)
        if self.protocol in (b'ssh', b'file'):
            # FIXME: ssh and file shouldn't use the same module
            self.p = ssh.connect(self.host, self.port, b'server')
            self.pout = self.p.stdout
            self.pin = self.p.stdin
            self.conn = Conn(self.pout, self.pin)
            # (exception handler; its 'try'/'except' lines are elided here)
            reraise(ClientError('connect: %s' % e))
        elif self.protocol == b'bup':
            # Plain TCP; the default bup port is 1982.
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host,
                               1982 if self.port is None else int(self.port)))
            self.sockw = self.sock.makefile('wb')
            self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        # NOTE(review): replacement ' ' is str while self.dir is bytes;
        # under python3 re.sub would raise — presumably should be b' '.
        # Confirm against upstream before changing.
        self.dir = re.sub(br'[\r\n]', ' ', self.dir)
        self.conn.write(b'init-dir %s\n' % self.dir)
        self.conn.write(b'set-dir %s\n' % self.dir)
    def __exit__(self, type, value, traceback):
        # Best-effort teardown; pending_raise keeps any in-flight
        # exception (value) as the primary error.
        with pending_raise(value, rethrow=False):
            # presumably self.close() here — the statement is elided
            # from this copy.
        # NOTE(review): the lines below belong to Client.close(); its
        # 'def' line and several statements are elided from this copy.
        if self.conn and not self._busy:
            # Politely end the protocol unless a command is in flight.
            self.conn.write(b'quit\n')
        if self.sock and self.sockw:
            self.sock.shutdown(socket.SHUT_WR)
        raise ClientError('server tunnel returned exit code %d' % rv)
        self.sock = self.p = self.pin = self.pout = None
        raise ClientError('server exited unexpectedly with code %r'
        # NOTE(review): these lines belong to Client.check_ok(); its
        # 'def' and 'try' lines are elided from this copy.
            return self.conn.check_ok()
        except Exception as e:
            # Wrap any transport-level error as a ClientError.
            reraise(ClientError(e))
        # reraise doesn't return
    def check_busy(self):
        # Commands may not overlap on the single connection.
        # (the 'if self._busy:' guard line is elided from this copy)
        raise ClientError('already busy with command %r' % self._busy)
    def ensure_busy(self):
        # Sanity check used by PackWriter_Remote callbacks.
        # (the 'if not self._busy:' guard line is elided from this copy)
        raise ClientError('expected to be busy, but not busy?!')
    def _get_available_commands(self):
        """Send 'help' to the server and return a frozenset of the
        command names it advertises.

        NOTE(review): several lines (the 'conn' binding, the read loop,
        and accumulation into 'result') are elided from this copy.
        """
        conn.write(b'help\n')
        line = self.conn.readline()
        if not line == b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        line = self.conn.readline()
        # Each advertised command is indented under the header.
        if not line.startswith(b' '):
            raise ClientError('unexpected help line ' + repr(line))
        raise ClientError('unexpected help line ' + repr(line))
        not_ok = self.check_ok()
        return frozenset(result)
199 def _require_command(self, name):
200 if name not in self._available_commands:
201 raise ClientError('server does not appear to provide %s command'
202 % name.encode('ascii'))
    def sync_indexes(self):
        """Mirror the server's pack indexes into self.cachedir.

        Downloads indexes the server asks us to load, removes cached
        ones the server no longer mentions, then rebuilds midx files.
        NOTE(review): several lines (the 'extra'/'needed' set setup, the
        download loop, and branch bodies) are elided from this copy.
        """
        self._require_command(b'list-indexes')
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
        conn.write(b'list-indexes\n')
        for line in linereader(conn):
            # Index names must be bare filenames — no path components.
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
            # Any idx that the server has heard of is proven not extra
        debug1('client: removing extra indexes: %s\n' % extra)
        os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        git.auto_midx(self.cachedir)
    def sync_index(self, name):
        """Fetch pack index name (bytes filename) from the server into
        the local cache, writing it atomically; refuse if it exists.

        NOTE(review): the progress-count bookkeeping ('count') and the
        f.write inside the chunk loop are elided from this copy.
        """
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        # Server replies with a big-endian 4-byte length, then raw idx data.
        n = struct.unpack('!I', self.conn.read(4))[0]
        with atomically_replaced_file(fn, 'wb') as f:
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
262 def _make_objcache(self):
263 return git.PackIdxList(self.cachedir)
    def _suggest_packs(self):
        """Drain index suggestions during a receive-objects-v2 session.

        Suspends the object stream, downloads any server-suggested .idx
        files, rebuilds midx, then resumes; returns the last idx name
        received.  NOTE(review): several lines (the 'ob' binding, 'idx'
        extraction, 'suggested' setup, sync_index calls, and branch
        structure) are elided from this copy.
        """
        assert(ob == b'receive-objects-v2')
        self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
        for line in linereader(self.conn):
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
                # A completed pack is announced by its idx filename.
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        for idx in suggested:
        git.auto_midx(self.cachedir)
        # Resume the suspended receive-objects-v2 command.
        self.conn.write(b'%s\n' % ob)
    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        """Start a remote receive-objects-v2 session and return a
        PackWriter_Remote bound to this connection.

        NOTE(review): at least one keyword argument line (presumably
        onopen) is elided from the PackWriter_Remote call in this copy.
        """
        self._require_command(b'receive-objects-v2')
        self._busy = b'receive-objects-v2'
        self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)
    def read_ref(self, refname):
        """Return the server-side value of refname, or None if absent.

        NOTE(review): the success branch (presumably unhexlify(r)) and
        the check_ok call are elided from this copy.
        """
        self._require_command(b'read-ref')
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        assert(len(r) == 40) # hexified sha
        return None # nonexistent ref
    def update_ref(self, refname, newval, oldval):
        """Ask the server to move refname from oldval to newval.

        newval/oldval are binary shas (hex-encoded on the wire); a
        falsy oldval sends an empty string, presumably for creation.
        """
        self._require_command(b'update-ref')
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        # NOTE(review): the lines below belong to Client.join(id) — a
        # generator yielding an object's content chunks — but its 'def'
        # line, the size-read loop structure, and the NotOk handler's
        # 'except' line are elided from this copy.
        self._require_command(b'join')
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        # Each chunk is preceded by a big-endian 4-byte size.
        sz = struct.unpack('!I', self.conn.read(4))[0]
        yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
        raise KeyError(str(e))
    def cat_batch(self, refs):
        """Generator: for each requested ref, yield
        (oidx, type, size, chunk_reader), or four Nones when missing.

        NOTE(review): several lines (the ref-writing loop, the 'conn'
        binding, busy bookkeeping, and loop/branch terminators) are
        elided from this copy.
        """
        self._require_command(b'cat-batch')
        self._busy = b'cat-batch'
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        assert b'\n' not in ref
        info = conn.readline()
        if info == b'missing\n':
            yield None, None, None, None
        if not (info and info.endswith(b'\n')):
            raise ClientError('Hit EOF while looking for object info: %r'
        oidx, oid_t, size = info.split(b' ')
        cr = chunkyreader(conn, size)
        yield oidx, oid_t, size, cr
        # Any data the consumer left unread indicates a protocol error.
        detritus = next(cr, None)
        raise ClientError('unexpected leftover data ' + repr(detritus))
        not_ok = self.check_ok()
    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        """Generator of (refname, binary-sha) pairs matching patterns.

        NOTE(review): validation guards before the two 'raise' lines,
        the pattern-writing statements, and busy bookkeeping are elided
        from this copy.
        """
        patterns = patterns or tuple()
        self._require_command(b'refs')
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
        # A blank line from the server terminates the listing.
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            oidx, name = line.split(b' ')
            raise ClientError('Invalid object fingerprint in %r' % line)
            raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        not_ok = self.check_ok()
    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        NOTE(review): the ref/format writing statements, busy
        bookkeeping, and the branch between the two reading loops are
        elided from this copy.
        """
        self._require_command(b'rev-list')
        assert b'\n' not in format
        assert b'\n' not in ref
        self._busy = b'rev-list'
        conn.write(b'rev-list\n')
        # No format: plain 40-char commit oid lines until a blank line.
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            assert len(line) == 40
        # With a format: 'commit <oidx>' header, then parse() consumes
        # the formatted body from the connection.
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            if not line.startswith(b'commit '):
                raise ClientError('unexpected line ' + repr(line))
            cmt_oidx = line[7:].strip()
            assert len(cmt_oidx) == 40
            yield cmt_oidx, parse(conn)
        not_ok = self.check_ok()
    def resolve(self, path, parent=None, want_meta=True, follow=False):
        """Remote counterpart of vfs.resolve: resolve path on the server
        and return the resolution, or raise the server-side IOError.

        NOTE(review): the branch guards around the read_resolution /
        read_ioerror alternatives, busy bookkeeping, and the final
        raise/return are elided from this copy.
        """
        self._require_command(b'resolve')
        self._busy = b'resolve'
        # Bitmask flags: 1 = want_meta, 2 = follow, 4 = parent present.
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        # One status byte: 1 = resolution follows, 0 = ioerror follows.
        success = ord(conn.read(1))
        assert success in (0, 1)
        result = vfs.read_resolution(conn)
        result = vfs.read_ioerror(conn)
        not_ok = self.check_ok()
        if isinstance(result, vfs.IOError):
# FIXME: disentangle this (stop inheriting) from PackWriter
class PackWriter_Remote(git.PackWriter):
    """PackWriter that streams objects over an established server
    connection (receive-objects-v2) instead of writing a local pack."""
    def __init__(self, conn, objcache_maker, suggest_packs,
                 max_pack_objects=None):
        """Wire the owning Client's callbacks into this writer.

        NOTE(review): several signature lines are elided from this copy
        — the body references onclose, ensure_busy, compression_level
        and max_pack_size, which presumably arrive as parameters.
        """
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        # True while a pack stream is open on the connection.
        self._packopen = False
        # Bandwidth-limit bookkeeping for _raw_write_bwlimit.
        self._bwtime = time.time()
    # __enter__ and __exit__ are inherited

    # NOTE(review): the lines below belong to _open(); its 'def' line
    # and the statements between these two are elided from this copy.
        if not self._packopen:
            self._packopen = True
    def _end(self, run_midx=True):
        # Called by other PackWriter methods like breakpoint().
        # Must not close the connection (self.file)
        assert(run_midx) # We don't support this via remote yet
        if self._packopen and self.file:
            # Four zero bytes signal end-of-pack to the server.
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            # NOTE(review): at least one statement between these lines
            # is elided from this copy.
            return self.suggest_packs() # Returns last idx received
        # Called by inherited __exit__
        # NOTE(review): the comment above belongs to close(); the raise
        # below belongs to abort().  Both 'def' lines and the close()
        # body are elided from this copy.
        raise ClientError("don't know how to abort remote pack writing")
    def _raw_write(self, datalist, sha):
        """Frame one object and send it over the connection, honoring
        the bandwidth limit.

        NOTE(review): the _open call, the deflate step, the sha element
        of the frame, and the try/except around the throttled write are
        elided from this copy.
        """
        if not self._packopen:
        data = b''.join(datalist)
        crc = zlib.crc32(data) & 0xffffffff
        # Frame starts with !I total length = payload + 20 (sha) + 4 (crc).
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           struct.pack('!I', crc),
        (self._bwcount, self._bwtime) = _raw_write_bwlimit(
            self.file, outbuf, self._bwcount, self._bwtime)
        reraise(ClientError(e))
        self.outbytes += len(data)
        # NOTE(review): these lines belong to a method whose 'def' line
        # is elided from this copy; refresh the object cache whenever
        # the server has sent data (e.g. new index suggestions).
        if self.file.has_input():
            self.objcache.refresh()