1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import hashlib, heapq, operator, time, grp
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
9 # This function should really be in helpers, not in bup.options. But we
10 # want options.py to be standalone so people can include it in other projects.
11 from bup.options import _tty_width
12 tty_width = _tty_width
# NOTE(review): the `def atoi(s):` / `def atof(s):` headers (and atoi's body)
# are missing from this listing; only the docstrings and part of atof's body
# survive.
16 """Convert the string 's' to an integer. Return 0 if s is not a number."""
24 """Convert the string 's' to a float. Return 0 if s is not a number."""
26 return float(s or '0')
# Debug verbosity level, read once at import time from $BUP_DEBUG
# (0 when unset or not a number, per atoi's contract above).
31 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
34 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
# NOTE(review): the `try:` line and the fsync fallback assignment inside the
# `except AttributeError:` branch are missing from this listing.
36 fdatasync = os.fdatasync
37 except AttributeError:
41 # Write (blockingly) to sockets that may or may not be in blocking mode.
42 # We need this because our stderr is sometimes eaten by subprocesses
43 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
44 # leading to race conditions. Ick. We'll do it the hard way.
45 def _hard_write(fd, buf):
# NOTE(review): the enclosing loop and try/except lines are missing from this
# listing.  What survives is: block until fd is writable, write a chunk, and
# tolerate EAGAIN so a temporarily-nonblocking fd gets retried.
47 (r,w,x) = select.select([], [fd], [], None)
49 raise IOError('select(fd) returned without being writable')
51 sz = os.write(fd, buf)
53 if e.errno != errno.EAGAIN:
# NOTE(review): the `def log(s):` header is missing from this listing.
61 """Print a log message to stderr."""
# Uses _hard_write() so the message survives stderr being switched to
# nonblocking mode by a subprocess (see comment above _hard_write).
64 _hard_write(sys.stderr.fileno(), s)
# Whether stdout (bit 1) and stderr (bit 2) are ttys.  $BUP_FORCE_TTY is a
# bitmask that can force either flag on (useful under test harnesses).
78 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
79 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
# NOTE(review): the `def progress(s):`, `def qprogress(s):` and
# `def reprogress():` headers plus most of their bodies are missing from this
# listing; only docstrings and a few statements survive.
82 """Calls log() if stderr is a TTY. Does nothing otherwise."""
90 """Calls progress() only if we haven't printed progress in a while.
92 This avoids overloading the stderr buffer with excess junk.
# Throttle: only repaint when more than 0.1s has elapsed since last update.
96 if now - _last_prog > 0.1:
102 """Calls progress() to redisplay the most recent progress message.
104 Useful after you've printed some other message that wipes out the
# Only messages ending in '\r' (transient progress lines) get redrawn.
107 if _last_progress and _last_progress.endswith('\r'):
108 progress(_last_progress)
111 def mkdirp(d, mode=None):
112 """Recursively create directories on path 'd'.
114 Unlike os.makedirs(), it doesn't raise an exception if the last element of
115 the path already exists.
# NOTE(review): the os.makedirs() call and its try/except wrapper are
# missing from this listing; only the EEXIST check survives.
123 if e.errno == errno.EEXIST:
# Sentinel object so _fallback_next can tell "no default supplied" apart
# from an explicit default of None.
129 _unspecified_next_default = object()
131 def _fallback_next(it, default=_unspecified_next_default):
132 """Retrieve the next item from the iterator by calling its
133 next() method. If default is given, it is returned if the
134 iterator is exhausted, otherwise StopIteration is raised."""
# NOTE(review): the try/return lines of the body are missing from this
# listing.
136 if default is _unspecified_next_default:
141 except StopIteration:
# Python < 2.6 has no builtin next(); install the fallback globally.
144 if sys.version_info < (2, 6):
145 next = _fallback_next
148 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
# NOTE(review): many body lines are missing from this listing.  What
# survives shows a heapq-based k-way merge that skips consecutive entries
# with the same key and calls pfunc for progress every pfreq items.
150 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
# With no key, compare elements directly.
152 samekey = operator.eq
154 total = sum(len(it) for it in iters)
155 iters = (iter(it) for it in iters)
156 heap = ((next(it, None),it) for it in iters)
# Drop already-exhausted iterators before heapifying.
157 heap = [(e,it) for e,it in heap if e]
162 if not count % pfreq:
165 if not samekey(e, pe):
170 e = it.next() # Don't use next() function, it's too expensive
171 except StopIteration:
172 heapq.heappop(heap) # remove current
174 heapq.heapreplace(heap, (e, it)) # shift current to new location
# NOTE(review): the `def unlink(f):` header and the os.unlink() call are
# missing from this listing.
179 """Delete a file at path 'f' if it currently exists.
181 Unlike os.unlink(), does not throw an exception if the file didn't already
187 if e.errno == errno.ENOENT:
188 pass # it doesn't exist, that's what you asked for
# NOTE(review): the `def readpipe(argv):` header and the final `return out`
# are missing from this listing.
192 """Run a subprocess and return its output."""
193 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
# communicate() also reaps the child, avoiding zombies.
194 out, err = p.communicate()
195 if p.returncode != 0:
196 raise Exception('subprocess %r failed with status %d'
197 % (' '.join(argv), p.returncode))
# NOTE(review): the `def realpath(p):` header, the initial stat of p, and
# the final `return out` are missing from this listing.
202 """Get the absolute path of a file.
204 Behaves like os.path.realpath, but doesn't follow a symlink for the last
205 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
206 will follow symlinks in p's directory)
# If p itself is a symlink, resolve only its directory and keep the final
# name unresolved.
212 if st and stat.S_ISLNK(st.st_mode):
213 (dir, name) = os.path.split(p)
214 dir = os.path.realpath(dir)
215 out = os.path.join(dir, name)
217 out = os.path.realpath(p)
218 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    fakeroot advertises itself by setting $FAKEROOTKEY in the environment.
    """
    # Was `os.getenv("FAKEROOTKEY") != None`; `is not None` is the
    # idiomatic identity test for None (PEP 8) and cannot be fooled by
    # objects overriding __ne__.
    return os.getenv("FAKEROOTKEY") is not None
# NOTE(review): the `def is_superuser():` header, the `import ctypes`, and
# the surrounding branch structure are missing from this listing.
228 if sys.platform.startswith('cygwin'):
# On Cygwin there is no meaningful euid; ask Windows instead.
230 return ctypes.cdll.shell32.IsUserAnAdmin()
232 return os.geteuid() == 0
235 def _cache_key_value(get_value, key, cache):
236 """Return (value, was_cached). If there is a value in the cache
237 for key, use that, otherwise, call get_value(key) which should
238 throw a KeyError if there is no value -- in which case the cached
239 and returned value will be None.
# NOTE(review): the cache-hit return and the KeyError handler (which
# presumably caches None) are missing from this listing.
241 try: # Do we already have it (or know there wasn't one)?
248 cache[key] = value = get_value(key)
# Bidirectional caches for password-database lookups; a fresh hit in one
# direction also primes the other (see below).
254 _uid_to_pwd_cache = {}
255 _name_to_pwd_cache = {}
257 def pwd_from_uid(uid):
258 """Return password database entry for uid (may be a cached value).
259 Return None if no entry is found.
# NOTE(review): the docstring terminator and the final `return entry` are
# missing from this listing.
261 global _uid_to_pwd_cache, _name_to_pwd_cache
262 entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
# An uncached hit also populates the name->entry cache.
263 if entry and not cached:
264 _name_to_pwd_cache[entry.pw_name] = entry
268 def pwd_from_name(name):
269 """Return password database entry for name (may be a cached value).
270 Return None if no entry is found.
# NOTE(review): the docstring terminator and the final `return entry` are
# missing from this listing.
272 global _uid_to_pwd_cache, _name_to_pwd_cache
273 entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
# An uncached hit also populates the uid->entry cache.
274 if entry and not cached:
275 _uid_to_pwd_cache[entry.pw_uid] = entry
# Bidirectional caches for group-database lookups (mirrors the pwd caches).
279 _gid_to_grp_cache = {}
280 _name_to_grp_cache = {}
282 def grp_from_gid(gid):
283 """Return group database entry for gid (may be a cached value).
284 Return None if no entry is found.
# NOTE(review): the docstring terminator and the final `return entry` are
# missing from this listing.
286 global _gid_to_grp_cache, _name_to_grp_cache
287 entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
# An uncached hit also populates the name->entry cache.
288 if entry and not cached:
289 _name_to_grp_cache[entry.gr_name] = entry
293 def grp_from_name(name):
294 """Return group database entry for name (may be a cached value).
295 Return None if no entry is found.
# NOTE(review): the docstring terminator and the final `return entry` are
# missing from this listing.
297 global _gid_to_grp_cache, _name_to_grp_cache
298 entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
# An uncached hit also populates the gid->entry cache.
299 if entry and not cached:
300 _gid_to_grp_cache[entry.gr_gid] = entry
# NOTE(review): the `def username():` header, its caching check, and the
# return are missing from this listing.
306 """Get the user's login name."""
# Index 0 of a pwd entry is pw_name; fall back to 'user<uid>' if empty.
310 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
# NOTE(review): the `def userfullname():` header and parts of the body are
# missing from this listing.
316 """Get the user's full name."""
318 if not _userfullname:
320 entry = pwd_from_uid(uid)
# Index 4 of a pwd entry is pw_gecos; the full name is its first
# comma-separated field, falling back to the login name (index 0).
322 _userfullname = entry[4].split(',')[0] or entry[0]
323 if not _userfullname:
324 _userfullname = 'user%d' % uid
# NOTE(review): the `def hostname():` header, its caching check, and the
# return are missing from this listing.
330 """Get the FQDN of this machine."""
333 _hostname = socket.getfqdn()
_resource_path = None

