1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import hashlib, heapq, operator, time, platform, grp
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
# _tty_width really belongs in helpers, but it lives in bup.options so that
# options.py can remain standalone for reuse in other projects.  Re-export it
# here under its public name.
from bup.options import _tty_width

tty_width = _tty_width
15 """Convert the string 's' to an integer. Return 0 if s is not a number."""
23 """Convert the string 's' to a float. Return 0 if s is not a number."""
25 return float(s or '0')
30 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
33 # Write (blockingly) to sockets that may or may not be in blocking mode.
34 # We need this because our stderr is sometimes eaten by subprocesses
35 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
36 # leading to race conditions. Ick. We'll do it the hard way.
37 def _hard_write(fd, buf):
39 (r,w,x) = select.select([], [fd], [], None)
41 raise IOError('select(fd) returned without being writable')
43 sz = os.write(fd, buf)
45 if e.errno != errno.EAGAIN:
53 """Print a log message to stderr."""
56 _hard_write(sys.stderr.fileno(), s)
# Are stdout (fd 1) and stderr (fd 2) ttys?  The BUP_FORCE_TTY environment
# variable can force the answer: bit 1 forces stdout, bit 2 forces stderr.
_force_tty = atoi(os.environ.get('BUP_FORCE_TTY'))
istty1 = os.isatty(1) or (_force_tty & 1)
istty2 = os.isatty(2) or (_force_tty & 2)
74 """Calls log() if stderr is a TTY. Does nothing otherwise."""
82 """Calls progress() only if we haven't printed progress in a while.
84 This avoids overloading the stderr buffer with excess junk.
88 if now - _last_prog > 0.1:
94 """Calls progress() to redisplay the most recent progress message.
96 Useful after you've printed some other message that wipes out the
99 if _last_progress and _last_progress.endswith('\r'):
100 progress(_last_progress)
103 def mkdirp(d, mode=None):
104 """Recursively create directories on path 'd'.
106 Unlike os.makedirs(), it doesn't raise an exception if the last element of
107 the path already exists.
115 if e.errno == errno.EEXIST:
122 """Get the next item from an iterator, None if we reached the end."""
125 except StopIteration:
129 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
131 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
133 samekey = operator.eq
135 total = sum(len(it) for it in iters)
136 iters = (iter(it) for it in iters)
137 heap = ((next(it),it) for it in iters)
138 heap = [(e,it) for e,it in heap if e]
143 if not count % pfreq:
146 if not samekey(e, pe):
151 e = it.next() # Don't use next() function, it's too expensive
152 except StopIteration:
153 heapq.heappop(heap) # remove current
155 heapq.heapreplace(heap, (e, it)) # shift current to new location
160 """Delete a file at path 'f' if it currently exists.
162 Unlike os.unlink(), does not throw an exception if the file didn't already
168 if e.errno == errno.ENOENT:
169 pass # it doesn't exist, that's what you asked for
173 """Run a subprocess and return its output."""
174 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
181 """Get the absolute path of a file.
183 Behaves like os.path.realpath, but doesn't follow a symlink for the last
184 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
185 will follow symlinks in p's directory)
191 if st and stat.S_ISLNK(st.st_mode):
192 (dir, name) = os.path.split(p)
193 dir = os.path.realpath(dir)
194 out = os.path.join(dir, name)
196 out = os.path.realpath(p)
197 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    Detection is based on the FAKEROOTKEY environment variable, which
    fakeroot sets for its child processes.
    """
    # PEP 8: comparisons against None use 'is not', not '!='.
    return os.getenv("FAKEROOTKEY") is not None
