]> arthur.barton.de Git - bup.git/blob - test/int/tindex.py
tests: move all tests to test/
[bup.git] / test / int / tindex.py
1
2 from __future__ import absolute_import, print_function
3 import os, time
4
5 from wvtest import *
6
7 from bup import index, metadata
8 from bup.compat import fsencode
9 from bup.helpers import mkdirp, resolve_parent
10 from buptest import no_lingering_errors, test_tempdir
11 import bup.xstat as xstat
12
13
14 lib_t_dir = os.path.dirname(fsencode(__file__))
15
16
17 @wvtest
18 def index_basic():
19     with no_lingering_errors():
20         cd = os.path.realpath(b'../')
21         WVPASS(cd)
22         sd = os.path.realpath(cd + b'/sampledata')
23         WVPASSEQ(resolve_parent(cd + b'/sampledata'), sd)
24         WVPASSEQ(os.path.realpath(cd + b'/sampledata/x'), sd + b'/x')
25         WVPASSEQ(os.path.realpath(cd + b'/sampledata/var/abs-symlink'),
26                  sd + b'/var/abs-symlink-target')
27         WVPASSEQ(resolve_parent(cd + b'/sampledata/var/abs-symlink'),
28                  sd + b'/var/abs-symlink')
29
30
31 @wvtest
32 def index_writer():
33     with no_lingering_errors():
34         with test_tempdir(b'bup-tindex-') as tmpdir:
35             orig_cwd = os.getcwd()
36             try:
37                 os.chdir(tmpdir)
38                 ds = xstat.stat(b'.')
39                 fs = xstat.stat(lib_t_dir + b'/tindex.py')
40                 ms = index.MetaStoreWriter(b'index.meta.tmp');
41                 tmax = (time.time() - 1) * 10**9
42                 w = index.Writer(b'index.tmp', ms, tmax)
43                 w.add(b'/var/tmp/sporky', fs, 0)
44                 w.add(b'/etc/passwd', fs, 0)
45                 w.add(b'/etc/', ds, 0)
46                 w.add(b'/', ds, 0)
47                 ms.close()
48                 w.close()
49             finally:
50                 os.chdir(orig_cwd)
51
52
53 def dump(m):
54     for e in list(m):
55         print('%s%s %s' % (e.is_valid() and ' ' or 'M',
56                            e.is_fake() and 'F' or ' ',
57                            e.name))
58
59 def fake_validate(*l):
60     for i in l:
61         for e in i:
62             e.validate(0o100644, index.FAKE_SHA)
63             e.repack()
64
65 def eget(l, ename):
66     for e in l:
67         if e.name == ename:
68             return e
69
70 @wvtest
71 def index_negative_timestamps():
72     with no_lingering_errors():
73         with test_tempdir(b'bup-tindex-') as tmpdir:
74             # Makes 'foo' exist
75             foopath = tmpdir + b'/foo'
76             f = open(foopath, 'wb')
77             f.close()
78
79             # Dec 31, 1969
80             os.utime(foopath, (-86400, -86400))
81             ns_per_sec = 10**9
82             tmax = (time.time() - 1) * ns_per_sec
83             e = index.BlankNewEntry(foopath, 0, tmax)
84             e.update_from_stat(xstat.stat(foopath), 0)
85             WVPASS(e.packed())
86
87             # Jun 10, 1893
88             os.utime(foopath, (-0x80000000, -0x80000000))
89             e = index.BlankNewEntry(foopath, 0, tmax)
90             e.update_from_stat(xstat.stat(foopath), 0)
91             WVPASS(e.packed())
92
93
94 @wvtest
95 def index_dirty():
96     with no_lingering_errors():
97         with test_tempdir(b'bup-tindex-') as tmpdir:
98             orig_cwd = os.getcwd()
99             try:
100                 os.chdir(tmpdir)
101                 default_meta = metadata.Metadata()
102                 ms1 = index.MetaStoreWriter(b'index.meta.tmp')
103                 ms2 = index.MetaStoreWriter(b'index2.meta.tmp')
104                 ms3 = index.MetaStoreWriter(b'index3.meta.tmp')
105                 meta_ofs1 = ms1.store(default_meta)
106                 meta_ofs2 = ms2.store(default_meta)
107                 meta_ofs3 = ms3.store(default_meta)
108
109                 ds = xstat.stat(lib_t_dir)
110                 fs = xstat.stat(lib_t_dir + b'/tindex.py')
111                 tmax = (time.time() - 1) * 10**9
112
113                 w1 = index.Writer(b'index.tmp', ms1, tmax)
114                 w1.add(b'/a/b/x', fs, meta_ofs1)
115                 w1.add(b'/a/b/c', fs, meta_ofs1)
116                 w1.add(b'/a/b/', ds, meta_ofs1)
117                 w1.add(b'/a/', ds, meta_ofs1)
118                 #w1.close()
119                 WVPASS()
120
121                 w2 = index.Writer(b'index2.tmp', ms2, tmax)
122                 w2.add(b'/a/b/n/2', fs, meta_ofs2)
123                 #w2.close()
124                 WVPASS()
125
126                 w3 = index.Writer(b'index3.tmp', ms3, tmax)
127                 w3.add(b'/a/c/n/3', fs, meta_ofs3)
128                 #w3.close()
129                 WVPASS()
130
131                 r1 = w1.new_reader()
132                 r2 = w2.new_reader()
133                 r3 = w3.new_reader()
134                 WVPASS()
135
136                 r1all = [e.name for e in r1]
137                 WVPASSEQ(r1all,
138                          [b'/a/b/x', b'/a/b/c', b'/a/b/', b'/a/', b'/'])
139                 r2all = [e.name for e in r2]
140                 WVPASSEQ(r2all,
141                          [b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
142                 r3all = [e.name for e in r3]
143                 WVPASSEQ(r3all,
144                          [b'/a/c/n/3', b'/a/c/n/', b'/a/c/', b'/a/', b'/'])
145                 all = [e.name for e in index.merge(r2, r1, r3)]
146                 WVPASSEQ(all,
147                          [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
148                           b'/a/b/x', b'/a/b/n/2', b'/a/b/n/', b'/a/b/c',
149                           b'/a/b/', b'/a/', b'/'])
150                 fake_validate(r1)
151                 dump(r1)
152
153                 print([hex(e.flags) for e in r1])
154                 WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
155                 WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
156                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
157                          [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
158                           b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
159
160                 expect_invalid = [b'/'] + r2all + r3all
161                 expect_real = (set(r1all) - set(r2all) - set(r3all)) \
162                                 | set([b'/a/b/n/2', b'/a/c/n/3'])
163                 dump(index.merge(r2, r1, r3))
164                 for e in index.merge(r2, r1, r3):
165                     print(e.name, hex(e.flags), e.ctime)
166                     eiv = e.name in expect_invalid
167                     er  = e.name in expect_real
168                     WVPASSEQ(eiv, not e.is_valid())
169                     WVPASSEQ(er, e.is_real())
170                 fake_validate(r2, r3)
171                 dump(index.merge(r2, r1, r3))
172                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
173
174                 e = eget(index.merge(r2, r1, r3), b'/a/b/c')
175                 e.invalidate()
176                 e.repack()
177                 dump(index.merge(r2, r1, r3))
178                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
179                          [b'/a/b/c', b'/a/b/', b'/a/', b'/'])
180                 w1.close()
181                 w2.close()
182                 w3.close()
183             finally:
184                 os.chdir(orig_cwd)