1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import heapq, operator, time, platform
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
8 # This function should really be in helpers, not in bup.options. But we
9 # want options.py to be standalone so people can include it in other projects.
10 from bup.options import _tty_width
11 tty_width = _tty_width
15 """Convert the string 's' to an integer. Return 0 if s is not a number."""
23 """Convert the string 's' to a float. Return 0 if s is not a number."""
25 return float(s or '0')
30 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
33 # Write (blockingly) to sockets that may or may not be in blocking mode.
34 # We need this because our stderr is sometimes eaten by subprocesses
35 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
36 # leading to race conditions. Ick. We'll do it the hard way.
37 def _hard_write(fd, buf):
39 (r,w,x) = select.select([], [fd], [], None)
41 raise IOError('select(fd) returned without being writable')
43 sz = os.write(fd, buf)
45 if e.errno != errno.EAGAIN:
53 """Print a log message to stderr."""
56 _hard_write(sys.stderr.fileno(), s)
70 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
71 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
74 """Calls log() if stderr is a TTY. Does nothing otherwise."""
82 """Calls progress() only if we haven't printed progress in a while.
84 This avoids overloading the stderr buffer with excess junk.
88 if now - _last_prog > 0.1:
94 """Calls progress() to redisplay the most recent progress message.
96 Useful after you've printed some other message that wipes out the
99 if _last_progress and _last_progress.endswith('\r'):
100 progress(_last_progress)
103 def mkdirp(d, mode=None):
104 """Recursively create directories on path 'd'.
106 Unlike os.makedirs(), it doesn't raise an exception if the last element of
107 the path already exists.
115 if e.errno == errno.EEXIST:
122 """Get the next item from an iterator, None if we reached the end."""
125 except StopIteration:
129 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
131 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
133 samekey = operator.eq
135 total = sum(len(it) for it in iters)
136 iters = (iter(it) for it in iters)
137 heap = ((next(it),it) for it in iters)
138 heap = [(e,it) for e,it in heap if e]
143 if not count % pfreq:
146 if not samekey(e, pe):
151 e = it.next() # Don't use next() function, it's too expensive
152 except StopIteration:
153 heapq.heappop(heap) # remove current
155 heapq.heapreplace(heap, (e, it)) # shift current to new location
160 """Delete a file at path 'f' if it currently exists.
162 Unlike os.unlink(), does not throw an exception if the file didn't already
168 if e.errno == errno.ENOENT:
169 pass # it doesn't exist, that's what you asked for
173 """Run a subprocess and return its output."""
174 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
181 """Get the absolute path of a file.
183 Behaves like os.path.realpath, but doesn't follow a symlink for the last
184 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
185 will follow symlinks in p's directory)
191 if st and stat.S_ISLNK(st.st_mode):
192 (dir, name) = os.path.split(p)
193 dir = os.path.realpath(dir)
194 out = os.path.join(dir, name)
196 out = os.path.realpath(p)
197 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    The check is simply whether the FAKEROOTKEY environment variable is set;
    an empty value still counts as set.
    """
    # Identity test rather than '!=': None is a singleton, and 'is not' can't
    # be fooled by an object with a custom comparison.
    return os.getenv("FAKEROOTKEY") is not None
