]> arthur.barton.de Git - bup.git/blob - lib/bup/vint.py
test: add pylint and test imports
[bup.git] / lib / bup / vint.py
1 """Binary encodings for bup."""
2
3 # Copyright (C) 2010 Rob Browning
4 #
5 # This code is covered under the terms of the GNU Library General
6 # Public License as described in the bup LICENSE file.
7
8 # Variable length integers are encoded as vints -- see lucene.
9
10 from __future__ import absolute_import
11 from io import BytesIO
12
13 from bup import compat
14 from bup import _helpers
15
16
17 def write_vuint(port, x):
18     port.write(encode_vuint(x))
19
20
21 def encode_vuint(x):
22     try:
23         return _helpers.vuint_encode(x)
24     except OverflowError:
25         ret = b''
26         bytes_from_uint = compat.bytes_from_uint
27         if x < 0:
28             raise Exception("vuints must not be negative")
29         assert x, "the C version should have picked this up"
30
31         while True:
32             seven_bits = x & 0x7f
33             x >>= 7
34             if x:
35                 ret += bytes_from_uint(0x80 | seven_bits)
36             else:
37                 ret += bytes_from_uint(seven_bits)
38                 break
39         return ret
40
41 def read_vuint(port):
42     c = port.read(1)
43     if not c:
44         raise EOFError('encountered EOF while reading vuint')
45     assert isinstance(c, bytes)
46     if ord(c) == 0:
47         return 0
48     result = 0
49     offset = 0
50     while True:
51         b = ord(c)
52         if b & 0x80:
53             result |= ((b & 0x7f) << offset)
54             offset += 7
55             c = port.read(1)
56             if not c:
57                 raise EOFError('encountered EOF while reading vuint')
58         else:
59             result |= (b << offset)
60             break
61     return result
62
63
64 def write_vint(port, x):
65     # Sign is handled with the second bit of the first byte.  All else
66     # matches vuint.
67     port.write(encode_vint(x))
68
69
70 def encode_vint(x):
71     try:
72         return _helpers.vint_encode(x)
73     except OverflowError:
74         bytes_from_uint = compat.bytes_from_uint
75         assert x != 0, "the C version should have picked this up"
76         if x < 0:
77             x = -x
78             sign_and_six_bits = (x & 0x3f) | 0x40
79         else:
80             sign_and_six_bits = x & 0x3f
81         x >>= 6
82         assert x, "the C version should have picked this up"
83         return bytes_from_uint(0x80 | sign_and_six_bits) + encode_vuint(x)
84
85
86 def read_vint(port):
87     c = port.read(1)
88     if not c:
89         raise EOFError('encountered EOF while reading vint')
90     assert isinstance(c, bytes)
91     negative = False
92     result = 0
93     offset = 0
94     # Handle first byte with sign bit specially.
95     b = ord(c)
96     if b & 0x40:
97         negative = True
98     result |= (b & 0x3f)
99     if b & 0x80:
100         offset += 6
101         c = port.read(1)
102     elif negative:
103         return -result
104     else:
105         return result
106     while True:
107         b = ord(c)
108         if b & 0x80:
109             result |= ((b & 0x7f) << offset)
110             offset += 7
111             c = port.read(1)
112             if not c:
113                 raise EOFError('encountered EOF while reading vint')
114         else:
115             result |= (b << offset)
116             break
117     if negative:
118         return -result
119     else:
120         return result
121
122
123 def write_bvec(port, x):
124     write_vuint(port, len(x))
125     port.write(x)
126
127
128 def read_bvec(port):
129     n = read_vuint(port)
130     return port.read(n)
131
132
133 def encode_bvec(x):
134     return _helpers.vuint_encode(len(x)) + x
135
136
137 def skip_bvec(port):
138     port.read(read_vuint(port))
139
140 def send(port, types, *args):
141     if len(types) != len(args):
142         raise Exception('number of arguments does not match format string')
143     for (type, value) in zip(types, args):
144         if type == 'V':
145             write_vuint(port, value)
146         elif type == 'v':
147             write_vint(port, value)
148         elif type == 's':
149             write_bvec(port, value)
150         else:
151             raise Exception('unknown xpack format string item "' + type + '"')
152
153 def recv(port, types):
154     result = []
155     for type in types:
156         if type == 'V':
157             result.append(read_vuint(port))
158         elif type == 'v':
159             result.append(read_vint(port))
160         elif type == 's':
161             result.append(read_bvec(port))
162         else:
163             raise Exception('unknown xunpack format string item "' + type + '"')
164     return result
165
166 def pack(types, *args):
167     try:
168         return _helpers.limited_vint_pack(types, args)
169     except OverflowError:
170         assert len(types) == len(args)
171         ret = []
172         for typ, value in zip(types, args):
173             if typ == 'V':
174                 ret.append(encode_vuint(value))
175             elif typ == 'v':
176                 ret.append(encode_vint(value))
177             elif typ == 's':
178                 ret.append(encode_bvec(value))
179             else:
180                 assert False
181         return b''.join(ret)
182
183 def unpack(types, data):
184     port = BytesIO(data)
185     return recv(port, types)