1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
5 from bup import _version
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
# Debug verbosity level, read once at import time from the BUP_DEBUG
# environment variable (0 when unset; atoi() is documented to return 0
# for strings that are not numbers).
buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
50 """Print a log message to stderr."""
52 _hard_write(sys.stderr.fileno(), s)
65 def mkdirp(d, mode=None):
66 """Recursively create directories on path 'd'.
68 Unlike os.makedirs(), it doesn't raise an exception if the last element of
69 the path already exists.
77 if e.errno == errno.EEXIST:
84 """Get the next item from an iterator, None if we reached the end."""
91 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
93 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
97 total = sum(len(it) for it in iters)
98 iters = (iter(it) for it in iters)
99 heap = ((next(it),it) for it in iters)
100 heap = [(e,it) for e,it in heap if e]
105 if not count % pfreq:
108 if not samekey(e, pe):
113 e = it.next() # Don't use next() function, it's too expensive
114 except StopIteration:
115 heapq.heappop(heap) # remove current
117 heapq.heapreplace(heap, (e, it)) # shift current to new location
122 """Delete a file at path 'f' if it currently exists.
124 Unlike os.unlink(), does not throw an exception if the file didn't already
130 if e.errno == errno.ENOENT:
131 pass # it doesn't exist, that's what you asked for
135 """Run a subprocess and return its output."""
136 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
143 """Get the absolute path of a file.
145 Behaves like os.path.realpath, but doesn't follow a symlink for the last
146 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
147 will follow symlinks in p's directory)
153 if st and stat.S_ISLNK(st.st_mode):
154 (dir, name) = os.path.split(p)
155 dir = os.path.realpath(dir)
156 out = os.path.join(dir, name)
158 out = os.path.realpath(p)
159 #log('realpathing:%r,%r\n' % (p, out))
165 """Get the user's login name."""
170 _username = pwd.getpwuid(uid)[0]
172 _username = 'user%d' % uid
178 """Get the user's full name."""
180 if not _userfullname:
183 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
185 _userfullname = 'user%d' % uid
191 """Get the FQDN of this machine."""
194 _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The base directory comes from the BUP_RESOURCE_PATH environment variable,
    or '.' when that variable is unset or empty. It is looked up on the first
    call and cached in the module-level _resource_path for later calls.
    With the default subdir='' the result is the base directory followed by
    a path separator (os.path.join semantics).
    """
    global _resource_path
    base = _resource_path
    if not base:
        # First use: resolve and remember the base directory.
        base = _resource_path = os.environ.get('BUP_RESOURCE_PATH') or '.'
    return os.path.join(base, subdir)
206 class NotOk(Exception):
211 def __init__(self, outp):
215 while self._read(65536): pass
217 def read(self, size):
218 """Read 'size' bytes from input stream."""
220 return self._read(size)
223 """Read from input stream until a newline is found."""
225 return self._readline()
227 def write(self, data):
228 """Write 'data' to output stream."""
229 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
230 self.outp.write(data)
233 """Return true if input stream is readable."""
234 raise NotImplemented("Subclasses must implement has_input")
237 """Indicate end of output from last sent command."""
241 """Indicate server error to the client."""
242 s = re.sub(r'\s+', ' ', str(s))
243 self.write('\nerror %s\n' % s)
245 def _check_ok(self, onempty):
248 for rl in linereader(self):
249 #log('%d got line: %r\n' % (os.getpid(), rl))
250 if not rl: # empty line
254 elif rl.startswith('error '):
255 #log('client: error: %s\n' % rl[6:])
259 raise Exception('server exited unexpectedly; see errors above')
261 def drain_and_check_ok(self):
262 """Remove all data for the current command from input stream."""
265 return self._check_ok(onempty)
268 """Verify that server action completed successfully."""
270 raise Exception('expected "ok", got %r' % rl)
271 return self._check_ok(onempty)
274 class Conn(BaseConn):
275 def __init__(self, inp, outp):
276 BaseConn.__init__(self, outp)
279 def _read(self, size):
280 return self.inp.read(size)
283 return self.inp.readline()
286 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
288 assert(rl[0] == self.inp.fileno())
294 def checked_reader(fd, n):
296 rl, _, _ = select.select([fd], [], [])
299 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Largest payload carried by a single mux() packet: mux() reads at most this
# many bytes from 'outr' per packet, and DemuxConn asserts that no incoming
# packet header announces a larger length.
MAX_PACKET = 128 * 1024
305 def mux(p, outfd, outr, errr):
308 while p.poll() is None:
309 rl, _, _ = select.select(fds, [], [])
312 buf = os.read(outr, MAX_PACKET)
314 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
316 buf = os.read(errr, 1024)
318 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
320 os.write(outfd, struct.pack('!IB', 0, 3))
323 class DemuxConn(BaseConn):
324 """A helper class for bup's client-server protocol."""
325 def __init__(self, infd, outp):
326 BaseConn.__init__(self, outp)
327 # Anything that comes through before the sync string was not
328 # multiplexed and can be assumed to be debug/log before mux init.
330 while tail != 'BUPMUX':
331 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
333 raise IOError('demux: unexpected EOF during initialization')
335 sys.stderr.write(tail[:-6]) # pre-mux log messages
342 def write(self, data):
344 BaseConn.write(self, data)
346 def _next_packet(self, timeout):
347 if self.closed: return False
348 rl, wl, xl = select.select([self.infd], [], [], timeout)
349 if not rl: return False
350 assert(rl[0] == self.infd)
351 ns = ''.join(checked_reader(self.infd, 5))
352 n, fdw = struct.unpack('!IB', ns)
353 assert(n <= MAX_PACKET)
355 self.reader = checked_reader(self.infd, n)
357 for buf in checked_reader(self.infd, n):
358 sys.stderr.write(buf)
361 debug2("DemuxConn: marked closed\n")
364 def _load_buf(self, timeout):
365 if self.buf is not None:
367 while not self.closed:
368 while not self.reader:
369 if not self._next_packet(timeout):
372 self.buf = self.reader.next()
374 except StopIteration:
378 def _read_parts(self, ix_fn):
379 while self._load_buf(None):
380 assert(self.buf is not None)
382 if i is None or i == len(self.buf):
387 self.buf = self.buf[i:]
395 return buf.index('\n')+1
398 return ''.join(self._read_parts(find_eol))
400 def _read(self, size):
402 def until_size(buf): # Closes on csize
403 if len(buf) < csize[0]:
408 return ''.join(self._read_parts(until_size))
411 return self._load_buf(0)
415 """Generate a list of input lines from 'f' without terminating newlines."""
423 def chunkyreader(f, count = None):
424 """Generate a list of chunks of data read from 'f'.
426 If count is None, read until EOF is reached.
428 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
429 reached while reading, raise IOError.
433 b = f.read(min(count, 65536))
435 raise IOError('EOF with %d bytes remaining' % count)
446 """Append "/" to 's' if it doesn't aleady end in "/"."""
447 if s and not s.endswith('/'):
453 def _mmap_do(f, sz, flags, prot, close):
455 st = os.fstat(f.fileno())
458 # trying to open a zero-length map gives an error, but an empty
459 # string has all the same behaviour of a zero-length map, ie. it has
462 map = mmap.mmap(f.fileno(), sz, flags, prot)
464 f.close() # map will persist beyond file close
468 def mmap_read(f, sz = 0, close=True):
469 """Create a read-only memory mapped region on file 'f'.
471 If sz is 0, the region will cover the entire file.
473 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
476 def mmap_readwrite(f, sz = 0, close=True):
477 """Create a read-write memory mapped region on file 'f'.
479 If sz is 0, the region will cover the entire file.
481 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
486 """Parse data size information into a float number.
488 Here are some examples of conversions:
489 199.2k means 203981 bytes
490 1GB means 1073741824 bytes
491 2.1 tb means 2199023255552 bytes
493 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
495 raise ValueError("can't parse %r as a number" % s)
496 (val, unit) = g.groups()
499 if unit in ['t', 'tb']:
500 mult = 1024*1024*1024*1024
501 elif unit in ['g', 'gb']:
502 mult = 1024*1024*1024
503 elif unit in ['m', 'mb']:
505 elif unit in ['k', 'kb']:
507 elif unit in ['', 'b']:
510 raise ValueError("invalid unit %r in number %r" % (unit, s))
515 """Count the number of elements in an iterator. (consumes the iterator)"""
516 return reduce(lambda x,y: x+1, l)
521 """Append an error message to the list of saved errors.
523 Once processing is able to stop and output the errors, the saved errors are
524 accessible in the module variable helpers.saved_errors.
526 saved_errors.append(e)
530 istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
532 """Calls log(s) if stderr is a TTY. Does nothing otherwise."""
538 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
540 The new exception handler will make sure that bup will exit without an ugly
541 stacktrace when Ctrl-C is hit.
543 oldhook = sys.excepthook
544 def newhook(exctype, value, traceback):
545 if exctype == KeyboardInterrupt:
546 log('Interrupted.\n')
548 return oldhook(exctype, value, traceback)
549 sys.excepthook = newhook
552 def columnate(l, prefix):
553 """Format elements of 'l' in columns with 'prefix' leading each line.
555 The number of columns is determined automatically based on the string
561 clen = max(len(s) for s in l)
562 ncols = (tty_width() - len(prefix)) / (clen + 2)
567 while len(l) % ncols:
570 for s in range(0, len(l), rows):
571 cols.append(l[s:s+rows])
573 for row in zip(*cols):
574 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
578 def parse_date_or_fatal(str, fatal):
579 """Parses the given date or calls Option.fatal().
580 For now we expect a string that contains a float."""
583 except ValueError, e:
584 raise fatal('invalid date format (should be a float): %r' % e)
589 def strip_path(prefix, path):
590 """Strips a given prefix from a path.
592 First both paths are normalized.
594 Raises an Exception if no prefix is given.
597 raise Exception('no path given')
599 normalized_prefix = os.path.realpath(prefix)
600 debug2("normalized_prefix: %s\n" % normalized_prefix)
601 normalized_path = os.path.realpath(path)
602 debug2("normalized_path: %s\n" % normalized_path)
603 if normalized_path.startswith(normalized_prefix):
604 return normalized_path[len(normalized_prefix):]
609 def strip_base_path(path, base_paths):
610 """Strips the base path from a given path.
613 Determines the base path for the given string and then strips it
615 Iterates over all base_paths from long to short, to prevent that
616 a too short base_path is removed.
618 normalized_path = os.path.realpath(path)
619 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
620 for bp in sorted_base_paths:
621 if normalized_path.startswith(os.path.realpath(bp)):
622 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite the leading part of 'path' according to 'graft_points'.

    'path' is first normalized with os.path.realpath(). 'graft_points' is a
    sequence of (old_prefix, new_prefix) pairs, tried in order. For the first
    pair whose old_prefix is a string prefix of the normalized path, that
    leading old_prefix is replaced with new_prefix and the result is
    returned. If no pair matches, the normalized path is returned unchanged.

    Both prefixes are treated as literal strings, never as regular
    expressions or replacement templates.

    NOTE(review): the match is a plain string-prefix test, so an old_prefix
    of '/a' also matches '/ab/...'; confirm whether callers rely on this
    before tightening it to whole path components.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice by length instead of re.sub(): regex metacharacters in
            # old_prefix ('+', '(', '[', '$', ...) would stop the match or
            # raise re.error, and backslashes in new_prefix would be
            # processed as replacement escapes.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
635 # hashlib is only available in python 2.5 or higher, but the 'sha' module
636 # produces a DeprecationWarning in python 2.6 or higher. We want to support
637 # python 2.4 and above without any stupid warnings, so let's try using hashlib
638 # first, and downgrade if it fails.
649 """Format bup's version date string for output."""
650 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash of the running bup version.

    The value is read verbatim from bup._version.COMMIT.
    """
    commit_hash = _version.COMMIT
    return commit_hash
659 """Format bup's version tag (the official version number).
661 When generated from a commit other than one pointed to with a tag, the
662 returned string will be "unknown-" followed by the first seven positions of
665 names = _version.NAMES.strip()
666 assert(names[0] == '(')
667 assert(names[-1] == ')')
669 l = [n.strip() for n in names.split(',')]
671 if n.startswith('tag: bup-'):
673 return 'unknown-%s' % _version.COMMIT[:7]