1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import hashlib, heapq, operator, time, grp
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
9 # This function should really be in helpers, not in bup.options. But we
10 # want options.py to be standalone so people can include it in other projects.
11 from bup.options import _tty_width
12 tty_width = _tty_width
16 """Convert the string 's' to an integer. Return 0 if s is not a number."""
24 """Convert the string 's' to a float. Return 0 if s is not a number."""
26 return float(s or '0')
31 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
34 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
36 fdatasync = os.fdatasync
37 except AttributeError:
41 # Write (blockingly) to sockets that may or may not be in blocking mode.
42 # We need this because our stderr is sometimes eaten by subprocesses
43 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
44 # leading to race conditions. Ick. We'll do it the hard way.
45 def _hard_write(fd, buf):
# Blockingly write all of buf to fd, even if fd has been switched to
# non-blocking mode (see the rationale comment above this function).
# NOTE(review): this listing elides several lines (46, 48, 50, 52...);
# annotations below cover only the visible statements.
# Block until fd is writable; a None timeout means wait indefinitely.
47 (r,w,x) = select.select([], [fd], [], None)
49 raise IOError('select(fd) returned without being writable')
51 sz = os.write(fd, buf)
# EAGAIN is the expected "would block" signal on a non-blocking fd and is
# presumably retried by the elided loop; any other errno escapes --
# TODO confirm against the full source.
53 if e.errno != errno.EAGAIN:
61 """Print a log message to stderr."""
64 _hard_write(sys.stderr.fileno(), s)
78 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
79 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
82 """Calls log() if stderr is a TTY. Does nothing otherwise."""
90 """Calls progress() only if we haven't printed progress in a while.
92 This avoids overloading the stderr buffer with excess junk.
96 if now - _last_prog > 0.1:
102 """Calls progress() to redisplay the most recent progress message.
104 Useful after you've printed some other message that wipes out the
107 if _last_progress and _last_progress.endswith('\r'):
108 progress(_last_progress)
111 def mkdirp(d, mode=None):
112 """Recursively create directories on path 'd'.
114 Unlike os.makedirs(), it doesn't raise an exception if the last element of
115 the path already exists.
# NOTE(review): the os.makedirs() call and the docstring terminator between
# these lines are elided from this listing.
# Per the docstring, EEXIST for the final path element is deliberately
# tolerated rather than raised.
123 if e.errno == errno.EEXIST:
129 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
# Merge sorted iterators via a heap, reporting progress with pfunc every
# pfreq elements and pfinal at the end.  NOTE(review): several lines are
# elided from this listing; comments cover only the visible statements.
# With a key name, elements are "the same" when their key attributes match;
# otherwise plain equality is used.
131 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
133 samekey = operator.eq
135 total = sum(len(it) for it in iters)
136 iters = (iter(it) for it in iters)
# Seed the heap with each iterator's first element, dropping exhausted ones.
137 heap = ((next(it, None),it) for it in iters)
138 heap = [(e,it) for e,it in heap if e]
143 if not count % pfreq:
146 if not samekey(e, pe):
# Python 2 idiom: the bound .next() method avoids the global next() lookup.
151 e = it.next() # Don't use next() function, it's too expensive
152 except StopIteration:
153 heapq.heappop(heap) # remove current
155 heapq.heapreplace(heap, (e, it)) # shift current to new location
160 """Delete a file at path 'f' if it currently exists.
162 Unlike os.unlink(), does not throw an exception if the file didn't already
168 if e.errno == errno.ENOENT:
169 pass # it doesn't exist, that's what you asked for
173 """Run a subprocess and return its output."""
174 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
175 out, err = p.communicate()
176 if p.returncode != 0:
177 raise Exception('subprocess %r failed with status %d'
178 % (' '.join(argv), p.returncode))
183 """Get the absolute path of a file.
185 Behaves like os.path.realpath, but doesn't follow a symlink for the last
186 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
187 will follow symlinks in p's directory)
193 if st and stat.S_ISLNK(st.st_mode):
194 (dir, name) = os.path.split(p)
195 dir = os.path.realpath(dir)
196 out = os.path.join(dir, name)
198 out = os.path.realpath(p)
199 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    fakeroot exports FAKEROOTKEY into the environment of its children,
    so its presence is the detection signal.
    """
    # `is not None` rather than `!= None`: identity comparison is the
    # correct and idiomatic way to test for an absent value.
    return os.getenv("FAKEROOTKEY") is not None
