]> arthur.barton.de Git - bup.git/blob - lib/bup/t/tresolve.py
tresolve: accommodate python 3 and test there
[bup.git] / lib / bup / t / tresolve.py
1
2 from __future__ import absolute_import, print_function
3 from binascii import unhexlify
4 from errno import ELOOP, ENOTDIR
5 from os import symlink
6 from stat import S_IFDIR
7 from sys import stderr
8 from time import localtime, strftime
9
10 from wvtest import *
11
12 from bup import git, path, vfs
13 from bup.compat import environ
14 from bup.io import path_msg
15 from bup.metadata import Metadata
16 from bup.repo import LocalRepo, RemoteRepo
17 from bup.test.vfs import tree_dict
18 from buptest import ex, exo, no_lingering_errors, test_tempdir
19
20 bup_path = path.exe()
21
22 ## The clear_cache() calls below are to make sure that the test starts
23 ## from a known state since at the moment the cache entry for a given
24 ## item (like a commit) can change.  For example, its meta value might
25 ## be promoted from a mode to a Metadata instance once the tree it
26 ## refers to is traversed.
27
28 def prep_and_test_repo(name, create_repo, test_repo):
29     with no_lingering_errors():
30         with test_tempdir(b'bup-t' + name) as tmpdir:
31             bup_dir = tmpdir + b'/bup'
32             environ[b'GIT_DIR'] = bup_dir
33             environ[b'BUP_DIR'] = bup_dir
34             ex((bup_path, b'init'))
35             git.repodir = bup_dir
36             with create_repo(bup_dir) as repo:
37                 test_repo(repo, tmpdir)
38
39 # Currently, we just test through the repos since LocalRepo resolve is
40 # just a straight redirection to vfs.resolve.
41
42 def test_resolve(repo, tmpdir):
43         data_path = tmpdir + b'/src'
44         resolve = repo.resolve
45         save_time = 100000
46         save_time_str = strftime('%Y-%m-%d-%H%M%S', localtime(save_time)).encode('ascii')
47         os.mkdir(data_path)
48         os.mkdir(data_path + b'/dir')
49         with open(data_path + b'/file', 'wb+') as tmpfile:
50             tmpfile.write(b'canary\n')
51         symlink(b'file', data_path + b'/file-symlink')
52         symlink(b'dir', data_path + b'/dir-symlink')
53         symlink(b'not-there', data_path + b'/bad-symlink')
54         ex((bup_path, b'index', b'-v', data_path))
55         ex((bup_path, b'save', b'-d', b'%d' % save_time, b'-tvvn', b'test',
56             b'--strip', data_path))
57         ex((bup_path, b'tag', b'test-tag', b'test'))
58
59         tip_hash = exo((b'git', b'show-ref', b'refs/heads/test'))[0]
60         tip_oidx = tip_hash.strip().split()[0]
61         tip_oid = unhexlify(tip_oidx)
62         tip_tree_oidx = exo((b'git', b'log', b'--pretty=%T', b'-n1',
63                              tip_oidx))[0].strip()
64         tip_tree_oid = unhexlify(tip_tree_oidx)
65         tip_tree = tree_dict(repo, tip_tree_oid)
66         test_revlist_w_meta = vfs.RevList(meta=tip_tree[b'.'].meta,
67                                           oid=tip_oid)
68         expected_latest_item = vfs.Commit(meta=S_IFDIR | 0o755,
69                                           oid=tip_tree_oid,
70                                           coid=tip_oid)
71         expected_latest_item_w_meta = vfs.Commit(meta=tip_tree[b'.'].meta,
72                                                  oid=tip_tree_oid,
73                                                  coid=tip_oid)
74         expected_latest_link = vfs.FakeLink(meta=vfs.default_symlink_mode,
75                                             target=save_time_str)
76         expected_test_tag_item = expected_latest_item
77
78         wvstart('resolve: /')
79         vfs.clear_cache()
80         res = resolve(b'/')
81         wvpasseq(1, len(res))
82         wvpasseq(((b'', vfs._root),), res)
83         ignore, root_item = res[0]
84         root_content = frozenset(vfs.contents(repo, root_item))
85         wvpasseq(frozenset([(b'.', root_item),
86                             (b'.tag', vfs._tags),
87                             (b'test', test_revlist_w_meta)]),
88                  root_content)
89         for path in (b'//', b'/.', b'/./', b'/..', b'/../',
90                      b'/test/latest/dir/../../..',
91                      b'/test/latest/dir/../../../',
92                      b'/test/latest/dir/../../../.',
93                      b'/test/latest/dir/../../..//',
94                      b'/test//latest/dir/../../..',
95                      b'/test/./latest/dir/../../..',
96                      b'/test/././latest/dir/../../..',
97                      b'/test/.//./latest/dir/../../..',
98                      b'/test//.//.//latest/dir/../../..'
99                      b'/test//./latest/dir/../../..'):
100             wvstart('resolve: ' + path_msg(path))
101             vfs.clear_cache()
102             res = resolve(path)
103             wvpasseq(((b'', vfs._root),), res)
104
105         wvstart('resolve: /.tag')
106         vfs.clear_cache()
107         res = resolve(b'/.tag')
108         wvpasseq(2, len(res))
109         wvpasseq(((b'', vfs._root), (b'.tag', vfs._tags)),
110                  res)
111         ignore, tag_item = res[1]
112         tag_content = frozenset(vfs.contents(repo, tag_item))
113         wvpasseq(frozenset([(b'.', tag_item),
114                             (b'test-tag', expected_test_tag_item)]),
115                  tag_content)
116
117         wvstart('resolve: /test')
118         vfs.clear_cache()
119         res = resolve(b'/test')
120         wvpasseq(2, len(res))
121         wvpasseq(((b'', vfs._root), (b'test', test_revlist_w_meta)), res)
122         ignore, test_item = res[1]
123         test_content = frozenset(vfs.contents(repo, test_item))
124         # latest has metadata here due to caching
125         wvpasseq(frozenset([(b'.', test_revlist_w_meta),
126                             (save_time_str, expected_latest_item_w_meta),
127                             (b'latest', expected_latest_link)]),
128                  test_content)
129
130         wvstart('resolve: /test/latest')
131         vfs.clear_cache()
132         res = resolve(b'/test/latest')
133         wvpasseq(3, len(res))
134         expected_latest_item_w_meta = vfs.Commit(meta=tip_tree[b'.'].meta,
135                                                  oid=tip_tree_oid,
136                                                  coid=tip_oid)
137         expected = ((b'', vfs._root),
138                     (b'test', test_revlist_w_meta),
139                     (save_time_str, expected_latest_item_w_meta))
140         wvpasseq(expected, res)
141         ignore, latest_item = res[2]
142         latest_content = frozenset(vfs.contents(repo, latest_item))
143         expected = frozenset((x.name, vfs.Item(oid=x.oid, meta=x.meta))
144                              for x in (tip_tree[name]
145                                        for name in (b'.',
146                                                     b'bad-symlink',
147                                                     b'dir',
148                                                     b'dir-symlink',
149                                                     b'file',
150                                                     b'file-symlink')))
151         wvpasseq(expected, latest_content)
152
153         wvstart('resolve: /test/latest/file')
154         vfs.clear_cache()
155         res = resolve(b'/test/latest/file')
156         wvpasseq(4, len(res))
157         expected_file_item_w_meta = vfs.Item(meta=tip_tree[b'file'].meta,
158                                              oid=tip_tree[b'file'].oid)
159         expected = ((b'', vfs._root),
160                     (b'test', test_revlist_w_meta),
161                     (save_time_str, expected_latest_item_w_meta),
162                     (b'file', expected_file_item_w_meta))
163         wvpasseq(expected, res)
164
165         wvstart('resolve: /test/latest/bad-symlink')
166         vfs.clear_cache()
167         res = resolve(b'/test/latest/bad-symlink')
168         wvpasseq(4, len(res))
169         expected = ((b'', vfs._root),
170                     (b'test', test_revlist_w_meta),
171                     (save_time_str, expected_latest_item_w_meta),
172                     (b'not-there', None))
173         wvpasseq(expected, res)
174
175         wvstart('resolve nofollow: /test/latest/bad-symlink')
176         vfs.clear_cache()
177         res = resolve(b'/test/latest/bad-symlink', follow=False)
178         wvpasseq(4, len(res))
179         bad_symlink_value = tip_tree[b'bad-symlink']
180         expected_bad_symlink_item_w_meta = vfs.Item(meta=bad_symlink_value.meta,
181                                                     oid=bad_symlink_value.oid)
182         expected = ((b'', vfs._root),
183                     (b'test', test_revlist_w_meta),
184                     (save_time_str, expected_latest_item_w_meta),
185                     (b'bad-symlink', expected_bad_symlink_item_w_meta))
186         wvpasseq(expected, res)
187
188         wvstart('resolve: /test/latest/file-symlink')
189         vfs.clear_cache()
190         res = resolve(b'/test/latest/file-symlink')
191         wvpasseq(4, len(res))
192         expected = ((b'', vfs._root),
193                     (b'test', test_revlist_w_meta),
194                     (save_time_str, expected_latest_item_w_meta),
195                     (b'file', expected_file_item_w_meta))
196         wvpasseq(expected, res)
197
198         wvstart('resolve nofollow: /test/latest/file-symlink')
199         vfs.clear_cache()
200         res = resolve(b'/test/latest/file-symlink', follow=False)
201         wvpasseq(4, len(res))
202         file_symlink_value = tip_tree[b'file-symlink']
203         expected_file_symlink_item_w_meta = vfs.Item(meta=file_symlink_value.meta,
204                                                      oid=file_symlink_value.oid)
205         expected = ((b'', vfs._root),
206                     (b'test', test_revlist_w_meta),
207                     (save_time_str, expected_latest_item_w_meta),
208                     (b'file-symlink', expected_file_symlink_item_w_meta))
209         wvpasseq(expected, res)
210
211         wvstart('resolve: /test/latest/missing')
212         vfs.clear_cache()
213         res = resolve(b'/test/latest/missing')
214         wvpasseq(4, len(res))
215         name, item = res[-1]
216         wvpasseq(b'missing', name)
217         wvpass(item is None)
218
219         for path in (b'/test/latest/file/',
220                      b'/test/latest/file/.',
221                      b'/test/latest/file/..',
222                      b'/test/latest/file/../',
223                      b'/test/latest/file/../.',
224                      b'/test/latest/file/../..',
225                      b'/test/latest/file/foo'):
226             wvstart('resolve: ' + path_msg(path))
227             vfs.clear_cache()
228             try:
229                 resolve(path)
230             except vfs.IOError as res_ex:
231                 wvpasseq(ENOTDIR, res_ex.errno)
232                 wvpasseq([b'', b'test', save_time_str, b'file'],
233                          [name for name, item in res_ex.terminus])
234
235         for path in (b'/test/latest/file-symlink/',
236                      b'/test/latest/file-symlink/.',
237                      b'/test/latest/file-symlink/..',
238                      b'/test/latest/file-symlink/../',
239                      b'/test/latest/file-symlink/../.',
240                      b'/test/latest/file-symlink/../..'):
241             wvstart('resolve nofollow: ' + path_msg(path))
242             vfs.clear_cache()
243             try:
244                 resolve(path, follow=False)
245             except vfs.IOError as res_ex:
246                 wvpasseq(ENOTDIR, res_ex.errno)
247                 wvpasseq([b'', b'test', save_time_str, b'file'],
248                          [name for name, item in res_ex.terminus])
249
250         wvstart('resolve: non-directory parent')
251         vfs.clear_cache()
252         file_res = resolve(b'/test/latest/file')
253         try:
254             resolve(b'foo', parent=file_res)
255         except vfs.IOError as res_ex:
256             wvpasseq(ENOTDIR, res_ex.errno)
257             wvpasseq(None, res_ex.terminus)
258
259         wvstart('resolve nofollow: /test/latest/dir-symlink')
260         vfs.clear_cache()
261         res = resolve(b'/test/latest/dir-symlink', follow=False)
262         wvpasseq(4, len(res))
263         dir_symlink_value = tip_tree[b'dir-symlink']
264         expected_dir_symlink_item_w_meta = vfs.Item(meta=dir_symlink_value.meta,
265                                                      oid=dir_symlink_value.oid)
266         expected = ((b'', vfs._root),
267                     (b'test', test_revlist_w_meta),
268                     (save_time_str, expected_latest_item_w_meta),
269                     (b'dir-symlink', expected_dir_symlink_item_w_meta))
270         wvpasseq(expected, res)
271
272         dir_value = tip_tree[b'dir']
273         expected_dir_item = vfs.Item(oid=dir_value.oid,
274                                      meta=tree_dict(repo, dir_value.oid)[b'.'].meta)
275         expected = ((b'', vfs._root),
276                     (b'test', test_revlist_w_meta),
277                     (save_time_str, expected_latest_item_w_meta),
278                     (b'dir', expected_dir_item))
279         def lresolve(*args, **keys):
280             return resolve(*args, **dict(keys, follow=False))
281         for resname, resolver in (('resolve', resolve),
282                                   ('resolve nofollow', lresolve)):
283             for path in (b'/test/latest/dir-symlink/',
284                          b'/test/latest/dir-symlink/.'):
285                 wvstart(resname + ': ' + path_msg(path))
286                 vfs.clear_cache()
287                 res = resolver(path)
288                 wvpasseq(4, len(res))
289                 wvpasseq(expected, res)
290         wvstart('resolve: /test/latest/dir-symlink')
291         vfs.clear_cache()
292         res = resolve(path)
293         wvpasseq(4, len(res))
294         wvpasseq(expected, res)
295
296 @wvtest
297 def test_local_resolve():
298     prep_and_test_repo(b'local-vfs-resolve',
299                        lambda x: LocalRepo(repo_dir=x), test_resolve)
300
301 @wvtest
302 def test_remote_resolve():
303     prep_and_test_repo(b'remote-vfs-resolve',
304                        lambda x: RemoteRepo(x), test_resolve)
305
306 def test_resolve_loop(repo, tmpdir):
307     data_path = tmpdir + b'/src'
308     os.mkdir(data_path)
309     symlink(b'loop', data_path + b'/loop')
310     ex((bup_path, b'init'))
311     ex((bup_path, b'index', b'-v', data_path))
312     save_utc = 100000
313     ex((bup_path, b'save', b'-d', b'%d' % save_utc, b'-tvvn', b'test', b'--strip',
314         data_path))
315     save_name = strftime('%Y-%m-%d-%H%M%S', localtime(save_utc)).encode('ascii')
316     try:
317         wvpasseq('this call should never return',
318                  repo.resolve(b'/test/%s/loop' % save_name))
319     except vfs.IOError as res_ex:
320         wvpasseq(ELOOP, res_ex.errno)
321         wvpasseq([b'', b'test', save_name, b'loop'],
322                  [name for name, item in res_ex.terminus])
323
324 @wvtest
325 def test_local_resolve_loop():
326     prep_and_test_repo(b'local-vfs-resolve-loop',
327                        lambda x: LocalRepo(x), test_resolve_loop)
328
329 @wvtest
330 def test_remote_resolve_loop():
331     prep_and_test_repo(b'remote-vfs-resolve-loop',
332                        lambda x: RemoteRepo(x), test_resolve_loop)
333
334 # FIXME: add tests for the want_meta=False cases.