2 from __future__ import absolute_import, print_function
7 from bup import index, metadata
8 from bup.compat import fsencode
9 from bup.helpers import resolve_parent
10 import bup.xstat as xstat
# Directory containing this test module, as a bytes path (the module's
# __file__ passed through fsencode). The tests below build paths from it by
# concatenating bytes literals.
13 lib_t_dir = os.path.dirname(fsencode(__file__))
# Compares os.path.realpath() with resolve_parent() on paths inside the
# "sampledata" tree located one directory above lib_t_dir.
# WVPASSEQ(actual, expected) is presumably an equality assertion (it is not
# defined in the lines visible here).
# NOTE(review): some lines of this function may be missing from this view --
# confirm against the full file before restructuring.
16 def test_index_basic():
# cd: real path of lib_t_dir's parent directory.
17 cd = os.path.realpath(os.path.join(lib_t_dir, b'../'))
# sd: real path of the sampledata directory; used as the expected prefix below.
19 sd = os.path.realpath(cd + b'/sampledata')
# For the sampledata directory itself, resolve_parent() is expected to give
# the same result as realpath().
20 WVPASSEQ(resolve_parent(cd + b'/sampledata'), sd)
21 WVPASSEQ(os.path.realpath(cd + b'/sampledata/x'), sd + b'/x')
# realpath() is expected to follow the final symlink to its target ...
22 WVPASSEQ(os.path.realpath(cd + b'/sampledata/var/abs-symlink'),
23 sd + b'/var/abs-symlink-target')
# ... while resolve_parent() is expected to keep the final component
# (the symlink name) unchanged.
24 WVPASSEQ(resolve_parent(cd + b'/sampledata/var/abs-symlink'),
25 sd + b'/var/abs-symlink')
# Opens an index.MetaStoreWriter and an index.Writer on relative file names
# (index.meta.tmp / index.tmp) and adds a handful of entries to the writer.
# NOTE(review): `ds` is used below but is not defined in the lines visible
# here, and nothing visible uses `orig_cwd` or `tmpdir` -- the surrounding
# setup/teardown is presumably elided from this view; confirm against the
# full file.
28 def test_index_writer(tmpdir):
# Working directory at entry; presumably restored later (restoring code not in view).
29 orig_cwd = os.getcwd()
# Stat result of this test file, reused for both file entries added below.
33 fs = xstat.stat(lib_t_dir + b'/test_index.py')
# One second before "now", scaled by 10**9 (i.e. nanoseconds); passed to
# index.Writer as its third argument.
34 tmax = (time.time() - 1) * 10**9
35 with index.MetaStoreWriter(b'index.meta.tmp') as ms, \
36 index.Writer(b'index.tmp', ms, tmax) as w:
# Note the order: /etc/passwd is added before its parent directory /etc/.
# The third argument of add() is 0 for every entry here.
37 w.add(b'/var/tmp/sporky', fs, 0)
38 w.add(b'/etc/passwd', fs, 0)
39 w.add(b'/etc/', ds, 0)
48 print('%s%s %s' % (e.is_valid() and ' ' or 'M',
49 e.is_fake() and 'F' or ' ',
# Helper that calls validate() on index entries with a fixed mode of
# 0o100644 (regular file, rw-r--r--) and the placeholder hash index.FAKE_SHA.
# test_index_dirty calls it with index readers as arguments.
# NOTE(review): the statements that bind `e` from the arguments in `l` are
# not visible here -- presumably nested loops over each argument's entries;
# confirm against the full file.
52 def fake_validate(*l):
55 e.validate(0o100644, index.FAKE_SHA)
# Builds index entries from a file whose access/modification times have been
# set to negative values (before the Unix epoch).
# NOTE(review): `ns_per_sec` is not defined in the lines visible here, the
# handling of `f` after open() is not shown, and whatever checks follow each
# update_from_stat() call are not in view -- confirm against the full file.
64 def test_index_negative_timestamps(tmpdir):
# tmpdir is combined with a bytes literal, so it is expected to be bytes.
66 foopath = tmpdir + b'/foo'
# Creates (or truncates) the file so that it exists for utime()/stat().
67 f = open(foopath, 'wb')
# Case 1: atime and mtime set to one day (86400 s) before the epoch.
71 os.utime(foopath, (-86400, -86400))
73 tmax = (time.time() - 1) * ns_per_sec
74 e = index.BlankNewEntry(foopath, 0, tmax)
75 e.update_from_stat(xstat.stat(foopath), 0)
# Case 2: atime and mtime set to -0x80000000 s (-2**31, the most negative
# value of a signed 32-bit integer).
79 os.utime(foopath, (-0x80000000, -0x80000000))
80 e = index.BlankNewEntry(foopath, 0, tmax)
81 e.update_from_stat(xstat.stat(foopath), 0)
# Writes three small indexes over overlapping paths, reads them back, merges
# them with index.merge(), and checks which entries are reported as valid,
# invalid and real -- first as written, then after fake_validate(r2, r3),
# then after an entry fetched with eget() has been acted on.
# NOTE(review): several statements of this function are not visible here
# (for example the opening lines of some of the list comparisons, and the
# lines that act on `e` after eget()); `dump` and `eget` are also not defined
# in view. Confirm against the full file before changing any logic.
85 def test_index_dirty(tmpdir):
# Working directory at entry; presumably restored later (restoring code not in view).
86 orig_cwd = os.getcwd()
# A default Metadata object; the same object is stored once in each metadata store.
89 default_meta = metadata.Metadata()
91 with index.MetaStoreWriter(b'index.meta.tmp') as ms1, \
92 index.MetaStoreWriter(b'index2.meta.tmp') as ms2, \
93 index.MetaStoreWriter(b'index3.meta.tmp') as ms3:
# Values returned by store(); passed as the third argument of Writer.add() below.
95 meta_ofs1 = ms1.store(default_meta)
96 meta_ofs2 = ms2.store(default_meta)
97 meta_ofs3 = ms3.store(default_meta)
# Stat results reused for every directory (ds) and file (fs) entry added below.
99 ds = xstat.stat(lib_t_dir)
100 fs = xstat.stat(lib_t_dir + b'/test_index.py')
# One second before "now", scaled by 10**9 (nanoseconds); given to each Writer.
101 tmax = (time.time() - 1) * 10**9
103 with index.Writer(b'index.tmp', ms1, tmax) as w1, \
104 index.Writer(b'index2.tmp', ms2, tmax) as w2, \
105 index.Writer(b'index3.tmp', ms3, tmax) as w3:
# Index 1: two files under /a/b/ followed by the directories /a/b/ and /a/.
107 w1.add(b'/a/b/x', fs, meta_ofs1)
108 w1.add(b'/a/b/c', fs, meta_ofs1)
109 w1.add(b'/a/b/', ds, meta_ofs1)
110 w1.add(b'/a/', ds, meta_ofs1)
# Index 2: a single file under /a/b/n/.
114 w2.add(b'/a/b/n/2', fs, meta_ofs2)
# Index 3: a single file under /a/c/n/.
118 w3.add(b'/a/c/n/3', fs, meta_ofs3)
122 with w1.new_reader() as r1, \
123 w2.new_reader() as r2, \
124 w3.new_reader() as r3:
# Entry names of each reader in iteration order. The expected lists that
# follow contain parent directories (e.g. b'/a/b/n/', b'/') that are not
# added explicitly in the visible add() calls above.
127 r1all = [e.name for e in r1]
129 [b'/a/b/x', b'/a/b/c', b'/a/b/', b'/a/', b'/'])
130 r2all = [e.name for e in r2]
132 [b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
133 r3all = [e.name for e in r3]
135 [b'/a/c/n/3', b'/a/c/n/', b'/a/c/', b'/a/', b'/'])
# Names from the merged view of all three readers; each path is expected to
# appear once even when it exists in several indexes.
# NOTE(review): the local name `all` shadows the builtin all().
136 all = [e.name for e in index.merge(r2, r1, r3)]
138 [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
139 b'/a/b/x', b'/a/b/n/2', b'/a/b/n/', b'/a/b/c',
140 b'/a/b/', b'/a/', b'/'])
# Debug output of the raw flags of index 1.
144 print([hex(e.flags) for e in r1])
# Every entry of index 1 is expected to be valid on its own ...
145 WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
146 WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
# ... but in the merged view, everything contributed by indexes 2 and 3
# (including the shared parents /a/b/, /a/ and /) is expected to be invalid.
147 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
148 [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
149 b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
# Per-entry expectations for the merged view: invalid = anything present in
# index 2 or 3 (plus b'/'); real = names only in index 1, plus the two files
# that were added explicitly to indexes 2 and 3.
151 expect_invalid = [b'/'] + r2all + r3all
152 expect_real = (set(r1all) - set(r2all) - set(r3all)) \
153 | set([b'/a/b/n/2', b'/a/c/n/3'])
154 dump(index.merge(r2, r1, r3))
# Check every merged entry against the two expectation collections above.
155 for e in index.merge(r2, r1, r3):
156 print(e.name, hex(e.flags), e.ctime)
157 eiv = e.name in expect_invalid
158 er = e.name in expect_real
159 WVPASSEQ(eiv, not e.is_valid())
160 WVPASSEQ(er, e.is_real())
# After fake-validating indexes 2 and 3, no merged entry is expected to be invalid.
161 fake_validate(r2, r3)
162 dump(index.merge(r2, r1, r3))
163 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
# Fetch the merged entry for /a/b/c. The lines that act on it are not visible
# here (presumably they invalidate it); afterwards /a/b/c and all of its
# ancestors are expected to be invalid.
165 e = eget(index.merge(r2, r1, r3), b'/a/b/c')
168 dump(index.merge(r2, r1, r3))
169 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
170 [b'/a/b/c', b'/a/b/', b'/a/', b'/'])