]> arthur.barton.de Git - bup.git/blobdiff - lib/bup/xstat.py
get: adjust for python 3 and test there
[bup.git] / lib / bup / xstat.py
index 696cc1454b3ae10760462a716241d97385cd63c3..461c932f15865a4611c6342313f9220ae4917bdb 100644 (file)
 """Enhanced stat operations for bup."""
-import os
-import bup._helpers as _helpers
-
-class FSTime():
-    # Class to represent filesystem timestamps.  Use integer
-    # nanoseconds on platforms where we have the higher resolution
-    # lstat.  Use the native python stat representation (floating
-    # point seconds) otherwise.
-
-    def __cmp__(self, x):
-        return self._value.__cmp__(x._value)
-
-    def to_timespec(self):
-        """Return (s, ns) where ns is always non-negative
-        and t = s + ns / 10e8""" # metadata record rep (and libc rep)
-        s_ns = self.secs_nsecs()
-        if s_ns[0] > 0 or s_ns[1] >= 0:
-            return s_ns
-        return (s_ns[0] - 1, 10**9 + s_ns[1]) # ns is negative
-
-    if _helpers.lstat: # Use integer nanoseconds.
-
-        @staticmethod
-        def from_secs(secs):
-            ts = FSTime()
-            ts._value = int(secs * 10**9)
-            return ts
-
-        @staticmethod
-        def from_timespec(timespec):
-            ts = FSTime()
-            ts._value = timespec[0] * 10**9 + timespec[1]
-            return ts
-
-        @staticmethod
-        def from_stat_time(stat_time):
-            return FSTime.from_timespec(stat_time)
-
-        def approx_secs(self):
-            return self._value / 10e8;
-
-        def secs_nsecs(self):
-            "Return a (s, ns) pair: -1.5s -> (-1, -10**9 / 2)."
-            if self._value >= 0:
-                return (self._value / 10**9, self._value % 10**9)
-            abs_val = -self._value
-            return (- (abs_val / 10**9), - (abs_val % 10**9))
-
-    else: # Use python default floating-point seconds.
-
-        @staticmethod
-        def from_secs(secs):
-            ts = FSTime()
-            ts._value = secs
-            return ts
-
-        @staticmethod
-        def from_timespec(timespec):
-            ts = FSTime()
-            ts._value = timespec[0] + (timespec[1] / 10e8)
-            return ts
-
-        @staticmethod
-        def from_stat_time(stat_time):
-            ts = FSTime()
-            ts._value = stat_time
-            return ts
-
-        def approx_secs(self):
-            return self._value
-
-        def secs_nsecs(self):
-            "Return a (s, ns) pair: -1.5s -> (-1, -5**9)."
-            x = math.modf(self._value)
-            return (x[1], x[0] * 10**9)
-
-
-def lutime(path, times):
-    if _helpers.utimensat:
-        atime = times[0].to_timespec()
-        mtime = times[1].to_timespec()
-        return _helpers.utimensat(_helpers.AT_FDCWD, path, (atime, mtime),
-                                  _helpers.AT_SYMLINK_NOFOLLOW)
-    else:
-        return None
 
+from __future__ import absolute_import
+import os, sys
+import stat as pystat
+from bup import _helpers
 
