1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import hashlib, heapq, operator, time, grp
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
9 # This function should really be in helpers, not in bup.options. But we
10 # want options.py to be standalone so people can include it in other projects.
11 from bup.options import _tty_width
12 tty_width = _tty_width
16 """Convert the string 's' to an integer. Return 0 if s is not a number."""
24 """Convert the string 's' to a float. Return 0 if s is not a number."""
26 return float(s or '0')
# Debug verbosity level, from $BUP_DEBUG (atoi() yields 0 when unset or
# not a number).
buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
34 # If the platform doesn't have fdatasync (OS X), fall back to fsync.
36 fdatasync = os.fdatasync
37 except AttributeError:
41 # Write (blockingly) to sockets that may or may not be in blocking mode.
42 # We need this because our stderr is sometimes eaten by subprocesses
43 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
44 # leading to race conditions. Ick. We'll do it the hard way.
def _hard_write(fd, buf):
    """Write all of 'buf' to fd, blocking via select() even if fd is
    nonblocking, and retrying on EAGAIN."""
    # Block (no timeout) until fd is ready for writing.
    (r,w,x) = select.select([], [fd], [], None)
    # Defensive: select() returned but fd is not in the writable set.
    raise IOError('select(fd) returned without being writable')
    # os.write may perform a short write; sz is the byte count written.
    sz = os.write(fd, buf)
    # EAGAIN just means "try again" for a nonblocking fd; anything else
    # is a real error.
    if e.errno != errno.EAGAIN:
61 """Print a log message to stderr."""
64 _hard_write(sys.stderr.fileno(), s)
# Whether stdout (fd 1) and stderr (fd 2) are terminals; bits 1 and 2 of
# $BUP_FORCE_TTY can force the corresponding stream to be treated as a TTY
# even when redirected.
istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
82 """Calls log() if stderr is a TTY. Does nothing otherwise."""
90 """Calls progress() only if we haven't printed progress in a while.
92 This avoids overloading the stderr buffer with excess junk.
96 if now - _last_prog > 0.1:
102 """Calls progress() to redisplay the most recent progress message.
104 Useful after you've printed some other message that wipes out the
107 if _last_progress and _last_progress.endswith('\r'):
108 progress(_last_progress)
def mkdirp(d, mode=None):
    """Recursively create directories on path 'd'.

    Unlike os.makedirs(), it doesn't raise an exception if the last element of
    the path already exists.
    """
    # EEXIST from the leaf directory already being there is not an error.
    if e.errno == errno.EEXIST:
130 """Get the next item from an iterator, None if we reached the end."""
133 except StopIteration:
def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
    """Merge pre-sorted iterables, yielding elements in order.

    pfunc(count, total) is called every pfreq elements for progress
    reporting, and pfinal(count, total) at the end.  When 'key' is given,
    elements are compared by that attribute instead of directly.
    """
    # With a key, compare by the named attribute (pe may be missing it)...
    samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
    # ...otherwise compare the elements themselves.
    samekey = operator.eq
    total = sum(len(it) for it in iters)  # requires sized iterables
    iters = (iter(it) for it in iters)
    # Seed the heap with each iterator's first element.
    heap = ((next(it),it) for it in iters)
    heap = [(e,it) for e,it in heap if e]  # drop empty iterators
    if not count % pfreq:
    if not samekey(e, pe):
    e = it.next() # Don't use next() function, it's too expensive
    except StopIteration:
    heapq.heappop(heap) # remove current
    heapq.heapreplace(heap, (e, it)) # shift current to new location
