1 """Helper functions and classes for bup."""
3 from collections import namedtuple
4 from ctypes import sizeof, c_void_p
6 from contextlib import contextmanager
7 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
8 import hashlib, heapq, math, operator, time, grp, tempfile
10 from bup import _helpers
12 sc_page_size = os.sysconf('SC_PAGE_SIZE')
13 assert(sc_page_size > 0)
15 sc_arg_max = os.sysconf('SC_ARG_MAX')
19 # This function should really be in helpers, not in bup.options. But we
20 # want options.py to be standalone so people can include it in other projects.
21 from bup.options import _tty_width
22 tty_width = _tty_width
26 """Convert the string 's' to an integer. Return 0 if s is not a number."""
34 """Convert the string 's' to a float. Return 0 if s is not a number."""
36 return float(s or '0')
41 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
44 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
46 fdatasync = os.fdatasync
47 except AttributeError:
51 # Write (blockingly) to sockets that may or may not be in blocking mode.
52 # We need this because our stderr is sometimes eaten by subprocesses
53 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
54 # leading to race conditions. Ick. We'll do it the hard way.
55 def _hard_write(fd, buf):
57 (r,w,x) = select.select([], [fd], [], None)
59 raise IOError('select(fd) returned without being writable')
61 sz = os.write(fd, buf)
63 if e.errno != errno.EAGAIN:
71 """Print a log message to stderr."""
74 _hard_write(sys.stderr.fileno(), s)
88 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
89 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
92 """Calls log() if stderr is a TTY. Does nothing otherwise."""
100 """Calls progress() only if we haven't printed progress in a while.
102 This avoids overloading the stderr buffer with excess junk.
106 if now - _last_prog > 0.1:
112 """Calls progress() to redisplay the most recent progress message.
114 Useful after you've printed some other message that wipes out the
117 if _last_progress and _last_progress.endswith('\r'):
118 progress(_last_progress)
121 def mkdirp(d, mode=None):
122 """Recursively create directories on path 'd'.
124 Unlike os.makedirs(), it doesn't raise an exception if the last element of
125 the path already exists.
133 if e.errno == errno.EEXIST:
139 _unspecified_next_default = object()
141 def _fallback_next(it, default=_unspecified_next_default):
142 """Retrieve the next item from the iterator by calling its
143 next() method. If default is given, it is returned if the
144 iterator is exhausted, otherwise StopIteration is raised."""
146 if default is _unspecified_next_default:
151 except StopIteration:
154 if sys.version_info < (2, 6):
155 next = _fallback_next
158 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
160 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
162 samekey = operator.eq
164 total = sum(len(it) for it in iters)
165 iters = (iter(it) for it in iters)
166 heap = ((next(it, None),it) for it in iters)
167 heap = [(e,it) for e,it in heap if e]
172 if not count % pfreq:
175 if not samekey(e, pe):
180 e = it.next() # Don't use next() function, it's too expensive
181 except StopIteration:
182 heapq.heappop(heap) # remove current
184 heapq.heapreplace(heap, (e, it)) # shift current to new location
189 """Delete a file at path 'f' if it currently exists.
191 Unlike os.unlink(), does not throw an exception if the file didn't already
197 if e.errno != errno.ENOENT:
def readpipe(argv, preexec_fn=None):
    """Run the command list argv and return its standard output.

    preexec_fn is passed through to subprocess.Popen.
    Raises Exception if the subprocess exits with a nonzero status.
    """
    p = subprocess.Popen(argv, stdout=subprocess.PIPE, preexec_fn=preexec_fn)
    out, err = p.communicate()
    if p.returncode != 0:
        raise Exception('subprocess %r failed with status %d'
                        % (' '.join(argv), p.returncode))
    # Without this the docstring's promise is broken and callers get None.
    return out
211 def _argmax_base(command):
214 base_size += len(command) + 1
215 for k, v in environ.iteritems():
216 base_size += len(k) + len(v) + 2 + sizeof(c_void_p)
220 def _argmax_args_size(args):
221 return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
224 def batchpipe(command, args, preexec_fn=None, arg_max=sc_arg_max):
225 """If args is not empty, yield the output produced by calling the
226 command list with args as a sequence of strings (It may be necessary
227 to return multiple strings in order to respect ARG_MAX)."""
228 # The optional arg_max arg is a workaround for an issue with the
229 # current wvtest behavior.
230 base_size = _argmax_base(command)
232 room = arg_max - base_size
235 next_size = _argmax_args_size(args[i:i+1])
236 if room - next_size < 0:
242 assert(len(sub_args))
243 yield readpipe(command + sub_args, preexec_fn=preexec_fn)
247 """Get the absolute path of a file.
249 Behaves like os.path.realpath, but doesn't follow a symlink for the last
250 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
251 will follow symlinks in p's directory)
257 if st and stat.S_ISLNK(st.st_mode):
258 (dir, name) = os.path.split(p)
259 dir = os.path.realpath(dir)
260 out = os.path.join(dir, name)
262 out = os.path.realpath(p)
263 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot."""
    # fakeroot advertises itself via the FAKEROOTKEY environment variable;
    # use "is not None" (identity) rather than "!= None" per PEP 8.
    return os.getenv("FAKEROOTKEY") is not None
273 if sys.platform.startswith('cygwin'):
275 return ctypes.cdll.shell32.IsUserAnAdmin()
277 return os.geteuid() == 0
def _cache_key_value(get_value, key, cache):
    """Return (value, was_cached). If there is a value in the cache
    for key, use that, otherwise, call get_value(key) which should
    throw a KeyError if there is no value -- in which case the cached
    and returned value will be None.
    """
    try: # Do we already have it (or know there wasn't one)?
        # NOTE(review): some lines appear elided in this view between the
        # cache probe above and the memoizing lookup below -- confirm
        # against the full source before editing.
        cache[key] = value = get_value(key)
299 _uid_to_pwd_cache = {}
300 _name_to_pwd_cache = {}
def pwd_from_uid(uid):
    """Return password database entry for uid (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
    # A fresh (uncached) hit also primes the name-keyed cache, so a later
    # pwd_from_name() for the same user avoids a second getpwnam call.
    if entry and not cached:
        _name_to_pwd_cache[entry.pw_name] = entry
def pwd_from_name(name):
    """Return password database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
    # A fresh (uncached) hit also primes the uid-keyed cache, so a later
    # pwd_from_uid() for the same user avoids a second getpwuid call.
    if entry and not cached:
        _uid_to_pwd_cache[entry.pw_uid] = entry
324 _gid_to_grp_cache = {}
325 _name_to_grp_cache = {}
def grp_from_gid(gid):
    """Return the group database entry for gid (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
    # A fresh (uncached) hit also primes the name-keyed cache, so a later
    # grp_from_name() for the same group avoids a second getgrnam call.
    if entry and not cached:
        _name_to_grp_cache[entry.gr_name] = entry
def grp_from_name(name):
    """Return the group database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
    # A fresh (uncached) hit also primes the gid-keyed cache, so a later
    # grp_from_gid() for the same group avoids a second getgrgid call.
    if entry and not cached:
        _gid_to_grp_cache[entry.gr_gid] = entry
351 """Get the user's login name."""
355 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
361 """Get the user's full name."""
363 if not _userfullname:
365 entry = pwd_from_uid(uid)
367 _userfullname = entry[4].split(',')[0] or entry[0]
368 if not _userfullname:
369 _userfullname = 'user%d' % uid
375 """Get the FQDN of this machine."""
378 _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    """Return the path to subdir within bup's resource directory.

    The base directory comes from the BUP_RESOURCE_PATH environment
    variable, falling back to the current directory, and is cached in a
    module-level global after the first lookup.
    """
    global _resource_path
    if _resource_path is None:
        _resource_path = os.environ.get('BUP_RESOURCE_PATH') or '.'
    return os.path.join(_resource_path, subdir)
def format_filesize(size):
    """Return a human-readable string for a size in bytes.

    Sizes below 1K are rendered as a plain byte count; larger sizes get
    one decimal place and a K/M/G/T/P/E suffix (powers of 1024).
    """
    # 'unit' was referenced but never defined in the visible code
    # (NameError), and sub-1K sizes had no non-suffixed path.
    unit = 1024.0
    size = float(size)
    if size < unit:
        return "%d" % (size)
    exponent = int(math.log(size) / math.log(unit))
    size_prefix = "KMGTPE"[exponent - 1]
    return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
