1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import hashlib, heapq, operator, time, grp
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
9 # This function should really be in helpers, not in bup.options. But we
10 # want options.py to be standalone so people can include it in other projects.
11 from bup.options import _tty_width
12 tty_width = _tty_width
16 """Convert the string 's' to an integer. Return 0 if s is not a number."""
24 """Convert the string 's' to a float. Return 0 if s is not a number."""
26 return float(s or '0')
31 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
34 # Write (blockingly) to sockets that may or may not be in blocking mode.
35 # We need this because our stderr is sometimes eaten by subprocesses
36 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
37 # leading to race conditions. Ick. We'll do it the hard way.
38 def _hard_write(fd, buf):
40 (r,w,x) = select.select([], [fd], [], None)
42 raise IOError('select(fd) returned without being writable')
44 sz = os.write(fd, buf)
46 if e.errno != errno.EAGAIN:
54 """Print a log message to stderr."""
57 _hard_write(sys.stderr.fileno(), s)
71 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
72 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
75 """Calls log() if stderr is a TTY. Does nothing otherwise."""
83 """Calls progress() only if we haven't printed progress in a while.
85 This avoids overloading the stderr buffer with excess junk.
89 if now - _last_prog > 0.1:
95 """Calls progress() to redisplay the most recent progress message.
97 Useful after you've printed some other message that wipes out the
100 if _last_progress and _last_progress.endswith('\r'):
101 progress(_last_progress)
104 def mkdirp(d, mode=None):
105 """Recursively create directories on path 'd'.
107 Unlike os.makedirs(), it doesn't raise an exception if the last element of
108 the path already exists.
116 if e.errno == errno.EEXIST:
123 """Get the next item from an iterator, None if we reached the end."""
126 except StopIteration:
130 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
132 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
134 samekey = operator.eq
136 total = sum(len(it) for it in iters)
137 iters = (iter(it) for it in iters)
138 heap = ((next(it),it) for it in iters)
139 heap = [(e,it) for e,it in heap if e]
144 if not count % pfreq:
147 if not samekey(e, pe):
152 e = it.next() # Don't use next() function, it's too expensive
153 except StopIteration:
154 heapq.heappop(heap) # remove current
156 heapq.heapreplace(heap, (e, it)) # shift current to new location
161 """Delete a file at path 'f' if it currently exists.
163 Unlike os.unlink(), does not throw an exception if the file didn't already
169 if e.errno == errno.ENOENT:
170 pass # it doesn't exist, that's what you asked for
174 """Run a subprocess and return its output."""
175 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
182 """Get the absolute path of a file.
184 Behaves like os.path.realpath, but doesn't follow a symlink for the last
185 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
186 will follow symlinks in p's directory)
192 if st and stat.S_ISLNK(st.st_mode):
193 (dir, name) = os.path.split(p)
194 dir = os.path.realpath(dir)
195 out = os.path.join(dir, name)
197 out = os.path.realpath(p)
198 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    The check is purely environmental: it reports True whenever the
    FAKEROOTKEY variable is present in the environment, even if its value
    is empty.  (Presumably fakeroot exports this variable to the processes
    it runs -- confirm if the detection ever needs to be stricter.)
    """
    # Compare against the None singleton by identity, not equality.
    return os.getenv("FAKEROOTKEY") is not None
208 if sys.platform.startswith('cygwin'):
210 return ctypes.cdll.shell32.IsUserAnAdmin()
212 return os.geteuid() == 0
215 def _cache_key_value(get_value, key, cache):
216 """Return (value, was_cached). If there is a value in the cache
217 for key, use that, otherwise, call get_value(key) which should
218 throw a KeyError if there is no value -- in which case the cached
219 and returned value will be None.
221 try: # Do we already have it (or know there wasn't one)?
228 cache[key] = value = get_value(key)
234 _uid_to_pwd_cache = {}
235 _name_to_pwd_cache = {}
237 def pwd_from_uid(uid):
238 """Return password database entry for uid (may be a cached value).
239 Return None if no entry is found.
241 global _uid_to_pwd_cache, _name_to_pwd_cache
242 entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
243 if entry and not cached:
244 _name_to_pwd_cache[entry.pw_name] = entry
248 def pwd_from_name(name):
249 """Return password database entry for name (may be a cached value).
250 Return None if no entry is found.
252 global _uid_to_pwd_cache, _name_to_pwd_cache
253 entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
254 if entry and not cached:
255 _uid_to_pwd_cache[entry.pw_uid] = entry
259 _gid_to_grp_cache = {}
260 _name_to_grp_cache = {}
262 def grp_from_gid(gid):
263 """Return password database entry for gid (may be a cached value).
264 Return None if no entry is found.
266 global _gid_to_grp_cache, _name_to_grp_cache
267 entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
268 if entry and not cached:
269 _name_to_grp_cache[entry.gr_name] = entry
273 def grp_from_name(name):
274 """Return password database entry for name (may be a cached value).
275 Return None if no entry is found.
277 global _gid_to_grp_cache, _name_to_grp_cache
278 entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
279 if entry and not cached:
280 _gid_to_grp_cache[entry.gr_gid] = entry
286 """Get the user's login name."""
290 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
296 """Get the user's full name."""
298 if not _userfullname:
300 entry = pwd_from_uid(uid)
302 _userfullname = entry[4].split(',')[0] or entry[0]
303 if not _userfullname:
304 _userfullname = 'user%d' % uid
310 """Get the FQDN of this machine."""
313 _hostname = socket.getfqdn()
# Lazily-initialised base directory for resource_path(); None until first use.
_resource_path = None
def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The base directory is taken from $BUP_RESOURCE_PATH, falling back to
    '.' when that variable is unset or empty.  It is looked up on the first
    call that finds no cached value and remembered in the module global
    _resource_path, so later changes to the environment are not seen.
    """
    global _resource_path
    if _resource_path:
        base = _resource_path
    else:
        base = os.environ.get('BUP_RESOURCE_PATH') or '.'
        _resource_path = base
    return os.path.join(base, subdir)
324 def format_filesize(size):
329 exponent = int(math.log(size) / math.log(unit))
330 size_prefix = "KMGTPE"[exponent - 1]
331 return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
334 class NotOk(Exception):
339 def __init__(self, outp):
343 while self._read(65536): pass
345 def read(self, size):
346 """Read 'size' bytes from input stream."""
348 return self._read(size)
351 """Read from input stream until a newline is found."""
353 return self._readline()
355 def write(self, data):
356 """Write 'data' to output stream."""
357 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
358 self.outp.write(data)
361 """Return true if input stream is readable."""
362 raise NotImplemented("Subclasses must implement has_input")
365 """Indicate end of output from last sent command."""
369 """Indicate server error to the client."""
370 s = re.sub(r'\s+', ' ', str(s))
371 self.write('\nerror %s\n' % s)
373 def _check_ok(self, onempty):
376 for rl in linereader(self):
377 #log('%d got line: %r\n' % (os.getpid(), rl))
378 if not rl: # empty line
382 elif rl.startswith('error '):
383 #log('client: error: %s\n' % rl[6:])
387 raise Exception('server exited unexpectedly; see errors above')
389 def drain_and_check_ok(self):
390 """Remove all data for the current command from input stream."""
393 return self._check_ok(onempty)
396 """Verify that server action completed successfully."""
398 raise Exception('expected "ok", got %r' % rl)
399 return self._check_ok(onempty)
402 class Conn(BaseConn):
403 def __init__(self, inp, outp):
404 BaseConn.__init__(self, outp)
407 def _read(self, size):
408 return self.inp.read(size)
411 return self.inp.readline()
414 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
416 assert(rl[0] == self.inp.fileno())
422 def checked_reader(fd, n):
424 rl, _, _ = select.select([fd], [], [])
427 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
432 MAX_PACKET = 128 * 1024
433 def mux(p, outfd, outr, errr):
436 while p.poll() is None:
437 rl, _, _ = select.select(fds, [], [])
440 buf = os.read(outr, MAX_PACKET)
442 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
444 buf = os.read(errr, 1024)
446 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
448 os.write(outfd, struct.pack('!IB', 0, 3))
451 class DemuxConn(BaseConn):
452 """A helper class for bup's client-server protocol."""
453 def __init__(self, infd, outp):
454 BaseConn.__init__(self, outp)
455 # Anything that comes through before the sync string was not
456 # multiplexed and can be assumed to be debug/log before mux init.
458 while tail != 'BUPMUX':
459 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
461 raise IOError('demux: unexpected EOF during initialization')
463 sys.stderr.write(tail[:-6]) # pre-mux log messages
470 def write(self, data):
472 BaseConn.write(self, data)
474 def _next_packet(self, timeout):
475 if self.closed: return False
476 rl, wl, xl = select.select([self.infd], [], [], timeout)
477 if not rl: return False
478 assert(rl[0] == self.infd)
479 ns = ''.join(checked_reader(self.infd, 5))
480 n, fdw = struct.unpack('!IB', ns)
481 assert(n <= MAX_PACKET)
483 self.reader = checked_reader(self.infd, n)
485 for buf in checked_reader(self.infd, n):
486 sys.stderr.write(buf)
489 debug2("DemuxConn: marked closed\n")
492 def _load_buf(self, timeout):
493 if self.buf is not None:
495 while not self.closed:
496 while not self.reader:
497 if not self._next_packet(timeout):
500 self.buf = self.reader.next()
502 except StopIteration:
506 def _read_parts(self, ix_fn):
507 while self._load_buf(None):
508 assert(self.buf is not None)
510 if i is None or i == len(self.buf):
515 self.buf = self.buf[i:]
523 return buf.index('\n')+1
526 return ''.join(self._read_parts(find_eol))
528 def _read(self, size):
530 def until_size(buf): # Closes on csize
531 if len(buf) < csize[0]:
536 return ''.join(self._read_parts(until_size))
539 return self._load_buf(0)
543 """Generate a list of input lines from 'f' without terminating newlines."""
551 def chunkyreader(f, count = None):
552 """Generate a list of chunks of data read from 'f'.
554 If count is None, read until EOF is reached.
556 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
557 reached while reading, raise IOError.
561 b = f.read(min(count, 65536))
563 raise IOError('EOF with %d bytes remaining' % count)
574 """Append "/" to 's' if it doesn't aleady end in "/"."""
575 if s and not s.endswith('/'):
581 def _mmap_do(f, sz, flags, prot, close):
583 st = os.fstat(f.fileno())
586 # trying to open a zero-length map gives an error, but an empty
587 # string has all the same behaviour of a zero-length map, ie. it has
590 map = mmap.mmap(f.fileno(), sz, flags, prot)
592 f.close() # map will persist beyond file close
596 def mmap_read(f, sz = 0, close=True):
597 """Create a read-only memory mapped region on file 'f'.
598 If sz is 0, the region will cover the entire file.
600 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
603 def mmap_readwrite(f, sz = 0, close=True):
604 """Create a read-write memory mapped region on file 'f'.
605 If sz is 0, the region will cover the entire file.
607 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
611 def mmap_readwrite_private(f, sz = 0, close=True):
612 """Create a read-write memory mapped region on file 'f'.
613 If sz is 0, the region will cover the entire file.
614 The map is private, which means the changes are never flushed back to the
617 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
622 """Parse data size information into a float number.
624 Here are some examples of conversions:
625 199.2k means 203981 bytes
626 1GB means 1073741824 bytes
627 2.1 tb means 2199023255552 bytes
629 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
631 raise ValueError("can't parse %r as a number" % s)
632 (val, unit) = g.groups()
635 if unit in ['t', 'tb']:
636 mult = 1024*1024*1024*1024
637 elif unit in ['g', 'gb']:
638 mult = 1024*1024*1024
639 elif unit in ['m', 'mb']:
641 elif unit in ['k', 'kb']:
643 elif unit in ['', 'b']:
646 raise ValueError("invalid unit %r in number %r" % (unit, s))
651 """Count the number of elements in an iterator. (consumes the iterator)"""
652 return reduce(lambda x,y: x+1, l)
657 """Append an error message to the list of saved errors.
659 Once processing is able to stop and output the errors, the saved errors are
660 accessible in the module variable helpers.saved_errors.
662 saved_errors.append(e)
672 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
674 The new exception handler will make sure that bup will exit without an ugly
675 stacktrace when Ctrl-C is hit.
677 oldhook = sys.excepthook
678 def newhook(exctype, value, traceback):
679 if exctype == KeyboardInterrupt:
680 log('\nInterrupted.\n')
682 return oldhook(exctype, value, traceback)
683 sys.excepthook = newhook
686 def columnate(l, prefix):
687 """Format elements of 'l' in columns with 'prefix' leading each line.
689 The number of columns is determined automatically based on the string
695 clen = max(len(s) for s in l)
696 ncols = (tty_width() - len(prefix)) / (clen + 2)
701 while len(l) % ncols:
704 for s in range(0, len(l), rows):
705 cols.append(l[s:s+rows])
707 for row in zip(*cols):
708 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
712 def parse_date_or_fatal(str, fatal):
713 """Parses the given date or calls Option.fatal().
714 For now we expect a string that contains a float."""
717 except ValueError, e:
718 raise fatal('invalid date format (should be a float): %r' % e)
723 def parse_excludes(options, fatal):
724 """Traverse the options and extract all excludes, or call Option.fatal()."""
728 (option, parameter) = flag
729 if option == '--exclude':
730 excluded_paths.append(realpath(parameter))
731 elif option == '--exclude-from':
733 f = open(realpath(parameter))
735 raise fatal("couldn't read %s" % parameter)
736 for exclude_path in f.readlines():
737 excluded_paths.append(realpath(exclude_path.strip()))
738 return excluded_paths
741 def parse_rx_excludes(options, fatal):
742 """Traverse the options and extract all rx excludes, or call
744 rxs = [v for f, v in options if f == '--exclude-rx']
745 for i in range(len(rxs)):
747 rxs[i] = re.compile(rxs[i])
749 o.fatal('invalid --exclude-rx pattern (%s):' % (ex, rxs[i]))
753 def should_rx_exclude_path(path, exclude_rxs):
754 """Return True if path matches a regular expression in exclude_rxs."""
755 for rx in exclude_rxs:
757 debug1('Skipping %r: excluded by rx pattern %r.\n'
758 % (path, rx.pattern))
763 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
764 # that resolve against the current filesystem in the strip/graft
765 # functions for example, but elsewhere as well. I suspect bup's not
766 # always being careful about that. For some cases, the contents of
767 # the current filesystem should be irrelevant, and consulting it might
768 # produce the wrong result, perhaps via unintended symlink resolution,
771 def path_components(path):
772 """Break path into a list of pairs of the form (name,
773 full_path_to_name). Path must start with '/'.
775 '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
776 if not path.startswith('/'):
777 raise Exception, 'path must start with "/": %s' % path
778 # Since we assume path startswith('/'), we can skip the first element.
780 norm_path = os.path.abspath(path)
784 for p in norm_path.split('/')[1:]:
786 result.append((p, full_path))
790 def stripped_path_components(path, strip_prefixes):
791 """Strip any prefix in strip_prefixes from path and return a list
792 of path components where each component is (name,
793 none_or_full_fs_path_to_name). Assume path startswith('/').
794 See thelpers.py for examples."""
795 normalized_path = os.path.abspath(path)
796 sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
797 for bp in sorted_strip_prefixes:
798 normalized_bp = os.path.abspath(bp)
799 if normalized_path.startswith(normalized_bp):
800 prefix = normalized_path[:len(normalized_bp)]
802 for p in normalized_path[len(normalized_bp):].split('/'):
806 result.append((p, prefix))
809 return path_components(path)
812 def grafted_path_components(graft_points, path):
813 # Create a result that consists of some number of faked graft
814 # directories before the graft point, followed by all of the real
815 # directories from path that are after the graft point. Arrange
816 # for the directory at the graft point in the result to correspond
817 # to the "orig" directory in --graft orig=new. See t/thelpers.py
820 # Note that given --graft orig=new, orig and new have *nothing* to
821 # do with each other, even if some of their component names
822 # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
823 # equivalent to --graft /foo/bar/baz=/x/y/z, or even
826 # FIXME: This can't be the best solution...
827 clean_path = os.path.abspath(path)
828 for graft_point in graft_points:
829 old_prefix, new_prefix = graft_point
830 # Expand prefixes iff not absolute paths.
831 old_prefix = os.path.normpath(old_prefix)
832 new_prefix = os.path.normpath(new_prefix)
833 if clean_path.startswith(old_prefix):
834 escaped_prefix = re.escape(old_prefix)
835 grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
836 # Handle /foo=/ (at least) -- which produces //whatever.
837 grafted_path = '/' + grafted_path.lstrip('/')
838 clean_path_components = path_components(clean_path)
839 # Count the components that were stripped.
840 strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
841 new_prefix_parts = new_prefix.split('/')
842 result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
843 result = [(p, None) for p in result_prefix] \
844 + clean_path_components[strip_count:]
845 # Now set the graft point name to match the end of new_prefix.
846 graft_point = len(result_prefix)
847 result[graft_point] = \
848 (new_prefix_parts[-1], clean_path_components[strip_count][1])
849 if new_prefix == '/': # --graft ...=/ is a special case.
852 return path_components(clean_path)
857 """Format bup's version date string for output."""
858 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded for this build of bup.

    The value is read straight from the generated _version module's COMMIT
    attribute, without any formatting or truncation.
    """
    commit = _version.COMMIT
    return commit
867 """Format bup's version tag (the official version number).
869 When generated from a commit other than one pointed to with a tag, the
870 returned string will be "unknown-" followed by the first seven positions of
873 names = _version.NAMES.strip()
874 assert(names[0] == '(')
875 assert(names[-1] == ')')
877 l = [n.strip() for n in names.split(',')]
879 if n.startswith('tag: bup-'):
881 return 'unknown-%s' % _version.COMMIT[:7]