207 if platform.system().startswith('CYGWIN'):
209 return ctypes.cdll.shell32.IsUserAnAdmin()
211 return os.geteuid() == 0
216 """Get the user's login name."""
221 _username = pwd.getpwuid(uid)[0]
223 _username = 'user%d' % uid
229 """Get the user's full name."""
231 if not _userfullname:
234 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
236 _userfullname = 'user%d' % uid
242 """Get the FQDN of this machine."""
245 _hostname = socket.getfqdn()
# Cached resource directory; filled in by the first resource_path() call.
_resource_path = None
def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The resource directory is read from the BUP_RESOURCE_PATH environment
    variable (falling back to '.' if it is unset or empty) the first time
    it is needed, and then remembered in the module-level _resource_path.
    """
    global _resource_path
    base = _resource_path
    if not base:
        base = os.environ.get('BUP_RESOURCE_PATH') or '.'
        _resource_path = base
    return os.path.join(base, subdir)
257 class NotOk(Exception):
262 def __init__(self, outp):
266 while self._read(65536): pass
268 def read(self, size):
269 """Read 'size' bytes from input stream."""
271 return self._read(size)
274 """Read from input stream until a newline is found."""
276 return self._readline()
278 def write(self, data):
279 """Write 'data' to output stream."""
280 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
281 self.outp.write(data)
284 """Return true if input stream is readable."""
285 raise NotImplemented("Subclasses must implement has_input")
288 """Indicate end of output from last sent command."""
292 """Indicate server error to the client."""
293 s = re.sub(r'\s+', ' ', str(s))
294 self.write('\nerror %s\n' % s)
296 def _check_ok(self, onempty):
299 for rl in linereader(self):
300 #log('%d got line: %r\n' % (os.getpid(), rl))
301 if not rl: # empty line
305 elif rl.startswith('error '):
306 #log('client: error: %s\n' % rl[6:])
310 raise Exception('server exited unexpectedly; see errors above')
312 def drain_and_check_ok(self):
313 """Remove all data for the current command from input stream."""
316 return self._check_ok(onempty)
319 """Verify that server action completed successfully."""
321 raise Exception('expected "ok", got %r' % rl)
322 return self._check_ok(onempty)
325 class Conn(BaseConn):
326 def __init__(self, inp, outp):
327 BaseConn.__init__(self, outp)
330 def _read(self, size):
331 return self.inp.read(size)
334 return self.inp.readline()
337 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
339 assert(rl[0] == self.inp.fileno())
345 def checked_reader(fd, n):
347 rl, _, _ = select.select([fd], [], [])
350 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
355 MAX_PACKET = 128 * 1024
356 def mux(p, outfd, outr, errr):
359 while p.poll() is None:
360 rl, _, _ = select.select(fds, [], [])
363 buf = os.read(outr, MAX_PACKET)
365 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
367 buf = os.read(errr, 1024)
369 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
371 os.write(outfd, struct.pack('!IB', 0, 3))
374 class DemuxConn(BaseConn):
375 """A helper class for bup's client-server protocol."""
376 def __init__(self, infd, outp):
377 BaseConn.__init__(self, outp)
378 # Anything that comes through before the sync string was not
379 # multiplexed and can be assumed to be debug/log before mux init.
381 while tail != 'BUPMUX':
382 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
384 raise IOError('demux: unexpected EOF during initialization')
386 sys.stderr.write(tail[:-6]) # pre-mux log messages
393 def write(self, data):
395 BaseConn.write(self, data)
397 def _next_packet(self, timeout):
398 if self.closed: return False
399 rl, wl, xl = select.select([self.infd], [], [], timeout)
400 if not rl: return False
401 assert(rl[0] == self.infd)
402 ns = ''.join(checked_reader(self.infd, 5))
403 n, fdw = struct.unpack('!IB', ns)
404 assert(n <= MAX_PACKET)
406 self.reader = checked_reader(self.infd, n)
408 for buf in checked_reader(self.infd, n):
409 sys.stderr.write(buf)
412 debug2("DemuxConn: marked closed\n")
415 def _load_buf(self, timeout):
416 if self.buf is not None:
418 while not self.closed:
419 while not self.reader:
420 if not self._next_packet(timeout):
423 self.buf = self.reader.next()
425 except StopIteration:
429 def _read_parts(self, ix_fn):
430 while self._load_buf(None):
431 assert(self.buf is not None)
433 if i is None or i == len(self.buf):
438 self.buf = self.buf[i:]
446 return buf.index('\n')+1
449 return ''.join(self._read_parts(find_eol))
451 def _read(self, size):
453 def until_size(buf): # Closes on csize
454 if len(buf) < csize[0]:
459 return ''.join(self._read_parts(until_size))
462 return self._load_buf(0)
466 """Generate a list of input lines from 'f' without terminating newlines."""
474 def chunkyreader(f, count = None):
475 """Generate a list of chunks of data read from 'f'.
477 If count is None, read until EOF is reached.
479 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
480 reached while reading, raise IOError.
484 b = f.read(min(count, 65536))
486 raise IOError('EOF with %d bytes remaining' % count)
497 """Append "/" to 's' if it doesn't already end in "/"."""
498 if s and not s.endswith('/'):
504 def _mmap_do(f, sz, flags, prot, close):
506 st = os.fstat(f.fileno())
509 # trying to open a zero-length map gives an error, but an empty
510 # string has all the same behaviour of a zero-length map, ie. it has
513 map = mmap.mmap(f.fileno(), sz, flags, prot)
515 f.close() # map will persist beyond file close
def mmap_read(f, sz = 0, close=True):
    """Map file 'f' into memory for reading only.

    A 'sz' of 0 maps the entire file.  'close' is handed through to
    _mmap_do() unchanged.
    """
    return _mmap_do(f, sz, flags=mmap.MAP_PRIVATE, prot=mmap.PROT_READ,
                    close=close)
526 def mmap_readwrite(f, sz = 0, close=True):
527 """Create a read-write memory mapped region on file 'f'.
528 If sz is 0, the region will cover the entire file.
530 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
534 def mmap_readwrite_private(f, sz = 0, close=True):
535 """Create a read-write memory mapped region on file 'f'.
536 If sz is 0, the region will cover the entire file.
537 The map is private, which means the changes are never flushed back to the
540 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
545 """Parse data size information into a float number.
547 Here are some examples of conversions:
548 199.2k means 203981 bytes
549 1GB means 1073741824 bytes
550 2.1 tb means 2199023255552 bytes
552 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
554 raise ValueError("can't parse %r as a number" % s)
555 (val, unit) = g.groups()
558 if unit in ['t', 'tb']:
559 mult = 1024*1024*1024*1024
560 elif unit in ['g', 'gb']:
561 mult = 1024*1024*1024
562 elif unit in ['m', 'mb']:
564 elif unit in ['k', 'kb']:
566 elif unit in ['', 'b']:
569 raise ValueError("invalid unit %r in number %r" % (unit, s))
574 """Count the number of elements in an iterator. (consumes the iterator)"""
575 return reduce(lambda x,y: x+1, l)
580 """Append an error message to the list of saved errors.
582 Once processing is able to stop and output the errors, the saved errors are
583 accessible in the module variable helpers.saved_errors.
585 saved_errors.append(e)
595 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
597 The new exception handler will make sure that bup will exit without an ugly
598 stacktrace when Ctrl-C is hit.
600 oldhook = sys.excepthook
601 def newhook(exctype, value, traceback):
602 if exctype == KeyboardInterrupt:
603 log('Interrupted.\n')
605 return oldhook(exctype, value, traceback)
606 sys.excepthook = newhook
609 def columnate(l, prefix):
610 """Format elements of 'l' in columns with 'prefix' leading each line.
612 The number of columns is determined automatically based on the string
618 clen = max(len(s) for s in l)
619 ncols = (tty_width() - len(prefix)) / (clen + 2)
624 while len(l) % ncols:
627 for s in range(0, len(l), rows):
628 cols.append(l[s:s+rows])
630 for row in zip(*cols):
631 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
635 def parse_date_or_fatal(str, fatal):
636 """Parses the given date or calls Option.fatal().
637 For now we expect a string that contains a float."""
640 except ValueError, e:
641 raise fatal('invalid date format (should be a float): %r' % e)
646 def strip_path(prefix, path):
647 """Strips a given prefix from a path.
649 First both paths are normalized.
651 Raises an Exception if no prefix is given.
654 raise Exception('no path given')
656 normalized_prefix = os.path.realpath(prefix)
657 debug2("normalized_prefix: %s\n" % normalized_prefix)
658 normalized_path = os.path.realpath(path)
659 debug2("normalized_path: %s\n" % normalized_path)
660 if normalized_path.startswith(normalized_prefix):
661 return normalized_path[len(normalized_prefix):]
666 def strip_base_path(path, base_paths):
667 """Strips the base path from a given path.
670 Determines the base path for the given string and then strips it
672 Iterates over all base_paths from long to short, to prevent that
673 a too short base_path is removed.
675 normalized_path = os.path.realpath(path)
676 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
677 for bp in sorted_base_paths:
678 if normalized_path.startswith(os.path.realpath(bp)):
679 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite the leading part of 'path' according to 'graft_points'.

    'graft_points' is an iterable of (old_prefix, new_prefix) pairs.  'path'
    is first normalized with os.path.realpath(); the first pair whose
    old_prefix the normalized path starts with is applied, replacing that
    prefix with new_prefix.  If no pair matches, the normalized path is
    returned unchanged.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice the strings directly rather than using re.sub(): the
            # prefixes are literal paths, so regex metacharacters in
            # old_prefix (e.g. '+', '(') and backslashes in new_prefix must
            # not be interpreted.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
692 # hashlib is only available in python 2.5 or higher, but the 'sha' module
693 # produces a DeprecationWarning in python 2.6 or higher. We want to support
694 # python 2.4 and above without any stupid warnings, so let's try using hashlib
695 # first, and downgrade if it fails.
706 """Format bup's version date string for output."""
707 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded for bup's current version.

    The value is read from the COMMIT attribute of the _version module.
    """
    commit_hash = _version.COMMIT
    return commit_hash
716 """Format bup's version tag (the official version number).
718 When generated from a commit other than one pointed to with a tag, the
719 returned string will be "unknown-" followed by the first seven positions of
722 names = _version.NAMES.strip()
723 assert(names[0] == '(')
724 assert(names[-1] == ')')
726 l = [n.strip() for n in names.split(',')]
728 if n.startswith('tag: bup-'):
730 return 'unknown-%s' % _version.COMMIT[:7]