2 from __future__ import absolute_import, print_function
3 import errno, glob, grp, pwd, stat, tempfile, subprocess
9 from bup import git, metadata
11 from bup.compat import range
12 from bup.helpers import clear_errors, detect_fakeroot, is_superuser, resolve_parent
13 from bup.repo import LocalRepo
14 from bup.xstat import utime, lutime
15 import bup.helpers as helpers
16 from bup.compat import fsencode
# Locations used by the tests below, all as bytes paths: the directory
# holding this module, the directory two levels above it, and the bup
# executable directly inside that directory.
lib_t_dir = os.path.dirname(fsencode(__file__))
top_dir = os.path.realpath(os.path.join(lib_t_dir, b'..', b'..'))
bup_path = b'/'.join((top_dir, b'bup'))
# NOTE(review): fragment -- the enclosing `def` line and the control-flow
# lines between these statements are not visible in this view.  Presumably
# this is the body of the ex() helper called elsewhere in this file; confirm.
# Echo the command (arguments joined with single spaces) to stderr, then run
# it and keep the return code in rc.
27 cmd_str = b' '.join(cmd)
28 print(cmd_str, file=sys.stderr)
29 rc = subprocess.call(cmd)
# Reports -rc as a signal number; subprocess.call() returns a negative value
# when the child was killed by a signal.  The guarding condition is not
# visible here.
31 print('terminated by signal', - rc, file=sys.stderr)
# Reports rc as the child's exit status (guarding condition not visible).
34 print('returned exit status', rc, file=sys.stderr)
# Reports an exception `e`; presumably inside an except clause -- confirm.
37 print('subprocess call failed:', e, file=sys.stderr)
# NOTE(review): fragment -- the enclosing `def` line and some statements are
# not visible in this view.  Presumably this is the body of setup_testfs(),
# which the xattr test in this file calls; confirm.
# Only meaningful on Linux.
42 assert(sys.platform.startswith('linux'))
43 # Set up testfs with user_xattr, etc.
# Try to load the loop kernel module; what happens on a non-zero status is on
# a line not shown here.
44 if subprocess.call([b'modprobe', b'loop']) != 0:
# Unmount any testfs left mounted; the return code is deliberately ignored.
46 subprocess.call([b'umount', b'testfs'])
# Create a 32 MiB zero-filled image file and build a filesystem in it.
47 ex(b'dd', b'if=/dev/zero', b'of=testfs.img', b'bs=1M', b'count=32')
48 ex(b'mke2fs', b'-F', b'-j', b'-m', b'0', b'testfs.img')
# Remove any old mount point; presumably it is recreated on a line not
# visible here -- confirm.
49 ex(b'rm', b'-rf', b'testfs')
# Loop-mount the image on testfs with the acl and user_xattr options.
51 ex(b'mount', b'-o', b'loop,acl,user_xattr', b'testfs.img', b'testfs')
52 # Hide, so that tests can't create risks.
# Owner uid 0 / gid 0, and mode 0700.
53 os.chown(b'testfs', 0, 0)
54 os.chmod(b'testfs', 0o700)
# NOTE(review): fragment -- the enclosing `def` line is not visible in this
# view; presumably this is the teardown counterpart of the testfs setup.
# Unmount testfs (return code ignored) and delete the image file.
59 subprocess.call([b'umount', b'testfs'])
60 helpers.unlink(b'testfs.img')
def test_clean_up_archive_path():
    """Check the path normalization done by _clean_up_path_for_archive.

    Leading slashes and '.' components are dropped, repeated slashes are
    collapsed, and paths that reduce to nothing come back as b'.'.
    """
    normalize = metadata._clean_up_path_for_archive
    # (path given, cleaned-up path expected)
    expectations = (
        (b'foo', b'foo'),
        (b'/foo', b'foo'),
        (b'///foo', b'foo'),
        (b'/foo/bar', b'foo/bar'),
        (b'foo/./bar', b'foo/bar'),
        (b'/foo/./bar', b'foo/bar'),
        (b'/foo/./bar/././baz', b'foo/bar/baz'),
        (b'/foo/./bar///././baz', b'foo/bar/baz'),
        (b'//./foo/./bar///././baz/.///', b'foo/bar/baz/'),
        (b'./foo/./.bar', b'foo/.bar'),
        (b'./foo/.', b'foo'),
        (b'./foo/..', b'.'),
        (b'//./..//.../..//.', b'.'),
        (b'//./..//..././/.', b'...'),
        (b'/////.', b'.'),
        (b'/../', b'.'),
        (b'', b'.'),
    )
    for given, wanted in expectations:
        WVPASSEQ(normalize(given), wanted)
