-import os, time, random, subprocess
+import sys, os, time, random, subprocess
+sys.path.append('../lib')
from bup import client, git, hashsplit
from wvtest import *
@wvtest
def test_server_split_with_indexes():
- os.environ['BUP_MAIN_EXE'] = './bup'
+ os.environ['BUP_MAIN_EXE'] = '../bup'
os.environ['BUP_DIR'] = bupdir = 'buptest_tclient.tmp'
subprocess.call(['rm', '-rf', bupdir])
git.init_repo(bupdir)
@wvtest
def test_midx_refreshing():
- os.environ['BUP_MAIN_EXE'] = bupmain = './bup'
+ os.environ['BUP_MAIN_EXE'] = bupmain = '../bup'
os.environ['BUP_DIR'] = bupdir = 'buptest_tmidx.tmp'
subprocess.call(['rm', '-rf', bupdir])
git.init_repo(bupdir)
def index_basic():
cd = os.path.realpath('')
WVPASS(cd)
- sd = os.path.realpath('t/sampledata')
- WVPASSEQ(index.realpath('t/sampledata'), cd + '/t/sampledata')
- WVPASSEQ(os.path.realpath('t/sampledata/x'), sd + '/x')
- WVPASSEQ(os.path.realpath('t/sampledata/etc'), os.path.realpath('/etc'))
- WVPASSEQ(index.realpath('t/sampledata/etc'), sd + '/etc')
+ sd = os.path.realpath('sampledata')
+ WVPASSEQ(index.realpath('sampledata'), cd + '/sampledata')
+ WVPASSEQ(os.path.realpath('sampledata/x'), sd + '/x')
+ WVPASSEQ(os.path.realpath('sampledata/etc'), os.path.realpath('/etc'))
+ WVPASSEQ(index.realpath('sampledata/etc'), sd + '/etc')
@wvtest
def index_writer():
unlink('index.tmp')
ds = os.stat('.')
- fs = os.stat('t/tindex.py')
+ fs = os.stat('tindex.py')
w = index.Writer('index.tmp')
w.add('/var/tmp/sporky', fs)
w.add('/etc/passwd', fs)
unlink('index.tmp')
unlink('index2.tmp')
ds = os.stat('.')
- fs = os.stat('t/tindex.py')
+ fs = os.stat('tindex.py')
w1 = index.Writer('index.tmp')
w1.add('/a/b/x', fs)
_registered = []
_tests = 0
_fails = 0
-
+
def wvtest(func):
""" Use this decorator (@wvtest) in front of any function you want to run
as part of the unit test suite. Then run:
"""
_registered.append(func)
return func
-
-
+
+
def _result(msg, tb, code):
global _tests, _fails
_tests += 1
print '! %-70s %s' % ('%s:%-4d %s' % (filename, line, msg),
code)
sys.stdout.flush()
-
-
+
+
def _check(cond, msg = 'unknown', tb = None):
if tb == None: tb = traceback.extract_stack()[-3]
if cond:
else:
_result(msg, tb, 'FAILED')
return cond
-
-
+
+
def _code():
(filename, line, func, text) = traceback.extract_stack()[-3]
- text = re.sub(r'^\w+\((.*)\)$', r'\1', unicode(text));
+ text = re.sub(r'^\w+\((.*)\)(\s*#.*)?$', r'\1', text);
return text
-
-
+
+
def WVPASS(cond = True):
- ''' Throws an exception unless cond is true. '''
+ ''' Counts a test failure unless cond is true. '''
return _check(cond, _code())
-
+
def WVFAIL(cond = True):
- ''' Throws an exception unless cond is false. '''
+ ''' Counts a test failure unless cond is false. '''
return _check(not cond, 'NOT(%s)' % _code())
-
+
def WVPASSEQ(a, b):
- ''' Throws an exception unless a == b. '''
+ ''' Counts a test failure unless a == b. '''
return _check(a == b, '%s == %s' % (repr(a), repr(b)))
-
+
def WVPASSNE(a, b):
- ''' Throws an exception unless a != b. '''
+ ''' Counts a test failure unless a != b. '''
return _check(a != b, '%s != %s' % (repr(a), repr(b)))
-
+
def WVPASSLT(a, b):
- ''' Throws an exception unless a < b. '''
+ ''' Counts a test failure unless a < b. '''
return _check(a < b, '%s < %s' % (repr(a), repr(b)))
-
+
def WVPASSLE(a, b):
- ''' Throws an exception unless a <= b. '''
+ ''' Counts a test failure unless a <= b. '''
return _check(a <= b, '%s <= %s' % (repr(a), repr(b)))
-
+
def WVPASSGT(a, b):
- ''' Throws an exception unless a > b. '''
+ ''' Counts a test failure unless a > b. '''
return _check(a > b, '%s > %s' % (repr(a), repr(b)))
-
+
def WVPASSGE(a, b):
- ''' Throws an exception unless a >= b. '''
+ ''' Counts a test failure unless a >= b. '''
return _check(a >= b, '%s >= %s' % (repr(a), repr(b)))
+ def WVEXCEPT(etype, func, *args, **kwargs):
+ ''' Counts a test failure unless func throws an 'etype' exception.
+ You have to spell out the function name and arguments, rather than
+ calling the function yourself, so that WVEXCEPT can run before
+ your test code throws an exception.
+ '''
+ try:
+ func(*args, **kwargs)
+ except etype, e:
+ return _check(True, 'EXCEPT(%s)' % _code())
+ except:
+ _check(False, 'EXCEPT(%s)' % _code())
+ raise
+ else:
+ return _check(False, 'EXCEPT(%s)' % _code())
+
else: # we're the main program
# NOTE
- # Why do we do this in such convoluted way? Because if you run
+ # Why do we do this in such a convoluted way? Because if you run
# wvtest.py as a main program and it imports your test files, then
# those test files will try to import the wvtest module recursively.
# That actually *works* fine, because we don't run this main program
# All this is done just so that wvtest.py can be a single file that's
# easy to import into your own applications.
import wvtest
-
+
def _runtest(modname, fname, f):
print
print 'Testing "%s" in %s.py:' % (fname, modname)
print
print traceback.format_exc()
tb = sys.exc_info()[2]
- wvtest._result(e, traceback.extract_tb(tb)[-1],
+ wvtest._result(e, traceback.extract_tb(tb)[1],
'EXCEPTION')
-
+
# main code
for modname in sys.argv[1:]:
if not os.path.exists(modname):
modname = modname[:-3]
print 'Importing: %s' % modname
wvtest._registered = []
- mod = __import__(modname.replace('/', '.'), None, None, [])
-
- for t in wvtest._registered:
- _runtest(modname, t.func_name, t)
- print
+ oldwd = os.getcwd()
+ oldpath = sys.path
+ try:
+ modpath = os.path.abspath(modname).split('/')[:-1]
+ os.chdir('/'.join(modpath))
+ sys.path += ['/'.join(modpath),
+ '/'.join(modpath[:-1])]
+ mod = __import__(modname.replace('/', '.'), None, None, [])
+ for t in wvtest._registered:
+ _runtest(modname, t.func_name, t)
+ print
+ finally:
+ os.chdir(oldwd)
+ sys.path = oldpath
print
print 'WvTest: %d tests, %d failures.' % (wvtest._tests, wvtest._fails)
+#
+# Include this file in your shell script by using:
+# #!/bin/sh
+# . ./wvtest.sh
+#
+
# we don't quote $TEXT in case it contains newlines; newlines
# aren't allowed in test output. However, we set -f so that
# at least shell glob characters aren't processed.
-_textclean()
+_wvtextclean()
{
( set -f; echo $* )
}
+
+if [ -n "$BASH_VERSION" ]; then
+ _wvfind_caller()
+ {
+ LVL=$1
+ WVCALLER_FILE=${BASH_SOURCE[2]}
+ WVCALLER_LINE=${BASH_LINENO[1]}
+ }
+else
+ _wvfind_caller()
+ {
+ LVL=$1
+ WVCALLER_FILE="unknown"
+ WVCALLER_LINE=0
+ }
+fi
+
+
_wvcheck()
{
CODE="$1"
- TEXT=$(_textclean "$2")
+ TEXT=$(_wvtextclean "$2")
OK=ok
if [ "$CODE" -ne 0 ]; then
OK=FAILED
fi
- echo "! ${BASH_SOURCE[2]}:${BASH_LINENO[1]} $TEXT $OK" >&2
+ echo "! $WVCALLER_FILE:$WVCALLER_LINE $TEXT $OK" >&2
if [ "$CODE" -ne 0 ]; then
exit $CODE
else
WVPASS()
{
TEXT="$*"
-
+
+ _wvfind_caller
if "$@"; then
_wvcheck 0 "$TEXT"
return 0
WVFAIL()
{
TEXT="$*"
-
+
+ _wvfind_caller
if "$@"; then
_wvcheck 1 "NOT($TEXT)"
# NOTREACHED
WVPASSEQ()
{
- WVPASS [ "$#" -eq 2 ]
+ _wvfind_caller
+ _wvcheck $(_wvgetrv [ "$#" -eq 2 ]) "exactly 2 arguments"
echo "Comparing:" >&2
echo "$1" >&2
echo "--" >&2
WVPASSNE()
{
- WVPASS [ "$#" -eq 2 ]
+ _wvfind_caller
+ _wvcheck $(_wvgetrv [ "$#" -eq 2 ]) "exactly 2 arguments"
echo "Comparing:" >&2
echo "$1" >&2
echo "--" >&2
}
+WVPASSRC()
+{
+ RC=$?
+ _wvfind_caller
+ _wvcheck $(_wvgetrv [ $RC -eq 0 ]) "return code($RC) == 0"
+}
+
+
+WVFAILRC()
+{
+ RC=$?
+ _wvfind_caller
+ _wvcheck $(_wvgetrv [ $RC -ne 0 ]) "return code($RC) != 0"
+}
+
+
WVSTART()
{
echo >&2
- echo "Testing \"$*\" in ${BASH_SOURCE[1]}:" >&2
+ _wvfind_caller
+ echo "Testing \"$*\" in $WVCALLER_FILE:" >&2
}
sub bigkill($)
{
my $pid = shift;
-
+
if (@log) {
print "\n" . join("\n", @log) . "\n";
}
-
+
print STDERR "\n! Killed by signal FAILED\n";
($pid > 0) || die("pid is '$pid'?!\n");
local $SIG{CHLD} = sub { }; # this will wake us from sleep() faster
kill 15, $pid;
sleep(2);
-
+
if ($pid > 1) {
kill 9, -$pid;
}
kill 9, $pid;
-
+
exit(125);
}
# parent
local $SIG{INT} = sub { bigkill($pid); };
local $SIG{TERM} = sub { bigkill($pid); };
-local $SIG{ALRM} = sub {
+local $SIG{ALRM} = sub {
print STDERR "Alarm timed out! No test results for too long.\n";
bigkill($pid);
};
{
my $result = shift;
my $pass = ($result eq "ok");
-
+
if ($istty) {
my $colour = $pass ? "\e[32;1m" : "\e[31;1m";
return "$colour$result\e[0m";
my ($floatsec, $warntime, $badtime) = @_;
my $ms = int($floatsec * 1000);
my $str = sprintf("%d.%03ds", $ms/1000, $ms % 1000);
-
+
if ($istty && $ms > $badtime) {
return "\e[31;1m$str\e[0m";
} elsif ($istty && $ms > $warntime) {
{
chomp;
s/\r//g;
-
+
if (/^\s*Testing "(.*)" in (.*):\s*$/)
{
alarm(120);
my ($sect, $file) = ($1, $2);
-
+
endsect();
-
+
printf("! %s %s: ", $file, $sect);
@log = ();
$start = $stop;
elsif (/^!\s*(.*?)\s+(\S+)\s*$/)
{
alarm(120);
-
+
my ($name, $result) = ($1, $2);
my $pass = ($result eq "ok");
-
+
if (!$start) {
printf("\n! Startup: ");
$start = time();
}
-
+
push @log, resultline($name, $result);
-
+
if (!$pass) {
$gfails++;
if (@log) {