def test_server_split_with_indexes(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir
git.init_repo(bupdir)
- lw = git.PackWriter()
- with client.Client(bupdir, create=True) as c:
- rw = c.new_packwriter()
-
+ with git.PackWriter() as lw:
lw.new_blob(s1)
- lw.close()
-
+ with client.Client(bupdir, create=True) as c, \
+ c.new_packwriter() as rw:
rw.new_blob(s2)
rw.breakpoint()
rw.new_blob(s1)
- rw.close()
def test_multiple_suggestions(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir
git.init_repo(bupdir)
- lw = git.PackWriter()
- lw.new_blob(s1)
- lw.close()
- lw = git.PackWriter()
- lw.new_blob(s2)
- lw.close()
+ with git.PackWriter() as lw:
+ lw.new_blob(s1)
+ with git.PackWriter() as lw:
+ lw.new_blob(s2)
assert len(glob.glob(git.repo(b'objects/pack'+IDX_PAT))) == 2
- with client.Client(bupdir, create=True) as c:
+ with client.Client(bupdir, create=True) as c, \
+ c.new_packwriter() as rw:
+
assert len(glob.glob(c.cachedir+IDX_PAT)) == 0
- rw = c.new_packwriter()
s1sha = rw.new_blob(s1)
assert rw.exists(s1sha)
s2sha = rw.new_blob(s2)
assert rw.objcache.exists(s2sha)
rw.new_blob(s3)
assert len(glob.glob(c.cachedir+IDX_PAT)) == 2
- rw.close()
- assert len(glob.glob(c.cachedir+IDX_PAT)) == 3
+ assert len(glob.glob(c.cachedir+IDX_PAT)) == 3
def test_dumb_client_server(tmpdir):
git.init_repo(bupdir)
open(git.repo(b'bup-dumb-server'), 'w').close()
- lw = git.PackWriter()
- lw.new_blob(s1)
- lw.close()
+ with git.PackWriter() as lw:
+ lw.new_blob(s1)
- with client.Client(bupdir, create=True) as c:
- rw = c.new_packwriter()
+ with client.Client(bupdir, create=True) as c, \
+ c.new_packwriter() as rw:
assert len(glob.glob(c.cachedir+IDX_PAT)) == 1
rw.new_blob(s1)
assert len(glob.glob(c.cachedir+IDX_PAT)) == 1
rw.new_blob(s2)
- rw.close()
- assert len(glob.glob(c.cachedir+IDX_PAT)) == 2
+ assert len(glob.glob(c.cachedir+IDX_PAT)) == 2
def test_midx_refreshing(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir
git.init_repo(bupdir)
- with client.Client(bupdir, create=True) as c:
- rw = c.new_packwriter()
+ with client.Client(bupdir, create=True) as c, \
+ c.new_packwriter() as rw:
rw.new_blob(s1)
p1base = rw.breakpoint()
p1name = os.path.join(c.cachedir, p1base)
s1sha = rw.new_blob(s1) # should not be written; it's already in p1
s2sha = rw.new_blob(s2)
p2base = rw.close()
- p2name = os.path.join(c.cachedir, p2base)
- del rw
-
- pi = git.PackIdxList(bupdir + b'/objects/pack')
- assert len(pi.packs) == 2
- pi.refresh()
- assert len(pi.packs) == 2
- assert sorted([os.path.basename(i.name) for i in pi.packs]) == sorted([p1base, p2base])
-
- p1 = git.open_idx(p1name)
- assert p1.exists(s1sha)
- p2 = git.open_idx(p2name)
- assert not p2.exists(s1sha)
- assert p2.exists(s2sha)
-
- subprocess.call([path.exe(), b'midx', b'-f'])
- pi.refresh()
- assert len(pi.packs) == 1
- pi.refresh(skip_midx=True)
- assert len(pi.packs) == 2
- pi.refresh(skip_midx=False)
- assert len(pi.packs) == 1
+ p2name = os.path.join(c.cachedir, p2base)
+
+ with git.PackIdxList(bupdir + b'/objects/pack') as pi:
+ assert len(pi.packs) == 2
+ pi.refresh()
+ assert len(pi.packs) == 2
+ assert sorted([os.path.basename(i.name) for i in pi.packs]) \
+ == sorted([p1base, p2base])
+
+ with git.open_idx(p1name) as p1, \
+ git.open_idx(p2name) as p2:
+ assert p1.exists(s1sha)
+ assert not p2.exists(s1sha)
+ assert p2.exists(s2sha)
+
+ subprocess.call([path.exe(), b'midx', b'-f'])
+ pi.refresh()
+ assert len(pi.packs) == 1
+ pi.refresh(skip_midx=True)
+ assert len(pi.packs) == 2
+ pi.refresh(skip_midx=False)
+ assert len(pi.packs) == 1
def test_remote_parsing():