diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c2f7b17..9df4a891 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- On consumer side: Option to configure the time to wait until the event sink is started, + this avoids invalid subscription requests [#147](https://github.com/Draegerwerk/sdc11073/issues/147) + +### Changed +- Error handling for invalid subscription requests - sending faults specified in WS-Eventing [#147](https://github.com/Draegerwerk/sdc11073/issues/147) + ## [1.1.26] - 2023-06-08 ### Added diff --git a/src/sdc11073/pysoap/soapenvelope.py b/src/sdc11073/pysoap/soapenvelope.py index 2525f606..8975e862 100644 --- a/src/sdc11073/pysoap/soapenvelope.py +++ b/src/sdc11073/pysoap/soapenvelope.py @@ -209,10 +209,10 @@ def mkEndpointReference(idstring): _LANGUAGE_ATTR = '{http://www.w3.org/XML/1998/namespace}lang' +MODE_PUSH = '{}/DeliveryModes/Push'.format(Prefix.WSE.namespace) class WsSubscribe(object): - MODE_PUSH = '{}/DeliveryModes/Push'.format(Prefix.WSE.namespace) __slots__ = ('delivery_mode', 'notifyTo', 'endTo', 'expires', 'filter') def __init__(self, notifyTo, expires, @@ -223,9 +223,9 @@ def __init__(self, notifyTo, @param notifyTo: a WsaEndpointReferenceType @param expires: duration in seconds ( absolute date not supported) @param endTo: a WsaEndpointReferenceType or None - @param delivery_mode: defaults to self.MODE_PUSH + @param delivery_mode: defaults to MODE_PUSH """ - self.delivery_mode = delivery_mode or self.MODE_PUSH + self.delivery_mode = delivery_mode or MODE_PUSH self.notifyTo = notifyTo self.endTo = endTo self.expires = expires diff --git a/src/sdc11073/sdcclient/sdcclientimpl.py b/src/sdc11073/sdcclient/sdcclientimpl.py index f14ffb1c..ff0ff94d 100644 --- a/src/sdc11073/sdcclient/sdcclientimpl.py +++ b/src/sdc11073/sdcclient/sdcclientimpl.py @@ -288,7 +288,7 @@ def client(self, porttypename): """ returns the client for the given port type name. WDP and SDC use different port type names, e.g. WPF="Get", SDC="GetService". If the port type is not found directly, it tries also with or without "Service" in name. - :param porttypename: string, e.g "Get", or "GetService", ... + @param porttypename: string, e.g "Get", or "GetService", ... """ client = self._serviceClients.get(porttypename) if client is None and porttypename.endswith('Service'): @@ -338,22 +338,24 @@ def subscription_mgr(self): return self._subscriptionMgr def startAll(self, notSubscribedActions=None, subscriptionsCheckInterval=None, async_dispatch=True, - subscribe_periodic_reports=False): + subscribe_periodic_reports=False, dispatcher_timeout=15.0): """ - :param notSubscribedActions: a list of pmtypes.Actions elements or None. if None, everything is subscribed. - :param subscriptionsCheckInterval: an interval in seconds or None - :param async_dispatch: if True, incoming requests are queued and response is sent immediately (processing is done later). + @param notSubscribedActions: a list of pmtypes.Actions elements or None. if None, everything is subscribed. + @param subscriptionsCheckInterval: an interval in seconds or None + @param async_dispatch: if True, incoming requests are queued and response is sent immediately (processing is done later). if False, response is sent after the complete processing is done. - :param subscribe_periodic_reports: boolean - :return: None + @param subscribe_periodic_reports: boolean + @param dispatcher_timeout: time to wait for the event sink aka. dispatcher thread to be started, if timeout is + exceeded a RuntimeError is raised + @return: None """ self.discoverHostedServices() - self._startEventSink(async_dispatch) - periodic_actions = set([self.sdc_definitions.Actions.PeriodicMetricReport, - self.sdc_definitions.Actions.PeriodicAlertReport, - self.sdc_definitions.Actions.PeriodicComponentReport, - self.sdc_definitions.Actions.PeriodicContextReport, - self.sdc_definitions.Actions.PeriodicOperationalStateReport]) + self._startEventSink(async_dispatch, dispatcher_timeout=dispatcher_timeout) + periodic_actions = {self.sdc_definitions.Actions.PeriodicMetricReport, + self.sdc_definitions.Actions.PeriodicAlertReport, + self.sdc_definitions.Actions.PeriodicComponentReport, + self.sdc_definitions.Actions.PeriodicContextReport, + self.sdc_definitions.Actions.PeriodicOperationalStateReport} # start subscription manager self._subscriptionMgr = subscription.SubscriptionManager(self._notificationsDispatcherThread.base_url, log_prefix=self.log_prefix, @@ -508,7 +510,7 @@ def _mkHostedServiceClient(self, porttype, soapClient, hosted): cls = self._servicesLookup.get(porttype, HostedServiceClient) return cls(soapClient, hosted, porttype, self.sdc_definitions, self.log_prefix) - def _startEventSink(self, async_dispatch): + def _startEventSink(self, async_dispatch, dispatcher_timeout=15.0): if self._sslEvents == 'auto': sslContext = self._sslContext if self._device_uses_https else None elif self._sslEvents: # True @@ -527,7 +529,11 @@ def _startEventSink(self, async_dispatch): async_dispatch=async_dispatch) self._notificationsDispatcherThread.start() - self._notificationsDispatcherThread.started_evt.wait(timeout=5) + event_is_set = self._notificationsDispatcherThread.started_evt.wait(timeout=dispatcher_timeout) + if not event_is_set: + self._logger.error('Cannot start consumer, start event of EventSink not set.') + raise RuntimeError('Cannot start consumer, start event of EventSink not set.') + self._logger.info('serving EventSink on {}', self._notificationsDispatcherThread.base_url) def _stopEventSink(self, closeAllConnections): diff --git a/src/sdc11073/sdcdevice/exceptions.py b/src/sdc11073/sdcdevice/exceptions.py index 77d599a9..9dfe77dc 100644 --- a/src/sdc11073/sdcdevice/exceptions.py +++ b/src/sdc11073/sdcdevice/exceptions.py @@ -1,6 +1,12 @@ -from ..pysoap.soapenvelope import SoapFault, SoapFaultCode, AdressingFault +from ..pysoap.soapenvelope import AdressingFault +from ..pysoap.soapenvelope import MODE_PUSH +from ..pysoap.soapenvelope import SoapFault +from ..pysoap.soapenvelope import SoapFaultCode + + class HTTPRequestHandlingError(Exception): """ This class is used to communicate errors from http request handlers back to http server.""" + def __init__(self, status, reason, soapfault): """ @param status: integer, e.g. 404 @@ -38,3 +44,24 @@ def __init__(self, request, path): code=SoapFaultCode.SENDER, reason='invalid path {}'.format(path)) super().__init__(400, 'Bad Request', fault.as_xml()) + + +class InvalidMessageError(HTTPRequestHandlingError): + def __init__(self, request, detail): + fault = AdressingFault(request, + code=SoapFaultCode.SENDER, + reason='The message is not valid and cannot be processed.', + details='Detail: {} - The invalid message: {}'.format(detail, + request.rawdata.decode("utf-8"))) + super().__init__(400, 'Bad Request', fault.as_xml()) + + +class DeliveryModeRequestedUnavailableError(HTTPRequestHandlingError): + def __init__(self, request, detail=None): + if detail is None: + detail = f"The only supported mode: {MODE_PUSH}" + fault = AdressingFault(request, + code=SoapFaultCode.SENDER, + reason='The requested delivery mode is not supported.', + details=detail) + super().__init__(400, 'Bad Request', fault.as_xml()) diff --git a/src/sdc11073/sdcdevice/sdcservicesimpl.py b/src/sdc11073/sdcdevice/sdcservicesimpl.py index 5d7a20f3..234a318f 100644 --- a/src/sdc11073/sdcdevice/sdcservicesimpl.py +++ b/src/sdc11073/sdcdevice/sdcservicesimpl.py @@ -8,7 +8,7 @@ from ..namespaces import msgTag, domTag, s12Tag, wsxTag, wseTag, dpwsTag, mdpwsTag, nsmap from .. import pmtypes from .. import loghelper -from .exceptions import InvalidActionError, FunctionNotImplementedError +from .exceptions import InvalidActionError, FunctionNotImplementedError, InvalidMessageError _msg_prefix = Prefix.MSG.prefix _wsdl_ns = Prefix.WSDL.namespace @@ -151,7 +151,7 @@ def _onSubscribe(self, httpHeader, soapEnvelope): "//wse:Filter[@Dialect='{}/Action']".format(Prefix.DPWS.namespace), namespaces=nsmap) if len(subscriptionFilters) != 1: - raise Exception + raise InvalidMessageError(request=soapEnvelope, detail="wse:Subscribe/wse:Filter not provided") else: sfilters = subscriptionFilters[0].text for sfilter in sfilters.split(): diff --git a/src/sdc11073/sdcdevice/subscriptionmgr.py b/src/sdc11073/sdcdevice/subscriptionmgr.py index d8f48ab3..5a4c20c3 100644 --- a/src/sdc11073/sdcdevice/subscriptionmgr.py +++ b/src/sdc11073/sdcdevice/subscriptionmgr.py @@ -1,22 +1,32 @@ -import uuid -import time import copy +import http.client import socket +import time import traceback -from collections import deque, defaultdict import urllib -import http.client +import uuid +from collections import defaultdict +from collections import deque from lxml import etree as etree_ -from ..namespaces import xmlTag, wseTag, wsaTag, msgTag, nsmap, DocNamespaceHelper -from ..namespaces import Prefix_Namespace as Prefix -from .. import pysoap + +from .exceptions import DeliveryModeRequestedUnavailableError +from .exceptions import InvalidMessageError from .. import isoduration -from .. import xmlparsing -from .. import observableproperties -from .. import multikey from .. import loghelper +from .. import multikey +from .. import observableproperties +from .. import pysoap +from .. import xmlparsing from ..compression import CompressionHandler +from ..namespaces import DocNamespaceHelper +from ..namespaces import Prefix_Namespace as Prefix +from ..namespaces import msgTag +from ..namespaces import nsmap +from ..namespaces import wsaTag +from ..namespaces import wseTag +from ..namespaces import xmlTag + WsAddress = pysoap.soapenvelope.WsAddress Soap12Envelope = pysoap.soapenvelope.Soap12Envelope @@ -258,24 +268,31 @@ def fromSoapEnvelope(cls, soapEnvelope, sslContext, acceptedEncodings, max_subsc endToAddresses = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:EndTo', namespaces=nsmap) if len(endToAddresses) == 1: endToNode = endToAddresses[0] - endToAddress = endToNode.xpath('wsa:Address/text()', namespaces=nsmap)[0] + endToAddress = endToNode.xpath('wsa:Address/text()', namespaces=nsmap) + if not endToAddress: + raise InvalidMessageError(request=soapEnvelope, detail="wse:Subscribe/wse:EndTo/wsa:Address not set") + endToAddress = endToAddress[0] endToRefNode = endToNode.find('wsa:ReferenceParameters', namespaces=nsmap) # determine (mandatory) notification address deliveryNode = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:Delivery', namespaces=nsmap)[0] notifyToNode = deliveryNode.find('wse:NotifyTo', namespaces=nsmap) + if notifyToNode is None: + raise InvalidMessageError(request=soapEnvelope, detail="wse:Subscribe/wse:Delivery/wse:NotifyTo not set") notifyToAddress = notifyToNode.xpath('wsa:Address/text()', namespaces=nsmap)[0] notifyRefNode = notifyToNode.find('wsa:ReferenceParameters', namespaces=nsmap) - mode = deliveryNode.get('Mode') # mandatory attribute + mode = deliveryNode.get('Mode', pysoap.soapenvelope.MODE_PUSH) + if mode != pysoap.soapenvelope.MODE_PUSH: + raise DeliveryModeRequestedUnavailableError(request=soapEnvelope) expiresNodes = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:Expires/text()', namespaces=nsmap) - if len(expiresNodes) == 0: - expires = None - else: - expires = isoduration.parse_duration(str(expiresNodes[0])) + expires = isoduration.parse_duration(str(expiresNodes[0])) if expiresNodes else None - filter_ = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:Filter/text()', namespaces=nsmap)[0] + filter_ = soapEnvelope.bodyNode.xpath('wse:Subscribe/wse:Filter/text()', namespaces=nsmap) + if not filter_: + raise InvalidMessageError(request=soapEnvelope, detail="wse:Subscribe/wse:Filter not set") + filter_ = filter_[0] return cls(str(mode), base_urls, notifyToAddress, notifyRefNode, endToAddress, endToRefNode, expires, max_subscription_duration, str(filter_), sslContext, acceptedEncodings)