tools/mpremote: Simplify dispatch of commands.

No functional change.

This makes each built-in command defined by just a handler method and
simplifies a lot of the logic around tracking the board state.

This work was funded through GitHub Sponsors.

Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
This commit is contained in:
Jim Mussared 2022-09-30 11:46:23 +10:00
parent 282401da5c
commit 413a69b94b
3 changed files with 507 additions and 406 deletions

View File

@ -0,0 +1,261 @@
import os
import sys
import tempfile
import serial.tools.list_ports
from . import pyboardextended as pyboard
class CommandError(Exception):
pass
do_disconnect(state)
try:
if dev == "list":
# List attached devices.
for p in sorted(serial.tools.list_ports.comports()):
print(
"{} {} {:04x}:{:04x} {} {}".format(
p.device,
p.serial_number,
p.vid if isinstance(p.vid, int) else 0,
p.pid if isinstance(p.pid, int) else 0,
p.manufacturer,
p.product,
)
)
# Don't do implicit REPL command.
state.did_action()
elif dev == "auto":
# Auto-detect and auto-connect to the first available device.
for p in sorted(serial.tools.list_ports.comports()):
try:
state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
return
except pyboard.PyboardError as er:
if not er.args[0].startswith("failed to access"):
raise er
raise pyboard.PyboardError("no device found")
elif dev.startswith("id:"):
# Search for a device with the given serial number.
serial_number = dev[len("id:") :]
dev = None
for p in serial.tools.list_ports.comports():
if p.serial_number == serial_number:
state.pyb = pyboard.PyboardExtended(p.device, baudrate=115200)
return
raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
else:
# Connect to the given device.
if dev.startswith("port:"):
dev = dev[len("port:") :]
state.pyb = pyboard.PyboardExtended(dev, baudrate=115200)
return
except pyboard.PyboardError as er:
msg = er.args[0]
if msg.startswith("failed to access"):
msg += " (it may be in use by another program)"
print(msg)
sys.exit(1)
def do_disconnect(state, _args=None):
if not state.pyb:
return
try:
if state.pyb.mounted:
if not state.pyb.in_raw_repl:
state.pyb.enter_raw_repl(soft_reset=False)
state.pyb.umount_local()
if state.pyb.in_raw_repl:
state.pyb.exit_raw_repl()
except OSError:
# Ignore any OSError exceptions when shutting down, eg:
# - pyboard.filesystem_command will close the connecton if it had an error
# - umounting will fail if serial port disappeared
pass
state.pyb.close()
state.pyb = None
state._auto_soft_reset = True
def show_progress_bar(size, total_size, op="copying"):
if not sys.stdout.isatty():
return
verbose_size = 2048
bar_length = 20
if total_size < verbose_size:
return
elif size >= total_size:
# Clear progress bar when copy completes
print("\r" + " " * (13 + len(op) + bar_length) + "\r", end="")
else:
bar = size * bar_length // total_size
progress = size * 100 // total_size
print(
"\r ... {} {:3d}% [{}{}]".format(op, progress, "#" * bar, "-" * (bar_length - bar)),
end="",
)
# Get all args up to the terminator ("+").
# The passed args will be updated with these ones removed.
def _get_fs_args(args):
n = 0
for src in args:
if src == "+":
break
n += 1
fs_args = args[:n]
args[:] = args[n + 1 :]
return fs_args
def do_filesystem(state, args):
state.ensure_raw_repl()
state.did_action()
def _list_recursive(files, path):
if os.path.isdir(path):
for entry in os.listdir(path):
_list_recursive(files, "/".join((path, entry)))
else:
files.append(os.path.split(path))
fs_args = _get_fs_args(args)
# Don't be verbose when using cat, so output can be redirected to something.
verbose = fs_args[0] != "cat"
if fs_args[0] == "cp" and fs_args[1] == "-r":
fs_args.pop(0)
fs_args.pop(0)
if fs_args[-1] != ":":
print(f"{_PROG}: 'cp -r' destination must be ':'")
sys.exit(1)
fs_args.pop()
src_files = []
for path in fs_args:
if path.startswith(":"):
raise CommandError("'cp -r' source files must be local")
_list_recursive(src_files, path)
known_dirs = {""}
state.pyb.exec_("import uos")
for dir, file in src_files:
dir_parts = dir.split("/")
for i in range(len(dir_parts)):
d = "/".join(dir_parts[: i + 1])
if d not in known_dirs:
state.pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
known_dirs.add(d)
pyboard.filesystem_command(
state.pyb,
["cp", "/".join((dir, file)), ":" + dir + "/"],
progress_callback=show_progress_bar,
verbose=verbose,
)
else:
try:
pyboard.filesystem_command(
state.pyb, fs_args, progress_callback=show_progress_bar, verbose=verbose
)
except OSError as er:
raise CommandError(er)
def do_edit(state, args):
state.ensure_raw_repl()
state.did_action()
if not os.getenv("EDITOR"):
raise pyboard.PyboardError("edit: $EDITOR not set")
for src in _get_fs_args(args):
src = src.lstrip(":")
dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
try:
print("edit :%s" % (src,))
os.close(dest_fd)
state.pyb.fs_touch(src)
state.pyb.fs_get(src, dest, progress_callback=show_progress_bar)
if os.system("$EDITOR '%s'" % (dest,)) == 0:
state.pyb.fs_put(dest, src, progress_callback=show_progress_bar)
finally:
os.unlink(dest)
def _get_follow_arg(args):
if args[0] == "--no-follow":
args.pop(0)
return False
else:
return True
def _do_execbuffer(state, buf, follow):
state.ensure_raw_repl()
state.did_action()
try:
state.pyb.exec_raw_no_follow(buf)
if follow:
ret, ret_err = state.pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
if ret_err:
pyboard.stdout_write_bytes(ret_err)
sys.exit(1)
except pyboard.PyboardError as er:
print(er)
sys.exit(1)
except KeyboardInterrupt:
sys.exit(1)
def do_exec(state, args):
follow = _get_follow_arg(args)
buf = args.pop(0)
_do_execbuffer(state, buf, follow)
def do_eval(state, args):
follow = _get_follow_arg(args)
buf = "print(" + args.pop(0) + ")"
_do_execbuffer(state, buf, follow)
def do_run(state, args):
follow = _get_follow_arg(args)
filename = args.pop(0)
try:
with open(filename, "rb") as f:
buf = f.read()
except OSError:
raise CommandError(f"could not read file '{filename}'")
sys.exit(1)
_do_execbuffer(state, buf, follow)
def do_mount(state, args):
state.ensure_raw_repl()
unsafe_links = False
if args[0] == "--unsafe-links" or args[0] == "-l":
args.pop(0)
unsafe_links = True
path = args.pop(0)
state.pyb.mount_local(path, unsafe_links=unsafe_links)
print(f"Local directory {path} is mounted at /remote")
def do_umount(state, path):
state.ensure_raw_repl()
state.pyb.umount_local()
def do_resume(state, _args=None):
state._auto_soft_reset = False
def do_soft_reset(state, _args=None):
state.ensure_raw_repl(soft_reset=True)

