bup.client: accommodate python 3 — lib/bup/client.py
1
from __future__ import print_function
from __future__ import absolute_import
from binascii import hexlify, unhexlify
from numbers import Integral
import errno, os, re, socket, struct, sys, time, zlib

from bup import git, ssh, vfs
from bup.compat import environ, range, reraise
from bup.helpers import (Conn, DemuxConn, atoi, atomically_replaced_file,
                         chunkyreader, debug1, debug2, linereader,
                         lines_until_sentinel, mkdirp, progress, qprogress)
from bup.io import path_msg
from bup.vint import read_bvec, read_vuint, write_bvec
15
16
17 bwlimit = None
18
19
20 class ClientError(Exception):
21     pass
22
23
24 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
25     if not bwlimit:
26         f.write(buf)
27         return (len(buf), time.time())
28     else:
29         # We want to write in reasonably large blocks, but not so large that
30         # they're likely to overflow a router's queue.  So our bwlimit timing
31         # has to be pretty granular.  Also, if it takes too long from one
32         # transmit to the next, we can't just make up for lost time to bring
33         # the average back up to bwlimit - that will risk overflowing the
34         # outbound queue, which defeats the purpose.  So if we fall behind
35         # by more than one block delay, we shouldn't ever try to catch up.
36         for i in range(0,len(buf),4096):
37             now = time.time()
38             next = max(now, bwtime + 1.0*bwcount/bwlimit)
39             time.sleep(next-now)
40             sub = buf[i:i+4096]
41             f.write(sub)
42             bwcount = len(sub)  # might be less than 4096
43             bwtime = next
44         return (bwcount, bwtime)
45
46
47 _protocol_rs = br'([a-z]+)://'
48 _host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
49 _port_rs = br'(?::(\d+))?'
50 _path_rs = br'(/.*)?'
51 _url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
52                      re.I)
53
54 def parse_remote(remote):
55     url_match = _url_rx.match(remote)
56     if url_match:
57         if not url_match.group(1) in (b'ssh', b'bup', b'file'):
58             raise ClientError('unexpected protocol: %s'
59                               % url_match.group(1).decode('ascii'))
60         return url_match.group(1,3,4,5)
61     else:
62         rs = remote.split(b':', 1)
63         if len(rs) == 1 or rs[0] in (b'', b'-'):
64             return b'file', None, None, rs[-1]
65         else:
66             return b'ssh', rs[0], None, rs[1]
67
68
69 class Client:
70     def __init__(self, remote, create=False):
71         self._busy = self.conn = None
72         self.sock = self.p = self.pout = self.pin = None
73         is_reverse = environ.get(b'BUP_SERVER_REVERSE')
74         if is_reverse:
75             assert(not remote)
76             remote = b'%s:' % is_reverse
77         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
78         self.cachedir = git.repo(b'index-cache/%s'
79                                  % re.sub(br'[^@\w]',
80                                           b'_',
81                                           # FIXME: the Nones just
82                                           # match python 2's behavior
83                                           b'%s:%s' % (self.host or b'None',
84                                                       self.dir or b'None')))
85         if is_reverse:
86             self.pout = os.fdopen(3, 'rb')
87             self.pin = os.fdopen(4, 'wb')
88             self.conn = Conn(self.pout, self.pin)
89         else:
90             if self.protocol in (b'ssh', b'file'):
91                 try:
92                     # FIXME: ssh and file shouldn't use the same module
93                     self.p = ssh.connect(self.host, self.port, b'server')
94                     self.pout = self.p.stdout
95                     self.pin = self.p.stdin
96                     self.conn = Conn(self.pout, self.pin)
97                 except OSError as e:
98                     reraise(ClientError('connect: %s' % e))
99             elif self.protocol == b'bup':
100                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
101                 self.sock.connect((self.host, atoi(self.port) or 1982))
102                 self.sockw = self.sock.makefile('wb')
103                 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
104         self._available_commands = self._get_available_commands()
105         self._require_command(b'init-dir')
106         self._require_command(b'set-dir')
107         if self.dir:
108             self.dir = re.sub(br'[\r\n]', ' ', self.dir)
109             if create:
110                 self.conn.write(b'init-dir %s\n' % self.dir)
111             else:
112                 self.conn.write(b'set-dir %s\n' % self.dir)
113             self.check_ok()
114         self.sync_indexes()
115
116     def __del__(self):
117         try:
118             self.close()
119         except IOError as e:
120             if e.errno == errno.EPIPE:
121                 pass
122             else:
123                 raise
124
125     def close(self):
126         if self.conn and not self._busy:
127             self.conn.write(b'quit\n')
128         if self.pin:
129             self.pin.close()
130         if self.sock and self.sockw:
131             self.sockw.close()
132             self.sock.shutdown(socket.SHUT_WR)
133         if self.conn:
134             self.conn.close()
135         if self.pout:
136             self.pout.close()
137         if self.sock:
138             self.sock.close()
139         if self.p:
140             self.p.wait()
141             rv = self.p.wait()
142             if rv:
143                 raise ClientError('server tunnel returned exit code %d' % rv)
144         self.conn = None
145         self.sock = self.p = self.pin = self.pout = None
146
147     def check_ok(self):
148         if self.p:
149             rv = self.p.poll()
150             if rv != None:
151                 raise ClientError('server exited unexpectedly with code %r'
152                                   % rv)
153         try:
154             return self.conn.check_ok()
155         except Exception as e:
156             reraise(ClientError(e))
157
158     def check_busy(self):
159         if self._busy:
160             raise ClientError('already busy with command %r' % self._busy)
161         
162     def ensure_busy(self):
163         if not self._busy:
164             raise ClientError('expected to be busy, but not busy?!')
165         
166     def _not_busy(self):
167         self._busy = None
168
169     def _get_available_commands(self):
170         self.check_busy()
171         self._busy = b'help'
172         conn = self.conn
173         conn.write(b'help\n')
174         result = set()
175         line = self.conn.readline()
176         if not line == b'Commands:\n':
177             raise ClientError('unexpected help header ' + repr(line))
178         while True:
179             line = self.conn.readline()
180             if line == b'\n':
181                 break
182             if not line.startswith(b'    '):
183                 raise ClientError('unexpected help line ' + repr(line))
184             cmd = line.strip()
185             if not cmd:
186                 raise ClientError('unexpected help line ' + repr(line))
187             result.add(cmd)
188         # FIXME: confusing
189         not_ok = self.check_ok()
190         if not_ok:
191             raise not_ok
192         self._not_busy()
193         return frozenset(result)
194
195     def _require_command(self, name):
196         if name not in self._available_commands:
197             raise ClientError('server does not appear to provide %s command'
198                               % name.encode('ascii'))
199
200     def sync_indexes(self):
201         self._require_command(b'list-indexes')
202         self.check_busy()
203         conn = self.conn
204         mkdirp(self.cachedir)
205         # All cached idxs are extra until proven otherwise
206         extra = set()
207         for f in os.listdir(self.cachedir):
208             debug1(path_msg(f) + '\n')
209             if f.endswith(b'.idx'):
210                 extra.add(f)
211         needed = set()
212         conn.write(b'list-indexes\n')
213         for line in linereader(conn):
214             if not line:
215                 break
216             assert(line.find(b'/') < 0)
217             parts = line.split(b' ')
218             idx = parts[0]
219             if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
220                 # If the server requests that we load an idx and we don't
221                 # already have a copy of it, it is needed
222                 needed.add(idx)
223             # Any idx that the server has heard of is proven not extra
224             extra.discard(idx)
225
226         self.check_ok()
227         debug1('client: removing extra indexes: %s\n' % extra)
228         for idx in extra:
229             os.unlink(os.path.join(self.cachedir, idx))
230         debug1('client: server requested load of: %s\n' % needed)
231         for idx in needed:
232             self.sync_index(idx)
233         git.auto_midx(self.cachedir)
234
235     def sync_index(self, name):
236         self._require_command(b'send-index')
237         #debug1('requesting %r\n' % name)
238         self.check_busy()
239         mkdirp(self.cachedir)
240         fn = os.path.join(self.cachedir, name)
241         if os.path.exists(fn):
242             msg = ("won't request existing .idx, try `bup bloom --check %s`"
243                    % path_msg(fn))
244             raise ClientError(msg)
245         self.conn.write(b'send-index %s\n' % name)
246         n = struct.unpack('!I', self.conn.read(4))[0]
247         assert(n)
248         with atomically_replaced_file(fn, 'wb') as f:
249             count = 0
250             progress('Receiving index from server: %d/%d\r' % (count, n))
251             for b in chunkyreader(self.conn, n):
252                 f.write(b)
253                 count += len(b)
254                 qprogress('Receiving index from server: %d/%d\r' % (count, n))
255             progress('Receiving index from server: %d/%d, done.\n' % (count, n))
256             self.check_ok()
257
258     def _make_objcache(self):
259         return git.PackIdxList(self.cachedir)
260
261     def _suggest_packs(self):
262         ob = self._busy
263         if ob:
264             assert(ob == b'receive-objects-v2')
265             self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
266         suggested = []
267         for line in linereader(self.conn):
268             if not line:
269                 break
270             debug2('%r\n' % line)
271             if line.startswith(b'index '):
272                 idx = line[6:]
273                 debug1('client: received index suggestion: %s\n'
274                        % git.shorten_hash(idx).decode('ascii'))
275                 suggested.append(idx)
276             else:
277                 assert(line.endswith(b'.idx'))
278                 debug1('client: completed writing pack, idx: %s\n'
279                        % git.shorten_hash(line).decode('ascii'))
280                 suggested.append(line)
281         self.check_ok()
282         if ob:
283             self._busy = None
284         idx = None
285         for idx in suggested:
286             self.sync_index(idx)
287         git.auto_midx(self.cachedir)
288         if ob:
289             self._busy = ob
290             self.conn.write(b'%s\n' % ob)
291         return idx
292
293     def new_packwriter(self, compression_level=1,
294                        max_pack_size=None, max_pack_objects=None):
295         self._require_command(b'receive-objects-v2')
296         self.check_busy()
297         def _set_busy():
298             self._busy = b'receive-objects-v2'
299             self.conn.write(b'receive-objects-v2\n')
300         return PackWriter_Remote(self.conn,
301                                  objcache_maker = self._make_objcache,
302                                  suggest_packs = self._suggest_packs,
303                                  onopen = _set_busy,
304                                  onclose = self._not_busy,
305                                  ensure_busy = self.ensure_busy,
306                                  compression_level=compression_level,
307                                  max_pack_size=max_pack_size,
308                                  max_pack_objects=max_pack_objects)
309
310     def read_ref(self, refname):
311         self._require_command(b'read-ref')
312         self.check_busy()
313         self.conn.write(b'read-ref %s\n' % refname)
314         r = self.conn.readline().strip()
315         self.check_ok()
316         if r:
317             assert(len(r) == 40)   # hexified sha
318             return r.decode('hex')
319         else:
320             return None   # nonexistent ref
321
322     def update_ref(self, refname, newval, oldval):
323         self._require_command(b'update-ref')
324         self.check_busy()
325         self.conn.write(b'update-ref %s\n%s\n%s\n'
326                         % (refname, hexlify(newval),
327                            hexlify(oldval) if oldval else b''))
328         self.check_ok()
329
330     def join(self, id):
331         self._require_command(b'join')
332         self.check_busy()
333         self._busy = b'join'
334         # Send 'cat' so we'll work fine with older versions
335         self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
336         while 1:
337             sz = struct.unpack('!I', self.conn.read(4))[0]
338             if not sz: break
339             yield self.conn.read(sz)
340         # FIXME: ok to assume the only NotOk is a KerError? (it is true atm)
341         e = self.check_ok()
342         self._not_busy()
343         if e:
344             raise KeyError(str(e))
345
346     def cat_batch(self, refs):
347         self._require_command(b'cat-batch')
348         self.check_busy()
349         self._busy = b'cat-batch'
350         conn = self.conn
351         conn.write(b'cat-batch\n')
352         # FIXME: do we want (only) binary protocol?
353         for ref in refs:
354             assert ref
355             assert b'\n' not in ref
356             conn.write(ref)
357             conn.write(b'\n')
358         conn.write(b'\n')
359         for ref in refs:
360             info = conn.readline()
361             if info == b'missing\n':
362                 yield None, None, None, None
363                 continue
364             if not (info and info.endswith(b'\n')):
365                 raise ClientError('Hit EOF while looking for object info: %r'
366                                   % info)
367             oidx, oid_t, size = info.split(b' ')
368             size = int(size)
369             cr = chunkyreader(conn, size)
370             yield oidx, oid_t, size, cr
371             detritus = next(cr, None)
372             if detritus:
373                 raise ClientError('unexpected leftover data ' + repr(detritus))
374         # FIXME: confusing
375         not_ok = self.check_ok()
376         if not_ok:
377             raise not_ok
378         self._not_busy()
379
380     def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
381         patterns = patterns or tuple()
382         self._require_command(b'refs')
383         self.check_busy()
384         self._busy = b'refs'
385         conn = self.conn
386         conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
387                                       1 if limit_to_tags else 0))
388         for pattern in patterns:
389             assert b'\n' not in pattern
390             conn.write(pattern)
391             conn.write(b'\n')
392         conn.write(b'\n')
393         for line in lines_until_sentinel(conn, b'\n', ClientError):
394             line = line[:-1]
395             oidx, name = line.split(b' ')
396             if len(oidx) != 40:
397                 raise ClientError('Invalid object fingerprint in %r' % line)
398             if not name:
399                 raise ClientError('Invalid reference name in %r' % line)
400             yield name, unhexlify(oidx)
401         # FIXME: confusing
402         not_ok = self.check_ok()
403         if not_ok:
404             raise not_ok
405         self._not_busy()
406
407     def rev_list(self, refs, count=None, parse=None, format=None):
408         """See git.rev_list for the general semantics, but note that with the
409         current interface, the parse function must be able to handle
410         (consume) any blank lines produced by the format because the
411         first one received that it doesn't consume will be interpreted
412         as a terminator for the entire rev-list result.
413
414         """
415         self._require_command(b'rev-list')
416         assert (count is None) or (isinstance(count, Integral))
417         if format:
418             assert b'\n' not in format
419             assert parse
420         for ref in refs:
421             assert ref
422             assert b'\n' not in ref
423         self.check_busy()
424         self._busy = b'rev-list'
425         conn = self.conn
426         conn.write(b'rev-list\n')
427         if count is not None:
428             conn.write(b'%d' % count)
429         conn.write(b'\n')
430         if format:
431             conn.write(format)
432         conn.write(b'\n')
433         for ref in refs:
434             conn.write(ref)
435             conn.write(b'\n')
436         conn.write(b'\n')
437         if not format:
438             for line in lines_until_sentinel(conn, b'\n', ClientError):
439                 line = line.strip()
440                 assert len(line) == 40
441                 yield line
442         else:
443             for line in lines_until_sentinel(conn, b'\n', ClientError):
444                 if not line.startswith(b'commit '):
445                     raise ClientError('unexpected line ' + repr(line))
446                 cmt_oidx = line[7:].strip()
447                 assert len(cmt_oidx) == 40
448                 yield cmt_oidx, parse(conn)
449         # FIXME: confusing
450         not_ok = self.check_ok()
451         if not_ok:
452             raise not_ok
453         self._not_busy()
454
455     def resolve(self, path, parent=None, want_meta=True, follow=False):
456         self._require_command(b'resolve')
457         self.check_busy()
458         self._busy = b'resolve'
459         conn = self.conn
460         conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
461                                       | (2 if follow else 0)
462                                       | (4 if parent else 0)))
463         if parent:
464             vfs.write_resolution(conn, parent)
465         write_bvec(conn, path)
466         success = ord(conn.read(1))
467         assert success in (0, 1)
468         if success:
469             result = vfs.read_resolution(conn)
470         else:
471             result = vfs.read_ioerror(conn)
472         # FIXME: confusing
473         not_ok = self.check_ok()
474         if not_ok:
475             raise not_ok
476         self._not_busy()
477         if isinstance(result, vfs.IOError):
478             raise result
479         return result
480
481
482 class PackWriter_Remote(git.PackWriter):
483     def __init__(self, conn, objcache_maker, suggest_packs,
484                  onopen, onclose,
485                  ensure_busy,
486                  compression_level=1,
487                  max_pack_size=None,
488                  max_pack_objects=None):
489         git.PackWriter.__init__(self,
490                                 objcache_maker=objcache_maker,
491                                 compression_level=compression_level,
492                                 max_pack_size=max_pack_size,
493                                 max_pack_objects=max_pack_objects)
494         self.file = conn
495         self.filename = b'remote socket'
496         self.suggest_packs = suggest_packs
497         self.onopen = onopen
498         self.onclose = onclose
499         self.ensure_busy = ensure_busy
500         self._packopen = False
501         self._bwcount = 0
502         self._bwtime = time.time()
503
504     def _open(self):
505         if not self._packopen:
506             self.onopen()
507             self._packopen = True
508
509     def _end(self, run_midx=True):
510         assert(run_midx)  # We don't support this via remote yet
511         if self._packopen and self.file:
512             self.file.write(b'\0\0\0\0')
513             self._packopen = False
514             self.onclose() # Unbusy
515             self.objcache = None
516             return self.suggest_packs() # Returns last idx received
517
518     def close(self):
519         id = self._end()
520         self.file = None
521         return id
522
523     def abort(self):
524         raise ClientError("don't know how to abort remote pack writing")
525
526     def _raw_write(self, datalist, sha):
527         assert(self.file)
528         if not self._packopen:
529             self._open()
530         self.ensure_busy()
531         data = b''.join(datalist)
532         assert(data)
533         assert(sha)
534         crc = zlib.crc32(data) & 0xffffffff
535         outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
536                            sha,
537                            struct.pack('!I', crc),
538                            data))
539         try:
540             (self._bwcount, self._bwtime) = _raw_write_bwlimit(
541                     self.file, outbuf, self._bwcount, self._bwtime)
542         except IOError as e:
543             reraise(ClientError(e))
544         self.outbytes += len(data)
545         self.count += 1
546
547         if self.file.has_input():
548             self.suggest_packs()
549             self.objcache.refresh()
550
551         return sha, crc