all: Replace all uses of umodule in Python code.
Applies to drivers/examples/extmod/port-modules/tools. This work was funded through GitHub Sponsors. Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
This commit is contained in:
parent
9d7eac0713
commit
5fd042e7d1
|
@ -40,13 +40,13 @@ application of this idea would look like:
|
|||
`app.py`:
|
||||
|
||||
from hwconfig import *
|
||||
import utime
|
||||
import time
|
||||
|
||||
while True:
|
||||
LED.value(1)
|
||||
utime.sleep_ms(500)
|
||||
time.sleep_ms(500)
|
||||
LED.value(0)
|
||||
utime.sleep_ms(500)
|
||||
time.sleep_ms(500)
|
||||
|
||||
|
||||
To deploy this application to a particular board, a user will need:
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import utime
|
||||
import time
|
||||
from hwconfig import LED, BUTTON
|
||||
|
||||
# Light LED when (and while) a BUTTON is pressed
|
||||
|
@ -6,4 +6,4 @@ from hwconfig import LED, BUTTON
|
|||
while 1:
|
||||
LED.value(BUTTON.value())
|
||||
# Don't burn CPU
|
||||
utime.sleep_ms(10)
|
||||
time.sleep_ms(10)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import utime
|
||||
import time
|
||||
import machine
|
||||
from hwconfig import LED, BUTTON
|
||||
|
||||
|
@ -18,4 +18,4 @@ while 1:
|
|||
print("Well, you're *really* slow")
|
||||
else:
|
||||
print("You are as slow as %d microseconds!" % delay)
|
||||
utime.sleep_ms(10)
|
||||
time.sleep_ms(10)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import utime
|
||||
import time
|
||||
from hwconfig import LED
|
||||
|
||||
|
||||
|
@ -15,10 +15,10 @@ def pwm_cycle(led, duty, cycles):
|
|||
for i in range(cycles):
|
||||
if duty:
|
||||
led.on()
|
||||
utime.sleep_ms(duty)
|
||||
time.sleep_ms(duty)
|
||||
if duty_off:
|
||||
led.off()
|
||||
utime.sleep_ms(duty_off)
|
||||
time.sleep_ms(duty_off)
|
||||
|
||||
|
||||
# At the duty setting of 1, an LED is still pretty bright, then
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
try:
|
||||
import usocket as socket
|
||||
except:
|
||||
import socket
|
||||
import socket
|
||||
|
||||
|
||||
def main(use_stream=False):
|
||||
|
|
|
@ -1,17 +1,11 @@
|
|||
try:
|
||||
import usocket as _socket
|
||||
except:
|
||||
import _socket
|
||||
try:
|
||||
import ussl as ssl
|
||||
except:
|
||||
import ssl
|
||||
import socket
|
||||
import ssl
|
||||
|
||||
|
||||
def main(use_stream=True):
|
||||
s = _socket.socket()
|
||||
s = socket.socket()
|
||||
|
||||
ai = _socket.getaddrinfo("google.com", 443)
|
||||
ai = socket.getaddrinfo("google.com", 443)
|
||||
print("Address infos:", ai)
|
||||
addr = ai[0][-1]
|
||||
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
try:
|
||||
import usocket as socket
|
||||
except:
|
||||
import socket
|
||||
import socket
|
||||
|
||||
|
||||
CONTENT = b"""\
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
# Do not use this code in real projects! Read
|
||||
# http_server_simplistic_commented.py for details.
|
||||
try:
|
||||
import usocket as socket
|
||||
except:
|
||||
import socket
|
||||
import socket
|
||||
|
||||
|
||||
CONTENT = b"""\
|
||||
|
|
|
@ -8,10 +8,7 @@
|
|||
# details, and use this code only for quick hacks, preferring
|
||||
# http_server.py for "real thing".
|
||||
#
|
||||
try:
|
||||
import usocket as socket
|
||||
except:
|
||||
import socket
|
||||
import socket
|
||||
|
||||
|
||||
CONTENT = b"""\
|
||||
|
|
|
@ -1,10 +1,6 @@
|
|||
import ubinascii as binascii
|
||||
|
||||
try:
|
||||
import usocket as socket
|
||||
except:
|
||||
import socket
|
||||
import ussl as ssl
|
||||
import binascii
|
||||
import socket
|
||||
import ssl
|
||||
|
||||
|
||||
# This self-signed key/cert pair is randomly generated and to be used for
|
||||
|
|
|
@ -4,6 +4,6 @@
|
|||
# It is expected to print 0xaa55, which is a signature at the start of
|
||||
# Video BIOS.
|
||||
|
||||
import umachine as machine
|
||||
import machine
|
||||
|
||||
print(hex(machine.mem16[0xC0000]))
|
||||
|
|
|
@ -40,9 +40,9 @@ class Event:
|
|||
# that asyncio will poll until a flag is set.
|
||||
# Note: Unlike Event, this is self-clearing after a wait().
|
||||
try:
|
||||
import uio
|
||||
import io
|
||||
|
||||
class ThreadSafeFlag(uio.IOBase):
|
||||
class ThreadSafeFlag(io.IOBase):
|
||||
def __init__(self):
|
||||
self.state = 0
|
||||
|
||||
|
|
|
@ -101,8 +101,8 @@ StreamWriter = Stream
|
|||
#
|
||||
# async
|
||||
def open_connection(host, port):
|
||||
from uerrno import EINPROGRESS
|
||||
import usocket as socket
|
||||
from errno import EINPROGRESS
|
||||
import socket
|
||||
|
||||
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
|
||||
s = socket.socket(ai[0], ai[1], ai[2])
|
||||
|
@ -154,7 +154,7 @@ class Server:
|
|||
# Helper function to start a TCP stream server, running as a new task
|
||||
# TODO could use an accept-callback on socket read activity instead of creating a task
|
||||
async def start_server(cb, host, port, backlog=5):
|
||||
import usocket as socket
|
||||
import socket
|
||||
|
||||
# Create and bind server socket.
|
||||
host = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import gc
|
||||
import uos
|
||||
import os
|
||||
from flashbdev import bdev
|
||||
|
||||
try:
|
||||
if bdev:
|
||||
uos.mount(bdev, "/")
|
||||
os.mount(bdev, "/")
|
||||
except OSError:
|
||||
import inisetup
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import uos
|
||||
import os
|
||||
from flashbdev import bdev
|
||||
|
||||
|
||||
|
@ -33,9 +33,9 @@ by firmware programming).
|
|||
def setup():
|
||||
check_bootsec()
|
||||
print("Performing initial setup")
|
||||
uos.VfsLfs2.mkfs(bdev)
|
||||
vfs = uos.VfsLfs2(bdev)
|
||||
uos.mount(vfs, "/")
|
||||
os.VfsLfs2.mkfs(bdev)
|
||||
vfs = os.VfsLfs2(bdev)
|
||||
os.mount(vfs, "/")
|
||||
with open("boot.py", "w") as f:
|
||||
f.write(
|
||||
"""\
|
||||
|
|
|
@ -6,7 +6,7 @@ Note: v1.12-334 and newer (including v1.13) require an ESP8266 module with
|
|||
2MiB of flash or more, and use littlefs as the filesystem by default. When
|
||||
upgrading from older firmware please backup your files first, and either
|
||||
erase all flash before upgrading, or after upgrading execute
|
||||
`uos.VfsLfs2.mkfs(bdev)`.
|
||||
`os.VfsLfs2.mkfs(bdev)`.
|
||||
|
||||
### OTA builds
|
||||
Over-The-Air (OTA) builds of the ESP8266 firmware are also provided.
|
||||
|
|
|
@ -2,4 +2,4 @@ The following are daily builds of the ESP8266 firmware tailored for modules with
|
|||
only 1MiB of flash. This firmware uses littlefs as the filesystem.
|
||||
When upgrading from older firmware that uses a FAT filesystem please backup your files
|
||||
first, and either erase all flash before upgrading, or after upgrading execute
|
||||
`uos.VfsLfs2.mkfs(bdev)`.
|
||||
`os.VfsLfs2.mkfs(bdev)`.
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
import gc
|
||||
|
||||
gc.threshold((gc.mem_free() + gc.mem_alloc()) // 4)
|
||||
import uos
|
||||
import os
|
||||
from flashbdev import bdev
|
||||
|
||||
if bdev:
|
||||
try:
|
||||
uos.mount(bdev, "/")
|
||||
os.mount(bdev, "/")
|
||||
except:
|
||||
import inisetup
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
# MIT license; Copyright (c) 2022 Glenn Moloney @glenn20
|
||||
|
||||
from _espnow import *
|
||||
from uselect import poll, POLLIN
|
||||
from select import poll, POLLIN
|
||||
|
||||
|
||||
class ESPNow(ESPNowBase):
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
import uos
|
||||
import os
|
||||
import network
|
||||
from flashbdev import bdev
|
||||
|
||||
|
||||
def wifi():
|
||||
import ubinascii
|
||||
import binascii
|
||||
|
||||
ap_if = network.WLAN(network.AP_IF)
|
||||
ssid = b"MicroPython-%s" % ubinascii.hexlify(ap_if.config("mac")[-3:])
|
||||
ssid = b"MicroPython-%s" % binascii.hexlify(ap_if.config("mac")[-3:])
|
||||
ap_if.config(ssid=ssid, security=network.AUTH_WPA_WPA2_PSK, key=b"micropythoN")
|
||||
|
||||
|
||||
|
@ -32,7 +32,7 @@ def fs_corrupted():
|
|||
"""\
|
||||
The filesystem starting at sector %d with size %d sectors looks corrupt.
|
||||
You may want to make a flash snapshot and try to recover it. Otherwise,
|
||||
format it with uos.VfsLfs2.mkfs(bdev), or completely erase the flash and
|
||||
format it with os.VfsLfs2.mkfs(bdev), or completely erase the flash and
|
||||
reprogram MicroPython.
|
||||
"""
|
||||
% (bdev.start_sec, bdev.blocks)
|
||||
|
@ -44,17 +44,17 @@ def setup():
|
|||
check_bootsec()
|
||||
print("Performing initial setup")
|
||||
wifi()
|
||||
uos.VfsLfs2.mkfs(bdev)
|
||||
vfs = uos.VfsLfs2(bdev)
|
||||
uos.mount(vfs, "/")
|
||||
os.VfsLfs2.mkfs(bdev)
|
||||
vfs = os.VfsLfs2(bdev)
|
||||
os.mount(vfs, "/")
|
||||
with open("boot.py", "w") as f:
|
||||
f.write(
|
||||
"""\
|
||||
# This file is executed on every boot (including wake-boot from deepsleep)
|
||||
#import esp
|
||||
#esp.osdebug(None)
|
||||
import uos, machine
|
||||
#uos.dupterm(None, 1) # disable REPL on UART(0)
|
||||
import os, machine
|
||||
#os.dupterm(None, 1) # disable REPL on UART(0)
|
||||
import gc
|
||||
#import webrepl
|
||||
#webrepl.start()
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
import uos, nrf
|
||||
import os, nrf
|
||||
|
||||
try:
|
||||
from uos import VfsLfs1
|
||||
from os import VfsLfs1
|
||||
|
||||
uos.VfsLfs1.mkfs(nrf.Flash())
|
||||
os.VfsLfs1.mkfs(nrf.Flash())
|
||||
except ImportError:
|
||||
try:
|
||||
from uos import VfsLfs2
|
||||
from os import VfsLfs2
|
||||
|
||||
uos.VfsLfs2.mkfs(nrf.Flash())
|
||||
os.VfsLfs2.mkfs(nrf.Flash())
|
||||
except ImportError:
|
||||
try:
|
||||
from uos import VfsFat
|
||||
from os import VfsFat
|
||||
|
||||
uos.VfsFat.mkfs(nrf.Flash())
|
||||
os.VfsFat.mkfs(nrf.Flash())
|
||||
except ImportError:
|
||||
pass
|
||||
except OSError as e:
|
||||
|
|
|
@ -34,9 +34,9 @@ class PIOASMEmit:
|
|||
pull_thresh=32,
|
||||
fifo_join=0
|
||||
):
|
||||
# uarray is a built-in module so importing it here won't require
|
||||
# array is a built-in module so importing it here won't require
|
||||
# scanning the filesystem.
|
||||
from uarray import array
|
||||
from array import array
|
||||
|
||||
self.labels = {}
|
||||
execctrl = 0
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
import gc
|
||||
import uos
|
||||
import os
|
||||
import samd
|
||||
|
||||
bdev = samd.Flash()
|
||||
|
||||
# Try to mount the filesystem, and format the flash if it doesn't exist.
|
||||
fs_type = uos.VfsLfs2 if hasattr(uos, "VfsLfs2") else uos.VfsLfs1
|
||||
fs_type = os.VfsLfs2 if hasattr(os, "VfsLfs2") else os.VfsLfs1
|
||||
|
||||
try:
|
||||
vfs = fs_type(bdev, progsize=256)
|
||||
except:
|
||||
fs_type.mkfs(bdev, progsize=256)
|
||||
vfs = fs_type(bdev, progsize=256)
|
||||
uos.mount(vfs, "/")
|
||||
os.mount(vfs, "/")
|
||||
|
||||
del vfs, fs_type, bdev, uos, samd
|
||||
del vfs, fs_type, bdev, os, samd
|
||||
gc.collect()
|
||||
del gc
|
||||
|
|
|
@ -73,7 +73,7 @@ import struct, os
|
|||
|
||||
try:
|
||||
import machine, stm
|
||||
from ubinascii import crc32
|
||||
from binascii import crc32
|
||||
from micropython import const
|
||||
except ImportError:
|
||||
# cpython
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
from micropython import const
|
||||
import struct, time
|
||||
import uzlib, machine, stm
|
||||
import zlib, machine, stm
|
||||
|
||||
# Constants to be used with update_mpy
|
||||
VFS_FAT = 1
|
||||
|
@ -36,7 +36,7 @@ def dfu_read(filename):
|
|||
if hdr == b"Dfu":
|
||||
pass
|
||||
elif hdr == b"\x1f\x8b\x08":
|
||||
f = uzlib.DecompIO(f, 16 + 15)
|
||||
f = zlib.DecompIO(f, 16 + 15)
|
||||
else:
|
||||
print("Invalid firmware", filename)
|
||||
return None
|
||||
|
@ -231,7 +231,7 @@ def update_app_elements(
|
|||
# Check firmware is of .dfu or .dfu.gz type
|
||||
try:
|
||||
with open(filename, "rb") as f:
|
||||
hdr = uzlib.DecompIO(f, 16 + 15).read(6)
|
||||
hdr = zlib.DecompIO(f, 16 + 15).read(6)
|
||||
except Exception:
|
||||
with open(filename, "rb") as f:
|
||||
hdr = f.read(6)
|
||||
|
|
|
@ -137,14 +137,14 @@ def do_filesystem(state, args):
|
|||
raise CommandError("'cp -r' source files must be local")
|
||||
_list_recursive(src_files, path)
|
||||
known_dirs = {""}
|
||||
state.transport.exec_("import uos")
|
||||
state.transport.exec_("import os")
|
||||
for dir, file in src_files:
|
||||
dir_parts = dir.split("/")
|
||||
for i in range(len(dir_parts)):
|
||||
d = "/".join(dir_parts[: i + 1])
|
||||
if d not in known_dirs:
|
||||
state.transport.exec_(
|
||||
"try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d
|
||||
"try:\n os.mkdir('%s')\nexcept OSError as e:\n print(e)" % d
|
||||
)
|
||||
known_dirs.add(d)
|
||||
state.transport.filesystem_command(
|
||||
|
|
|
@ -317,7 +317,7 @@ _BUILTIN_COMMAND_EXPANSIONS = {
|
|||
# Disk used/free.
|
||||
"df": [
|
||||
"exec",
|
||||
"import uos\nprint('mount \\tsize \\tused \\tavail \\tuse%')\nfor _m in [''] + uos.listdir('/'):\n _s = uos.stat('/' + _m)\n if not _s[0] & 1 << 14: continue\n _s = uos.statvfs(_m)\n if _s[0]:\n _size = _s[0] * _s[2]; _free = _s[0] * _s[3]; print(_m, _size, _size - _free, _free, int(100 * (_size - _free) / _size), sep='\\t')",
|
||||
"import os\nprint('mount \\tsize \\tused \\tavail \\tuse%')\nfor _m in [''] + os.listdir('/'):\n _s = os.stat('/' + _m)\n if not _s[0] & 1 << 14: continue\n _s = os.statvfs(_m)\n if _s[0]:\n _size = _s[0] * _s[2]; _free = _s[0] * _s[3]; print(_m, _size, _size - _free, _free, int(100 * (_size - _free) / _size), sep='\\t')",
|
||||
],
|
||||
# Other shortcuts.
|
||||
"reset": {
|
||||
|
|
|
@ -292,14 +292,14 @@ class SerialTransport(Transport):
|
|||
|
||||
def fs_exists(self, src):
|
||||
try:
|
||||
self.exec("import uos\nuos.stat(%s)" % (("'%s'" % src) if src else ""))
|
||||
self.exec("import os\nos.stat(%s)" % (("'%s'" % src) if src else ""))
|
||||
return True
|
||||
except TransportError:
|
||||
return False
|
||||
|
||||
def fs_ls(self, src):
|
||||
cmd = (
|
||||
"import uos\nfor f in uos.ilistdir(%s):\n"
|
||||
"import os\nfor f in os.ilistdir(%s):\n"
|
||||
" print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))"
|
||||
% (("'%s'" % src) if src else "")
|
||||
)
|
||||
|
@ -311,7 +311,7 @@ class SerialTransport(Transport):
|
|||
def repr_consumer(b):
|
||||
buf.extend(b.replace(b"\x04", b""))
|
||||
|
||||
cmd = "import uos\nfor f in uos.ilistdir(%s):\n" " print(repr(f), end=',')" % (
|
||||
cmd = "import os\nfor f in os.ilistdir(%s):\n" " print(repr(f), end=',')" % (
|
||||
("'%s'" % src) if src else ""
|
||||
)
|
||||
try:
|
||||
|
@ -328,8 +328,8 @@ class SerialTransport(Transport):
|
|||
|
||||
def fs_stat(self, src):
|
||||
try:
|
||||
self.exec("import uos")
|
||||
return os.stat_result(self.eval("uos.stat(%s)" % (("'%s'" % src)), parse=True))
|
||||
self.exec("import os")
|
||||
return os.stat_result(self.eval("os.stat(%s)" % (("'%s'" % src)), parse=True))
|
||||
except TransportError as e:
|
||||
reraise_filesystem_error(e, src)
|
||||
|
||||
|
@ -422,13 +422,13 @@ class SerialTransport(Transport):
|
|||
self.exec("f.close()")
|
||||
|
||||
def fs_mkdir(self, dir):
|
||||
self.exec("import uos\nuos.mkdir('%s')" % dir)
|
||||
self.exec("import os\nos.mkdir('%s')" % dir)
|
||||
|
||||
def fs_rmdir(self, dir):
|
||||
self.exec("import uos\nuos.rmdir('%s')" % dir)
|
||||
self.exec("import os\nos.rmdir('%s')" % dir)
|
||||
|
||||
def fs_rm(self, src):
|
||||
self.exec("import uos\nuos.remove('%s')" % src)
|
||||
self.exec("import os\nos.remove('%s')" % src)
|
||||
|
||||
def fs_touch(self, src):
|
||||
self.exec("f=open('%s','a')\nf.close()" % src)
|
||||
|
@ -595,7 +595,7 @@ class SerialTransport(Transport):
|
|||
|
||||
def umount_local(self):
|
||||
if self.mounted:
|
||||
self.exec('uos.umount("/remote")')
|
||||
self.exec('os.umount("/remote")')
|
||||
self.mounted = False
|
||||
self.serial = self.serial.orig_serial
|
||||
|
||||
|
@ -616,18 +616,18 @@ fs_hook_cmds = {
|
|||
}
|
||||
|
||||
fs_hook_code = """\
|
||||
import uos, uio, ustruct, micropython
|
||||
import os, io, struct, micropython
|
||||
|
||||
SEEK_SET = 0
|
||||
|
||||
class RemoteCommand:
|
||||
def __init__(self):
|
||||
import uselect, usys
|
||||
import select, sys
|
||||
self.buf4 = bytearray(4)
|
||||
self.fout = usys.stdout.buffer
|
||||
self.fin = usys.stdin.buffer
|
||||
self.poller = uselect.poll()
|
||||
self.poller.register(self.fin, uselect.POLLIN)
|
||||
self.fout = sys.stdout.buffer
|
||||
self.fin = sys.stdin.buffer
|
||||
self.poller = select.poll()
|
||||
self.poller.register(self.fin, select.POLLIN)
|
||||
|
||||
def poll_in(self):
|
||||
for _ in self.poller.ipoll(1000):
|
||||
|
@ -710,7 +710,7 @@ class RemoteCommand:
|
|||
self.fout.write(self.buf4, 1)
|
||||
|
||||
def wr_s32(self, i):
|
||||
ustruct.pack_into('<i', self.buf4, 0, i)
|
||||
struct.pack_into('<i', self.buf4, 0, i)
|
||||
self.fout.write(self.buf4)
|
||||
|
||||
def wr_bytes(self, b):
|
||||
|
@ -721,7 +721,7 @@ class RemoteCommand:
|
|||
wr_str = wr_bytes
|
||||
|
||||
|
||||
class RemoteFile(uio.IOBase):
|
||||
class RemoteFile(io.IOBase):
|
||||
def __init__(self, cmd, fd, is_text):
|
||||
self.cmd = cmd
|
||||
self.fd = fd
|
||||
|
@ -934,8 +934,8 @@ class RemoteFS:
|
|||
|
||||
|
||||
def __mount():
|
||||
uos.mount(RemoteFS(RemoteCommand()), '/remote')
|
||||
uos.chdir('/remote')
|
||||
os.mount(RemoteFS(RemoteCommand()), '/remote')
|
||||
os.chdir('/remote')
|
||||
"""
|
||||
|
||||
# Apply basic compression on hook code.
|
||||
|
|
|
@ -509,14 +509,14 @@ class Pyboard:
|
|||
|
||||
def fs_exists(self, src):
|
||||
try:
|
||||
self.exec_("import uos\nuos.stat(%s)" % (("'%s'" % src) if src else ""))
|
||||
self.exec_("import os\nos.stat(%s)" % (("'%s'" % src) if src else ""))
|
||||
return True
|
||||
except PyboardError:
|
||||
return False
|
||||
|
||||
def fs_ls(self, src):
|
||||
cmd = (
|
||||
"import uos\nfor f in uos.ilistdir(%s):\n"
|
||||
"import os\nfor f in os.ilistdir(%s):\n"
|
||||
" print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))"
|
||||
% (("'%s'" % src) if src else "")
|
||||
)
|
||||
|
@ -528,7 +528,7 @@ class Pyboard:
|
|||
def repr_consumer(b):
|
||||
buf.extend(b.replace(b"\x04", b""))
|
||||
|
||||
cmd = "import uos\nfor f in uos.ilistdir(%s):\n" " print(repr(f), end=',')" % (
|
||||
cmd = "import os\nfor f in os.ilistdir(%s):\n" " print(repr(f), end=',')" % (
|
||||
("'%s'" % src) if src else ""
|
||||
)
|
||||
try:
|
||||
|
@ -545,8 +545,8 @@ class Pyboard:
|
|||
|
||||
def fs_stat(self, src):
|
||||
try:
|
||||
self.exec_("import uos")
|
||||
return os.stat_result(self.eval("uos.stat(%s)" % (("'%s'" % src)), parse=True))
|
||||
self.exec_("import os")
|
||||
return os.stat_result(self.eval("os.stat(%s)" % (("'%s'" % src)), parse=True))
|
||||
except PyboardError as e:
|
||||
raise e.convert(src)
|
||||
|
||||
|
@ -639,13 +639,13 @@ class Pyboard:
|
|||
self.exec_("f.close()")
|
||||
|
||||
def fs_mkdir(self, dir):
|
||||
self.exec_("import uos\nuos.mkdir('%s')" % dir)
|
||||
self.exec_("import os\nos.mkdir('%s')" % dir)
|
||||
|
||||
def fs_rmdir(self, dir):
|
||||
self.exec_("import uos\nuos.rmdir('%s')" % dir)
|
||||
self.exec_("import os\nos.rmdir('%s')" % dir)
|
||||
|
||||
def fs_rm(self, src):
|
||||
self.exec_("import uos\nuos.remove('%s')" % src)
|
||||
self.exec_("import os\nos.remove('%s')" % src)
|
||||
|
||||
def fs_touch(self, src):
|
||||
self.exec_("f=open('%s','a')\nf.close()" % src)
|
||||
|
@ -737,9 +737,9 @@ def filesystem_command(pyb, args, progress_callback=None, verbose=False):
|
|||
|
||||
|
||||
_injected_import_hook_code = """\
|
||||
import uos, uio
|
||||
import os, io
|
||||
class _FS:
|
||||
class File(uio.IOBase):
|
||||
class File(io.IOBase):
|
||||
def __init__(self):
|
||||
self.off = 0
|
||||
def ioctl(self, request, arg):
|
||||
|
@ -756,10 +756,10 @@ class _FS:
|
|||
raise OSError(-2) # ENOENT
|
||||
def open(self, path, mode):
|
||||
return self.File()
|
||||
uos.mount(_FS(), '/_')
|
||||
uos.chdir('/_')
|
||||
os.mount(_FS(), '/_')
|
||||
os.chdir('/_')
|
||||
from _injected import *
|
||||
uos.umount('/_')
|
||||
os.umount('/_')
|
||||
del _injected_buf, _FS
|
||||
"""
|
||||
|
||||
|
|
Loading…
Reference in New Issue