1 """Helper functions and classes for bup."""
3 from collections import namedtuple
4 from ctypes import sizeof, c_void_p
6 from contextlib import contextmanager
7 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
8 import hashlib, heapq, math, operator, time, grp, tempfile
10 from bup import _helpers
12 # This function should really be in helpers, not in bup.options. But we
13 # want options.py to be standalone so people can include it in other projects.
14 from bup.options import _tty_width
15 tty_width = _tty_width
def atoi(s):
    """Convert the string 's' to an integer. Return 0 if s is not a number."""
    try:
        return int(s or '0')
    except ValueError:
        return 0
def atof(s):
    """Convert the string 's' to a float. Return 0 if s is not a number."""
    try:
        return float(s or '0')
    except ValueError:
        return 0
# Debug verbosity level, controlled by the BUP_DEBUG environment variable.
buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
# If the platform doesn't have fdatasync (OS X), fall back to fsync.
try:
    fdatasync = os.fdatasync
except AttributeError:
    fdatasync = os.fsync
44 # Write (blockingly) to sockets that may or may not be in blocking mode.
45 # We need this because our stderr is sometimes eaten by subprocesses
46 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
47 # leading to race conditions. Ick. We'll do it the hard way.
48 def _hard_write(fd, buf):
50 (r,w,x) = select.select([], [fd], [], None)
52 raise IOError('select(fd) returned without being writable')
54 sz = os.write(fd, buf)
56 if e.errno != errno.EAGAIN:
def log(s):
    """Print a log message to stderr."""
    global _last_prog
    # Flush stdout first so interleaved output stays in order.
    sys.stdout.flush()
    _hard_write(sys.stderr.fileno(), s)
    _last_prog = 0  # force the next qprogress() to display
# TTY detection for stdout/stderr; BUP_FORCE_TTY is a bitmask override
# (bit 1 = stdout, bit 2 = stderr) used mainly by the test suite.
istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)

_last_progress = ''
def progress(s):
    """Calls log() if stderr is a TTY. Does nothing otherwise."""
    global _last_progress
    if istty2:
        log(s)
        _last_progress = s
_last_prog = 0
def qprogress(s):
    """Calls progress() only if we haven't printed progress in a while.

    This avoids overloading the stderr buffer with excess junk.
    """
    global _last_prog
    now = time.time()
    if now - _last_prog > 0.1:  # rate-limit to one update per 100ms
        progress(s)
        _last_prog = now
def reprogress():
    """Calls progress() to redisplay the most recent progress message.

    Useful after you've printed some other message that wipes out the
    progress line.
    """
    # Only redraw messages that were transient (ended with '\r').
    if _last_progress and _last_progress.endswith('\r'):
        progress(_last_progress)
def mkdirp(d, mode=None):
    """Recursively create directories on path 'd'.

    Unlike os.makedirs(), it doesn't raise an exception if the last element of
    the path already exists.
    """
    try:
        if mode:
            os.makedirs(d, mode)
        else:
            os.makedirs(d)
    except OSError as e:
        if e.errno == errno.EEXIST:
            pass  # already there; that's fine
        else:
            raise
_unspecified_next_default = object()  # sentinel: "no default supplied"

def _fallback_next(it, default=_unspecified_next_default):
    """Retrieve the next item from the iterator by calling its
    next() method. If default is given, it is returned if the
    iterator is exhausted, otherwise StopIteration is raised."""
    if default is _unspecified_next_default:
        return it.next()
    else:
        try:
            return it.next()
        except StopIteration:
            return default

# Python < 2.6 has no builtin next(); install our shim at module level.
if sys.version_info < (2, 6):
    next = _fallback_next
def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
    """Merge the sorted, sized iterables in iters into one sorted stream,
    yielding each distinct element once (consecutive duplicates, compared
    by 'key' attribute if given, are suppressed).  pfunc(count, total) is
    called every pfreq elements for progress; pfinal(count, total) once at
    the end.  Falsy elements are dropped."""
    if key:
        samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
    else:
        samekey = operator.eq
    count = 0
    total = sum(len(it) for it in iters)
    iters = (iter(it) for it in iters)
    heap = ((next(it, None),it) for it in iters)
    heap = [(e,it) for e,it in heap if e]

    heapq.heapify(heap)
    pe = None
    while heap:
        if not count % pfreq:
            pfunc(count, total)
        e, it = heap[0]
        if not samekey(e, pe):
            pe = e
            yield e
        count += 1
        try:
            e = next(it)
        except StopIteration:
            heapq.heappop(heap) # remove current
        else:
            heapq.heapreplace(heap, (e, it)) # shift current to new location
    pfinal(count, total)
def unlink(f):
    """Delete a file at path 'f' if it currently exists.

    Unlike os.unlink(), does not throw an exception if the file didn't already
    exist.
    """
    try:
        os.unlink(f)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
def readpipe(argv, preexec_fn=None):
    """Run a subprocess and return its output."""
    p = subprocess.Popen(argv, stdout=subprocess.PIPE, preexec_fn=preexec_fn)
    out, err = p.communicate()
    if p.returncode != 0:
        raise Exception('subprocess %r failed with status %d'
                        % (' '.join(argv), p.returncode))
    return out
204 def _argmax_base(command):
207 base_size += len(command) + 1
208 for k, v in environ.iteritems():
209 base_size += len(k) + len(v) + 2 + sizeof(c_void_p)
213 def _argmax_args_size(args):
214 return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
def batchpipe(command, args, preexec_fn=None, arg_max=_helpers.SC_ARG_MAX):
    """If args is not empty, yield the output produced by calling the
    command list with args as a sequence of strings (It may be necessary
    to return multiple strings in order to respect ARG_MAX)."""
    # The optional arg_max arg is a workaround for an issue with the
    # current wvtest behavior.
    base_size = _argmax_base(command)
    while args:
        room = arg_max - base_size
        i = 0
        # Greedily take as many args as fit in the remaining ARG_MAX room.
        while i < len(args):
            next_size = _argmax_args_size(args[i:i+1])
            if room - next_size < 0:
                break
            room -= next_size
            i += 1
        sub_args = args[:i]
        args = args[i:]
        # Must make progress each pass or we'd loop forever.
        assert(len(sub_args))
        yield readpipe(command + sub_args, preexec_fn=preexec_fn)
def realpath(p):
    """Get the absolute path of a file.

    Behaves like os.path.realpath, but doesn't follow a symlink for the last
    element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
    will follow symlinks in p's directory)
    """
    try:
        st = os.lstat(p)
    except OSError:
        st = None  # nonexistent paths fall through to os.path.realpath
    if st and stat.S_ISLNK(st.st_mode):
        (dir, name) = os.path.split(p)
        dir = os.path.realpath(dir)
        out = os.path.join(dir, name)
    else:
        out = os.path.realpath(p)
    #log('realpathing:%r,%r\n' % (p, out))
    return out
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot."""
    # fakeroot advertises itself via the FAKEROOTKEY environment variable.
    return os.environ.get('FAKEROOTKEY') is not None
def is_superuser():
    """Return true if this process has effective root privileges.

    On Cygwin there is no geteuid(), so ask Windows whether the user is
    an administrator instead.
    """
    if sys.platform.startswith('cygwin'):
        import ctypes
        return ctypes.cdll.shell32.IsUserAnAdmin()
    else:
        return os.geteuid() == 0
273 def _cache_key_value(get_value, key, cache):
274 """Return (value, was_cached). If there is a value in the cache
275 for key, use that, otherwise, call get_value(key) which should
276 throw a KeyError if there is no value -- in which case the cached
277 and returned value will be None.
279 try: # Do we already have it (or know there wasn't one)?
286 cache[key] = value = get_value(key)
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}

def pwd_from_uid(uid):
    """Return password database entry for uid (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
    if entry and not cached:
        # Also prime the by-name cache so a later name lookup is free.
        _name_to_pwd_cache[entry.pw_name] = entry
    return entry
def pwd_from_name(name):
    """Return password database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
    if entry and not cached:
        # Also prime the by-uid cache so a later uid lookup is free.
        _uid_to_pwd_cache[entry.pw_uid] = entry
    return entry
_gid_to_grp_cache = {}
_name_to_grp_cache = {}

def grp_from_gid(gid):
    """Return group database entry for gid (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
    if entry and not cached:
        # Also prime the by-name cache so a later name lookup is free.
        _name_to_grp_cache[entry.gr_name] = entry
    return entry
def grp_from_name(name):
    """Return group database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
    if entry and not cached:
        # Also prime the by-gid cache so a later gid lookup is free.
        _gid_to_grp_cache[entry.gr_gid] = entry
    return entry
_username = None
def username():
    """Get the user's login name."""
    global _username
    if not _username:  # memoized after the first lookup
        uid = os.getuid()
        _username = pwd_from_uid(uid)[0] or 'user%d' % uid
    return _username
_userfullname = None
def userfullname():
    """Get the user's full name."""
    global _userfullname
    if not _userfullname:  # memoized after the first lookup
        uid = os.getuid()
        entry = pwd_from_uid(uid)
        if entry:
            # GECOS field: full name is the first comma-separated item;
            # fall back to the login name if it's empty.
            _userfullname = entry[4].split(',')[0] or entry[0]
        if not _userfullname:
            _userfullname = 'user%d' % uid
    return _userfullname
_hostname = None
def hostname():
    """Get the FQDN of this machine."""
    global _hostname
    if not _hostname:  # memoized after the first lookup
        _hostname = socket.getfqdn()
    return _hostname
_resource_path = None
def resource_path(subdir=''):
    """Return the path of bup's resource directory joined with subdir,
    honoring the BUP_RESOURCE_PATH override (default: current directory)."""
    global _resource_path
    if not _resource_path:
        override = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = override if override else '.'
    return os.path.join(_resource_path, subdir)
def format_filesize(size):
    """Return a human-readable size string: bytes below 1K, otherwise one
    decimal place with a K/M/G/T/P/E suffix (powers of 1024)."""
    unit = 1024.0
    size = float(size)
    if size < unit:
        return "%d" % (size)
    exponent = int(math.log(size) / math.log(unit))
    size_prefix = "KMGTPE"[exponent - 1]
    return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
class NotOk(Exception):
    """Raised when the server reports an 'error' over the bup protocol."""
    pass
class BaseConn:
    """Base class for one end of bup's client-server protocol connection.

    Subclasses must provide _read/_readline (and has_input); this class
    implements the shared command/response framing ('ok' / 'error ...').
    """
    def __init__(self, outp):
        self.outp = outp

    def close(self):
        # Drain any remaining input so the peer isn't left blocked.
        while self._read(65536): pass

    def read(self, size):
        """Read 'size' bytes from input stream."""
        self.outp.flush()
        return self._read(size)

    def readline(self):
        """Read from input stream until a newline is found."""
        self.outp.flush()
        return self._readline()

    def write(self, data):
        """Write 'data' to output stream."""
        #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
        self.outp.write(data)

    def has_input(self):
        """Return true if input stream is readable."""
        # Was "raise NotImplemented(...)": NotImplemented is a value, not
        # an exception type, so raising it was itself a TypeError.
        raise NotImplementedError("Subclasses must implement has_input")

    def ok(self):
        """Indicate end of output from last sent command."""
        self.write('\nok\n')

    def error(self, s):
        """Indicate server error to the client."""
        s = re.sub(r'\s+', ' ', str(s))  # collapse whitespace: one-line msg
        self.write('\nerror %s\n' % s)

    def _check_ok(self, onempty):
        # Scan responses until 'ok' (success -> None) or 'error ...'
        # (failure -> a NotOk instance); other lines go to onempty().
        self.outp.flush()
        rl = ''
        for rl in linereader(self):
            #log('%d got line: %r\n' % (os.getpid(), rl))
            if not rl: # empty line
                continue
            elif rl == 'ok':
                return None
            elif rl.startswith('error '):
                #log('client: error: %s\n' % rl[6:])
                return NotOk(rl[6:])
            else:
                onempty(rl)
        raise Exception('server exited unexpectedly; see errors above')

    def drain_and_check_ok(self):
        """Remove all data for the current command from input stream."""
        def onempty(rl):
            pass
        return self._check_ok(onempty)

    def check_ok(self):
        """Verify that server action completed successfully."""
        def onempty(rl):
            raise Exception('expected "ok", got %r' % rl)
        return self._check_ok(onempty)
class Conn(BaseConn):
    """A BaseConn backed by a pair of ordinary file objects."""
    def __init__(self, inp, outp):
        BaseConn.__init__(self, outp)
        self.inp = inp

    def _read(self, size):
        return self.inp.read(size)

    def _readline(self):
        return self.inp.readline()

    def has_input(self):
        # Poll (zero timeout) the underlying fd for readability.
        [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
        if rl:
            assert(rl[0] == self.inp.fileno())
            return True
        else:
            return None
def checked_reader(fd, n):
    """Yield chunks read from fd until exactly n bytes have been produced,
    raising if EOF arrives early."""
    while n > 0:
        rl, _, _ = select.select([fd], [], [])
        assert(rl[0] == fd)
        buf = os.read(fd, n)
        if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
        yield buf
        n -= len(buf)
MAX_PACKET = 128 * 1024
def mux(p, outfd, outr, errr):
    """Multiplex subprocess p's stdout (outr) and stderr (errr) onto outfd
    as length-prefixed packets: struct '!IB' header (length, channel) where
    channel 1=stdout, 2=stderr, 3=end-of-stream."""
    try:
        fds = [outr, errr]
        while p.poll() is None:
            rl, _, _ = select.select(fds, [], [])
            if outr in rl:
                buf = os.read(outr, MAX_PACKET)
                if not buf: break  # EOF
                os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
            elif errr in rl:
                buf = os.read(errr, 1024)
                if not buf: break  # EOF
                os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
    finally:
        # Always send the close packet so the demuxer can shut down.
        os.write(outfd, struct.pack('!IB', 0, 3))
class DemuxConn(BaseConn):
    """A helper class for bup's client-server protocol."""
    def __init__(self, infd, outp):
        BaseConn.__init__(self, outp)
        # Anything that comes through before the sync string was not
        # multiplexed and can be assumed to be debug/log before mux init.
        tail = ''
        while tail != 'BUPMUX':
            # Read up to 6 bytes until the sync marker lines up.
            b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
            if not b:
                raise IOError('demux: unexpected EOF during initialization')
            tail += b
            sys.stderr.write(tail[:-6]) # pre-mux log messages
            tail = tail[-6:]
        self.infd = infd
        self.reader = None
        self.buf = None
        self.closed = False

    def write(self, data):
        self._load_buf(0)  # opportunistically drain pending input first
        BaseConn.write(self, data)

    def _next_packet(self, timeout):
        """Read one mux packet; route stderr data out, queue stdout data.

        Returns False when closed or nothing is readable within timeout."""
        if self.closed: return False
        rl, wl, xl = select.select([self.infd], [], [], timeout)
        if not rl: return False
        assert(rl[0] == self.infd)
        ns = ''.join(checked_reader(self.infd, 5))
        n, fdw = struct.unpack('!IB', ns)
        assert(n <= MAX_PACKET)
        if fdw == 1:  # data destined for stdout (i.e. for us to return)
            self.reader = checked_reader(self.infd, n)
        elif fdw == 2:  # data destined for stderr: pass it straight through
            for buf in checked_reader(self.infd, n):
                sys.stderr.write(buf)
        elif fdw == 3:  # close marker from mux()
            self.closed = True
            debug2("DemuxConn: marked closed\n")
        return True

    def _load_buf(self, timeout):
        """Ensure self.buf holds unread data; return True on success."""
        if self.buf is not None:
            return True
        while not self.closed:
            while not self.reader:
                if not self._next_packet(timeout):
                    return False
            try:
                self.buf = self.reader.next()
                return True
            except StopIteration:
                self.reader = None
        return False

    def _read_parts(self, ix_fn):
        # Yield buffered chunks; ix_fn(buf) returns the split index where
        # this read should stop, or None to consume buf entirely and go on.
        while self._load_buf(None):
            assert(self.buf is not None)
            i = ix_fn(self.buf)
            if i is None or i == len(self.buf):
                yv = self.buf
                self.buf = None
            else:
                yv = self.buf[:i]
                self.buf = self.buf[i:]
            yield yv
            if i is not None:
                break

    def _readline(self):
        def find_eol(buf):
            try:
                return buf.index('\n')+1
            except ValueError:
                return None
        return ''.join(self._read_parts(find_eol))

    def _read(self, size):
        csize = [size]
        def until_size(buf): # Closes on csize
            if len(buf) < csize[0]:
                csize[0] -= len(buf)
                return None
            else:
                return csize[0]
        return ''.join(self._read_parts(until_size))

    def has_input(self):
        return self._load_buf(0)
def linereader(f):
    """Generate a list of input lines from 'f' without terminating newlines."""
    while 1:
        line = f.readline()
        if not line:
            break
        yield line[:-1]
def chunkyreader(f, count = None):
    """Generate a list of chunks of data read from 'f'.

    If count is None, read until EOF is reached.

    If count is a positive integer, read 'count' bytes from 'f'. If EOF is
    reached while reading, raise IOError.
    """
    if count != None:
        while count > 0:
            b = f.read(min(count, 65536))
            if not b:
                raise IOError('EOF with %d bytes remaining' % count)
            yield b
            count -= len(b)
    else:
        while 1:
            b = f.read(65536)
            if not b: break
            yield b
@contextmanager
def atomically_replaced_file(name, mode='w', buffering=-1):
    """Yield a file that will be atomically renamed name when leaving the block.

    This contextmanager yields an open file object that is backed by a
    temporary file which will be renamed (atomically) to the target
    name if everything succeeds.

    The mode and buffering arguments are handled exactly as with open,
    and the yielded file will have very restrictive permissions, as
    per mkstemp.

    E.g.::

        with atomically_replaced_file('foo.txt', 'w') as f:
            f.write('hello jack.')

    """
    # Create the temp file in the same directory so os.rename() stays on
    # one filesystem and is therefore atomic.
    (ffd, tempname) = tempfile.mkstemp(dir=os.path.dirname(name),
                                       text=('b' not in mode))
    try:
        try:
            f = os.fdopen(ffd, mode, buffering)
        except:
            os.close(ffd)  # don't leak the descriptor if fdopen fails
            raise
        try:
            yield f
        finally:
            f.close()
        os.rename(tempname, name)
    finally:
        unlink(tempname)  # nonexistant file is ignored
def slashappend(s):
    """Append "/" to 's' if it doesn't already end in "/"."""
    if s and not s.endswith('/'):
        return s + '/'
    else:
        return s
675 def _mmap_do(f, sz, flags, prot, close):
677 st = os.fstat(f.fileno())
680 # trying to open a zero-length map gives an error, but an empty
681 # string has all the same behaviour of a zero-length map, ie. it has
684 map = mmap.mmap(f.fileno(), sz, flags, prot)
686 f.close() # map will persist beyond file close
def mmap_read(f, sz = 0, close=True):
    """Create a read-only memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
def mmap_readwrite(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
                    close)
def mmap_readwrite_private(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    The map is private, which means the changes are never flushed back to the
    file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
                    close)
def parse_timestamp(epoch_str):
    """Return the number of nanoseconds since the epoch that are described
    by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
    throw a ValueError that may contain additional information."""
    ns_per = {'s' : 1000000000,
              'ms' : 1000000,
              'us' : 1000,
              'ns' : 1}
    match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
    if not match:
        # Give a more helpful message for a bare number with no units.
        if re.match(r'^([-+]?[0-9]+)$', epoch_str):
            raise ValueError('must include units, i.e. 100ns, 100ms, ...')
        raise ValueError()
    (n, units) = match.group(1, 2)
    if not n:
        n = 1  # bare unit like 'ns' means 1 of that unit
    n = int(n)
    return n * ns_per[units]
def parse_num(s):
    """Parse data size information into an integer number of bytes.

    Here are some examples of conversions:
        199.2k means 203980 bytes
        1GB means 1073741824 bytes
        2 tb means 2199023255552 bytes
    """
    g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
    if not g:
        raise ValueError("can't parse %r as a number" % s)
    (val, unit) = g.groups()
    num = float(val)
    unit = unit.lower()
    if unit in ['t', 'tb']:
        mult = 1024*1024*1024*1024
    elif unit in ['g', 'gb']:
        mult = 1024*1024*1024
    elif unit in ['m', 'mb']:
        mult = 1024*1024
    elif unit in ['k', 'kb']:
        mult = 1024
    elif unit in ['', 'b']:
        mult = 1
    else:
        raise ValueError("invalid unit %r in number %r" % (unit, s))
    return int(num*mult)
def count(l):
    """Count the number of elements in an iterator. (consumes the iterator)"""
    # The old reduce(lambda x,y: x+1, l) returned l[0] + len(l) - 1, which is
    # only correct when the first element happens to be the number 1, raised
    # TypeError for non-numeric elements and empty iterators, and relied on
    # the reduce builtin that Python 3 removed.  Count properly instead.
    return sum(1 for _ in l)
saved_errors = []
def add_error(e):
    """Append an error message to the list of saved errors.

    Once processing is able to stop and output the errors, the saved errors are
    accessible in the module variable helpers.saved_errors.
    """
    saved_errors.append(e)
    log('%-70s\n' % e)
def handle_ctrl_c():
    """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).

    The new exception handler will make sure that bup will exit without an ugly
    stacktrace when Ctrl-C is hit.
    """
    oldhook = sys.excepthook
    def newhook(exctype, value, traceback):
        if exctype == KeyboardInterrupt:
            log('\nInterrupted.\n')
        else:
            # Everything else still gets the previous handler's treatment.
            return oldhook(exctype, value, traceback)
    sys.excepthook = newhook
def columnate(l, prefix):
    """Format elements of 'l' in columns with 'prefix' leading each line.

    The number of columns is determined automatically based on the string
    lengths and the terminal width.
    """
    if not l:
        return ""
    l = l[:]  # we pad the list in place below; don't mutate the caller's
    clen = max(len(s) for s in l)
    # '//' (not '/'): column count must be an integer in Python 2 and 3.
    ncols = (tty_width() - len(prefix)) // (clen + 2)
    if ncols <= 1:
        ncols = 1
        clen = 0
    cols = []
    while len(l) % ncols:
        l.append('')  # pad so the list divides evenly into columns
    rows = len(l) // ncols
    for s in range(0, len(l), rows):
        cols.append(l[s:s+rows])
    out = ''
    for row in zip(*cols):
        out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
    return out
def parse_date_or_fatal(str, fatal):
    """Parses the given date or calls Option.fatal().
    For now we expect a string that contains a float."""
    try:
        date = atof(str)
    except ValueError as e:  # 'as' form: 'except X, e' is invalid in Python 3
        raise fatal('invalid date format (should be a float): %r' % e)
    else:
        return date
def parse_excludes(options, fatal):
    """Traverse the options and extract all excludes, or call Option.fatal()."""
    excluded_paths = []
    for flag in options:
        (option, parameter) = flag
        if option == '--exclude':
            excluded_paths.append(realpath(parameter))
        elif option == '--exclude-from':
            try:
                f = open(realpath(parameter))
            except IOError:
                raise fatal("couldn't read %s" % parameter)
            try:
                for exclude_path in f.readlines():
                    # FIXME: perhaps this should be rstrip('\n')
                    exclude_path = realpath(exclude_path.strip())
                    if exclude_path:
                        excluded_paths.append(exclude_path)
            finally:
                f.close()  # don't leak the file handle
    return sorted(frozenset(excluded_paths))
def parse_rx_excludes(options, fatal):
    """Traverse the options and extract all rx excludes, or call
    Option.fatal()."""
    excluded_patterns = []
    for flag in options:
        (option, parameter) = flag
        if option == '--exclude-rx':
            try:
                excluded_patterns.append(re.compile(parameter))
            except re.error as ex:
                fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
        elif option == '--exclude-rx-from':
            try:
                f = open(realpath(parameter))
            except IOError:
                raise fatal("couldn't read %s" % parameter)
            try:
                for pattern in f.readlines():
                    spattern = pattern.rstrip('\n')
                    if not spattern:
                        continue
                    try:
                        excluded_patterns.append(re.compile(spattern))
                    except re.error as ex:
                        fatal('invalid --exclude-rx pattern (%s): %s'
                              % (spattern, ex))
            finally:
                f.close()  # don't leak the file handle
    return excluded_patterns
def should_rx_exclude_path(path, exclude_rxs):
    """Return True if path matches a regular expression in exclude_rxs."""
    for rx in exclude_rxs:
        if rx.search(path):
            debug1('Skipping %r: excluded by rx pattern %r.\n'
                   % (path, rx.pattern))
            return True
    return False
# FIXME: Carefully consider the use of functions (os.path.*, etc.)
# that resolve against the current filesystem in the strip/graft
# functions for example, but elsewhere as well.  I suspect bup's not
# always being careful about that.  For some cases, the contents of
# the current filesystem should be irrelevant, and consulting it might
# produce the wrong result, perhaps via unintended symlink resolution,
# for example.
def path_components(path):
    """Break path into a list of pairs of the form (name,
    full_path_to_name). Path must start with '/'.
    Example:
      '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
    if not path.startswith('/'):
        # Parenthesized form: 'raise Exception, msg' is invalid in Python 3.
        raise Exception('path must start with "/": %s' % path)
    # Since we assume path startswith('/'), we can skip the first element.
    result = [('', '/')]
    norm_path = os.path.abspath(path)
    if norm_path == '/':
        return result
    full_path = ''
    for p in norm_path.split('/')[1:]:
        full_path += '/' + p
        result.append((p, full_path))
    return result
