2 import errno, os, re, struct, sys, time, zlib
4 from bup import git, ssh
5 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
6 debug2, linereader, lines_until_sentinel,
7 mkdirp, progress, qprogress)
class ClientError(Exception):
    """Raised for client-side connection and bup-protocol failures."""
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, pacing output to respect a bandwidth limit.

    Returns an updated (bytes-counted, timestamp) pair; the caller feeds it
    back into the next call so pacing state carries across writes.

    NOTE(review): this excerpt is missing several lines (the bwlimit guard,
    the actual f.write()/sleep calls, and the 'now'/'sub' bindings) --
    confirm against the full source before relying on details.
    """
    # Unthrottled path: report the whole buffer as written right now.
    return (len(buf), time.time())
    # We want to write in reasonably large blocks, but not so large that
    # they're likely to overflow a router's queue. So our bwlimit timing
    # has to be pretty granular. Also, if it takes too long from one
    # transmit to the next, we can't just make up for lost time to bring
    # the average back up to bwlimit - that will risk overflowing the
    # outbound queue, which defeats the purpose. So if we fall behind
    # by more than one block delay, we shouldn't ever try to catch up.
    for i in xrange(0,len(buf),4096):
        # Earliest moment this chunk may go out without exceeding bwlimit.
        next = max(now, bwtime + 1.0*bwcount/bwlimit)
        bwcount = len(sub) # might be less than 4096
    return (bwcount, bwtime)
def parse_remote(remote):
    """Split a remote spec into a (protocol, host, port, dir) 4-tuple.

    Accepts URL-style specs ('ssh://', 'bup://', 'file://') as well as
    bare 'host:path' and plain-path shorthands.

    NOTE(review): the port/path sub-patterns and the re.match() opening
    line are missing from this excerpt.
    """
    protocol = r'([a-z]+)://'
    # Hostname, or an IPv6 literal wrapped in square brackets ('sb' group).
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
        '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if not url_match.group(1) in ('ssh', 'bup', 'file'):
        raise ClientError, 'unexpected protocol: %s' % url_match.group(1)
    return url_match.group(1,3,4,5)
    # Not URL-style: 'host:path' means ssh; an empty or '-' host (or no
    # colon at all) means a local file repo.
    rs = remote.split(':', 1)
    if len(rs) == 1 or rs[0] in ('', '-'):
        return 'file', None, None, rs[-1]
    return 'ssh', rs[0], None, rs[1]
    def __init__(self, remote, create=False):
        """Connect to the bup server named by 'remote'.

        Establishes self.conn for the wire protocol and self.cachedir for
        locally cached pack indexes, then negotiates the repo directory.

        NOTE(review): several lines are missing from this excerpt (the
        reverse-mode branch condition, try/except scaffolding around
        ssh.connect, and the create/init-dir vs set-dir choice).
        """
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        # Reverse mode: the server initiated the connection to us.
        is_reverse = os.environ.get('BUP_SERVER_REVERSE')
        remote = '%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # Per-remote index-cache directory; non-word chars mangled to '_'.
        self.cachedir = git.repo('index-cache/%s'
                                 % re.sub(r'[^@\w]', '_',
                                          "%s:%s" % (self.host, self.dir)))
        # Reverse connections talk over inherited file descriptors 3 and 4.
        self.pout = os.fdopen(3, 'rb')
        self.pin = os.fdopen(4, 'wb')
        self.conn = Conn(self.pout, self.pin)
        if self.protocol in ('ssh', 'file'):
            # FIXME: ssh and file shouldn't use the same module
            self.p = ssh.connect(self.host, self.port, 'server')
            self.pout = self.p.stdout
            self.pin = self.p.stdin
            self.conn = Conn(self.pout, self.pin)
            # NOTE(review): this raise was presumably inside an except
            # clause; the try/except lines are missing here.
            raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
        elif self.protocol == 'bup':
            # Direct TCP to a bup daemon; 1982 is the default port.
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host, atoi(self.port) or 1982))
            self.sockw = self.sock.makefile('wb')
            self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command('init-dir')
        self._require_command('set-dir')
        # Newlines would corrupt the line-oriented protocol.
        self.dir = re.sub(r'[\r\n]', ' ', self.dir)
        self.conn.write('init-dir %s\n' % self.dir)
        self.conn.write('set-dir %s\n' % self.dir)
        # NOTE(review): fragment spanning the destructor/close() teardown
        # path; the method headers and try/except scaffolding are missing
        # from this excerpt -- confirm structure against the full source.
        # A broken pipe during teardown is expected if the server exited.
        if e.errno == errno.EPIPE:
        # Politely ask the server to quit, but only when no command is
        # mid-flight (writing into an active command would corrupt it).
        if self.conn and not self._busy:
            self.conn.write('quit\n')
        # Half-close the TCP socket so the server sees EOF on its input.
        if self.sock and self.sockw:
            self.sock.shutdown(socket.SHUT_WR)
        # Surface a nonzero exit status from the server tunnel subprocess.
        raise ClientError('server tunnel returned exit code %d' % rv)
        # Drop all handles so a later close() attempt is a no-op.
        self.sock = self.p = self.pin = self.pout = None
        # NOTE(review): interior of check_ok(); its def line, the
        # subprocess poll, and the try: opener are missing here.
        raise ClientError('server exited unexpectedly with code %r'
        # Delegate protocol-level ok/error checking to the Conn object.
        return self.conn.check_ok()
    except Exception as e:
        # Wrap in ClientError, preserving the original traceback
        # (Python 2 three-argument raise).
        raise ClientError, e, sys.exc_info()[2]
    def check_busy(self):
        """Raise ClientError if a long-running command is in progress.

        NOTE(review): the `if self._busy:` guard appears to be missing
        from this excerpt.
        """
        raise ClientError('already busy with command %r' % self._busy)
    def ensure_busy(self):
        """Raise ClientError unless a long-running command IS in progress.

        NOTE(review): the guard condition line appears to be missing from
        this excerpt.
        """
        raise ClientError('expected to be busy, but not busy?!')
    def _get_available_commands(self):
        """Ask the server for its command list; return it as a frozenset.

        NOTE(review): the lines issuing the 'help' request, the loop
        header, and the 'result' accumulation are missing from this
        excerpt.
        """
        line = self.conn.readline()
        if not line == 'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        line = self.conn.readline()
        # Each advertised command is on its own 4-space-indented line.
        if not line.startswith('    '):
            raise ClientError('unexpected help line ' + repr(line))
        raise ClientError('unexpected help line ' + repr(line))
        not_ok = self.check_ok()
        return frozenset(result)
    def _require_command(self, name):
        """Raise ClientError unless the server advertised command 'name'.

        NOTE(review): the `% name)` continuation of the raise is missing
        from this excerpt.
        """
        if name not in self._available_commands:
            raise ClientError('server does not appear to provide %s command'
    def sync_indexes(self):
        """Fetch pack indexes the server wants us to have and delete
        cached ones it no longer knows about, then rebuild midx files.

        NOTE(review): the 'conn' binding and the bookkeeping for the
        'extra'/'needed' sets are missing from this excerpt.
        """
        self._require_command('list-indexes')
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        for f in os.listdir(self.cachedir):
            if f.endswith('.idx'):
        conn.write('list-indexes\n')
        for line in linereader(conn):
            # Index names must be bare filenames, never paths.
            assert(line.find('/') < 0)
            parts = line.split(' ')
            if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
            # Any idx that the server has heard of is proven not extra
        debug1('client: removing extra indexes: %s\n' % extra)
        os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        # Regenerate .midx files over the refreshed cache contents.
        git.auto_midx(self.cachedir)
    def sync_index(self, name):
        """Download one pack index 'name' into the local cache dir.

        Raises ClientError if the index already exists locally.
        NOTE(review): the 'count' initialization/updates and the final
        check_ok() appear to be missing from this excerpt.
        """
        self._require_command('send-index')
        #debug1('requesting %r\n' % name)
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
            raise ClientError(msg)
        self.conn.write('send-index %s\n' % name)
        # The server first sends the index size as a network-order uint32.
        n = struct.unpack('!I', self.conn.read(4))[0]
        # Write to a temp file and rename, so readers never see a partial idx.
        with atomically_replaced_file(fn, 'w') as f:
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
243 def _make_objcache(self):
244 return git.PackIdxList(self.cachedir)
    def _suggest_packs(self):
        """Read index suggestions from the server and sync each one locally.

        Temporarily suspends an in-progress receive-objects-v2 command and
        resumes it before returning.
        NOTE(review): the 'ob'/'suggested' bindings, the loop-exit branch,
        the else: before the pack-complete case, and the return value are
        missing from this excerpt.
        """
        assert(ob == 'receive-objects-v2')
        self.conn.write('\xff\xff\xff\xff') # suspend receive-objects-v2
        for line in linereader(self.conn):
            debug2('%s\n' % line)
            if line.startswith('index '):
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx))
                suggested.append(idx)
                # Otherwise the server reports a pack it finished writing.
                assert(line.endswith('.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line))
                suggested.append(line)
        for idx in suggested:
        git.auto_midx(self.cachedir)
        # Resume the suspended command, if there was one.
        self.conn.write('%s\n' % ob)
    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        """Start receive-objects-v2 and return a PackWriter_Remote bound
        to this connection.

        Marks the client busy until the returned writer closes (onclose
        unbusies it).
        NOTE(review): the check_busy() call and the onopen argument line
        appear to be missing from this excerpt.
        """
        self._require_command('receive-objects-v2')
        self._busy = 'receive-objects-v2'
        self.conn.write('receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)
    def read_ref(self, refname):
        """Return the binary sha the server reports for refname, or None
        if the ref does not exist.

        NOTE(review): the branch structure selecting between the sha and
        the None return is partly missing from this excerpt.
        """
        self._require_command('read-ref')
        self.conn.write('read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        assert(len(r) == 40) # hexified sha
        return r.decode('hex')
        return None # nonexistent ref
    def update_ref(self, refname, newval, oldval):
        """Ask the server to move refname from oldval to newval.

        newval/oldval are binary shas; they cross the wire hex-encoded,
        with a missing oldval sent as the empty string.
        NOTE(review): busy/ok checks around the write appear to be
        missing from this excerpt.
        """
        self._require_command('update-ref')
        self.conn.write('update-ref %s\n%s\n%s\n'
                        % (refname, newval.encode('hex'),
                           (oldval or '').encode('hex')))
        # NOTE(review): interior of the join/cat generator; its def line,
        # chunk loop, and except scaffolding are missing from this excerpt.
        self._require_command('join')
        # Send 'cat' so we'll work fine with older versions
        self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
        # Each chunk is prefixed by its length as a network-order uint32.
        sz = struct.unpack('!I', self.conn.read(4))[0]
        yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        raise KeyError(str(e))
    def cat_batch(self, refs):
        """Yield one (oidx, type, size, content-reader) tuple per requested
        ref, or (None, None, None, None) for refs the server reports
        missing.

        Each yielded chunkyreader must be fully consumed before advancing
        the generator.
        NOTE(review): the loops over refs and several scaffolding lines
        (including the int() conversion of size) are missing from this
        excerpt.
        """
        self._require_command('cat-batch')
        self._busy = 'cat-batch'
        conn.write('cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        assert '\n' not in ref
        info = conn.readline()
        if info == 'missing\n':
            yield None, None, None, None
        if not (info and info.endswith('\n')):
            raise ClientError('Hit EOF while looking for object info: %r'
        oidx, oid_t, size = info.split(' ')
        cr = chunkyreader(conn, size)
        yield oidx, oid_t, size, cr
        # Anything left unread in cr indicates a caller or protocol bug.
        detritus = next(cr, None)
        raise ClientError('unexpected leftover data ' + repr(detritus))
        not_ok = self.check_ok()
    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        """Yield (name, binary-sha) pairs for server refs matching patterns.

        limit_to_heads/limit_to_tags restrict the listing; both false
        means all refs.
        NOTE(review): the validation conditions guarding the two raises,
        and the per-pattern writes, are missing from this excerpt.
        """
        patterns = patterns or tuple()
        self._require_command('refs')
        conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
                                     1 if limit_to_tags else 0))
        for pattern in patterns:
            assert '\n' not in pattern
        # The server terminates the listing with a bare-newline sentinel.
        for line in lines_until_sentinel(conn, '\n', ClientError):
            oidx, name = line.split(' ')
            raise ClientError('Invalid object fingerprint in %r' % line)
            raise ClientError('Invalid reference name in %r' % line)
            yield name, oidx.decode('hex')
        not_ok = self.check_ok()
    def rev_list(self, refs, count=None, parse=None, format=None):
        """Stream rev-list results for refs from the server.

        With format/parse, yields (commit-hex, parse(conn)) pairs; in
        plain mode, yields bare 40-character commit hashes.
        NOTE(review): the writes sending format and refs, the EOF-test
        conditions before each raise, and the plain-mode yield are
        missing from this excerpt.
        """
        self._require_command('rev-list')
        assert (count is None) or (isinstance(count, Integral))
        assert '\n' not in format
        assert '\n' not in ref
        self._busy = 'rev-list'
        conn.write('rev-list\n')
        if count is not None:
            conn.write(str(count))
        # Plain mode: expect one 40-char hash line per ref.
        for _ in xrange(len(refs)):
            line = conn.readline()
            raise ClientError('unexpected EOF')
            assert len(line) == 40
        # Parsed mode: 'commit <hex>' header, then parse() consumes the body.
        for _ in xrange(len(refs)):
            line = conn.readline()
            raise ClientError('unexpected EOF')
            if not line.startswith('commit '):
                raise ClientError('unexpected line ' + repr(line))
            yield line[7:].strip(), parse(conn)
        not_ok = self.check_ok()
class PackWriter_Remote(git.PackWriter):
    """A PackWriter that streams pack data over a Client connection
    instead of writing a local packfile directly.
    """
    def __init__(self, conn, objcache_maker, suggest_packs,
                 max_pack_objects=None):
        # NOTE(review): the parameter list is truncated in this excerpt --
        # onopen/onclose/ensure_busy/compression_level/max_pack_size lines
        # appear to be missing; confirm against the full source.
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.filename = 'remote socket'
        self.suggest_packs = suggest_packs
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False  # True while a pack stream is open on conn
        self._bwtime = time.time()  # bandwidth-limiter pacing state
        # NOTE(review): interior of _open(); its def line and the lines
        # that actually start the stream are missing from this excerpt.
        # Begin the pack stream lazily, exactly once.
        if not self._packopen:
            self._packopen = True
    def _end(self, run_midx=True):
        """Terminate the open pack stream and return the last idx received.

        NOTE(review): at least one statement between onclose() and the
        return appears to be missing from this excerpt.
        """
        assert(run_midx) # We don't support this via remote yet
        if self._packopen and self.file:
            # A zero-length chunk marks end-of-stream for the server.
            self.file.write('\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            return self.suggest_packs() # Returns last idx received
        # NOTE(review): body of abort(); its def line is missing from this
        # excerpt. Remote pack streams cannot be rolled back once started.
        raise ClientError("don't know how to abort remote pack writing")
    def _raw_write(self, datalist, sha):
        """Send one object over the wire, framed for receive-objects-v2,
        then update byte accounting and handle server back-chatter.

        The frame visible here: a network-order uint32 total length
        (data + 20 + 4, presumably 20-byte sha + 4-byte crc -- confirm),
        then a crc32 packed as uint32.
        NOTE(review): several lines are missing from this excerpt (the
        _open() call, parts of the outbuf tuple, the try/except around
        the throttled write, the count update, and the suggest_packs()
        call under has_input()).
        """
        if not self._packopen:
        data = ''.join(datalist)
        # Mask to an unsigned 32-bit value (zlib.crc32 can return negative
        # on Python 2).
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
                          struct.pack('!I', crc),
        # Throttled write; (bwcount, bwtime) carry pacing state across calls.
        (self._bwcount, self._bwtime) = _raw_write_bwlimit(
            self.file, outbuf, self._bwcount, self._bwtime)
        raise ClientError, e, sys.exc_info()[2]
        self.outbytes += len(data)
        # The server spoke up mid-stream: refresh the object cache so new
        # suggestions take effect.
        if self.file.has_input():
            self.objcache.refresh()