View File

@ -19,45 +19,101 @@ MicroPython device over a serial connection. Commands supported are:
import os, sys
from collections.abc import Mapping
import tempfile
from textwrap import dedent
import serial.tools.list_ports
from . import pyboardextended as pyboard
from .console import Console, ConsolePosix
from .commands import (
CommandError,
do_connect,
do_disconnect,
do_edit,
do_filesystem,
do_mount,
do_umount,
do_exec,
do_eval,
do_run,
do_resume,
do_soft_reset,
)
from .repl import do_repl
_PROG = "mpremote"
# (need_raw_repl, is_action, num_args_min, help_text)
def do_help(state, _args=None):
def print_commands_help(cmds, help_idx):
max_command_len = max(len(cmd) for cmd in cmds.keys())
for cmd in sorted(cmds.keys()):
help_message_lines = dedent(cmds[cmd][help_idx]).split("\n")
help_message = help_message_lines[0]
for line in help_message_lines[1:]:
help_message = "{}\n{}{}".format(help_message, " " * (max_command_len + 4), line)
print(" ", cmd, " " * (max_command_len - len(cmd) + 2), help_message, sep="")
print(_PROG, "-- MicroPython remote control")
print("See https://docs.micropython.org/en/latest/reference/mpremote.html")
print("\nList of commands:")
print_commands_help(_COMMANDS, 1)
print("\nList of shortcuts:")
print_commands_help(_command_expansions, 2)
sys.exit(0)
def do_version(state, _args=None):
from . import __version__
print(f"{_PROG} {__version__}")
sys.exit(0)
# Map of "command" to tuple of (num_args_min, help_text, handler).
_COMMANDS = {
"connect": (
False,
False,
1,
"""\
connect to given device
device may be: list, auto, id:x, port:x
or any valid device name/path""",
do_connect,
),
"disconnect": (
0,
"disconnect current device",
do_disconnect,
),
"edit": (
1,
"edit files on the device",
do_edit,
),
"resume": (
0,
"resume a previous mpremote session (will not auto soft-reset)",
do_resume,
),
"soft-reset": (
0,
"perform a soft-reset of the device",
do_soft_reset,
),
"disconnect": (False, False, 0, "disconnect current device"),
"edit": (True, True, 1, "edit files on the device"),
"resume": (False, False, 0, "resume a previous mpremote session (will not auto soft-reset)"),
"soft-reset": (False, True, 0, "perform a soft-reset of the device"),
"mount": (
True,
False,
1,
"""\
mount local directory on device
options:
--unsafe-links, -l
follow symbolic links pointing outside of local directory""",
do_mount,
),
"umount": (
0,
"unmount the local directory",
do_umount,
),
"umount": (True, False, 0, "unmount the local directory"),
"repl": (
False,
True,
0,
"""\
enter REPL
@ -65,15 +121,45 @@ _COMMANDS = {
--capture <file>
--inject-code <string>
--inject-file <file>""",
do_repl,
),
"eval": (
1,
"evaluate and print the string",
do_eval,
),
"exec": (
1,
"execute the string",
do_exec,
),
"run": (
1,
"run the given local script",
do_run,
),
"fs": (
1,
"execute filesystem commands on the device",
do_filesystem,
),
"help": (
0,
"print help and exit",
do_help,
),
"version": (
0,
"print version and exit",
do_version,
),
"eval": (True, True, 1, "evaluate and print the string"),
"exec": (True, True, 1, "execute the string"),
"run": (True, True, 1, "run the given local script"),
"fs": (True, True, 1, "execute filesystem commands on the device"),
"help": (False, False, 0, "print help and exit"),
"version": (False, False, 0, "print version and exit"),
}
# Additional commands aliases.
# The value can either be:
# - A command string.
# - A list of command strings, each command will be executed sequentially.
# - A dict of command: { [], help: ""}
_BUILTIN_COMMAND_EXPANSIONS = {
# Device connection shortcuts.
"devs": {
@ -117,6 +203,8 @@ _BUILTIN_COMMAND_EXPANSIONS = {
"--version": "version",
}
# Add "a0", "a1", ..., "u0", "u1", ..., "c0", "c1", ... as aliases
# for "connect /dev/ttyACMn" (and /dev/ttyUSBn, COMn) etc.
for port_num in range(4):
for prefix, port in [("a", "/dev/ttyACM"), ("u", "/dev/ttyUSB"), ("c", "COM")]:
_BUILTIN_COMMAND_EXPANSIONS["{}{}".format(prefix, port_num)] = {
@ -220,307 +308,33 @@ def do_command_expansion(args):
args[0:0] = ["exec", ";".join(pre)]
def do_connect(args):
dev = args.pop(0)
try:
if dev == "list":
# List attached devices.
for p in sorted(serial.tools.list_ports.comports()):
print(
"{} {} {:04x}:{:04x} {} {}".format(
p.device,
p.serial_number,
p.vid if isinstance(p.vid, int) else 0,
p.pid if isinstance(p.pid, int) else 0,
p.manufacturer,
p.product,
)
)
return None
elif dev == "auto":
# Auto-detect and auto-connect to the first available device.
for p in sorted(serial.tools.list_ports.comports()):
try:
return pyboard.PyboardExtended(p.device, baudrate=115200)
except pyboard.PyboardError as er:
if not er.args[0].startswith("failed to access"):
raise er
raise pyboard.PyboardError("no device found")
elif dev.startswith("id:"):
# Search for a device with the given serial number.
serial_number = dev[len("id:") :]
dev = None
for p in serial.tools.list_ports.comports():
if p.serial_number == serial_number:
return pyboard.PyboardExtended(p.device, baudrate=115200)
raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
else:
# Connect to the given device.
if dev.startswith("port:"):
dev = dev[len("port:") :]
return pyboard.PyboardExtended(dev, baudrate=115200)
except pyboard.PyboardError as er:
msg = er.args[0]
if msg.startswith("failed to access"):
msg += " (it may be in use by another program)"
print(msg)
sys.exit(1)
class State:
def __init__(self):
self.pyb = None
self._did_action = False
self._auto_soft_reset = True
def did_action(self):
self._did_action = True
def do_disconnect(pyb):
try:
if pyb.mounted:
if not pyb.in_raw_repl:
pyb.enter_raw_repl(soft_reset=False)
pyb.umount_local()
if pyb.in_raw_repl:
pyb.exit_raw_repl()
except OSError:
# Ignore any OSError exceptions when shutting down, eg:
# - pyboard.filesystem_command will close the connecton if it had an error
# - umounting will fail if serial port disappeared
pass
pyb.close()
def run_repl_on_completion(self):
return not self._did_action
def ensure_connected(self):
if self.pyb is None:
do_connect(self, ["auto"])
def show_progress_bar(size, total_size):
if not sys.stdout.isatty():
return
verbose_size = 2048
bar_length = 20
if total_size < verbose_size:
return
elif size >= total_size:
# Clear progress bar when copy completes
print("\r" + " " * (20 + bar_length) + "\r", end="")
else:
progress = size / total_size
bar = round(progress * bar_length)
print(
"\r ... copying {:3.0f}% [{}{}]".format(
progress * 100, "#" * bar, "-" * (bar_length - bar)
),
end="",
)
def ensure_raw_repl(self, soft_reset=None):
self.ensure_connected()
soft_reset = self._auto_soft_reset if soft_reset is None else soft_reset
if soft_reset or not self.pyb.in_raw_repl:
self.pyb.enter_raw_repl(soft_reset=soft_reset)
self._auto_soft_reset = False
# Get all args up to the terminator ("+").
# The passed args will be updated with these ones removed.
def get_fs_args(args):
n = 0
for src in args:
if src == "+":
break
n += 1
fs_args = args[:n]
args[:] = args[n + 1 :]
return fs_args
def do_filesystem(pyb, args):
def _list_recursive(files, path):
if os.path.isdir(path):
for entry in os.listdir(path):
_list_recursive(files, "/".join((path, entry)))
else:
files.append(os.path.split(path))
fs_args = get_fs_args(args)
# Don't be verbose when using cat, so output can be redirected to something.
verbose = fs_args[0] != "cat"
if fs_args[0] == "cp" and fs_args[1] == "-r":
fs_args.pop(0)
fs_args.pop(0)
if fs_args[-1] != ":":
print(f"{_PROG}: 'cp -r' destination must be ':'")
sys.exit(1)
fs_args.pop()
src_files = []
for path in fs_args:
if path.startswith(":"):
print(f"{_PROG}: 'cp -r' source files must be local")
sys.exit(1)
_list_recursive(src_files, path)
known_dirs = {""}
pyb.exec_("import uos")
for dir, file in src_files:
dir_parts = dir.split("/")
for i in range(len(dir_parts)):
d = "/".join(dir_parts[: i + 1])
if d not in known_dirs:
pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
known_dirs.add(d)
pyboard.filesystem_command(
pyb,
["cp", "/".join((dir, file)), ":" + dir + "/"],
progress_callback=show_progress_bar,
verbose=verbose,
)
else:
try:
pyboard.filesystem_command(
pyb, fs_args, progress_callback=show_progress_bar, verbose=verbose
)
except OSError as er:
print(f"{_PROG}: {er}")
sys.exit(1)
def do_edit(pyb, args):
if not os.getenv("EDITOR"):
raise pyboard.PyboardError("edit: $EDITOR not set")
for src in get_fs_args(args):
src = src.lstrip(":")
dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
try:
print("edit :%s" % (src,))
os.close(dest_fd)
pyb.fs_touch(src)
pyb.fs_get(src, dest, progress_callback=show_progress_bar)
if os.system("$EDITOR '%s'" % (dest,)) == 0:
pyb.fs_put(dest, src, progress_callback=show_progress_bar)
finally:
os.unlink(dest)
def do_repl_main_loop(pyb, console_in, console_out_write, *, code_to_inject, file_to_inject):
while True:
console_in.waitchar(pyb.serial)
c = console_in.readchar()
if c:
if c == b"\x1d": # ctrl-], quit
break
elif c == b"\x04": # ctrl-D
# special handling needed for ctrl-D if filesystem is mounted
pyb.write_ctrl_d(console_out_write)
elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
pyb.serial.write(code_to_inject)
elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
pyb.enter_raw_repl(soft_reset=False)
with open(file_to_inject, "rb") as f:
pyfile = f.read()
try:
pyb.exec_raw_no_follow(pyfile)
except pyboard.PyboardError as er:
console_out_write(b"Error:\r\n")
console_out_write(er)
pyb.exit_raw_repl()
else:
pyb.serial.write(c)
try:
n = pyb.serial.inWaiting()
except OSError as er:
if er.args[0] == 5: # IO error, device disappeared
print("device disconnected")
break
if n > 0:
c = pyb.serial.read(1)
if c is not None:
# pass character through to the console
oc = ord(c)
if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
console_out_write(c)
else:
console_out_write(b"[%02x]" % ord(c))
def do_repl(pyb, args):
capture_file = None
code_to_inject = None
file_to_inject = None
while len(args):
if args[0] == "--capture":
args.pop(0)
capture_file = args.pop(0)
elif args[0] == "--inject-code":
args.pop(0)
code_to_inject = bytes(args.pop(0).replace("\\n", "\r\n"), "utf8")
elif args[0] == "--inject-file":
args.pop(0)
file_to_inject = args.pop(0)
else:
break
print("Connected to MicroPython at %s" % pyb.device_name)
print("Use Ctrl-] to exit this shell")
if capture_file is not None:
print('Capturing session to file "%s"' % capture_file)
capture_file = open(capture_file, "wb")
if code_to_inject is not None:
print("Use Ctrl-J to inject", code_to_inject)
if file_to_inject is not None:
print('Use Ctrl-K to inject file "%s"' % file_to_inject)
console = Console()
console.enter()
def console_out_write(b):
console.write(b)
if capture_file is not None:
capture_file.write(b)
capture_file.flush()
try:
do_repl_main_loop(
pyb,
console,
console_out_write,
code_to_inject=code_to_inject,
file_to_inject=file_to_inject,
)
finally:
console.exit()
if capture_file is not None:
capture_file.close()
def execbuffer(pyb, buf, follow):
ret_val = 0
try:
pyb.exec_raw_no_follow(buf)
if follow:
ret, ret_err = pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
if ret_err:
pyboard.stdout_write_bytes(ret_err)
ret_val = 1
except pyboard.PyboardError as er:
print(er)
ret_val = 1
except KeyboardInterrupt:
ret_val = 1
return ret_val
def print_help():
def print_commands_help(cmds, help_idx):
max_command_len = max(len(cmd) for cmd in cmds.keys())
for cmd in sorted(cmds.keys()):
help_message_lines = dedent(cmds[cmd][help_idx]).split("\n")
help_message = help_message_lines[0]
for line in help_message_lines[1:]:
help_message = "{}\n{}{}".format(help_message, " " * (max_command_len + 4), line)
print(" ", cmd, " " * (max_command_len - len(cmd) + 2), help_message, sep="")
print(_PROG, "-- MicroPython remote control")
print("See https://docs.micropython.org/en/latest/reference/mpremote.html")
print("\nList of commands:")
print_commands_help(_COMMANDS, 3)
print("\nList of shortcuts:")
print_commands_help(_command_expansions, 2)
def print_version():
from . import __version__
print(f"{_PROG} {__version__}")
def ensure_friendly_repl(self):
self.ensure_connected()
if self.pyb.in_raw_repl:
self.pyb.exit_raw_repl()
def main():
@ -528,106 +342,31 @@ def main():
prepare_command_expansions(config)
args = sys.argv[1:]
pyb = None
auto_soft_reset = True
did_action = False
state = State()
try:
while args:
do_command_expansion(args)
cmd = args.pop(0)
try:
need_raw_repl, is_action, num_args_min, _ = _COMMANDS[cmd]
num_args_min, _help, handler = _COMMANDS[cmd]
except KeyError:
print(f"{_PROG}: '{cmd}' is not a command")
return 1
raise CommandError(f"'{cmd}' is not a command")
if len(args) < num_args_min:
print(f"{_PROG}: '{cmd}' neads at least {num_args_min} argument(s)")
return 1
if cmd == "connect":
if pyb is not None:
do_disconnect(pyb)
pyb = do_connect(args)
if pyb is None:
did_action = True
continue
elif cmd == "help":
print_help()
sys.exit(0)
elif cmd == "version":
print_version()
sys.exit(0)
elif cmd == "resume":
auto_soft_reset = False
continue
handler(state, args)
# The following commands need a connection, and either a raw or friendly REPL.
if pyb is None:
pyb = do_connect(["auto"])
# If no commands were "actions" then implicitly finish with the REPL.
if state.run_repl_on_completion():
do_repl(state, args)
if need_raw_repl:
if not pyb.in_raw_repl:
pyb.enter_raw_repl(soft_reset=auto_soft_reset)
auto_soft_reset = False
else:
if pyb.in_raw_repl:
pyb.exit_raw_repl()
if is_action:
did_action = True
if cmd == "disconnect":
do_disconnect(pyb)
pyb = None
auto_soft_reset = True
elif cmd == "soft-reset":
pyb.enter_raw_repl(soft_reset=True)
auto_soft_reset = False
elif cmd == "mount":
unsafe_links = False
if args[0] == "--unsafe-links" or args[0] == "-l":
args.pop(0)
unsafe_links = True
path = args.pop(0)
pyb.mount_local(path, unsafe_links=unsafe_links)
print(f"Local directory {path} is mounted at /remote")
elif cmd == "umount":
pyb.umount_local()
elif cmd in ("exec", "eval", "run"):
follow = True
if args[0] == "--no-follow":
args.pop(0)
follow = False
if cmd == "exec":
buf = args.pop(0)
elif cmd == "eval":
buf = "print(" + args.pop(0) + ")"
else:
filename = args.pop(0)
try:
with open(filename, "rb") as f:
buf = f.read()
except OSError:
print(f"{_PROG}: could not read file '{filename}'")
return 1
ret = execbuffer(pyb, buf, follow)
if ret:
return ret
elif cmd == "fs":
do_filesystem(pyb, args)
elif cmd == "edit":
do_edit(pyb, args)
elif cmd == "repl":
do_repl(pyb, args)
if not did_action:
if pyb is None:
pyb = do_connect(["auto"])
if pyb.in_raw_repl:
pyb.exit_raw_repl()
do_repl(pyb, args)
return 0
except CommandError as e:
print(f"{_PROG}: {e}", file=sys.stderr)
return 1
finally:
if pyb is not None:
do_disconnect(pyb)
do_disconnect(state)

View File

@ -0,0 +1,101 @@
from .console import Console, ConsolePosix
from . import pyboardextended as pyboard
def do_repl_main_loop(state, console_in, console_out_write, *, code_to_inject, file_to_inject):
while True:
console_in.waitchar(state.pyb.serial)
c = console_in.readchar()
if c:
if c == b"\x1d": # ctrl-], quit
break
elif c == b"\x04": # ctrl-D
# special handling needed for ctrl-D if filesystem is mounted
state.pyb.write_ctrl_d(console_out_write)
elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
state.pyb.serial.write(code_to_inject)
elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
state.pyb.enter_raw_repl(soft_reset=False)
with open(file_to_inject, "rb") as f:
pyfile = f.read()
try:
state.pyb.exec_raw_no_follow(pyfile)
except pyboard.PyboardError as er:
console_out_write(b"Error:\r\n")
console_out_write(er)
state.pyb.exit_raw_repl()
else:
state.pyb.serial.write(c)
try:
n = state.pyb.serial.inWaiting()
except OSError as er:
if er.args[0] == 5: # IO error, device disappeared
print("device disconnected")
break
if n > 0:
c = state.pyb.serial.read(1)
if c is not None:
# pass character through to the console
oc = ord(c)
if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
console_out_write(c)
else:
console_out_write(b"[%02x]" % ord(c))
def do_repl(state, args):
state.ensure_friendly_repl()
state.did_action()
capture_file = None
code_to_inject = None
file_to_inject = None
while len(args):
if args[0] == "--capture":
args.pop(0)
capture_file = args.pop(0)
elif args[0] == "--inject-code":
args.pop(0)
code_to_inject = bytes(args.pop(0).replace("\\n", "\r\n"), "utf8")
elif args[0] == "--inject-file":
args.pop(0)
file_to_inject = args.pop(0)
else:
break
print("Connected to MicroPython at %s" % state.pyb.device_name)
print("Use Ctrl-] to exit this shell")
if capture_file is not None:
print('Capturing session to file "%s"' % capture_file)
capture_file = open(capture_file, "wb")
if code_to_inject is not None:
print("Use Ctrl-J to inject", code_to_inject)
if file_to_inject is not None:
print('Use Ctrl-K to inject file "%s"' % file_to_inject)
console = Console()
console.enter()
def console_out_write(b):
console.write(b)
if capture_file is not None:
capture_file.write(b)
capture_file.flush()
try:
do_repl_main_loop(
state,
console,
console_out_write,
code_to_inject=code_to_inject,
file_to_inject=file_to_inject,
)
finally:
console.exit()
if capture_file is not None:
capture_file.close()