esp8266/scripts/onewire.py: Simplify and improve 1-wire driver.
Changes are: - added OneWireError exception and used where errors can occur - renamed read/write functions to use same names as C _onewire funcs - read_bytes is now read, write_bytes is now write - add ability to read/write DS18B20 scratch pad - rename start_measure to convert_temp (since that's what it does) - rename get_temp to read_temp (consistency with other read names) - removed test function
This commit is contained in:
parent
38358a096d
commit
1f7cec944e
@ -1,10 +1,15 @@
|
|||||||
|
# 1-Wire driver for MicroPython on ESP8266
|
||||||
|
# MIT license; Copyright (c) 2016 Damien P. George
|
||||||
|
|
||||||
import _onewire as _ow
|
import _onewire as _ow
|
||||||
|
|
||||||
|
class OneWireError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
class OneWire:
|
class OneWire:
|
||||||
CMD_SEARCHROM = const(0xf0)
|
SEARCH_ROM = const(0xf0)
|
||||||
CMD_READROM = const(0x33)
|
MATCH_ROM = const(0x55)
|
||||||
CMD_MATCHROM = const(0x55)
|
SKIP_ROM = const(0xcc)
|
||||||
CMD_SKIPROM = const(0xcc)
|
|
||||||
|
|
||||||
def __init__(self, pin):
|
def __init__(self, pin):
|
||||||
self.pin = pin
|
self.pin = pin
|
||||||
@ -13,32 +18,32 @@ class OneWire:
|
|||||||
def reset(self):
|
def reset(self):
|
||||||
return _ow.reset(self.pin)
|
return _ow.reset(self.pin)
|
||||||
|
|
||||||
def read_bit(self):
|
def readbit(self):
|
||||||
return _ow.readbit(self.pin)
|
return _ow.readbit(self.pin)
|
||||||
|
|
||||||
def read_byte(self):
|
def readbyte(self):
|
||||||
return _ow.readbyte(self.pin)
|
return _ow.readbyte(self.pin)
|
||||||
|
|
||||||
def read_bytes(self, count):
|
def read(self, count):
|
||||||
buf = bytearray(count)
|
buf = bytearray(count)
|
||||||
for i in range(count):
|
for i in range(count):
|
||||||
buf[i] = _ow.readbyte(self.pin)
|
buf[i] = _ow.readbyte(self.pin)
|
||||||
return buf
|
return buf
|
||||||
|
|
||||||
def write_bit(self, value):
|
def writebit(self, value):
|
||||||
return _ow.writebit(self.pin, value)
|
return _ow.writebit(self.pin, value)
|
||||||
|
|
||||||
def write_byte(self, value):
|
def writebyte(self, value):
|
||||||
return _ow.writebyte(self.pin, value)
|
return _ow.writebyte(self.pin, value)
|
||||||
|
|
||||||
def write_bytes(self, buf):
|
def write(self, buf):
|
||||||
for b in buf:
|
for b in buf:
|
||||||
_ow.writebyte(self.pin, b)
|
_ow.writebyte(self.pin, b)
|
||||||
|
|
||||||
def select_rom(self, rom):
|
def select_rom(self, rom):
|
||||||
self.reset()
|
self.reset()
|
||||||
self.write_byte(CMD_MATCHROM)
|
self.writebyte(MATCH_ROM)
|
||||||
self.write_bytes(rom)
|
self.write(rom)
|
||||||
|
|
||||||
def scan(self):
|
def scan(self):
|
||||||
devices = []
|
devices = []
|
||||||
@ -55,7 +60,7 @@ class OneWire:
|
|||||||
def _search_rom(self, l_rom, diff):
|
def _search_rom(self, l_rom, diff):
|
||||||
if not self.reset():
|
if not self.reset():
|
||||||
return None, 0
|
return None, 0
|
||||||
self.write_byte(CMD_SEARCHROM)
|
self.writebyte(SEARCH_ROM)
|
||||||
if not l_rom:
|
if not l_rom:
|
||||||
l_rom = bytearray(8)
|
l_rom = bytearray(8)
|
||||||
rom = bytearray(8)
|
rom = bytearray(8)
|
||||||
@ -64,8 +69,8 @@ class OneWire:
|
|||||||
for byte in range(8):
|
for byte in range(8):
|
||||||
r_b = 0
|
r_b = 0
|
||||||
for bit in range(8):
|
for bit in range(8):
|
||||||
b = self.read_bit()
|
b = self.readbit()
|
||||||
if self.read_bit():
|
if self.readbit():
|
||||||
if b: # there are no devices or there is an error on the bus
|
if b: # there are no devices or there is an error on the bus
|
||||||
return None, 0
|
return None, 0
|
||||||
else:
|
else:
|
||||||
@ -73,7 +78,7 @@ class OneWire:
|
|||||||
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
|
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
|
||||||
b = 1
|
b = 1
|
||||||
next_diff = i
|
next_diff = i
|
||||||
self.write_bit(b)
|
self.writebit(b)
|
||||||
if b:
|
if b:
|
||||||
r_b |= 1 << bit
|
r_b |= 1 << bit
|
||||||
i -= 1
|
i -= 1
|
||||||
@ -84,60 +89,39 @@ class OneWire:
|
|||||||
return _ow.crc8(data)
|
return _ow.crc8(data)
|
||||||
|
|
||||||
class DS18B20:
|
class DS18B20:
|
||||||
THERM_CMD_CONVERTTEMP = const(0x44)
|
CONVERT = const(0x44)
|
||||||
THERM_CMD_RSCRATCHPAD = const(0xbe)
|
RD_SCRATCH = const(0xbe)
|
||||||
|
WR_SCRATCH = const(0x4e)
|
||||||
|
|
||||||
def __init__(self, onewire):
|
def __init__(self, onewire):
|
||||||
self.ow = onewire
|
self.ow = onewire
|
||||||
self.roms = []
|
|
||||||
|
|
||||||
def scan(self):
|
def scan(self):
|
||||||
self.roms = []
|
return [rom for rom in self.ow.scan() if rom[0] == 0x28]
|
||||||
for rom in self.ow.scan():
|
|
||||||
if rom[0] == 0x28:
|
|
||||||
self.roms += [rom]
|
|
||||||
return self.roms
|
|
||||||
|
|
||||||
def start_measure(self):
|
def convert_temp(self):
|
||||||
if not self.ow.reset():
|
if not self.ow.reset():
|
||||||
return False
|
raise OneWireError
|
||||||
self.ow.write_byte(CMD_SKIPROM)
|
self.ow.writebyte(SKIP_ROM)
|
||||||
self.ow.write_byte(THERM_CMD_CONVERTTEMP)
|
self.ow.writebyte(CONVERT)
|
||||||
return True
|
|
||||||
|
|
||||||
def get_temp(self, rom):
|
def read_scratch(self, rom):
|
||||||
if not self.ow.reset():
|
if not self.ow.reset():
|
||||||
return None
|
raise OneWireError
|
||||||
|
|
||||||
self.ow.select_rom(rom)
|
self.ow.select_rom(rom)
|
||||||
self.ow.write_byte(THERM_CMD_RSCRATCHPAD)
|
self.ow.writebyte(RD_SCRATCH)
|
||||||
|
buf = self.ow.read(9)
|
||||||
buf = self.ow.read_bytes(9)
|
|
||||||
if self.ow.crc8(buf):
|
if self.ow.crc8(buf):
|
||||||
return None
|
raise OneWireError
|
||||||
|
return buf
|
||||||
|
|
||||||
return self._convert_temp(buf)
|
def write_scratch(self, rom, buf):
|
||||||
|
if not self.ow.reset():
|
||||||
|
raise OneWireError
|
||||||
|
self.ow.select_rom(rom)
|
||||||
|
self.ow.writebyte(WR_SCRATCH)
|
||||||
|
self.ow.write(buf)
|
||||||
|
|
||||||
def _convert_temp(self, data):
|
def read_temp(self, rom):
|
||||||
temp_lsb = data[0]
|
buf = self.read_scratch(rom)
|
||||||
temp_msb = data[1]
|
return (buf[1] << 8 | buf[0]) / 16
|
||||||
return (temp_msb << 8 | temp_lsb) / 16
|
|
||||||
|
|
||||||
# connect 1-wire temp sensors to GPIO12 for this test
|
|
||||||
def test():
|
|
||||||
import time
|
|
||||||
import machine
|
|
||||||
dat = machine.Pin(12)
|
|
||||||
ow = OneWire(dat)
|
|
||||||
|
|
||||||
ds = DS18B20(ow)
|
|
||||||
roms = ow.scan()
|
|
||||||
print('found devices:', roms)
|
|
||||||
|
|
||||||
for i in range(4):
|
|
||||||
print('temperatures:', end=' ')
|
|
||||||
ds.start_measure()
|
|
||||||
time.sleep_ms(750)
|
|
||||||
for rom in roms:
|
|
||||||
print(ds.get_temp(rom), end=' ')
|
|
||||||
print()
|
|
||||||
|
Loading…
Reference in New Issue
Block a user