More accurate, and Python 3 compatible.
Signed-off-by: Rob Browning <rlb@defaultvalue.org>
Tested-by: Rob Browning <rlb@defaultvalue.org>
"""
# end of bup preamble
import sys, stat, time, math
-from cStringIO import StringIO
from errno import EACCES
+from io import BytesIO
from bup import hashsplit, git, options, index, client, metadata, hlinkdb
from bup.helpers import *
metalist = [('', dir_metadata)] + metalist[1:]
sorted_metalist = sorted(metalist, key = lambda x : x[0])
metadata = ''.join([m[1].encode() for m in sorted_metalist])
- metadata_f = StringIO(metadata)
+ metadata_f = BytesIO(metadata)
mode, id = hashsplit.split_to_blob_or_tree(w.new_blob, w.new_tree,
[metadata_f],
keep_boundaries=False)
-import math, os
+import io, math, os
from bup import _helpers, helpers
from bup.helpers import sc_page_size
b = ''
fd = rpr = rstart = rlen = None
if _fmincore and hasattr(f, 'fileno'):
- fd = f.fileno()
- mcore = _fmincore(fd)
- if mcore:
- max_chunk = max(1, (8 * 1024 * 1024) / sc_page_size)
- rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
- max_chunk)
- rstart, rlen = next(rpr, (None, None))
+ try:
+ fd = f.fileno()
+ except io.UnsupportedOperation:
+ pass
+ if fd:
+ mcore = _fmincore(fd)
+ if mcore:
+ max_chunk = max(1, (8 * 1024 * 1024) / sc_page_size)
+ rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
+ max_chunk)
+ rstart, rlen = next(rpr, (None, None))
while 1:
if progress:
progress(filenum, len(b))
#
# This code is covered under the terms of the GNU Library General
# Public License as described in the bup LICENSE file.
+
+from io import BytesIO
import errno, os, sys, stat, time, pwd, grp, socket, struct
-from cStringIO import StringIO
+
from bup import vint, xstat
from bup.drecurse import recursive_dirlist
from bup.helpers import add_error, mkdirp, log, is_superuser, format_filesize
def _load_linux_xattr_rec(self, file):
data = vint.read_bvec(file)
- memfile = StringIO(data)
+ memfile = BytesIO(data)
result = []
for i in range(vint.read_vuint(memfile)):
key = vint.read_bvec(memfile)
vint.write_vuint(port, _rec_tag_end)
def encode(self, include_path=True):
- port = StringIO()
+ port = BytesIO()
self.write(port, include_path)
return port.getvalue()
-from cStringIO import StringIO
+from io import BytesIO
from wvtest import *
# Return a byte which will be split with a level of n
sb = lambda n: chr(basebits + n)
- split_never = StringIO(z(16))
- split_first = StringIO(z(1) + sb(3) + z(14))
- split_end = StringIO(z(13) + sb(1) + z(2))
- split_many = StringIO(sb(1) + z(3) + sb(2) + z(4) +
- sb(0) + z(4) + sb(5) + z(1))
+ split_never = BytesIO(z(16))
+ split_first = BytesIO(z(1) + sb(3) + z(14))
+ split_end = BytesIO(z(13) + sb(1) + z(2))
+ split_many = BytesIO(sb(1) + z(3) + sb(2) + z(4) +
+ sb(0) + z(4) + sb(5) + z(1))
WVPASSEQ(levels(split_never), [(4, 0), (4, 0), (4, 0), (4, 0)])
WVPASSEQ(levels(split_first), [(2, 3), (4, 0), (4, 0), (4, 0), (2, 0)])
WVPASSEQ(levels(split_end), [(4, 0), (4, 0), (4, 0), (2, 1), (2, 0)])
-from bup import vint
+from io import BytesIO
+
from wvtest import *
-from cStringIO import StringIO
+
+from bup import vint
def encode_and_decode_vuint(x):
- f = StringIO()
+ f = BytesIO()
vint.write_vuint(f, x)
- return vint.read_vuint(StringIO(f.getvalue()))
+ return vint.read_vuint(BytesIO(f.getvalue()))
@wvtest
def test_vuint():
for x in (0, 1, 42, 128, 10**16):
WVPASSEQ(encode_and_decode_vuint(x), x)
- WVEXCEPT(Exception, vint.write_vuint, StringIO(), -1)
- WVEXCEPT(EOFError, vint.read_vuint, StringIO())
+ WVEXCEPT(Exception, vint.write_vuint, BytesIO(), -1)
+ WVEXCEPT(EOFError, vint.read_vuint, BytesIO())
def encode_and_decode_vint(x):
- f = StringIO()
+ f = BytesIO()
vint.write_vint(f, x)
- return vint.read_vint(StringIO(f.getvalue()))
+ return vint.read_vint(BytesIO(f.getvalue()))
@wvtest
WVPASSEQ(encode_and_decode_vint(x), x)
for x in [-x for x in values]:
WVPASSEQ(encode_and_decode_vint(x), x)
- WVEXCEPT(EOFError, vint.read_vint, StringIO())
+ WVEXCEPT(EOFError, vint.read_vint, BytesIO())
def encode_and_decode_bvec(x):
- f = StringIO()
+ f = BytesIO()
vint.write_bvec(f, x)
- return vint.read_bvec(StringIO(f.getvalue()))
+ return vint.read_bvec(BytesIO(f.getvalue()))
@wvtest
values = ('', 'x', 'foo', '\0', '\0foo', 'foo\0bar\0')
for x in values:
WVPASSEQ(encode_and_decode_bvec(x), x)
- WVEXCEPT(EOFError, vint.read_bvec, StringIO())
- outf = StringIO()
+ WVEXCEPT(EOFError, vint.read_bvec, BytesIO())
+ outf = BytesIO()
for x in ('foo', 'bar', 'baz', 'bax'):
vint.write_bvec(outf, x)
- inf = StringIO(outf.getvalue())
+ inf = BytesIO(outf.getvalue())
WVPASSEQ(vint.read_bvec(inf), 'foo')
WVPASSEQ(vint.read_bvec(inf), 'bar')
vint.skip_bvec(inf)
# This code is covered under the terms of the GNU Library General
# Public License as described in the bup LICENSE file.
-from cStringIO import StringIO
+from io import BytesIO
# Variable length integers are encoded as vints -- see jakarta lucene.
def pack(types, *args):
if len(types) != len(args):
raise Exception('number of arguments does not match format string')
- port = StringIO()
+ port = BytesIO()
for (type, value) in zip(types, args):
if type == 'V':
write_vuint(port, value)
def unpack(types, data):
result = []
- port = StringIO(data)
+ port = BytesIO(data)
for type in types:
if type == 'V':
result.append(read_vuint(port))