from __future__ import absolute_import

import binascii, errno, os, re, socket, struct, sys, time, zlib
from numbers import Integral

from bup import git, ssh, vfs
from bup.compat import range
from bup.helpers import (Conn, DemuxConn, atoi,
                         atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress)
from bup.vint import read_bvec, read_vuint, write_bvec
class ClientError(Exception):
    """Raised for any failure talking to, or reported by, a bup server."""
    pass
21 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
24 return (len(buf), time.time())
26 # We want to write in reasonably large blocks, but not so large that
27 # they're likely to overflow a router's queue. So our bwlimit timing
28 # has to be pretty granular. Also, if it takes too long from one
29 # transmit to the next, we can't just make up for lost time to bring
30 # the average back up to bwlimit - that will risk overflowing the
31 # outbound queue, which defeats the purpose. So if we fall behind
32 # by more than one block delay, we shouldn't ever try to catch up.
33 for i in range(0,len(buf),4096):
35 next = max(now, bwtime + 1.0*bwcount/bwlimit)
39 bwcount = len(sub) # might be less than 4096
41 return (bwcount, bwtime)
def parse_remote(remote):
    """Split a remote spec into a (protocol, host, port, dir) tuple.

    Accepts 'ssh://', 'bup://', and 'file://' URLs (including
    [bracketed] IPv6 hosts and an optional :port), rsync-style
    'host:path' specs, and bare local paths.  Raises ClientError for
    an unrecognized URL protocol.
    """
    protocol = r'([a-z]+)://'
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        if not url_match.group(1) in ('ssh', 'bup', 'file'):
            # py3-compatible raise (was the py2-only "raise E, msg" form)
            raise ClientError('unexpected protocol: %s' % url_match.group(1))
        return url_match.group(1, 3, 4, 5)
    else:
        rs = remote.split(':', 1)
        if len(rs) == 1 or rs[0] in ('', '-'):
            return 'file', None, None, rs[-1]
        else:
            return 'ssh', rs[0], None, rs[1]
