#!/usr/bin/env python3
# pylint: disable=missing-type-doc,missing-param-doc,differing-param-doc,missing-any-param-doc
"""Modbus Message Parser.
The following is an example of how to parse modbus messages
using the supplied framers for a number of protocols:
* tcp
* ascii
* rtu
* binary
"""
# -------------------------------------------------------------------------- #
# import needed libraries
# -------------------------------------------------------------------------- #
import codecs as c
import collections
import logging
from optparse import OptionParser # pylint: disable=deprecated-module
import textwrap
from pymodbus.factory import ClientDecoder, ServerDecoder
from pymodbus.transaction import (
ModbusAsciiFramer,
ModbusBinaryFramer,
ModbusRtuFramer,
ModbusSocketFramer,
)
# -------------------------------------------------------------------------- #
# configure the logging output
# -------------------------------------------------------------------------- #
# Layout of every log record: timestamp, thread, level, module:line, message.
FORMAT = " ".join(
    [
        "%(asctime)-15s",
        "%(threadName)-15s",
        "%(levelname)-8s",
        "%(module)-15s:%(lineno)-8s",
        "%(message)s",
    ]
)
logging.basicConfig(format=FORMAT)
# The root logger is used so the level chosen in main() applies globally.
log = logging.getLogger()
# -------------------------------------------------------------------------- #
# build a quick wrapper around the framers
# -------------------------------------------------------------------------- #
class Decoder:
"""Decoder."""
def __init__(self, framer, encode=False):
"""Initialize a new instance of the decoder
:param framer: The framer to use
:param encode: If the message needs to be encoded
"""
self.framer = framer
self.encode = encode
def decode(self, message):
"""Attempt to decode the supplied message
:param message: The message to decode
"""
value = message if self.encode else c.encode(message, "hex_codec")
print("=" * 80)
print(f"Decoding Message {value}")
print("=" * 80)
decoders = [
self.framer(ServerDecoder(), client=None),
self.framer(ClientDecoder(), client=None),
]
for decoder in decoders:
print(f"{decoder.decoder.__class__.__name__}")
print("-" * 80)
try:
decoder.addToFrame(message)
if decoder.checkFrame():
unit = decoder._header.get( # pylint: disable=protected-access
"uid", 0x00
)
decoder.advanceFrame()
decoder.processIncomingPacket(message, self.report, unit)
else:
self.check_errors(decoder, message)
except Exception: # pylint: disable=broad-except
self.check_errors(decoder, message)
def check_errors(self, decoder, message):
"""Attempt to find message errors
:param message: The message to find errors in
"""
txt = f"Unable to parse message - {message} with {decoder}"
log.error(txt)
def report(self, message):
"""Print the message information
:param message: The message to print
"""
print(
"%-15s = %s" # pylint: disable=consider-using-f-string
% (
"name",
message.__class__.__name__,
)
)
for (k_dict, v_dict) in message.__dict__.items():
if isinstance(v_dict, dict):
print("%-15s =" % k_dict) # pylint: disable=consider-using-f-string
for k_item, v_item in v_dict.items():
print(
" %-12s => %s" # pylint: disable=consider-using-f-string
% (k_item, v_item)
)
elif isinstance(v_dict, collections.abc.Iterable):
print("%-15s =" % k_dict) # pylint: disable=consider-using-f-string
value = str([int(x) for x in v_dict])
for line in textwrap.wrap(value, 60):
print(
"%-15s . %s" # pylint: disable=consider-using-f-string
% ("", line)
)
else:
print(
"%-15s = %s" # pylint: disable=consider-using-f-string
% (k_dict, hex(v_dict))
)
print(
"%-15s = %s" # pylint: disable=consider-using-f-string
% (
"documentation",
message.__doc__,
)
)
# -------------------------------------------------------------------------- #
# and decode our message
# -------------------------------------------------------------------------- #
def get_options():
    """Build the command line parser and parse ``sys.argv``.

    When no ``-m/--message`` value is given, the first positional argument
    (if any) is used as the message.

    :returns: The parsed options object
    """
    # (flags, keyword settings) for every option, in help-output order.
    option_table = (
        (
            ("-p", "--parser"),
            {
                "help": "The type of parser to use (tcp, rtu, binary, ascii)",
                "dest": "parser",
                "default": "tcp",
            },
        ),
        (
            ("-D", "--debug"),
            {
                "help": "Enable debug tracing",
                "action": "store_true",
                "dest": "debug",
                "default": False,
            },
        ),
        (
            ("-m", "--message"),
            {
                "help": "The message to parse",
                "dest": "message",
                "default": None,
            },
        ),
        (
            ("-a", "--ascii"),
            {
                "help": "The indicates that the message is ascii",
                "action": "store_true",
                "dest": "ascii",
                "default": False,
            },
        ),
        (
            # Shares its destination with --ascii and switches it back off.
            ("-b", "--binary"),
            {
                "help": "The indicates that the message is binary",
                "action": "store_false",
                "dest": "ascii",
            },
        ),
        (
            ("-f", "--file"),
            {
                "help": "The file containing messages to parse",
                "dest": "file",
                "default": None,
            },
        ),
        (
            ("-t", "--transaction"),
            {
                "help": "If the incoming message is in hexadecimal format",
                "action": "store_true",
                "dest": "transaction",
                "default": False,
            },
        ),
        (
            ("--framer",),
            {
                "help": "Framer to use",
                "dest": "framer",
                "default": None,
            },
        ),
    )

    parser = OptionParser()
    for flags, settings in option_table:
        parser.add_option(*flags, **settings)

    opt, positional = parser.parse_args()
    # Fall back to the first bare argument when -m/--message was not used.
    if positional and not opt.message:
        opt.message = positional[0]
    return opt
