1 """Helper functions and classes for bup."""
3 import sys, os, pwd, subprocess, errno, socket, select, mmap, stat, re, struct
5 from bup import _version
7 # This function should really be in helpers, not in bup.options. But we
8 # want options.py to be standalone so people can include it in other projects.
9 from bup.options import _tty_width
10 tty_width = _tty_width
14 """Convert the string 's' to an integer. Return 0 if s is not a number."""
22 """Convert the string 's' to a float. Return 0 if s is not a number."""
24 return float(s or '0')
29 buglvl = atoi(os.environ.get('BUP_DEBUG', 0))
32 # Write (blockingly) to sockets that may or may not be in blocking mode.
33 # We need this because our stderr is sometimes eaten by subprocesses
34 # (probably ssh) that sometimes make it nonblocking, if only temporarily,
35 # leading to race conditions. Ick. We'll do it the hard way.
36 def _hard_write(fd, buf):
38 (r,w,x) = select.select([], [fd], [], None)
40 raise IOError('select(fd) returned without being writable')
42 sz = os.write(fd, buf)
44 if e.errno != errno.EAGAIN:
50 """Print a log message to stderr."""
52 _hard_write(sys.stderr.fileno(), s)
65 def mkdirp(d, mode=None):
66 """Recursively create directories on path 'd'.
68 Unlike os.makedirs(), it doesn't raise an exception if the last element of
69 the path already exists.
77 if e.errno == errno.EEXIST:
84 """Get the next item from an iterator, None if we reached the end."""
91 def merge_iter(iters, pfreq, pfunc, pfinal, key=None):
93 samekey = lambda e, pe: getattr(e, key) == getattr(pe, key, None)
97 total = sum(len(it) for it in iters)
98 iters = (iter(it) for it in iters)
99 heap = ((next(it),it) for it in iters)
100 heap = [(e,it) for e,it in heap if e]
105 if not count % pfreq:
108 if not samekey(e, pe):
113 e = it.next() # Don't use next() function, it's too expensive
114 except StopIteration:
115 heapq.heappop(heap) # remove current
117 heapq.heapreplace(heap, (e, it)) # shift current to new location
122 """Delete a file at path 'f' if it currently exists.
124 Unlike os.unlink(), does not throw an exception if the file didn't already
130 if e.errno == errno.ENOENT:
131 pass # it doesn't exist, that's what you asked for
135 """Run a subprocess and return its output."""
136 p = subprocess.Popen(argv, stdout=subprocess.PIPE)
143 """Get the absolute path of a file.
145 Behaves like os.path.realpath, but doesn't follow a symlink for the last
146 element. (ie. if 'p' itself is a symlink, this one won't follow it, but it
147 will follow symlinks in p's directory)
153 if st and stat.S_ISLNK(st.st_mode):
154 (dir, name) = os.path.split(p)
155 dir = os.path.realpath(dir)
156 out = os.path.join(dir, name)
158 out = os.path.realpath(p)
159 #log('realpathing:%r,%r\n' % (p, out))
165 """Get the user's login name."""
170 _username = pwd.getpwuid(uid)[0]
172 _username = 'user%d' % uid
178 """Get the user's full name."""
180 if not _userfullname:
183 _userfullname = pwd.getpwuid(uid)[4].split(',')[0]
185 _userfullname = 'user%d' % uid
191 """Get the FQDN of this machine."""
194 _hostname = socket.getfqdn()
_resource_path = None


def resource_path(subdir=''):
    """Return the path of 'subdir' inside bup's resource directory.

    The resource directory is read from the BUP_RESOURCE_PATH environment
    variable the first time it is needed, falling back to '.' when that
    variable is unset or empty.  The result is remembered in the
    module-level _resource_path, so later calls reuse it even if the
    environment changes.
    """
    global _resource_path
    base = _resource_path
    if not base:
        # Nothing cached yet: resolve the base directory and remember it.
        base = os.environ.get('BUP_RESOURCE_PATH') or '.'
        _resource_path = base
    return os.path.join(base, subdir)