207 if platform.system().startswith('CYGWIN'):
209 return ctypes.cdll.shell32.IsUserAnAdmin()
211 return os.geteuid() == 0
214 def _cache_key_value(get_value, key, cache):
215 """Return (value, was_cached). If there is a value in the cache
216 for key, use that, otherwise, call get_value(key) which should
217 throw a KeyError if there is no value -- in which case the cached
218 and returned value will be None.
220 try: # Do we already have it (or know there wasn't one)?
227 cache[key] = value = get_value(key)
# Caches for password-database lookups: uid -> pwd entry and
# user name -> pwd entry (an entry may be None for a known-missing key,
# per _cache_key_value()).
_uid_to_pwd_cache = dict()
_name_to_pwd_cache = dict()
236 def pwd_from_uid(uid):
237 """Return password database entry for uid (may be a cached value).
238 Return None if no entry is found.
240 global _uid_to_pwd_cache, _name_to_pwd_cache
241 entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
242 if entry and not cached:
243 _name_to_pwd_cache[entry.pw_name] = entry
247 def pwd_from_name(name):
248 """Return password database entry for name (may be a cached value).
249 Return None if no entry is found.
251 global _uid_to_pwd_cache, _name_to_pwd_cache
252 entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
253 if entry and not cached:
254 _uid_to_pwd_cache[entry.pw_uid] = entry
# Caches for group-database lookups: gid -> grp entry and
# group name -> grp entry (an entry may be None for a known-missing key,
# per _cache_key_value()).
_gid_to_grp_cache = dict()
_name_to_grp_cache = dict()
261 def grp_from_gid(gid):
262 """Return password database entry for gid (may be a cached value).
263 Return None if no entry is found.
265 global _gid_to_grp_cache, _name_to_grp_cache
266 entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
267 if entry and not cached:
268 _name_to_grp_cache[entry.gr_name] = entry
272 def grp_from_name(name):
273 """Return password database entry for name (may be a cached value).
274 Return None if no entry is found.
276 global _gid_to_grp_cache, _name_to_grp_cache
277 entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
278 if entry and not cached:
279 _gid_to_grp_cache[entry.gr_gid] = entry
285 """Get the user's login name."""
289 _username = pwd_from_uid(uid)[0] or 'user%d' % uid
295 """Get the user's full name."""
297 if not _userfullname:
299 entry = pwd_from_uid(uid)
301 _userfullname = entry[4].split(',')[0] or entry[0]
302 if not _userfullname:
303 _userfullname = 'user%d' % uid
309 """Get the FQDN of this machine."""
312 _hostname = socket.getfqdn()
# Lazily-resolved base directory for bup's bundled resources.
_resource_path = None
def resource_path(subdir=''):
    """Return the path to bup's resource directory joined with 'subdir'.

    The base directory comes from the BUP_RESOURCE_PATH environment
    variable (falling back to '.') and is looked up only once, then
    cached for the life of the process.
    """
    global _resource_path
    if _resource_path is None:
        _resource_path = os.environ.get('BUP_RESOURCE_PATH') or '.'
    return os.path.join(_resource_path, subdir)