def test_risky_path():
    """Check which paths metadata._risky_path reports as risky."""
    is_risky = metadata._risky_path

    # Absolute paths and anything containing a '..' component are risky.
    flagged = (
        b'/foo',
        b'///foo',
        b'/../foo',
        b'../foo',
        b'foo/..',
        b'foo/../',
        b'foo/../bar',
    )
    for candidate in flagged:
        WVPASS(is_risky(candidate))

    # Relative paths without '..' are not.
    accepted = (
        b'foo/',
        b'foo///',
        b'./foo',
        b'foo/.',
        b'./foo/.',
        b'foo/bar',
        b'foo/./bar',
    )
    for candidate in accepted:
        WVFAIL(is_risky(candidate))
def test_clean_up_extract_path():
    """Check how metadata._clean_up_extract_path treats extraction paths.

    Leading slashes are stripped (a bare b'/' becomes b'.'), any path with
    a '..' component yields a false value, and every other relative path
    is returned exactly as given.
    """
    cleanup = metadata._clean_up_extract_path

    # Leading slashes are removed.
    WVPASSEQ(cleanup(b'/foo'), b'foo')
    WVPASSEQ(cleanup(b'///foo'), b'foo')

    # Paths with a '..' component produce a false result.
    for rejected in (b'/../foo', b'../foo', b'foo/..', b'foo/../',
                     b'foo/../bar'):
        WVFAIL(cleanup(rejected))

    # Relative paths come back untouched -- no further normalization.
    for untouched in (b'foo', b'foo/', b'foo///', b'./foo', b'foo/.',
                      b'./foo/.', b'foo/bar', b'foo/./bar'):
        WVPASSEQ(cleanup(untouched), untouched)

    WVPASSEQ(cleanup(b'/'), b'.')
    WVPASSEQ(cleanup(b'./'), b'./')
    WVPASSEQ(cleanup(b'///foo/bar'), b'foo/bar')
# Saves a small tree with the bup executable and checks the metadata read
# back through vfs.
# NOTE(review): several lines of this function are not visible in this view
# (e.g. where `repo` and `m` are assigned, the end of the vfs.resolve() call,
# and the first branch of the loop at the bottom).
126 def test_metadata_method(tmpdir):
127 bup_dir = tmpdir + b'/bup'
128 data_path = tmpdir + b'/foo'
# Create a regular file and a symlink pointing at it inside data_path; the
# creation of data_path itself is presumably on a line not shown -- confirm.
130 ex(b'touch', data_path + b'/file')
131 ex(b'ln', b'-s', b'file', data_path + b'/symlink')
# Distinct timestamps for the file and the directory; presumably in
# nanoseconds given the 10**9 factor -- confirm against bup.xstat.
132 test_time1 = 13 * 1000000000
133 test_time2 = 42 * 1000000000
# The second tuple element is what the m.mtime assertions below compare with.
134 utime(data_path + b'/file', (0, test_time1))
135 lutime(data_path + b'/symlink', (0, 0))
136 utime(data_path, (0, test_time2))
# Initialise a repository in bup_dir, then index data_path and save it under
# the name 'test'.
137 ex(bup_path, b'-d', bup_dir, b'init')
138 ex(bup_path, b'-d', bup_dir, b'index', b'-v', data_path)
139 ex(bup_path, b'-d', bup_dir, b'save', b'-tvvn', b'test', data_path)
140 git.check_repo_or_die(bup_dir)
# Look up the saved directory under /test/latest; the last element of the
# result is unpacked as a (name, item) pair.
142 resolved = vfs.resolve(repo,
143 b'/test/latest' + resolve_parent(data_path),
145 leaf_name, leaf_item = resolved[-1]
# The saved directory must carry the mtime set above and be named 'foo'.
147 WVPASS(m.mtime == test_time2)
148 WVPASS(leaf_name == b'foo')
# Its contents must be exactly '.', 'file' and 'symlink'.
149 contents = tuple(vfs.contents(repo, leaf_item))
150 WVPASS(len(contents) == 3)
151 WVPASSEQ(frozenset(name for name, item in contents),
152 frozenset((b'.', b'file', b'symlink')))
# Per-entry checks: an mtime equal to test_time1 (presumably for 'file' --
# that branch header is not visible) and the symlink's target.
153 for name, item in contents:
156 WVPASS(m.mtime == test_time1)
157 elif name == b'symlink':
159 WVPASSEQ(m.symlink_target, b'file')
# NOTE(review): fragment -- the enclosing `def` line is not visible in this
# view; presumably this is the body of _first_err(), which
# test_from_path_error calls.  Confirm.
# When at least one error has been recorded, return the first as a string.
# What is returned otherwise is not visible here.
165 if helpers.saved_errors:
166 return str(helpers.saved_errors[0])
# Checks that metadata.from_path() records an error, rather than raising,
# when the path has had all permission bits removed.
# NOTE(review): some lines of this function are not visible in this view
# (e.g. the statement guarded by the first `if`, and the creation of `path`).
170 def test_from_path_error(tmpdir):
# Special-cased for superuser / fakeroot runs; the guarded statement is not
# visible (presumably an early return, since chmod 000 would not restrict
# root) -- confirm.
171 if is_superuser() or detect_fakeroot():
173 path = tmpdir + b'/foo'
# Baseline: reading metadata works while the path is accessible.
175 m = metadata.from_path(path, archive_path=path, save_symlinks=True)
176 WVPASSEQ(m.path, path)
# Remove all permission bits, then read the metadata again.
177 os.chmod(path, 0o000)
178 metadata.from_path(path, archive_path=path, save_symlinks=True)
# When Linux file attribute support is available, exactly one saved error is
# expected and its message must start with 'read Linux attr'.
179 if metadata.get_linux_file_attr:
180 print('saved_errors:', helpers.saved_errors, file=sys.stderr)
181 WVPASS(len(helpers.saved_errors) == 1)
182 errmsg = _first_err()
183 WVPASS(errmsg.startswith('read Linux attr'))
# Probes whether Linux file attributes can be read for `path`.
# NOTE(review): the try/except and return lines of this function are not
# visible in this view, so the exact return values cannot be confirmed here.
187 def _linux_attr_supported(path):
188 # Expects path to denote a regular file or a directory.
# No attribute reader available at all.
189 if not metadata.get_linux_file_attr:
# Probe call; the result is discarded.
192 metadata.get_linux_file_attr(path)
# ENOTTY / ENOSYS / EOPNOTSUPP are singled out; presumably they mean "not
# supported here" rather than a real failure -- confirm.
194 if e.errno in (errno.ENOTTY, errno.ENOSYS, errno.EOPNOTSUPP):
# Checks which errors Metadata.apply_to_path() records when the parent
# directory of the target has had all permission bits removed.
# NOTE(review): some lines of this function are not visible in this view
# (e.g. the statement guarded by the first `if`, the creation of `parent` and
# `path`, and the try/except around the xattr.set() call).
201 def test_apply_to_path_restricted_access(tmpdir):
202 if is_superuser() or detect_fakeroot():
204 if sys.platform.startswith('cygwin'):
205 return # chmod 000 isn't effective.
207 parent = tmpdir + b'/foo'
208 path = parent + b'/bar'
# Best-effort: give the target a user xattr so xattr restoration is exercised.
214 metadata.xattr.set(path, b'user.buptest', b'bup')
216 print("failed to set test xattr")
217 # ignore any failures here - maybe FS cannot do it
219 m = metadata.from_path(path, archive_path=path, save_symlinks=True)
220 WVPASSEQ(m.path, path)
# Make the parent inaccessible, then try to apply the metadata.
221 os.chmod(parent, 0o000)
222 m.apply_to_path(path)
# NOTE(review): the label here is a bytes literal, so Python 3 prints it as
# b'saved_errors:'; test_from_path_error uses a str label for the same output.
223 print(b'saved_errors:', helpers.saved_errors, file=sys.stderr)
# Build the expected error-message prefixes, in order: utime always, chattr
# and xattr only when the metadata has them and they are supported.
224 expected_errors = ['utime: ']
225 if m.linux_attr and _linux_attr_supported(tmpdir):
226 expected_errors.append('Linux chattr: ')
227 if metadata.xattr and m.linux_xattr:
228 expected_errors.append("xattr.set ")
229 WVPASS(len(helpers.saved_errors) == len(expected_errors))
# NOTE(review): uses a bare assert where the rest of the file uses WVPASS.
230 for i in range(len(expected_errors)):
231 assert str(helpers.saved_errors[i]).startswith(expected_errors[i])
# Checks Metadata.create_path() when something already exists at the target:
# every file/dir combination must succeed (returning None), except that a
# non-empty directory must make it raise.
# NOTE(review): some lines of this function are not visible in this view
# (e.g. the directory creation/removal around the two from_path() calls and
# before the "non-empty dir" cases).
236 def test_restore_over_existing_target(tmpdir):
237 path = tmpdir + b'/foo'
# Capture metadata for `path`, presumably while it is a directory -- confirm.
239 dir_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
# Create `path` as an empty regular file and capture its metadata too.
241 open(path, 'w').close()
242 file_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
243 # Restore dir over file.
244 WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
245 WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
246 # Restore dir over dir.
247 WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
248 WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
249 # Restore file over dir.
250 WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
251 WVPASS(stat.S_ISREG(os.stat(path).st_mode))
252 # Restore file over file.
253 WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
254 WVPASS(stat.S_ISREG(os.stat(path).st_mode))
255 # Restore file over non-empty dir.
# An entry named 'bar' inside `path` makes the directory non-empty.
258 open(path + b'/bar', 'w').close()
259 WVEXCEPT(Exception, file_m.create_path, path, create_symlinks=True)
260 # Restore dir over non-empty dir.
# Replace the 'bar' file with a 'bar' subdirectory.
261 os.remove(path + b'/bar')
262 os.mkdir(path + b'/bar')
263 WVEXCEPT(Exception, dir_m.create_path, path, create_symlinks=True)
266 from bup.metadata import read_acl
268 from bup.metadata import xattr
# Returns a list of the filtered entries that are not b'security.selinux'.
# NOTE(review): the line that closes the filter() call (and names the
# iterable being filtered, presumably `attrs`) is not visible in this view.
270 def remove_selinux(attrs):
271 return list(filter(lambda i: not i in (b'security.selinux', ),
# Checks that apply_to_path() restores the saved xattrs exactly, whatever
# xattrs the target has at the time: an extra one, a changed value, or the
# saved one missing.
# NOTE(review): some lines of this function are not visible in this view
# (e.g. the body of the glob loop and the assignment of `path`).
274 def test_handling_of_incorrect_existing_linux_xattrs():
# Skipped unless running as real superuser (not under fakeroot).
275 if not is_superuser() or detect_fakeroot():
276 pytest.skip('skipping test -- not superuser')
278 if not setup_testfs():
279 pytest.skip('unable to load loop module; skipping dependent tests')
# Iterates over whatever already exists in testfs; the loop body is not
# visible here.
281 for f in glob.glob(b'testfs/*'):
# Create an empty file, give it user.foo=bar, and capture its metadata.
284 open(path, 'w').close()
285 xattr.set(path, b'foo', b'bar', namespace=xattr.NS_USER)
286 m = metadata.from_path(path, archive_path=path, save_symlinks=True)
# Case 1: an extra attribute (user.baz) is present -- it must be gone after
# applying the saved metadata.
287 xattr.set(path, b'baz', b'bax', namespace=xattr.NS_USER)
288 m.apply_to_path(path, restore_numeric_ids=False)
289 WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo'])
290 WVPASSEQ(xattr.get(path, b'user.foo'), b'bar')
# Case 2: the saved attribute has a different value -- it must be reset.
291 xattr.set(path, b'foo', b'baz', namespace=xattr.NS_USER)
292 m.apply_to_path(path, restore_numeric_ids=False)
293 WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo'])
294 WVPASSEQ(xattr.get(path, b'user.foo'), b'bar')
# Case 3: the saved attribute has been removed -- it must be recreated.
295 xattr.remove(path, b'foo', namespace=xattr.NS_USER)
296 m.apply_to_path(path, restore_numeric_ids=False)
297 WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo'])
298 WVPASSEQ(xattr.get(path, b'user.foo'), b'bar')