tools/mpremote: Add new CLI utility to interact with remote device.

This has been under development since April 2017.  See #3034 and #6375.

Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
Damien George 2021-05-29 17:12:54 +10:00
parent e4ba57c5cd
commit a60ad33641
9 changed files with 1390 additions and 0 deletions

21
tools/mpremote/LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2021 Damien P. George
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

71
tools/mpremote/README.md Normal file
View File

@ -0,0 +1,71 @@
# mpremote -- MicroPython remote control
This CLI tool provides an integrated set of utilities to remotely interact with
and automate a MicroPython device over a serial connection.
The simplest way to use this tool is:
mpremote
This will automatically connect to the device and provide an interactive REPL.
The full list of supported commands are:
mpremote connect <device> -- connect to given device
device may be: list, auto, id:x, port:x
or any valid device name/path
mpremote disconnect -- disconnect current device
mpremote mount <local-dir> -- mount local directory on device
mpremote eval <string> -- evaluate and print the string
mpremote exec <string> -- execute the string
mpremote run <file> -- run the given local script
mpremote fs <command> <args...> -- execute filesystem commands on the device
command may be: cat, ls, cp, rm, mkdir, rmdir
use ":" as a prefix to specify a file on the device
mpremote repl -- enter REPL
options:
--capture <file>
--inject-code <string>
--inject-file <file>
Multiple commands can be specified and they will be run sequentially. Connection
and disconnection will be done automatically at the start and end of the execution
of the tool, if such commands are not explicitly given. Automatic connection will
search for the first available serial device. If no action is specified then the
REPL will be entered.
Shortcuts can be defined using the macro system. Built-in shortcuts are:
- a0, a1, a2, a3: connect to `/dev/ttyACM?`
- u0, u1, u2, u3: connect to `/dev/ttyUSB?`
- c0, c1, c2, c3: connect to `COM?`
- cat, ls, cp, rm, mkdir, rmdir, df: filesystem commands
- reset: reset the device
- bootloader: make the device enter its bootloader
Any user configuration, including user-defined shortcuts, can be placed in
.config/mpremote/config.py. For example:
# Custom macro commands
commands = {
"c33": "connect id:334D335C3138",
"bl": "bootloader",
"double x=4": "eval x*2",
}
Examples:
mpremote
mpremote a1
mpremote connect /dev/ttyUSB0 repl
mpremote ls
mpremote a1 ls
mpremote exec "import micropython; micropython.mem_info()"
mpremote eval 1/2 eval 3/4
mpremote mount .
mpremote mount . exec "import local_script"
mpremote ls
mpremote cat boot.py
mpremote cp :main.py .
mpremote cp main.py :
mpremote cp -r dir/ :

6
tools/mpremote/mpremote.py Executable file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env python3
import sys
from mpremote import main
sys.exit(main.main())

View File

@ -0,0 +1 @@
# empty

View File