324 class NotOk(Exception):
329 def __init__(self, outp):
333 while self._read(65536): pass
335 def read(self, size):
336 """Read 'size' bytes from input stream."""
338 return self._read(size)
341 """Read from input stream until a newline is found."""
343 return self._readline()
345 def write(self, data):
346 """Write 'data' to output stream."""
347 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
348 self.outp.write(data)
351 """Return true if input stream is readable."""
352 raise NotImplemented("Subclasses must implement has_input")
355 """Indicate end of output from last sent command."""
359 """Indicate server error to the client."""
360 s = re.sub(r'\s+', ' ', str(s))
361 self.write('\nerror %s\n' % s)
363 def _check_ok(self, onempty):
366 for rl in linereader(self):
367 #log('%d got line: %r\n' % (os.getpid(), rl))
368 if not rl: # empty line
372 elif rl.startswith('error '):
373 #log('client: error: %s\n' % rl[6:])
377 raise Exception('server exited unexpectedly; see errors above')
379 def drain_and_check_ok(self):
380 """Remove all data for the current command from input stream."""
383 return self._check_ok(onempty)
386 """Verify that server action completed successfully."""
388 raise Exception('expected "ok", got %r' % rl)
389 return self._check_ok(onempty)
392 class Conn(BaseConn):
393 def __init__(self, inp, outp):
394 BaseConn.__init__(self, outp)
397 def _read(self, size):
398 return self.inp.read(size)
401 return self.inp.readline()
404 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
406 assert(rl[0] == self.inp.fileno())
412 def checked_reader(fd, n):
414 rl, _, _ = select.select([fd], [], [])
417 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
422 MAX_PACKET = 128 * 1024
423 def mux(p, outfd, outr, errr):
426 while p.poll() is None:
427 rl, _, _ = select.select(fds, [], [])
430 buf = os.read(outr, MAX_PACKET)
432 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
434 buf = os.read(errr, 1024)
436 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
438 os.write(outfd, struct.pack('!IB', 0, 3))
441 class DemuxConn(BaseConn):
442 """A helper class for bup's client-server protocol."""
443 def __init__(self, infd, outp):
444 BaseConn.__init__(self, outp)
445 # Anything that comes through before the sync string was not
446 # multiplexed and can be assumed to be debug/log before mux init.
448 while tail != 'BUPMUX':
449 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
451 raise IOError('demux: unexpected EOF during initialization')
453 sys.stderr.write(tail[:-6]) # pre-mux log messages
460 def write(self, data):
462 BaseConn.write(self, data)
464 def _next_packet(self, timeout):
465 if self.closed: return False
466 rl, wl, xl = select.select([self.infd], [], [], timeout)
467 if not rl: return False
468 assert(rl[0] == self.infd)
469 ns = ''.join(checked_reader(self.infd, 5))
470 n, fdw = struct.unpack('!IB', ns)
471 assert(n <= MAX_PACKET)
473 self.reader = checked_reader(self.infd, n)
475 for buf in checked_reader(self.infd, n):
476 sys.stderr.write(buf)
479 debug2("DemuxConn: marked closed\n")
482 def _load_buf(self, timeout):
483 if self.buf is not None:
485 while not self.closed:
486 while not self.reader:
487 if not self._next_packet(timeout):
490 self.buf = self.reader.next()
492 except StopIteration:
496 def _read_parts(self, ix_fn):
497 while self._load_buf(None):
498 assert(self.buf is not None)
500 if i is None or i == len(self.buf):
505 self.buf = self.buf[i:]
513 return buf.index('\n')+1
516 return ''.join(self._read_parts(find_eol))
518 def _read(self, size):
520 def until_size(buf): # Closes on csize
521 if len(buf) < csize[0]:
526 return ''.join(self._read_parts(until_size))
529 return self._load_buf(0)
533 """Generate a list of input lines from 'f' without terminating newlines."""
541 def chunkyreader(f, count = None):
542 """Generate a list of chunks of data read from 'f'.
544 If count is None, read until EOF is reached.
546 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
547 reached while reading, raise IOError.
551 b = f.read(min(count, 65536))
553 raise IOError('EOF with %d bytes remaining' % count)
564 """Append "/" to 's' if it doesn't aleady end in "/"."""
565 if s and not s.endswith('/'):
571 def _mmap_do(f, sz, flags, prot, close):
573 st = os.fstat(f.fileno())
576 # trying to open a zero-length map gives an error, but an empty
577 # string has all the same behaviour of a zero-length map, ie. it has
580 map = mmap.mmap(f.fileno(), sz, flags, prot)
582 f.close() # map will persist beyond file close
586 def mmap_read(f, sz = 0, close=True):
587 """Create a read-only memory mapped region on file 'f'.
588 If sz is 0, the region will cover the entire file.
590 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
593 def mmap_readwrite(f, sz = 0, close=True):
594 """Create a read-write memory mapped region on file 'f'.
595 If sz is 0, the region will cover the entire file.
597 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
601 def mmap_readwrite_private(f, sz = 0, close=True):
602 """Create a read-write memory mapped region on file 'f'.
603 If sz is 0, the region will cover the entire file.
604 The map is private, which means the changes are never flushed back to the
607 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
612 """Parse data size information into a float number.
614 Here are some examples of conversions:
615 199.2k means 203981 bytes
616 1GB means 1073741824 bytes
617 2.1 tb means 2199023255552 bytes
619 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
621 raise ValueError("can't parse %r as a number" % s)
622 (val, unit) = g.groups()
625 if unit in ['t', 'tb']:
626 mult = 1024*1024*1024*1024
627 elif unit in ['g', 'gb']:
628 mult = 1024*1024*1024
629 elif unit in ['m', 'mb']:
631 elif unit in ['k', 'kb']:
633 elif unit in ['', 'b']:
636 raise ValueError("invalid unit %r in number %r" % (unit, s))
641 """Count the number of elements in an iterator. (consumes the iterator)"""
642 return reduce(lambda x,y: x+1, l)
647 """Append an error message to the list of saved errors.
649 Once processing is able to stop and output the errors, the saved errors are
650 accessible in the module variable helpers.saved_errors.
652 saved_errors.append(e)
662 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
664 The new exception handler will make sure that bup will exit without an ugly
665 stacktrace when Ctrl-C is hit.
667 oldhook = sys.excepthook
668 def newhook(exctype, value, traceback):
669 if exctype == KeyboardInterrupt:
670 log('Interrupted.\n')
672 return oldhook(exctype, value, traceback)
673 sys.excepthook = newhook
676 def columnate(l, prefix):
677 """Format elements of 'l' in columns with 'prefix' leading each line.
679 The number of columns is determined automatically based on the string
685 clen = max(len(s) for s in l)
686 ncols = (tty_width() - len(prefix)) / (clen + 2)
691 while len(l) % ncols:
694 for s in range(0, len(l), rows):
695 cols.append(l[s:s+rows])
697 for row in zip(*cols):
698 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
702 def parse_date_or_fatal(str, fatal):
703 """Parses the given date or calls Option.fatal().
704 For now we expect a string that contains a float."""
707 except ValueError, e:
708 raise fatal('invalid date format (should be a float): %r' % e)
713 # FIXME: Carefully consider the use of functions (os.path.*, etc.)
714 # that resolve against the current filesystem in the strip/graft
715 # functions for example, but elsewhere as well. I suspect bup's not
716 # always being careful about that. For some cases, the contents of
717 # the current filesystem should be irrelevant, and consulting it might
718 # produce the wrong result, perhaps via unintended symlink resolution,
721 def path_components(path):
722 """Break path into a list of pairs of the form (name,
723 full_path_to_name). Path must start with '/'.
725 '/home/foo' -> [('', '/'), ('home', '/home'), ('foo', '/home/foo')]"""
726 assert(path.startswith('/'))
727 # Since we assume path startswith('/'), we can skip the first element.
729 norm_path = os.path.abspath(path)
733 for p in norm_path.split('/')[1:]:
735 result.append((p, full_path))
739 def stripped_path_components(path, strip_prefixes):
740 """Strip any prefix in strip_prefixes from path and return a list
741 of path components where each component is (name,
742 none_or_full_fs_path_to_name). Assume path startswith('/').
743 See thelpers.py for examples."""
744 normalized_path = os.path.abspath(path)
745 sorted_strip_prefixes = sorted(strip_prefixes, key=len, reverse=True)
746 for bp in sorted_strip_prefixes:
747 normalized_bp = os.path.abspath(bp)
748 if normalized_path.startswith(normalized_bp):
749 prefix = normalized_path[:len(normalized_bp)]
751 for p in normalized_path[len(normalized_bp):].split('/'):
755 result.append((p, prefix))
758 return path_components(path)
761 def grafted_path_components(graft_points, path):
762 # Create a result that consists of some number of faked graft
763 # directories before the graft point, followed by all of the real
764 # directories from path that are after the graft point. Arrange
765 # for the directory at the graft point in the result to correspond
766 # to the "orig" directory in --graft orig=new. See t/thelpers.py
769 # Note that given --graft orig=new, orig and new have *nothing* to
770 # do with each other, even if some of their component names
771 # match. i.e. --graft /foo/bar/baz=/foo/bar/bax is semantically
772 # equivalent to --graft /foo/bar/baz=/x/y/z, or even
775 # FIXME: This can't be the best solution...
776 clean_path = os.path.abspath(path)
777 for graft_point in graft_points:
778 old_prefix, new_prefix = graft_point
779 # Expand prefixes iff not absolute paths.
780 old_prefix = os.path.normpath(old_prefix)
781 new_prefix = os.path.normpath(new_prefix)
782 if clean_path.startswith(old_prefix):
783 escaped_prefix = re.escape(old_prefix)
784 grafted_path = re.sub(r'^' + escaped_prefix, new_prefix, clean_path)
785 # Handle /foo=/ (at least) -- which produces //whatever.
786 grafted_path = '/' + grafted_path.lstrip('/')
787 clean_path_components = path_components(clean_path)
788 # Count the components that were stripped.
789 strip_count = 0 if old_prefix == '/' else old_prefix.count('/')
790 new_prefix_parts = new_prefix.split('/')
791 result_prefix = grafted_path.split('/')[:new_prefix.count('/')]
792 result = [(p, None) for p in result_prefix] \
793 + clean_path_components[strip_count:]
794 # Now set the graft point name to match the end of new_prefix.
795 graft_point = len(result_prefix)
796 result[graft_point] = \
797 (new_prefix_parts[-1], clean_path_components[strip_count][1])
798 if new_prefix == '/': # --graft ...=/ is a special case.
801 return path_components(clean_path)
806 """Format bup's version date string for output."""
807 return _version.DATE.split(' ')[0]
def version_commit():
    """Get the commit hash of bup's current version.

    The value comes from the generated bup._version module.
    """
    return _version.COMMIT
816 """Format bup's version tag (the official version number).
818 When generated from a commit other than one pointed to with a tag, the
819 returned string will be "unknown-" followed by the first seven positions of
822 names = _version.NAMES.strip()
823 assert(names[0] == '(')
824 assert(names[-1] == ')')
826 l = [n.strip() for n in names.split(',')]
828 if n.startswith('tag: bup-'):
830 return 'unknown-%s' % _version.COMMIT[:7]