1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
5 from bup import _version
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
29 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
50 """Print a log message to stderr."""
52 _hard_write(sys.stderr.fileno(), s)
65 def mkdirp(d, mode=None):
66 """Recursively create directories on path 'd'.
68 Unlike os.makedirs(), it doesn't raise an exception if the last element of
69 the path already exists.
77 if e.errno == errno.EEXIST:
84 """Get the next item from an iterator, None if we reached the end."""
91 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
93 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
97 total = sum(len(it) for it in iters)
98 iters = (iter(it) for it in iters)
99 heap = ((next(it),it) for it in iters)
100 heap = [(e,it) for e,it in heap if e]
105 if not count % pfreq:
108 if not samekey(e, pe):
113 e = it.next() # Don't use next() function, it's too expensive
114 except StopIteration:
115 heapq.heappop(heap) # remove current
117 heapq.heapreplace(heap, (e, it)) # shift current to new location
122 """Delete a file at path 'f' if it currently exists.
124 Unlike os.unlink(), does not throw an exception if the file didn't already
130 if e.errno == errno.ENOENT:
131 pass # it doesn't exist, that's what you asked for
135 """Run a subprocess and return its output."""
136 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
143 """Get the absolute path of a file.
145 Behaves like os.path.realpath, but doesn't follow a symlink for the last
146 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
147 will follow symlinks in p's directory)
153 if st and stat.S_ISLNK(st.st_mode):
154 (dir, name) = os.path.split(p)
155 dir = os.path.realpath(dir)
156 out = os.path.join(dir, name)
158 out = os.path.realpath(p)
159 #log('realpathing:%r,%r\n' % (p, out))
165 """Get the user's login name."""
170 _username = pwd.getpwuid(uid)[0]
172 _username = 'user%d' % uid
178 """Get the user's full name."""
180 if not _userfullname:
183 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
185 _userfullname = 'user%d' % uid
191 """Get the FQDN of this machine."""
194 _hostname = socket.getfqdn()
_resource_path = None


def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The resource directory is read from the BUP_RESOURCE_PATH environment
    variable the first time it is needed, falling back to '.' when the
    variable is unset or empty.  The result is remembered in the module
    global _resource_path, so later changes to the environment are ignored.
    """
    global _resource_path
    base = _resource_path
    if not base:
        # First call (or nothing cached yet): resolve and remember the base.
        base = os.environ.get('BUP_RESOURCE_PATH') or '.'
        _resource_path = base
    return os.path.join(base, subdir)
206 class NotOk(Exception):
211 def __init__(self, outp):
215 while self._read(65536): pass
217 def read(self, size):
218 """Read 'size' bytes from input stream."""
220 return self._read(size)
223 """Read from input stream until a newline is found."""
225 return self._readline()
227 def write(self, data):
228 """Write 'data' to output stream."""
229 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
230 self.outp.write(data)
233 """Return true if input stream is readable."""
234 raise NotImplemented("Subclasses must implement has_input")
237 """Indicate end of output from last sent command."""
241 """Indicate server error to the client."""
242 s = re.sub(r'\s+', ' ', str(s))
243 self.write('\nerror %s\n' % s)
245 def _check_ok(self, onempty):
248 for rl in linereader(self):
249 #log('%d got line: %r\n' % (os.getpid(), rl))
250 if not rl: # empty line
254 elif rl.startswith('error '):
255 #log('client: error: %s\n' % rl[6:])
259 raise Exception('server exited unexpectedly; see errors above')
261 def drain_and_check_ok(self):
262 """Remove all data for the current command from input stream."""
265 return self._check_ok(onempty)
268 """Verify that server action completed successfully."""
270 raise Exception('expected "ok", got %r' % rl)
271 return self._check_ok(onempty)
274 class Conn(BaseConn):
275 def __init__(self, inp, outp):
276 BaseConn.__init__(self, outp)
279 def _read(self, size):
280 return self.inp.read(size)
283 return self.inp.readline()
286 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
288 assert(rl[0] == self.inp.fileno())
294 def checked_reader(fd, n):
296 rl, _, _ = select.select([fd], [], [])
299 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
304 MAX_PACKET = 128 * 1024
305 def mux(p, outfd, outr, errr):
308 while p.poll() is None:
309 rl, _, _ = select.select(fds, [], [])
312 buf = os.read(outr, MAX_PACKET)
314 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
316 buf = os.read(errr, 1024)
318 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
320 os.write(outfd, struct.pack('!IB', 0, 3))
323 class DemuxConn(BaseConn):
324 """A helper class for bup's client-server protocol."""
325 def __init__(self, infd, outp):
326 BaseConn.__init__(self, outp)
327 # Anything that comes through before the sync string was not
328 # multiplexed and can be assumed to be debug/log before mux init.
330 while tail != 'BUPMUX':
331 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
333 raise IOError('demux: unexpected EOF during initialization')
335 sys.stderr.write(tail[:-6]) # pre-mux log messages
342 def write(self, data):
344 BaseConn.write(self, data)
346 def _next_packet(self, timeout):
347 if self.closed: return False
348 rl, wl, xl = select.select([self.infd], [], [], timeout)
349 if not rl: return False
350 assert(rl[0] == self.infd)
351 ns = ''.join(checked_reader(self.infd, 5))
352 n, fdw = struct.unpack('!IB', ns)
353 assert(n <= MAX_PACKET)
355 self.reader = checked_reader(self.infd, n)
357 for buf in checked_reader(self.infd, n):
358 sys.stderr.write(buf)
361 debug2("DemuxConn: marked closed\n")
364 def _load_buf(self, timeout):
365 if self.buf is not None:
367 while not self.closed:
368 while not self.reader:
369 if not self._next_packet(timeout):
372 self.buf = self.reader.next()
374 except StopIteration:
378 def _read_parts(self, ix_fn):
379 while self._load_buf(None):
380 assert(self.buf is not None)
382 if i is None or i == len(self.buf):
387 self.buf = self.buf[i:]
395 return buf.index('\n')+1
398 return ''.join(self._read_parts(find_eol))
400 def _read(self, size):
402 def until_size(buf): # Closes on csize
403 if len(buf) < csize[0]:
408 return ''.join(self._read_parts(until_size))
411 return self._load_buf(0)
415 """Generate a list of input lines from 'f' without terminating newlines."""
423 def chunkyreader(f, count = None):
424 """Generate a list of chunks of data read from 'f'.
426 If count is None, read until EOF is reached.
428 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
429 reached while reading, raise IOError.
433 b = f.read(min(count, 65536))
435 raise IOError('EOF with %d bytes remaining' % count)
446 """Append "/" to 's' if it doesn't already end in "/"."""
447 if s and not s.endswith('/'):
453 def _mmap_do(f, sz, flags, prot):
455 st = os.fstat(f.fileno())
458 # trying to open a zero-length map gives an error, but an empty
459 # string has all the same behaviour of a zero-length map, ie. it has
462 map = mmap.mmap(f.fileno(), sz, flags, prot)
463 f.close() # map will persist beyond file close
def mmap_read(f, sz = 0):
    """Map file 'f' into memory for reading only.

    A 'sz' of 0 asks for the whole file to be mapped.  The mapping is
    private (mmap.MAP_PRIVATE) and read-only (mmap.PROT_READ); the actual
    work is delegated to _mmap_do().
    """
    flags = mmap.MAP_PRIVATE
    prot = mmap.PROT_READ
    return _mmap_do(f, sz, flags, prot)
def mmap_readwrite(f, sz = 0):
    """Map file 'f' into memory for both reading and writing.

    A 'sz' of 0 asks for the whole file to be mapped.  The mapping is
    shared (mmap.MAP_SHARED) with read and write protection; the actual
    work is delegated to _mmap_do().
    """
    flags = mmap.MAP_SHARED
    prot = mmap.PROT_READ | mmap.PROT_WRITE
    return _mmap_do(f, sz, flags, prot)
