-@wvtest
-def test_resolve():
- with no_lingering_errors():
- with test_tempdir('bup-tvfs-') as tmpdir:
- resolve = vfs.resolve
- lresolve = vfs.lresolve
- bup_dir = tmpdir + '/bup'
- environ['GIT_DIR'] = bup_dir
- environ['BUP_DIR'] = bup_dir
- git.repodir = bup_dir
- data_path = tmpdir + '/src'
- save_time = 100000
- save_time_str = strftime('%Y-%m-%d-%H%M%S', localtime(save_time))
- os.mkdir(data_path)
- with open(data_path + '/file', 'w+') as tmpfile:
- print('canary', file=tmpfile)
- symlink('file', data_path + '/symlink')
- ex((bup_path, 'init'))
- ex((bup_path, 'index', '-v', data_path))
- ex((bup_path, 'save', '-d', str(save_time), '-tvvn', 'test',
- '--strip', data_path))
- ex((bup_path, 'tag', 'test-tag', 'test'))
- repo = LocalRepo()
-
- tip_hash = exo(('git', 'show-ref', 'refs/heads/test'))[0]
- tip_oidx = tip_hash.strip().split()[0]
- tip_oid = tip_oidx.decode('hex')
- tip_tree_oidx = exo(('git', 'log', '--pretty=%T', '-n1',
- tip_oidx))[0].strip()
- tip_tree_oid = tip_tree_oidx.decode('hex')
- tip_tree = tree_dict(repo, tip_tree_oid)
- test_revlist = vfs.RevList(meta=S_IFDIR | 0o755, oid=tip_oid)
- test_revlist_w_meta = vfs.RevList(meta=tip_tree['.'].meta,
- oid=tip_oid)
- expected_latest_item = vfs.Commit(meta=S_IFDIR | 0o755,
- oid=tip_tree_oid,
- coid=tip_oid)
- expected_test_tag_item = expected_latest_item
-
- wvstart('resolve: /')
- res = resolve(repo, '/')
- wvpasseq(1, len(res))
- wvpasseq((('', vfs._root),), res)
- ignore, root_item = res[0]
- root_content = frozenset(vfs.contents(repo, root_item))
- wvpasseq(frozenset([('.', root_item),
- ('.tag', vfs._tags),
- ('test', test_revlist)]),
- root_content)
-
- wvstart('resolve: /.tag')
- res = resolve(repo, '/.tag')
- wvpasseq(2, len(res))
- wvpasseq((('', vfs._root), ('.tag', vfs._tags)),
- res)
- ignore, tag_item = res[1]
- tag_content = frozenset(vfs.contents(repo, tag_item))
- wvpasseq(frozenset([('.', tag_item),
- ('test-tag', expected_test_tag_item)]),
- tag_content)
-
- wvstart('resolve: /test')
- res = resolve(repo, '/test')
- wvpasseq(2, len(res))
- wvpasseq((('', vfs._root), ('test', test_revlist)), res)
- ignore, test_item = res[1]
- test_content = frozenset(vfs.contents(repo, test_item))
- wvpasseq(frozenset([('.', test_revlist_w_meta),
- (save_time_str, expected_latest_item),
- ('latest', expected_latest_item)]),
- test_content)
-
- wvstart('resolve: /test/latest')
- res = resolve(repo, '/test/latest')
- wvpasseq(3, len(res))
- expected_latest_item_w_meta = vfs.Commit(meta=tip_tree['.'].meta,
- oid=tip_tree_oid,
- coid=tip_oid)
- expected = (('', vfs._root),
- ('test', test_revlist),
- ('latest', expected_latest_item_w_meta))
- wvpasseq(expected, res)
- ignore, latest_item = res[2]
- latest_content = frozenset(vfs.contents(repo, latest_item))
- expected = frozenset((x.name, vfs.Item(oid=x.oid, meta=x.meta))
- for x in (tip_tree[name]
- for name in ('.', 'file',
- 'symlink')))
- wvpasseq(expected, latest_content)
-
- wvstart('resolve: /test/latest/foo')
- res = resolve(repo, '/test/latest/file')
- wvpasseq(4, len(res))
- expected_file_item_w_meta = vfs.Item(meta=tip_tree['file'].meta,
- oid=tip_tree['file'].oid)
- expected = (('', vfs._root),
- ('test', test_revlist),
- ('latest', expected_latest_item_w_meta),
- ('file', expected_file_item_w_meta))
- wvpasseq(expected, res)
-
- wvstart('resolve: /test/latest/symlink')
- res = resolve(repo, '/test/latest/symlink')
- wvpasseq(4, len(res))
- expected = (('', vfs._root),
- ('test', test_revlist),
- ('latest', expected_latest_item_w_meta),
- ('file', expected_file_item_w_meta))
- wvpasseq(expected, res)
-
- wvstart('lresolve: /test/latest/symlink')
- res = lresolve(repo, '/test/latest/symlink')
- wvpasseq(4, len(res))
- symlink_value = tip_tree['symlink']
- expected_symlink_item_w_meta = vfs.Item(meta=symlink_value.meta,
- oid=symlink_value.oid)
- expected = (('', vfs._root),
- ('test', test_revlist),
- ('latest', expected_latest_item_w_meta),
- ('symlink', expected_symlink_item_w_meta))
- wvpasseq(expected, res)
-
- wvstart('resolve: /test/latest/missing')
- res = resolve(repo, '/test/latest/missing')
- wvpasseq(4, len(res))
- name, item = res[-1]
- wvpasseq('missing', name)
- wvpass(item is None)
+def write_sized_random_content(parent_dir, size, seed):
+ verbose = 0
+ with open(b'%s/%d' % (parent_dir, size), 'wb') as f:
+ write_random(f.fileno(), size, seed, verbose)
+
+def validate_vfs_streaming_read(repo, item, expected_path, read_sizes):
+ for read_size in read_sizes:
+ with open(expected_path, 'rb') as expected:
+ with vfs.fopen(repo, item) as actual:
+ ex_buf = expected.read(read_size)
+ act_buf = actual.read(read_size)
+ while ex_buf and act_buf:
+ wvpassge(read_size, len(ex_buf))
+ wvpassge(read_size, len(act_buf))
+ wvpasseq(len(ex_buf), len(act_buf))
+ wvpass(ex_buf == act_buf)
+ ex_buf = expected.read(read_size)
+ act_buf = actual.read(read_size)
+ wvpasseq(b'', ex_buf)
+ wvpasseq(b'', act_buf)
+
+def validate_vfs_seeking_read(repo, item, expected_path, read_sizes):
+ def read_act(act_pos):
+ with vfs.fopen(repo, item) as actual:
+ actual.seek(act_pos)
+ wvpasseq(act_pos, actual.tell())
+ act_buf = actual.read(read_size)
+ act_pos += len(act_buf)
+ wvpasseq(act_pos, actual.tell())
+ return act_pos, act_buf
+
+ for read_size in read_sizes:
+ with open(expected_path, 'rb') as expected:
+ ex_buf = expected.read(read_size)
+ act_buf = None
+ act_pos = 0
+ while ex_buf:
+ act_pos, act_buf = read_act(act_pos)
+ wvpassge(read_size, len(ex_buf))
+ wvpassge(read_size, len(act_buf))
+ wvpasseq(len(ex_buf), len(act_buf))
+ wvpass(ex_buf == act_buf)
+ if not act_buf:
+ break
+ ex_buf = expected.read(read_size)
+ else: # hit expected eof first
+ act_pos, act_buf = read_act(act_pos)
+ wvpasseq(b'', ex_buf)
+ wvpasseq(b'', act_buf)