]> arthur.barton.de Git - bup.git/blob - lib/bup/t/tindex.py
6639e0b939fcd5f7584c15efaebf42055d6b4353
[bup.git] / lib / bup / t / tindex.py
1
2 import os, time
3
4 from wvtest import *
5
6 from bup import index, metadata
7 from bup.helpers import mkdirp, resolve_parent
8 from buptest import no_lingering_errors, test_tempdir
9 import bup.xstat as xstat
10
11
12 lib_t_dir = os.path.dirname(__file__)
13
14
15 @wvtest
16 def index_basic():
17     with no_lingering_errors():
18         cd = os.path.realpath('../../../t')
19         WVPASS(cd)
20         sd = os.path.realpath(cd + '/sampledata')
21         WVPASSEQ(resolve_parent(cd + '/sampledata'), sd)
22         WVPASSEQ(os.path.realpath(cd + '/sampledata/x'), sd + '/x')
23         WVPASSEQ(os.path.realpath(cd + '/sampledata/var/abs-symlink'),
24                  sd + '/var/abs-symlink-target')
25         WVPASSEQ(resolve_parent(cd + '/sampledata/var/abs-symlink'),
26                  sd + '/var/abs-symlink')
27
28
29 @wvtest
30 def index_writer():
31     with no_lingering_errors():
32         with test_tempdir('bup-tindex-') as tmpdir:
33             orig_cwd = os.getcwd()
34             try:
35                 os.chdir(tmpdir)
36                 ds = xstat.stat('.')
37                 fs = xstat.stat(lib_t_dir + '/tindex.py')
38                 ms = index.MetaStoreWriter('index.meta.tmp');
39                 tmax = (time.time() - 1) * 10**9
40                 w = index.Writer('index.tmp', ms, tmax)
41                 w.add('/var/tmp/sporky', fs, 0)
42                 w.add('/etc/passwd', fs, 0)
43                 w.add('/etc/', ds, 0)
44                 w.add('/', ds, 0)
45                 ms.close()
46                 w.close()
47             finally:
48                 os.chdir(orig_cwd)
49
50
51 def dump(m):
52     for e in list(m):
53         print '%s%s %s' % (e.is_valid() and ' ' or 'M',
54                            e.is_fake() and 'F' or ' ',
55                            e.name)
56
57 def fake_validate(*l):
58     for i in l:
59         for e in i:
60             e.validate(0100644, index.FAKE_SHA)
61             e.repack()
62
63 def eget(l, ename):
64     for e in l:
65         if e.name == ename:
66             return e
67
68 @wvtest
69 def index_negative_timestamps():
70     with no_lingering_errors():
71         with test_tempdir('bup-tindex-') as tmpdir:
72             # Makes 'foo' exist
73             foopath = tmpdir + '/foo'
74             f = file(foopath, 'wb')
75             f.close()
76
77             # Dec 31, 1969
78             os.utime(foopath, (-86400, -86400))
79             ns_per_sec = 10**9
80             tmax = (time.time() - 1) * ns_per_sec
81             e = index.BlankNewEntry(foopath, 0, tmax)
82             e.update_from_stat(xstat.stat(foopath), 0)
83             WVPASS(e.packed())
84
85             # Jun 10, 1893
86             os.utime(foopath, (-0x80000000, -0x80000000))
87             e = index.BlankNewEntry(foopath, 0, tmax)
88             e.update_from_stat(xstat.stat(foopath), 0)
89             WVPASS(e.packed())
90
91
92 @wvtest
93 def index_dirty():
94     with no_lingering_errors():
95         with test_tempdir('bup-tindex-') as tmpdir:
96             orig_cwd = os.getcwd()
97             try:
98                 os.chdir(tmpdir)
99                 default_meta = metadata.Metadata()
100                 ms1 = index.MetaStoreWriter('index.meta.tmp')
101                 ms2 = index.MetaStoreWriter('index2.meta.tmp')
102                 ms3 = index.MetaStoreWriter('index3.meta.tmp')
103                 meta_ofs1 = ms1.store(default_meta)
104                 meta_ofs2 = ms2.store(default_meta)
105                 meta_ofs3 = ms3.store(default_meta)
106
107                 ds = xstat.stat(lib_t_dir)
108                 fs = xstat.stat(lib_t_dir + '/tindex.py')
109                 tmax = (time.time() - 1) * 10**9
110
111                 w1 = index.Writer('index.tmp', ms1, tmax)
112                 w1.add('/a/b/x', fs, meta_ofs1)
113                 w1.add('/a/b/c', fs, meta_ofs1)
114                 w1.add('/a/b/', ds, meta_ofs1)
115                 w1.add('/a/', ds, meta_ofs1)
116                 #w1.close()
117                 WVPASS()
118
119                 w2 = index.Writer('index2.tmp', ms2, tmax)
120                 w2.add('/a/b/n/2', fs, meta_ofs2)
121                 #w2.close()
122                 WVPASS()
123
124                 w3 = index.Writer('index3.tmp', ms3, tmax)
125                 w3.add('/a/c/n/3', fs, meta_ofs3)
126                 #w3.close()
127                 WVPASS()
128
129                 r1 = w1.new_reader()
130                 r2 = w2.new_reader()
131                 r3 = w3.new_reader()
132                 WVPASS()
133
134                 r1all = [e.name for e in r1]
135                 WVPASSEQ(r1all,
136                          ['/a/b/x', '/a/b/c', '/a/b/', '/a/', '/'])
137                 r2all = [e.name for e in r2]
138                 WVPASSEQ(r2all,
139                          ['/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
140                 r3all = [e.name for e in r3]
141                 WVPASSEQ(r3all,
142                          ['/a/c/n/3', '/a/c/n/', '/a/c/', '/a/', '/'])
143                 all = [e.name for e in index.merge(r2, r1, r3)]
144                 WVPASSEQ(all,
145                          ['/a/c/n/3', '/a/c/n/', '/a/c/',
146                           '/a/b/x', '/a/b/n/2', '/a/b/n/', '/a/b/c',
147                           '/a/b/', '/a/', '/'])
148                 fake_validate(r1)
149                 dump(r1)
150
151                 print [hex(e.flags) for e in r1]
152                 WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
153                 WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
154                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
155                          ['/a/c/n/3', '/a/c/n/', '/a/c/',
156                           '/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
157
158                 expect_invalid = ['/'] + r2all + r3all
159                 expect_real = (set(r1all) - set(r2all) - set(r3all)) \
160                                 | set(['/a/b/n/2', '/a/c/n/3'])
161                 dump(index.merge(r2, r1, r3))
162                 for e in index.merge(r2, r1, r3):
163                     print e.name, hex(e.flags), e.ctime
164                     eiv = e.name in expect_invalid
165                     er  = e.name in expect_real
166                     WVPASSEQ(eiv, not e.is_valid())
167                     WVPASSEQ(er, e.is_real())
168                 fake_validate(r2, r3)
169                 dump(index.merge(r2, r1, r3))
170                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
171
172                 e = eget(index.merge(r2, r1, r3), '/a/b/c')
173                 e.invalidate()
174                 e.repack()
175                 dump(index.merge(r2, r1, r3))
176                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
177                          ['/a/b/c', '/a/b/', '/a/', '/'])
178                 w1.close()
179                 w2.close()
180                 w3.close()
181             finally:
182                 os.chdir(orig_cwd)