1 """Helper functions and classes for bup."""
3 from ctypes import sizeof, c_void_p
5 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
6 import hashlib, heapq, operator, time, grp
8 from bup import _version, _helpers
9 import bup._helpers as _helpers
12 # This function should really be in helpers, not in bup.options. But we
13 # want options.py to be standalone so people can include it in other projects.
14 from bup.options import _tty_width
15 tty_width = _tty_width
19 """Convert the string 's' to an integer. Return 0 if s is not a number."""
27 """Convert the string 's' to a float. Return 0 if s is not a number."""
29 return float(s or '0')
# Debug verbosity level, taken from the BUP_DEBUG environment variable.
# atoi() returns 0 for unset or non-numeric values, so this defaults to 0.
buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
37 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
39 fdatasync = os.fdatasync
40 except AttributeError:
44 # Write (blockingly) to sockets that may or may not be in blocking mode.
45 # We need this because our stderr is sometimes eaten by subprocesses
46 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
47 # leading to race conditions. Ick. We'll do it the hard way.
48 def _hard_write(fd, buf):
50 (r,w,x) = select.select([], [fd], [], None)
52 raise IOError('select(fd) returned without being writable')
54 sz = os.write(fd, buf)
56 if e.errno != errno.EAGAIN:
64 """Print a log message to stderr."""
67 _hard_write(sys.stderr.fileno(), s)
# True when stdout (fd 1) / stderr (fd 2) is a terminal.  BUP_FORCE_TTY is
# a bitmask that can force tty behavior on: bit 1 for stdout, bit 2 for
# stderr (useful when output is piped but tty-style progress is wanted).
istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
85 """Calls log() if stderr is a TTY. Does nothing otherwise."""
93 """Calls progress() only if we haven't printed progress in a while.
95 This avoids overloading the stderr buffer with excess junk.
99 if now - _last_prog > 0.1:
105 """Calls progress() to redisplay the most recent progress message.
107 Useful after you've printed some other message that wipes out the
110 if _last_progress and _last_progress.endswith('\r'):
111 progress(_last_progress)
def mkdirp(d, mode=None):
    """Recursively create directories on path 'd'.

    Unlike os.makedirs(), it doesn't raise an exception if the last element of
    the path already exists.

    If 'mode' is given, it is passed through to os.makedirs() for any
    directories that are created.
    """
    try:
        if mode:
            os.makedirs(d, mode)
        else:
            os.makedirs(d)
    except OSError as e:
        if e.errno == errno.EEXIST:
            pass  # the leaf already exists -- exactly what the caller asked for
        else:
            raise
132 _unspecified_next_default = object()
134 def _fallback_next(it, default=_unspecified_next_default):
135 """Retrieve the next item from the iterator by calling its
136 next() method. If default is given, it is returned if the
137 iterator is exhausted, otherwise StopIteration is raised."""
139 if default is _unspecified_next_default:
144 except StopIteration:
# Python versions before 2.6 have no builtin next(); install our fallback
# implementation under that name so the rest of the code can use it.
if sys.version_info < (2, 6):
    next = _fallback_next
