1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import heapq, operator, time
5 from bup import _version, _helpers
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
29 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
52 """Print a log message to stderr."""
55 _hard_write(sys.stderr.fileno(), s)
69 istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
72 """Calls log() if stderr is a TTY. Does nothing otherwise."""
80 """Calls progress() only if we haven't printed progress in a while.
82 This avoids overloading the stderr buffer with excess junk.
86 if now - _last_prog > 0.1:
92 """Calls progress() to redisplay the most recent progress message.
94 Useful after you've printed some other message that wipes out the
97 if _last_progress and _last_progress.endswith('\r'):
98 progress(_last_progress)
101 def mkdirp(d, mode=None):
102 """Recursively create directories on path 'd'.
104 Unlike os.makedirs(), it doesn't raise an exception if the last element of
105 the path already exists.
113 if e.errno == errno.EEXIST:
120 """Get the next item from an iterator, None if we reached the end."""
123 except StopIteration:
127 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
129 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
131 samekey = operator.eq
133 total = sum(len(it) for it in iters)
134 iters = (iter(it) for it in iters)
135 heap = ((next(it),it) for it in iters)
136 heap = [(e,it) for e,it in heap if e]
141 if not count % pfreq:
144 if not samekey(e, pe):
149 e = it.next() # Don't use next() function, it's too expensive
150 except StopIteration:
151 heapq.heappop(heap) # remove current
153 heapq.heapreplace(heap, (e, it)) # shift current to new location
158 """Delete a file at path 'f' if it currently exists.
160 Unlike os.unlink(), does not throw an exception if the file didn't already
166 if e.errno == errno.ENOENT:
167 pass # it doesn't exist, that's what you asked for
171 """Run a subprocess and return its output."""
172 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
179 """Get the absolute path of a file.
181 Behaves like os.path.realpath, but doesn't follow a symlink for the last
182 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
183 will follow symlinks in p's directory)
189 if st and stat.S_ISLNK(st.st_mode):
190 (dir, name) = os.path.split(p)
191 dir = os.path.realpath(dir)
192 out = os.path.join(dir, name)
194 out = os.path.realpath(p)
195 #log('realpathing:%r,%r\n' % (p, out))
201 """Get the user's login name."""
206 _username = pwd.getpwuid(uid)[0]
208 _username = 'user%d' % uid
214 """Get the user's full name."""
216 if not _userfullname:
219 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
221 _userfullname = 'user%d' % uid
227 """Get the FQDN of this machine."""
230 _hostname = socket.getfqdn()
_resource_path = None


def resource_path(subdir=''):
    """Return the path of 'subdir' inside the bup resource directory.

    The base directory is taken from the BUP_RESOURCE_PATH environment
    variable the first time it is needed, falling back to '.' when the
    variable is unset or empty.  The result is cached in the module-level
    _resource_path, so later changes to the environment are not noticed.
    """
    global _resource_path
    base = _resource_path
    if not base:
        # Nothing cached yet: resolve the base directory and remember it.
        base = os.environ.get('BUP_RESOURCE_PATH') or '.'
        _resource_path = base
    return os.path.join(base, subdir)
242 class NotOk(Exception):
247 def __init__(self, outp):
251 while self._read(65536): pass
253 def read(self, size):
254 """Read 'size' bytes from input stream."""
256 return self._read(size)
259 """Read from input stream until a newline is found."""
261 return self._readline()
def write(self, data):
    """Send 'data' to the output stream held in self.outp."""
    #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
    stream = self.outp
    stream.write(data)