168 """Delete a file at path 'f' if it currently exists.
170 Unlike os.unlink(), does not throw an exception if the file didn't already
176 if e.errno == errno.ENOENT:
177 pass # it doesn't exist, that's what you asked for
def readpipe(argv):
    """Run the command 'argv' and return its standard output.

    Raises Exception (including the exit status) if the command exits
    nonzero.
    """
    p = subprocess.Popen(argv, stdout=subprocess.PIPE)
    out, err = p.communicate()  # err is None; stderr is not captured
    if p.returncode != 0:
        # Bug fix: the message previously used 'p.retcode', which does not
        # exist (the Popen attribute is 'returncode') and raised
        # AttributeError instead of the intended Exception.
        raise Exception('subprocess %r failed with status %d'
                        % (' '.join(argv), p.returncode))
    return out
191 """Get the absolute path of a file.
193 Behaves like os.path.realpath, but doesn't follow a symlink for the last
194 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
195 will follow symlinks in p's directory)
201 if st and stat.S_ISLNK(st.st_mode):
202 (dir, name) = os.path.split(p)
203 dir = os.path.realpath(dir)
204 out = os.path.join(dir, name)
206 out = os.path.realpath(p)
207 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    fakeroot advertises itself through the FAKEROOTKEY environment
    variable.
    """
    # Idiom fix: identity test 'is not None' instead of '!= None'.
    return os.getenv("FAKEROOTKEY") is not None
217 if sys.platform.startswith('cygwin'):
219 return ctypes.cdll.shell32.IsUserAnAdmin()
221 return os.geteuid() == 0
def _cache_key_value(get_value, key, cache):
    """Return (value, was_cached). If there is a value in the cache
    for key, use that, otherwise, call get_value(key) which should
    throw a KeyError if there is no value -- in which case the cached
    and returned value will be None.
    """
    try: # Do we already have it (or know there wasn't one)?
        # Cache miss: look it up and remember the result.
        cache[key] = value = get_value(key)
# Caches for pwd lookups, keyed by uid and by user name respectively
# (a None value records a failed lookup).
_uid_to_pwd_cache = {}
_name_to_pwd_cache = {}
def pwd_from_uid(uid):
    """Return password database entry for uid (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
    # On a fresh successful lookup, also prime the name-keyed cache.
    if entry and not cached:
        _name_to_pwd_cache[entry.pw_name] = entry
def pwd_from_name(name):
    """Return password database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _uid_to_pwd_cache, _name_to_pwd_cache
    entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
    # On a fresh successful lookup, also prime the uid-keyed cache.
    if entry and not cached:
        _uid_to_pwd_cache[entry.pw_uid] = entry
# Caches for grp lookups, keyed by gid and by group name respectively
# (a None value records a failed lookup).
_gid_to_grp_cache = {}
_name_to_grp_cache = {}
def grp_from_gid(gid):
    """Return group database entry for gid (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
    # On a fresh successful lookup, also prime the name-keyed cache.
    if entry and not cached:
        _name_to_grp_cache[entry.gr_name] = entry
def grp_from_name(name):
    """Return group database entry for name (may be a cached value).
    Return None if no entry is found.
    """
    global _gid_to_grp_cache, _name_to_grp_cache
    entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
    # On a fresh successful lookup, also prime the gid-keyed cache.
    if entry and not cached:
        _gid_to_grp_cache[entry.gr_gid] = entry
295 """Get the user's login name."""
299 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
305 """Get the user's full name."""
307 if not _userfullname:
309 entry = pwd_from_uid(uid)
311 _userfullname = entry[4].split(',')[0] or entry[0]
312 if not _userfullname:
313 _userfullname = 'user%d' % uid
319 """Get the FQDN of this machine."""
322 _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    """Return bup's resource directory joined with 'subdir'.

    The base directory comes from $BUP_RESOURCE_PATH (defaulting to '.')
    and is cached in a module-level variable after the first call.
    """
    global _resource_path
    if not _resource_path:
        base_dir = os.environ.get('BUP_RESOURCE_PATH')
        _resource_path = base_dir if base_dir else '.'
    return os.path.join(_resource_path, subdir)
def format_filesize(size):
    """Return 'size' bytes as a human-readable string, e.g. '1.5G'."""
    # Pick the largest unit exponent that fits: 1 -> K, 2 -> M, ...
    # NOTE(review): 'unit' is defined in lines elided here (presumably
    # 1024) -- confirm against the full source.
    exponent = int(math.log(size) / math.log(unit))
    size_prefix = "KMGTPE"[exponent - 1]
    return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
# Protocol-level failure: the server answered with an 'error ...' line
# instead of 'ok'.  (NOTE(review): class body elided in this view.)
class NotOk(Exception):
    def __init__(self, outp):
        # 'outp' is the stream write() sends to; the attribute
        # assignments are elided in this view.
        # Drain any pending input in 64k chunks (appears to belong to a
        # close()-style method -- surrounding lines elided).
        while self._read(65536): pass

    def read(self, size):
        """Read 'size' bytes from input stream."""
        return self._read(size)

        """Read from input stream until a newline is found."""
        return self._readline()

    def write(self, data):
        """Write 'data' to output stream."""
        #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
        self.outp.write(data)
370 """Return true if input stream is readable."""
371 raise NotImplemented("Subclasses must implement has_input")
        """Indicate end of output from last sent command."""

        """Indicate server error to the client."""
        # Collapse all whitespace so the message fits on the single
        # protocol 'error' line.
        s = re.sub(r'\s+', ' ', str(s))
        self.write('\nerror %s\n' % s)

    def _check_ok(self, onempty):
        # Scan reply lines until a status line; 'onempty' handles the
        # blank-line case.
        for rl in linereader(self):
            #log('%d got line: %r\n' % (os.getpid(), rl))
            if not rl: # empty line
            elif rl.startswith('error '):
                #log('client: error: %s\n' % rl[6:])
        # linereader ran out without a status line: server died.
        raise Exception('server exited unexpectedly; see errors above')

    def drain_and_check_ok(self):
        """Remove all data for the current command from input stream."""
        return self._check_ok(onempty)

        """Verify that server action completed successfully."""
        raise Exception('expected "ok", got %r' % rl)
        return self._check_ok(onempty)
class Conn(BaseConn):
    """A BaseConn over a plain input stream 'inp' and output stream 'outp'."""
    def __init__(self, inp, outp):
        BaseConn.__init__(self, outp)
        # NOTE(review): 'self.inp = inp' assignment elided in this view.

    def _read(self, size):
        return self.inp.read(size)

        return self.inp.readline()

        # has_input-style poll: zero timeout, just check readability.
        [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
        assert(rl[0] == self.inp.fileno())
def checked_reader(fd, n):
    """Generate chunks read from fd, raising on unexpected EOF
    (presumably until n bytes total have been produced -- loop elided)."""
    # Block until fd is readable.
    rl, _, _ = select.select([fd], [], [])
    if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Largest single payload in a mux packet; each packet has a '!IB' header
# (4-byte length + 1-byte channel id).
MAX_PACKET = 128 * 1024

def mux(p, outfd, outr, errr):
    """Multiplex subprocess p's stdout (outr) and stderr (errr) onto outfd.

    Channel 1 carries stdout, channel 2 carries stderr, and a zero-length
    channel-3 packet marks end of stream.
    """
    while p.poll() is None:
        # Wait for data on either source fd (fds set up in elided lines).
        rl, _, _ = select.select(fds, [], [])
        buf = os.read(outr, MAX_PACKET)
        os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
        buf = os.read(errr, 1024)
        os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
    # Zero-length channel-3 packet: end of stream.
    os.write(outfd, struct.pack('!IB', 0, 3))
class DemuxConn(BaseConn):
    """A helper class for bup's client-server protocol.

    Demultiplexes the packet stream written by mux() above: channel 1 is
    payload data, channel 2 is relayed to stderr, channel 3 marks EOF.
    """
    def __init__(self, infd, outp):
        BaseConn.__init__(self, outp)
        # Anything that comes through before the sync string was not
        # multiplexed and can be assumed to be debug/log before mux init.
        while tail != 'BUPMUX':
            # Read byte-at-a-time once 6 bytes are buffered, so we stop
            # exactly at the 'BUPMUX' marker.
            b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
            raise IOError('demux: unexpected EOF during initialization')
        sys.stderr.write(tail[:-6]) # pre-mux log messages

    def write(self, data):
        BaseConn.write(self, data)

    def _next_packet(self, timeout):
        # Returns False when the stream is closed or no packet arrives
        # within 'timeout' seconds.
        if self.closed: return False
        rl, wl, xl = select.select([self.infd], [], [], timeout)
        if not rl: return False
        assert(rl[0] == self.infd)
        # Header: 4-byte big-endian length n + 1-byte channel id fdw.
        ns = ''.join(checked_reader(self.infd, 5))
        n, fdw = struct.unpack('!IB', ns)
        assert(n <= MAX_PACKET)
        # Payload channel: expose the data through self.reader.
        self.reader = checked_reader(self.infd, n)
        # stderr channel: relay the payload straight through.
        for buf in checked_reader(self.infd, n):
            sys.stderr.write(buf)
        debug2("DemuxConn: marked closed\n")

    def _load_buf(self, timeout):
        # Ensure self.buf holds unread payload; False once closed/EOF.
        if self.buf is not None:
        while not self.closed:
            while not self.reader:
                if not self._next_packet(timeout):
            self.buf = self.reader.next()
            except StopIteration:

    def _read_parts(self, ix_fn):
        # Yield stream pieces; ix_fn(buf) returns the split index (or
        # None to consume the whole buffer).
        while self._load_buf(None):
            assert(self.buf is not None)
            if i is None or i == len(self.buf):
            self.buf = self.buf[i:]

        # _readline helper: split just past the first newline.
        return buf.index('\n')+1
        return ''.join(self._read_parts(find_eol))

    def _read(self, size):
        # until_size counts down the remaining bytes via the csize cell.
        def until_size(buf): # Closes on csize
            if len(buf) < csize[0]:
        return ''.join(self._read_parts(until_size))

        # has_input-style check: zero-timeout buffer load.
        return self._load_buf(0)
552 """Generate a list of input lines from 'f' without terminating newlines."""
def chunkyreader(f, count = None):
    """Generate a list of chunks of data read from 'f'.

    If count is None, read until EOF is reached.

    If count is a positive integer, read 'count' bytes from 'f'. If EOF is
    reached while reading, raise IOError.
    """
    # Read at most 64k at a time so large inputs stream in bounded memory.
    b = f.read(min(count, 65536))
    raise IOError('EOF with %d bytes remaining' % count)
583 """Append "/" to 's' if it doesn't aleady end in "/"."""
584 if s and not s.endswith('/'):
def _mmap_do(f, sz, flags, prot, close):
    """Common helper for the mmap_* wrappers below."""
    # When sz is 0, the file's own size (from fstat) is used.
    st = os.fstat(f.fileno())
    # trying to open a zero-length map gives an error, but an empty
    # string has all the same behaviour of a zero-length map, ie. it has
    map = mmap.mmap(f.fileno(), sz, flags, prot)
    # (guarded by the 'close' parameter -- the condition is elided here)
    f.close() # map will persist beyond file close
def mmap_read(f, sz = 0, close=True):
    """Create a read-only memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
def mmap_readwrite(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    """
    # MAP_SHARED: modifications are written back to the underlying file.
    return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
def mmap_readwrite_private(f, sz = 0, close=True):
    """Create a read-write memory mapped region on file 'f'.
    If sz is 0, the region will cover the entire file.
    The map is private, which means the changes are never flushed back to the
    underlying file.
    """
    return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
def parse_timestamp(epoch_str):
    """Return the number of nanoseconds since the epoch that are described
    by epoch_str (100ms, 100ns, ...); when epoch_str cannot be parsed,
    throw a ValueError that may contain additional information."""
    # Nanoseconds per unit suffix (entries for ms/us/ns are elided here).
    ns_per = {'s' :  1000000000,
    # Accept an optional signed integer followed by a required unit.
    match = re.match(r'^((?:[-+]?[0-9]+)?)(s|ms|us|ns)$', epoch_str)
    # A bare number gets a more helpful error than a generic parse failure.
    if re.match(r'^([-+]?[0-9]+)$', epoch_str):
        raise ValueError('must include units, i.e. 100ns, 100ms, ...')
    (n, units) = match.group(1, 2)
    return n * ns_per[units]
651 """Parse data size information into a float number.
653 Here are some examples of conversions:
654 199.2k means 203981 bytes
655 1GB means 1073741824 bytes
656 2.1 tb means 2199023255552 bytes
658 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
660 raise ValueError("can't parse %r as a number" % s)
661 (val, unit) = g.groups()
664 if unit in ['t', 'tb']:
665 mult = 1024*1024*1024*1024
666 elif unit in ['g', 'gb']:
667 mult = 1024*1024*1024
668 elif unit in ['m', 'mb']:
670 elif unit in ['k', 'kb']:
672 elif unit in ['', 'b']:
675 raise ValueError("invalid unit %r in number %r" % (unit, s))
680 """Count the number of elements in an iterator. (consumes the iterator)"""
681 return reduce(lambda x,y: x+1, l)
686 """Append an error message to the list of saved errors.
688 Once processing is able to stop and output the errors, the saved errors are
689 accessible in the module variable helpers.saved_errors.
691 saved_errors.append(e)
701 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
703 The new exception handler will make sure that bup will exit without an ugly
704 stacktrace when Ctrl-C is hit.
706 oldhook = sys.excepthook
707 def newhook(exctype, value, traceback):
708 if exctype == KeyboardInterrupt:
709 log('\nInterrupted.\n')
711 return oldhook(exctype, value, traceback)
712 sys.excepthook = newhook
def columnate(l, prefix):
    """Format elements of 'l' in columns with 'prefix' leading each line.

    The number of columns is determined automatically based on the string
    widths and the terminal width.
    """
    clen = max(len(s) for s in l)  # widest entry fixes the column width
    # Integer division: how many clen+2 columns fit after the prefix.
    ncols = (tty_width() - len(prefix)) / (clen + 2)
    # Pad l so it splits evenly into ncols columns.
    while len(l) % ncols:
    # Slice l into per-column lists of 'rows' entries each.
    for s in range(0, len(l), rows):
        cols.append(l[s:s+rows])
    # Emit row by row, left-justifying each cell to clen+2 characters.
    for row in zip(*cols):
        out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
def parse_date_or_fatal(str, fatal):
    """Parses the given date or calls Option.fatal().
    For now we expect a string that contains a float."""
    # (Python 2 'except ... , e' syntax.)
    except ValueError, e:
        raise fatal('invalid date format (should be a float): %r' % e)
def parse_excludes(options, fatal):
    """Traverse the options and extract all excludes, or call Option.fatal()."""
    (option, parameter) = flag
    if option == '--exclude':
        excluded_paths.append(realpath(parameter))
    elif option == '--exclude-from':
        # One excluded path per line in the named file.
        f = open(realpath(parameter))
        raise fatal("couldn't read %s" % parameter)
        for exclude_path in f.readlines():
            excluded_paths.append(realpath(exclude_path.strip()))
    # Deduplicate and return in sorted order.
    return sorted(frozenset(excluded_paths))
def parse_rx_excludes(options, fatal):
    """Traverse the options and extract all rx excludes, or call
    Option.fatal()."""
    excluded_patterns = []
    (option, parameter) = flag
    if option == '--exclude-rx':
        excluded_patterns.append(re.compile(parameter))
        # re.compile failed -- report the bad pattern.
        fatal('invalid --exclude-rx pattern (%s): %s' % (parameter, ex))
    elif option == '--exclude-rx-from':
        # One pattern per line in the named file.
        f = open(realpath(parameter))
        raise fatal("couldn't read %s" % parameter)
        for pattern in f.readlines():
            # Strip only the newline; other whitespace can be significant
            # inside a regex.
            spattern = pattern.rstrip('\n')
            excluded_patterns.append(re.compile(spattern))
            fatal('invalid --exclude-rx pattern (%s): %s' % (spattern, ex))
    return excluded_patterns
def should_rx_exclude_path(path, exclude_rxs):
    """Return True if path matches a regular expression in exclude_rxs."""
    for rx in exclude_rxs:
        # (match test elided) -- log which pattern excluded the path.
        debug1('Skipping %r: excluded by rx pattern %r.\n'
               % (path, rx.pattern))
806 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
807 # that resolve against the current filesystem in the strip/graft
808 # functions for example, but elsewhere as well. I suspect bup's not
809 # always being careful about that. For some cases, the contents of
810 # the current filesystem should be irrelevant, and consulting it might
811 # produce the wrong result, perhaps via unintended symlink resolution,
def path_components(path):
    """Break path into a list of pairs of the form (name,
    full_path_to_name). Path must start with '/'.
    '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
    if not path.startswith('/'):
        # (Python 2 'raise Class, arg' syntax.)
        raise Exception, 'path must start with "/": %s' % path
    # Since we assume path startswith('/'), we can skip the first element.
    norm_path = os.path.abspath(path)
    # Accumulate (name, full_path) for each successive component.
    for p in norm_path.split('/')[1:]:
        result.append((p, full_path))
def stripped_path_components(path, strip_prefixes):
    """Strip any prefix in strip_prefixes from path and return a list
    of path components where each component is (name,
    none_or_full_fs_path_to_name). Assume path startswith('/').
    See thelpers.py for examples."""
    normalized_path = os.path.abspath(path)
    # Try longer prefixes first so the most specific match wins.
    sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
    for bp in sorted_strip_prefixes:
        normalized_bp = os.path.abspath(bp)
        if normalized_path.startswith(normalized_bp):
            prefix = normalized_path[:len(normalized_bp)]
            # Emit the remaining components, tracking the growing fs path.
            for p in normalized_path[len(normalized_bp):].split('/'):
                result.append((p, prefix))
    # No prefix matched: fall back to plain path components.
    return path_components(path)
def grafted_path_components(graft_points, path):
    """Return path components for 'path' with --graft rewrites applied."""
    # Create a result that consists of some number of faked graft
    # directories before the graft point, followed by all of the real
    # directories from path that are after the graft point. Arrange
    # for the directory at the graft point in the result to correspond
    # to the "orig" directory in --graft orig=new. See t/thelpers.py

    # Note that given --graft orig=new, orig and new have *nothing* to
    # do with each other, even if some of their component names
    # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
    # equivalent to --graft /foo/bar/baz=/x/y/z, or even

    # FIXME: This can't be the best solution...
    clean_path = os.path.abspath(path)
    for graft_point in graft_points:
        old_prefix, new_prefix = graft_point
        # Expand prefixes iff not absolute paths.
        old_prefix = os.path.normpath(old_prefix)
        new_prefix = os.path.normpath(new_prefix)
        if clean_path.startswith(old_prefix):
            # Rewrite only the leading old_prefix portion of the path.
            escaped_prefix = re.escape(old_prefix)
            grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
            # Handle /foo=/ (at least) -- which produces //whatever.
            grafted_path = '/' + grafted_path.lstrip('/')
            clean_path_components = path_components(clean_path)
            # Count the components that were stripped.
            strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
            new_prefix_parts = new_prefix.split('/')
            result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
            # Fake components for the new prefix (no fs path), then the
            # real components after the graft point.
            result = [(p, None) for p in result_prefix] \
                + clean_path_components[strip_count:]
            # Now set the graft point name to match the end of new_prefix.
            graft_point = len(result_prefix)
            result[graft_point] = \
                (new_prefix_parts[-1], clean_path_components[strip_count][1])
            if new_prefix == '/': # --graft ...=/ is a special case.
    # No graft point matched: return the path's components unchanged.
    return path_components(clean_path)
900 """Format bup's version date string for output."""
901 return _version.DATE.split(' ')[0]
def version_commit():
    """Get the commit hash of bup's current version."""
    # COMMIT comes from the bup._version module imported at the top.
    return _version.COMMIT
910 """Format bup's version tag (the official version number).
912 When generated from a commit other than one pointed to with a tag, the
913 returned string will be "unknown-" followed by the first seven positions of
916 names = _version.NAMES.strip()
917 assert(names[0] == '(')
918 assert(names[-1] == ')')
920 l = [n.strip() for n in names.split(',')]
922 if n.startswith('tag: bup-'):
924 return 'unknown-%s' % _version.COMMIT[:7]