151 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
153 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
155 samekey = operator.eq
157 total = sum(len(it) for it in iters)
158 iters = (iter(it) for it in iters)
159 heap = ((next(it, None),it) for it in iters)
160 heap = [(e,it) for e,it in heap if e]
165 if not count % pfreq:
168 if not samekey(e, pe):
173 e = it.next() # Don't use next() function, it's too expensive
174 except StopIteration:
175 heapq.heappop(heap) # remove current
177 heapq.heapreplace(heap, (e, it)) # shift current to new location
182 """Delete a file at path 'f' if it currently exists.
184 Unlike os.unlink(), does not throw an exception if the file didn't already
190 if e.errno == errno.ENOENT:
191 pass # it doesn't exist, that's what you asked for
def readpipe(argv, preexec_fn=None):
    """Run a subprocess and return its standard output.

    Raise an Exception naming the command and exit status if the subprocess
    exits with a non-zero status.  'preexec_fn' is passed through to
    subprocess.Popen().
    """
    p = subprocess.Popen(argv, stdout=subprocess.PIPE, preexec_fn=preexec_fn)
    out, err = p.communicate()
    if p.returncode != 0:
        raise Exception('subprocess %r failed with status %d'
                        % (' '.join(argv), p.returncode))
    # The captured output must be returned; callers (e.g. batchpipe) yield
    # this value.
    return out
205 _arg_max = os.sysconf('SC_ARG_MAX')
208 except ValueError, ex:
209 print >> sys.stderr, 'Cannot find SC_ARG_MAX, please report a bug.'
213 def _argmax_base(command):
216 base_size += len(command) + 1
217 for k, v in environ.iteritems():
218 base_size += len(k) + len(v) + 2 + sizeof(c_void_p)
222 def _argmax_args_size(args):
223 return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
226 def batchpipe(command, args, preexec_fn=None, arg_max=None):
227 """If args is not empty, yield the output produced by calling the
228 command list with args as a sequence of strings (It may be necessary
229 to return multiple strings in order to respect ARG_MAX)."""
230 # The optional arg_max arg is a workaround for an issue with the
231 # current wvtest behavior.
234 base_size = _argmax_base(command)
236 room = arg_max - base_size
239 next_size = _argmax_args_size(args[i:i+1])
240 if room - next_size < 0:
246 assert(len(sub_args))
247 yield readpipe(command + sub_args, preexec_fn=preexec_fn)
251 """Get the absolute path of a file.
253 Behaves like os.path.realpath, but doesn't follow a symlink for the last
254 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
255 will follow symlinks in p's directory)
261 if st and stat.S_ISLNK(st.st_mode):
262 (dir, name) = os.path.split(p)
263 dir = os.path.realpath(dir)
264 out = os.path.join(dir, name)
266 out = os.path.realpath(p)
267 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot."""
    # fakeroot advertises itself via the FAKEROOTKEY environment variable.
    # Use "is not None" (not "!= None") per PEP 8 singleton comparison.
    return os.getenv("FAKEROOTKEY") is not None
277 if sys.platform.startswith('cygwin'):
279 return ctypes.cdll.shell32.IsUserAnAdmin()
281 return os.geteuid() == 0
284 def _cache_key_value(get_value, key, cache):
285 """Return (value, was_cached). If there is a value in the cache
286 for key, use that, otherwise, call get_value(key) which should
287 throw a KeyError if there is no value -- in which case the cached
288 and returned value will be None.
290 try: # Do we already have it (or know there wasn't one)?
297 cache[key] = value = get_value(key)
# Caches for password-database lookups, keyed by uid and by user name.
# Populated (and cross-populated) by pwd_from_uid() and pwd_from_name().
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}
306 def pwd_from_uid(uid):
307 """Return password database entry for uid (may be a cached value).
308 Return None if no entry is found.
310 global _uid_to_pwd_cache, _name_to_pwd_cache
311 entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
312 if entry and not cached:
313 _name_to_pwd_cache[entry.pw_name] = entry
317 def pwd_from_name(name):
318 """Return password database entry for name (may be a cached value).
319 Return None if no entry is found.
321 global _uid_to_pwd_cache, _name_to_pwd_cache
322 entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
323 if entry and not cached:
324 _uid_to_pwd_cache[entry.pw_uid] = entry
# Caches for group-database lookups, keyed by gid and by group name.
# Populated (and cross-populated) by grp_from_gid() and grp_from_name().
_gid_to_grp_cache = {}
_name_to_grp_cache = {}
331 def grp_from_gid(gid):
332 """Return password database entry for gid (may be a cached value).
333 Return None if no entry is found.
335 global _gid_to_grp_cache, _name_to_grp_cache
336 entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
337 if entry and not cached:
338 _name_to_grp_cache[entry.gr_name] = entry
342 def grp_from_name(name):
343 """Return password database entry for name (may be a cached value).
344 Return None if no entry is found.
346 global _gid_to_grp_cache, _name_to_grp_cache
347 entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
348 if entry and not cached:
349 _gid_to_grp_cache[entry.gr_gid] = entry
355 """Get the user's login name."""
359 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
365 """Get the user's full name."""
367 if not _userfullname:
369 entry = pwd_from_uid(uid)
371 _userfullname = entry[4].split(',')[0] or entry[0]
372 if not _userfullname:
373 _userfullname = 'user%d' % uid
379 """Get the FQDN of this machine."""
382 _hostname = socket.getfqdn()
# Cached base directory for bup's resource files (looked up lazily).
_resource_path = None

def resource_path(subdir=''):
    """Return the path to bup's resource directory, joined with 'subdir'.

    The base directory comes from the BUP_RESOURCE_PATH environment
    variable, falling back to '.'; it is resolved on first use and then
    cached for the life of the process.
    """
    global _resource_path
    if not _resource_path:
        configured = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = configured if configured else '.'
    return os.path.join(_resource_path, subdir)
