circuitpython/tests/run-multitests.py
Damien George df9a949891 tests/run-multitests.py: Add new test runner for multiple Py instances.
This commit adds a test runner and initial test scripts which run multiple
Python/MicroPython instances (eg executables, target boards) in parallel.
This is useful for testing, eg, network and Bluetooth functionality.

Each test file has a set of functions called instanceX(), where X ranges
from 0 up to the maximum number of instances that are needed, N-1.  Then
run-multitests.py will execute this script on N separate instances (eg
micropython executables, or attached boards via pyboard.py) at the same
time, synchronising their start in the right order, possibly passing IP
address (or other address like bluetooth MAC) from the "server" instance to
the "client" instances so they can connect to each other.  It then runs
them to completion, collects the output, and then tests against what
CPython gives (or what's in a provided .py.exp file).

The tests will be run using the standard unix executable for all instances
by default, eg:

    $ ./run-multitests.py multi_net/*.py

Or they can be run with a board and unix executable via:

    $ ./run-multitests.py --instance pyb:/dev/ttyACM0 --instance exec:micropython multi_net/*.py
2020-03-10 02:22:34 +11:00

397 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
# This file is part of the MicroPython project, http://micropython.org/
# The MIT License (MIT)
# Copyright (c) 2020 Damien P. George
import sys, os, time, re, select
import argparse
import subprocess
sys.path.append("../tools")
import pyboard
if os.name == "nt":
CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3.exe")
MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/windows/micropython.exe")
else:
CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3")
MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/unix/micropython")
PYTHON_TRUTH = CPYTHON3
INSTANCE_READ_TIMEOUT_S = 10
APPEND_CODE_TEMPLATE = """
import sys
class multitest:
@staticmethod
def flush():
if hasattr(sys.stdout, "flush"):
sys.stdout.flush()
@staticmethod
def skip():
print("SKIP")
multitest.flush()
raise SystemExit
@staticmethod
def next():
print("NEXT")
multitest.flush()
@staticmethod
def globals(**gs):
for g in gs:
print("SET {{}} = {{!r}}".format(g, gs[g]))
multitest.flush()
@staticmethod
def get_network_ip():
try:
import network
ip = network.WLAN().ifconfig()[0]
except:
ip = "127.0.0.1"
return ip
{}
instance{}()
multitest.flush()
"""
class PyInstance:
def __init__(self):
pass
def close(self):
pass
def prepare_script_from_file(self, filename, prepend, append):
with open(filename, "rb") as f:
script = f.read()
if prepend:
script = bytes(prepend, "ascii") + b"\n" + script
if append:
script += b"\n" + bytes(append, "ascii")
return script
def run_file(self, filename, prepend="", append=""):
return self.run_script(self.prepare_script_from_file(filename, prepend, append))
def start_file(self, filename, prepend="", append=""):
return self.start_script(self.prepare_script_from_file(filename, prepend, append))
class PyInstanceSubProcess(PyInstance):
def __init__(self, cmd):
self.cmd = cmd
self.popen = None
self.finished = True
def __str__(self):
return self.cmd[0].rsplit("/")[-1]
def run_script(self, script):
output = b""
err = None
try:
p = subprocess.run(
self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, input=script
)
output = p.stdout
except subprocess.CalledProcessError as er:
err = er
return str(output.strip(), "ascii"), err
def start_script(self, script):
self.popen = subprocess.Popen(
self.cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
self.popen.stdin.write(script)
self.popen.stdin.close()
self.finished = False
def stop(self):
if self.popen and self.popen.poll() is None:
self.popen.terminate()
def readline(self):
sel = select.select([self.popen.stdout.raw], [], [], 0.1)
if not sel[0]:
self.finished = self.popen.poll() is not None
return None, None
out = self.popen.stdout.raw.readline()
if out == b"":
self.finished = self.popen.poll() is not None
return None, None
else:
return str(out.rstrip(), "ascii"), None
def is_finished(self):
return self.finished
def wait_finished(self):
self.popen.wait()
out = self.popen.stdout.read()
return str(out, "ascii"), ""
class PyInstancePyboard(PyInstance):
def __init__(self, device):
self.device = device
self.pyb = pyboard.Pyboard(device)
self.pyb.enter_raw_repl()
self.finished = True
def __str__(self):
return self.device.rsplit("/")[-1]
def close(self):
self.pyb.exit_raw_repl()
self.pyb.close()
def run_script(self, script):
output = b""
err = None
try:
self.pyb.enter_raw_repl()
output = self.pyb.exec_(script)
except pyboard.PyboardError as er:
err = er
return str(output.strip(), "ascii"), err
def start_script(self, script):
self.pyb.enter_raw_repl()
self.pyb.exec_raw_no_follow(script)
self.finished = False
def stop(self):
self.pyb.serial.write(b"\r\x03")
def readline(self):
if self.finished:
return None, None
if self.pyb.serial.inWaiting() == 0:
return None, None
out = self.pyb.read_until(1, (b"\r\n", b"\x04"))
if out.endswith(b"\x04"):
self.finished = True
out = out[:-1]
err = str(self.pyb.read_until(1, b"\x04"), "ascii")
err = err[:-1]
if not out and not err:
return None, None
else:
err = None
return str(out.rstrip(), "ascii"), err
def is_finished(self):
return self.finished
def wait_finished(self):
out, err = self.pyb.follow(10, None)
return str(out, "ascii"), str(err, "ascii")
def prepare_test_file_list(test_files):
test_files2 = []
for test_file in sorted(test_files):
num_instances = 0
with open(test_file) as f:
for line in f:
m = re.match(r"def instance([0-9]+)\(\):", line)
if m:
num_instances = max(num_instances, int(m.group(1)) + 1)
test_files2.append((test_file, num_instances))
return test_files2
def trace_instance_output(instance_idx, line):
if cmd_args.trace_output:
t_ms = round((time.time() - trace_t0) * 1000)
print("{:6} i{} :".format(t_ms, instance_idx), line)
def run_test_on_instances(test_file, num_instances, instances):
global trace_t0
trace_t0 = time.time()
error = False
skip = False
injected_globals = ""
output = [[] for _ in range(num_instances)]
# Start all instances running, in order, waiting until they signal they are ready
for idx in range(num_instances):
append_code = APPEND_CODE_TEMPLATE.format(injected_globals, idx)
instance = instances[idx]
instance.start_file(test_file, append=append_code)
last_read_time = time.time()
while True:
if instance.is_finished():
break
out, err = instance.readline()
if out is None and err is None:
if time.time() > last_read_time + INSTANCE_READ_TIMEOUT_S:
output[idx].append("TIMEOUT")
error = True
break
time.sleep(0.1)
continue
last_read_time = time.time()
if out is not None:
trace_instance_output(idx, out)
if out.startswith("SET "):
injected_globals += out[4:] + "\n"
elif out == "SKIP":
skip = True
break
elif out == "NEXT":
break
else:
output[idx].append(out)
if err is not None:
trace_instance_output(idx, err)
output[idx].append(err)
error = True
if error or skip:
break
if not error and not skip:
# Capture output and wait for all instances to finish running
last_read_time = [time.time() for _ in range(num_instances)]
while True:
num_running = 0
num_output = 0
for idx in range(num_instances):
instance = instances[idx]
if instance.is_finished():
continue
num_running += 1
out, err = instance.readline()
if out is None and err is None:
if time.time() > last_read_time[idx] + INSTANCE_READ_TIMEOUT_S:
output[idx].append("TIMEOUT")
error = True
continue
num_output += 1
last_read_time[idx] = time.time()
if out is not None:
trace_instance_output(idx, out)
output[idx].append(out)
if err is not None:
trace_instance_output(idx, err)
output[idx].append(err)
error = True
if not num_output:
time.sleep(0.1)
if not num_running or error:
break
# Stop all instances
for idx in range(num_instances):
instances[idx].stop()
output_str = ""
for idx, lines in enumerate(output):
output_str += "--- instance{} ---\n".format(idx)
output_str += "\n".join(lines) + "\n"
return error, skip, output_str
def run_tests(test_files, instances_truth, instances_test):
for test_file, num_instances in test_files:
instances_str = "|".join(str(instances_test[i]) for i in range(num_instances))
print("{} on {}: ".format(test_file, instances_str), end="")
if cmd_args.show_output or cmd_args.trace_output:
print()
sys.stdout.flush()
# Run test on test instances
error, skip, output_test = run_test_on_instances(test_file, num_instances, instances_test)
if cmd_args.show_output:
print("### TEST ###")
print(output_test, end="")
if not skip:
# Check if truth exists in a file, and read it in
test_file_expected = test_file + ".exp"
if os.path.isfile(test_file_expected):
with open(test_file_expected) as f:
output_truth = f.read()
else:
# Run test on truth instances to get expected output
_, _, output_truth = run_test_on_instances(
test_file, num_instances, instances_truth
)
if cmd_args.show_output:
print("### TRUTH ###")
print(output_truth, end="")
# Print result of test
if skip:
print("skip")
elif output_test == output_truth:
print("pass")
else:
print("FAIL")
if not cmd_args.show_output:
print("### TEST ###")
print(output_test, end="")
print("### TRUTH ###")
print(output_truth, end="")
if cmd_args.show_output:
print()
def main():
global cmd_args
cmd_parser = argparse.ArgumentParser(description="Run network tests for MicroPython")
cmd_parser.add_argument(
"-s", "--show-output", action="store_true", help="show test output after running"
)
cmd_parser.add_argument(
"-t", "--trace-output", action="store_true", help="trace test output while running"
)
cmd_parser.add_argument(
"-i", "--instance", action="append", default=[], help="instance(s) to run the tests on"
)
cmd_parser.add_argument("files", nargs="+", help="input test files")
cmd_args = cmd_parser.parse_args()
test_files = prepare_test_file_list(cmd_args.files)
max_instances = max(t[1] for t in test_files)
instances_truth = [PyInstanceSubProcess([PYTHON_TRUTH]) for _ in range(max_instances)]
instances_test = []
for i in cmd_args.instance:
if i.startswith("exec:"):
instances_test.append(PyInstanceSubProcess([i[len("exec:") :]]))
elif i.startswith("pyb:"):
instances_test.append(PyInstancePyboard(i[len("pyb:") :]))
else:
print("unknown instance string: {}".format(i), file=sys.stderr)
sys.exit(1)
for _ in range(max_instances - len(instances_test)):
instances_test.append(PyInstanceSubProcess([MICROPYTHON]))
try:
run_tests(test_files, instances_truth, instances_test)
finally:
for i in instances_truth:
i.close()
for i in instances_test:
i.close()
if __name__ == "__main__":
main()