]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
Migrate all xrange calls to range in bup.compat
[bup.git] / lib / bup / client.py
1
from __future__ import absolute_import
import errno, os, re, struct, sys, time, zlib
import socket
from binascii import unhexlify
from numbers import Integral

from bup import git, ssh
from bup.compat import range
from bup.helpers import (Conn, DemuxConn, atomically_replaced_file,
                         chunkyreader, debug1, debug2, linereader,
                         lines_until_sentinel, mkdirp, progress, qprogress)
10
11
# Outbound bandwidth cap in bytes per second, consulted by
# _raw_write_bwlimit().  A false value (None or 0) disables throttling.
bwlimit = None
13
14
class ClientError(Exception):
    """Error raised by the client code in this module.

    Used for bad remote specifications, connection failures, unexpected
    server replies and misuse of the busy/not-busy command state.
    """
    pass
17
18
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, pacing the output when the global bwlimit is set.

    bwcount and bwtime describe the previously written block: its size in
    bytes and the time at which it was scheduled.  Returns the updated
    (bwcount, bwtime) pair, which the caller hands back on the next call.
    With no bwlimit, buf is written in one go and (len(buf), now) is
    returned.
    """
    if bwlimit:
        # Send in 4096-byte blocks: big enough to be efficient, small
        # enough not to flood a router's queue.  Each block is delayed
        # until the previous one has "paid" for itself at bwlimit bytes
        # per second.  Because the schedule is never earlier than now,
        # time lost between blocks is not made up later; bursting to
        # restore the average would defeat the purpose of the limit.
        blocksize = 4096
        for start in range(0, len(buf), blocksize):
            now = time.time()
            due = max(now, bwtime + 1.0 * bwcount / bwlimit)
            time.sleep(due - now)
            block = buf[start:start + blocksize]
            f.write(block)
            bwcount = len(block)  # the final block may be short
            bwtime = due
        return (bwcount, bwtime)
    f.write(buf)
    return (len(buf), time.time())
40
41
def parse_remote(remote):
    """Split a remote specification into (protocol, host, port, path).

    Two forms are understood:

    * URL form, ``proto://[host[:port]][/path]``, where the host may be
      wrapped in square brackets (the bracketed form accepts hex digits
      and colons).  Only the protocols 'ssh', 'bup' and 'file' are
      accepted; anything else raises ClientError.  Parts that are absent
      come back as None and the port is returned as a string.
    * Anything else is split on the first colon.  Without a colon, or
      with an empty or '-' host part, the result is a 'file' remote whose
      path is the text after the colon; otherwise it is an 'ssh' remote
      with that host and path.  The port is always None in this form.
    """
    protocol = r'([a-z]+)://'
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        if url_match.group(1) not in ('ssh', 'bup', 'file'):
            # Call form of raise: valid on both Python 2 and Python 3.
            raise ClientError('unexpected protocol: %s' % url_match.group(1))
        # Group 2 is only the optional '[' marker, so it is skipped.
        return url_match.group(1, 3, 4, 5)
    rs = remote.split(':', 1)
    if len(rs) == 1 or rs[0] in ('', '-'):
        return 'file', None, None, rs[-1]
    return 'ssh', rs[0], None, rs[1]
59
60
61 class Client:
62     def __init__(self, remote, create=False):
63         self._busy = self.conn = None
64         self.sock = self.p = self.pout = self.pin = None
65         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
66         if is_reverse:
67             assert(not remote)
68             remote = '%s:' % is_reverse
69         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
70         self.cachedir = git.repo('index-cache/%s'
71                                  % re.sub(r'[^@\w]', '_', 
72                                           "%s:%s" % (self.host, self.dir)))
73         if is_reverse:
74             self.pout = os.fdopen(3, 'rb')
75             self.pin = os.fdopen(4, 'wb')
76             self.conn = Conn(self.pout, self.pin)
77         else:
78             if self.protocol in ('ssh', 'file'):
79                 try:
80                     # FIXME: ssh and file shouldn't use the same module
81                     self.p = ssh.connect(self.host, self.port, 'server')
82                     self.pout = self.p.stdout
83                     self.pin = self.p.stdin
84                     self.conn = Conn(self.pout, self.pin)
85                 except OSError as e:
86                     raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
87             elif self.protocol == 'bup':
88                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
89                 self.sock.connect((self.host, atoi(self.port) or 1982))
90                 self.sockw = self.sock.makefile('wb')
91                 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
92         self._available_commands = self._get_available_commands()
93         self._require_command('init-dir')
94         self._require_command('set-dir')
95         if self.dir:
96             self.dir = re.sub(r'[\r\n]', ' ', self.dir)
97             if create:
98                 self.conn.write('init-dir %s\n' % self.dir)
99             else:
100                 self.conn.write('set-dir %s\n' % self.dir)
101             self.check_ok()
102         self.sync_indexes()
103
104     def __del__(self):
105         try:
106             self.close()
107         except IOError as e:
108             if e.errno == errno.EPIPE:
109                 pass
110             else:
111                 raise
112
113     def close(self):
114         if self.conn and not self._busy:
115             self.conn.write('quit\n')
116         if self.pin:
117             self.pin.close()
118         if self.sock and self.sockw:
119             self.sockw.close()
120             self.sock.shutdown(socket.SHUT_WR)
121         if self.conn:
122             self.conn.close()
123         if self.pout:
124             self.pout.close()
125         if self.sock:
126             self.sock.close()
127         if self.p:
128             self.p.wait()
129             rv = self.p.wait()
130             if rv:
131                 raise ClientError('server tunnel returned exit code %d' % rv)
132         self.conn = None
133         self.sock = self.p = self.pin = self.pout = None
134
135     def check_ok(self):
136         if self.p:
137             rv = self.p.poll()
138             if rv != None:
139                 raise ClientError('server exited unexpectedly with code %r'
140                                   % rv)
141         try:
142             return self.conn.check_ok()
143         except Exception as e:
144             raise ClientError, e, sys.exc_info()[2]
145
146     def check_busy(self):
147         if self._busy:
148             raise ClientError('already busy with command %r' % self._busy)
149         
150     def ensure_busy(self):
151         if not self._busy:
152             raise ClientError('expected to be busy, but not busy?!')
153         
    def _not_busy(self):
        """Mark the client as idle by clearing the in-progress command."""
        self._busy = None
156
157     def _get_available_commands(self):
158         self.check_busy()
159         self._busy = 'help'
160         conn = self.conn
161         conn.write('help\n')
162         result = set()
163         line = self.conn.readline()
164         if not line == 'Commands:\n':
165             raise ClientError('unexpected help header ' + repr(line))
166         while True:
167             line = self.conn.readline()
168             if line == '\n':
169                 break
170             if not line.startswith('    '):
171                 raise ClientError('unexpected help line ' + repr(line))
172             cmd = line.strip()
173             if not cmd:
174                 raise ClientError('unexpected help line ' + repr(line))
175             result.add(cmd)
176         # FIXME: confusing
177         not_ok = self.check_ok()
178         if not_ok:
179             raise not_ok
180         self._not_busy()
181         return frozenset(result)
182
183     def _require_command(self, name):
184         if name not in self._available_commands:
185             raise ClientError('server does not appear to provide %s command'
186                               % name)
187
188     def sync_indexes(self):
189         self._require_command('list-indexes')
190         self.check_busy()
191         conn = self.conn
192         mkdirp(self.cachedir)
193         # All cached idxs are extra until proven otherwise
194         extra = set()
195         for f in os.listdir(self.cachedir):
196             debug1('%s\n' % f)
197             if f.endswith('.idx'):
198                 extra.add(f)
199         needed = set()
200         conn.write('list-indexes\n')
201         for line in linereader(conn):
202             if not line:
203                 break
204             assert(line.find('/') < 0)
205             parts = line.split(' ')
206             idx = parts[0]
207             if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
208                 # If the server requests that we load an idx and we don't
209                 # already have a copy of it, it is needed
210                 needed.add(idx)
211             # Any idx that the server has heard of is proven not extra
212             extra.discard(idx)
213
214         self.check_ok()
215         debug1('client: removing extra indexes: %s\n' % extra)
216         for idx in extra:
217             os.unlink(os.path.join(self.cachedir, idx))
218         debug1('client: server requested load of: %s\n' % needed)
219         for idx in needed:
220             self.sync_index(idx)
221         git.auto_midx(self.cachedir)
222
223     def sync_index(self, name):
224         self._require_command('send-index')
225         #debug1('requesting %r\n' % name)
226         self.check_busy()
227         mkdirp(self.cachedir)
228         fn = os.path.join(self.cachedir, name)
229         if os.path.exists(fn):
230             msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
231             raise ClientError(msg)
232         self.conn.write('send-index %s\n' % name)
233         n = struct.unpack('!I', self.conn.read(4))[0]
234         assert(n)
235         with atomically_replaced_file(fn, 'w') as f:
236             count = 0
237             progress('Receiving index from server: %d/%d\r' % (count, n))
238             for b in chunkyreader(self.conn, n):
239                 f.write(b)
240                 count += len(b)
241                 qprogress('Receiving index from server: %d/%d\r' % (count, n))
242             progress('Receiving index from server: %d/%d, done.\n' % (count, n))
243             self.check_ok()
244
245     def _make_objcache(self):
246         return git.PackIdxList(self.cachedir)
247
    def _suggest_packs(self):
        """Read the server's index suggestions and fetch each one.

        May be called while a 'receive-objects-v2' command is in
        progress; in that case the command is suspended first and
        re-issued at the end.  Returns the last index name passed to
        sync_index(), or None when the server suggested nothing.
        """
        ob = self._busy
        if ob:
            assert(ob == 'receive-objects-v2')
            self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        # The list of suggestions is terminated by an empty line.
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%s\n' % line)
            if line.startswith('index '):
                # 'index <name>': the name follows the 6-character prefix.
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx))
                suggested.append(idx)
            else:
                # Otherwise the whole line must itself be an .idx name.
                assert(line.endswith('.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line))
                suggested.append(line)
        self.check_ok()
        if ob:
            # sync_index() calls check_busy(), so the busy marker has to
            # be cleared while the indexes are fetched.
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            # Restore the busy marker and re-send the suspended command.
            self._busy = ob
            self.conn.write('%s\n' % ob)
        return idx
279
280     def new_packwriter(self, compression_level=1,
281                        max_pack_size=None, max_pack_objects=None):
282         self._require_command('receive-objects-v2')
283         self.check_busy()
284         def _set_busy():
285             self._busy = 'receive-objects-v2'
286             self.conn.write('receive-objects-v2\n')
287         return PackWriter_Remote(self.conn,
288                                  objcache_maker = self._make_objcache,
289                                  suggest_packs = self._suggest_packs,
290                                  onopen = _set_busy,
291                                  onclose = self._not_busy,
292                                  ensure_busy = self.ensure_busy,
293                                  compression_level=compression_level,
294                                  max_pack_size=max_pack_size,
295                                  max_pack_objects=max_pack_objects)
296
297     def read_ref(self, refname):
298         self._require_command('read-ref')
299         self.check_busy()
300         self.conn.write('read-ref %s\n' % refname)
301         r = self.conn.readline().strip()
302         self.check_ok()
303         if r:
304             assert(len(r) == 40)   # hexified sha
305             return r.decode('hex')
306         else:
307             return None   # nonexistent ref
308
309     def update_ref(self, refname, newval, oldval):
310         self._require_command('update-ref')
311         self.check_busy()
312         self.conn.write('update-ref %s\n%s\n%s\n' 
313                         % (refname, newval.encode('hex'),
314                            (oldval or '').encode('hex')))
315         self.check_ok()
316
317     def join(self, id):
318         self._require_command('join')
319         self.check_busy()
320         self._busy = 'join'
321         # Send 'cat' so we'll work fine with older versions
322         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
323         while 1:
324             sz = struct.unpack('!I', self.conn.read(4))[0]
325             if not sz: break
326             yield self.conn.read(sz)
327         # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
328         e = self.check_ok()
329         self._not_busy()
330         if e:
331             raise KeyError(str(e))
332
333     def cat_batch(self, refs):
334         self._require_command('cat-batch')
335         self.check_busy()
336         self._busy = 'cat-batch'
337         conn = self.conn
338         conn.write('cat-batch\n')
339         # FIXME: do we want (only) binary protocol?
340         for ref in refs:
341             assert ref
342             assert '\n' not in ref
343             conn.write(ref)
344             conn.write('\n')
345         conn.write('\n')
346         for ref in refs:
347             info = conn.readline()
348             if info == 'missing\n':
349                 yield None, None, None, None
350                 continue
351             if not (info and info.endswith('\n')):
352                 raise ClientError('Hit EOF while looking for object info: %r'
353                                   % info)
354             oidx, oid_t, size = info.split(' ')
355             size = int(size)
356             cr = chunkyreader(conn, size)
357             yield oidx, oid_t, size, cr
358             detritus = next(cr, None)
359             if detritus:
360                 raise ClientError('unexpected leftover data ' + repr(detritus))
361         # FIXME: confusing
362         not_ok = self.check_ok()
363         if not_ok:
364             raise not_ok
365         self._not_busy()
366
367     def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
368         patterns = patterns or tuple()
369         self._require_command('refs')
370         self.check_busy()
371         self._busy = 'refs'
372         conn = self.conn
373         conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
374                                      1 if limit_to_tags else 0))
375         for pattern in patterns:
376             assert '\n' not in pattern
377             conn.write(pattern)
378             conn.write('\n')
379         conn.write('\n')
380         for line in lines_until_sentinel(conn, '\n', ClientError):
381             line = line[:-1]
382             oidx, name = line.split(' ')
383             if len(oidx) != 40:
384                 raise ClientError('Invalid object fingerprint in %r' % line)
385             if not name:
386                 raise ClientError('Invalid reference name in %r' % line)
387             yield name, oidx.decode('hex')
388         # FIXME: confusing
389         not_ok = self.check_ok()
390         if not_ok:
391             raise not_ok
392         self._not_busy()
393
394     def rev_list(self, refs, count=None, parse=None, format=None):
395         self._require_command('rev-list')
396         assert (count is None) or (isinstance(count, Integral))
397         if format:
398             assert '\n' not in format
399             assert parse
400         for ref in refs:
401             assert ref
402             assert '\n' not in ref
403         self.check_busy()
404         self._busy = 'rev-list'
405         conn = self.conn
406         conn.write('rev-list\n')
407         if count is not None:
408             conn.write(str(count))
409         conn.write('\n')
410         if format:
411             conn.write(format)
412         conn.write('\n')
413         for ref in refs:
414             conn.write(ref)
415             conn.write('\n')
416         conn.write('\n')
417         if not format:
418             for _ in range(len(refs)):
419                 line = conn.readline()
420                 if not line:
421                     raise ClientError('unexpected EOF')
422                 line = line.strip()
423                 assert len(line) == 40
424                 yield line
425         else:
426             for _ in range(len(refs)):
427                 line = conn.readline()
428                 if not line:
429                     raise ClientError('unexpected EOF')
430                 if not line.startswith('commit '):
431                     raise ClientError('unexpected line ' + repr(line))
432                 yield line[7:].strip(), parse(conn)
433         # FIXME: confusing
434         not_ok = self.check_ok()
435         if not_ok:
436             raise not_ok
437         self._not_busy()
438
439
440 class PackWriter_Remote(git.PackWriter):
441     def __init__(self, conn, objcache_maker, suggest_packs,
442                  onopen, onclose,
443                  ensure_busy,
444                  compression_level=1,
445                  max_pack_size=None,
446                  max_pack_objects=None):
447         git.PackWriter.__init__(self,
448                                 objcache_maker=objcache_maker,
449                                 compression_level=compression_level,
450                                 max_pack_size=max_pack_size,
451                                 max_pack_objects=max_pack_objects)
452         self.file = conn
453         self.filename = 'remote socket'
454         self.suggest_packs = suggest_packs
455         self.onopen = onopen
456         self.onclose = onclose
457         self.ensure_busy = ensure_busy
458         self._packopen = False
459         self._bwcount = 0
460         self._bwtime = time.time()
461
462     def _open(self):
463         if not self._packopen:
464             self.onopen()
465             self._packopen = True
466
467     def _end(self, run_midx=True):
468         assert(run_midx)  # We don't support this via remote yet
469         if self._packopen and self.file:
470             self.file.write('\0\0\0\0')
471             self._packopen = False
472             self.onclose() # Unbusy
473             self.objcache = None
474             return self.suggest_packs() # Returns last idx received
475
476     def close(self):
477         id = self._end()
478         self.file = None
479         return id
480
481     def abort(self):
482         raise ClientError("don't know how to abort remote pack writing")
483
484     def _raw_write(self, datalist, sha):
485         assert(self.file)
486         if not self._packopen:
487             self._open()
488         self.ensure_busy()
489         data = ''.join(datalist)
490         assert(data)
491         assert(sha)
492         crc = zlib.crc32(data) & 0xffffffff
493         outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
494                           sha,
495                           struct.pack('!I', crc),
496                           data))
497         try:
498             (self._bwcount, self._bwtime) = _raw_write_bwlimit(
499                     self.file, outbuf, self._bwcount, self._bwtime)
500         except IOError as e:
501             raise ClientError, e, sys.exc_info()[2]
502         self.outbytes += len(data)
503         self.count += 1
504
505         if self.file.has_input():
506             self.suggest_packs()
507             self.objcache.refresh()
508
509         return sha, crc