[bup.git] / lib/bup/client.py
from __future__ import print_function

from __future__ import absolute_import
from binascii import hexlify, unhexlify
from numbers import Integral
import errno, os, re, socket, struct, sys, time, zlib

from bup import git, ssh, vfs
from bup.compat import environ, range, reraise
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress, DemuxConn, atoi)
from bup.io import path_msg
from bup.vint import read_bvec, read_vuint, write_bvec


bwlimit = None


class ClientError(Exception):
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        for i in range(0, len(buf), 4096):
            now = time.time()
            next = max(now, bwtime + 1.0*bwcount/bwlimit)
            time.sleep(next-now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = next
        return (bwcount, bwtime)
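# Illustrative pacing example (not from the original source): with
# bwlimit = 100000 bytes/sec and a full 4096-byte previous chunk, the next
# chunk is scheduled no earlier than bwtime + 4096/100000 ~= 0.04s after the
# previous one, i.e. roughly 25 chunks/sec, averaging out to ~100kB/sec.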


_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
    url_match = _url_rx.match(remote)
    if url_match:
        if not url_match.group(1) in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(b':', 1)
        if len(rs) == 1 or rs[0] in (b'', b'-'):
            return b'file', None, None, rs[-1]
        else:
            return b'ssh', rs[0], None, rs[1]
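# Informal examples of what parse_remote() yields for a few remote specs,
# derived from the regex and fallback above (illustrative values only):
#   parse_remote(b'ssh://host:2222/repo') -> (b'ssh', b'host', b'2222', b'/repo')
#   parse_remote(b'host:repo')            -> (b'ssh', b'host', None, b'repo')
#   parse_remote(b'/some/dir')            -> (b'file', None, None, b'/some/dir')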


class Client:
    def __init__(self, remote, create=False):
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior of b'%s' % None == 'None',
        # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
        # but crashes instead when doing b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
                                 % re.sub(br'[^@\w]',
                                          b'_',
                                          b'%s:%s' % (cachehost, cachedir)))
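        # Illustrative example (not from the original source): a host of
        # b'example.com' with dir b'/repo' maps to an index-cache entry named
        # b'example_com__repo', since every byte outside [@\w] becomes b'_'.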
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in (b'ssh', b'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, b'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    reraise(ClientError('connect: %s' % e))
            elif self.protocol == b'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host, atoi(self.port) or 1982))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        if self.dir:
            self.dir = re.sub(br'[\r\n]', b' ', self.dir)
            if create:
                self.conn.write(b'init-dir %s\n' % self.dir)
            else:
                self.conn.write(b'set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

    def __del__(self):
        try:
            self.close()
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise

    def close(self):
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            self.p.wait()
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
        if self.p:
            rv = self.p.poll()
            if rv != None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))

    def check_busy(self):
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

    def _get_available_commands(self):
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if not line == b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

    def _require_command(self, name):
        if name not in self._available_commands:
            raise ClientError('server does not appear to provide %s command'
                              % name.decode('ascii'))

    def sync_indexes(self):
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
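        # Each reply line is expected to name one index, b'<pack>.idx',
        # optionally followed by b' load' when the server wants the client to
        # fetch it (see the parsing below); an empty line ends the listing.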
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

    def sync_index(self, name):
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

    def _suggest_packs(self):
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx

    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onopen = _set_busy,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
        self._require_command(b'read-ref')
        self.check_busy()
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return unhexlify(r)
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        self._require_command(b'update-ref')
        self.check_busy()
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        self.check_ok()

    def join(self, id):
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
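        # The object comes back as a sequence of chunks, each preceded by a
        # 4-byte big-endian length; a zero length marks the end of the data.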
        while 1:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

    def cat_batch(self, refs):
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
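        # For each requested ref the server answers with either b'missing\n'
        # or a b'<hex-oid> <type> <size>\n' header followed by <size> bytes of
        # object data (parsed below).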
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')
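        # Each matching ref arrives as a b'<40-hex-oid> <refname>\n' line;
        # a lone blank line acts as the sentinel that ends the listing.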
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def rev_list(self, refs, count=None, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        """
        self._require_command(b'rev-list')
        assert (count is None) or (isinstance(count, Integral))
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
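        # Request framing, mirroring the writes below: b'rev-list\n', a line
        # holding the optional decimal count (blank if None), a line holding
        # the optional format (blank if None), one ref per line, and a final
        # blank line terminating the list.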
        conn.write(b'rev-list\n')
        if count is not None:
            conn.write(b'%d' % count)
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        if not format:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def resolve(self, path, parent=None, want_meta=True, follow=False):
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
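        # The server replies with a single status byte: 1 means an encoded
        # resolution follows, 0 means an encoded vfs.IOError follows.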
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result


class PackWriter_Remote(git.PackWriter):
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received

    def close(self):
        id = self._end()
        self.file = None
        return id

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
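        # Frame layout, as assembled below: a 4-byte big-endian length that
        # covers sha + crc + data (len(data) + 20 + 4), then the 20-byte sha,
        # the 4-byte crc32, and finally the object data itself.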
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc