#!/usr/bin/env python3
"""Pymodbus Asynchronous Client Example.
An example of a single threaded asynchronous client.
usage: client_async.py [-h] [--comm {tcp,udp,serial,tls}]
[--framer {ascii,binary,rtu,socket,tls}]
[--log {critical,error,warning,info,debug}]
[--port PORT]
options:
-h, --help show this help message and exit
--comm {tcp,udp,serial,tls}
"serial", "tcp", "udp" or "tls"
--framer {ascii,binary,rtu,socket,tls}
"ascii", "binary", "rtu", "socket" or "tls"
--log {critical,error,warning,info,debug}
"critical", "error", "warning", "info" or "debug"
--port PORT the port to use
The corresponding server must be started before e.g. as:
python3 server_sync.py
"""
import argparse
import os
import asyncio
import logging
# --------------------------------------------------------------------------- #
# import the various client implementations
# --------------------------------------------------------------------------- #
from pymodbus.client import (
AsyncModbusSerialClient,
AsyncModbusTcpClient,
AsyncModbusTlsClient,
AsyncModbusUdpClient,
)
from pymodbus.transaction import (
ModbusAsciiFramer,
ModbusBinaryFramer,
ModbusRtuFramer,
ModbusSocketFramer,
ModbusTlsFramer,
)
def setup_async_client(args=None):
    """Create the async Modbus client selected by the command line arguments.

    The client object is only constructed here; this function does not call
    ``connect()`` on it.

    :param args: Parsed argument namespace providing ``comm``, ``framer`` and
        ``port``. When falsy, the arguments are obtained from
        ``get_commandline()``.
    :returns: The client object matching ``args.comm``.
    :raises ValueError: If ``args.comm`` is not "tcp", "udp", "serial"
        or "tls".
    """
    if not args:
        args = get_commandline()
    # Network transports need a numeric port; for serial the "port" is a
    # device path and must stay a string.
    if args.comm != "serial" and args.port:
        args.port = int(args.port)
    _logger.info("### Create client object")
    if args.comm == "tcp":
        client = AsyncModbusTcpClient(
            "127.0.0.1",
            port=args.port,  # on which port
            # Common optional parameters:
            framer=args.framer,
            #    timeout=10,
            #    retries=3,
            #    retry_on_empty=False,
            #    close_comm_on_error=False,
            #    strict=True,
            # TCP setup parameters
            #    source_address=("localhost", 0),
        )
    elif args.comm == "udp":
        client = AsyncModbusUdpClient(
            "127.0.0.1",
            port=args.port,
            # Common optional parameters:
            framer=args.framer,
            #    timeout=10,
            #    retries=3,
            #    retry_on_empty=False,
            #    close_comm_on_error=False,
            #    strict=True,
            # UDP setup parameters
            #    source_address=None,
        )
    elif args.comm == "serial":
        # NOTE(review): args.framer is not forwarded here, so the client's
        # own default framer is used -- confirm this is intended.
        client = AsyncModbusSerialClient(
            args.port,
            # Common optional parameters:
            #    framer=ModbusRtuFramer,
            #    timeout=10,
            #    retries=3,
            #    retry_on_empty=False,
            #    close_comm_on_error=False,
            #    strict=True,
            # Serial setup parameters
            #    baudrate=9600,
            #    bytesize=8,
            #    parity="N",
            #    stopbits=1,
            #    handle_local_echo=False,
        )
    elif args.comm == "tls":
        # Locate the certificates relative to the directory the script is
        # started from. basename() handles both "/" and "\\" separators,
        # unlike splitting the path on "/".
        cwd = os.path.basename(os.getcwd())
        if cwd == "examples":
            path = "."
        elif cwd == "test":
            path = os.path.join("..", "examples")
        else:
            path = "examples"
        client = AsyncModbusTlsClient(
            "127.0.0.1",
            port=args.port,
            # Common optional parameters:
            framer=args.framer,
            #    timeout=10,
            #    retries=3,
            #    retry_on_empty=False,
            #    close_comm_on_error=False,
            #    strict=True,
            # TLS setup parameters
            #    sslctx=sslctx,
            certfile=os.path.join(path, "certificates", "pymodbus.crt"),
            keyfile=os.path.join(path, "certificates", "pymodbus.key"),
            #    password="none",
            server_hostname="localhost",
        )
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise ValueError(
            f"unsupported comm type {args.comm!r}; "
            'expected "tcp", "udp", "serial" or "tls"'
        )
    return client
