1 """Helper functions and classes for bup."""
3 from ctypes import sizeof, c_void_p
5 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
6 import config, hashlib, heapq, operator, time, grp
8 from bup import _version, _helpers
9 import bup._helpers as _helpers
12 # This function should really be in helpers, not in bup.options. But we
13 # want options.py to be standalone so people can include it in other projects.
14 from bup.options import _tty_width
15 tty_width = _tty_width
19 """Convert the string 's' to an integer. Return 0 if s is not a number."""
27 """Convert the string 's' to a float. Return 0 if s is not a number."""
29 return float(s or '0')
34 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
37 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
39 fdatasync = os.fdatasync
40 except AttributeError:
44 # Write (blockingly) to sockets that may or may not be in blocking mode.
45 # We need this because our stderr is sometimes eaten by subprocesses
46 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
47 # leading to race conditions. Ick. We'll do it the hard way.
48 def _hard_write(fd, buf):
50 (r,w,x) = select.select([], [fd], [], None)
52 raise IOError('select(fd) returned without being writable')
54 sz = os.write(fd, buf)
56 if e.errno != errno.EAGAIN:
64 """Print a log message to stderr."""
67 _hard_write(sys.stderr.fileno(), s)
# $BUP_FORCE_TTY is a bit mask letting callers pretend stdout (bit 1)
# and/or stderr (bit 2) are terminals even when they aren't.
_forced_tty = atoi(os.environ.get('BUP_FORCE_TTY'))
istty1 = os.isatty(1) or (_forced_tty & 1)
istty2 = os.isatty(2) or (_forced_tty & 2)
85 """Calls log() if stderr is a TTY. Does nothing otherwise."""
93 """Calls progress() only if we haven't printed progress in a while.
95 This avoids overloading the stderr buffer with excess junk.
99 if now - _last_prog > 0.1:
105 """Calls progress() to redisplay the most recent progress message.
107 Useful after you've printed some other message that wipes out the
110 if _last_progress and _last_progress.endswith('\r'):
111 progress(_last_progress)
def mkdirp(d, mode=None):
    """Recursively create directories on path 'd'.

    Unlike os.makedirs(), it doesn't raise an exception if the last element of
    the path already exists.
    """
    try:
        if mode:
            os.makedirs(d, mode)
        else:
            os.makedirs(d)
    except OSError as e:
        # The final directory already existing is fine; anything else isn't.
        if e.errno != errno.EEXIST:
            raise
132 _unspecified_next_default = object()
134 def _fallback_next(it, default=_unspecified_next_default):
135 """Retrieve the next item from the iterator by calling its
136 next() method. If default is given, it is returned if the
137 iterator is exhausted, otherwise StopIteration is raised."""
139 if default is _unspecified_next_default:
144 except StopIteration:
147 if sys.version_info < (2, 6):
148 next = _fallback_next
def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
    """Merge the already-sorted sequences in iters, yielding each distinct
    element once.

    Each sequence must support len().  pfunc(count, total) is invoked every
    pfreq elements for progress reporting and pfinal(count, total) once at
    the end.  If key is set, two elements are treated as duplicates when
    their 'key' attributes match; otherwise plain equality is used.
    """
    if key:
        def samekey(e, pe):
            return getattr(e, key) == getattr(pe, key, None)
    else:
        samekey = operator.eq
    count = 0
    total = sum(len(it) for it in iters)
    iters = (iter(it) for it in iters)
    heap = [(e, it) for e, it in ((next(it, None), it) for it in iters) if e]
    heapq.heapify(heap)
    prev = None
    while heap:
        if not count % pfreq:
            pfunc(count, total)
        e, it = heap[0]
        if not samekey(e, prev):
            prev = e
            yield e
        count += 1
        try:
            e = it.next() # Don't use next() function, it's too expensive
        except StopIteration:
            heapq.heappop(heap) # this iterator is exhausted; drop it
        else:
            heapq.heapreplace(heap, (e, it)) # shift current to new location
    pfinal(count, total)
