1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import heapq, operator, time
5 from bup import _version, _helpers
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
29 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
52 """Print a log message to stderr."""
55 _hard_write(sys.stderr.fileno(), s)
69 istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
71 """Calls log() if stderr is a TTY. Does nothing otherwise."""
77 """Calls progress() only if we haven't printed progress in a while.
79 This avoids overloading the stderr buffer with excess junk."""
82 if now - _last_prog > 0.1:
87 def mkdirp(d, mode=None):
88 """Recursively create directories on path 'd'.
90 Unlike os.makedirs(), it doesn't raise an exception if the last element of
91 the path already exists.
99 if e.errno == errno.EEXIST:
106 """Get the next item from an iterator, None if we reached the end."""
109 except StopIteration:
113 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
115 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
117 samekey = operator.eq
119 total = sum(len(it) for it in iters)
120 iters = (iter(it) for it in iters)
121 heap = ((next(it),it) for it in iters)
122 heap = [(e,it) for e,it in heap if e]
127 if not count % pfreq:
130 if not samekey(e, pe):
135 e = it.next() # Don't use next() function, it's too expensive
136 except StopIteration:
137 heapq.heappop(heap) # remove current
139 heapq.heapreplace(heap, (e, it)) # shift current to new location
144 """Delete a file at path 'f' if it currently exists.
146 Unlike os.unlink(), does not throw an exception if the file didn't already
152 if e.errno == errno.ENOENT:
153 pass # it doesn't exist, that's what you asked for
157 """Run a subprocess and return its output."""
158 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
165 """Get the absolute path of a file.
167 Behaves like os.path.realpath, but doesn't follow a symlink for the last
168 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
169 will follow symlinks in p's directory)
175 if st and stat.S_ISLNK(st.st_mode):
176 (dir, name) = os.path.split(p)
177 dir = os.path.realpath(dir)
178 out = os.path.join(dir, name)
180 out = os.path.realpath(p)
181 #log('realpathing:%r,%r\n' % (p, out))
187 """Get the user's login name."""
192 _username = pwd.getpwuid(uid)[0]
194 _username = 'user%d' % uid
200 """Get the user's full name."""
202 if not _userfullname:
205 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
207 _userfullname = 'user%d' % uid
213 """Get the FQDN of this machine."""
216 _hostname = socket.getfqdn()
# Cached resource directory; filled in lazily by resource_path().
_resource_path = None


