1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import heapq, operator, time
5 from bup import _version, _helpers
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
29 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
52 """Print a log message to stderr."""
55 _hard_write(sys.stderr.fileno(), s)
69 istty1 = os.isatty(1) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 1)
70 istty2 = os.isatty(2) or (atoi(os.environ.get('BUP_FORCE_TTY')) & 2)
73 """Calls log() if stderr is a TTY. Does nothing otherwise."""
81 """Calls progress() only if we haven't printed progress in a while.
83 This avoids overloading the stderr buffer with excess junk.
87 if now - _last_prog > 0.1:
93 """Calls progress() to redisplay the most recent progress message.
95 Useful after you've printed some other message that wipes out the
98 if _last_progress and _last_progress.endswith('\r'):
99 progress(_last_progress)
102 def mkdirp(d, mode=None):
103 """Recursively create directories on path 'd'.
105 Unlike os.makedirs(), it doesn't raise an exception if the last element of
106 the path already exists.
114 if e.errno == errno.EEXIST:
121 """Get the next item from an iterator, None if we reached the end."""
124 except StopIteration:
128 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
130 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
132 samekey = operator.eq
134 total = sum(len(it) for it in iters)
135 iters = (iter(it) for it in iters)
136 heap = ((next(it),it) for it in iters)
137 heap = [(e,it) for e,it in heap if e]
142 if not count % pfreq:
145 if not samekey(e, pe):
150 e = it.next() # Don't use next() function, it's too expensive
151 except StopIteration:
152 heapq.heappop(heap) # remove current
154 heapq.heapreplace(heap, (e, it)) # shift current to new location
159 """Delete a file at path 'f' if it currently exists.
161 Unlike os.unlink(), does not throw an exception if the file didn't already
167 if e.errno == errno.ENOENT:
168 pass # it doesn't exist, that's what you asked for
172 """Run a subprocess and return its output."""
173 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
180 """Get the absolute path of a file.
182 Behaves like os.path.realpath, but doesn't follow a symlink for the last
183 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
184 will follow symlinks in p's directory)
190 if st and stat.S_ISLNK(st.st_mode):
191 (dir, name) = os.path.split(p)
192 dir = os.path.realpath(dir)
193 out = os.path.join(dir, name)
195 out = os.path.realpath(p)
196 #log('realpathing:%r,%r\n' % (p, out))
202 """Get the user's login name."""
207 _username = pwd.getpwuid(uid)[0]
209 _username = 'user%d' % uid
215 """Get the user's full name."""
217 if not _userfullname:
220 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
222 _userfullname = 'user%d' % uid
228 """Get the FQDN of this machine."""
231 _hostname = socket.getfqdn()
# Base directory for resource lookups; stays None until resource_path() is
# first called, then holds the value that call resolved.
_resource_path = None
def resource_path(subdir=''):
    """Return 'subdir' joined onto the resource base directory.

    The base directory is resolved on the first call: it is the value of the
    BUP_RESOURCE_PATH environment variable, or '.' when that variable is
    unset or empty.  The result is stored in the module-level _resource_path
    and reused by later calls, so changing the environment afterwards has no
    effect.
    """
    global _resource_path
    if _resource_path:
        base = _resource_path
    else:
        base = os.environ.get('BUP_RESOURCE_PATH')
        if not base:
            base = '.'
        _resource_path = base
    return os.path.join(base, subdir)
243 class NotOk(Exception):
248 def __init__(self, outp):
252 while self._read(65536): pass
254 def read(self, size):
255 """Read 'size' bytes from input stream."""
257 return self._read(size)
260 """Read from input stream until a newline is found."""
262 return self._readline()
def write(self, data):
    """Hand 'data' to the write() method of the output stream self.outp."""
    #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
    sink = self.outp
    sink.write(data)
