]> arthur.barton.de Git - bup.git/blob - lib/bup/t/tindex.py
Check saved_errors before and after every test
[bup.git] / lib / bup / t / tindex.py
1
2 import os, time
3
4 from wvtest import *
5
6 from bup import index, metadata
7 from bup.helpers import mkdirp, resolve_parent
8 from buptest import no_lingering_errors, test_tempdir
9 import bup.xstat as xstat
10
11
12 lib_t_dir = os.path.dirname(__file__)
13
14
15 @wvtest
16 def index_basic():
17     with no_lingering_errors():
18         cd = os.path.realpath('../../../t')
19         WVPASS(cd)
20         sd = os.path.realpath(cd + '/sampledata')
21         WVPASSEQ(resolve_parent(cd + '/sampledata'), sd)
22         WVPASSEQ(os.path.realpath(cd + '/sampledata/x'), sd + '/x')
23         WVPASSEQ(os.path.realpath(cd + '/sampledata/var/abs-symlink'),
24                  sd + '/var/abs-symlink-target')
25         WVPASSEQ(resolve_parent(cd + '/sampledata/var/abs-symlink'),
26                  sd + '/var/abs-symlink')
27
28
29 @wvtest
30 def index_writer():
31     with no_lingering_errors(), test_tempdir('bup-tindex-') as tmpdir:
32         orig_cwd = os.getcwd()
33         try:
34             os.chdir(tmpdir)
35             ds = xstat.stat('.')
36             fs = xstat.stat(lib_t_dir + '/tindex.py')
37             ms = index.MetaStoreWriter('index.meta.tmp');
38             tmax = (time.time() - 1) * 10**9
39             w = index.Writer('index.tmp', ms, tmax)
40             w.add('/var/tmp/sporky', fs, 0)
41             w.add('/etc/passwd', fs, 0)
42             w.add('/etc/', ds, 0)
43             w.add('/', ds, 0)
44             ms.close()
45             w.close()
46         finally:
47             os.chdir(orig_cwd)
48
49
50 def dump(m):
51     for e in list(m):
52         print '%s%s %s' % (e.is_valid() and ' ' or 'M',
53                            e.is_fake() and 'F' or ' ',
54                            e.name)
55
56 def fake_validate(*l):
57     for i in l:
58         for e in i:
59             e.validate(0100644, index.FAKE_SHA)
60             e.repack()
61
62 def eget(l, ename):
63     for e in l:
64         if e.name == ename:
65             return e
66
67 @wvtest
68 def index_negative_timestamps():
69     with no_lingering_errors(), test_tempdir('bup-tindex-') as tmpdir:
70         # Makes 'foo' exist
71         foopath = tmpdir + '/foo'
72         f = file(foopath, 'wb')
73         f.close()
74
75         # Dec 31, 1969
76         os.utime(foopath, (-86400, -86400))
77         ns_per_sec = 10**9
78         tstart = time.time() * ns_per_sec
79         tmax = tstart - ns_per_sec
80         e = index.BlankNewEntry(foopath, 0, tmax)
81         e.from_stat(xstat.stat(foopath), 0, tstart)
82         assert len(e.packed())
83         WVPASS()
84
85         # Jun 10, 1893
86         os.utime(foopath, (-0x80000000, -0x80000000))
87         e = index.BlankNewEntry(foopath, 0, tmax)
88         e.from_stat(xstat.stat(foopath), 0, tstart)
89         assert len(e.packed())
90         WVPASS()
91
92
93 @wvtest
94 def index_dirty():
95     with no_lingering_errors(), test_tempdir('bup-tindex-') as tmpdir:
96         orig_cwd = os.getcwd()
97         try:
98             os.chdir(tmpdir)
99             default_meta = metadata.Metadata()
100             ms1 = index.MetaStoreWriter('index.meta.tmp')
101             ms2 = index.MetaStoreWriter('index2.meta.tmp')
102             ms3 = index.MetaStoreWriter('index3.meta.tmp')
103             meta_ofs1 = ms1.store(default_meta)
104             meta_ofs2 = ms2.store(default_meta)
105             meta_ofs3 = ms3.store(default_meta)
106
107             ds = xstat.stat(lib_t_dir)
108             fs = xstat.stat(lib_t_dir + '/tindex.py')
109             tmax = (time.time() - 1) * 10**9
110
111             w1 = index.Writer('index.tmp', ms1, tmax)
112             w1.add('/a/b/x', fs, meta_ofs1)
113             w1.add('/a/b/c', fs, meta_ofs1)
114             w1.add('/a/b/', ds, meta_ofs1)
115             w1.add('/a/', ds, meta_ofs1)
116             #w1.close()
117             WVPASS()
118
119             w2 = index.Writer('index2.tmp', ms2, tmax)
120             w2.add('/a/b/n/2', fs, meta_ofs2)
121             #w2.close()
122             WVPASS()
123
124             w3 = index.Writer('index3.tmp', ms3, tmax)
125             w3.add('/a/c/n/3', fs, meta_ofs3)
126             #w3.close()
127             WVPASS()
128
129             r1 = w1.new_reader()
130             r2 = w2.new_reader()
131             r3 = w3.new_reader()
132             WVPASS()
133
134             r1all = [e.name for e in r1]
135             WVPASSEQ(r1all,
136                      ['/a/b/x', '/a/b/c', '/a/b/', '/a/', '/'])
137             r2all = [e.name for e in r2]
138             WVPASSEQ(r2all,
139                      ['/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
140             r3all = [e.name for e in r3]
141             WVPASSEQ(r3all,
142                      ['/a/c/n/3', '/a/c/n/', '/a/c/', '/a/', '/'])
143             all = [e.name for e in index.merge(r2, r1, r3)]
144             WVPASSEQ(all,
145                      ['/a/c/n/3', '/a/c/n/', '/a/c/',
146                       '/a/b/x', '/a/b/n/2', '/a/b/n/', '/a/b/c',
147                       '/a/b/', '/a/', '/'])
148             fake_validate(r1)
149             dump(r1)
150
151             print [hex(e.flags) for e in r1]
152             WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
153             WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
154             WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
155                      ['/a/c/n/3', '/a/c/n/', '/a/c/',
156                       '/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
157
158             expect_invalid = ['/'] + r2all + r3all
159             expect_real = (set(r1all) - set(r2all) - set(r3all)) \
160                             | set(['/a/b/n/2', '/a/c/n/3'])
161             dump(index.merge(r2, r1, r3))
162             for e in index.merge(r2, r1, r3):
163                 print e.name, hex(e.flags), e.ctime
164                 eiv = e.name in expect_invalid
165                 er  = e.name in expect_real
166                 WVPASSEQ(eiv, not e.is_valid())
167                 WVPASSEQ(er, e.is_real())
168             fake_validate(r2, r3)
169             dump(index.merge(r2, r1, r3))
170             WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
171
172             e = eget(index.merge(r2, r1, r3), '/a/b/c')
173             e.invalidate()
174             e.repack()
175             dump(index.merge(r2, r1, r3))
176             WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
177                      ['/a/b/c', '/a/b/', '/a/', '/'])        
178             w1.close()
179             w2.close()
180             w3.close()
181         finally:
182             os.chdir(orig_cwd)