1 """Helper functions and classes for bup."""
3 from ctypes import sizeof, c_void_p
5 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
6 import hashlib, heapq, operator, time, grp
8 from bup import _helpers
9 import bup._helpers as _helpers
12 # This function should really be in helpers, not in bup.options. But we
13 # want options.py to be standalone so people can include it in other projects.
14 from bup.options import _tty_width
15 tty_width = _tty_width
19 """Convert the string 's' to an integer. Return 0 if s is not a number."""
27 """Convert the string 's' to a float. Return 0 if s is not a number."""
29 return float(s or '0')
34 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
37 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
39 fdatasync = os.fdatasync
40 except AttributeError:
44 # Write (blockingly) to sockets that may or may not be in blocking mode.
45 # We need this because our stderr is sometimes eaten by subprocesses
46 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
47 # leading to race conditions. Ick. We'll do it the hard way.
def _hard_write(fd, buf):
        # Block until the fd is actually writable before each write attempt.
        (r,w,x) = select.select([], [fd], [], None)
            raise IOError('select(fd) returned without being writable')
            sz = os.write(fd, buf)
            # EAGAIN just means the fd went nonblocking again; retry the write.
            if e.errno != errno.EAGAIN:
64 """Print a log message to stderr."""
67 _hard_write(sys.stderr.fileno(), s)
# Treat stdout/stderr as ttys either when they really are, or when forced via
# the BUP_FORCE_TTY bitmask (bit 1 = stdout, bit 2 = stderr).
istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
85 """Calls log() if stderr is a TTY. Does nothing otherwise."""
93 """Calls progress() only if we haven't printed progress in a while.
95 This avoids overloading the stderr buffer with excess junk.
99 if now - _last_prog > 0.1:
105 """Calls progress() to redisplay the most recent progress message.
107 Useful after you've printed some other message that wipes out the
110 if _last_progress and _last_progress.endswith('\r'):
111 progress(_last_progress)
def mkdirp(d, mode=None):
    """Recursively create directories on path 'd'.

    Unlike os.makedirs(), it doesn't raise an exception if the last element of
    the path already exists (any other OSError still propagates).
    """
        # An already-existing leaf directory is exactly what was asked for.
        if e.errno == errno.EEXIST:
# Sentinel object: distinguishes "no default supplied" from default=None.
_unspecified_next_default = object()

def _fallback_next(it, default=_unspecified_next_default):
    """Retrieve the next item from the iterator by calling its
    next() method. If default is given, it is returned if the
    iterator is exhausted, otherwise StopIteration is raised."""
    if default is _unspecified_next_default:
    except StopIteration:
# Python < 2.6 has no builtin next(); install our compatible fallback.
if sys.version_info < (2, 6):
    next = _fallback_next
def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
    # Heap-merge several iterators (each presumably already sorted — confirm),
    # calling pfunc(count, total) every pfreq items and pfinal(count, total)
    # at the end.  'key' names an attribute used to suppress duplicates.
        samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
        samekey = operator.eq
    total = sum(len(it) for it in iters)  # inputs must support len()
    iters = (iter(it) for it in iters)
    heap = ((next(it, None),it) for it in iters)
    heap = [(e,it) for e,it in heap if e]  # drop already-exhausted iterators
        if not count % pfreq:
        if not samekey(e, pe):
            e = it.next() # Don't use next() function, it's too expensive
        except StopIteration:
            heapq.heappop(heap) # remove current
            heapq.heapreplace(heap, (e, it)) # shift current to new location
    """Delete a file at path 'f' if it currently exists.

    Unlike os.unlink(), does not throw an exception if the file didn't already
    exist.
    """
        # ENOENT is the one error we deliberately swallow.
        if e.errno == errno.ENOENT:
            pass # it doesn't exist, that's what you asked for
