1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import hashlib, heapq, operator, time, grp
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
9 # This function should really be in helpers, not in bup.options. But we
10 # want options.py to be standalone so people can include it in other projects.
11 from bup.options import _tty_width
12 tty_width = _tty_width
16 """Convert the string 's' to an integer. Return 0 if s is not a number."""
24 """Convert the string 's' to a float. Return 0 if s is not a number."""
26 return float(s or '0')
31 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
34 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
36 fdatasync = os.fdatasync
37 except AttributeError:
41 # Write (blockingly) to sockets that may or may not be in blocking mode.
42 # We need this because our stderr is sometimes eaten by subprocesses
43 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
44 # leading to race conditions. Ick. We'll do it the hard way.
def _hard_write(fd, buf):
    # Blocking write to a possibly-nonblocking fd: wait for writability,
    # then write, retrying on EAGAIN (see the comment block above).
    # NOTE(review): the enclosing while-loop, the writability test, and the
    # try/except framing appear to be missing from the visible source.
    (r,w,x) = select.select([], [fd], [], None)
    # Raised when select() claims readiness but fd is not in the write set.
    raise IOError('select(fd) returned without being writable')
    sz = os.write(fd, buf)
    # Any error other than EAGAIN is a real failure and must propagate.
    if e.errno != errno.EAGAIN:
61 """Print a log message to stderr."""
64 _hard_write(sys.stderr.fileno(), s)
# Precomputed TTY-ness of stdout (fd 1) and stderr (fd 2).  The
# BUP_FORCE_TTY environment variable is a bitmask: bit 1 forces stdout
# to be treated as a TTY, bit 2 forces stderr.
istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
82 """Calls log() if stderr is a TTY. Does nothing otherwise."""
90 """Calls progress() only if we haven't printed progress in a while.
92 This avoids overloading the stderr buffer with excess junk.
96 if now - _last_prog > 0.1:
102 """Calls progress() to redisplay the most recent progress message.
104 Useful after you've printed some other message that wipes out the
107 if _last_progress and _last_progress.endswith('\r'):
108 progress(_last_progress)
def mkdirp(d, mode=None):
    """Recursively create directories on path 'd'.

    Unlike os.makedirs(), it doesn't raise an exception if the last element of
    the path already exists.
    """
    # NOTE(review): the try/os.makedirs call (and the mode handling) appears
    # to be missing from the visible source; only the EEXIST-suppression
    # branch of the except clause remains.
    if e.errno == errno.EEXIST:
130 """Get the next item from an iterator, None if we reached the end."""
133 except StopIteration:
def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
    # Heap-merge several sorted iterators, de-duplicating adjacent items
    # that compare equal (by attribute 'key' when given, else by value),
    # and reporting progress via pfunc every pfreq items, pfinal at the end.
    # NOTE(review): the if/else around the two samekey assignments, the
    # heapify call, the main loop header and the yield are missing from
    # the visible source.
    samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
    samekey = operator.eq
    total = sum(len(it) for it in iters)
    iters = (iter(it) for it in iters)
    heap = ((next(it),it) for it in iters)
    heap = [(e,it) for e,it in heap if e]
    if not count % pfreq:
    if not samekey(e, pe):
    e = it.next() # Don't use next() function, it's too expensive (Python 2 idiom)
    except StopIteration:
    heapq.heappop(heap) # remove current
    heapq.heapreplace(heap, (e, it)) # shift current to new location