205 class NotOk(Exception):
209 def __init__(self, outp):
213 while self._read(65536): pass
215 def read(self, size):
216 """Read 'size' bytes from input stream."""
218 return self._read(size)
221 """Read from input stream until a newline is found."""
223 return self._readline()
225 def write(self, data):
226 """Write 'data' to output stream."""
227 #log('%d writing: %d bytes\n' % (os.getpid(), len(data)))
228 self.outp.write(data)
231 """Return true if input stream is readable."""
232 raise NotImplemented("Subclasses must implement has_input")
235 """Indicate end of output from last sent command."""
239 """Indicate server error to the client."""
240 s = re.sub(r'\s+', ' ', str(s))
241 self.write('\nerror %s\n' % s)
243 def _check_ok(self, onempty):
246 for rl in linereader(self):
247 #log('%d got line: %r\n' % (os.getpid(), rl))
248 if not rl: # empty line
252 elif rl.startswith('error '):
253 #log('client: error: %s\n' % rl[6:])
257 raise Exception('server exited unexpectedly; see errors above')
259 def drain_and_check_ok(self):
260 """Remove all data for the current command from input stream."""
263 return self._check_ok(onempty)
266 """Verify that server action completed successfully."""
268 raise Exception('expected "ok", got %r' % rl)
269 return self._check_ok(onempty)
271 class Conn(BaseConn):
272 def __init__(self, inp, outp):
273 BaseConn.__init__(self, outp)
276 def _read(self, size):
277 return self.inp.read(size)
280 return self.inp.readline()
283 [rl, wl, xl] = select.select([self.inp.fileno()], [], [], 0)
285 assert(rl[0] == self.inp.fileno())
290 def checked_reader(fd, n):
292 rl, _, _ = select.select([fd], [], [])
295 if not buf: raise Exception("Unexpected EOF reading %d more bytes" % n)
299 MAX_PACKET = 128 * 1024
300 def mux(p, outfd, outr, errr):
303 while p.poll() is None:
304 rl, _, _ = select.select(fds, [], [])
307 buf = os.read(outr, MAX_PACKET)
309 os.write(outfd, struct.pack('!IB', len(buf), 1) + buf)
311 buf = os.read(errr, 1024)
313 os.write(outfd, struct.pack('!IB', len(buf), 2) + buf)
315 os.write(outfd, struct.pack('!IB', 0, 3))
317 class DemuxConn(BaseConn):
318 """A helper class for bup's client-server protocol."""
319 def __init__(self, infd, outp):
320 BaseConn.__init__(self, outp)
321 # Anything that comes through before the sync string was not
322 # multiplexed and can be assumed to be debug/log before mux init.
324 while tail != 'BUPMUX':
325 tail += os.read(infd, 1024)
328 sys.stderr.write(buf)
334 def write(self, data):
336 BaseConn.write(self, data)
338 def _next_packet(self, timeout):
339 if self.closed: return False
340 rl, wl, xl = select.select([self.infd], [], [], timeout)
341 if not rl: return False
342 assert(rl[0] == self.infd)
343 ns = ''.join(checked_reader(self.infd, 5))
344 n, fdw = struct.unpack('!IB', ns)
345 assert(n<=MAX_PACKET)
347 self.reader = checked_reader(self.infd, n)
349 for buf in checked_reader(self.infd, n):
350 sys.stderr.write(buf)
353 debug2("DemuxConn: marked closed\n")
356 def _load_buf(self, timeout):
357 if self.buf is not None:
359 while not self.closed:
360 while not self.reader:
361 if not self._next_packet(timeout):
364 self.buf = self.reader.next()
366 except StopIteration:
370 def _read_parts(self, ix_fn):
371 while self._load_buf(None):
372 assert(self.buf is not None)
374 if i is None or i == len(self.buf):
379 self.buf = self.buf[i:]
387 return buf.index('\n')+1
390 return ''.join(self._read_parts(find_eol))
392 def _read(self, size):
394 def until_size(buf): # Closes on csize
395 if len(buf) < csize[0]:
400 return ''.join(self._read_parts(until_size))
403 return self._load_buf(0)
406 """Generate a list of input lines from 'f' without terminating newlines."""
414 def chunkyreader(f, count = None):
415 """Generate a list of chunks of data read from 'f'.
417 If count is None, read until EOF is reached.
419 If count is a positive integer, read 'count' bytes from 'f'. If EOF is
420 reached while reading, raise IOError.
424 b = f.read(min(count, 65536))
426 raise IOError('EOF with %d bytes remaining' % count)
"""Append "/" to 's' if it doesn't already end in "/"."""
438 if s and not s.endswith('/'):
444 def _mmap_do(f, sz, flags, prot):
446 st = os.fstat(f.fileno())
449 # trying to open a zero-length map gives an error, but an empty
450 # string has all the same behaviour of a zero-length map, ie. it has
453 map = mmap.mmap(f.fileno(), sz, flags, prot)
454 f.close() # map will persist beyond file close
def mmap_read(f, sz = 0):
    """Map file 'f' into memory as a read-only region.

    A 'sz' of 0 (the default) makes the region cover the entire file.
    """
    flags = mmap.MAP_PRIVATE
    prot = mmap.PROT_READ
    return _mmap_do(f, sz, flags, prot)
def mmap_readwrite(f, sz = 0):
    """Map file 'f' into memory as a readable and writable region.

    A 'sz' of 0 (the default) makes the region cover the entire file.
    """
    flags = mmap.MAP_SHARED
    prot = mmap.PROT_READ | mmap.PROT_WRITE
    return _mmap_do(f, sz, flags, prot)
475 """Parse data size information into a float number.
477 Here are some examples of conversions:
478 199.2k means 203981 bytes
479 1GB means 1073741824 bytes
480 2.1 tb means 2199023255552 bytes
482 g = re.match(r'([-+\d.e]+)\s*(\w*)', str(s))
484 raise ValueError("can't parse %r as a number" % s)
485 (val, unit) = g.groups()
488 if unit in ['t', 'tb']:
489 mult = 1024*1024*1024*1024
490 elif unit in ['g', 'gb']:
491 mult = 1024*1024*1024
492 elif unit in ['m', 'mb']:
494 elif unit in ['k', 'kb']:
496 elif unit in ['', 'b']:
499 raise ValueError("invalid unit %r in number %r" % (unit, s))
504 """Count the number of elements in an iterator. (consumes the iterator)"""
505 return reduce(lambda x,y: x+1, l)
510 """Append an error message to the list of saved errors.
512 Once processing is able to stop and output the errors, the saved errors are
513 accessible in the module variable helpers.saved_errors.
515 saved_errors.append(e)
518 istty = os.isatty(2) or atoi(os.environ.get('BUP_FORCE_TTY'))
520 """Calls log(s) if stderr is a TTY. Does nothing otherwise."""
526 """Replace the default exception handler for KeyboardInterrupt (Ctrl-C).
528 The new exception handler will make sure that bup will exit without an ugly
529 stacktrace when Ctrl-C is hit.
531 oldhook = sys.excepthook
532 def newhook(exctype, value, traceback):
533 if exctype == KeyboardInterrupt:
534 log('Interrupted.\n')
536 return oldhook(exctype, value, traceback)
537 sys.excepthook = newhook
540 def columnate(l, prefix):
541 """Format elements of 'l' in columns with 'prefix' leading each line.
543 The number of columns is determined automatically based on the string
549 clen = max(len(s) for s in l)
550 ncols = (tty_width() - len(prefix)) / (clen + 2)
555 while len(l) % ncols:
558 for s in range(0, len(l), rows):
559 cols.append(l[s:s+rows])
561 for row in zip(*cols):
562 out += prefix + ''.join(('%-*s' % (clen+2, s)) for s in row) + '\n'
565 def parse_date_or_fatal(str, fatal):
566 """Parses the given date or calls Option.fatal().
567 For now we expect a string that contains a float."""
570 except ValueError, e:
571 raise fatal('invalid date format (should be a float): %r' % e)
575 def strip_path(prefix, path):
576 """Strips a given prefix from a path.
578 First both paths are normalized.
580 Raises an Exception if no prefix is given.
583 raise Exception('no path given')
585 normalized_prefix = os.path.realpath(prefix)
586 debug2("normalized_prefix: %s\n" % normalized_prefix)
587 normalized_path = os.path.realpath(path)
588 debug2("normalized_path: %s\n" % normalized_path)
589 if normalized_path.startswith(normalized_prefix):
590 return normalized_path[len(normalized_prefix):]
594 def strip_base_path(path, base_paths):
595 """Strips the base path from a given path.
598 Determines the base path for the given string and then strips it
600 Iterates over all base_paths from long to short, to prevent that
601 a too short base_path is removed.
603 normalized_path = os.path.realpath(path)
604 sorted_base_paths = sorted(base_paths, key=len, reverse=True)
605 for bp in sorted_base_paths:
606 if normalized_path.startswith(os.path.realpath(bp)):
607 return strip_path(bp, normalized_path)
def graft_path(graft_points, path):
    """Rewrite 'path' according to the first matching graft point.

    'graft_points' is an iterable of (old_prefix, new_prefix) pairs and
    'path' is the path to rewrite.  'path' is first normalized with
    os.path.realpath().  The pairs are then tried in order: for the first
    one whose old_prefix is a leading substring of the normalized path,
    that leading part is replaced with new_prefix and the result is
    returned.  If no pair matches, the normalized path is returned
    unchanged.

    Both prefixes are treated as literal strings, so characters such as
    '+', '(', '.' or a backslash have no special meaning.
    """
    normalized_path = os.path.realpath(path)
    for old_prefix, new_prefix in graft_points:
        if normalized_path.startswith(old_prefix):
            # Splice by length rather than via re.sub(): a path is not a
            # regular expression, and the replacement is not a template.
            return new_prefix + normalized_path[len(old_prefix):]
    return normalized_path
619 # hashlib is only available in python 2.5 or higher, but the 'sha' module
620 # produces a DeprecationWarning in python 2.6 or higher. We want to support
621 # python 2.4 and above without any stupid warnings, so let's try using hashlib
622 # first, and downgrade if it fails.
633 """Format bup's version date string for output."""
634 return _version.DATE.split(' ')[0]
def version_commit():
    """Return the commit hash stored in bup's _version module."""
    commit = _version.COMMIT
    return commit
641 """Format bup's version tag (the official version number).
643 When generated from a commit other than one pointed to with a tag, the
644 returned string will be "unknown-" followed by the first seven positions of
647 names = _version.NAMES.strip()
648 assert(names[0] == '(')
649 assert(names[-1] == ')')
651 l = [n.strip() for n in names.split(',')]
653 if n.startswith('tag: bup-'):
655 return 'unknown-%s' % _version.COMMIT[:7]