"""Enhanced stat operations for bup."""
-import math
-import os
-import bup._helpers as _helpers
+from __future__ import absolute_import
+import os, sys
+import stat as pystat
+from bup import _helpers
try:
- _have_utimensat = _helpers.utimensat
-except AttributeError, e:
- _have_utimensat = False
+ _bup_utimensat = _helpers.bup_utimensat
+except AttributeError as e:
+ _bup_utimensat = False
+try:
+ _bup_utimes = _helpers.bup_utimes
+except AttributeError as e:
+ _bup_utimes = False
-class FSTime():
- # Class to represent filesystem timestamps. Use integer
- # nanoseconds on platforms where we have the higher resolution
- # lstat. Use the native python stat representation (floating
- # point seconds) otherwise.
+try:
+ _bup_lutimes = _helpers.bup_lutimes
+except AttributeError as e:
+ _bup_lutimes = False
- def __cmp__(self, x):
- return self._value.__cmp__(x._value)
+assert sys.version_info[0] < 3 \
+ or not (_bup_utimensat or _bup_utimes or _bup_lutimes)
- def to_timespec(self):
- """Return (s, ns) where ns is always non-negative
- and t = s + ns / 10e8""" # metadata record rep (and libc rep)
- s_ns = self.secs_nsecs()
- if s_ns[0] > 0 or s_ns[1] >= 0:
- return s_ns
- return (s_ns[0] - 1, 10**9 + s_ns[1]) # ns is negative
- @staticmethod
- def from_secs(secs):
- ts = FSTime()
- ts._value = int(secs * 10**9)
- return ts
+def timespec_to_nsecs(ts):
+ ts_s, ts_ns = ts
+ return ts_s * 10**9 + ts_ns
- @staticmethod
- def from_timespec(timespec):
- ts = FSTime()
- ts._value = timespec[0] * 10**9 + timespec[1]
- return ts
- def approx_secs(self):
- return self._value / 10e8;
+def nsecs_to_timespec(ns):
+ """Return (s, ns) where ns is always non-negative
+ and t = s + ns / 10e8""" # metadata record rep
+ ns = int(ns)
+ return (ns // 10**9, ns % 10**9)
+
- def secs_nsecs(self):
- "Return a (s, ns) pair: -1.5s -> (-1, -10**9 / 2)."
- if self._value >= 0:
- return (self._value / 10**9, self._value % 10**9)
- abs_val = -self._value
- return (- (abs_val / 10**9), - (abs_val % 10**9))
+def nsecs_to_timeval(ns):
+ """Return (s, us) where ns is always non-negative
+ and t = s + us / 10e5"""
+ ns = int(ns)
+ return (ns // 10**9, (ns % 10**9) // 1000)
- if _helpers._have_ns_fs_timestamps: # Use integer nanoseconds.
- @staticmethod
- def from_stat_time(stat_time):
- return FSTime.from_timespec(stat_time)
+def fstime_floor_secs(ns):
+ """Return largest integer not greater than ns / 10e8."""
+ return int(ns) // 10**9;
- else: # Use python default floating-point seconds.
- @staticmethod
- def from_stat_time(stat_time):
- ts = FSTime()
- x = math.modf(stat_time)
- ts._value = int(x[1]) + int(x[0] * 10**9)
- return ts
+def fstime_to_timespec(ns):
+ return nsecs_to_timespec(ns)
-if _have_utimensat:
+def fstime_to_sec_bytes(fstime):
+ (s, ns) = fstime_to_timespec(fstime)
+ if(s < 0):
+ s += 1
+ if ns == 0:
+ return b'%d' % s
+ else:
+ return b'%d.%09d' % (s, ns)
+if sys.version_info[0] > 2:
+ def utime(path, times):
+ """Times must be provided as (atime_ns, mtime_ns)."""
+ os.utime(path, ns=times)
def lutime(path, times):
- atime = times[0].to_timespec()
- mtime = times[1].to_timespec()
- return _helpers.utimensat(_helpers.AT_FDCWD, path, (atime, mtime),
- _helpers.AT_SYMLINK_NOFOLLOW)
+ """Times must be provided as (atime_ns, mtime_ns)."""
+ os.utime(path, ns=times, follow_symlinks=False)
+elif _bup_utimensat:
def utime(path, times):
- atime = times[0].to_timespec()
- mtime = times[1].to_timespec()
- return _helpers.utimensat(_helpers.AT_FDCWD, path, (atime, mtime), 0)
-
-else:
-
+ """Times must be provided as (atime_ns, mtime_ns)."""
+ atime = nsecs_to_timespec(times[0])
+ mtime = nsecs_to_timespec(times[1])
+ _bup_utimensat(_helpers.AT_FDCWD, path, (atime, mtime), 0)
def lutime(path, times):
- return None
-
+ """Times must be provided as (atime_ns, mtime_ns)."""
+ atime = nsecs_to_timespec(times[0])
+ mtime = nsecs_to_timespec(times[1])
+ _bup_utimensat(_helpers.AT_FDCWD, path, (atime, mtime),
+ _helpers.AT_SYMLINK_NOFOLLOW)
+else: # Must have these if utimensat isn't available.
def utime(path, times):
- atime = times[0].approx_secs()
- mtime = times[1].approx_secs()
- os.utime(path, (atime, mtime))
+ """Times must be provided as (atime_ns, mtime_ns)."""
+ atime = nsecs_to_timeval(times[0])
+ mtime = nsecs_to_timeval(times[1])
+ _bup_utimes(path, (atime, mtime))
+ def lutime(path, times):
+ """Times must be provided as (atime_ns, mtime_ns)."""
+ atime = nsecs_to_timeval(times[0])
+ mtime = nsecs_to_timeval(times[1])
+ _bup_lutimes(path, (atime, mtime))
+_cygwin_sys = sys.platform.startswith('cygwin')
-class stat_result():
+def _fix_cygwin_id(id):
+ if id < 0:
+ id += 0x100000000
+ assert(id >= 0)
+ return id
+
+class stat_result:
+ __slots__ = ('st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid',
+ 'st_rdev', 'st_size', 'st_atime', 'st_mtime', 'st_ctime')
@staticmethod
- def from_stat_rep(st):
+ def from_xstat_rep(st):
+ global _cygwin_sys
result = stat_result()
- if _helpers._have_ns_fs_timestamps:
- (result.st_mode,
- result.st_ino,
- result.st_dev,
- result.st_nlink,
- result.st_uid,
- result.st_gid,
- result.st_rdev,
- result.st_size,
- atime,
- mtime,
- ctime) = st
- else:
- result.st_mode = st.st_mode
- result.st_ino = st.st_ino
- result.st_dev = st.st_dev
- result.st_nlink = st.st_nlink
- result.st_uid = st.st_uid
- result.st_gid = st.st_gid
- result.st_rdev = st.st_rdev
- result.st_size = st.st_size
- atime = FSTime.from_stat_time(st.st_atime)
- mtime = FSTime.from_stat_time(st.st_mtime)
- ctime = FSTime.from_stat_time(st.st_ctime)
- result.st_atime = FSTime.from_stat_time(atime)
- result.st_mtime = FSTime.from_stat_time(mtime)
- result.st_ctime = FSTime.from_stat_time(ctime)
+ (result.st_mode,
+ result.st_ino,
+ result.st_dev,
+ result.st_nlink,
+ result.st_uid,
+ result.st_gid,
+ result.st_rdev,
+ result.st_size,
+ result.st_atime,
+ result.st_mtime,
+ result.st_ctime) = st
+ # Inlined timespec_to_nsecs after profiling
+ result.st_atime = result.st_atime[0] * 10**9 + result.st_atime[1]
+ result.st_mtime = result.st_mtime[0] * 10**9 + result.st_mtime[1]
+ result.st_ctime = result.st_ctime[0] * 10**9 + result.st_ctime[1]
+ if _cygwin_sys:
+ result.st_uid = _fix_cygwin_id(result.st_uid)
+ result.st_gid = _fix_cygwin_id(result.st_gid)
return result
-try:
- _stat = _helpers.stat
-except AttributeError, e:
- _stat = os.stat
-
def stat(path):
- return stat_result.from_stat_rep(_stat(path))
+ return stat_result.from_xstat_rep(_helpers.stat(path))
-try:
- _fstat = _helpers.fstat
-except AttributeError, e:
- _fstat = os.fstat
-
def fstat(path):
- return stat_result.from_stat_rep(_fstat(path))
-
+ return stat_result.from_xstat_rep(_helpers.fstat(path))
-try:
- _lstat = _helpers.lstat
-except AttributeError, e:
- _lstat = os.lstat
def lstat(path):
- return stat_result.from_stat_rep(_lstat(path))
+ return stat_result.from_xstat_rep(_helpers.lstat(path))
+
+
+def mode_str(mode):
+ result = ''
+ # FIXME: Other types?
+ if pystat.S_ISREG(mode):
+ result += '-'
+ elif pystat.S_ISDIR(mode):
+ result += 'd'
+ elif pystat.S_ISCHR(mode):
+ result += 'c'
+ elif pystat.S_ISBLK(mode):
+ result += 'b'
+ elif pystat.S_ISFIFO(mode):
+ result += 'p'
+ elif pystat.S_ISLNK(mode):
+ result += 'l'
+ elif pystat.S_ISSOCK(mode):
+ result += 's'
+ else:
+ result += '?'
+
+ result += 'r' if (mode & pystat.S_IRUSR) else '-'
+ result += 'w' if (mode & pystat.S_IWUSR) else '-'
+ result += 'x' if (mode & pystat.S_IXUSR) else '-'
+ result += 'r' if (mode & pystat.S_IRGRP) else '-'
+ result += 'w' if (mode & pystat.S_IWGRP) else '-'
+ result += 'x' if (mode & pystat.S_IXGRP) else '-'
+ result += 'r' if (mode & pystat.S_IROTH) else '-'
+ result += 'w' if (mode & pystat.S_IWOTH) else '-'
+ result += 'x' if (mode & pystat.S_IXOTH) else '-'
+ return result
+
+
+def classification_str(mode, include_exec):
+ if pystat.S_ISREG(mode):
+ if include_exec \
+ and (pystat.S_IMODE(mode) \
+ & (pystat.S_IXUSR | pystat.S_IXGRP | pystat.S_IXOTH)):
+ return '*'
+ else:
+ return ''
+ elif pystat.S_ISDIR(mode):
+ return '/'
+ elif pystat.S_ISLNK(mode):
+ return '@'
+ elif pystat.S_ISFIFO(mode):
+ return '|'
+ elif pystat.S_ISSOCK(mode):
+ return '='
+ else:
+ return ''