]> arthur.barton.de Git - bup.git/blob - lib/bup/t/tindex.py
Don't use multiple with context clauses
[bup.git] / lib / bup / t / tindex.py
1
2 import os, time
3
4 from wvtest import *
5
6 from bup import index, metadata
7 from bup.helpers import mkdirp, resolve_parent
8 from buptest import no_lingering_errors, test_tempdir
9 import bup.xstat as xstat
10
11
12 lib_t_dir = os.path.dirname(__file__)
13
14
15 @wvtest
16 def index_basic():
17     with no_lingering_errors():
18         cd = os.path.realpath('../../../t')
19         WVPASS(cd)
20         sd = os.path.realpath(cd + '/sampledata')
21         WVPASSEQ(resolve_parent(cd + '/sampledata'), sd)
22         WVPASSEQ(os.path.realpath(cd + '/sampledata/x'), sd + '/x')
23         WVPASSEQ(os.path.realpath(cd + '/sampledata/var/abs-symlink'),
24                  sd + '/var/abs-symlink-target')
25         WVPASSEQ(resolve_parent(cd + '/sampledata/var/abs-symlink'),
26                  sd + '/var/abs-symlink')
27
28
29 @wvtest
30 def index_writer():
31     with no_lingering_errors():
32         with test_tempdir('bup-tindex-') as tmpdir:
33             orig_cwd = os.getcwd()
34             try:
35                 os.chdir(tmpdir)
36                 ds = xstat.stat('.')
37                 fs = xstat.stat(lib_t_dir + '/tindex.py')
38                 ms = index.MetaStoreWriter('index.meta.tmp');
39                 tmax = (time.time() - 1) * 10**9
40                 w = index.Writer('index.tmp', ms, tmax)
41                 w.add('/var/tmp/sporky', fs, 0)
42                 w.add('/etc/passwd', fs, 0)
43                 w.add('/etc/', ds, 0)
44                 w.add('/', ds, 0)
45                 ms.close()
46                 w.close()
47             finally:
48                 os.chdir(orig_cwd)
49
50
51 def dump(m):
52     for e in list(m):
53         print '%s%s %s' % (e.is_valid() and ' ' or 'M',
54                            e.is_fake() and 'F' or ' ',
55                            e.name)
56
57 def fake_validate(*l):
58     for i in l:
59         for e in i:
60             e.validate(0100644, index.FAKE_SHA)
61             e.repack()
62
63 def eget(l, ename):
64     for e in l:
65         if e.name == ename:
66             return e
67
68 @wvtest
69 def index_negative_timestamps():
70     with no_lingering_errors():
71         with test_tempdir('bup-tindex-') as tmpdir:
72             # Makes 'foo' exist
73             foopath = tmpdir + '/foo'
74             f = file(foopath, 'wb')
75             f.close()
76
77             # Dec 31, 1969
78             os.utime(foopath, (-86400, -86400))
79             ns_per_sec = 10**9
80             tstart = time.time() * ns_per_sec
81             tmax = tstart - ns_per_sec
82             e = index.BlankNewEntry(foopath, 0, tmax)
83             e.from_stat(xstat.stat(foopath), 0, tstart)
84             assert len(e.packed())
85             WVPASS()
86
87             # Jun 10, 1893
88             os.utime(foopath, (-0x80000000, -0x80000000))
89             e = index.BlankNewEntry(foopath, 0, tmax)
90             e.from_stat(xstat.stat(foopath), 0, tstart)
91             assert len(e.packed())
92             WVPASS()
93
94
95 @wvtest
96 def index_dirty():
97     with no_lingering_errors():
98         with test_tempdir('bup-tindex-') as tmpdir:
99             orig_cwd = os.getcwd()
100             try:
101                 os.chdir(tmpdir)
102                 default_meta = metadata.Metadata()
103                 ms1 = index.MetaStoreWriter('index.meta.tmp')
104                 ms2 = index.MetaStoreWriter('index2.meta.tmp')
105                 ms3 = index.MetaStoreWriter('index3.meta.tmp')
106                 meta_ofs1 = ms1.store(default_meta)
107                 meta_ofs2 = ms2.store(default_meta)
108                 meta_ofs3 = ms3.store(default_meta)
109
110                 ds = xstat.stat(lib_t_dir)
111                 fs = xstat.stat(lib_t_dir + '/tindex.py')
112                 tmax = (time.time() - 1) * 10**9
113
114                 w1 = index.Writer('index.tmp', ms1, tmax)
115                 w1.add('/a/b/x', fs, meta_ofs1)
116                 w1.add('/a/b/c', fs, meta_ofs1)
117                 w1.add('/a/b/', ds, meta_ofs1)
118                 w1.add('/a/', ds, meta_ofs1)
119                 #w1.close()
120                 WVPASS()
121
122                 w2 = index.Writer('index2.tmp', ms2, tmax)
123                 w2.add('/a/b/n/2', fs, meta_ofs2)
124                 #w2.close()
125                 WVPASS()
126
127                 w3 = index.Writer('index3.tmp', ms3, tmax)
128                 w3.add('/a/c/n/3', fs, meta_ofs3)
129                 #w3.close()
130                 WVPASS()
131
132                 r1 = w1.new_reader()
133                 r2 = w2.new_reader()
134                 r3 = w3.new_reader()
135                 WVPASS()
136
137                 r1all = [e.name for e in r1]
138                 WVPASSEQ(r1all,
139                          ['/a/b/x', '/a/b/c', '/a/b/', '/a/', '/'])
140                 r2all = [e.name for e in r2]
141                 WVPASSEQ(r2all,
142                          ['/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
143                 r3all = [e.name for e in r3]
144                 WVPASSEQ(r3all,
145                          ['/a/c/n/3', '/a/c/n/', '/a/c/', '/a/', '/'])
146                 all = [e.name for e in index.merge(r2, r1, r3)]
147                 WVPASSEQ(all,
148                          ['/a/c/n/3', '/a/c/n/', '/a/c/',
149                           '/a/b/x', '/a/b/n/2', '/a/b/n/', '/a/b/c',
150                           '/a/b/', '/a/', '/'])
151                 fake_validate(r1)
152                 dump(r1)
153
154                 print [hex(e.flags) for e in r1]
155                 WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
156                 WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
157                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
158                          ['/a/c/n/3', '/a/c/n/', '/a/c/',
159                           '/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
160
161                 expect_invalid = ['/'] + r2all + r3all
162                 expect_real = (set(r1all) - set(r2all) - set(r3all)) \
163                                 | set(['/a/b/n/2', '/a/c/n/3'])
164                 dump(index.merge(r2, r1, r3))
165                 for e in index.merge(r2, r1, r3):
166                     print e.name, hex(e.flags), e.ctime
167                     eiv = e.name in expect_invalid
168                     er  = e.name in expect_real
169                     WVPASSEQ(eiv, not e.is_valid())
170                     WVPASSEQ(er, e.is_real())
171                 fake_validate(r2, r3)
172                 dump(index.merge(r2, r1, r3))
173                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
174
175                 e = eget(index.merge(r2, r1, r3), '/a/b/c')
176                 e.invalidate()
177                 e.repack()
178                 dump(index.merge(r2, r1, r3))
179                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
180                          ['/a/b/c', '/a/b/', '/a/', '/'])
181                 w1.close()
182                 w2.close()
183                 w3.close()
184             finally:
185                 os.chdir(orig_cwd)