484 """Parse data size information into a float number.
486 Here are some examples of conversions:
487 199.2k means 203981 bytes
488 1GB means 1073741824 bytes
489 2.1 tb means 2199023255552 bytes
491 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
493 raise ValueError("can't parse %r as a number" % s)
494 (val, unit) = g.groups()
497 if unit in ['t', 'tb']:
498 mult = 1024*1024*1024*1024
499 elif unit in ['g', 'gb']:
500 mult = 1024*1024*1024
501 elif unit in ['m', 'mb']:
503 elif unit in ['k', 'kb']:
505 elif unit in ['', 'b']:
508 raise ValueError("invalid unit %r in number %r" % (unit, s))
513 """Count the number of elements in an iterator. (consumes the iterator)"""
514 return reduce(lambda x,y: x+1, l)
519 """Append an error message to the list of saved errors.
521 Once processing is able to stop and output the errors, the saved errors are
522 accessible in the module variable helpers.saved_errors.
524 saved_errors.append(e)
528 istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
530 """Calls log(s) if stderr is a TTY. Does nothing otherwise."""
536 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
538 The new exception handler will make sure that bup will exit without an ugly
539 stacktrace when Ctrl-C is hit.
541 oldhook = sys.excepthook
542 def newhook(exctype, value, traceback):
543 if exctype == KeyboardInterrupt:
544 log('Interrupted.\n')
546 return oldhook(exctype, value, traceback)
547 sys.excepthook = newhook
550 def columnate(l, prefix):
551 """Format elements of 'l' in columns with 'prefix' leading each line.
553 The number of columns is determined automatically based on the string
559 clen = max(len(s) for s in l)
560 ncols = (tty_width() - len(prefix)) / (clen + 2)
565 while len(l) % ncols:
568 for s in range(0, len(l), rows):
569 cols.append(l[s:s+rows])
571 for row in zip(*cols):
572 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
576 def parse_date_or_fatal(str, fatal):
577 """Parses the given date or calls Option.fatal().
578 For now we expect a string that contains a float."""
581 except ValueError, e:
582 raise fatal('invalid date format (should be a float): %r' % e)
587 def strip_path(prefix, path):
588 """Strips a given prefix from a path.
590 First both paths are normalized.
592 Raises an Exception if no prefix is given.
595 raise Exception('no path given')
597 normalized_prefix = os.path.realpath(prefix)
598 debug2("normalized_prefix: %s\n" % normalized_prefix)
599 normalized_path = os.path.realpath(path)
600 debug2("normalized_path: %s\n" % normalized_path)
601 if normalized_path.startswith(normalized_prefix):
602 return normalized_path[len(normalized_prefix):]
607 def strip_base_path(path, base_paths):
608 """Strips the base path from a given path.
611 Determines the base path for the given string and then strips it
613 Iterates over all base_paths from long to short, to prevent that
614 a too short base_path is removed.
616 normalized_path = os.path.realpath(path)
617 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
618 for bp in sorted_base_paths:
619 if normalized_path.startswith(os.path.realpath(bp)):
620 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite the leading part of 'path' according to 'graft_points'.

    'path' is first resolved with os.path.realpath().  'graft_points' is an
    iterable of (old_prefix, new_prefix) pairs; they are tried in order and
    the first old_prefix that the resolved path starts with is replaced by
    its new_prefix.  If no pair matches, the resolved path is returned
    unchanged.

    Both prefixes are treated as literal strings: old_prefix is never
    interpreted as a regular expression and new_prefix is never scanned for
    escape sequences, so paths containing characters such as '+', '.' or
    '(' are grafted verbatim.

    Note that the match is a plain string-prefix test rather than a
    path-component test, and old_prefix itself is not passed through
    realpath().
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Slice rather than re.sub(): the prefix is data, not a pattern.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
633 # hashlib is only available in python 2.5 or higher, but the 'sha' module
634 # produces a DeprecationWarning in python 2.6 or higher. We want to support
635 # python 2.4 and above without any stupid warnings, so let's try using hashlib
636 # first, and downgrade if it fails.
647 """Format bup's version date string for output."""
648 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded in bup's _version module."""
    commit = _version.COMMIT
    return commit
657 """Format bup's version tag (the official version number).
659 When generated from a commit other than one pointed to with a tag, the
660 returned string will be "unknown-" followed by the first seven positions of
663 names = _version.NAMES.strip()
664 assert(names[0] == '(')
665 assert(names[-1] == ')')
667 l = [n.strip() for n in names.split(',')]
669 if n.startswith('tag: bup-'):
671 return 'unknown-%s' % _version.COMMIT[:7]