Commit 78183016 authored by Philipp's avatar Philipp

feat: Implement client functionality (opcua-to-http-gw.py)

- Add login to OPC UA server via credentials
- Establish interval connection for reading nodes from OPC UA server
- Implement monitoring function to subscribe and monitor data changes on OPC UA server nodes
feat: Implement server functionality hosting data and randomly chaning it (minimal-server.py)
parent 16279aac
#!//usr/bin/python #!//usr/bin/python
""" """
Basic server with minimal data Sever simulating/emulating hosting data.
""" """
# from https://opcua-asyncio.readthedocs.io/en/latest/usage/get-started/minimal-client.html
import asyncio import asyncio
import logging import logging
from asyncua import Server, ua from asyncua import Server, ua
from asyncua.common.methods import uamethod from asyncua.common.methods import uamethod
from asyncua.common.manage_nodes import create_object_type
from asyncua.server.users import User, UserRole
import argparse import argparse
import random import random
import csv
import sys
import string
# command line handling # command line handling
parser = argparse.ArgumentParser(description='Run OPCUA Server.') parser = argparse.ArgumentParser(description='Run OPCUA Server.')
a = parser.add_argument a = parser.add_argument
a('--ipv4', help='The IPv4 address on which the OPCUA Server runs', default="0.0.0.0")
a('--ipv6', help='The IPv6 address on which the OPCUA Server runs', default="::") a('--ipv6', help='The IPv6 address on which the OPCUA Server runs', default="::")
a('--ipv6-enabled', help='The IPv6 address check whether it is enabled or disabled', default="1") a('--ipv6-enabled', help='The IPv6 address check whether it is enabled or disabled', default="1")
a('--port', help='The port on which the OPCUA Server runs', default="4840") a('--port', help='The port on which the OPCUA Server runs', default="4840")
a('--xml', help='Path of XML to configure Server. See asyncua doc for more details.', default=None) a('--xml', help='Path of XML to configure Server. See asyncua doc for more details.', default=None)
args = parser.parse_args() args = parser.parse_args()
ipv4 = args.ipv4
ipv6 = args.ipv6 ipv6 = args.ipv6
ipv6_enabled = args.ipv6_enabled ipv6_enabled = args.ipv6_enabled
port = args.port port = args.port
xml = args.xml xml = args.xml
ApplicationUri = "Powerfuse_DataConnect"
# For local development
ipv6 = "::"
users_db = {
"Powerfuse": "password"
}
class UserManager:
def get_user(self, iserver, username=None, password=None, certificate=None):
if username in users_db and password == users_db[username].encode():
return User(role=UserRole.Admin)
return None
def read_nodes(input_interface_description):
import csv
'''
Reads the nodes defined in .csv style description as list of dictionaries with key values paris.
keys represent the header of the .csv file
input_interface_description: filename of the interface description
'''
csv_data = []
with open(input_interface_description, 'r') as file:
reader = csv.DictReader(file, delimiter=',')
for row in reader:
csv_data.append(row)
return csv_data
# Class sub handler - with function that notes datachanges
class SubHandler(object):
def datachange_notification(self, node, val, data):
print(f"New data change event for node {node} with value {val}")
async def shuffle_based_on_csv_data(server, _logger, csv_data):
'''
# The values of the nodes are again set randomly.
# Import the nodes from the CSV data
# server: opcua server
# csv_data: data in the original format of csv now in the format of list of dictionary - iterating over the rows
'''
data_type_mapping = {
'BOOL': (ua.VariantType.Boolean, lambda: random.choice([True, False])),
'INT': (ua.VariantType.Int32, lambda: random.randint(-2**31, 2**31 - 1)),
'STRING': (ua.VariantType.String, lambda: ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(random.randint(5, 20)))),
'UDINT': (ua.VariantType.UInt32, lambda: random.randint(0, 2**32 - 1)),
'UINT': (ua.VariantType.UInt16, lambda: random.randint(0, 2**16 - 1)),
'REAL': (ua.VariantType.Float, lambda: random.uniform(-1e6, 1e6)),
}
object_type_node_class_map = {}
for row in csv_data:
idx = 1
object_type_node_class = row['object_type_node_class']
s = row['s']
data_type = row['data_type']
description = row['description']
access_level = row['access_level']
cloud_transmission = row['cloud_transmission']
if data_type not in data_type_mapping:
_logger.error(f"Unsupported data type: {data_type} for node {s}")
continue
root = server.nodes.root
# Get the object node
myobj = await root.get_child(["0:Objects", f"{idx}:{object_type_node_class}"])
# Get the variable nodes
myvar = await myobj.get_child(f"{idx}:{s}")
variant_type, value_func = data_type_mapping[data_type]
value = value_func()
await myvar.write_value(value)
async def import_nodes_from_csv_data(server, _logger, csv_data):
# Import the nodes from the CSV data
# server: opcua server
# csv_data: data in the original format of csv now in the format of list of dictionary - iterating over the rows
data_type_mapping = {
'BOOL': (ua.VariantType.Boolean, lambda: random.choice([True, False])),
'INT': (ua.VariantType.Int32, lambda: random.randint(-2**31, 2**31 - 1)),
'STRING': (ua.VariantType.String, lambda: ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(random.randint(5, 20)))),
'UDINT': (ua.VariantType.UInt32, lambda: random.randint(0, 2**32 - 1)),
'UINT': (ua.VariantType.UInt16, lambda: random.randint(0, 2**16 - 1)),
'REAL': (ua.VariantType.Float, lambda: random.uniform(-1e6, 1e6)),
}
object_type_node_class_map = {}
for row in csv_data:
idx = 1
object_type_node_class = row['object_type_node_class']
s = row['s']
data_type = row['data_type']
description = row['description']
access_level = row['access_level']
cloud_transmission = row['cloud_transmission']
if data_type not in data_type_mapping:
_logger.error(f"Unsupported data type: {data_type} for node {s}")
continue
if object_type_node_class not in object_type_node_class_map:
myobj = await server.nodes.objects.add_object(idx, object_type_node_class)
object_type_node_class_map[object_type_node_class] = myobj
else:
myobj = object_type_node_class_map[object_type_node_class]
variant_type, value_func = data_type_mapping[data_type]
myvar = await myobj.add_variable(idx, s, ua.Variant([], variant_type), description)
value = value_func()
await myvar.set_writable()
await myvar.set_writable(access_level == access_level)
await myvar.set_value(value)
# Create a subscription with a publishing interval of 500 milliseconds and assign it to the SubHandler class for handling subscription events
sub = await server.create_subscription(500, handler=SubHandler())
# Subscribe the variable node to the subscription for data change notifications
await sub.subscribe_data_change(myvar)
async def main(): async def main():
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
bool_vialogin = True
# setup our server # setup our server
if bool_vialogin:
# server = Server()
user_manager = UserManager()
server = Server(user_manager = user_manager)
await server.init()
else:
server = Server() server = Server()
await server.init() await server.init()
if bool(int(ipv6_enabled)): if bool(int(ipv6_enabled)):
server.set_endpoint(f"opc.tcp://[{ipv6}]:{port}/PwDC") _logger.info(f"Set endpoint to: opc.tcp://[{ipv6}]:{port}/{ApplicationUri}")
server.set_endpoint(f"opc.tcp://[{ipv6}]:{port}/{ApplicationUri}")
else:
_logger.debug(f"Setting endpoint to: opc.tcp://[{ipv4}]:{port}/{ApplicationUri}")
server.set_endpoint(f"opc.tcp://{ipv4}:{port}/{ApplicationUri}")
#from asyncua.crypto.permission_rules import SimpleRoleRuleset
#server.set_security_policy([ua.SecurityPolicyType.NoSecurity],permission_ruleset=SimpleRoleRuleset())
#,
#ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
#ua.SecurityPolicyType.Basic256Sha256_Sign
# ]
if xml is not None: if xml is not None:
await server.import_xml(xml) await server.import_xml(xml)
idx = 1 # Read the interface description defining the nodes
# Create an object to hold the variables nodes_data = read_nodes(input_interface_description = "Powerfuse_Interface_definition_import.csv")
objects = server.get_objects_node()
powerfuse_obj = await objects.add_object(idx, "Powerfuse")
# Create variables
machine_status_rot = await powerfuse_obj.add_variable(idx, "Powerfuse_Maschinenstatus_Status_Rot", False)
machine_status_gelb = await powerfuse_obj.add_variable(idx, "Powerfuse_Maschinenstatus_Status_Gelb", False)
# Set the variables as writable # Create nodes defined in the interface description
await machine_status_rot.set_writable() await import_nodes_from_csv_data(server = server, _logger = _logger, csv_data = nodes_data)
await machine_status_gelb.set_writable()
# Start the server # Start the server
_logger.info("Starting server!") _logger.info("Starting server!")
# Trigger data change events
async with server: async with server:
while True: while True:
# Randomly change the machine status await shuffle_based_on_csv_data(server = server, _logger = _logger, csv_data = nodes_data)
new_status_rot = random.choice([True, False]) await asyncio.sleep(10)
new_status_gelb = random.choice([True, False])
await machine_status_rot.write_value(new_status_rot)
await machine_status_gelb.write_value(new_status_gelb)
val_machine_status_rot = await machine_status_rot.get_value()
val_machine_status_gelb = await machine_status_gelb.get_value()
_logger.info("New machine status: Rot=%r, Gelb=%r",
val_machine_status_rot,
val_machine_status_gelb)
await asyncio.sleep(1)
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.ERROR)
asyncio.run(main(), debug=False) asyncio.run(main(), debug=False)
#!//usr/bin/python #!//usr/bin/python
""" """
Basic OPC UA <-> HTTP gateway server. Basic OPC UA <-> HTTP gateway server/client
""" """
import sys import sys
import asyncio import asyncio
from asyncua import Client, ua from asyncua import Client, ua
...@@ -15,11 +14,12 @@ import argparse ...@@ -15,11 +14,12 @@ import argparse
import logging import logging
import __main__ import __main__
import random import random
import csv
# command line handling # command line handling
parser = argparse.ArgumentParser(description='Run OPCUA Server.') parser = argparse.ArgumentParser(description='Run OPCUA Server or Client.')
a = parser.add_argument a = parser.add_argument
# a('--ipv4', help='The IPv4 address on which the OPCUA Server runs', default="0.0.0.0") a('--ipv4', help='The IPv4 address on which the OPCUA Server runs', default="0.0.0.0")
a('--ipv6', help='The IPv6 address on which the OPCUA Server runs', default="::") a('--ipv6', help='The IPv6 address on which the OPCUA Server runs', default="::")
a('--ipv6-enabled', help='The IPv6 address check whether it is enabled or disabled', default="1") a('--ipv6-enabled', help='The IPv6 address check whether it is enabled or disabled', default="1")
a('--port', help='The port on which the OPCUA Server runs', default="4840") a('--port', help='The port on which the OPCUA Server runs', default="4840")
...@@ -28,18 +28,66 @@ a('--erp5-url', help='URL of ERP5 instance to which data shall be send.', defaul ...@@ -28,18 +28,66 @@ a('--erp5-url', help='URL of ERP5 instance to which data shall be send.', defaul
a('--erp5-username', help='Username of ERP5 instance to which data shall be send.', default=None) a('--erp5-username', help='Username of ERP5 instance to which data shall be send.', default=None)
a('--erp5-password', help='Password of ERP5 instance to which data shall be send.', default=None) a('--erp5-password', help='Password of ERP5 instance to which data shall be send.', default=None)
args = parser.parse_args() args = parser.parse_args()
# ipv4 = args.ipv4 ipv4 = args.ipv4
ipv6 = args.ipv6 ipv6 = args.ipv6
ipv6_enabled = args.ipv6_enabled ipv6_enabled = args.ipv6_enabled
ApplicationUri = "Powerfuse_DataConnect"
port = args.port port = args.port
xml = args.xml xml = args.xml
erp5_url = args.erp5_url erp5_url = args.erp5_url
erp5_username = args.erp5_username erp5_username = args.erp5_username
erp5_password = args.erp5_password erp5_password = args.erp5_password
ERP5_REQUEST_API = "ERP5Site_handleOPCUARequest" # Name of the ERP5 site
ERP5_REQUEST_API = "ERP5Site_handleOPCUARequest_data_stream"
# In seconds
ERP5_REQUEST_API_INTERVAL = 30
# Enables OPCUA client functionality
bool_client = True
# Use bool_request_api_interval in combination with ERP5_REQUEST_API_INTERVAL
# Enables the client to sent in a given time interval all data specified in the interface description.
bool_request_api_interval = False
# Enables monitoring of the servers subscribed data
bool_monitoring = True
# Enables functionality to read the initial states from the ERP5 backend
read_backend = False read_backend = False
# Enables OPCUA server functionality
bool_server = False bool_server = False
bool_client = True # Enables login functionality of the client to the server
# True value is currently deprecated error states some internal bad coding.
bool_vialogin = True
# Credential used by the OPCUA client to connect to the OPCUA server
opcua_server_credentials = {"username": "Powerfuse",
"password": "password"}
def read_nodes(input_interface_description):
'''
Reads the nodes defined in .csv style description as list of dictionaries with key values paris.
keys represent the header of the .csv file
input_interface_description: filename of the interface description
'''
csv_data = []
with open(input_interface_description, 'r') as file:
reader = csv.DictReader(file, delimiter=',')
for row in reader:
csv_data.append(row)
return csv_data
# Class sub handler - with function that notes datachanges
class SubHandler(object):
def __init__(self, nodes_data_type):
self.nodes_data_type = nodes_data_type
async def datachange_notification(self, node, val, data):
s = (await node.read_browse_name()).Name
data_type = self.nodes_data_type[s]
custom_data = {
'node': s,
'val': val,
'data': data_type
}
erp5_handler.call(http_method="POST", **custom_data)
print(f"New data change event for node {node} with value {val}, type {data_type} and name {s}")
await asyncio.sleep(0.5)
# ERP5 backend storage for OPCUA Document # ERP5 backend storage for OPCUA Document
@dataclass(frozen=True) @dataclass(frozen=True)
...@@ -60,7 +108,7 @@ class ERP5Handler(asyncua.common.subscription.SubHandler): ...@@ -60,7 +108,7 @@ class ERP5Handler(asyncua.common.subscription.SubHandler):
v = str(v) v = str(v)
serialized_data[k] = v serialized_data[k] = v
print(f"Key: {k}\nValue: {v}\nValue Type: {type(v)}\n\n") # print(f"Key: {k}\nValue: {v}\nValue Type: {type(v)}\n\n")
params = urllib.parse.quote_plus(json.dumps(serialized_data)) params = urllib.parse.quote_plus(json.dumps(serialized_data))
self.session.auth = (erp5_username, erp5_password) self.session.auth = (erp5_username, erp5_password)
...@@ -80,7 +128,6 @@ class ERP5Handler(asyncua.common.subscription.SubHandler): ...@@ -80,7 +128,6 @@ class ERP5Handler(asyncua.common.subscription.SubHandler):
erp5_handler = ERP5Handler(erp5_url) erp5_handler = ERP5Handler(erp5_url)
class InternalSession(asyncua.server.internal_session.InternalSession): class InternalSession(asyncua.server.internal_session.InternalSession):
async def read(self, params): async def read(self, params):
erp5_handler.call(params=params) erp5_handler.call(params=params)
return await super().read(params) return await super().read(params)
...@@ -89,51 +136,104 @@ class InternalSession(asyncua.server.internal_session.InternalSession): ...@@ -89,51 +136,104 @@ class InternalSession(asyncua.server.internal_session.InternalSession):
async def main(): async def main():
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
if bool_client: if bool_client:
#VariantType=<VariantType.Boolean # For local development - ipv6 adress of the OPCUA server
#VariantType=<VariantType.Int64 ipv6 = "::"
# Example of creating and sending data to the HTTP backend url = f"opc.tcp://[{ipv6}]:{port}/{ApplicationUri}"
def send_custom_data(node, val, data_type):
custom_data = {
'node': node,
'val': val,
'data': data_type
}
# Send the data to the ERP5 backend
erp5_handler.call(http_method="POST", **custom_data)
_logger.info("Sent custom data to ERP5 backend")
# FHI Server
ipv6 = "2001:67c:1254:108:404e::a"
url = f"opc.tcp://[{ipv6}]:{port}/PwDC"
_logger.info("Connecting to %s ...", url) _logger.info("Connecting to %s ...", url)
async with Client(url=url, timeout=2) as client:
# Read the interface description defining the nodes defined in csv format.
nodes_data = read_nodes(input_interface_description = "/Interface_definition_import.csv")
# nodes_data_type (dictionary) is used to map the changing node values to a data type.
# In more details this maps the String type NodeID (s=) (for example Powerfuse_Machinenstatus) to the corresponding data type. Is used in the SubHandler.
nodes_data_type = {}
for row in nodes_data:
nodes_data_type[row["s"]] = row["data_type"]
# While loop: Reconnecting to server in Interval
while True:
try:
# Connecting to client using a login
if bool_vialogin:
client = Client(url=url, timeout=2)
client.set_user(opcua_server_credentials["username"])
client.set_password(opcua_server_credentials["password"])
else:
# Connecting to client without login
client = Client(url=url, timeout=2)
async with client as client:
print("Client running")
# Get root node of OPCUA's server address space
root = client.nodes.root
# bool_monitoring is a boolean value. If "True" it does create a subscription on the nodes on the OPCUA server.
# It thus monitors the data changes of the values of the nodes on the OPCUA server.
if bool_monitoring:
# Create an instance of the SubHandler class with nodes_data_type as an argument
handler = SubHandler(nodes_data_type = nodes_data_type)
idx = 1
for row in nodes_data:
object_type_node_class = row['object_type_node_class']
s = row['s']
data_type = row['data_type']
description = row['description']
access_level = row['access_level']
cloud_transmission = row['cloud_transmission']
# Get the object node
# Reads as root.get_child(["0:Objects", "1:Powerfuse_Maschinenstatus"])
obj = await root.get_child(["0:Objects", f"{idx}:{object_type_node_class}"])
# Get the variable nodes
# Reads as obj.get_child("1:Powerfuse_Maschinenstatus_Status_Rot")
myvar = await obj.get_child(f"{idx}:{s}")
# Create a subscription with a publishing interval of 500 milliseconds and the specified handler
sub = await client.create_subscription(500, handler)
# Subscribe to data changes for the myvar node and get a handle for the subscription
handle = await sub.subscribe_data_change(myvar)
# Keep the subscription alive
try:
while True:
await asyncio.sleep(0)
finally:
await sub.delete()
# Iterates over the data nodes bool_request_api_interval
if bool_request_api_interval:
while True: while True:
await asyncio.sleep(1)
# Get the root node
root = client.get_root_node() root = client.get_root_node()
# custom_data_list stores the values from the nodes collected from the server.
# Get the objects node custom_data_list = []
objects = await root.get_child("0:Objects") idx = 1
for row in nodes_data:
# Get the Powerfuse object object_type_node_class = row['object_type_node_class']
powerfuse_obj = await objects.get_child("1:Powerfuse") s = row['s']
data_type = row['data_type']
# Get the variables description = row['description']
machine_status_rot = await powerfuse_obj.get_child("1:Powerfuse_Maschinenstatus_Status_Rot") access_level = row['access_level']
machine_status_gelb = await powerfuse_obj.get_child("1:Powerfuse_Maschinenstatus_Status_Gelb") cloud_transmission = row['cloud_transmission']
# If cloud_transmission is set to "x" the data is allowed to be processed to the cloude
# Read values if cloud_transmission == "x":
value_rot = await machine_status_rot.read_value() try:
value_gelb = await machine_status_gelb.read_value() myobj = await root.get_child(["0:Objects", f"{idx}:{object_type_node_class}"])
_logger.info(f"Initial values: Rot={value_rot}, Gelb={value_gelb}") mynode= await myobj.get_child(f"{idx}:{s}")
myval = await mynode.read_value()
# Write values except asyncio.TimeoutError:
await machine_status_rot.write_value(True) _logger.warning(f'asyncio.TimeoutError at .get_child(["0:Objects", "{idx}:{s}"]')
await machine_status_gelb.write_value(False) continue
custom_data_list.append({
# Start sending custom data in the background 'node': s,
send_custom_data(node = "Powerfuse_Maschinenstatus_Status_Rot", 'val': myval,
val = value_rot, 'data': data_type
data_type = "VariantType=<VariantType.String") })
# Send data via http_method POST in the for loop
for custom_data in custom_data_list:
# Send the data to the ERP5 backend
erp5_handler.call(http_method="POST", **custom_data)
#_logger.info("Sent custom data to ERP5 backend")
#
await asyncio.sleep(ERP5_REQUEST_API_INTERVAL)
# Catching the opcua ConnectionError and connecting in an interval again.
except (ConnectionError, ua.UaError):
reconnecting_time = 2
_logger.warning(f"Reconnecting in {reconnecting_time} seconds")
await asyncio.sleep(reconnecting_time)
if bool_server: if bool_server:
# setup our server # setup our server
...@@ -144,8 +244,9 @@ async def main(): ...@@ -144,8 +244,9 @@ async def main():
# todo change name of /freeopcua/server # todo change name of /freeopcua/server
_logger.debug(f"Setting endpoint to: opc.tcp://[{ipv6}]:{port}/freeopcua/server/") _logger.debug(f"Setting endpoint to: opc.tcp://[{ipv6}]:{port}/freeopcua/server/")
server.set_endpoint(f"opc.tcp://[{ipv6}]:{port}/freeopcua/server/") server.set_endpoint(f"opc.tcp://[{ipv6}]:{port}/freeopcua/server/")
#else: else:
# server.set_endpoint(f"opc.tcp://{ipv4}:{port}/freeopcua/server/") _logger.debug(f"Setting endpoint to: opc.tcp://[{ipv4}]:{port}/freeopcua/server/")
server.set_endpoint(f"opc.tcp://{ipv4}:{port}/freeopcua/server/")
if xml is not None: if xml is not None:
await server.import_xml(xml) await server.import_xml(xml)
...@@ -198,8 +299,7 @@ async def main(): ...@@ -198,8 +299,7 @@ async def main():
while True: while True:
await asyncio.sleep(1) await asyncio.sleep(1)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.WARNING)
asyncio.run(main(), debug=True)
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(__main__.main()) sys.exit(asyncio.run(main(), debug=False))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment