From 7d727849bebecab91b95150848660bd62551315a Mon Sep 17 00:00:00 2001 From: Joakim Gustafsson Date: Tue, 8 Oct 2024 12:31:32 +0200 Subject: [PATCH] cleaned upp eventhandling --- .../ijt-support/Connection/SocketHandler.mjs | 2 +- .../IJT_Web_Client/Pytest/TestExample.py | 84 +++++++++++++++++++ .../Pytest/TestSubscriptionHandler.py | 35 ++++++++ .../IJT_Web_Client/Python/CallStructure.py | 6 +- .../IJT_Web_Client/Python/Connection.py | 64 +++++--------- .../IJT_Web_Client/Python/EventHandler.py | 35 ++++++++ .../Resources/connectionpoints.json | 2 +- .../IJT_Web_Client/Resources/settings.json | 26 +----- 8 files changed, 184 insertions(+), 70 deletions(-) create mode 100644 OPC_UA_Clients/Release2/IJT_Web_Client/Pytest/TestExample.py create mode 100644 OPC_UA_Clients/Release2/IJT_Web_Client/Pytest/TestSubscriptionHandler.py create mode 100644 OPC_UA_Clients/Release2/IJT_Web_Client/Python/EventHandler.py diff --git a/OPC_UA_Clients/Release2/IJT_Web_Client/Javascripts/ijt-support/Connection/SocketHandler.mjs b/OPC_UA_Clients/Release2/IJT_Web_Client/Javascripts/ijt-support/Connection/SocketHandler.mjs index 1c3a59f..d2ddeb5 100644 --- a/OPC_UA_Clients/Release2/IJT_Web_Client/Javascripts/ijt-support/Connection/SocketHandler.mjs +++ b/OPC_UA_Clients/Release2/IJT_Web_Client/Javascripts/ijt-support/Connection/SocketHandler.mjs @@ -164,7 +164,7 @@ export class SocketHandler { registerMandatory (responseString, callback) { function applyAll (functionList, msg) { if (msg && msg.exception) { - throw new Error('Response exception: ' + msg.exception) + // throw new Error('Response exception: ' + msg.exception) } let returnValue for (const f of functionList) { diff --git a/OPC_UA_Clients/Release2/IJT_Web_Client/Pytest/TestExample.py b/OPC_UA_Clients/Release2/IJT_Web_Client/Pytest/TestExample.py new file mode 100644 index 0000000..2d398dc --- /dev/null +++ b/OPC_UA_Clients/Release2/IJT_Web_Client/Pytest/TestExample.py @@ -0,0 +1,84 @@ +import pytest +import asyncio +import sys + +sys.path.append("..") + +from Python.Connection import Connection +from Pytest.TestSubscriptionHandler import SubHandler, combinedNameFilter + +# Make sure you are in the venv +# python -m pytest Pytest/TestExample.py + + +@pytest.fixture +async def connect(url): + connection = Connection(url, None) + await connection.connect() + return connection + +@pytest.fixture +async def subscribe(connection, cond): + handler= SubHandler(cond) + await connection.subscribe({"eventtype" : ["joiningsystemevent"]}, handler) + return asyncio.wait_for(handler.getFuture(), timeout=10) + +@pytest.mark.asyncio +async def test_opcua_client(connect, subscribe): + def cond(ev): + return combinedNameFilter(ev, 'SystemConditionClassType', ['AssetConnectedConditionClassType']) + url = 'opc.tcp://10.46.19.106:40451' + + try: + #connection = Connection(url, None) + #await connection.connect() + + #handler= SubHandler(cond) + #await connection.subscribe({"eventtype" : ["joiningsystemevent"]}, handler) + #timedFuture = asyncio.wait_for(handler.getFuture(), timeout=10) + + connection = connect(url, cond) + timedFuture = subscribe(connection, cond) + + call = { + 'objectnode': { + 'Identifier': 'TighteningSystem/Simulations/SimulateEventsAndConditions', + 'NamespaceIndex': '1'}, + 'methodnode': { + 'Identifier': 'TighteningSystem/Simulations/SimulateEventsAndConditions/SimulateEvents', + 'NamespaceIndex': '1'}, + 'arguments': [ + {'dataType': 7, 'value': 1} # 1 means 'Tool connected' simulation + ]} + await connection.methodcall(call) + + eventRaw = await timedFuture + + event = eventRaw.get_event_props_as_fields_dict() + + # Test joining technology + assert event['JoiningSystemEventContent/JoiningTechnology'].Value.Text == 'Tightening' + + # test severity + assert event['Severity'].Value == 1001 # Severity of event should be 100 + + #Test message + assert event['Message'].Value.Text == 'Tool Connected' # Message of event should be 'Tool Connected' + + # Test associatedEntities + associatedEntities = event['JoiningSystemEventContent/AssociatedEntities'].Value + productInstanceNr = 0 + for entity in associatedEntities: + if entity.EntityType == 4: + productInstanceNr = productInstanceNr+1 + assert productInstanceNr == 1 # Exactly one associatedEntity for productInstanceUri + + await connection.terminate() + + except asyncio.TimeoutError: + assert 1==0 # No answer in 10 seconds + await connection.terminate() + + +if __name__ == "__main__": + asyncio.run(test_opcua_client()) diff --git a/OPC_UA_Clients/Release2/IJT_Web_Client/Pytest/TestSubscriptionHandler.py b/OPC_UA_Clients/Release2/IJT_Web_Client/Pytest/TestSubscriptionHandler.py new file mode 100644 index 0000000..162bbe8 --- /dev/null +++ b/OPC_UA_Clients/Release2/IJT_Web_Client/Pytest/TestSubscriptionHandler.py @@ -0,0 +1,35 @@ + +from asyncio import Future + +class SubHandler(): + def __init__(self, filter): + self.filter = filter + self.my_future = Future() + + def getFuture(self): + return self.my_future + + def event_notification(self, event): + print("EVENT RECEIVED") + + if (self.filter(event)): + self.my_future.set_result(event) + +def conditionNameFilter(event, name): + if (event.ConditionClassName.Text == name): + return True + return False + +def subConditionNameFilter(event, names): + eventSubCondNames = [] + for sub in event.ConditionSubClassName: + eventSubCondNames.append(sub.Text) + for name in names: + if (name not in eventSubCondNames): + return False + return True + +def combinedNameFilter(event, name, subnames): + if (conditionNameFilter(event, name) and subConditionNameFilter(event, subnames)): + return True + return False \ No newline at end of file diff --git a/OPC_UA_Clients/Release2/IJT_Web_Client/Python/CallStructure.py b/OPC_UA_Clients/Release2/IJT_Web_Client/Python/CallStructure.py index 7a5f540..24f9c3b 100644 --- a/OPC_UA_Clients/Release2/IJT_Web_Client/Python/CallStructure.py +++ b/OPC_UA_Clients/Release2/IJT_Web_Client/Python/CallStructure.py @@ -6,9 +6,13 @@ def createCallStructure(argument): """ value = argument["value"] + print("value:") print(value) inp = 0 + print("argument - datatype") + print(argument["dataType"]) + match argument["dataType"]: case 3029: inp = ua.JoiningProcessIdentificationDataType() @@ -37,5 +41,5 @@ def createCallStructure(argument): print(inp.__dict__) case _: inp = ua.Variant(value, ua.VariantType(argument["dataType"])) - + return inp diff --git a/OPC_UA_Clients/Release2/IJT_Web_Client/Python/Connection.py b/OPC_UA_Clients/Release2/IJT_Web_Client/Python/Connection.py index 7e5c64a..cb5de4c 100644 --- a/OPC_UA_Clients/Release2/IJT_Web_Client/Python/Connection.py +++ b/OPC_UA_Clients/Release2/IJT_Web_Client/Python/Connection.py @@ -2,6 +2,7 @@ import asyncio from Python.Serialize import serializeTuple, serializeValue from Python.CallStructure import createCallStructure +from Python.EventHandler import EventHandler import json from threading import Thread #from IPython import embed @@ -12,37 +13,6 @@ def IdObjectToString (inp): if isinstance(inp, int): return "ns="+inp["NamespaceIndex"]+";i="+inp["Identifier"] return "ns="+inp["NamespaceIndex"]+";s="+inp["Identifier"] - -class SubHandler(): - """ - Subscription Handler. To receive events from server for a subscription - data_change and event methods are called directly from receiving thread. - Do not do expensive, slow or network operation there. - threaded_websocket handles that via wrap_async_func - """ - def __init__(self, websocket, server_url): - self.websocket = websocket - self.server_url = server_url - - async def threaded_websocket(self, arg): - returnValue = { - "command" : "event", - "endpoint": self.server_url, - "data": arg, - } - await self.websocket.send(json.dumps(returnValue)) - - - def wrap_async_func(self, arg): - asyncio.run(self.threaded_websocket(arg)) - - def event_notification(self, event): - print("EVENT RECEIVED") - # Eventhandlers should be quick and non networked so sending the response - # to the webpage needs to be done asyncronously via a separate thread - thread = Thread(target = self.wrap_async_func, args = (str(serializeValue(event)), )) - thread.start() - class Connection: """ @@ -62,6 +32,7 @@ def __init__(self, server_url, websocket): self.websocket = websocket self.handle = 'handle' self.sub = 'sub' + self.subhandler = 0 async def connect(self): @@ -78,7 +49,8 @@ async def connect(self): "command" : "connection established", "endpoint": self.server_url, } - await self.websocket.send(json.dumps(event)) + if (self.websocket): + await self.websocket.send(json.dumps(event)) return event except Exception as e: @@ -106,18 +78,26 @@ async def terminate(self): async def subscribe(self, data): try: - print("SUBSCRIBE") - msclt= SubHandler(self.websocket, self.server_url) # Defined above - + if not self.subhandler: # Default subscription handler + self.subhandler = EventHandler(self.websocket, self.server_url) + obj = await self.client.nodes.root.get_child(["0:Objects", "0:Server"]) resultEvent = await self.client.nodes.root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "7:ResultReadyEventType", "3:JoiningSystemResultReadyEventType"]) joiningSystemEvent = await self.client.nodes.root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "3:JoiningSystemEventType"]) await self.client.load_data_type_definitions() + + self.sub = await self.client.create_subscription(100, self.subhandler) - self.sub = await self.client.create_subscription(100, msclt) - self.handle = await self.sub.subscribe_events(obj, [resultEvent, joiningSystemEvent]) + eventTypes = [] + if not "eventype" in data or 'resultevent' in data["eventtype"]: + eventTypes.append(resultEvent) + if not "eventype" in data or 'joiningsystemevent' in data["eventtype"]: + eventTypes.append(joiningSystemEvent) + + self.handle = await self.sub.subscribe_events(obj, eventTypes) + return {} except Exception as e: @@ -240,6 +220,7 @@ async def namespaces(self, data): async def methodcall(self, data): try: + print(data) objectNode = data["objectnode"] methodNode = data["methodnode"] arguments = data["arguments"] @@ -255,14 +236,13 @@ async def methodcall(self, data): input = createCallStructure(argument) attrList.append(input) - print(1) + print("attrList") + print(attrList) + methodRepr = getattr(obj, "call_method") - print(2) out = await methodRepr(*attrList) # call the method and get the output - print(3) - - print(serializeValue(out)) + # print(serializeValue(out)) return { "output" : serializeValue(out) } except Exception as e: diff --git a/OPC_UA_Clients/Release2/IJT_Web_Client/Python/EventHandler.py b/OPC_UA_Clients/Release2/IJT_Web_Client/Python/EventHandler.py new file mode 100644 index 0000000..33b6ab0 --- /dev/null +++ b/OPC_UA_Clients/Release2/IJT_Web_Client/Python/EventHandler.py @@ -0,0 +1,35 @@ + +import json +from threading import Thread +import asyncio +from Python.Serialize import serializeTuple, serializeValue + +class EventHandler(): + """ + Subscription Handler. To receive events from server for a subscription + data_change and event methods are called directly from receiving thread. + Do not do expensive, slow or network operation there. + threaded_websocket handles that via wrap_async_func + """ + def __init__(self, websocket, server_url): + self.websocket = websocket + self.server_url = server_url + + async def threaded_websocket(self, arg): + returnValue = { + "command" : "event", + "endpoint": self.server_url, + "data": arg, + } + await self.websocket.send(json.dumps(returnValue)) + + + def wrap_async_func(self, arg): + asyncio.run(self.threaded_websocket(arg)) + + def event_notification(self, event): + print("EVENT RECEIVED") + # Eventhandlers should be quick and non networked so sending the response + # to the webpage needs to be done asyncronously via a separate thread + thread = Thread(target = self.wrap_async_func, args = (str(serializeValue(event)), )) + thread.start() diff --git a/OPC_UA_Clients/Release2/IJT_Web_Client/Resources/connectionpoints.json b/OPC_UA_Clients/Release2/IJT_Web_Client/Resources/connectionpoints.json index b03d892..3b5642b 100644 --- a/OPC_UA_Clients/Release2/IJT_Web_Client/Resources/connectionpoints.json +++ b/OPC_UA_Clients/Release2/IJT_Web_Client/Resources/connectionpoints.json @@ -1 +1 @@ -{"connectionpoints": [{"name": "Local", "address": "opc.tcp://127.0.0.1:40451", "autoconnect": false}, {"name": "Windows simulation", "address": "opc.tcp://10.46.19.106:40451", "autoconnect": true}, {"name": "ICB-A", "address": "opc.tcp://10.46.16.68:40451", "autoconnect": false}, {"name": "PF8", "address": "opc.tcp://10.46.16.174:40451", "autoconnect": false}, {"name": "Pf8Demo", "address": "opc.tcp://192.168.1.1:40451", "autoconnect": false}], "command": "set connectionpoints", "endpoint": "common"} \ No newline at end of file +{"connectionpoints": [{"name": "Local", "address": "opc.tcp://127.0.0.1:40451", "autoconnect": true}, {"name": "Windows simulation", "address": "opc.tcp://10.46.19.106:40451", "autoconnect": false}, {"name": "ICB-A", "address": "opc.tcp://10.46.16.68:40451", "autoconnect": false}, {"name": "PF8", "address": "opc.tcp://10.46.16.174:40451", "autoconnect": false}, {"name": "Pf8Demo", "address": "opc.tcp://192.168.1.1:40451", "autoconnect": false}], "command": "set connectionpoints", "endpoint": "common"} \ No newline at end of file diff --git a/OPC_UA_Clients/Release2/IJT_Web_Client/Resources/settings.json b/OPC_UA_Clients/Release2/IJT_Web_Client/Resources/settings.json index ea6b65c..60b2e00 100644 --- a/OPC_UA_Clients/Release2/IJT_Web_Client/Resources/settings.json +++ b/OPC_UA_Clients/Release2/IJT_Web_Client/Resources/settings.json @@ -1,25 +1 @@ -{ - "productid": "www.atlascopco.com/CABLE-B0000000-", - "button1selection": "ProgramIndex_1", - "button2selection": "ProgramIndex_2", - "initialviewlevel": "1", - "methoddefaults": { - "ns=1;s=TighteningSystem/Simulations/SimulateResults/SimulateSingleResult": { - "arguments": [ - 2, - true - ], - "autocall": true - }, - "ns=1;s=TighteningSystem/AssetManagement/MethodSet/EnableAsset": { - "arguments": [ - "www.atlascopco.com/WSQP082020", - false - ], - "autocall": false - } - }, - "command": "set settings", - "endpoint": "common", - "initialViewLevel": "1" -} \ No newline at end of file +{"productid": "www.atlascopco.com/CABLE-B0000000-", "button1selection": "ProgramIndex_1", "button2selection": "ProgramIndex_2", "initialviewlevel": "4", "methoddefaults": {"ns=1;s=TighteningSystem/Simulations/SimulateResults/SimulateSingleResult": {"arguments": [2, true], "autocall": true}, "ns=1;s=TighteningSystem/AssetManagement/MethodSet/EnableAsset": {"arguments": ["www.atlascopco.com/WSQP082020", false], "autocall": false}}, "command": "set settings", "endpoint": "common", "initialViewLevel": "1"} \ No newline at end of file