2 from __future__ import print_function
4 from __future__ import absolute_import
5 from binascii import hexlify, unhexlify
6 import errno, os, re, struct, time, zlib
9 from bup import git, ssh, vfs
10 from bup.compat import environ, range, reraise
11 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12 debug2, linereader, lines_until_sentinel,
13 mkdirp, progress, qprogress, DemuxConn)
14 from bup.io import path_msg
15 from bup.vint import write_bvec
21 class ClientError(Exception):
25 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
28 return (len(buf), time.time())
30 # We want to write in reasonably large blocks, but not so large that
31 # they're likely to overflow a router's queue. So our bwlimit timing
32 # has to be pretty granular. Also, if it takes too long from one
33 # transmit to the next, we can't just make up for lost time to bring
34 # the average back up to bwlimit - that will risk overflowing the
35 # outbound queue, which defeats the purpose. So if we fall behind
36 # by more than one block delay, we shouldn't ever try to catch up.
37 for i in range(0,len(buf),4096):
39 next = max(now, bwtime + 1.0*bwcount/bwlimit)
43 bwcount = len(sub) # might be less than 4096
45 return (bwcount, bwtime)
48 _protocol_rs = br'([a-z]+)://'
49 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
50 _port_rs = br'(?::(\d+))?'
52 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
55 def parse_remote(remote):
56 url_match = _url_rx.match(remote)
58 if not url_match.group(1) in (b'ssh', b'bup', b'file'):
59 raise ClientError('unexpected protocol: %s'
60 % url_match.group(1).decode('ascii'))
61 return url_match.group(1,3,4,5)
63 rs = remote.split(b':', 1)
64 if len(rs) == 1 or rs[0] in (b'', b'-'):
65 return b'file', None, None, rs[-1]
67 return b'ssh', rs[0], None, rs[1]
71 def __init__(self, remote, create=False):
72 self._busy = self.conn = None
73 self.sock = self.p = self.pout = self.pin = None
74 is_reverse = environ.get(b'BUP_SERVER_REVERSE')
77 remote = b'%s:' % is_reverse
78 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
79 # The b'None' here matches python2's behavior of b'%s' % None == 'None',
80 # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
81 # but crashes instead when doing b'%s' % None.
82 cachehost = b'None' if self.host is None else self.host
83 cachedir = b'None' if self.dir is None else self.dir
84 self.cachedir = git.repo(b'index-cache/%s'
87 b'%s:%s' % (cachehost, cachedir)))
89 self.pout = os.fdopen(3, 'rb')
90 self.pin = os.fdopen(4, 'wb')
91 self.conn = Conn(self.pout, self.pin)
93 if self.protocol in (b'ssh', b'file'):
95 # FIXME: ssh and file shouldn't use the same module
96 self.p = ssh.connect(self.host, self.port, b'server')
97 self.pout = self.p.stdout
98 self.pin = self.p.stdin
99 self.conn = Conn(self.pout, self.pin)
101 reraise(ClientError('connect: %s' % e))
102 elif self.protocol == b'bup':
103 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
104 self.sock.connect((self.host,
105 1982 if self.port is None else int(self.port)))
106 self.sockw = self.sock.makefile('wb')
107 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
108 self._available_commands = self._get_available_commands()
109 self._require_command(b'init-dir')
110 self._require_command(b'set-dir')
112 self.dir = re.sub(br'[\r\n]', ' ', self.dir)
114 self.conn.write(b'init-dir %s\n' % self.dir)
116 self.conn.write(b'set-dir %s\n' % self.dir)
124 if e.errno == errno.EPIPE:
130 if self.conn and not self._busy:
131 self.conn.write(b'quit\n')
134 if self.sock and self.sockw:
136 self.sock.shutdown(socket.SHUT_WR)
147 raise ClientError('server tunnel returned exit code %d' % rv)
149 self.sock = self.p = self.pin = self.pout = None
155 raise ClientError('server exited unexpectedly with code %r'
158 return self.conn.check_ok()
159 except Exception as e:
160 reraise(ClientError(e))
161 # reraise doesn't return
164 def check_busy(self):
166 raise ClientError('already busy with command %r' % self._busy)
168 def ensure_busy(self):
170 raise ClientError('expected to be busy, but not busy?!')
175 def _get_available_commands(self):
179 conn.write(b'help\n')
181 line = self.conn.readline()
182 if not line == b'Commands:\n':
183 raise ClientError('unexpected help header ' + repr(line))
185 line = self.conn.readline()
188 if not line.startswith(b' '):
189 raise ClientError('unexpected help line ' + repr(line))
192 raise ClientError('unexpected help line ' + repr(line))
195 not_ok = self.check_ok()
199 return frozenset(result)
201 def _require_command(self, name):
202 if name not in self._available_commands:
203 raise ClientError('server does not appear to provide %s command'
204 % name.encode('ascii'))
206 def sync_indexes(self):
207 self._require_command(b'list-indexes')
210 mkdirp(self.cachedir)
211 # All cached idxs are extra until proven otherwise
213 for f in os.listdir(self.cachedir):
214 debug1(path_msg(f) + '\n')
215 if f.endswith(b'.idx'):
218 conn.write(b'list-indexes\n')
219 for line in linereader(conn):
222 assert(line.find(b'/') < 0)
223 parts = line.split(b' ')
225 if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
226 # If the server requests that we load an idx and we don't
227 # already have a copy of it, it is needed
229 # Any idx that the server has heard of is proven not extra
233 debug1('client: removing extra indexes: %s\n' % extra)
235 os.unlink(os.path.join(self.cachedir, idx))
236 debug1('client: server requested load of: %s\n' % needed)
239 git.auto_midx(self.cachedir)
241 def sync_index(self, name):
242 self._require_command(b'send-index')
243 #debug1('requesting %r\n' % name)
245 mkdirp(self.cachedir)
246 fn = os.path.join(self.cachedir, name)
247 if os.path.exists(fn):
248 msg = ("won't request existing .idx, try `bup bloom --check %s`"
250 raise ClientError(msg)
251 self.conn.write(b'send-index %s\n' % name)
252 n = struct.unpack('!I', self.conn.read(4))[0]
254 with atomically_replaced_file(fn, 'wb') as f:
256 progress('Receiving index from server: %d/%d\r' % (count, n))
257 for b in chunkyreader(self.conn, n):
260 qprogress('Receiving index from server: %d/%d\r' % (count, n))
261 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
264 def _make_objcache(self):
265 return git.PackIdxList(self.cachedir)
267 def _suggest_packs(self):
270 assert(ob == b'receive-objects-v2')
271 self.conn.write(b'\xff\xff\xff\xff') # suspend receive-objects-v2
273 for line in linereader(self.conn):
276 debug2('%r\n' % line)
277 if line.startswith(b'index '):
279 debug1('client: received index suggestion: %s\n'
280 % git.shorten_hash(idx).decode('ascii'))
281 suggested.append(idx)
283 assert(line.endswith(b'.idx'))
284 debug1('client: completed writing pack, idx: %s\n'
285 % git.shorten_hash(line).decode('ascii'))
286 suggested.append(line)
291 for idx in suggested:
293 git.auto_midx(self.cachedir)
296 self.conn.write(b'%s\n' % ob)
299 def new_packwriter(self, compression_level=1,
300 max_pack_size=None, max_pack_objects=None):
301 self._require_command(b'receive-objects-v2')
304 self._busy = b'receive-objects-v2'
305 self.conn.write(b'receive-objects-v2\n')
306 return PackWriter_Remote(self.conn,
307 objcache_maker = self._make_objcache,
308 suggest_packs = self._suggest_packs,
310 onclose = self._not_busy,
311 ensure_busy = self.ensure_busy,
312 compression_level=compression_level,
313 max_pack_size=max_pack_size,
314 max_pack_objects=max_pack_objects)
316 def read_ref(self, refname):
317 self._require_command(b'read-ref')
319 self.conn.write(b'read-ref %s\n' % refname)
320 r = self.conn.readline().strip()
323 assert(len(r) == 40) # hexified sha
326 return None # nonexistent ref
328 def update_ref(self, refname, newval, oldval):
329 self._require_command(b'update-ref')
331 self.conn.write(b'update-ref %s\n%s\n%s\n'
332 % (refname, hexlify(newval),
333 hexlify(oldval) if oldval else b''))
337 self._require_command(b'join')
340 # Send 'cat' so we'll work fine with older versions
341 self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
343 sz = struct.unpack('!I', self.conn.read(4))[0]
345 yield self.conn.read(sz)
346             # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
350 raise KeyError(str(e))
352 def cat_batch(self, refs):
353 self._require_command(b'cat-batch')
355 self._busy = b'cat-batch'
357 conn.write(b'cat-batch\n')
358 # FIXME: do we want (only) binary protocol?
361 assert b'\n' not in ref
366 info = conn.readline()
367 if info == b'missing\n':
368 yield None, None, None, None
370 if not (info and info.endswith(b'\n')):
371 raise ClientError('Hit EOF while looking for object info: %r'
373 oidx, oid_t, size = info.split(b' ')
375 cr = chunkyreader(conn, size)
376 yield oidx, oid_t, size, cr
377 detritus = next(cr, None)
379 raise ClientError('unexpected leftover data ' + repr(detritus))
381 not_ok = self.check_ok()
386 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
387 patterns = patterns or tuple()
388 self._require_command(b'refs')
392 conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
393 1 if limit_to_tags else 0))
394 for pattern in patterns:
395 assert b'\n' not in pattern
399 for line in lines_until_sentinel(conn, b'\n', ClientError):
401 oidx, name = line.split(b' ')
403 raise ClientError('Invalid object fingerprint in %r' % line)
405 raise ClientError('Invalid reference name in %r' % line)
406 yield name, unhexlify(oidx)
408 not_ok = self.check_ok()
413 def rev_list(self, refs, parse=None, format=None):
414 """See git.rev_list for the general semantics, but note that with the
415 current interface, the parse function must be able to handle
416 (consume) any blank lines produced by the format because the
417 first one received that it doesn't consume will be interpreted
418 as a terminator for the entire rev-list result.
421 self._require_command(b'rev-list')
423 assert b'\n' not in format
427 assert b'\n' not in ref
429 self._busy = b'rev-list'
431 conn.write(b'rev-list\n')
441 for line in lines_until_sentinel(conn, b'\n', ClientError):
443 assert len(line) == 40
446 for line in lines_until_sentinel(conn, b'\n', ClientError):
447 if not line.startswith(b'commit '):
448 raise ClientError('unexpected line ' + repr(line))
449 cmt_oidx = line[7:].strip()
450 assert len(cmt_oidx) == 40
451 yield cmt_oidx, parse(conn)
453 not_ok = self.check_ok()
458 def resolve(self, path, parent=None, want_meta=True, follow=False):
459 self._require_command(b'resolve')
461 self._busy = b'resolve'
463 conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
464 | (2 if follow else 0)
465 | (4 if parent else 0)))
467 vfs.write_resolution(conn, parent)
468 write_bvec(conn, path)
469 success = ord(conn.read(1))
470 assert success in (0, 1)
472 result = vfs.read_resolution(conn)
474 result = vfs.read_ioerror(conn)
476 not_ok = self.check_ok()
480 if isinstance(result, vfs.IOError):
485 class PackWriter_Remote(git.PackWriter):
486 def __init__(self, conn, objcache_maker, suggest_packs,
491 max_pack_objects=None):
492 git.PackWriter.__init__(self,
493 objcache_maker=objcache_maker,
494 compression_level=compression_level,
495 max_pack_size=max_pack_size,
496 max_pack_objects=max_pack_objects)
498 self.filename = b'remote socket'
499 self.suggest_packs = suggest_packs
501 self.onclose = onclose
502 self.ensure_busy = ensure_busy
503 self._packopen = False
505 self._bwtime = time.time()
508 if not self._packopen:
510 self._packopen = True
512 def _end(self, run_midx=True):
513 assert(run_midx) # We don't support this via remote yet
514 if self._packopen and self.file:
515 self.file.write(b'\0\0\0\0')
516 self._packopen = False
517 self.onclose() # Unbusy
519 return self.suggest_packs() # Returns last idx received
528 raise ClientError("don't know how to abort remote pack writing")
530 def _raw_write(self, datalist, sha):
532 if not self._packopen:
535 data = b''.join(datalist)
538 crc = zlib.crc32(data) & 0xffffffff
539 outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
541 struct.pack('!I', crc),
544 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
545 self.file, outbuf, self._bwcount, self._bwtime)
547 reraise(ClientError(e))
548 self.outbytes += len(data)
551 if self.file.has_input():
553 self.objcache.refresh()