@ -0,0 +1,162 @@
import sys
try:
import select, termios
except ImportError:
termios = None
select = None
import msvcrt
class ConsolePosix:
def __init__(self):
self.infd = sys.stdin.fileno()
self.infile = sys.stdin.buffer.raw
self.outfile = sys.stdout.buffer.raw
self.orig_attr = termios.tcgetattr(self.infd)
def enter(self):
# attr is: [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
attr = termios.tcgetattr(self.infd)
attr[0] &= ~(
termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON
)
attr[1] = 0
attr[2] = attr[2] & ~(termios.CSIZE | termios.PARENB) | termios.CS8
attr[3] = 0
attr[6][termios.VMIN] = 1
attr[6][termios.VTIME] = 0
termios.tcsetattr(self.infd, termios.TCSANOW, attr)
def exit(self):
termios.tcsetattr(self.infd, termios.TCSANOW, self.orig_attr)
def waitchar(self):
# TODO pyb.serial might not have fd
select.select([console_in.infd, pyb.serial.fd], [], [])
def readchar(self):
res = select.select([self.infd], [], [], 0)
if res[0]:
return self.infile.read(1)
else:
return None
def write(self, buf):
self.outfile.write(buf)
class ConsoleWindows:
KEY_MAP = {
b"H": b"A", # UP
b"P": b"B", # DOWN
b"M": b"C", # RIGHT
b"K": b"D", # LEFT
b"G": b"H", # POS1
b"O": b"F", # END
b"Q": b"6~", # PGDN
b"I": b"5~", # PGUP
b"s": b"1;5D", # CTRL-LEFT,
b"t": b"1;5C", # CTRL-RIGHT,
b"\x8d": b"1;5A", # CTRL-UP,
b"\x91": b"1;5B", # CTRL-DOWN,
b"w": b"1;5H", # CTRL-POS1
b"u": b"1;5F", # CTRL-END
b"\x98": b"1;3A", # ALT-UP,
b"\xa0": b"1;3B", # ALT-DOWN,
b"\x9d": b"1;3C", # ALT-RIGHT,
b"\x9b": b"1;3D", # ALT-LEFT,
b"\x97": b"1;3H", # ALT-POS1,
b"\x9f": b"1;3F", # ALT-END,
b"S": b"3~", # DEL,
b"\x93": b"3;5~", # CTRL-DEL
b"R": b"2~", # INS
b"\x92": b"2;5~", # CTRL-INS
b"\x94": b"Z", # Ctrl-Tab = BACKTAB,
}
def enter(self):
pass
def exit(self):
pass
def inWaiting(self):
return 1 if msvcrt.kbhit() else 0
def waitchar(self):
while not (self.inWaiting() or pyb.serial.inWaiting()):
time.sleep(0.01)
def readchar(self):
if msvcrt.kbhit():
ch = msvcrt.getch()
while ch in b"\x00\xe0": # arrow or function key prefix?
if not msvcrt.kbhit():
return None
ch = msvcrt.getch() # second call returns the actual key code
try:
ch = b"\x1b[" + self.KEY_MAP[ch]
except KeyError:
return None
return ch
def write(self, buf):
buf = buf.decode() if isinstance(buf, bytes) else buf
sys.stdout.write(buf)
sys.stdout.flush()
# for b in buf:
# if isinstance(b, bytes):
# msvcrt.putch(b)
# else:
# msvcrt.putwch(b)
if termios:
Console = ConsolePosix
VT_ENABLED = True
else:
Console = ConsoleWindows
# Windows VT mode ( >= win10 only)
# https://bugs.python.org/msg291732
import ctypes
from ctypes import wintypes
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
ERROR_INVALID_PARAMETER = 0x0057
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
def _check_bool(result, func, args):
if not result:
raise ctypes.WinError(ctypes.get_last_error())
return args
LPDWORD = ctypes.POINTER(wintypes.DWORD)
kernel32.GetConsoleMode.errcheck = _check_bool
kernel32.GetConsoleMode.argtypes = (wintypes.HANDLE, LPDWORD)
kernel32.SetConsoleMode.errcheck = _check_bool
kernel32.SetConsoleMode.argtypes = (wintypes.HANDLE, wintypes.DWORD)
def set_conout_mode(new_mode, mask=0xFFFFFFFF):
# don't assume StandardOutput is a console.
# open CONOUT$ instead
fdout = os.open("CONOUT$", os.O_RDWR)
try:
hout = msvcrt.get_osfhandle(fdout)
old_mode = wintypes.DWORD()
kernel32.GetConsoleMode(hout, ctypes.byref(old_mode))
mode = (new_mode & mask) | (old_mode.value & ~mask)
kernel32.SetConsoleMode(hout, mode)
return old_mode.value
finally:
os.close(fdout)
# def enable_vt_mode():
mode = mask = ENABLE_VIRTUAL_TERMINAL_PROCESSING
try:
set_conout_mode(mode, mask)
VT_ENABLED = True
except WindowsError as e:
VT_ENABLED = False

View File