def stripped_path_components(path, strip_prefixes):
    """Strip any prefix in strip_prefixes from path and return a list
    of path components where each component is (name,
    none_or_full_fs_path_to_name). Assume path startswith('/').
    See thelpers.py for examples."""
    normalized_path = os.path.abspath(path)
    # Try the longest prefixes first so the most specific one wins.
    sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
    for bp in sorted_strip_prefixes:
        normalized_bp = os.path.abspath(bp)
        if normalized_bp == '/':
            continue  # stripping '/' would strip nothing meaningful
        if normalized_path.startswith(normalized_bp):
            prefix = normalized_path[:len(normalized_bp)]
            result = []
            for p in normalized_path[len(normalized_bp):].split('/'):
                if p: # not the first, empty element
                    prefix += '/' + p
                    result.append((p, prefix))
            return result
    return path_components(path)
def grafted_path_components(graft_points, path):
    # Create a result that consists of some number of faked graft
    # directories before the graft point, followed by all of the real
    # directories from path that are after the graft point. Arrange
    # for the directory at the graft point in the result to correspond
    # to the "orig" directory in --graft orig=new. See t/thelpers.py
    # for some examples.

    # Note that given --graft orig=new, orig and new have *nothing* to
    # do with each other, even if some of their component names
    # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
    # equivalent to --graft /foo/bar/baz=/x/y/z, or even
    # /foo/bar/baz=/x.

    # FIXME: This can't be the best solution...
    clean_path = os.path.abspath(path)
    for graft_point in graft_points:
        old_prefix, new_prefix = graft_point
        # Expand prefixes iff not absolute paths.
        old_prefix = os.path.normpath(old_prefix)
        new_prefix = os.path.normpath(new_prefix)
        if clean_path.startswith(old_prefix):
            escaped_prefix = re.escape(old_prefix)
            grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
            # Handle /foo=/ (at least) -- which produces //whatever.
            grafted_path = '/' + grafted_path.lstrip('/')
            clean_path_components = path_components(clean_path)
            # Count the components that were stripped.
            strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
            new_prefix_parts = new_prefix.split('/')
            result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
            result = [(p, None) for p in result_prefix] \
                + clean_path_components[strip_count:]
            # Now set the graft point name to match the end of new_prefix.
            graft_point = len(result_prefix)
            result[graft_point] = \
                (new_prefix_parts[-1], clean_path_components[strip_count][1])
            if new_prefix == '/': # --graft ...=/ is a special case.
                return result[1:]
            return result
    return path_components(clean_path)