269 """Return true if input stream is readable."""
270 raise NotImplemented("Subclasses must implement has_input")
273 """Indicate end of output from last sent command."""
277 """Indicate server error to the client."""
278 s = re.sub(r'\s+', ' ', str(s))
279 self.write('\nerror %s\n' % s)
281 def _check_ok(self, onempty):
284 for rl in linereader(self):
285 #log('%d got line: %r\n' % (os.getpid(), rl))
286 if not rl: # empty line
290 elif rl.startswith('error '):
291 #log('client: error: %s\n' % rl[6:])
295 raise Exception('server exited unexpectedly; see errors above')
297 def drain_and_check_ok(self):
298 """Remove all data for the current command from input stream."""
301 return self._check_ok(onempty)
304 """Verify that server action completed successfully."""
306 raise Exception('expected "ok", got %r' % rl)
307 return self._check_ok(onempty)
310 class Conn(BaseConn):
311 def __init__(self, inp, outp):
312 BaseConn.__init__(self, outp)
315 def _read(self, size):
316 return self.inp.read(size)
319 return self.inp.readline()
322 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
324 assert(rl[0] == self.inp.fileno())
330 def checked_reader(fd, n):
332 rl, _, _ = select.select([fd], [], [])
335 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
340 MAX_PACKET = 128 * 1024
341 def mux(p, outfd, outr, errr):
344 while p.poll() is None:
345 rl, _, _ = select.select(fds, [], [])
348 buf = os.read(outr, MAX_PACKET)
350 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
352 buf = os.read(errr, 1024)
354 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
356 os.write(outfd, struct.pack('!IB', 0, 3))
359 class DemuxConn(BaseConn):
360 """A helper class for bup's client-server protocol."""
361 def __init__(self, infd, outp):
362 BaseConn.__init__(self, outp)
363 # Anything that comes through before the sync string was not
364 # multiplexed and can be assumed to be debug/log before mux init.
366 while tail != 'BUPMUX':
367 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
369 raise IOError('demux: unexpected EOF during initialization')
371 sys.stderr.write(tail[:-6]) # pre-mux log messages
378 def write(self, data):
380 BaseConn.write(self, data)
382 def _next_packet(self, timeout):
383 if self.closed: return False
384 rl, wl, xl = select.select([self.infd], [], [], timeout)
385 if not rl: return False
386 assert(rl[0] == self.infd)
387 ns = ''.join(checked_reader(self.infd, 5))
388 n, fdw = struct.unpack('!IB', ns)
389 assert(n <= MAX_PACKET)
391 self.reader = checked_reader(self.infd, n)
393 for buf in checked_reader(self.infd, n):
394 sys.stderr.write(buf)
397 debug2("DemuxConn: marked closed\n")
400 def _load_buf(self, timeout):
401 if self.buf is not None:
403 while not self.closed:
404 while not self.reader:
405 if not self._next_packet(timeout):
408 self.buf = self.reader.next()
410 except StopIteration:
414 def _read_parts(self, ix_fn):
415 while self._load_buf(None):
416 assert(self.buf is not None)
418 if i is None or i == len(self.buf):
423 self.buf = self.buf[i:]
431 return buf.index('\n')+1
434 return ''.join(self._read_parts(find_eol))
436 def _read(self, size):
438 def until_size(buf): # Closes on csize
439 if len(buf) < csize[0]:
444 return ''.join(self._read_parts(until_size))
447 return self._load_buf(0)
451 """Generate a list of input lines from 'f' without terminating newlines."""
459 def chunkyreader(f, count = None):
460 """Generate a list of chunks of data read from 'f'.
462 If count is None, read until EOF is reached.
464 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
465 reached while reading, raise IOError.
469 b = f.read(min(count, 65536))
471 raise IOError('EOF with %d bytes remaining' % count)
482 """Append "/" to 's' if it doesn't already end in "/"."""
483 if s and not s.endswith('/'):
489 def _mmap_do(f, sz, flags, prot, close):
491 st = os.fstat(f.fileno())
494 # trying to open a zero-length map gives an error, but an empty
495 # string has all the same behaviour of a zero-length map, ie. it has
498 map = mmap.mmap(f.fileno(), sz, flags, prot)
500 f.close() # map will persist beyond file close
def mmap_read(f, sz=0, close=True):
    """Create a read-only memory mapped region on file 'f'.

    If sz is 0, the region will cover the entire file.  The mapping is
    requested as private (MAP_PRIVATE) with read-only protection
    (PROT_READ).  'close' is passed through to _mmap_do() unchanged.
    """
    flags = mmap.MAP_PRIVATE
    prot = mmap.PROT_READ
    return _mmap_do(f, sz, flags, prot, close)
511 def mmap_readwrite(f, sz = 0, close=True):
512 """Create a read-write memory mapped region on file 'f'.
513 If sz is 0, the region will cover the entire file.
515 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
519 def mmap_readwrite_private(f, sz = 0, close=True):
520 """Create a read-write memory mapped region on file 'f'.
521 If sz is 0, the region will cover the entire file.
522 The map is private, which means the changes are never flushed back to the
525 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
530 """Parse data size information into a float number.
532 Here are some examples of conversions:
533 199.2k means 203981 bytes
534 1GB means 1073741824 bytes
535 2.1 tb means 2199023255552 bytes
537 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
539 raise ValueError("can't parse %r as a number" % s)
540 (val, unit) = g.groups()
543 if unit in ['t', 'tb']:
544 mult = 1024*1024*1024*1024
545 elif unit in ['g', 'gb']:
546 mult = 1024*1024*1024
547 elif unit in ['m', 'mb']:
549 elif unit in ['k', 'kb']:
551 elif unit in ['', 'b']:
554 raise ValueError("invalid unit %r in number %r" % (unit, s))
559 """Count the number of elements in an iterator. (consumes the iterator)"""
560 return reduce(lambda x,y: x+1, l)
565 """Append an error message to the list of saved errors.
567 Once processing is able to stop and output the errors, the saved errors are
568 accessible in the module variable helpers.saved_errors.
570 saved_errors.append(e)
575 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
577 The new exception handler will make sure that bup will exit without an ugly
578 stacktrace when Ctrl-C is hit.
580 oldhook = sys.excepthook
581 def newhook(exctype, value, traceback):
582 if exctype == KeyboardInterrupt:
583 log('Interrupted.\n')
585 return oldhook(exctype, value, traceback)
586 sys.excepthook = newhook
589 def columnate(l, prefix):
590 """Format elements of 'l' in columns with 'prefix' leading each line.
592 The number of columns is determined automatically based on the string
598 clen = max(len(s) for s in l)
599 ncols = (tty_width() - len(prefix)) / (clen + 2)
604 while len(l) % ncols:
607 for s in range(0, len(l), rows):
608 cols.append(l[s:s+rows])
610 for row in zip(*cols):
611 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
615 def parse_date_or_fatal(str, fatal):
616 """Parses the given date or calls Option.fatal().
617 For now we expect a string that contains a float."""
620 except ValueError, e:
621 raise fatal('invalid date format (should be a float): %r' % e)
626 def strip_path(prefix, path):
627 """Strips a given prefix from a path.
629 First both paths are normalized.
631 Raises an Exception if no prefix is given.
634 raise Exception('no path given')
636 normalized_prefix = os.path.realpath(prefix)
637 debug2("normalized_prefix: %s\n" % normalized_prefix)
638 normalized_path = os.path.realpath(path)
639 debug2("normalized_path: %s\n" % normalized_path)
640 if normalized_path.startswith(normalized_prefix):
641 return normalized_path[len(normalized_prefix):]
646 def strip_base_path(path, base_paths):
647 """Strips the base path from a given path.
650 Determines the base path for the given string and then strips it
652 Iterates over all base_paths from long to short, to prevent that
653 a too short base_path is removed.
655 normalized_path = os.path.realpath(path)
656 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
657 for bp in sorted_base_paths:
658 if normalized_path.startswith(os.path.realpath(bp)):
659 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite 'path' according to the first matching graft point.

    'graft_points' is an iterable of (old_prefix, new_prefix) pairs and
    'path' is first normalized with os.path.realpath().  Graft points are
    tried in order; for the first one whose old_prefix is a leading
    substring of the normalized path, that prefix is replaced by new_prefix
    and the result is returned.  If no graft point matches, the normalized
    path is returned unchanged.

    Both prefixes are treated as literal strings.
    """
    normalized_path = os.path.realpath(path)
    for (old_prefix, new_prefix) in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice the strings directly instead of using re.sub(): a path
            # is not a pattern, so characters such as '+', '(' or '[' in
            # old_prefix must not be read as regex syntax, and backslashes
            # in new_prefix must not be read as group references.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
672 # hashlib is only available in python 2.5 or higher, but the 'sha' module
673 # produces a DeprecationWarning in python 2.6 or higher. We want to support
674 # python 2.4 and above without any stupid warnings, so let's try using hashlib
675 # first, and downgrade if it fails.
686 """Format bup's version date string for output."""
687 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash of bup's current version (_version.COMMIT)."""
    commit_hash = _version.COMMIT
    return commit_hash
696 """Format bup's version tag (the official version number).
698 When generated from a commit other than one pointed to with a tag, the
699 returned string will be "unknown-" followed by the first seven positions of
702 names = _version.NAMES.strip()
703 assert(names[0] == '(')
704 assert(names[-1] == ')')
706 l = [n.strip() for n in names.split(',')]
708 if n.startswith('tag: bup-'):
710 return 'unknown-%s' % _version.COMMIT[:7]