]> arthur.barton.de Git - bup.git/blob - lib/bup/t/tmetadata.py
Remove MetadataError and make apply error handling finer-grained.
[bup.git] / lib / bup / t / tmetadata.py
1 import grp
2 import pwd
3 import stat
4 import subprocess
5 import tempfile
6 import bup.helpers as helpers
7 from bup import metadata
8 from bup.helpers import clear_errors, detect_fakeroot
9 from wvtest import *
10
11
12 @wvtest
13 def test_clean_up_archive_path():
14     cleanup = metadata._clean_up_path_for_archive
15     WVPASSEQ(cleanup('foo'), 'foo')
16     WVPASSEQ(cleanup('/foo'), 'foo')
17     WVPASSEQ(cleanup('///foo'), 'foo')
18     WVPASSEQ(cleanup('/foo/bar'), 'foo/bar')
19     WVPASSEQ(cleanup('foo/./bar'), 'foo/bar')
20     WVPASSEQ(cleanup('/foo/./bar'), 'foo/bar')
21     WVPASSEQ(cleanup('/foo/./bar/././baz'), 'foo/bar/baz')
22     WVPASSEQ(cleanup('/foo/./bar///././baz'), 'foo/bar/baz')
23     WVPASSEQ(cleanup('//./foo/./bar///././baz/.///'), 'foo/bar/baz/')
24     WVPASSEQ(cleanup('./foo/./.bar'), 'foo/.bar')
25     WVPASSEQ(cleanup('./foo/.'), 'foo')
26     WVPASSEQ(cleanup('./foo/..'), '.')
27     WVPASSEQ(cleanup('//./..//.../..//.'), '.')
28     WVPASSEQ(cleanup('//./..//..././/.'), '...')
29     WVPASSEQ(cleanup('/////.'), '.')
30     WVPASSEQ(cleanup('/../'), '.')
31     WVPASSEQ(cleanup(''), '.')
32
33
34 @wvtest
35 def test_risky_path():
36     risky = metadata._risky_path
37     WVPASS(risky('/foo'))
38     WVPASS(risky('///foo'))
39     WVPASS(risky('/../foo'))
40     WVPASS(risky('../foo'))
41     WVPASS(risky('foo/..'))
42     WVPASS(risky('foo/../'))
43     WVPASS(risky('foo/../bar'))
44     WVFAIL(risky('foo'))
45     WVFAIL(risky('foo/'))
46     WVFAIL(risky('foo///'))
47     WVFAIL(risky('./foo'))
48     WVFAIL(risky('foo/.'))
49     WVFAIL(risky('./foo/.'))
50     WVFAIL(risky('foo/bar'))
51     WVFAIL(risky('foo/./bar'))
52
53
54 @wvtest
55 def test_clean_up_extract_path():
56     cleanup = metadata._clean_up_extract_path
57     WVPASSEQ(cleanup('/foo'), 'foo')
58     WVPASSEQ(cleanup('///foo'), 'foo')
59     WVFAIL(cleanup('/../foo'))
60     WVFAIL(cleanup('../foo'))
61     WVFAIL(cleanup('foo/..'))
62     WVFAIL(cleanup('foo/../'))
63     WVFAIL(cleanup('foo/../bar'))
64     WVPASSEQ(cleanup('foo'), 'foo')
65     WVPASSEQ(cleanup('foo/'), 'foo/')
66     WVPASSEQ(cleanup('foo///'), 'foo///')
67     WVPASSEQ(cleanup('./foo'), './foo')
68     WVPASSEQ(cleanup('foo/.'), 'foo/.')
69     WVPASSEQ(cleanup('./foo/.'), './foo/.')
70     WVPASSEQ(cleanup('foo/bar'), 'foo/bar')
71     WVPASSEQ(cleanup('foo/./bar'), 'foo/./bar')
72     WVPASSEQ(cleanup('/'), '.')
73     WVPASSEQ(cleanup('./'), './')
74     WVPASSEQ(cleanup('///foo/bar'), 'foo/bar')
75     WVPASSEQ(cleanup('///foo/bar'), 'foo/bar')
76
77
78 @wvtest
79 def test_from_path_error():
80     if os.geteuid == 0 or detect_fakeroot():
81         return
82     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
83     try:
84         path = tmpdir + '/foo'
85         subprocess.call(['mkdir', path])
86         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
87         WVPASSEQ(m.path, path)
88         subprocess.call(['chmod', '000', path])
89         metadata.from_path(path, archive_path=path, save_symlinks=True)
90         errmsg = helpers.saved_errors[0] if helpers.saved_errors else ''
91         WVPASS(errmsg.startswith('bup: unable to read Linux attr'))
92         clear_errors()
93     finally:
94         subprocess.call(['rm', '-rf', tmpdir])
95
96
97 @wvtest
98 def test_apply_to_path_restricted_access():
99     if os.geteuid == 0 or detect_fakeroot():
100         return
101     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
102     try:
103         path = tmpdir + '/foo'
104         subprocess.call(['mkdir', path])
105         clear_errors()
106         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
107         WVPASSEQ(m.path, path)
108         subprocess.call(['chmod', '000', tmpdir])
109         m.apply_to_path(path)
110         errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
111         WVPASS(errmsg.startswith('utime: '))
112         clear_errors()
113     finally:
114         subprocess.call(['rm', '-rf', tmpdir])
115
116
117 @wvtest
118 def test_restore_restricted_user_group():
119     if os.geteuid == 0 or detect_fakeroot():
120         return
121     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
122     try:
123         path = tmpdir + '/foo'
124         subprocess.call(['mkdir', path])
125         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
126         WVPASSEQ(m.path, path)
127         WVPASSEQ(m.apply_to_path(path), None)
128         orig_uid = m.uid
129         m.uid = 0;
130         m.apply_to_path(path, restore_numeric_ids=True)
131         errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
132         WVPASS(errmsg.startswith('lchown: '))
133         clear_errors()
134         m.uid = orig_uid
135         m.gid = 0;
136         m.apply_to_path(path, restore_numeric_ids=True)
137         errmsg = str(helpers.saved_errors[0]) if helpers.saved_errors else ''
138         WVPASS(errmsg.startswith('lchown: '))
139         clear_errors()
140     finally:
141         subprocess.call(['rm', '-rf', tmpdir])
142
143
144 @wvtest
145 def test_restore_nonexistent_user_group():
146     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
147     try:
148         path = tmpdir + '/foo'
149         subprocess.call(['mkdir', path])
150         m = metadata.from_path(path, archive_path=path, save_symlinks=True)
151         WVPASSEQ(m.path, path)
152         m.owner = max([x.pw_name for x in pwd.getpwall()], key=len) + 'x'
153         m.group = max([x.gr_name for x in grp.getgrall()], key=len) + 'x'
154         WVPASSEQ(m.apply_to_path(path, restore_numeric_ids=True), None)
155         WVPASSEQ(os.stat(path).st_uid, m.uid)
156         WVPASSEQ(os.stat(path).st_gid, m.gid)
157         WVPASSEQ(m.apply_to_path(path, restore_numeric_ids=False), None)
158         WVPASSEQ(os.stat(path).st_uid, os.geteuid())
159         WVPASSEQ(os.stat(path).st_uid, os.getgid())
160     finally:
161         subprocess.call(['rm', '-rf', tmpdir])
162
163
164 @wvtest
165 def test_restore_over_existing_target():
166     tmpdir = tempfile.mkdtemp(prefix='bup-tmetadata-')
167     try:
168         path = tmpdir + '/foo'
169         os.mkdir(path)
170         dir_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
171         os.rmdir(path)
172         open(path, 'w').close()
173         file_m = metadata.from_path(path, archive_path=path, save_symlinks=True)
174         # Restore dir over file.
175         WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
176         WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
177         # Restore dir over dir.
178         WVPASSEQ(dir_m.create_path(path, create_symlinks=True), None)
179         WVPASS(stat.S_ISDIR(os.stat(path).st_mode))
180         # Restore file over dir.
181         WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
182         WVPASS(stat.S_ISREG(os.stat(path).st_mode))
183         # Restore file over file.
184         WVPASSEQ(file_m.create_path(path, create_symlinks=True), None)
185         WVPASS(stat.S_ISREG(os.stat(path).st_mode))
186         # Restore file over non-empty dir.
187         os.remove(path)
188         os.mkdir(path)
189         open(path + '/bar', 'w').close()
190         WVEXCEPT(Exception, file_m.create_path, path, create_symlinks=True)
191         # Restore dir over non-empty dir.
192         os.remove(path + '/bar')
193         os.mkdir(path + '/bar')
194         WVEXCEPT(Exception, dir_m.create_path, path, create_symlinks=True)
195     finally:
196         subprocess.call(['rm', '-rf', tmpdir])