# FIXME: Temporary hack.  Use the C helper's localtime() when available,
# since Python's time module doesn't expose tm_gmtoff/tm_zone everywhere.
_localtime = getattr(_helpers, 'localtime', None)

if _localtime:
    bup_time = namedtuple('bup_time', ['tm_year', 'tm_mon', 'tm_mday',
                                       'tm_hour', 'tm_min', 'tm_sec',
                                       'tm_wday', 'tm_yday',
                                       'tm_isdst', 'tm_gmtoff', 'tm_zone'])

# Define a localtime() that returns bup_time when possible. Note:
# this means that any helpers.localtime() results may need to be
# passed through to_py_time() before being passed to python's time
# module, which doesn't appear willing to ignore the extra items.
if _localtime:
    def localtime(time):
        return bup_time(*_helpers.localtime(time))
    def utc_offset_str(t):
        'Return the local offset from UTC as "+hhmm" or "-hhmm" for time t.'
        off = localtime(t).tm_gmtoff
        hrs = off // 60 // 60
        return "%+03d%02d" % (hrs, abs(off - (hrs * 60 * 60)))
    def to_py_time(x):
        # Trim bup_time's extra fields so time.strftime() etc. accept it.
        if isinstance(x, time.struct_time):
            return x
        return time.struct_time(x[:9])
else:
    localtime = time.localtime
    def utc_offset_str(t):
        return time.strftime('%z', localtime(t))
    def to_py_time(x):
        return x