@ -0,0 +1,477 @@
"""
MicroPython Remote - Interaction and automation tool for MicroPython
MIT license; Copyright (c) 2019-2021 Damien P. George
This program provides a set of utilities to interact with and automate a
MicroPython device over a serial connection. Commands supported are:
mpremote -- auto-detect, connect and enter REPL
mpremote <device-shortcut> -- connect to given device
mpremote connect <device> -- connect to given device
mpremote disconnect -- disconnect current device
mpremote mount <local-dir> -- mount local directory on device
mpremote eval <string> -- evaluate and print the string
mpremote exec <string> -- execute the string
mpremote run <script> -- run the given local script
mpremote fs <command> <args...> -- execute filesystem commands on the device
mpremote repl -- enter REPL
"""
import os, select, sys, time
import serial.tools.list_ports
from . import pyboardextended as pyboard
from .console import Console, ConsolePosix
_PROG = "mpremote"
_AUTO_CONNECT_SEARCH_LIST = [
"/dev/ttyACM0",
"/dev/ttyACM1",
"/dev/ttyACM2",
"/dev/ttyACM3",
"/dev/ttyUSB0",
"/dev/ttyUSB1",
"/dev/ttyUSB2",
"/dev/ttyUSB3",
"COM0",
"COM1",
"COM2",
"COM3",
]
_BUILTIN_COMMAND_EXPANSIONS = {
# Device connection shortcuts.
"a0": "connect /dev/ttyACM0",
"a1": "connect /dev/ttyACM1",
"a2": "connect /dev/ttyACM2",
"a3": "connect /dev/ttyACM3",
"u0": "connect /dev/ttyUSB0",
"u1": "connect /dev/ttyUSB1",
"u2": "connect /dev/ttyUSB2",
"u3": "connect /dev/ttyUSB3",
"c0": "connect COM0",
"c1": "connect COM1",
"c2": "connect COM2",
"c3": "connect COM3",
# Filesystem shortcuts.
"cat": "fs cat",
"ls": "fs ls",
"cp": "fs cp",
"rm": "fs rm",
"mkdir": "fs mkdir",
"rmdir": "fs rmdir",
"df": [
"exec",
"import uos\nprint('mount \\tsize \\tused \\tavail \\tuse%')\nfor _m in [''] + uos.listdir('/'):\n _s = uos.stat('/' + _m)\n if not _s[0] & 1 << 14: continue\n _s = uos.statvfs(_m)\n if _s[0]:\n _size = _s[0] * _s[2]; _free = _s[0] * _s[3]; print(_m, _size, _size - _free, _free, int(100 * (_size - _free) / _size), sep='\\t')",
],
# Other shortcuts.
"reset t_ms=100": [
"exec",
"--no-follow",
"import utime, umachine; utime.sleep_ms(t_ms); umachine.reset()",
],
"bootloader t_ms=100": [
"exec",
"--no-follow",
"import utime, umachine; utime.sleep_ms(t_ms); umachine.bootloader()",
],
"setrtc": [
"exec",
"import machine; machine.RTC().datetime((2020, 1, 1, 0, 10, 0, 0, 0))",
],
}
def load_user_config():
# Create empty config object.
config = __build_class__(lambda: None, "Config")()
config.commands = {}
# Get config file name.
path = os.getenv("XDG_CONFIG_HOME")
if path is None:
path = os.getenv("HOME")
if path is None:
return config
path = os.path.join(path, ".config")
path = os.path.join(path, _PROG)
config_file = os.path.join(path, "config.py")
# Check if config file exists.
if not os.path.exists(config_file):
return config
# Exec the config file in its directory.
with open(config_file) as f:
config_data = f.read()
prev_cwd = os.getcwd()
os.chdir(path)
exec(config_data, config.__dict__)
os.chdir(prev_cwd)
return config
def prepare_command_expansions(config):
global _command_expansions
_command_expansions = {}
for command_set in (_BUILTIN_COMMAND_EXPANSIONS, config.commands):
for cmd, sub in command_set.items():
cmd = cmd.split()
if len(cmd) == 1:
args = ()
else:
args = tuple(c.split("=") for c in cmd[1:])
if isinstance(sub, str):
sub = sub.split()
_command_expansions[cmd[0]] = (args, sub)
def do_command_expansion(args):
def usage_error(cmd, exp_args, msg):
print(f"Command {cmd} {msg}; signature is:")
print(" ", cmd, " ".join("=".join(a) for a in exp_args))
sys.exit(1)
last_arg_idx = len(args)
pre = []
while args and args[0] in _command_expansions:
cmd = args.pop(0)
exp_args, exp_sub = _command_expansions[cmd]
for exp_arg in exp_args:
exp_arg_name = exp_arg[0]
if args and "=" not in args[0]:
# Argument given without a name.
value = args.pop(0)
elif args and args[0].startswith(exp_arg_name + "="):
# Argument given with correct name.
value = args.pop(0).split("=", 1)[1]
else:
# No argument given, or argument given with a different name.
if len(exp_arg) == 1:
# Required argument (it has no default).
usage_error(cmd, exp_args, f"missing argument {exp_arg_name}")
else:
# Optional argument with a default.
value = exp_arg[1]
pre.append(f"{exp_arg_name}={value}")
args[0:0] = exp_sub
last_arg_idx = len(exp_sub)
if last_arg_idx < len(args) and "=" in args[last_arg_idx]:
# Extra unknown arguments given.
arg = args[last_arg_idx].split("=", 1)[0]
usage_error(cmd, exp_args, f"given unexpected argument {arg}")
sys.exit(1)
# Insert expansion with optional setting of arguments.
if pre:
args[0:0] = ["exec", ";".join(pre)]
def do_connect(args):
dev = args.pop(0)
try:
if dev == "list":
# List attached devices.
for p in sorted(serial.tools.list_ports.comports()):
print(
"{} {} {:04x}:{:04x} {} {}".format(
p.device, p.serial_number, p.pid, p.vid, p.manufacturer, p.product
)
)
return None
elif dev == "auto":
# Auto-detect and auto-connect to the first available device.
ports = serial.tools.list_ports.comports()
for dev in _AUTO_CONNECT_SEARCH_LIST:
if any(p.device == dev for p in ports):
try:
return pyboard.PyboardExtended(dev, baudrate=115200)
except pyboard.PyboardError as er:
if not er.args[0].startswith("failed to access"):
raise er
raise pyboard.PyboardError("no device found")
elif dev.startswith("id:"):
# Search for a device with the given serial number.
serial_number = dev[len("id:") :]
dev = None
for p in serial.tools.list_ports.comports():
if p.serial_number == serial_number:
return pyboard.PyboardExtended(p.device, baudrate=115200)
raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
else:
# Connect to the given device.
if dev.startswith("port:"):
dev = dev[len("port:") :]
return pyboard.PyboardExtended(dev, baudrate=115200)
except pyboard.PyboardError as er:
msg = er.args[0]
if msg.startswith("failed to access"):
msg += " (it may be in use by another program)"
print(msg)
sys.exit(1)
def do_disconnect(pyb):
try:
if pyb.mounted:
if not pyb.in_raw_repl:
pyb.enter_raw_repl(soft_reset=False)
pyb.umount_local()
if pyb.in_raw_repl:
pyb.exit_raw_repl()
except OSError:
# Ignore any OSError exceptions when shutting down, eg:
# - pyboard.filesystem_command will close the connecton if it had an error
# - umounting will fail if serial port disappeared
pass
pyb.close()
def do_filesystem(pyb, args):
def _list_recursive(files, path):
if os.path.isdir(path):
for entry in os.listdir(path):
_list_recursive(files, os.path.join(path, entry))
else:
files.append(os.path.split(path))
if args[0] == "cp" and args[1] == "-r":
args.pop(0)
args.pop(0)
assert args[-1] == ":"
args.pop()
src_files = []
for path in args:
_list_recursive(src_files, path)
known_dirs = {""}
pyb.exec_("import uos")
for dir, file in src_files:
dir_parts = dir.split("/")
for i in range(len(dir_parts)):
d = "/".join(dir_parts[: i + 1])
if d not in known_dirs:
pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
known_dirs.add(d)
pyboard.filesystem_command(pyb, ["cp", os.path.join(dir, file), ":" + dir + "/"])
else:
pyboard.filesystem_command(pyb, args)
args.clear()
def do_repl_main_loop(pyb, console_in, console_out_write, *, code_to_inject, file_to_inject):
while True:
if isinstance(console_in, ConsolePosix):
# TODO pyb.serial might not have fd
select.select([console_in.infd, pyb.serial.fd], [], [])
else:
while not (console_in.inWaiting() or pyb.serial.inWaiting()):
time.sleep(0.01)
c = console_in.readchar()
if c:
if c == b"\x1d": # ctrl-], quit
break
elif c == b"\x04": # ctrl-D
# do a soft reset and reload the filesystem hook
pyb.soft_reset_with_mount(console_out_write)
elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
pyb.serial.write(code_to_inject)
elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
pyb.enter_raw_repl(soft_reset=False)
with open(file_to_inject, "rb") as f:
pyfile = f.read()
try:
pyb.exec_raw_no_follow(pyfile)
except pyboard.PyboardError as er:
console_out_write(b"Error:\r\n")
console_out_write(er)
pyb.exit_raw_repl()
else:
pyb.serial.write(c)
try:
n = pyb.serial.inWaiting()
except OSError as er:
if er.args[0] == 5: # IO error, device disappeared
print("device disconnected")
break
if n > 0:
c = pyb.serial.read(1)
if c is not None:
# pass character through to the console
oc = ord(c)
if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
console_out_write(c)
else:
console_out_write(b"[%02x]" % ord(c))
def do_repl(pyb, args):
capture_file = None
code_to_inject = None
file_to_inject = None
while len(args):
if args[0] == "--capture":
args.pop(0)
capture_file = args.pop(0)
elif args[0] == "--inject-code":
args.pop(0)
code_to_inject = bytes(args.pop(0).replace("\\n", "\r\n"), "utf8")
elif args[0] == "--inject-file":
args.pop(0)
file_to_inject = args.pop(0)
else:
break
print("Connected to MicroPython at %s" % pyb.device_name)
print("Use Ctrl-] to exit this shell")
if capture_file is not None:
print('Capturing session to file "%s"' % capture_file)
capture_file = open(capture_file, "wb")
if code_to_inject is not None:
print("Use Ctrl-J to inject", code_to_inject)
if file_to_inject is not None:
print('Use Ctrl-K to inject file "%s"' % file_to_inject)
console = Console()
console.enter()
def console_out_write(b):
console.write(b)
if capture_file is not None:
capture_file.write(b)
capture_file.flush()
try:
do_repl_main_loop(
pyb,
console,
console_out_write,
code_to_inject=code_to_inject,
file_to_inject=file_to_inject,
)
finally:
console.exit()
if capture_file is not None:
capture_file.close()
def execbuffer(pyb, buf, follow):
ret_val = 0
try:
pyb.exec_raw_no_follow(buf)
if follow:
ret, ret_err = pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
if ret_err:
pyboard.stdout_write_bytes(ret_err)
ret_val = 1
except pyboard.PyboardError as er:
print(er)
ret_val = 1
except KeyboardInterrupt:
ret_val = 1
return ret_val
def main():
config = load_user_config()
prepare_command_expansions(config)
args = sys.argv[1:]
pyb = None
did_action = False
try:
while args:
do_command_expansion(args)
cmds = {
"connect": (False, False, 1),
"disconnect": (False, False, 0),
"mount": (True, False, 1),
"repl": (False, True, 0),
"eval": (True, True, 1),
"exec": (True, True, 1),
"run": (True, True, 1),
"fs": (True, True, 1),
}
cmd = args.pop(0)
try:
need_raw_repl, is_action, num_args_min = cmds[cmd]
except KeyError:
print(f"{_PROG}: '{cmd}' is not a command")
return 1
if len(args) < num_args_min:
print(f"{_PROG}: '{cmd}' neads at least {num_args_min} argument(s)")
return 1
if cmd == "connect":
if pyb is not None:
do_disconnect(pyb)
pyb = do_connect(args)
if pyb is None:
did_action = True
continue
if pyb is None:
pyb = do_connect(["auto"])
if need_raw_repl:
if not pyb.in_raw_repl:
pyb.enter_raw_repl()
else:
if pyb.in_raw_repl:
pyb.exit_raw_repl()
if is_action:
did_action = True
if cmd == "disconnect":
do_disconnect(pyb)
pyb = None
elif cmd == "mount":
path = args.pop(0)
pyb.mount_local(path)
print(f"Local directory {path} is mounted at /remote")
elif cmd in ("exec", "eval", "run"):
follow = True
if args[0] == "--no-follow":
args.pop(0)
follow = False
if cmd == "exec":
buf = args.pop(0)
elif cmd == "eval":
buf = "print(" + args.pop(0) + ")"
else:
filename = args.pop(0)
try:
with open(filename, "rb") as f:
buf = f.read()
except OSError:
print(f"{_PROG}: could not read file '{filename}'")
return 1
ret = execbuffer(pyb, buf, follow)
if ret:
return ret
elif cmd == "fs":
do_filesystem(pyb, args)
elif cmd == "repl":
do_repl(pyb, args)
if not did_action:
if pyb is None:
pyb = do_connect(["auto"])
if pyb.in_raw_repl:
pyb.exit_raw_repl()
do_repl(pyb, args)
finally:
if pyb is not None:
do_disconnect(pyb)

