2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import errno, os, re, struct, time, zlib
9 from bup import git, ssh, vfs
10 from bup.compat import environ, range, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12 debug2, linereader, lines_until_sentinel,
13 mkdirp, progress, qprogress, DemuxConn)
14 from bup.io import path_msg
15 from bup.vint import write_bvec
class ClientError(Exception):
    """Raised for connection or protocol failures while talking to a bup server."""
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    # Write buf to f, throttled to the configured bandwidth limit.
    # Returns a (bwcount, bwtime) pair that the caller must feed back in
    # on its next call so the rolling rate calculation stays continuous.
        return (len(buf), time.time())
    # We want to write in reasonably large blocks, but not so large that
    # they're likely to overflow a router's queue. So our bwlimit timing
    # has to be pretty granular. Also, if it takes too long from one
    # transmit to the next, we can't just make up for lost time to bring
    # the average back up to bwlimit - that will risk overflowing the
    # outbound queue, which defeats the purpose. So if we fall behind
    # by more than one block delay, we shouldn't ever try to catch up.
    for i in range(0,len(buf),4096):
        # Earliest moment the next chunk may go out without exceeding the
        # limit; clamped to "now" so we never try to catch up after a stall.
        next = max(now, bwtime + 1.0*bwcount/bwlimit)
        bwcount = len(sub) # might be less than 4096
    return (bwcount, bwtime)
# Regex fragments for parsing remote URLs: protocol scheme ("ssh://",
# "bup://", "file://"), host (with optional []-bracketing for IPv6
# literals, via the (?(sb)...) conditional), and an optional :port suffix.
_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
# Full remote-URL matcher assembled from the fragments above.
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
def parse_remote(remote):
    """Split a remote spec into a (protocol, host, port, dir) 4-tuple.

    Understands ssh://, bup://, and file:// URL forms as well as the
    plain "host:path" and bare-path shorthands; raises ClientError for
    any other URL scheme.
    """
    url_match = _url_rx.match(remote)
        if not url_match.group(1) in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        return url_match.group(1,3,4,5)
    # Not URL syntax: "host:path" means ssh; a bare path (or an empty
    # or '-' host part) means a local file repo.
    rs = remote.split(b':', 1)
    if len(rs) == 1 or rs[0] in (b'', b'-'):
        return b'file', None, None, rs[-1]
        return b'ssh', rs[0], None, rs[1]
def __init__(self, remote, create=False):
    """Connect to the bup server named by `remote` (see parse_remote).

    If `create` is true, ask the server to initialize the repository
    directory rather than just selecting it.
    """
    # Cleared first so cleanup stays safe even if connecting fails below.
    self._busy = self.conn = None
    self.sock = self.p = self.pout = self.pin = None
    is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        # Reverse mode: the remote name comes from the environment.
        remote = b'%s:' % is_reverse
    (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
    # The b'None' here matches python2's behavior of b'%s' % None == 'None',
    # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
    # but crashes instead when doing b'%s' % None.
    cachehost = b'None' if self.host is None else self.host
    cachedir = b'None' if self.dir is None else self.dir
    # The local index cache for this remote lives inside the bup repo dir.
    self.cachedir = git.repo(b'index-cache/%s'
                             b'%s:%s' % (cachehost, cachedir)))
        # Reverse mode: fds 3/4 were pre-wired to the server by our parent.
        self.pout = os.fdopen(3, 'rb')
        self.pin = os.fdopen(4, 'wb')
        self.conn = Conn(self.pout, self.pin)
    if self.protocol in (b'ssh', b'file'):
            # FIXME: ssh and file shouldn't use the same module
            self.p = ssh.connect(self.host, self.port, b'server')
            self.pout = self.p.stdout
            self.pin = self.p.stdin
            self.conn = Conn(self.pout, self.pin)
            reraise(ClientError('connect: %s' % e))
    elif self.protocol == b'bup':
        # Direct TCP to a bup daemon; 1982 is the default port.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.host,
                           1982 if self.port is None else int(self.port)))
        self.sockw = self.sock.makefile('wb')
        self.conn = DemuxConn(self.sock.fileno(), self.sockw)
    self._available_commands = self._get_available_commands()
    self._require_command(b'init-dir')
    self._require_command(b'set-dir')
    # Newlines would corrupt the single-line wire commands below.
    self.dir = re.sub(br'[\r\n]', ' ', self.dir)
        self.conn.write(b'init-dir %s\n' % self.dir)
        self.conn.write(b'set-dir %s\n' % self.dir)
