2 from __future__ import absolute_import
3 import errno, os, re, struct, sys, time, zlib
5 from bup import git, ssh, vfs
6 from bup.compat import range
7 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
8 debug2, linereader, lines_until_sentinel,
9 mkdirp, progress, qprogress)
10 from bup.vint import read_bvec, read_vuint, write_bvec
class ClientError(Exception):
    """Raised for any client-side failure while talking to a bup server."""
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, throttled to the (module-level) bwlimit.

    Returns an updated (bwcount, bwtime) accounting pair for the caller to
    carry into the next write.
    NOTE(review): several source lines are missing from this view (the
    unlimited fast-path guard, the per-chunk write/sleep, and the bindings
    of `now` and `sub`); the visible code is not runnable as-is.
    """
    return (len(buf), time.time())
    # We want to write in reasonably large blocks, but not so large that
    # they're likely to overflow a router's queue. So our bwlimit timing
    # has to be pretty granular. Also, if it takes too long from one
    # transmit to the next, we can't just make up for lost time to bring
    # the average back up to bwlimit - that will risk overflowing the
    # outbound queue, which defeats the purpose. So if we fall behind
    # by more than one block delay, we shouldn't ever try to catch up.
    for i in range(0,len(buf),4096):
        # Earliest time we're allowed to transmit the next chunk.
        next = max(now, bwtime + 1.0*bwcount/bwlimit)
        bwcount = len(sub)  # might be less than 4096
    return (bwcount, bwtime)
def parse_remote(remote):
    """Split a remote spec into a (protocol, host, port, path) tuple.

    Accepts either a URL form (ssh://, bup://, file://) or the short
    host:path / plain-path forms.  Raises ClientError on an unknown URL
    protocol.
    NOTE(review): the `port`/`path` pattern definitions, the re.match
    binding of `url_match`, and the branch guards are missing from this
    view.
    """
    protocol = r'([a-z]+)://'
    # Host is either a bracketed IPv6 literal or anything up to ':'/'/'
    # (the (?(sb)...) conditional keys on whether the '[' group matched).
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
        '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if not url_match.group(1) in ('ssh', 'bup', 'file'):
        # Py2-only raise syntax; py3 would need raise ClientError(...).
        raise ClientError, 'unexpected protocol: %s' % url_match.group(1)
    return url_match.group(1,3,4,5)
    # Fallback (non-URL) forms: "path", "-:path", or "host:path".
    rs = remote.split(':', 1)
    if len(rs) == 1 or rs[0] in ('', '-'):
        return 'file', None, None, rs[-1]
    return 'ssh', rs[0], None, rs[1]
    def __init__(self, remote, create=False):
        """Connect to the bup server described by remote.

        remote: a spec understood by parse_remote.
        create: presumably selects init-dir vs. set-dir on the server —
        TODO confirm (the guard lines are missing from this view).
        NOTE(review): many constructor lines are elided here (is_reverse
        branch guards, try/except around ssh.connect, error handling for
        init/set-dir), so the visible statements do not reflect the real
        control flow.
        """
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        # "Reverse" mode: the server initiated the connection and told us
        # so via the environment.
        is_reverse = os.environ.get('BUP_SERVER_REVERSE')
        remote = '%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # Local cache of the server's pack indexes, keyed by host:dir with
        # non-word characters replaced.
        self.cachedir = git.repo('index-cache/%s'
                                 % re.sub(r'[^@\w]', '_',
                                          "%s:%s" % (self.host, self.dir)))
        # Reverse mode talks over inherited file descriptors 3 (read) and
        # 4 (write).
        self.pout = os.fdopen(3, 'rb')
        self.pin = os.fdopen(4, 'wb')
        self.conn = Conn(self.pout, self.pin)
        if self.protocol in ('ssh', 'file'):
            # FIXME: ssh and file shouldn't use the same module
            self.p = ssh.connect(self.host, self.port, 'server')
            self.pout = self.p.stdout
            self.pin = self.p.stdin
            self.conn = Conn(self.pout, self.pin)
            # Py2 three-argument raise: re-raise as ClientError but keep
            # the original traceback.
            raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
        elif self.protocol == 'bup':
            # Raw TCP transport; 1982 is the default bup server port.
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host, atoi(self.port) or 1982))
            self.sockw = self.sock.makefile('wb')
            self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command('init-dir')
        self._require_command('set-dir')
        # Strip CR/LF so the one-line wire protocol can't be corrupted.
        self.dir = re.sub(r'[\r\n]', ' ', self.dir)
        self.conn.write('init-dir %s\n' % self.dir)
        self.conn.write('set-dir %s\n' % self.dir)
109 if e.errno == errno.EPIPE:
115 if self.conn and not self._busy:
116 self.conn.write('quit\n')
119 if self.sock and self.sockw:
121 self.sock.shutdown(socket.SHUT_WR)
132 raise ClientError('server tunnel returned exit code %d' % rv)
134 self.sock = self.p = self.pin = self.pout = None
140 raise ClientError('server exited unexpectedly with code %r'
143 return self.conn.check_ok()
144 except Exception as e:
145 raise ClientError, e, sys.exc_info()[2]
    def check_busy(self):
        """Raise ClientError if a long-running command is already active.

        NOTE(review): the guard line (presumably `if self._busy:`) is
        missing from this view.
        """
        raise ClientError('already busy with command %r' % self._busy)
    def ensure_busy(self):
        """Assert that a long-running command is currently active.

        NOTE(review): the guard line (presumably `if not self._busy:`) is
        missing from this view.
        """
        raise ClientError('expected to be busy, but not busy?!')
    def _get_available_commands(self):
        """Query the server's help output and return its advertised
        command names as a frozenset.

        NOTE(review): the lines that send the help request, the read
        loop, and the accumulation into `result` are missing from this
        view.
        """
        line = self.conn.readline()
        if not line == 'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        line = self.conn.readline()
        if not line.startswith(' '):
            raise ClientError('unexpected help line ' + repr(line))
        raise ClientError('unexpected help line ' + repr(line))
        not_ok = self.check_ok()
        return frozenset(result)
    def _require_command(self, name):
        """Raise ClientError unless the server advertises command `name`."""
        if name not in self._available_commands:
            # NOTE(review): the `% name)` continuation of this raise is
            # missing from this view.
            raise ClientError('server does not appear to provide %s command'
    def sync_indexes(self):
        """Mirror the server's pack indexes into the local cache dir:
        delete cached .idx files the server no longer lists, fetch the
        ones it asks us to load, then rebuild the midx.

        NOTE(review): the `extra`/`needed` set bookkeeping, the `conn`
        binding, and several loop bodies are missing from this view.
        """
        self._require_command('list-indexes')
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        for f in os.listdir(self.cachedir):
            if f.endswith('.idx'):
        conn.write('list-indexes\n')
        for line in linereader(conn):
            # The server must never send path separators in index names.
            assert(line.find('/') < 0)
            parts = line.split(' ')
            if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
            # Any idx that the server has heard of is proven not extra
        debug1('client: removing extra indexes: %s\n' % extra)
        os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        git.auto_midx(self.cachedir)
    def sync_index(self, name):
        """Fetch one pack index `name` from the server into the local
        cache, writing it atomically.

        Raises ClientError if the index is already cached.
        NOTE(review): the `count` initialization/update lines inside the
        receive loop are missing from this view.
        """
        self._require_command('send-index')
        #debug1('requesting %r\n' % name)
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
            raise ClientError(msg)
        self.conn.write('send-index %s\n' % name)
        # The server replies with a network-order uint32 byte count, then
        # the index data itself.
        n = struct.unpack('!I', self.conn.read(4))[0]
        with atomically_replaced_file(fn, 'w') as f:
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
246 def _make_objcache(self):
247 return git.PackIdxList(self.cachedir)
    def _suggest_packs(self):
        """Suspend a receive-objects-v2 stream, collect the server's idx
        suggestions, download them, and rebuild the midx.

        NOTE(review): many lines are missing from this view (the `ob`
        binding, the `suggested` list initialization, the `idx` parse, the
        per-idx sync call, and the loop terminator), so the visible
        statements do not reflect the full control flow.
        """
        assert(ob == 'receive-objects-v2')
        # A length prefix of 0xffffffff suspends the object stream.
        self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
        for line in linereader(self.conn):
            debug2('%s\n' % line)
            if line.startswith('index '):
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx))
                suggested.append(idx)
                assert(line.endswith('.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line))
                suggested.append(line)
        for idx in suggested:
        git.auto_midx(self.cachedir)
278 self.conn.write('%s\n' % ob)
    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        """Start a remote receive-objects-v2 session and return a
        PackWriter_Remote bound to this connection.

        Marks the connection busy until the writer's onclose fires.
        """
        self._require_command('receive-objects-v2')
        self._busy = 'receive-objects-v2'
        self.conn.write('receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)
    def read_ref(self, refname):
        """Return the binary sha the server has for refname, or None if
        the ref does not exist.

        NOTE(review): the branch guards between these statements are
        missing from this view.
        """
        self._require_command('read-ref')
        self.conn.write('read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        assert(len(r) == 40)  # hexified sha
        return r.decode('hex')  # py2 hex-string -> binary sha
        return None  # nonexistent ref
    def update_ref(self, refname, newval, oldval):
        """Ask the server to update refname from oldval to newval.

        newval/oldval are binary shas (py2 str); oldval may be falsy for
        ref creation.
        """
        self._require_command('update-ref')
        self.conn.write('update-ref %s\n%s\n%s\n'
                        % (refname, newval.encode('hex'),
                           (oldval or '').encode('hex')))
319 self._require_command('join')
322 # Send 'cat' so we'll work fine with older versions
323 self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
325 sz = struct.unpack('!I', self.conn.read(4))[0]
327 yield self.conn.read(sz)
328 # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
332 raise KeyError(str(e))
    def cat_batch(self, refs):
        """Yield (oidx, type, size, chunkyreader) per requested ref, or
        (None, None, None, None) for a missing one.

        NOTE(review): the `conn` binding, the per-ref write loop, the
        size int() conversion, and several guards/terminators are missing
        from this view.
        """
        self._require_command('cat-batch')
        self._busy = 'cat-batch'
        conn.write('cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        assert '\n' not in ref
        info = conn.readline()
        if info == 'missing\n':
            yield None, None, None, None
        if not (info and info.endswith('\n')):
            raise ClientError('Hit EOF while looking for object info: %r'
        oidx, oid_t, size = info.split(' ')
        cr = chunkyreader(conn, size)
        yield oidx, oid_t, size, cr
        # The caller must drain cr completely before the next iteration.
        detritus = next(cr, None)
        raise ClientError('unexpected leftover data ' + repr(detritus))
        not_ok = self.check_ok()
    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        """Yield (name, binary-sha) pairs for server refs matching
        patterns (all refs when patterns is empty).

        NOTE(review): the `conn` binding, the per-pattern writes, the
        validation guards, and the busy bookkeeping are missing from this
        view.
        """
        patterns = patterns or tuple()
        self._require_command('refs')
        # Flags are sent as literal '0'/'1' tokens on the command line.
        conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
                                     1 if limit_to_tags else 0))
        for pattern in patterns:
            assert '\n' not in pattern
        for line in lines_until_sentinel(conn, '\n', ClientError):
            oidx, name = line.split(' ')
            raise ClientError('Invalid object fingerprint in %r' % line)
            raise ClientError('Invalid reference name in %r' % line)
            yield name, oidx.decode('hex')
        not_ok = self.check_ok()
    def rev_list(self, refs, count=None, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        NOTE(review): the guards around the format/ref assertions, the
        `conn` binding, the format/ref writes, and the no-format branch
        are missing from this view.
        """
        self._require_command('rev-list')
        assert (count is None) or (isinstance(count, Integral))
        assert '\n' not in format
        assert '\n' not in ref
        self._busy = 'rev-list'
        conn.write('rev-list\n')
        if count is not None:
            conn.write(str(count))
        # Without a format, each line is just a 40-char hex oid.
        for line in lines_until_sentinel(conn, '\n', ClientError):
            assert len(line) == 40
        # With a format, each record starts with a 'commit <oidx>' line
        # and parse() consumes the rest from the connection.
        for line in lines_until_sentinel(conn, '\n', ClientError):
            if not line.startswith('commit '):
                raise ClientError('unexpected line ' + repr(line))
            cmt_oidx = line[7:].strip()
            assert len(cmt_oidx) == 40
            yield cmt_oidx, parse(conn)
        not_ok = self.check_ok()
    def resolve(self, path, parent=None, want_meta=True, follow=False):
        """Resolve path on the server (cf. vfs.resolve) and return the
        resolution, or raise the server-side IOError.

        NOTE(review): the `conn` binding, the success/failure branch
        guards, and the busy bookkeeping are missing from this view.
        """
        self._require_command('resolve')
        self._busy = 'resolve'
        # Option bitmask: 1 = want_meta, 2 = follow, 4 = parent provided.
        conn.write('resolve %d\n' % ((1 if want_meta else 0)
                                     | (2 if follow else 0)
                                     | (4 if parent else 0)))
        vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        # Single status byte: 0 = error follows, 1 = resolution follows.
        success = ord(conn.read(1))
        assert success in (0, 1)
        result = vfs.read_resolution(conn)
        result = vfs.read_ioerror(conn)
        not_ok = self.check_ok()
        if isinstance(result, vfs.IOError):
class PackWriter_Remote(git.PackWriter):
    """A git.PackWriter that streams pack data over a Client connection
    instead of writing a local packfile."""

    # NOTE(review): several keyword parameters (presumably onclose,
    # ensure_busy, compression_level, max_pack_size) and a few attribute
    # assignments (e.g. self.conn, self._bwcount) are missing from this
    # view of the constructor.
    def __init__(self, conn, objcache_maker, suggest_packs,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.filename = 'remote socket'
        self.suggest_packs = suggest_packs
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False  # is a receive-objects stream open?
        self._bwtime = time.time()  # bandwidth-limit accounting timestamp
493 if not self._packopen:
495 self._packopen = True
    def _end(self, run_midx=True):
        """Terminate the remote pack stream and return the last idx the
        server suggested (via suggest_packs)."""
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            # A zero length prefix is the end-of-pack sentinel.
            self.file.write('\0\0\0\0')
            self._packopen = False
            self.onclose()  # Unbusy
            return self.suggest_packs()  # Returns last idx received
512 raise ClientError("don't know how to abort remote pack writing")
    def _raw_write(self, datalist, sha):
        """Frame one object (length, sha, crc, data) and transmit it under
        the bandwidth limit.

        NOTE(review): several lines are missing from this view (the _open
        call, asserts, the sha/data members of the frame join, the
        try/except around the throttled write, the count update, and the
        trailing return), so the visible statements are incomplete.
        """
        if not self._packopen:
        data = ''.join(datalist)
        # CRC of the compressed payload, masked to unsigned 32 bits.
        crc = zlib.crc32(data) & 0xffffffff
        # Frame layout: uint32 total length (data + 20-byte sha + 4-byte
        # crc), then sha, crc, data.
        outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
                          struct.pack('!I', crc),
        (self._bwcount, self._bwtime) = _raw_write_bwlimit(
            self.file, outbuf, self._bwcount, self._bwtime)
        # Py2 three-argument raise keeps the original traceback.
        raise ClientError, e, sys.exc_info()[2]
        self.outbytes += len(data)
        if self.file.has_input():
            # Server spoke mid-stream: refresh to pick up suggested idxs.
            self.objcache.refresh()