3dc324d3f1
This adds the Python files in the tests/ directory to the set of files formatted by ./tools/codeformat.py. The basics/ subdirectory is excluded for now so we aren't changing too much at once. In a few places `# fmt: off`/`# fmt: on` was used where the code had special formatting for readability, or where the test was actually testing the specific formatting.
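For example, a test whose expected output depends on its exact source layout can keep that region out of the formatter's reach with these markers (a minimal illustrative sketch, not taken from any particular test in the repo):

# fmt: off
matrix = (
    1, 0, 0,
    0, 1, 0,
    0, 0, 1,
)
# fmt: on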
#!/usr/bin/env python3

# This file is part of the MicroPython project, http://micropython.org/
# The MIT License (MIT)
# Copyright (c) 2020 Damien P. George

import sys, os, time, re, select
import argparse
import subprocess

sys.path.append("../tools")
import pyboard

if os.name == "nt":
    CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3.exe")
    MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/windows/micropython.exe")
else:
    CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3")
    MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/unix/micropython")

PYTHON_TRUTH = CPYTHON3

INSTANCE_READ_TIMEOUT_S = 10

APPEND_CODE_TEMPLATE = """
import sys
class multitest:
    @staticmethod
    def flush():
        if hasattr(sys.stdout, "flush"):
            sys.stdout.flush()
    @staticmethod
    def skip():
        print("SKIP")
        multitest.flush()
        raise SystemExit
    @staticmethod
    def next():
        print("NEXT")
        multitest.flush()
    @staticmethod
    def globals(**gs):
        for g in gs:
            print("SET {{}} = {{!r}}".format(g, gs[g]))
        multitest.flush()
    @staticmethod
    def get_network_ip():
        try:
            import network
            ip = network.WLAN().ifconfig()[0]
        except:
            ip = "127.0.0.1"
        return ip

{}

instance{}()
multitest.flush()
"""


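# Base class for a Python instance that a test script can be run on; subclasses
# implement run_script()/start_script() and the related I/O and lifecycle methods.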
class PyInstance:
    def __init__(self):
        pass

    def close(self):
        pass

    def prepare_script_from_file(self, filename, prepend, append):
        with open(filename, "rb") as f:
            script = f.read()
        if prepend:
            script = bytes(prepend, "ascii") + b"\n" + script
        if append:
            script += b"\n" + bytes(append, "ascii")
        return script

    def run_file(self, filename, prepend="", append=""):
        return self.run_script(self.prepare_script_from_file(filename, prepend, append))

    def start_file(self, filename, prepend="", append=""):
        return self.start_script(self.prepare_script_from_file(filename, prepend, append))


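# A Python instance that runs scripts by piping them to a local subprocess, e.g.
# CPython or the unix port of MicroPython.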
class PyInstanceSubProcess(PyInstance):
    def __init__(self, cmd):
        self.cmd = cmd
        self.popen = None
        self.finished = True

    def __str__(self):
        return self.cmd[0].rsplit("/")[-1]

    def run_script(self, script):
        output = b""
        err = None
        try:
            p = subprocess.run(
                self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, input=script
            )
            output = p.stdout
        except subprocess.CalledProcessError as er:
            err = er
        return str(output.strip(), "ascii"), err

    def start_script(self, script):
        self.popen = subprocess.Popen(
            self.cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        self.popen.stdin.write(script)
        self.popen.stdin.close()
        self.finished = False

    def stop(self):
        if self.popen and self.popen.poll() is None:
            self.popen.terminate()

    def readline(self):
        sel = select.select([self.popen.stdout.raw], [], [], 0.1)
        if not sel[0]:
            self.finished = self.popen.poll() is not None
            return None, None
        out = self.popen.stdout.raw.readline()
        if out == b"":
            self.finished = self.popen.poll() is not None
            return None, None
        else:
            return str(out.rstrip(), "ascii"), None

    def is_finished(self):
        return self.finished

    def wait_finished(self):
        self.popen.wait()
        out = self.popen.stdout.read()
        return str(out, "ascii"), ""


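# A Python instance that runs scripts on an attached device (e.g. a pyboard) over
# its serial REPL, via the pyboard.py raw-REPL protocol.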
class PyInstancePyboard(PyInstance):
    def __init__(self, device):
        self.device = device
        self.pyb = pyboard.Pyboard(device)
        self.pyb.enter_raw_repl()
        self.finished = True

    def __str__(self):
        return self.device.rsplit("/")[-1]

    def close(self):
        self.pyb.exit_raw_repl()
        self.pyb.close()

    def run_script(self, script):
        output = b""
        err = None
        try:
            self.pyb.enter_raw_repl()
            output = self.pyb.exec_(script)
        except pyboard.PyboardError as er:
            err = er
        return str(output.strip(), "ascii"), err

    def start_script(self, script):
        self.pyb.enter_raw_repl()
        self.pyb.exec_raw_no_follow(script)
        self.finished = False

    def stop(self):
        self.pyb.serial.write(b"\r\x03")

    def readline(self):
        if self.finished:
            return None, None
        if self.pyb.serial.inWaiting() == 0:
            return None, None
        out = self.pyb.read_until(1, (b"\r\n", b"\x04"))
        if out.endswith(b"\x04"):
            self.finished = True
            out = out[:-1]
            err = str(self.pyb.read_until(1, b"\x04"), "ascii")
            err = err[:-1]
            if not out and not err:
                return None, None
        else:
            err = None
        return str(out.rstrip(), "ascii"), err

    def is_finished(self):
        return self.finished

    def wait_finished(self):
        out, err = self.pyb.follow(10, None)
        return str(out, "ascii"), str(err, "ascii")


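# Scan each test file for "def instanceN():" definitions to determine how many
# instances the test requires.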
def prepare_test_file_list(test_files):
    test_files2 = []
    for test_file in sorted(test_files):
        num_instances = 0
        with open(test_file) as f:
            for line in f:
                m = re.match(r"def instance([0-9]+)\(\):", line)
                if m:
                    num_instances = max(num_instances, int(m.group(1)) + 1)
        test_files2.append((test_file, num_instances))
    return test_files2


def trace_instance_output(instance_idx, line):
    if cmd_args.trace_output:
        t_ms = round((time.time() - trace_t0) * 1000)
        print("{:6} i{} :".format(t_ms, instance_idx), line)


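# Run a single test on the given instances, injecting shared globals and collecting
# each instance's output; returns (error, skip, combined_output_str).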
def run_test_on_instances(test_file, num_instances, instances):
    global trace_t0
    trace_t0 = time.time()

    error = False
    skip = False
    injected_globals = ""
    output = [[] for _ in range(num_instances)]

    # Start all instances running, in order, waiting until they signal they are ready
    for idx in range(num_instances):
        append_code = APPEND_CODE_TEMPLATE.format(injected_globals, idx)
        instance = instances[idx]
        instance.start_file(test_file, append=append_code)
        last_read_time = time.time()
        while True:
            if instance.is_finished():
                break
            out, err = instance.readline()
            if out is None and err is None:
                if time.time() > last_read_time + INSTANCE_READ_TIMEOUT_S:
                    output[idx].append("TIMEOUT")
                    error = True
                    break
                time.sleep(0.1)
                continue
            last_read_time = time.time()
            if out is not None:
                trace_instance_output(idx, out)
                if out.startswith("SET "):
                    injected_globals += out[4:] + "\n"
                elif out == "SKIP":
                    skip = True
                    break
                elif out == "NEXT":
                    break
                else:
                    output[idx].append(out)
            if err is not None:
                trace_instance_output(idx, err)
                output[idx].append(err)
                error = True

        if error or skip:
            break

    if not error and not skip:
        # Capture output and wait for all instances to finish running
        last_read_time = [time.time() for _ in range(num_instances)]
        while True:
            num_running = 0
            num_output = 0
            for idx in range(num_instances):
                instance = instances[idx]
                if instance.is_finished():
                    continue
                num_running += 1
                out, err = instance.readline()
                if out is None and err is None:
                    if time.time() > last_read_time[idx] + INSTANCE_READ_TIMEOUT_S:
                        output[idx].append("TIMEOUT")
                        error = True
                    continue
                num_output += 1
                last_read_time[idx] = time.time()
                if out is not None:
                    trace_instance_output(idx, out)
                    output[idx].append(out)
                if err is not None:
                    trace_instance_output(idx, err)
                    output[idx].append(err)
                    error = True

            if not num_output:
                time.sleep(0.1)
            if not num_running or error:
                break

    # Stop all instances
    for idx in range(num_instances):
        instances[idx].stop()

    output_str = ""
    for idx, lines in enumerate(output):
        output_str += "--- instance{} ---\n".format(idx)
        output_str += "\n".join(lines) + "\n"

    return error, skip, output_str


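# Run all tests on the test instances and compare their output against the truth,
# which comes from a .exp file if one exists, otherwise from the truth instances.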
def run_tests(test_files, instances_truth, instances_test):
    skipped_tests = []
    passed_tests = []
    failed_tests = []

    for test_file, num_instances in test_files:
        instances_str = "|".join(str(instances_test[i]) for i in range(num_instances))
        print("{} on {}: ".format(test_file, instances_str), end="")
        if cmd_args.show_output or cmd_args.trace_output:
            print()
        sys.stdout.flush()

        # Run test on test instances
        error, skip, output_test = run_test_on_instances(test_file, num_instances, instances_test)

        if cmd_args.show_output:
            print("### TEST ###")
            print(output_test, end="")

        if not skip:
            # Check if truth exists in a file, and read it in
            test_file_expected = test_file + ".exp"
            if os.path.isfile(test_file_expected):
                with open(test_file_expected) as f:
                    output_truth = f.read()
            else:
                # Run test on truth instances to get expected output
                _, _, output_truth = run_test_on_instances(
                    test_file, num_instances, instances_truth
                )
            if cmd_args.show_output:
                print("### TRUTH ###")
                print(output_truth, end="")

        # Print result of test
        if skip:
            print("skip")
            skipped_tests.append(test_file)
        elif output_test == output_truth:
            print("pass")
            passed_tests.append(test_file)
        else:
            print("FAIL")
            failed_tests.append(test_file)
            if not cmd_args.show_output:
                print("### TEST ###")
                print(output_test, end="")
                print("### TRUTH ###")
                print(output_truth, end="")

        if cmd_args.show_output:
            print()

    print("{} tests performed".format(len(skipped_tests) + len(passed_tests) + len(failed_tests)))
    print("{} tests passed".format(len(passed_tests)))

    if skipped_tests:
        print("{} tests skipped: {}".format(len(skipped_tests), " ".join(skipped_tests)))
    if failed_tests:
        print("{} tests failed: {}".format(len(failed_tests), " ".join(failed_tests)))

    return not failed_tests


def main():
    global cmd_args

    cmd_parser = argparse.ArgumentParser(description="Run network tests for MicroPython")
    cmd_parser.add_argument(
        "-s", "--show-output", action="store_true", help="show test output after running"
    )
    cmd_parser.add_argument(
        "-t", "--trace-output", action="store_true", help="trace test output while running"
    )
    cmd_parser.add_argument(
        "-i", "--instance", action="append", default=[], help="instance(s) to run the tests on"
    )
    cmd_parser.add_argument("files", nargs="+", help="input test files")
    cmd_args = cmd_parser.parse_args()

    # clear search path to make sure tests use only builtin modules and those in extmod
    os.environ["MICROPYPATH"] = os.pathsep + "../extmod"

    test_files = prepare_test_file_list(cmd_args.files)
    max_instances = max(t[1] for t in test_files)

    instances_truth = [PyInstanceSubProcess([PYTHON_TRUTH]) for _ in range(max_instances)]

    instances_test = []
    for i in cmd_args.instance:
        if i.startswith("exec:"):
            instances_test.append(PyInstanceSubProcess([i[len("exec:") :]]))
        elif i.startswith("pyb:"):
            instances_test.append(PyInstancePyboard(i[len("pyb:") :]))
        else:
            print("unknown instance string: {}".format(i), file=sys.stderr)
            sys.exit(1)

    for _ in range(max_instances - len(instances_test)):
        instances_test.append(PyInstanceSubProcess([MICROPYTHON]))

    try:
        all_pass = run_tests(test_files, instances_truth, instances_test)
    finally:
        for i in instances_truth:
            i.close()
        for i in instances_test:
            i.close()

    if not all_pass:
        sys.exit(1)


if __name__ == "__main__":
    main()