1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import heapq, operator, time
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
8 # This function should really be in helpers, not in bup.options. But we
9 # want options.py to be standalone so people can include it in other projects.
10 from bup.options import _tty_width
# Re-export bup.options._tty_width under the public name tty_width, so code
# in this module (e.g. columnate()) and its importers can call
# helpers.tty_width() as if the function lived here.
tty_width = _tty_width
15 """Convert the string 's' to an integer. Return 0 if s is not a number."""
23 """Convert the string 's' to a float. Return 0 if s is not a number."""
25 return float(s or '0')
# Debug verbosity level, read once at import time from $BUP_DEBUG and
# converted with atoi() (default 0 when the variable is unset).
buglvl = atoi(os.getenv('BUP_DEBUG', 0))
33 # Write (blockingly) to sockets that may or may not be in blocking mode.
34 # We need this because our stderr is sometimes eaten by subprocesses
35 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
36 # leading to race conditions. Ick. We'll do it the hard way.
37 def _hard_write(fd, buf):
39 (r,w,x) = select.select([], [fd], [], None)
41 raise IOError('select(fd) returned without being writable')
43 sz = os.write(fd, buf)
45 if e.errno != errno.EAGAIN:
53 """Print a log message to stderr."""
56 _hard_write(sys.stderr.fileno(), s)
# A stream counts as a TTY when it really is one; otherwise the matching bit
# of $BUP_FORCE_TTY decides (value 1 -> stdout / fd 1, value 2 -> stderr / fd 2).
istty1 = True if os.isatty(1) else (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
istty2 = True if os.isatty(2) else (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
74 """Calls log() if stderr is a TTY. Does nothing otherwise."""
82 """Calls progress() only if we haven't printed progress in a while.
84 This avoids overloading the stderr buffer with excess junk.
88 if now - _last_prog > 0.1:
94 """Calls progress() to redisplay the most recent progress message.
96 Useful after you've printed some other message that wipes out the
99 if _last_progress and _last_progress.endswith('\r'):
100 progress(_last_progress)
103 def mkdirp(d, mode=None):
104 """Recursively create directories on path 'd'.
106 Unlike os.makedirs(), it doesn't raise an exception if the last element of
107 the path already exists.
115 if e.errno == errno.EEXIST:
122 """Get the next item from an iterator, None if we reached the end."""
125 except StopIteration:
129 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
131 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
133 samekey = operator.eq
135 total = sum(len(it) for it in iters)
136 iters = (iter(it) for it in iters)
137 heap = ((next(it),it) for it in iters)
138 heap = [(e,it) for e,it in heap if e]
143 if not count % pfreq:
146 if not samekey(e, pe):
151 e = it.next() # Don't use next() function, it's too expensive
152 except StopIteration:
153 heapq.heappop(heap) # remove current
155 heapq.heapreplace(heap, (e, it)) # shift current to new location
160 """Delete a file at path 'f' if it currently exists.
162 Unlike os.unlink(), does not throw an exception if the file didn't already
168 if e.errno == errno.ENOENT:
169 pass # it doesn't exist, that's what you asked for
173 """Run a subprocess and return its output."""
174 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
181 """Get the absolute path of a file.
183 Behaves like os.path.realpath, but doesn't follow a symlink for the last
184 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
185 will follow symlinks in p's directory)
191 if st and stat.S_ISLNK(st.st_mode):
192 (dir, name) = os.path.split(p)
193 dir = os.path.realpath(dir)
194 out = os.path.join(dir, name)
196 out = os.path.realpath(p)
197 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    The check is purely the presence of the FAKEROOTKEY environment
    variable; any value, including an empty string, counts as present.
    """
    return os.getenv("FAKEROOTKEY") is not None
208 """Get the user's login name."""
213 _username = pwd.getpwuid(uid)[0]
215 _username = 'user%d' % uid
221 """Get the user's full name."""
223 if not _userfullname:
226 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
228 _userfullname = 'user%d' % uid
234 """Get the FQDN of this machine."""
237 _hostname = socket.getfqdn()
# Cached resource directory; filled in by the first resource_path() call.
_resource_path = None
def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The directory is taken from $BUP_RESOURCE_PATH, or '.' when that
    variable is unset or empty. It is looked up on the first call and then
    kept in the module-level _resource_path, so later changes to the
    environment are not seen.
    """
    global _resource_path
    if _resource_path:
        base = _resource_path
    else:
        base = _resource_path = os.environ.get('BUP_RESOURCE_PATH') or '.'
    return os.path.join(base, subdir)
249 class NotOk(Exception):
254 def __init__(self, outp):
258 while self._read(65536): pass
260 def read(self, size):
261 """Read 'size' bytes from input stream."""
263 return self._read(size)
266 """Read from input stream until a newline is found."""
268 return self._readline()
def write(self, data):
    """Send 'data' to this connection's output stream, self.outp."""
    out = self.outp
    out.write(data)
276 """Return true if input stream is readable."""
277 raise NotImplemented("Subclasses must implement has_input")
280 """Indicate end of output from last sent command."""
284 """Indicate server error to the client."""
285 s = re.sub(r'\s+', ' ', str(s))
286 self.write('\nerror %s\n' % s)
288 def _check_ok(self, onempty):
291 for rl in linereader(self):
292 #log('%d got line: %r\n' % (os.getpid(), rl))
293 if not rl: # empty line
297 elif rl.startswith('error '):
298 #log('client: error: %s\n' % rl[6:])
302 raise Exception('server exited unexpectedly; see errors above')
304 def drain_and_check_ok(self):
305 """Remove all data for the current command from input stream."""
308 return self._check_ok(onempty)
311 """Verify that server action completed successfully."""
313 raise Exception('expected "ok", got %r' % rl)
314 return self._check_ok(onempty)
317 class Conn(BaseConn):
318 def __init__(self, inp, outp):
319 BaseConn.__init__(self, outp)
def _read(self, size):
    """Delegate to self.inp.read(size) and return whatever it returns."""
    stream = self.inp
    return stream.read(size)
326 return self.inp.readline()
329 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
331 assert(rl[0] == self.inp.fileno())
337 def checked_reader(fd, n):
339 rl, _, _ = select.select([fd], [], [])
342 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
347 MAX_PACKET = 128 * 1024
348 def mux(p, outfd, outr, errr):
351 while p.poll() is None:
352 rl, _, _ = select.select(fds, [], [])
355 buf = os.read(outr, MAX_PACKET)
357 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
359 buf = os.read(errr, 1024)
361 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
363 os.write(outfd, struct.pack('!IB', 0, 3))
366 class DemuxConn(BaseConn):
367 """A helper class for bup's client-server protocol."""
368 def __init__(self, infd, outp):
369 BaseConn.__init__(self, outp)
370 # Anything that comes through before the sync string was not
371 # multiplexed and can be assumed to be debug/log before mux init.
373 while tail != 'BUPMUX':
374 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
376 raise IOError('demux: unexpected EOF during initialization')
378 sys.stderr.write(tail[:-6]) # pre-mux log messages
385 def write(self, data):
387 BaseConn.write(self, data)
389 def _next_packet(self, timeout):
390 if self.closed: return False
391 rl, wl, xl = select.select([self.infd], [], [], timeout)
392 if not rl: return False
393 assert(rl[0] == self.infd)
394 ns = ''.join(checked_reader(self.infd, 5))
395 n, fdw = struct.unpack('!IB', ns)
396 assert(n <= MAX_PACKET)
398 self.reader = checked_reader(self.infd, n)
400 for buf in checked_reader(self.infd, n):
401 sys.stderr.write(buf)
404 debug2("DemuxConn: marked closed\n")
407 def _load_buf(self, timeout):
408 if self.buf is not None:
410 while not self.closed:
411 while not self.reader:
412 if not self._next_packet(timeout):
415 self.buf = self.reader.next()
417 except StopIteration:
421 def _read_parts(self, ix_fn):
422 while self._load_buf(None):
423 assert(self.buf is not None)
425 if i is None or i == len(self.buf):
430 self.buf = self.buf[i:]
438 return buf.index('\n')+1
441 return ''.join(self._read_parts(find_eol))
443 def _read(self, size):
445 def until_size(buf): # Closes on csize
446 if len(buf) < csize[0]:
451 return ''.join(self._read_parts(until_size))
454 return self._load_buf(0)
458 """Generate a list of input lines from 'f' without terminating newlines."""
466 def chunkyreader(f, count = None):
467 """Generate a list of chunks of data read from 'f'.
469 If count is None, read until EOF is reached.
471 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
472 reached while reading, raise IOError.
476 b = f.read(min(count, 65536))
478 raise IOError('EOF with %d bytes remaining' % count)
489 """Append "/" to 's' if it doesn't aleady end in "/"."""
490 if s and not s.endswith('/'):
496 def _mmap_do(f, sz, flags, prot, close):
498 st = os.fstat(f.fileno())
501 # trying to open a zero-length map gives an error, but an empty
502 # string has all the same behaviour of a zero-length map, ie. it has
505 map = mmap.mmap(f.fileno(), sz, flags, prot)
507 f.close() # map will persist beyond file close
511 def mmap_read(f, sz = 0, close=True):
512 """Create a read-only memory mapped region on file 'f'.
513 If sz is 0, the region will cover the entire file.
515 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
518 def mmap_readwrite(f, sz = 0, close=True):
519 """Create a read-write memory mapped region on file 'f'.
520 If sz is 0, the region will cover the entire file.
522 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
526 def mmap_readwrite_private(f, sz = 0, close=True):
527 """Create a read-write memory mapped region on file 'f'.
528 If sz is 0, the region will cover the entire file.
529 The map is private, which means the changes are never flushed back to the
532 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
537 """Parse data size information into a float number.
539 Here are some examples of conversions:
540 199.2k means 203981 bytes
541 1GB means 1073741824 bytes
542 2.1 tb means 2199023255552 bytes
544 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
546 raise ValueError("can't parse %r as a number" % s)
547 (val, unit) = g.groups()
550 if unit in ['t', 'tb']:
551 mult = 1024*1024*1024*1024
552 elif unit in ['g', 'gb']:
553 mult = 1024*1024*1024
554 elif unit in ['m', 'mb']:
556 elif unit in ['k', 'kb']:
558 elif unit in ['', 'b']:
561 raise ValueError("invalid unit %r in number %r" % (unit, s))
566 """Count the number of elements in an iterator. (consumes the iterator)"""
567 return reduce(lambda x,y: x+1, l)
572 """Append an error message to the list of saved errors.
574 Once processing is able to stop and output the errors, the saved errors are
575 accessible in the module variable helpers.saved_errors.
577 saved_errors.append(e)
587 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
589 The new exception handler will make sure that bup will exit without an ugly
590 stacktrace when Ctrl-C is hit.
592 oldhook = sys.excepthook
593 def newhook(exctype, value, traceback):
594 if exctype == KeyboardInterrupt:
595 log('Interrupted.\n')
597 return oldhook(exctype, value, traceback)
598 sys.excepthook = newhook
601 def columnate(l, prefix):
602 """Format elements of 'l' in columns with 'prefix' leading each line.
604 The number of columns is determined automatically based on the string
610 clen = max(len(s) for s in l)
611 ncols = (tty_width() - len(prefix)) / (clen + 2)
616 while len(l) % ncols:
619 for s in range(0, len(l), rows):
620 cols.append(l[s:s+rows])
622 for row in zip(*cols):
623 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
627 def parse_date_or_fatal(str, fatal):
628 """Parses the given date or calls Option.fatal().
629 For now we expect a string that contains a float."""
632 except ValueError, e:
633 raise fatal('invalid date format (should be a float): %r' % e)
638 def strip_path(prefix, path):
639 """Strips a given prefix from a path.
641 First both paths are normalized.
643 Raises an Exception if no prefix is given.
646 raise Exception('no path given')
648 normalized_prefix = os.path.realpath(prefix)
649 debug2("normalized_prefix: %s\n" % normalized_prefix)
650 normalized_path = os.path.realpath(path)
651 debug2("normalized_path: %s\n" % normalized_path)
652 if normalized_path.startswith(normalized_prefix):
653 return normalized_path[len(normalized_prefix):]
658 def strip_base_path(path, base_paths):
659 """Strips the base path from a given path.
662 Determines the base path for the given string and then strips it
664 Iterates over all base_paths from long to short, to prevent that
665 a too short base_path is removed.
667 normalized_path = os.path.realpath(path)
668 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
669 for bp in sorted_base_paths:
670 if normalized_path.startswith(os.path.realpath(bp)):
671 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite the leading part of 'path' according to 'graft_points'.

    'path' is first resolved with os.path.realpath(). 'graft_points' is a
    sequence of (old_prefix, new_prefix) pairs. The first pair whose
    old_prefix is a string prefix of the resolved path wins, and that
    prefix is replaced by new_prefix. If no pair matches, the resolved path
    is returned unchanged.

    The match is a plain string-prefix test, not a path-component test:
    an old_prefix of '/foo' also matches '/foobar'.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice the strings directly rather than using re.sub(). The
            # prefixes are literal paths, so regex metacharacters in
            # old_prefix ('+', '*', '(' ...) and backslash escapes in
            # new_prefix must not be interpreted.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
684 # hashlib is only available in python 2.5 or higher, but the 'sha' module
685 # produces a DeprecationWarning in python 2.6 or higher. We want to support
686 # python 2.4 and above without any stupid warnings, so let's try using hashlib
687 # first, and downgrade if it fails.
698 """Format bup's version date string for output."""
699 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash of bup's current version (_version.COMMIT)."""
    commit = _version.COMMIT
    return commit
708 """Format bup's version tag (the official version number).
710 When generated from a commit other than one pointed to with a tag, the
711 returned string will be "unknown-" followed by the first seven positions of
714 names = _version.NAMES.strip()
715 assert(names[0] == '(')
716 assert(names[-1] == ')')
718 l = [n.strip() for n in names.split(',')]
720 if n.startswith('tag: bup-'):
722 return 'unknown-%s' % _version.COMMIT[:7]