update to 1.0.2

This commit is contained in:
Norbert
2024-07-12 12:13:55 +02:00
parent 3a0fc1f9cd
commit 577596d9f3
44 changed files with 5860 additions and 1957 deletions

BIN
source/.DS_Store vendored

Binary file not shown.

View File

@@ -1,3 +1,7 @@
![Build badge](https://github.com/mkfam7/solaxx3/actions/workflows/python-package.yml/badge.svg)
# solax-x3
#### Read in real-time all parameters provided by Solax X3 solar inverter via its Modbus S-485 serial interface.
@@ -21,7 +25,7 @@ pip install solaxx3
```
from solaxx3.rs485 import SolaxX3
from solaxx3.solaxx3 import SolaxX3
# adjust the serial port and baud rate as necessary
s = SolaxX3(port="/dev/ttyUSB0", baudrate=115200)

View File

@@ -1,11 +0,0 @@
Metadata-Version: 2.1
Name: UNKNOWN
Version: 0.0.0
Summary: UNKNOWN
Home-page: UNKNOWN
License: UNKNOWN
Platform: UNKNOWN
License-File: LICENSE
UNKNOWN

View File

@@ -1,8 +0,0 @@
LICENSE
README.md
pyproject.toml
setup.py
UNKNOWN.egg-info/PKG-INFO
UNKNOWN.egg-info/SOURCES.txt
UNKNOWN.egg-info/dependency_links.txt
UNKNOWN.egg-info/top_level.txt

View File

@@ -1 +0,0 @@

View File

@@ -1,108 +0,0 @@
from datetime import datetime, timedelta
from pymodbus.client import ModbusSerialClient
from solaxx3.rs485 import SolaxX3
import mysql.connector
s = SolaxX3(port="/dev/ttyUSB0", baudrate=115200)
database_ip = "172.17.7.77"
if s.connect():
s.read_all_registers()
# read the stats from the inverter
battery_capacity = s.read("battery_capacity")[0]
feed_in_today = s.read("feed_in_energy_today")[0]
consumtion_today = s.read("consumption_energy_today")[0]
battery_charging = s.read("battery_power_charge1")[0]
grid_voltage_r = s.read("grid_voltage_r")[0]
grid_voltage_s = s.read("grid_voltage_s")[0]
grid_voltage_t = s.read("grid_voltage_t")[0]
run_mode = s.read("run_mode")[0]
time_count_down = s.read("time_count_down")[0]
inverter_ac_power = s.read("grid_power")[0]
etoday_togrid = s.read("energy_to_grid_today")[0]
solar_energy_today = s.read("solar_energy_today")[0]
echarge_today = s.read("echarge_today")[0]
energy_from_grid = s.read("energy_from_grid_meter")[0]
energy_to_grid = s.read("energy_to_grid_meter")[0]
power_to_ev = s.read("power_to_ev")[0]
feed_in_power = s.read("feed_in_power")[0]
output_energy_charge = s.read("output_energy_charge")[0]
output_energy_today = s.read("output_energy_charge_today")[0]
input_energy_today = s.read("input_energy_charge_today")[0]
power_dc1 = s.read("power_dc1")[0]
power_dc2 = s.read("power_dc2")[0]
total_power = power_dc1 + power_dc2
uploadTime = s.read("rtc_datetime")[0]
uploadDate = uploadTime.date()
timezone_difference_from_utc = 2
uploadTime = uploadTime - timedelta(hours=timezone_difference_from_utc, minutes=0)
# store the stats in the database
mydb = mysql.connector.connect(
host=database_ip, user="root", passwd="rootroot", database="solax"
)
mycursor = mydb.cursor()
try:
# create the sql statement
sql = """REPLACE INTO solax_local (
uploadTime,
inverter_status,
dc_solar_power,
grid_voltage_r,
grid_voltage_s,
grid_voltage_t,
battery_capacity,
battery_power,
feed_in_power,
time_count_down,
inverter_ac_power,
consumeenergy,
feedinenergy,
power_dc1,
power_dc2
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
uploadTime,
run_mode,
total_power,
grid_voltage_r,
grid_voltage_s,
grid_voltage_t,
battery_capacity,
battery_charging,
feed_in_power,
time_count_down,
inverter_ac_power,
energy_from_grid,
energy_to_grid,
power_dc1,
power_dc2,
)
mycursor.execute(sql, values)
mydb.commit()
# update daily values
sql = """REPLACE INTO solax_daily (
uploadDate,
feed_in,
total_yield
) VALUES (%s, %s, %s)
"""
values = (uploadDate, feed_in_today, etoday_togrid)
mycursor.execute(sql, values)
mydb.commit()
except mysql.connector.Error as error:
print("parameterized query failed {}".format(error))
else:
print("Cannot connect to the Modbus Server/Slave")
exit()

View File

@@ -0,0 +1,19 @@
"""Module containing an abstract class."""
from abc import ABC, abstractmethod
class DataSourceDb(ABC):
"""Abstract class representing a data source."""
@abstractmethod
def save_record(self):
"""Transfer some data."""
raise NotImplementedError
@abstractmethod
def bulk_save(self):
"""Transfer multiple records of data."""
raise NotImplementedError

View File

@@ -0,0 +1,89 @@
from typing import Any, Dict, List, Tuple
import mysql.connector
from .data_source_db import DataSourceDb
class MySQLDataSource(DataSourceDb):
def __init__(self, mysql_connection_info: Dict[str, str]) -> None:
self.user = mysql_connection_info["user"]
self.host = mysql_connection_info["host"]
self.password = mysql_connection_info["password"]
def save_record(
self,
database: str,
tablename: str,
data: Dict[str, Any],
use_obj_connection: bool = False,
close_obj_connection: bool = True,
):
query, values = self.create_query(tablename, data)
if use_obj_connection:
try:
self.cursor.execute(query, values)
if close_obj_connection:
self.db.commit()
self.db.close()
except mysql.connector.Error:
self.db.commit()
self.db.close()
raise
else:
db = mysql.connector.connect(
user=self.user,
host=self.host,
password=self.password,
database=database,
)
try:
cursor = db.cursor()
cursor.execute(query, values)
db.commit()
db.close()
except mysql.connector.Error:
db.close()
raise
def bulk_save(self, export_data: List[Dict[str, Any]]) -> None:
for index, unit in enumerate(export_data):
database, table_name, data = unit.values()
if (
not hasattr(self, "db")
or not hasattr(self, "cursor")
or not self.db.is_connected()
):
self.db = mysql.connector.connect(
user=self.user,
host=self.host,
password=self.password,
database=database,
)
try:
self.cursor = self.db.cursor()
except:
self.db.close()
raise
self.save_record(
database, table_name, data, True, index + 1 == len(export_data)
)
def create_query(self, table_name: str, data: dict) -> Tuple[str, list]:
columns = list(data.keys())
values = list(data.values())
query = (
f"REPLACE INTO {table_name} ("
+ ", ".join(columns)
+ ") VALUES ("
+ ", ".join(["%s"] * len(columns))
+ ")"
)
return (query, values)

View File

@@ -0,0 +1,93 @@
"""Example of reading and saving some inverter registers."""
import sys
from datetime import datetime, timedelta
from os import environ
from solaxx3.solaxx3 import SolaxX3
from .mysql_data_source import MySQLDataSource
def _get_datetime(inverter_time: datetime) -> datetime:
return inverter_time - timedelta(hours=TIMEZONE_OFFSET)
MYSQL_CONNECTION_INFO = {
"user": environ["MYSQL_DB_USERNAME"],
"host": environ["MYSQL_DB_HOST_IP"],
"password": environ["MYSQL_DB_PASSWORD"],
}
DATABASE = environ["MYSQL_DB_DATABASE"]
TIMEZONE_OFFSET = 2
s = SolaxX3(port="/dev/ttyUSB0", baudrate=115200)
if not s.connect():
print("Could not connect to inverter")
sys.exit(1)
s.read_all_registers()
mysql_export_data = [
{
"database": DATABASE,
"table": "solax_local",
"data": {
"uploadTime": _get_datetime(s.read("rtc_datetime")[0]),
"inverter_status": s.read("run_mode")[0],
"dc_solar_power": s.read("power_dc1")[0] + s.read("power_dc2")[0],
"grid_voltage_r": s.read("grid_voltage_r")[0],
"grid_voltage_s": s.read("grid_voltage_s")[0],
"grid_voltage_t": s.read("grid_voltage_t")[0],
"battery_capacity": s.read("battery_capacity")[0],
"battery_power": s.read("battery_power_charge1")[0],
"feed_in_power": s.read("feed_in_power")[0],
"time_count_down": s.read("time_count_down")[0],
"inverter_ac_power": s.read("grid_power")[0],
"consumeenergy": s.read("energy_from_grid_meter")[0],
"feedinenergy": s.read("energy_to_grid_meter")[0],
"power_dc1": s.read("power_dc1")[0],
"power_dc2": s.read("power_dc2")[0],
"inv_volt_r": s.read("inv_volt_r")[0],
"inv_volt_s": s.read("inv_volt_s")[0],
"inv_volt_t": s.read("inv_volt_t")[0],
"off_grid_power_active_r": s.read("off_grid_power_active_r")[0],
"off_grid_power_active_s": s.read("off_grid_power_active_s")[0],
"off_grid_power_active_t": s.read("off_grid_power_active_t")[0],
"grid_power_r": s.read("grid_power_r")[0],
"grid_power_s": s.read("grid_power_s")[0],
"grid_power_t": s.read("grid_power_t")[0],
},
},
{
"database": DATABASE,
"table": "solax_daily",
"data": {
"uploadDate": _get_datetime(s.read("rtc_datetime")[0]).date(),
"feed_in": s.read("feed_in_energy_today")[0],
"total_yield": s.read("energy_to_grid_today")[0],
},
},
]
mysql_data_source = MySQLDataSource(MYSQL_CONNECTION_INFO)
def bulk_save():
"""Save collected data."""
mysql_data_source.bulk_save(mysql_export_data)
def _transfer(index: int, **extras) -> None:
"""Save a record of collected data."""
data_record = mysql_export_data[index]
database, tablename = data_record["database"], data_record["table"]
data = data_record["data"]
mysql_data_source.save_record(database, tablename, data, **extras)
save = bulk_save
if __name__ == "__main__":
save()

View File

@@ -1,25 +1,36 @@
CREATE TABLE `solax_daily` (
`uploadDate` date NOT NULL,
`feed_in` float(6,1) DEFAULT NULL,
`total_yield` float(6,1) DEFAULT NULL,
PRIMARY KEY (`uploadDate`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE solax_daily (
uploadDate DATE NOT NULL,
feed_in FLOAT NULL,
total_yield FLOAT NULL
)
ENGINE=InnoDB
DEFAULT CHARSET=latin1;
CREATE TABLE `solax_local` (
`uploadTime` datetime NOT NULL,
`inverter_status` tinyint(4) DEFAULT NULL,
`dc_solar_power` smallint(6) DEFAULT NULL,
`grid_voltage_r` smallint(6) DEFAULT NULL,
`grid_voltage_s` smallint(6) DEFAULT NULL,
`grid_voltage_t` smallint(6) DEFAULT NULL,
`battery_capacity` tinyint(4) DEFAULT NULL,
`battery_power` smallint(6) DEFAULT NULL,
`feed_in_power` smallint(6) DEFAULT NULL,
`time_count_down` smallint(6) DEFAULT NULL,
`inverter_ac_power` smallint(6) DEFAULT NULL,
`consumeenergy` float(7,1) DEFAULT NULL,
`feedinenergy` float(7,1) DEFAULT NULL,
`power_dc1` smallint(6) DEFAULT NULL,
`power_dc2` smallint(6) DEFAULT NULL,
PRIMARY KEY (`uploadTime`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE solax_local (
uploadTime DATETIME NOT NULL,
inverter_status TINYINT NULL,
dc_solar_power SMALLINT NULL,
grid_voltage_r SMALLINT NULL,
grid_voltage_s SMALLINT NULL,
grid_voltage_t SMALLINT NULL,
battery_capacity TINYINT NULL,
battery_power SMALLINT NULL,
feed_in_power SMALLINT NULL,
time_count_down SMALLINT NULL,
inverter_ac_power SMALLINT NULL,
consumeenergy FLOAT NULL,
feedinenergy FLOAT NULL,
power_dc1 SMALLINT NULL,
power_dc2 SMALLINT NULL,
inv_volt_r SMALLINT NULL,
inv_volt_s SMALLINT NULL,
inv_volt_t SMALLINT NULL,
off_grid_power_active_r INTEGER NULL,
off_grid_power_active_s INTEGER NULL,
off_grid_power_active_t INTEGER NULL,
grid_power_r INTEGER NULL,
grid_power_s INTEGER NULL,
grid_power_t INTEGER NULL
)
ENGINE=InnoDB
DEFAULT CHARSET=latin1;

Binary file not shown.

BIN
source/dist/solaxx3-1.0.2.tar.gz vendored Normal file

Binary file not shown.

View File

@@ -1,22 +1,21 @@
"""Sample program for reading and saving some inverter register values."""
#from solaxx3.solaxx3 import SolaxX3
from solaxx3.rs485 import SolaxX3
# adjust the serial port and baud rate as necessary
s = SolaxX3(port="/dev/ttyUSB0", baudrate=115200)
s = SolaxX3(port="/dev/ttyUSB0", baudrate=19200)
if s.connect():
s.read_all_registers()
print(s._input_registers_values_list)
exit()
available_stats = s.list_register_names()
for stat in available_stats:
print(stat, f" {s.read(stat)}")
battery_temperature = s.read("temperature_battery")
print(f"\n\nBattery temperature: {s.read('temperature_battery')}")
work_mode = s.read("work_mode")
print(work_mode)
else:
print("Cannot connect to the Modbus Server/Slave")
exit()

View File

@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "solaxx3"
version = "0.1.0"
version = "1.0.2"
description = "Read Solax X3 inverter registers via modbus interface (RS-485)"
readme = "README.md"
authors = [{ name = "Flavius Moldovan", email = "mkfam@protonmail.com" }]
@@ -15,6 +15,11 @@ classifiers = [
"License :: OSI Approved :: MIT License",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
keywords = ["Solax", "solax-x3", "solaxx3", "solar inverter", "RTU", "MODBUS"]
dependencies = [

BIN
source/src/.DS_Store vendored

Binary file not shown.

View File

@@ -0,0 +1,82 @@
Metadata-Version: 2.1
Name: solaxx3
Version: 1.0.2
Summary: Read Solax X3 inverter registers via modbus interface (RS-485)
Author-email: Flavius Moldovan <mkfam@protonmail.com>
License: The MIT License (MIT)
Copyright © 2022 <copyright Flavius Moldovan>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Project-URL: Homepage, https://github.com/mkfam7/solaxx3
Keywords: Solax,solax-x3,solaxx3,solar inverter,RTU,MODBUS
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pymodbus[serial]>=3.0.0
![Build badge](https://github.com/mkfam7/solaxx3/actions/workflows/python-package.yml/badge.svg)
# solax-x3
#### Read in real-time all parameters provided by Solax X3 solar inverter via its Modbus S-485 serial interface.
<br />
## Prerequisites
* Solax X3 inverter
* Modbus RS-485 serial adapter/interface
* [Modbus cable](https://github.com/mkfam7/solaxx3/blob/main/diagrams/rs485_cable.png)
* python version >= 3.8
* This python module
## Installation
```
pip install solaxx3
```
## Usage
```
from solaxx3.solaxx3 import SolaxX3
# adjust the serial port and baud rate as necessary
s = SolaxX3(port="/dev/ttyUSB0", baudrate=115200)
if s.connect():
s.read_all_registers()
available_stats = s.list_register_names()
for stat in available_stats:
print(stat)
battery_temperature = s.read("temperature_battery")
print(f"\n\nBattery temperature: {s.read('temperature_battery')}")
else:
print("Cannot connect to the Modbus Server/Slave")
exit()
```
Project Link: [https://github.com/mkfam7/solaxx3](https://github.com/mkfam7/solaxx3)

View File

@@ -0,0 +1,16 @@
LICENSE
README.md
pyproject.toml
setup.py
src/solaxx3/__init__.py
src/solaxx3/solax_registers_info.py
src/solaxx3/solaxx3.py
src/solaxx3/utils.py
src/solaxx3.egg-info/PKG-INFO
src/solaxx3.egg-info/SOURCES.txt
src/solaxx3.egg-info/dependency_links.txt
src/solaxx3.egg-info/requires.txt
src/solaxx3.egg-info/top_level.txt
tests/test_mysql_connector.py
tests/test_register_data.py
tests/test_solaxx3.py

View File

@@ -0,0 +1 @@
pymodbus[serial]>=3.0.0

View File

@@ -0,0 +1 @@
solaxx3

View File

@@ -1,2 +1,3 @@
# Version of the Solax RTU package
__version__ = "0.0.6"
"""Version of the Solax RTU package"""
__version__ = "1.0.2"

File diff suppressed because it is too large Load Diff

View File

@@ -1,168 +0,0 @@
from typing import Any
from pymodbus.client import ModbusSerialClient
from datetime import date, datetime, timedelta
from struct import *
from solaxx3.registers import SolaxRegistersInfo
from time import sleep, perf_counter
class SolaxX3:
connected: bool = False
def __init__(
self,
method="rtu",
port="/dev/ttyUSB0",
baudrate=115200,
timeout=3,
parity="N",
stopbits=1,
bytesize=8,
) -> None:
self._input_registers_values_list = []
self._holding_registers_values_list = []
self.client = ModbusSerialClient(
method=method,
port=port,
baudrate=baudrate,
timeout=timeout,
parity=parity,
stopbits=stopbits,
bytesize=bytesize,
)
def connect(self) -> bool:
self.connected = self.client.connect()
return self.connected
def _join_msb_lsb(self, msb: int, lsb: int) -> int:
return (msb << 16) | lsb
def _unsigned16(self, type: str, addr: int, count: int = 1, unit: int = 1) -> int:
self._input_registers_values_list
if type == "input":
return self._input_registers_values_list[addr]
elif type == "holding":
return self._holding_registers_values_list[addr]
def _readRegisterRange(
self, type: str, addr: int, count: int = 1, unit: int = 1
) -> list:
if type == "input":
return self._input_registers_values_list[addr : addr + count]
elif type == "holding":
return self._holding_registers_values_list[addr : addr + count]
def _twos_complement(self, number: int, bits: int) -> int:
"""
Compute the 2's complement of the int value val
"""
# if sign bit is set e.g., 8bit: 128-255
if (number & (1 << (bits - 1))) != 0:
# compute negative value
number = number - (1 << bits)
return number
def _read_register(self, register_type: str, register_info: dict) -> Any:
"""Read the values from a register based on length and sign
Parameters:
register_info:dict - dictionary with register definition fields
"""
if "int" in register_info["data_format"]:
if register_info["data_length"] == 1:
val = self._unsigned16(register_type, register_info["address"])
if register_info["data_length"] == 2:
val = self._join_msb_lsb(
self._unsigned16(register_type, register_info["address"] + 1),
self._unsigned16(register_type, register_info["address"]),
)
if register_info["signed"]:
val = self._twos_complement(val, register_info["data_length"] * 16)
val = val / register_info["si_adj"]
elif "varchar" in register_info["data_format"]:
block = self._readRegisterRange(
register_type, register_info["address"], register_info["data_length"]
)
sn = []
for i in range(register_info["data_length"]):
first_byte, second_byte = unpack(
"BB", int.to_bytes(block[i], 2, "little")
)
if not second_byte == 0x0:
sn.append(chr(second_byte))
if not first_byte == 0x0:
sn.append(chr(first_byte))
val = "".join(sn)
elif "datetime" in register_info["data_format"]:
sec, min, hr, day, mon, year = self._readRegisterRange(
register_type, register_info["address"], register_info["data_length"]
)
inverter_datetime = (
f"{(year+2000):02}-{mon:02}-{day:02} {hr:02}:{min:02}:{sec:02}"
)
val = datetime.strptime(inverter_datetime, "%Y-%m-%d %H:%M:%S")
return val
def read_register(self, register_info: dict) -> tuple:
"""Read the values from a register based on length and sign
Parameters:
register_info:dict - dictionary with register definition fields
"""
val = self._read_register(register_info["register_type"], register_info)
if not "data_unit" in register_info:
return (val, "N/A")
return (val, register_info["data_unit"])
def read(self, name: str):
"""Retrieve the value for the register with the provided name"""
r = SolaxRegistersInfo()
register_info = r.get_register_info(name)
value = self.read_register(register_info)
return value
def list_register_names(self):
r = SolaxRegistersInfo()
return r.list_register_names()
def read_all_registers(self) -> None:
self._input_registers_values_list = []
self._holding_registers_values_list = []
read_block_length = 100
for i in range(3):
address = i * read_block_length
values_list = self.client.read_input_registers(
address=address, count=read_block_length, slave=1
).registers
self._input_registers_values_list.extend(values_list)
for i in range(3):
address = i * read_block_length
values_list = self.client.read_holding_registers(
address=address, count=read_block_length, slave=1
).registers
self._holding_registers_values_list.extend(values_list)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,198 @@
"""Class loading and storing data from the inverter."""
from datetime import datetime
from struct import unpack
from typing import Dict, Iterable, List, Literal, Tuple, Union
from pymodbus.client import ModbusSerialClient
from .solax_registers_info import FIELD_VALUES, FIELDS, SolaxRegistersInfo
from .utils import join_msb_lsb, twos_complement
REGISTER_VALUE = Union[float, str, datetime]
REGISTER_INFO = Dict[FIELDS, FIELD_VALUES]
class SolaxX3:
"""
Class interacting with values from the inverter.
Initialization parameters:
- method (default: rtu)
- port (default: /dev/ttyUSB0 )
- baudrate: Bits per second (default: 115,200 )
- timeout: Timeout for a request, in seconds. (default: 3)
- parity: 'E'ven, 'O'dd, 'N'one (default: N)
- stopbits: Number of stop bits 0-2 (default: 1)
- bytesize: Number of bits per byte (7 or 8) (default: 8)
"""
connected: bool = False
READ_BLOCK_LENGTH = 100
def __init__(
self,
method: str = "rtu",
port: str = "/dev/ttyUSB0",
baudrate: int = 115200,
timeout: int = 3,
parity: Literal["E", "O", "N"] = "N",
stopbits: Literal[0, 1, 2] = 1,
bytesize: Literal[7, 8] = 8,
) -> None:
self._input_registers_values: List[int] = []
self._holding_registers_values: List[int] = []
self.client: ModbusSerialClient = ModbusSerialClient(
method=method,
port=port,
baudrate=baudrate,
timeout=timeout,
parity=parity,
stopbits=stopbits,
bytesize=bytesize,
)
def connect(self) -> bool:
"""Connect to the inverter and return if it was successful."""
self.connected: bool = self.client.connect()
return self.connected
def _get_unsigned_16(self, value_type: str, address: int) -> int:
if value_type == "input":
return self._input_registers_values[address]
return self._holding_registers_values[address]
def _read_register_range(
self, value_type: str, address: int, count: int = 1
) -> list:
if value_type == "input":
return self._input_registers_values[address : address + count]
return self._holding_registers_values[address : address + count]
def _read_format_register_value(
self, register_info: REGISTER_INFO
) -> REGISTER_VALUE:
"""Read the values from a register based on length and sign
Parameters:
register_info:dict - dictionary with register definition fields
"""
if self._is_register_type_integer(register_info):
value = self._get_integer_value(register_info)
value = value / register_info["si_adj"]
elif self._is_register_type_string(register_info):
value = self._get_string_value(register_info)
else:
value = self._get_datetime_value(register_info)
return value
def _get_datetime_value(self, register_info: REGISTER_INFO) -> datetime:
register_type = register_info["register_type"]
sec, minute, hr, day, mon, year = self._read_register_range(
register_type, register_info["address"], register_info["data_length"]
)
inverter_datetime = f"{year:02}-{mon:02}-{day:02} {hr:02}:{minute:02}:{sec:02}"
value = datetime.strptime(inverter_datetime, "%y-%m-%d %H:%M:%S")
return value
def _is_register_type_datetime(self, register_info: REGISTER_INFO) -> bool:
return "datetime" in register_info["data_format"]
def _get_string_value(self, register_info: REGISTER_INFO) -> str:
characters: List[str] = []
register_type: Literal["input", "holding"] = register_info["register_type"]
block = self._read_register_range(
register_type, register_info["address"], register_info["data_length"]
)
for i in range(register_info["data_length"]):
first_byte, second_byte = unpack("BB", int.to_bytes(block[i], 2, "little"))
if not second_byte == 0x0:
characters.append(chr(second_byte))
if not first_byte == 0x0:
characters.append(chr(first_byte))
return "".join(characters)
def _is_register_type_string(self, register_info: REGISTER_INFO) -> bool:
return "varchar" in register_info["data_format"]
def _get_integer_value(self, register_info: REGISTER_INFO) -> int:
register_type = register_info["register_type"]
val = 0
if register_info["data_length"] == 1:
val = self._get_unsigned_16(register_type, register_info["address"])
if register_info["data_length"] == 2:
val = join_msb_lsb(
self._get_unsigned_16(register_type, register_info["address"] + 1),
self._get_unsigned_16(register_type, register_info["address"]),
)
if register_info["signed"]:
val = twos_complement(val, register_info["data_length"] * 16)
return val
def _is_register_type_integer(self, register_info: REGISTER_INFO) -> bool:
return "int" in register_info["data_format"]
def _read_register_value(
self, register_info: REGISTER_INFO
) -> Tuple[REGISTER_VALUE, str]:
"""Read the values from a register based on length and sign.
Parameters:
:param register_info: dictionary with register definition fields
:return: Tuple containing the value and data unit
"""
val = self._read_format_register_value(register_info)
return (val, register_info["data_unit"])
def read(self, name: str) -> Tuple[REGISTER_VALUE, str]:
"""Retrieve the value for the register with the provided name"""
registers = SolaxRegistersInfo()
register_info = registers.get_register_info(name)
value_data_unit = self._read_register_value(register_info)
return value_data_unit
def list_register_names(self) -> list:
"""Return all registers defined in register info."""
r = SolaxRegistersInfo()
return r.list_register_names()
def read_all_registers(self) -> None:
"""Read all register values from inverter."""
self._input_registers_values: List[int] = []
self._holding_registers_values: List[int] = []
self._read_input_registers()
self._read_holding_registers()
def _read_holding_registers(self):
for count in range(4):
address: int = count * self.READ_BLOCK_LENGTH
values: Iterable = self.client.read_holding_registers(
address=address, count=self.READ_BLOCK_LENGTH, slave=1
).registers
self._holding_registers_values.extend(values)
def _read_input_registers(self):
for count in range(4):
address: int = count * self.READ_BLOCK_LENGTH
values: Iterable = self.client.read_input_registers(
address=address, count=self.READ_BLOCK_LENGTH, slave=1
).registers
self._input_registers_values.extend(values)

View File

@@ -0,0 +1,18 @@
"""SolaxX3 utils module performing binary operations."""
def join_msb_lsb(msb: int, lsb: int) -> int:
"""Join two 16-bit registers into a 32-bit register."""
return (msb << 16) | lsb
def twos_complement(number: int, bits: int) -> int:
"""Compute the 2's complement of the provided integer."""
# if sign bit is set e.g., 8bit: 128-255
if (number & (1 << (bits - 1))) != 0:
# compute negative value
number = number - (1 << bits)
return number

View File

@@ -0,0 +1,361 @@
"""The formatted register values."""
import datetime
altered_input_register_values = [
236.4,
4.0,
1243.0,
629.5,
737.1,
1.4,
0.6,
49.96,
44.0,
2.0,
940.0,
484.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
428.1,
0.2,
128.0,
1.0,
29.0,
1.0,
0.0,
2.0,
98.0,
3914.1,
0.4,
4535.7,
8.2,
5.2,
30.0,
12288.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
20792.6,
3776.9,
0.0,
0.0,
0.0,
0.0,
56.9,
33161.0,
1.0,
0.0,
0.0,
0.0,
0.0,
235.9,
4.0,
937.0,
49.97,
232.3,
1.3,
45.0,
49.98,
235.4,
1.5,
261.0,
49.98,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
2.0,
2.0,
13476.8,
107.5,
69.1,
0.0,
0.0,
0.2,
35229.0,
68.0,
41.3,
1.75,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
21.3,
19.0,
3.344,
3.329,
0.0,
94.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
-1247.0,
-1.0,
2338.0,
-9973.0,
5000.0,
-5000.0,
0.0,
0.0,
245.0,
12042.0,
0.0,
0.0,
100.0,
40.0,
4.0,
0.0,
1.0,
]
altered_holding_register_values = [
"H34T15H9022043",
"Solax ",
" ",
900.0,
15.0,
900.0,
195.5,
264.5,
49.5,
50.5,
1.0,
0.0,
259.9,
195.5,
264.5,
47.5,
51.5,
100.0,
0.0,
1.0,
1.0,
1.0,
0.94,
0.9,
20.0,
50.0,
80.0,
100.0,
1.05,
1.0,
6.0,
0.0,
0.0,
213.9,
223.1,
236.9,
246.1,
0.0,
10.0,
0.0,
0.0,
20.0,
5.0,
0.0,
7500.0,
-7500.0,
47.55,
50.05,
195.5,
253.0,
0.0,
0.0,
47.55,
50.05,
195.5,
253.0,
0.0,
1.0,
900.0,
29.0,
0.0,
1.0,
1.0,
6.0,
27.0,
8.0,
datetime.datetime(2024, 3, 19, 14, 58, 43),
0.0,
0.0,
1.0,
0.0,
0.0,
20.0,
21.0,
0.0,
30.0,
0.0,
30.0,
0.0,
0.0,
0.0,
245.0,
0.0,
"SPNXLAQHWX",
1.0,
0.0,
0.0,
0.0,
60000.0,
1500.0,
0.0,
20.0,
0.0,
15000.0,
0.0,
0.0,
0.3,
0.1,
0.1,
0.1,
0.3,
0.01,
0.01,
0.01,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
32.0,
1.0,
50.2,
5.0,
0.0,
0.0,
0.0,
0.0,
2014.0,
49.8,
2.0,
0.005,
0.0,
30.0,
50.2,
49.8,
0.0,
0.0,
0.0,
0.0,
3000.0,
900.0,
900.0,
3000.0,
3000.0,
1.0,
0.09,
220.0,
253.0,
257.6,
207.0,
0.1,
0.0,
100.0,
100.0,
100.0,
0.0,
1.0,
0.0,
0.0,
0.0,
0.0,
0.0,
3.0,
1.0,
0.0,
1.0,
1.0,
2.0,
0.0,
0.0,
0.0,
1.0,
0.0,
0.0,
0.0,
1.0,
0.0,
266.8,
181.8,
0.0,
0.0,
0.0,
0.0,
30.0,
0.0,
0.0,
0.0,
2000.0,
80.0,
1000.0,
40.0,
30.0,
800.0,
0.0,
0.0,
0.0,
0.0,
0.0,
50.0,
0.0,
]

View File

@@ -0,0 +1,51 @@
"""Module containing a connection class."""
from pathlib import Path
from .cursor import Cursor
from .error import Error
open_connections_file = Path(__file__).parent / "open_connections.txt"
class Connection:
"""Class mocking a connection to a MySQL server."""
def __init__(self, *args, consider_open=True, **kwargs):
self.open_connections += consider_open
self.open: bool = consider_open
def cursor(self) -> Cursor:
"""Return the cursor for the connection."""
self._cursor = Cursor()
return self._cursor
def close(self):
"""Close connection."""
if not self.open:
raise Error("Connection already closed")
self.open = False
self.open_connections -= 1
def commit(self):
"""Save all changes to the database."""
def is_connected(self) -> bool:
"""Return if the connection is open."""
return self.open
@property
def open_connections(self):
with open(open_connections_file, "r", encoding="utf-8") as f:
contents = f.read()
return int(contents)
@open_connections.setter
def open_connections(self, value: int):
value = value if value > 0 else 0
with open(open_connections_file, "w", encoding="utf-8") as f:
f.write(str(value))

View File

@@ -0,0 +1,10 @@
"""Module with mock implementation of interacting with a MySQL server."""
from .connection import Connection
from .error import Error
def connect(*args, **kwargs) -> Connection:
"""Mock connect to a MySQL server."""
return Connection()

View File

@@ -0,0 +1,56 @@
"""Module with cursor implementation."""
import json
from pathlib import Path
from .error import Error
config_file = Path(__file__).parent / "cursor_config.json"
class Cursor:
"""Cursor of a MySQL connection."""
def __init__(self, raise_err: bool = True):
if self._raise_error_in_init and raise_err:
raise Error
def execute(self, *args, **kwargs) -> None:
"""Mock execute a query."""
if self._raise_error:
raise Error
def executemany(self, *args, **kwargs) -> None:
"""Mock execute a query multiple times with different data."""
if self._raise_error:
raise Error
@property
def _raise_error(self):
return self._read_configs()["raise_error"]
@property
def _raise_error_in_init(self):
return self._read_configs()["raise_error_in_init"]
@_raise_error.setter
def _raise_error(self, value: int):
data = self._read_configs()
data["raise_error"] = value
self._save_configs(data)
@_raise_error_in_init.setter
def _raise_error_in_init(self, value: int):
data = self._read_configs()
data["raise_error_in_init"] = value
self._save_configs(data)
def _read_configs(self):
with open(config_file, "r", encoding="utf-8") as f:
return json.loads(f.read())
def _save_configs(self, data):
with open(config_file, "w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=4))

View File

@@ -0,0 +1,4 @@
{
"raise_error_in_init": false,
"raise_error": false
}

View File

@@ -0,0 +1,5 @@
"""Module containing the basic error class of this package."""
class Error(Exception):
"""Basic error class of this package."""

View File

@@ -0,0 +1 @@
0

View File

@@ -0,0 +1,30 @@
"""Class to mimick `pymodbus.client`. It is used when running the tests."""
from collections import namedtuple
from .registers_output import raw_holding_register_values, raw_input_register_values
from .utils import getnext
Registers = namedtuple("Registers", ["registers"])
class ModbusSerialClient:
"""Class mimicking `pymodbus.client.ModbusSerialClient`."""
def __init__(self, *_, **__) -> None:
self._holding_registers_gen = getnext(raw_holding_register_values)
self._input_registers_gen = getnext(raw_input_register_values)
def connect(self):
"""Mimick connecting the inverter and return the success code."""
return True
def read_holding_registers(self, count, *_, **__):
"""Read holding register values from inverter."""
return Registers([next(self._holding_registers_gen) for _ in range(count)])
def read_input_registers(self, count, *_, **__):
"""Read input register values from inverter."""
return Registers([next(self._input_registers_gen) for _ in range(count)])

View File

@@ -0,0 +1,806 @@
"""Raw register values, just read from the inverter."""
raw_input_register_values = [
2364,
40,
1243,
6295,
7371,
14,
6,
4996,
44,
2,
940,
484,
0,
0,
0,
0,
0,
0,
0,
0,
4281,
2,
128,
1,
29,
1,
0,
2,
98,
39141,
0,
0,
4,
45357,
0,
82,
52,
300,
12288,
0,
1,
0,
52,
10,
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
47644,
31,
50010,
5,
0,
0,
0,
0,
569,
0,
3930,
5,
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
2359,
40,
937,
4997,
2323,
13,
45,
4998,
2354,
15,
261,
4998,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
2,
0,
2,
0,
3696,
2,
1075,
0,
0,
0,
691,
0,
0,
0,
2,
0,
24610,
5,
680,
0,
4130,
0,
175,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
213,
190,
3344,
3329,
0,
94,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
64289,
65535,
65535,
65535,
2338,
0,
55563,
65535,
5000,
0,
60536,
65535,
0,
0,
0,
0,
245,
0,
12042,
0,
0,
0,
100,
40,
4,
0,
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
]
raw_holding_register_values = [
18483,
13396,
12597,
18489,
12338,
12848,
13363,
21359,
27745,
30752,
8224,
8224,
8224,
8224,
8224,
8224,
8224,
8224,
8224,
8224,
8224,
1800,
900,
15,
900,
1955,
2645,
4950,
5050,
1,
0,
2599,
1955,
2645,
4750,
5150,
275,
100,
0,
100,
100,
100,
94,
90,
20,
50,
80,
100,
105,
100,
6,
0,
0,
2139,
2231,
2369,
2461,
0,
10,
0,
0,
20,
5,
0,
7500,
58036,
4755,
5005,
1955,
2530,
0,
0,
4755,
5005,
1955,
2530,
0,
1,
900,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
29,
0,
1,
1,
0,
6,
27,
8,
43,
58,
14,
19,
3,
24,
0,
0,
1,
0,
0,
200,
210,
0,
10240,
30,
25610,
12830,
0,
0,
0,
15127,
0,
0,
0,
0,
0,
30,
0,
0,
0,
0,
0,
0,
0,
2450,
0,
21328,
20056,
19521,
20808,
22360,
1,
0,
0,
0,
0,
0,
60000,
1500,
0,
20,
0,
15000,
0,
0,
300,
100,
100,
100,
300,
10,
10,
10,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
32,
0,
0,
1,
5020,
5,
0,
0,
0,
0,
2014,
4980,
2,
5,
0,
30,
5020,
4980,
0,
0,
0,
0,
30000,
900,
900,
30000,
30000,
1000,
900,
2200,
2530,
2576,
2070,
10,
0,
100,
100,
100,
0,
10,
1,
0,
0,
0,
0,
0,
3,
1,
0,
1,
1,
2,
0,
0,
0,
100,
0,
1,
0,
0,
0,
0,
1,
0,
2668,
1818,
0,
0,
0,
0,
0,
0,
30,
0,
0,
0,
2000,
80,
1000,
40,
30,
800,
0,
0,
0,
0,
0,
0,
0,
0,
0,
500,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
20,
95,
1000,
30,
60,
0,
15127,
0,
2000,
0,
0,
2000,
30,
80,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
]

View File

@@ -0,0 +1,11 @@
"""Mock `pymodbus` utils."""
from typing import Any
def getnext(iterable: list, default: Any = 0):
"""Yield one item at a time and yield `default` when iterable is exhausted."""
yield from iterable
while True:
yield default

View File

@@ -0,0 +1,68 @@
import sys
from os import environ
from pathlib import Path
from unittest import TestCase
from tests.mock_packages.mysql.connection import Connection
from tests.mock_packages.mysql.cursor import Cursor
environ["MYSQL_DB_USERNAME"] = ""
environ["MYSQL_DB_HOST_IP"] = ""
environ["MYSQL_DB_PASSWORD"] = ""
environ["MYSQL_DB_DATABASE"] = ""
current: Path = Path().resolve()
paths_to_extend = [current / "tests" / "mock_packages", current / "src"]
for path in paths_to_extend:
path_as_string = str(path)
if path_as_string not in sys.path:
sys.path.append(path_as_string)
from database import read_and_save
class MysqlConnectionLeakageTests(TestCase):
"""Tests to see if the class leaves any connections open after its processes."""
def _open_connections(self):
return Connection(consider_open=False).open_connections
def setUp(self) -> None:
"""Set up the data after each test case."""
Cursor(raise_err=False)._raise_error = False
Cursor(raise_err=False)._raise_error_in_init = False
Connection().open_connections = 0
def test_bulk_transfer_without_error(self):
"""Test if `bulk_transfer` leaves any hanging connections in normal operations."""
for _ in range(3):
read_and_save.bulk_save()
self.assertEqual(self._open_connections(), 0)
def test_bulk_transfer_with_error_in_cursor_init(self):
"""Test if `bulk_transfer` leaves any hanging connections with errors when instantiating
cursor."""
Cursor()._raise_error_in_init = True
for _ in range(3):
try:
read_and_save.bulk_save()
except:
pass
self.assertEqual(self._open_connections(), 0)
def test_bulk_transfer_with_error_in_cursor(self):
"""Test if `bulk_transfer` leaves any hanging connections with errors when executing
a query in the cursor."""
Cursor()._raise_error = True
for _ in range(3):
try:
read_and_save.bulk_save()
except:
pass
self.assertEqual(self._open_connections(), 0)

View File

@@ -0,0 +1,254 @@
"""
Test scenarios:
Report any of the following anomalies:
1. Not valid dictionary syntax [done]
2. Dictionary key duplicates [done]
3. "address" duplicates
4. The value field has different keys than: [done]
* address
* register_type
* data_format
* si_adj
* signed
* data_unit
* data_length
* description
5. "dataformat" starts with 'uint' but "signed" is different than "False"
6. "dataformat" starts with 'int' but "signed" is different than "True"
7. If "data_format" contains "int" in the name,
"data_length" != <the number inside "data_format"> / 16 or 1 if no number is found
in "data_length"
8. "data_unit" is not one of following:
{%, A, C, Hx, KWh, N/A, V, VA, Var, W, Wh, hour, second}
Note: values are case sensitive
9. "register_type" is different than {input, holding}
Note: no values are case sensitive
10. the key name contains "volt" but the "data_unit" is different than "V"
11. the key name contains "current" but the "data_unit" is different than "A"
12. "address" overlapping
"""
import re
import sys
import unittest
from itertools import filterfalse
from pathlib import Path
from src.solaxx3.solax_registers_info import SolaxRegistersInfo
import_path = Path().resolve() / "tests" / "mock_packages"
if str(import_path) not in sys.path:
sys.path.append(str(import_path))
REGISTER_FILE = "src/solaxx3/solax_registers_info.py"
class RegisterTests(unittest.TestCase):
"""Tests for SolaxRegistersInfo."""
registers = SolaxRegistersInfo._registers
def test_tc1_valid_dictionary(self):
"""Prints an error if `registers` is a dictionary."""
self.assertIsInstance(self.registers, dict)
def test_tc2_duplicate_keys(self):
"""Prints an error if 'registers' has any duplicate keys."""
PATTERN = r"""
"(?P<key>.*?)" # The key of the dictionary
\s*: # colon
\s*{ # start of value
"""
regex = re.compile(PATTERN, re.VERBOSE)
with open(REGISTER_FILE, "r", encoding="utf-8") as dictionary:
list_keys = []
for line_number, line in filterfalse(
lambda x: x[1].lstrip().startswith("#"), enumerate(dictionary, start=1)
):
re_match = regex.search(line)
if re_match is not None:
key = re_match.group("key")
self.assertNotIn(key, list_keys, f"line={line_number}")
list_keys.append(key)
def test_tc4_correct_value_field(self):
"""Prints an error if the keys do not have the following keys:
- address, register_type, data_format, si_adj, signed, data_unit, data_length,
description"""
VALID_SUBKEYS = [
"address",
"data_format",
"data_length",
"data_unit",
"description",
"register_type",
"si_adj",
"signed",
]
for register, register_value in self.registers.items():
self.assertListEqual(
sorted(list(register_value.keys())), VALID_SUBKEYS, f"dict={register!r}"
)
def test_tc5_correctly_unsigned(self):
"""Prints an error if subkey `data_format` starts with `uint` but `signed` is
different than False."""
for register, register_value in self.registers.items():
self.assertFalse(
register_value["data_format"].startswith("uint")
and register_value["signed"] is not False,
f"not correctly signed: dict='{register}'",
)
def test_tc6_correctly_signed(self):
"""Prints an error if subkey `data_format` starts with 'int' but `signed`
is different than True."""
for register, register_value in self.registers.items():
self.assertFalse(
register_value["data_format"].startswith("int")
and register_value["signed"] is not True,
f"not correctly signed: dict='{register}'",
)
def test_tc7_correct_data_length(self):
"""Prints an error if 'data_format' does not match 'data_length'."""
PATTERN = r"int(?P<bits>\d+)"
regex = re.compile(PATTERN)
for register, register_value in self.registers.items():
match = regex.search(register_value["data_format"])
if match:
bits = int(match.group("bits"))
self.assertEqual(
bits / 16, register_value["data_length"], f"dict={register!r}"
)
def test_tc8_correct_data_unit_value(self):
"""Prints an error if `data_unit` is not %, C, Hz, KWh, N/A, V, VA, Var,
W, Wh, hour, second."""
POSSIBLE_DATA_UNITS = (
"min",
"bps",
"%",
"A",
"C",
"Hz",
"KWh",
"N/A",
"V",
"VA",
"Var",
"W",
"Wh",
"hour",
"second",
)
for register, register_value in self.registers.items():
self.assertGreater(
len(register_value["data_unit"]),
0,
f"empty 'data_unit': dict={register!r}",
)
self.assertIn(
register_value["data_unit"],
POSSIBLE_DATA_UNITS,
f"invalid data_unit: '{register_value['data_unit']}'; 'dict='{register}'",
)
def test_tc9_correct_register_type(self):
"""Prints an error if `register_type` is not 'input' or 'holding'."""
POSSIBLE_REGISTER_TYPES = ("input", "holding")
for register, register_value in self.registers.items():
self.assertIn(
register_value["register_type"],
POSSIBLE_REGISTER_TYPES,
f"dict='{register}'",
)
def test_tc10_correct_data_unit_with_volt(self):
"""Prints an error if the key contains 'volt' but `data_unit` is differrent
than 'V'."""
for register, register_value in self.registers.items():
self.assertFalse(
"volt" in register
and register_value["data_unit"] != "V"
and "percent" not in register
and "ratio" not in register,
f"'data_unit' does not match name: dict={register!r}",
)
def test_tc11_correct_data_unit_with_current(self):
"""Prints an error if the key contains 'current' but `data_unit` is
differrent than 'A'."""
for register, register_value in self.registers.items():
self.assertFalse(
"current" in register and register_value["data_unit"] != "A",
f"'data_unit' does not match name: dict='{register}'",
)
def test_tc12_correct_addresses(self):
"""Prints an error if there is any address overlapping."""
holding_addresses: list = []
input_addresses: list = []
for register, register_value in self.registers.items():
addresses = (
holding_addresses
if register_value["register_type"] == "holding"
else input_addresses
)
data_length = register_value["data_length"]
for x in range(data_length):
address = register_value["address"] + x
self.assertNotIn(address, addresses, f"dict={register!r}")
addresses.append(address)
def test_tc13_subkey_duplicates(self):
"""Prints an error if there are any subkey duplicates."""
PATTERN = r"""
"(?P<subkey>.*?)" # dictionary subkey
\s*: # colon
\s*?[^[]+$ # value different than a dict
"""
regex = re.compile(PATTERN, re.VERBOSE)
with open(REGISTER_FILE, "r", encoding="utf-8") as dictionary:
subkeys = []
for line_number, line in filterfalse(
lambda x: x[1].lstrip().startswith("#"), enumerate(dictionary, start=1)
):
match = regex.search(line)
if match:
subkey = match.group("subkey")
self.assertNotIn(subkey, subkeys, f"line={line_number}")
subkeys.append(subkey)
else:
subkeys.clear()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,48 @@
"""Module to test solaxx3."""
import sys
import unittest
from pathlib import Path
from src.solaxx3 import solaxx3
from tests.final_result import (
altered_holding_register_values,
altered_input_register_values,
)
import_path = Path().resolve() / "tests" / "mock_packages"
if str(import_path) not in sys.path:
sys.path.append(str(import_path))
class Solaxx3Tests(unittest.TestCase):
"""Tests for SolaxX3."""
def test_connection(self):
"""Test if SolaxX3 connected."""
s = solaxx3.SolaxX3()
s.connect()
self.assertEqual(s.connected, True)
def test_format_registers(self):
"""Test if the module correctly formats raw register values."""
result = []
s = solaxx3.SolaxX3()
s.read_all_registers()
regs = solaxx3.SolaxRegistersInfo().list_register_names()
for reg in regs:
result.append(s.read(reg)[0])
self.assertEqual(
result, altered_input_register_values + altered_holding_register_values
)
def test_are_registers_identical(self):
"""Test if holding registers' values are not equal to input registers' values."""
s = solaxx3.SolaxX3()
s.read_all_registers()
self.assertFalse(s._holding_registers_values == s._input_registers_values)