1 """Helper functions and classes for bup."""
3 from ctypes import sizeof, c_void_p
5 from contextlib import contextmanager
6 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
7 import hashlib, heapq, operator, time, grp, tempfile
9 from bup import _helpers
10 import bup._helpers as _helpers
13 # This function should really be in helpers, not in bup.options. But we
14 # want options.py to be standalone so people can include it in other projects.
15 from bup.options import _tty_width
16 tty_width = _tty_width
20 """Convert the string 's' to an integer. Return 0 if s is not a number."""
28 """Convert the string 's' to a float. Return 0 if s is not a number."""
30 return float(s or '0')
35 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
38 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
40 fdatasync = os.fdatasync
41 except AttributeError:
45 # Write (blockingly) to sockets that may or may not be in blocking mode.
46 # We need this because our stderr is sometimes eaten by subprocesses
47 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
48 # leading to race conditions. Ick. We'll do it the hard way.
49 def _hard_write(fd, buf):
51 (r,w,x) = select.select([], [fd], [], None)
53 raise IOError('select(fd) returned without being writable')
55 sz = os.write(fd, buf)
57 if e.errno != errno.EAGAIN:
63 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
64 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
69 def log(s, is_prog=False, is_prog_end=False):
70 """Print a log message to stderr."""
84 _hard_write(sys.stderr.fileno(), s)
89 """Write a level 1 debug message to stderr"""
95 """Write a level 2 debug message to stderr"""
100 def progress_update(s, always=True):
101 """Update progress messages on stderr.
103 Calls log() if stderr is connected to a TTY and we haven't printed
104 progress messages in a while. This avoids overloading the stderr buffer
110 if always or now - _last_prog > 0.1:
116 """End progress messages on stderr.
    Calls log() if stderr is connected to a TTY and makes sure that this
119 message is kept even if more/other messages follow using progress_update().
125 def mkdirp(d, mode=None):
126 """Recursively create directories on path 'd'.
128 Unlike os.makedirs(), it doesn't raise an exception if the last element of
129 the path already exists.
137 if e.errno == errno.EEXIST:
143 _unspecified_next_default = object()
145 def _fallback_next(it, default=_unspecified_next_default):
146 """Retrieve the next item from the iterator by calling its
147 next() method. If default is given, it is returned if the
148 iterator is exhausted, otherwise StopIteration is raised."""
150 if default is _unspecified_next_default:
155 except StopIteration:
# Python < 2.6 has no builtin next(); install the compatibility
# implementation so callers can rely on next(it[, default]) everywhere.
if sys.version_info < (2, 6):
    next = _fallback_next
162 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
164 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
166 samekey = operator.eq
168 total = sum(len(it) for it in iters)
169 iters = (iter(it) for it in iters)
170 heap = ((next(it, None),it) for it in iters)
171 heap = [(e,it) for e,it in heap if e]
176 if not count % pfreq:
179 if not samekey(e, pe):
184 e = it.next() # Don't use next() function, it's too expensive
185 except StopIteration:
186 heapq.heappop(heap) # remove current
188 heapq.heapreplace(heap, (e, it)) # shift current to new location
193 """Delete a file at path 'f' if it currently exists.
195 Unlike os.unlink(), does not throw an exception if the file didn't already
201 if e.errno != errno.ENOENT:
205 def readpipe(argv, preexec_fn=None):
206 """Run a subprocess and return its output."""
207 p = subprocess.Popen(argv, stdout=subprocess.PIPE, preexec_fn=preexec_fn)
208 out, err = p.communicate()
209 if p.returncode != 0:
210 raise Exception('subprocess %r failed with status %d'
211 % (' '.join(argv), p.returncode))
215 def _argmax_base(command):
218 base_size += len(command) + 1
219 for k, v in environ.iteritems():
220 base_size += len(k) + len(v) + 2 + sizeof(c_void_p)
224 def _argmax_args_size(args):
225 return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
228 def batchpipe(command, args, preexec_fn=None, arg_max=_helpers.SC_ARG_MAX):
229 """If args is not empty, yield the output produced by calling the
230 command list with args as a sequence of strings (It may be necessary
231 to return multiple strings in order to respect ARG_MAX)."""
232 # The optional arg_max arg is a workaround for an issue with the
233 # current wvtest behavior.
234 base_size = _argmax_base(command)
236 room = arg_max - base_size
239 next_size = _argmax_args_size(args[i:i+1])
240 if room - next_size < 0:
246 assert(len(sub_args))
247 yield readpipe(command + sub_args, preexec_fn=preexec_fn)
251 """Get the absolute path of a file.
253 Behaves like os.path.realpath, but doesn't follow a symlink for the last
254 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
255 will follow symlinks in p's directory)
261 if st and stat.S_ISLNK(st.st_mode):
262 (dir, name) = os.path.split(p)
263 dir = os.path.realpath(dir)
264 out = os.path.join(dir, name)
266 out = os.path.realpath(p)
267 #log('realpathing:%r,%r' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot."""
    # fakeroot sets FAKEROOTKEY in its children's environment; the mere
    # presence of the variable (any value) is the signal.  Use an identity
    # test against None (PEP 8) rather than '!= None'.
    return os.getenv("FAKEROOTKEY") is not None