async def run_async_client(client, modbus_calls=None):
    """Connect the async client, optionally run Modbus calls, then close it.

    :param client: Client object offering awaitable ``connect()`` and
        ``close()`` and a ``protocol`` attribute, e.g. the object returned
        by ``setup_async_client()``.
    :param modbus_calls: Optional callable; when given it is awaited as
        ``modbus_calls(client)`` between connect and close.
    :raises AssertionError: If ``client.protocol`` is falsy after connecting.
    """
    _logger.info("### Client starting")
    await client.connect()
    # A falsy protocol after connect() is treated as a failed connection.
    # NOTE: plain asserts are skipped when Python runs with -O.
    assert client.protocol
    try:
        if modbus_calls:
            await modbus_calls(client)
    finally:
        # Close even when the calls raise, so the connection is not leaked.
        await client.close()
    _logger.info("### End of Program")
# --------------------------------------------------------------------------- #
# Extra code, to allow commandline parameters instead of changing the code
# --------------------------------------------------------------------------- #
# Log record layout: timestamp, level, module name, line number, message.
FORMAT = "%(asctime)-15s %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s"
# Runs at import time: configures logging with the layout above.
logging.basicConfig(format=FORMAT)
# getLogger() without a name returns the root logger; get_commandline()
# sets its level from the --log option.
_logger = logging.getLogger()
def get_commandline(cmdline=None):
    """Read and validate command line arguments.

    Missing options are filled in with defaults: ``comm`` falls back to
    "tcp", and ``framer`` / ``port`` fall back to the defaults of the
    selected ``comm``. As a side effect the level of the module logger is
    set from ``--log`` (INFO when not given).

    :param cmdline: Optional list of argument strings to parse. When None
        (the default) the arguments are taken from ``sys.argv``.
    :returns: The parsed namespace with ``comm``, ``log``, ``port`` (the
        string given on the command line, or the default for the transport)
        and ``framer`` (replaced by the corresponding framer class).
    """
    parser = argparse.ArgumentParser(
        description="Connect/disconnect an asynchronous client."
    )
    parser.add_argument(
        "--comm",
        choices=["tcp", "udp", "serial", "tls"],
        help='"serial", "tcp", "udp" or "tls"',
        type=str,
    )
    parser.add_argument(
        "--framer",
        choices=["ascii", "binary", "rtu", "socket", "tls"],
        help='"ascii", "binary", "rtu", "socket" or "tls"',
        type=str,
    )
    parser.add_argument(
        "--log",
        choices=["critical", "error", "warning", "info", "debug"],
        help='"critical", "error", "warning", "info" or "debug"',
        type=str,
    )
    parser.add_argument(
        "--port",
        help="the port to use",
        type=str,
    )
    args = parser.parse_args(cmdline)

    # Per transport: (default framer name, default port or serial device).
    comm_defaults = {
        "tcp": ("socket", 5020),
        "udp": ("socket", 5020),
        "serial": ("rtu", "/dev/ptyp0"),
        "tls": ("tls", 5020),
    }
    # Maps the --framer choice to the framer class handed to the client.
    framers = {
        "ascii": ModbusAsciiFramer,
        "binary": ModbusBinaryFramer,
        "rtu": ModbusRtuFramer,
        "socket": ModbusSocketFramer,
        "tls": ModbusTlsFramer,
    }
    _logger.setLevel(args.log.upper() if args.log else logging.INFO)
    if not args.comm:
        args.comm = "tcp"
    default_framer, default_port = comm_defaults[args.comm]
    args.port = args.port or default_port
    args.framer = framers[args.framer or default_framer]
    return args
if __name__ == "__main__":
    # Script entry point: build the client from the command line, then
    # connect and disconnect without issuing any Modbus calls.
    asyncio.run(run_async_client(setup_async_client()), debug=True)