bup.git / lib/bup/client.py
git: split out idx file writing to a separate class

from __future__ import print_function

from __future__ import absolute_import
from binascii import hexlify, unhexlify
import errno, os, re, struct, sys, time, zlib
import socket

from bup import git, ssh, vfs
from bup.compat import environ, range, reraise
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress, DemuxConn, atoi)
from bup.io import path_msg
from bup.vint import read_bvec, read_vuint, write_bvec


bwlimit = None


class ClientError(Exception):
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
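    """Write buf to f.  When the module-level bwlimit (bytes per second) is
    set, write in 4k blocks and sleep between them to stay under the limit.
    Returns an updated (bwcount, bwtime) pair to pass to the next call."""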
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        for i in range(0,len(buf),4096):
            now = time.time()
            next = max(now, bwtime + 1.0*bwcount/bwlimit)
            time.sleep(next-now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = next
        return (bwcount, bwtime)


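# Remote specs look like <protocol>://<host>[:port][/path]; the <sb> group
# below matches an optional pair of square brackets so that IPv6 literals
# like [::1] can be used as the host.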
_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
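    """Split remote into a (protocol, host, port, path) tuple of bytes.
    Accepts ssh://, bup://, and file:// URLs as well as rsync-style
    host:path strings; plain local paths come back as (b'file', None,
    None, path)."""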
    url_match = _url_rx.match(remote)
    if url_match:
        if not url_match.group(1) in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(b':', 1)
        if len(rs) == 1 or rs[0] in (b'', b'-'):
            return b'file', None, None, rs[-1]
        else:
            return b'ssh', rs[0], None, rs[1]


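# Rough usage sketch (for orientation only; bup's own commands are the real
# callers, and the remote spec and ref name below are just examples):
#
#   c = Client(b'example.host:/path/to/repo')
#   oid = c.read_ref(b'refs/heads/master')
#   w = c.new_packwriter()
#   ... store objects via w, then w.close() ...
#   c.close()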
class Client:
    def __init__(self, remote, create=False):
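        """Connect to the bup server for remote (see parse_remote), honoring
        BUP_SERVER_REVERSE, set up the local index cache, and select (or, if
        create is true, initialize) the remote repository directory."""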
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior of b'%s' % None == 'None',
        # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
        # but crashes instead when doing b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
                                 % re.sub(br'[^@\w]',
                                          b'_',
                                          b'%s:%s' % (cachehost, cachedir)))
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in (b'ssh', b'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, b'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    reraise(ClientError('connect: %s' % e))
            elif self.protocol == b'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host, atoi(self.port) or 1982))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        if self.dir:
            self.dir = re.sub(br'[\r\n]', b' ', self.dir)
            if create:
                self.conn.write(b'init-dir %s\n' % self.dir)
            else:
                self.conn.write(b'set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

    def __del__(self):
        try:
            self.close()
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise

    def close(self):
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            self.p.wait()
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
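        """Raise ClientError if the server process has already exited;
        otherwise return whatever the connection's own check_ok() reports."""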
        if self.p:
            rv = self.p.poll()
            if rv != None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))

    def check_busy(self):
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

    def _get_available_commands(self):
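        """Ask the server for 'help' and return the advertised command names
        as a frozenset of bytes."""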
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if not line == b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

    def _require_command(self, name):
        if name not in self._available_commands:
            raise ClientError('server does not appear to provide %s command'
                              % name.decode('ascii'))

    def sync_indexes(self):
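        """Bring the local index cache into sync with the server: delete any
        cached .idx files the server no longer has, download the ones it asks
        us to load, and rebuild the midx."""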
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

    def sync_index(self, name):
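        """Fetch the named .idx file from the server into the index cache;
        refuse to overwrite one that already exists locally."""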
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

    def _suggest_packs(self):
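        """Read the server's .idx suggestions (temporarily suspending an
        in-progress receive-objects-v2 if necessary), download each suggested
        index, rebuild the midx, and return the last index name received."""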
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx

    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
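        """Return a PackWriter_Remote that streams objects to the server over
        this connection via the receive-objects-v2 command."""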
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onopen = _set_busy,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
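        """Return the 20-byte binary oid that refname points to on the
        server, or None if the ref doesn't exist."""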
        self._require_command(b'read-ref')
        self.check_busy()
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return unhexlify(r)
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        self._require_command(b'update-ref')
        self.check_busy()
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        self.check_ok()

    def join(self, id):
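        """Yield the content for id as the server streams it back in
        length-prefixed chunks; raise KeyError if the server reports the
        object as missing."""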
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        while 1:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

    def cat_batch(self, refs):
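        """For each requested ref, yield (oidx, type, size, reader), where
        reader is a chunkyreader over the object data, or (None, None, None,
        None) when the server reports the ref as missing.  Each reader must
        be fully consumed before advancing to the next item."""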
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
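        """Yield (name, oid) for each server ref matching the given patterns,
        optionally limited to heads and/or tags."""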
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        """
        self._require_command(b'rev-list')
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
        conn.write(b'rev-list\n')
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        if not format:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def resolve(self, path, parent=None, want_meta=True, follow=False):
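        """Resolve path on the server (see vfs.resolve for the semantics of
        parent, want_meta, and follow) and return the resolution; any
        vfs.IOError the server reports is raised here."""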
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result


class PackWriter_Remote(git.PackWriter):
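    """A git.PackWriter that sends its pack data over the client connection
    (via receive-objects-v2) instead of writing local packfiles."""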
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received

    def close(self):
        id = self._end()
        self.file = None
        return id

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
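        """Send one framed record to the server: a length word covering the
        sha, crc, and data, then the 20-byte sha, a crc32 of the data, and
        the data itself, throttled by _raw_write_bwlimit.  If the server has
        responses queued, fetch its suggested indexes and refresh the object
        cache.  Returns (sha, crc)."""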
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc