]> arthur.barton.de Git - bup.git/blob - lib/bup/vint.py
cmd/bup: add some missing safety checks
[bup.git] / lib / bup / vint.py
1 """Binary encodings for bup."""
2
3 # Copyright (C) 2010 Rob Browning
4 #
5 # This code is covered under the terms of the GNU Library General
6 # Public License as described in the bup LICENSE file.
7
8 # Variable length integers are encoded as vints -- see lucene.
9
10 from __future__ import absolute_import
11 from io import BytesIO
12 import sys
13
14 from bup import compat
15 from bup import _helpers
16
17
18 def write_vuint(port, x):
19     port.write(encode_vuint(x))
20
21
22 def encode_vuint(x):
23     try:
24         return _helpers.vuint_encode(x)
25     except OverflowError:
26         ret = b''
27         bytes_from_uint = compat.bytes_from_uint
28         if x < 0:
29             raise Exception("vuints must not be negative")
30         assert x, "the C version should have picked this up"
31
32         while True:
33             seven_bits = x & 0x7f
34             x >>= 7
35             if x:
36                 ret += bytes_from_uint(0x80 | seven_bits)
37             else:
38                 ret += bytes_from_uint(seven_bits)
39                 break
40         return ret
41
42 def read_vuint(port):
43     c = port.read(1)
44     if not c:
45         raise EOFError('encountered EOF while reading vuint')
46     assert isinstance(c, bytes)
47     if ord(c) == 0:
48         return 0
49     result = 0
50     offset = 0
51     while True:
52         b = ord(c)
53         if b & 0x80:
54             result |= ((b & 0x7f) << offset)
55             offset += 7
56             c = port.read(1)
57             if not c:
58                 raise EOFError('encountered EOF while reading vuint')
59         else:
60             result |= (b << offset)
61             break
62     return result
63
64
65 def write_vint(port, x):
66     # Sign is handled with the second bit of the first byte.  All else
67     # matches vuint.
68     port.write(encode_vint(x))
69
70
71 def encode_vint(x):
72     try:
73         return _helpers.vint_encode(x)
74     except OverflowError:
75         bytes_from_uint = compat.bytes_from_uint
76         assert x != 0, "the C version should have picked this up"
77         if x < 0:
78             x = -x
79             sign_and_six_bits = (x & 0x3f) | 0x40
80         else:
81             sign_and_six_bits = x & 0x3f
82         x >>= 6
83         assert x, "the C version should have picked this up"
84         return bytes_from_uint(0x80 | sign_and_six_bits) + encode_vuint(x)
85
86
87 def read_vint(port):
88     c = port.read(1)
89     if not c:
90         raise EOFError('encountered EOF while reading vint')
91     assert isinstance(c, bytes)
92     negative = False
93     result = 0
94     offset = 0
95     # Handle first byte with sign bit specially.
96     b = ord(c)
97     if b & 0x40:
98         negative = True
99     result |= (b & 0x3f)
100     if b & 0x80:
101         offset += 6
102         c = port.read(1)
103     elif negative:
104         return -result
105     else:
106         return result
107     while True:
108         b = ord(c)
109         if b & 0x80:
110             result |= ((b & 0x7f) << offset)
111             offset += 7
112             c = port.read(1)
113             if not c:
114                 raise EOFError('encountered EOF while reading vint')
115         else:
116             result |= (b << offset)
117             break
118     if negative:
119         return -result
120     else:
121         return result
122
123
124 def write_bvec(port, x):
125     write_vuint(port, len(x))
126     port.write(x)
127
128
129 def read_bvec(port):
130     n = read_vuint(port)
131     return port.read(n)
132
133
134 def encode_bvec(x):
135     return _helpers.vuint_encode(len(x)) + x
136
137
138 def skip_bvec(port):
139     port.read(read_vuint(port))
140
141 def send(port, types, *args):
142     if len(types) != len(args):
143         raise Exception('number of arguments does not match format string')
144     for (type, value) in zip(types, args):
145         if type == 'V':
146             write_vuint(port, value)
147         elif type == 'v':
148             write_vint(port, value)
149         elif type == 's':
150             write_bvec(port, value)
151         else:
152             raise Exception('unknown xpack format string item "' + type + '"')
153
154 def recv(port, types):
155     result = []
156     for type in types:
157         if type == 'V':
158             result.append(read_vuint(port))
159         elif type == 'v':
160             result.append(read_vint(port))
161         elif type == 's':
162             result.append(read_bvec(port))
163         else:
164             raise Exception('unknown xunpack format string item "' + type + '"')
165     return result
166
167 def pack(types, *args):
168     try:
169         return _helpers.limited_vint_pack(types, args)
170     except OverflowError:
171         assert len(types) == len(args)
172         ret = []
173         for typ, value in zip(types, args):
174             if typ == 'V':
175                 ret.append(encode_vuint(value))
176             elif typ == 'v':
177                 ret.append(encode_vint(value))
178             elif typ == 's':
179                 ret.append(encode_bvec(value))
180             else:
181                 assert False
182         return b''.join(ret)
183
184 def unpack(types, data):
185     port = BytesIO(data)
186     return recv(port, types)