# (inside __del__'s cleanup handler) A broken pipe is expected if the
# server went away first, so EPIPE is tolerated during implicit teardown.
if e.errno == errno.EPIPE:
# (Client.close) Shut down politely: tell the server to quit unless a
# command is still in flight, close the socket's write side, and surface
# a failed server tunnel as a ClientError.
if self.conn and not self._busy:
    self.conn.write(b'quit\n')
if self.sock and self.sockw:
    self.sock.shutdown(socket.SHUT_WR)
    raise ClientError('server tunnel returned exit code %d' % rv)
self.sock = self.p = self.pin = self.pout = None
# (Client.check_ok) If the server process already exited, report that;
# otherwise delegate to the connection-level status check.  Any failure
# is rewrapped as a ClientError for callers.
        raise ClientError('server exited unexpectedly with code %r'
    return self.conn.check_ok()
except Exception as e:
    reraise(ClientError(e))
def check_busy(self):
    """Raise ClientError if a long-running server command is already active."""
        raise ClientError('already busy with command %r' % self._busy)
def ensure_busy(self):
    """Raise ClientError unless a long-running command is in progress."""
        raise ClientError('expected to be busy, but not busy?!')
def _get_available_commands(self):
    """Run the server's 'help' command and return the advertised
    command names as a frozenset of bytes."""
    conn.write(b'help\n')
    line = self.conn.readline()
    if not line == b'Commands:\n':
        raise ClientError('unexpected help header ' + repr(line))
        line = self.conn.readline()
        # Each command line must be indented under the header.
        if not line.startswith(b'    '):
            raise ClientError('unexpected help line ' + repr(line))
            raise ClientError('unexpected help line ' + repr(line))
    not_ok = self.check_ok()
    return frozenset(result)
199 def _require_command(self, name):
200 if name not in self._available_commands:
201 raise ClientError('server does not appear to provide %s command'
202 % name.encode('ascii'))
def sync_indexes(self):
    """Synchronize the local index cache with the server: drop cached
    .idx files the server no longer mentions, and fetch the ones the
    server asks us to load but that we don't have yet."""
    self._require_command(b'list-indexes')
    mkdirp(self.cachedir)
    # All cached idxs are extra until proven otherwise
    for f in os.listdir(self.cachedir):
        debug1(path_msg(f) + '\n')
        if f.endswith(b'.idx'):
    conn.write(b'list-indexes\n')
    for line in linereader(conn):
        # Index names never contain a path separator.
        assert(line.find(b'/') < 0)
        parts = line.split(b' ')
        if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
            # If the server requests that we load an idx and we don't
            # already have a copy of it, it is needed
        # Any idx that the server has heard of is proven not extra
    debug1('client: removing extra indexes: %s\n' % extra)
        os.unlink(os.path.join(self.cachedir, idx))
    debug1('client: server requested load of: %s\n' % needed)
    git.auto_midx(self.cachedir)
def sync_index(self, name):
    """Fetch the pack index `name` from the server into the local cache."""
    self._require_command(b'send-index')
    #debug1('requesting %r\n' % name)
    mkdirp(self.cachedir)
    fn = os.path.join(self.cachedir, name)
    if os.path.exists(fn):
        msg = ("won't request existing .idx, try `bup bloom --check %s`"
        raise ClientError(msg)
    self.conn.write(b'send-index %s\n' % name)
    # The server prefixes the index with its size as a big-endian uint32.
    n = struct.unpack('!I', self.conn.read(4))[0]
    # Write via a temp file + rename so an interrupted download never
    # leaves a truncated .idx in the cache.
    with atomically_replaced_file(fn, 'wb') as f:
        progress('Receiving index from server: %d/%d\r' % (count, n))
        for b in chunkyreader(self.conn, n):
            qprogress('Receiving index from server: %d/%d\r' % (count, n))
        progress('Receiving index from server: %d/%d, done.\n' % (count, n))
def _make_objcache(self):
    """Build the object cache (a PackIdxList) over this client's local
    index-cache directory, used to detect already-stored objects."""
    cache_dir = self.cachedir
    return git.PackIdxList(cache_dir)
def _suggest_packs(self):
    """Drain the server's pack/index suggestion lines, sync each
    suggested index into the local cache, and return the last index
    name received."""
    assert(ob == b'receive-objects-v2')
    self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
    for line in linereader(self.conn):
        debug2('%r\n' % line)
        if line.startswith(b'index '):
            debug1('client: received index suggestion: %s\n'
                   % git.shorten_hash(idx).decode('ascii'))
            suggested.append(idx)
            # Otherwise the line names the idx of a completed pack.
            assert(line.endswith(b'.idx'))
            debug1('client: completed writing pack, idx: %s\n'
                   % git.shorten_hash(line).decode('ascii'))
            suggested.append(line)
    for idx in suggested:
    git.auto_midx(self.cachedir)
    # Resume the command that was suspended above.
    self.conn.write(b'%s\n' % ob)
def new_packwriter(self, compression_level=1,
                   max_pack_size=None, max_pack_objects=None):
    """Return a PackWriter_Remote that streams objects to this server
    via the receive-objects-v2 protocol; marks the connection busy
    until the writer is closed."""
    self._require_command(b'receive-objects-v2')
    self._busy = b'receive-objects-v2'
    self.conn.write(b'receive-objects-v2\n')
    return PackWriter_Remote(self.conn,
                             objcache_maker = self._make_objcache,
                             suggest_packs = self._suggest_packs,
                             onclose = self._not_busy,
                             ensure_busy = self.ensure_busy,
                             compression_level=compression_level,
                             max_pack_size=max_pack_size,
                             max_pack_objects=max_pack_objects)
def read_ref(self, refname):
    """Return the sha the server has for `refname`, or None if the
    ref doesn't exist."""
    self._require_command(b'read-ref')
    self.conn.write(b'read-ref %s\n' % refname)
    r = self.conn.readline().strip()
    assert(len(r) == 40) # hexified sha
    return None # nonexistent ref
def update_ref(self, refname, newval, oldval):
    """Ask the server to update `refname` from `oldval` to `newval`
    (binary shas, hexlified on the wire); a falsy `oldval` is sent as
    an empty field."""
    self._require_command(b'update-ref')
    self.conn.write(b'update-ref %s\n%s\n%s\n'
                    % (refname, hexlify(newval),
                       hexlify(oldval) if oldval else b''))
# (Client.join) Stream the content of the object named by `id` from the
# server, yielding it chunk by chunk.
self._require_command(b'join')
# Send 'cat' so we'll work fine with older versions
self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
# Each chunk is preceded by its size as a big-endian uint32.
sz = struct.unpack('!I', self.conn.read(4))[0]
yield self.conn.read(sz)
# FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
raise KeyError(str(e))
def cat_batch(self, refs):
    """Yield an (oidx, type, size, chunk-reader) 4-tuple for each ref
    in `refs`; a missing object yields (None, None, None, None).  Each
    reader must be fully consumed before advancing the generator."""
    self._require_command(b'cat-batch')
    self._busy = b'cat-batch'
    conn.write(b'cat-batch\n')
    # FIXME: do we want (only) binary protocol?
    # Refs are sent one per line, so embedded newlines would corrupt
    # the request stream.
    assert b'\n' not in ref
    info = conn.readline()
    if info == b'missing\n':
        yield None, None, None, None
    if not (info and info.endswith(b'\n')):
        raise ClientError('Hit EOF while looking for object info: %r'
    oidx, oid_t, size = info.split(b' ')
    cr = chunkyreader(conn, size)
    yield oidx, oid_t, size, cr
    # Anything left in the reader means the caller didn't drain it.
    detritus = next(cr, None)
        raise ClientError('unexpected leftover data ' + repr(detritus))
    not_ok = self.check_ok()
def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
    """Yield (name, sha) pairs for the server's refs, optionally
    filtered by `patterns` and/or restricted to heads or tags."""
    patterns = patterns or tuple()
    self._require_command(b'refs')
    # The two flags are sent as 0/1 fields on the command line.
    conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                  1 if limit_to_tags else 0))
    for pattern in patterns:
        assert b'\n' not in pattern
    # A blank line terminates the listing; each entry is "hexoid name".
    for line in lines_until_sentinel(conn, b'\n', ClientError):
        oidx, name = line.split(b' ')
        raise ClientError('Invalid object fingerprint in %r' % line)
        raise ClientError('Invalid reference name in %r' % line)
        yield name, unhexlify(oidx)
    not_ok = self.check_ok()
def rev_list(self, refs, parse=None, format=None):
    """See git.rev_list for the general semantics, but note that with the
    current interface, the parse function must be able to handle
    (consume) any blank lines produced by the format because the
    first one received that it doesn't consume will be interpreted
    as a terminator for the entire rev-list result.
    """
    self._require_command(b'rev-list')
    assert b'\n' not in format
    assert b'\n' not in ref
    self._busy = b'rev-list'
    conn.write(b'rev-list\n')
    # Without a format, each result line is a bare 40-char hex oid.
    for line in lines_until_sentinel(conn, b'\n', ClientError):
        assert len(line) == 40
    # With a format, each commit starts with a "commit <hexoid>" line
    # followed by output for `parse` to consume.
    for line in lines_until_sentinel(conn, b'\n', ClientError):
        if not line.startswith(b'commit '):
            raise ClientError('unexpected line ' + repr(line))
        cmt_oidx = line[7:].strip()
        assert len(cmt_oidx) == 40
        yield cmt_oidx, parse(conn)
    not_ok = self.check_ok()
def resolve(self, path, parent=None, want_meta=True, follow=False):
    """Resolve `path` on the server (vfs semantics) and return the
    resolution; a server-reported IOError is re-raised locally."""
    self._require_command(b'resolve')
    self._busy = b'resolve'
    # The three options are packed into a single bitmask field.
    conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                  | (2 if follow else 0)
                                  | (4 if parent else 0)))
    vfs.write_resolution(conn, parent)
    write_bvec(conn, path)
    # One status byte: 1 = resolution follows, 0 = ioerror follows.
    success = ord(conn.read(1))
    assert success in (0, 1)
    result = vfs.read_resolution(conn)
    result = vfs.read_ioerror(conn)
    not_ok = self.check_ok()
    if isinstance(result, vfs.IOError):