def readpipe(argv, preexec_fn=None):
    """Run the command list 'argv' and return its captured stdout.

    'preexec_fn' is passed straight through to subprocess.Popen.
    Raises Exception if the subprocess exits with a nonzero status.
    """
    p = subprocess.Popen(argv, stdout=subprocess.PIPE, preexec_fn=preexec_fn)
    out, err = p.communicate()
    if p.returncode != 0:
        raise Exception('subprocess %r failed with status %d'
                        % (' '.join(argv), p.returncode))
    # Bug fix: the captured output was computed but never returned.
    return out
def _argmax_base(command):
    # Estimate the fixed overhead (in bytes) that 'command' plus the process
    # environment contribute toward the kernel's ARG_MAX limit.
        base_size += len(command) + 1
    # NOTE(review): 'environ' is presumably os.environ — confirm its binding.
    for k, v in environ.iteritems():
        # Each env entry costs its text, '=', a NUL, and one pointer slot.
        base_size += len(k) + len(v) + 2 + sizeof(c_void_p)
213 def _argmax_args_size(args):
214 return sum(len(x) + 1 + sizeof(c_void_p) for x in args)
def batchpipe(command, args, preexec_fn=None, arg_max=_helpers.SC_ARG_MAX):
    """If args is not empty, yield the output produced by calling the
    command list with args as a sequence of strings (It may be necessary
    to return multiple strings in order to respect ARG_MAX)."""
    # The optional arg_max arg is a workaround for an issue with the
    # current wvtest behavior.
    base_size = _argmax_base(command)
        # Budget remaining once command + environment overhead is counted.
        room = arg_max - base_size
            next_size = _argmax_args_size(args[i:i+1])
            # Stop growing this batch before the next argument would overflow.
            if room - next_size < 0:
        assert(len(sub_args))
        yield readpipe(command + sub_args, preexec_fn=preexec_fn)
    """Get the absolute path of a file.

    Behaves like os.path.realpath, but doesn't follow a symlink for the last
    element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
    will follow symlinks in p's directory)
    """
    # If the leaf is a symlink, resolve only its directory and keep the
    # leaf name literal; otherwise do a full realpath().
    if st and stat.S_ISLNK(st.st_mode):
        (dir, name) = os.path.split(p)
        dir = os.path.realpath(dir)
        out = os.path.join(dir, name)
        out = os.path.realpath(p)
    #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot."""
    # fakeroot advertises itself by exporting FAKEROOTKEY.  Use an identity
    # test against None ('is not') rather than '!=', per PEP 8.
    return os.getenv("FAKEROOTKEY") is not None
    # On Cygwin there is no meaningful uid 0; ask Windows whether we are an
    # Administrator instead.  (presumably 'import ctypes' happens on an
    # elided line just above the call — confirm)
    if sys.platform.startswith('cygwin'):
        return ctypes.cdll.shell32.IsUserAnAdmin()
    return os.geteuid() == 0
def _cache_key_value(get_value, key, cache):
    """Return (value, was_cached). If there is a value in the cache
    for key, use that, otherwise, call get_value(key) which should
    throw a KeyError if there is no value -- in which case the cached
    and returned value will be None.
    """
    try: # Do we already have it (or know there wasn't one)?
        # Cache miss: compute, store (possibly None), and report not-cached.
        cache[key] = value = get_value(key)
# Bidirectional caches for passwd-database lookups; a hit through one key
# also primes the cache keyed the other way.
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}

def pwd_from_uid(uid):
    """Return password database entry for uid (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
    # A fresh (uncached) hit also primes the name-keyed cache.
    if entry and not cached:
        _name_to_pwd_cache[entry.pw_name] = entry
def pwd_from_name(name):
    """Return password database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
    # A fresh (uncached) hit also primes the uid-keyed cache.
    if entry and not cached:
        _uid_to_pwd_cache[entry.pw_uid] = entry
# Bidirectional caches for group-database lookups, mirroring the pwd caches.
_gid_to_grp_cache = {}
_name_to_grp_cache = {}

