2 from __future__ import absolute_import
3 import errno, os, re, struct, sys, time, zlib
5 from bup import git, ssh
6 from bup.compat import range
7 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
8 debug2, linereader, lines_until_sentinel,
9 mkdirp, progress, qprogress)
# Exception type raised by this module for client-side connection and
# protocol failures (see the `raise ClientError(...)` sites in this file).
# NOTE(review): the class body is not visible in this extract.
15 class ClientError(Exception):
# Write `buf` to `f` under a bandwidth limit and return a
# (bytes_in_last_write, timestamp) pair; the caller feeds that pair back in
# as `bwcount`/`bwtime` on the next call.
# NOTE(review): several statements of this function are missing from this
# extract (the condition guarding the early return, and the assignments of
# `now`/`sub` plus any sleep/write calls inside the loop). `bwlimit` is not
# defined in the visible lines -- presumably a module-level setting; confirm.
19 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
# Early exit: account for the whole buffer at the current time
# (presumably the "no limit configured" path -- TODO confirm).
22 return (len(buf), time.time())
24 # We want to write in reasonably large blocks, but not so large that
25 # they're likely to overflow a router's queue. So our bwlimit timing
26 # has to be pretty granular. Also, if it takes too long from one
27 # transmit to the next, we can't just make up for lost time to bring
28 # the average back up to bwlimit - that will risk overflowing the
29 # outbound queue, which defeats the purpose. So if we fall behind
30 # by more than one block delay, we shouldn't ever try to catch up.
# Step through the buffer in 4096-unit slices.
31 for i in range(0,len(buf),4096):
# Earliest moment the next slice may go out: never earlier than `now`,
# otherwise the previous timestamp plus the time the previous slice "costs"
# at the configured rate (bwcount / bwlimit).
# NOTE(review): `next` shadows the builtin of the same name.
33 next = max(now, bwtime + 1.0*bwcount/bwlimit)
37 bwcount = len(sub) # might be less than 4096
# Hand the updated accounting state back to the caller.
39 return (bwcount, bwtime)
def parse_remote(remote):
    """Split a remote specification into ``(protocol, host, port, path)``.

    Two forms are understood:

    * URL style, ``proto://[host[:port]][/path]``, where ``proto`` must be
      one of ``ssh``, ``bup`` or ``file``.  An IPv6-style host may be
      wrapped in square brackets.  Missing components come back as None;
      the port, when present, is returned as a string of digits.
    * Legacy style, ``host:path``.  With no colon, or with an empty or
      ``-`` host part, the result is a ``file`` remote; otherwise it is an
      ``ssh`` remote.  The port is always None in this form.

    Raises ClientError when a URL-style remote names an unsupported
    protocol.
    """
    protocol = r'([a-z]+)://'
    # Optional '[' ... ']' around the host; inside brackets only hex digits
    # and colons are accepted, otherwise anything up to the next ':' or '/'.
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    # NOTE(review): the port and path sub-patterns were not visible in the
    # reviewed extract.  They are reconstructed here so that the capture
    # groups line up with the indices used below (1=protocol, 3=host,
    # 4=port, 5=path) -- confirm against the original file.
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
        '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        if url_match.group(1) not in ('ssh', 'bup', 'file'):
            # Call form of raise: valid on both Python 2 and Python 3.
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1))
        return url_match.group(1, 3, 4, 5)
    # Not a URL: fall back to the "host:path" convention.
    rs = remote.split(':', 1)
    if len(rs) == 1 or rs[0] in ('', '-'):
        return 'file', None, None, rs[-1]
    return 'ssh', rs[0], None, rs[1]