209 if sys.platform.startswith('cygwin'):
211 return ctypes.cdll.shell32.IsUserAnAdmin()
213 return os.geteuid() == 0
216 def _cache_key_value(get_value, key, cache):
217 """Return (value, was_cached). If there is a value in the cache
218 for key, use that, otherwise, call get_value(key) which should
219 throw a KeyError if there is no value -- in which case the cached
220 and returned value will be None.
# NOTE(review): the docstring terminator and the cache-hit/KeyError
# handling between these lines are elided from this listing.
222 try: # Do we already have it (or know there wasn't one)?
# A successful lookup is memoized so repeated misses/hits are cheap.
229 cache[key] = value = get_value(key)
# Bidirectional caches for password-database lookups: by uid and by name.
235 _uid_to_pwd_cache = {}
236 _name_to_pwd_cache = {}
238 def pwd_from_uid(uid):
239 """Return password database entry for uid (may be a cached value).
240 Return None if no entry is found.
# NOTE(review): the docstring terminator and the return statement are
# elided from this listing.
242 global _uid_to_pwd_cache, _name_to_pwd_cache
243 entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
# A freshly fetched entry is also cached under its name so a later
# pwd_from_name() for the same account is free.
244 if entry and not cached:
245 _name_to_pwd_cache[entry.pw_name] = entry
249 def pwd_from_name(name):
250 """Return password database entry for name (may be a cached value).
251 Return None if no entry is found.
# NOTE(review): the docstring terminator and the return statement are
# elided from this listing.
253 global _uid_to_pwd_cache, _name_to_pwd_cache
254 entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
# Mirror a fresh entry into the uid cache so pwd_from_uid() also hits.
255 if entry and not cached:
256 _uid_to_pwd_cache[entry.pw_uid] = entry
# Bidirectional caches for group-database lookups: by gid and by name.
260 _gid_to_grp_cache = {}
261 _name_to_grp_cache = {}
263 def grp_from_gid(gid):
264 """Return group database entry for gid (may be a cached value).
265 Return None if no entry is found.
# NOTE(review): the docstring terminator and the return statement are
# elided from this listing.
267 global _gid_to_grp_cache, _name_to_grp_cache
268 entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
# Mirror a fresh entry into the name cache so grp_from_name() also hits.
269 if entry and not cached:
270 _name_to_grp_cache[entry.gr_name] = entry
274 def grp_from_name(name):
275 """Return group database entry for name (may be a cached value).
276 Return None if no entry is found.
# NOTE(review): the docstring terminator and the return statement are
# elided from this listing.
278 global _gid_to_grp_cache, _name_to_grp_cache
279 entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
# Mirror a fresh entry into the gid cache so grp_from_gid() also hits.
280 if entry and not cached:
281 _gid_to_grp_cache[entry.gr_gid] = entry
287 """Get the user's login name."""
291 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
297 """Get the user's full name."""
299 if not _userfullname:
301 entry = pwd_from_uid(uid)
303 _userfullname = entry[4].split(',')[0] or entry[0]
304 if not _userfullname:
305 _userfullname = 'user%d' % uid
311 """Get the FQDN of this machine."""
314 _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    """Return the path to bup's resource directory joined with 'subdir'.

    The base directory comes from the BUP_RESOURCE_PATH environment
    variable (falling back to '.') and is cached after the first call.
    """
    global _resource_path
    if not _resource_path:
        base = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = base if base else '.'
    return os.path.join(_resource_path, subdir)
325 def format_filesize(size):
# Render a byte count with a single-letter binary suffix, e.g. "1.5G".
# NOTE(review): the small-size guard and the `unit` constant between the
# def and these lines are elided from this listing; `math` is presumably
# imported in an elided line -- TODO confirm.
330 exponent = int(math.log(size) / math.log(unit))
# exponent 1 -> 'K', 2 -> 'M', ... up to 'E'.
331 size_prefix = "KMGTPE"[exponent - 1]
332 return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
335 class NotOk(Exception):
340 def __init__(self, outp):
344 while self._read(65536): pass
346 def read(self, size):
347 """Read 'size' bytes from input stream."""
349 return self._read(size)
352 """Read from input stream until a newline is found."""
354 return self._readline()
def write(self, data):
    """Forward 'data' verbatim to the wrapped output stream."""
    out = self.outp
    out.write(data)
362 """Return true if input stream is readable."""
363 raise NotImplemented("Subclasses must implement has_input")
366 """Indicate end of output from last sent command."""
370 """Indicate server error to the client."""
371 s = re.sub(r'\s+', ' ', str(s))
372 self.write('\nerror %s\n' % s)
374 def _check_ok(self, onempty):
377 for rl in linereader(self):
378 #log('%d got line: %r\n' % (os.getpid(), rl))
379 if not rl: # empty line
383 elif rl.startswith('error '):
384 #log('client: error: %s\n' % rl[6:])
388 raise Exception('server exited unexpectedly; see errors above')
390 def drain_and_check_ok(self):
391 """Remove all data for the current command from input stream."""
394 return self._check_ok(onempty)
397 """Verify that server action completed successfully."""
399 raise Exception('expected "ok", got %r' % rl)
400 return self._check_ok(onempty)
403 class Conn(BaseConn):
404 def __init__(self, inp, outp):
405 BaseConn.__init__(self, outp)
def _read(self, size):
    """Read up to 'size' bytes from the wrapped input stream."""
    stream = self.inp
    return stream.read(size)