def resource_path(subdir=''):
    """Return the path of bup's resource directory joined with subdir.

    The base directory is taken from $BUP_RESOURCE_PATH (defaulting to
    the current directory) on first use, then cached for the lifetime of
    the process.
    """
    global _resource_path
    if not _resource_path:
        env_dir = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = env_dir if env_dir else '.'
    return os.path.join(_resource_path, subdir)
344 def format_filesize(size):
# NOTE(review): the docstring, the definition of `unit`, and the
# small-size fast path are missing from this listing.  `math` is used here
# but does not appear in the visible imports -- presumably imported on a
# missing line; verify.
# Pick the K/M/G/T/P/E prefix from the magnitude of size.
349 exponent = int(math.log(size) / math.log(unit))
350 size_prefix = "KMGTPE"[exponent - 1]
351 return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
# Raised when the server side of the protocol reports an error line.
354 class NotOk(Exception):
# NOTE(review): NotOk's body and the `class BaseConn` header are missing
# from this listing; the methods below appear to belong to a base
# connection class for bup's client/server protocol.
359 def __init__(self, outp):
# Drain loop: read and discard input 64k at a time until EOF (the
# enclosing method header is missing from this listing).
363 while self._read(65536): pass
365 def read(self, size):
366 """Read 'size' bytes from input stream."""
368 return self._read(size)
371 """Read from input stream until a newline is found."""
373 return self._readline()
375 def write(self, data):
376 """Write 'data' to output stream."""
377 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
378 self.outp.write(data)
381 """Return true if input stream is readable."""
# NOTE(review): NotImplemented is not an exception class; raising it
# actually raises a TypeError.  This should be NotImplementedError.
382 raise NotImplemented("Subclasses must implement has_input")
385 """Indicate end of output from last sent command."""
389 """Indicate server error to the client."""
# Collapse whitespace so the error fits the one-line protocol framing.
390 s = re.sub(r'\s+', ' ', str(s))
391 self.write('\nerror %s\n' % s)
393 def _check_ok(self, onempty):
# Scan server output for the trailing 'ok'/'error ...' status line.
396 for rl in linereader(self):
397 #log('%d got line: %r\n' % (os.getpid(), rl))
398 if not rl: # empty line
402 elif rl.startswith('error '):
403 #log('client: error: %s\n' % rl[6:])
407 raise Exception('server exited unexpectedly; see errors above')
409 def drain_and_check_ok(self):
410 """Remove all data for the current command from input stream."""
413 return self._check_ok(onempty)
416 """Verify that server action completed successfully."""
418 raise Exception('expected "ok", got %r' % rl)
419 return self._check_ok(onempty)
# Simple file-object-backed connection (no multiplexing).
422 class Conn(BaseConn):
# NOTE(review): parts of __init__, _readline's header, and has_input's
# return framing are missing from this listing.
423 def __init__(self, inp, outp):
424 BaseConn.__init__(self, outp)
427 def _read(self, size):
428 return self.inp.read(size)
431 return self.inp.readline()
# Non-blocking poll (timeout 0) to see whether input is pending.
434 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
436 assert(rl[0] == self.inp.fileno())
442 def checked_reader(fd, n):
# NOTE(review): the loop structure and yield are missing from this listing;
# this appears to be a generator that reads exactly n bytes from fd,
# raising on premature EOF.
444 rl, _, _ = select.select([fd], [], [])
447 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Largest payload allowed in a single mux packet.
452 MAX_PACKET = 128 * 1024
453 def mux(p, outfd, outr, errr):
# NOTE(review): the fds setup and the per-fd dispatch lines are missing
# from this listing.  Each packet is a '!IB' header (length, stream id)
# followed by the payload; stream 1 carries stdout, 2 carries stderr, and
# a zero-length stream-3 packet marks the end.
456 while p.poll() is None:
457 rl, _, _ = select.select(fds, [], [])
460 buf = os.read(outr, MAX_PACKET)
462 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
464 buf = os.read(errr, 1024)
466 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
468 os.write(outfd, struct.pack('!IB', 0, 3))
471 class DemuxConn(BaseConn):
472 """A helper class for bup's client-server protocol."""
# NOTE(review): numerous body lines are missing from this listing
# (initializations, try/except wrappers, several returns); comments below
# describe only what the surviving lines show.
473 def __init__(self, infd, outp):
474 BaseConn.__init__(self, outp)
475 # Anything that comes through before the sync string was not
476 # multiplexed and can be assumed to be debug/log before mux init.
# Scan until the 'BUPMUX' marker; everything before it is forwarded to
# our stderr as plain log output.
478 while tail != 'BUPMUX':
479 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
481 raise IOError('demux: unexpected EOF during initialization')
483 sys.stderr.write(tail[:-6]) # pre-mux log messages
490 def write(self, data):
492 BaseConn.write(self, data)
494 def _next_packet(self, timeout):
# Read one '!IB' (length, stream id) packet header; matches the framing
# produced by mux() above.
495 if self.closed: return False
496 rl, wl, xl = select.select([self.infd], [], [], timeout)
497 if not rl: return False
498 assert(rl[0] == self.infd)
499 ns = ''.join(checked_reader(self.infd, 5))
500 n, fdw = struct.unpack('!IB', ns)
501 assert(n <= MAX_PACKET)
# Data stream: set up a reader for the payload.
503 self.reader = checked_reader(self.infd, n)
# Stderr stream: copy the payload straight through.
505 for buf in checked_reader(self.infd, n):
506 sys.stderr.write(buf)
509 debug2("DemuxConn: marked closed\n")
512 def _load_buf(self, timeout):
513 if self.buf is not None:
515 while not self.closed:
516 while not self.reader:
517 if not self._next_packet(timeout):
520 self.buf = self.reader.next()
522 except StopIteration:
526 def _read_parts(self, ix_fn):
# Yield chunks until ix_fn(buf) finds a split point; the remainder stays
# in self.buf for the next call.
527 while self._load_buf(None):
528 assert(self.buf is not None)
530 if i is None or i == len(self.buf):
535 self.buf = self.buf[i:]
# Split after the first newline (for _readline).
543 return buf.index('\n')+1
546 return ''.join(self._read_parts(find_eol))
548 def _read(self, size):
550 def until_size(buf): # Closes on csize
551 if len(buf) < csize[0]:
556 return ''.join(self._read_parts(until_size))
# has_input: a zero-timeout load doubles as a readiness poll.
559 return self._load_buf(0)
# NOTE(review): the `def linereader(f):` header and its loop are missing
# from this listing.
563 """Generate a list of input lines from 'f' without terminating newlines."""
571 def chunkyreader(f, count = None):
572 """Generate a list of chunks of data read from 'f'.
574 If count is None, read until EOF is reached.
576 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
577 reached while reading, raise IOError.
# Read in at-most-64k chunks so arbitrarily large counts stay bounded.
581 b = f.read(min(count, 65536))
583 raise IOError('EOF with %d bytes remaining' % count)
# NOTE(review): the `def slashappend(s):` header and the return lines are
# missing from this listing.
594 """Append "/" to 's' if it doesn't already end in "/"."""
595 if s and not s.endswith('/'):
601 def _mmap_do(f, sz, flags, prot, close):
# NOTE(review): the zero-size handling (see comment fragment below) and
# the return statements are missing from this listing.
603 st = os.fstat(f.fileno())
606 # trying to open a zero-length map gives an error, but an empty
607 # string has all the same behaviour of a zero-length map, ie. it has
610 map = mmap.mmap(f.fileno(), sz, flags, prot)
612 f.close() # map will persist beyond file close
616 def mmap_read(f, sz = 0, close=True):
617 """Create a read-only memory mapped region on file 'f'.
618 If sz is 0, the region will cover the entire file.
620 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
623 def mmap_readwrite(f, sz = 0, close=True):
624 """Create a read-write memory mapped region on file 'f'.
625 If sz is 0, the region will cover the entire file.
# MAP_SHARED: changes are visible to the underlying file.
627 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
631 def mmap_readwrite_private(f, sz = 0, close=True):
632 """Create a read-write memory mapped region on file 'f'.
633 If sz is 0, the region will cover the entire file.
634 The map is private, which means the changes are never flushed back to the
637 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
641 def parse_timestamp(epoch_str):
642 """Return the number of nanoseconds since the epoch that are described
643 by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
644 throw a ValueError that may contain additional information."""
# NOTE(review): the rest of the ns_per table (presumably ms/us/ns) and the
# generic failure branch are missing from this listing.
645 ns_per = {'s' : 1000000000,
649 match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
# A bare number (no unit suffix) gets a more specific error message.
651 if re.match(r'^([-+]?[0-9]+)$', epoch_str):
652 raise ValueError('must include units, i.e. 100ns, 100ms, ...')
654 (n, units) = match.group(1, 2)
658 return n * ns_per[units]
# NOTE(review): the `def parse_num(s):` header, the numeric conversion of
# `val`, and the final return are missing from this listing.
662 """Parse data size information into a float number.
664 Here are some examples of conversions:
665 199.2k means 203981 bytes
666 1GB means 1073741824 bytes
667 2.1 tb means 2199023255552 bytes
669 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
671 raise ValueError("can't parse %r as a number" % s)
672 (val, unit) = g.groups()
# Units are powers of 1024; the examples above show the unit is treated
# case-insensitively (the lowercasing line is missing from this listing).
675 if unit in ['t', 'tb']:
676 mult = 1024*1024*1024*1024
677 elif unit in ['g', 'gb']:
678 mult = 1024*1024*1024
679 elif unit in ['m', 'mb']:
681 elif unit in ['k', 'kb']:
683 elif unit in ['', 'b']:
686 raise ValueError("invalid unit %r in number %r" % (unit, s))
# NOTE(review): the `def count(l):` header is missing from this listing.
691 """Count the number of elements in an iterator. (consumes the iterator)"""
# NOTE(review): reduce() with no initializer seeds the accumulator with the
# FIRST element, so this returns first_element + (n - 1), which equals the
# count only when the first element is 1 -- looks suspicious; verify
# against callers.  (builtin reduce => Python 2.)
692 return reduce(lambda x,y: x+1, l)
# NOTE(review): the `saved_errors` definition and the `def add_error(e):`
# header are missing from this listing.
697 """Append an error message to the list of saved errors.
699 Once processing is able to stop and output the errors, the saved errors are
700 accessible in the module variable helpers.saved_errors.
702 saved_errors.append(e)
# NOTE(review): the `def handle_ctrl_c():` header and the exit call inside
# the KeyboardInterrupt branch are missing from this listing.
712 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
714 The new exception handler will make sure that bup will exit without an ugly
715 stacktrace when Ctrl-C is hit.
717 oldhook = sys.excepthook
718 def newhook(exctype, value, traceback):
719 if exctype == KeyboardInterrupt:
720 log('\nInterrupted.\n')
# Everything else is delegated to the previously-installed hook.
722 return oldhook(exctype, value, traceback)
723 sys.excepthook = newhook
726 def columnate(l, prefix):
727 """Format elements of 'l' in columns with 'prefix' leading each line.
729 The number of columns is determined automatically based on the string
# NOTE(review): several body lines (the empty-list fast path, ncols
# clamping, the rows computation, and the return) are missing from this
# listing.
735 clen = max(len(s) for s in l)
# Columns that fit the terminal, two spaces of padding per entry.
# (Plain `/` here implies Python 2 integer division.)
736 ncols = (tty_width() - len(prefix)) / (clen + 2)
# Pad l so it divides evenly into ncols columns.
741 while len(l) % ncols:
744 for s in range(0, len(l), rows):
745 cols.append(l[s:s+rows])
747 for row in zip(*cols):
748 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
752 def parse_date_or_fatal(str, fatal):
753 """Parses the given date or calls Option.fatal().
754 For now we expect a string that contains a float."""
# NOTE(review): the try/return lines are missing from this listing.  Also:
# the parameter name `str` shadows the builtin, and the `except X, e`
# syntax is Python 2 only.
757 except ValueError, e:
758 raise fatal('invalid date format (should be a float): %r' % e)
763 def parse_excludes(options, fatal):
764 """Traverse the options and extract all excludes, or call Option.fatal()."""
# NOTE(review): the excluded_paths initialization, the loop header, the
# try wrapper around open(), and any f.close() are missing from this
# listing.
768 (option, parameter) = flag
769 if option == '--exclude':
770 excluded_paths.append(realpath(parameter))
771 elif option == '--exclude-from':
773 f = open(realpath(parameter))
775 raise fatal("couldn't read %s" % parameter)
776 for exclude_path in f.readlines():
777 excluded_paths.append(realpath(exclude_path.strip()))
# Deduplicate and sort for a stable matching order.
778 return sorted(frozenset(excluded_paths))
781 def parse_rx_excludes(options, fatal):
782 """Traverse the options and extract all rx excludes, or call
# NOTE(review): the docstring terminator, the loop header, and the
# try/except wrappers around re.compile()/open() are missing from this
# listing.
784 excluded_patterns = []
787 (option, parameter) = flag
788 if option == '--exclude-rx':
790 excluded_patterns.append(re.compile(parameter))
792 fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
793 elif option == '--exclude-rx-from':
795 f = open(realpath(parameter))
797 raise fatal("couldn't read %s" % parameter)
798 for pattern in f.readlines():
# Strip only the newline; other whitespace may be significant in a regex.
799 spattern = pattern.rstrip('\n')
801 excluded_patterns.append(re.compile(spattern))
803 fatal('invalid --exclude-rx pattern (%s): %s' % (spattern, ex))
804 return excluded_patterns
807 def should_rx_exclude_path(path, exclude_rxs):
808 """Return True if path matches a regular expression in exclude_rxs."""
# NOTE(review): the regex-match test inside the loop and the return
# statements are missing from this listing.
809 for rx in exclude_rxs:
# debug1 logs which pattern caused the exclusion.
811 debug1('Skipping %r: excluded by rx pattern %r.\n'
812 % (path, rx.pattern))
817 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
818 # that resolve against the current filesystem in the strip/graft
819 # functions for example, but elsewhere as well. I suspect bup's not
820 # always being careful about that. For some cases, the contents of
821 # the current filesystem should be irrelevant, and consulting it might
822 # produce the wrong result, perhaps via unintended symlink resolution,
825 def path_components(path):
826 """Break path into a list of pairs of the form (name,
827 full_path_to_name). Path must start with '/'.
829 '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
830 if not path.startswith('/'):
# Python 2 style `raise Class, args`.
831 raise Exception, 'path must start with "/": %s' % path
832 # Since we assume path startswith('/'), we can skip the first element.
# NOTE(review): the result initialization, the full_path accumulation, and
# the return are missing from this listing.
834 norm_path = os.path.abspath(path)
838 for p in norm_path.split('/')[1:]:
840 result.append((p, full_path))
844 def stripped_path_components(path, strip_prefixes):
845 """Strip any prefix in strip_prefixes from path and return a list
846 of path components where each component is (name,
847 none_or_full_fs_path_to_name). Assume path startswith('/').
848 See thelpers.py for examples."""
# NOTE(review): several body lines (the result seeding, the prefix
# accumulation inside the inner loop, and the matched-prefix return) are
# missing from this listing.
849 normalized_path = os.path.abspath(path)
# Longest prefix wins: try candidates in order of decreasing length.
850 sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
851 for bp in sorted_strip_prefixes:
852 normalized_bp = os.path.abspath(bp)
853 if normalized_path.startswith(normalized_bp):
854 prefix = normalized_path[:len(normalized_bp)]
856 for p in normalized_path[len(normalized_bp):].split('/'):
860 result.append((p, prefix))
# No prefix matched: fall back to plain component splitting.
863 return path_components(path)
866 def grafted_path_components(graft_points, path):
# NOTE(review): a few lines are missing from this listing (original lines
# 872-873, 878-879, and 904-905 -- the latter presumably the
# result-adjustment and return for the matched-graft branch).
867 # Create a result that consists of some number of faked graft
868 # directories before the graft point, followed by all of the real
869 # directories from path that are after the graft point. Arrange
870 # for the directory at the graft point in the result to correspond
871 # to the "orig" directory in --graft orig=new. See t/thelpers.py
874 # Note that given --graft orig=new, orig and new have *nothing* to
875 # do with each other, even if some of their component names
876 # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
877 # equivalent to --graft /foo/bar/baz=/x/y/z, or even
880 # FIXME: This can't be the best solution...
881 clean_path = os.path.abspath(path)
882 for graft_point in graft_points:
883 old_prefix, new_prefix = graft_point
884 # Expand prefixes iff not absolute paths.
885 old_prefix = os.path.normpath(old_prefix)
886 new_prefix = os.path.normpath(new_prefix)
887 if clean_path.startswith(old_prefix):
888 escaped_prefix = re.escape(old_prefix)
889 grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
890 # Handle /foo=/ (at least) -- which produces //whatever.
891 grafted_path = '/' + grafted_path.lstrip('/')
892 clean_path_components = path_components(clean_path)
893 # Count the components that were stripped.
894 strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
895 new_prefix_parts = new_prefix.split('/')
896 result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
# Fake (fs-pathless) components for the grafted prefix, then the real
# components from the original path.
897 result = [(p, None) for p in result_prefix] \
898 + clean_path_components[strip_count:]
899 # Now set the graft point name to match the end of new_prefix.
900 graft_point = len(result_prefix)
901 result[graft_point] = \
902 (new_prefix_parts[-1], clean_path_components[strip_count][1])
903 if new_prefix == '/': # --graft ...=/ is a special case.
# No graft matched: behave like plain path_components().
906 return path_components(clean_path)
# NOTE(review): the `def version_date():` header is missing from this
# listing.
911 """Format bup's version date string for output."""
# Keep only the date part (before the first space) of _version.DATE.
912 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the git commit hash of bup's current version."""
    commit_hash = _version.COMMIT
    return commit_hash
# NOTE(review): the `def version_tag():` header, the docstring terminator,
# the loop over `l`, and the tag-found return are missing from this
# listing.
921 """Format bup's version tag (the official version number).
923 When generated from a commit other than one pointed to with a tag, the
924 returned string will be "unknown-" followed by the first seven positions of
# _version.NAMES looks like git's "(ref, ref, ...)" decoration string.
927 names = _version.NAMES.strip()
928 assert(names[0] == '(')
929 assert(names[-1] == ')')
931 l = [n.strip() for n in names.split(',')]
933 if n.startswith('tag: bup-'):
# No release tag found: fall back to an abbreviated commit hash.
935 return 'unknown-%s' % _version.COMMIT[:7]