]> arthur.barton.de Git - bup.git/blob - test/int/test_index.py
test: add pylint and test imports
[bup.git] / test / int / test_index.py
1
2 from __future__ import absolute_import, print_function
3 import os, time
4
5 from wvpytest import *
6
7 from bup import index, metadata
8 from bup.compat import fsencode
9 from bup.helpers import resolve_parent
10 import bup.xstat as xstat
11
12
13 lib_t_dir = os.path.dirname(fsencode(__file__))
14
15
16 def test_index_basic():
17     cd = os.path.realpath(os.path.join(lib_t_dir, b'../'))
18     WVPASS(cd)
19     sd = os.path.realpath(cd + b'/sampledata')
20     WVPASSEQ(resolve_parent(cd + b'/sampledata'), sd)
21     WVPASSEQ(os.path.realpath(cd + b'/sampledata/x'), sd + b'/x')
22     WVPASSEQ(os.path.realpath(cd + b'/sampledata/var/abs-symlink'),
23              sd + b'/var/abs-symlink-target')
24     WVPASSEQ(resolve_parent(cd + b'/sampledata/var/abs-symlink'),
25              sd + b'/var/abs-symlink')
26
27
28 def test_index_writer(tmpdir):
29     orig_cwd = os.getcwd()
30     try:
31         os.chdir(tmpdir)
32         ds = xstat.stat(b'.')
33         fs = xstat.stat(lib_t_dir + b'/test_index.py')
34         ms = index.MetaStoreWriter(b'index.meta.tmp');
35         tmax = (time.time() - 1) * 10**9
36         w = index.Writer(b'index.tmp', ms, tmax)
37         w.add(b'/var/tmp/sporky', fs, 0)
38         w.add(b'/etc/passwd', fs, 0)
39         w.add(b'/etc/', ds, 0)
40         w.add(b'/', ds, 0)
41         ms.close()
42         w.close()
43     finally:
44         os.chdir(orig_cwd)
45
46
47 def dump(m):
48     for e in list(m):
49         print('%s%s %s' % (e.is_valid() and ' ' or 'M',
50                            e.is_fake() and 'F' or ' ',
51                            e.name))
52
53 def fake_validate(*l):
54     for i in l:
55         for e in i:
56             e.validate(0o100644, index.FAKE_SHA)
57             e.repack()
58
59 def eget(l, ename):
60     for e in l:
61         if e.name == ename:
62             return e
63
64 def test_index_negative_timestamps(tmpdir):
65     # Makes 'foo' exist
66     foopath = tmpdir + b'/foo'
67     f = open(foopath, 'wb')
68     f.close()
69
70     # Dec 31, 1969
71     os.utime(foopath, (-86400, -86400))
72     ns_per_sec = 10**9
73     tmax = (time.time() - 1) * ns_per_sec
74     e = index.BlankNewEntry(foopath, 0, tmax)
75     e.update_from_stat(xstat.stat(foopath), 0)
76     WVPASS(e.packed())
77
78     # Jun 10, 1893
79     os.utime(foopath, (-0x80000000, -0x80000000))
80     e = index.BlankNewEntry(foopath, 0, tmax)
81     e.update_from_stat(xstat.stat(foopath), 0)
82     WVPASS(e.packed())
83
84
85 def test_index_dirty(tmpdir):
86     orig_cwd = os.getcwd()
87     try:
88         os.chdir(tmpdir)
89         default_meta = metadata.Metadata()
90         ms1 = index.MetaStoreWriter(b'index.meta.tmp')
91         ms2 = index.MetaStoreWriter(b'index2.meta.tmp')
92         ms3 = index.MetaStoreWriter(b'index3.meta.tmp')
93         meta_ofs1 = ms1.store(default_meta)
94         meta_ofs2 = ms2.store(default_meta)
95         meta_ofs3 = ms3.store(default_meta)
96
97         ds = xstat.stat(lib_t_dir)
98         fs = xstat.stat(lib_t_dir + b'/test_index.py')
99         tmax = (time.time() - 1) * 10**9
100
101         w1 = index.Writer(b'index.tmp', ms1, tmax)
102         w1.add(b'/a/b/x', fs, meta_ofs1)
103         w1.add(b'/a/b/c', fs, meta_ofs1)
104         w1.add(b'/a/b/', ds, meta_ofs1)
105         w1.add(b'/a/', ds, meta_ofs1)
106         #w1.close()
107         WVPASS()
108
109         w2 = index.Writer(b'index2.tmp', ms2, tmax)
110         w2.add(b'/a/b/n/2', fs, meta_ofs2)
111         #w2.close()
112         WVPASS()
113
114         w3 = index.Writer(b'index3.tmp', ms3, tmax)
115         w3.add(b'/a/c/n/3', fs, meta_ofs3)
116         #w3.close()
117         WVPASS()
118
119         r1 = w1.new_reader()
120         r2 = w2.new_reader()
121         r3 = w3.new_reader()
122         WVPASS()
123
124         r1all = [e.name for e in r1]
125         WVPASSEQ(r1all,
126                  [b'/a/b/x', b'/a/b/c', b'/a/b/', b'/a/', b'/'])
127         r2all = [e.name for e in r2]
128         WVPASSEQ(r2all,
129                  [b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
130         r3all = [e.name for e in r3]
131         WVPASSEQ(r3all,
132                  [b'/a/c/n/3', b'/a/c/n/', b'/a/c/', b'/a/', b'/'])
133         all = [e.name for e in index.merge(r2, r1, r3)]
134         WVPASSEQ(all,
135                  [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
136                   b'/a/b/x', b'/a/b/n/2', b'/a/b/n/', b'/a/b/c',
137                   b'/a/b/', b'/a/', b'/'])
138         fake_validate(r1)
139         dump(r1)
140
141         print([hex(e.flags) for e in r1])
142         WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
143         WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
144         WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
145                  [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
146                   b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
147
148         expect_invalid = [b'/'] + r2all + r3all
149         expect_real = (set(r1all) - set(r2all) - set(r3all)) \
150                         | set([b'/a/b/n/2', b'/a/c/n/3'])
151         dump(index.merge(r2, r1, r3))
152         for e in index.merge(r2, r1, r3):
153             print(e.name, hex(e.flags), e.ctime)
154             eiv = e.name in expect_invalid
155             er  = e.name in expect_real
156             WVPASSEQ(eiv, not e.is_valid())
157             WVPASSEQ(er, e.is_real())
158         fake_validate(r2, r3)
159         dump(index.merge(r2, r1, r3))
160         WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
161
162         e = eget(index.merge(r2, r1, r3), b'/a/b/c')
163         e.invalidate()
164         e.repack()
165         dump(index.merge(r2, r1, r3))
166         WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
167                  [b'/a/b/c', b'/a/b/', b'/a/', b'/'])
168         w1.close()
169         w2.close()
170         w3.close()
171     finally:
172         os.chdir(orig_cwd)