def get_messages(option):
    """Generate the messages to parse.

    A message given on the command line takes precedence over a file. Unless
    ``option.ascii`` is set, messages are treated as hexadecimal text and are
    converted to ``bytes`` before being yielded.

    :param option: The option manager; ``message``, ``transaction``,
        ``ascii`` and ``file`` are read, and ``message`` is overwritten with
        its converted form
    :returns: The message iterator to parse
    :raises ValueError: If a hexadecimal message contains invalid or an odd
        number of hex digits (``binascii.Error``)
    """
    if option.message:
        if option.transaction:
            # Turn "0x1 0x2f ..." into one contiguous hex string, padding
            # single digits to a full byte.
            digits = []
            for segment in option.message.split():
                segment = segment.replace("0x", "")
                digits.append("0" + segment if len(segment) == 1 else segment)
            option.message = "".join(digits)
        if not option.ascii:
            option.message = c.decode(option.message.encode(), "hex_codec")
        yield option.message
    elif option.file:
        with open(option.file, "r") as handle:  # pylint: disable=unspecified-encoding
            for line in handle:
                if line.startswith("#"):
                    continue
                if not option.ascii:
                    line = line.strip()
                    if not line:
                        # Nothing to decode on a blank line.
                        continue
                    # str has no .decode() in Python 3; go through the codec
                    # exactly like the command line branch does.
                    line = c.decode(line.encode(), "hex_codec")
                # In ascii mode the line is yielded as read, newline included.
                yield line
def main():
    """Parse the command line and decode every requested message."""
    option = get_options()

    if option.debug:
        try:
            log.setLevel(logging.DEBUG)
        except Exception as exc:  # pylint: disable=broad-except
            print(f"Logging is not supported on this system- {exc}")

    # Supported framer names; anything else falls back to the tcp framer.
    framers = {
        "tcp": ModbusSocketFramer,
        "rtu": ModbusRtuFramer,
        "binary": ModbusBinaryFramer,
        "ascii": ModbusAsciiFramer,
    }
    # --framer wins over --parser when both are supplied.
    requested = option.framer or option.parser
    framer = framers[requested] if requested in framers else ModbusSocketFramer

    decoder = Decoder(framer, option.ascii)
    for message in get_messages(option):
        decoder.decode(message)
# Run the parser only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()