]> arthur.barton.de Git - bup.git/blob - lib/bup/t/tindex.py
Use absolute_import from the __future__ everywhere
[bup.git] / lib / bup / t / tindex.py
1
2 from __future__ import absolute_import
3 import os, time
4
5 from wvtest import *
6
7 from bup import index, metadata
8 from bup.helpers import mkdirp, resolve_parent
9 from buptest import no_lingering_errors, test_tempdir
10 import bup.xstat as xstat
11
12
13 lib_t_dir = os.path.dirname(__file__)
14
15
16 @wvtest
17 def index_basic():
18     with no_lingering_errors():
19         cd = os.path.realpath('../../../t')
20         WVPASS(cd)
21         sd = os.path.realpath(cd + '/sampledata')
22         WVPASSEQ(resolve_parent(cd + '/sampledata'), sd)
23         WVPASSEQ(os.path.realpath(cd + '/sampledata/x'), sd + '/x')
24         WVPASSEQ(os.path.realpath(cd + '/sampledata/var/abs-symlink'),
25                  sd + '/var/abs-symlink-target')
26         WVPASSEQ(resolve_parent(cd + '/sampledata/var/abs-symlink'),
27                  sd + '/var/abs-symlink')
28
29
30 @wvtest
31 def index_writer():
32     with no_lingering_errors():
33         with test_tempdir('bup-tindex-') as tmpdir:
34             orig_cwd = os.getcwd()
35             try:
36                 os.chdir(tmpdir)
37                 ds = xstat.stat('.')
38                 fs = xstat.stat(lib_t_dir + '/tindex.py')
39                 ms = index.MetaStoreWriter('index.meta.tmp');
40                 tmax = (time.time() - 1) * 10**9
41                 w = index.Writer('index.tmp', ms, tmax)
42                 w.add('/var/tmp/sporky', fs, 0)
43                 w.add('/etc/passwd', fs, 0)
44                 w.add('/etc/', ds, 0)
45                 w.add('/', ds, 0)
46                 ms.close()
47                 w.close()
48             finally:
49                 os.chdir(orig_cwd)
50
51
52 def dump(m):
53     for e in list(m):
54         print '%s%s %s' % (e.is_valid() and ' ' or 'M',
55                            e.is_fake() and 'F' or ' ',
56                            e.name)
57
58 def fake_validate(*l):
59     for i in l:
60         for e in i:
61             e.validate(0100644, index.FAKE_SHA)
62             e.repack()
63
64 def eget(l, ename):
65     for e in l:
66         if e.name == ename:
67             return e
68
69 @wvtest
70 def index_negative_timestamps():
71     with no_lingering_errors():
72         with test_tempdir('bup-tindex-') as tmpdir:
73             # Makes 'foo' exist
74             foopath = tmpdir + '/foo'
75             f = file(foopath, 'wb')
76             f.close()
77
78             # Dec 31, 1969
79             os.utime(foopath, (-86400, -86400))
80             ns_per_sec = 10**9
81             tmax = (time.time() - 1) * ns_per_sec
82             e = index.BlankNewEntry(foopath, 0, tmax)
83             e.update_from_stat(xstat.stat(foopath), 0)
84             WVPASS(e.packed())
85
86             # Jun 10, 1893
87             os.utime(foopath, (-0x80000000, -0x80000000))
88             e = index.BlankNewEntry(foopath, 0, tmax)
89             e.update_from_stat(xstat.stat(foopath), 0)
90             WVPASS(e.packed())
91
92
93 @wvtest
94 def index_dirty():
95     with no_lingering_errors():
96         with test_tempdir('bup-tindex-') as tmpdir:
97             orig_cwd = os.getcwd()
98             try:
99                 os.chdir(tmpdir)
100                 default_meta = metadata.Metadata()
101                 ms1 = index.MetaStoreWriter('index.meta.tmp')
102                 ms2 = index.MetaStoreWriter('index2.meta.tmp')
103                 ms3 = index.MetaStoreWriter('index3.meta.tmp')
104                 meta_ofs1 = ms1.store(default_meta)
105                 meta_ofs2 = ms2.store(default_meta)
106                 meta_ofs3 = ms3.store(default_meta)
107
108                 ds = xstat.stat(lib_t_dir)
109                 fs = xstat.stat(lib_t_dir + '/tindex.py')
110                 tmax = (time.time() - 1) * 10**9
111
112                 w1 = index.Writer('index.tmp', ms1, tmax)
113                 w1.add('/a/b/x', fs, meta_ofs1)
114                 w1.add('/a/b/c', fs, meta_ofs1)
115                 w1.add('/a/b/', ds, meta_ofs1)
116                 w1.add('/a/', ds, meta_ofs1)
117                 #w1.close()
118                 WVPASS()
119
120                 w2 = index.Writer('index2.tmp', ms2, tmax)
121                 w2.add('/a/b/n/2', fs, meta_ofs2)
122                 #w2.close()
123                 WVPASS()
124
125                 w3 = index.Writer('index3.tmp', ms3, tmax)
126                 w3.add('/a/c/n/3', fs, meta_ofs3)
127                 #w3.close()
128                 WVPASS()
129
130                 r1 = w1.new_reader()
131                 r2 = w2.new_reader()
132                 r3 = w3.new_reader()
133                 WVPASS()
134
135                 r1all = [e.name for e in r1]
136                 WVPASSEQ(r1all,
137                          ['/a/b/x', '/a/b/c', '/a/b/', '/a/', '/'])
138                 r2all = [e.name for e in r2]
139                 WVPASSEQ(r2all,
140                          ['/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
141                 r3all = [e.name for e in r3]
142                 WVPASSEQ(r3all,
143                          ['/a/c/n/3', '/a/c/n/', '/a/c/', '/a/', '/'])
144                 all = [e.name for e in index.merge(r2, r1, r3)]
145                 WVPASSEQ(all,
146                          ['/a/c/n/3', '/a/c/n/', '/a/c/',
147                           '/a/b/x', '/a/b/n/2', '/a/b/n/', '/a/b/c',
148                           '/a/b/', '/a/', '/'])
149                 fake_validate(r1)
150                 dump(r1)
151
152                 print [hex(e.flags) for e in r1]
153                 WVPASSEQ([e.name for e in r1 if e.is_valid()], r1all)
154                 WVPASSEQ([e.name for e in r1 if not e.is_valid()], [])
155                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
156                          ['/a/c/n/3', '/a/c/n/', '/a/c/',
157                           '/a/b/n/2', '/a/b/n/', '/a/b/', '/a/', '/'])
158
159                 expect_invalid = ['/'] + r2all + r3all
160                 expect_real = (set(r1all) - set(r2all) - set(r3all)) \
161                                 | set(['/a/b/n/2', '/a/c/n/3'])
162                 dump(index.merge(r2, r1, r3))
163                 for e in index.merge(r2, r1, r3):
164                     print e.name, hex(e.flags), e.ctime
165                     eiv = e.name in expect_invalid
166                     er  = e.name in expect_real
167                     WVPASSEQ(eiv, not e.is_valid())
168                     WVPASSEQ(er, e.is_real())
169                 fake_validate(r2, r3)
170                 dump(index.merge(r2, r1, r3))
171                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()], [])
172
173                 e = eget(index.merge(r2, r1, r3), '/a/b/c')
174                 e.invalidate()
175                 e.repack()
176                 dump(index.merge(r2, r1, r3))
177                 WVPASSEQ([e.name for e in index.merge(r2, r1, r3) if not e.is_valid()],
178                          ['/a/b/c', '/a/b/', '/a/', '/'])
179                 w1.close()
180                 w2.close()
181                 w3.close()
182             finally:
183                 os.chdir(orig_cwd)