[bup.git] / lib / bup / client.py (blob 25320545b876dc034663daa6a3c9c22496d01700)

from __future__ import print_function

from __future__ import absolute_import
from binascii import hexlify, unhexlify
import errno, os, re, struct, time, zlib
import socket

from bup import git, ssh, vfs
from bup.compat import environ, range, reraise
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress, DemuxConn)
from bup.io import path_msg
from bup.vint import write_bvec


bwlimit = None


class ClientError(Exception):
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
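    """Write buf to f, throttled by the module-level bwlimit setting.

    When bwlimit (bytes per second) is unset, write buf in one call and
    return (len(buf), time.time()).  Otherwise write buf in 4096-byte
    slices, sleeping before each slice so the average rate stays near
    bwlimit, and return the (bwcount, bwtime) pair to pass to the next
    call.  Deliberately never tries to "catch up" after a stall (see the
    comment below).
    """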
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        for i in range(0, len(buf), 4096):
            now = time.time()
            next = max(now, bwtime + 1.0*bwcount/bwlimit)
            time.sleep(next-now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = next
        return (bwcount, bwtime)


_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
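    """Split a remote spec into a (protocol, host, port, path) tuple.

    Accepts ssh://, bup://, and file:// URLs as well as the plain
    'host:path' (ssh) and bare path (file) forms; unknown protocols
    raise ClientError.  Missing components are returned as None.
    For example (illustrative values):

        parse_remote(b'bup://backuphost:1982/srv/bup')
          == (b'bup', b'backuphost', b'1982', b'/srv/bup')
        parse_remote(b'backuphost:/srv/bup')
          == (b'ssh', b'backuphost', None, b'/srv/bup')
        parse_remote(b'/srv/bup')
          == (b'file', None, None, b'/srv/bup')
    """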
    url_match = _url_rx.match(remote)
    if url_match:
        if url_match.group(1) not in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(b':', 1)
        if len(rs) == 1 or rs[0] in (b'', b'-'):
            return b'file', None, None, rs[-1]
        else:
            return b'ssh', rs[0], None, rs[1]


class Client:
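    """Client connection to a bup repository server.

    Depending on the remote spec, the connection is made by running the
    server over ssh or locally (ssh:// and file:// or their shorthand
    forms), by a TCP socket to a bup daemon (bup://), or, when
    BUP_SERVER_REVERSE is set, over the already-open file descriptors 3
    and 4.  A per-remote index cache is kept under the local repository's
    index-cache directory and synced with the server's pack indexes.

    Illustrative use (hypothetical remote and ref):

        c = Client(b'ssh://backuphost/srv/bup')
        oid = c.read_ref(b'refs/heads/main')
        c.close()
    """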
    def __init__(self, remote, create=False):
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior of b'%s' % None == 'None';
        # python3 (as of 3.7.5) does the same for str ('%s' % None), but raises
        # a TypeError for b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
                                 % re.sub(br'[^@\w]',
                                          b'_',
                                          b'%s:%s' % (cachehost, cachedir)))
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in (b'ssh', b'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, b'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    reraise(ClientError('connect: %s' % e))
            elif self.protocol == b'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host,
                                   1982 if self.port is None else int(self.port)))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        if self.dir:
            self.dir = re.sub(br'[\r\n]', b' ', self.dir)
            if create:
                self.conn.write(b'init-dir %s\n' % self.dir)
            else:
                self.conn.write(b'set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

    def __del__(self):
        try:
            self.close()
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise

    def close(self):
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            self.p.wait()
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
        if self.p:
            rv = self.p.poll()
            if rv is not None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))
            # reraise doesn't return
            return None

    def check_busy(self):
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

    def _get_available_commands(self):
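        """Ask the server for its command list and return it as a frozenset.

        Sends 'help' and parses the reply, which is expected to be a
        'Commands:' header, one indented command name per line, and a
        terminating blank line.
        """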
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if line != b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

    def _require_command(self, name):
        if name not in self._available_commands:
            raise ClientError('server does not appear to provide %s command'
                              % name.decode('ascii'))

    def sync_indexes(self):
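        """Bring the local index cache in line with the server's indexes.

        Asks the server for its index list ('list-indexes'), downloads any
        .idx marked 'load' that isn't cached yet, deletes cached .idx files
        the server no longer mentions, and then regenerates midx files.
        """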
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

    def sync_index(self, name):
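        """Fetch a single pack index into the local cache.

        Sends 'send-index name', reads the 4-byte length header, and
        streams the index into the cache atomically.  Refuses to
        re-request an index that already exists locally.
        """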
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

    def _suggest_packs(self):
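        """Sync any indexes the server has suggested and return the last one.

        If we're in the middle of receive-objects-v2, suspend it first by
        sending the 0xffffffff marker, then read the server's suggestion
        lines, download each suggested .idx into the cache, regenerate
        midx files, and finally resume the suspended command.
        """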
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx

    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
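        """Return a PackWriter_Remote that streams objects to the server.

        The writer opens a receive-objects-v2 session on first write,
        marks this client busy for the duration, and uses the local index
        cache for its object cache.
        """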
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onopen = _set_busy,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
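        """Return the server's binary oid for refname, or None if missing."""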
        self._require_command(b'read-ref')
        self.check_busy()
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return unhexlify(r)
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
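        """Ask the server to update refname from oldval to newval.

        newval and oldval are binary oids; oldval may be falsy when the
        ref is being created.
        """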
        self._require_command(b'update-ref')
        self.check_busy()
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        self.check_ok()

    def join(self, id):
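        """Generate the joined content of the object named by id as chunks.

        Sends the older 'cat' command name so we keep working with older
        servers, and raises KeyError if the server reports the object as
        missing.
        """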
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        while 1:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

    def cat_batch(self, refs):
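        """Generate (oidx, type, size, reader) for each requested ref.

        For refs the server reports as missing, yield (None, None, None,
        None) instead.  Each reader is a chunkyreader over the object
        data and must be fully consumed before advancing to the next ref.
        """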
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
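        """Generate (name, oid) pairs for the server's matching refs.

        With no patterns, all refs are listed; limit_to_heads and
        limit_to_tags restrict the listing accordingly.
        """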
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics.

        Note that with the current interface, the parse function must
        consume any blank lines produced by the format, because the first
        blank line it doesn't consume will be interpreted as the
        terminator for the entire rev-list result.
        """
        self._require_command(b'rev-list')
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
        conn.write(b'rev-list\n')
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        if not format:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def resolve(self, path, parent=None, want_meta=True, follow=False):
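        """Resolve path on the server and return the resolution.

        want_meta, follow, and the presence of parent are sent as bit
        flags; if the server reports an error, the corresponding
        vfs.IOError is raised.
        """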
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result


class PackWriter_Remote(git.PackWriter):
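    """A git.PackWriter that streams pack objects over the client connection.

    Each object is framed as a 4-byte length, the 20-byte sha, a 4-byte
    crc32, and the object data; a zero length terminates the pack, after
    which any indexes the server suggests are synced into the local cache.
    """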
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received
        return None

    def close(self):
        id = self._end()
        self.file = None
        return id

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
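        """Send one object to the server, respecting bwlimit.

        The frame is a 4-byte big-endian length covering the rest of the
        frame, the 20-byte sha, the crc32 of the data, and then the data
        itself.  Any input the server has queued up in the meantime is
        treated as index suggestions and drained via suggest_packs().
        """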
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc