1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import hashlib, heapq, operator, time, grp
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
9 # This function should really be in helpers, not in bup.options. But we
10 # want options.py to be standalone so people can include it in other projects.
11 from bup.options import _tty_width
12 tty_width = _tty_width
16 """Convert the string 's' to an integer. Return 0 if s is not a number."""
24 """Convert the string 's' to a float. Return 0 if s is not a number."""
26 return float(s or '0')
31 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
34 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
36 fdatasync = os.fdatasync
37 except AttributeError:
41 # Write (blockingly) to sockets that may or may not be in blocking mode.
42 # We need this because our stderr is sometimes eaten by subprocesses
43 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
44 # leading to race conditions. Ick. We'll do it the hard way.
45 def _hard_write(fd, buf):
47 (r,w,x) = select.select([], [fd], [], None)
49 raise IOError('select(fd) returned without being writable')
51 sz = os.write(fd, buf)
53 if e.errno != errno.EAGAIN:
61 """Print a log message to stderr."""
64 _hard_write(sys.stderr.fileno(), s)
78 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
79 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
82 """Calls log() if stderr is a TTY. Does nothing otherwise."""
90 """Calls progress() only if we haven't printed progress in a while.
92 This avoids overloading the stderr buffer with excess junk.
96 if now - _last_prog > 0.1:
102 """Calls progress() to redisplay the most recent progress message.
104 Useful after you've printed some other message that wipes out the
107 if _last_progress and _last_progress.endswith('\r'):
108 progress(_last_progress)
def mkdirp(d, mode=None):
    """Recursively create directories on path 'd'.

    Unlike os.makedirs(), it doesn't raise an exception if the last element of
    the path already exists.
    """
    try:
        if mode:
            os.makedirs(d, mode)
        else:
            os.makedirs(d)
    except OSError as e:
        if e.errno == errno.EEXIST:
            pass  # target already exists; that's fine
        else:
            raise
130 """Get the next item from an iterator, None if we reached the end."""
133 except StopIteration:
137 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
139 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
141 samekey = operator.eq
143 total = sum(len(it) for it in iters)
144 iters = (iter(it) for it in iters)
145 heap = ((next(it),it) for it in iters)
146 heap = [(e,it) for e,it in heap if e]
151 if not count % pfreq:
154 if not samekey(e, pe):
159 e = it.next() # Don't use next() function, it's too expensive
160 except StopIteration:
161 heapq.heappop(heap) # remove current
163 heapq.heapreplace(heap, (e, it)) # shift current to new location
168 """Delete a file at path 'f' if it currently exists.
170 Unlike os.unlink(), does not throw an exception if the file didn't already
176 if e.errno == errno.ENOENT:
177 pass # it doesn't exist, that's what you asked for
181 """Run a subprocess and return its output."""
182 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
189 """Get the absolute path of a file.
191 Behaves like os.path.realpath, but doesn't follow a symlink for the last
192 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
193 will follow symlinks in p's directory)
199 if st and stat.S_ISLNK(st.st_mode):
200 (dir, name) = os.path.split(p)
201 dir = os.path.realpath(dir)
202 out = os.path.join(dir, name)
204 out = os.path.realpath(p)
205 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot."""
    return os.getenv("FAKEROOTKEY") != None
def is_superuser():
    """Return true if the current process has administrative privileges.

    On Cygwin this asks Windows whether the user is an Administrator;
    everywhere else it checks for effective uid 0.
    """
    if sys.platform.startswith('cygwin'):
        import ctypes  # local import: only needed (and meaningful) on Cygwin
        return ctypes.cdll.shell32.IsUserAnAdmin()
    else:
        return os.geteuid() == 0
222 def _cache_key_value(get_value, key, cache):
223 """Return (value, was_cached). If there is a value in the cache
224 for key, use that, otherwise, call get_value(key) which should
225 throw a KeyError if there is no value -- in which case the cached
226 and returned value will be None.
228 try: # Do we already have it (or know there wasn't one)?
235 cache[key] = value = get_value(key)
# Caches for password database lookups, shared by pwd_from_uid/pwd_from_name.
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}

def pwd_from_uid(uid):
    """Return password database entry for uid (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
    if entry and not cached:
        # Prime the by-name cache with the same entry for free.
        _name_to_pwd_cache[entry.pw_name] = entry
    return entry
def pwd_from_name(name):
    """Return password database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
    if entry and not cached:
        # Prime the by-uid cache with the same entry for free.
        _uid_to_pwd_cache[entry.pw_uid] = entry
    return entry
# Caches for group database lookups, shared by grp_from_gid/grp_from_name.
_gid_to_grp_cache = {}
_name_to_grp_cache = {}

def grp_from_gid(gid):
    """Return group database entry for gid (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
    if entry and not cached:
        # Prime the by-name cache with the same entry for free.
        _name_to_grp_cache[entry.gr_name] = entry
    return entry
def grp_from_name(name):
    """Return group database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
    if entry and not cached:
        # Prime the by-gid cache with the same entry for free.
        _gid_to_grp_cache[entry.gr_gid] = entry
    return entry
293 """Get the user's login name."""
297 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
303 """Get the user's full name."""
305 if not _userfullname:
307 entry = pwd_from_uid(uid)
309 _userfullname = entry[4].split(',')[0] or entry[0]
310 if not _userfullname:
311 _userfullname = 'user%d' % uid
317 """Get the FQDN of this machine."""
320 _hostname = socket.getfqdn()
_resource_path = None

def resource_path(subdir=''):
    """Return bup's resource directory joined with 'subdir'.

    The base directory comes from the BUP_RESOURCE_PATH environment
    variable, defaulting to '.'; the result of that lookup is cached
    after the first call.
    """
    global _resource_path
    if not _resource_path:
        env_override = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = env_override or '.'
    return os.path.join(_resource_path, subdir)
def format_filesize(size):
    """Return 'size' (in bytes) as a compact human-readable string.

    Sizes below 1K are formatted as a plain integer; larger sizes get one
    decimal place and a K/M/G/T/P/E suffix, using 1024-based units.
    """
    unit = 1024.0
    size = float(size)
    if size < unit:
        return "%d" % (size)
    # Repeated division instead of math.log(): this file doesn't import
    # math, and the loop avoids float-log precision surprises at exact
    # powers of 1024.
    suffixes = "KMGTPE"
    exponent = 0
    scaled = size
    while scaled >= unit and exponent < len(suffixes):
        scaled /= unit
        exponent += 1
    size_prefix = suffixes[exponent - 1]
    return "%.1f%s" % (scaled, size_prefix)
class NotOk(Exception):
    # NOTE(review): appears to be raised when a client/server exchange
    # reports an 'error' line instead of 'ok' -- confirm against _check_ok.
    pass
346 def __init__(self, outp):
350 while self._read(65536): pass
352 def read(self, size):
353 """Read 'size' bytes from input stream."""
355 return self._read(size)
358 """Read from input stream until a newline is found."""
360 return self._readline()
362 def write(self, data):
363 """Write 'data' to output stream."""
364 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
365 self.outp.write(data)
368 """Return true if input stream is readable."""
369 raise NotImplemented("Subclasses must implement has_input")
372 """Indicate end of output from last sent command."""
376 """Indicate server error to the client."""
377 s = re.sub(r'\s+', ' ', str(s))
378 self.write('\nerror %s\n' % s)
380 def _check_ok(self, onempty):
383 for rl in linereader(self):
384 #log('%d got line: %r\n' % (os.getpid(), rl))
385 if not rl: # empty line
389 elif rl.startswith('error '):
390 #log('client: error: %s\n' % rl[6:])
394 raise Exception('server exited unexpectedly; see errors above')
396 def drain_and_check_ok(self):
397 """Remove all data for the current command from input stream."""
400 return self._check_ok(onempty)
403 """Verify that server action completed successfully."""
405 raise Exception('expected "ok", got %r' % rl)
406 return self._check_ok(onempty)
409 class Conn(BaseConn):
410 def __init__(self, inp, outp):
411 BaseConn.__init__(self, outp)
414 def _read(self, size):
415 return self.inp.read(size)
418 return self.inp.readline()
421 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
423 assert(rl[0] == self.inp.fileno())
429 def checked_reader(fd, n):
431 rl, _, _ = select.select([fd], [], [])
434 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
439 MAX_PACKET = 128 * 1024
440 def mux(p, outfd, outr, errr):
443 while p.poll() is None:
444 rl, _, _ = select.select(fds, [], [])
447 buf = os.read(outr, MAX_PACKET)
449 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
451 buf = os.read(errr, 1024)
453 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
455 os.write(outfd, struct.pack('!IB', 0, 3))
458 class DemuxConn(BaseConn):
459 """A helper class for bup's client-server protocol."""
460 def __init__(self, infd, outp):
461 BaseConn.__init__(self, outp)
462 # Anything that comes through before the sync string was not
463 # multiplexed and can be assumed to be debug/log before mux init.
465 while tail != 'BUPMUX':
466 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
468 raise IOError('demux: unexpected EOF during initialization')
470 sys.stderr.write(tail[:-6]) # pre-mux log messages
477 def write(self, data):
479 BaseConn.write(self, data)
481 def _next_packet(self, timeout):
482 if self.closed: return False
483 rl, wl, xl = select.select([self.infd], [], [], timeout)
484 if not rl: return False
485 assert(rl[0] == self.infd)
486 ns = ''.join(checked_reader(self.infd, 5))
487 n, fdw = struct.unpack('!IB', ns)
488 assert(n <= MAX_PACKET)
490 self.reader = checked_reader(self.infd, n)
492 for buf in checked_reader(self.infd, n):
493 sys.stderr.write(buf)
496 debug2("DemuxConn: marked closed\n")
499 def _load_buf(self, timeout):
500 if self.buf is not None:
502 while not self.closed:
503 while not self.reader:
504 if not self._next_packet(timeout):
507 self.buf = self.reader.next()
509 except StopIteration:
513 def _read_parts(self, ix_fn):
514 while self._load_buf(None):
515 assert(self.buf is not None)
517 if i is None or i == len(self.buf):
522 self.buf = self.buf[i:]
530 return buf.index('\n')+1
533 return ''.join(self._read_parts(find_eol))
535 def _read(self, size):
537 def until_size(buf): # Closes on csize
538 if len(buf) < csize[0]:
543 return ''.join(self._read_parts(until_size))
546 return self._load_buf(0)
550 """Generate a list of input lines from 'f' without terminating newlines."""
def chunkyreader(f, count = None):
    """Generate a list of chunks of data read from 'f'.

    If count is None, read until EOF is reached.

    If count is a positive integer, read 'count' bytes from 'f'. If EOF is
    reached while reading, raise IOError.
    """
    if count != None:
        while count > 0:
            b = f.read(min(count, 65536))
            if not b:
                raise IOError('EOF with %d bytes remaining' % count)
            yield b
            count -= len(b)
    else:
        while 1:
            b = f.read(65536)
            if not b: break
            yield b
581 """Append "/" to 's' if it doesn't aleady end in "/"."""
582 if s and not s.endswith('/'):
588 def _mmap_do(f, sz, flags, prot, close):
590 st = os.fstat(f.fileno())
593 # trying to open a zero-length map gives an error, but an empty
594 # string has all the same behaviour of a zero-length map, ie. it has
597 map = mmap.mmap(f.fileno(), sz, flags, prot)
599 f.close() # map will persist beyond file close
def mmap_read(f, sz = 0, close=True):
    """Create a read-only memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
def mmap_readwrite(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    Changes are flushed back to the file (MAP_SHARED).
    """
    return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
                    close)
def mmap_readwrite_private(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    The map is private, which means the changes are never flushed back to the
    file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
                    close)
def parse_timestamp(epoch_str):
    """Return the number of nanoseconds since the epoch that are described
    by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
    throw a ValueError that may contain additional information."""
    ns_per = {'s' : 1000000000,
              'ms' : 1000000,
              'us' : 1000,
              'ns' : 1}
    match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
    if not match:
        # A bare integer is a likely mistake worth a specific message.
        if re.match(r'^([-+]?[0-9]+)$', epoch_str):
            raise ValueError('must include units, i.e. 100ns, 100ms, ...')
        raise ValueError()
    (n, units) = match.group(1, 2)
    if not n:
        n = 1  # a bare unit ('ns') means one of that unit
    n = int(n)
    return n * ns_per[units]
649 """Parse data size information into a float number.
651 Here are some examples of conversions:
652 199.2k means 203981 bytes
653 1GB means 1073741824 bytes
654 2.1 tb means 2199023255552 bytes
656 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
658 raise ValueError("can't parse %r as a number" % s)
659 (val, unit) = g.groups()
662 if unit in ['t', 'tb']:
663 mult = 1024*1024*1024*1024
664 elif unit in ['g', 'gb']:
665 mult = 1024*1024*1024
666 elif unit in ['m', 'mb']:
668 elif unit in ['k', 'kb']:
670 elif unit in ['', 'b']:
673 raise ValueError("invalid unit %r in number %r" % (unit, s))
678 """Count the number of elements in an iterator. (consumes the iterator)"""
679 return reduce(lambda x,y: x+1, l)
684 """Append an error message to the list of saved errors.
686 Once processing is able to stop and output the errors, the saved errors are
687 accessible in the module variable helpers.saved_errors.
689 saved_errors.append(e)
699 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
701 The new exception handler will make sure that bup will exit without an ugly
702 stacktrace when Ctrl-C is hit.
704 oldhook = sys.excepthook
705 def newhook(exctype, value, traceback):
706 if exctype == KeyboardInterrupt:
707 log('\nInterrupted.\n')
709 return oldhook(exctype, value, traceback)
710 sys.excepthook = newhook
def columnate(l, prefix):
    """Format elements of 'l' in columns with 'prefix' leading each line.

    The number of columns is determined automatically based on the string
    lengths and the terminal width.  Returns the formatted text as a single
    string ('' for an empty list).
    """
    if not l:
        return ""
    l = l[:]  # we pad the list in place below; don't mutate the caller's
    clen = max(len(s) for s in l)
    # '//' so the result stays an int on both python 2 and 3.
    ncols = (tty_width() - len(prefix)) // (clen + 2)
    if ncols <= 1:
        ncols = 1
        clen = 0
    cols = []
    while len(l) % ncols:
        l.append('')  # pad so the columns divide evenly
    rows = len(l) // ncols
    for s in range(0, len(l), rows):
        cols.append(l[s:s+rows])
    out = ''
    for row in zip(*cols):
        out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
    return out
def parse_date_or_fatal(str, fatal):
    """Parses the given date or calls Option.fatal().
    For now we expect a string that contains a float."""
    try:
        date = float(str)
    except ValueError as e:
        raise fatal('invalid date format (should be a float): %r' % e)
    else:
        return date
def parse_excludes(options, fatal):
    """Traverse the options and extract all excludes, or call Option.fatal()."""
    excluded_paths = []
    for flag in options:
        (option, parameter) = flag
        if option == '--exclude':
            excluded_paths.append(realpath(parameter))
        elif option == '--exclude-from':
            try:
                f = open(realpath(parameter))
            except IOError:
                raise fatal("couldn't read %s" % parameter)
            for exclude_path in f.readlines():
                excluded_paths.append(realpath(exclude_path.strip()))
            f.close()  # don't leak the handle once we've read the list
    return sorted(frozenset(excluded_paths))
def parse_rx_excludes(options, fatal):
    """Traverse the options and extract all rx excludes, or call
    Option.fatal()."""
    excluded_patterns = []
    for flag in options:
        (option, parameter) = flag
        if option == '--exclude-rx':
            try:
                excluded_patterns.append(re.compile(parameter))
            except re.error as ex:
                fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
        elif option == '--exclude-rx-from':
            try:
                f = open(realpath(parameter))
            except IOError:
                raise fatal("couldn't read %s" % parameter)
            for pattern in f.readlines():
                spattern = pattern.rstrip('\n')
                try:
                    excluded_patterns.append(re.compile(spattern))
                except re.error as ex:
                    fatal('invalid --exclude-rx pattern (%s): %s'
                          % (spattern, ex))
            f.close()  # don't leak the handle once we've read the list
    return excluded_patterns
def should_rx_exclude_path(path, exclude_rxs):
    """Return True if path matches a regular expression in exclude_rxs."""
    for rx in exclude_rxs:
        if rx.search(path):
            debug1('Skipping %r: excluded by rx pattern %r.\n'
                   % (path, rx.pattern))
            return True
    return False