View File

@ -0,0 +1,621 @@
import os, re, serial, struct, time
from errno import EPERM
from .console import VT_ENABLED
try:
from .pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
except ImportError:
import sys
sys.path.append(os.path.dirname(__file__) + "/../..")
from pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
fs_hook_cmds = {
"CMD_STAT": 1,
"CMD_ILISTDIR_START": 2,
"CMD_ILISTDIR_NEXT": 3,
"CMD_OPEN": 4,
"CMD_CLOSE": 5,
"CMD_READ": 6,
"CMD_WRITE": 7,
"CMD_SEEK": 8,
"CMD_REMOVE": 9,
"CMD_RENAME": 10,
}
fs_hook_code = """\
import uos, uio, ustruct, micropython, usys
class RemoteCommand:
def __init__(self, use_second_port):
self.buf4 = bytearray(4)
try:
import pyb
self.fout = pyb.USB_VCP()
if use_second_port:
self.fin = pyb.USB_VCP(1)
else:
self.fin = pyb.USB_VCP()
except:
import usys
self.fout = usys.stdout.buffer
self.fin = usys.stdin.buffer
import select
self.poller = select.poll()
self.poller.register(self.fin, select.POLLIN)
def poll_in(self):
for _ in self.poller.ipoll(1000):
return
self.end()
raise Exception('timeout waiting for remote')
def rd(self, n):
buf = bytearray(n)
self.rd_into(buf, n)
return buf
def rd_into(self, buf, n):
# implement reading with a timeout in case other side disappears
if n == 0:
return
self.poll_in()
r = self.fin.readinto(buf, n)
if r < n:
mv = memoryview(buf)
while r < n:
self.poll_in()
r += self.fin.readinto(mv[r:], n - r)
def begin(self, type):
micropython.kbd_intr(-1)
buf4 = self.buf4
buf4[0] = 0x18
buf4[1] = type
self.fout.write(buf4, 2)
# Wait for sync byte 0x18, but don't get stuck forever
for i in range(30):
self.poller.poll(1000)
self.fin.readinto(buf4, 1)
if buf4[0] == 0x18:
break
def end(self):
micropython.kbd_intr(3)
def rd_s8(self):
self.rd_into(self.buf4, 1)
n = self.buf4[0]
if n & 0x80:
n -= 0x100
return n
def rd_s32(self):
buf4 = self.buf4
self.rd_into(buf4, 4)
n = buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24
if buf4[3] & 0x80:
n -= 0x100000000
return n
def rd_u32(self):
buf4 = self.buf4
self.rd_into(buf4, 4)
return buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24
def rd_bytes(self, buf):
# TODO if n is large (eg >256) then we may miss bytes on stdin
n = self.rd_s32()
if buf is None:
ret = buf = bytearray(n)
else:
ret = n
self.rd_into(buf, n)
return ret
def rd_str(self):
n = self.rd_s32()
if n == 0:
return ''
else:
return str(self.rd(n), 'utf8')
def wr_s8(self, i):
self.buf4[0] = i
self.fout.write(self.buf4, 1)
def wr_s32(self, i):
ustruct.pack_into('<i', self.buf4, 0, i)
self.fout.write(self.buf4)
def wr_bytes(self, b):
self.wr_s32(len(b))
self.fout.write(b)
# str and bytes act the same in MicroPython
wr_str = wr_bytes
class RemoteFile(uio.IOBase):
def __init__(self, cmd, fd, is_text):
self.cmd = cmd
self.fd = fd
self.is_text = is_text
def __enter__(self):
return self
def __exit__(self, a, b, c):
self.close()
def ioctl(self, request, arg):
if request == 4: # CLOSE
self.close()
return 0
def flush(self):
pass
def close(self):
if self.fd is None:
return
c = self.cmd
c.begin(CMD_CLOSE)
c.wr_s8(self.fd)
c.end()
self.fd = None
def read(self, n=-1):
c = self.cmd
c.begin(CMD_READ)
c.wr_s8(self.fd)
c.wr_s32(n)
data = c.rd_bytes(None)
c.end()
if self.is_text:
data = str(data, 'utf8')
else:
data = bytes(data)
return data
def readinto(self, buf):
c = self.cmd
c.begin(CMD_READ)
c.wr_s8(self.fd)
c.wr_s32(len(buf))
n = c.rd_bytes(buf)
c.end()
return n
def readline(self):
l = ''
while 1:
c = self.read(1)
l += c
if c == '\\n' or c == '':
return l
def readlines(self):
ls = []
while 1:
l = self.readline()
if not l:
return ls
ls.append(l)
def write(self, buf):
c = self.cmd
c.begin(CMD_WRITE)
c.wr_s8(self.fd)
c.wr_bytes(buf)
n = c.rd_s32()
c.end()
return n
def seek(self, n):
c = self.cmd
c.begin(CMD_SEEK)
c.wr_s8(self.fd)
c.wr_s32(n)
n = c.rd_s32()
c.end()
return n
class RemoteFS:
def __init__(self, cmd):
self.cmd = cmd
def mount(self, readonly, mkfs):
pass
def umount(self):
pass
def chdir(self, path):
if not path.startswith("/"):
path = self.path + path
if not path.endswith("/"):
path += "/"
if path != "/":
self.stat(path)
self.path = path
def getcwd(self):
return self.path
def remove(self, path):
c = self.cmd
c.begin(CMD_REMOVE)
c.wr_str(self.path + path)
res = c.rd_s32()
c.end()
if res < 0:
raise OSError(-res)
def rename(self, old, new):
c = self.cmd
c.begin(CMD_RENAME)
c.wr_str(self.path + old)
c.wr_str(self.path + new)
res = c.rd_s32()
c.end()
if res < 0:
raise OSError(-res)
def stat(self, path):
c = self.cmd
c.begin(CMD_STAT)
c.wr_str(self.path + path)
res = c.rd_s8()
if res < 0:
c.end()
raise OSError(-res)
mode = c.rd_u32()
size = c.rd_u32()
atime = c.rd_u32()
mtime = c.rd_u32()
ctime = c.rd_u32()
c.end()
return mode, 0, 0, 0, 0, 0, size, atime, mtime, ctime
def ilistdir(self, path):
c = self.cmd
c.begin(CMD_ILISTDIR_START)
c.wr_str(self.path + path)
res = c.rd_s8()
c.end()
if res < 0:
raise OSError(-res)
def next():
while True:
c.begin(CMD_ILISTDIR_NEXT)
name = c.rd_str()
if name:
type = c.rd_u32()
c.end()
yield (name, type, 0)
else:
c.end()
break
return next()
def open(self, path, mode):
c = self.cmd
c.begin(CMD_OPEN)
c.wr_str(self.path + path)
c.wr_str(mode)
fd = c.rd_s8()
c.end()
if fd < 0:
raise OSError(-fd)
return RemoteFile(c, fd, mode.find('b') == -1)
def __mount(use_second_port):
uos.mount(RemoteFS(RemoteCommand(use_second_port)), '/remote')
uos.chdir('/remote')
"""
# Apply basic compression on hook code.
for key, value in fs_hook_cmds.items():
fs_hook_code = re.sub(key, str(value), fs_hook_code)
fs_hook_code = re.sub(" *#.*$", "", fs_hook_code, flags=re.MULTILINE)
fs_hook_code = re.sub("\n\n+", "\n", fs_hook_code)
fs_hook_code = re.sub(" ", " ", fs_hook_code)
fs_hook_code = re.sub("rd_", "r", fs_hook_code)
fs_hook_code = re.sub("wr_", "w", fs_hook_code)
fs_hook_code = re.sub("buf4", "b4", fs_hook_code)
class PyboardCommand:
def __init__(self, fin, fout, path):
self.fin = fin
self.fout = fout
self.root = path + "/"
self.data_ilistdir = ["", []]
self.data_files = []
def rd_s8(self):
return struct.unpack("<b", self.fin.read(1))[0]
def rd_s32(self):
return struct.unpack("<i", self.fin.read(4))[0]
def rd_bytes(self):
n = self.rd_s32()
return self.fin.read(n)
def rd_str(self):
n = self.rd_s32()
if n == 0:
return ""
else:
return str(self.fin.read(n), "utf8")
def wr_s8(self, i):
self.fout.write(struct.pack("<b", i))
def wr_s32(self, i):
self.fout.write(struct.pack("<i", i))
def wr_u32(self, i):
self.fout.write(struct.pack("<I", i))
def wr_bytes(self, b):
self.wr_s32(len(b))
self.fout.write(b)
def wr_str(self, s):
b = bytes(s, "utf8")
self.wr_s32(len(b))
self.fout.write(b)
def log_cmd(self, msg):
print(f"[{msg}]", end="\r\n")
def path_check(self, path):
parent = os.path.realpath(self.root)
child = os.path.realpath(path)
if parent != os.path.commonpath([parent, child]):
raise OSError(EPERM, "") # File is outside mounted dir
def do_stat(self):
path = self.root + self.rd_str()
# self.log_cmd(f"stat {path}")
try:
self.path_check(path)
stat = os.stat(path)
except OSError as er:
self.wr_s8(-abs(er.errno))
else:
self.wr_s8(0)
# Note: st_ino would need to be 64-bit if added here
self.wr_u32(stat.st_mode)
self.wr_u32(stat.st_size)
self.wr_u32(int(stat.st_atime))
self.wr_u32(int(stat.st_mtime))
self.wr_u32(int(stat.st_ctime))
def do_ilistdir_start(self):
path = self.root + self.rd_str()
try:
self.path_check(path)
self.wr_s8(0)
except OSError as er:
self.wr_s8(-abs(er.errno))
else:
self.data_ilistdir[0] = path
self.data_ilistdir[1] = os.listdir(path)
def do_ilistdir_next(self):
if self.data_ilistdir[1]:
entry = self.data_ilistdir[1].pop(0)
try:
stat = os.lstat(self.data_ilistdir[0] + "/" + entry)
mode = stat.st_mode & 0xC000
except OSError as er:
mode = 0
self.wr_str(entry)
self.wr_u32(mode)
else:
self.wr_str("")
def do_open(self):
path = self.root + self.rd_str()
mode = self.rd_str()
# self.log_cmd(f"open {path} {mode}")
try:
self.path_check(path)
f = open(path, mode)
except OSError as er:
self.wr_s8(-abs(er.errno))
else:
is_text = mode.find("b") == -1
try:
fd = self.data_files.index(None)
self.data_files[fd] = (f, is_text)
except ValueError:
fd = len(self.data_files)
self.data_files.append((f, is_text))
self.wr_s8(fd)
def do_close(self):
fd = self.rd_s8()
# self.log_cmd(f"close {fd}")
self.data_files[fd][0].close()
self.data_files[fd] = None
def do_read(self):
fd = self.rd_s8()
n = self.rd_s32()
buf = self.data_files[fd][0].read(n)
if self.data_files[fd][1]:
buf = bytes(buf, "utf8")
self.wr_bytes(buf)
# self.log_cmd(f"read {fd} {n} -> {len(buf)}")
def do_seek(self):
fd = self.rd_s8()
n = self.rd_s32()
# self.log_cmd(f"seek {fd} {n}")
self.data_files[fd][0].seek(n)
self.wr_s32(n)
def do_write(self):
fd = self.rd_s8()
buf = self.rd_bytes()
if self.data_files[fd][1]:
buf = str(buf, "utf8")
n = self.data_files[fd][0].write(buf)
self.wr_s32(n)
# self.log_cmd(f"write {fd} {len(buf)} -> {n}")
def do_remove(self):
path = self.root + self.rd_str()
# self.log_cmd(f"remove {path}")
try:
self.path_check(path)
os.remove(path)
ret = 0
except OSError as er:
ret = -abs(er.errno)
self.wr_s32(ret)
def do_rename(self):
old = self.root + self.rd_str()
new = self.root + self.rd_str()
# self.log_cmd(f"rename {old} {new}")
try:
self.path_check(old)
self.path_check(new)
os.rename(old, new)
ret = 0
except OSError as er:
ret = -abs(er.errno)
self.wr_s32(ret)
cmd_table = {
fs_hook_cmds["CMD_STAT"]: do_stat,
fs_hook_cmds["CMD_ILISTDIR_START"]: do_ilistdir_start,
fs_hook_cmds["CMD_ILISTDIR_NEXT"]: do_ilistdir_next,
fs_hook_cmds["CMD_OPEN"]: do_open,
fs_hook_cmds["CMD_CLOSE"]: do_close,
fs_hook_cmds["CMD_READ"]: do_read,
fs_hook_cmds["CMD_WRITE"]: do_write,
fs_hook_cmds["CMD_SEEK"]: do_seek,
fs_hook_cmds["CMD_REMOVE"]: do_remove,
fs_hook_cmds["CMD_RENAME"]: do_rename,
}
class SerialIntercept:
def __init__(self, serial, cmd):
self.orig_serial = serial
self.cmd = cmd
self.buf = b""
self.orig_serial.timeout = 5.0
def _check_input(self, blocking):
if blocking or self.orig_serial.inWaiting() > 0:
c = self.orig_serial.read(1)
if c == b"\x18":
# a special command
c = self.orig_serial.read(1)[0]
self.orig_serial.write(b"\x18") # Acknowledge command
PyboardCommand.cmd_table[c](self.cmd)
elif not VT_ENABLED and c == b"\x1b":
# ESC code, ignore these on windows
esctype = self.orig_serial.read(1)
if esctype == b"[": # CSI
while not (0x40 < self.orig_serial.read(1)[0] < 0x7E):
# Looking for "final byte" of escape sequence
pass
else:
self.buf += c
@property
def fd(self):
return self.orig_serial.fd
def close(self):
self.orig_serial.close()
def inWaiting(self):
self._check_input(False)
return len(self.buf)
def read(self, n):
while len(self.buf) < n:
self._check_input(True)
out = self.buf[:n]
self.buf = self.buf[n:]
return out
def write(self, buf):
self.orig_serial.write(buf)
class PyboardExtended(Pyboard):
def __init__(self, dev, *args, **kwargs):
super().__init__(dev, *args, **kwargs)
self.device_name = dev
self.mounted = False
def mount_local(self, path, dev_out=None):
fout = self.serial
if dev_out is not None:
try:
fout = serial.Serial(dev_out)
except serial.SerialException:
port = list(serial.tools.list_ports.grep(dev_out))
if not port:
raise
for p in port:
try:
fout = serial.Serial(p.device)
break
except serial.SerialException:
pass
self.mounted = True
if self.eval('"RemoteFS" in globals()') == b"False":
self.exec_(fs_hook_code)
self.exec_("__mount(%s)" % (dev_out is not None))
self.cmd = PyboardCommand(self.serial, fout, path)
self.serial = SerialIntercept(self.serial, self.cmd)
self.dev_out = dev_out
def soft_reset_with_mount(self, out_callback):
self.serial.write(b"\x04")
if not self.mounted:
return
# Wait for a response to the soft-reset command.
for i in range(10):
if self.serial.inWaiting():
break
time.sleep(0.05)
else:
# Device didn't respond so it wasn't in a state to do a soft reset.
return
out_callback(self.serial.read(1))
self.serial = self.serial.orig_serial
n = self.serial.inWaiting()
while n > 0:
buf = self.serial.read(n)
out_callback(buf)
time.sleep(0.1)
n = self.serial.inWaiting()
self.serial.write(b"\x01")
self.exec_(fs_hook_code)
self.exec_("__mount(%s)" % (self.dev_out is not None))
self.exit_raw_repl()
self.read_until(4, b">>> ")
self.serial = SerialIntercept(self.serial, self.cmd)
def umount_local(self):
if self.mounted:
self.exec_('uos.umount("/remote")')
self.mounted = False

View File

@ -0,0 +1,6 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

25
tools/mpremote/setup.cfg Normal file
View File

@ -0,0 +1,25 @@
[metadata]
name = mpremote
version = 0.0.4
author = Damien George
author_email = damien@micropython.org
description = Tool for interacting remotely with MicroPython
long_description = file: README.md
long_description_content_type = text/markdown
url = https://github.com/micropython/micropython
project_urls =
Bug Tracker = https://github.com/micropython/micropython/issues
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
packages = mpremote
python_requires = >= 3.4
install_requires =
pyserial >= 3.3
[options.entry_points]
console_scripts =
mpremote = mpremote.main:main