1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
5 from bup import _version
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
29 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
50 """Print a log message to stderr."""
52 _hard_write(sys.stderr.fileno(), s)
65 def mkdirp(d, mode=None):
66 """Recursively create directories on path 'd'.
68 Unlike os.makedirs(), it doesn't raise an exception if the last element of
69 the path already exists.
77 if e.errno == errno.EEXIST:
84 """Get the next item from an iterator, None if we reached the end."""
91 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
93 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
97 total = sum(len(it) for it in iters)
98 iters = (iter(it) for it in iters)
99 heap = ((next(it),it) for it in iters)
100 heap = [(e,it) for e,it in heap if e]
105 if not count % pfreq:
108 if not samekey(e, pe):
113 e = it.next() # Don't use next() function, it's too expensive
114 except StopIteration:
115 heapq.heappop(heap) # remove current
117 heapq.heapreplace(heap, (e, it)) # shift current to new location
122 """Delete a file at path 'f' if it currently exists.
124 Unlike os.unlink(), does not throw an exception if the file didn't already
130 if e.errno == errno.ENOENT:
131 pass # it doesn't exist, that's what you asked for
135 """Run a subprocess and return its output."""
136 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
143 """Get the absolute path of a file.
145 Behaves like os.path.realpath, but doesn't follow a symlink for the last
146 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
147 will follow symlinks in p's directory)
153 if st and stat.S_ISLNK(st.st_mode):
154 (dir, name) = os.path.split(p)
155 dir = os.path.realpath(dir)
156 out = os.path.join(dir, name)
158 out = os.path.realpath(p)
159 #log('realpathing:%r,%r\n' % (p, out))
165 """Get the user's login name."""
170 _username = pwd.getpwuid(uid)[0]
172 _username = 'user%d' % uid
178 """Get the user's full name."""
180 if not _userfullname:
183 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
185 _userfullname = 'user%d' % uid
191 """Get the FQDN of this machine."""
194 _hostname = socket.getfqdn()
_resource_path = None
def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The base directory is read once, on the first call, from the
    BUP_RESOURCE_PATH environment variable (falling back to '.' when it is
    unset or empty) and cached in the module-level _resource_path; later
    calls reuse the cached value even if the environment changes.
    """
    global _resource_path
    base = _resource_path
    if not base:
        base = os.environ.get('BUP_RESOURCE_PATH') or '.'
        _resource_path = base
    return os.path.join(base, subdir)
206 class NotOk(Exception):
211 def __init__(self, outp):
215 while self._read(65536): pass
217 def read(self, size):
218 """Read 'size' bytes from input stream."""
220 return self._read(size)
223 """Read from input stream until a newline is found."""
225 return self._readline()
227 def write(self, data):
228 """Write 'data' to output stream."""
229 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
230 self.outp.write(data)
233 """Return true if input stream is readable."""
234 raise NotImplemented("Subclasses must implement has_input")
237 """Indicate end of output from last sent command."""
241 """Indicate server error to the client."""
242 s = re.sub(r'\s+', ' ', str(s))
243 self.write('\nerror %s\n' % s)
245 def _check_ok(self, onempty):
248 for rl in linereader(self):
249 #log('%d got line: %r\n' % (os.getpid(), rl))
250 if not rl: # empty line
254 elif rl.startswith('error '):
255 #log('client: error: %s\n' % rl[6:])
259 raise Exception('server exited unexpectedly; see errors above')
261 def drain_and_check_ok(self):
262 """Remove all data for the current command from input stream."""
265 return self._check_ok(onempty)
268 """Verify that server action completed successfully."""
270 raise Exception('expected "ok", got %r' % rl)
271 return self._check_ok(onempty)
274 class Conn(BaseConn):
275 def __init__(self, inp, outp):
276 BaseConn.__init__(self, outp)
279 def _read(self, size):
280 return self.inp.read(size)
283 return self.inp.readline()
286 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
288 assert(rl[0] == self.inp.fileno())
294 def checked_reader(fd, n):
296 rl, _, _ = select.select([fd], [], [])
299 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
304 MAX_PACKET = 128 * 1024
305 def mux(p, outfd, outr, errr):
308 while p.poll() is None:
309 rl, _, _ = select.select(fds, [], [])
312 buf = os.read(outr, MAX_PACKET)
314 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
316 buf = os.read(errr, 1024)
318 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
320 os.write(outfd, struct.pack('!IB', 0, 3))
323 class DemuxConn(BaseConn):
324 """A helper class for bup's client-server protocol."""
325 def __init__(self, infd, outp):
326 BaseConn.__init__(self, outp)
327 # Anything that comes through before the sync string was not
328 # multiplexed and can be assumed to be debug/log before mux init.
330 while tail != 'BUPMUX':
331 b = os.read(infd, 1024)
333 raise IOError('demux: unexpected EOF during initialization')
337 sys.stderr.write(buf)
343 def write(self, data):
345 BaseConn.write(self, data)
347 def _next_packet(self, timeout):
348 if self.closed: return False
349 rl, wl, xl = select.select([self.infd], [], [], timeout)
350 if not rl: return False
351 assert(rl[0] == self.infd)
352 ns = ''.join(checked_reader(self.infd, 5))
353 n, fdw = struct.unpack('!IB', ns)
354 assert(n<=MAX_PACKET)
356 self.reader = checked_reader(self.infd, n)
358 for buf in checked_reader(self.infd, n):
359 sys.stderr.write(buf)
362 debug2("DemuxConn: marked closed\n")
365 def _load_buf(self, timeout):
366 if self.buf is not None:
368 while not self.closed:
369 while not self.reader:
370 if not self._next_packet(timeout):
373 self.buf = self.reader.next()
375 except StopIteration:
379 def _read_parts(self, ix_fn):
380 while self._load_buf(None):
381 assert(self.buf is not None)
383 if i is None or i == len(self.buf):
388 self.buf = self.buf[i:]
396 return buf.index('\n')+1
399 return ''.join(self._read_parts(find_eol))
401 def _read(self, size):
403 def until_size(buf): # Closes on csize
404 if len(buf) < csize[0]:
409 return ''.join(self._read_parts(until_size))
412 return self._load_buf(0)
416 """Generate a list of input lines from 'f' without terminating newlines."""
424 def chunkyreader(f, count = None):
425 """Generate a list of chunks of data read from 'f'.
427 If count is None, read until EOF is reached.
429 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
430 reached while reading, raise IOError.
434 b = f.read(min(count, 65536))
436 raise IOError('EOF with %d bytes remaining' % count)
447 """Append "/" to 's' if it doesn't aleady end in "/"."""
448 if s and not s.endswith('/'):
454 def _mmap_do(f, sz, flags, prot):
456 st = os.fstat(f.fileno())
459 # trying to open a zero-length map gives an error, but an empty
460 # string has all the same behaviour of a zero-length map, ie. it has
463 map = mmap.mmap(f.fileno(), sz, flags, prot)
464 f.close() # map will persist beyond file close
468 def mmap_read(f, sz = 0):
469 """Create a read-only memory mapped region on file 'f'.
471 If sz is 0, the region will cover the entire file.
473 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ)
476 def mmap_readwrite(f, sz = 0):
477 """Create a read-write memory mapped region on file 'f'.
479 If sz is 0, the region will cover the entire file.
481 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE)
485 """Parse data size information into a float number.
487 Here are some examples of conversions:
488 199.2k means 203981 bytes
489 1GB means 1073741824 bytes
490 2.1 tb means 2199023255552 bytes
492 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
494 raise ValueError("can't parse %r as a number" % s)
495 (val, unit) = g.groups()
498 if unit in ['t', 'tb']:
499 mult = 1024*1024*1024*1024
500 elif unit in ['g', 'gb']:
501 mult = 1024*1024*1024
502 elif unit in ['m', 'mb']:
504 elif unit in ['k', 'kb']:
506 elif unit in ['', 'b']:
509 raise ValueError("invalid unit %r in number %r" % (unit, s))
514 """Count the number of elements in an iterator. (consumes the iterator)"""
515 return reduce(lambda x,y: x+1, l)
520 """Append an error message to the list of saved errors.
522 Once processing is able to stop and output the errors, the saved errors are
523 accessible in the module variable helpers.saved_errors.
525 saved_errors.append(e)
529 istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
531 """Calls log(s) if stderr is a TTY. Does nothing otherwise."""
537 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
539 The new exception handler will make sure that bup will exit without an ugly
540 stacktrace when Ctrl-C is hit.
542 oldhook = sys.excepthook
543 def newhook(exctype, value, traceback):
544 if exctype == KeyboardInterrupt:
545 log('Interrupted.\n')
547 return oldhook(exctype, value, traceback)
548 sys.excepthook = newhook
551 def columnate(l, prefix):
552 """Format elements of 'l' in columns with 'prefix' leading each line.
554 The number of columns is determined automatically based on the string
560 clen = max(len(s) for s in l)
561 ncols = (tty_width() - len(prefix)) / (clen + 2)
566 while len(l) % ncols:
569 for s in range(0, len(l), rows):
570 cols.append(l[s:s+rows])
572 for row in zip(*cols):
573 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
577 def parse_date_or_fatal(str, fatal):
578 """Parses the given date or calls Option.fatal().
579 For now we expect a string that contains a float."""
582 except ValueError, e:
583 raise fatal('invalid date format (should be a float): %r' % e)
588 def strip_path(prefix, path):
589 """Strips a given prefix from a path.
591 First both paths are normalized.
593 Raises an Exception if no prefix is given.
596 raise Exception('no path given')
598 normalized_prefix = os.path.realpath(prefix)
599 debug2("normalized_prefix: %s\n" % normalized_prefix)
600 normalized_path = os.path.realpath(path)
601 debug2("normalized_path: %s\n" % normalized_path)
602 if normalized_path.startswith(normalized_prefix):
603 return normalized_path[len(normalized_prefix):]
608 def strip_base_path(path, base_paths):
609 """Strips the base path from a given path.
612 Determines the base path for the given string and then strips it
614 Iterates over all base_paths from long to short, to prevent that
615 a too short base_path is removed.
617 normalized_path = os.path.realpath(path)
618 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
619 for bp in sorted_base_paths:
620 if normalized_path.startswith(os.path.realpath(bp)):
621 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite the real path of 'path' using the first matching graft point.

    'graft_points' is a sequence of (old_prefix, new_prefix) pairs. 'path' is
    first normalized with os.path.realpath(). The pairs are tried in order;
    for the first one whose old_prefix is a string prefix of the normalized
    path, that prefix is replaced by new_prefix and the result is returned.
    If no graft point matches, the normalized path is returned unchanged.

    NOTE: matching is a plain string-prefix test, not a path-component test,
    so an old_prefix of '/foo' also matches '/foobar'.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice literally instead of using re.sub(): the prefixes are
            # paths, not patterns. Regex metacharacters ('+', '*', '(' ...)
            # in old_prefix must not change what gets replaced, and
            # backslashes in new_prefix must not be treated as escapes.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
634 # hashlib is only available in python 2.5 or higher, but the 'sha' module
635 # produces a DeprecationWarning in python 2.6 or higher. We want to support
636 # python 2.4 and above without any stupid warnings, so let's try using hashlib
637 # first, and downgrade if it fails.
648 """Format bup's version date string for output."""
649 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash stored in bup's _version module (_version.COMMIT)."""
    commit_hash = _version.COMMIT
    return commit_hash
658 """Format bup's version tag (the official version number).
660 When generated from a commit other than one pointed to with a tag, the
661 returned string will be "unknown-" followed by the first seven positions of
664 names = _version.NAMES.strip()
665 assert(names[0] == '(')
666 assert(names[-1] == ')')
668 l = [n.strip() for n in names.split(',')]
670 if n.startswith('tag: bup-'):
672 return 'unknown-%s' % _version.COMMIT[:7]