ledger/test/RegressTests.py
2012-03-01 17:32:51 -06:00

198 lines
5.5 KiB
Python
Executable file

#!/usr/bin/env python
import sys
import os
import re
import tempfile
# multiprocessing is optional: when it cannot be imported the tests are run
# serially in this process.
multiproc = False
try:
    from multiprocessing import Pool
    multiproc = True
except ImportError:
    # Only a missing module should disable parallel runs; anything else
    # (e.g. KeyboardInterrupt) must propagate.
    pass
from string import join
from difflib import unified_diff
from LedgerHarness import LedgerHarness
# Command-line handling.
#
# An optional leading "-jN" argument selects the number of jobs and is removed
# from the argument list before it is handed to LedgerHarness.  A bare "-j"
# (no number) leaves jobs at 1.  The fourth remaining element (args[3]) names
# a single test file or a directory of test files.
args = sys.argv
jobs = 1

match = None
if len(args) > 1:
    match = re.match('-j([0-9]+)?', args[1])
if match:
    args = [args[0]] + args[2:]
    if match.group(1):
        jobs = int(match.group(1))

# "-j0" would otherwise reach Pool(0), which raises ValueError.
if jobs < 1:
    jobs = 1
if jobs == 1:
    multiproc = False

# Fail with a message instead of an IndexError traceback when arguments are
# missing.
if len(args) < 4:
    sys.stderr.write("%s: expected at least 3 arguments after the optional "
                     "-jN; the third names a test file or directory\n"
                     % args[0])
    sys.exit(1)

harness = LedgerHarness(args)
tests = args[3]

if not os.path.isdir(tests) and not os.path.isfile(tests):
    sys.stderr.write("%s: cannot find test file or directory '%s'\n"
                     % (args[0], tests))
    sys.exit(1)
class RegressFile(object):
def __init__(self, filename):
self.filename = filename
self.fd = open(self.filename)
def transform_line(self, line):
line = re.sub('\$sourcepath', harness.sourcepath, line)
return line
def read_test(self):
test = {
'command': None,
'output': None,
'error': None,
'exitcode': 0
}
in_output = False
in_error = False
line = self.fd.readline()
#print "line =", line
while line:
if line.startswith("test "):
command = line[5:]
match = re.match('(.*) -> ([0-9]+)', command)
if match:
test['command'] = self.transform_line(match.group(1))
test['exitcode'] = int(match.group(2))
else:
test['command'] = command
in_output = True
elif in_output:
if line.startswith("end test"):
in_output = in_error = False
break
elif in_error:
if test['error'] is None:
test['error'] = []
test['error'].append(self.transform_line(line))
else:
if line.startswith("__ERROR__"):
in_error = True
else:
if test['output'] is None:
test['output'] = []
test['output'].append(self.transform_line(line))
line = self.fd.readline()
#print "line =", line
return test['command'] and test
def notify_user(self, msg, test):
print msg
print "--"
print test['command'],
print "--"
def run_test(self, test):
use_stdin = False
if test['command'].find("-f - ") != -1:
use_stdin = True
else:
test['command'] = (('$ledger -f "%s" ' % self.filename) +
test['command'])
p = harness.run(test['command'],
columns=(not re.search('--columns', test['command'])))
if use_stdin:
fd = open(self.filename)
try:
p.stdin.write(fd.read())
finally:
fd.close()
p.stdin.close()
success = True
printed = False
index = 0
if test['output'] is not None:
for line in unified_diff(test['output'], harness.readlines(p.stdout)):
index += 1
if index < 3:
continue
if not printed:
if success: print
self.notify_user("FAILURE in output from %s:" % self.filename, test)
success = False
printed = True
print " ", line,
printed = False
index = 0
if test['error'] is not None:
for line in unified_diff([re.sub('\$FILE', self.filename, line)
for line in test['error']],
harness.readlines(p.stderr)):
index += 1
if index < 3:
continue
if not printed:
if success: print
self.notify_user("FAILURE in error output from %s:"
% self.filename, test)
success = False
printed = True
print " ", line,
if test['exitcode'] is None or test['exitcode'] == p.wait():
if success:
harness.success()
else:
harness.failure()
else:
if success: print
if test['exitcode']:
self.notify_user("FAILURE in exit code (%d != %d) from %s:"
% (test['exitcode'], p.returncode, self.filename),
test)
harness.failure()
def run_tests(self):
test = self.read_test()
while test:
self.run_test(test)
test = self.read_test()
def close(self):
self.fd.close()
def do_test(path):
    """Run every test in the regression file at *path*.

    Defined at module level so it can be handed to Pool.map().
    """
    entry = RegressFile(path)
    try:
        entry.run_tests()
    finally:
        # Release the file handle even when a test raises.
        entry.close()
if __name__ == '__main__':
    # Script entry point: run a single test file, or every "*.test" file in a
    # directory, then let the harness produce the final exit status.
    #
    # NOTE(review): with a Pool, harness.success()/failure() are called in
    # worker processes.  If LedgerHarness keeps its counters in process
    # memory, the harness.exit() call below will not see them -- confirm.
    pool = None
    if multiproc:
        pool = Pool(jobs*2)

    try:
        if os.path.isdir(tests):
            # "_py.test" files are only run when harness.python is set.
            # Sorted so that the run order does not depend on os.listdir().
            tests = sorted(os.path.join(tests, x)
                           for x in os.listdir(tests)
                           if (x.endswith('.test') and
                               ('_py.test' not in x or harness.python)))
            if pool:
                pool.map(do_test, tests, 1)
            else:
                for path in tests:
                    do_test(path)
        else:
            do_test(tests)
    finally:
        # Shut the workers down even if a test run raised.
        if pool:
            pool.close()
            pool.join()

    harness.exit()