270 """Return true if input stream is readable."""
271 raise NotImplemented("Subclasses must implement has_input")
274 """Indicate end of output from last sent command."""
278 """Indicate server error to the client."""
279 s = re.sub(r'\s+', ' ', str(s))
280 self.write('\nerror %s\n' % s)
282 def _check_ok(self, onempty):
285 for rl in linereader(self):
286 #log('%d got line: %r\n' % (os.getpid(), rl))
287 if not rl: # empty line
291 elif rl.startswith('error '):
292 #log('client: error: %s\n' % rl[6:])
296 raise Exception('server exited unexpectedly; see errors above')
298 def drain_and_check_ok(self):
299 """Remove all data for the current command from input stream."""
302 return self._check_ok(onempty)
305 """Verify that server action completed successfully."""
307 raise Exception('expected "ok", got %r' % rl)
308 return self._check_ok(onempty)
311 class Conn(BaseConn):
312 def __init__(self, inp, outp):
313 BaseConn.__init__(self, outp)
316 def _read(self, size):
317 return self.inp.read(size)
320 return self.inp.readline()
323 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
325 assert(rl[0] == self.inp.fileno())
331 def checked_reader(fd, n):
333 rl, _, _ = select.select([fd], [], [])
336 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
341 MAX_PACKET = 128 * 1024
342 def mux(p, outfd, outr, errr):
345 while p.poll() is None:
346 rl, _, _ = select.select(fds, [], [])
349 buf = os.read(outr, MAX_PACKET)
351 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
353 buf = os.read(errr, 1024)
355 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
357 os.write(outfd, struct.pack('!IB', 0, 3))
360 class DemuxConn(BaseConn):
361 """A helper class for bup's client-server protocol."""
362 def __init__(self, infd, outp):
363 BaseConn.__init__(self, outp)
364 # Anything that comes through before the sync string was not
365 # multiplexed and can be assumed to be debug/log before mux init.
367 while tail != 'BUPMUX':
368 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
370 raise IOError('demux: unexpected EOF during initialization')
372 sys.stderr.write(tail[:-6]) # pre-mux log messages
379 def write(self, data):
381 BaseConn.write(self, data)
383 def _next_packet(self, timeout):
384 if self.closed: return False
385 rl, wl, xl = select.select([self.infd], [], [], timeout)
386 if not rl: return False
387 assert(rl[0] == self.infd)
388 ns = ''.join(checked_reader(self.infd, 5))
389 n, fdw = struct.unpack('!IB', ns)
390 assert(n <= MAX_PACKET)
392 self.reader = checked_reader(self.infd, n)
394 for buf in checked_reader(self.infd, n):
395 sys.stderr.write(buf)
398 debug2("DemuxConn: marked closed\n")
401 def _load_buf(self, timeout):
402 if self.buf is not None:
404 while not self.closed:
405 while not self.reader:
406 if not self._next_packet(timeout):
409 self.buf = self.reader.next()
411 except StopIteration:
415 def _read_parts(self, ix_fn):
416 while self._load_buf(None):
417 assert(self.buf is not None)
419 if i is None or i == len(self.buf):
424 self.buf = self.buf[i:]
432 return buf.index('\n')+1
435 return ''.join(self._read_parts(find_eol))
437 def _read(self, size):
439 def until_size(buf): # Closes on csize
440 if len(buf) < csize[0]:
445 return ''.join(self._read_parts(until_size))
448 return self._load_buf(0)
452 """Generate a list of input lines from 'f' without terminating newlines."""
460 def chunkyreader(f, count = None):
461 """Generate a list of chunks of data read from 'f'.
463 If count is None, read until EOF is reached.
465 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
466 reached while reading, raise IOError.
470 b = f.read(min(count, 65536))
472 raise IOError('EOF with %d bytes remaining' % count)
483 """Append "/" to 's' if it doesn't already end in "/"."""
484 if s and not s.endswith('/'):
490 def _mmap_do(f, sz, flags, prot, close):
492 st = os.fstat(f.fileno())
495 # trying to open a zero-length map gives an error, but an empty
496 # string has all the same behaviour of a zero-length map, ie. it has
499 map = mmap.mmap(f.fileno(), sz, flags, prot)
501 f.close() # map will persist beyond file close
505 def mmap_read(f, sz = 0, close=True):
506 """Create a read-only memory mapped region on file 'f'.
507 If sz is 0, the region will cover the entire file.
509 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
512 def mmap_readwrite(f, sz = 0, close=True):
513 """Create a read-write memory mapped region on file 'f'.
514 If sz is 0, the region will cover the entire file.
516 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
520 def mmap_readwrite_private(f, sz = 0, close=True):
521 """Create a read-write memory mapped region on file 'f'.
522 If sz is 0, the region will cover the entire file.
523 The map is private, which means the changes are never flushed back to the
526 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
531 """Parse data size information into a float number.
533 Here are some examples of conversions:
534 199.2k means 203981 bytes
535 1GB means 1073741824 bytes
536 2.1 tb means 2199023255552 bytes
538 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
540 raise ValueError("can't parse %r as a number" % s)
541 (val, unit) = g.groups()
544 if unit in ['t', 'tb']:
545 mult = 1024*1024*1024*1024
546 elif unit in ['g', 'gb']:
547 mult = 1024*1024*1024
548 elif unit in ['m', 'mb']:
550 elif unit in ['k', 'kb']:
552 elif unit in ['', 'b']:
555 raise ValueError("invalid unit %r in number %r" % (unit, s))
560 """Count the number of elements in an iterator. (consumes the iterator)"""
561 return reduce(lambda x,y: x+1, l)
566 """Append an error message to the list of saved errors.
568 Once processing is able to stop and output the errors, the saved errors are
569 accessible in the module variable helpers.saved_errors.
571 saved_errors.append(e)
576 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
578 The new exception handler will make sure that bup will exit without an ugly
579 stacktrace when Ctrl-C is hit.
581 oldhook = sys.excepthook
582 def newhook(exctype, value, traceback):
583 if exctype == KeyboardInterrupt:
584 log('Interrupted.\n')
586 return oldhook(exctype, value, traceback)
587 sys.excepthook = newhook
590 def columnate(l, prefix):
591 """Format elements of 'l' in columns with 'prefix' leading each line.
593 The number of columns is determined automatically based on the string
599 clen = max(len(s) for s in l)
600 ncols = (tty_width() - len(prefix)) / (clen + 2)
605 while len(l) % ncols:
608 for s in range(0, len(l), rows):
609 cols.append(l[s:s+rows])
611 for row in zip(*cols):
612 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
616 def parse_date_or_fatal(str, fatal):
617 """Parses the given date or calls Option.fatal().
618 For now we expect a string that contains a float."""
621 except ValueError, e:
622 raise fatal('invalid date format (should be a float): %r' % e)
627 def strip_path(prefix, path):
628 """Strips a given prefix from a path.
630 First both paths are normalized.
632 Raises an Exception if no prefix is given.
635 raise Exception('no path given')
637 normalized_prefix = os.path.realpath(prefix)
638 debug2("normalized_prefix: %s\n" % normalized_prefix)
639 normalized_path = os.path.realpath(path)
640 debug2("normalized_path: %s\n" % normalized_path)
641 if normalized_path.startswith(normalized_prefix):
642 return normalized_path[len(normalized_prefix):]
647 def strip_base_path(path, base_paths):
648 """Strips the base path from a given path.
651 Determines the base path for the given string and then strips it
653 Iterates over all base_paths from long to short, to prevent that
654 a too short base_path is removed.
656 normalized_path = os.path.realpath(path)
657 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
658 for bp in sorted_base_paths:
659 if normalized_path.startswith(os.path.realpath(bp)):
660 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite 'path' according to the first matching graft point.

    'graft_points' is an iterable of (old_prefix, new_prefix) pairs.  'path'
    is normalized with os.path.realpath(); for the first pair whose
    old_prefix is a leading substring of the normalized path, that prefix is
    replaced by new_prefix and the result is returned.  If no pair matches,
    the normalized path is returned unchanged.

    Both prefixes are treated as literal strings.  The prefix test is a plain
    startswith(): old_prefix itself is not normalized, and '/a' also matches
    '/ab'.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice by length instead of using re.sub(): a path may contain
            # regex metacharacters ('+', '(', '[', ...) and new_prefix may
            # contain backslashes, none of which must be interpreted.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
673 # hashlib is only available in python 2.5 or higher, but the 'sha' module
674 # produces a DeprecationWarning in python 2.6 or higher. We want to support
675 # python 2.4 and above without any stupid warnings, so let's try using hashlib
676 # first, and downgrade if it fails.
687 """Format bup's version date string for output."""
688 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash stored in _version.COMMIT."""
    commit = _version.COMMIT
    return commit
697 """Format bup's version tag (the official version number).
699 When generated from a commit other than one pointed to with a tag, the
700 returned string will be "unknown-" followed by the first seven positions of
703 names = _version.NAMES.strip()
704 assert(names[0] == '(')
705 assert(names[-1] == ')')
707 l = [n.strip() for n in names.split(',')]
709 if n.startswith('tag: bup-'):
711 return 'unknown-%s' % _version.COMMIT[:7]