277 if sys.platform.startswith('cygwin'):
279 return ctypes.cdll.shell32.IsUserAnAdmin()
281 return os.geteuid() == 0
284 def _cache_key_value(get_value, key, cache):
285 """Return (value, was_cached). If there is a value in the cache
286 for key, use that, otherwise, call get_value(key) which should
287 throw a KeyError if there is no value -- in which case the cached
288 and returned value will be None.
290 try: # Do we already have it (or know there wasn't one)?
297 cache[key] = value = get_value(key)
303 _uid_to_pwd_cache = {}
304 _name_to_pwd_cache = {}
306 def pwd_from_uid(uid):
307 """Return password database entry for uid (may be a cached value).
308 Return None if no entry is found.
310 global _uid_to_pwd_cache, _name_to_pwd_cache
311 entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
312 if entry and not cached:
313 _name_to_pwd_cache[entry.pw_name] = entry
317 def pwd_from_name(name):
318 """Return password database entry for name (may be a cached value).
319 Return None if no entry is found.
321 global _uid_to_pwd_cache, _name_to_pwd_cache
322 entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
323 if entry and not cached:
324 _uid_to_pwd_cache[entry.pw_uid] = entry
328 _gid_to_grp_cache = {}
329 _name_to_grp_cache = {}
331 def grp_from_gid(gid):
    """Return group database entry for gid (may be a cached value).
333 Return None if no entry is found.
335 global _gid_to_grp_cache, _name_to_grp_cache
336 entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
337 if entry and not cached:
338 _name_to_grp_cache[entry.gr_name] = entry
342 def grp_from_name(name):
    """Return group database entry for name (may be a cached value).
344 Return None if no entry is found.
346 global _gid_to_grp_cache, _name_to_grp_cache
347 entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
348 if entry and not cached:
349 _gid_to_grp_cache[entry.gr_gid] = entry
355 """Get the user's login name."""
359 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
365 """Get the user's full name."""
367 if not _userfullname:
369 entry = pwd_from_uid(uid)
371 _userfullname = entry[4].split(',')[0] or entry[0]
372 if not _userfullname:
373 _userfullname = 'user%d' % uid
379 """Get the FQDN of this machine."""
382 _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    """Return the path of bup's resource directory joined with 'subdir'.

    The base directory comes from $BUP_RESOURCE_PATH, falling back to the
    current directory, and is looked up only once per process.
    """
    global _resource_path
    if not _resource_path:
        base = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = base if base else '.'
    return os.path.join(_resource_path, subdir)
393 def format_filesize(size):
398 exponent = int(math.log(size) / math.log(unit))
399 size_prefix = "KMGTPE"[exponent - 1]
400 return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
403 class NotOk(Exception):
408 def __init__(self, outp):
412 while self._read(65536): pass
414 def read(self, size):
415 """Read 'size' bytes from input stream."""
417 return self._read(size)
420 """Read from input stream until a newline is found."""
422 return self._readline()
    def write(self, data):
        """Write 'data' to output stream.

        Delegates directly to the underlying output file object; no
        flushing or extra buffering is done here.
        """
        #log('%d writing: %d bytes' % (os.getpid(), len(data)))
        self.outp.write(data)
430 """Return true if input stream is readable."""
431 raise NotImplemented("Subclasses must implement has_input")
434 """Indicate end of output from last sent command."""
438 """Indicate server error to the client."""
439 s = re.sub(r'\s+', ' ', str(s))
440 self.write('\nerror %s\n' % s)
442 def _check_ok(self, onempty):
445 for rl in linereader(self):
446 #log('%d got line: %r' % (os.getpid(), rl))
447 if not rl: # empty line
451 elif rl.startswith('error '):
452 #log('client: error: %s' % rl[6:])
456 raise Exception('server exited unexpectedly; see errors above')
458 def drain_and_check_ok(self):
459 """Remove all data for the current command from input stream."""
462 return self._check_ok(onempty)
465 """Verify that server action completed successfully."""
467 raise Exception('expected "ok", got %r' % rl)
468 return self._check_ok(onempty)
471 class Conn(BaseConn):
472 def __init__(self, inp, outp):
473 BaseConn.__init__(self, outp)
    def _read(self, size):
        """Read up to 'size' bytes from the underlying input file object."""
        return self.inp.read(size)
480 return self.inp.readline()
483 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
485 assert(rl[0] == self.inp.fileno())
491 def checked_reader(fd, n):
493 rl, _, _ = select.select([fd], [], [])
496 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
501 MAX_PACKET = 128 * 1024
502 def mux(p, outfd, outr, errr):
505 while p.poll() is None:
506 rl, _, _ = select.select(fds, [], [])
509 buf = os.read(outr, MAX_PACKET)
511 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
513 buf = os.read(errr, 1024)
515 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
517 os.write(outfd, struct.pack('!IB', 0, 3))
520 class DemuxConn(BaseConn):
521 """A helper class for bup's client-server protocol."""
522 def __init__(self, infd, outp):
523 BaseConn.__init__(self, outp)
524 # Anything that comes through before the sync string was not
525 # multiplexed and can be assumed to be debug/log before mux init.
527 while tail != 'BUPMUX':
528 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
530 raise IOError('demux: unexpected EOF during initialization')
532 sys.stderr.write(tail[:-6]) # pre-mux log messages
539 def write(self, data):
541 BaseConn.write(self, data)
543 def _next_packet(self, timeout):
544 if self.closed: return False
545 rl, wl, xl = select.select([self.infd], [], [], timeout)
546 if not rl: return False
547 assert(rl[0] == self.infd)
548 ns = ''.join(checked_reader(self.infd, 5))
549 n, fdw = struct.unpack('!IB', ns)
550 assert(n <= MAX_PACKET)
552 self.reader = checked_reader(self.infd, n)
554 for buf in checked_reader(self.infd, n):
555 sys.stderr.write(buf)
558 debug2("DemuxConn: Marked closed.")
561 def _load_buf(self, timeout):
562 if self.buf is not None:
564 while not self.closed:
565 while not self.reader:
566 if not self._next_packet(timeout):
569 self.buf = self.reader.next()
571 except StopIteration:
575 def _read_parts(self, ix_fn):
576 while self._load_buf(None):
577 assert(self.buf is not None)
579 if i is None or i == len(self.buf):
584 self.buf = self.buf[i:]
592 return buf.index('\n')+1
595 return ''.join(self._read_parts(find_eol))
597 def _read(self, size):
599 def until_size(buf): # Closes on csize
600 if len(buf) < csize[0]:
605 return ''.join(self._read_parts(until_size))
608 return self._load_buf(0)
612 """Generate a list of input lines from 'f' without terminating newlines."""
620 def chunkyreader(f, count = None):
621 """Generate a list of chunks of data read from 'f'.
623 If count is None, read until EOF is reached.
625 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
626 reached while reading, raise IOError.
630 b = f.read(min(count, 65536))
632 raise IOError('EOF with %d bytes remaining' % count)
643 def atomically_replaced_file(name, mode='w', buffering=-1):
644 """Yield a file that will be atomically renamed name when leaving the block.
646 This contextmanager yields an open file object that is backed by a
647 temporary file which will be renamed (atomically) to the target
648 name if everything succeeds.
650 The mode and buffering arguments are handled exactly as with open,
651 and the yielded file will have very restrictive permissions, as
656 with atomically_replaced_file('foo.txt', 'w') as f:
657 f.write('hello jack.')
661 (ffd, tempname) = tempfile.mkstemp(dir=os.path.dirname(name),
662 text=('b' not in mode))
665 f = os.fdopen(ffd, mode, buffering)
673 os.rename(tempname, name)
        unlink(tempname)  # nonexistent file is ignored
    """Append "/" to 's' if it doesn't already end in "/"."""
680 if s and not s.endswith('/'):
686 def _mmap_do(f, sz, flags, prot, close):
688 st = os.fstat(f.fileno())
691 # trying to open a zero-length map gives an error, but an empty
692 # string has all the same behaviour of a zero-length map, ie. it has
695 map = mmap.mmap(f.fileno(), sz, flags, prot)
697 f.close() # map will persist beyond file close
701 def mmap_read(f, sz = 0, close=True):
702 """Create a read-only memory mapped region on file 'f'.
703 If sz is 0, the region will cover the entire file.
705 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
708 def mmap_readwrite(f, sz = 0, close=True):
709 """Create a read-write memory mapped region on file 'f'.
710 If sz is 0, the region will cover the entire file.
712 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
716 def mmap_readwrite_private(f, sz = 0, close=True):
717 """Create a read-write memory mapped region on file 'f'.
718 If sz is 0, the region will cover the entire file.
719 The map is private, which means the changes are never flushed back to the
722 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
726 def parse_timestamp(epoch_str):
727 """Return the number of nanoseconds since the epoch that are described
728 by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
729 throw a ValueError that may contain additional information."""
730 ns_per = {'s' : 1000000000,
734 match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
736 if re.match(r'^([-+]?[0-9]+)$', epoch_str):
737 raise ValueError('must include units, i.e. 100ns, 100ms, ...')
739 (n, units) = match.group(1, 2)
743 return n * ns_per[units]
747 """Parse data size information into a float number.
749 Here are some examples of conversions:
750 199.2k means 203981 bytes
751 1GB means 1073741824 bytes
752 2.1 tb means 2199023255552 bytes
754 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
756 raise ValueError("can't parse %r as a number" % s)
757 (val, unit) = g.groups()
760 if unit in ['t', 'tb']:
761 mult = 1024*1024*1024*1024
762 elif unit in ['g', 'gb']:
763 mult = 1024*1024*1024
764 elif unit in ['m', 'mb']:
766 elif unit in ['k', 'kb']:
768 elif unit in ['', 'b']:
771 raise ValueError("invalid unit %r in number %r" % (unit, s))
776 """Count the number of elements in an iterator. (consumes the iterator)"""
777 return reduce(lambda x,y: x+1, l)
782 """Append an error message to the list of saved errors.
784 Once processing is able to stop and output the errors, the saved errors are
785 accessible in the module variable helpers.saved_errors.
787 saved_errors.append(e)
796 def check_saved_errors():
799 log('Warning: %d error%s encountered!'
800 % (len(saved_errors), len(saved_errors)!=1 and 's' or ''))
807 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
809 The new exception handler will make sure that bup will exit without an ugly
810 stacktrace when Ctrl-C is hit.
812 oldhook = sys.excepthook
813 def newhook(exctype, value, traceback):
814 if exctype == KeyboardInterrupt:
817 return oldhook(exctype, value, traceback)
818 sys.excepthook = newhook
821 def columnate(l, prefix):
822 """Format elements of 'l' in columns with 'prefix' leading each line.
824 The number of columns is determined automatically based on the string
830 clen = max(len(s) for s in l)
831 ncols = (tty_width() - len(prefix)) / (clen + 2)
836 while len(l) % ncols:
839 for s in range(0, len(l), rows):
840 cols.append(l[s:s+rows])
842 for row in zip(*cols):
843 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
847 def parse_date_or_fatal(str, fatal):
848 """Parses the given date or calls Option.fatal().
849 For now we expect a string that contains a float."""
852 except ValueError, e:
853 raise fatal('invalid date format (should be a float): %r' % e)
858 def parse_excludes(options, fatal):
859 """Traverse the options and extract all excludes, or call Option.fatal()."""
863 (option, parameter) = flag
864 if option == '--exclude':
865 excluded_paths.append(realpath(parameter))
866 elif option == '--exclude-from':
868 f = open(realpath(parameter))
870 raise fatal("couldn't read %s" % parameter)
871 for exclude_path in f.readlines():
872 # FIXME: perhaps this should be rstrip('\n')
873 exclude_path = realpath(exclude_path.strip())
875 excluded_paths.append(exclude_path)
876 return sorted(frozenset(excluded_paths))
879 def parse_rx_excludes(options, fatal):
880 """Traverse the options and extract all rx excludes, or call
882 excluded_patterns = []
885 (option, parameter) = flag
886 if option == '--exclude-rx':
888 excluded_patterns.append(re.compile(parameter))
890 fatal('Invalid --exclude-rx pattern (%s): %s!' % (parameter, ex))
891 elif option == '--exclude-rx-from':
893 f = open(realpath(parameter))
895 raise fatal("couldn't read %s" % parameter)
896 for pattern in f.readlines():
897 spattern = pattern.rstrip('\n')
901 excluded_patterns.append(re.compile(spattern))
903 fatal('Invalid --exclude-rx pattern (%s): %s!' % (spattern, ex))
904 return excluded_patterns
907 def should_rx_exclude_path(path, exclude_rxs):
908 """Return True if path matches a regular expression in exclude_rxs."""
909 for rx in exclude_rxs:
911 debug1('Skipping %r: excluded by rx pattern %r.'
912 % (path, rx.pattern))
917 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
918 # that resolve against the current filesystem in the strip/graft
919 # functions for example, but elsewhere as well. I suspect bup's not
920 # always being careful about that. For some cases, the contents of
921 # the current filesystem should be irrelevant, and consulting it might
922 # produce the wrong result, perhaps via unintended symlink resolution,
925 def path_components(path):
926 """Break path into a list of pairs of the form (name,
927 full_path_to_name). Path must start with '/'.
929 '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
930 if not path.startswith('/'):
931 raise Exception, 'path must start with "/": %s' % path
932 # Since we assume path startswith('/'), we can skip the first element.
934 norm_path = os.path.abspath(path)
938 for p in norm_path.split('/')[1:]:
940 result.append((p, full_path))
944 def stripped_path_components(path, strip_prefixes):
945 """Strip any prefix in strip_prefixes from path and return a list
946 of path components where each component is (name,
947 none_or_full_fs_path_to_name). Assume path startswith('/').
948 See thelpers.py for examples."""
949 normalized_path = os.path.abspath(path)
950 sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
951 for bp in sorted_strip_prefixes:
952 normalized_bp = os.path.abspath(bp)
953 if normalized_path.startswith(normalized_bp):
954 prefix = normalized_path[:len(normalized_bp)]
956 for p in normalized_path[len(normalized_bp):].split('/'):
960 result.append((p, prefix))
963 return path_components(path)
966 def grafted_path_components(graft_points, path):
967 # Create a result that consists of some number of faked graft
968 # directories before the graft point, followed by all of the real
969 # directories from path that are after the graft point. Arrange
970 # for the directory at the graft point in the result to correspond
971 # to the "orig" directory in --graft orig=new. See t/thelpers.py
974 # Note that given --graft orig=new, orig and new have *nothing* to
975 # do with each other, even if some of their component names
976 # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
977 # equivalent to --graft /foo/bar/baz=/x/y/z, or even
980 # FIXME: This can't be the best solution...
981 clean_path = os.path.abspath(path)
982 for graft_point in graft_points:
983 old_prefix, new_prefix = graft_point
984 # Expand prefixes iff not absolute paths.
985 old_prefix = os.path.normpath(old_prefix)
986 new_prefix = os.path.normpath(new_prefix)
987 if clean_path.startswith(old_prefix):
988 escaped_prefix = re.escape(old_prefix)
989 grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
990 # Handle /foo=/ (at least) -- which produces //whatever.
991 grafted_path = '/' + grafted_path.lstrip('/')
992 clean_path_components = path_components(clean_path)
993 # Count the components that were stripped.
994 strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
995 new_prefix_parts = new_prefix.split('/')
996 result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
997 result = [(p, None) for p in result_prefix] \
998 + clean_path_components[strip_count:]
999 # Now set the graft point name to match the end of new_prefix.
1000 graft_point = len(result_prefix)
1001 result[graft_point] = \
1002 (new_prefix_parts[-1], clean_path_components[strip_count][1])
1003 if new_prefix == '/': # --graft ...=/ is a special case.
1006 return path_components(clean_path)