1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
5 from bup import _version
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
29 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
50 """Print a log message to stderr."""
52 _hard_write(sys.stderr.fileno(), s)
65 def mkdirp(d, mode=None):
66 """Recursively create directories on path 'd'.
68 Unlike os.makedirs(), it doesn't raise an exception if the last element of
69 the path already exists.
77 if e.errno == errno.EEXIST:
84 """Get the next item from an iterator, None if we reached the end."""
91 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
93 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
97 total = sum(len(it) for it in iters)
98 iters = (iter(it) for it in iters)
99 heap = ((next(it),it) for it in iters)
100 heap = [(e,it) for e,it in heap if e]
105 if not count % pfreq:
108 if not samekey(e, pe):
113 e = it.next() # Don't use next() function, it's too expensive
114 except StopIteration:
115 heapq.heappop(heap) # remove current
117 heapq.heapreplace(heap, (e, it)) # shift current to new location
122 """Delete a file at path 'f' if it currently exists.
124 Unlike os.unlink(), does not throw an exception if the file didn't already
130 if e.errno == errno.ENOENT:
131 pass # it doesn't exist, that's what you asked for
135 """Run a subprocess and return its output."""
136 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
143 """Get the absolute path of a file.
145 Behaves like os.path.realpath, but doesn't follow a symlink for the last
146 element. (i.e. if 'p' itself is a symlink, this one won't follow it, but it
147 will follow symlinks in p's directory)
153 if st and stat.S_ISLNK(st.st_mode):
154 (dir, name) = os.path.split(p)
155 dir = os.path.realpath(dir)
156 out = os.path.join(dir, name)
158 out = os.path.realpath(p)
159 #log('realpathing:%r,%r\n' % (p, out))
165 """Get the user's login name."""
170 _username = pwd.getpwuid(uid)[0]
172 _username = 'user%d' % uid
178 """Get the user's full name."""
180 if not _userfullname:
183 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
185 _userfullname = 'user%d' % uid
191 """Get the FQDN of this machine."""
194 _hostname = socket.getfqdn()
# Cached resource directory; filled in on the first resource_path() call.
_resource_path = None


def resource_path(subdir=''):
    """Return the path of 'subdir' inside bup's resource directory.

    The resource directory is taken from the BUP_RESOURCE_PATH environment
    variable the first time it is needed, falling back to '.' when the
    variable is unset or empty.  The result is remembered in the module
    variable _resource_path, so later changes to the environment are not
    picked up.
    """
    global _resource_path
    base = _resource_path
    if not base:
        base = os.environ.get('BUP_RESOURCE_PATH')
        if not base:
            base = '.'
        _resource_path = base
    return os.path.join(base, subdir)
206 class NotOk(Exception):
211 def __init__(self, outp):
215 while self._read(65536): pass
217 def read(self, size):
218 """Read 'size' bytes from input stream."""
220 return self._read(size)
223 """Read from input stream until a newline is found."""
225 return self._readline()
227 def write(self, data):
228 """Write 'data' to output stream."""
229 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
230 self.outp.write(data)
233 """Return true if input stream is readable."""
234 raise NotImplemented("Subclasses must implement has_input")
237 """Indicate end of output from last sent command."""
241 """Indicate server error to the client."""
242 s = re.sub(r'\s+', ' ', str(s))
243 self.write('\nerror %s\n' % s)
245 def _check_ok(self, onempty):
248 for rl in linereader(self):
249 #log('%d got line: %r\n' % (os.getpid(), rl))
250 if not rl: # empty line
254 elif rl.startswith('error '):
255 #log('client: error: %s\n' % rl[6:])
259 raise Exception('server exited unexpectedly; see errors above')
261 def drain_and_check_ok(self):
262 """Remove all data for the current command from input stream."""
265 return self._check_ok(onempty)
268 """Verify that server action completed successfully."""
270 raise Exception('expected "ok", got %r' % rl)
271 return self._check_ok(onempty)
274 class Conn(BaseConn):
275 def __init__(self, inp, outp):
276 BaseConn.__init__(self, outp)
279 def _read(self, size):
280 return self.inp.read(size)
283 return self.inp.readline()
286 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
288 assert(rl[0] == self.inp.fileno())
294 def checked_reader(fd, n):
296 rl, _, _ = select.select([fd], [], [])
299 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
304 MAX_PACKET = 128 * 1024
305 def mux(p, outfd, outr, errr):
308 while p.poll() is None:
309 rl, _, _ = select.select(fds, [], [])
312 buf = os.read(outr, MAX_PACKET)
314 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
316 buf = os.read(errr, 1024)
318 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
320 os.write(outfd, struct.pack('!IB', 0, 3))
323 class DemuxConn(BaseConn):
324 """A helper class for bup's client-server protocol."""
325 def __init__(self, infd, outp):
326 BaseConn.__init__(self, outp)
327 # Anything that comes through before the sync string was not
328 # multiplexed and can be assumed to be debug/log before mux init.
330 while tail != 'BUPMUX':
331 tail += os.read(infd, 1024)
334 sys.stderr.write(buf)
340 def write(self, data):
342 BaseConn.write(self, data)
344 def _next_packet(self, timeout):
345 if self.closed: return False
346 rl, wl, xl = select.select([self.infd], [], [], timeout)
347 if not rl: return False
348 assert(rl[0] == self.infd)
349 ns = ''.join(checked_reader(self.infd, 5))
350 n, fdw = struct.unpack('!IB', ns)
351 assert(n<=MAX_PACKET)
353 self.reader = checked_reader(self.infd, n)
355 for buf in checked_reader(self.infd, n):
356 sys.stderr.write(buf)
359 debug2("DemuxConn: marked closed\n")
362 def _load_buf(self, timeout):
363 if self.buf is not None:
365 while not self.closed:
366 while not self.reader:
367 if not self._next_packet(timeout):
370 self.buf = self.reader.next()
372 except StopIteration:
376 def _read_parts(self, ix_fn):
377 while self._load_buf(None):
378 assert(self.buf is not None)
380 if i is None or i == len(self.buf):
385 self.buf = self.buf[i:]
393 return buf.index('\n')+1
396 return ''.join(self._read_parts(find_eol))
398 def _read(self, size):
400 def until_size(buf): # Closes on csize
401 if len(buf) < csize[0]:
406 return ''.join(self._read_parts(until_size))
409 return self._load_buf(0)
413 """Generate a list of input lines from 'f' without terminating newlines."""
421 def chunkyreader(f, count = None):
422 """Generate a list of chunks of data read from 'f'.
424 If count is None, read until EOF is reached.
426 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
427 reached while reading, raise IOError.
431 b = f.read(min(count, 65536))
433 raise IOError('EOF with %d bytes remaining' % count)
444 """Append "/" to 's' if it doesn't already end in "/"."""
445 if s and not s.endswith('/'):
451 def _mmap_do(f, sz, flags, prot):
453 st = os.fstat(f.fileno())
456 # trying to open a zero-length map gives an error, but an empty
457 # string has all the same behaviour of a zero-length map, ie. it has
460 map = mmap.mmap(f.fileno(), sz, flags, prot)
461 f.close() # map will persist beyond file close
def mmap_read(f, sz = 0):
    """Map file 'f' into memory for reading only.

    Delegates to _mmap_do() requesting a MAP_PRIVATE mapping with PROT_READ
    protection.  A 'sz' of 0 means the region covers the entire file.
    """
    flags = mmap.MAP_PRIVATE
    prot = mmap.PROT_READ
    return _mmap_do(f, sz, flags, prot)
def mmap_readwrite(f, sz = 0):
    """Map file 'f' into memory for both reading and writing.

    Delegates to _mmap_do() requesting a MAP_SHARED mapping with
    PROT_READ|PROT_WRITE protection.  A 'sz' of 0 means the region covers
    the entire file.
    """
    flags = mmap.MAP_SHARED
    prot = mmap.PROT_READ | mmap.PROT_WRITE
    return _mmap_do(f, sz, flags, prot)
482 """Parse data size information into a float number.
484 Here are some examples of conversions:
485 199.2k means 203981 bytes
486 1GB means 1073741824 bytes
487 2.1 tb means 2199023255552 bytes
489 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
491 raise ValueError("can't parse %r as a number" % s)
492 (val, unit) = g.groups()
495 if unit in ['t', 'tb']:
496 mult = 1024*1024*1024*1024
497 elif unit in ['g', 'gb']:
498 mult = 1024*1024*1024
499 elif unit in ['m', 'mb']:
501 elif unit in ['k', 'kb']:
503 elif unit in ['', 'b']:
506 raise ValueError("invalid unit %r in number %r" % (unit, s))
511 """Count the number of elements in an iterator. (consumes the iterator)"""
512 return reduce(lambda x,y: x+1, l)
517 """Append an error message to the list of saved errors.
519 Once processing is able to stop and output the errors, the saved errors are
520 accessible in the module variable helpers.saved_errors.
522 saved_errors.append(e)
526 istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
528 """Calls log(s) if stderr is a TTY. Does nothing otherwise."""
534 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
536 The new exception handler will make sure that bup will exit without an ugly
537 stacktrace when Ctrl-C is hit.
539 oldhook = sys.excepthook
540 def newhook(exctype, value, traceback):
541 if exctype == KeyboardInterrupt:
542 log('Interrupted.\n')
544 return oldhook(exctype, value, traceback)
545 sys.excepthook = newhook
548 def columnate(l, prefix):
549 """Format elements of 'l' in columns with 'prefix' leading each line.
551 The number of columns is determined automatically based on the string
557 clen = max(len(s) for s in l)
558 ncols = (tty_width() - len(prefix)) / (clen + 2)
563 while len(l) % ncols:
566 for s in range(0, len(l), rows):
567 cols.append(l[s:s+rows])
569 for row in zip(*cols):
570 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
574 def parse_date_or_fatal(str, fatal):
575 """Parses the given date or calls Option.fatal().
576 For now we expect a string that contains a float."""
579 except ValueError, e:
580 raise fatal('invalid date format (should be a float): %r' % e)
585 def strip_path(prefix, path):
586 """Strips a given prefix from a path.
588 First both paths are normalized.
590 Raises an Exception if no prefix is given.
593 raise Exception('no path given')
595 normalized_prefix = os.path.realpath(prefix)
596 debug2("normalized_prefix: %s\n" % normalized_prefix)
597 normalized_path = os.path.realpath(path)
598 debug2("normalized_path: %s\n" % normalized_path)
599 if normalized_path.startswith(normalized_prefix):
600 return normalized_path[len(normalized_prefix):]
605 def strip_base_path(path, base_paths):
606 """Strips the base path from a given path.
609 Determines the base path for the given string and then strips it
611 Iterates over all base_paths from longest to shortest, so that a
612 shorter base_path is not stripped when a longer one also matches.
614 normalized_path = os.path.realpath(path)
615 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
616 for bp in sorted_base_paths:
617 if normalized_path.startswith(os.path.realpath(bp)):
618 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite 'path' according to the first matching graft point.

    'graft_points' is an iterable of (old_prefix, new_prefix) pairs.  'path'
    is first normalized with os.path.realpath().  The pairs are tried in
    order; for the first one whose old_prefix is a leading substring of the
    normalized path, that prefix is replaced by new_prefix and the result is
    returned.  If no pair matches, the normalized path is returned unchanged.

    Both prefixes are treated as literal strings.  The match is a plain
    string-prefix test (str.startswith), not a path-component test.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice by length instead of using re.sub(): old_prefix is a
            # path, not a regular expression, and new_prefix is not a
            # replacement template.  Characters such as '+', '(' or '\\'
            # in either one must be taken literally.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
631 # hashlib is only available in python 2.5 or higher, but the 'sha' module
632 # produces a DeprecationWarning in python 2.6 or higher. We want to support
633 # python 2.4 and above without any stupid warnings, so let's try using hashlib
634 # first, and downgrade if it fails.
645 """Format bup's version date string for output."""
646 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded for this build of bup.

    The value is read from the COMMIT attribute of the _version module.
    """
    commit = _version.COMMIT
    return commit
655 """Format bup's version tag (the official version number).
657 When generated from a commit other than one pointed to with a tag, the
658 returned string will be "unknown-" followed by the first seven positions of
661 names = _version.NAMES.strip()
662 assert(names[0] == '(')
663 assert(names[-1] == ')')
665 l = [n.strip() for n in names.split(',')]
667 if n.startswith('tag: bup-'):
669 return 'unknown-%s' % _version.COMMIT[:7]