1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
5 from bup import _version
6 import bup._helpers as _helpers
8 # This function should really be in helpers, not in bup.options. But we
9 # want options.py to be standalone so people can include it in other projects.
10 from bup.options import _tty_width
11 tty_width = _tty_width
15 """Convert the string 's' to an integer. Return 0 if s is not a number."""
23 """Convert the string 's' to a float. Return 0 if s is not a number."""
25 return float(s or '0')
# Debug verbosity level, read from the BUP_DEBUG environment variable; when
# the variable is unset, the default 0 is what gets passed to atoi().
buglvl = atoi(os.getenv('BUP_DEBUG', 0))
33 # Write (blockingly) to sockets that may or may not be in blocking mode.
34 # We need this because our stderr is sometimes eaten by subprocesses
35 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
36 # leading to race conditions. Ick. We'll do it the hard way.
37 def _hard_write(fd, buf):
39 (r,w,x) = select.select([], [fd], [], None)
41 raise IOError('select(fd) returned without being writable')
43 sz = os.write(fd, buf)
45 if e.errno != errno.EAGAIN:
51 """Print a log message to stderr."""
53 _hard_write(sys.stderr.fileno(), s)
66 def mkdirp(d, mode=None):
67 """Recursively create directories on path 'd'.
69 Unlike os.makedirs(), it doesn't raise an exception if the last element of
70 the path already exists.
78 if e.errno == errno.EEXIST:
85 """Get the next item from an iterator, None if we reached the end."""
92 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
94 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
98 total = sum(len(it) for it in iters)
99 iters = (iter(it) for it in iters)
100 heap = ((next(it),it) for it in iters)
101 heap = [(e,it) for e,it in heap if e]
106 if not count % pfreq:
109 if not samekey(e, pe):
114 e = it.next() # Don't use next() function, it's too expensive
115 except StopIteration:
116 heapq.heappop(heap) # remove current
118 heapq.heapreplace(heap, (e, it)) # shift current to new location
123 """Delete a file at path 'f' if it currently exists.
125 Unlike os.unlink(), does not throw an exception if the file didn't already
131 if e.errno == errno.ENOENT:
132 pass # it doesn't exist, that's what you asked for
136 """Run a subprocess and return its output."""
137 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
144 """Get the absolute path of a file.
146 Behaves like os.path.realpath, but doesn't follow a symlink for the last
147 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
148 will follow symlinks in p's directory)
154 if st and stat.S_ISLNK(st.st_mode):
155 (dir, name) = os.path.split(p)
156 dir = os.path.realpath(dir)
157 out = os.path.join(dir, name)
159 out = os.path.realpath(p)
160 #log('realpathing:%r,%r\n' % (p, out))
def detect_fakeroot():
    """Return True if we appear to be running under fakeroot.

    The check looks only at whether FAKEROOTKEY is present in the
    environment. Its value, even an empty string, is ignored.
    """
    # os.getenv() returns None only when the variable is absent, so an
    # identity test against None is the precise (and idiomatic) check.
    return os.getenv("FAKEROOTKEY") is not None
171 """Get the user's login name."""
176 _username = pwd.getpwuid(uid)[0]
178 _username = 'user%d' % uid
184 """Get the user's full name."""
186 if not _userfullname:
189 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
191 _userfullname = 'user%d' % uid
197 """Get the FQDN of this machine."""
200 _hostname = socket.getfqdn()
_resource_path = None


def resource_path(subdir=''):
    """Return the path of 'subdir' inside bup's resource directory.

    The base directory is determined the first time it is needed: the
    BUP_RESOURCE_PATH environment variable, or '.' when that is unset or
    empty. It is then cached in the module-level _resource_path, so later
    changes to the environment are not seen.
    """
    global _resource_path
    # A previously cached (truthy) base short-circuits the environment lookup.
    _resource_path = (_resource_path
                      or os.environ.get('BUP_RESOURCE_PATH')
                      or '.')
    return os.path.join(_resource_path, subdir)
212 class NotOk(Exception):
217 def __init__(self, outp):
221 while self._read(65536): pass
223 def read(self, size):
224 """Read 'size' bytes from input stream."""
226 return self._read(size)
229 """Read from input stream until a newline is found."""
231 return self._readline()
233 def write(self, data):
234 """Write 'data' to output stream."""
235 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
236 self.outp.write(data)
239 """Return true if input stream is readable."""
240 raise NotImplemented("Subclasses must implement has_input")
243 """Indicate end of output from last sent command."""
247 """Indicate server error to the client."""
248 s = re.sub(r'\s+', ' ', str(s))
249 self.write('\nerror %s\n' % s)
251 def _check_ok(self, onempty):
254 for rl in linereader(self):
255 #log('%d got line: %r\n' % (os.getpid(), rl))
256 if not rl: # empty line
260 elif rl.startswith('error '):
261 #log('client: error: %s\n' % rl[6:])
265 raise Exception('server exited unexpectedly; see errors above')
267 def drain_and_check_ok(self):
268 """Remove all data for the current command from input stream."""
271 return self._check_ok(onempty)
274 """Verify that server action completed successfully."""
276 raise Exception('expected "ok", got %r' % rl)
277 return self._check_ok(onempty)
280 class Conn(BaseConn):
281 def __init__(self, inp, outp):
282 BaseConn.__init__(self, outp)
285 def _read(self, size):
286 return self.inp.read(size)
289 return self.inp.readline()
292 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
294 assert(rl[0] == self.inp.fileno())
300 def checked_reader(fd, n):
302 rl, _, _ = select.select([fd], [], [])
305 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
# Largest payload carried by one multiplexed packet: mux() reads at most this
# many bytes of stdout per packet, and DemuxConn asserts that incoming packet
# lengths never exceed it.
MAX_PACKET = 1 << 17  # 128 KiB
311 def mux(p, outfd, outr, errr):
314 while p.poll() is None:
315 rl, _, _ = select.select(fds, [], [])
318 buf = os.read(outr, MAX_PACKET)
320 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
322 buf = os.read(errr, 1024)
324 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
326 os.write(outfd, struct.pack('!IB', 0, 3))
329 class DemuxConn(BaseConn):
330 """A helper class for bup's client-server protocol."""
331 def __init__(self, infd, outp):
332 BaseConn.__init__(self, outp)
333 # Anything that comes through before the sync string was not
334 # multiplexed and can be assumed to be debug/log before mux init.
336 while tail != 'BUPMUX':
337 b = os.read(infd, (len(tail) < 6) and (6-len(tail)) or 1)
339 raise IOError('demux: unexpected EOF during initialization')
341 sys.stderr.write(tail[:-6]) # pre-mux log messages
348 def write(self, data):
350 BaseConn.write(self, data)
352 def _next_packet(self, timeout):
353 if self.closed: return False
354 rl, wl, xl = select.select([self.infd], [], [], timeout)
355 if not rl: return False
356 assert(rl[0] == self.infd)
357 ns = ''.join(checked_reader(self.infd, 5))
358 n, fdw = struct.unpack('!IB', ns)
359 assert(n <= MAX_PACKET)
361 self.reader = checked_reader(self.infd, n)
363 for buf in checked_reader(self.infd, n):
364 sys.stderr.write(buf)
367 debug2("DemuxConn: marked closed\n")
370 def _load_buf(self, timeout):
371 if self.buf is not None:
373 while not self.closed:
374 while not self.reader:
375 if not self._next_packet(timeout):
378 self.buf = self.reader.next()
380 except StopIteration:
384 def _read_parts(self, ix_fn):
385 while self._load_buf(None):
386 assert(self.buf is not None)
388 if i is None or i == len(self.buf):
393 self.buf = self.buf[i:]
401 return buf.index('\n')+1
404 return ''.join(self._read_parts(find_eol))
406 def _read(self, size):
408 def until_size(buf): # Closes on csize
409 if len(buf) < csize[0]:
414 return ''.join(self._read_parts(until_size))
417 return self._load_buf(0)
421 """Generate a list of input lines from 'f' without terminating newlines."""
429 def chunkyreader(f, count = None):
430 """Generate a list of chunks of data read from 'f'.
432 If count is None, read until EOF is reached.
434 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
435 reached while reading, raise IOError.
439 b = f.read(min(count, 65536))
441 raise IOError('EOF with %d bytes remaining' % count)
452 """Append "/" to 's' if it doesn't aleady end in "/"."""
453 if s and not s.endswith('/'):
459 def _mmap_do(f, sz, flags, prot, close):
461 st = os.fstat(f.fileno())
464 # trying to open a zero-length map gives an error, but an empty
465 # string has all the same behaviour of a zero-length map, ie. it has
468 map = mmap.mmap(f.fileno(), sz, flags, prot)
470 f.close() # map will persist beyond file close
474 def mmap_read(f, sz = 0, close=True):
475 """Create a read-only memory mapped region on file 'f'.
476 If sz is 0, the region will cover the entire file.
478 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ, close)
481 def mmap_readwrite(f, sz = 0, close=True):
482 """Create a read-write memory mapped region on file 'f'.
483 If sz is 0, the region will cover the entire file.
485 return _mmap_do(f, sz, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_WRITE,
489 def mmap_readwrite_private(f, sz = 0, close=True):
490 """Create a read-write memory mapped region on file 'f'.
491 If sz is 0, the region will cover the entire file.
492 The map is private, which means the changes are never flushed back to the
495 return _mmap_do(f, sz, mmap.MAP_PRIVATE, mmap.PROT_READ|mmap.PROT_WRITE,
500 """Parse data size information into a float number.
502 Here are some examples of conversions:
503 199.2k means 203981 bytes
504 1GB means 1073741824 bytes
505 2.1 tb means 2199023255552 bytes
507 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
509 raise ValueError("can't parse %r as a number" % s)
510 (val, unit) = g.groups()
513 if unit in ['t', 'tb']:
514 mult = 1024*1024*1024*1024
515 elif unit in ['g', 'gb']:
516 mult = 1024*1024*1024
517 elif unit in ['m', 'mb']:
519 elif unit in ['k', 'kb']:
521 elif unit in ['', 'b']:
524 raise ValueError("invalid unit %r in number %r" % (unit, s))
529 """Count the number of elements in an iterator. (consumes the iterator)"""
530 return reduce(lambda x,y: x+1, l)
535 """Append an error message to the list of saved errors.
537 Once processing is able to stop and output the errors, the saved errors are
538 accessible in the module variable helpers.saved_errors.
540 saved_errors.append(e)
# True when stderr (fd 2) is a terminal. Otherwise this holds the atoi() value
# of BUP_FORCE_TTY, so setting that variable to a nonzero number makes code
# that checks istty behave as if stderr were a terminal.
istty = os.isatty(2) or atoi(os.getenv('BUP_FORCE_TTY'))
551 """Calls log(s) if stderr is a TTY. Does nothing otherwise."""
557 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
559 The new exception handler will make sure that bup will exit without an ugly
560 stacktrace when Ctrl-C is hit.
562 oldhook = sys.excepthook
563 def newhook(exctype, value, traceback):
564 if exctype == KeyboardInterrupt:
565 log('Interrupted.\n')
567 return oldhook(exctype, value, traceback)
568 sys.excepthook = newhook
571 def columnate(l, prefix):
572 """Format elements of 'l' in columns with 'prefix' leading each line.
574 The number of columns is determined automatically based on the string
580 clen = max(len(s) for s in l)
581 ncols = (tty_width() - len(prefix)) / (clen + 2)
586 while len(l) % ncols:
589 for s in range(0, len(l), rows):
590 cols.append(l[s:s+rows])
592 for row in zip(*cols):
593 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
597 def parse_date_or_fatal(str, fatal):
598 """Parses the given date or calls Option.fatal().
599 For now we expect a string that contains a float."""
602 except ValueError, e:
603 raise fatal('invalid date format (should be a float): %r' % e)
608 def strip_path(prefix, path):
609 """Strips a given prefix from a path.
611 First both paths are normalized.
613 Raises an Exception if no prefix is given.
616 raise Exception('no path given')
618 normalized_prefix = os.path.realpath(prefix)
619 debug2("normalized_prefix: %s\n" % normalized_prefix)
620 normalized_path = os.path.realpath(path)
621 debug2("normalized_path: %s\n" % normalized_path)
622 if normalized_path.startswith(normalized_prefix):
623 return normalized_path[len(normalized_prefix):]
628 def strip_base_path(path, base_paths):
629 """Strips the base path from a given path.
632 Determines the base path for the given string and then strips it
634 Iterates over all base_paths from long to short, to prevent that
635 a too short base_path is removed.
637 normalized_path = os.path.realpath(path)
638 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
639 for bp in sorted_base_paths:
640 if normalized_path.startswith(os.path.realpath(bp)):
641 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite the leading part of 'path' according to 'graft_points'.

    'path' is first normalized with os.path.realpath(). 'graft_points' is a
    sequence of (old_prefix, new_prefix) pairs. The first pair whose
    old_prefix the normalized path starts with wins, and that prefix is
    replaced by new_prefix. If no pair matches, the normalized path is
    returned unchanged.

    Both prefixes are treated as literal strings, not as regular expressions
    or replacement templates. The match is a plain string-prefix test, so
    '/foo' also matches '/foobar'.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice by length rather than using re.sub(). A regex would
            # misinterpret metacharacters ('+', '(', '[', ...) in old_prefix
            # and backslash escapes in new_prefix, which are ordinary path
            # characters here.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
654 # hashlib is only available in python 2.5 or higher, but the 'sha' module
655 # produces a DeprecationWarning in python 2.6 or higher. We want to support
656 # python 2.4 and above without any stupid warnings, so let's try using hashlib
657 # first, and downgrade if it fails.
668 """Format bup's version date string for output."""
669 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash recorded for this bup build (_version.COMMIT)."""
    commit = _version.COMMIT
    return commit
678 """Format bup's version tag (the official version number).
680 When generated from a commit other than one pointed to with a tag, the
681 returned string will be "unknown-" followed by the first seven positions of
684 names = _version.NAMES.strip()
685 assert(names[0] == '(')
686 assert(names[-1] == ')')
688 l = [n.strip() for n in names.split(',')]
690 if n.startswith('tag: bup-'):
692 return 'unknown-%s' % _version.COMMIT[:7]