-import struct, os, tempfile, time
+from subprocess import check_call
+import struct, os, subprocess, tempfile, time
+
from bup import git
-from bup.helpers import *
+from bup.helpers import log, mkdirp, readpipe
+
from wvtest import *
+top_dir = os.path.realpath('../../..')
+bup_exe = top_dir + '/bup'
+bup_tmp = top_dir + '/t/tmp'
+
+
+def exc(*cmd):
+ cmd_str = ' '.join(cmd)
+ print >> sys.stderr, cmd_str
+ check_call(cmd)
+
+
+def exo(*cmd):
+ cmd_str = ' '.join(cmd)
+ print >> sys.stderr, cmd_str
+ return readpipe(cmd)
+
+
@wvtest
def testmangle():
afile = 0100644
WVPASSEQ(git.mangle_name("f.bup", afile, adir), "f.bup.bup")
WVPASSEQ(git.mangle_name("f", afile, adir), "f.bup")
- WVPASSEQ(git.demangle_name("f.bup"), ("f", git.BUP_CHUNKED))
- WVPASSEQ(git.demangle_name("f.bupl"), ("f", git.BUP_NORMAL))
- WVPASSEQ(git.demangle_name("f.bup.bupl"), ("f.bup", git.BUP_NORMAL))
+ WVPASSEQ(git.demangle_name("f.bup", afile), ("f", git.BUP_CHUNKED))
+ WVPASSEQ(git.demangle_name("f.bupl", afile), ("f", git.BUP_NORMAL))
+ WVPASSEQ(git.demangle_name("f.bup.bupl", afile), ("f.bup", git.BUP_NORMAL))
+
+ WVPASSEQ(git.demangle_name(".bupm", afile), ('', git.BUP_NORMAL))
+ WVPASSEQ(git.demangle_name(".bupm", adir), ('', git.BUP_CHUNKED))
# for safety, we ignore .bup? suffixes we don't recognize. Future
# versions might implement a .bup[a-z] extension as something other
# than BUP_NORMAL.
- WVPASSEQ(git.demangle_name("f.bupa"), ("f.bupa", git.BUP_NORMAL))
+ WVPASSEQ(git.demangle_name("f.bupa", afile), ("f.bupa", git.BUP_NORMAL))
@wvtest
@wvtest
def testpacks():
- subprocess.call(['rm','-rf', 'pybuptest.tmp'])
- git.init_repo('pybuptest.tmp')
+ initial_failures = wvfailure_count()
+ tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tgit-')
+ os.environ['BUP_MAIN_EXE'] = bup_exe
+ os.environ['BUP_DIR'] = bupdir = tmpdir + "/bup"
+ git.init_repo(bupdir)
git.verbose = 1
w = git.PackWriter()
w.new_blob(os.urandom(100))
w.new_blob(os.urandom(100))
w.abort()
-
+
w = git.PackWriter()
hashes = []
nobj = 1000
WVFAIL(r.find_offset('\0'*20))
- r = git.PackIdxList('pybuptest.tmp/objects/pack')
+ r = git.PackIdxList(bupdir + '/objects/pack')
WVPASS(r.exists(hashes[5]))
WVPASS(r.exists(hashes[6]))
WVFAIL(r.exists('\0'*20))
-
+ if wvfailure_count() == initial_failures:
+ subprocess.call(['rm', '-rf', tmpdir])
@wvtest
def test_pack_name_lookup():
- os.environ['BUP_MAIN_EXE'] = bupmain = '../../../bup'
- os.environ['BUP_DIR'] = bupdir = 'pybuptest.tmp'
- subprocess.call(['rm','-rf', bupdir])
+ initial_failures = wvfailure_count()
+ tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tgit-')
+ os.environ['BUP_MAIN_EXE'] = bup_exe
+ os.environ['BUP_DIR'] = bupdir = tmpdir + "/bup"
git.init_repo(bupdir)
git.verbose = 1
packdir = git.repo('objects/pack')
for e,idxname in enumerate(idxnames):
for i in range(e*2, (e+1)*2):
WVPASSEQ(r.exists(hashes[i], want_source=True), idxname)
+ if wvfailure_count() == initial_failures:
+ subprocess.call(['rm', '-rf', tmpdir])
@wvtest
def test_long_index():
+ initial_failures = wvfailure_count()
+ tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tgit-')
+ os.environ['BUP_MAIN_EXE'] = bup_exe
+ os.environ['BUP_DIR'] = bupdir = tmpdir + "/bup"
+ git.init_repo(bupdir)
w = git.PackWriter()
obj_bin = struct.pack('!IIIII',
0x00112233, 0x44556677, 0x88990011, 0x22334455, 0x66778899)
WVPASSEQ(i.find_offset(obj_bin), 0xfffffffff)
WVPASSEQ(i.find_offset(obj2_bin), 0xffffffffff)
WVPASSEQ(i.find_offset(obj3_bin), 0xff)
- os.remove(name)
+ if wvfailure_count() == initial_failures:
+ os.remove(name)
+ subprocess.call(['rm', '-rf', tmpdir])
+
+
+@wvtest
+def test_check_repo_or_die():
+ initial_failures = wvfailure_count()
+ orig_cwd = os.getcwd()
+ tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tgit-')
+ os.environ['BUP_DIR'] = bupdir = tmpdir + "/bup"
+ try:
+ os.chdir(tmpdir)
+ git.init_repo(bupdir)
+ git.check_repo_or_die()
+ WVPASS('check_repo_or_die') # if we reach this point the call above passed
+
+ os.rename(bupdir + '/objects/pack', bupdir + '/objects/pack.tmp')
+ open(bupdir + '/objects/pack', 'w').close()
+ try:
+ git.check_repo_or_die()
+ except SystemExit as e:
+ WVPASSEQ(e.code, 14)
+ else:
+ WVFAIL()
+ os.unlink(bupdir + '/objects/pack')
+ os.rename(bupdir + '/objects/pack.tmp', bupdir + '/objects/pack')
+
+ try:
+ git.check_repo_or_die('nonexistantbup.tmp')
+ except SystemExit as e:
+ WVPASSEQ(e.code, 15)
+ else:
+ WVFAIL()
+ finally:
+ os.chdir(orig_cwd)
+ if wvfailure_count() == initial_failures:
+ subprocess.call(['rm', '-rf', tmpdir])
+
+@wvtest
+def test_commit_parsing():
+ def restore_env_var(name, val):
+ if val is None:
+ del os.environ[name]
+ else:
+ os.environ[name] = val
+ def showval(commit, val):
+ return readpipe(['git', 'show', '-s',
+ '--pretty=format:%s' % val, commit]).strip()
+ initial_failures = wvfailure_count()
+ orig_cwd = os.getcwd()
+ tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tgit-')
+ workdir = tmpdir + "/work"
+ repodir = workdir + '/.git'
+ orig_author_name = os.environ.get('GIT_AUTHOR_NAME')
+ orig_author_email = os.environ.get('GIT_AUTHOR_EMAIL')
+ orig_committer_name = os.environ.get('GIT_COMMITTER_NAME')
+ orig_committer_email = os.environ.get('GIT_COMMITTER_EMAIL')
+ os.environ['GIT_AUTHOR_NAME'] = 'bup test'
+ os.environ['GIT_COMMITTER_NAME'] = os.environ['GIT_AUTHOR_NAME']
+ os.environ['GIT_AUTHOR_EMAIL'] = 'bup@a425bc70a02811e49bdf73ee56450e6f'
+ os.environ['GIT_COMMITTER_EMAIL'] = os.environ['GIT_AUTHOR_EMAIL']
+ try:
+ readpipe(['git', 'init', workdir])
+ os.environ['GIT_DIR'] = os.environ['BUP_DIR'] = repodir
+ git.check_repo_or_die(repodir)
+ os.chdir(workdir)
+ with open('foo', 'w') as f:
+ print >> f, 'bar'
+ readpipe(['git', 'add', '.'])
+ readpipe(['git', 'commit', '-am', 'Do something',
+ '--author', 'Someone <someone@somewhere>',
+ '--date', 'Sat Oct 3 19:48:49 2009 -0400'])
+ commit = readpipe(['git', 'show-ref', '-s', 'master']).strip()
+ parents = showval(commit, '%P')
+ tree = showval(commit, '%T')
+ cname = showval(commit, '%cn')
+ cmail = showval(commit, '%ce')
+ cdate = showval(commit, '%ct')
+ coffs = showval(commit, '%ci')
+ coffs = coffs[-5:]
+ coff = (int(coffs[-4:-2]) * 60 * 60) + (int(coffs[-2:]) * 60)
+ if coffs[-5] == '-':
+ coff = - coff
+ commit_items = git.get_commit_items(commit, git.cp())
+ WVPASSEQ(commit_items.parents, [])
+ WVPASSEQ(commit_items.tree, tree)
+ WVPASSEQ(commit_items.author_name, 'Someone')
+ WVPASSEQ(commit_items.author_mail, 'someone@somewhere')
+ WVPASSEQ(commit_items.author_sec, 1254613729)
+ WVPASSEQ(commit_items.author_offset, -(4 * 60 * 60))
+ WVPASSEQ(commit_items.committer_name, cname)
+ WVPASSEQ(commit_items.committer_mail, cmail)
+ WVPASSEQ(commit_items.committer_sec, int(cdate))
+ WVPASSEQ(commit_items.committer_offset, coff)
+ WVPASSEQ(commit_items.message, 'Do something\n')
+ with open('bar', 'w') as f:
+ print >> f, 'baz'
+ readpipe(['git', 'add', '.'])
+ readpipe(['git', 'commit', '-am', 'Do something else'])
+ child = readpipe(['git', 'show-ref', '-s', 'master']).strip()
+ parents = showval(child, '%P')
+ commit_items = git.get_commit_items(child, git.cp())
+ WVPASSEQ(commit_items.parents, [commit])
+ finally:
+ os.chdir(orig_cwd)
+ restore_env_var('GIT_AUTHOR_NAME', orig_author_name)
+ restore_env_var('GIT_AUTHOR_EMAIL', orig_author_email)
+ restore_env_var('GIT_COMMITTER_NAME', orig_committer_name)
+ restore_env_var('GIT_COMMITTER_EMAIL', orig_committer_email)
+ if wvfailure_count() == initial_failures:
+ subprocess.call(['rm', '-rf', tmpdir])
+
+
+@wvtest
+def test_list_refs():
+ initial_failures = wvfailure_count()
+ tmpdir = tempfile.mkdtemp(dir=bup_tmp, prefix='bup-tgit-')
+ os.environ['BUP_MAIN_EXE'] = bup_exe
+ os.environ['BUP_DIR'] = bupdir = tmpdir + "/bup"
+ src = tmpdir + '/src'
+ mkdirp(src)
+ with open(src + '/1', 'w+') as f:
+ print f, 'something'
+ with open(src + '/2', 'w+') as f:
+ print f, 'something else'
+ git.init_repo(bupdir)
+ emptyset = frozenset()
+ WVPASSEQ(frozenset(git.list_refs()), emptyset)
+ WVPASSEQ(frozenset(git.list_refs(limit_to_tags=True)), emptyset)
+ WVPASSEQ(frozenset(git.list_refs(limit_to_heads=True)), emptyset)
+ exc(bup_exe, 'index', src)
+ exc(bup_exe, 'save', '-n', 'src', '--strip', src)
+ src_hash = exo('git', '--git-dir', bupdir,
+ 'rev-parse', 'src').strip().split('\n')
+ assert(len(src_hash) == 1)
+ src_hash = src_hash[0].decode('hex')
+ tree_hash = exo('git', '--git-dir', bupdir,
+ 'rev-parse', 'src:').strip().split('\n')[0].decode('hex')
+ blob_hash = exo('git', '--git-dir', bupdir,
+ 'rev-parse', 'src:1').strip().split('\n')[0].decode('hex')
+ WVPASSEQ(frozenset(git.list_refs()),
+ frozenset([('refs/heads/src', src_hash)]))
+ WVPASSEQ(frozenset(git.list_refs(limit_to_tags=True)), emptyset)
+ WVPASSEQ(frozenset(git.list_refs(limit_to_heads=True)),
+ frozenset([('refs/heads/src', src_hash)]))
+ exc('git', '--git-dir', bupdir, 'tag', 'commit-tag', 'src')
+ WVPASSEQ(frozenset(git.list_refs()),
+ frozenset([('refs/heads/src', src_hash),
+ ('refs/tags/commit-tag', src_hash)]))
+ WVPASSEQ(frozenset(git.list_refs(limit_to_tags=True)),
+ frozenset([('refs/tags/commit-tag', src_hash)]))
+ WVPASSEQ(frozenset(git.list_refs(limit_to_heads=True)),
+ frozenset([('refs/heads/src', src_hash)]))
+ exc('git', '--git-dir', bupdir, 'tag', 'tree-tag', 'src:')
+ exc('git', '--git-dir', bupdir, 'tag', 'blob-tag', 'src:1')
+ os.unlink(bupdir + '/refs/heads/src')
+ expected_tags = frozenset([('refs/tags/commit-tag', src_hash),
+ ('refs/tags/tree-tag', tree_hash),
+ ('refs/tags/blob-tag', blob_hash)])
+ WVPASSEQ(frozenset(git.list_refs()), expected_tags)
+ WVPASSEQ(frozenset(git.list_refs(limit_to_heads=True)), frozenset([]))
+ WVPASSEQ(frozenset(git.list_refs(limit_to_tags=True)), expected_tags)
+ if wvfailure_count() == initial_failures:
+ subprocess.call(['rm', '-rf', tmpdir])