1 """Helper functions and classes for bup."""
3 from collections import namedtuple
4 from ctypes import sizeof, c_void_p
6 from contextlib import contextmanager
7 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
8 import hashlib, heapq, math, operator, time, grp, tempfile
10 from bup import _helpers
12 sc_arg_max = os.sysconf('SC_ARG_MAX')
14 # This function should really be in helpers, not in bup.options. But we
15 # want options.py to be standalone so people can include it in other projects.
16 from bup.options import _tty_width
17 tty_width = _tty_width
21 """Convert the string 's' to an integer. Return 0 if s is not a number."""
29 """Convert the string 's' to a float. Return 0 if s is not a number."""
31 return float(s or '0')
36 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
39 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
41 fdatasync = os.fdatasync
42 except AttributeError:
46 # Write (blockingly) to sockets that may or may not be in blocking mode.
47 # We need this because our stderr is sometimes eaten by subprocesses
48 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
49 # leading to race conditions. Ick. We'll do it the hard way.
50 def _hard_write(fd, buf):
52 (r,w,x) = select.select([], [fd], [], None)
54 raise IOError('select(fd) returned without being writable')
56 sz = os.write(fd, buf)
58 if e.errno != errno.EAGAIN:
66 """Print a log message to stderr."""
69 _hard_write(sys.stderr.fileno(), s)
83 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
84 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
87 """Calls log() if stderr is a TTY. Does nothing otherwise."""
95 """Calls progress() only if we haven't printed progress in a while.
97 This avoids overloading the stderr buffer with excess junk.
101 if now - _last_prog > 0.1:
107 """Calls progress() to redisplay the most recent progress message.
109 Useful after you've printed some other message that wipes out the
112 if _last_progress and _last_progress.endswith('\r'):
113 progress(_last_progress)
116 def mkdirp(d, mode=None):
117 """Recursively create directories on path 'd'.
119 Unlike os.makedirs(), it doesn't raise an exception if the last element of
120 the path already exists.
128 if e.errno == errno.EEXIST:
# Unique sentinel used by _fallback_next() to detect "no default supplied";
# a plain None won't do, since None is a perfectly valid default value.
_unspecified_next_default = object()
136 def _fallback_next(it, default=_unspecified_next_default):
137 """Retrieve the next item from the iterator by calling its
138 next() method. If default is given, it is returned if the
139 iterator is exhausted, otherwise StopIteration is raised."""
141 if default is _unspecified_next_default:
146 except StopIteration:
# Python < 2.6 has no builtin next(); install our shim in its place.
if sys.version_info < (2, 6):
    next = _fallback_next
153 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
155 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
157 samekey = operator.eq
159 total = sum(len(it) for it in iters)
160 iters = (iter(it) for it in iters)
161 heap = ((next(it, None),it) for it in iters)
162 heap = [(e,it) for e,it in heap if e]
167 if not count % pfreq:
170 if not samekey(e, pe):
175 e = it.next() # Don't use next() function, it's too expensive
176 except StopIteration:
177 heapq.heappop(heap) # remove current
179 heapq.heapreplace(heap, (e, it)) # shift current to new location
184 """Delete a file at path 'f' if it currently exists.
186 Unlike os.unlink(), does not throw an exception if the file didn't already
192 if e.errno != errno.ENOENT:
196 def readpipe(argv, preexec_fn=None):
197 """Run a subprocess and return its output."""
198 p = subprocess.Popen(argv, stdout=subprocess.PIPE, preexec_fn=preexec_fn)
199 out, err = p.communicate()
200 if p.returncode != 0:
201 raise Exception('subprocess %r failed with status %d'
202 % (' '.join(argv), p.returncode))
206 def _argmax_base(command):
209 base_size += len(command) + 1
210 for k, v in environ.iteritems():
211 base_size += len(k) + len(v) + 2 + sizeof(c_void_p)
215 def _argmax_args_size(args):
216 return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
219 def batchpipe(command, args, preexec_fn=None, arg_max=sc_arg_max):
220 """If args is not empty, yield the output produced by calling the
221 command list with args as a sequence of strings (It may be necessary
222 to return multiple strings in order to respect ARG_MAX)."""
223 # The optional arg_max arg is a workaround for an issue with the
224 # current wvtest behavior.
225 base_size = _argmax_base(command)
227 room = arg_max - base_size
230 next_size = _argmax_args_size(args[i:i+1])
231 if room - next_size < 0:
237 assert(len(sub_args))
238 yield readpipe(command + sub_args, preexec_fn=preexec_fn)
242 """Get the absolute path of a file.
244 Behaves like os.path.realpath, but doesn't follow a symlink for the last
245 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
246 will follow symlinks in p's directory)
252 if st and stat.S_ISLNK(st.st_mode):
253 (dir, name) = os.path.split(p)
254 dir = os.path.realpath(dir)
255 out = os.path.join(dir, name)
257 out = os.path.realpath(p)
258 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    "Return True if we appear to be running under fakeroot."
    # fakeroot advertises itself via the FAKEROOTKEY environment variable.
    # Use an identity test against None rather than "!= None" (PEP 8).
    return os.getenv("FAKEROOTKEY") is not None
268 if sys.platform.startswith('cygwin'):
270 return ctypes.cdll.shell32.IsUserAnAdmin()
272 return os.geteuid() == 0
275 def _cache_key_value(get_value, key, cache):
276 """Return (value, was_cached). If there is a value in the cache
277 for key, use that, otherwise, call get_value(key) which should
278 throw a KeyError if there is no value -- in which case the cached
279 and returned value will be None.
281 try: # Do we already have it (or know there wasn't one)?
288 cache[key] = value = get_value(key)
# Caches for password-database lookups, keyed by uid and by user name;
# shared by pwd_from_uid() and pwd_from_name(), each of which also
# populates the other's cache on a miss.
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}
297 def pwd_from_uid(uid):
298 """Return password database entry for uid (may be a cached value).
299 Return None if no entry is found.
301 global _uid_to_pwd_cache, _name_to_pwd_cache
302 entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
303 if entry and not cached:
304 _name_to_pwd_cache[entry.pw_name] = entry
308 def pwd_from_name(name):
309 """Return password database entry for name (may be a cached value).
310 Return None if no entry is found.
312 global _uid_to_pwd_cache, _name_to_pwd_cache
313 entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
314 if entry and not cached:
315 _uid_to_pwd_cache[entry.pw_uid] = entry
# Caches for group-database lookups, keyed by gid and by group name;
# shared by grp_from_gid() and grp_from_name(), each of which also
# populates the other's cache on a miss.
_gid_to_grp_cache = {}
_name_to_grp_cache = {}
322 def grp_from_gid(gid):
323 """Return password database entry for gid (may be a cached value).
324 Return None if no entry is found.
326 global _gid_to_grp_cache, _name_to_grp_cache
327 entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
328 if entry and not cached:
329 _name_to_grp_cache[entry.gr_name] = entry
333 def grp_from_name(name):
334 """Return password database entry for name (may be a cached value).
335 Return None if no entry is found.
337 global _gid_to_grp_cache, _name_to_grp_cache
338 entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
339 if entry and not cached:
340 _gid_to_grp_cache[entry.gr_gid] = entry
346 """Get the user's login name."""
350 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
356 """Get the user's full name."""
358 if not _userfullname:
360 entry = pwd_from_uid(uid)
362 _userfullname = entry[4].split(',')[0] or entry[0]
363 if not _userfullname:
364 _userfullname = 'user%d' % uid
370 """Get the FQDN of this machine."""
373 _hostname = socket.getfqdn()
377 _resource_path = None
378 def resource_path(subdir=''):
379 global _resource_path
380 if not _resource_path:
381 _resource_path = os.environ.get('BUP_RESOURCE_PATH') or '.'
382 return os.path.join(_resource_path, subdir)
384 def format_filesize(size):
389 exponent = int(math.log(size) / math.log(unit))
390 size_prefix = "KMGTPE"[exponent - 1]
391 return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
394 class NotOk(Exception):
399 def __init__(self, outp):
403 while self._read(65536): pass
405 def read(self, size):
406 """Read 'size' bytes from input stream."""
408 return self._read(size)
411 """Read from input stream until a newline is found."""
413 return self._readline()
415 def write(self, data):
416 """Write 'data' to output stream."""
417 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
418 self.outp.write(data)
421 """Return true if input stream is readable."""
422 raise NotImplemented("Subclasses must implement has_input")
425 """Indicate end of output from last sent command."""
429 """Indicate server error to the client."""
430 s = re.sub(r'\s+', ' ', str(s))
431 self.write('\nerror %s\n' % s)
433 def _check_ok(self, onempty):
436 for rl in linereader(self):
437 #log('%d got line: %r\n' % (os.getpid(), rl))
438 if not rl: # empty line
442 elif rl.startswith('error '):
443 #log('client: error: %s\n' % rl[6:])
447 raise Exception('server exited unexpectedly; see errors above')
449 def drain_and_check_ok(self):
450 """Remove all data for the current command from input stream."""
453 return self._check_ok(onempty)
456 """Verify that server action completed successfully."""
458 raise Exception('expected "ok", got %r' % rl)
459 return self._check_ok(onempty)
462 class Conn(BaseConn):
463 def __init__(self, inp, outp):
464 BaseConn.__init__(self, outp)
467 def _read(self, size):
468 return self.inp.read(size)
471 return self.inp.readline()
474 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
476 assert(rl[0] == self.inp.fileno())
482 def checked_reader(fd, n):
484 rl, _, _ = select.select([fd], [], [])
487 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Maximum payload size for a single mux packet; mux() reads at most this
# much per packet and DemuxConn asserts incoming packets never exceed it.
MAX_PACKET = 128 * 1024
493 def mux(p, outfd, outr, errr):
496 while p.poll() is None:
497 rl, _, _ = select.select(fds, [], [])
500 buf = os.read(outr, MAX_PACKET)
502 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
504 buf = os.read(errr, 1024)
506 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
508 os.write(outfd, struct.pack('!IB', 0, 3))
511 class DemuxConn(BaseConn):
512 """A helper class for bup's client-server protocol."""
513 def __init__(self, infd, outp):
514 BaseConn.__init__(self, outp)
515 # Anything that comes through before the sync string was not
516 # multiplexed and can be assumed to be debug/log before mux init.
518 while tail != 'BUPMUX':
519 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
521 raise IOError('demux: unexpected EOF during initialization')
523 sys.stderr.write(tail[:-6]) # pre-mux log messages
530 def write(self, data):
532 BaseConn.write(self, data)
534 def _next_packet(self, timeout):
535 if self.closed: return False
536 rl, wl, xl = select.select([self.infd], [], [], timeout)
537 if not rl: return False
538 assert(rl[0] == self.infd)
539 ns = ''.join(checked_reader(self.infd, 5))
540 n, fdw = struct.unpack('!IB', ns)
541 assert(n <= MAX_PACKET)
543 self.reader = checked_reader(self.infd, n)
545 for buf in checked_reader(self.infd, n):
546 sys.stderr.write(buf)
549 debug2("DemuxConn: marked closed\n")
552 def _load_buf(self, timeout):
553 if self.buf is not None:
555 while not self.closed:
556 while not self.reader:
557 if not self._next_packet(timeout):
560 self.buf = self.reader.next()
562 except StopIteration:
566 def _read_parts(self, ix_fn):
567 while self._load_buf(None):
568 assert(self.buf is not None)
570 if i is None or i == len(self.buf):
575 self.buf = self.buf[i:]
583 return buf.index('\n')+1
586 return ''.join(self._read_parts(find_eol))
588 def _read(self, size):
590 def until_size(buf): # Closes on csize
591 if len(buf) < csize[0]:
596 return ''.join(self._read_parts(until_size))
599 return self._load_buf(0)
603 """Generate a list of input lines from 'f' without terminating newlines."""
611 def chunkyreader(f, count = None):
612 """Generate a list of chunks of data read from 'f'.
614 If count is None, read until EOF is reached.
616 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
617 reached while reading, raise IOError.
621 b = f.read(min(count, 65536))
623 raise IOError('EOF with %d bytes remaining' % count)
634 def atomically_replaced_file(name, mode='w', buffering=-1):
635 """Yield a file that will be atomically renamed name when leaving the block.
637 This contextmanager yields an open file object that is backed by a
638 temporary file which will be renamed (atomically) to the target
639 name if everything succeeds.
641 The mode and buffering arguments are handled exactly as with open,
642 and the yielded file will have very restrictive permissions, as
647 with atomically_replaced_file('foo.txt', 'w') as f:
648 f.write('hello jack.')
652 (ffd, tempname) = tempfile.mkstemp(dir=os.path.dirname(name),
653 text=('b' not in mode))
656 f = os.fdopen(ffd, mode, buffering)
664 os.rename(tempname, name)
666 unlink(tempname) # nonexistant file is ignored
670 """Append "/" to 's' if it doesn't aleady end in "/"."""
671 if s and not s.endswith('/'):
677 def _mmap_do(f, sz, flags, prot, close):
679 st = os.fstat(f.fileno())
682 # trying to open a zero-length map gives an error, but an empty
683 # string has all the same behaviour of a zero-length map, ie. it has
686 map = mmap.mmap(f.fileno(), sz, flags, prot)
688 f.close() # map will persist beyond file close
692 def mmap_read(f, sz = 0, close=True):
693 """Create a read-only memory mapped region on file 'f'.
694 If sz is 0, the region will cover the entire file.
696 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
699 def mmap_readwrite(f, sz = 0, close=True):
700 """Create a read-write memory mapped region on file 'f'.
701 If sz is 0, the region will cover the entire file.
703 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
707 def mmap_readwrite_private(f, sz = 0, close=True):
708 """Create a read-write memory mapped region on file 'f'.
709 If sz is 0, the region will cover the entire file.
710 The map is private, which means the changes are never flushed back to the
713 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
717 def parse_timestamp(epoch_str):
718 """Return the number of nanoseconds since the epoch that are described
719 by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
720 throw a ValueError that may contain additional information."""
721 ns_per = {'s' : 1000000000,
725 match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
727 if re.match(r'^([-+]?[0-9]+)$', epoch_str):
728 raise ValueError('must include units, i.e. 100ns, 100ms, ...')
730 (n, units) = match.group(1, 2)
734 return n * ns_per[units]
738 """Parse data size information into a float number.
740 Here are some examples of conversions:
741 199.2k means 203981 bytes
742 1GB means 1073741824 bytes
743 2.1 tb means 2199023255552 bytes
745 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
747 raise ValueError("can't parse %r as a number" % s)
748 (val, unit) = g.groups()
751 if unit in ['t', 'tb']:
752 mult = 1024*1024*1024*1024
753 elif unit in ['g', 'gb']:
754 mult = 1024*1024*1024
755 elif unit in ['m', 'mb']:
757 elif unit in ['k', 'kb']:
759 elif unit in ['', 'b']:
762 raise ValueError("invalid unit %r in number %r" % (unit, s))
767 """Count the number of elements in an iterator. (consumes the iterator)"""
768 return reduce(lambda x,y: x+1, l)
773 """Append an error message to the list of saved errors.
775 Once processing is able to stop and output the errors, the saved errors are
776 accessible in the module variable helpers.saved_errors.
778 saved_errors.append(e)
788 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
790 The new exception handler will make sure that bup will exit without an ugly
791 stacktrace when Ctrl-C is hit.
793 oldhook = sys.excepthook
794 def newhook(exctype, value, traceback):
795 if exctype == KeyboardInterrupt:
796 log('\nInterrupted.\n')
798 return oldhook(exctype, value, traceback)
799 sys.excepthook = newhook
802 def columnate(l, prefix):
803 """Format elements of 'l' in columns with 'prefix' leading each line.
805 The number of columns is determined automatically based on the string
811 clen = max(len(s) for s in l)
812 ncols = (tty_width() - len(prefix)) / (clen + 2)
817 while len(l) % ncols:
820 for s in range(0, len(l), rows):
821 cols.append(l[s:s+rows])
823 for row in zip(*cols):
824 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
828 def parse_date_or_fatal(str, fatal):
829 """Parses the given date or calls Option.fatal().
830 For now we expect a string that contains a float."""
833 except ValueError, e:
834 raise fatal('invalid date format (should be a float): %r' % e)
839 def parse_excludes(options, fatal):
840 """Traverse the options and extract all excludes, or call Option.fatal()."""
844 (option, parameter) = flag
845 if option == '--exclude':
846 excluded_paths.append(realpath(parameter))
847 elif option == '--exclude-from':
849 f = open(realpath(parameter))
851 raise fatal("couldn't read %s" % parameter)
852 for exclude_path in f.readlines():
853 # FIXME: perhaps this should be rstrip('\n')
854 exclude_path = realpath(exclude_path.strip())
856 excluded_paths.append(exclude_path)
857 return sorted(frozenset(excluded_paths))
860 def parse_rx_excludes(options, fatal):
861 """Traverse the options and extract all rx excludes, or call
863 excluded_patterns = []
866 (option, parameter) = flag
867 if option == '--exclude-rx':
869 excluded_patterns.append(re.compile(parameter))
871 fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
872 elif option == '--exclude-rx-from':
874 f = open(realpath(parameter))
876 raise fatal("couldn't read %s" % parameter)
877 for pattern in f.readlines():
878 spattern = pattern.rstrip('\n')
882 excluded_patterns.append(re.compile(spattern))
884 fatal('invalid --exclude-rx pattern (%s): %s' % (spattern, ex))
885 return excluded_patterns
888 def should_rx_exclude_path(path, exclude_rxs):
889 """Return True if path matches a regular expression in exclude_rxs."""
890 for rx in exclude_rxs:
892 debug1('Skipping %r: excluded by rx pattern %r.\n'
893 % (path, rx.pattern))
898 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
899 # that resolve against the current filesystem in the strip/graft
900 # functions for example, but elsewhere as well. I suspect bup's not
901 # always being careful about that. For some cases, the contents of
902 # the current filesystem should be irrelevant, and consulting it might
903 # produce the wrong result, perhaps via unintended symlink resolution,
906 def path_components(path):
907 """Break path into a list of pairs of the form (name,
908 full_path_to_name). Path must start with '/'.
910 '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
911 if not path.startswith('/'):
912 raise Exception, 'path must start with "/": %s' % path
913 # Since we assume path startswith('/'), we can skip the first element.
915 norm_path = os.path.abspath(path)
919 for p in norm_path.split('/')[1:]:
921 result.append((p, full_path))
925 def stripped_path_components(path, strip_prefixes):
926 """Strip any prefix in strip_prefixes from path and return a list
927 of path components where each component is (name,
928 none_or_full_fs_path_to_name). Assume path startswith('/').
929 See thelpers.py for examples."""
930 normalized_path = os.path.abspath(path)
931 sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
932 for bp in sorted_strip_prefixes:
933 normalized_bp = os.path.abspath(bp)
934 if normalized_bp == '/':
936 if normalized_path.startswith(normalized_bp):
937 prefix = normalized_path[:len(normalized_bp)]
939 for p in normalized_path[len(normalized_bp):].split('/'):
943 result.append((p, prefix))
946 return path_components(path)
949 def grafted_path_components(graft_points, path):
950 # Create a result that consists of some number of faked graft
951 # directories before the graft point, followed by all of the real
952 # directories from path that are after the graft point. Arrange
953 # for the directory at the graft point in the result to correspond
954 # to the "orig" directory in --graft orig=new. See t/thelpers.py
957 # Note that given --graft orig=new, orig and new have *nothing* to
958 # do with each other, even if some of their component names
959 # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
960 # equivalent to --graft /foo/bar/baz=/x/y/z, or even
963 # FIXME: This can't be the best solution...
964 clean_path = os.path.abspath(path)
965 for graft_point in graft_points:
966 old_prefix, new_prefix = graft_point
967 # Expand prefixes iff not absolute paths.
968 old_prefix = os.path.normpath(old_prefix)
969 new_prefix = os.path.normpath(new_prefix)
970 if clean_path.startswith(old_prefix):
971 escaped_prefix = re.escape(old_prefix)
972 grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
973 # Handle /foo=/ (at least) -- which produces //whatever.
974 grafted_path = '/' + grafted_path.lstrip('/')
975 clean_path_components = path_components(clean_path)
976 # Count the components that were stripped.
977 strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
978 new_prefix_parts = new_prefix.split('/')
979 result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
980 result = [(p, None) for p in result_prefix] \
981 + clean_path_components[strip_count:]
982 # Now set the graft point name to match the end of new_prefix.
983 graft_point = len(result_prefix)
984 result[graft_point] = \
985 (new_prefix_parts[-1], clean_path_components[strip_count][1])
986 if new_prefix == '/': # --graft ...=/ is a special case.
989 return path_components(clean_path)
995 _localtime = getattr(_helpers, 'localtime', None)
# Like time.struct_time, but with tm_gmtoff and tm_zone always present.
bup_time = namedtuple('bup_time',
                      'tm_year tm_mon tm_mday tm_hour tm_min tm_sec '
                      'tm_wday tm_yday tm_isdst tm_gmtoff tm_zone')
1003 # Define a localtime() that returns bup_time when possible. Note:
1004 # this means that any helpers.localtime() results may need to be
1005 # passed through to_py_time() before being passed to python's time
1006 # module, which doesn't appear willing to ignore the extra items.
1008 def localtime(time):
1009 return bup_time(*_helpers.localtime(time))
1010 def utc_offset_str(t):
1011 'Return the local offset from UTC as "+hhmm" or "-hhmm" for time t.'
1012 off = localtime(t).tm_gmtoff
1014 return "%+03d%02d" % (hrs, abs(off - (hrs * 60 * 60)))
1016 if isinstance(x, time.struct_time):
1018 return time.struct_time(x[:9])
1020 localtime = time.localtime
1021 def utc_offset_str(t):
1022 return time.strftime('%z', localtime(t))