# A git.PackWriter that streams its pack data over the connection to a
# bup server instead of writing a local packfile.
class PackWriter_Remote(git.PackWriter):
def __init__(self, conn, objcache_maker, suggest_packs,
             max_pack_objects=None):
    git.PackWriter.__init__(self,
                            objcache_maker=objcache_maker,
                            compression_level=compression_level,
                            max_pack_size=max_pack_size,
                            max_pack_objects=max_pack_objects)
    # The "file" this writer streams to is the server connection.
    self.filename = b'remote socket'
    self.suggest_packs = suggest_packs
    self.onclose = onclose
    self.ensure_busy = ensure_busy
    # True once the pack stream has been started on the wire.
    self._packopen = False
    # Bandwidth-limit accounting fed to _raw_write_bwlimit().
    self._bwtime = time.time()
# (_open) Lazily mark the pack stream open the first time it's needed.
if not self._packopen:
    self._packopen = True
def _end(self, run_midx=True):
    """Finish the remote pack: send the end-of-pack marker, release the
    connection's busy state, and return the last index received."""
    assert(run_midx) # We don't support this via remote yet
    if self._packopen and self.file:
        # Four zero bytes signal end-of-pack to the server.
        self.file.write(b'\0\0\0\0')
        self._packopen = False
        self.onclose() # Unbusy
        return self.suggest_packs() # Returns last idx received
# (abort) Aborting a half-sent pack isn't supported by the wire protocol.
raise ClientError("don't know how to abort remote pack writing")
def _raw_write(self, datalist, sha):
    """Frame one object (length prefix, sha, crc32, deflated data) and
    send it over the connection, honoring the bandwidth limit."""
    if not self._packopen:
    data = b''.join(datalist)
    crc = zlib.crc32(data) & 0xffffffff
    # Frame length covers the data plus the 20-byte sha and 4-byte crc.
    outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                       struct.pack('!I', crc),
        (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                self.file, outbuf, self._bwcount, self._bwtime)
        reraise(ClientError(e))
    self.outbytes += len(data)
    # If the server interleaved suggestions into the stream, process
    # them now and refresh the object cache to pick up new indexes.
    if self.file.has_input():
        self.objcache.refresh()