168 """Delete a file at path 'f' if it currently exists.
170 Unlike os.unlink(), does not throw an exception if the file didn't already
176 if e.errno == errno.ENOENT:
177 pass # it doesn't exist, that's what you asked for
181 """Run a subprocess and return its output."""
182 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
189 """Get the absolute path of a file.
191 Behaves like os.path.realpath, but doesn't follow a symlink for the last
192 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
193 will follow symlinks in p's directory)
199 if st and stat.S_ISLNK(st.st_mode):
200 (dir, name) = os.path.split(p)
201 dir = os.path.realpath(dir)
202 out = os.path.join(dir, name)
204 out = os.path.realpath(p)
205 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    fakeroot exports FAKEROOTKEY into the environment of its children,
    so its mere presence in our environment is the signal.
    """
    # Fix: use 'is not None' rather than '!= None' for a None check (PEP 8
    # idiom); behavior is otherwise unchanged.
    return os.getenv("FAKEROOTKEY") is not None
215 if sys.platform.startswith('cygwin'):
217 return ctypes.cdll.shell32.IsUserAnAdmin()
219 return os.geteuid() == 0
def _cache_key_value(get_value, key, cache):
    """Return (value, was_cached). If there is a value in the cache
    for key, use that, otherwise, call get_value(key) which should
    throw a KeyError if there is no value -- in which case the cached
    and returned value will be None.
    """
    # NOTE(review): the cache-hit return, the KeyError handler that stores
    # None, and the final return appear to be missing from the visible
    # source.
    try: # Do we already have it (or know there wasn't one)?
        # Cache miss: compute and memoize in one statement.
        cache[key] = value = get_value(key)
# Process-lifetime memoization of password-database lookups, keyed by
# uid and by user name respectively (see pwd_from_uid / pwd_from_name).
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}
def pwd_from_uid(uid):
    """Return password database entry for uid (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
    # On a fresh (uncached) hit, also prime the name->entry cache so a
    # later pwd_from_name() for the same user costs nothing.
    if entry and not cached:
        _name_to_pwd_cache[entry.pw_name] = entry
    # NOTE(review): the trailing 'return entry' appears to be missing from
    # the visible source.
def pwd_from_name(name):
    """Return password database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
    # Prime the reverse (uid->entry) cache on a fresh lookup.
    if entry and not cached:
        _uid_to_pwd_cache[entry.pw_uid] = entry
    # NOTE(review): the trailing 'return entry' appears to be missing from
    # the visible source.
# Process-lifetime memoization of group-database lookups, keyed by gid
# and by group name respectively (see grp_from_gid / grp_from_name).
_gid_to_grp_cache = {}
_name_to_grp_cache = {}
def grp_from_gid(gid):
    """Return group database entry for gid (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
    # Prime the name->entry cache on a fresh lookup.
    if entry and not cached:
        _name_to_grp_cache[entry.gr_name] = entry
    # NOTE(review): the trailing 'return entry' appears to be missing from
    # the visible source.
def grp_from_name(name):
    """Return group database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
    # Prime the reverse (gid->entry) cache on a fresh lookup.
    if entry and not cached:
        _gid_to_grp_cache[entry.gr_gid] = entry
    # NOTE(review): the trailing 'return entry' appears to be missing from
    # the visible source.
293 """Get the user's login name."""
297 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
303 """Get the user's full name."""
305 if not _userfullname:
307 entry = pwd_from_uid(uid)
309 _userfullname = entry[4].split(',')[0] or entry[0]
310 if not _userfullname:
311 _userfullname = 'user%d' % uid
317 """Get the FQDN of this machine."""
320 _hostname = socket.getfqdn()
# Cached base directory for bup's bundled resources; resolved lazily.
_resource_path = None

def resource_path(subdir=''):
    """Return the path to bup's resource directory, joined with 'subdir'.

    The base directory is taken from $BUP_RESOURCE_PATH, falling back to
    the current directory, and is looked up only once per process.
    """
    global _resource_path
    if not _resource_path:
        base = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = base if base else '.'
    return os.path.join(_resource_path, subdir)
def format_filesize(size):
    """Return 'size' (bytes) as a human-readable string, e.g. "1.5G".

    NOTE(review): the small-size fast path and the 'unit' constant appear
    to be missing from the visible source, and 'math' is not among the
    visible imports -- verify against the top of the file.
    """
    # Pick the largest power of 'unit' not exceeding size...
    exponent = int(math.log(size) / math.log(unit))
    # ...and its K/M/G/T/P/E prefix letter (exponent 1 -> 'K').
    size_prefix = "KMGTPE"[exponent - 1]
    return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
341 class NotOk(Exception):
    def __init__(self, outp):
        # NOTE(review): the body (presumably storing outp on self) is
        # missing from the visible source, and the line below appears to
        # belong to a later method (it drains the input stream until
        # _read returns nothing) whose 'def' line is also not visible.
        while self._read(65536): pass
    def read(self, size):
        """Read 'size' bytes from input stream."""
        # NOTE(review): one line is missing here in the visible source --
        # presumably flushing the output stream before reading.
        return self._read(size)
358 """Read from input stream until a newline is found."""
360 return self._readline()
    def write(self, data):
        """Write 'data' to output stream."""
        # Delegates straight to the underlying stream; subclasses may wrap.
        #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
        self.outp.write(data)
368 """Return true if input stream is readable."""
369 raise NotImplemented("Subclasses must implement has_input")
372 """Indicate end of output from last sent command."""
376 """Indicate server error to the client."""
377 s = re.sub(r'\s+', ' ', str(s))
378 self.write('\nerror %s\n' % s)
    def _check_ok(self, onempty):
        # Scan the input stream for the protocol terminator: an empty line
        # followed by 'ok' means success; an 'error <msg>' line means the
        # server reported a failure.
        # NOTE(review): the flush, the 'ok' comparison and the error-path
        # return/raise lines are missing from the visible source.
        for rl in linereader(self):
            #log('%d got line: %r\n' % (os.getpid(), rl))
            if not rl: # empty line
            elif rl.startswith('error '):
                #log('client: error: %s\n' % rl[6:])
        # Stream ended without an ok/error line: the server died on us.
        raise Exception('server exited unexpectedly; see errors above')
    def drain_and_check_ok(self):
        """Remove all data for the current command from input stream."""
        # NOTE(review): the local 'onempty' callback definition is missing
        # from the visible source.
        return self._check_ok(onempty)
403 """Verify that server action completed successfully."""
405 raise Exception('expected "ok", got %r' % rl)
406 return self._check_ok(onempty)
class Conn(BaseConn):
    """Connection over a plain input stream and output stream pair."""
    def __init__(self, inp, outp):
        BaseConn.__init__(self, outp)
        # NOTE(review): storing 'inp' on self appears to be missing here.
    def _read(self, size):
        return self.inp.read(size)
        # NOTE(review): the 'def _readline(self):' header for the next
        # line is missing from the visible source.
        return self.inp.readline()
        # NOTE(review): the 'def has_input(self):' header for the lines
        # below is missing from the visible source; they poll the input
        # fd for readability without blocking.
        [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
        assert(rl[0] == self.inp.fileno())
def checked_reader(fd, n):
    # Generator yielding chunks read from fd until n bytes are consumed,
    # raising on premature EOF.
    # NOTE(review): the loop header, the os.read call binding 'buf', the
    # yield and the byte-count bookkeeping are missing from the visible
    # source.
    rl, _, _ = select.select([fd], [], [])
    if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Upper bound on a single mux packet payload (128 KiB); enforced by
# DemuxConn._next_packet and used as the read size in mux().
MAX_PACKET = 128 * 1024
def mux(p, outfd, outr, errr):
    """Multiplex subprocess p's stdout (outr) and stderr (errr) pipes onto
    outfd as length-prefixed packets: '!IB' header of (length, channel)
    with channel 1 = stdout, 2 = stderr, and a final zero-length packet on
    channel 3 as the EOF marker.

    NOTE(review): the fd-list setup and the 'which fd is ready' membership
    tests are missing from the visible source.
    """
    while p.poll() is None:
        rl, _, _ = select.select(fds, [], [])
        buf = os.read(outr, MAX_PACKET)
        os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
        buf = os.read(errr, 1024)
        os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
    # Child exited: emit the end-of-stream packet.
    os.write(outfd, struct.pack('!IB', 0, 3))
class DemuxConn(BaseConn):
    """A helper class for bup's client-server protocol.

    Reads the packet stream produced by mux(): channel-1 packets become
    this connection's input, channel-2 packets are copied to our stderr,
    and a channel-3 packet marks end-of-stream.
    """
    def __init__(self, infd, outp):
        BaseConn.__init__(self, outp)
        # Anything that comes through before the sync string was not
        # multiplexed and can be assumed to be debug/log before mux init.
        # NOTE(review): initialization of 'tail' (and presumably of
        # self.infd/self.reader/self.buf/self.closed) is missing from the
        # visible source.
        while tail != 'BUPMUX':
            b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
            # EOF before ever seeing 'BUPMUX' means the server side died
            # before the protocol started.
            raise IOError('demux: unexpected EOF during initialization')
        sys.stderr.write(tail[:-6]) # pre-mux log messages
    def write(self, data):
        # NOTE(review): a line is missing before the delegation in the
        # visible source.
        BaseConn.write(self, data)
    def _next_packet(self, timeout):
        # Wait up to 'timeout' for one mux packet header; dispatch by
        # channel.  Returns False when closed or nothing is ready.
        if self.closed: return False
        rl, wl, xl = select.select([self.infd], [], [], timeout)
        if not rl: return False
        assert(rl[0] == self.infd)
        ns = ''.join(checked_reader(self.infd, 5))
        n, fdw = struct.unpack('!IB', ns)
        assert(n <= MAX_PACKET)
        # NOTE(review): the channel-dispatch tests (fdw == 1 / 2 / 3) are
        # missing from the visible source; the three statements below are
        # the bodies of those branches (data, stderr pass-through, close).
        self.reader = checked_reader(self.infd, n)
        for buf in checked_reader(self.infd, n):
            sys.stderr.write(buf)
        debug2("DemuxConn: marked closed\n")
    def _load_buf(self, timeout):
        # Ensure self.buf holds unconsumed data, pulling packets as needed.
        if self.buf is not None:
            # NOTE(review): the fast-path 'return True' is not visible.
        while not self.closed:
            while not self.reader:
                if not self._next_packet(timeout):
            # NOTE(review): the try: wrapper for the line below is not
            # visible in the source.
            self.buf = self.reader.next() # Python 2 iterator protocol
            except StopIteration:
    def _read_parts(self, ix_fn):
        # Yield slices of buffered data; ix_fn(buf) returns how much of
        # buf to consume (None / full length consumes it all).
        while self._load_buf(None):
            assert(self.buf is not None)
            if i is None or i == len(self.buf):
            self.buf = self.buf[i:]
            # NOTE(review): the lines below belong to _readline and its
            # nested find_eol helper; their 'def' headers are missing
            # from the visible source.
            return buf.index('\n')+1
        return ''.join(self._read_parts(find_eol))
    def _read(self, size):
        # NOTE(review): the csize accumulator setup is missing here.
        def until_size(buf): # Closes on csize
            if len(buf) < csize[0]:
        return ''.join(self._read_parts(until_size))
        # NOTE(review): the line below belongs to has_input; its 'def'
        # header is missing from the visible source.
        return self._load_buf(0)
550 """Generate a list of input lines from 'f' without terminating newlines."""
def chunkyreader(f, count = None):
    """Generate a list of chunks of data read from 'f'.

    If count is None, read until EOF is reached.

    If count is a positive integer, read 'count' bytes from 'f'. If EOF is
    reached while reading, raise IOError.
    """
    # NOTE(review): the two loop headers, the yields and the count
    # bookkeeping are missing from the visible source.
    b = f.read(min(count, 65536))
    raise IOError('EOF with %d bytes remaining' % count)
581 """Append "/" to 's' if it doesn't aleady end in "/"."""
582 if s and not s.endswith('/'):
def _mmap_do(f, sz, flags, prot, close):
    # Shared implementation behind mmap_read/mmap_readwrite*: map 'sz'
    # bytes of file object 'f' with the given mmap flags/protection,
    # optionally closing 'f' afterwards.
    # NOTE(review): the sz==0 -> whole-file sizing and the final return
    # are missing from the visible source.
    st = os.fstat(f.fileno())
    # trying to open a zero-length map gives an error, but an empty
    # string has all the same behaviour of a zero-length map, ie. it has
    map = mmap.mmap(f.fileno(), sz, flags, prot)  # NOTE: shadows builtin 'map'
    # NOTE(review): the 'if close:' guard for the line below is not visible.
    f.close() # map will persist beyond file close
def mmap_read(f, sz = 0, close=True):
    """Create a read-only memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
def mmap_readwrite(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    # NOTE(review): the continuation line passing 'close' to _mmap_do is
    # missing from the visible source.
    return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
def mmap_readwrite_private(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    The map is private, which means the changes are never flushed back to the
    file.
    """
    # NOTE(review): the continuation line passing 'close' to _mmap_do is
    # missing from the visible source.
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
629 """Parse data size information into a float number.
631 Here are some examples of conversions:
632 199.2k means 203981 bytes
633 1GB means 1073741824 bytes
634 2.1 tb means 2199023255552 bytes
636 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
638 raise ValueError("can't parse %r as a number" % s)
639 (val, unit) = g.groups()
642 if unit in ['t', 'tb']:
643 mult = 1024*1024*1024*1024
644 elif unit in ['g', 'gb']:
645 mult = 1024*1024*1024
646 elif unit in ['m', 'mb']:
648 elif unit in ['k', 'kb']:
650 elif unit in ['', 'b']:
653 raise ValueError("invalid unit %r in number %r" % (unit, s))
658 """Count the number of elements in an iterator. (consumes the iterator)"""
659 return reduce(lambda x,y: x+1, l)
664 """Append an error message to the list of saved errors.
666 Once processing is able to stop and output the errors, the saved errors are
667 accessible in the module variable helpers.saved_errors.
669 saved_errors.append(e)
679 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
681 The new exception handler will make sure that bup will exit without an ugly
682 stacktrace when Ctrl-C is hit.
684 oldhook = sys.excepthook
685 def newhook(exctype, value, traceback):
686 if exctype == KeyboardInterrupt:
687 log('\nInterrupted.\n')
689 return oldhook(exctype, value, traceback)
690 sys.excepthook = newhook
def columnate(l, prefix):
    """Format elements of 'l' in columns with 'prefix' leading each line.

    The number of columns is determined automatically based on the string
    lengths and terminal width.
    NOTE(review): several lines (empty-list guard, ncols clamping, rows
    computation, cols/out initialization, padding body, final return) are
    missing from the visible source.
    """
    clen = max(len(s) for s in l)
    # Integer division intended (Python 2 '/' semantics); under Python 3
    # this would produce a float -- verify before porting.
    ncols = (tty_width() - len(prefix)) / (clen + 2)
    # Pad the list so it splits evenly into columns.
    while len(l) % ncols:
    for s in range(0, len(l), rows):
        cols.append(l[s:s+rows])
    for row in zip(*cols):
        out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
def parse_date_or_fatal(str, fatal):
    """Parses the given date or calls Option.fatal().
    For now we expect a string that contains a float."""
    # NOTE(review): 'str' shadows the builtin; the try/atof conversion and
    # the success-path return are missing from the visible source.  The
    # except clause below uses Python 2 comma syntax.
    except ValueError, e:
        raise fatal('invalid date format (should be a float): %r' % e)
def parse_excludes(options, fatal):
    """Traverse the options and extract all excludes, or call Option.fatal()."""
    # NOTE(review): the excluded_paths initialization and the loop header
    # over option flags are missing from the visible source.
    (option, parameter) = flag
    if option == '--exclude':
        excluded_paths.append(realpath(parameter))
    elif option == '--exclude-from':
        # NOTE(review): the try: wrapping this open is not visible; also
        # the file handle is never closed in the visible code -- confirm
        # against upstream whether this leak is intentional best-effort.
        f = open(realpath(parameter))
        raise fatal("couldn't read %s" % parameter)
        for exclude_path in f.readlines():
            excluded_paths.append(realpath(exclude_path.strip()))
    return excluded_paths
def parse_rx_excludes(options, fatal):
    """Traverse the options and extract all rx excludes, or call
    Option.fatal().

    Returns the list of compiled regular expressions taken from every
    '--exclude-rx' flag in 'options'; 'fatal' is invoked with a message
    for any pattern that fails to compile.
    """
    rxs = [v for f, v in options if f == '--exclude-rx']
    for i in range(len(rxs)):
        try:
            rxs[i] = re.compile(rxs[i])
        except re.error as ex:
            # Fix: the original referenced the undefined name 'o' instead
            # of the 'fatal' parameter, and passed two values to a single
            # '%s' placeholder (a TypeError at error-reporting time).
            fatal('invalid --exclude-rx pattern (%r): %s' % (rxs[i], ex))
    return rxs
def should_rx_exclude_path(path, exclude_rxs):
    """Return True if path matches a regular expression in exclude_rxs.

    The first matching pattern is reported via debug1(); returns False
    when no pattern matches.
    """
    # Fix: restore the match test and both returns, which the docstring
    # contract requires (the visible code reached the debug1 call
    # unconditionally and returned nothing).
    for rx in exclude_rxs:
        if rx.match(path):
            debug1('Skipping %r: excluded by rx pattern %r.\n'
                   % (path, rx.pattern))
            return True
    return False
770 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
771 # that resolve against the current filesystem in the strip/graft
772 # functions for example, but elsewhere as well. I suspect bup's not
773 # always being careful about that. For some cases, the contents of
774 # the current filesystem should be irrelevant, and consulting it might
775 # produce the wrong result, perhaps via unintended symlink resolution,
def path_components(path):
    """Break path into a list of pairs of the form (name,
    full_path_to_name).  Path must start with '/'.
    Example:
      '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
    if not path.startswith('/'):
        # Fix: use call-style raise (the original used the Python-2-only
        # 'raise Exception, msg' statement form).
        raise Exception('path must start with "/": %s' % path)
    # The root component is always present, per the docstring example.
    result = [('', '/')]
    norm_path = os.path.abspath(path)
    if norm_path == '/':
        return result
    full_path = ''
    # Since we assume path startswith('/'), we can skip the first element.
    for p in norm_path.split('/')[1:]:
        full_path += '/' + p
        result.append((p, full_path))
    return result
def stripped_path_components(path, strip_prefixes):
    """Strip any prefix in strip_prefixes from path and return a list
    of path components where each component is (name,
    none_or_full_fs_path_to_name).  Assume path startswith('/').
    See thelpers.py for examples."""
    normalized_path = os.path.abspath(path)
    # Longest prefixes first, so the most specific prefix wins.
    sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
    for bp in sorted_strip_prefixes:
        normalized_bp = os.path.abspath(bp)
        if normalized_path.startswith(normalized_bp):
            prefix = normalized_path[:len(normalized_bp)]
            # NOTE(review): the result-list initialization is missing from
            # the visible source.
            for p in normalized_path[len(normalized_bp):].split('/'):
                # NOTE(review): lines extending 'prefix' per component
                # appear to be missing here.
                result.append((p, prefix))
            # NOTE(review): the 'return result' for the matched-prefix
            # case appears to be missing here.
    # No prefix matched: fall back to plain component splitting.
    return path_components(path)
def grafted_path_components(graft_points, path):
    # Create a result that consists of some number of faked graft
    # directories before the graft point, followed by all of the real
    # directories from path that are after the graft point.  Arrange
    # for the directory at the graft point in the result to correspond
    # to the "orig" directory in --graft orig=new.  See t/thelpers.py
    # for examples.

    # Note that given --graft orig=new, orig and new have *nothing* to
    # do with each other, even if some of their component names
    # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
    # equivalent to --graft /foo/bar/baz=/x/y/z, or even
    # --graft /foo/bar/baz=/x.

    # FIXME: This can't be the best solution...
    clean_path = os.path.abspath(path)
    for graft_point in graft_points:
        old_prefix, new_prefix = graft_point
        # Expand prefixes iff not absolute paths.
        old_prefix = os.path.normpath(old_prefix)
        new_prefix = os.path.normpath(new_prefix)
        if clean_path.startswith(old_prefix):
            escaped_prefix = re.escape(old_prefix)
            grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
            # Handle /foo=/ (at least) -- which produces //whatever.
            grafted_path = '/' + grafted_path.lstrip('/')
            clean_path_components = path_components(clean_path)
            # Count the components that were stripped.
            strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
            new_prefix_parts = new_prefix.split('/')
            result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
            result = [(p, None) for p in result_prefix] \
                + clean_path_components[strip_count:]
            # Now set the graft point name to match the end of new_prefix.
            graft_point = len(result_prefix)
            result[graft_point] = \
                (new_prefix_parts[-1], clean_path_components[strip_count][1])
            if new_prefix == '/': # --graft ...=/ is a special case.
                # NOTE(review): the special-case body and the 'return
                # result' for the matched case appear to be missing from
                # the visible source.
    # No graft point matched: plain component splitting.
    return path_components(clean_path)
864 """Format bup's version date string for output."""
865 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the git commit hash that this copy of bup was built from."""
    commit = _version.COMMIT
    return commit
874 """Format bup's version tag (the official version number).
876 When generated from a commit other than one pointed to with a tag, the
877 returned string will be "unknown-" followed by the first seven positions of
880 names = _version.NAMES.strip()
881 assert(names[0] == '(')
882 assert(names[-1] == ')')
884 l = [n.strip() for n in names.split(',')]
886 if n.startswith('tag: bup-'):
888 return 'unknown-%s' % _version.COMMIT[:7]