182 """Delete a file at path 'f' if it currently exists.
184 Unlike os.unlink(), does not throw an exception if the file didn't already
190 if e.errno == errno.ENOENT:
191 pass # it doesn't exist, that's what you asked for
def readpipe(argv, preexec_fn=None):
    """Run argv as a subprocess and return its standard output.

    Raises Exception if the command exits with a nonzero status.
    """
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                            preexec_fn=preexec_fn)
    out, _ = proc.communicate()
    rc = proc.returncode
    if rc != 0:
        raise Exception('subprocess %r failed with status %d'
                        % (' '.join(argv), rc))
    return out
204 def _argmax_base(command):
207 base_size += len(command) + 1
208 for k, v in environ.iteritems():
209 base_size += len(k) + len(v) + 2 + sizeof(c_void_p)
213 def _argmax_args_size(args):
214 return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
def batchpipe(command, args, preexec_fn=None):
    """If args is not empty, yield the output produced by calling the
    command list with args as a sequence of strings (It may be necessary
    to return multiple strings in order to respect ARG_MAX)."""
    base_size = _argmax_base(command)
    while args:
        room = config.arg_max - base_size
        fit = 0
        # Take as many leading args as will fit under ARG_MAX.
        while fit < len(args):
            arg_size = _argmax_args_size(args[fit:fit+1])
            if room - arg_size < 0:
                break
            room -= arg_size
            fit += 1
        sub_args = args[:fit]
        args = args[fit:]
        assert(len(sub_args))
        yield readpipe(command + sub_args, preexec_fn=preexec_fn)
238 """Get the absolute path of a file.
240 Behaves like os.path.realpath, but doesn't follow a symlink for the last
241 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
242 will follow symlinks in p's directory)
248 if st and stat.S_ISLNK(st.st_mode):
249 (dir, name) = os.path.split(p)
250 dir = os.path.realpath(dir)
251 out = os.path.join(dir, name)
253 out = os.path.realpath(p)
254 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot."""
    # fakeroot advertises itself via $FAKEROOTKEY.  Identity test instead
    # of the '!= None' comparison (PEP 8: comparisons to None use 'is').
    return os.getenv("FAKEROOTKEY") is not None
def is_superuser():
    """Return true if this process has administrator privileges: euid 0 on
    POSIX, or IsUserAnAdmin() on Cygwin."""
    if sys.platform.startswith('cygwin'):
        import ctypes
        return ctypes.cdll.shell32.IsUserAnAdmin()
    return os.geteuid() == 0
271 def _cache_key_value(get_value, key, cache):
272 """Return (value, was_cached). If there is a value in the cache
273 for key, use that, otherwise, call get_value(key) which should
274 throw a KeyError if there is no value -- in which case the cached
275 and returned value will be None.
277 try: # Do we already have it (or know there wasn't one)?
284 cache[key] = value = get_value(key)
# Password database lookups are cached both ways (uid->entry, name->entry).
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}

def pwd_from_uid(uid):
    """Return the password database entry for uid (may be a cached value).
    Return None if no entry is found.
    """
    entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
    if entry and not cached:
        # Fresh hit: also prime the name-keyed cache.
        _name_to_pwd_cache[entry.pw_name] = entry
    return entry


def pwd_from_name(name):
    """Return the password database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
    if entry and not cached:
        # Fresh hit: also prime the uid-keyed cache.
        _uid_to_pwd_cache[entry.pw_uid] = entry
    return entry
# Group database lookups are cached both ways (gid->entry, name->entry).
_gid_to_grp_cache = {}
_name_to_grp_cache = {}

def grp_from_gid(gid):
    """Return the group database entry for gid (may be a cached value).
    Return None if no entry is found.
    """
    entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
    if entry and not cached:
        # Fresh hit: also prime the name-keyed cache.
        _name_to_grp_cache[entry.gr_name] = entry
    return entry


def grp_from_name(name):
    """Return the group database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
    if entry and not cached:
        # Fresh hit: also prime the gid-keyed cache.
        _gid_to_grp_cache[entry.gr_gid] = entry
    return entry
342 """Get the user's login name."""
346 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
352 """Get the user's full name."""
354 if not _userfullname:
356 entry = pwd_from_uid(uid)
358 _userfullname = entry[4].split(',')[0] or entry[0]
359 if not _userfullname:
360 _userfullname = 'user%d' % uid
366 """Get the FQDN of this machine."""
369 _hostname = socket.getfqdn()
_resource_path = None  # cached base directory; resolved on first use
def resource_path(subdir=''):
    """Return bup's resource directory joined with subdir.

    The base directory comes from $BUP_RESOURCE_PATH (defaulting to '.').
    """
    global _resource_path
    if not _resource_path:
        base = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = base or '.'
    return os.path.join(_resource_path, subdir)
def format_filesize(size):
    """Return a human-readable string for a size in bytes.

    Values below 1024 are formatted as a plain integer; larger values get
    one decimal place and a K/M/G/T/P/E suffix (1024-based).
    """
    kb = 1024.0
    size = float(size)
    if size < kb:
        return "%d" % size
    exponent = int(math.log(size) / math.log(kb))
    prefix = "KMGTPE"[exponent - 1]
    return "%.1f%s" % (size / math.pow(kb, exponent), prefix)
390 class NotOk(Exception):
395 def __init__(self, outp):
399 while self._read(65536): pass
401 def read(self, size):
402 """Read 'size' bytes from input stream."""
404 return self._read(size)
407 """Read from input stream until a newline is found."""
409 return self._readline()
411 def write(self, data):
412 """Write 'data' to output stream."""
413 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
414 self.outp.write(data)
417 """Return true if input stream is readable."""
418 raise NotImplemented("Subclasses must implement has_input")
421 """Indicate end of output from last sent command."""
425 """Indicate server error to the client."""
426 s = re.sub(r'\s+', ' ', str(s))
427 self.write('\nerror %s\n' % s)
429 def _check_ok(self, onempty):
432 for rl in linereader(self):
433 #log('%d got line: %r\n' % (os.getpid(), rl))
434 if not rl: # empty line
438 elif rl.startswith('error '):
439 #log('client: error: %s\n' % rl[6:])
443 raise Exception('server exited unexpectedly; see errors above')
445 def drain_and_check_ok(self):
446 """Remove all data for the current command from input stream."""
449 return self._check_ok(onempty)
452 """Verify that server action completed successfully."""
454 raise Exception('expected "ok", got %r' % rl)
455 return self._check_ok(onempty)
class Conn(BaseConn):
    """A BaseConn over an ordinary input/output stream pair."""
    def __init__(self, inp, outp):
        BaseConn.__init__(self, outp)
        self.inp = inp

    def _read(self, size):
        return self.inp.read(size)

    def _readline(self):
        return self.inp.readline()

    def has_input(self):
        # Poll (zero timeout) for readability on the input stream.
        rl, wl, xl = select.select([self.inp.fileno()], [], [], 0)
        if not rl:
            return None
        assert(rl[0] == self.inp.fileno())
        return True
def checked_reader(fd, n):
    """Yield chunks read from fd until exactly n bytes have been produced.

    Blocks in select() until fd is readable; raises Exception if EOF
    arrives before all n bytes are delivered.
    """
    while n > 0:
        rl, _, _ = select.select([fd], [], [])
        assert(rl[0] == fd)
        buf = os.read(fd, n)
        if not buf:
            raise Exception("Unexpected EOF reading %d more bytes" % n)
        yield buf
        n -= len(buf)
MAX_PACKET = 128 * 1024

def mux(p, outfd, outr, errr):
    """Multiplex data from outr (stdout) and errr (stderr) onto outfd
    until subprocess p exits.

    Each packet is a '!IB' header (payload length, stream id) followed by
    the payload; stream 1 is stdout, 2 is stderr, and a zero-length packet
    on stream 3 marks the end.
    """
    try:
        fds = [outr, errr]
        while p.poll() is None:
            rl, _, _ = select.select(fds, [], [])
            if outr in rl:
                buf = os.read(outr, MAX_PACKET)
                if not buf:
                    break
                os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
            elif errr in rl:
                buf = os.read(errr, 1024)
                if not buf:
                    break
                os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
    finally:
        os.write(outfd, struct.pack('!IB', 0, 3))
class DemuxConn(BaseConn):
    """A helper class for bup's client-server protocol."""
    def __init__(self, infd, outp):
        BaseConn.__init__(self, outp)
        # Anything that comes through before the sync string was not
        # multiplexed and can be assumed to be debug/log before mux init.
        tail = ''
        while tail != 'BUPMUX':
            # Read up to the 6 bytes needed to complete the marker, then
            # one byte at a time while scanning for it.
            b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
            if not b:
                raise IOError('demux: unexpected EOF during initialization')
            tail += b
            sys.stderr.write(tail[:-6]) # pre-mux log messages
            tail = tail[-6:]
        self.infd = infd
        self.reader = None   # checked_reader generator for current packet
        self.buf = None      # current unconsumed chunk
        self.closed = False

    def write(self, data):
        # Drain any pending input first so we never deadlock with the peer.
        self._load_buf(0)
        BaseConn.write(self, data)

    def _next_packet(self, timeout):
        """Read one packet header; route its payload.  False if none ready."""
        if self.closed: return False
        rl, wl, xl = select.select([self.infd], [], [], timeout)
        if not rl: return False
        assert(rl[0] == self.infd)
        header = ''.join(checked_reader(self.infd, 5))
        n, fdw = struct.unpack('!IB', header)
        assert(n <= MAX_PACKET)
        if fdw == 1:
            # Data for us: expose a reader for the payload.
            self.reader = checked_reader(self.infd, n)
        elif fdw == 2:
            # Remote stderr: pass it straight through.
            for buf in checked_reader(self.infd, n):
                sys.stderr.write(buf)
        elif fdw == 3:
            self.closed = True
            debug2("DemuxConn: marked closed\n")
        return True

    def _load_buf(self, timeout):
        """Ensure self.buf holds data; return True if any is available."""
        if self.buf is not None:
            return True
        while not self.closed:
            while not self.reader:
                if not self._next_packet(timeout):
                    return False
            try:
                self.buf = self.reader.next()
                return True
            except StopIteration:
                self.reader = None
        return False

    def _read_parts(self, ix_fn):
        # Yield chunks until ix_fn reports a split point (an index, or
        # None meaning "consume the whole chunk and keep going").
        while self._load_buf(None):
            assert(self.buf is not None)
            i = ix_fn(self.buf)
            if i is None or i == len(self.buf):
                part = self.buf
                self.buf = None
            else:
                part = self.buf[:i]
                self.buf = self.buf[i:]
            yield part
            if i is not None:
                break

    def _readline(self):
        def find_eol(buf):
            try:
                return buf.index('\n')+1
            except ValueError:
                return None
        return ''.join(self._read_parts(find_eol))

    def _read(self, size):
        csize = [size]
        def until_size(buf): # Closes on csize
            if len(buf) < csize[0]:
                csize[0] -= len(buf)
                return None
            else:
                return csize[0]
        return ''.join(self._read_parts(until_size))

    def has_input(self):
        return self._load_buf(0)
599 """Generate a list of input lines from 'f' without terminating newlines."""
def chunkyreader(f, count = None):
    """Generate a list of chunks of data read from 'f'.

    If count is None, read until EOF is reached.

    If count is a positive integer, read 'count' bytes from 'f'. If EOF is
    reached while reading, raise IOError.
    """
    if count is None:
        while True:
            chunk = f.read(65536)
            if not chunk:
                break
            yield chunk
    else:
        while count > 0:
            chunk = f.read(min(count, 65536))
            if not chunk:
                raise IOError('EOF with %d bytes remaining' % count)
            yield chunk
            count -= len(chunk)
630 """Append "/" to 's' if it doesn't aleady end in "/"."""
631 if s and not s.endswith('/'):
def _mmap_do(f, sz, flags, prot, close):
    """Map sz bytes of f (or the whole file when sz is 0) and return the map.

    When close is true the underlying file object is closed (the map keeps
    the data alive).  A zero-length file yields '' instead of a map.
    """
    if not sz:
        st = os.fstat(f.fileno())
        sz = st.st_size
    if not sz:
        # trying to open a zero-length map gives an error, but an empty
        # string has all the same behaviour of a zero-length map, ie. it has
        # no elements :)
        return ''
    result = mmap.mmap(f.fileno(), sz, flags, prot)
    if close:
        f.close()  # map will persist beyond file close
    return result


def mmap_read(f, sz = 0, close=True):
    """Create a read-only memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)


def mmap_readwrite(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
                    close)


def mmap_readwrite_private(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    The map is private, which means the changes are never flushed back to the
    file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
                    close)
def parse_timestamp(epoch_str):
    """Return the number of nanoseconds since the epoch that are described
    by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
    throw a ValueError that may contain additional information."""
    ns_per = {'s': 1000000000,
              'ms': 1000000,
              'us': 1000,
              'ns': 1}
    match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
    if not match:
        # A bare number is the most likely mistake; give a specific hint.
        if re.match(r'^([-+]?[0-9]+)$', epoch_str):
            raise ValueError('must include units, i.e. 100ns, 100ms, ...')
        raise ValueError()
    n, units = match.group(1, 2)
    # A missing count (e.g. 'ns') means one unit.
    return (int(n) if n else 1) * ns_per[units]
698 """Parse data size information into a float number.
700 Here are some examples of conversions:
701 199.2k means 203981 bytes
702 1GB means 1073741824 bytes
703 2.1 tb means 2199023255552 bytes
705 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
707 raise ValueError("can't parse %r as a number" % s)
708 (val, unit) = g.groups()
711 if unit in ['t', 'tb']:
712 mult = 1024*1024*1024*1024
713 elif unit in ['g', 'gb']:
714 mult = 1024*1024*1024
715 elif unit in ['m', 'mb']:
717 elif unit in ['k', 'kb']:
719 elif unit in ['', 'b']:
722 raise ValueError("invalid unit %r in number %r" % (unit, s))
727 """Count the number of elements in an iterator. (consumes the iterator)"""
728 return reduce(lambda x,y: x+1, l)
733 """Append an error message to the list of saved errors.
735 Once processing is able to stop and output the errors, the saved errors are
736 accessible in the module variable helpers.saved_errors.
738 saved_errors.append(e)
748 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
750 The new exception handler will make sure that bup will exit without an ugly
751 stacktrace when Ctrl-C is hit.
753 oldhook = sys.excepthook
754 def newhook(exctype, value, traceback):
755 if exctype == KeyboardInterrupt:
756 log('\nInterrupted.\n')
758 return oldhook(exctype, value, traceback)
759 sys.excepthook = newhook
def columnate(l, prefix):
    """Format elements of 'l' in columns with 'prefix' leading each line.

    The number of columns is determined automatically based on the string
    lengths and the terminal width.  Returns one string with embedded
    newlines, or "" for an empty list.
    """
    if not l:
        return ""
    l = l[:]  # we pad the list with '' below; don't mutate the caller's copy
    clen = max(len(s) for s in l)
    # Fix: use floor division; plain '/' yields a float under Python 3
    # (and with 'from __future__ import division'), breaking range() and
    # the '%-*s' width below.  Result is unchanged for Python 2 ints.
    ncols = (tty_width() - len(prefix)) // (clen + 2)
    if ncols <= 1:
        ncols = 1
        clen = 0
    cols = []
    while len(l) % ncols:
        l.append('')
    rows = len(l) // ncols
    for s in range(0, len(l), rows):
        cols.append(l[s:s+rows])
    out = ''
    for row in zip(*cols):
        out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
    return out
def parse_date_or_fatal(str, fatal):
    """Parses the given date or calls Option.fatal().
    For now we expect a string that contains a float."""
    try:
        return atof(str)
    except ValueError as e:
        raise fatal('invalid date format (should be a float): %r' % e)
def parse_excludes(options, fatal):
    """Traverse the options and extract all excludes, or call Option.fatal().

    Returns the resolved exclude paths, deduplicated and sorted.
    """
    excluded_paths = []
    for flag in options:
        (option, parameter) = flag
        if option == '--exclude':
            excluded_paths.append(realpath(parameter))
        elif option == '--exclude-from':
            try:
                f = open(realpath(parameter))
            except IOError:
                raise fatal("couldn't read %s" % parameter)
            try:
                for exclude_path in f.readlines():
                    excluded_paths.append(realpath(exclude_path.strip()))
            finally:
                f.close()  # fix: the file handle was never closed
    return sorted(frozenset(excluded_paths))
def parse_rx_excludes(options, fatal):
    """Traverse the options and extract all rx excludes, or call
    Option.fatal().  Returns the compiled regular expressions."""
    excluded_patterns = []
    for flag in options:
        (option, parameter) = flag
        if option == '--exclude-rx':
            try:
                excluded_patterns.append(re.compile(parameter))
            except re.error as ex:
                fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
        elif option == '--exclude-rx-from':
            try:
                f = open(realpath(parameter))
            except IOError:
                raise fatal("couldn't read %s" % parameter)
            try:
                for pattern in f.readlines():
                    spattern = pattern.rstrip('\n')
                    try:
                        excluded_patterns.append(re.compile(spattern))
                    except re.error as ex:
                        fatal('invalid --exclude-rx pattern (%s): %s'
                              % (spattern, ex))
            finally:
                f.close()  # fix: the file handle was never closed
    return excluded_patterns
def should_rx_exclude_path(path, exclude_rxs):
    """Return True if path matches a regular expression in exclude_rxs."""
    for pattern in exclude_rxs:
        if not pattern.search(path):
            continue
        debug1('Skipping %r: excluded by rx pattern %r.\n'
               % (path, pattern.pattern))
        return True
    return False
853 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
854 # that resolve against the current filesystem in the strip/graft
855 # functions for example, but elsewhere as well. I suspect bup's not
856 # always being careful about that. For some cases, the contents of
857 # the current filesystem should be irrelevant, and consulting it might
858 # produce the wrong result, perhaps via unintended symlink resolution,
def path_components(path):
    """Break path into a list of pairs of the form (name,
    full_path_to_name). Path must start with '/'.
    Example:
      '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
    if not path.startswith('/'):
        # Fix: use the call form of raise; the 'raise Exception, msg'
        # statement form is Python-2-only syntax.
        raise Exception('path must start with "/": %s' % path)
    # Since we assume path startswith('/'), we can skip the first element.
    result = [('', '/')]
    norm_path = os.path.abspath(path)
    if norm_path == '/':
        return result
    full_path = ''
    for p in norm_path.split('/')[1:]:
        full_path += '/' + p
        result.append((p, full_path))
    return result