-def utime(path, times):
-    if _helpers.utimensat:
-        atime = times[0].to_timespec()
-        mtime = times[1].to_timespec()
-        return _helpers.utimensat(_helpers.AT_FDCWD, path, (atime, mtime), 0)
-    else:
-        atime = times[0].approx_secs()
-        mtime = times[1].approx_secs()
-        os.utime(path, (atime, mtime))
+try:
+    _bup_utimensat = _helpers.bup_utimensat
+except AttributeError as e:
+    _bup_utimensat = False
+
+try:
+    _bup_utimes = _helpers.bup_utimes
+except AttributeError as e:
+    _bup_utimes = False
+
+try:
+    _bup_lutimes = _helpers.bup_lutimes
+except AttributeError as e:
+    _bup_lutimes = False
+
+
+def timespec_to_nsecs(ts):
+    ts_s, ts_ns = ts
+    return ts_s * 10**9 + ts_ns
+
+
+def nsecs_to_timespec(ns):
+    """Return (s, ns) where ns is always non-negative
+    and t = s + ns / 10e8""" # metadata record rep
+    ns = int(ns)
+    return (ns // 10**9, ns % 10**9)
+
+
+def nsecs_to_timeval(ns):
+    """Return (s, us) where ns is always non-negative
+    and t = s + us / 10e5"""
+    ns = int(ns)
+    return (ns // 10**9, (ns % 10**9) // 1000)
 
 
-class stat_result():
+def fstime_floor_secs(ns):
+    """Return largest integer not greater than ns / 10e8."""
+    return int(ns) // 10**9;
 
+
+def fstime_to_timespec(ns):
+    return nsecs_to_timespec(ns)
+
+
+def fstime_to_sec_bytes(fstime):
+    (s, ns) = fstime_to_timespec(fstime)
+    if(s < 0):
+        s += 1
+    if ns == 0:
+        return b'%d' % s
+    else:
+        return b'%d.%09d' % (s, ns)
+
+
+if _bup_utimensat:
+    def utime(path, times):
+        """Times must be provided as (atime_ns, mtime_ns)."""
+        atime = nsecs_to_timespec(times[0])
+        mtime = nsecs_to_timespec(times[1])
+        _bup_utimensat(_helpers.AT_FDCWD, path, (atime, mtime), 0)
+    def lutime(path, times):
+        """Times must be provided as (atime_ns, mtime_ns)."""
+        atime = nsecs_to_timespec(times[0])
+        mtime = nsecs_to_timespec(times[1])
+        _bup_utimensat(_helpers.AT_FDCWD, path, (atime, mtime),
+                       _helpers.AT_SYMLINK_NOFOLLOW)
+else: # Must have these if utimensat isn't available.
+    def utime(path, times):
+        """Times must be provided as (atime_ns, mtime_ns)."""
+        atime = nsecs_to_timeval(times[0])
+        mtime = nsecs_to_timeval(times[1])
+        _bup_utimes(path, (atime, mtime))
+    def lutime(path, times):
+        """Times must be provided as (atime_ns, mtime_ns)."""
+        atime = nsecs_to_timeval(times[0])
+        mtime = nsecs_to_timeval(times[1])
+        _bup_lutimes(path, (atime, mtime))
+
+_cygwin_sys = sys.platform.startswith('cygwin')
+
+def _fix_cygwin_id(id):
+    if id < 0:
+        id += 0x100000000
+        assert(id >= 0)
+    return id
+
+
+class stat_result:
     @staticmethod
-    def from_stat_rep(st):
+    def from_xstat_rep(st):
+        global _cygwin_sys
         result = stat_result()
-        if _helpers._have_ns_fs_timestamps:
-            (result.st_mode,
-             result.st_ino,
-             result.st_dev,
-             result.st_nlink,
-             result.st_uid,
-             result.st_gid,
-             result.st_rdev,
-             result.st_size,
-             atime,
-             mtime,
-             ctime) = st
-        else:
-            result.st_mode = st.st_mode
-            result.st_ino = st.st_ino
-            result.st_dev = st.st_dev
-            result.st_nlink = st.st_nlink
-            result.st_uid = st.st_uid
-            result.st_gid = st.st_gid
-            result.st_rdev = st.st_rdev
-            result.st_size = st.st_size
-            atime = FSTime.from_stat_time(st.st_atime)
-            mtime = FSTime.from_stat_time(st.st_mtime)
-            ctime = FSTime.from_stat_time(st.st_ctime)
-        result.st_atime = FSTime.from_stat_time(atime)
-        result.st_mtime = FSTime.from_stat_time(mtime)
-        result.st_ctime = FSTime.from_stat_time(ctime)
+        (result.st_mode,
+         result.st_ino,
+         result.st_dev,
+         result.st_nlink,
+         result.st_uid,
+         result.st_gid,
+         result.st_rdev,
+         result.st_size,
+         result.st_atime,
+         result.st_mtime,
+         result.st_ctime) = st
+        # Inlined timespec_to_nsecs after profiling
+        result.st_atime = result.st_atime[0] * 10**9 + result.st_atime[1]
+        result.st_mtime = result.st_mtime[0] * 10**9 + result.st_mtime[1]
+        result.st_ctime = result.st_ctime[0] * 10**9 + result.st_ctime[1]
+        if _cygwin_sys:
+            result.st_uid = _fix_cygwin_id(result.st_uid)
+            result.st_gid = _fix_cygwin_id(result.st_gid)
         return result
 
 
 def stat(path):
-    if _helpers.stat:
-        st = _helpers.stat(path)
-    else:
-        st = os.stat(path)
-    return stat_result.from_stat_rep(st)
+    return stat_result.from_xstat_rep(_helpers.stat(path))
 
 
 def fstat(path):
-    if _helpers.fstat:
-        st = _helpers.fstat(path)
-    else:
-        st = os.fstat(path)
-    return stat_result.from_stat_rep(st)
+    return stat_result.from_xstat_rep(_helpers.fstat(path))
 
 
 def lstat(path):
-    if _helpers.lstat:
-        st = _helpers.lstat(path)
+    return stat_result.from_xstat_rep(_helpers.lstat(path))
+
+
+def mode_str(mode):
+    result = ''
+    # FIXME: Other types?
+    if pystat.S_ISREG(mode):
+        result += '-'
+    elif pystat.S_ISDIR(mode):
+        result += 'd'
+    elif pystat.S_ISCHR(mode):
+        result += 'c'
+    elif pystat.S_ISBLK(mode):
+        result += 'b'
+    elif pystat.S_ISFIFO(mode):
+        result += 'p'
+    elif pystat.S_ISLNK(mode):
+        result += 'l'
+    elif pystat.S_ISSOCK(mode):
+        result += 's'
+    else:
+        result += '?'
+
+    result += 'r' if (mode & pystat.S_IRUSR) else '-'
+    result += 'w' if (mode & pystat.S_IWUSR) else '-'
+    result += 'x' if (mode & pystat.S_IXUSR) else '-'
+    result += 'r' if (mode & pystat.S_IRGRP) else '-'
+    result += 'w' if (mode & pystat.S_IWGRP) else '-'
+    result += 'x' if (mode & pystat.S_IXGRP) else '-'
+    result += 'r' if (mode & pystat.S_IROTH) else '-'
+    result += 'w' if (mode & pystat.S_IWOTH) else '-'
+    result += 'x' if (mode & pystat.S_IXOTH) else '-'
+    return result
+
+
+def classification_str(mode, include_exec):
+    if pystat.S_ISREG(mode):
+        if include_exec \
+           and (pystat.S_IMODE(mode) \
+                & (pystat.S_IXUSR | pystat.S_IXGRP | pystat.S_IXOTH)):
+            return '*'
+        else:
+            return ''
+    elif pystat.S_ISDIR(mode):
+        return '/'
+    elif pystat.S_ISLNK(mode):
+        return '@'
+    elif pystat.S_ISFIFO(mode):
+        return '|'
+    elif pystat.S_ISSOCK(mode):
+        return '='
     else:
-        st = os.lstat(path)
-    return stat_result.from_stat_rep(st)
+        return ''