tools/pyboard: Add "exec" and "execpty" pseudo-devices support.
This allows to execute a command and communicate with its stdin/stdout via pipes ("exec") or with command-created pseudo-terminal ("execpty"), to emulate serial access. Immediate usecase is controlling a QEMU process which emulates board's serial via normal console, but it could be used e.g. with helper binaries to access real board over other hadware protocols, etc. An example of device specification for these cases is: --device exec:../zephyr/qemu.sh --device execpty:../zephyr/qemu2.sh Where qemu.sh contains long-long qemu startup line, or calls another command. There's a special support in this patch for running the command in a new terminal session, to support shell wrappers like that (without new terminal session, only wrapper script would be terminated, but its child processes would continue to run).
This commit is contained in:
parent
58168c8e6b
commit
647e72ca63
@ -39,6 +39,7 @@ Or:
|
||||
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
|
||||
try:
|
||||
stdout = sys.stdout.buffer
|
||||
@ -116,9 +117,91 @@ class TelnetToSerial:
|
||||
else:
|
||||
return n_waiting
|
||||
|
||||
|
||||
class ProcessToSerial:
|
||||
"Execute a process and emulate serial connection using its stdin/stdout."
|
||||
|
||||
def __init__(self, cmd):
|
||||
import subprocess
|
||||
self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid,
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
|
||||
# Initially was implemented with selectors, but that adds Python3
|
||||
# dependency. However, there can be race conditions communicating
|
||||
# with a particular child process (like QEMU), and selectors may
|
||||
# still work better in that case, so left inplace for now.
|
||||
#
|
||||
#import selectors
|
||||
#self.sel = selectors.DefaultSelector()
|
||||
#self.sel.register(self.subp.stdout, selectors.EVENT_READ)
|
||||
|
||||
import select
|
||||
self.poll = select.poll()
|
||||
self.poll.register(self.subp.stdout.fileno())
|
||||
|
||||
def close(self):
|
||||
import signal
|
||||
os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
|
||||
|
||||
def read(self, size=1):
|
||||
data = b""
|
||||
while len(data) < size:
|
||||
data += self.subp.stdout.read(size - len(data))
|
||||
return data
|
||||
|
||||
def write(self, data):
|
||||
self.subp.stdin.write(data)
|
||||
return len(data)
|
||||
|
||||
def inWaiting(self):
|
||||
#res = self.sel.select(0)
|
||||
res = self.poll.poll(0)
|
||||
if res:
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
class ProcessPtyToTerminal:
|
||||
"""Execute a process which creates a PTY and prints slave PTY as
|
||||
first line of its output, and emulate serial connection using
|
||||
this PTY."""
|
||||
|
||||
def __init__(self, cmd):
|
||||
import subprocess
|
||||
import re
|
||||
import serial
|
||||
self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid,
|
||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
pty_line = self.subp.stderr.readline().decode("utf-8")
|
||||
m = re.search(r"/dev/pts/[0-9]+", pty_line)
|
||||
if not m:
|
||||
print("Error: unable to find PTY device in startup line:", pty_line)
|
||||
self.close()
|
||||
sys.exit(1)
|
||||
pty = m.group()
|
||||
self.ser = serial.Serial(pty, interCharTimeout=1)
|
||||
|
||||
def close(self):
|
||||
import signal
|
||||
os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
|
||||
|
||||
def read(self, size=1):
|
||||
return self.ser.read(size)
|
||||
|
||||
def write(self, data):
|
||||
return self.ser.write(data)
|
||||
|
||||
def inWaiting(self):
|
||||
return self.ser.inWaiting()
|
||||
|
||||
|
||||
class Pyboard:
|
||||
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0):
|
||||
if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
|
||||
if device.startswith("exec:"):
|
||||
self.serial = ProcessToSerial(device[len("exec:"):])
|
||||
elif device.startswith("execpty:"):
|
||||
self.serial = ProcessPtyToTerminal(device[len("qemupty:"):])
|
||||
elif device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
|
||||
# device looks like an IP address
|
||||
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
|
||||
else:
|
||||
|
Loading…
Reference in New Issue
Block a user