Commit 9c321528 authored by Levin Zimmermann's avatar Levin Zimmermann

components += opcua-server

This new component creates a bin, which can be used to run an OPC UA
server [1] as a SlapOS service.

[1] https://opcfoundation.org/about/opc-technologies/opc-ua/
parent e4ace7ff
[buildout]
parts = opcua-server
[opcua-server]
recipe = zc.recipe.egg
eggs =
asyncua
# To send data to erp5
requests
egg-versions =
asyncua = 1.0.1
sortedcontainers = 2.4.0
aiosqlite = 0.18.0:whl
aiofiles = 23.1.0
python-dateutil = 2.8.2:whl
entry-points = ${:_buildout_section_name_}=__main__:main
initialization =
# #################################
# Configure
import argparse
parser = argparse.ArgumentParser(description='Run OPCUA Server.')
a = parser.add_argument
a('--ip', help='The IP address on which the OPCUA Server runs', default="127.0.0.1")
a('--port', help='The port on which the OPCUA Server runs', default="2262")
a('--xml', help='Path of XML to configure Server. See asyncua doc for more details.', default=None)
a('--erp5-ip', help='IP of ERP5 instance to which data shall be send.', default=None)
a('--erp5-port', help='Port of ERP5 instance to which data shall be send.', default=None)
args = parser.parse_args()
ip, port, xml, erp5_ip, erp5_port = args.ip, args.port, args.xml, args.erp5_ip, args.erp5_port
# #################################
# Define ERP5Handler
from dataclasses import dataclass, field
import json
import requests
import urllib
from asyncua.common.subscription import SubHandler
@dataclass(frozen=True)
class ERP5Handler(SubHandler):
ip: str
port: str
session: requests.Session = field(default_factory=requests.Session)
@property
def uri(self):
return f"http://{self.ip}:{self.port}/erp5/opcua_test"
def call(self, **data):
params = urllib.parse.quote_plus(json.dumps({k: str(v) for k, v in data.items()}))
self.session.post(f"{self.uri}?data={params}")
def datachange_notification(self, node, val, data):
self.call(node=node, val=val, data=data)
def event_notification(self, event):
self.call(event=event)
# #################################
# Start OPCUA Server
import asyncio
import logging
from asyncua import Server
from asyncua.common.ua_utils import get_nodes_of_namespace
async def main():
_logger = logging.getLogger(__name__)
# setup our server
server = Server()
await server.init()
server.set_endpoint(f"opc.tcp://{ip}:{port}/freeopcua/server/")
if xml is not None:
await server.import_xml(xml)
if erp5_ip is not None and erp5_port is not None:
erp5_handler = ERP5Handler(erp5_ip, erp5_port)
subscription = await server.create_subscription(1000, erp5_handler)
await subscription.subscribe_events()
nodes = await get_nodes_of_namespace(server)
await subscription.subscribe_data_change(nodes)
_logger.info("Added subscription for erp5 handler.")
_logger.info("Starting server!")
async with server:
while True:
await asyncio.sleep(1)
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment