# lib/bup/client.py

import errno, os, re, socket, struct, sys, time, zlib
from numbers import Integral

from bup import git, ssh
from bup.helpers import (Conn, DemuxConn, atoi, atomically_replaced_file,
                         chunkyreader, debug1, debug2, linereader,
                         lines_until_sentinel, mkdirp, progress, qprogress)


bwlimit = None


class ClientError(Exception):
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
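        # (Illustrative numbers, not from this file: with bwlimit set to
        # 102400 bytes/s, each full 4096-byte block schedules the next send
        # roughly 0.04 s after the previous one.)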
        for i in xrange(0,len(buf),4096):
            now = time.time()
            next = max(now, bwtime + 1.0*bwcount/bwlimit)
            time.sleep(next-now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = next
        return (bwcount, bwtime)


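# Hedged sketch of what parse_remote() returns (example specs are illustrative,
# not taken from this file):
#   'bup://host:1982/repo' -> ('bup', 'host', '1982', '/repo')
#   'host:/srv/bup'        -> ('ssh', 'host', None, '/srv/bup')
#   '/srv/bup'             -> ('file', None, None, '/srv/bup')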
def parse_remote(remote):
    protocol = r'([a-z]+)://'
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        if not url_match.group(1) in ('ssh', 'bup', 'file'):
            raise ClientError, 'unexpected protocol: %s' % url_match.group(1)
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(':', 1)
        if len(rs) == 1 or rs[0] in ('', '-'):
            return 'file', None, None, rs[-1]
        else:
            return 'ssh', rs[0], None, rs[1]


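# Hedged usage sketch (remote spec and ref name are illustrative only):
#   c = Client('example.org:/srv/bup')     # ssh-style "host:path" remote
#   sha = c.read_ref('refs/heads/master')  # binary sha, or None if missing
#   data = ''.join(c.join(sha.encode('hex')))
#   c.close()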
class Client:
    def __init__(self, remote, create=False):
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = os.environ.get('BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = '%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        self.cachedir = git.repo('index-cache/%s'
                                 % re.sub(r'[^@\w]', '_',
                                          "%s:%s" % (self.host, self.dir)))
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in ('ssh', 'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, 'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
            elif self.protocol == 'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host, atoi(self.port) or 1982))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command('init-dir')
        self._require_command('set-dir')
        if self.dir:
            self.dir = re.sub(r'[\r\n]', ' ', self.dir)
            if create:
                self.conn.write('init-dir %s\n' % self.dir)
            else:
                self.conn.write('set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

    def __del__(self):
        try:
            self.close()
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise

    def close(self):
        if self.conn and not self._busy:
            self.conn.write('quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            self.p.wait()
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
        if self.p:
            rv = self.p.poll()
            if rv != None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            raise ClientError, e, sys.exc_info()[2]

    def check_busy(self):
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

    def _get_available_commands(self):
        self.check_busy()
        self._busy = 'help'
        conn = self.conn
        conn.write('help\n')
        result = set()
        line = self.conn.readline()
        if not line == 'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == '\n':
                break
            if not line.startswith('    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

    def _require_command(self, name):
        if name not in self._available_commands:
            raise ClientError('server does not appear to provide %s command'
                              % name)

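    # The list-indexes reply is one "<pack>.idx[ load]" line per server index,
    # ended by a blank line; only idxs marked "load" that aren't already in the
    # local index-cache get fetched below.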
    def sync_indexes(self):
        self._require_command('list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1('%s\n' % f)
            if f.endswith('.idx'):
                extra.add(f)
        needed = set()
        conn.write('list-indexes\n')
        for line in linereader(conn):
            if not line:
                break
            assert(line.find('/') < 0)
            parts = line.split(' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

    def sync_index(self, name):
        self._require_command('send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
            raise ClientError(msg)
        self.conn.write('send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'w') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

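    # While receive-objects-v2 is running, the server may suggest .idx files the
    # client is missing; _suggest_packs suspends the upload (the 0xffffffff
    # marker), syncs the suggested indexes into the cache, then resumes by
    # re-sending the command name.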
    def _suggest_packs(self):
        ob = self._busy
        if ob:
            assert(ob == 'receive-objects-v2')
            self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%s\n' % line)
            if line.startswith('index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx))
                suggested.append(idx)
            else:
                assert(line.endswith('.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write('%s\n' % ob)
        return idx

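    # Hedged sketch of how a caller might use the returned writer (names are
    # illustrative; new_blob() is inherited from git.PackWriter):
    #   pw = client.new_packwriter()
    #   sha = pw.new_blob(some_bytes)
    #   pw.close()   # returns the last idx the server suggested, if any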
    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        self._require_command('receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = 'receive-objects-v2'
            self.conn.write('receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onopen = _set_busy,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
        self._require_command('read-ref')
        self.check_busy()
        self.conn.write('read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return r.decode('hex')
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        self._require_command('update-ref')
        self.check_busy()
        self.conn.write('update-ref %s\n%s\n%s\n'
                        % (refname, newval.encode('hex'),
                           (oldval or '').encode('hex')))
        self.check_ok()

    def join(self, id):
        self._require_command('join')
        self.check_busy()
        self._busy = 'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
        while 1:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

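    # Hedged sketch of iterating cat_batch() results (the ref is illustrative);
    # each item's reader must be drained before advancing to the next item:
    #   for oidx, typ, size, it in client.cat_batch(['refs/heads/master']):
    #       data = ''.join(it) if oidx else None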
    def cat_batch(self, refs):
        self._require_command('cat-batch')
        self.check_busy()
        self._busy = 'cat-batch'
        conn = self.conn
        conn.write('cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert '\n' not in ref
            conn.write(ref)
            conn.write('\n')
        conn.write('\n')
        for ref in refs:
            info = conn.readline()
            if info == 'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith('\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

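    # refs() and rev_list() below share the same request shape: a command line,
    # one argument per line, then a blank line; the reply is read back before
    # the usual check_ok().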
    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        patterns = patterns or tuple()
        self._require_command('refs')
        self.check_busy()
        self._busy = 'refs'
        conn = self.conn
        conn.write('refs %s %s\n' % (1 if limit_to_heads else 0,
                                     1 if limit_to_tags else 0))
        for pattern in patterns:
            assert '\n' not in pattern
            conn.write(pattern)
            conn.write('\n')
        conn.write('\n')
        for line in lines_until_sentinel(conn, '\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, oidx.decode('hex')
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def rev_list(self, refs, count=None, parse=None, format=None):
        self._require_command('rev-list')
        assert (count is None) or (isinstance(count, Integral))
        if format:
            assert '\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert '\n' not in ref
        self.check_busy()
        self._busy = 'rev-list'
        conn = self.conn
        conn.write('rev-list\n')
        if count is not None:
            conn.write(str(count))
        conn.write('\n')
        if format:
            conn.write(format)
        conn.write('\n')
        for ref in refs:
            conn.write(ref)
            conn.write('\n')
        conn.write('\n')
        if not format:
            for _ in xrange(len(refs)):
                line = conn.readline()
                if not line:
                    raise ClientError('unexpected EOF')
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for _ in xrange(len(refs)):
                line = conn.readline()
                if not line:
                    raise ClientError('unexpected EOF')
                if not line.startswith('commit '):
                    raise ClientError('unexpected line ' + repr(line))
                yield line[7:].strip(), parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()


class PackWriter_Remote(git.PackWriter):
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn
        self.filename = 'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            self.file.write('\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received

    def close(self):
        id = self._end()
        self.file = None
        return id

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

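    # Each object written below is framed as a big-endian 4-byte length
    # (covering the 20-byte sha, the 4-byte crc32, and the pack data that
    # follow); '\0\0\0\0' in _end() terminates the stream, and the
    # '\xff\xff\xff\xff' marker in Client._suggest_packs() suspends it.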
    def _raw_write(self, datalist, sha):
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = ''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
                          sha,
                          struct.pack('!I', crc),
                          data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            raise ClientError, e, sys.exc_info()[2]
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc