]> arthur.barton.de Git - bup.git/blob - lib/bup/t/tmetadata.py
Don't touch correct target xattrs; remove inappropriate target xattrs.
[bup.git] / lib / bup / t / tmetadata.py
1 import grp
2 import pwd
3 import stat
4 import subprocess
5 import tempfile
6 import xattr
7 import bup.helpers as helpers
8 from bup import metadata
9 from bup.helpers import clear_errors, detect_fakeroot
10 from wvtest import *
11
12
13 top_dir = os.getcwd()
14
15
16 def ex(*cmd):
17     try:
18         cmd_str = ' '.join(cmd)
19         print >> sys.stderr, cmd_str
20         rc = subprocess.call(cmd)
21         if rc < 0:
22             print >> sys.stderr, 'terminated by signal', - rc
23             sys.exit(1)
24         elif rc > 0:
25             print >> sys.stderr, 'returned exit status', rc
26             sys.exit(1)
27     except OSError, e:
28         print >> sys.stderr, 'subprocess call failed:', e
29         sys.exit(1)
30
31
32 def setup_testfs():
33     # Set up testfs with user_xattr, etc.
34     subprocess.call(['umount', 'testfs'])
35     ex('dd', 'if=/dev/zero', 'of=testfs.img', 'bs=1M', 'count=32')
36     ex('mke2fs', '-F', '-j', '-m', '0', 'testfs.img')
37     ex('rm', '-rf', 'testfs')
38     os.mkdir('testfs')
39     ex('mount', '-o', 'loop,acl,user_xattr', 'testfs.img', 'testfs')
40     # Hide, so that tests can't create risks.
41     ex('chown', 'root:root', 'testfs')
42     os.chmod('testfs', 0700)
43
44
45 def cleanup_testfs():
46     subprocess.call(['umount', 'testfs'])
47     subprocess.call(['rm', '-f', 'testfs.img'])
48
49
50 @wvtest
51 def test_clean_up_archive_path():
52     cleanup = metadata._clean_up_path_for_archive
53     WVPASSEQ(cleanup('foo'), 'foo')
54     WVPASSEQ(cleanup('/foo'), 'foo')
55     WVPASSEQ(cleanup('///foo'), 'foo')
56     WVPASSEQ(cleanup('/foo/bar'), 'foo/bar')
57     WVPASSEQ(cleanup('foo/./bar'), 'foo/bar')
58     WVPASSEQ(cleanup('/foo/./bar'), 'foo/bar')
59     WVPASSEQ(cleanup('/foo/./bar/././baz'), 'foo/bar/baz')
60     WVPASSEQ(cleanup('/foo/./bar///././baz'), 'foo/bar/baz')
61     WVPASSEQ(cleanup('//./foo/./bar///././baz/.///'), 'foo/bar/baz/')
62     WVPASSEQ(cleanup('./foo/./.bar'), 'foo/.bar')
63     WVPASSEQ(cleanup('./foo/.'), 'foo')
64     WVPASSEQ(cleanup('./foo/..'), '.')
65     WVPASSEQ(cleanup('//./..//.../..//.'), '.')
66     WVPASSEQ(cleanup('//./..//..././/.'), '...')
67     WVPASSEQ(cleanup('/////.'), '.')
68     WVPASSEQ(cleanup('/../'), '.')
69     WVPASSEQ(cleanup(''), '.')
70
71
72 @wvtest
73 def test_risky_path():
74     risky = metadata._risky_path
75     WVPASS(risky('/foo'))
76     WVPASS(risky('///foo'))
77     WVPASS(risky('/../foo'))
78     WVPASS(risky('../foo'))
79     WVPASS(risky('foo/..'))
80     WVPASS(risky('foo/../'))
81     WVPASS(risky('foo/../bar'))
82     WVFAIL(risky('foo'))
83     WVFAIL(risky('foo/'))
84     WVFAIL(risky('foo///'))
85     WVFAIL(risky('./foo'))
86     WVFAIL(risky('foo/.'))
87     WVFAIL(risky('./foo/.'))
88     WVFAIL(risky('foo/bar'))
89     WVFAIL(risky('foo/./bar'))
90
91
92 @wvtest
93 def test_clean_up_extract_path():
94     cleanup = metadata._clean_up_extract_path
95     WVPASSEQ(cleanup('/foo'), 'foo')
96     WVPASSEQ(cleanup('///foo'), 'foo')
97     WVFAIL(cleanup('/../foo'))
98     WVFAIL(cleanup('../foo'))
99     WVFAIL(cleanup('foo/..'))
100     WVFAIL(cleanup('foo/../'))
101     WVFAIL(cleanup('foo/../bar'))
102     WVPASSEQ(cleanup('foo'), 'foo')
103     WVPASSEQ(cleanup('foo/'), 'foo/')
104     WVPASSEQ(cleanup('foo///'), 'foo///')
105     WVPASSEQ(cleanup('./foo'), './foo')
106     WVPASSEQ(cleanup('foo/.'), 'foo/.')
107     WVPASSEQ(cleanup('./foo/.'), './foo/.')
108     WVPASSEQ(cleanup('foo/bar'), 'foo/bar')
109     WVPASSEQ(cleanup('foo/./bar'), 'foo/./bar')
110     WVPASSEQ(cleanup('/'), '.')
111     WVPASSEQ(cleanup('./'), './')
112     WVPASSEQ(cleanup('///foo/bar'), 'foo/bar')
113     WVPASSEQ(cleanup('///foo/bar'), 'foo/bar')
114
115
116 @wvtest
117 def test_from_path_error():
118     if os.geteuid() == 0 or detect_fakeroot():
119         return
120     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
121     try:
122         path = tmpdir + '/foo'
123         subprocess.call(['mkdir', path])
124         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
125         WVPASSEQ(m.path, path)
126         subprocess.call(['chmod', '000', path])
127         metadata.from_path(path, archive_path=path, save_symlinks=True)
128         errmsg = helpers.saved_errors[0] if helpers.saved_errors else ''
129         WVPASS(errmsg.startswith('read Linux attr'))
130         clear_errors()
131     finally:
132         subprocess.call(['rm', '-rf', tmpdir])
133
134
135 @wvtest
136 def test_apply_to_path_restricted_access():
137     if os.geteuid() == 0 or detect_fakeroot():
138         return
139     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
140     try:
141         path = tmpdir + '/foo'
142         subprocess.call(['mkdir', path])
143         clear_errors()
144         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
145         WVPASSEQ(m.path, path)
146         subprocess.call(['chmod', '000', tmpdir])
147         m.apply_to_path(path)
148         errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
149         WVPASS(errmsg.startswith('utime: '))
150         clear_errors()
151     finally:
152         subprocess.call(['rm', '-rf', tmpdir])
153
154
155 @wvtest
156 def test_restore_restricted_user_group():
157     if os.geteuid() == 0 or detect_fakeroot():
158         return
159     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
160     try:
161         path = tmpdir + '/foo'
162         subprocess.call(['mkdir', path])
163         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
164         WVPASSEQ(m.path, path)
165         WVPASSEQ(m.apply_to_path(path), None)
166         orig_uid = m.uid
167         m.uid = 0;
168         m.apply_to_path(path, restore_numeric_ids=True)
169         errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
170         WVPASS(errmsg.startswith('lchown: '))
171         clear_errors()
172         m.uid = orig_uid
173         m.gid = 0;
174         m.apply_to_path(path, restore_numeric_ids=True)
175         errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
176         WVPASS(errmsg.startswith('lchown: '))
177         clear_errors()
178     finally:
179         subprocess.call(['rm', '-rf', tmpdir])
180
181
182 @wvtest
183 def test_restore_nonexistent_user_group():
184     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
185     try:
186         path = tmpdir + '/foo'
187         subprocess.call(['mkdir', path])
188         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
189         WVPASSEQ(m.path, path)
190         m.owner = max([x.pw_name for x in pwd.getpwall()], key=len) + 'x'
191         m.group = max([x.gr_name for x in grp.getgrall()], key=len) + 'x'
192         WVPASSEQ(m.apply_to_path(path, restore_numeric_ids=True), None)
193         WVPASSEQ(os.stat(path).st_uid, m.uid)
194         WVPASSEQ(os.stat(path).st_gid, m.gid)
195         WVPASSEQ(m.apply_to_path(path, restore_numeric_ids=False), None)
196         WVPASSEQ(os.stat(path).st_uid, os.geteuid())
197         WVPASSEQ(os.stat(path).st_uid, os.getgid())
198     finally:
199         subprocess.call(['rm', '-rf', tmpdir])
200
201
202 @wvtest
203 def test_restore_over_existing_target():
204     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
205     try:
206         path = tmpdir + '/foo'
207         os.mkdir(path)
208         dir_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
209         os.rmdir(path)
210         open(path, 'w').close()
211         file_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
212         # Restore dir over file.
213         WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
214         WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
215         # Restore dir over dir.
216         WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
217         WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
218         # Restore file over dir.
219         WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
220         WVPASS(stat.S_ISREG(os.stat(path).st_mode))
221         # Restore file over file.
222         WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
223         WVPASS(stat.S_ISREG(os.stat(path).st_mode))
224         # Restore file over non-empty dir.
225         os.remove(path)
226         os.mkdir(path)
227         open(path + '/bar', 'w').close()
228         WVEXCEPT(Exception, file_m.create_path, path, create_symlinks=True)
229         # Restore dir over non-empty dir.
230         os.remove(path + '/bar')
231         os.mkdir(path + '/bar')
232         WVEXCEPT(Exception, dir_m.create_path, path, create_symlinks=True)
233     finally:
234         subprocess.call(['rm', '-rf', tmpdir])
235
236
237 @wvtest
238 def test_handling_of_incorrect_existing_linux_xattrs():
239     if os.geteuid() != 0 or detect_fakeroot():
240         return
241     setup_testfs()
242     subprocess.check_call('rm -rf testfs/*', shell=True)
243     path = 'testfs/foo'
244     open(path, 'w').close()
245     xattr.set(path, 'foo', 'bar', namespace=xattr.NS_USER)
246     m = metadata.from_path(path, archive_path=path, save_symlinks=True)
247     xattr.set(path, 'baz', 'bax', namespace=xattr.NS_USER)
248     m.apply_to_path(path, restore_numeric_ids=False)
249     WVPASSEQ(xattr.list(path), ['user.foo'])
250     WVPASSEQ(xattr.get(path, 'user.foo'), 'bar')
251     xattr.set(path, 'foo', 'baz', namespace=xattr.NS_USER)
252     m.apply_to_path(path, restore_numeric_ids=False)
253     WVPASSEQ(xattr.list(path), ['user.foo'])
254     WVPASSEQ(xattr.get(path, 'user.foo'), 'bar')
255     xattr.remove(path, 'foo', namespace=xattr.NS_USER)
256     m.apply_to_path(path, restore_numeric_ids=False)
257     WVPASSEQ(xattr.list(path), ['user.foo'])
258     WVPASSEQ(xattr.get(path, 'user.foo'), 'bar')
259     os.chdir(top_dir)
260     cleanup_testfs()