Commit 89886652 authored by Ivan Tyagov's avatar Ivan Tyagov

Readability.

parent 1d06b719
......@@ -24,30 +24,37 @@ a('--xml', help='Path of XML to configure Server. See asyncua doc for more detai
a('--erp5-ip', help='IP of ERP5 instance to which data shall be send.', default=None)
a('--erp5-port', help='Port of ERP5 instance to which data shall be send.', default=None)
args = parser.parse_args()
ip, port, xml, erp5_ip, erp5_port = args.ip, args.port, args.xml, args.erp5_ip, args.erp5_port
ip = args.ip
port = args.port
xml = args.xml
erp5_ip = args.erp5_ip
erp5_port = args.erp5_port
# #################################
# Define ERP5Handler
@dataclass(frozen=True)
class ERP5Handler(asyncua.common.subscription.SubHandler):
ip: str
port: str
session: requests.Session = field(default_factory=requests.Session)
@property
def uri(self):
return f"http://{self.ip}:{self.port}/erp5/opcua_test"
def call(self, **data):
params = urllib.parse.quote_plus(json.dumps({k: str(v) for k, v in data.items()}))
self.session.post(f"{self.uri}?data={params}")
def datachange_notification(self, node, val, data):
self.call(node=node, val=val, data=data)
def event_notification(self, event):
self.call(event=event)
erp5_handler = None
if erp5_ip is not None and erp5_port is not None:
erp5_handler = ERP5Handler(erp5_ip, erp5_port)
class InternalSession(asyncua.server.internal_session.InternalSession):
async def read(self, params):
erp5_handler.call(params=params)
......@@ -61,24 +68,29 @@ async def main():
server = asyncua.Server()
await server.init()
server.set_endpoint(f"opc.tcp://{ip}:{port}/freeopcua/server/")
if xml is not None:
await server.import_xml(xml)
if erp5_handler:
subscription = await server.create_subscription(1000, erp5_handler)
await subscription.subscribe_events()
nodes = await asyncua.common.ua_utils.get_nodes_of_namespace(server)
await subscription.subscribe_data_change(nodes)
def create_session(
name, user=asyncua.server.users.User(role=asyncua.server.users.UserRole.Anonymous), external=False
):
def create_session(name,
user=asyncua.server.users.User(role=asyncua.server.users.UserRole.Anonymous),
external=False):
self = server.iserver
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
server.iserver.create_session = create_session
_logger.info("Added subscription for erp5 handler.")
_logger.info("Starting server!")
async with server:
while True:
await asyncio.sleep(1)
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment