import pytest
from bup import client, git, path
-from bup.compat import bytes_from_uint, environ, range
+from bup.compat import bytes_from_uint, environ
def randbytes(sz):
s = b''
def test_server_split_with_indexes(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir
git.init_repo(bupdir)
- with git.PackWriter() as lw, \
- client.Client(bupdir, create=True) as c, \
- c.new_packwriter() as rw:
+ with git.PackWriter() as lw:
lw.new_blob(s1)
- lw.close()
-
+ with client.Client(bupdir, create=True) as c, \
+ c.new_packwriter() as rw:
rw.new_blob(s2)
rw.breakpoint()
rw.new_blob(s1)
p2base = rw.close()
p2name = os.path.join(c.cachedir, p2base)
- pi = git.PackIdxList(bupdir + b'/objects/pack')
- assert len(pi.packs) == 2
- pi.refresh()
- assert len(pi.packs) == 2
- assert sorted([os.path.basename(i.name) for i in pi.packs]) == sorted([p1base, p2base])
-
- p1 = git.open_idx(p1name)
- assert p1.exists(s1sha)
- p2 = git.open_idx(p2name)
- assert not p2.exists(s1sha)
- assert p2.exists(s2sha)
-
- subprocess.call([path.exe(), b'midx', b'-f'])
- pi.refresh()
- assert len(pi.packs) == 1
- pi.refresh(skip_midx=True)
- assert len(pi.packs) == 2
- pi.refresh(skip_midx=False)
- assert len(pi.packs) == 1
+ with git.PackIdxList(bupdir + b'/objects/pack') as pi:
+ assert len(pi.packs) == 2
+ pi.refresh()
+ assert len(pi.packs) == 2
+ assert sorted([os.path.basename(i.name) for i in pi.packs]) \
+ == sorted([p1base, p2base])
+
+ with git.open_idx(p1name) as p1, \
+ git.open_idx(p2name) as p2:
+ assert p1.exists(s1sha)
+ assert not p2.exists(s1sha)
+ assert p2.exists(s2sha)
+
+ subprocess.call([path.exe(), b'midx', b'-f'])
+ pi.refresh()
+ assert len(pi.packs) == 1
+ pi.refresh(skip_midx=True)
+ assert len(pi.packs) == 2
+ pi.refresh(skip_midx=False)
+ assert len(pi.packs) == 1
def test_remote_parsing():