This commit is contained in:
Norbert
2024-03-01 09:13:14 +01:00
parent 8d2d50a0ee
commit a2006baf04
14 changed files with 1654 additions and 600 deletions

BIN
source/src/.DS_Store vendored Normal file

Binary file not shown.

View File

@@ -0,0 +1,2 @@
# Version of the Solax RTU package
__version__ = "0.0.6"

File diff suppressed because it is too large Load Diff

168
source/src/solaxx3/rs485.py Normal file
View File

@@ -0,0 +1,168 @@
from typing import Any
from pymodbus.client import ModbusSerialClient
from datetime import date, datetime, timedelta
from struct import *
from solaxx3.registers import SolaxRegistersInfo
from time import sleep, perf_counter
class SolaxX3:
connected: bool = False
def __init__(
self,
method="rtu",
port="/dev/ttyUSB0",
baudrate=115200,
timeout=3,
parity="N",
stopbits=1,
bytesize=8,
) -> None:
self._input_registers_values_list = []
self._holding_registers_values_list = []
self.client = ModbusSerialClient(
method=method,
port=port,
baudrate=baudrate,
timeout=timeout,
parity=parity,
stopbits=stopbits,
bytesize=bytesize,
)
def connect(self) -> bool:
self.connected = self.client.connect()
return self.connected
def _join_msb_lsb(self, msb: int, lsb: int) -> int:
return (msb << 16) | lsb
def _unsigned16(self, type: str, addr: int, count: int = 1, unit: int = 1) -> int:
self._input_registers_values_list
if type == "input":
return self._input_registers_values_list[addr]
elif type == "holding":
return self._holding_registers_values_list[addr]
def _readRegisterRange(
self, type: str, addr: int, count: int = 1, unit: int = 1
) -> list:
if type == "input":
return self._input_registers_values_list[addr : addr + count]
elif type == "holding":
return self._holding_registers_values_list[addr : addr + count]
def _twos_complement(self, number: int, bits: int) -> int:
"""
Compute the 2's complement of the int value val
"""
# if sign bit is set e.g., 8bit: 128-255
if (number & (1 << (bits - 1))) != 0:
# compute negative value
number = number - (1 << bits)
return number
def _read_register(self, register_type: str, register_info: dict) -> Any:
"""Read the values from a register based on length and sign
Parameters:
register_info:dict - dictionary with register definition fields
"""
if "int" in register_info["data_format"]:
if register_info["data_length"] == 1:
val = self._unsigned16(register_type, register_info["address"])
if register_info["data_length"] == 2:
val = self._join_msb_lsb(
self._unsigned16(register_type, register_info["address"] + 1),
self._unsigned16(register_type, register_info["address"]),
)
if register_info["signed"]:
val = self._twos_complement(val, register_info["data_length"] * 16)
val = val / register_info["si_adj"]
elif "varchar" in register_info["data_format"]:
block = self._readRegisterRange(
register_type, register_info["address"], register_info["data_length"]
)
sn = []
for i in range(register_info["data_length"]):
first_byte, second_byte = unpack(
"BB", int.to_bytes(block[i], 2, "little")
)
if not second_byte == 0x0:
sn.append(chr(second_byte))
if not first_byte == 0x0:
sn.append(chr(first_byte))
val = "".join(sn)
elif "datetime" in register_info["data_format"]:
sec, min, hr, day, mon, year = self._readRegisterRange(
register_type, register_info["address"], register_info["data_length"]
)
inverter_datetime = (
f"{(year+2000):02}-{mon:02}-{day:02} {hr:02}:{min:02}:{sec:02}"
)
val = datetime.strptime(inverter_datetime, "%Y-%m-%d %H:%M:%S")
return val
def read_register(self, register_info: dict) -> tuple:
"""Read the values from a register based on length and sign
Parameters:
register_info:dict - dictionary with register definition fields
"""
val = self._read_register(register_info["register_type"], register_info)
if not "data_unit" in register_info:
return (val, "N/A")
return (val, register_info["data_unit"])
def read(self, name: str):
"""Retrieve the value for the register with the provided name"""
r = SolaxRegistersInfo()
register_info = r.get_register_info(name)
value = self.read_register(register_info)
return value
def list_register_names(self):
r = SolaxRegistersInfo()
return r.list_register_names()
def read_all_registers(self) -> None:
self._input_registers_values_list = []
self._holding_registers_values_list = []
read_block_length = 100
for i in range(3):
address = i * read_block_length
values_list = self.client.read_input_registers(
address=address, count=read_block_length, slave=1
).registers
self._input_registers_values_list.extend(values_list)
for i in range(3):
address = i * read_block_length
values_list = self.client.read_holding_registers(
address=address, count=read_block_length, slave=1
).registers
self._holding_registers_values_list.extend(values_list)