This is done using the multiprocessing library in Python 2.6. If that's unavailable, this feature does nothing.
189 lines
5.2 KiB
Python
Executable file
189 lines
5.2 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
import sys
|
|
import os
|
|
import re
|
|
import tempfile
|
|
|
|
multiproc = False
|
|
try:
|
|
from multiprocessing import Pool
|
|
multiproc = True
|
|
except:
|
|
pass
|
|
|
|
from string import join
|
|
from difflib import unified_diff
|
|
|
|
from LedgerHarness import LedgerHarness
|
|
|
|
args = sys.argv
|
|
jobs = 1
|
|
match = re.match('-j([0-9]+)?', args[1])
|
|
if match:
|
|
args = [args[0]] + args[2:]
|
|
if match.group(1):
|
|
jobs = int(match.group(1))
|
|
if jobs == 1:
|
|
multiproc = False
|
|
|
|
harness = LedgerHarness(args)
|
|
tests = args[3]
|
|
|
|
if not os.path.isdir(tests) and not os.path.isfile(tests):
|
|
sys.exit(1)
|
|
|
|
class RegressFile(object):
|
|
def __init__(self, filename):
|
|
self.filename = filename
|
|
self.fd = open(self.filename)
|
|
|
|
def is_directive(self, line):
|
|
return line == "<<<\n" or \
|
|
line == ">>>\n" or \
|
|
line == ">>>1\n" or \
|
|
line == ">>>2\n" or \
|
|
line.startswith("===")
|
|
|
|
def transform_line(self, line):
|
|
line = re.sub('\$sourcepath', harness.sourcepath, line)
|
|
return line
|
|
|
|
def read_section(self):
|
|
lines = []
|
|
line = self.fd.readline()
|
|
while not self.is_directive(line):
|
|
lines.append(self.transform_line(line))
|
|
line = self.fd.readline()
|
|
return (lines, line)
|
|
|
|
def read_test(self, last_test = None):
|
|
test = {
|
|
'command': None,
|
|
'input': "",
|
|
'output': "",
|
|
'error': "",
|
|
'exitcode': 0
|
|
}
|
|
if last_test:
|
|
test['input'] = last_test['input']
|
|
|
|
line = self.fd.readline()
|
|
while line:
|
|
if line == "<<<\n":
|
|
(test['input'], line) = self.read_section()
|
|
elif line == ">>>\n" or line == ">>>1\n":
|
|
(test['output'], line) = self.read_section()
|
|
elif line == ">>>2\n":
|
|
(test['error'], line) = self.read_section()
|
|
elif line.startswith("==="):
|
|
match = re.match('=== ([0-9]+)', line)
|
|
assert match
|
|
test['exitcode'] = int(match.group(1))
|
|
return test
|
|
else:
|
|
test['command'] = self.transform_line(line)
|
|
line = self.fd.readline()
|
|
|
|
return None
|
|
|
|
def run_test(self, test):
|
|
use_stdin = False
|
|
if test['command'].find("-f - ") != -1:
|
|
use_stdin = True
|
|
|
|
test['command'] = '$ledger ' + test['command']
|
|
else:
|
|
tempdata = tempfile.mkstemp()
|
|
|
|
os.write(tempdata[0], join(test['input'], ''))
|
|
os.close(tempdata[0])
|
|
|
|
test['command'] = (('$ledger -f "%s" ' % tempdata[1]) +
|
|
test['command'])
|
|
|
|
p = harness.run(test['command'],
|
|
columns=(not re.search('--columns', test['command'])))
|
|
|
|
if use_stdin:
|
|
p.stdin.write(join(test['input'], ''))
|
|
p.stdin.close()
|
|
|
|
success = True
|
|
printed = False
|
|
index = 0
|
|
for line in unified_diff(test['output'], harness.readlines(p.stdout)):
|
|
index += 1
|
|
if index < 3:
|
|
continue
|
|
if not printed:
|
|
if success: print
|
|
print "Regression failure in output from %s:" % \
|
|
os.path.basename(self.filename)
|
|
success = False
|
|
printed = True
|
|
print " ", line,
|
|
|
|
printed = False
|
|
index = 0
|
|
for line in unified_diff([re.sub('\$FILE', tempdata[1], line)
|
|
for line in test['error']],
|
|
harness.readlines(p.stderr)):
|
|
index += 1
|
|
if index < 3:
|
|
continue
|
|
if not printed:
|
|
if success: print
|
|
print "Regression failure in error output from %s:" % \
|
|
os.path.basename(self.filename)
|
|
success = False
|
|
printed = True
|
|
print " ", line,
|
|
|
|
if test['exitcode'] == p.wait():
|
|
if success:
|
|
harness.success()
|
|
else:
|
|
harness.failure()
|
|
else:
|
|
if success: print
|
|
print "Regression failure in exitcode from %s: %d (expected) != %d" % \
|
|
(os.path.basename(self.filename), test['exitcode'], p.returncode)
|
|
harness.failure()
|
|
|
|
if not use_stdin:
|
|
os.remove(tempdata[1])
|
|
|
|
def run_tests(self, pool):
|
|
test = self.read_test()
|
|
while test:
|
|
if pool:
|
|
pool.apply_async(RegressFile.run_test, (self, test,))
|
|
else:
|
|
self.run_test(test)
|
|
test = self.read_test(test)
|
|
|
|
def close(self):
|
|
self.fd.close()
|
|
|
|
if __name__ == '__main__':
|
|
if multiproc:
|
|
pool = Pool(jobs*2)
|
|
else:
|
|
pool = None
|
|
|
|
if os.path.isdir(tests):
|
|
for test_file in os.listdir(tests):
|
|
if re.search('\.test$', test_file):
|
|
entry = RegressFile(os.path.join(tests, test_file))
|
|
entry.run_tests(pool)
|
|
entry.close()
|
|
else:
|
|
entry = RegressFile(tests)
|
|
entry.run_tests(pool)
|
|
entry.close()
|
|
|
|
if pool:
|
|
pool.close()
|
|
pool.join()
|
|
harness.exit()
|