412 return self.inp.readline()
415 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
417 assert(rl[0] == self.inp.fileno())
423 def checked_reader(fd, n):
# Generator: yield chunks read from fd until n bytes have been delivered,
# raising on premature EOF.  NOTE(review): the loop framing and the
# os.read()/yield lines are elided from this listing.
# Block until fd is readable before each read.
425 rl, _, _ = select.select([fd], [], [])
428 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Largest payload carried in a single mux packet.
433 MAX_PACKET = 128 * 1024
434 def mux(p, outfd, outr, errr):
# Multiplex subprocess p's stdout (outr) and stderr (errr) onto outfd as
# length-prefixed packets: struct '!IB' header = (length, channel), with
# channel 1 = stdout, 2 = stderr, 3 = end-of-stream marker.
# NOTE(review): several lines (fds setup, rl dispatch) are elided here.
437 while p.poll() is None:
438 rl, _, _ = select.select(fds, [], [])
441 buf = os.read(outr, MAX_PACKET)
443 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
445 buf = os.read(errr, 1024)
447 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
# Zero-length channel-3 packet signals the end of the stream.
449 os.write(outfd, struct.pack('!IB', 0, 3))
452 class DemuxConn(BaseConn):
453 """A helper class for bup's client-server protocol."""
454 def __init__(self, infd, outp):
455 BaseConn.__init__(self, outp)
456 # Anything that comes through before the sync string was not
457 # multiplexed and can be assumed to be debug/log before mux init.
459 while tail != 'BUPMUX':
460 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
462 raise IOError('demux: unexpected EOF during initialization')
464 sys.stderr.write(tail[:-6]) # pre-mux log messages
471 def write(self, data):
473 BaseConn.write(self, data)
475 def _next_packet(self, timeout):
476 if self.closed: return False
477 rl, wl, xl = select.select([self.infd], [], [], timeout)
478 if not rl: return False
479 assert(rl[0] == self.infd)
480 ns = ''.join(checked_reader(self.infd, 5))
481 n, fdw = struct.unpack('!IB', ns)
482 assert(n <= MAX_PACKET)
484 self.reader = checked_reader(self.infd, n)
486 for buf in checked_reader(self.infd, n):
487 sys.stderr.write(buf)
490 debug2("DemuxConn: marked closed\n")
493 def _load_buf(self, timeout):
494 if self.buf is not None:
496 while not self.closed:
497 while not self.reader:
498 if not self._next_packet(timeout):
501 self.buf = self.reader.next()
503 except StopIteration:
507 def _read_parts(self, ix_fn):
508 while self._load_buf(None):
509 assert(self.buf is not None)
511 if i is None or i == len(self.buf):
516 self.buf = self.buf[i:]
524 return buf.index('\n')+1
527 return ''.join(self._read_parts(find_eol))
529 def _read(self, size):
531 def until_size(buf): # Closes on csize
532 if len(buf) < csize[0]:
537 return ''.join(self._read_parts(until_size))
540 return self._load_buf(0)
544 """Generate a list of input lines from 'f' without terminating newlines."""
552 def chunkyreader(f, count = None):
553 """Generate a list of chunks of data read from 'f'.
555 If count is None, read until EOF is reached.
557 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
558 reached while reading, raise IOError.
# NOTE(review): the docstring terminator and the two read loops around
# these lines are elided from this listing.
# Reads are capped at 64KiB per chunk to bound memory use.
562 b = f.read(min(count, 65536))
564 raise IOError('EOF with %d bytes remaining' % count)
575 """Append "/" to 's' if it doesn't already end in "/"."""
576 if s and not s.endswith('/'):
582 def _mmap_do(f, sz, flags, prot, close):
# Common implementation behind the mmap_read*/mmap_readwrite* wrappers.
# NOTE(review): the sz-defaulting and return lines are elided from this
# listing; only the visible statements are annotated.
584 st = os.fstat(f.fileno())
587 # trying to open a zero-length map gives an error, but an empty
588 # string has all the same behaviour of a zero-length map, ie. it has
591 map = mmap.mmap(f.fileno(), sz, flags, prot)
593 f.close() # map will persist beyond file close
597 def mmap_read(f, sz = 0, close=True):
598 """Create a read-only memory mapped region on file 'f'.
599 If sz is 0, the region will cover the entire file.
# NOTE(review): the docstring terminator between these lines is elided
# from this listing.
601 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
604 def mmap_readwrite(f, sz = 0, close=True):
605 """Create a read-write memory mapped region on file 'f'.
606 If sz is 0, the region will cover the entire file.
# NOTE(review): the docstring terminator and the continuation of the
# return statement are elided from this listing.  MAP_SHARED means
# writes are flushed back to the underlying file.
608 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
612 def mmap_readwrite_private(f, sz = 0, close=True):
613 """Create a read-write memory mapped region on file 'f'.
614 If sz is 0, the region will cover the entire file.
615 The map is private, which means the changes are never flushed back to the
# NOTE(review): the docstring terminator and the continuation of the
# return statement are elided from this listing.
618 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
622 def parse_timestamp(epoch_str):
623 """Return the number of nanoseconds since the epoch that are described
624 by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
625 throw a ValueError that may contain additional information."""
# Per-unit multipliers to nanoseconds; the ms/us/ns entries between these
# lines are elided from this listing.
626 ns_per = {'s' : 1000000000,
# The count is optional in the regex so a missing-units case can be
# distinguished below.
630 match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
# A bare number is rejected with a targeted hint about missing units.
632 if re.match(r'^([-+]?[0-9]+)$', epoch_str):
633 raise ValueError('must include units, i.e. 100ns, 100ms, ...')
635 (n, units) = match.group(1, 2)
639 return n * ns_per[units]
643 """Parse data size information into a float number.
645 Here are some examples of conversions:
646 199.2k means 203981 bytes
647 1GB means 1073741824 bytes
648 2.1 tb means 2199023255552 bytes
650 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
652 raise ValueError("can't parse %r as a number" % s)
653 (val, unit) = g.groups()
656 if unit in ['t', 'tb']:
657 mult = 1024*1024*1024*1024
658 elif unit in ['g', 'gb']:
659 mult = 1024*1024*1024
660 elif unit in ['m', 'mb']:
662 elif unit in ['k', 'kb']:
664 elif unit in ['', 'b']:
667 raise ValueError("invalid unit %r in number %r" % (unit, s))
672 """Count the number of elements in an iterator. (consumes the iterator)"""
673 return reduce(lambda x,y: x+1, l)
678 """Append an error message to the list of saved errors.
680 Once processing is able to stop and output the errors, the saved errors are
681 accessible in the module variable helpers.saved_errors.
683 saved_errors.append(e)
693 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
695 The new exception handler will make sure that bup will exit without an ugly
696 stacktrace when Ctrl-C is hit.
698 oldhook = sys.excepthook
699 def newhook(exctype, value, traceback):
700 if exctype == KeyboardInterrupt:
701 log('\nInterrupted.\n')
703 return oldhook(exctype, value, traceback)
704 sys.excepthook = newhook
707 def columnate(l, prefix):
708 """Format elements of 'l' in columns with 'prefix' leading each line.
710 The number of columns is determined automatically based on the string
# NOTE(review): the docstring terminator, empty-list guard, and several
# other lines are elided from this listing.
# Column width is the longest element plus two spaces of padding.
716 clen = max(len(s) for s in l)
717 ncols = (tty_width() - len(prefix)) / (clen + 2)
# Pad l so it divides evenly into ncols columns (padding lines elided).
722 while len(l) % ncols:
725 for s in range(0, len(l), rows):
726 cols.append(l[s:s+rows])
# zip(*cols) transposes the column slices into printable rows.
728 for row in zip(*cols):
729 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
733 def parse_date_or_fatal(str, fatal):
734 """Parses the given date or calls Option.fatal().
735 For now we expect a string that contains a float."""
# NOTE(review): the try/conversion lines between the docstring and the
# handler are elided from this listing.
# Python 2 except syntax ("except ValueError, e"); fatal() is expected to
# terminate, and raising its result guarantees we never fall through.
738 except ValueError, e:
739 raise fatal('invalid date format (should be a float): %r' % e)
744 def parse_excludes(options, fatal):
745 """Traverse the options and extract all excludes, or call Option.fatal()."""
# NOTE(review): the accumulator initialization, option loop header, and
# try/except around open() are elided from this listing.
749 (option, parameter) = flag
# --exclude adds a single (real)path; --exclude-from reads one path per
# line from the named file.
750 if option == '--exclude':
751 excluded_paths.append(realpath(parameter))
752 elif option == '--exclude-from':
754 f = open(realpath(parameter))
756 raise fatal("couldn't read %s" % parameter)
757 for exclude_path in f.readlines():
758 excluded_paths.append(realpath(exclude_path.strip()))
# frozenset de-duplicates; sorted() gives callers a stable order.
759 return sorted(frozenset(excluded_paths))
762 def parse_rx_excludes(options, fatal):
763 """Traverse the options and extract all rx excludes, or call
# NOTE(review): the docstring continuation/terminator, the option loop
# header, and the try/except framing are elided from this listing.
765 excluded_patterns = []
768 (option, parameter) = flag
# --exclude-rx compiles the pattern directly; a bad pattern is reported
# via fatal() (the surrounding try/except is elided).
769 if option == '--exclude-rx':
771 excluded_patterns.append(re.compile(parameter))
773 fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
# --exclude-rx-from reads one pattern per line from the named file.
774 elif option == '--exclude-rx-from':
776 f = open(realpath(parameter))
778 raise fatal("couldn't read %s" % parameter)
779 for pattern in f.readlines():
# Only the trailing newline is stripped so patterns may contain spaces.
780 spattern = pattern.rstrip('\n')
782 excluded_patterns.append(re.compile(spattern))
784 fatal('invalid --exclude-rx pattern (%s): %s' % (spattern, ex))
785 return excluded_patterns
788 def should_rx_exclude_path(path, exclude_rxs):
789 """Return True if path matches a regular expression in exclude_rxs."""
790 for rx in exclude_rxs:
# NOTE(review): the match test and the return statements are elided from
# this listing; the visible debug1() call logs which pattern matched.
792 debug1('Skipping %r: excluded by rx pattern %r.\n'
793 % (path, rx.pattern))
798 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
799 # that resolve against the current filesystem in the strip/graft
800 # functions for example, but elsewhere as well. I suspect bup's not
801 # always being careful about that. For some cases, the contents of
802 # the current filesystem should be irrelevant, and consulting it might
803 # produce the wrong result, perhaps via unintended symlink resolution,
806 def path_components(path):
807 """Break path into a list of pairs of the form (name,
808 full_path_to_name). Path must start with '/'.
810 '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
# Python 2 raise syntax.  NOTE(review): the result/full_path setup lines
# between these statements are elided from this listing.
811 if not path.startswith('/'):
812 raise Exception, 'path must start with "/": %s' % path
813 # Since we assume path startswith('/'), we can skip the first element.
815 norm_path = os.path.abspath(path)
819 for p in norm_path.split('/')[1:]:
821 result.append((p, full_path))
825 def stripped_path_components(path, strip_prefixes):
826 """Strip any prefix in strip_prefixes from path and return a list
827 of path components where each component is (name,
828 none_or_full_fs_path_to_name). Assume path startswith('/').
829 See thelpers.py for examples."""
830 normalized_path = os.path.abspath(path)
# Longest prefix first, so the most specific prefix wins.
831 sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
832 for bp in sorted_strip_prefixes:
833 normalized_bp = os.path.abspath(bp)
# NOTE(review): result initialization and the loop over the remainder's
# components are elided from this listing.
834 if normalized_path.startswith(normalized_bp):
835 prefix = normalized_path[:len(normalized_bp)]
837 for p in normalized_path[len(normalized_bp):].split('/'):
841 result.append((p, prefix))
# No prefix matched: fall back to the ordinary component split.
844 return path_components(path)
847 def grafted_path_components(graft_points, path):
848 # Create a result that consists of some number of faked graft
849 # directories before the graft point, followed by all of the real
850 # directories from path that are after the graft point. Arrange
851 # for the directory at the graft point in the result to correspond
852 # to the "orig" directory in --graft orig=new. See t/thelpers.py
855 # Note that given --graft orig=new, orig and new have *nothing* to
856 # do with each other, even if some of their component names
857 # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
858 # equivalent to --graft /foo/bar/baz=/x/y/z, or even
861 # FIXME: This can't be the best solution...
# NOTE(review): several lines are elided from this listing (comment
# continuations and the tail of the matching branch).
862 clean_path = os.path.abspath(path)
# graft_points is an iterable of (old_prefix, new_prefix) pairs; the
# first matching old_prefix wins.
863 for graft_point in graft_points:
864 old_prefix, new_prefix = graft_point
865 # Expand prefixes iff not absolute paths.
866 old_prefix = os.path.normpath(old_prefix)
867 new_prefix = os.path.normpath(new_prefix)
868 if clean_path.startswith(old_prefix):
# Textually rewrite old_prefix -> new_prefix at the start of the path.
869 escaped_prefix = re.escape(old_prefix)
870 grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
871 # Handle /foo=/ (at least) -- which produces //whatever.
872 grafted_path = '/' + grafted_path.lstrip('/')
873 clean_path_components = path_components(clean_path)
874 # Count the components that were stripped.
875 strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
876 new_prefix_parts = new_prefix.split('/')
# Faked graft directories carry None for their filesystem path.
877 result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
878 result = [(p, None) for p in result_prefix] \
879 + clean_path_components[strip_count:]
880 # Now set the graft point name to match the end of new_prefix.
881 graft_point = len(result_prefix)
882 result[graft_point] = \
883 (new_prefix_parts[-1], clean_path_components[strip_count][1])
884 if new_prefix == '/': # --graft ...=/ is a special case.
# No graft point matched: return the ordinary component split.
887 return path_components(clean_path)
892 """Format bup's version date string for output."""
893 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded in bup's generated _version module."""
    commit = _version.COMMIT
    return commit
902 """Format bup's version tag (the official version number).
904 When generated from a commit other than one pointed to with a tag, the
905 returned string will be "unknown-" followed by the first seven positions of
908 names = _version.NAMES.strip()
909 assert(names[0] == '(')
910 assert(names[-1] == ')')
912 l = [n.strip() for n in names.split(',')]
914 if n.startswith('tag: bup-'):
916 return 'unknown-%s' % _version.COMMIT[:7]