185 lines
5.4 KiB
Python
185 lines
5.4 KiB
Python
"""Modbus Utilities.
|
|
|
|
A collection of utilities for packing data, unpacking
data, computing checksums, and decoding checksums.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
|
|
# Names exported by ``from <this module> import *``.
__all__ = ["pack_bitstring", "unpack_bitstring", "default", "rtuFrameSize"]
|
|
|
|
# pylint: disable=missing-type-doc
|
|
import struct
|
|
|
|
|
|
class ModbusTransactionState:  # pylint: disable=too-few-public-methods
    """Modbus Client States.

    Plain integer constants identifying the state of a client transaction,
    together with :meth:`to_string` for turning a state value into a
    human-readable label.
    """

    IDLE = 0
    SENDING = 1
    WAITING_FOR_REPLY = 2
    WAITING_TURNAROUND_DELAY = 3
    PROCESSING_REPLY = 4
    PROCESSING_ERROR = 5
    TRANSACTION_COMPLETE = 6
    RETRYING = 7
    NO_RESPONSE_STATE = 8

    @classmethod
    def to_string(cls, state):
        """Convert a state value to its display string.

        :param state: One of the state constants defined on this class
        :returns: The label for ``state``, or ``None`` if ``state`` is not
            one of the known states
        """
        states = {
            cls.IDLE: "IDLE",
            cls.SENDING: "SENDING",
            cls.WAITING_FOR_REPLY: "WAITING_FOR_REPLY",
            cls.WAITING_TURNAROUND_DELAY: "WAITING_TURNAROUND_DELAY",
            cls.PROCESSING_REPLY: "PROCESSING_REPLY",
            cls.PROCESSING_ERROR: "PROCESSING_ERROR",
            cls.TRANSACTION_COMPLETE: "TRANSACTION_COMPLETE",
            # This label intentionally differs from the constant name.
            cls.RETRYING: "RETRYING TRANSACTION",
            # Every state defined above must have a label; without this entry
            # NO_RESPONSE_STATE was reported as None.
            cls.NO_RESPONSE_STATE: "NO_RESPONSE_STATE",
        }
        return states.get(state)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Helpers
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def default(value):
    """Build the default instance of the type of ``value``.

    The type of ``value`` is called with no arguments, so ``default(5)`` is
    ``0`` and ``default("abc")`` is ``""``.

    :param value: The value to get the default of
    :returns: The default value
    """
    value_type = type(value)
    return value_type()
|
|
|
|
|
|
def dict_property(store, index):
    """Create a class property backed by one item of a container.

    This removes the boilerplate of writing a getter/setter pair for every
    field that simply reads and writes ``container[index]``.

    :param store: Where to find the container. One of:

        * a callable: invoked as ``store(instance)``, it must return the
          container;
        * a string: the name of the instance attribute holding the container;
        * anything else: used directly as the container (and therefore
          shared by every instance of the class).
    :param index: The index (or key) into the container to close over
    :returns: A ``property`` whose getter returns ``container[index]`` and
        whose setter performs ``container[index] = value``
    """
    # Pick, once, how the backing container is located for an instance.
    if callable(store):

        def resolve(instance):
            return store(instance)

    elif isinstance(store, str):

        def resolve(instance):
            # getattr() follows the normal attribute protocol, including any
            # __getattr__ fallback defined on the instance's class.
            return getattr(instance, store)

    else:

        def resolve(instance):  # pylint: disable=unused-argument
            return store

    def getter(instance):
        return resolve(instance)[index]

    def setter(instance, value):
        resolve(instance)[index] = value

    return property(getter, setter)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Bit packing functions
|
|
# --------------------------------------------------------------------------- #
|
|
def pack_bitstring(bits: list[bool]) -> bytes:
    """Create a bytestring out of a list of bits.

    Bits are packed eight to a byte, least significant bit first: the first
    bit of every group of eight lands in bit 0 of its byte. A trailing group
    of fewer than eight bits is padded with zero bits in the high positions.

    :param bits: A list of bits
    :returns: The packed bytes; empty when ``bits`` is empty

    example::

        bits = [False, True, False, True]
        result = pack_bitstring(bits)  # b"\\x0a"
    """
    # A bytearray grows in place; appending to an immutable bytes object
    # would copy the whole result for every completed byte.
    packed = bytearray()
    current = 0
    position = 0
    for bit in bits:
        if bit:
            current |= 1 << position
        position += 1
        if position == 8:
            packed.append(current)
            current = position = 0
    # Flush a partially filled final byte, if any.
    if position:
        packed.append(current)
    return bytes(packed)
|
|
|
|
|
|
def unpack_bitstring(data: bytes) -> list[bool]:
    """Create a list of bits out of a bytestring.

    Every byte is expanded into eight booleans, least significant bit first,
    so the result always holds ``8 * len(data)`` entries.

    :param data: The modbus data packet to decode
    :returns: The decoded bits, in byte order and LSB first within each byte

    example::

        data = bytes([0b00001010])
        result = unpack_bitstring(data)
        # [False, True, False, True, False, False, False, False]
    """
    return [
        bool((int(byte) >> position) & 1)
        for byte in data
        for position in range(8)
    ]
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Error Detection Functions
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def rtuFrameSize(data, byte_count_pos):  # pylint: disable=invalid-name
    """Compute the total frame size from the frame's byte count field.

    :param data: The buffer containing the frame.
    :param byte_count_pos: The index of the byte count in the buffer.
    :returns: The size of the frame.

    Frames that carry a byte count field always share one layout:

    - some header fields come first,
    - followed by the byte count field,
    - followed by as many data bytes as the byte count states,
    - followed by the CRC (two bytes).

    The frame size is therefore the value stored in the byte count field,
    plus the position of that field, plus three (one byte for the byte
    count field itself and two for the CRC).
    """
    announced_data_bytes = int(data[byte_count_pos])
    # 1 byte for the byte count field + 2 bytes of CRC.
    count_field_and_crc = 3
    return announced_data_bytes + byte_count_pos + count_field_and_crc
|
|
|
|
|
|
def hexlify_packets(packet):
    """Return hex representation of bytestring received.

    :param packet: The bytes to render; any falsy value (empty, ``None``)
        produces an empty string
    :return: The ``hex()`` form of every element of ``packet``, separated
        by single spaces
    """
    if packet:
        return " ".join(hex(int(octet)) for octet in packet)
    return ""
|