From aa64280666d40208ade0183503fe683fbb32c2ab Mon Sep 17 00:00:00 2001 From: Jim Mussared Date: Wed, 21 Dec 2022 11:36:18 +1100 Subject: [PATCH] tools/pyboard.py: Add fs_{listdir,readfile,writefile,stat}. These are for working with the filesystem when using pyboard.py as a library, rather than at the command line. - fs_listdir returns a list of tuples, in the same format as os.ilistdir(). - fs_readfile returns the contents of a file as a bytes object. - fs_writefile allows writing a bytes object to a file. - fs_stat returns an os.statresult. All raise FileNotFoundError (or OSError(ENOENT) on Python 2) if the file is not found (or PyboardError on other errors). Updated fs_cp and fs_get to use fs_stat to compute file size. This work was funded through GitHub Sponsors. Signed-off-by: Jim Mussared --- tools/pyboard.py | 69 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/tools/pyboard.py b/tools/pyboard.py index b45cc19196..d1abd2b781 100755 --- a/tools/pyboard.py +++ b/tools/pyboard.py @@ -73,6 +73,8 @@ import struct import sys import time +from collections import namedtuple + try: stdout = sys.stdout.buffer except AttributeError: @@ -87,7 +89,15 @@ def stdout_write_bytes(b): class PyboardError(Exception): - pass + def convert(self, info): + if len(self.args) >= 3: + if b"OSError" in self.args[2] and b"ENOENT" in self.args[2]: + return OSError(errno.ENOENT, info) + + return self + + +listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"]) class TelnetToSerial: @@ -467,6 +477,7 @@ class Pyboard: ret = ret.strip() return ret + # In Python3, call as pyboard.exec(), see the setattr call below. def exec_(self, command, data_consumer=None): ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) if ret_err: @@ -497,6 +508,34 @@ class Pyboard: ) self.exec_(cmd, data_consumer=stdout_write_bytes) + def fs_listdir(self, src=""): + buf = bytearray() + + def repr_consumer(b): + buf.extend(b.replace(b"\x04", b"")) + + cmd = "import uos\nfor f in uos.ilistdir(%s):\n" " print(repr(f), end=',')" % ( + ("'%s'" % src) if src else "" + ) + try: + buf.extend(b"[") + self.exec_(cmd, data_consumer=repr_consumer) + buf.extend(b"]") + except PyboardError as e: + raise e.convert(src) + + return [ + listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,))) + for f in ast.literal_eval(buf.decode()) + ] + + def fs_stat(self, src): + try: + self.exec_("import uos") + return os.stat_result(self.eval("uos.stat(%s)" % (("'%s'" % src)), parse=True)) + except PyboardError as e: + raise e.convert(src) + def fs_cat(self, src, chunk_size=256): cmd = ( "with open('%s') as f:\n while 1:\n" @@ -504,9 +543,33 @@ class Pyboard: ) self.exec_(cmd, data_consumer=stdout_write_bytes) + def fs_readfile(self, src, chunk_size=256): + buf = bytearray() + + def repr_consumer(b): + buf.extend(b.replace(b"\x04", b"")) + + cmd = ( + "with open('%s', 'rb') as f:\n while 1:\n" + " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) + ) + try: + self.exec_(cmd, data_consumer=repr_consumer) + except PyboardError as e: + raise e.convert(src) + return ast.literal_eval(buf.decode()) + + def fs_writefile(self, dest, data, chunk_size=256): + self.exec_("f=open('%s','wb')\nw=f.write" % dest) + while data: + chunk = data[:chunk_size] + self.exec_("w(" + repr(chunk) + ")") + data = data[len(chunk) :] + self.exec_("f.close()") + def fs_cp(self, src, dest, chunk_size=256, progress_callback=None): if progress_callback: - src_size = int(self.exec_("import os\nprint(os.stat('%s')[6])" % src)) + src_size = self.fs_stat(src).st_size written = 0 self.exec_("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest)) while True: @@ -520,7 +583,7 @@ class Pyboard: def fs_get(self, src, dest, chunk_size=256, progress_callback=None): if progress_callback: - src_size = int(self.exec_("import os\nprint(os.stat('%s')[6])" % src)) + src_size = self.fs_stat(src).st_size written = 0 self.exec_("f=open('%s','rb')\nr=f.read" % src) with open(dest, "wb") as f: