]> arthur.barton.de Git - bup.git/blob - test/int/test_index.py
index.Writer.__del__: replace with context management
[bup.git] / test / int / test_index.py
1
2 from __future__ import absolute_import, print_function
3 import os, time
4
5 from wvpytest import *
6
7 from bup import index, metadata
8 from bup.compat import fsencode
9 from bup.helpers import resolve_parent
10 import bup.xstat as xstat
11
12
13 lib_t_dir = os.path.dirname(fsencode(__file__))
14
15
16 def test_index_basic():
17     cd = os.path.realpath(os.path.join(lib_t_dir, b'../'))
18     WVPASS(cd)
19     sd = os.path.realpath(cd + b'/sampledata')
20     WVPASSEQ(resolve_parent(cd + b'/sampledata'), sd)
21     WVPASSEQ(os.path.realpath(cd + b'/sampledata/x'), sd + b'/x')
22     WVPASSEQ(os.path.realpath(cd + b'/sampledata/var/abs-symlink'),
23              sd + b'/var/abs-symlink-target')
24     WVPASSEQ(resolve_parent(cd + b'/sampledata/var/abs-symlink'),
25              sd + b'/var/abs-symlink')
26
27
28 def test_index_writer(tmpdir):
29     orig_cwd = os.getcwd()
30     try:
31         os.chdir(tmpdir)
32         ds = xstat.stat(b'.')
33         fs = xstat.stat(lib_t_dir + b'/test_index.py')
34         tmax = (time.time() - 1) * 10**9
35         with index.MetaStoreWriter(b'index.meta.tmp') as ms, \
36              index.Writer(b'index.tmp', ms, tmax) as w:
37             w.add(b'/var/tmp/sporky', fs, 0)
38             w.add(b'/etc/passwd', fs, 0)
39             w.add(b'/etc/', ds, 0)
40             w.add(b'/', ds, 0)
41             w.close()
42     finally:
43         os.chdir(orig_cwd)
44
45
46 def dump(m):
47     for e in list(m):
48         print('%s%s %s' % (e.is_valid() and ' ' or 'M',
49                            e.is_fake() and 'F' or ' ',
50                            e.name))
51
52 def fake_validate(*l):
53     for i in l:
54         for e in i:
55             e.validate(0o100644, index.FAKE_SHA)
56             e.repack()
57
58 def eget(l, ename):
59     for e in l:
60         if e.name == ename:
61             return e
62     return None
63
64 def test_index_negative_timestamps(tmpdir):
65     # Makes 'foo' exist
66     foopath = tmpdir + b'/foo'
67     f = open(foopath, 'wb')
68     f.close()
69
70     # Dec 31, 1969
71     os.utime(foopath, (-86400, -86400))
72     ns_per_sec = 10**9
73     tmax = (time.time() - 1) * ns_per_sec
74     e = index.BlankNewEntry(foopath, 0, tmax)
75     e.update_from_stat(xstat.stat(foopath), 0)
76     WVPASS(e.packed())
77
78     # Jun 10, 1893
79     os.utime(foopath, (-0x80000000, -0x80000000))
80     e = index.BlankNewEntry(foopath, 0, tmax)
81     e.update_from_stat(xstat.stat(foopath), 0)
82     WVPASS(e.packed())
83
84
85 def test_index_dirty(tmpdir):
86     orig_cwd = os.getcwd()
87     try:
88         os.chdir(tmpdir)
89         default_meta = metadata.Metadata()
90
91         with index.MetaStoreWriter(b'index.meta.tmp') as ms1, \
92              index.MetaStoreWriter(b'index2.meta.tmp') as ms2, \
93              index.MetaStoreWriter(b'index3.meta.tmp') as ms3:
94
95             meta_ofs1 = ms1.store(default_meta)
96             meta_ofs2 = ms2.store(default_meta)
97             meta_ofs3 = ms3.store(default_meta)
98
99             ds = xstat.stat(lib_t_dir)
100             fs = xstat.stat(lib_t_dir + b'/test_index.py')
101             tmax = (time.time() - 1) * 10**9
102
103             with index.Writer(b'index.tmp', ms1, tmax) as w1, \
104                  index.Writer(b'index2.tmp', ms2, tmax) as w2, \
105                  index.Writer(b'index3.tmp', ms3, tmax) as w3:
106
107                 w1.add(b'/a/b/x', fs, meta_ofs1)
108                 w1.add(b'/a/b/c', fs, meta_ofs1)
109                 w1.add(b'/a/b/', ds, meta_ofs1)
110                 w1.add(b'/a/', ds, meta_ofs1)
111                 #w1.close()
112                 WVPASS()
113
114                 w2 = index.Writer(b'index2.tmp', ms2, tmax)
115                 w2.add(b'/a/b/n/2', fs, meta_ofs2)
116                 #w2.close()
117                 WVPASS()
118
119                 w3.add(b'/a/c/n/3', fs, meta_ofs3)
120                 #w3.close()
121                 WVPASS()
122
123                 with w1.new_reader() as r1, \
124                      w2.new_reader() as r2, \
125                      w3.new_reader() as r3:
126                     WVPASS()
127
128                     r1all = [e.name for e in r1]
129                     WVPASSEQ(r1all,
130                              [b'/a/b/x', b'/a/b/c', b'/a/b/', b'/a/', b'/'])
131                     r2all = [e.name for e in r2]
132                     WVPASSEQ(r2all,
133                              [b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
134                     r3all = [e.name for e in r3]
135                     WVPASSEQ(r3all,
136                              [b'/a/c/n/3', b'/a/c/n/', b'/a/c/', b'/a/', b'/'])
137                     all = [e.name for e in index.merge(r2, r1, r3)]
138                     WVPASSEQ(all,
139                              [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
140                               b'/a/b/x', b'/a/b/n/2', b'/a/b/n/', b'/a/b/c',
141                               b'/a/b/', b'/a/', b'/'])
142                     fake_validate(r1)
143                     dump(r1)
144
145                     print([hex(e.flags) for e in r1])
146                     WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
147                     WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
148                     WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
149                              [b'/a/c/n/3', b'/a/c/n/', b'/a/c/',
150                               b'/a/b/n/2', b'/a/b/n/', b'/a/b/', b'/a/', b'/'])
151
152                     expect_invalid = [b'/'] + r2all + r3all
153                     expect_real = (set(r1all) - set(r2all) - set(r3all)) \
154                                     | set([b'/a/b/n/2', b'/a/c/n/3'])
155                     dump(index.merge(r2, r1, r3))
156                     for e in index.merge(r2, r1, r3):
157                         print(e.name, hex(e.flags), e.ctime)
158                         eiv = e.name in expect_invalid
159                         er  = e.name in expect_real
160                         WVPASSEQ(eiv, not e.is_valid())
161                         WVPASSEQ(er, e.is_real())
162                     fake_validate(r2, r3)
163                     dump(index.merge(r2, r1, r3))
164                     WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
165
166                     e = eget(index.merge(r2, r1, r3), b'/a/b/c')
167                     e.invalidate()
168                     e.repack()
169                     dump(index.merge(r2, r1, r3))
170                     WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
171                              [b'/a/b/c', b'/a/b/', b'/a/', b'/'])
172     finally:
173         os.chdir(orig_cwd)