399 class NotOk(Exception):
404 def __init__(self, outp):
408 while self._read(65536): pass
410 def read(self, size):
411 """Read 'size' bytes from input stream."""
413 return self._read(size)
416 """Read from input stream until a newline is found."""
418 return self._readline()
420 def write(self, data):
421 """Write 'data' to output stream."""
422 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
423 self.outp.write(data)
426 """Return true if input stream is readable."""
427 raise NotImplemented("Subclasses must implement has_input")
430 """Indicate end of output from last sent command."""
434 """Indicate server error to the client."""
435 s = re.sub(r'\s+', ' ', str(s))
436 self.write('\nerror %s\n' % s)
438 def _check_ok(self, onempty):
441 for rl in linereader(self):
442 #log('%d got line: %r\n' % (os.getpid(), rl))
443 if not rl: # empty line
447 elif rl.startswith('error '):
448 #log('client: error: %s\n' % rl[6:])
452 raise Exception('server exited unexpectedly; see errors above')
454 def drain_and_check_ok(self):
455 """Remove all data for the current command from input stream."""
458 return self._check_ok(onempty)
461 """Verify that server action completed successfully."""
463 raise Exception('expected "ok", got %r' % rl)
464 return self._check_ok(onempty)
467 class Conn(BaseConn):
468 def __init__(self, inp, outp):
469 BaseConn.__init__(self, outp)
472 def _read(self, size):
473 return self.inp.read(size)
476 return self.inp.readline()
479 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
481 assert(rl[0] == self.inp.fileno())
487 def checked_reader(fd, n):
489 rl, _, _ = select.select([fd], [], [])
492 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
497 MAX_PACKET = 128 * 1024
498 def mux(p, outfd, outr, errr):
501 while p.poll() is None:
502 rl, _, _ = select.select(fds, [], [])
505 buf = os.read(outr, MAX_PACKET)
507 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
509 buf = os.read(errr, 1024)
511 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
513 os.write(outfd, struct.pack('!IB', 0, 3))
516 class DemuxConn(BaseConn):
517 """A helper class for bup's client-server protocol."""
518 def __init__(self, infd, outp):
519 BaseConn.__init__(self, outp)
520 # Anything that comes through before the sync string was not
521 # multiplexed and can be assumed to be debug/log before mux init.
523 while tail != 'BUPMUX':
524 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
526 raise IOError('demux: unexpected EOF during initialization')
528 sys.stderr.write(tail[:-6]) # pre-mux log messages
535 def write(self, data):
537 BaseConn.write(self, data)
539 def _next_packet(self, timeout):
540 if self.closed: return False
541 rl, wl, xl = select.select([self.infd], [], [], timeout)
542 if not rl: return False
543 assert(rl[0] == self.infd)
544 ns = ''.join(checked_reader(self.infd, 5))
545 n, fdw = struct.unpack('!IB', ns)
546 assert(n <= MAX_PACKET)
548 self.reader = checked_reader(self.infd, n)
550 for buf in checked_reader(self.infd, n):
551 sys.stderr.write(buf)
554 debug2("DemuxConn: marked closed\n")
557 def _load_buf(self, timeout):
558 if self.buf is not None:
560 while not self.closed:
561 while not self.reader:
562 if not self._next_packet(timeout):
565 self.buf = self.reader.next()
567 except StopIteration:
571 def _read_parts(self, ix_fn):
572 while self._load_buf(None):
573 assert(self.buf is not None)
575 if i is None or i == len(self.buf):
580 self.buf = self.buf[i:]
588 return buf.index('\n')+1
591 return ''.join(self._read_parts(find_eol))
593 def _read(self, size):
595 def until_size(buf): # Closes on csize
596 if len(buf) < csize[0]:
601 return ''.join(self._read_parts(until_size))
604 return self._load_buf(0)
608 """Generate a list of input lines from 'f' without terminating newlines."""
616 def chunkyreader(f, count = None):
617 """Generate a list of chunks of data read from 'f'.
619 If count is None, read until EOF is reached.
621 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
622 reached while reading, raise IOError.
626 b = f.read(min(count, 65536))
628 raise IOError('EOF with %d bytes remaining' % count)
639 def atomically_replaced_file(name, mode='w', buffering=-1):
640 """Yield a file that will be atomically renamed name when leaving the block.
642 This contextmanager yields an open file object that is backed by a
643 temporary file which will be renamed (atomically) to the target
644 name if everything succeeds.
646 The mode and buffering arguments are handled exactly as with open,
647 and the yielded file will have very restrictive permissions, as
652 with atomically_replaced_file('foo.txt', 'w') as f:
653 f.write('hello jack.')
657 (ffd, tempname) = tempfile.mkstemp(dir=os.path.dirname(name),
658 text=('b' not in mode))
661 f = os.fdopen(ffd, mode, buffering)
669 os.rename(tempname, name)
671 unlink(tempname) # nonexistant file is ignored
675 """Append "/" to 's' if it doesn't aleady end in "/"."""
676 if s and not s.endswith('/'):
682 def _mmap_do(f, sz, flags, prot, close):
684 st = os.fstat(f.fileno())
687 # trying to open a zero-length map gives an error, but an empty
688 # string has all the same behaviour of a zero-length map, ie. it has
691 map = mmap.mmap(f.fileno(), sz, flags, prot)
693 f.close() # map will persist beyond file close
697 def mmap_read(f, sz = 0, close=True):
698 """Create a read-only memory mapped region on file 'f'.
699 If sz is 0, the region will cover the entire file.
701 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
704 def mmap_readwrite(f, sz = 0, close=True):
705 """Create a read-write memory mapped region on file 'f'.
706 If sz is 0, the region will cover the entire file.
708 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
712 def mmap_readwrite_private(f, sz = 0, close=True):
713 """Create a read-write memory mapped region on file 'f'.
714 If sz is 0, the region will cover the entire file.
715 The map is private, which means the changes are never flushed back to the
718 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
722 def parse_timestamp(epoch_str):
723 """Return the number of nanoseconds since the epoch that are described
724 by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
725 throw a ValueError that may contain additional information."""
726 ns_per = {'s' : 1000000000,
730 match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
732 if re.match(r'^([-+]?[0-9]+)$', epoch_str):
733 raise ValueError('must include units, i.e. 100ns, 100ms, ...')
735 (n, units) = match.group(1, 2)
739 return n * ns_per[units]
743 """Parse data size information into a float number.
745 Here are some examples of conversions:
746 199.2k means 203981 bytes
747 1GB means 1073741824 bytes
748 2.1 tb means 2199023255552 bytes
750 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
752 raise ValueError("can't parse %r as a number" % s)
753 (val, unit) = g.groups()
756 if unit in ['t', 'tb']:
757 mult = 1024*1024*1024*1024
758 elif unit in ['g', 'gb']:
759 mult = 1024*1024*1024
760 elif unit in ['m', 'mb']:
762 elif unit in ['k', 'kb']:
764 elif unit in ['', 'b']:
767 raise ValueError("invalid unit %r in number %r" % (unit, s))
772 """Count the number of elements in an iterator. (consumes the iterator)"""
773 return reduce(lambda x,y: x+1, l)
778 """Append an error message to the list of saved errors.
780 Once processing is able to stop and output the errors, the saved errors are
781 accessible in the module variable helpers.saved_errors.
783 saved_errors.append(e)
793 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
795 The new exception handler will make sure that bup will exit without an ugly
796 stacktrace when Ctrl-C is hit.
798 oldhook = sys.excepthook
799 def newhook(exctype, value, traceback):
800 if exctype == KeyboardInterrupt:
801 log('\nInterrupted.\n')
803 return oldhook(exctype, value, traceback)
804 sys.excepthook = newhook
807 def columnate(l, prefix):
808 """Format elements of 'l' in columns with 'prefix' leading each line.
810 The number of columns is determined automatically based on the string
816 clen = max(len(s) for s in l)
817 ncols = (tty_width() - len(prefix)) / (clen + 2)
822 while len(l) % ncols:
825 for s in range(0, len(l), rows):
826 cols.append(l[s:s+rows])
828 for row in zip(*cols):
829 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
833 def parse_date_or_fatal(str, fatal):
834 """Parses the given date or calls Option.fatal().
835 For now we expect a string that contains a float."""
838 except ValueError, e:
839 raise fatal('invalid date format (should be a float): %r' % e)
844 def parse_excludes(options, fatal):
845 """Traverse the options and extract all excludes, or call Option.fatal()."""
849 (option, parameter) = flag
850 if option == '--exclude':
851 excluded_paths.append(realpath(parameter))
852 elif option == '--exclude-from':
854 f = open(realpath(parameter))
856 raise fatal("couldn't read %s" % parameter)
857 for exclude_path in f.readlines():
858 # FIXME: perhaps this should be rstrip('\n')
859 exclude_path = realpath(exclude_path.strip())
861 excluded_paths.append(exclude_path)
862 return sorted(frozenset(excluded_paths))
865 def parse_rx_excludes(options, fatal):
866 """Traverse the options and extract all rx excludes, or call
868 excluded_patterns = []
871 (option, parameter) = flag
872 if option == '--exclude-rx':
874 excluded_patterns.append(re.compile(parameter))
876 fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
877 elif option == '--exclude-rx-from':
879 f = open(realpath(parameter))
881 raise fatal("couldn't read %s" % parameter)
882 for pattern in f.readlines():
883 spattern = pattern.rstrip('\n')
887 excluded_patterns.append(re.compile(spattern))
889 fatal('invalid --exclude-rx pattern (%s): %s' % (spattern, ex))
890 return excluded_patterns
893 def should_rx_exclude_path(path, exclude_rxs):
894 """Return True if path matches a regular expression in exclude_rxs."""
895 for rx in exclude_rxs:
897 debug1('Skipping %r: excluded by rx pattern %r.\n'
898 % (path, rx.pattern))
903 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
904 # that resolve against the current filesystem in the strip/graft
905 # functions for example, but elsewhere as well. I suspect bup's not
906 # always being careful about that. For some cases, the contents of
907 # the current filesystem should be irrelevant, and consulting it might
908 # produce the wrong result, perhaps via unintended symlink resolution,
911 def path_components(path):
912 """Break path into a list of pairs of the form (name,
913 full_path_to_name). Path must start with '/'.
915 '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
916 if not path.startswith('/'):
917 raise Exception, 'path must start with "/": %s' % path
918 # Since we assume path startswith('/'), we can skip the first element.
920 norm_path = os.path.abspath(path)
924 for p in norm_path.split('/')[1:]:
926 result.append((p, full_path))
930 def stripped_path_components(path, strip_prefixes):
931 """Strip any prefix in strip_prefixes from path and return a list
932 of path components where each component is (name,
933 none_or_full_fs_path_to_name). Assume path startswith('/').
934 See thelpers.py for examples."""
935 normalized_path = os.path.abspath(path)
936 sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
937 for bp in sorted_strip_prefixes:
938 normalized_bp = os.path.abspath(bp)
939 if normalized_bp == '/':
941 if normalized_path.startswith(normalized_bp):
942 prefix = normalized_path[:len(normalized_bp)]
944 for p in normalized_path[len(normalized_bp):].split('/'):
948 result.append((p, prefix))
951 return path_components(path)
954 def grafted_path_components(graft_points, path):
955 # Create a result that consists of some number of faked graft
956 # directories before the graft point, followed by all of the real
957 # directories from path that are after the graft point. Arrange
958 # for the directory at the graft point in the result to correspond
959 # to the "orig" directory in --graft orig=new. See t/thelpers.py
962 # Note that given --graft orig=new, orig and new have *nothing* to
963 # do with each other, even if some of their component names
964 # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
965 # equivalent to --graft /foo/bar/baz=/x/y/z, or even
968 # FIXME: This can't be the best solution...
969 clean_path = os.path.abspath(path)
970 for graft_point in graft_points:
971 old_prefix, new_prefix = graft_point
972 # Expand prefixes iff not absolute paths.
973 old_prefix = os.path.normpath(old_prefix)
974 new_prefix = os.path.normpath(new_prefix)
975 if clean_path.startswith(old_prefix):
976 escaped_prefix = re.escape(old_prefix)
977 grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
978 # Handle /foo=/ (at least) -- which produces //whatever.
979 grafted_path = '/' + grafted_path.lstrip('/')
980 clean_path_components = path_components(clean_path)
981 # Count the components that were stripped.
982 strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
983 new_prefix_parts = new_prefix.split('/')
984 result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
985 result = [(p, None) for p in result_prefix] \
986 + clean_path_components[strip_count:]
987 # Now set the graft point name to match the end of new_prefix.
988 graft_point = len(result_prefix)
989 result[graft_point] = \
990 (new_prefix_parts[-1], clean_path_components[strip_count][1])
991 if new_prefix == '/': # --graft ...=/ is a special case.
994 return path_components(clean_path)
1000 _localtime = getattr(_helpers, 'localtime', None)
1003 bup_time = namedtuple('bup_time', ['tm_year', 'tm_mon', 'tm_mday',
1004 'tm_hour', 'tm_min', 'tm_sec',
1005 'tm_wday', 'tm_yday',
1006 'tm_isdst', 'tm_gmtoff', 'tm_zone'])
1008 # Define a localtime() that returns bup_time when possible. Note:
1009 # this means that any helpers.localtime() results may need to be
1010 # passed through to_py_time() before being passed to python's time
1011 # module, which doesn't appear willing to ignore the extra items.
1013 def localtime(time):
1014 return bup_time(*_helpers.localtime(time))
1015 def utc_offset_str(t):
1016 'Return the local offset from UTC as "+hhmm" or "-hhmm" for time t.'
1017 off = localtime(t).tm_gmtoff
1019 return "%+03d%02d" % (hrs, abs(off - (hrs * 60 * 60)))
1021 if isinstance(x, time.struct_time):
1023 return time.struct_time(x[:9])
1025 localtime = time.localtime
1026 def utc_offset_str(t):
1027 return time.strftime('%z', localtime(t))