2 from __future__ import absolute_import
3 import errno, os, re, struct, sys, time, zlib
5 from bup import git, ssh
6 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
7 debug2, linereader, lines_until_sentinel,
8 mkdirp, progress, qprogress)
14 class ClientError(Exception):
18 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
21 return (len(buf), time.time())
23 # We want to write in reasonably large blocks, but not so large that
24 # they're likely to overflow a router's queue. So our bwlimit timing
25 # has to be pretty granular. Also, if it takes too long from one
26 # transmit to the next, we can't just make up for lost time to bring
27 # the average back up to bwlimit - that will risk overflowing the
28 # outbound queue, which defeats the purpose. So if we fall behind
29 # by more than one block delay, we shouldn't ever try to catch up.
30 for i in xrange(0,len(buf),4096):
32 next = max(now, bwtime + 1.0*bwcount/bwlimit)
36 bwcount = len(sub) # might be less than 4096
38 return (bwcount, bwtime)
41 def parse_remote(remote):
42 protocol = r'([a-z]+)://'
43 host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
47 '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
49 if not url_match.group(1) in ('ssh', 'bup', 'file'):
50 raise ClientError, 'unexpected protocol: %s' % url_match.group(1)
51 return url_match.group(1,3,4,5)
53 rs = remote.split(':', 1)
54 if len(rs) == 1 or rs[0] in ('', '-'):
55 return 'file', None, None, rs[-1]
57 return 'ssh', rs[0], None, rs[1]
61 def __init__(self, remote, create=False):
62 self._busy = self.conn = None
63 self.sock = self.p = self.pout = self.pin = None
64 is_reverse = os.environ.get('BUP_SERVER_REVERSE')
67 remote = '%s:' % is_reverse
68 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
69 self.cachedir = git.repo('index-cache/%s'
70 % re.sub(r'[^@\w]', '_',
71 "%s:%s" % (self.host, self.dir)))
73 self.pout = os.fdopen(3, 'rb')
74 self.pin = os.fdopen(4, 'wb')
75 self.conn = Conn(self.pout, self.pin)
77 if self.protocol in ('ssh', 'file'):
79 # FIXME: ssh and file shouldn't use the same module
80 self.p = ssh.connect(self.host, self.port, 'server')
81 self.pout = self.p.stdout
82 self.pin = self.p.stdin
83 self.conn = Conn(self.pout, self.pin)
85 raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
86 elif self.protocol == 'bup':
87 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
88 self.sock.connect((self.host, atoi(self.port) or 1982))
89 self.sockw = self.sock.makefile('wb')
90 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
91 self._available_commands = self._get_available_commands()
92 self._require_command('init-dir')
93 self._require_command('set-dir')
95 self.dir = re.sub(r'[\r\n]', ' ', self.dir)
97 self.conn.write('init-dir %s\n' % self.dir)
99 self.conn.write('set-dir %s\n' % self.dir)
107 if e.errno == errno.EPIPE:
113 if self.conn and not self._busy:
114 self.conn.write('quit\n')
117 if self.sock and self.sockw:
119 self.sock.shutdown(socket.SHUT_WR)
130 raise ClientError('server tunnel returned exit code %d' % rv)
132 self.sock = self.p = self.pin = self.pout = None
138 raise ClientError('server exited unexpectedly with code %r'
141 return self.conn.check_ok()
142 except Exception as e:
143 raise ClientError, e, sys.exc_info()[2]
145 def check_busy(self):
147 raise ClientError('already busy with command %r' % self._busy)
149 def ensure_busy(self):
151 raise ClientError('expected to be busy, but not busy?!')
156 def _get_available_commands(self):
162 line = self.conn.readline()
163 if not line == 'Commands:\n':
164 raise ClientError('unexpected help header ' + repr(line))
166 line = self.conn.readline()
169 if not line.startswith(' '):
170 raise ClientError('unexpected help line ' + repr(line))
173 raise ClientError('unexpected help line ' + repr(line))
176 not_ok = self.check_ok()
180 return frozenset(result)
182 def _require_command(self, name):
183 if name not in self._available_commands:
184 raise ClientError('server does not appear to provide %s command'
187 def sync_indexes(self):
188 self._require_command('list-indexes')
191 mkdirp(self.cachedir)
192 # All cached idxs are extra until proven otherwise
194 for f in os.listdir(self.cachedir):
196 if f.endswith('.idx'):
199 conn.write('list-indexes\n')
200 for line in linereader(conn):
203 assert(line.find('/') < 0)
204 parts = line.split(' ')
206 if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
207 # If the server requests that we load an idx and we don't
208 # already have a copy of it, it is needed
210 # Any idx that the server has heard of is proven not extra
214 debug1('client: removing extra indexes: %s\n' % extra)
216 os.unlink(os.path.join(self.cachedir, idx))
217 debug1('client: server requested load of: %s\n' % needed)
220 git.auto_midx(self.cachedir)
222 def sync_index(self, name):
223 self._require_command('send-index')
224 #debug1('requesting %r\n' % name)
226 mkdirp(self.cachedir)
227 fn = os.path.join(self.cachedir, name)
228 if os.path.exists(fn):
229 msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
230 raise ClientError(msg)
231 self.conn.write('send-index %s\n' % name)
232 n = struct.unpack('!I', self.conn.read(4))[0]
234 with atomically_replaced_file(fn, 'w') as f:
236 progress('Receiving index from server: %d/%d\r' % (count, n))
237 for b in chunkyreader(self.conn, n):
240 qprogress('Receiving index from server: %d/%d\r' % (count, n))
241 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
def _make_objcache(self):
    """Return a git.PackIdxList built over this client's cache directory."""
    cache_dir = self.cachedir
    return git.PackIdxList(cache_dir)
247 def _suggest_packs(self):
250 assert(ob == 'receive-objects-v2')
251 self.conn.write('\xff\xff\xff\xff') # suspend receive-objects-v2
253 for line in linereader(self.conn):
256 debug2('%s\n' % line)
257 if line.startswith('index '):
259 debug1('client: received index suggestion: %s\n'
260 % git.shorten_hash(idx))
261 suggested.append(idx)
263 assert(line.endswith('.idx'))
264 debug1('client: completed writing pack, idx: %s\n'
265 % git.shorten_hash(line))
266 suggested.append(line)
271 for idx in suggested:
273 git.auto_midx(self.cachedir)
276 self.conn.write('%s\n' % ob)
279 def new_packwriter(self, compression_level=1,
280 max_pack_size=None, max_pack_objects=None):
281 self._require_command('receive-objects-v2')
284 self._busy = 'receive-objects-v2'
285 self.conn.write('receive-objects-v2\n')
286 return PackWriter_Remote(self.conn,
287 objcache_maker = self._make_objcache,
288 suggest_packs = self._suggest_packs,
290 onclose = self._not_busy,
291 ensure_busy = self.ensure_busy,
292 compression_level=compression_level,
293 max_pack_size=max_pack_size,
294 max_pack_objects=max_pack_objects)
296 def read_ref(self, refname):
297 self._require_command('read-ref')
299 self.conn.write('read-ref %s\n' % refname)
300 r = self.conn.readline().strip()
303 assert(len(r) == 40) # hexified sha
304 return r.decode('hex')
306 return None # nonexistent ref
308 def update_ref(self, refname, newval, oldval):
309 self._require_command('update-ref')
311 self.conn.write('update-ref %s\n%s\n%s\n'
312 % (refname, newval.encode('hex'),
313 (oldval or '').encode('hex')))
317 self._require_command('join')
320 # Send 'cat' so we'll work fine with older versions
321 self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
323 sz = struct.unpack('!I', self.conn.read(4))[0]
325 yield self.conn.read(sz)
326 # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
330 raise KeyError(str(e))
332 def cat_batch(self, refs):
333 self._require_command('cat-batch')
335 self._busy = 'cat-batch'
337 conn.write('cat-batch\n')
338 # FIXME: do we want (only) binary protocol?
341 assert '\n' not in ref
346 info = conn.readline()
347 if info == 'missing\n':
348 yield None, None, None, None
350 if not (info and info.endswith('\n')):
351 raise ClientError('Hit EOF while looking for object info: %r'
353 oidx, oid_t, size = info.split(' ')
355 cr = chunkyreader(conn, size)
356 yield oidx, oid_t, size, cr
357 detritus = next(cr, None)
359 raise ClientError('unexpected leftover data ' + repr(detritus))
361 not_ok = self.check_ok()
366 def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
367 patterns = patterns or tuple()
368 self._require_command('refs')
372 conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
373 1 if limit_to_tags else 0))
374 for pattern in patterns:
375 assert '\n' not in pattern
379 for line in lines_until_sentinel(conn, '\n', ClientError):
381 oidx, name = line.split(' ')
383 raise ClientError('Invalid object fingerprint in %r' % line)
385 raise ClientError('Invalid reference name in %r' % line)
386 yield name, oidx.decode('hex')
388 not_ok = self.check_ok()
393 def rev_list(self, refs, count=None, parse=None, format=None):
394 self._require_command('rev-list')
395 assert (count is None) or (isinstance(count, Integral))
397 assert '\n' not in format
401 assert '\n' not in ref
403 self._busy = 'rev-list'
405 conn.write('rev-list\n')
406 if count is not None:
407 conn.write(str(count))
417 for _ in xrange(len(refs)):
418 line = conn.readline()
420 raise ClientError('unexpected EOF')
422 assert len(line) == 40
425 for _ in xrange(len(refs)):
426 line = conn.readline()
428 raise ClientError('unexpected EOF')
429 if not line.startswith('commit '):
430 raise ClientError('unexpected line ' + repr(line))
431 yield line[7:].strip(), parse(conn)
433 not_ok = self.check_ok()
439 class PackWriter_Remote(git.PackWriter):
440 def __init__(self, conn, objcache_maker, suggest_packs,
445 max_pack_objects=None):
446 git.PackWriter.__init__(self,
447 objcache_maker=objcache_maker,
448 compression_level=compression_level,
449 max_pack_size=max_pack_size,
450 max_pack_objects=max_pack_objects)
452 self.filename = 'remote socket'
453 self.suggest_packs = suggest_packs
455 self.onclose = onclose
456 self.ensure_busy = ensure_busy
457 self._packopen = False
459 self._bwtime = time.time()
462 if not self._packopen:
464 self._packopen = True
466 def _end(self, run_midx=True):
467 assert(run_midx) # We don't support this via remote yet
468 if self._packopen and self.file:
469 self.file.write('\0\0\0\0')
470 self._packopen = False
471 self.onclose() # Unbusy
473 return self.suggest_packs() # Returns last idx received
481 raise ClientError("don't know how to abort remote pack writing")
483 def _raw_write(self, datalist, sha):
485 if not self._packopen:
488 data = ''.join(datalist)
491 crc = zlib.crc32(data) & 0xffffffff
492 outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
494 struct.pack('!I', crc),
497 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
498 self.file, outbuf, self._bwcount, self._bwtime)
500 raise ClientError, e, sys.exc_info()[2]
501 self.outbytes += len(data)
504 if self.file.has_input():
506 self.objcache.refresh()