def stripped_path_components(path, strip_prefixes):
    """Strip any prefix in strip_prefixes from path and return a list
    of path components where each component is (name,
    none_or_full_fs_path_to_name). Assume path startswith('/').
    See thelpers.py for examples."""
    normalized_path = os.path.abspath(path)
    # Try the longest prefixes first so the most specific one wins.
    for bp in sorted(strip_prefixes, key=len, reverse=True):
        normalized_bp = os.path.abspath(bp)
        if not normalized_path.startswith(normalized_bp):
            continue
        prefix = normalized_path[:len(normalized_bp)]
        result = []
        for p in normalized_path[len(normalized_bp):].split('/'):
            if p: # not root
                prefix += '/'
            prefix += p
            result.append((p, prefix))
        return result
    # Nothing matched; fall back to the unstripped components.
    return path_components(path)
def grafted_path_components(graft_points, path):
    # Create a result that consists of some number of faked graft
    # directories before the graft point, followed by all of the real
    # directories from path that are after the graft point. Arrange
    # for the directory at the graft point in the result to correspond
    # to the "orig" directory in --graft orig=new. See t/thelpers.py
    # for some examples.

    # Note that given --graft orig=new, orig and new have *nothing* to
    # do with each other, even if some of their component names
    # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
    # equivalent to --graft /foo/bar/baz=/x/y/z, or even
    # /foo/bar/baz=/x.

    # FIXME: This can't be the best solution...
    clean_path = os.path.abspath(path)
    for graft_point in graft_points:
        old_prefix, new_prefix = graft_point
        # Expand prefixes iff not absolute paths.
        old_prefix = os.path.normpath(old_prefix)
        new_prefix = os.path.normpath(new_prefix)
        if clean_path.startswith(old_prefix):
            escaped_prefix = re.escape(old_prefix)
            grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
            # Handle /foo=/ (at least) -- which produces //whatever.
            grafted_path = '/' + grafted_path.lstrip('/')
            clean_path_components = path_components(clean_path)
            # Count the components that were stripped.
            strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
            new_prefix_parts = new_prefix.split('/')
            result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
            result = [(p, None) for p in result_prefix] \
                + clean_path_components[strip_count:]
            # Now set the graft point name to match the end of new_prefix.
            graft_point = len(result_prefix)
            result[graft_point] = \
                (new_prefix_parts[-1], clean_path_components[strip_count][1])
            if new_prefix == '/': # --graft ...=/ is a special case.
                return result[1:]
            return result
    return path_components(clean_path)
947 """Format bup's version date string for output."""
948 return _version.DATE.split(' ')[0]
951 def version_commit():
952 """Get the commit hash of bup's current version."""
953 return _version.COMMIT
957 """Format bup's version tag (the official version number).
959 When generated from a commit other than one pointed to with a tag, the
960 returned string will be "unknown-" followed by the first seven positions of
963 names = _version.NAMES.strip()
964 assert(names[0] == '(')
965 assert(names[-1] == ')')
967 l = [n.strip() for n in names.split(',')]
969 if n.startswith('tag: bup-'):
971 return 'unknown-%s' % _version.COMMIT[:7]