64 def __init__(self, remote, create=False):
65 self._busy = self.conn = None
66 self.sock = self.p = self.pout = self.pin = None
67 is_reverse = os.environ.get('BUP_SERVER_REVERSE')
70 remote = '%s:' % is_reverse
71 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
72 self.cachedir = git.repo('index-cache/%s'
73 % re.sub(r'[^@\w]', '_',
74 "%s:%s" % (self.host, self.dir)))
76 self.pout = os.fdopen(3, 'rb')
77 self.pin = os.fdopen(4, 'wb')
78 self.conn = Conn(self.pout, self.pin)
80 if self.protocol in ('ssh', 'file'):
82 # FIXME: ssh and file shouldn't use the same module
83 self.p = ssh.connect(self.host, self.port, 'server')
84 self.pout = self.p.stdout
85 self.pin = self.p.stdin
86 self.conn = Conn(self.pout, self.pin)
88 raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
89 elif self.protocol == 'bup':
90 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
91 self.sock.connect((self.host, atoi(self.port) or 1982))
92 self.sockw = self.sock.makefile('wb')
93 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
94 self._available_commands = self._get_available_commands()
95 self._require_command('init-dir')
96 self._require_command('set-dir')
98 self.dir = re.sub(r'[\r\n]', ' ', self.dir)
100 self.conn.write('init-dir %s\n' % self.dir)
102 self.conn.write('set-dir %s\n' % self.dir)
110 if e.errno == errno.EPIPE:
116 if self.conn and not self._busy:
117 self.conn.write('quit\n')
120 if self.sock and self.sockw:
122 self.sock.shutdown(socket.SHUT_WR)
133 raise ClientError('server tunnel returned exit code %d' % rv)
135 self.sock = self.p = self.pin = self.pout = None
141 raise ClientError('server exited unexpectedly with code %r'
144 return self.conn.check_ok()
145 except Exception as e:
146 raise ClientError, e, sys.exc_info()[2]
148 def check_busy(self):
150 raise ClientError('already busy with command %r' % self._busy)
152 def ensure_busy(self):
154 raise ClientError('expected to be busy, but not busy?!')
159 def _get_available_commands(self):
165 line = self.conn.readline()
166 if not line == 'Commands:\n':
167 raise ClientError('unexpected help header ' + repr(line))
169 line = self.conn.readline()
172 if not line.startswith(' '):
173 raise ClientError('unexpected help line ' + repr(line))
176 raise ClientError('unexpected help line ' + repr(line))
179 not_ok = self.check_ok()
183 return frozenset(result)
185 def _require_command(self, name):
186 if name not in self._available_commands:
187 raise ClientError('server does not appear to provide %s command'
190 def sync_indexes(self):
191 self._require_command('list-indexes')
194 mkdirp(self.cachedir)
195 # All cached idxs are extra until proven otherwise
197 for f in os.listdir(self.cachedir):
199 if f.endswith('.idx'):
202 conn.write('list-indexes\n')
203 for line in linereader(conn):
206 assert(line.find('/') < 0)
207 parts = line.split(' ')
209 if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
210 # If the server requests that we load an idx and we don't
211 # already have a copy of it, it is needed
213 # Any idx that the server has heard of is proven not extra
217 debug1('client: removing extra indexes: %s\n' % extra)
219 os.unlink(os.path.join(self.cachedir, idx))
220 debug1('client: server requested load of: %s\n' % needed)
223 git.auto_midx(self.cachedir)
225 def sync_index(self, name):
226 self._require_command('send-index')
227 #debug1('requesting %r\n' % name)
229 mkdirp(self.cachedir)
230 fn = os.path.join(self.cachedir, name)
231 if os.path.exists(fn):
232 msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
233 raise ClientError(msg)
234 self.conn.write('send-index %s\n' % name)
235 n = struct.unpack('!I', self.conn.read(4))[0]
237 with atomically_replaced_file(fn, 'w') as f:
239 progress('Receiving index from server: %d/%d\r' % (count, n))
240 for b in chunkyreader(self.conn, n):
243 qprogress('Receiving index from server: %d/%d\r' % (count, n))
244 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
247 def _make_objcache(self):
248 return git.PackIdxList(self.cachedir)
250 def _suggest_packs(self):
253 assert(ob == 'receive-objects-v2')
254 self.conn.write('\xff\xff\xff\xff') # suspend receive-objects-v2
256 for line in linereader(self.conn):
259 debug2('%s\n' % line)
260 if line.startswith('index '):
262 debug1('client: received index suggestion: %s\n'
263 % git.shorten_hash(idx))
264 suggested.append(idx)
266 assert(line.endswith('.idx'))
267 debug1('client: completed writing pack, idx: %s\n'
268 % git.shorten_hash(line))
269 suggested.append(line)
274 for idx in suggested:
276 git.auto_midx(self.cachedir)
279 self.conn.write('%s\n' % ob)
282 def new_packwriter(self, compression_level=1,
283 max_pack_size=None, max_pack_objects=None):
284 self._require_command('receive-objects-v2')
287 self._busy = 'receive-objects-v2'
288 self.conn.write('receive-objects-v2\n')
289 return PackWriter_Remote(self.conn,
290 objcache_maker = self._make_objcache,
291 suggest_packs = self._suggest_packs,
293 onclose = self._not_busy,
294 ensure_busy = self.ensure_busy,
295 compression_level=compression_level,
296 max_pack_size=max_pack_size,
297 max_pack_objects=max_pack_objects)
299 def read_ref(self, refname):
300 self._require_command('read-ref')
302 self.conn.write('read-ref %s\n' % refname)
303 r = self.conn.readline().strip()
306 assert(len(r) == 40) # hexified sha
307 return r.decode('hex')
309 return None # nonexistent ref
311 def update_ref(self, refname, newval, oldval):
312 self._require_command('update-ref')
314 self.conn.write('update-ref %s\n%s\n%s\n'
315 % (refname, newval.encode('hex'),
316 (oldval or '').encode('hex')))
320 self._require_command('join')
323 # Send 'cat' so we'll work fine with older versions
324 self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
326 sz = struct.unpack('!I', self.conn.read(4))[0]
328 yield self.conn.read(sz)
329 # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
333 raise KeyError(str(e))
335 def cat_batch(self, refs):
336 self._require_command('cat-batch')
338 self._busy = 'cat-batch'
340 conn.write('cat-batch\n')
341 # FIXME: do we want (only) binary protocol?
344 assert '\n' not in ref
349 info = conn.readline()
350 if info == 'missing\n':
351 yield None, None, None, None
353 if not (info and info.endswith('\n')):
354 raise ClientError('Hit EOF while looking for object info: %r'
356 oidx, oid_t, size = info.split(' ')
358 cr = chunkyreader(conn, size)
359 yield oidx, oid_t, size, cr
360 detritus = next(cr, None)
362 raise ClientError('unexpected leftover data ' + repr(detritus))
364 not_ok = self.check_ok()
369 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
370 patterns = patterns or tuple()
371 self._require_command('refs')
375 conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
376 1 if limit_to_tags else 0))
377 for pattern in patterns:
378 assert '\n' not in pattern
382 for line in lines_until_sentinel(conn, '\n', ClientError):
384 oidx, name = line.split(' ')
386 raise ClientError('Invalid object fingerprint in %r' % line)
388 raise ClientError('Invalid reference name in %r' % line)
389 yield name, oidx.decode('hex')
391 not_ok = self.check_ok()
396 def rev_list(self, refs, count=None, parse=None, format=None):
397 """See git.rev_list for the general semantics, but note that with the
398 current interface, the parse function must be able to handle
399 (consume) any blank lines produced by the format because the
400 first one received that it doesn't consume will be interpreted
401 as a terminator for the entire rev-list result.
404 self._require_command('rev-list')
405 assert (count is None) or (isinstance(count, Integral))
407 assert '\n' not in format
411 assert '\n' not in ref
413 self._busy = 'rev-list'
415 conn.write('rev-list\n')
416 if count is not None:
417 conn.write(str(count))
427 for line in lines_until_sentinel(conn, '\n', ClientError):
429 assert len(line) == 40
432 for line in lines_until_sentinel(conn, '\n', ClientError):
433 if not line.startswith('commit '):
434 raise ClientError('unexpected line ' + repr(line))
435 cmt_oidx = line[7:].strip()
436 assert len(cmt_oidx) == 40
437 yield cmt_oidx, parse(conn)
439 not_ok = self.check_ok()
444 def resolve(self, path, parent=None, want_meta=True, follow=False):
445 self._require_command('resolve')
447 self._busy = 'resolve'
449 conn.write('resolve %d\n' % ((1 if want_meta else 0)
450 | (2 if follow else 0)
451 | (4 if parent else 0)))
453 vfs.write_resolution(conn, parent)
454 write_bvec(conn, path)
455 success = ord(conn.read(1))
456 assert success in (0, 1)
458 result = vfs.read_resolution(conn)
460 result = vfs.read_ioerror(conn)
462 not_ok = self.check_ok()
466 if isinstance(result, vfs.IOError):
class PackWriter_Remote(git.PackWriter):
    """A git.PackWriter that streams pack data to a remote bup server.

    Object data is framed (length, sha, crc, data) and written to the
    server connection instead of a local packfile; the server answers
    with idx suggestions that the owning Client syncs into its cache.
    """

    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.conn = conn
        self.filename = 'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        # Bandwidth-limiter state threaded through _raw_write_bwlimit().
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        # Lazily start the server-side receive-objects-v2 command.
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            # Zero-length frame tells the server the pack is complete.
            self.file.write('\0\0\0\0')
            self._packopen = False
            self.onclose()  # Unbusy
            self.objcache = None
            return self.suggest_packs()  # Returns last idx received

    def abort(self):
        """Remote pack writing can't be rolled back; always raises."""
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = ''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        # Frame: uint32 total length, 20-byte sha, uint32 crc, payload.
        outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
                          sha,
                          struct.pack('!I', crc),
                          data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            # was the py2-only "raise E, v, tb" form
            raise ClientError(e)
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            # Server spoke up mid-stream: it is suggesting indexes.
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc