from __future__ import print_function
from __future__ import absolute_import
from binascii import hexlify, unhexlify
import os, re, struct, time, zlib
import socket

from bup import git, ssh, vfs
from bup.compat import environ, pending_raise, range, reraise
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress, DemuxConn)
from bup.io import path_msg
from bup.vint import write_bvec


bwlimit = None


class ClientError(Exception):
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        for i in range(0,len(buf),4096):
            now = time.time()
            next = max(now, bwtime + 1.0*bwcount/bwlimit)
            time.sleep(next-now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = next
        return (bwcount, bwtime)
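
# A quick worked example of the pacing above (numbers illustrative, not a bup
# default): with bwlimit = 8192 bytes/sec, a full 4096-byte chunk makes the
# following write wait until
#
#     next = max(now, bwtime + 1.0 * 4096 / 8192)   # i.e. bwtime + 0.5s
#
# and because of the max(now, ...) clamp, a chunk that arrives late is never
# "made up for" by bursting faster than the configured limit.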


_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
    url_match = _url_rx.match(remote)
    if url_match:
        if url_match.group(1) not in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(b':', 1)
        if len(rs) == 1 or rs[0] in (b'', b'-'):
            return b'file', None, None, rs[-1]
        else:
            return b'ssh', rs[0], None, rs[1]
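
# A hedged sketch of what parse_remote() returns for a few typical remote
# specs (values illustrative only):
#
#     parse_remote(b'ssh://host:2222/repo')  ->  (b'ssh', b'host', b'2222', b'/repo')
#     parse_remote(b'user@host:repo')        ->  (b'ssh', b'user@host', None, b'repo')
#     parse_remote(b'/srv/bup')              ->  (b'file', None, None, b'/srv/bup')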


class Client:
    def __init__(self, remote, create=False):
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior of b'%s' % None == 'None';
        # python3 (as of 3.7.5) does the same for str ('%s' % None), but
        # raises a TypeError for b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
                                 % re.sub(br'[^@\w]',
                                          b'_',
                                          b'%s:%s' % (cachehost, cachedir)))
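        # Hedged illustration (host and dir made up): for host b'example.com'
        # and dir b'/srv/bup', the substitution above turns
        # b'example.com:/srv/bup' into b'example_com__srv_bup', so the local
        # index cache lives under git.repo(b'index-cache/example_com__srv_bup').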
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in (b'ssh', b'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, b'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    reraise(ClientError('connect: %s' % e))
            elif self.protocol == b'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host,
                                   1982 if self.port is None else int(self.port)))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        if self.dir:
            self.dir = re.sub(br'[\r\n]', b' ', self.dir)
            if create:
                self.conn.write(b'init-dir %s\n' % self.dir)
            else:
                self.conn.write(b'set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        with pending_raise(value, rethrow=False):
            self.close()

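    # Hedged usage sketch (remote spec and ref name illustrative): Client is
    # meant to be used as a context manager so that close() always runs:
    #
    #     with Client(b'ssh://backup-host/srv/bup') as c:
    #         head = c.read_ref(b'refs/heads/main')
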
    def close(self):
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            self.p.wait()
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
        if self.p:
            rv = self.p.poll()
            if rv is not None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))
            # reraise doesn't return
            return None

    def check_busy(self):
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

    def _get_available_commands(self):
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if line != b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

    def _require_command(self, name):
        if name not in self._available_commands:
            raise ClientError('server does not appear to provide %s command'
                              % name.decode('ascii'))

    def sync_indexes(self):
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
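        # Hedged sketch of the reply parsed below: the server sends one line
        # per index it knows about, either "<pack>.idx" or "<pack>.idx load",
        # terminated by a blank line; only "load" entries that we don't
        # already have cached locally get fetched.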
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

    def sync_index(self, name):
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

    def _suggest_packs(self):
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx

    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker=self._make_objcache,
                                 suggest_packs=self._suggest_packs,
                                 onopen=_set_busy,
                                 onclose=self._not_busy,
                                 ensure_busy=self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
        self._require_command(b'read-ref')
        self.check_busy()
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return unhexlify(r)
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        self._require_command(b'update-ref')
        self.check_busy()
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        self.check_ok()

    def join(self, id):
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        while 1:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

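    # Hedged usage sketch (id illustrative): join() streams the requested
    # object's data as length-prefixed chunks and raises KeyError if the
    # server reports it missing, so callers typically just concatenate:
    #
    #     data = b''.join(client.join(some_id))
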
    def cat_batch(self, refs):
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

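    # Hedged usage sketch (ref illustrative): cat_batch() yields one
    # (oidx, type, size, chunk-iterator) tuple per requested ref, or
    # (None, None, None, None) for a missing one, and each chunk iterator
    # must be drained before advancing to the next result:
    #
    #     for oidx, typ, size, it in client.cat_batch([b'HEAD']):
    #         data = b''.join(it) if oidx else None
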
    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

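    # Hedged usage sketch: refs() yields (name, oid) pairs, with oid already
    # converted from hex to raw bytes:
    #
    #     for name, oid in client.refs(limit_to_heads=True):
    #         print(path_msg(name), hexlify(oid).decode('ascii'))
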
    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics.  Note that with the
        current interface, the parse function must consume any blank
        lines produced by the format, because the first blank line it
        leaves unconsumed will be interpreted as the terminator for the
        entire rev-list result.

        """
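        # Hedged usage sketch (branch name and format illustrative), assuming
        # a parse callback that consumes exactly the one line the format
        # produces per commit:
        #
        #     dates = dict(client.rev_list([b'refs/heads/main'],
        #                                  format=b'%at',
        #                                  parse=lambda f: f.readline().strip()))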
        self._require_command(b'rev-list')
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
        conn.write(b'rev-list\n')
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        if not format:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def resolve(self, path, parent=None, want_meta=True, follow=False):
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result


class PackWriter_Remote(git.PackWriter):
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received
        return None

    def close(self):
        id = self._end()
        self.file = None
        return id

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
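        # Wire frame built above: a 4-byte big-endian length that covers the
        # 20-byte raw sha, the 4-byte crc32, and the encoded object data
        # following it.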
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc