1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
4 import heapq, operator, time
5 from bup import _version, _helpers
6 import bup._helpers as _helpers
8 # This function should really be in helpers, not in bup.options. But we
9 # want options.py to be standalone so people can include it in other projects.
10 from bup.options import _tty_width
11 tty_width = _tty_width
15 """Convert the string 's' to an integer. Return 0 if s is not a number."""
23 """Convert the string 's' to a float. Return 0 if s is not a number."""
25 return float(s or '0')
30 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
33 # Write (blockingly) to sockets that may or may not be in blocking mode.
34 # We need this because our stderr is sometimes eaten by subprocesses
35 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
36 # leading to race conditions. Ick. We'll do it the hard way.
37 def _hard_write(fd, buf):
39 (r,w,x) = select.select([], [fd], [], None)
41 raise IOError('select(fd) returned without being writable')
43 sz = os.write(fd, buf)
45 if e.errno != errno.EAGAIN:
53 """Print a log message to stderr."""
56 _hard_write(sys.stderr.fileno(), s)
70 istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
72 """Calls log() if stderr is a TTY. Does nothing otherwise."""
78 """Calls progress() only if we haven't printed progress in a while.
80 This avoids overloading the stderr buffer with excess junk."""
83 if now - _last_prog > 0.1:
88 def mkdirp(d, mode=None):
89 """Recursively create directories on path 'd'.
91 Unlike os.makedirs(), it doesn't raise an exception if the last element of
92 the path already exists.
100 if e.errno == errno.EEXIST:
107 """Get the next item from an iterator, None if we reached the end."""
110 except StopIteration:
114 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
116 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
118 samekey = operator.eq
120 total = sum(len(it) for it in iters)
121 iters = (iter(it) for it in iters)
122 heap = ((next(it),it) for it in iters)
123 heap = [(e,it) for e,it in heap if e]
128 if not count % pfreq:
131 if not samekey(e, pe):
136 e = it.next() # Don't use next() function, it's too expensive
137 except StopIteration:
138 heapq.heappop(heap) # remove current
140 heapq.heapreplace(heap, (e, it)) # shift current to new location
145 """Delete a file at path 'f' if it currently exists.
147 Unlike os.unlink(), does not throw an exception if the file didn't already
153 if e.errno == errno.ENOENT:
154 pass # it doesn't exist, that's what you asked for
158 """Run a subprocess and return its output."""
159 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
166 """Get the absolute path of a file.
168 Behaves like os.path.realpath, but doesn't follow a symlink for the last
169 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
170 will follow symlinks in p's directory)
176 if st and stat.S_ISLNK(st.st_mode):
177 (dir, name) = os.path.split(p)
178 dir = os.path.realpath(dir)
179 out = os.path.join(dir, name)
181 out = os.path.realpath(p)
182 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    The check is simply whether the FAKEROOTKEY environment variable is
    present (with any value, including the empty string).
    """
    return os.getenv("FAKEROOTKEY") is not None
193 """Get the user's login name."""
198 _username = pwd.getpwuid(uid)[0]
200 _username = 'user%d' % uid
206 """Get the user's full name."""
208 if not _userfullname:
211 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
213 _userfullname = 'user%d' % uid
219 """Get the FQDN of this machine."""
222 _hostname = socket.getfqdn()
# Cached base directory for resource_path(); filled in on first call.
_resource_path = None
def resource_path(subdir=''):
    """Return 'subdir' joined onto bup's resource directory.

    The base directory is read from $BUP_RESOURCE_PATH the first time this
    is called (falling back to '.' when the variable is unset or empty) and
    is cached in the module-level _resource_path for all later calls.
    """
    global _resource_path
    if _resource_path:
        base = _resource_path
    else:
        base = _resource_path = os.environ.get('BUP_RESOURCE_PATH') or '.'
    return os.path.join(base, subdir)
234 class NotOk(Exception):
239 def __init__(self, outp):
243 while self._read(65536): pass
245 def read(self, size):
246 """Read 'size' bytes from input stream."""
248 return self._read(size)
251 """Read from input stream until a newline is found."""
253 return self._readline()
def write(self, data):
    """Write 'data' to the connection's output stream, self.outp."""
    sink = self.outp
    sink.write(data)
261 """Return true if input stream is readable."""
262 raise NotImplemented("Subclasses must implement has_input")
265 """Indicate end of output from last sent command."""
269 """Indicate server error to the client."""
270 s = re.sub(r'\s+', ' ', str(s))
271 self.write('\nerror %s\n' % s)
273 def _check_ok(self, onempty):
276 for rl in linereader(self):
277 #log('%d got line: %r\n' % (os.getpid(), rl))
278 if not rl: # empty line
282 elif rl.startswith('error '):
283 #log('client: error: %s\n' % rl[6:])
287 raise Exception('server exited unexpectedly; see errors above')
289 def drain_and_check_ok(self):
290 """Remove all data for the current command from input stream."""
293 return self._check_ok(onempty)
296 """Verify that server action completed successfully."""
298 raise Exception('expected "ok", got %r' % rl)
299 return self._check_ok(onempty)
302 class Conn(BaseConn):
303 def __init__(self, inp, outp):
304 BaseConn.__init__(self, outp)
def _read(self, size):
    """Delegate a read of 'size' bytes to the wrapped input stream self.inp."""
    source = self.inp
    return source.read(size)
311 return self.inp.readline()
314 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
316 assert(rl[0] == self.inp.fileno())
322 def checked_reader(fd, n):
324 rl, _, _ = select.select([fd], [], [])
327 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
332 MAX_PACKET = 128 * 1024
333 def mux(p, outfd, outr, errr):
336 while p.poll() is None:
337 rl, _, _ = select.select(fds, [], [])
340 buf = os.read(outr, MAX_PACKET)
342 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
344 buf = os.read(errr, 1024)
346 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
348 os.write(outfd, struct.pack('!IB', 0, 3))
351 class DemuxConn(BaseConn):
352 """A helper class for bup's client-server protocol."""
353 def __init__(self, infd, outp):
354 BaseConn.__init__(self, outp)
355 # Anything that comes through before the sync string was not
356 # multiplexed and can be assumed to be debug/log before mux init.
358 while tail != 'BUPMUX':
359 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
361 raise IOError('demux: unexpected EOF during initialization')
363 sys.stderr.write(tail[:-6]) # pre-mux log messages
370 def write(self, data):
372 BaseConn.write(self, data)
374 def _next_packet(self, timeout):
375 if self.closed: return False
376 rl, wl, xl = select.select([self.infd], [], [], timeout)
377 if not rl: return False
378 assert(rl[0] == self.infd)
379 ns = ''.join(checked_reader(self.infd, 5))
380 n, fdw = struct.unpack('!IB', ns)
381 assert(n <= MAX_PACKET)
383 self.reader = checked_reader(self.infd, n)
385 for buf in checked_reader(self.infd, n):
386 sys.stderr.write(buf)
389 debug2("DemuxConn: marked closed\n")
392 def _load_buf(self, timeout):
393 if self.buf is not None:
395 while not self.closed:
396 while not self.reader:
397 if not self._next_packet(timeout):
400 self.buf = self.reader.next()
402 except StopIteration:
406 def _read_parts(self, ix_fn):
407 while self._load_buf(None):
408 assert(self.buf is not None)
410 if i is None or i == len(self.buf):
415 self.buf = self.buf[i:]
423 return buf.index('\n')+1
426 return ''.join(self._read_parts(find_eol))
428 def _read(self, size):
430 def until_size(buf): # Closes on csize
431 if len(buf) < csize[0]:
436 return ''.join(self._read_parts(until_size))
439 return self._load_buf(0)
443 """Generate a list of input lines from 'f' without terminating newlines."""
451 def chunkyreader(f, count = None):
452 """Generate a list of chunks of data read from 'f'.
454 If count is None, read until EOF is reached.
456 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
457 reached while reading, raise IOError.
461 b = f.read(min(count, 65536))
463 raise IOError('EOF with %d bytes remaining' % count)
474 """Append "/" to 's' if it doesn't aleady end in "/"."""
475 if s and not s.endswith('/'):
481 def _mmap_do(f, sz, flags, prot, close):
483 st = os.fstat(f.fileno())
486 # trying to open a zero-length map gives an error, but an empty
487 # string has all the same behaviour of a zero-length map, ie. it has
490 map = mmap.mmap(f.fileno(), sz, flags, prot)
492 f.close() # map will persist beyond file close
496 def mmap_read(f, sz = 0, close=True):
497 """Create a read-only memory mapped region on file 'f'.
498 If sz is 0, the region will cover the entire file.
500 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
503 def mmap_readwrite(f, sz = 0, close=True):
504 """Create a read-write memory mapped region on file 'f'.
505 If sz is 0, the region will cover the entire file.
507 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
511 def mmap_readwrite_private(f, sz = 0, close=True):
512 """Create a read-write memory mapped region on file 'f'.
513 If sz is 0, the region will cover the entire file.
514 The map is private, which means the changes are never flushed back to the
517 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
522 """Parse data size information into a float number.
524 Here are some examples of conversions:
525 199.2k means 203981 bytes
526 1GB means 1073741824 bytes
527 2.1 tb means 2199023255552 bytes
529 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
531 raise ValueError("can't parse %r as a number" % s)
532 (val, unit) = g.groups()
535 if unit in ['t', 'tb']:
536 mult = 1024*1024*1024*1024
537 elif unit in ['g', 'gb']:
538 mult = 1024*1024*1024
539 elif unit in ['m', 'mb']:
541 elif unit in ['k', 'kb']:
543 elif unit in ['', 'b']:
546 raise ValueError("invalid unit %r in number %r" % (unit, s))
551 """Count the number of elements in an iterator. (consumes the iterator)"""
552 return reduce(lambda x,y: x+1, l)
557 """Append an error message to the list of saved errors.
559 Once processing is able to stop and output the errors, the saved errors are
560 accessible in the module variable helpers.saved_errors.
562 saved_errors.append(e)
572 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
574 The new exception handler will make sure that bup will exit without an ugly
575 stacktrace when Ctrl-C is hit.
577 oldhook = sys.excepthook
578 def newhook(exctype, value, traceback):
579 if exctype == KeyboardInterrupt:
580 log('Interrupted.\n')
582 return oldhook(exctype, value, traceback)
583 sys.excepthook = newhook
586 def columnate(l, prefix):
587 """Format elements of 'l' in columns with 'prefix' leading each line.
589 The number of columns is determined automatically based on the string
595 clen = max(len(s) for s in l)
596 ncols = (tty_width() - len(prefix)) / (clen + 2)
601 while len(l) % ncols:
604 for s in range(0, len(l), rows):
605 cols.append(l[s:s+rows])
607 for row in zip(*cols):
608 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
612 def parse_date_or_fatal(str, fatal):
613 """Parses the given date or calls Option.fatal().
614 For now we expect a string that contains a float."""
617 except ValueError, e:
618 raise fatal('invalid date format (should be a float): %r' % e)
623 def strip_path(prefix, path):
624 """Strips a given prefix from a path.
626 First both paths are normalized.
628 Raises an Exception if no prefix is given.
631 raise Exception('no path given')
633 normalized_prefix = os.path.realpath(prefix)
634 debug2("normalized_prefix: %s\n" % normalized_prefix)
635 normalized_path = os.path.realpath(path)
636 debug2("normalized_path: %s\n" % normalized_path)
637 if normalized_path.startswith(normalized_prefix):
638 return normalized_path[len(normalized_prefix):]
643 def strip_base_path(path, base_paths):
644 """Strips the base path from a given path.
647 Determines the base path for the given string and then strips it
649 Iterates over all base_paths from long to short, to prevent that
650 a too short base_path is removed.
652 normalized_path = os.path.realpath(path)
653 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
654 for bp in sorted_base_paths:
655 if normalized_path.startswith(os.path.realpath(bp)):
656 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite 'path' according to the first matching graft point.

    'path' is first normalized with os.path.realpath().  'graft_points' is an
    iterable of (old_prefix, new_prefix) pairs, checked in order; for the
    first pair whose old_prefix is a leading substring of the normalized
    path, that prefix is replaced by new_prefix and the result returned.
    If no pair matches, the normalized path is returned unchanged.

    Both prefixes are treated as literal strings, not regular expressions.
    NOTE(review): matching is a plain string-prefix test, so an old_prefix
    of '/foo' also matches '/foobar/...'; callers may want to confirm that
    is intended.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice literally.  Building a regex from old_prefix (and using
            # new_prefix as a re.sub replacement) breaks on prefixes that
            # contain regex metacharacters or backslashes.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
669 # hashlib is only available in python 2.5 or higher, but the 'sha' module
670 # produces a DeprecationWarning in python 2.6 or higher. We want to support
671 # python 2.4 and above without any stupid warnings, so let's try using hashlib
672 # first, and downgrade if it fails.
683 """Format bup's version date string for output."""
684 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded in bup's generated _version module."""
    commit = _version.COMMIT
    return commit
693 """Format bup's version tag (the official version number).
695 When generated from a commit other than one pointed to with a tag, the
696 returned string will be "unknown-" followed by the first seven positions of
699 names = _version.NAMES.strip()
700 assert(names[0] == '(')
701 assert(names[-1] == ')')
703 l = [n.strip() for n in names.split(',')]
705 if n.startswith('tag: bup-'):
707 return 'unknown-%s' % _version.COMMIT[:7]