804 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
805 # that resolve against the current filesystem in the strip/graft
806 # functions for example, but elsewhere as well. I suspect bup's not
807 # always being careful about that. For some cases, the contents of
808 # the current filesystem should be irrelevant, and consulting it might
809 # produce the wrong result, perhaps via unintended symlink resolution,
def path_components(path):
    """Break path into a list of pairs of the form (name,
    full_path_to_name). Path must start with '/'.
    Example:
      '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
    if not path.startswith('/'):
        raise Exception('path must start with "/": %s' % path)
    # Since we assume path startswith('/'), we can skip the first element.
    result = [('', '/')]
    norm_path = os.path.abspath(path)
    if norm_path == '/':
        return result
    full_path = ''
    for p in norm_path.split('/')[1:]:
        full_path += '/' + p
        result.append((p, full_path))
    return result
def stripped_path_components(path, strip_prefixes):
    """Strip any prefix in strip_prefixes from path and return a list
    of path components where each component is (name,
    none_or_full_fs_path_to_name). Assume path startswith('/').
    See thelpers.py for examples."""
    normalized_path = os.path.abspath(path)
    # Longest prefixes first, so the most specific one wins.
    sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
    for bp in sorted_strip_prefixes:
        normalized_bp = os.path.abspath(bp)
        if normalized_path.startswith(normalized_bp):
            prefix = normalized_path[:len(normalized_bp)]
            result = []
            for p in normalized_path[len(normalized_bp):].split('/'):
                if p: # not root
                    prefix += '/' + p
                result.append((p, prefix))
            return result
    # No prefix matched; fall back to the unstripped components.
    return path_components(path)
def grafted_path_components(graft_points, path):
    """Return the (name, fs_path_or_None) component list for path with the
    first matching graft point applied; fall back to path_components(path)
    when no graft point matches."""
    # Create a result that consists of some number of faked graft
    # directories before the graft point, followed by all of the real
    # directories from path that are after the graft point. Arrange
    # for the directory at the graft point in the result to correspond
    # to the "orig" directory in --graft orig=new. See t/thelpers.py
    # for some examples.

    # Note that given --graft orig=new, orig and new have *nothing* to
    # do with each other, even if some of their component names
    # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
    # equivalent to --graft /foo/bar/baz=/x/y/z, or even
    # /foo/bar/baz=/x.

    # FIXME: This can't be the best solution...
    clean_path = os.path.abspath(path)
    for graft_point in graft_points:
        old_prefix, new_prefix = graft_point
        # Expand prefixes iff not absolute paths.
        old_prefix = os.path.normpath(old_prefix)
        new_prefix = os.path.normpath(new_prefix)
        if clean_path.startswith(old_prefix):
            escaped_prefix = re.escape(old_prefix)
            grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
            # Handle /foo=/ (at least) -- which produces //whatever.
            grafted_path = '/' + grafted_path.lstrip('/')
            clean_path_components = path_components(clean_path)
            # Count the components that were stripped.
            strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
            new_prefix_parts = new_prefix.split('/')
            result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
            result = [(p, None) for p in result_prefix] \
                + clean_path_components[strip_count:]
            # Now set the graft point name to match the end of new_prefix.
            graft_point = len(result_prefix)
            result[graft_point] = \
                (new_prefix_parts[-1], clean_path_components[strip_count][1])
            if new_prefix == '/': # --graft ...=/ is a special case.
                return result[1:]
            return result
    return path_components(clean_path)
898 """Format bup's version date string for output."""
899 return _version.DATE.split(' ')[0]
def version_commit():
    """Get the commit hash of bup's current version."""
    commit_hash = _version.COMMIT
    return commit_hash
908 """Format bup's version tag (the official version number).
910 When generated from a commit other than one pointed to with a tag, the
911 returned string will be "unknown-" followed by the first seven positions of
914 names = _version.NAMES.strip()
915 assert(names[0] == '(')
916 assert(names[-1] == ')')
918 l = [n.strip() for n in names.split(',')]
920 if n.startswith('tag: bup-'):
922 return 'unknown-%s' % _version.COMMIT[:7]