def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The base directory is read from the BUP_RESOURCE_PATH environment
    variable ('.' when it is unset or empty) the first time it is needed,
    and is remembered in the module-level _resource_path afterwards.
    """
    global _resource_path
    base = _resource_path
    if not base:
        base = os.environ.get('BUP_RESOURCE_PATH') or '.'
        _resource_path = base
    return os.path.join(base, subdir)
228 class NotOk(Exception):
233 def __init__(self, outp):
237 while self._read(65536): pass
239 def read(self, size):
240 """Read 'size' bytes from input stream."""
242 return self._read(size)
245 """Read from input stream until a newline is found."""
247 return self._readline()
249 def write(self, data):
250 """Write 'data' to output stream."""
251 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
252 self.outp.write(data)
255 """Return true if input stream is readable."""
256 raise NotImplemented("Subclasses must implement has_input")
259 """Indicate end of output from last sent command."""
263 """Indicate server error to the client."""
264 s = re.sub(r'\s+', ' ', str(s))
265 self.write('\nerror %s\n' % s)
267 def _check_ok(self, onempty):
270 for rl in linereader(self):
271 #log('%d got line: %r\n' % (os.getpid(), rl))
272 if not rl: # empty line
276 elif rl.startswith('error '):
277 #log('client: error: %s\n' % rl[6:])
281 raise Exception('server exited unexpectedly; see errors above')
283 def drain_and_check_ok(self):
284 """Remove all data for the current command from input stream."""
287 return self._check_ok(onempty)
290 """Verify that server action completed successfully."""
292 raise Exception('expected "ok", got %r' % rl)
293 return self._check_ok(onempty)
296 class Conn(BaseConn):
297 def __init__(self, inp, outp):
298 BaseConn.__init__(self, outp)
301 def _read(self, size):
302 return self.inp.read(size)
305 return self.inp.readline()
308 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
310 assert(rl[0] == self.inp.fileno())
316 def checked_reader(fd, n):
318 rl, _, _ = select.select([fd], [], [])
321 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
326 MAX_PACKET = 128 * 1024
327 def mux(p, outfd, outr, errr):
330 while p.poll() is None:
331 rl, _, _ = select.select(fds, [], [])
334 buf = os.read(outr, MAX_PACKET)
336 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
338 buf = os.read(errr, 1024)
340 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
342 os.write(outfd, struct.pack('!IB', 0, 3))
345 class DemuxConn(BaseConn):
346 """A helper class for bup's client-server protocol."""
347 def __init__(self, infd, outp):
348 BaseConn.__init__(self, outp)
349 # Anything that comes through before the sync string was not
350 # multiplexed and can be assumed to be debug/log before mux init.
352 while tail != 'BUPMUX':
353 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
355 raise IOError('demux: unexpected EOF during initialization')
357 sys.stderr.write(tail[:-6]) # pre-mux log messages
364 def write(self, data):
366 BaseConn.write(self, data)
368 def _next_packet(self, timeout):
369 if self.closed: return False
370 rl, wl, xl = select.select([self.infd], [], [], timeout)
371 if not rl: return False
372 assert(rl[0] == self.infd)
373 ns = ''.join(checked_reader(self.infd, 5))
374 n, fdw = struct.unpack('!IB', ns)
375 assert(n <= MAX_PACKET)
377 self.reader = checked_reader(self.infd, n)
379 for buf in checked_reader(self.infd, n):
380 sys.stderr.write(buf)
383 debug2("DemuxConn: marked closed\n")
386 def _load_buf(self, timeout):
387 if self.buf is not None:
389 while not self.closed:
390 while not self.reader:
391 if not self._next_packet(timeout):
394 self.buf = self.reader.next()
396 except StopIteration:
400 def _read_parts(self, ix_fn):
401 while self._load_buf(None):
402 assert(self.buf is not None)
404 if i is None or i == len(self.buf):
409 self.buf = self.buf[i:]
417 return buf.index('\n')+1
420 return ''.join(self._read_parts(find_eol))
422 def _read(self, size):
424 def until_size(buf): # Closes on csize
425 if len(buf) < csize[0]:
430 return ''.join(self._read_parts(until_size))
433 return self._load_buf(0)
437 """Generate a list of input lines from 'f' without terminating newlines."""
445 def chunkyreader(f, count = None):
446 """Generate a list of chunks of data read from 'f'.
448 If count is None, read until EOF is reached.
450 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
451 reached while reading, raise IOError.
455 b = f.read(min(count, 65536))
457 raise IOError('EOF with %d bytes remaining' % count)
468 """Append "/" to 's' if it doesn't already end in "/"."""
469 if s and not s.endswith('/'):
475 def _mmap_do(f, sz, flags, prot, close):
477 st = os.fstat(f.fileno())
480 # trying to open a zero-length map gives an error, but an empty
481 # string has all the same behaviour of a zero-length map, ie. it has
484 map = mmap.mmap(f.fileno(), sz, flags, prot)
486 f.close() # map will persist beyond file close
490 def mmap_read(f, sz = 0, close=True):
491 """Create a read-only memory mapped region on file 'f'.
492 If sz is 0, the region will cover the entire file.
494 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
497 def mmap_readwrite(f, sz = 0, close=True):
498 """Create a read-write memory mapped region on file 'f'.
499 If sz is 0, the region will cover the entire file.
501 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
505 def mmap_readwrite_private(f, sz = 0, close=True):
506 """Create a read-write memory mapped region on file 'f'.
507 If sz is 0, the region will cover the entire file.
508 The map is private, which means the changes are never flushed back to the
511 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
516 """Parse data size information into a float number.
518 Here are some examples of conversions:
519 199.2k means 203981 bytes
520 1GB means 1073741824 bytes
521 2.1 tb means 2199023255552 bytes
523 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
525 raise ValueError("can't parse %r as a number" % s)
526 (val, unit) = g.groups()
529 if unit in ['t', 'tb']:
530 mult = 1024*1024*1024*1024
531 elif unit in ['g', 'gb']:
532 mult = 1024*1024*1024
533 elif unit in ['m', 'mb']:
535 elif unit in ['k', 'kb']:
537 elif unit in ['', 'b']:
540 raise ValueError("invalid unit %r in number %r" % (unit, s))
545 """Count the number of elements in an iterator. (consumes the iterator)"""
546 return reduce(lambda x,y: x+1, l)
551 """Append an error message to the list of saved errors.
553 Once processing is able to stop and output the errors, the saved errors are
554 accessible in the module variable helpers.saved_errors.
556 saved_errors.append(e)
561 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
563 The new exception handler will make sure that bup will exit without an ugly
564 stacktrace when Ctrl-C is hit.
566 oldhook = sys.excepthook
567 def newhook(exctype, value, traceback):
568 if exctype == KeyboardInterrupt:
569 log('Interrupted.\n')
571 return oldhook(exctype, value, traceback)
572 sys.excepthook = newhook
575 def columnate(l, prefix):
576 """Format elements of 'l' in columns with 'prefix' leading each line.
578 The number of columns is determined automatically based on the string
584 clen = max(len(s) for s in l)
585 ncols = (tty_width() - len(prefix)) / (clen + 2)
590 while len(l) % ncols:
593 for s in range(0, len(l), rows):
594 cols.append(l[s:s+rows])
596 for row in zip(*cols):
597 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
601 def parse_date_or_fatal(str, fatal):
602 """Parses the given date or calls Option.fatal().
603 For now we expect a string that contains a float."""
606 except ValueError, e:
607 raise fatal('invalid date format (should be a float): %r' % e)
612 def strip_path(prefix, path):
613 """Strips a given prefix from a path.
615 First both paths are normalized.
617 Raises an Exception if no prefix is given.
620 raise Exception('no path given')
622 normalized_prefix = os.path.realpath(prefix)
623 debug2("normalized_prefix: %s\n" % normalized_prefix)
624 normalized_path = os.path.realpath(path)
625 debug2("normalized_path: %s\n" % normalized_path)
626 if normalized_path.startswith(normalized_prefix):
627 return normalized_path[len(normalized_prefix):]
632 def strip_base_path(path, base_paths):
633 """Strips the base path from a given path.
636 Determines the base path for the given string and then strips it
638 Iterates over all base_paths from long to short, to prevent that
639 a too short base_path is removed.
641 normalized_path = os.path.realpath(path)
642 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
643 for bp in sorted_base_paths:
644 if normalized_path.startswith(os.path.realpath(bp)):
645 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite the leading part of 'path' according to 'graft_points'.

    'graft_points' is an iterable of (old_prefix, new_prefix) pairs. 'path'
    is first normalized with os.path.realpath(). The first pair whose
    old_prefix is a string prefix of the normalized path wins, and that
    prefix is replaced by new_prefix. If no pair matches, the normalized
    path is returned unchanged.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice on the prefix length rather than going through re.sub():
            # both prefixes are plain path strings, so regex metacharacters
            # in old_prefix ('+', '.', '(' ...) and backslashes in new_prefix
            # must be taken literally.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
658 # hashlib is only available in python 2.5 or higher, but the 'sha' module
659 # produces a DeprecationWarning in python 2.6 or higher. We want to support
660 # python 2.4 and above without any stupid warnings, so let's try using hashlib
661 # first, and downgrade if it fails.
672 """Format bup's version date string for output."""
673 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash of the running bup version.

    The value is read from the COMMIT attribute of the _version module.
    """
    commit = _version.COMMIT
    return commit
682 """Format bup's version tag (the official version number).
684 When generated from a commit other than one pointed to with a tag, the
685 returned string will be "unknown-" followed by the first seven positions of
688 names = _version.NAMES.strip()
689 assert(names[0] == '(')
690 assert(names[-1] == ')')
692 l = [n.strip() for n in names.split(',')]
694 if n.startswith('tag: bup-'):
696 return 'unknown-%s' % _version.COMMIT[:7]