2 from __future__ import absolute_import, print_function
7 from bup import index, metadata
8 from bup.compat import fsencode
9 from bup.helpers import resolve_parent
10 import bup.xstat as xstat
13 lib_t_dir = os.path.dirname(fsencode(__file__))
16 def test_index_basic():
17 cd = os.path.realpath(os.path.join(lib_t_dir, b'../'))
19 sd = os.path.realpath(cd + b'/sampledata')
20 WVPASSEQ(resolve_parent(cd + b'/sampledata'), sd)
21 WVPASSEQ(os.path.realpath(cd + b'/sampledata/x'), sd + b'/x')
22 WVPASSEQ(os.path.realpath(cd + b'/sampledata/var/abs-symlink'),
23 sd + b'/var/abs-symlink-target')
24 WVPASSEQ(resolve_parent(cd + b'/sampledata/var/abs-symlink'),
25 sd + b'/var/abs-symlink')
28 def test_index_writer(tmpdir):
29 orig_cwd = os.getcwd()
33 fs = xstat.stat(lib_t_dir + b'/test_index.py')
34 tmax = (time.time() - 1) * 10**9
35 with index.MetaStoreWriter(b'index.meta.tmp') as ms, \
36 index.Writer(b'index.tmp', ms, tmax) as w:
37 w.add(b'/var/tmp/sporky', fs, 0)
38 w.add(b'/etc/passwd', fs, 0)
39 w.add(b'/etc/', ds, 0)
48 print('%s%s %s' % (e.is_valid() and ' ' or 'M',
49 e.is_fake() and 'F' or ' ',
52 def fake_validate(*l):
55 e.validate(0o100644, index.FAKE_SHA)
64 def test_index_negative_timestamps(tmpdir):
66 foopath = tmpdir + b'/foo'
67 f = open(foopath, 'wb')
71 os.utime(foopath, (-86400, -86400))
73 tmax = (time.time() - 1) * ns_per_sec
74 e = index.BlankNewEntry(foopath, 0, tmax)
75 e.update_from_stat(xstat.stat(foopath), 0)
79 os.utime(foopath, (-0x80000000, -0x80000000))
80 e = index.BlankNewEntry(foopath, 0, tmax)
81 e.update_from_stat(xstat.stat(foopath), 0)
85 def test_index_dirty(tmpdir):
86 orig_cwd = os.getcwd()
89 default_meta = metadata.Metadata()
91 with index.MetaStoreWriter(b'index.meta.tmp') as ms1, \
92 index.MetaStoreWriter(b'index2.meta.tmp') as ms2, \
93 index.MetaStoreWriter(b'index3.meta.tmp') as ms3:
95 meta_ofs1 = ms1.store(default_meta)
96 meta_ofs2 = ms2.store(default_meta)
97 meta_ofs3 = ms3.store(default_meta)
99 ds = xstat.stat(lib_t_dir)
100 fs = xstat.stat(lib_t_dir + b'/test_index.py')
101 tmax = (time.time() - 1) * 10**9
103 with index.Writer(b'index.tmp', ms1, tmax) as w1, \
104 index.Writer(b'index2.tmp', ms2, tmax) as w2, \
105 index.Writer(b'index3.tmp', ms3, tmax) as w3:
107 w1.add(b'/a/b/x', fs, meta_ofs1)
108 w1.add(b'/a/b/c', fs, meta_ofs1)
109 w1.add(b'/a/b/', ds, meta_ofs1)
110 w1.add(b'/a/', ds, meta_ofs1)
114 w2 = index.Writer(b'index2.tmp', ms2, tmax)
115 w2.add(b'/a/b/n/2', fs, meta_ofs2)
119 w3.add(b'/a/c/n/3', fs, meta_ofs3)
123 with w1.new_reader() as r1, \
124 w2.new_reader() as r2, \
125 w3.new_reader() as r3:
128 r1all = [e.name for e in r1]
130 [b'/a/b/x', b'/a/b/c', b'/a/b/', b'/a/', b'/'])
131 r2all = [e.name for e in r2]
133 [b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
134 r3all = [e.name for e in r3]
136 [b'/a/c/n/3', b'/a/c/n/', b'/a/c/', b'/a/', b'/'])
137 all = [e.name for e in index.merge(r2, r1, r3)]
139 [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
140 b'/a/b/x', b'/a/b/n/2', b'/a/b/n/', b'/a/b/c',
141 b'/a/b/', b'/a/', b'/'])
145 print([hex(e.flags) for e in r1])
146 WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
147 WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
148 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
149 [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
150 b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
152 expect_invalid = [b'/'] + r2all + r3all
153 expect_real = (set(r1all) - set(r2all) - set(r3all)) \
154 | set([b'/a/b/n/2', b'/a/c/n/3'])
155 dump(index.merge(r2, r1, r3))
156 for e in index.merge(r2, r1, r3):
157 print(e.name, hex(e.flags), e.ctime)
158 eiv = e.name in expect_invalid
159 er = e.name in expect_real
160 WVPASSEQ(eiv, not e.is_valid())
161 WVPASSEQ(er, e.is_real())
162 fake_validate(r2, r3)
163 dump(index.merge(r2, r1, r3))
164 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
166 e = eget(index.merge(r2, r1, r3), b'/a/b/c')
169 dump(index.merge(r2, r1, r3))
170 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
171 [b'/a/b/c', b'/a/b/', b'/a/', b'/'])