def grp_from_gid(gid):
    """Return group database entry for gid (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
    # A fresh (uncached) hit also primes the name-keyed cache.
    if entry and not cached:
        _name_to_grp_cache[entry.gr_name] = entry
def grp_from_name(name):
    """Return group database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
    # A fresh (uncached) hit also primes the gid-keyed cache.
    if entry and not cached:
        _gid_to_grp_cache[entry.gr_gid] = entry
    """Get the user's login name."""
        # Fall back to a synthetic 'user<uid>' when there is no passwd entry.
        _username = pwd_from_uid(uid)[0] or 'user%d' % uid


    """Get the user's full name."""
    if not _userfullname:
        entry = pwd_from_uid(uid)
            # GECOS field: the full name is the first comma-separated part.
            _userfullname = entry[4].split(',')[0] or entry[0]
        if not _userfullname:
            _userfullname = 'user%d' % uid


    """Get the FQDN of this machine."""
    _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    """Return bup's resource directory joined with 'subdir'.

    The base directory comes from $BUP_RESOURCE_PATH (default '.') and is
    looked up once, then cached in a module-level variable.
    """
    global _resource_path
    if not _resource_path:
        configured = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = configured if configured else '.'
    return os.path.join(_resource_path, subdir)
def format_filesize(size):
    """Return 'size' (bytes) as a human-readable string, e.g. '1.5G'."""
    # NOTE(review): assumes 'unit' is 1024 and 'math' is imported on elided
    # lines, and that sizes below one unit are handled earlier — confirm.
    exponent = int(math.log(size) / math.log(unit))
    size_prefix = "KMGTPE"[exponent - 1]
    return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
class NotOk(Exception):
    # NOTE(review): the methods below belong to the client-server connection
    # base class; NotOk signals an 'error ...' line from the server.
    def __init__(self, outp):
        # Drain any remaining input in large chunks.
        while self._read(65536): pass

    def read(self, size):
        """Read 'size' bytes from input stream."""
        return self._read(size)

        """Read from input stream until a newline is found."""
        return self._readline()

    def write(self, data):
        """Write 'data' to output stream."""
        #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
        self.outp.write(data)

        """Return true if input stream is readable."""
        # FIXME: NotImplemented is a value, not an exception type; this
        # should almost certainly be NotImplementedError.
        raise NotImplemented("Subclasses must implement has_input")

        """Indicate end of output from last sent command."""

        """Indicate server error to the client."""
        # Collapse all whitespace so the message fits on one protocol line.
        s = re.sub(r'\s+', ' ', str(s))
        self.write('\nerror %s\n' % s)

    def _check_ok(self, onempty):
        # Scan protocol lines for the trailing status: an empty line invokes
        # 'onempty', an 'error ...' line reports failure.
        for rl in linereader(self):
            #log('%d got line: %r\n' % (os.getpid(), rl))
            if not rl: # empty line
            elif rl.startswith('error '):
                #log('client: error: %s\n' % rl[6:])
        raise Exception('server exited unexpectedly; see errors above')

    def drain_and_check_ok(self):
        """Remove all data for the current command from input stream."""
        return self._check_ok(onempty)

        """Verify that server action completed successfully."""
        raise Exception('expected "ok", got %r' % rl)
        return self._check_ok(onempty)
class Conn(BaseConn):
    # Plain two-stream connection: reads from 'inp', writes via BaseConn's
    # 'outp'.
    def __init__(self, inp, outp):
        BaseConn.__init__(self, outp)

    def _read(self, size):
        return self.inp.read(size)

        return self.inp.readline()

        # Zero-timeout poll: is there data ready on self.inp right now?
        [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
        assert(rl[0] == self.inp.fileno())
def checked_reader(fd, n):
    # Generator: yield chunks read from fd until exactly n bytes have been
    # produced; hitting EOF before then is a protocol error.
        rl, _, _ = select.select([fd], [], [])
        if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Largest payload carried in one mux packet (header: 4-byte length + 1-byte
# channel id, see struct format '!IB' below).
MAX_PACKET = 128 * 1024

def mux(p, outfd, outr, errr):
    # Multiplex child process output onto outfd as length-prefixed packets:
    # channel 1 = stdout (outr), channel 2 = stderr (errr), and a zero-length
    # channel-3 packet marks end of stream.
    # NOTE(review): 'fds' is presumably [outr, errr], bound on an elided line.
    while p.poll() is None:
        rl, _, _ = select.select(fds, [], [])
            buf = os.read(outr, MAX_PACKET)
            os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
            buf = os.read(errr, 1024)
            os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
    os.write(outfd, struct.pack('!IB', 0, 3))
class DemuxConn(BaseConn):
    """A helper class for bup's client-server protocol."""
    def __init__(self, infd, outp):
        BaseConn.__init__(self, outp)
        # Anything that comes through before the sync string was not
        # multiplexed and can be assumed to be debug/log before mux init.
        while tail != 'BUPMUX':
            # Fetch up to 6 bytes in bulk, then one at a time so we can't
            # overshoot the 'BUPMUX' marker.
            b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
                raise IOError('demux: unexpected EOF during initialization')
        sys.stderr.write(tail[:-6]) # pre-mux log messages

    def write(self, data):
        BaseConn.write(self, data)

    def _next_packet(self, timeout):
        # Read one packet header and route by channel: 1 feeds our reader,
        # 2 is relayed straight to stderr, 3 marks the stream closed.
        if self.closed: return False
        rl, wl, xl = select.select([self.infd], [], [], timeout)
        if not rl: return False
        assert(rl[0] == self.infd)
        ns = ''.join(checked_reader(self.infd, 5))
        n, fdw = struct.unpack('!IB', ns)
        assert(n <= MAX_PACKET)
            self.reader = checked_reader(self.infd, n)
            for buf in checked_reader(self.infd, n):
                sys.stderr.write(buf)
            debug2("DemuxConn: marked closed\n")

    def _load_buf(self, timeout):
        # Ensure self.buf holds the next chunk of channel-1 data, pulling
        # packets as needed; falsy once the stream is closed and drained.
        if self.buf is not None:
        while not self.closed:
            while not self.reader:
                if not self._next_packet(timeout):
                self.buf = self.reader.next()
            except StopIteration:

    def _read_parts(self, ix_fn):
        # Yield buffered parts until ix_fn(buf) reports a boundary; keep any
        # remainder in self.buf for the next call.
        while self._load_buf(None):
            assert(self.buf is not None)
            if i is None or i == len(self.buf):
            self.buf = self.buf[i:]

            # Boundary for _readline: one past the first newline.
            return buf.index('\n')+1
        return ''.join(self._read_parts(find_eol))

    def _read(self, size):
        def until_size(buf): # Closes on csize
            if len(buf) < csize[0]:
        return ''.join(self._read_parts(until_size))

        # has_input: a zero-timeout load tells us if data is ready.
        return self._load_buf(0)
    """Generate a list of input lines from 'f' without terminating newlines."""


def chunkyreader(f, count = None):
    """Generate a list of chunks of data read from 'f'.

    If count is None, read until EOF is reached.

    If count is a positive integer, read 'count' bytes from 'f'. If EOF is
    reached while reading, raise IOError.
    """
        # Never request more than 64k at a time (or more than remains).
        b = f.read(min(count, 65536))
            raise IOError('EOF with %d bytes remaining' % count)


    """Append "/" to 's' if it doesn't already end in "/"."""
    if s and not s.endswith('/'):
def _mmap_do(f, sz, flags, prot, close):
    # Map 'sz' bytes of f (the whole file when sz is 0, per st_size below)
    # with the given mmap flags/protection; optionally close f afterwards.
    st = os.fstat(f.fileno())
        # trying to open a zero-length map gives an error, but an empty
        # string has all the same behaviour of a zero-length map, ie. it has
        # no elements to read or write
    map = mmap.mmap(f.fileno(), sz, flags, prot)
        f.close() # map will persist beyond file close


def mmap_read(f, sz = 0, close=True):
    """Create a read-only memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)


def mmap_readwrite(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,


def mmap_readwrite_private(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    The map is private, which means the changes are never flushed back to the
    file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
def parse_timestamp(epoch_str):
    """Return the number of nanoseconds since the epoch that are described
    by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
    throw a ValueError that may contain additional information."""
    # Map unit suffix -> nanoseconds per unit.
    ns_per = {'s' : 1000000000,
    match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
        # A bare number gets a more helpful error than a random string.
        if re.match(r'^([-+]?[0-9]+)$', epoch_str):
            raise ValueError('must include units, i.e. 100ns, 100ms, ...')
    (n, units) = match.group(1, 2)
    return n * ns_per[units]
    """Parse data size information into a float number.

    Here are some examples of conversions:
        199.2k means 203981 bytes
        1GB means 1073741824 bytes
        2.1 tb means 2199023255552 bytes
    """
    g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
        raise ValueError("can't parse %r as a number" % s)
    (val, unit) = g.groups()
    # Powers-of-1024 suffixes; NOTE(review): the comparisons expect lowercase,
    # presumably 'unit' is lower-cased on an elided line — confirm.
    if unit in ['t', 'tb']:
        mult = 1024*1024*1024*1024
    elif unit in ['g', 'gb']:
        mult = 1024*1024*1024
    elif unit in ['m', 'mb']:
    elif unit in ['k', 'kb']:
    elif unit in ['', 'b']:
        raise ValueError("invalid unit %r in number %r" % (unit, s))
    """Count the number of elements in an iterator. (consumes the iterator)"""
    # NOTE(review): reduce() without an initializer raises TypeError on an
    # empty iterator — confirm that's the intended behavior.
    return reduce(lambda x,y: x+1, l)


    """Append an error message to the list of saved errors.

    Once processing is able to stop and output the errors, the saved errors are
    accessible in the module variable helpers.saved_errors.
    """
    saved_errors.append(e)


    """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).

    The new exception handler will make sure that bup will exit without an ugly
    stacktrace when Ctrl-C is hit.
    """
    oldhook = sys.excepthook
    def newhook(exctype, value, traceback):
        if exctype == KeyboardInterrupt:
            log('\nInterrupted.\n')
        # Anything other than Ctrl-C still goes to the previous handler.
        return oldhook(exctype, value, traceback)
    sys.excepthook = newhook
def columnate(l, prefix):
    """Format elements of 'l' in columns with 'prefix' leading each line.

    The number of columns is determined automatically based on the string
    lengths and the terminal width.
    """
    clen = max(len(s) for s in l)
    ncols = (tty_width() - len(prefix)) / (clen + 2)
    # Pad the list so it divides evenly into ncols columns.
    while len(l) % ncols:
    for s in range(0, len(l), rows):
        cols.append(l[s:s+rows])
    for row in zip(*cols):
        out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
def parse_date_or_fatal(str, fatal):
    """Parses the given date or calls Option.fatal().
    For now we expect a string that contains a float."""
    # NOTE(review): parameter name 'str' shadows the builtin — harmless here,
    # but worth renaming if the signature can ever change.
    except ValueError, e:
        raise fatal('invalid date format (should be a float): %r' % e)
def parse_excludes(options, fatal):
    """Traverse the options and extract all excludes, or call Option.fatal()."""
        (option, parameter) = flag
        if option == '--exclude':
            excluded_paths.append(realpath(parameter))
        elif option == '--exclude-from':
            # Each line of the file names one path to exclude.
            # NOTE(review): 'f' is never closed here; relies on GC — confirm.
            f = open(realpath(parameter))
                raise fatal("couldn't read %s" % parameter)
            for exclude_path in f.readlines():
                excluded_paths.append(realpath(exclude_path.strip()))
    # Sorted and de-duplicated for deterministic results.
    return sorted(frozenset(excluded_paths))
def parse_rx_excludes(options, fatal):
    """Traverse the options and extract all rx excludes, or call
    Option.fatal()."""
    excluded_patterns = []
        (option, parameter) = flag
        if option == '--exclude-rx':
                excluded_patterns.append(re.compile(parameter))
                fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
        elif option == '--exclude-rx-from':
            f = open(realpath(parameter))
                raise fatal("couldn't read %s" % parameter)
            for pattern in f.readlines():
                # Strip only the newline; other whitespace may be meaningful
                # inside a regular expression.
                spattern = pattern.rstrip('\n')
                excluded_patterns.append(re.compile(spattern))
                fatal('invalid --exclude-rx pattern (%s): %s' % (spattern, ex))
    return excluded_patterns
def should_rx_exclude_path(path, exclude_rxs):
    """Return True if path matches a regular expression in exclude_rxs."""
    for rx in exclude_rxs:
            # Log which pattern triggered the exclusion (debug level 1).
            debug1('Skipping %r: excluded by rx pattern %r.\n'
                   % (path, rx.pattern))
857 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
858 # that resolve against the current filesystem in the strip/graft
859 # functions for example, but elsewhere as well. I suspect bup's not
860 # always being careful about that. For some cases, the contents of
861 # the current filesystem should be irrelevant, and consulting it might
862 # produce the wrong result, perhaps via unintended symlink resolution,
def path_components(path):
    """Break path into a list of pairs of the form (name,
    full_path_to_name). Path must start with '/'.
    Example:
      '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
    if not path.startswith('/'):
        raise Exception, 'path must start with "/": %s' % path
    # Since we assume path startswith('/'), we can skip the first element.
    norm_path = os.path.abspath(path)
    # Accumulate (name, cumulative-path-so-far) pairs component by component.
    for p in norm_path.split('/')[1:]:
        result.append((p, full_path))
def stripped_path_components(path, strip_prefixes):
    """Strip any prefix in strip_prefixes from path and return a list
    of path components where each component is (name,
    none_or_full_fs_path_to_name). Assume path startswith('/').
    See thelpers.py for examples."""
    normalized_path = os.path.abspath(path)
    # Try the longest prefixes first so the most specific match wins.
    sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
    for bp in sorted_strip_prefixes:
        normalized_bp = os.path.abspath(bp)
        if normalized_path.startswith(normalized_bp):
            prefix = normalized_path[:len(normalized_bp)]
            for p in normalized_path[len(normalized_bp):].split('/'):
                result.append((p, prefix))
    # No prefix matched; fall back to ordinary component splitting.
    return path_components(path)
def grafted_path_components(graft_points, path):
    # Create a result that consists of some number of faked graft
    # directories before the graft point, followed by all of the real
    # directories from path that are after the graft point. Arrange
    # for the directory at the graft point in the result to correspond
    # to the "orig" directory in --graft orig=new. See t/thelpers.py
    # for examples.

    # Note that given --graft orig=new, orig and new have *nothing* to
    # do with each other, even if some of their component names
    # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
    # equivalent to --graft /foo/bar/baz=/x/y/z, or even
    # --graft /foo/bar/baz=/x.

    # FIXME: This can't be the best solution...
    clean_path = os.path.abspath(path)
    for graft_point in graft_points:
        old_prefix, new_prefix = graft_point
        # Expand prefixes iff not absolute paths.
        old_prefix = os.path.normpath(old_prefix)
        new_prefix = os.path.normpath(new_prefix)
        if clean_path.startswith(old_prefix):
            escaped_prefix = re.escape(old_prefix)
            grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
            # Handle /foo=/ (at least) -- which produces //whatever.
            grafted_path = '/' + grafted_path.lstrip('/')
            clean_path_components = path_components(clean_path)
            # Count the components that were stripped.
            strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
            new_prefix_parts = new_prefix.split('/')
            result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
            # Faked graft directories carry no real filesystem path (None).
            result = [(p, None) for p in result_prefix] \
                + clean_path_components[strip_count:]
            # Now set the graft point name to match the end of new_prefix.
            graft_point = len(result_prefix)
            result[graft_point] = \
                (new_prefix_parts[-1], clean_path_components[strip_count][1])
            if new_prefix == '/': # --graft ...=/ is a special case.
    # No graft point matched; return the path's components unchanged.
    return path_components(clean_path)