import sys
from binascii import hexlify, unhexlify
from subprocess import check_call
-import struct, os, time
+from functools import partial
+import struct, os
import pytest
from wvpytest import *
from bup import git, path
-from bup.compat import bytes_from_byte, environ, range
+from bup.compat import bytes_from_byte, environ
from bup.helpers import localtime, log, mkdirp, readpipe
git.init_repo(bupdir)
git.verbose = 1
- w = git.PackWriter()
- w.new_blob(os.urandom(100))
- w.new_blob(os.urandom(100))
- w.abort()
+ with git.PackWriter() as w:
+ w.new_blob(os.urandom(100))
+ w.new_blob(os.urandom(100))
+ w.abort()
- w = git.PackWriter()
- hashes = []
- nobj = 1000
- for i in range(nobj):
- hashes.append(w.new_blob(b'%d' % i))
- log('\n')
- nameprefix = w.close()
+ with git.PackWriter() as w:
+ hashes = []
+ nobj = 1000
+ for i in range(nobj):
+ hashes.append(w.new_blob(b'%d' % i))
+ log('\n')
+ nameprefix = w.close()
print(repr(nameprefix))
WVPASS(os.path.exists(nameprefix + b'.pack'))
WVPASS(os.path.exists(nameprefix + b'.idx'))
- r = git.open_idx(nameprefix + b'.idx')
- print(repr(r.fanout))
+ with git.open_idx(nameprefix + b'.idx') as r:
+ print(repr(r.fanout))
- for i in range(nobj):
- WVPASS(r.find_offset(hashes[i]) > 0)
- WVPASS(r.exists(hashes[99]))
- WVFAIL(r.exists(b'\0'*20))
+ for i in range(nobj):
+ WVPASS(r.find_offset(hashes[i]) > 0)
+ WVPASS(r.exists(hashes[99]))
+ WVFAIL(r.exists(b'\0'*20))
- pi = iter(r)
- for h in sorted(hashes):
- WVPASSEQ(hexlify(next(pi)), hexlify(h))
+ pi = iter(r)
+ for h in sorted(hashes):
+ WVPASSEQ(hexlify(next(pi)), hexlify(h))
- WVFAIL(r.find_offset(b'\0'*20))
+ WVFAIL(r.find_offset(b'\0'*20))
- r = git.PackIdxList(bupdir + b'/objects/pack')
- WVPASS(r.exists(hashes[5]))
- WVPASS(r.exists(hashes[6]))
- WVFAIL(r.exists(b'\0'*20))
+ with git.PackIdxList(bupdir + b'/objects/pack') as r:
+ WVPASS(r.exists(hashes[5]))
+ WVPASS(r.exists(hashes[6]))
+ WVFAIL(r.exists(b'\0'*20))
def test_pack_name_lookup(tmpdir):
hashes = []
for start in range(0,28,2):
- w = git.PackWriter()
- for i in range(start, start+2):
- hashes.append(w.new_blob(b'%d' % i))
- log('\n')
- idxnames.append(os.path.basename(w.close() + b'.idx'))
+ with git.PackWriter() as w:
+ for i in range(start, start+2):
+ hashes.append(w.new_blob(b'%d' % i))
+ log('\n')
+ idxnames.append(os.path.basename(w.close() + b'.idx'))
- r = git.PackIdxList(packdir)
- WVPASSEQ(len(r.packs), 2)
- for e,idxname in enumerate(idxnames):
- for i in range(e*2, (e+1)*2):
- WVPASSEQ(idxname, r.exists(hashes[i], want_source=True))
+ with git.PackIdxList(packdir) as r:
+ WVPASSEQ(len(r.packs), 2)
+ for e,idxname in enumerate(idxnames):
+ for i in range(e*2, (e+1)*2):
+ WVPASSEQ(idxname, r.exists(hashes[i], want_source=True))
def test_long_index(tmpdir):
idx.add(obj3_bin, 3, 0xff)
name = tmpdir + b'/tmp.idx'
r = idx.write(name, pack_bin)
- i = git.PackIdxV2(name, open(name, 'rb'))
- WVPASSEQ(i.find_offset(obj_bin), 0xfffffffff)
- WVPASSEQ(i.find_offset(obj2_bin), 0xffffffffff)
- WVPASSEQ(i.find_offset(obj3_bin), 0xff)
+ with git.PackIdxV2(name, open(name, 'rb')) as i:
+ WVPASSEQ(i.find_offset(obj_bin), 0xfffffffff)
+ WVPASSEQ(i.find_offset(obj2_bin), 0xffffffffff)
+ WVPASSEQ(i.find_offset(obj3_bin), 0xff)
def test_check_repo_or_die(tmpdir):
environ[b'GIT_AUTHOR_EMAIL'] = b'bup@a425bc70a02811e49bdf73ee56450e6f'
environ[b'GIT_COMMITTER_EMAIL'] = environ[b'GIT_AUTHOR_EMAIL']
try:
- readpipe([b'git', b'init', workdir])
environ[b'GIT_DIR'] = environ[b'BUP_DIR'] = repodir
+ readpipe([b'git', b'init', workdir])
+ exc(b'git', b'symbolic-ref', b'HEAD', b'refs/heads/main')
git.check_repo_or_die(repodir)
os.chdir(workdir)
with open('foo', 'w') as f:
readpipe([b'git', b'commit', b'-am', b'Do something',
b'--author', b'Someone <someone@somewhere>',
b'--date', b'Sat Oct 3 19:48:49 2009 -0400'])
- commit = readpipe([b'git', b'show-ref', b'-s', b'master']).strip()
+ commit = readpipe([b'git', b'show-ref', b'-s', b'main']).strip()
parents = showval(commit, b'%P')
tree = showval(commit, b'%T')
cname = showval(commit, b'%cn')
f.write(b'baz\n')
readpipe([b'git', b'add', '.'])
readpipe([b'git', b'commit', b'-am', b'Do something else'])
- child = readpipe([b'git', b'show-ref', b'-s', b'master']).strip()
+ child = readpipe([b'git', b'show-ref', b'-s', b'main']).strip()
parents = showval(child, b'%P')
commit_items = git.get_commit_items(child, git.cp())
WVPASSEQ(commit_items.parents, [commit])
restore_env_var(b'GIT_COMMITTER_EMAIL', orig_committer_email)
+gpgsig_example_1 = b'''tree 3fab08ade2fbbda60bef180bb8e0cc5724d6bd4d
+parent 36db87b46a95ca5079f43dfe9b72220acab7c731
+author Rob Browning <rlb@defaultvalue.org> 1633397238 -0500
+committer Rob Browning <rlb@defaultvalue.org> 1633397238 -0500
+gpgsig -----BEGIN PGP SIGNATURE-----
+
+ ...
+ -----END PGP SIGNATURE-----
+
+Sample signed commit.
+'''
+
+gpgsig_example_2 = b'''tree 3fab08ade2fbbda60bef180bb8e0cc5724d6bd4d
+parent 36db87b46a95ca5079f43dfe9b72220acab7c731
+author Rob Browning <rlb@defaultvalue.org> 1633397238 -0500
+committer Rob Browning <rlb@defaultvalue.org> 1633397238 -0500
+gpgsig -----BEGIN PGP SIGNATURE-----
+
+ ...
+ -----END PGP SIGNATURE-----
+
+
+Sample signed commit.
+'''
+
+def test_commit_gpgsig_parsing():
+ c = git.parse_commit(gpgsig_example_1)
+ assert c.gpgsig
+ assert c.gpgsig.startswith(b'-----BEGIN PGP SIGNATURE-----\n')
+ assert c.gpgsig.endswith(b'\n-----END PGP SIGNATURE-----\n')
+ c = git.parse_commit(gpgsig_example_2)
+ assert c.gpgsig
+ assert c.gpgsig.startswith(b'-----BEGIN PGP SIGNATURE-----')
+ assert c.gpgsig.endswith(b'\n-----END PGP SIGNATURE-----\n\n')
+
+
def test_new_commit(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir + b'/bup'
git.init_repo(bupdir)
git.verbose = 1
- w = git.PackWriter()
- tree = os.urandom(20)
- parent = os.urandom(20)
- author_name = b'Author'
- author_mail = b'author@somewhere'
- adate_sec = 1439657836
- cdate_sec = adate_sec + 1
- committer_name = b'Committer'
- committer_mail = b'committer@somewhere'
- adate_tz_sec = cdate_tz_sec = None
- commit = w.new_commit(tree, parent,
- b'%s <%s>' % (author_name, author_mail),
- adate_sec, adate_tz_sec,
- b'%s <%s>' % (committer_name, committer_mail),
- cdate_sec, cdate_tz_sec,
- b'There is a small mailbox here')
- adate_tz_sec = -60 * 60
- cdate_tz_sec = 120 * 60
- commit_off = w.new_commit(tree, parent,
+ with git.PackWriter() as w:
+ tree = os.urandom(20)
+ parent = os.urandom(20)
+ author_name = b'Author'
+ author_mail = b'author@somewhere'
+ adate_sec = 1439657836
+ cdate_sec = adate_sec + 1
+ committer_name = b'Committer'
+ committer_mail = b'committer@somewhere'
+ adate_tz_sec = cdate_tz_sec = None
+ commit = w.new_commit(tree, parent,
b'%s <%s>' % (author_name, author_mail),
adate_sec, adate_tz_sec,
b'%s <%s>' % (committer_name, committer_mail),
cdate_sec, cdate_tz_sec,
b'There is a small mailbox here')
- w.close()
+ adate_tz_sec = -60 * 60
+ cdate_tz_sec = 120 * 60
+ commit_off = w.new_commit(tree, parent,
+ b'%s <%s>' % (author_name, author_mail),
+ adate_sec, adate_tz_sec,
+ b'%s <%s>' % (committer_name, committer_mail),
+ cdate_sec, cdate_tz_sec,
+ b'There is a small mailbox here')
commit_items = git.get_commit_items(hexlify(commit), git.cp())
local_author_offset = localtime(adate_sec).tm_gmtoff
for i in range(10):
_create_idx(tmpdir, i)
git.auto_midx(tmpdir)
- l = git.PackIdxList(tmpdir)
+ with git.PackIdxList(tmpdir) as l:
# this doesn't exist (yet)
- WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0)))
- for i in range(10, 15):
- _create_idx(tmpdir, i)
- # delete the midx ...
- # TODO: why do we need to? git.auto_midx() below doesn't?!
- for fn in os.listdir(tmpdir):
- if fn.endswith(b'.midx'):
- os.unlink(os.path.join(tmpdir, fn))
- # and make a new one
- git.auto_midx(tmpdir)
- # check it still doesn't exist - we haven't refreshed
- WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0)))
- # check that we still have the midx open, this really
- # just checks more for the kernel API ('deleted' string)
- for fn in openfiles():
- if not b'midx-' in fn:
- continue
- WVPASSEQ(True, b'deleted' in fn)
- # refresh the PackIdxList
- l.refresh()
- # and check that an object in pack 10 exists now
- WVPASSEQ(True, l.exists(struct.pack('18xBB', 10, 0)))
- for fn in openfiles():
- if not b'midx-' in fn:
- continue
- # check that we don't have it open anymore
- WVPASSEQ(False, b'deleted' in fn)
+ WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0)))
+ for i in range(10, 15):
+ _create_idx(tmpdir, i)
+ # delete the midx ...
+ # TODO: why do we need to? git.auto_midx() below doesn't?!
+ for fn in os.listdir(tmpdir):
+ if fn.endswith(b'.midx'):
+ os.unlink(os.path.join(tmpdir, fn))
+ # and make a new one
+ git.auto_midx(tmpdir)
+ # check it still doesn't exist - we haven't refreshed
+ WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0)))
+ # check that we still have the midx open, this really
+ # just checks more for the kernel API ('deleted' string)
+ for fn in openfiles():
+ if not b'midx-' in fn:
+ continue
+ WVPASSEQ(True, b'deleted' in fn)
+ # refresh the PackIdxList
+ l.refresh()
+ # and check that an object in pack 10 exists now
+ WVPASSEQ(True, l.exists(struct.pack('18xBB', 10, 0)))
+ for fn in openfiles():
+ if not b'midx-' in fn:
+ continue
+ # check that we don't have it open anymore
+ WVPASSEQ(False, b'deleted' in fn)
+
+def test_config():
+ cfg_file = os.path.join(os.path.dirname(__file__), 'sample.conf')
+ no_such_file = os.path.join(os.path.dirname(__file__), 'nosuch.conf')
+ git_config_get = partial(git.git_config_get, cfg_file=cfg_file)
+ WVPASSEQ(git_config_get(b'bup.foo'), b'bar')
+ WVPASSEQ(git_config_get(b'bup.bup'), b'is great')
+ WVPASSEQ(git_config_get(b'bup.end'), b'end')
+ WVPASSEQ(git_config_get(b'bup.comments'), None)
+ WVPASSEQ(git_config_get(b'bup.;comments'), None)
+ WVPASSEQ(git_config_get(b'bup.and'), None)
+ WVPASSEQ(git_config_get(b'bup.#and'), None)
+
+ WVPASSEQ(git.git_config_get(b'bup.foo', cfg_file=no_such_file), None)
+
+ WVEXCEPT(git.GitError, git_config_get, b'bup.isbad', opttype='bool')
+ WVEXCEPT(git.GitError, git_config_get, b'bup.isbad', opttype='int')
+ WVPASSEQ(git_config_get(b'bup.isbad'), b'ok')
+ WVPASSEQ(True, git_config_get(b'bup.istrue1', opttype='bool'))
+ WVPASSEQ(True, git_config_get(b'bup.istrue2', opttype='bool'))
+ WVPASSEQ(True, git_config_get(b'bup.istrue3', opttype='bool'))
+ WVPASSEQ(False, git_config_get(b'bup.isfalse1', opttype='bool'))
+ WVPASSEQ(False, git_config_get(b'bup.isfalse2', opttype='bool'))
+ WVPASSEQ(None, git_config_get(b'bup.nosuchkey', opttype='bool'))
+ WVPASSEQ(1, git_config_get(b'bup.istrue1', opttype='int'))
+ WVPASSEQ(2, git_config_get(b'bup.istrue2', opttype='int'))
+ WVPASSEQ(0, git_config_get(b'bup.isfalse2', opttype='int'))
+ WVPASSEQ(0x777, git_config_get(b'bup.hex', opttype='int'))