"""Helper functions and classes for bup."""
+from __future__ import absolute_import, division
from collections import namedtuple
from contextlib import contextmanager
from ctypes import sizeof, c_void_p
return (leading_matches(), rest())
+def merge_dict(*xs):
+ result = {}
+ for x in xs:
+ result.update(x)
+ return result
+
+
def lines_until_sentinel(f, sentinel, ex_type):
# sentinel must end with \n and must contain only one \n
while True:
return os.getenv("FAKEROOTKEY") != None
-_warned_about_superuser_detection = None
-def is_superuser():
- if sys.platform.startswith('cygwin'):
- if sys.getwindowsversion()[0] > 5:
- # Sounds like situation is much more complicated here
- global _warned_about_superuser_detection
- if not _warned_about_superuser_detection:
- log("can't detect root status for OS version > 5; assuming not root")
- _warned_about_superuser_detection = True
- return False
- import ctypes
- return ctypes.cdll.shell32.IsUserAnAdmin()
- else:
+if sys.platform.startswith('cygwin'):
+ def is_superuser():
+ # https://cygwin.com/ml/cygwin/2015-02/msg00057.html
+ groups = os.getgroups()
+ return 544 in groups or 0 in groups
+else:
+ def is_superuser():
return os.geteuid() == 0
-def _cache_key_value(get_value, key, cache):
+def cache_key_value(get_value, key, cache):
"""Return (value, was_cached). If there is a value in the cache
for key, use that, otherwise, call get_value(key) which should
throw a KeyError if there is no value -- in which case the cached
return value, False
-_uid_to_pwd_cache = {}
-_name_to_pwd_cache = {}
-
-def pwd_from_uid(uid):
- """Return password database entry for uid (may be a cached value).
- Return None if no entry is found.
- """
- global _uid_to_pwd_cache, _name_to_pwd_cache
- entry, cached = _cache_key_value(pwd.getpwuid, uid, _uid_to_pwd_cache)
- if entry and not cached:
- _name_to_pwd_cache[entry.pw_name] = entry
- return entry
-
-
-def pwd_from_name(name):
- """Return password database entry for name (may be a cached value).
- Return None if no entry is found.
- """
- global _uid_to_pwd_cache, _name_to_pwd_cache
- entry, cached = _cache_key_value(pwd.getpwnam, name, _name_to_pwd_cache)
- if entry and not cached:
- _uid_to_pwd_cache[entry.pw_uid] = entry
- return entry
-
-
-_gid_to_grp_cache = {}
-_name_to_grp_cache = {}
-
-def grp_from_gid(gid):
- """Return password database entry for gid (may be a cached value).
- Return None if no entry is found.
- """
- global _gid_to_grp_cache, _name_to_grp_cache
- entry, cached = _cache_key_value(grp.getgrgid, gid, _gid_to_grp_cache)
- if entry and not cached:
- _name_to_grp_cache[entry.gr_name] = entry
- return entry
-
-
-def grp_from_name(name):
- """Return password database entry for name (may be a cached value).
- Return None if no entry is found.
- """
- global _gid_to_grp_cache, _name_to_grp_cache
- entry, cached = _cache_key_value(grp.getgrnam, name, _name_to_grp_cache)
- if entry and not cached:
- _gid_to_grp_cache[entry.gr_gid] = entry
- return entry
-
-
-_username = None
-def username():
- """Get the user's login name."""
- global _username
- if not _username:
- uid = os.getuid()
- _username = pwd_from_uid(uid)[0] or 'user%d' % uid
- return _username
-
-
-_userfullname = None
-def userfullname():
- """Get the user's full name."""
- global _userfullname
- if not _userfullname:
- uid = os.getuid()
- entry = pwd_from_uid(uid)
- if entry:
- _userfullname = entry[4].split(',')[0] or entry[0]
- if not _userfullname:
- _userfullname = 'user%d' % uid
- return _userfullname
-
-
_hostname = None
def hostname():
"""Get the FQDN of this machine."""
size = float(size)
if size < unit:
return "%d" % (size)
- exponent = int(math.log(size) / math.log(unit))
+ exponent = int(math.log(size) // math.log(unit))
size_prefix = "KMGTPE"[exponent - 1]
- return "%.1f%s" % (size / math.pow(unit, exponent), size_prefix)
+ return "%.1f%s" % (size // math.pow(unit, exponent), size_prefix)
class NotOk(Exception):
pref_chunk_size = 64 * 1024 * 1024
chunk_size = sc_page_size
if (sc_page_size < pref_chunk_size):
- chunk_size = sc_page_size * (pref_chunk_size / sc_page_size)
+ chunk_size = sc_page_size * (pref_chunk_size // sc_page_size)
_fmincore_chunk_size = chunk_size
def fmincore(fd):
return bytearray(0)
if not _fmincore_chunk_size:
_set_fmincore_chunk_size()
- pages_per_chunk = _fmincore_chunk_size / sc_page_size;
- page_count = (st.st_size + sc_page_size - 1) / sc_page_size;
- chunk_count = page_count / _fmincore_chunk_size
+ pages_per_chunk = _fmincore_chunk_size // sc_page_size;
+ page_count = (st.st_size + sc_page_size - 1) // sc_page_size;
+ chunk_count = page_count // _fmincore_chunk_size
if chunk_count < 1:
chunk_count = 1
result = bytearray(page_count)
- for ci in xrange(chunk_count):
+ for ci in compat.range(chunk_count):
pos = _fmincore_chunk_size * ci;
msize = min(_fmincore_chunk_size, st.st_size - pos)
try:
while len(l) % ncols:
l.append('')
rows = len(l) // ncols
- for s in range(0, len(l), rows):
+ for s in compat.range(0, len(l), rows):
cols.append(l[s:s+rows])
out = ''
for row in zip(*cols):
return x
-_some_invalid_save_parts_rx = re.compile(r'[[ ~^:?*\\]|\.\.|//|@{')
+_some_invalid_save_parts_rx = re.compile(r'[\[ ~^:?*\\]|\.\.|//|@{')
def valid_save_name(name):
# Enforce a superset of the restrictions in git-check-ref-format(1)