circuitpython/extmod/webrepl/webrepl.py
Jim Mussared 7ce1e0b1dc extmod/webrepl: Move webrepl scripts to common place and use manifest.
Move webrepl support code from ports/esp8266/modules into extmod/webrepl
(to be alongside extmod/modwebrepl.c), and use frozen manifests to include
it in the build on esp8266 and esp32.

A small modification is made to webrepl.py to make it work on non-ESP
ports, i.e. don't call dupterm_notify if not available.
2019-12-20 12:59:13 +11:00

81 lines
2.1 KiB
Python

# This module should be imported from REPL, not run from command line.
import socket
import uos
import network
import uwebsocket
import websocket_helper
import _webrepl
# Module-level socket handles shared by setup_conn(), accept_conn() and stop().
# Both start unset: setup_conn() assigns listen_s, accept_conn() assigns client_s.
listen_s = client_s = None
def setup_conn(port, accept_handler):
    """Open the WebREPL listening socket on ``port`` and return it.

    The socket is bound to 0.0.0.0 with a backlog of one and is also stored
    in the module-level ``listen_s``.  When ``accept_handler`` is truthy it
    is installed on the socket through socket option 20 (presumably the
    port-specific "incoming connection" callback hook -- confirm against the
    port's socket implementation).  For each active WLAN interface (AP first,
    then STA) a "daemon started" line is printed using the first entry of
    that interface's ifconfig().
    """
    global listen_s

    server = socket.socket()
    # Published before bind()/listen() so stop() can still reach the socket
    # if one of the following calls raises.
    listen_s = server
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    bind_addr = socket.getaddrinfo("0.0.0.0", port)[0][4]
    server.bind(bind_addr)
    server.listen(1)

    if accept_handler:
        server.setsockopt(socket.SOL_SOCKET, 20, accept_handler)

    for if_id in (network.AP_IF, network.STA_IF):
        wlan = network.WLAN(if_id)
        if not wlan.active():
            continue
        print("WebREPL daemon started on ws://%s:%d" % (wlan.ifconfig()[0], port))

    return server
def accept_conn(listen_sock):
    """Accept one pending client on ``listen_sock`` and attach it to the REPL.

    If a stream is already attached via uos.dupterm(), the new connection is
    reported as concurrent, closed, and the function returns.  Otherwise the
    client socket is stored in the module-level ``client_s``, the websocket
    handshake is performed, the socket is wrapped in the websocket and
    _webrepl layers, switched to non-blocking mode and installed as the
    dupterm stream.

    Raises:
        Whatever the handshake / wrapper setup raises.  In that case the
        client socket is closed and ``client_s`` is cleared before the
        exception propagates, so a failed handshake does not leave an open,
        half-initialised connection behind.
    """
    global client_s
    cl, remote_addr = listen_sock.accept()

    # The return value of uos.dupterm() is used as "the stream that was
    # attached before": detach, then immediately re-attach it, purely to
    # learn whether a session is already active.
    prev = uos.dupterm(None)
    uos.dupterm(prev)
    if prev:
        print("\nConcurrent WebREPL connection from", remote_addr, "rejected")
        cl.close()
        return

    print("\nWebREPL connection from:", remote_addr)
    # Stored before the handshake so stop() can close the socket even if the
    # setup below is interrupted by something other than an Exception.
    client_s = cl
    try:
        websocket_helper.server_handshake(cl)
        ws = uwebsocket.websocket(cl, True)
        ws = _webrepl._webrepl(ws)
        cl.setblocking(False)
        # notify REPL on socket incoming data (ESP32/ESP8266-only)
        if hasattr(uos, 'dupterm_notify'):
            cl.setsockopt(socket.SOL_SOCKET, 20, uos.dupterm_notify)
    except Exception:
        # Setup failed: release the client socket instead of leaking it,
        # then let the caller see the original error.
        client_s = None
        cl.close()
        raise
    uos.dupterm(ws)
def stop():
    """Detach WebREPL from the REPL and close its sockets.

    Clears the dupterm stream, then closes the client socket and the
    listening socket if they are set.  Each module-level handle is reset to
    None before its socket is closed, so a closed socket is neither kept
    referenced nor closed a second time by a later stop()/start(), even if
    close() itself raises.
    """
    global listen_s, client_s
    uos.dupterm(None)

    sock, client_s = client_s, None
    if sock:
        sock.close()

    sock, listen_s = listen_s, None
    if sock:
        sock.close()
def start(port=8266, password=None):
    """(Re)start the WebREPL daemon on ``port``.

    Any running instance is stopped first.  With ``password=None`` the
    password is read from ``webrepl_cfg.PASS``; if that module cannot be
    imported or has no ``PASS`` attribute, a hint to run ``webrepl_setup`` is
    printed and nothing is started.  With an explicit ``password`` that value
    is used instead ("manual override mode").

    Only the configuration lookup is guarded.  Errors raised while setting
    the password or opening the listening socket propagate to the caller in
    both modes, instead of being reported as "not configured".
    """
    stop()
    if password is None:
        try:
            import webrepl_cfg
            password = webrepl_cfg.PASS
        except (ImportError, AttributeError):
            print("WebREPL is not configured, run 'import webrepl_setup'")
            return
        started_msg = "Started webrepl in normal mode"
    else:
        started_msg = "Started webrepl in manual override mode"

    _webrepl.password(password)
    setup_conn(port, accept_conn)
    print(started_msg)
def start_foreground(port=8266):
    """Restart WebREPL and handle one connection directly from the caller.

    Stops any running instance, opens the listening socket on ``port``
    without an accept callback, then calls accept_conn() on that socket
    itself rather than from a socket callback.

    NOTE(review): unlike start(), this function does not call
    _webrepl.password() -- confirm the password is expected to be set
    elsewhere.
    """
    stop()
    accept_conn(setup_conn(port, None))