7 import bup.helpers as helpers
8 from bup import metadata
9 from bup.helpers import clear_errors, detect_fakeroot
18 cmd_str = ' '.join(cmd)
19 print >> sys.stderr, cmd_str
20 rc = subprocess.call(cmd)
22 print >> sys.stderr, 'terminated by signal', - rc
25 print >> sys.stderr, 'returned exit status', rc
28 print >> sys.stderr, 'subprocess call failed:', e
33 # Set up testfs with user_xattr, etc.
34 subprocess.call(['umount', 'testfs'])
35 ex('dd', 'if=/dev/zero', 'of=testfs.img', 'bs=1M', 'count=32')
36 ex('mke2fs', '-F', '-j', '-m', '0', 'testfs.img')
37 ex('rm', '-rf', 'testfs')
39 ex('mount', '-o', 'loop,acl,user_xattr', 'testfs.img', 'testfs')
40 # Hide, so that tests can't create risks.
41 ex('chown', 'root:root', 'testfs')
42 os.chmod('testfs', 0700)
46 subprocess.call(['umount', 'testfs'])
47 subprocess.call(['rm', '-f', 'testfs.img'])
51 def test_clean_up_archive_path():
52 cleanup = metadata._clean_up_path_for_archive
53 WVPASSEQ(cleanup('foo'), 'foo')
54 WVPASSEQ(cleanup('/foo'), 'foo')
55 WVPASSEQ(cleanup('///foo'), 'foo')
56 WVPASSEQ(cleanup('/foo/bar'), 'foo/bar')
57 WVPASSEQ(cleanup('foo/./bar'), 'foo/bar')
58 WVPASSEQ(cleanup('/foo/./bar'), 'foo/bar')
59 WVPASSEQ(cleanup('/foo/./bar/././baz'), 'foo/bar/baz')
60 WVPASSEQ(cleanup('/foo/./bar///././baz'), 'foo/bar/baz')
61 WVPASSEQ(cleanup('//./foo/./bar///././baz/.///'), 'foo/bar/baz/')
62 WVPASSEQ(cleanup('./foo/./.bar'), 'foo/.bar')
63 WVPASSEQ(cleanup('./foo/.'), 'foo')
64 WVPASSEQ(cleanup('./foo/..'), '.')
65 WVPASSEQ(cleanup('//./..//.../..//.'), '.')
66 WVPASSEQ(cleanup('//./..//..././/.'), '...')
67 WVPASSEQ(cleanup('/////.'), '.')
68 WVPASSEQ(cleanup('/../'), '.')
69 WVPASSEQ(cleanup(''), '.')
73 def test_risky_path():
74 risky = metadata._risky_path
76 WVPASS(risky('///foo'))
77 WVPASS(risky('/../foo'))
78 WVPASS(risky('../foo'))
79 WVPASS(risky('foo/..'))
80 WVPASS(risky('foo/../'))
81 WVPASS(risky('foo/../bar'))
84 WVFAIL(risky('foo///'))
85 WVFAIL(risky('./foo'))
86 WVFAIL(risky('foo/.'))
87 WVFAIL(risky('./foo/.'))
88 WVFAIL(risky('foo/bar'))
89 WVFAIL(risky('foo/./bar'))
93 def test_clean_up_extract_path():
94 cleanup = metadata._clean_up_extract_path
95 WVPASSEQ(cleanup('/foo'), 'foo')
96 WVPASSEQ(cleanup('///foo'), 'foo')
97 WVFAIL(cleanup('/../foo'))
98 WVFAIL(cleanup('../foo'))
99 WVFAIL(cleanup('foo/..'))
100 WVFAIL(cleanup('foo/../'))
101 WVFAIL(cleanup('foo/../bar'))
102 WVPASSEQ(cleanup('foo'), 'foo')
103 WVPASSEQ(cleanup('foo/'), 'foo/')
104 WVPASSEQ(cleanup('foo///'), 'foo///')
105 WVPASSEQ(cleanup('./foo'), './foo')
106 WVPASSEQ(cleanup('foo/.'), 'foo/.')
107 WVPASSEQ(cleanup('./foo/.'), './foo/.')
108 WVPASSEQ(cleanup('foo/bar'), 'foo/bar')
109 WVPASSEQ(cleanup('foo/./bar'), 'foo/./bar')
110 WVPASSEQ(cleanup('/'), '.')
111 WVPASSEQ(cleanup('./'), './')
112 WVPASSEQ(cleanup('///foo/bar'), 'foo/bar')
113 WVPASSEQ(cleanup('///foo/bar'), 'foo/bar')
117 def test_from_path_error():
118 if os.geteuid() == 0 or detect_fakeroot():
120 tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
122 path = tmpdir + '/foo'
123 subprocess.call(['mkdir', path])
124 m = metadata.from_path(path, archive_path=path, save_symlinks=True)
125 WVPASSEQ(m.path, path)
126 subprocess.call(['chmod', '000', path])
127 metadata.from_path(path, archive_path=path, save_symlinks=True)
128 errmsg = helpers.saved_errors[0] if helpers.saved_errors else ''
129 WVPASS(errmsg.startswith('read Linux attr'))
132 subprocess.call(['rm', '-rf', tmpdir])
136 def test_apply_to_path_restricted_access():
137 if os.geteuid() == 0 or detect_fakeroot():
139 tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
141 path = tmpdir + '/foo'
142 subprocess.call(['mkdir', path])
144 m = metadata.from_path(path, archive_path=path, save_symlinks=True)
145 WVPASSEQ(m.path, path)
146 subprocess.call(['chmod', '000', tmpdir])
147 m.apply_to_path(path)
148 errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
149 WVPASS(errmsg.startswith('utime: '))
152 subprocess.call(['rm', '-rf', tmpdir])
156 def test_restore_restricted_user_group():
157 if os.geteuid() == 0 or detect_fakeroot():
159 tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
161 path = tmpdir + '/foo'
162 subprocess.call(['mkdir', path])
163 m = metadata.from_path(path, archive_path=path, save_symlinks=True)
164 WVPASSEQ(m.path, path)
165 WVPASSEQ(m.apply_to_path(path), None)
168 m.apply_to_path(path, restore_numeric_ids=True)
169 errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
170 WVPASS(errmsg.startswith('lchown: '))
174 m.apply_to_path(path, restore_numeric_ids=True)
175 errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
176 WVPASS(errmsg.startswith('lchown: '))
179 subprocess.call(['rm', '-rf', tmpdir])
183 def test_restore_nonexistent_user_group():
184 tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
186 path = tmpdir + '/foo'
187 subprocess.call(['mkdir', path])
188 m = metadata.from_path(path, archive_path=path, save_symlinks=True)
189 WVPASSEQ(m.path, path)
190 m.owner = max([x.pw_name for x in pwd.getpwall()], key=len) + 'x'
191 m.group = max([x.gr_name for x in grp.getgrall()], key=len) + 'x'
192 WVPASSEQ(m.apply_to_path(path, restore_numeric_ids=True), None)
193 WVPASSEQ(os.stat(path).st_uid, m.uid)
194 WVPASSEQ(os.stat(path).st_gid, m.gid)
195 WVPASSEQ(m.apply_to_path(path, restore_numeric_ids=False), None)
196 WVPASSEQ(os.stat(path).st_uid, os.geteuid())
197 WVPASSEQ(os.stat(path).st_gid, os.getgid())
199 subprocess.call(['rm', '-rf', tmpdir])
203 def test_restore_over_existing_target():
204 tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
206 path = tmpdir + '/foo'
208 dir_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
210 open(path, 'w').close()
211 file_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
212 # Restore dir over file.
213 WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
214 WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
215 # Restore dir over dir.
216 WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
217 WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
218 # Restore file over dir.
219 WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
220 WVPASS(stat.S_ISREG(os.stat(path).st_mode))
221 # Restore file over file.
222 WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
223 WVPASS(stat.S_ISREG(os.stat(path).st_mode))
224 # Restore file over non-empty dir.
227 open(path + '/bar', 'w').close()
228 WVEXCEPT(Exception, file_m.create_path, path, create_symlinks=True)
229 # Restore dir over non-empty dir.
230 os.remove(path + '/bar')
231 os.mkdir(path + '/bar')
232 WVEXCEPT(Exception, dir_m.create_path, path, create_symlinks=True)
234 subprocess.call(['rm', '-rf', tmpdir])
238 def test_handling_of_incorrect_existing_linux_xattrs():
239 if os.geteuid() != 0 or detect_fakeroot():
242 subprocess.check_call('rm -rf testfs/*', shell=True)
244 open(path, 'w').close()
245 xattr.set(path, 'foo', 'bar', namespace=xattr.NS_USER)
246 m = metadata.from_path(path, archive_path=path, save_symlinks=True)
247 xattr.set(path, 'baz', 'bax', namespace=xattr.NS_USER)
248 m.apply_to_path(path, restore_numeric_ids=False)
249 WVPASSEQ(xattr.list(path), ['user.foo'])
250 WVPASSEQ(xattr.get(path, 'user.foo'), 'bar')
251 xattr.set(path, 'foo', 'baz', namespace=xattr.NS_USER)
252 m.apply_to_path(path, restore_numeric_ids=False)
253 WVPASSEQ(xattr.list(path), ['user.foo'])
254 WVPASSEQ(xattr.get(path, 'user.foo'), 'bar')
255 xattr.remove(path, 'foo', namespace=xattr.NS_USER)
256 m.apply_to_path(path, restore_numeric_ids=False)
257 WVPASSEQ(xattr.list(path), ['user.foo'])
258 WVPASSEQ(xattr.get(path, 'user.foo'), 'bar')