1 from bup import hashsplit, _helpers
3 from cStringIO import StringIO
# Test case: passes the result of _helpers.selftest() to the WVPASS assertion.
# NOTE(review): this excerpt is line-numbered with gaps; the lines directly
# before this def (possibly a test decorator) are not visible here.
6 def test_rolling_sums():
# What selftest() actually verifies is not visible in this file; going by the
# test name it presumably exercises the rolling-checksum code -- confirm
# against the _helpers module.
7 WVPASS(_helpers.selftest())
# Test case: temporarily patches _helpers.splitbuf and several hashsplit
# module attributes, feeds four hand-built inputs through
# hashsplit.hashsplit_iter, and compares the resulting (chunk length, level)
# pairs with hard-coded expectations before restoring the patched attributes.
# NOTE(review): this excerpt is line-numbered with gaps and has lost its
# indentation; several statements of the original function are not visible.
10 def test_fanout_behaviour():
12 # Drop in replacement for bupsplit, but splitting if the int value of a
13 # byte >= BUP_BLOBBITS
# Threshold used by the replacement splitter and by sb() further down.
14 basebits = _helpers.blobbits()
# NOTE(review): only this condition of the replacement splitter is visible;
# the enclosing definition of `splitbuf` (installed below) and the body of
# this `if` are missing from the excerpt.
19 if ord(c) >= basebits:
# Remember each original value before overriding it, so that it can be put
# back at the end of the test.
23 old_splitbuf = _helpers.splitbuf
24 _helpers.splitbuf = splitbuf
25 old_BLOB_MAX = hashsplit.BLOB_MAX
26 hashsplit.BLOB_MAX = 4
27 old_BLOB_READ_SIZE = hashsplit.BLOB_READ_SIZE
28 hashsplit.BLOB_READ_SIZE = 10
# NOTE(review): fanout is saved here and restored at the end, but the
# statement that overrides it is not visible in this excerpt (the embedded
# numbering skips lines at this point).
29 old_fanout = hashsplit.fanout
# levels(f): run hashsplit_iter over the single file object f and collect one
# (len(b), l) tuple per (b, l) pair it yields. The comments below refer to
# the second element as the split "level".
32 levels = lambda f: [(len(b), l) for b, l in
33 hashsplit.hashsplit_iter([f], True, None)]
34 # Return a string of n null bytes
35 z = lambda n: '\x00' * n
36 # Return a byte which will be split with a level of n
37 sb = lambda n: chr(basebits + n)
# Inputs: each is 16 bytes long and differs only in where the sb() split
# bytes are placed among the null bytes.
39 split_never = StringIO(z(16))
40 split_first = StringIO(z(1) + sb(3) + z(14))
41 split_end = StringIO(z(13) + sb(1) + z(2))
42 split_many = StringIO(sb(1) + z(3) + sb(2) + z(4) +
43 sb(0) + z(4) + sb(5) + z(1))
# Expected results. No chunk in any expectation is longer than 4 bytes,
# presumably because BLOB_MAX is patched to 4 above -- confirm against
# hashsplit. The nonzero levels in each list match the sb() arguments used
# to build that input.
44 WVPASSEQ(levels(split_never), [(4, 0), (4, 0), (4, 0), (4, 0)])
45 WVPASSEQ(levels(split_first), [(2, 3), (4, 0), (4, 0), (4, 0), (2, 0)])
46 WVPASSEQ(levels(split_end), [(4, 0), (4, 0), (4, 0), (2, 1), (2, 0)])
47 WVPASSEQ(levels(split_many),
48 [(1, 1), (4, 2), (4, 0), (1, 0), (4, 0), (1, 5), (1, 0)])
# Put the saved originals back.
# NOTE(review): no try/finally is visible in this excerpt; if the original
# has none, an exception raised above would skip these restores and leave the
# patched values in place for later tests -- confirm.
50 _helpers.splitbuf = old_splitbuf
51 hashsplit.BLOB_MAX = old_BLOB_MAX
52 hashsplit.BLOB_READ_SIZE = old_BLOB_READ_SIZE
53 hashsplit.fanout = old_fanout