]> arthur.barton.de Git - bup.git/blob - test/int/test_metadata.py
62d15c9dde61c970465ca12b94663c5c7132245e
[bup.git] / test / int / test_metadata.py
1
2 from __future__ import absolute_import, print_function
3 import errno, glob, grp, pwd, stat, tempfile, subprocess
4 import os, sys
5 import pytest
6
7 from wvpytest import *
8
9 from bup import git, metadata
10 from bup import vfs
11 from bup.compat import range
12 from bup.helpers import clear_errors, detect_fakeroot, is_superuser, resolve_parent
13 from bup.repo import LocalRepo
14 from bup.xstat import utime, lutime
15 import bup.helpers as helpers
16 from bup.compat import fsencode
17
18 lib_t_dir = os.path.dirname(fsencode(__file__))
19
20 top_dir = os.path.realpath(os.path.join(lib_t_dir, b'..', b'..'))
21
22 bup_path = top_dir + b'/bup'
23
24
25 def ex(*cmd):
26     try:
27         cmd_str = b' '.join(cmd)
28         print(cmd_str, file=sys.stderr)
29         rc = subprocess.call(cmd)
30         if rc < 0:
31             print('terminated by signal', - rc, file=sys.stderr)
32             sys.exit(1)
33         elif rc > 0:
34             print('returned exit status', rc, file=sys.stderr)
35             sys.exit(1)
36     except OSError as e:
37         print('subprocess call failed:', e, file=sys.stderr)
38         sys.exit(1)
39
40
41 def setup_testfs():
42     assert(sys.platform.startswith('linux'))
43     # Set up testfs with user_xattr, etc.
44     if subprocess.call([b'modprobe', b'loop']) != 0:
45         return False
46     subprocess.call([b'umount', b'testfs'])
47     ex(b'dd', b'if=/dev/zero', b'of=testfs.img', b'bs=1M', b'count=32')
48     ex(b'mke2fs', b'-F', b'-j', b'-m', b'0', b'testfs.img')
49     ex(b'rm', b'-rf', b'testfs')
50     os.mkdir(b'testfs')
51     ex(b'mount', b'-o', b'loop,acl,user_xattr', b'testfs.img', b'testfs')
52     # Hide, so that tests can't create risks.
53     os.chown(b'testfs', 0, 0)
54     os.chmod(b'testfs', 0o700)
55     return True
56
57
58 def cleanup_testfs():
59     subprocess.call([b'umount', b'testfs'])
60     helpers.unlink(b'testfs.img')
61
62
63 def test_clean_up_archive_path():
64     cleanup = metadata._clean_up_path_for_archive
65     WVPASSEQ(cleanup(b'foo'), b'foo')
66     WVPASSEQ(cleanup(b'/foo'), b'foo')
67     WVPASSEQ(cleanup(b'///foo'), b'foo')
68     WVPASSEQ(cleanup(b'/foo/bar'), b'foo/bar')
69     WVPASSEQ(cleanup(b'foo/./bar'), b'foo/bar')
70     WVPASSEQ(cleanup(b'/foo/./bar'), b'foo/bar')
71     WVPASSEQ(cleanup(b'/foo/./bar/././baz'), b'foo/bar/baz')
72     WVPASSEQ(cleanup(b'/foo/./bar///././baz'), b'foo/bar/baz')
73     WVPASSEQ(cleanup(b'//./foo/./bar///././baz/.///'), b'foo/bar/baz/')
74     WVPASSEQ(cleanup(b'./foo/./.bar'), b'foo/.bar')
75     WVPASSEQ(cleanup(b'./foo/.'), b'foo')
76     WVPASSEQ(cleanup(b'./foo/..'), b'.')
77     WVPASSEQ(cleanup(b'//./..//.../..//.'), b'.')
78     WVPASSEQ(cleanup(b'//./..//..././/.'), b'...')
79     WVPASSEQ(cleanup(b'/////.'), b'.')
80     WVPASSEQ(cleanup(b'/../'), b'.')
81     WVPASSEQ(cleanup(b''), b'.')
82
83
84 def test_risky_path():
85     risky = metadata._risky_path
86     WVPASS(risky(b'/foo'))
87     WVPASS(risky(b'///foo'))
88     WVPASS(risky(b'/../foo'))
89     WVPASS(risky(b'../foo'))
90     WVPASS(risky(b'foo/..'))
91     WVPASS(risky(b'foo/../'))
92     WVPASS(risky(b'foo/../bar'))
93     WVFAIL(risky(b'foo'))
94     WVFAIL(risky(b'foo/'))
95     WVFAIL(risky(b'foo///'))
96     WVFAIL(risky(b'./foo'))
97     WVFAIL(risky(b'foo/.'))
98     WVFAIL(risky(b'./foo/.'))
99     WVFAIL(risky(b'foo/bar'))
100     WVFAIL(risky(b'foo/./bar'))
101
102
103 def test_clean_up_extract_path():
104     cleanup = metadata._clean_up_extract_path
105     WVPASSEQ(cleanup(b'/foo'), b'foo')
106     WVPASSEQ(cleanup(b'///foo'), b'foo')
107     WVFAIL(cleanup(b'/../foo'))
108     WVFAIL(cleanup(b'../foo'))
109     WVFAIL(cleanup(b'foo/..'))
110     WVFAIL(cleanup(b'foo/../'))
111     WVFAIL(cleanup(b'foo/../bar'))
112     WVPASSEQ(cleanup(b'foo'), b'foo')
113     WVPASSEQ(cleanup(b'foo/'), b'foo/')
114     WVPASSEQ(cleanup(b'foo///'), b'foo///')
115     WVPASSEQ(cleanup(b'./foo'), b'./foo')
116     WVPASSEQ(cleanup(b'foo/.'), b'foo/.')
117     WVPASSEQ(cleanup(b'./foo/.'), b'./foo/.')
118     WVPASSEQ(cleanup(b'foo/bar'), b'foo/bar')
119     WVPASSEQ(cleanup(b'foo/./bar'), b'foo/./bar')
120     WVPASSEQ(cleanup(b'/'), b'.')
121     WVPASSEQ(cleanup(b'./'), b'./')
122     WVPASSEQ(cleanup(b'///foo/bar'), b'foo/bar')
123     WVPASSEQ(cleanup(b'///foo/bar'), b'foo/bar')
124
125
126 def test_metadata_method(tmpdir):
127     bup_dir = tmpdir + b'/bup'
128     data_path = tmpdir + b'/foo'
129     os.mkdir(data_path)
130     ex(b'touch', data_path + b'/file')
131     ex(b'ln', b'-s', b'file', data_path + b'/symlink')
132     test_time1 = 13 * 1000000000
133     test_time2 = 42 * 1000000000
134     utime(data_path + b'/file', (0, test_time1))
135     lutime(data_path + b'/symlink', (0, 0))
136     utime(data_path, (0, test_time2))
137     ex(bup_path, b'-d', bup_dir, b'init')
138     ex(bup_path, b'-d', bup_dir, b'index', b'-v', data_path)
139     ex(bup_path, b'-d', bup_dir, b'save', b'-tvvn', b'test', data_path)
140     git.check_repo_or_die(bup_dir)
141     repo = LocalRepo()
142     resolved = vfs.resolve(repo,
143                            b'/test/latest' + resolve_parent(data_path),
144                            follow=False)
145     leaf_name, leaf_item = resolved[-1]
146     m = leaf_item.meta
147     WVPASS(m.mtime == test_time2)
148     WVPASS(leaf_name == b'foo')
149     contents = tuple(vfs.contents(repo, leaf_item))
150     WVPASS(len(contents) == 3)
151     WVPASSEQ(frozenset(name for name, item in contents),
152              frozenset((b'.', b'file', b'symlink')))
153     for name, item in contents:
154         if name == b'file':
155             m = item.meta
156             WVPASS(m.mtime == test_time1)
157         elif name == b'symlink':
158             m = item.meta
159             WVPASSEQ(m.symlink_target, b'file')
160             WVPASSEQ(m.size, 4)
161             WVPASSEQ(m.mtime, 0)
162
163
164 def _first_err():
165     if helpers.saved_errors:
166         return str(helpers.saved_errors[0])
167     return ''
168
169
170 def test_from_path_error(tmpdir):
171     if is_superuser() or detect_fakeroot():
172         return
173     path = tmpdir + b'/foo'
174     os.mkdir(path)
175     m = metadata.from_path(path, archive_path=path, save_symlinks=True)
176     WVPASSEQ(m.path, path)
177     os.chmod(path, 0o000)
178     metadata.from_path(path, archive_path=path, save_symlinks=True)
179     if metadata.get_linux_file_attr:
180         print('saved_errors:', helpers.saved_errors, file=sys.stderr)
181         WVPASS(len(helpers.saved_errors) == 1)
182         errmsg = _first_err()
183         WVPASS(errmsg.startswith('read Linux attr'))
184         clear_errors()
185
186
187 def _linux_attr_supported(path):
188     # Expects path to denote a regular file or a directory.
189     if not metadata.get_linux_file_attr:
190         return False
191     try:
192         metadata.get_linux_file_attr(path)
193     except OSError as e:
194         if e.errno in (errno.ENOTTY, errno.ENOSYS, errno.EOPNOTSUPP):
195             return False
196         else:
197             raise
198     return True
199
200
201 def test_apply_to_path_restricted_access(tmpdir):
202     if is_superuser() or detect_fakeroot():
203         return
204     if sys.platform.startswith('cygwin'):
205         return # chmod 000 isn't effective.
206     try:
207         parent = tmpdir + b'/foo'
208         path = parent + b'/bar'
209         os.mkdir(parent)
210         os.mkdir(path)
211         clear_errors()
212         if metadata.xattr:
213             try:
214                 metadata.xattr.set(path, b'user.buptest', b'bup')
215             except:
216                 print("failed to set test xattr")
217                 # ignore any failures here - maybe FS cannot do it
218                 pass
219         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
220         WVPASSEQ(m.path, path)
221         os.chmod(parent, 0o000)
222         m.apply_to_path(path)
223         print(b'saved_errors:', helpers.saved_errors, file=sys.stderr)
224         expected_errors = ['utime: ']
225         if m.linux_attr and _linux_attr_supported(tmpdir):
226             expected_errors.append('Linux chattr: ')
227         if metadata.xattr and m.linux_xattr:
228             expected_errors.append("xattr.set ")
229         WVPASS(len(helpers.saved_errors) == len(expected_errors))
230         for i in range(len(expected_errors)):
231             assert str(helpers.saved_errors[i]).startswith(expected_errors[i])
232     finally:
233         clear_errors()
234
235
236 def test_restore_over_existing_target(tmpdir):
237     path = tmpdir + b'/foo'
238     os.mkdir(path)
239     dir_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
240     os.rmdir(path)
241     open(path, 'w').close()
242     file_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
243     # Restore dir over file.
244     WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
245     WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
246     # Restore dir over dir.
247     WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
248     WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
249     # Restore file over dir.
250     WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
251     WVPASS(stat.S_ISREG(os.stat(path).st_mode))
252     # Restore file over file.
253     WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
254     WVPASS(stat.S_ISREG(os.stat(path).st_mode))
255     # Restore file over non-empty dir.
256     os.remove(path)
257     os.mkdir(path)
258     open(path + b'/bar', 'w').close()
259     WVEXCEPT(Exception, file_m.create_path, path, create_symlinks=True)
260     # Restore dir over non-empty dir.
261     os.remove(path + b'/bar')
262     os.mkdir(path + b'/bar')
263     WVEXCEPT(Exception, dir_m.create_path, path, create_symlinks=True)
264
265
266 from bup.metadata import read_acl
267
268 from bup.metadata import xattr
269 if xattr:
270     def remove_selinux(attrs):
271         return list(filter(lambda i: not i in (b'security.selinux', ),
272                            attrs))
273
274     def test_handling_of_incorrect_existing_linux_xattrs():
275         if not is_superuser() or detect_fakeroot():
276             pytest.skip('skipping test -- not superuser')
277             return
278         if not setup_testfs():
279             pytest.skip('unable to load loop module; skipping dependent tests')
280             return
281         for f in glob.glob(b'testfs/*'):
282             ex(b'rm', b'-rf', f)
283         path = b'testfs/foo'
284         open(path, 'w').close()
285         xattr.set(path, b'foo', b'bar', namespace=xattr.NS_USER)
286         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
287         xattr.set(path, b'baz', b'bax', namespace=xattr.NS_USER)
288         m.apply_to_path(path, restore_numeric_ids=False)
289         WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo'])
290         WVPASSEQ(xattr.get(path, b'user.foo'), b'bar')
291         xattr.set(path, b'foo', b'baz', namespace=xattr.NS_USER)
292         m.apply_to_path(path, restore_numeric_ids=False)
293         WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo'])
294         WVPASSEQ(xattr.get(path, b'user.foo'), b'bar')
295         xattr.remove(path, b'foo', namespace=xattr.NS_USER)
296         m.apply_to_path(path, restore_numeric_ids=False)
297         WVPASSEQ(remove_selinux(xattr.list(path)), [b'user.foo'])
298         WVPASSEQ(xattr.get(path, b'user.foo'), b'bar')
299         cleanup_testfs()