]> arthur.barton.de Git - bup.git/blob - lib/bup/helpers.py
Send *all* output before the BUPMUX header to stderr
[bup.git] / lib / bup / helpers.py
1 """Helper functions and classes for bup."""
2
3 from __future__ import absolute_import, division
4 from collections import namedtuple
5 from contextlib import contextmanager
6 from ctypes import sizeof, c_void_p
7 from math import floor
8 from os import environ
9 from subprocess import PIPE, Popen
10 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
11 import hashlib, heapq, math, operator, time, grp, tempfile
12
13 from bup import _helpers
14 from bup import compat
15 from bup.compat import argv_bytes, byte_int, pending_raise
16 from bup.io import byte_stream, path_msg
17 # This function should really be in helpers, not in bup.options.  But we
18 # want options.py to be standalone so people can include it in other projects.
19 from bup.options import _tty_width as tty_width
20
21
22 buglvl = int(os.environ.get('BUP_DEBUG', 0))
23
24
25 class Nonlocal:
26     """Helper to deal with Python scoping issues"""
27     pass
28
29
30 sc_page_size = os.sysconf('SC_PAGE_SIZE')
31 assert(sc_page_size > 0)
32
33 sc_arg_max = os.sysconf('SC_ARG_MAX')
34 if sc_arg_max == -1:  # "no definite limit" - let's choose 2M
35     sc_arg_max = 2 * 1024 * 1024
36
37 def last(iterable):
38     result = None
39     for result in iterable:
40         pass
41     return result
42
43 try:
44     _fdatasync = os.fdatasync
45 except AttributeError:
46     _fdatasync = os.fsync
47
48 if sys.platform.startswith('darwin'):
49     # Apparently os.fsync on OS X doesn't guarantee to sync all the way down
50     import fcntl
51     def fdatasync(fd):
52         try:
53             return fcntl.fcntl(fd, fcntl.F_FULLFSYNC)
54         except IOError as e:
55             # Fallback for file systems (SMB) that do not support F_FULLFSYNC
56             if e.errno == errno.ENOTSUP:
57                 return _fdatasync(fd)
58             else:
59                 raise
60 else:
61     fdatasync = _fdatasync
62
63
64 def partition(predicate, stream):
65     """Returns (leading_matches_it, rest_it), where leading_matches_it
66     must be completely exhausted before traversing rest_it.
67
68     """
69     stream = iter(stream)
70     ns = Nonlocal()
71     ns.first_nonmatch = None
72     def leading_matches():
73         for x in stream:
74             if predicate(x):
75                 yield x
76             else:
77                 ns.first_nonmatch = (x,)
78                 break
79     def rest():
80         if ns.first_nonmatch:
81             yield ns.first_nonmatch[0]
82             for x in stream:
83                 yield x
84     return (leading_matches(), rest())
85
86
87 def merge_dict(*xs):
88     result = {}
89     for x in xs:
90         result.update(x)
91     return result
92
93
94 def lines_until_sentinel(f, sentinel, ex_type):
95     # sentinel must end with \n and must contain only one \n
96     while True:
97         line = f.readline()
98         if not (line and line.endswith(b'\n')):
99             raise ex_type('Hit EOF while reading line')
100         if line == sentinel:
101             return
102         yield line
103
104
105 def stat_if_exists(path):
106     try:
107         return os.stat(path)
108     except OSError as e:
109         if e.errno != errno.ENOENT:
110             raise
111     return None
112
113
114 # Write (blockingly) to sockets that may or may not be in blocking mode.
115 # We need this because our stderr is sometimes eaten by subprocesses
116 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
117 # leading to race conditions.  Ick.  We'll do it the hard way.
118 def _hard_write(fd, buf):
119     while buf:
120         (r,w,x) = select.select([], [fd], [], None)
121         if not w:
122             raise IOError('select(fd) returned without being writable')
123         try:
124             sz = os.write(fd, buf)
125         except OSError as e:
126             if e.errno != errno.EAGAIN:
127                 raise
128         assert(sz >= 0)
129         buf = buf[sz:]
130
131
132 _last_prog = 0
133 def log(s):
134     """Print a log message to stderr."""
135     global _last_prog
136     sys.stdout.flush()
137     _hard_write(sys.stderr.fileno(), s if isinstance(s, bytes) else s.encode())
138     _last_prog = 0
139
140
141 def debug1(s):
142     if buglvl >= 1:
143         log(s)
144
145
146 def debug2(s):
147     if buglvl >= 2:
148         log(s)
149
150
151 istty1 = os.isatty(1) or (int(os.environ.get('BUP_FORCE_TTY', 0)) & 1)
152 istty2 = os.isatty(2) or (int(os.environ.get('BUP_FORCE_TTY', 0)) & 2)
153 _last_progress = ''
154 def progress(s):
155     """Calls log() if stderr is a TTY.  Does nothing otherwise."""
156     global _last_progress
157     if istty2:
158         log(s)
159         _last_progress = s
160
161
162 def qprogress(s):
163     """Calls progress() only if we haven't printed progress in a while.
164     
165     This avoids overloading the stderr buffer with excess junk.
166     """
167     global _last_prog
168     now = time.time()
169     if now - _last_prog > 0.1:
170         progress(s)
171         _last_prog = now
172
173
174 def reprogress():
175     """Calls progress() to redisplay the most recent progress message.
176
177     Useful after you've printed some other message that wipes out the
178     progress line.
179     """
180     if _last_progress and _last_progress.endswith('\r'):
181         progress(_last_progress)
182
183
184 def mkdirp(d, mode=None):
185     """Recursively create directories on path 'd'.
186
187     Unlike os.makedirs(), it doesn't raise an exception if the last element of
188     the path already exists.
189     """
190     try:
191         if mode:
192             os.makedirs(d, mode)
193         else:
194             os.makedirs(d)
195     except OSError as e:
196         if e.errno == errno.EEXIST:
197             pass
198         else:
199             raise
200
201
202 class MergeIterItem:
203     def __init__(self, entry, read_it):
204         self.entry = entry
205         self.read_it = read_it
206     def __lt__(self, x):
207         return self.entry < x.entry
208
209 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
210     if key:
211         samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
212     else:
213         samekey = operator.eq
214     count = 0
215     total = sum(len(it) for it in iters)
216     iters = (iter(it) for it in iters)
217     heap = ((next(it, None),it) for it in iters)
218     heap = [MergeIterItem(e, it) for e, it in heap if e]
219
220     heapq.heapify(heap)
221     pe = None
222     while heap:
223         if not count % pfreq:
224             pfunc(count, total)
225         e, it = heap[0].entry, heap[0].read_it
226         if not samekey(e, pe):
227             pe = e
228             yield e
229         count += 1
230         try:
231             e = next(it)
232         except StopIteration:
233             heapq.heappop(heap) # remove current
234         else:
235             # shift current to new location
236             heapq.heapreplace(heap, MergeIterItem(e, it))
237     pfinal(count, total)
238
239
240 def unlink(f):
241     """Delete a file at path 'f' if it currently exists.
242
243     Unlike os.unlink(), does not throw an exception if the file didn't already
244     exist.
245     """
246     try:
247         os.unlink(f)
248     except OSError as e:
249         if e.errno != errno.ENOENT:
250             raise
251
252
253 _bq_simple_id_rx = re.compile(br'^[-_./a-zA-Z0-9]+$')
254 _sq_simple_id_rx = re.compile(r'^[-_./a-zA-Z0-9]+$')
255
256 def bquote(x):
257     if x == b'':
258         return b"''"
259     if _bq_simple_id_rx.match(x):
260         return x
261     return b"'%s'" % x.replace(b"'", b"'\"'\"'")
262
263 def squote(x):
264     if x == '':
265         return "''"
266     if _sq_simple_id_rx.match(x):
267         return x
268     return "'%s'" % x.replace("'", "'\"'\"'")
269
270 def quote(x):
271     if isinstance(x, bytes):
272         return bquote(x)
273     if isinstance(x, compat.str_type):
274         return squote(x)
275     assert False
276
277 def shstr(cmd):
278     """Return a shell quoted string for cmd if it's a sequence, else cmd.
279
280     cmd must be a string, bytes, or a sequence of one or the other,
281     and the assumption is that if cmd is a string or bytes, then it's
282     already quoted (because it's what's actually being passed to
283     call() and friends.  e.g. log(shstr(cmd)); call(cmd)
284
285     """
286     if isinstance(cmd, (bytes, compat.str_type)):
287         return cmd
288     elif all(isinstance(x, bytes) for x in cmd):
289         return b' '.join(map(bquote, cmd))
290     elif all(isinstance(x, compat.str_type) for x in cmd):
291         return ' '.join(map(squote, cmd))
292     raise TypeError('unsupported shstr argument: ' + repr(cmd))
293
294
295 exc = subprocess.check_call
296
297 def exo(cmd,
298         input=None,
299         stdin=None,
300         stderr=None,
301         shell=False,
302         check=True,
303         preexec_fn=None,
304         close_fds=True):
305     if input:
306         assert stdin in (None, PIPE)
307         stdin = PIPE
308     p = Popen(cmd,
309               stdin=stdin, stdout=PIPE, stderr=stderr,
310               shell=shell,
311               preexec_fn=preexec_fn,
312               close_fds=close_fds)
313     out, err = p.communicate(input)
314     if check and p.returncode != 0:
315         raise Exception('subprocess %r failed with status %d%s'
316                         % (b' '.join(map(quote, cmd)), p.returncode,
317                            ', stderr: %r' % err if err else ''))
318     return out, err, p
319
320 def readpipe(argv, preexec_fn=None, shell=False):
321     """Run a subprocess and return its output."""
322     return exo(argv, preexec_fn=preexec_fn, shell=shell)[0]
323
324
325 def _argmax_base(command):
326     base_size = 2048
327     for c in command:
328         base_size += len(command) + 1
329     for k, v in compat.items(environ):
330         base_size += len(k) + len(v) + 2 + sizeof(c_void_p)
331     return base_size
332
333
334 def _argmax_args_size(args):
335     return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
336
337
338 def batchpipe(command, args, preexec_fn=None, arg_max=sc_arg_max):
339     """If args is not empty, yield the output produced by calling the
340 command list with args as a sequence of strings (It may be necessary
341 to return multiple strings in order to respect ARG_MAX)."""
342     # The optional arg_max arg is a workaround for an issue with the
343     # current wvtest behavior.
344     base_size = _argmax_base(command)
345     while args:
346         room = arg_max - base_size
347         i = 0
348         while i < len(args):
349             next_size = _argmax_args_size(args[i:i+1])
350             if room - next_size < 0:
351                 break
352             room -= next_size
353             i += 1
354         sub_args = args[:i]
355         args = args[i:]
356         assert(len(sub_args))
357         yield readpipe(command + sub_args, preexec_fn=preexec_fn)
358
359
360 def resolve_parent(p):
361     """Return the absolute path of a file without following any final symlink.
362
363     Behaves like os.path.realpath, but doesn't follow a symlink for the last
364     element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
365     will follow symlinks in p's directory)
366     """
367     try:
368         st = os.lstat(p)
369     except OSError:
370         st = None
371     if st and stat.S_ISLNK(st.st_mode):
372         (dir, name) = os.path.split(p)
373         dir = os.path.realpath(dir)
374         out = os.path.join(dir, name)
375     else:
376         out = os.path.realpath(p)
377     #log('realpathing:%r,%r\n' % (p, out))
378     return out
379
380
381 def detect_fakeroot():
382     "Return True if we appear to be running under fakeroot."
383     return os.getenv("FAKEROOTKEY") != None
384
385
386 if sys.platform.startswith('cygwin'):
387     def is_superuser():
388         # https://cygwin.com/ml/cygwin/2015-02/msg00057.html
389         groups = os.getgroups()
390         return 544 in groups or 0 in groups
391 else:
392     def is_superuser():
393         return os.geteuid() == 0
394
395
396 def cache_key_value(get_value, key, cache):
397     """Return (value, was_cached).  If there is a value in the cache
398     for key, use that, otherwise, call get_value(key) which should
399     throw a KeyError if there is no value -- in which case the cached
400     and returned value will be None.
401     """
402     try: # Do we already have it (or know there wasn't one)?
403         value = cache[key]
404         return value, True
405     except KeyError:
406         pass
407     value = None
408     try:
409         cache[key] = value = get_value(key)
410     except KeyError:
411         cache[key] = None
412     return value, False
413
414
415 _hostname = None
416 def hostname():
417     """Get the FQDN of this machine."""
418     global _hostname
419     if not _hostname:
420         _hostname = _helpers.gethostname()
421     return _hostname
422
423
424 def format_filesize(size):
425     unit = 1024.0
426     size = float(size)
427     if size < unit:
428         return "%d" % (size)
429     exponent = int(math.log(size) // math.log(unit))
430     size_prefix = "KMGTPE"[exponent - 1]
431     return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
432
433
434 class NotOk(Exception):
435     pass
436
437
438 class BaseConn:
439     def __init__(self, outp):
440         self.outp = outp
441
442     def close(self):
443         while self._read(65536): pass
444
445     def _read(self, size):
446         raise NotImplementedError("Subclasses must implement _read")
447
448     def read(self, size):
449         """Read 'size' bytes from input stream."""
450         self.outp.flush()
451         return self._read(size)
452
453     def _readline(self, size):
454         raise NotImplementedError("Subclasses must implement _readline")
455
456     def readline(self):
457         """Read from input stream until a newline is found."""
458         self.outp.flush()
459         return self._readline()
460
461     def write(self, data):
462         """Write 'data' to output stream."""
463         #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
464         self.outp.write(data)
465
466     def has_input(self):
467         """Return true if input stream is readable."""
468         raise NotImplementedError("Subclasses must implement has_input")
469
470     def ok(self):
471         """Indicate end of output from last sent command."""
472         self.write(b'\nok\n')
473
474     def error(self, s):
475         """Indicate server error to the client."""
476         s = re.sub(br'\s+', b' ', s)
477         self.write(b'\nerror %s\n' % s)
478
479     def _check_ok(self, onempty):
480         self.outp.flush()
481         rl = b''
482         for rl in linereader(self):
483             #log('%d got line: %r\n' % (os.getpid(), rl))
484             if not rl:  # empty line
485                 continue
486             elif rl == b'ok':
487                 return None
488             elif rl.startswith(b'error '):
489                 #log('client: error: %s\n' % rl[6:])
490                 return NotOk(rl[6:])
491             else:
492                 onempty(rl)
493         raise Exception('server exited unexpectedly; see errors above')
494
495     def drain_and_check_ok(self):
496         """Remove all data for the current command from input stream."""
497         def onempty(rl):
498             pass
499         return self._check_ok(onempty)
500
501     def check_ok(self):
502         """Verify that server action completed successfully."""
503         def onempty(rl):
504             raise Exception('expected "ok", got %r' % rl)
505         return self._check_ok(onempty)
506
507
508 class Conn(BaseConn):
509     def __init__(self, inp, outp):
510         BaseConn.__init__(self, outp)
511         self.inp = inp
512
513     def _read(self, size):
514         return self.inp.read(size)
515
516     def _readline(self):
517         return self.inp.readline()
518
519     def has_input(self):
520         [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
521         if rl:
522             assert(rl[0] == self.inp.fileno())
523             return True
524         else:
525             return None
526
527
528 def checked_reader(fd, n):
529     while n > 0:
530         rl, _, _ = select.select([fd], [], [])
531         assert(rl[0] == fd)
532         buf = os.read(fd, n)
533         if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
534         yield buf
535         n -= len(buf)
536
537
538 MAX_PACKET = 128 * 1024
539 def mux(p, outfd, outr, errr):
540     try:
541         fds = [outr, errr]
542         while p.poll() is None:
543             rl, _, _ = select.select(fds, [], [])
544             for fd in rl:
545                 if fd == outr:
546                     buf = os.read(outr, MAX_PACKET)
547                     if not buf: break
548                     os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
549                 elif fd == errr:
550                     buf = os.read(errr, 1024)
551                     if not buf: break
552                     os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
553     finally:
554         os.write(outfd, struct.pack('!IB', 0, 3))
555
556
557 class DemuxConn(BaseConn):
558     """A helper class for bup's client-server protocol."""
559     def __init__(self, infd, outp):
560         BaseConn.__init__(self, outp)
561         # Anything that comes through before the sync string was not
562         # multiplexed and can be assumed to be debug/log before mux init.
563         tail = b''
564         stderr = byte_stream(sys.stderr)
565         while tail != b'BUPMUX':
566             # Make sure to write all pre-BUPMUX output to stderr
567             b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
568             if not b:
569                 ex = IOError('demux: unexpected EOF during initialization')
570                 with pending_raise(ex):
571                     stderr.write(tail)
572                     stderr.flush()
573             tail += b
574             stderr.write(tail[:-6])
575             tail = tail[-6:]
576         stderr.flush()
577         self.infd = infd
578         self.reader = None
579         self.buf = None
580         self.closed = False
581
582     def write(self, data):
583         self._load_buf(0)
584         BaseConn.write(self, data)
585
586     def _next_packet(self, timeout):
587         if self.closed: return False
588         rl, wl, xl = select.select([self.infd], [], [], timeout)
589         if not rl: return False
590         assert(rl[0] == self.infd)
591         ns = b''.join(checked_reader(self.infd, 5))
592         n, fdw = struct.unpack('!IB', ns)
593         assert(n <= MAX_PACKET)
594         if fdw == 1:
595             self.reader = checked_reader(self.infd, n)
596         elif fdw == 2:
597             for buf in checked_reader(self.infd, n):
598                 byte_stream(sys.stderr).write(buf)
599         elif fdw == 3:
600             self.closed = True
601             debug2("DemuxConn: marked closed\n")
602         return True
603
604     def _load_buf(self, timeout):
605         if self.buf is not None:
606             return True
607         while not self.closed:
608             while not self.reader:
609                 if not self._next_packet(timeout):
610                     return False
611             try:
612                 self.buf = next(self.reader)
613                 return True
614             except StopIteration:
615                 self.reader = None
616         return False
617
618     def _read_parts(self, ix_fn):
619         while self._load_buf(None):
620             assert(self.buf is not None)
621             i = ix_fn(self.buf)
622             if i is None or i == len(self.buf):
623                 yv = self.buf
624                 self.buf = None
625             else:
626                 yv = self.buf[:i]
627                 self.buf = self.buf[i:]
628             yield yv
629             if i is not None:
630                 break
631
632     def _readline(self):
633         def find_eol(buf):
634             try:
635                 return buf.index(b'\n')+1
636             except ValueError:
637                 return None
638         return b''.join(self._read_parts(find_eol))
639
640     def _read(self, size):
641         csize = [size]
642         def until_size(buf): # Closes on csize
643             if len(buf) < csize[0]:
644                 csize[0] -= len(buf)
645                 return None
646             else:
647                 return csize[0]
648         return b''.join(self._read_parts(until_size))
649
650     def has_input(self):
651         return self._load_buf(0)
652
653
654 def linereader(f):
655     """Generate a list of input lines from 'f' without terminating newlines."""
656     while 1:
657         line = f.readline()
658         if not line:
659             break
660         yield line[:-1]
661
662
663 def chunkyreader(f, count = None):
664     """Generate a list of chunks of data read from 'f'.
665
666     If count is None, read until EOF is reached.
667
668     If count is a positive integer, read 'count' bytes from 'f'. If EOF is
669     reached while reading, raise IOError.
670     """
671     if count != None:
672         while count > 0:
673             b = f.read(min(count, 65536))
674             if not b:
675                 raise IOError('EOF with %d bytes remaining' % count)
676             yield b
677             count -= len(b)
678     else:
679         while 1:
680             b = f.read(65536)
681             if not b: break
682             yield b
683
684
685 @contextmanager
686 def atomically_replaced_file(name, mode='w', buffering=-1):
687     """Yield a file that will be atomically renamed name when leaving the block.
688
689     This contextmanager yields an open file object that is backed by a
690     temporary file which will be renamed (atomically) to the target
691     name if everything succeeds.
692
693     The mode and buffering arguments are handled exactly as with open,
694     and the yielded file will have very restrictive permissions, as
695     per mkstemp.
696
697     E.g.::
698
699         with atomically_replaced_file('foo.txt', 'w') as f:
700             f.write('hello jack.')
701
702     """
703
704     (ffd, tempname) = tempfile.mkstemp(dir=os.path.dirname(name),
705                                        text=('b' not in mode))
706     try:
707         try:
708             f = os.fdopen(ffd, mode, buffering)
709         except:
710             os.close(ffd)
711             raise
712         try:
713             yield f
714         finally:
715             f.close()
716         os.rename(tempname, name)
717     finally:
718         unlink(tempname)  # nonexistant file is ignored
719
720
721 def slashappend(s):
722     """Append "/" to 's' if it doesn't aleady end in "/"."""
723     assert isinstance(s, bytes)
724     if s and not s.endswith(b'/'):
725         return s + b'/'
726     else:
727         return s
728
729
730 def _mmap_do(f, sz, flags, prot, close):
731     if not sz:
732         st = os.fstat(f.fileno())
733         sz = st.st_size
734     if not sz:
735         # trying to open a zero-length map gives an error, but an empty
736         # string has all the same behaviour of a zero-length map, ie. it has
737         # no elements :)
738         return ''
739     map = mmap.mmap(f.fileno(), sz, flags, prot)
740     if close:
741         f.close()  # map will persist beyond file close
742     return map
743
744
745 def mmap_read(f, sz = 0, close=True):
746     """Create a read-only memory mapped region on file 'f'.
747     If sz is 0, the region will cover the entire file.
748     """
749     return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
750
751
752 def mmap_readwrite(f, sz = 0, close=True):
753     """Create a read-write memory mapped region on file 'f'.
754     If sz is 0, the region will cover the entire file.
755     """
756     return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
757                     close)
758
759
760 def mmap_readwrite_private(f, sz = 0, close=True):
761     """Create a read-write memory mapped region on file 'f'.
762     If sz is 0, the region will cover the entire file.
763     The map is private, which means the changes are never flushed back to the
764     file.
765     """
766     return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
767                     close)
768
769
770 _mincore = getattr(_helpers, 'mincore', None)
771 if _mincore:
772     # ./configure ensures that we're on Linux if MINCORE_INCORE isn't defined.
773     MINCORE_INCORE = getattr(_helpers, 'MINCORE_INCORE', 1)
774
775     _fmincore_chunk_size = None
776     def _set_fmincore_chunk_size():
777         global _fmincore_chunk_size
778         pref_chunk_size = 64 * 1024 * 1024
779         chunk_size = sc_page_size
780         if (sc_page_size < pref_chunk_size):
781             chunk_size = sc_page_size * (pref_chunk_size // sc_page_size)
782         _fmincore_chunk_size = chunk_size
783
784     def fmincore(fd):
785         """Return the mincore() data for fd as a bytearray whose values can be
786         tested via MINCORE_INCORE, or None if fd does not fully
787         support the operation."""
788         st = os.fstat(fd)
789         if (st.st_size == 0):
790             return bytearray(0)
791         if not _fmincore_chunk_size:
792             _set_fmincore_chunk_size()
793         pages_per_chunk = _fmincore_chunk_size // sc_page_size;
794         page_count = (st.st_size + sc_page_size - 1) // sc_page_size;
795         chunk_count = (st.st_size + _fmincore_chunk_size - 1) // _fmincore_chunk_size
796         result = bytearray(page_count)
797         for ci in compat.range(chunk_count):
798             pos = _fmincore_chunk_size * ci;
799             msize = min(_fmincore_chunk_size, st.st_size - pos)
800             try:
801                 m = mmap.mmap(fd, msize, mmap.MAP_PRIVATE, 0, 0, pos)
802             except mmap.error as ex:
803                 if ex.errno == errno.EINVAL or ex.errno == errno.ENODEV:
804                     # Perhaps the file was a pipe, i.e. "... | bup split ..."
805                     return None
806                 raise ex
807             try:
808                 _mincore(m, msize, 0, result, ci * pages_per_chunk)
809             except OSError as ex:
810                 if ex.errno == errno.ENOSYS:
811                     return None
812                 raise
813         return result
814
815
816 def parse_timestamp(epoch_str):
817     """Return the number of nanoseconds since the epoch that are described
818 by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
819 throw a ValueError that may contain additional information."""
820     ns_per = {'s' :  1000000000,
821               'ms' : 1000000,
822               'us' : 1000,
823               'ns' : 1}
824     match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
825     if not match:
826         if re.match(r'^([-+]?[0-9]+)$', epoch_str):
827             raise ValueError('must include units, i.e. 100ns, 100ms, ...')
828         raise ValueError()
829     (n, units) = match.group(1, 2)
830     if not n:
831         n = 1
832     n = int(n)
833     return n * ns_per[units]
834
835
836 def parse_num(s):
837     """Parse string or bytes as a possibly unit suffixed number.
838
839     For example:
840         199.2k means 203981 bytes
841         1GB means 1073741824 bytes
842         2.1 tb means 2199023255552 bytes
843     """
844     if isinstance(s, bytes):
845         # FIXME: should this raise a ValueError for UnicodeDecodeError
846         # (perhaps with the latter as the context).
847         s = s.decode('ascii')
848     g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
849     if not g:
850         raise ValueError("can't parse %r as a number" % s)
851     (val, unit) = g.groups()
852     num = float(val)
853     unit = unit.lower()
854     if unit in ['t', 'tb']:
855         mult = 1024*1024*1024*1024
856     elif unit in ['g', 'gb']:
857         mult = 1024*1024*1024
858     elif unit in ['m', 'mb']:
859         mult = 1024*1024
860     elif unit in ['k', 'kb']:
861         mult = 1024
862     elif unit in ['', 'b']:
863         mult = 1
864     else:
865         raise ValueError("invalid unit %r in number %r" % (unit, s))
866     return int(num*mult)
867
868
869 saved_errors = []
870 def add_error(e):
871     """Append an error message to the list of saved errors.
872
873     Once processing is able to stop and output the errors, the saved errors are
874     accessible in the module variable helpers.saved_errors.
875     """
876     saved_errors.append(e)
877     log('%-70s\n' % e)
878
879
880 def clear_errors():
881     global saved_errors
882     saved_errors = []
883
884
885 def die_if_errors(msg=None, status=1):
886     global saved_errors
887     if saved_errors:
888         if not msg:
889             msg = 'warning: %d errors encountered\n' % len(saved_errors)
890         log(msg)
891         sys.exit(status)
892
893
894 def handle_ctrl_c():
895     """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
896
897     The new exception handler will make sure that bup will exit without an ugly
898     stacktrace when Ctrl-C is hit.
899     """
900     oldhook = sys.excepthook
901     def newhook(exctype, value, traceback):
902         if exctype == KeyboardInterrupt:
903             log('\nInterrupted.\n')
904         else:
905             return oldhook(exctype, value, traceback)
906     sys.excepthook = newhook
907
908
909 def columnate(l, prefix):
910     """Format elements of 'l' in columns with 'prefix' leading each line.
911
912     The number of columns is determined automatically based on the string
913     lengths.
914     """
915     binary = isinstance(prefix, bytes)
916     nothing = b'' if binary else ''
917     nl = b'\n' if binary else '\n'
918     if not l:
919         return nothing
920     l = l[:]
921     clen = max(len(s) for s in l)
922     ncols = (tty_width() - len(prefix)) // (clen + 2)
923     if ncols <= 1:
924         ncols = 1
925         clen = 0
926     cols = []
927     while len(l) % ncols:
928         l.append(nothing)
929     rows = len(l) // ncols
930     for s in compat.range(0, len(l), rows):
931         cols.append(l[s:s+rows])
932     out = nothing
933     fmt = b'%-*s' if binary else '%-*s'
934     for row in zip(*cols):
935         out += prefix + nothing.join((fmt % (clen+2, s)) for s in row) + nl
936     return out
937
938
939 def parse_date_or_fatal(str, fatal):
940     """Parses the given date or calls Option.fatal().
941     For now we expect a string that contains a float."""
942     try:
943         date = float(str)
944     except ValueError as e:
945         raise fatal('invalid date format (should be a float): %r' % e)
946     else:
947         return date
948
949
950 def parse_excludes(options, fatal):
951     """Traverse the options and extract all excludes, or call Option.fatal()."""
952     excluded_paths = []
953
954     for flag in options:
955         (option, parameter) = flag
956         if option == '--exclude':
957             excluded_paths.append(resolve_parent(argv_bytes(parameter)))
958         elif option == '--exclude-from':
959             try:
960                 f = open(resolve_parent(argv_bytes(parameter)), 'rb')
961             except IOError as e:
962                 raise fatal("couldn't read %r" % parameter)
963             for exclude_path in f.readlines():
964                 # FIXME: perhaps this should be rstrip('\n')
965                 exclude_path = resolve_parent(exclude_path.strip())
966                 if exclude_path:
967                     excluded_paths.append(exclude_path)
968     return sorted(frozenset(excluded_paths))
969
970
971 def parse_rx_excludes(options, fatal):
972     """Traverse the options and extract all rx excludes, or call
973     Option.fatal()."""
974     excluded_patterns = []
975
976     for flag in options:
977         (option, parameter) = flag
978         if option == '--exclude-rx':
979             try:
980                 excluded_patterns.append(re.compile(argv_bytes(parameter)))
981             except re.error as ex:
982                 fatal('invalid --exclude-rx pattern (%r): %s' % (parameter, ex))
983         elif option == '--exclude-rx-from':
984             try:
985                 f = open(resolve_parent(parameter), 'rb')
986             except IOError as e:
987                 raise fatal("couldn't read %r" % parameter)
988             for pattern in f.readlines():
989                 spattern = pattern.rstrip(b'\n')
990                 if not spattern:
991                     continue
992                 try:
993                     excluded_patterns.append(re.compile(spattern))
994                 except re.error as ex:
995                     fatal('invalid --exclude-rx pattern (%r): %s' % (spattern, ex))
996     return excluded_patterns
997
998
999 def should_rx_exclude_path(path, exclude_rxs):
1000     """Return True if path matches a regular expression in exclude_rxs."""
1001     for rx in exclude_rxs:
1002         if rx.search(path):
1003             debug1('Skipping %r: excluded by rx pattern %r.\n'
1004                    % (path, rx.pattern))
1005             return True
1006     return False
1007
1008
1009 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
1010 # that resolve against the current filesystem in the strip/graft
1011 # functions for example, but elsewhere as well.  I suspect bup's not
1012 # always being careful about that.  For some cases, the contents of
1013 # the current filesystem should be irrelevant, and consulting it might
1014 # produce the wrong result, perhaps via unintended symlink resolution,
1015 # for example.
1016
1017 def path_components(path):
1018     """Break path into a list of pairs of the form (name,
1019     full_path_to_name).  Path must start with '/'.
1020     Example:
1021       '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
1022     if not path.startswith(b'/'):
1023         raise Exception('path must start with "/": %s' % path_msg(path))
1024     # Since we assume path startswith('/'), we can skip the first element.
1025     result = [(b'', b'/')]
1026     norm_path = os.path.abspath(path)
1027     if norm_path == b'/':
1028         return result
1029     full_path = b''
1030     for p in norm_path.split(b'/')[1:]:
1031         full_path += b'/' + p
1032         result.append((p, full_path))
1033     return result
1034
1035
1036 def stripped_path_components(path, strip_prefixes):
1037     """Strip any prefix in strip_prefixes from path and return a list
1038     of path components where each component is (name,
1039     none_or_full_fs_path_to_name).  Assume path startswith('/').
1040     See thelpers.py for examples."""
1041     normalized_path = os.path.abspath(path)
1042     sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
1043     for bp in sorted_strip_prefixes:
1044         normalized_bp = os.path.abspath(bp)
1045         if normalized_bp == b'/':
1046             continue
1047         if normalized_path.startswith(normalized_bp):
1048             prefix = normalized_path[:len(normalized_bp)]
1049             result = []
1050             for p in normalized_path[len(normalized_bp):].split(b'/'):
1051                 if p: # not root
1052                     prefix += b'/'
1053                 prefix += p
1054                 result.append((p, prefix))
1055             return result
1056     # Nothing to strip.
1057     return path_components(path)
1058
1059
1060 def grafted_path_components(graft_points, path):
1061     # Create a result that consists of some number of faked graft
1062     # directories before the graft point, followed by all of the real
1063     # directories from path that are after the graft point.  Arrange
1064     # for the directory at the graft point in the result to correspond
1065     # to the "orig" directory in --graft orig=new.  See t/thelpers.py
1066     # for some examples.
1067
1068     # Note that given --graft orig=new, orig and new have *nothing* to
1069     # do with each other, even if some of their component names
1070     # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
1071     # equivalent to --graft /foo/bar/baz=/x/y/z, or even
1072     # /foo/bar/baz=/x.
1073
1074     # FIXME: This can't be the best solution...
1075     clean_path = os.path.abspath(path)
1076     for graft_point in graft_points:
1077         old_prefix, new_prefix = graft_point
1078         # Expand prefixes iff not absolute paths.
1079         old_prefix = os.path.normpath(old_prefix)
1080         new_prefix = os.path.normpath(new_prefix)
1081         if clean_path.startswith(old_prefix):
1082             escaped_prefix = re.escape(old_prefix)
1083             grafted_path = re.sub(br'^' + escaped_prefix, new_prefix, clean_path)
1084             # Handle /foo=/ (at least) -- which produces //whatever.
1085             grafted_path = b'/' + grafted_path.lstrip(b'/')
1086             clean_path_components = path_components(clean_path)
1087             # Count the components that were stripped.
1088             strip_count = 0 if old_prefix == b'/' else old_prefix.count(b'/')
1089             new_prefix_parts = new_prefix.split(b'/')
1090             result_prefix = grafted_path.split(b'/')[:new_prefix.count(b'/')]
1091             result = [(p, None) for p in result_prefix] \
1092                 + clean_path_components[strip_count:]
1093             # Now set the graft point name to match the end of new_prefix.
1094             graft_point = len(result_prefix)
1095             result[graft_point] = \
1096                 (new_prefix_parts[-1], clean_path_components[strip_count][1])
1097             if new_prefix == b'/': # --graft ...=/ is a special case.
1098                 return result[1:]
1099             return result
1100     return path_components(clean_path)
1101
1102
1103 Sha1 = hashlib.sha1
1104
1105
1106 _localtime = getattr(_helpers, 'localtime', None)
1107
1108 if _localtime:
1109     bup_time = namedtuple('bup_time', ['tm_year', 'tm_mon', 'tm_mday',
1110                                        'tm_hour', 'tm_min', 'tm_sec',
1111                                        'tm_wday', 'tm_yday',
1112                                        'tm_isdst', 'tm_gmtoff', 'tm_zone'])
1113
1114 # Define a localtime() that returns bup_time when possible.  Note:
1115 # this means that any helpers.localtime() results may need to be
1116 # passed through to_py_time() before being passed to python's time
1117 # module, which doesn't appear willing to ignore the extra items.
1118 if _localtime:
1119     def localtime(time):
1120         return bup_time(*_helpers.localtime(int(floor(time))))
1121     def utc_offset_str(t):
1122         """Return the local offset from UTC as "+hhmm" or "-hhmm" for time t.
1123         If the current UTC offset does not represent an integer number
1124         of minutes, the fractional component will be truncated."""
1125         off = localtime(t).tm_gmtoff
1126         # Note: // doesn't truncate like C for negative values, it rounds down.
1127         offmin = abs(off) // 60
1128         m = offmin % 60
1129         h = (offmin - m) // 60
1130         return b'%+03d%02d' % (-h if off < 0 else h, m)
1131     def to_py_time(x):
1132         if isinstance(x, time.struct_time):
1133             return x
1134         return time.struct_time(x[:9])
1135 else:
1136     localtime = time.localtime
1137     def utc_offset_str(t):
1138         return time.strftime(b'%z', localtime(t))
1139     def to_py_time(x):
1140         return x
1141
1142
1143 _some_invalid_save_parts_rx = re.compile(br'[\[ ~^:?*\\]|\.\.|//|@{')
1144
1145 def valid_save_name(name):
1146     # Enforce a superset of the restrictions in git-check-ref-format(1)
1147     if name == b'@' \
1148        or name.startswith(b'/') or name.endswith(b'/') \
1149        or name.endswith(b'.'):
1150         return False
1151     if _some_invalid_save_parts_rx.search(name):
1152         return False
1153     for c in name:
1154         if byte_int(c) < 0x20 or byte_int(c) == 0x7f:
1155             return False
1156     for part in name.split(b'/'):
1157         if part.startswith(b'.') or part.endswith(b'.lock'):
1158             return False
1159     return True
1160
1161
1162 _period_rx = re.compile(br'^([0-9]+)(s|min|h|d|w|m|y)$')
1163
1164 def period_as_secs(s):
1165     if s == b'forever':
1166         return float('inf')
1167     match = _period_rx.match(s)
1168     if not match:
1169         return None
1170     mag = int(match.group(1))
1171     scale = match.group(2)
1172     return mag * {b's': 1,
1173                   b'min': 60,
1174                   b'h': 60 * 60,
1175                   b'd': 60 * 60 * 24,
1176                   b'w': 60 * 60 * 24 * 7,
1177                   b'm': 60 * 60 * 24 * 31,
1178                   b'y': 60 * 60 * 24 * 366}[scale]