"""Helper functions and classes for bup."""
from ctypes import sizeof, c_void_p
from os import environ
from contextlib import contextmanager
import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
import hashlib, heapq, math, operator, time, grp, tempfile

from bup import _helpers
# This function should really be in helpers, not in bup.options. But we
# want options.py to be standalone so people can include it in other projects.
from bup.options import _tty_width
tty_width = _tty_width
    """Convert the string 's' to an integer. Return 0 if s is not a number."""


    """Convert the string 's' to a float. Return 0 if s is not a number."""
        return float(s or '0')
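# For example (a quick sketch of the conversions above): atoi('42') == 42,
# atoi('42x') == 0, and atoi('') == 0 -- malformed or empty strings become 0
# instead of raising.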
buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
# If the platform doesn't have fdatasync (OS X), fall back to fsync.
try:
    fdatasync = os.fdatasync
except AttributeError:
    fdatasync = os.fsync
# Write (blockingly) to sockets that may or may not be in blocking mode.
# We need this because our stderr is sometimes eaten by subprocesses
# (probably ssh) that sometimes make it nonblocking, if only temporarily,
# leading to race conditions. Ick. We'll do it the hard way.
def _hard_write(fd, buf):
        (r,w,x) = select.select([], [fd], [], None)
            raise IOError('select(fd) returned without being writable')
            sz = os.write(fd, buf)
            if e.errno != errno.EAGAIN:


    """Print a log message to stderr."""
    _hard_write(sys.stderr.fileno(), s)
istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
    """Calls log() if stderr is a TTY. Does nothing otherwise."""


    """Calls progress() only if we haven't printed progress in a while.

    This avoids overloading the stderr buffer with excess junk.
    """
    if now - _last_prog > 0.1:


    """Calls progress() to redisplay the most recent progress message.

    Useful after you've printed some other message that wipes out the
    if _last_progress and _last_progress.endswith('\r'):
        progress(_last_progress)
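# A hedged usage sketch for the progress helpers above ('n' and the message
# format are illustrative, not part of this module): qprogress() rate-limits
# updates to roughly one every 0.1s, while progress()/log() always write.
#
#     for i in xrange(n):
#         qprogress('scanned %d/%d\r' % (i, n))
#     progress('scanned %d/%d, done.\n' % (n, n))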
def mkdirp(d, mode=None):
    """Recursively create directories on path 'd'.

    Unlike os.makedirs(), it doesn't raise an exception if the last element of
    the path already exists.
    """
        if e.errno == errno.EEXIST:
_unspecified_next_default = object()

def _fallback_next(it, default=_unspecified_next_default):
    """Retrieve the next item from the iterator by calling its
    next() method. If default is given, it is returned if the
    iterator is exhausted, otherwise StopIteration is raised."""

    if default is _unspecified_next_default:
        except StopIteration:

if sys.version_info < (2, 6):
    next = _fallback_next
def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
        samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
        samekey = operator.eq
    total = sum(len(it) for it in iters)
    iters = (iter(it) for it in iters)
    heap = ((next(it, None),it) for it in iters)
    heap = [(e,it) for e,it in heap if e]
        if not count % pfreq:
        if not samekey(e, pe):
            e = it.next() # Don't use next() function, it's too expensive
        except StopIteration:
            heapq.heappop(heap) # remove current
            heapq.heapreplace(heap, (e, it)) # shift current to new location
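# A minimal sketch of driving merge_iter() (the inputs and callbacks here are
# assumptions for illustration): each input must already be sorted, pfunc is
# called every pfreq elements, and consecutive elements that compare equal
# (or share 'key') are collapsed to one.
#
#     lists = [[1, 3, 5], [2, 3, 4]]
#     merged = merge_iter(lists, 1024,
#                         lambda n, total: qprogress('%d/%d\r' % (n, total)),
#                         lambda n, total: progress('%d/%d, done.\n' % (n, total)))
#     # list(merged) would be [1, 2, 3, 4, 5]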
    """Delete a file at path 'f' if it currently exists.

    Unlike os.unlink(), does not throw an exception if the file didn't already
        if e.errno != errno.ENOENT:
def readpipe(argv, preexec_fn=None):
    """Run a subprocess and return its output."""
    p = subprocess.Popen(argv, stdout=subprocess.PIPE, preexec_fn=preexec_fn)
    out, err = p.communicate()
    if p.returncode != 0:
        raise Exception('subprocess %r failed with status %d'
                        % (' '.join(argv), p.returncode))
def _argmax_base(command):
        base_size += len(command) + 1
    for k, v in environ.iteritems():
        base_size += len(k) + len(v) + 2 + sizeof(c_void_p)


def _argmax_args_size(args):
    return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
def batchpipe(command, args, preexec_fn=None, arg_max=_helpers.SC_ARG_MAX):
    """If args is not empty, yield the output produced by calling the
    command list with args as a sequence of strings (it may be necessary
    to yield the output in multiple chunks in order to respect ARG_MAX)."""
    # The optional arg_max arg is a workaround for an issue with the
    # current wvtest behavior.
    base_size = _argmax_base(command)
        room = arg_max - base_size
            next_size = _argmax_args_size(args[i:i+1])
            if room - next_size < 0:
        assert(len(sub_args))
        yield readpipe(command + sub_args, preexec_fn=preexec_fn)
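# A hedged usage sketch ('filenames' is an assumed variable): batchpipe()
# splits a long argument list into as many command invocations as ARG_MAX
# requires and yields each invocation's output in turn.
#
#     for out in batchpipe(['ls', '-l', '--'], filenames):
#         sys.stdout.write(out)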
    """Get the absolute path of a file.

    Behaves like os.path.realpath, but doesn't follow a symlink for the last
    element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
    will follow symlinks in p's directory)
    """
    if st and stat.S_ISLNK(st.st_mode):
        (dir, name) = os.path.split(p)
        dir = os.path.realpath(dir)
        out = os.path.join(dir, name)
        out = os.path.realpath(p)
    #log('realpathing:%r,%r\n' % (p, out))
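# For example (assuming /tmp/link is a symlink to /etc): os.path.realpath
# would return '/etc', while this realpath() returns '/tmp/link' -- only the
# directory part of the path has its symlinks resolved.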
def detect_fakeroot():
    "Return True if we appear to be running under fakeroot."
    return os.getenv("FAKEROOTKEY") is not None
    if sys.platform.startswith('cygwin'):
        return ctypes.cdll.shell32.IsUserAnAdmin()
        return os.geteuid() == 0
def _cache_key_value(get_value, key, cache):
    """Return (value, was_cached). If there is a value in the cache
    for key, use that, otherwise, call get_value(key) which should
    throw a KeyError if there is no value -- in which case the cached
    and returned value will be None.
    """
    try: # Do we already have it (or know there wasn't one)?
        cache[key] = value = get_value(key)
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}

def pwd_from_uid(uid):
    """Return password database entry for uid (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
    if entry and not cached:
        _name_to_pwd_cache[entry.pw_name] = entry

def pwd_from_name(name):
    """Return password database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
    if entry and not cached:
        _uid_to_pwd_cache[entry.pw_uid] = entry
_gid_to_grp_cache = {}
_name_to_grp_cache = {}

def grp_from_gid(gid):
    """Return group database entry for gid (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
    if entry and not cached:
        _name_to_grp_cache[entry.gr_name] = entry

def grp_from_name(name):
    """Return group database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
    if entry and not cached:
        _gid_to_grp_cache[entry.gr_gid] = entry
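# A hedged usage sketch of the cached lookups above: each helper returns a
# normal pwd/grp entry (or None), and a hit on one key primes the cache for
# the reverse lookup.
#
#     entry = pwd_from_uid(os.getuid())
#     if entry:
#         assert pwd_from_name(entry.pw_name) is entry   # served from cache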
    """Get the user's login name."""
        _username = pwd_from_uid(uid)[0] or 'user%d' % uid


    """Get the user's full name."""
    if not _userfullname:
        entry = pwd_from_uid(uid)
            _userfullname = entry[4].split(',')[0] or entry[0]
        if not _userfullname:
            _userfullname = 'user%d' % uid
    """Get the FQDN of this machine."""
        _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    global _resource_path
    if not _resource_path:
        _resource_path = os.environ.get('BUP_RESOURCE_PATH') or '.'
    return os.path.join(_resource_path, subdir)
def format_filesize(size):
    exponent = int(math.log(size) / math.log(unit))
    size_prefix = "KMGTPE"[exponent - 1]
    return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
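# Illustrative values implied by the formula above: format_filesize(2048)
# returns '2.0K' and format_filesize(3 * 1024 ** 3) returns '3.0G'.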
class NotOk(Exception):
    pass


class BaseConn:
    def __init__(self, outp):

        while self._read(65536): pass

    def read(self, size):
        """Read 'size' bytes from input stream."""
        return self._read(size)

        """Read from input stream until a newline is found."""
        return self._readline()

    def write(self, data):
        """Write 'data' to output stream."""
        #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
        self.outp.write(data)

        """Return true if input stream is readable."""
        raise NotImplementedError("Subclasses must implement has_input")
        """Indicate end of output from last sent command."""

        """Indicate server error to the client."""
        s = re.sub(r'\s+', ' ', str(s))
        self.write('\nerror %s\n' % s)

    def _check_ok(self, onempty):
        for rl in linereader(self):
            #log('%d got line: %r\n' % (os.getpid(), rl))
            if not rl: # empty line
            elif rl.startswith('error '):
                #log('client: error: %s\n' % rl[6:])
        raise Exception('server exited unexpectedly; see errors above')

    def drain_and_check_ok(self):
        """Remove all data for the current command from input stream."""
        return self._check_ok(onempty)

        """Verify that server action completed successfully."""
            raise Exception('expected "ok", got %r' % rl)
        return self._check_ok(onempty)
class Conn(BaseConn):
    def __init__(self, inp, outp):
        BaseConn.__init__(self, outp)

    def _read(self, size):
        return self.inp.read(size)

        return self.inp.readline()

        [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
            assert(rl[0] == self.inp.fileno())
def checked_reader(fd, n):
        rl, _, _ = select.select([fd], [], [])
        if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
MAX_PACKET = 128 * 1024
def mux(p, outfd, outr, errr):
        while p.poll() is None:
            rl, _, _ = select.select(fds, [], [])
                    buf = os.read(outr, MAX_PACKET)
                    os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
                    buf = os.read(errr, 1024)
                    os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
        os.write(outfd, struct.pack('!IB', 0, 3))
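# The framing shared by mux() and DemuxConn below is a 5-byte header -- a
# 32-bit big-endian payload length plus a 1-byte channel (1 = the child's
# stdout, 2 = its stderr, 3 = end of stream) -- followed by the payload.
# For example, struct.pack('!IB', 5, 2) + 'oops\n' is one stderr packet.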
class DemuxConn(BaseConn):
    """A helper class for bup's client-server protocol."""
    def __init__(self, infd, outp):
        BaseConn.__init__(self, outp)
        # Anything that comes through before the sync string was not
        # multiplexed and can be assumed to be debug/log before mux init.
        while tail != 'BUPMUX':
            b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
                raise IOError('demux: unexpected EOF during initialization')
            sys.stderr.write(tail[:-6]) # pre-mux log messages

    def write(self, data):
        BaseConn.write(self, data)

    def _next_packet(self, timeout):
        if self.closed: return False
        rl, wl, xl = select.select([self.infd], [], [], timeout)
        if not rl: return False
        assert(rl[0] == self.infd)
        ns = ''.join(checked_reader(self.infd, 5))
        n, fdw = struct.unpack('!IB', ns)
        assert(n <= MAX_PACKET)
            self.reader = checked_reader(self.infd, n)
            for buf in checked_reader(self.infd, n):
                sys.stderr.write(buf)
            debug2("DemuxConn: marked closed\n")

    def _load_buf(self, timeout):
        if self.buf is not None:
        while not self.closed:
            while not self.reader:
                if not self._next_packet(timeout):
                self.buf = self.reader.next()
            except StopIteration:

    def _read_parts(self, ix_fn):
        while self._load_buf(None):
            assert(self.buf is not None)
            if i is None or i == len(self.buf):
                self.buf = self.buf[i:]

                return buf.index('\n')+1
        return ''.join(self._read_parts(find_eol))

    def _read(self, size):
        def until_size(buf): # Closes on csize
            if len(buf) < csize[0]:
        return ''.join(self._read_parts(until_size))

        return self._load_buf(0)
    """Generate a list of input lines from 'f' without terminating newlines."""
def chunkyreader(f, count = None):
    """Generate a list of chunks of data read from 'f'.

    If count is None, read until EOF is reached.

    If count is a positive integer, read 'count' bytes from 'f'. If EOF is
    reached while reading, raise IOError.
    """
        b = f.read(min(count, 65536))
            raise IOError('EOF with %d bytes remaining' % count)
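# A hedged usage sketch ('f' and 'n' are assumed to be an open file and a byte
# count): read exactly n bytes in bounded chunks, e.g. to hash a region:
#
#     sha = hashlib.sha1()
#     for chunk in chunkyreader(f, n):
#         sha.update(chunk)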
@contextmanager
def atomically_replaced_file(name, mode='w', buffering=-1):
    """Yield a file that will be atomically renamed to 'name' when leaving the block.

    This contextmanager yields an open file object that is backed by a
    temporary file which will be renamed (atomically) to the target
    name if everything succeeds.

    The mode and buffering arguments are handled exactly as with open,
    and the yielded file will have very restrictive permissions, as

        with atomically_replaced_file('foo.txt', 'w') as f:
            f.write('hello jack.')
    """
    (ffd, tempname) = tempfile.mkstemp(dir=os.path.dirname(name),
                                       text=('b' not in mode))
        f = os.fdopen(ffd, mode, buffering)
        os.rename(tempname, name)
        unlink(tempname) # nonexistent file is ignored
    """Append "/" to 's' if it doesn't already end in "/"."""
    if s and not s.endswith('/'):
def _mmap_do(f, sz, flags, prot, close):
        st = os.fstat(f.fileno())
        # trying to open a zero-length map gives an error, but an empty
        # string has all the same behaviour as a zero-length map, ie. it has
    map = mmap.mmap(f.fileno(), sz, flags, prot)
        f.close() # map will persist beyond file close
def mmap_read(f, sz = 0, close=True):
    """Create a read-only memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
def mmap_readwrite(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
                    close)
def mmap_readwrite_private(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    The map is private, which means the changes are never flushed back to the
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
                    close)
def parse_timestamp(epoch_str):
    """Return the number of nanoseconds since the epoch that are described
    by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
    throw a ValueError that may contain additional information."""
    ns_per = {'s' : 1000000000,
              'ms' : 1000000,
              'us' : 1000,
              'ns' : 1}
    match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
        if re.match(r'^([-+]?[0-9]+)$', epoch_str):
            raise ValueError('must include units, i.e. 100ns, 100ms, ...')
    (n, units) = match.group(1, 2)
    return n * ns_per[units]
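# Examples implied by the units table: parse_timestamp('100ns') == 100,
# parse_timestamp('100ms') == 100000000, and parse_timestamp('-1s') ==
# -1000000000, while parse_timestamp('100') raises ValueError because the
# units are missing.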
    """Parse data size information into a float number.

    Here are some examples of conversions:
        199.2k means 203981 bytes
        1GB means 1073741824 bytes
        2 tb means 2199023255552 bytes
    """
    g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
        raise ValueError("can't parse %r as a number" % s)
    (val, unit) = g.groups()
    if unit in ['t', 'tb']:
        mult = 1024*1024*1024*1024
    elif unit in ['g', 'gb']:
        mult = 1024*1024*1024
    elif unit in ['m', 'mb']:
    elif unit in ['k', 'kb']:
    elif unit in ['', 'b']:
        raise ValueError("invalid unit %r in number %r" % (unit, s))
    """Count the number of elements in an iterator. (consumes the iterator)"""
    return reduce(lambda x,y: x+1, l, 0)
    """Append an error message to the list of saved errors.

    Once processing is able to stop and output the errors, the saved errors are
    accessible in the module variable helpers.saved_errors.
    """
    saved_errors.append(e)
    """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).

    The new exception handler will make sure that bup will exit without an ugly
    stacktrace when Ctrl-C is hit.
    """
    oldhook = sys.excepthook
    def newhook(exctype, value, traceback):
        if exctype == KeyboardInterrupt:
            log('\nInterrupted.\n')
            return oldhook(exctype, value, traceback)
    sys.excepthook = newhook
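# A hedged usage sketch: a command-line entry point would typically install
# the handler once, before doing any real work ('main' here is illustrative):
#
#     handle_ctrl_c()
#     main()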
def columnate(l, prefix):
    """Format elements of 'l' in columns with 'prefix' leading each line.

    The number of columns is determined automatically based on the string
    clen = max(len(s) for s in l)
    ncols = (tty_width() - len(prefix)) / (clen + 2)
    while len(l) % ncols:
    for s in range(0, len(l), rows):
        cols.append(l[s:s+rows])
    for row in zip(*cols):
        out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
def parse_date_or_fatal(str, fatal):
    """Parses the given date or calls Option.fatal().
    For now we expect a string that contains a float."""
    except ValueError, e:
        raise fatal('invalid date format (should be a float): %r' % e)
def parse_excludes(options, fatal):
    """Traverse the options and extract all excludes, or call Option.fatal()."""
        (option, parameter) = flag
        if option == '--exclude':
            excluded_paths.append(realpath(parameter))
        elif option == '--exclude-from':
                f = open(realpath(parameter))
                raise fatal("couldn't read %s" % parameter)
            for exclude_path in f.readlines():
                # FIXME: perhaps this should be rstrip('\n')
                exclude_path = realpath(exclude_path.strip())
                    excluded_paths.append(exclude_path)
    return sorted(frozenset(excluded_paths))
def parse_rx_excludes(options, fatal):
    """Traverse the options and extract all rx excludes, or call
    Option.fatal()."""
    excluded_patterns = []
        (option, parameter) = flag
        if option == '--exclude-rx':
                excluded_patterns.append(re.compile(parameter))
                fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
        elif option == '--exclude-rx-from':
                f = open(realpath(parameter))
                raise fatal("couldn't read %s" % parameter)
            for pattern in f.readlines():
                spattern = pattern.rstrip('\n')
                    excluded_patterns.append(re.compile(spattern))
                    fatal('invalid --exclude-rx pattern (%s): %s' % (spattern, ex))
    return excluded_patterns
def should_rx_exclude_path(path, exclude_rxs):
    """Return True if path matches a regular expression in exclude_rxs."""
    for rx in exclude_rxs:
            debug1('Skipping %r: excluded by rx pattern %r.\n'
                   % (path, rx.pattern))
# FIXME: Carefully consider the use of functions (os.path.*, etc.)
# that resolve against the current filesystem in the strip/graft
# functions for example, but elsewhere as well. I suspect bup's not
# always being careful about that. For some cases, the contents of
# the current filesystem should be irrelevant, and consulting it might
# produce the wrong result, perhaps via unintended symlink resolution,
def path_components(path):
    """Break path into a list of pairs of the form (name,
    full_path_to_name). Path must start with '/'.
    '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
    if not path.startswith('/'):
        raise Exception, 'path must start with "/": %s' % path
    # Since we assume path startswith('/'), we can skip the first element.
    norm_path = os.path.abspath(path)
    for p in norm_path.split('/')[1:]:
        result.append((p, full_path))
def stripped_path_components(path, strip_prefixes):
    """Strip any prefix in strip_prefixes from path and return a list
    of path components where each component is (name,
    none_or_full_fs_path_to_name). Assume path startswith('/').
    See t/thelpers.py for examples."""
    normalized_path = os.path.abspath(path)
    sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
    for bp in sorted_strip_prefixes:
        normalized_bp = os.path.abspath(bp)
        if normalized_path.startswith(normalized_bp):
            prefix = normalized_path[:len(normalized_bp)]
            for p in normalized_path[len(normalized_bp):].split('/'):
                result.append((p, prefix))
    return path_components(path)
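# A worked example of the stripping above (shape per t/thelpers.py; treat the
# exact values as a sketch):
#
#     stripped_path_components('/home/foo/bar', ['/home'])
#     # => [('', '/home'), ('foo', '/home/foo'), ('bar', '/home/foo/bar')]
#
# i.e. the stripped prefix collapses into a leading ('', prefix) entry and the
# remaining components keep their full filesystem paths.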
def grafted_path_components(graft_points, path):
    # Create a result that consists of some number of faked graft
    # directories before the graft point, followed by all of the real
    # directories from path that are after the graft point. Arrange
    # for the directory at the graft point in the result to correspond
    # to the "orig" directory in --graft orig=new. See t/thelpers.py

    # Note that given --graft orig=new, orig and new have *nothing* to
    # do with each other, even if some of their component names
    # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
    # equivalent to --graft /foo/bar/baz=/x/y/z, or even

    # FIXME: This can't be the best solution...
    clean_path = os.path.abspath(path)
    for graft_point in graft_points:
        old_prefix, new_prefix = graft_point
        # Expand prefixes iff not absolute paths.
        old_prefix = os.path.normpath(old_prefix)
        new_prefix = os.path.normpath(new_prefix)
        if clean_path.startswith(old_prefix):
            escaped_prefix = re.escape(old_prefix)
            grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
            # Handle /foo=/ (at least) -- which produces //whatever.
            grafted_path = '/' + grafted_path.lstrip('/')
            clean_path_components = path_components(clean_path)
            # Count the components that were stripped.
            strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
            new_prefix_parts = new_prefix.split('/')
            result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
            result = [(p, None) for p in result_prefix] \
                     + clean_path_components[strip_count:]
            # Now set the graft point name to match the end of new_prefix.
            graft_point = len(result_prefix)
            result[graft_point] = \
                (new_prefix_parts[-1], clean_path_components[strip_count][1])
            if new_prefix == '/': # --graft ...=/ is a special case.
    return path_components(clean_path)
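# A worked example of the grafting above (a sketch; see t/thelpers.py for the
# authoritative cases): with --graft /a/b=/x/y,
#
#     grafted_path_components([('/a/b', '/x/y')], '/a/b/c')
#     # => [('', None), ('x', None), ('y', '/a/b'), ('c', '/a/b/c')]
#
# The faked 'x' component has no real filesystem path, the graft point 'y'
# maps back to the original /a/b, and everything below it keeps its real path.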