1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
5 from bup import _version
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
# Debug verbosity level, taken from the BUP_DEBUG environment variable.
# An unset variable falls back to 0; per atoi()'s docstring, a value that is
# not a number also yields 0.
buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
50 """Print a log message to stderr."""
52 _hard_write(sys.stderr.fileno(), s)
65 def mkdirp(d, mode=None):
66 """Recursively create directories on path 'd'.
68 Unlike os.makedirs(), it doesn't raise an exception if the last element of
69 the path already exists.
77 if e.errno == errno.EEXIST:
84 """Get the next item from an iterator, None if we reached the end."""
91 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
93 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
97 total = sum(len(it) for it in iters)
98 iters = (iter(it) for it in iters)
99 heap = ((next(it),it) for it in iters)
100 heap = [(e,it) for e,it in heap if e]
105 if not count % pfreq:
108 if not samekey(e, pe):
113 e = it.next() # Don't use next() function, it's too expensive
114 except StopIteration:
115 heapq.heappop(heap) # remove current
117 heapq.heapreplace(heap, (e, it)) # shift current to new location
122 """Delete a file at path 'f' if it currently exists.
124 Unlike os.unlink(), does not throw an exception if the file didn't already
130 if e.errno == errno.ENOENT:
131 pass # it doesn't exist, that's what you asked for
135 """Run a subprocess and return its output."""
136 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
143 """Get the absolute path of a file.
145 Behaves like os.path.realpath, but doesn't follow a symlink for the last
146 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
147 will follow symlinks in p's directory)
153 if st and stat.S_ISLNK(st.st_mode):
154 (dir, name) = os.path.split(p)
155 dir = os.path.realpath(dir)
156 out = os.path.join(dir, name)
158 out = os.path.realpath(p)
159 #log('realpathing:%r,%r\n' % (p, out))
165 """Get the user's login name."""
170 _username = pwd.getpwuid(uid)[0]
172 _username = 'user%d' % uid
178 """Get the user's full name."""
180 if not _userfullname:
183 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
185 _userfullname = 'user%d' % uid
191 """Get the FQDN of this machine."""
194 _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The base directory comes from $BUP_RESOURCE_PATH (or '.' when that is
    unset or empty). It is looked up the first time a truthy value is needed
    and cached in the module-level _resource_path for subsequent calls.
    """
    global _resource_path
    if _resource_path:
        base = _resource_path
    else:
        base = _resource_path = os.environ.get('BUP_RESOURCE_PATH') or '.'
    return os.path.join(base, subdir)
206 class NotOk(Exception):
211 def __init__(self, outp):
215 while self._read(65536): pass
217 def read(self, size):
218 """Read 'size' bytes from input stream."""
220 return self._read(size)
223 """Read from input stream until a newline is found."""
225 return self._readline()
def write(self, data):
    """Send 'data' to the output stream (self.outp) unchanged."""
    sink = self.outp
    sink.write(data)
233 """Return true if input stream is readable."""
234 raise NotImplemented("Subclasses must implement has_input")
237 """Indicate end of output from last sent command."""
241 """Indicate server error to the client."""
242 s = re.sub(r'\s+', ' ', str(s))
243 self.write('\nerror %s\n' % s)
245 def _check_ok(self, onempty):
248 for rl in linereader(self):
249 #log('%d got line: %r\n' % (os.getpid(), rl))
250 if not rl: # empty line
254 elif rl.startswith('error '):
255 #log('client: error: %s\n' % rl[6:])
259 raise Exception('server exited unexpectedly; see errors above')
261 def drain_and_check_ok(self):
262 """Remove all data for the current command from input stream."""
265 return self._check_ok(onempty)
268 """Verify that server action completed successfully."""
270 raise Exception('expected "ok", got %r' % rl)
271 return self._check_ok(onempty)
274 class Conn(BaseConn):
275 def __init__(self, inp, outp):
276 BaseConn.__init__(self, outp)
def _read(self, size):
    """Read 'size' bytes by delegating to self.inp.read() and return them."""
    source = self.inp
    return source.read(size)
283 return self.inp.readline()
286 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
288 assert(rl[0] == self.inp.fileno())
294 def checked_reader(fd, n):
296 rl, _, _ = select.select([fd], [], [])
299 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Largest payload, in bytes, of a single multiplexed packet: mux() reads at
# most this many bytes of the child's stdout per packet, and DemuxConn asserts
# that incoming packet lengths do not exceed it.
MAX_PACKET = 128 * 1024
305 def mux(p, outfd, outr, errr):
308 while p.poll() is None:
309 rl, _, _ = select.select(fds, [], [])
312 buf = os.read(outr, MAX_PACKET)
314 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
316 buf = os.read(errr, 1024)
318 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
320 os.write(outfd, struct.pack('!IB', 0, 3))
323 class DemuxConn(BaseConn):
324 """A helper class for bup's client-server protocol."""
325 def __init__(self, infd, outp):
326 BaseConn.__init__(self, outp)
327 # Anything that comes through before the sync string was not
328 # multiplexed and can be assumed to be debug/log before mux init.
330 while tail != 'BUPMUX':
331 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
333 raise IOError('demux: unexpected EOF during initialization')
335 sys.stderr.write(tail[:-6]) # pre-mux log messages
342 def write(self, data):
344 BaseConn.write(self, data)
346 def _next_packet(self, timeout):
347 if self.closed: return False
348 rl, wl, xl = select.select([self.infd], [], [], timeout)
349 if not rl: return False
350 assert(rl[0] == self.infd)
351 ns = ''.join(checked_reader(self.infd, 5))
352 n, fdw = struct.unpack('!IB', ns)
353 assert(n <= MAX_PACKET)
355 self.reader = checked_reader(self.infd, n)
357 for buf in checked_reader(self.infd, n):
358 sys.stderr.write(buf)
361 debug2("DemuxConn: marked closed\n")
364 def _load_buf(self, timeout):
365 if self.buf is not None:
367 while not self.closed:
368 while not self.reader:
369 if not self._next_packet(timeout):
372 self.buf = self.reader.next()
374 except StopIteration:
378 def _read_parts(self, ix_fn):
379 while self._load_buf(None):
380 assert(self.buf is not None)
382 if i is None or i == len(self.buf):
387 self.buf = self.buf[i:]
395 return buf.index('\n')+1
398 return ''.join(self._read_parts(find_eol))
400 def _read(self, size):
402 def until_size(buf): # Closes on csize
403 if len(buf) < csize[0]:
408 return ''.join(self._read_parts(until_size))
411 return self._load_buf(0)
415 """Generate a list of input lines from 'f' without terminating newlines."""
423 def chunkyreader(f, count = None):
424 """Generate a list of chunks of data read from 'f'.
426 If count is None, read until EOF is reached.
428 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
429 reached while reading, raise IOError.
433 b = f.read(min(count, 65536))
435 raise IOError('EOF with %d bytes remaining' % count)
446 """Append "/" to 's' if it doesn't aleady end in "/"."""
447 if s and not s.endswith('/'):
453 def _mmap_do(f, sz, flags, prot, close):
455 st = os.fstat(f.fileno())
458 # trying to open a zero-length map gives an error, but an empty
459 # string has all the same behaviour of a zero-length map, ie. it has
462 map = mmap.mmap(f.fileno(), sz, flags, prot)
464 f.close() # map will persist beyond file close
468 def mmap_read(f, sz = 0, close=True):
469 """Create a read-only memory mapped region on file 'f'.
470 If sz is 0, the region will cover the entire file.
472 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
475 def mmap_readwrite(f, sz = 0, close=True):
476 """Create a read-write memory mapped region on file 'f'.
477 If sz is 0, the region will cover the entire file.
479 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
483 def mmap_readwrite_private(f, sz = 0, close=True):
484 """Create a read-write memory mapped region on file 'f'.
485 If sz is 0, the region will cover the entire file.
486 The map is private, which means the changes are never flushed back to the
489 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
494 """Parse data size information into a float number.
496 Here are some examples of conversions:
497 199.2k means 203981 bytes
498 1GB means 1073741824 bytes
499 2.1 tb means 2199023255552 bytes
501 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
503 raise ValueError("can't parse %r as a number" % s)
504 (val, unit) = g.groups()
507 if unit in ['t', 'tb']:
508 mult = 1024*1024*1024*1024
509 elif unit in ['g', 'gb']:
510 mult = 1024*1024*1024
511 elif unit in ['m', 'mb']:
513 elif unit in ['k', 'kb']:
515 elif unit in ['', 'b']:
518 raise ValueError("invalid unit %r in number %r" % (unit, s))
523 """Count the number of elements in an iterator. (consumes the iterator)"""
524 return reduce(lambda x,y: x+1, l)
529 """Append an error message to the list of saved errors.
531 Once processing is able to stop and output the errors, the saved errors are
532 accessible in the module variable helpers.saved_errors.
534 saved_errors.append(e)
# Truthy when fd 2 (stderr) is a terminal, or when BUP_FORCE_TTY holds a
# nonzero number (forcing TTY-style behavior even when stderr is redirected).
# NOTE(review): when BUP_FORCE_TTY is unset this passes None to atoi();
# confirm atoi() maps None to 0.
istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
540 """Calls log(s) if stderr is a TTY. Does nothing otherwise."""
546 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
548 The new exception handler will make sure that bup will exit without an ugly
549 stacktrace when Ctrl-C is hit.
551 oldhook = sys.excepthook
552 def newhook(exctype, value, traceback):
553 if exctype == KeyboardInterrupt:
554 log('Interrupted.\n')
556 return oldhook(exctype, value, traceback)
557 sys.excepthook = newhook
560 def columnate(l, prefix):
561 """Format elements of 'l' in columns with 'prefix' leading each line.
563 The number of columns is determined automatically based on the string
569 clen = max(len(s) for s in l)
570 ncols = (tty_width() - len(prefix)) / (clen + 2)
575 while len(l) % ncols:
578 for s in range(0, len(l), rows):
579 cols.append(l[s:s+rows])
581 for row in zip(*cols):
582 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
586 def parse_date_or_fatal(str, fatal):
587 """Parses the given date or calls Option.fatal().
588 For now we expect a string that contains a float."""
591 except ValueError, e:
592 raise fatal('invalid date format (should be a float): %r' % e)
597 def strip_path(prefix, path):
598 """Strips a given prefix from a path.
600 First both paths are normalized.
602 Raises an Exception if no prefix is given.
605 raise Exception('no path given')
607 normalized_prefix = os.path.realpath(prefix)
608 debug2("normalized_prefix: %s\n" % normalized_prefix)
609 normalized_path = os.path.realpath(path)
610 debug2("normalized_path: %s\n" % normalized_path)
611 if normalized_path.startswith(normalized_prefix):
612 return normalized_path[len(normalized_prefix):]
617 def strip_base_path(path, base_paths):
618 """Strips the base path from a given path.
621 Determines the base path for the given string and then strips it
623 Iterates over all base_paths from long to short, to prevent that
624 a too short base_path is removed.
626 normalized_path = os.path.realpath(path)
627 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
628 for bp in sorted_base_paths:
629 if normalized_path.startswith(os.path.realpath(bp)):
630 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite 'path' by swapping its leading prefix per 'graft_points'.

    'path' is first normalized with os.path.realpath(). 'graft_points' is an
    iterable of (old_prefix, new_prefix) pairs, tried in order. The first pair
    whose old_prefix is a plain-string prefix of the normalized path wins:
    that prefix is replaced by new_prefix and the result is returned. If no
    pair matches, the normalized path is returned unchanged.

    Both prefixes are treated as literal strings, not regular expressions.
    Characters such as '.', '+' or '(' in old_prefix, and backslashes in
    new_prefix, have no special meaning.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Slice rather than re.sub(): building a regex from old_prefix
            # lets metacharacters raise re.error or mis-match, and using
            # new_prefix as a replacement template expands backslash escapes.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
643 # hashlib is only available in python 2.5 or higher, but the 'sha' module
644 # produces a DeprecationWarning in python 2.6 or higher. We want to support
645 # python 2.4 and above without any stupid warnings, so let's try using hashlib
646 # first, and downgrade if it fails.
657 """Format bup's version date string for output."""
658 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded in bup's generated _version module."""
    commit = _version.COMMIT
    return commit
667 """Format bup's version tag (the official version number).
669 When generated from a commit other than one pointed to with a tag, the
670 returned string will be "unknown-" followed by the first seven positions of
673 names = _version.NAMES.strip()
674 assert(names[0] == '(')
675 assert(names[-1] == ')')
677 l = [n.strip() for n in names.split(',')]
679 if n.startswith('tag: bup-'):
681 return 'unknown-%s' % _version.COMMIT[:7]