]> arthur.barton.de Git - bup.git/blob - test/int/test_bloom.py
Remove Client __del__ in favor of context management
[bup.git] / test / int / test_bloom.py
1
2 from __future__ import absolute_import, print_function
3 import os
4 import errno, platform, tempfile
5 import logging
6
7 from bup import bloom
8
9 def test_bloom(tmpdir):
10     hashes = [os.urandom(20) for i in range(100)]
11     class Idx:
12         pass
13     ix = Idx()
14     ix.name = b'dummy.idx'
15     ix.shatable = b''.join(hashes)
16     for k in (4, 5):
17         b = bloom.create(tmpdir + b'/pybuptest.bloom', expected=100, k=k)
18         b.add_idx(ix)
19         assert b.pfalse_positive() < .1
20         b.close()
21         b = bloom.ShaBloom(tmpdir + b'/pybuptest.bloom')
22         all_present = True
23         for h in hashes:
24             all_present &= (b.exists(h) or False)
25         assert all_present
26         false_positives = 0
27         for h in [os.urandom(20) for i in range(1000)]:
28             if b.exists(h):
29                 false_positives += 1
30         assert false_positives < 5
31         os.unlink(tmpdir + b'/pybuptest.bloom')
32
33     tf = tempfile.TemporaryFile(dir=tmpdir)
34     b = bloom.create(b'bup.bloom', f=tf, expected=100)
35     assert b.rwfile == tf
36     assert b.k == 5
37
38     # Test large (~1GiB) filter.  This may fail on s390 (31-bit
39     # architecture), and anywhere else where the address space is
40     # sufficiently limited.
41     tf = tempfile.TemporaryFile(dir=tmpdir)
42     skip_test = False
43     try:
44         b = bloom.create(b'bup.bloom', f=tf, expected=2**28,
45                          delaywrite=False)
46         assert b.k == 4
47     except EnvironmentError as ex:
48         (ptr_width, linkage) = platform.architecture()
49         if ptr_width == '32bit' and ex.errno == errno.ENOMEM:
50             logging.getLogger().info('skipping large bloom filter test (mmap probably failed) '
51                   + str(ex))
52         else:
53             raise