from __future__ import absolute_import, print_function
from binascii import hexlify, unhexlify
from numbers import Integral
import errno, os, re, struct, sys, time, zlib
import socket

from bup import git, ssh, vfs
from bup.compat import environ, range, reraise
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress, DemuxConn, atoi)
from bup.io import path_msg
from bup.vint import read_bvec, read_vuint, write_bvec


bwlimit = None


class ClientError(Exception):
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        for i in range(0, len(buf), 4096):
            now = time.time()
            next = max(now, bwtime + 1.0 * bwcount / bwlimit)
            time.sleep(next - now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = next
        return (bwcount, bwtime)
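
# Illustrative sketch (not from the original source): how the throttle above
# paces writes.  With the module-level bwlimit set to 102400 (100 KiB/s), each
# full 4096-byte block must wait len(previous block) / bwlimit seconds after
# the previous send, i.e. about 0.04s, so roughly 25 blocks (100 KiB) go out
# per second:
#
#     bwlimit = 102400
#     bwcount, bwtime = 0, time.time()
#     for _ in range(25):
#         bwcount, bwtime = _raw_write_bwlimit(sys.stdout.buffer,
#                                              b'x' * 4096, bwcount, bwtime)
#     # ~1 second elapses; falling behind never "banks" extra bandwidth,
#     # because next is clamped with max(now, ...).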


_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
    url_match = _url_rx.match(remote)
    if url_match:
        if url_match.group(1) not in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(b':', 1)
        if len(rs) == 1 or rs[0] in (b'', b'-'):
            return b'file', None, None, rs[-1]
        else:
            return b'ssh', rs[0], None, rs[1]
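
# Illustrative examples (not from the original source) of how remote specs are
# parsed, derived from the regex and the fallback split above:
#
#     parse_remote(b'ssh://host:2222/repo')  # (b'ssh', b'host', b'2222', b'/repo')
#     parse_remote(b'bup://[::1]/repo')      # (b'bup', b'::1', None, b'/repo')
#     parse_remote(b'host:repo')             # (b'ssh', b'host', None, b'repo')
#     parse_remote(b'/local/repo')           # (b'file', None, None, b'/local/repo')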


class Client:
    def __init__(self, remote, create=False):
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches Python 2's behavior, where b'%s' % None
        # yields b'None'.  Python 3 (as of 3.7.5) still does that for str
        # ('%s' % None), but raises a TypeError for b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
                                 % re.sub(br'[^@\w]',
                                          b'_',
                                          b'%s:%s' % (cachehost, cachedir)))
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in (b'ssh', b'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, b'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    reraise(ClientError('connect: %s' % e))
            elif self.protocol == b'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host, atoi(self.port) or 1982))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        if self.dir:
            self.dir = re.sub(br'[\r\n]', b' ', self.dir)
            if create:
                self.conn.write(b'init-dir %s\n' % self.dir)
            else:
                self.conn.write(b'set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

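    # Illustrative usage sketch (not from the original source); the remote
    # spec forms accepted here are the ones handled by parse_remote() above,
    # and the ref name is a made-up example:
    #
    #     c = Client(b'ssh://backuphost/~/repo.bup')
    #     head = c.read_ref(b'refs/heads/main')   # None if the ref is missing
    #     c.close()
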
    def __del__(self):
        try:
            self.close()
        except IOError as e:
            if e.errno != errno.EPIPE:
                raise

    def close(self):
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
        if self.p:
            rv = self.p.poll()
            if rv is not None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))

    def check_busy(self):
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

    def _get_available_commands(self):
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if line != b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

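    # Illustrative sketch (not from the original source) of the reply parsed
    # above: a 'Commands:' header, one four-space-indented command name per
    # line, and a blank line before the usual status reply consumed by
    # check_ok().  The command names shown are just the ones required below:
    #
    #     Commands:
    #         init-dir
    #         set-dir
    #         list-indexes
    #     <blank line>
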
    def _require_command(self, name):
        if name not in self._available_commands:
            raise ClientError('server does not appear to provide %s command'
                              % name.decode('ascii'))

    def sync_indexes(self):
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

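    # Illustrative sketch (not from the original source; the pack names are
    # made up) of the list-indexes reply consumed above: one pack index name
    # per line, optionally marked 'load' when the server wants the client to
    # fetch it, terminated by a blank line:
    #
    #     pack-1dd0fcc8d3a1f8a433b68a1aecd0b728138d1356.idx load
    #     pack-9c7f1a8bd9e8c22b8b6a0c84f521d6ab2b525276.idx
    #     <blank line>
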
    def sync_index(self, name):
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

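    # Illustrative note (not from the original source): the send-index reply
    # read above is a 4-byte big-endian ('!I') byte count followed by exactly
    # that many bytes of raw .idx data, e.g.:
    #
    #     struct.pack('!I', 4096) == b'\x00\x00\x10\x00'  # a 4096-byte index follows
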
    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

    def _suggest_packs(self):
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx

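    # Illustrative note (not from the original source): while a
    # receive-objects-v2 upload is in progress, writing the all-ones length
    # prefix b'\xff\xff\xff\xff' suspends it so the server can send
    # 'index <name>.idx' suggestion lines (terminated by a blank line); once
    # the suggested indexes have been synced into the local cache, re-sending
    # the command name resumes the upload.
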
    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker=self._make_objcache,
                                 suggest_packs=self._suggest_packs,
                                 onopen=_set_busy,
                                 onclose=self._not_busy,
                                 ensure_busy=self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
        self._require_command(b'read-ref')
        self.check_busy()
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return unhexlify(r)
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        self._require_command(b'update-ref')
        self.check_busy()
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        self.check_ok()

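    # Illustrative sketch (not from the original source; the ref name is made
    # up): newval/oldval are 20-byte binary oids, sent hex-encoded, with an
    # empty oldval line meaning "no expected previous value":
    #
    #     c.update_ref(b'refs/heads/main', unhexlify(b'aa' * 20), None)
    #     # wire: b'update-ref refs/heads/main\n' + b'aa' * 20 + b'\n' + b'\n'
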
    def join(self, id):
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        while True:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

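    # Illustrative note (not from the original source): the reply consumed
    # above is a stream of chunks, each prefixed with a 4-byte big-endian
    # length, terminated by a zero length, e.g.:
    #
    #     b'\x00\x00\x20\x00' + <8192 bytes> ... + b'\x00\x00\x00\x00'
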
    def cat_batch(self, refs):
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

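    # Illustrative sketch (not from the original source; the ref name is made
    # up) of one cat-batch exchange as written/parsed above: the request is
    # one ref per line plus a terminating blank line, and each reply is either
    # 'missing\n' or an info line '<hex-oid> <type> <size>\n' followed by
    # exactly <size> bytes:
    #
    #     for oidx, typ, size, it in c.cat_batch([b'refs/heads/main']):
    #         if oidx is None:
    #             continue  # missing object
    #         data = b''.join(it)  # consume fully before the next item
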
    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def rev_list(self, refs, count=None, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        """
        self._require_command(b'rev-list')
        assert (count is None) or (isinstance(count, Integral))
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
        conn.write(b'rev-list\n')
        if count is not None:
            conn.write(b'%d' % count)
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        if not format:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

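    # Illustrative sketch (not from the original source; the ref name is made
    # up): with no format, each yielded item is a 40-byte hex commit oid; with
    # a format, each item is (oidx, parse(conn)):
    #
    #     for cmt_oidx in c.rev_list([b'refs/heads/main'], count=10):
    #         print(cmt_oidx.decode('ascii'))
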
    def resolve(self, path, parent=None, want_meta=True, follow=False):
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result
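
    # Illustrative note (not from the original source): the numeric argument
    # sent with 'resolve' above is a bitmask -- 1 requests metadata, 2 asks
    # the server to follow a final symlink, and 4 announces that a parent
    # resolution follows.  So resolve(path, parent=p, want_meta=True,
    # follow=False) sends b'resolve 5\n'.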


class PackWriter_Remote(git.PackWriter):
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received

    def close(self):
        id = self._end()
        self.file = None
        return id

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc
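
# Illustrative note (not from the original source): each object written above
# is framed for the receive-objects-v2 stream as
#
#     !I length (len(data) + 20 + 4, i.e. payload plus sha plus crc)
#     20-byte binary object sha
#     !I crc32 of the payload
#     payload (the object data exactly as supplied in datalist, already
#              pack-encoded by the caller)
#
# and a zero length (b'\0\0\0\0', written by _end above) closes the pack.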