393 def format_filesize(size):
398 exponent = int(math.log(size) / math.log(unit))
399 size_prefix = "KMGTPE"[exponent - 1]
400 return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
403 class NotOk(Exception):
408 def __init__(self, outp):
412 while self._read(65536): pass
414 def read(self, size):
415 """Read 'size' bytes from input stream."""
417 return self._read(size)
420 """Read from input stream until a newline is found."""
422 return self._readline()
424 def write(self, data):
425 """Write 'data' to output stream."""
426 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
427 self.outp.write(data)
430 """Return true if input stream is readable."""
431 raise NotImplemented("Subclasses must implement has_input")
434 """Indicate end of output from last sent command."""
438 """Indicate server error to the client."""
439 s = re.sub(r'\s+', ' ', str(s))
440 self.write('\nerror %s\n' % s)
442 def _check_ok(self, onempty):
445 for rl in linereader(self):
446 #log('%d got line: %r\n' % (os.getpid(), rl))
447 if not rl: # empty line
451 elif rl.startswith('error '):
452 #log('client: error: %s\n' % rl[6:])
456 raise Exception('server exited unexpectedly; see errors above')
458 def drain_and_check_ok(self):
459 """Remove all data for the current command from input stream."""
462 return self._check_ok(onempty)
465 """Verify that server action completed successfully."""
467 raise Exception('expected "ok", got %r' % rl)
468 return self._check_ok(onempty)
471 class Conn(BaseConn):
472 def __init__(self, inp, outp):
473 BaseConn.__init__(self, outp)
476 def _read(self, size):
477 return self.inp.read(size)
480 return self.inp.readline()
483 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
485 assert(rl[0] == self.inp.fileno())
491 def checked_reader(fd, n):
493 rl, _, _ = select.select([fd], [], [])
496 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Largest payload carried in a single multiplexed packet; mux() reads at
# most this much per packet and DemuxConn asserts incoming sizes against it.
MAX_PACKET = 128 * 1024
502 def mux(p, outfd, outr, errr):
505 while p.poll() is None:
506 rl, _, _ = select.select(fds, [], [])
509 buf = os.read(outr, MAX_PACKET)
511 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
513 buf = os.read(errr, 1024)
515 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
517 os.write(outfd, struct.pack('!IB', 0, 3))
520 class DemuxConn(BaseConn):
521 """A helper class for bup's client-server protocol."""
522 def __init__(self, infd, outp):
523 BaseConn.__init__(self, outp)
524 # Anything that comes through before the sync string was not
525 # multiplexed and can be assumed to be debug/log before mux init.
527 while tail != 'BUPMUX':
528 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
530 raise IOError('demux: unexpected EOF during initialization')
532 sys.stderr.write(tail[:-6]) # pre-mux log messages
539 def write(self, data):
541 BaseConn.write(self, data)
543 def _next_packet(self, timeout):
544 if self.closed: return False
545 rl, wl, xl = select.select([self.infd], [], [], timeout)
546 if not rl: return False
547 assert(rl[0] == self.infd)
548 ns = ''.join(checked_reader(self.infd, 5))
549 n, fdw = struct.unpack('!IB', ns)
550 assert(n <= MAX_PACKET)
552 self.reader = checked_reader(self.infd, n)
554 for buf in checked_reader(self.infd, n):
555 sys.stderr.write(buf)
558 debug2("DemuxConn: marked closed\n")
561 def _load_buf(self, timeout):
562 if self.buf is not None:
564 while not self.closed:
565 while not self.reader:
566 if not self._next_packet(timeout):
569 self.buf = self.reader.next()
571 except StopIteration:
575 def _read_parts(self, ix_fn):
576 while self._load_buf(None):
577 assert(self.buf is not None)
579 if i is None or i == len(self.buf):
584 self.buf = self.buf[i:]
592 return buf.index('\n')+1
595 return ''.join(self._read_parts(find_eol))
597 def _read(self, size):
599 def until_size(buf): # Closes on csize
600 if len(buf) < csize[0]:
605 return ''.join(self._read_parts(until_size))
608 return self._load_buf(0)
612 """Generate a list of input lines from 'f' without terminating newlines."""
620 def chunkyreader(f, count = None):
621 """Generate a list of chunks of data read from 'f'.
623 If count is None, read until EOF is reached.
625 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
626 reached while reading, raise IOError.
630 b = f.read(min(count, 65536))
632 raise IOError('EOF with %d bytes remaining' % count)
643 """Append "/" to 's' if it doesn't aleady end in "/"."""
644 if s and not s.endswith('/'):
650 def _mmap_do(f, sz, flags, prot, close):
652 st = os.fstat(f.fileno())
655 # trying to open a zero-length map gives an error, but an empty
656 # string has all the same behaviour of a zero-length map, ie. it has
659 map = mmap.mmap(f.fileno(), sz, flags, prot)
661 f.close() # map will persist beyond file close
665 def mmap_read(f, sz = 0, close=True):
666 """Create a read-only memory mapped region on file 'f'.
667 If sz is 0, the region will cover the entire file.
669 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
672 def mmap_readwrite(f, sz = 0, close=True):
673 """Create a read-write memory mapped region on file 'f'.
674 If sz is 0, the region will cover the entire file.
676 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
680 def mmap_readwrite_private(f, sz = 0, close=True):
681 """Create a read-write memory mapped region on file 'f'.
682 If sz is 0, the region will cover the entire file.
683 The map is private, which means the changes are never flushed back to the
686 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
690 def parse_timestamp(epoch_str):
691 """Return the number of nanoseconds since the epoch that are described
692 by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
693 throw a ValueError that may contain additional information."""
694 ns_per = {'s' : 1000000000,
698 match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
700 if re.match(r'^([-+]?[0-9]+)$', epoch_str):
701 raise ValueError('must include units, i.e. 100ns, 100ms, ...')
703 (n, units) = match.group(1, 2)
707 return n * ns_per[units]
711 """Parse data size information into a float number.
713 Here are some examples of conversions:
714 199.2k means 203981 bytes
715 1GB means 1073741824 bytes
716 2.1 tb means 2199023255552 bytes
718 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
720 raise ValueError("can't parse %r as a number" % s)
721 (val, unit) = g.groups()
724 if unit in ['t', 'tb']:
725 mult = 1024*1024*1024*1024
726 elif unit in ['g', 'gb']:
727 mult = 1024*1024*1024
728 elif unit in ['m', 'mb']:
730 elif unit in ['k', 'kb']:
732 elif unit in ['', 'b']:
735 raise ValueError("invalid unit %r in number %r" % (unit, s))
740 """Count the number of elements in an iterator. (consumes the iterator)"""
741 return reduce(lambda x,y: x+1, l)
746 """Append an error message to the list of saved errors.
748 Once processing is able to stop and output the errors, the saved errors are
749 accessible in the module variable helpers.saved_errors.
751 saved_errors.append(e)
761 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
763 The new exception handler will make sure that bup will exit without an ugly
764 stacktrace when Ctrl-C is hit.
766 oldhook = sys.excepthook
767 def newhook(exctype, value, traceback):
768 if exctype == KeyboardInterrupt:
769 log('\nInterrupted.\n')
771 return oldhook(exctype, value, traceback)
772 sys.excepthook = newhook
775 def columnate(l, prefix):
776 """Format elements of 'l' in columns with 'prefix' leading each line.
778 The number of columns is determined automatically based on the string
784 clen = max(len(s) for s in l)
785 ncols = (tty_width() - len(prefix)) / (clen + 2)
790 while len(l) % ncols:
793 for s in range(0, len(l), rows):
794 cols.append(l[s:s+rows])
796 for row in zip(*cols):
797 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
801 def parse_date_or_fatal(str, fatal):
802 """Parses the given date or calls Option.fatal().
803 For now we expect a string that contains a float."""
806 except ValueError, e:
807 raise fatal('invalid date format (should be a float): %r' % e)
812 def parse_excludes(options, fatal):
813 """Traverse the options and extract all excludes, or call Option.fatal()."""
817 (option, parameter) = flag
818 if option == '--exclude':
819 excluded_paths.append(realpath(parameter))
820 elif option == '--exclude-from':
822 f = open(realpath(parameter))
824 raise fatal("couldn't read %s" % parameter)
825 for exclude_path in f.readlines():
826 excluded_paths.append(realpath(exclude_path.strip()))
827 return sorted(frozenset(excluded_paths))
830 def parse_rx_excludes(options, fatal):
831 """Traverse the options and extract all rx excludes, or call
833 excluded_patterns = []
836 (option, parameter) = flag
837 if option == '--exclude-rx':
839 excluded_patterns.append(re.compile(parameter))
841 fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
842 elif option == '--exclude-rx-from':
844 f = open(realpath(parameter))
846 raise fatal("couldn't read %s" % parameter)
847 for pattern in f.readlines():
848 spattern = pattern.rstrip('\n')
850 excluded_patterns.append(re.compile(spattern))
852 fatal('invalid --exclude-rx pattern (%s): %s' % (spattern, ex))
853 return excluded_patterns
856 def should_rx_exclude_path(path, exclude_rxs):
857 """Return True if path matches a regular expression in exclude_rxs."""
858 for rx in exclude_rxs:
860 debug1('Skipping %r: excluded by rx pattern %r.\n'
861 % (path, rx.pattern))
866 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
867 # that resolve against the current filesystem in the strip/graft
868 # functions for example, but elsewhere as well. I suspect bup's not
869 # always being careful about that. For some cases, the contents of
870 # the current filesystem should be irrelevant, and consulting it might
871 # produce the wrong result, perhaps via unintended symlink resolution,
874 def path_components(path):
875 """Break path into a list of pairs of the form (name,
876 full_path_to_name). Path must start with '/'.
878 '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
879 if not path.startswith('/'):
880 raise Exception, 'path must start with "/": %s' % path
881 # Since we assume path startswith('/'), we can skip the first element.
883 norm_path = os.path.abspath(path)
887 for p in norm_path.split('/')[1:]:
889 result.append((p, full_path))
893 def stripped_path_components(path, strip_prefixes):
894 """Strip any prefix in strip_prefixes from path and return a list
895 of path components where each component is (name,
896 none_or_full_fs_path_to_name). Assume path startswith('/').
897 See thelpers.py for examples."""
898 normalized_path = os.path.abspath(path)
899 sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
900 for bp in sorted_strip_prefixes:
901 normalized_bp = os.path.abspath(bp)
902 if normalized_path.startswith(normalized_bp):
903 prefix = normalized_path[:len(normalized_bp)]
905 for p in normalized_path[len(normalized_bp):].split('/'):
909 result.append((p, prefix))
912 return path_components(path)
915 def grafted_path_components(graft_points, path):
916 # Create a result that consists of some number of faked graft
917 # directories before the graft point, followed by all of the real
918 # directories from path that are after the graft point. Arrange
919 # for the directory at the graft point in the result to correspond
920 # to the "orig" directory in --graft orig=new. See t/thelpers.py
923 # Note that given --graft orig=new, orig and new have *nothing* to
924 # do with each other, even if some of their component names
925 # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
926 # equivalent to --graft /foo/bar/baz=/x/y/z, or even
929 # FIXME: This can't be the best solution...
930 clean_path = os.path.abspath(path)
931 for graft_point in graft_points:
932 old_prefix, new_prefix = graft_point
933 # Expand prefixes iff not absolute paths.
934 old_prefix = os.path.normpath(old_prefix)
935 new_prefix = os.path.normpath(new_prefix)
936 if clean_path.startswith(old_prefix):
937 escaped_prefix = re.escape(old_prefix)
938 grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
939 # Handle /foo=/ (at least) -- which produces //whatever.
940 grafted_path = '/' + grafted_path.lstrip('/')
941 clean_path_components = path_components(clean_path)
942 # Count the components that were stripped.
943 strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
944 new_prefix_parts = new_prefix.split('/')
945 result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
946 result = [(p, None) for p in result_prefix] \
947 + clean_path_components[strip_count:]
948 # Now set the graft point name to match the end of new_prefix.
949 graft_point = len(result_prefix)
950 result[graft_point] = \
951 (new_prefix_parts[-1], clean_path_components[strip_count][1])
952 if new_prefix == '/': # --graft ...=/ is a special case.
955 return path_components(clean_path)
960 """Format bup's version date string for output."""
961 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded for bup's current version."""
    commit = _version.COMMIT
    return commit
970 """Format bup's version tag (the official version number).
972 When generated from a commit other than one pointed to with a tag, the
973 returned string will be "unknown-" followed by the first seven positions of
976 names = _version.NAMES.strip()
977 assert(names[0] == '(')
978 assert(names[-1] == ')')
980 l = [n.strip() for n in names.split(',')]
982 if n.startswith('tag: bup-'):
984 return 'unknown-%s' % _version.COMMIT[:7]