2 import errno, os, re, struct, sys, time, zlib
4 from bup import git, ssh
5 from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
6 debug2, linereader, mkdirp, progress, qprogress)
class ClientError(Exception):
    """Raised for any error talking to, or reported by, a bup server."""
    pass
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, throttled to the module-global bwlimit (bytes/sec).

    bwcount and bwtime carry the byte/timestamp accounting from the
    previous call; the updated (bwcount, bwtime) pair is returned for
    use in the next one.  With no bwlimit set, this is a plain write.
    """
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    # We want to write in reasonably large blocks, but not so large that
    # they're likely to overflow a router's queue.  So our bwlimit timing
    # has to be pretty granular.  Also, if it takes too long from one
    # transmit to the next, we can't just make up for lost time to bring
    # the average back up to bwlimit - that will risk overflowing the
    # outbound queue, which defeats the purpose.  So if we fall behind
    # by more than one block delay, we shouldn't ever try to catch up.
    for ofs in xrange(0, len(buf), 4096):
        now = time.time()
        next = max(now, bwtime + 1.0*bwcount/bwlimit)
        time.sleep(next - now)
        sub = buf[ofs:ofs+4096]
        f.write(sub)
        bwcount = len(sub)  # might be less than 4096
        bwtime = next
    return (bwcount, bwtime)
def parse_remote(remote):
    """Split a remote spec into a (protocol, host, port, dir) tuple.

    Understands URL-style specs ('ssh://host:port/dir', 'bup://...',
    'file://...'), rsync-style 'host:path', and bare local paths; any
    component that is absent comes back as None.

    Raises ClientError for a URL whose scheme isn't ssh, bup, or file.
    """
    protocol = r'([a-z]+)://'
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    m = re.match('%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if m:
        if not m.group(1) in ('ssh', 'bup', 'file'):
            raise ClientError('unexpected protocol: %s' % m.group(1))
        return m.group(1, 3, 4, 5)
    # Not a URL: either 'host:path' (ssh) or a plain/'-'-prefixed local path.
    pieces = remote.split(':', 1)
    if len(pieces) == 1 or pieces[0] in ('', '-'):
        return 'file', None, None, pieces[-1]
    return 'ssh', pieces[0], None, pieces[1]
59 def __init__(self, remote, create=False):
60 self._busy = self.conn = None
61 self.sock = self.p = self.pout = self.pin = None
62 is_reverse = os.environ.get('BUP_SERVER_REVERSE')
65 remote = '%s:' % is_reverse
66 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
67 self.cachedir = git.repo('index-cache/%s'
68 % re.sub(r'[^@\w]', '_',
69 "%s:%s" % (self.host, self.dir)))
71 self.pout = os.fdopen(3, 'rb')
72 self.pin = os.fdopen(4, 'wb')
73 self.conn = Conn(self.pout, self.pin)
75 if self.protocol in ('ssh', 'file'):
77 # FIXME: ssh and file shouldn't use the same module
78 self.p = ssh.connect(self.host, self.port, 'server')
79 self.pout = self.p.stdout
80 self.pin = self.p.stdin
81 self.conn = Conn(self.pout, self.pin)
83 raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
84 elif self.protocol == 'bup':
85 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
86 self.sock.connect((self.host, atoi(self.port) or 1982))
87 self.sockw = self.sock.makefile('wb')
88 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
89 self._available_commands = self._get_available_commands()
90 self._require_command('init-dir')
91 self._require_command('set-dir')
93 self.dir = re.sub(r'[\r\n]', ' ', self.dir)
95 self.conn.write('init-dir %s\n' % self.dir)
97 self.conn.write('set-dir %s\n' % self.dir)
105 if e.errno == errno.EPIPE:
111 if self.conn and not self._busy:
112 self.conn.write('quit\n')
115 if self.sock and self.sockw:
117 self.sock.shutdown(socket.SHUT_WR)
128 raise ClientError('server tunnel returned exit code %d' % rv)
130 self.sock = self.p = self.pin = self.pout = None
136 raise ClientError('server exited unexpectedly with code %r'
139 return self.conn.check_ok()
140 except Exception as e:
141 raise ClientError, e, sys.exc_info()[2]
143 def check_busy(self):
145 raise ClientError('already busy with command %r' % self._busy)
147 def ensure_busy(self):
149 raise ClientError('expected to be busy, but not busy?!')
154 def _get_available_commands(self):
160 line = self.conn.readline()
161 if not line == 'Commands:\n':
162 raise ClientError('unexpected help header ' + repr(line))
164 line = self.conn.readline()
167 if not line.startswith(' '):
168 raise ClientError('unexpected help line ' + repr(line))
171 raise ClientError('unexpected help line ' + repr(line))
174 not_ok = self.check_ok()
178 return frozenset(result)
180 def _require_command(self, name):
181 if name not in self._available_commands:
182 raise ClientError('server does not appear to provide %s command'
185 def sync_indexes(self):
186 self._require_command('list-indexes')
189 mkdirp(self.cachedir)
190 # All cached idxs are extra until proven otherwise
192 for f in os.listdir(self.cachedir):
194 if f.endswith('.idx'):
197 conn.write('list-indexes\n')
198 for line in linereader(conn):
201 assert(line.find('/') < 0)
202 parts = line.split(' ')
204 if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
205 # If the server requests that we load an idx and we don't
206 # already have a copy of it, it is needed
208 # Any idx that the server has heard of is proven not extra
212 debug1('client: removing extra indexes: %s\n' % extra)
214 os.unlink(os.path.join(self.cachedir, idx))
215 debug1('client: server requested load of: %s\n' % needed)
218 git.auto_midx(self.cachedir)
220 def sync_index(self, name):
221 self._require_command('send-index')
222 #debug1('requesting %r\n' % name)
224 mkdirp(self.cachedir)
225 fn = os.path.join(self.cachedir, name)
226 if os.path.exists(fn):
227 msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
228 raise ClientError(msg)
229 self.conn.write('send-index %s\n' % name)
230 n = struct.unpack('!I', self.conn.read(4))[0]
232 with atomically_replaced_file(fn, 'w') as f:
234 progress('Receiving index from server: %d/%d\r' % (count, n))
235 for b in chunkyreader(self.conn, n):
238 qprogress('Receiving index from server: %d/%d\r' % (count, n))
239 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
242 def _make_objcache(self):
243 return git.PackIdxList(self.cachedir)
245 def _suggest_packs(self):
248 assert(ob == 'receive-objects-v2')
249 self.conn.write('\xff\xff\xff\xff') # suspend receive-objects-v2
251 for line in linereader(self.conn):
254 debug2('%s\n' % line)
255 if line.startswith('index '):
257 debug1('client: received index suggestion: %s\n'
258 % git.shorten_hash(idx))
259 suggested.append(idx)
261 assert(line.endswith('.idx'))
262 debug1('client: completed writing pack, idx: %s\n'
263 % git.shorten_hash(line))
264 suggested.append(line)
269 for idx in suggested:
271 git.auto_midx(self.cachedir)
274 self.conn.write('%s\n' % ob)
277 def new_packwriter(self, compression_level=1,
278 max_pack_size=None, max_pack_objects=None):
279 self._require_command('receive-objects-v2')
282 self._busy = 'receive-objects-v2'
283 self.conn.write('receive-objects-v2\n')
284 return PackWriter_Remote(self.conn,
285 objcache_maker = self._make_objcache,
286 suggest_packs = self._suggest_packs,
288 onclose = self._not_busy,
289 ensure_busy = self.ensure_busy,
290 compression_level=compression_level,
291 max_pack_size=max_pack_size,
292 max_pack_objects=max_pack_objects)
294 def read_ref(self, refname):
295 self._require_command('read-ref')
297 self.conn.write('read-ref %s\n' % refname)
298 r = self.conn.readline().strip()
301 assert(len(r) == 40) # hexified sha
302 return r.decode('hex')
304 return None # nonexistent ref
306 def update_ref(self, refname, newval, oldval):
307 self._require_command('update-ref')
309 self.conn.write('update-ref %s\n%s\n%s\n'
310 % (refname, newval.encode('hex'),
311 (oldval or '').encode('hex')))
315 self._require_command('cat')
318 self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
320 sz = struct.unpack('!I', self.conn.read(4))[0]
322 yield self.conn.read(sz)
326 raise KeyError(str(e))
class PackWriter_Remote(git.PackWriter):
    """A PackWriter that streams its pack data to a remote bup server."""

    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn  # writes go straight down the server connection
        self.filename = 'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False  # becomes True once _open() has run
        # Bandwidth-limit accounting fed to _raw_write_bwlimit().
        self._bwcount = 0
        self._bwtime = time.time()
352 if not self._packopen:
354 self._packopen = True
356 def _end(self, run_midx=True):
357 assert(run_midx) # We don't support this via remote yet
358 if self._packopen and self.file:
359 self.file.write('\0\0\0\0')
360 self._packopen = False
361 self.onclose() # Unbusy
363 return self.suggest_packs() # Returns last idx received
371 raise ClientError("don't know how to abort remote pack writing")
373 def _raw_write(self, datalist, sha):
375 if not self._packopen:
378 data = ''.join(datalist)
381 crc = zlib.crc32(data) & 0xffffffff
382 outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
384 struct.pack('!I', crc),
387 (self._bwcount, self._bwtime) = _raw_write_bwlimit(
388 self.file, outbuf, self._bwcount, self._bwtime)
390 raise ClientError, e, sys.exc_info()[2]
391 self.outbytes += len(data)
394 if self.file.has_input():
396 self.objcache.refresh()