# Connect to a remote server and select (or create) its repository directory.
#   remote -- remote specification, handed to parse_remote()
#   create -- selects the 'init-dir' command instead of 'set-dir'
#             (the branching statements are not visible in this extract)
# NOTE(review): many control-flow lines (if/else/try/except) are missing
# from this extract; the comments below only describe the visible statements.
62 def __init__(self, remote, create=False):
# Start with every connection handle cleared.
63 self._busy = self.conn = None
64 self.sock = self.p = self.pout = self.pin = None
# BUP_SERVER_REVERSE, when set, supplies the remote name.
65 is_reverse = os.environ.get('BUP_SERVER_REVERSE')
68 remote = '%s:' % is_reverse
69 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
# Per-remote index cache directory; every character other than '@' and
# word characters in "host:dir" is replaced by '_'.
70 self.cachedir = git.repo('index-cache/%s'
71 % re.sub(r'[^@\w]', '_',
72 "%s:%s" % (self.host, self.dir)))
# Talk over already-open file descriptors 3 (read) and 4 (write);
# presumably the reverse-mode path -- TODO confirm.
74 self.pout = os.fdopen(3, 'rb')
75 self.pin = os.fdopen(4, 'wb')
76 self.conn = Conn(self.pout, self.pin)
78 if self.protocol in ('ssh', 'file'):
80 # FIXME: ssh and file shouldn't use the same module
# Use the stdin/stdout of whatever ssh.connect() returns as the transport.
81 self.p = ssh.connect(self.host, self.port, 'server')
82 self.pout = self.p.stdout
83 self.pin = self.p.stdin
84 self.conn = Conn(self.pout, self.pin)
# NOTE(review): three-argument raise is Python-2-only syntax; `e` is bound
# by an except clause that is not visible here.
86 raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
87 elif self.protocol == 'bup':
# Direct TCP connection; falls back to port 1982 when atoi() yields a
# falsy value.
# NOTE(review): `socket`, `atoi` and `DemuxConn` are not imported in the
# visible import lines -- confirm they are in scope.
88 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
89 self.sock.connect((self.host, atoi(self.port) or 1982))
90 self.sockw = self.sock.makefile('wb')
91 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
# Discover the server's command set and insist on the two we need next.
92 self._available_commands = self._get_available_commands()
93 self._require_command('init-dir')
94 self._require_command('set-dir')
# Newlines would break the line-based command, so flatten them to spaces.
96 self.dir = re.sub(r'[\r\n]', ' ', self.dir)
98 self.conn.write('init-dir %s\n' % self.dir)
100 self.conn.write('set-dir %s\n' % self.dir)
# Fragment of an exception handler whose enclosing def is not visible in
# this extract: it special-cases EPIPE (broken pipe) on the caught error `e`.
# What happens in either branch is not shown.
108 if e.errno == errno.EPIPE:
# Connection teardown (presumably the body of a close() method; its def line
# and several statements are not visible in this extract).
# Politely end the session, but only when no command is in flight.
114 if self.conn and not self._busy:
115 self.conn.write('quit\n')
118 if self.sock and self.sockw:
# Shut down the write side of the socket.
# NOTE(review): `socket` is not imported in the visible import lines.
120 self.sock.shutdown(socket.SHUT_WR)
# `rv` is assigned outside the visible lines; presumably the exit status of
# self.p -- TODO confirm.
131 raise ClientError('server tunnel returned exit code %d' % rv)
# Drop all transport handles.
133 self.sock = self.p = self.pin = self.pout = None
# Fragment, presumably the body of check_ok() (def line not visible): it
# reports an unexpectedly exited server, then delegates to the connection's
# own check_ok() and converts any failure into ClientError.
# NOTE(review): the first raise is cut off mid-expression in this extract.
139 raise ClientError('server exited unexpectedly with code %r'
142 return self.conn.check_ok()
143 except Exception as e:
# NOTE(review): three-argument raise is Python-2-only syntax.
144 raise ClientError, e, sys.exc_info()[2]
# Raise ClientError naming the command recorded in self._busy.
# NOTE(review): the guarding condition is not visible in this extract;
# presumably the raise only fires when self._busy is set.
146 def check_busy(self):
148 raise ClientError('already busy with command %r' % self._busy)
# Counterpart of check_busy(): complain when a command was expected to be in
# progress.
# NOTE(review): the guarding condition is not visible in this extract.
150 def ensure_busy(self):
152 raise ClientError('expected to be busy, but not busy?!')
# Read the server's command listing and return the command names as a
# frozenset.
# NOTE(review): the statement that sends the request, the loop header and
# the code filling `result` are not visible in this extract.
157 def _get_available_commands(self):
# The reply must open with the exact header line 'Commands:\n'.
163 line = self.conn.readline()
164 if not line == 'Commands:\n':
165 raise ClientError('unexpected help header ' + repr(line))
167 line = self.conn.readline()
# Listing lines are expected to be indented by at least one space.
170 if not line.startswith(' '):
171 raise ClientError('unexpected help line ' + repr(line))
174 raise ClientError('unexpected help line ' + repr(line))
# Consume the server's final status for the request.
177 not_ok = self.check_ok()
181 return frozenset(result)
# Raise ClientError unless `name` is among the commands the server
# advertised (self._available_commands).
# NOTE(review): the raise below is cut off mid-expression in this extract.
183 def _require_command(self, name):
184 if name not in self._available_commands:
185 raise ClientError('server does not appear to provide %s command'
# Reconcile the local .idx cache in self.cachedir with the list of indexes
# the server reports: cached files the server does not mention are removed,
# and indexes the server asks us to load are fetched if absent.
# NOTE(review): the statements that build `extra`/`needed`, bind `conn` and
# `idx`, and drive the final loops are not visible in this extract.
188 def sync_indexes(self):
189 self._require_command('list-indexes')
192 mkdirp(self.cachedir)
193 # All cached idxs are extra until proven otherwise
195 for f in os.listdir(self.cachedir):
197 if f.endswith('.idx'):
200 conn.write('list-indexes\n')
201 for line in linereader(conn):
# Index names must be plain file names, never paths.
204 assert(line.find('/') < 0)
# A line is either "<idx>" or "<idx> load".
205 parts = line.split(' ')
207 if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
208 # If the server requests that we load an idx and we don't
209 # already have a copy of it, it is needed
211 # Any idx that the server has heard of is proven not extra
215 debug1('client: removing extra indexes: %s\n' % extra)
217 os.unlink(os.path.join(self.cachedir, idx))
218 debug1('client: server requested load of: %s\n' % needed)
# Refresh the midx data for the cache directory.
221 git.auto_midx(self.cachedir)
# Download the index called `name` from the server into self.cachedir.
# Raises ClientError if a file of that name already exists locally.
# NOTE(review): the initialisation/update of `count` and the write of each
# chunk are not visible in this extract.
223 def sync_index(self, name):
224 self._require_command('send-index')
225 #debug1('requesting %r\n' % name)
227 mkdirp(self.cachedir)
228 fn = os.path.join(self.cachedir, name)
# Refuse to overwrite an index that is already cached.
229 if os.path.exists(fn):
230 msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
231 raise ClientError(msg)
232 self.conn.write('send-index %s\n' % name)
# The reply starts with the payload length as a 4-byte network-order
# unsigned integer.
233 n = struct.unpack('!I', self.conn.read(4))[0]
# NOTE(review): opened in 'w' (text) mode although the payload looks
# binary -- confirm this is intended on all supported platforms.
235 with atomically_replaced_file(fn, 'w') as f:
237 progress('Receiving index from server: %d/%d\r' % (count, n))
# Read exactly n bytes in chunks.
238 for b in chunkyreader(self.conn, n):
241 qprogress('Receiving index from server: %d/%d\r' % (count, n))
242 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
# Build the object cache for pack writing: a git.PackIdxList over the
# locally cached indexes in self.cachedir.
245 def _make_objcache(self):
246 return git.PackIdxList(self.cachedir)
# Temporarily suspend an in-progress 'receive-objects-v2' command, read the
# server's index suggestions, then resume the command.
# NOTE(review): the assignments of `ob`, `idx` and `suggested`, the loop
# exit condition, the per-index handling in the final loop and the return
# value are not visible in this extract.
248 def _suggest_packs(self):
# Only valid while a receive-objects-v2 session is active.
251 assert(ob == 'receive-objects-v2')
252 self.conn.write('\xff\xff\xff\xff') # suspend receive-objects-v2
254 for line in linereader(self.conn):
257 debug2('%s\n' % line)
# "index <name>" lines are suggestions from the server.
258 if line.startswith('index '):
260 debug1('client: received index suggestion: %s\n'
261 % git.shorten_hash(idx))
262 suggested.append(idx)
# Otherwise the line itself must be the name of a just-written .idx.
264 assert(line.endswith('.idx'))
265 debug1('client: completed writing pack, idx: %s\n'
266 % git.shorten_hash(line))
267 suggested.append(line)
272 for idx in suggested:
274 git.auto_midx(self.cachedir)
# Re-issue the suspended command so the upload can continue.
277 self.conn.write('%s\n' % ob)
# Start a 'receive-objects-v2' session on the server and return a
# PackWriter_Remote that streams objects over self.conn.  The three keyword
# parameters are passed through to the writer unchanged.
# NOTE(review): at least one keyword argument line of the constructor call
# is missing from this extract.
280 def new_packwriter(self, compression_level=1,
281 max_pack_size=None, max_pack_objects=None):
282 self._require_command('receive-objects-v2')
# Mark the connection busy before issuing the command.
285 self._busy = 'receive-objects-v2'
286 self.conn.write('receive-objects-v2\n')
287 return PackWriter_Remote(self.conn,
288 objcache_maker = self._make_objcache,
289 suggest_packs = self._suggest_packs,
291 onclose = self._not_busy,
292 ensure_busy = self.ensure_busy,
293 compression_level=compression_level,
294 max_pack_size=max_pack_size,
295 max_pack_objects=max_pack_objects)
# Ask the server for the value of `refname`.  Returns the decoded 40-hex-digit
# reply, or None when the ref does not exist.
# NOTE(review): the branch separating the two returns is not visible in
# this extract.
297 def read_ref(self, refname):
298 self._require_command('read-ref')
300 self.conn.write('read-ref %s\n' % refname)
301 r = self.conn.readline().strip()
304 assert(len(r) == 40) # hexified sha
# NOTE(review): str.decode('hex') exists only on Python 2.
305 return r.decode('hex')
307 return None # nonexistent ref
# Ask the server to move `refname` to `newval`, sending `oldval` alongside
# it (an empty string when oldval is falsy).  Both values are hex-encoded
# for the wire.
# NOTE(review): str.encode('hex') exists only on Python 2; any statements
# after the write are not visible in this extract.
309 def update_ref(self, refname, newval, oldval):
310 self._require_command('update-ref')
312 self.conn.write('update-ref %s\n%s\n%s\n'
313 % (refname, newval.encode('hex'),
314 (oldval or '').encode('hex')))
# Fragment of a generator that streams an object's content from the server
# (its def line is not visible in this extract; `id` is presumably its
# parameter -- TODO confirm).
318 self._require_command('join')
321 # Send 'cat' so we'll work fine with older versions
# CR/LF in the id are replaced by '_' so they cannot break the command line.
322 self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
# Each chunk is preceded by its length as a 4-byte network-order unsigned
# integer.
324 sz = struct.unpack('!I', self.conn.read(4))[0]
326 yield self.conn.read(sz)
327 # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
# `e` is bound outside the visible lines.
331 raise KeyError(str(e))
# Generator: run the server's 'cat-batch' command for `refs`, yielding one
# (oidx, oid_t, size, chunk_reader) tuple per object, or four Nones for a
# missing object.
# NOTE(review): the loops over `refs`, the binding of `conn`, the conversion
# of `size` and the final busy-state handling are not visible in this extract.
333 def cat_batch(self, refs):
334 self._require_command('cat-batch')
336 self._busy = 'cat-batch'
338 conn.write('cat-batch\n')
339 # FIXME: do we want (only) binary protocol?
# Refs are sent one per line, so they must not contain newlines.
342 assert '\n' not in ref
347 info = conn.readline()
348 if info == 'missing\n':
349 yield None, None, None, None
# An empty or unterminated line means the stream ended early.
351 if not (info and info.endswith('\n')):
352 raise ClientError('Hit EOF while looking for object info: %r'
354 oidx, oid_t, size = info.split(' ')
356 cr = chunkyreader(conn, size)
357 yield oidx, oid_t, size, cr
# After the consumer resumes us, the reader must be fully drained.
358 detritus = next(cr, None)
360 raise ClientError('unexpected leftover data ' + repr(detritus))
362 not_ok = self.check_ok()
# Generator: list refs on the server, yielding (name, decoded_oid) pairs.
#   patterns       -- optional iterable of patterns (none may contain '\n')
#   limit_to_heads -- sent to the server as 1/0
#   limit_to_tags  -- sent to the server as 1/0
# NOTE(review): the binding of `conn`, the sending of the patterns, the
# validation conditions guarding the two raises and the final busy-state
# handling are not visible in this extract.
367 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
368 patterns = patterns or tuple()
369 self._require_command('refs')
373 conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
374 1 if limit_to_tags else 0))
375 for pattern in patterns:
376 assert '\n' not in pattern
# Reply lines run until the sentinel value '\n'.
380 for line in lines_until_sentinel(conn, '\n', ClientError):
382 oidx, name = line.split(' ')
384 raise ClientError('Invalid object fingerprint in %r' % line)
386 raise ClientError('Invalid reference name in %r' % line)
# NOTE(review): str.decode('hex') exists only on Python 2.
387 yield name, oidx.decode('hex')
389 not_ok = self.check_ok()
# Generator: run the server's 'rev-list' command for `refs`.
#   count  -- optional integer, written to the server when not None
#   parse  -- callable invoked as parse(conn) for each 'commit ' line
#   format -- optional string, must not contain '\n'
# Two reading loops are visible: one expecting bare 40-character lines and
# one expecting 'commit <id>' lines followed by parse()-consumed data;
# presumably selected by whether `format` is given -- TODO confirm.
# NOTE(review): the binding of `conn`, the sending of format/refs, the loop
# headers' surrounding branches and the final busy-state handling are not
# visible in this extract.
394 def rev_list(self, refs, count=None, parse=None, format=None):
395 self._require_command('rev-list')
# NOTE(review): `Integral` is not imported in the visible import lines.
396 assert (count is None) or (isinstance(count, Integral))
398 assert '\n' not in format
402 assert '\n' not in ref
404 self._busy = 'rev-list'
406 conn.write('rev-list\n')
407 if count is not None:
408 conn.write(str(count))
418 for _ in range(len(refs)):
419 line = conn.readline()
421 raise ClientError('unexpected EOF')
423 assert len(line) == 40
426 for _ in range(len(refs)):
427 line = conn.readline()
429 raise ClientError('unexpected EOF')
430 if not line.startswith('commit '):
431 raise ClientError('unexpected line ' + repr(line))
# Strip the 7-character 'commit ' prefix; parse() reads the rest from conn.
432 yield line[7:].strip(), parse(conn)
434 not_ok = self.check_ok()
# git.PackWriter variant that sends pack objects over a connection instead
# of writing a local pack file.
# NOTE(review): several method headers (e.g. the ones owning the
# `_packopen = True` and "don't know how to abort" lines), part of the
# __init__ signature and a number of statements are not visible in this
# extract; the comments below only describe the visible statements.
440 class PackWriter_Remote(git.PackWriter):
441 def __init__(self, conn, objcache_maker, suggest_packs,
446 max_pack_objects=None):
447 git.PackWriter.__init__(self,
448 objcache_maker=objcache_maker,
449 compression_level=compression_level,
450 max_pack_size=max_pack_size,
451 max_pack_objects=max_pack_objects)
453 self.filename = 'remote socket'
# Callbacks supplied by the creator of this writer.
454 self.suggest_packs = suggest_packs
456 self.onclose = onclose
457 self.ensure_busy = ensure_busy
# True once the remote pack stream has been opened.
458 self._packopen = False
# Bandwidth-limit accounting state, fed to _raw_write_bwlimit().
460 self._bwtime = time.time()
463 if not self._packopen:
465 self._packopen = True
# Finish the remote pack stream; returns whatever suggest_packs() returns.
467 def _end(self, run_midx=True):
468 assert(run_midx) # We don't support this via remote yet
469 if self._packopen and self.file:
# Four zero bytes are written to close the stream (presumably a zero-length
# record acting as terminator -- TODO confirm against the server protocol).
470 self.file.write('\0\0\0\0')
471 self._packopen = False
472 self.onclose() # Unbusy
474 return self.suggest_packs() # Returns last idx received
482 raise ClientError("don't know how to abort remote pack writing")
# Send one object: `datalist` is joined into the payload and framed with a
# length prefix and a CRC32.
484 def _raw_write(self, datalist, sha):
486 if not self._packopen:
489 data = ''.join(datalist)
# Mask to an unsigned 32-bit value so struct.pack('!I', ...) accepts it.
492 crc = zlib.crc32(data) & 0xffffffff
# Length field covers the data plus 20 + 4 extra bytes (presumably a 20-byte
# sha and the 4-byte CRC -- the sha element is not visible in this extract).
493 outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
495 struct.pack('!I', crc),
498 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
499 self.file, outbuf, self._bwcount, self._bwtime)
# NOTE(review): three-argument raise is Python-2-only syntax; `e` is bound
# by an except clause that is not visible here.
501 raise ClientError, e, sys.exc_info()[2]
502 self.outbytes += len(data)
# If the server has sent something back, refresh the local object cache.
505 if self.file.has_input():
507 self.objcache.refresh()