2 from __future__ import absolute_import
7 from bup import index, metadata
8 from bup.helpers import mkdirp, resolve_parent
9 from buptest import no_lingering_errors, test_tempdir
10 import bup.xstat as xstat
# Directory containing this module. Elsewhere in this file it is passed to
# xstat.stat() directly and joined with '/tindex.py' to obtain sample stats.
13 lib_t_dir = os.path.dirname(__file__)
# Fragment: the enclosing function header is not visible in this excerpt.
# Compares os.path.realpath() with resolve_parent() on paths below
# ../../../t/sampledata, inside a no_lingering_errors() context.
18 with no_lingering_errors():
19 cd = os.path.realpath('../../../t')
21 sd = os.path.realpath(cd + '/sampledata')
# Both functions are expected to give the same result for the sampledata dir.
22 WVPASSEQ(resolve_parent(cd + '/sampledata'), sd)
23 WVPASSEQ(os.path.realpath(cd + '/sampledata/x'), sd + '/x')
# For a path ending in 'abs-symlink', realpath() is expected to yield the
# '...-target' path, whereas resolve_parent() (next check) is expected to
# keep the final component as given.
24 WVPASSEQ(os.path.realpath(cd + '/sampledata/var/abs-symlink'),
25 sd + '/var/abs-symlink-target')
26 WVPASSEQ(resolve_parent(cd + '/sampledata/var/abs-symlink'),
27 sd + '/var/abs-symlink')
# Fragment: the enclosing function header and several interior/closing lines
# are not visible in this excerpt.
# Within a temporary directory from test_tempdir('bup-tindex-'), creates an
# index.Writer backed by a MetaStoreWriter and adds two paths to it.
32 with no_lingering_errors():
33 with test_tempdir('bup-tindex-') as tmpdir:
# Remember the starting directory (presumably restored on lines not shown).
34 orig_cwd = os.getcwd()
# Stat result for tindex.py next to this module; reused for every add() below.
38 fs = xstat.stat(lib_t_dir + '/tindex.py')
39 ms = index.MetaStoreWriter('index.meta.tmp');
# One second before "now", scaled by 10**9 (presumably nanoseconds; compare
# the ns_per_sec factor used for tmax in index_negative_timestamps).
40 tmax = (time.time() - 1) * 10**9
41 w = index.Writer('index.tmp', ms, tmax)
# Third argument 0 is passed where other calls in this file pass a value
# returned by MetaStoreWriter.store().
42 w.add('/var/tmp/sporky', fs, 0)
43 w.add('/etc/passwd', fs, 0)
# Fragment of a Python 2 print statement; its enclosing definition and the
# closing line of the tuple are not visible in this excerpt.
# First field: ' ' when e.is_valid() is true, otherwise 'M'.
# Second field: 'F' when e.is_fake() is true, otherwise ' '.
54 print '%s%s %s' % (e.is_valid() and ' ' or 'M',
55 e.is_fake() and 'F' or ' ',
# Helper accepting any number of positional arguments, collected in l.
# NOTE(review): the lines between the def and the validate() call are not
# visible here; presumably they loop over each argument and bind e to its
# entries -- confirm against the full file.
58 def fake_validate(*l):
# Calls validate() on entry e with 0100644 (a Python 2 octal literal) and the
# index.FAKE_SHA constant.
61 e.validate(0100644, index.FAKE_SHA)
# Builds index entries from the stat of a file whose access/modification
# times have been set to negative values.
# NOTE(review): several interior lines (including any assertions and the
# definition of ns_per_sec) are not visible in this excerpt.
70 def index_negative_timestamps():
71 with no_lingering_errors():
72 with test_tempdir('bup-tindex-') as tmpdir:
74 foopath = tmpdir + '/foo'
# file() is the Python 2 builtin; the close of f is not visible in this excerpt.
75 f = file(foopath, 'wb')
# Set both times to -86400 seconds, i.e. one day before the epoch.
79 os.utime(foopath, (-86400, -86400))
81 tmax = (time.time() - 1) * ns_per_sec
82 e = index.BlankNewEntry(foopath, 0, tmax)
83 e.update_from_stat(xstat.stat(foopath), 0)
# Repeat with -0x80000000 (-2**31) for both times.
87 os.utime(foopath, (-0x80000000, -0x80000000))
88 e = index.BlankNewEntry(foopath, 0, tmax)
89 e.update_from_stat(xstat.stat(foopath), 0)
# Fragment: the enclosing function header and a number of interior lines
# (including creation of r1, r2, r3 and the opening lines of several WVPASSEQ
# calls) are not visible in this excerpt.
# Creates three index writers, then checks entry names, the order produced by
# index.merge(), and the is_valid()/is_real() state of the merged entries.
95 with no_lingering_errors():
96 with test_tempdir('bup-tindex-') as tmpdir:
97 orig_cwd = os.getcwd()
# The same default Metadata object is stored once in each of three separate
# meta stores; each store() return value is kept for the matching writer.
100 default_meta = metadata.Metadata()
101 ms1 = index.MetaStoreWriter('index.meta.tmp')
102 ms2 = index.MetaStoreWriter('index2.meta.tmp')
103 ms3 = index.MetaStoreWriter('index3.meta.tmp')
104 meta_ofs1 = ms1.store(default_meta)
105 meta_ofs2 = ms2.store(default_meta)
106 meta_ofs3 = ms3.store(default_meta)
# ds: stat of the directory holding this module; fs: stat of tindex.py in it.
108 ds = xstat.stat(lib_t_dir)
109 fs = xstat.stat(lib_t_dir + '/tindex.py')
110 tmax = (time.time() - 1) * 10**9
# First writer: '/a/b/x' and '/a/b/c' use the file stat, '/a/b/' and '/a/'
# use the directory stat.
112 w1 = index.Writer('index.tmp', ms1, tmax)
113 w1.add('/a/b/x', fs, meta_ofs1)
114 w1.add('/a/b/c', fs, meta_ofs1)
115 w1.add('/a/b/', ds, meta_ofs1)
116 w1.add('/a/', ds, meta_ofs1)
# Second and third writers: the only add() calls visible here name one file
# each, under /a/b/n/ and /a/c/n/ respectively.
120 w2 = index.Writer('index2.tmp', ms2, tmax)
121 w2.add('/a/b/n/2', fs, meta_ofs2)
125 w3 = index.Writer('index3.tmp', ms3, tmax)
126 w3.add('/a/c/n/3', fs, meta_ofs3)
# r1, r2 and r3 are created on lines not visible in this excerpt. Each list
# literal below is the tail of a call whose opening line is also not visible;
# presumably it is the expected name list for the preceding reader.
135 r1all = [e.name for e in r1]
137 ['/a/b/x', '/a/b/c', '/a/b/', '/a/', '/'])
138 r2all = [e.name for e in r2]
140 ['/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
141 r3all = [e.name for e in r3]
143 ['/a/c/n/3', '/a/c/n/', '/a/c/', '/a/', '/'])
# Names from the merged view of all three readers.
# NOTE: the name 'all' shadows the builtin all() in this scope.
144 all = [e.name for e in index.merge(r2, r1, r3)]
146 ['/a/c/n/3', '/a/c/n/', '/a/c/',
147 '/a/b/x', '/a/b/n/2', '/a/b/n/', '/a/b/c',
148 '/a/b/', '/a/', '/'])
# Diagnostic output of r1's flags, then: every r1 entry is expected to be
# valid, and none invalid.
152 print [hex(e.flags) for e in r1]
153 WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
154 WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
# In the merged view, the entries expected to be invalid are the listed
# /a/c/... and /a/b/n/... names plus '/a/b/', '/a/' and '/'.
155 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
156 ['/a/c/n/3', '/a/c/n/', '/a/c/',
157 '/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
# expect_invalid: '/' plus every name seen in r2 and r3.
# expect_real: names found only in r1, plus the two files added to w2 and w3.
159 expect_invalid = ['/'] + r2all + r3all
160 expect_real = (set(r1all) - set(r2all) - set(r3all)) \
161 | set(['/a/b/n/2', '/a/c/n/3'])
# dump() is defined elsewhere in the file; not visible in this excerpt.
162 dump(index.merge(r2, r1, r3))
# Per merged entry: print diagnostics, then check that is_valid() and
# is_real() agree with the expectations computed above.
163 for e in index.merge(r2, r1, r3):
164 print e.name, hex(e.flags), e.ctime
165 eiv = e.name in expect_invalid
166 er = e.name in expect_real
167 WVPASSEQ(eiv, not e.is_valid())
168 WVPASSEQ(er, e.is_real())
# After fake_validate() on r2 and r3, no merged entry is expected to be invalid.
169 fake_validate(r2, r3)
170 dump(index.merge(r2, r1, r3))
171 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
# Look up the '/a/b/c' entry in the merged view (eget is defined elsewhere).
# NOTE(review): the lines acting on e are not visible here; presumably they
# invalidate it, given the expected list in the final check -- confirm.
173 e = eget(index.merge(r2, r1, r3), '/a/b/c')
176 dump(index.merge(r2, r1, r3))
# '/a/b/c' and its parents '/a/b/', '/a/', '/' are now expected to be invalid.
177 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
178 ['/a/b/c', '/a/b/', '/a/', '/'])