diff --git a/pyvo/astropy_samp/__init__.py b/pyvo/astropy_samp/__init__.py new file mode 100644 index 000000000..cc5c8db9c --- /dev/null +++ b/pyvo/astropy_samp/__init__.py @@ -0,0 +1,39 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +""" +This subpackage provides classes to communicate with other applications via the +`Simple Application Messaging Protocol (SAMP) +`_. + +Before integration into Astropy it was known as +`SAMPy `_, and was developed by Luigi Paioro +(INAF - Istituto Nazionale di Astrofisica). +""" + +from astropy import config as _config + +from .client import * +from .constants import * +from .errors import * +from .hub import * +from .hub_proxy import * +from .integrated_client import * +from .utils import * + + +class Conf(_config.ConfigNamespace): + """ + Configuration parameters for `astropy.samp`. + """ + + use_internet = _config.ConfigItem( + True, + "Whether to allow `astropy.samp` to use the internet, if available.", + aliases=["astropy.samp.utils.use_internet"], + ) + + n_retries = _config.ConfigItem( + 10, "How many times to retry communications when they fail" + ) + + +conf = Conf() diff --git a/pyvo/astropy_samp/client.py b/pyvo/astropy_samp/client.py new file mode 100644 index 000000000..e7c291cb8 --- /dev/null +++ b/pyvo/astropy_samp/client.py @@ -0,0 +1,742 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + + +import copy +import os +import select +import socket +import threading +import warnings +from urllib.parse import urlunparse + +from .constants import SAMP_STATUS_OK, SAMP_STATUS_WARNING +from .errors import SAMPClientError, SAMPWarning +from .hub import SAMPHubServer +from .standard_profile import ThreadingXMLRPCServer +from .utils import get_num_args, internet_on + +__all__ = ["SAMPClient"] + + +class SAMPClient: + """ + Utility class which provides facilities to create and manage a SAMP + compliant XML-RPC server that acts as SAMP callable client application. + + Parameters + ---------- + hub : :class:`~astropy.samp.SAMPHubProxy` + An instance of :class:`~astropy.samp.SAMPHubProxy` to be + used for messaging with the SAMP Hub. + + name : str, optional + Client name (corresponding to ``samp.name`` metadata keyword). + + description : str, optional + Client description (corresponding to ``samp.description.text`` metadata + keyword). + + metadata : dict, optional + Client application metadata in the standard SAMP format. + + addr : str, optional + Listening address (or IP). This defaults to 127.0.0.1 if the internet + is not reachable, otherwise it defaults to the host name. + + port : int, optional + Listening XML-RPC server socket port. If left set to 0 (the default), + the operating system will select a free port. + + callable : bool, optional + Whether the client can receive calls and notifications. If set to + `False`, then the client can send notifications and calls, but can not + receive any. + """ + + # TODO: define what is meant by callable + + def __init__( + self, + hub, + name=None, + description=None, + metadata=None, + addr=None, + port=0, + callable=True, + ): + # GENERAL + self._is_running = False + self._is_registered = False + + if metadata is None: + metadata = {} + + if name is not None: + metadata["samp.name"] = name + + if description is not None: + metadata["samp.description.text"] = description + + self._metadata = metadata + + self._addr = addr + self._port = port + self._xmlrpcAddr = None + self._callable = callable + + # HUB INTERACTION + self.client = None + self._public_id = None + self._private_key = None + self._hub_id = None + self._notification_bindings = {} + self._call_bindings = { + "samp.app.ping": [self._ping, {}], + "client.env.get": [self._client_env_get, {}], + } + self._response_bindings = {} + + self._host_name = "127.0.0.1" + if internet_on(): + try: + self._host_name = socket.getfqdn() + socket.getaddrinfo(self._addr or self._host_name, self._port or 0) + except OSError: + self._host_name = "127.0.0.1" + + self.hub = hub + + if self._callable: + self._thread = threading.Thread(target=self._serve_forever) + self._thread.daemon = True + + self.client = ThreadingXMLRPCServer( + (self._addr or self._host_name, self._port), + logRequests=False, + allow_none=True, + ) + + self.client.register_introspection_functions() + self.client.register_function( + self.receive_notification, "samp.client.receiveNotification" + ) + self.client.register_function(self.receive_call, "samp.client.receiveCall") + self.client.register_function( + self.receive_response, "samp.client.receiveResponse" + ) + + # If the port was set to zero, then the operating system has + # selected a free port. We now check what this port number is. + if self._port == 0: + self._port = self.client.socket.getsockname()[1] + + protocol = "http" + + self._xmlrpcAddr = urlunparse( + ( + protocol, + f"{self._addr or self._host_name}:{self._port}", + "", + "", + "", + "", + ) + ) + + def start(self): + """ + Start the client in a separate thread (non-blocking). + + This only has an effect if ``callable`` was set to `True` when + initializing the client. + """ + if self._callable: + self._is_running = True + self._run_client() + + def stop(self, timeout=10.0): + """ + Stop the client. + + Parameters + ---------- + timeout : float + Timeout after which to give up if the client cannot be cleanly + shut down. + """ + # Setting _is_running to False causes the loop in _serve_forever to + # exit. The thread should then stop running. We wait for the thread to + # terminate until the timeout, then we continue anyway. + self._is_running = False + if self._callable and self._thread.is_alive(): + self._thread.join(timeout) + if self._thread.is_alive(): + raise SAMPClientError( + f"Client was not shut down successfully (timeout={timeout}s)" + ) + + @property + def is_running(self): + """ + Whether the client is currently running. + """ + return self._is_running + + @property + def is_registered(self): + """ + Whether the client is currently registered. + """ + return self._is_registered + + def _run_client(self): + if self._callable: + self._thread.start() + + def _serve_forever(self): + while self._is_running: + try: + read_ready = select.select([self.client.socket], [], [], 0.1)[0] + except OSError as exc: + warnings.warn( + f"Call to select in SAMPClient failed: {exc}", SAMPWarning + ) + else: + if read_ready: + self.client.handle_request() + + self.client.server_close() + + def _ping(self, private_key, sender_id, msg_id, msg_mtype, msg_params, message): + reply = {"samp.status": SAMP_STATUS_OK, "samp.result": {}} + + self.hub.reply(private_key, msg_id, reply) + + def _client_env_get( + self, private_key, sender_id, msg_id, msg_mtype, msg_params, message + ): + if msg_params["name"] in os.environ: + reply = { + "samp.status": SAMP_STATUS_OK, + "samp.result": {"value": os.environ[msg_params["name"]]}, + } + else: + reply = { + "samp.status": SAMP_STATUS_WARNING, + "samp.result": {"value": ""}, + "samp.error": {"samp.errortxt": "Environment variable not defined."}, + } + + self.hub.reply(private_key, msg_id, reply) + + def _handle_notification(self, private_key, sender_id, message): + if private_key == self.get_private_key() and "samp.mtype" in message: + msg_mtype = message["samp.mtype"] + del message["samp.mtype"] + msg_params = message["samp.params"] + del message["samp.params"] + + msubs = SAMPHubServer.get_mtype_subtypes(msg_mtype) + for mtype in msubs: + if mtype in self._notification_bindings: + bound_func = self._notification_bindings[mtype][0] + if get_num_args(bound_func) == 5: + bound_func( + private_key, sender_id, msg_mtype, msg_params, message + ) + else: + bound_func( + private_key, sender_id, None, msg_mtype, msg_params, message + ) + + return "" + + def receive_notification(self, private_key, sender_id, message): + """ + Standard callable client ``receive_notification`` method. + + This method is automatically handled when the + :meth:`~astropy.samp.client.SAMPClient.bind_receive_notification` + method is used to bind distinct operations to MTypes. In case of a + customized callable client implementation that inherits from the + :class:`~astropy.samp.SAMPClient` class this method should be + overwritten. + + .. note:: When overwritten, this method must always return + a string result (even empty). + + Parameters + ---------- + private_key : str + Client private key. + + sender_id : str + Sender public ID. + + message : dict + Received message. + + Returns + ------- + confirmation : str + Any confirmation string. + """ + return self._handle_notification(private_key, sender_id, message) + + def _handle_call(self, private_key, sender_id, msg_id, message): + if private_key == self.get_private_key() and "samp.mtype" in message: + msg_mtype = message["samp.mtype"] + del message["samp.mtype"] + msg_params = message["samp.params"] + del message["samp.params"] + + msubs = SAMPHubServer.get_mtype_subtypes(msg_mtype) + + for mtype in msubs: + if mtype in self._call_bindings: + self._call_bindings[mtype][0]( + private_key, sender_id, msg_id, msg_mtype, msg_params, message + ) + + return "" + + def receive_call(self, private_key, sender_id, msg_id, message): + """ + Standard callable client ``receive_call`` method. + + This method is automatically handled when the + :meth:`~astropy.samp.client.SAMPClient.bind_receive_call` method is + used to bind distinct operations to MTypes. In case of a customized + callable client implementation that inherits from the + :class:`~astropy.samp.SAMPClient` class this method should be + overwritten. + + .. note:: When overwritten, this method must always return + a string result (even empty). + + Parameters + ---------- + private_key : str + Client private key. + + sender_id : str + Sender public ID. + + msg_id : str + Message ID received. + + message : dict + Received message. + + Returns + ------- + confirmation : str + Any confirmation string. + """ + return self._handle_call(private_key, sender_id, msg_id, message) + + def _handle_response(self, private_key, responder_id, msg_tag, response): + if private_key == self.get_private_key() and msg_tag in self._response_bindings: + self._response_bindings[msg_tag]( + private_key, responder_id, msg_tag, response + ) + return "" + + def receive_response(self, private_key, responder_id, msg_tag, response): + """ + Standard callable client ``receive_response`` method. + + This method is automatically handled when the + :meth:`~astropy.samp.client.SAMPClient.bind_receive_response` method + is used to bind distinct operations to MTypes. In case of a customized + callable client implementation that inherits from the + :class:`~astropy.samp.SAMPClient` class this method should be + overwritten. + + .. note:: When overwritten, this method must always return + a string result (even empty). + + Parameters + ---------- + private_key : str + Client private key. + + responder_id : str + Responder public ID. + + msg_tag : str + Response message tag. + + response : dict + Received response. + + Returns + ------- + confirmation : str + Any confirmation string. + """ + return self._handle_response(private_key, responder_id, msg_tag, response) + + def bind_receive_message(self, mtype, function, declare=True, metadata=None): + """ + Bind a specific MType to a function or class method, being intended for + a call or a notification. + + The function must be of the form:: + + def my_function_or_method( private_key, sender_id, msg_id, + mtype, params, extra) + + where ``private_key`` is the client private-key, ``sender_id`` is the + notification sender ID, ``msg_id`` is the Hub message-id (calls only, + otherwise is `None`), ``mtype`` is the message MType, ``params`` is the + message parameter set (content of ``"samp.params"``) and ``extra`` is a + dictionary containing any extra message map entry. The client is + automatically declared subscribed to the MType by default. + + Parameters + ---------- + mtype : str + MType to be caught. + + function : callable + Application function to be used when ``mtype`` is received. + + declare : bool, optional + Specify whether the client must be automatically declared as + subscribed to the MType (see also + :meth:`~astropy.samp.client.SAMPClient.declare_subscriptions`). + + metadata : dict, optional + Dictionary containing additional metadata to declare associated + with the MType subscribed to (see also + :meth:`~astropy.samp.client.SAMPClient.declare_subscriptions`). + """ + self.bind_receive_call(mtype, function, declare=declare, metadata=metadata) + + self.bind_receive_notification( + mtype, function, declare=declare, metadata=metadata + ) + + def bind_receive_notification(self, mtype, function, declare=True, metadata=None): + """ + Bind a specific MType notification to a function or class method. + + The function must be of the form:: + + def my_function_or_method( private_key, sender_id, mtype, + params, extra) + + where ``private_key`` is the client private-key, ``sender_id`` is the + notification sender ID, ``mtype`` is the message MType, ``params`` is + the notified message parameter set (content of ``"samp.params"``) and + ``extra`` is a dictionary containing any extra message map entry. The + client is automatically declared subscribed to the MType by default. + + Parameters + ---------- + mtype : str + MType to be caught. + + function : callable + Application function to be used when ``mtype`` is received. + + declare : bool, optional + Specify whether the client must be automatically declared as + subscribed to the MType (see also + :meth:`~astropy.samp.client.SAMPClient.declare_subscriptions`). + + metadata : dict, optional + Dictionary containing additional metadata to declare associated + with the MType subscribed to (see also + :meth:`~astropy.samp.client.SAMPClient.declare_subscriptions`). + """ + if self._callable: + if not metadata: + metadata = {} + self._notification_bindings[mtype] = [function, metadata] + if declare: + self._declare_subscriptions() + else: + raise SAMPClientError("Client not callable.") + + def bind_receive_call(self, mtype, function, declare=True, metadata=None): + """ + Bind a specific MType call to a function or class method. + + The function must be of the form:: + + def my_function_or_method( private_key, sender_id, msg_id, + mtype, params, extra) + + where ``private_key`` is the client private-key, ``sender_id`` is the + notification sender ID, ``msg_id`` is the Hub message-id, ``mtype`` is + the message MType, ``params`` is the message parameter set (content of + ``"samp.params"``) and ``extra`` is a dictionary containing any extra + message map entry. The client is automatically declared subscribed to + the MType by default. + + Parameters + ---------- + mtype : str + MType to be caught. + + function : callable + Application function to be used when ``mtype`` is received. + + declare : bool, optional + Specify whether the client must be automatically declared as + subscribed to the MType (see also + :meth:`~astropy.samp.client.SAMPClient.declare_subscriptions`). + + metadata : dict, optional + Dictionary containing additional metadata to declare associated + with the MType subscribed to (see also + :meth:`~astropy.samp.client.SAMPClient.declare_subscriptions`). + """ + if self._callable: + if not metadata: + metadata = {} + self._call_bindings[mtype] = [function, metadata] + if declare: + self._declare_subscriptions() + else: + raise SAMPClientError("Client not callable.") + + def bind_receive_response(self, msg_tag, function): + """ + Bind a specific msg-tag response to a function or class method. + + The function must be of the form:: + + def my_function_or_method( private_key, responder_id, + msg_tag, response) + + where ``private_key`` is the client private-key, ``responder_id`` is + the message responder ID, ``msg_tag`` is the message-tag provided at + call time and ``response`` is the response received. + + Parameters + ---------- + msg_tag : str + Message-tag to be caught. + + function : callable + Application function to be used when ``msg_tag`` is received. + """ + if self._callable: + self._response_bindings[msg_tag] = function + else: + raise SAMPClientError("Client not callable.") + + def unbind_receive_notification(self, mtype, declare=True): + """ + Remove from the notifications binding table the specified MType and + unsubscribe the client from it (if required). + + Parameters + ---------- + mtype : str + MType to be removed. + + declare : bool + Specify whether the client must be automatically declared as + unsubscribed from the MType (see also + :meth:`~astropy.samp.client.SAMPClient.declare_subscriptions`). + """ + if self._callable: + del self._notification_bindings[mtype] + if declare: + self._declare_subscriptions() + else: + raise SAMPClientError("Client not callable.") + + def unbind_receive_call(self, mtype, declare=True): + """ + Remove from the calls binding table the specified MType and unsubscribe + the client from it (if required). + + Parameters + ---------- + mtype : str + MType to be removed. + + declare : bool + Specify whether the client must be automatically declared as + unsubscribed from the MType (see also + :meth:`~astropy.samp.client.SAMPClient.declare_subscriptions`). + """ + if self._callable: + del self._call_bindings[mtype] + if declare: + self._declare_subscriptions() + else: + raise SAMPClientError("Client not callable.") + + def unbind_receive_response(self, msg_tag): + """ + Remove from the responses binding table the specified message-tag. + + Parameters + ---------- + msg_tag : str + Message-tag to be removed. + """ + if self._callable: + del self._response_bindings[msg_tag] + else: + raise SAMPClientError("Client not callable.") + + def declare_subscriptions(self, subscriptions=None): + """ + Declares the MTypes the client wishes to subscribe to, implicitly + defined with the MType binding methods + :meth:`~astropy.samp.client.SAMPClient.bind_receive_notification` + and :meth:`~astropy.samp.client.SAMPClient.bind_receive_call`. + + An optional ``subscriptions`` map can be added to the final map passed + to the :meth:`~astropy.samp.hub_proxy.SAMPHubProxy.declare_subscriptions` + method. + + Parameters + ---------- + subscriptions : dict, optional + Dictionary containing the list of MTypes to subscribe to, with the + same format of the ``subscriptions`` map passed to the + :meth:`~astropy.samp.hub_proxy.SAMPHubProxy.declare_subscriptions` + method. + """ + if self._callable: + self._declare_subscriptions(subscriptions) + else: + raise SAMPClientError("Client not callable.") + + def register(self): + """ + Register the client to the SAMP Hub. + """ + if self.hub.is_connected: + if self._private_key is not None: + raise SAMPClientError("Client already registered") + + result = self.hub.register(self.hub.lockfile["samp.secret"]) + + if result["samp.self-id"] == "": + raise SAMPClientError( + "Registration failed - samp.self-id was not set by the hub." + ) + + if result["samp.private-key"] == "": + raise SAMPClientError( + "Registration failed - samp.private-key was not set by the hub." + ) + + self._public_id = result["samp.self-id"] + self._private_key = result["samp.private-key"] + self._hub_id = result["samp.hub-id"] + + if self._callable: + self._set_xmlrpc_callback() + self._declare_subscriptions() + + if self._metadata != {}: + self.declare_metadata() + + self._is_registered = True + + else: + raise SAMPClientError( + "Unable to register to the SAMP Hub. Hub proxy not connected." + ) + + def unregister(self): + """ + Unregister the client from the SAMP Hub. + """ + if self.hub.is_connected: + self._is_registered = False + self.hub.unregister(self._private_key) + self._hub_id = None + self._public_id = None + self._private_key = None + else: + raise SAMPClientError( + "Unable to unregister from the SAMP Hub. Hub proxy not connected." + ) + + def _set_xmlrpc_callback(self): + if self.hub.is_connected and self._private_key is not None: + self.hub.set_xmlrpc_callback(self._private_key, self._xmlrpcAddr) + + def _declare_subscriptions(self, subscriptions=None): + if self.hub.is_connected and self._private_key is not None: + mtypes_dict = {} + # Collect notification mtypes and metadata + for mtype in self._notification_bindings.keys(): + mtypes_dict[mtype] = copy.deepcopy( + self._notification_bindings[mtype][1] + ) + + # Collect notification mtypes and metadata + for mtype in self._call_bindings.keys(): + mtypes_dict[mtype] = copy.deepcopy(self._call_bindings[mtype][1]) + + # Add optional subscription map + if subscriptions: + mtypes_dict.update(copy.deepcopy(subscriptions)) + + self.hub.declare_subscriptions(self._private_key, mtypes_dict) + + else: + raise SAMPClientError( + "Unable to declare subscriptions. Hub " + "unreachable or not connected or client " + "not registered." + ) + + def declare_metadata(self, metadata=None): + """ + Declare the client application metadata supported. + + Parameters + ---------- + metadata : dict, optional + Dictionary containing the client application metadata as defined in + the SAMP definition document. If omitted, then no metadata are + declared. + """ + if self.hub.is_connected and self._private_key is not None: + if metadata is not None: + self._metadata.update(metadata) + self.hub.declare_metadata(self._private_key, self._metadata) + else: + raise SAMPClientError( + "Unable to declare metadata. Hub " + "unreachable or not connected or client " + "not registered." + ) + + def get_private_key(self): + """ + Return the client private key used for the Standard Profile + communications obtained at registration time (``samp.private-key``). + + Returns + ------- + key : str + Client private key. + """ + return self._private_key + + def get_public_id(self): + """ + Return public client ID obtained at registration time + (``samp.self-id``). + + Returns + ------- + id : str + Client public ID. + """ + return self._public_id diff --git a/pyvo/astropy_samp/constants.py b/pyvo/astropy_samp/constants.py new file mode 100644 index 000000000..4bd049485 --- /dev/null +++ b/pyvo/astropy_samp/constants.py @@ -0,0 +1,38 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +""" +Defines constants used in `astropy.samp`. +""" + + +from astropy.utils.data import get_pkg_data_filename + +__all__ = [ + "SAMP_STATUS_OK", + "SAMP_STATUS_WARNING", + "SAMP_STATUS_ERROR", + "SAFE_MTYPES", + "SAMP_ICON", +] + +__profile_version__ = "1.3" + +#: General constant for samp.ok status string +SAMP_STATUS_OK = "samp.ok" +#: General constant for samp.warning status string +SAMP_STATUS_WARNING = "samp.warning" +#: General constant for samp.error status string +SAMP_STATUS_ERROR = "samp.error" + +SAFE_MTYPES = [ + "samp.app.*", + "samp.msg.progress", + "table.*", + "image.*", + "coord.*", + "spectrum.*", + "bibcode.*", + "voresource.*", +] + +with open(get_pkg_data_filename("data/astropy_icon.png"), "rb") as f: + SAMP_ICON = f.read() diff --git a/pyvo/astropy_samp/data/astropy_icon.png b/pyvo/astropy_samp/data/astropy_icon.png new file mode 100644 index 000000000..81f06b37a Binary files /dev/null and b/pyvo/astropy_samp/data/astropy_icon.png differ diff --git a/pyvo/astropy_samp/data/clientaccesspolicy.xml b/pyvo/astropy_samp/data/clientaccesspolicy.xml new file mode 100644 index 000000000..c5eb86433 --- /dev/null +++ b/pyvo/astropy_samp/data/clientaccesspolicy.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/pyvo/astropy_samp/data/crossdomain.xml b/pyvo/astropy_samp/data/crossdomain.xml new file mode 100644 index 000000000..63767deb9 --- /dev/null +++ b/pyvo/astropy_samp/data/crossdomain.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/pyvo/astropy_samp/errors.py b/pyvo/astropy_samp/errors.py new file mode 100644 index 000000000..7f7a6858a --- /dev/null +++ b/pyvo/astropy_samp/errors.py @@ -0,0 +1,35 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +""" +Defines custom errors and exceptions used in `astropy.samp`. +""" + + +import xmlrpc.client as xmlrpc + +from astropy.utils.exceptions import AstropyUserWarning + +__all__ = ["SAMPWarning", "SAMPHubError", "SAMPClientError", "SAMPProxyError"] + + +class SAMPWarning(AstropyUserWarning): + """ + SAMP-specific Astropy warning class. + """ + + +class SAMPHubError(Exception): + """ + SAMP Hub exception. + """ + + +class SAMPClientError(Exception): + """ + SAMP Client exceptions. + """ + + +class SAMPProxyError(xmlrpc.Fault): + """ + SAMP Proxy Hub exception. + """ diff --git a/pyvo/astropy_samp/hub.py b/pyvo/astropy_samp/hub.py new file mode 100644 index 000000000..ba42bf68b --- /dev/null +++ b/pyvo/astropy_samp/hub.py @@ -0,0 +1,1518 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + + +import copy +import os +import queue +import select +import socket +import threading +import time +import uuid +import warnings +import xmlrpc.client as xmlrpc +from urllib.parse import urlunparse + +from astropy import log + +from .constants import SAMP_STATUS_OK, __profile_version__ +from .errors import SAMPHubError, SAMPProxyError, SAMPWarning +from .lockfile_helpers import create_lock_file, read_lockfile +from .standard_profile import ThreadingXMLRPCServer +from .utils import ServerProxyPool, _HubAsClient, internet_on +from .web_profile import WebProfileXMLRPCServer, web_profile_text_dialog + +__all__ = ["SAMPHubServer", "WebProfileDialog"] + +__doctest_skip__ = [".", "SAMPHubServer.*"] + + +class SAMPHubServer: + """ + SAMP Hub Server. + + Parameters + ---------- + secret : str, optional + The secret code to use for the SAMP lockfile. If none is is specified, + the :func:`uuid.uuid1` function is used to generate one. + + addr : str, optional + Listening address (or IP). This defaults to 127.0.0.1 if the internet + is not reachable, otherwise it defaults to the host name. + + port : int, optional + Listening XML-RPC server socket port. If left set to 0 (the default), + the operating system will select a free port. + + lockfile : str, optional + Custom lockfile name. + + timeout : int, optional + Hub inactivity timeout. If ``timeout > 0`` then the Hub automatically + stops after an inactivity period longer than ``timeout`` seconds. By + default ``timeout`` is set to 0 (Hub never expires). + + client_timeout : int, optional + Client inactivity timeout. If ``client_timeout > 0`` then the Hub + automatically unregisters the clients which result inactive for a + period longer than ``client_timeout`` seconds. By default + ``client_timeout`` is set to 0 (clients never expire). + + mode : str, optional + Defines the Hub running mode. If ``mode`` is ``'single'`` then the Hub + runs using the standard ``.samp`` lock-file, having a single instance + for user desktop session. Otherwise, if ``mode`` is ``'multiple'``, + then the Hub runs using a non-standard lock-file, placed in + ``.samp-1`` directory, of the form ``samp-hub-``, where + ```` is a unique UUID assigned to the hub. + + label : str, optional + A string used to label the Hub with a human readable name. This string + is written in the lock-file assigned to the ``hub.label`` token. + + web_profile : bool, optional + Enables or disables the Web Profile support. + + web_profile_dialog : class, optional + Allows a class instance to be specified using ``web_profile_dialog`` + to replace the terminal-based message with e.g. a GUI pop-up. Two + `queue.Queue` instances will be added to the instance as attributes + ``queue_request`` and ``queue_result``. When a request is received via + the ``queue_request`` queue, the pop-up should be displayed, and a + value of `True` or `False` should be added to ``queue_result`` + depending on whether the user accepted or refused the connection. + + web_port : int, optional + The port to use for web SAMP. This should not be changed except for + testing purposes, since web SAMP should always use port 21012. + + pool_size : int, optional + The number of socket connections opened to communicate with the + clients. + """ + + def __init__( + self, + secret=None, + addr=None, + port=0, + lockfile=None, + timeout=0, + client_timeout=0, + mode="single", + label="", + web_profile=True, + web_profile_dialog=None, + web_port=21012, + pool_size=20, + ): + # Generate random ID for the hub + self._id = str(uuid.uuid1()) + + # General settings + self._is_running = False + self._customlockfilename = lockfile + self._lockfile = None + self._addr = addr + self._port = port + self._mode = mode + self._label = label + self._timeout = timeout + self._client_timeout = client_timeout + self._pool_size = pool_size + + # Web profile specific attributes + self._web_profile = web_profile + self._web_profile_dialog = web_profile_dialog + self._web_port = web_port + + self._web_profile_server = None + self._web_profile_callbacks = {} + self._web_profile_requests_queue = None + self._web_profile_requests_result = None + self._web_profile_requests_semaphore = None + + self._host_name = "127.0.0.1" + if internet_on(): + try: + self._host_name = socket.getfqdn() + socket.getaddrinfo(self._addr or self._host_name, self._port or 0) + except OSError: + self._host_name = "127.0.0.1" + + # Threading stuff + self._thread_lock = threading.Lock() + self._thread_run = None + self._thread_hub_timeout = None + self._thread_client_timeout = None + + self._launched_threads = [] + + # Variables for timeout testing: + self._last_activity_time = None + self._client_activity_time = {} + + # Hub message id counter, used to create hub msg ids + self._hub_msg_id_counter = 0 + + # Hub secret code + self._hub_secret_code_customized = secret + self._hub_secret = self._create_secret_code() + + # Hub public id (as SAMP client) + self._hub_public_id = "" + + # Client ids + # {private_key: (public_id, timestamp)} + self._private_keys = {} + + # Metadata per client + # {private_key: metadata} + self._metadata = {} + + # List of subscribed clients per MType + # {mtype: private_key list} + self._mtype2ids = {} + + # List of subscribed MTypes per client + # {private_key: mtype list} + self._id2mtypes = {} + + # List of XML-RPC addresses per client + # {public_id: (XML-RPC address, ServerProxyPool instance)} + self._xmlrpc_endpoints = {} + + # Synchronous message id heap + self._sync_msg_ids_heap = {} + + # Public ids counter + self._client_id_counter = -1 + + @property + def id(self): + """ + The unique hub ID. + """ + return self._id + + def _register_standard_api(self, server): + # Standard Profile only operations + server.register_function(self._ping, "samp.hub.ping") + server.register_function( + self._set_xmlrpc_callback, "samp.hub.setXmlrpcCallback" + ) + + # Standard API operations + server.register_function(self._register, "samp.hub.register") + server.register_function(self._unregister, "samp.hub.unregister") + server.register_function(self._declare_metadata, "samp.hub.declareMetadata") + server.register_function(self._get_metadata, "samp.hub.getMetadata") + server.register_function( + self._declare_subscriptions, "samp.hub.declareSubscriptions" + ) + server.register_function(self._get_subscriptions, "samp.hub.getSubscriptions") + server.register_function( + self._get_registered_clients, "samp.hub.getRegisteredClients" + ) + server.register_function( + self._get_subscribed_clients, "samp.hub.getSubscribedClients" + ) + server.register_function(self._notify, "samp.hub.notify") + server.register_function(self._notify_all, "samp.hub.notifyAll") + server.register_function(self._call, "samp.hub.call") + server.register_function(self._call_all, "samp.hub.callAll") + server.register_function(self._call_and_wait, "samp.hub.callAndWait") + server.register_function(self._reply, "samp.hub.reply") + + def _register_web_profile_api(self, server): + # Web Profile methods like Standard Profile + server.register_function(self._ping, "samp.webhub.ping") + server.register_function(self._unregister, "samp.webhub.unregister") + server.register_function(self._declare_metadata, "samp.webhub.declareMetadata") + server.register_function(self._get_metadata, "samp.webhub.getMetadata") + server.register_function( + self._declare_subscriptions, "samp.webhub.declareSubscriptions" + ) + server.register_function( + self._get_subscriptions, "samp.webhub.getSubscriptions" + ) + server.register_function( + self._get_registered_clients, "samp.webhub.getRegisteredClients" + ) + server.register_function( + self._get_subscribed_clients, "samp.webhub.getSubscribedClients" + ) + server.register_function(self._notify, "samp.webhub.notify") + server.register_function(self._notify_all, "samp.webhub.notifyAll") + server.register_function(self._call, "samp.webhub.call") + server.register_function(self._call_all, "samp.webhub.callAll") + server.register_function(self._call_and_wait, "samp.webhub.callAndWait") + server.register_function(self._reply, "samp.webhub.reply") + + # Methods particularly for Web Profile + server.register_function(self._web_profile_register, "samp.webhub.register") + server.register_function( + self._web_profile_allowReverseCallbacks, "samp.webhub.allowReverseCallbacks" + ) + server.register_function( + self._web_profile_pullCallbacks, "samp.webhub.pullCallbacks" + ) + + def _start_standard_server(self): + self._server = ThreadingXMLRPCServer( + (self._addr or self._host_name, self._port or 0), + log, + logRequests=False, + allow_none=True, + ) + prot = "http" + + self._port = self._server.socket.getsockname()[1] + addr = f"{self._addr or self._host_name}:{self._port}" + self._url = urlunparse((prot, addr, "", "", "", "")) + self._server.register_introspection_functions() + self._register_standard_api(self._server) + + def _start_web_profile_server(self): + self._web_profile_requests_queue = queue.Queue(1) + self._web_profile_requests_result = queue.Queue(1) + self._web_profile_requests_semaphore = queue.Queue(1) + + if self._web_profile_dialog is not None: + # TODO: Some sort of duck-typing on the web_profile_dialog object + self._web_profile_dialog.queue_request = self._web_profile_requests_queue + self._web_profile_dialog.queue_result = self._web_profile_requests_result + + try: + self._web_profile_server = WebProfileXMLRPCServer( + ("localhost", self._web_port), log, logRequests=False, allow_none=True + ) + self._web_port = self._web_profile_server.socket.getsockname()[1] + self._web_profile_server.register_introspection_functions() + self._register_web_profile_api(self._web_profile_server) + log.info("Hub set to run with Web Profile support enabled.") + except OSError: + log.warning( + f"Port {self._web_port} already in use. Impossible to run the " + "Hub with Web Profile support.", + SAMPWarning, + ) + self._web_profile = False + # Cleanup + self._web_profile_requests_queue = None + self._web_profile_requests_result = None + self._web_profile_requests_semaphore = None + + def _launch_thread(self, group=None, target=None, name=None, args=None): + # Remove inactive threads + remove = [] + for t in self._launched_threads: + if not t.is_alive(): + remove.append(t) + for t in remove: + self._launched_threads.remove(t) + + # Start new thread + t = threading.Thread(group=group, target=target, name=name, args=args) + t.start() + + # Add to list of launched threads + self._launched_threads.append(t) + + def _join_launched_threads(self, timeout=None): + for t in self._launched_threads: + t.join(timeout=timeout) + + def _timeout_test_hub(self): + if self._timeout == 0: + return + + last = time.time() + while self._is_running: + time.sleep(0.05) # keep this small to check _is_running often + now = time.time() + if now - last > 1.0: + with self._thread_lock: + if self._last_activity_time is not None: + if now - self._last_activity_time >= self._timeout: + warnings.warn( + "Timeout expired, Hub is shutting down!", SAMPWarning + ) + self.stop() + return + last = now + + def _timeout_test_client(self): + if self._client_timeout == 0: + return + + last = time.time() + while self._is_running: + time.sleep(0.05) # keep this small to check _is_running often + now = time.time() + if now - last > 1.0: + for private_key in self._client_activity_time.keys(): + if ( + now - self._client_activity_time[private_key] + > self._client_timeout + and private_key != self._hub_private_key + ): + warnings.warn( + f"Client {private_key} timeout expired!", SAMPWarning + ) + self._notify_disconnection(private_key) + self._unregister(private_key) + last = now + + def _hub_as_client_request_handler(self, method, args): + if method == "samp.client.receiveCall": + return self._receive_call(*args) + elif method == "samp.client.receiveNotification": + return self._receive_notification(*args) + elif method == "samp.client.receiveResponse": + return self._receive_response(*args) + elif method == "samp.app.ping": + return self._ping(*args) + + def _setup_hub_as_client(self): + hub_metadata = { + "samp.name": "Astropy SAMP Hub", + "samp.description.text": self._label, + "author.name": "The Astropy Collaboration", + "samp.documentation.url": "https://docs.astropy.org/en/stable/samp", + "samp.icon.url": self._url + "/samp/icon", + } + + result = self._register(self._hub_secret) + self._hub_public_id = result["samp.self-id"] + self._hub_private_key = result["samp.private-key"] + self._set_xmlrpc_callback(self._hub_private_key, self._url) + self._declare_metadata(self._hub_private_key, hub_metadata) + self._declare_subscriptions( + self._hub_private_key, {"samp.app.ping": {}, "x-samp.query.by-meta": {}} + ) + + def start(self, wait=False): + """ + Start the current SAMP Hub instance and create the lock file. Hub + start-up can be blocking or non blocking depending on the ``wait`` + parameter. + + Parameters + ---------- + wait : bool + If `True` then the Hub process is joined with the caller, blocking + the code flow. Usually `True` option is used to run a stand-alone + Hub in an executable script. If `False` (default), then the Hub + process runs in a separated thread. `False` is usually used in a + Python shell. + """ + if self._is_running: + raise SAMPHubError("Hub is already running") + + if self._lockfile is not None: + raise SAMPHubError("Hub is not running but lockfile is set") + + if self._web_profile: + self._start_web_profile_server() + + self._start_standard_server() + + self._lockfile = create_lock_file( + lockfilename=self._customlockfilename, + mode=self._mode, + hub_id=self.id, + hub_params=self.params, + ) + + self._update_last_activity_time() + self._setup_hub_as_client() + + self._start_threads() + + log.info("Hub started") + + if wait and self._is_running: + self._thread_run.join() + self._thread_run = None + + @property + def params(self): + """ + The hub parameters (which are written to the logfile). + """ + params = {} + + # Keys required by standard profile + + params["samp.secret"] = self._hub_secret + params["samp.hub.xmlrpc.url"] = self._url + params["samp.profile.version"] = __profile_version__ + + # Custom keys + + params["hub.id"] = self.id + params["hub.label"] = self._label or f"Hub {self.id}" + + return params + + def _start_threads(self): + self._thread_run = threading.Thread(target=self._serve_forever) + self._thread_run.daemon = True + + if self._timeout > 0: + self._thread_hub_timeout = threading.Thread( + target=self._timeout_test_hub, name="Hub timeout test" + ) + self._thread_hub_timeout.daemon = True + else: + self._thread_hub_timeout = None + + if self._client_timeout > 0: + self._thread_client_timeout = threading.Thread( + target=self._timeout_test_client, name="Client timeout test" + ) + self._thread_client_timeout.daemon = True + else: + self._thread_client_timeout = None + + self._is_running = True + self._thread_run.start() + + if self._thread_hub_timeout is not None: + self._thread_hub_timeout.start() + if self._thread_client_timeout is not None: + self._thread_client_timeout.start() + + def _create_secret_code(self): + if self._hub_secret_code_customized is not None: + return self._hub_secret_code_customized + else: + return str(uuid.uuid1()) + + def stop(self): + """ + Stop the current SAMP Hub instance and delete the lock file. + """ + if not self._is_running: + return + + log.info("Hub is stopping...") + + self._notify_shutdown() + + self._is_running = False + + if self._lockfile and os.path.isfile(self._lockfile): + lockfiledict = read_lockfile(self._lockfile) + if lockfiledict["samp.secret"] == self._hub_secret: + os.remove(self._lockfile) + self._lockfile = None + + # Reset variables + # TODO: What happens if not all threads are stopped after timeout? + self._join_all_threads(timeout=10.0) + + self._hub_msg_id_counter = 0 + self._hub_secret = self._create_secret_code() + self._hub_public_id = "" + self._metadata = {} + self._private_keys = {} + self._mtype2ids = {} + self._id2mtypes = {} + self._xmlrpc_endpoints = {} + self._last_activity_time = None + + log.info("Hub stopped.") + + def _join_all_threads(self, timeout=None): + # In some cases, ``stop`` may be called from some of the sub-threads, + # so we just need to make sure that we don't try and shut down the + # calling thread. + current_thread = threading.current_thread() + if self._thread_run is not current_thread: + self._thread_run.join(timeout=timeout) + if not self._thread_run.is_alive(): + self._thread_run = None + if ( + self._thread_hub_timeout is not None + and self._thread_hub_timeout is not current_thread + ): + self._thread_hub_timeout.join(timeout=timeout) + if not self._thread_hub_timeout.is_alive(): + self._thread_hub_timeout = None + if ( + self._thread_client_timeout is not None + and self._thread_client_timeout is not current_thread + ): + self._thread_client_timeout.join(timeout=timeout) + if not self._thread_client_timeout.is_alive(): + self._thread_client_timeout = None + + self._join_launched_threads(timeout=timeout) + + @property + def is_running(self): + """Return an information concerning the Hub running status. + + Returns + ------- + running : bool + Is the hub running? + """ + return self._is_running + + def _serve_forever(self): + while self._is_running: + try: + read_ready = select.select([self._server.socket], [], [], 0.01)[0] + except OSError as exc: + warnings.warn( + f"Call to select() in SAMPHubServer failed: {exc}", SAMPWarning + ) + else: + if read_ready: + self._server.handle_request() + + if self._web_profile: + # We now check if there are any connection requests from the + # web profile, and if so, we initialize the pop-up. + if self._web_profile_dialog is None: + try: + request = self._web_profile_requests_queue.get_nowait() + except queue.Empty: + pass + else: + web_profile_text_dialog( + request, self._web_profile_requests_result + ) + + # We now check for requests over the web profile socket, and we + # also update the pop-up in case there are any changes. + try: + read_ready = select.select( + [self._web_profile_server.socket], [], [], 0.01 + )[0] + except OSError as exc: + warnings.warn( + f"Call to select() in SAMPHubServer failed: {exc}", SAMPWarning + ) + else: + if read_ready: + self._web_profile_server.handle_request() + + self._server.server_close() + if self._web_profile_server is not None: + self._web_profile_server.server_close() + + def _notify_shutdown(self): + msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.shutdown") + for mtype in msubs: + if mtype in self._mtype2ids: + for key in self._mtype2ids[mtype]: + self._notify_( + self._hub_private_key, + self._private_keys[key][0], + {"samp.mtype": "samp.hub.event.shutdown", "samp.params": {}}, + ) + + def _notify_register(self, private_key): + msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.register") + for mtype in msubs: + if mtype in self._mtype2ids: + public_id = self._private_keys[private_key][0] + for key in self._mtype2ids[mtype]: + # if key != private_key: + self._notify( + self._hub_private_key, + self._private_keys[key][0], + { + "samp.mtype": "samp.hub.event.register", + "samp.params": {"id": public_id}, + }, + ) + + def _notify_unregister(self, private_key): + msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.unregister") + for mtype in msubs: + if mtype in self._mtype2ids: + public_id = self._private_keys[private_key][0] + for key in self._mtype2ids[mtype]: + if key != private_key: + self._notify( + self._hub_private_key, + self._private_keys[key][0], + { + "samp.mtype": "samp.hub.event.unregister", + "samp.params": {"id": public_id}, + }, + ) + + def _notify_metadata(self, private_key): + msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.metadata") + for mtype in msubs: + if mtype in self._mtype2ids: + public_id = self._private_keys[private_key][0] + for key in self._mtype2ids[mtype]: + # if key != private_key: + self._notify( + self._hub_private_key, + self._private_keys[key][0], + { + "samp.mtype": "samp.hub.event.metadata", + "samp.params": { + "id": public_id, + "metadata": self._metadata[private_key], + }, + }, + ) + + def _notify_subscriptions(self, private_key): + msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.event.subscriptions") + for mtype in msubs: + if mtype in self._mtype2ids: + public_id = self._private_keys[private_key][0] + for key in self._mtype2ids[mtype]: + self._notify( + self._hub_private_key, + self._private_keys[key][0], + { + "samp.mtype": "samp.hub.event.subscriptions", + "samp.params": { + "id": public_id, + "subscriptions": self._id2mtypes[private_key], + }, + }, + ) + + def _notify_disconnection(self, private_key): + def _xmlrpc_call_disconnect(endpoint, private_key, hub_public_id, message): + endpoint.samp.client.receiveNotification( + private_key, hub_public_id, message + ) + + msubs = SAMPHubServer.get_mtype_subtypes("samp.hub.disconnect") + public_id = self._private_keys[private_key][0] + endpoint = self._xmlrpc_endpoints[public_id][1] + + for mtype in msubs: + if mtype in self._mtype2ids and private_key in self._mtype2ids[mtype]: + log.debug(f"notify disconnection to {public_id}") + self._launch_thread( + target=_xmlrpc_call_disconnect, + args=( + endpoint, + private_key, + self._hub_public_id, + { + "samp.mtype": "samp.hub.disconnect", + "samp.params": {"reason": "Timeout expired!"}, + }, + ), + ) + + def _ping(self): + self._update_last_activity_time() + log.debug("ping") + return "1" + + def _query_by_metadata(self, key, value): + public_id_list = [] + for private_id in self._metadata: + if key in self._metadata[private_id]: + if self._metadata[private_id][key] == value: + public_id_list.append(self._private_keys[private_id][0]) + + return public_id_list + + def _set_xmlrpc_callback(self, private_key, xmlrpc_addr): + self._update_last_activity_time(private_key) + if private_key in self._private_keys: + if private_key == self._hub_private_key: + public_id = self._private_keys[private_key][0] + self._xmlrpc_endpoints[public_id] = ( + xmlrpc_addr, + _HubAsClient(self._hub_as_client_request_handler), + ) + return "" + + # Dictionary stored with the public id + + log.debug(f"set_xmlrpc_callback: {private_key} {xmlrpc_addr}") + + server_proxy_pool = None + + server_proxy_pool = ServerProxyPool( + self._pool_size, xmlrpc.ServerProxy, xmlrpc_addr, allow_none=1 + ) + + public_id = self._private_keys[private_key][0] + self._xmlrpc_endpoints[public_id] = (xmlrpc_addr, server_proxy_pool) + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + return "" + + def _perform_standard_register(self): + with self._thread_lock: + private_key, public_id = self._get_new_ids() + self._private_keys[private_key] = (public_id, time.time()) + self._update_last_activity_time(private_key) + self._notify_register(private_key) + log.debug(f"register: private-key = {private_key} and self-id = {public_id}") + return { + "samp.self-id": public_id, + "samp.private-key": private_key, + "samp.hub-id": self._hub_public_id, + } + + def _register(self, secret): + self._update_last_activity_time() + if secret == self._hub_secret: + return self._perform_standard_register() + else: + # return {"samp.self-id": "", "samp.private-key": "", "samp.hub-id": ""} + raise SAMPProxyError(7, "Bad secret code") + + def _get_new_ids(self): + private_key = str(uuid.uuid1()) + self._client_id_counter += 1 + public_id = "cli#hub" + if self._client_id_counter > 0: + public_id = f"cli#{self._client_id_counter}" + + return private_key, public_id + + def _unregister(self, private_key): + self._update_last_activity_time() + + public_key = "" + + self._notify_unregister(private_key) + + with self._thread_lock: + if private_key in self._private_keys: + public_key = self._private_keys[private_key][0] + del self._private_keys[private_key] + else: + return "" + + if private_key in self._metadata: + del self._metadata[private_key] + + if private_key in self._id2mtypes: + del self._id2mtypes[private_key] + + for mtype in self._mtype2ids.keys(): + if private_key in self._mtype2ids[mtype]: + self._mtype2ids[mtype].remove(private_key) + + if public_key in self._xmlrpc_endpoints: + del self._xmlrpc_endpoints[public_key] + + if private_key in self._client_activity_time: + del self._client_activity_time[private_key] + + if self._web_profile: + if private_key in self._web_profile_callbacks: + del self._web_profile_callbacks[private_key] + self._web_profile_server.remove_client(private_key) + + log.debug(f"unregister {public_key} ({private_key})") + + return "" + + def _declare_metadata(self, private_key, metadata): + self._update_last_activity_time(private_key) + if private_key in self._private_keys: + log.debug( + "declare_metadata: private-key = {} metadata = {}".format( + private_key, str(metadata) + ) + ) + self._metadata[private_key] = metadata + self._notify_metadata(private_key) + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + return "" + + def _get_metadata(self, private_key, client_id): + self._update_last_activity_time(private_key) + if private_key in self._private_keys: + client_private_key = self._public_id_to_private_key(client_id) + log.debug( + f"get_metadata: private-key = {private_key} client-id = {client_id}" + ) + if client_private_key is not None: + if client_private_key in self._metadata: + log.debug(f"--> metadata = {self._metadata[client_private_key]}") + return self._metadata[client_private_key] + else: + return {} + else: + raise SAMPProxyError(6, "Invalid client ID") + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + def _declare_subscriptions(self, private_key, mtypes): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + log.debug( + "declare_subscriptions: private-key = {} mtypes = {}".format( + private_key, str(mtypes) + ) + ) + + # remove subscription to previous mtypes + if private_key in self._id2mtypes: + prev_mtypes = self._id2mtypes[private_key] + + for mtype in prev_mtypes: + try: + self._mtype2ids[mtype].remove(private_key) + except ValueError: # private_key is not in list + pass + + self._id2mtypes[private_key] = copy.deepcopy(mtypes) + + # remove duplicated MType for wildcard overwriting + original_mtypes = copy.deepcopy(mtypes) + + for mtype in original_mtypes: + if mtype.endswith("*"): + for mtype2 in original_mtypes: + if mtype2.startswith(mtype[:-1]) and mtype2 != mtype: + if mtype2 in mtypes: + del mtypes[mtype2] + + log.debug( + "declare_subscriptions: subscriptions accepted from {} => {}".format( + private_key, str(mtypes) + ) + ) + + for mtype in mtypes: + if mtype in self._mtype2ids: + if private_key not in self._mtype2ids[mtype]: + self._mtype2ids[mtype].append(private_key) + else: + self._mtype2ids[mtype] = [private_key] + + self._notify_subscriptions(private_key) + + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + return "" + + def _get_subscriptions(self, private_key, client_id): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + client_private_key = self._public_id_to_private_key(client_id) + if client_private_key is not None: + if client_private_key in self._id2mtypes: + log.debug( + "get_subscriptions: client-id = {} mtypes = {}".format( + client_id, str(self._id2mtypes[client_private_key]) + ) + ) + return self._id2mtypes[client_private_key] + else: + log.debug( + f"get_subscriptions: client-id = {client_id} mtypes = missing" + ) + return {} + else: + raise SAMPProxyError(6, "Invalid client ID") + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + def _get_registered_clients(self, private_key): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + reg_clients = [] + for pkey in self._private_keys.keys(): + if pkey != private_key: + reg_clients.append(self._private_keys[pkey][0]) + log.debug( + "get_registered_clients: private_key = {} clients = {}".format( + private_key, reg_clients + ) + ) + return reg_clients + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + def _get_subscribed_clients(self, private_key, mtype): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + sub_clients = {} + + for pkey in self._private_keys.keys(): + if pkey != private_key and self._is_subscribed(pkey, mtype): + sub_clients[self._private_keys[pkey][0]] = {} + + log.debug( + f"get_subscribed_clients: private_key = {private_key} mtype = {mtype} " + f"clients = {sub_clients}" + ) + return sub_clients + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + @staticmethod + def get_mtype_subtypes(mtype): + """ + Return a list containing all the possible wildcarded subtypes of MType. + + Parameters + ---------- + mtype : str + MType to be parsed. + + Returns + ------- + types : list + List of subtypes + + Examples + -------- + >>> from astropy.samp import SAMPHubServer + >>> SAMPHubServer.get_mtype_subtypes("samp.app.ping") + ['samp.app.ping', 'samp.app.*', 'samp.*', '*'] + """ + subtypes = [] + + msubs = mtype.split(".") + indexes = list(range(len(msubs))) + indexes.reverse() + indexes.append(-1) + + for i in indexes: + tmp_mtype = ".".join(msubs[: i + 1]) + if tmp_mtype != mtype: + if tmp_mtype != "": + tmp_mtype = tmp_mtype + ".*" + else: + tmp_mtype = "*" + subtypes.append(tmp_mtype) + + return subtypes + + def _is_subscribed(self, private_key, mtype): + subscribed = False + + msubs = SAMPHubServer.get_mtype_subtypes(mtype) + + for msub in msubs: + if msub in self._mtype2ids: + if private_key in self._mtype2ids[msub]: + subscribed = True + + return subscribed + + def _notify(self, private_key, recipient_id, message): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + if not ( + self._is_subscribed( + self._public_id_to_private_key(recipient_id), message["samp.mtype"] + ) + ): + raise SAMPProxyError( + 2, + "Client {} not subscribed to MType {}".format( + recipient_id, message["samp.mtype"] + ), + ) + + self._launch_thread( + target=self._notify_, args=(private_key, recipient_id, message) + ) + return {} + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + def _notify_(self, sender_private_key, recipient_public_id, message): + if sender_private_key not in self._private_keys: + return + + sender_public_id = self._private_keys[sender_private_key][0] + + try: + log.debug( + "notify {} from {} to {}".format( + message["samp.mtype"], sender_public_id, recipient_public_id + ) + ) + + recipient_private_key = self._public_id_to_private_key(recipient_public_id) + arg_params = (sender_public_id, message) + samp_method_name = "receiveNotification" + + self._retry_method( + recipient_private_key, recipient_public_id, samp_method_name, arg_params + ) + + except Exception as exc: + warnings.warn( + "{} notification from client {} to client {} failed [{}]".format( + message["samp.mtype"], sender_public_id, recipient_public_id, exc + ), + SAMPWarning, + ) + + def _notify_all(self, private_key, message): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + if "samp.mtype" not in message: + raise SAMPProxyError(3, "samp.mtype keyword is missing") + recipient_ids = self._notify_all_(private_key, message) + return recipient_ids + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + def _notify_all_(self, sender_private_key, message): + recipient_ids = [] + msubs = SAMPHubServer.get_mtype_subtypes(message["samp.mtype"]) + + for mtype in msubs: + if mtype in self._mtype2ids: + for key in self._mtype2ids[mtype]: + if key != sender_private_key: + _recipient_id = self._private_keys[key][0] + recipient_ids.append(_recipient_id) + self._launch_thread( + target=self._notify, + args=(sender_private_key, _recipient_id, message), + ) + if not recipient_ids: + warnings.warn( + "No client was able to receive this message", + SAMPWarning, + ) + + return recipient_ids + + def _call(self, private_key, recipient_id, msg_tag, message): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + if not ( + self._is_subscribed( + self._public_id_to_private_key(recipient_id), message["samp.mtype"] + ) + ): + raise SAMPProxyError( + 2, + "Client {} not subscribed to MType {}".format( + recipient_id, message["samp.mtype"] + ), + ) + public_id = self._private_keys[private_key][0] + msg_id = self._get_new_hub_msg_id(public_id, msg_tag) + self._launch_thread( + target=self._call_, + args=(private_key, public_id, recipient_id, msg_id, message), + ) + return msg_id + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + def _call_( + self, sender_private_key, sender_public_id, recipient_public_id, msg_id, message + ): + if sender_private_key not in self._private_keys: + return + + try: + log.debug( + "call {} from {} to {} ({})".format( + msg_id.split(";;")[0], + sender_public_id, + recipient_public_id, + message["samp.mtype"], + ) + ) + + recipient_private_key = self._public_id_to_private_key(recipient_public_id) + arg_params = (sender_public_id, msg_id, message) + samp_methodName = "receiveCall" + + self._retry_method( + recipient_private_key, recipient_public_id, samp_methodName, arg_params + ) + + except Exception as exc: + warnings.warn( + "{} call {} from client {} to client {} failed [{},{}]".format( + message["samp.mtype"], + msg_id.split(";;")[0], + sender_public_id, + recipient_public_id, + type(exc), + exc, + ), + SAMPWarning, + ) + + def _call_all(self, private_key, msg_tag, message): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + if "samp.mtype" not in message: + raise SAMPProxyError( + 3, + f"samp.mtype keyword is missing in message tagged as {msg_tag}", + ) + + public_id = self._private_keys[private_key][0] + msg_id = self._call_all_(private_key, public_id, msg_tag, message) + return msg_id + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + def _call_all_(self, sender_private_key, sender_public_id, msg_tag, message): + msg_id = {} + msubs = SAMPHubServer.get_mtype_subtypes(message["samp.mtype"]) + + for mtype in msubs: + if mtype in self._mtype2ids: + for key in self._mtype2ids[mtype]: + if key != sender_private_key: + _msg_id = self._get_new_hub_msg_id(sender_public_id, msg_tag) + receiver_public_id = self._private_keys[key][0] + msg_id[receiver_public_id] = _msg_id + self._launch_thread( + target=self._call_, + args=( + sender_private_key, + sender_public_id, + receiver_public_id, + _msg_id, + message, + ), + ) + return msg_id + + def _call_and_wait(self, private_key, recipient_id, message, timeout): + self._update_last_activity_time(private_key) + + if private_key in self._private_keys: + timeout = int(timeout) + + now = time.time() + response = {} + + msg_id = self._call(private_key, recipient_id, "samp::sync::call", message) + self._sync_msg_ids_heap[msg_id] = None + + while self._is_running: + if 0 < timeout <= time.time() - now: + del self._sync_msg_ids_heap[msg_id] + raise SAMPProxyError(1, "Timeout expired!") + + if self._sync_msg_ids_heap[msg_id] is not None: + response = copy.deepcopy(self._sync_msg_ids_heap[msg_id]) + del self._sync_msg_ids_heap[msg_id] + break + time.sleep(0.01) + + return response + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + def _reply(self, private_key, msg_id, response): + """ + The main method that gets called for replying. This starts up an + asynchronous reply thread and returns. + """ + self._update_last_activity_time(private_key) + if private_key in self._private_keys: + self._launch_thread( + target=self._reply_, args=(private_key, msg_id, response) + ) + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + return {} + + def _reply_(self, responder_private_key, msg_id, response): + if responder_private_key not in self._private_keys or not msg_id: + return + + responder_public_id = self._private_keys[responder_private_key][0] + counter, hub_public_id, recipient_public_id, recipient_msg_tag = msg_id.split( + ";;", 3 + ) + + try: + log.debug( + f"reply {counter} from {responder_public_id} to {recipient_public_id}" + ) + + if recipient_msg_tag == "samp::sync::call": + if msg_id in self._sync_msg_ids_heap.keys(): + self._sync_msg_ids_heap[msg_id] = response + + else: + recipient_private_key = self._public_id_to_private_key( + recipient_public_id + ) + arg_params = (responder_public_id, recipient_msg_tag, response) + samp_method_name = "receiveResponse" + + self._retry_method( + recipient_private_key, + recipient_public_id, + samp_method_name, + arg_params, + ) + + except Exception as exc: + warnings.warn( + "{} reply from client {} to client {} failed [{}]".format( + recipient_msg_tag, responder_public_id, recipient_public_id, exc + ), + SAMPWarning, + ) + + def _retry_method( + self, recipient_private_key, recipient_public_id, samp_method_name, arg_params + ): + """ + This method is used to retry a SAMP call several times. + + Parameters + ---------- + recipient_private_key + The private key of the receiver of the call + recipient_public_key + The public key of the receiver of the call + samp_method_name : str + The name of the SAMP method to call + arg_params : tuple + Any additional arguments to be passed to the SAMP method + """ + if recipient_private_key is None: + raise SAMPHubError("Invalid client ID") + + from . import conf + + for attempt in range(conf.n_retries): + if not self._is_running: + time.sleep(0.01) + continue + + try: + if ( + self._web_profile + and recipient_private_key in self._web_profile_callbacks + ): + # Web Profile + callback = { + "samp.methodName": samp_method_name, + "samp.params": arg_params, + } + self._web_profile_callbacks[recipient_private_key].put(callback) + + else: + # Standard Profile + hub = self._xmlrpc_endpoints[recipient_public_id][1] + getattr(hub.samp.client, samp_method_name)( + recipient_private_key, *arg_params + ) + + except xmlrpc.Fault as exc: + log.debug( + "{} XML-RPC endpoint error (attempt {}): {}".format( + recipient_public_id, attempt + 1, exc.faultString + ) + ) + time.sleep(0.01) + else: + return + + # If we are here, then the above attempts failed + error_message = ( + samp_method_name + " failed after " + str(conf.n_retries) + " attempts" + ) + raise SAMPHubError(error_message) + + def _public_id_to_private_key(self, public_id): + for private_key in self._private_keys.keys(): + if self._private_keys[private_key][0] == public_id: + return private_key + return None + + def _get_new_hub_msg_id(self, sender_public_id, sender_msg_id): + with self._thread_lock: + self._hub_msg_id_counter += 1 + return "msg#{};;{};;{};;{}".format( + self._hub_msg_id_counter, + self._hub_public_id, + sender_public_id, + sender_msg_id, + ) + + def _update_last_activity_time(self, private_key=None): + with self._thread_lock: + self._last_activity_time = time.time() + if private_key is not None: + self._client_activity_time[private_key] = time.time() + + def _receive_notification(self, private_key, sender_id, message): + return "" + + def _receive_call(self, private_key, sender_id, msg_id, message): + if private_key == self._hub_private_key: + if "samp.mtype" in message and message["samp.mtype"] == "samp.app.ping": + self._reply( + self._hub_private_key, + msg_id, + {"samp.status": SAMP_STATUS_OK, "samp.result": {}}, + ) + + elif "samp.mtype" in message and ( + message["samp.mtype"] == "x-samp.query.by-meta" + or message["samp.mtype"] == "samp.query.by-meta" + ): + ids_list = self._query_by_metadata( + message["samp.params"]["key"], message["samp.params"]["value"] + ) + self._reply( + self._hub_private_key, + msg_id, + {"samp.status": SAMP_STATUS_OK, "samp.result": {"ids": ids_list}}, + ) + + return "" + else: + return "" + + def _receive_response(self, private_key, responder_id, msg_tag, response): + return "" + + def _web_profile_register( + self, identity_info, client_address=("unknown", 0), origin="unknown" + ): + self._update_last_activity_time() + + if client_address[0] not in ["localhost", "127.0.0.1"]: + raise SAMPProxyError(403, "Request of registration rejected by the Hub.") + + if not origin: + origin = "unknown" + + if isinstance(identity_info, dict): + # an old version of the protocol provided just a string with the app name + if "samp.name" not in identity_info: + raise SAMPProxyError( + 403, + "Request of registration rejected " + "by the Hub (application name not " + "provided).", + ) + + # Red semaphore for the other threads + self._web_profile_requests_semaphore.put("wait") + # Set the request to be displayed for the current thread + self._web_profile_requests_queue.put((identity_info, client_address, origin)) + # Get the popup dialogue response + response = self._web_profile_requests_result.get() + # OK, semaphore green + self._web_profile_requests_semaphore.get() + + if response: + register_map = self._perform_standard_register() + translator_url = "http://localhost:{}/translator/{}?ref=".format( + self._web_port, register_map["samp.private-key"] + ) + register_map["samp.url-translator"] = translator_url + self._web_profile_server.add_client(register_map["samp.private-key"]) + return register_map + else: + raise SAMPProxyError(403, "Request of registration rejected by the user.") + + def _web_profile_allowReverseCallbacks(self, private_key, allow): + self._update_last_activity_time() + if private_key in self._private_keys: + if allow == "0": + if private_key in self._web_profile_callbacks: + del self._web_profile_callbacks[private_key] + else: + self._web_profile_callbacks[private_key] = queue.Queue() + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + return "" + + def _web_profile_pullCallbacks(self, private_key, timeout_secs): + self._update_last_activity_time() + if private_key in self._private_keys: + callback = [] + callback_queue = self._web_profile_callbacks[private_key] + try: + while self._is_running: + item_queued = callback_queue.get_nowait() + callback.append(item_queued) + except queue.Empty: + pass + return callback + else: + raise SAMPProxyError(5, f"Private-key {private_key} expired or invalid.") + + +class WebProfileDialog: + """ + A base class to make writing Web Profile GUI consent dialogs + easier. + + The concrete class must: + + 1) Poll ``handle_queue`` periodically, using the timer services + of the GUI's event loop. This function will call + ``self.show_dialog`` when a request requires authorization. + ``self.show_dialog`` will be given the arguments: + + - ``samp_name``: The name of the application making the request. + + - ``details``: A dictionary of details about the client + making the request. + + - ``client``: A hostname, port pair containing the client + address. + + - ``origin``: A string containing the origin of the + request. + + 2) Call ``consent`` or ``reject`` based on the user's response to + the dialog. + """ + + def handle_queue(self): + try: + request = self.queue_request.get_nowait() + except queue.Empty: # queue is set but empty + pass + except AttributeError: # queue has not been set yet + pass + else: + if isinstance(request[0], str): # To support the old protocol version + samp_name = request[0] + else: + samp_name = request[0]["samp.name"] + + self.show_dialog(samp_name, request[0], request[1], request[2]) + + def consent(self): + self.queue_result.put(True) + + def reject(self): + self.queue_result.put(False) diff --git a/pyvo/astropy_samp/hub_proxy.py b/pyvo/astropy_samp/hub_proxy.py new file mode 100644 index 000000000..2f56621b8 --- /dev/null +++ b/pyvo/astropy_samp/hub_proxy.py @@ -0,0 +1,197 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + + +import copy +import xmlrpc.client as xmlrpc + +from .errors import SAMPHubError +from .lockfile_helpers import get_main_running_hub +from .utils import ServerProxyPool + +__all__ = ["SAMPHubProxy"] + + +class SAMPHubProxy: + """ + Proxy class to simplify the client interaction with a SAMP hub (via the + standard profile). + """ + + def __init__(self): + self.proxy = None + self._connected = False + + @property + def is_connected(self): + """ + Whether the hub proxy is currently connected to a hub. + """ + return self._connected + + def connect(self, hub=None, hub_params=None, pool_size=20): + """ + Connect to the current SAMP Hub. + + Parameters + ---------- + hub : `~astropy.samp.SAMPHubServer`, optional + The hub to connect to. + + hub_params : dict, optional + Optional dictionary containing the lock-file content of the Hub + with which to connect. This dictionary has the form + ``{: , ...}``. + + pool_size : int, optional + The number of socket connections opened to communicate with the + Hub. + """ + self._connected = False + self.lockfile = {} + + if hub is not None and hub_params is not None: + raise ValueError("Cannot specify both hub and hub_params") + + if hub_params is None: + if hub is not None: + if not hub.is_running: + raise SAMPHubError("Hub is not running") + else: + hub_params = hub.params + else: + hub_params = get_main_running_hub() + + try: + url = hub_params["samp.hub.xmlrpc.url"].replace("\\", "") + + self.proxy = ServerProxyPool( + pool_size, xmlrpc.ServerProxy, url, allow_none=1 + ) + + self.ping() + + self.lockfile = copy.deepcopy(hub_params) + self._connected = True + + except xmlrpc.ProtocolError as p: + # 401 Unauthorized + if p.errcode == 401: + raise SAMPHubError( + "Unauthorized access. Basic Authentication required or failed." + ) + else: + raise SAMPHubError(f"Protocol Error {p.errcode}: {p.errmsg}") + + def disconnect(self): + """ + Disconnect from the current SAMP Hub. + """ + if self.proxy is not None: + self.proxy.shutdown() + self.proxy = None + self._connected = False + self.lockfile = {} + + @property + def _samp_hub(self): + """ + Property to abstract away the path to the hub, which allows this class + to be used for other profiles. + """ + return self.proxy.samp.hub + + def ping(self): + """ + Proxy to ``ping`` SAMP Hub method (Standard Profile only). + """ + return self._samp_hub.ping() + + def set_xmlrpc_callback(self, private_key, xmlrpc_addr): + """ + Proxy to ``setXmlrpcCallback`` SAMP Hub method (Standard Profile only). + """ + return self._samp_hub.setXmlrpcCallback(private_key, xmlrpc_addr) + + def register(self, secret): + """ + Proxy to ``register`` SAMP Hub method. + """ + return self._samp_hub.register(secret) + + def unregister(self, private_key): + """ + Proxy to ``unregister`` SAMP Hub method. + """ + return self._samp_hub.unregister(private_key) + + def declare_metadata(self, private_key, metadata): + """ + Proxy to ``declareMetadata`` SAMP Hub method. + """ + return self._samp_hub.declareMetadata(private_key, metadata) + + def get_metadata(self, private_key, client_id): + """ + Proxy to ``getMetadata`` SAMP Hub method. + """ + return self._samp_hub.getMetadata(private_key, client_id) + + def declare_subscriptions(self, private_key, subscriptions): + """ + Proxy to ``declareSubscriptions`` SAMP Hub method. + """ + return self._samp_hub.declareSubscriptions(private_key, subscriptions) + + def get_subscriptions(self, private_key, client_id): + """ + Proxy to ``getSubscriptions`` SAMP Hub method. + """ + return self._samp_hub.getSubscriptions(private_key, client_id) + + def get_registered_clients(self, private_key): + """ + Proxy to ``getRegisteredClients`` SAMP Hub method. + """ + return self._samp_hub.getRegisteredClients(private_key) + + def get_subscribed_clients(self, private_key, mtype): + """ + Proxy to ``getSubscribedClients`` SAMP Hub method. + """ + return self._samp_hub.getSubscribedClients(private_key, mtype) + + def notify(self, private_key, recipient_id, message): + """ + Proxy to ``notify`` SAMP Hub method. + """ + return self._samp_hub.notify(private_key, recipient_id, message) + + def notify_all(self, private_key, message): + """ + Proxy to ``notifyAll`` SAMP Hub method. + """ + return self._samp_hub.notifyAll(private_key, message) + + def call(self, private_key, recipient_id, msg_tag, message): + """ + Proxy to ``call`` SAMP Hub method. + """ + return self._samp_hub.call(private_key, recipient_id, msg_tag, message) + + def call_all(self, private_key, msg_tag, message): + """ + Proxy to ``callAll`` SAMP Hub method. + """ + return self._samp_hub.callAll(private_key, msg_tag, message) + + def call_and_wait(self, private_key, recipient_id, message, timeout): + """ + Proxy to ``callAndWait`` SAMP Hub method. + """ + return self._samp_hub.callAndWait(private_key, recipient_id, message, timeout) + + def reply(self, private_key, msg_id, response): + """ + Proxy to ``reply`` SAMP Hub method. + """ + return self._samp_hub.reply(private_key, msg_id, response) diff --git a/pyvo/astropy_samp/hub_script.py b/pyvo/astropy_samp/hub_script.py new file mode 100644 index 000000000..a0180890c --- /dev/null +++ b/pyvo/astropy_samp/hub_script.py @@ -0,0 +1,213 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + + +import argparse +import copy +import sys +import time + +from astropy import __version__, log + +from .hub import SAMPHubServer + +__all__ = ["hub_script"] + + +def hub_script(timeout=0): + """ + This main function is executed by the ``samp_hub`` command line tool. + """ + parser = argparse.ArgumentParser(prog="samp_hub " + __version__) + + parser.add_argument( + "-k", "--secret", dest="secret", metavar="CODE", help="custom secret code." + ) + + parser.add_argument( + "-d", "--addr", dest="addr", metavar="ADDR", help="listening address (or IP)." + ) + + parser.add_argument( + "-p", + "--port", + dest="port", + metavar="PORT", + type=int, + help="listening port number.", + ) + + parser.add_argument( + "-f", "--lockfile", dest="lockfile", metavar="FILE", help="custom lockfile." + ) + + parser.add_argument( + "-w", + "--no-web-profile", + dest="web_profile", + action="store_false", + help="run the Hub disabling the Web Profile.", + default=True, + ) + + parser.add_argument( + "-P", + "--pool-size", + dest="pool_size", + metavar="SIZE", + type=int, + help="the socket connections pool size.", + default=20, + ) + + timeout_group = parser.add_argument_group( + "Timeout group", + "Special options to setup hub and client timeouts." + "It contains a set of special options that allows to set up the Hub and " + "clients inactivity timeouts, that is the Hub or client inactivity time " + "interval after which the Hub shuts down or unregisters the client. " + "Notification of samp.hub.disconnect MType is sent to the clients " + "forcibly unregistered for timeout expiration.", + ) + + timeout_group.add_argument( + "-t", + "--timeout", + dest="timeout", + metavar="SECONDS", + help=( + "set the Hub inactivity timeout in SECONDS. By default it " + "is set to 0, that is the Hub never expires." + ), + type=int, + default=0, + ) + + timeout_group.add_argument( + "-c", + "--client-timeout", + dest="client_timeout", + metavar="SECONDS", + help=( + "set the client inactivity timeout in SECONDS. By default it " + "is set to 0, that is the client never expires." + ), + type=int, + default=0, + ) + + parser.add_argument_group(timeout_group) + + log_group = parser.add_argument_group( + "Logging options", + "Additional options which allow to customize the logging output. By " + "default the SAMP Hub uses the standard output and standard error " + "devices to print out INFO level logging messages. Using the options " + "here below it is possible to modify the logging level and also " + "specify the output files where redirect the logging messages.", + ) + + log_group.add_argument( + "-L", + "--log-level", + dest="loglevel", + metavar="LEVEL", + help="set the Hub instance log level (OFF, ERROR, WARNING, INFO, DEBUG).", + type=str, + choices=["OFF", "ERROR", "WARNING", "INFO", "DEBUG"], + default="INFO", + ) + + log_group.add_argument( + "-O", + "--log-output", + dest="logout", + metavar="FILE", + help="set the output file for the log messages.", + default="", + ) + + parser.add_argument_group(log_group) + + adv_group = parser.add_argument_group( + "Advanced group", + "Advanced options addressed to facilitate administrative tasks and " + "allow new non-standard Hub behaviors. In particular the --label " + "options is used to assign a value to hub.label token and is used to " + "assign a name to the Hub instance. " + "The very special --multi option allows to start a Hub in multi-instance mode. " + "Multi-instance mode is a non-standard Hub behavior that enables " + "multiple contemporaneous running Hubs. Multi-instance hubs place " + "their non-standard lock-files within the /.samp-1 " + "directory naming them making use of the format: " + "samp-hub--, where PID is the Hub process ID while ID is an " + "internal ID (integer).", + ) + + adv_group.add_argument( + "-l", + "--label", + dest="label", + metavar="LABEL", + help="assign a LABEL to the Hub.", + default="", + ) + + adv_group.add_argument( + "-m", + "--multi", + dest="mode", + help=( + "run the Hub in multi-instance mode generating a custom " + "lockfile with a random name." + ), + action="store_const", + const="multiple", + default="single", + ) + + parser.add_argument_group(adv_group) + + options = parser.parse_args() + + try: + if options.loglevel in ("OFF", "ERROR", "WARNING", "DEBUG", "INFO"): + log.setLevel(options.loglevel) + + if options.logout != "": + context = log.log_to_file(options.logout) + else: + + class dummy_context: + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_value, traceback): + pass + + context = dummy_context() + + with context: + args = copy.deepcopy(options.__dict__) + del args["loglevel"] + del args["logout"] + + hub = SAMPHubServer(**args) + hub.start(False) + + if not timeout: + while hub.is_running: + time.sleep(0.01) + else: + time.sleep(timeout) + hub.stop() + + except KeyboardInterrupt: + try: + hub.stop() + except NameError: + pass + except OSError as e: + print(f"[SAMP] Error: I/O error({e.errno}): {e.strerror}") + sys.exit(1) + except SystemExit: + pass diff --git a/pyvo/astropy_samp/integrated_client.py b/pyvo/astropy_samp/integrated_client.py new file mode 100644 index 000000000..f53a947d8 --- /dev/null +++ b/pyvo/astropy_samp/integrated_client.py @@ -0,0 +1,504 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + + +from .client import SAMPClient +from .hub_proxy import SAMPHubProxy + +__all__ = ["SAMPIntegratedClient"] + +__doctest_skip__ = ["SAMPIntegratedClient.*"] + + +class SAMPIntegratedClient: + """ + A Simple SAMP client. + + This class is meant to simplify the client usage providing a proxy class + that merges the :class:`~astropy.samp.SAMPClient` and + :class:`~astropy.samp.SAMPHubProxy` functionalities in a + simplified API. + + Parameters + ---------- + name : str, optional + Client name (corresponding to ``samp.name`` metadata keyword). + + description : str, optional + Client description (corresponding to ``samp.description.text`` metadata + keyword). + + metadata : dict, optional + Client application metadata in the standard SAMP format. + + addr : str, optional + Listening address (or IP). This defaults to 127.0.0.1 if the internet + is not reachable, otherwise it defaults to the host name. + + port : int, optional + Listening XML-RPC server socket port. If left set to 0 (the default), + the operating system will select a free port. + + callable : bool, optional + Whether the client can receive calls and notifications. If set to + `False`, then the client can send notifications and calls, but can not + receive any. + """ + + def __init__( + self, + name=None, + description=None, + metadata=None, + addr=None, + port=0, + callable=True, + ): + self.hub = SAMPHubProxy() + + self.client_arguments = { + "name": name, + "description": description, + "metadata": metadata, + "addr": addr, + "port": port, + "callable": callable, + } + """ + Collected arguments that should be passed on to the SAMPClient below. + The SAMPClient used to be instantiated in __init__; however, this + caused problems with disconnecting and reconnecting to the HUB. + The client_arguments is used to maintain backwards compatibility. + """ + + self.client = None + "The client will be instantiated upon connect()." + + # GENERAL + + @property + def is_connected(self): + """ + Testing method to verify the client connection with a running Hub. + + Returns + ------- + is_connected : bool + True if the client is connected to a Hub, False otherwise. + """ + return self.hub.is_connected and self.client.is_running + + def connect(self, hub=None, hub_params=None, pool_size=20): + """ + Connect with the current or specified SAMP Hub, start and register the + client. + + Parameters + ---------- + hub : `~astropy.samp.SAMPHubServer`, optional + The hub to connect to. + + hub_params : dict, optional + Optional dictionary containing the lock-file content of the Hub + with which to connect. This dictionary has the form + ``{: , ...}``. + + pool_size : int, optional + The number of socket connections opened to communicate with the + Hub. + """ + self.hub.connect(hub, hub_params, pool_size) + + # The client has to be instantiated here and not in __init__() because + # this allows disconnecting and reconnecting to the HUB. Nonetheless, + # the client_arguments are set in __init__() because the + # instantiation of the client used to happen there and this retains + # backwards compatibility. + self.client = SAMPClient(self.hub, **self.client_arguments) + self.client.start() + self.client.register() + + def disconnect(self): + """ + Unregister the client from the current SAMP Hub, stop the client and + disconnect from the Hub. + """ + if self.is_connected: + try: + self.client.unregister() + finally: + if self.client.is_running: + self.client.stop() + self.hub.disconnect() + + # HUB + def ping(self): + """ + Proxy to ``ping`` SAMP Hub method (Standard Profile only). + """ + return self.hub.ping() + + def declare_metadata(self, metadata): + """ + Proxy to ``declareMetadata`` SAMP Hub method. + """ + return self.client.declare_metadata(metadata) + + def get_metadata(self, client_id): + """ + Proxy to ``getMetadata`` SAMP Hub method. + """ + return self.hub.get_metadata(self.get_private_key(), client_id) + + def get_subscriptions(self, client_id): + """ + Proxy to ``getSubscriptions`` SAMP Hub method. + """ + return self.hub.get_subscriptions(self.get_private_key(), client_id) + + def get_registered_clients(self): + """ + Proxy to ``getRegisteredClients`` SAMP Hub method. + + This returns all the registered clients, excluding the current client. + """ + return self.hub.get_registered_clients(self.get_private_key()) + + def get_subscribed_clients(self, mtype): + """ + Proxy to ``getSubscribedClients`` SAMP Hub method. + """ + return self.hub.get_subscribed_clients(self.get_private_key(), mtype) + + def _format_easy_msg(self, mtype, params): + msg = {} + + if "extra_kws" in params: + extra = params["extra_kws"] + del params["extra_kws"] + msg = {"samp.mtype": mtype, "samp.params": params} + msg.update(extra) + else: + msg = {"samp.mtype": mtype, "samp.params": params} + + return msg + + def notify(self, recipient_id, message): + """ + Proxy to ``notify`` SAMP Hub method. + """ + return self.hub.notify(self.get_private_key(), recipient_id, message) + + def enotify(self, recipient_id, mtype, **params): + """ + Easy to use version of :meth:`~astropy.samp.integrated_client.SAMPIntegratedClient.notify`. + + This is a proxy to ``notify`` method that allows to send the + notification message in a simplified way. + + Note that reserved ``extra_kws`` keyword is a dictionary with the + special meaning of being used to add extra keywords, in addition to + the standard ``samp.mtype`` and ``samp.params``, to the message sent. + + Parameters + ---------- + recipient_id : str + Recipient ID + + mtype : str + the MType to be notified + + params : dict or set of str + Variable keyword set which contains the list of parameters for the + specified MType. + + Examples + -------- + >>> from astropy.samp import SAMPIntegratedClient + >>> cli = SAMPIntegratedClient() + >>> ... + >>> cli.enotify("samp.msg.progress", msgid = "xyz", txt = "initialization", + ... percent = "10", extra_kws = {"my.extra.info": "just an example"}) + """ + return self.notify(recipient_id, self._format_easy_msg(mtype, params)) + + def notify_all(self, message): + """ + Proxy to ``notifyAll`` SAMP Hub method. + """ + return self.hub.notify_all(self.get_private_key(), message) + + def enotify_all(self, mtype, **params): + """ + Easy to use version of :meth:`~astropy.samp.integrated_client.SAMPIntegratedClient.notify_all`. + + This is a proxy to ``notifyAll`` method that allows to send the + notification message in a simplified way. + + Note that reserved ``extra_kws`` keyword is a dictionary with the + special meaning of being used to add extra keywords, in addition to + the standard ``samp.mtype`` and ``samp.params``, to the message sent. + + Parameters + ---------- + mtype : str + MType to be notified. + + params : dict or set of str + Variable keyword set which contains the list of parameters for + the specified MType. + + Examples + -------- + >>> from astropy.samp import SAMPIntegratedClient + >>> cli = SAMPIntegratedClient() + >>> ... + >>> cli.enotify_all("samp.msg.progress", txt = "initialization", + ... percent = "10", + ... extra_kws = {"my.extra.info": "just an example"}) + """ + return self.notify_all(self._format_easy_msg(mtype, params)) + + def call(self, recipient_id, msg_tag, message): + """ + Proxy to ``call`` SAMP Hub method. + """ + return self.hub.call(self.get_private_key(), recipient_id, msg_tag, message) + + def ecall(self, recipient_id, msg_tag, mtype, **params): + """ + Easy to use version of :meth:`~astropy.samp.integrated_client.SAMPIntegratedClient.call`. + + This is a proxy to ``call`` method that allows to send a call message + in a simplified way. + + Note that reserved ``extra_kws`` keyword is a dictionary with the + special meaning of being used to add extra keywords, in addition to + the standard ``samp.mtype`` and ``samp.params``, to the message sent. + + Parameters + ---------- + recipient_id : str + Recipient ID + + msg_tag : str + Message tag to use + + mtype : str + MType to be sent + + params : dict of set of str + Variable keyword set which contains the list of parameters for + the specified MType. + + Examples + -------- + >>> from astropy.samp import SAMPIntegratedClient + >>> cli = SAMPIntegratedClient() + >>> ... + >>> msgid = cli.ecall("abc", "xyz", "samp.msg.progress", + ... txt = "initialization", percent = "10", + ... extra_kws = {"my.extra.info": "just an example"}) + """ + return self.call(recipient_id, msg_tag, self._format_easy_msg(mtype, params)) + + def call_all(self, msg_tag, message): + """ + Proxy to ``callAll`` SAMP Hub method. + """ + return self.hub.call_all(self.get_private_key(), msg_tag, message) + + def ecall_all(self, msg_tag, mtype, **params): + """ + Easy to use version of :meth:`~astropy.samp.integrated_client.SAMPIntegratedClient.call_all`. + + This is a proxy to ``callAll`` method that allows to send the call + message in a simplified way. + + Note that reserved ``extra_kws`` keyword is a dictionary with the + special meaning of being used to add extra keywords, in addition to + the standard ``samp.mtype`` and ``samp.params``, to the message sent. + + Parameters + ---------- + msg_tag : str + Message tag to use + + mtype : str + MType to be sent + + params : dict of set of str + Variable keyword set which contains the list of parameters for + the specified MType. + + Examples + -------- + >>> from astropy.samp import SAMPIntegratedClient + >>> cli = SAMPIntegratedClient() + >>> ... + >>> msgid = cli.ecall_all("xyz", "samp.msg.progress", + ... txt = "initialization", percent = "10", + ... extra_kws = {"my.extra.info": "just an example"}) + """ + self.call_all(msg_tag, self._format_easy_msg(mtype, params)) + + def call_and_wait(self, recipient_id, message, timeout): + """ + Proxy to ``callAndWait`` SAMP Hub method. + """ + return self.hub.call_and_wait( + self.get_private_key(), recipient_id, message, timeout + ) + + def ecall_and_wait(self, recipient_id, mtype, timeout, **params): + """ + Easy to use version of :meth:`~astropy.samp.integrated_client.SAMPIntegratedClient.call_and_wait`. + + This is a proxy to ``callAndWait`` method that allows to send the call + message in a simplified way. + + Note that reserved ``extra_kws`` keyword is a dictionary with the + special meaning of being used to add extra keywords, in addition to + the standard ``samp.mtype`` and ``samp.params``, to the message sent. + + Parameters + ---------- + recipient_id : str + Recipient ID + + mtype : str + MType to be sent + + timeout : str + Call timeout in seconds + + params : dict of set of str + Variable keyword set which contains the list of parameters for + the specified MType. + + Examples + -------- + >>> from astropy.samp import SAMPIntegratedClient + >>> cli = SAMPIntegratedClient() + >>> ... + >>> cli.ecall_and_wait("xyz", "samp.msg.progress", "5", + ... txt = "initialization", percent = "10", + ... extra_kws = {"my.extra.info": "just an example"}) + """ + return self.call_and_wait( + recipient_id, self._format_easy_msg(mtype, params), timeout + ) + + def reply(self, msg_id, response): + """ + Proxy to ``reply`` SAMP Hub method. + """ + return self.hub.reply(self.get_private_key(), msg_id, response) + + def _format_easy_response(self, status, result, error): + msg = {"samp.status": status} + if result is not None: + msg.update({"samp.result": result}) + if error is not None: + msg.update({"samp.error": error}) + + return msg + + def ereply(self, msg_id, status, result=None, error=None): + """ + Easy to use version of :meth:`~astropy.samp.integrated_client.SAMPIntegratedClient.reply`. + + This is a proxy to ``reply`` method that allows to send a reply + message in a simplified way. + + Parameters + ---------- + msg_id : str + Message ID to which reply. + + status : str + Content of the ``samp.status`` response keyword. + + result : dict + Content of the ``samp.result`` response keyword. + + error : dict + Content of the ``samp.error`` response keyword. + + Examples + -------- + >>> from astropy.samp import SAMPIntegratedClient, SAMP_STATUS_ERROR + >>> cli = SAMPIntegratedClient() + >>> ... + >>> cli.ereply("abd", SAMP_STATUS_ERROR, result={}, + ... error={"samp.errortxt": "Test error message"}) + """ + return self.reply(msg_id, self._format_easy_response(status, result, error)) + + # CLIENT + + def receive_notification(self, private_key, sender_id, message): + return self.client.receive_notification(private_key, sender_id, message) + + receive_notification.__doc__ = SAMPClient.receive_notification.__doc__ + + def receive_call(self, private_key, sender_id, msg_id, message): + return self.client.receive_call(private_key, sender_id, msg_id, message) + + receive_call.__doc__ = SAMPClient.receive_call.__doc__ + + def receive_response(self, private_key, responder_id, msg_tag, response): + return self.client.receive_response( + private_key, responder_id, msg_tag, response + ) + + receive_response.__doc__ = SAMPClient.receive_response.__doc__ + + def bind_receive_message(self, mtype, function, declare=True, metadata=None): + self.client.bind_receive_message(mtype, function, declare=True, metadata=None) + + bind_receive_message.__doc__ = SAMPClient.bind_receive_message.__doc__ + + def bind_receive_notification(self, mtype, function, declare=True, metadata=None): + self.client.bind_receive_notification(mtype, function, declare, metadata) + + bind_receive_notification.__doc__ = SAMPClient.bind_receive_notification.__doc__ + + def bind_receive_call(self, mtype, function, declare=True, metadata=None): + self.client.bind_receive_call(mtype, function, declare, metadata) + + bind_receive_call.__doc__ = SAMPClient.bind_receive_call.__doc__ + + def bind_receive_response(self, msg_tag, function): + self.client.bind_receive_response(msg_tag, function) + + bind_receive_response.__doc__ = SAMPClient.bind_receive_response.__doc__ + + def unbind_receive_notification(self, mtype, declare=True): + self.client.unbind_receive_notification(mtype, declare) + + unbind_receive_notification.__doc__ = SAMPClient.unbind_receive_notification.__doc__ + + def unbind_receive_call(self, mtype, declare=True): + self.client.unbind_receive_call(mtype, declare) + + unbind_receive_call.__doc__ = SAMPClient.unbind_receive_call.__doc__ + + def unbind_receive_response(self, msg_tag): + self.client.unbind_receive_response(msg_tag) + + unbind_receive_response.__doc__ = SAMPClient.unbind_receive_response.__doc__ + + def declare_subscriptions(self, subscriptions=None): + self.client.declare_subscriptions(subscriptions) + + declare_subscriptions.__doc__ = SAMPClient.declare_subscriptions.__doc__ + + def get_private_key(self): + return self.client.get_private_key() + + get_private_key.__doc__ = SAMPClient.get_private_key.__doc__ + + def get_public_id(self): + return self.client.get_public_id() + + get_public_id.__doc__ = SAMPClient.get_public_id.__doc__ diff --git a/pyvo/astropy_samp/lockfile_helpers.py b/pyvo/astropy_samp/lockfile_helpers.py new file mode 100644 index 000000000..b9ac2d141 --- /dev/null +++ b/pyvo/astropy_samp/lockfile_helpers.py @@ -0,0 +1,257 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + + +# TODO: this file should be refactored to use a more thread-safe and +# race-condition-safe lockfile mechanism. + +import datetime +import os +import stat +import warnings +import xmlrpc.client as xmlrpc +from contextlib import suppress +from urllib.parse import urlparse + +from astropy import log +from astropy.config.paths import _find_home +from astropy.utils.data import get_readable_fileobj + +from .errors import SAMPHubError, SAMPWarning + + +def read_lockfile(lockfilename): + """ + Read in the lockfile given by ``lockfilename`` into a dictionary. + """ + # lockfilename may be a local file or a remote URL, but + # get_readable_fileobj takes care of this. + lockfiledict = {} + with get_readable_fileobj(lockfilename) as f: + for line in f: + if not line.startswith("#"): + kw, val = line.split("=") + lockfiledict[kw.strip()] = val.strip() + return lockfiledict + + +def write_lockfile(lockfilename, lockfiledict): + lockfile = open(lockfilename, "w") + lockfile.close() + os.chmod(lockfilename, stat.S_IREAD + stat.S_IWRITE) + + lockfile = open(lockfilename, "w") + now_iso = datetime.datetime.now().isoformat() + lockfile.write(f"# SAMP lockfile written on {now_iso}\n") + lockfile.write("# Standard Profile required keys\n") + for key, value in lockfiledict.items(): + lockfile.write(f"{key}={value}\n") + lockfile.close() + + +def create_lock_file(lockfilename=None, mode=None, hub_id=None, hub_params=None): + # Remove lock-files of dead hubs + remove_garbage_lock_files() + + lockfiledir = "" + + # CHECK FOR SAMP_HUB ENVIRONMENT VARIABLE + if "SAMP_HUB" in os.environ: + # For the time being I assume just the std profile supported. + if os.environ["SAMP_HUB"].startswith("std-lockurl:"): + lockfilename = os.environ["SAMP_HUB"][len("std-lockurl:") :] + lockfile_parsed = urlparse(lockfilename) + + if lockfile_parsed[0] != "file": + warnings.warn( + f"Unable to start a Hub with lockfile {lockfilename}. " + "Start-up process aborted.", + SAMPWarning, + ) + return False + else: + lockfilename = lockfile_parsed[2] + else: + # If it is a fresh Hub instance + if lockfilename is None: + log.debug("Running mode: " + mode) + + if mode == "single": + lockfilename = os.path.join(_find_home(), ".samp") + else: + lockfiledir = os.path.join(_find_home(), ".samp-1") + + # If missing create .samp-1 directory + try: + os.mkdir(lockfiledir) + except OSError: + pass # directory already exists + finally: + os.chmod(lockfiledir, stat.S_IREAD + stat.S_IWRITE + stat.S_IEXEC) + + lockfilename = os.path.join(lockfiledir, f"samp-hub-{hub_id}") + + else: + log.debug("Running mode: multiple") + + hub_is_running, lockfiledict = check_running_hub(lockfilename) + + if hub_is_running: + warnings.warn( + "Another SAMP Hub is already running. Start-up process aborted.", + SAMPWarning, + ) + return False + + log.debug("Lock-file: " + lockfilename) + + write_lockfile(lockfilename, hub_params) + + return lockfilename + + +def get_main_running_hub(): + """ + Get either the hub given by the environment variable SAMP_HUB, or the one + given by the lockfile .samp in the user home directory. + """ + hubs = get_running_hubs() + + if not hubs: + raise SAMPHubError("Unable to find a running SAMP Hub.") + + # CHECK FOR SAMP_HUB ENVIRONMENT VARIABLE + if "SAMP_HUB" in os.environ: + # For the time being I assume just the std profile supported. + if os.environ["SAMP_HUB"].startswith("std-lockurl:"): + lockfilename = os.environ["SAMP_HUB"][len("std-lockurl:") :] + else: + raise SAMPHubError("SAMP Hub profile not supported.") + else: + lockfilename = os.path.join(_find_home(), ".samp") + + return hubs[lockfilename] + + +def get_running_hubs(): + """ + Return a dictionary containing the lock-file contents of all the currently + running hubs (single and/or multiple mode). + + The dictionary format is: + + ``{: {: , ...}, ...}`` + + where ``{}`` is the lock-file name, ``{}`` and + ``{}`` are the lock-file tokens (name and content). + + Returns + ------- + running_hubs : dict + Lock-file contents of all the currently running hubs. + """ + hubs = {} + lockfilename = "" + + # HUB SINGLE INSTANCE MODE + + # CHECK FOR SAMP_HUB ENVIRONMENT VARIABLE + if "SAMP_HUB" in os.environ: + # For the time being I assume just the std profile supported. + if os.environ["SAMP_HUB"].startswith("std-lockurl:"): + lockfilename = os.environ["SAMP_HUB"][len("std-lockurl:") :] + else: + lockfilename = os.path.join(_find_home(), ".samp") + + hub_is_running, lockfiledict = check_running_hub(lockfilename) + + if hub_is_running: + hubs[lockfilename] = lockfiledict + + # HUB MULTIPLE INSTANCE MODE + + lockfiledir = "" + + lockfiledir = os.path.join(_find_home(), ".samp-1") + + if os.path.isdir(lockfiledir): + for filename in os.listdir(lockfiledir): + if filename.startswith("samp-hub"): + lockfilename = os.path.join(lockfiledir, filename) + hub_is_running, lockfiledict = check_running_hub(lockfilename) + if hub_is_running: + hubs[lockfilename] = lockfiledict + + return hubs + + +def check_running_hub(lockfilename): + """ + Test whether a hub identified by ``lockfilename`` is running or not. + + Parameters + ---------- + lockfilename : str + Lock-file name (path + file name) of the Hub to be tested. + + Returns + ------- + is_running : bool + Whether the hub is running + hub_params : dict + If the hub is running this contains the parameters from the lockfile + """ + is_running = False + lockfiledict = {} + + # Check whether a lockfile already exists + try: + lockfiledict = read_lockfile(lockfilename) + except OSError: + return is_running, lockfiledict + + if "samp.hub.xmlrpc.url" in lockfiledict: + try: + proxy = xmlrpc.ServerProxy( + lockfiledict["samp.hub.xmlrpc.url"].replace("\\", ""), allow_none=1 + ) + proxy.samp.hub.ping() + is_running = True + except xmlrpc.ProtocolError: + # There is a protocol error (e.g. for authentication required), + # but the server is alive + is_running = True + except OSError: + pass + + return is_running, lockfiledict + + +def remove_garbage_lock_files(): + lockfilename = "" + + # HUB SINGLE INSTANCE MODE + + lockfilename = os.path.join(_find_home(), ".samp") + + hub_is_running, lockfiledict = check_running_hub(lockfilename) + + if not hub_is_running: + # If lockfilename belongs to a dead hub, then it is deleted + if os.path.isfile(lockfilename): + with suppress(OSError): + os.remove(lockfilename) + + # HUB MULTIPLE INSTANCE MODE + + lockfiledir = os.path.join(_find_home(), ".samp-1") + + if os.path.isdir(lockfiledir): + for filename in os.listdir(lockfiledir): + if filename.startswith("samp-hub"): + lockfilename = os.path.join(lockfiledir, filename) + hub_is_running, lockfiledict = check_running_hub(lockfilename) + if not hub_is_running: + # If lockfilename belongs to a dead hub, then it is deleted + if os.path.isfile(lockfilename): + with suppress(OSError): + os.remove(lockfilename) diff --git a/pyvo/astropy_samp/setup_package.py b/pyvo/astropy_samp/setup_package.py new file mode 100644 index 000000000..e69de29bb diff --git a/pyvo/astropy_samp/standard_profile.py b/pyvo/astropy_samp/standard_profile.py new file mode 100644 index 000000000..1bbf4b754 --- /dev/null +++ b/pyvo/astropy_samp/standard_profile.py @@ -0,0 +1,161 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + + +import socketserver +import sys +import traceback +import warnings +import xmlrpc.client as xmlrpc +from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer + +from .constants import SAMP_ICON +from .errors import SAMPWarning + +__all__ = [] + + +class SAMPSimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): + """ + XMLRPC handler of Standard Profile requests. + """ + + def do_GET(self): + if self.path == "/samp/icon": + self.send_response(200, "OK") + self.send_header("Content-Type", "image/png") + self.end_headers() + self.wfile.write(SAMP_ICON) + + def do_POST(self): + """ + Handles the HTTP POST request. + + Attempts to interpret all HTTP POST requests as XML-RPC calls, + which are forwarded to the server's ``_dispatch`` method for + handling. + """ + # Check that the path is legal + if not self.is_rpc_path_valid(): + self.report_404() + return + + try: + # Get arguments by reading body of request. + # We read this in chunks to avoid straining + # socket.read(); around the 10 or 15Mb mark, some platforms + # begin to have problems (bug #792570). + max_chunk_size = 10 * 1024 * 1024 + size_remaining = int(self.headers["content-length"]) + L = [] + while size_remaining: + chunk_size = min(size_remaining, max_chunk_size) + L.append(self.rfile.read(chunk_size)) + size_remaining -= len(L[-1]) + data = b"".join(L) + + params, method = xmlrpc.loads(data) + + if method == "samp.webhub.register": + params = list(params) + params.append(self.client_address) + if "Origin" in self.headers: + params.append(self.headers.get("Origin")) + else: + params.append("unknown") + params = tuple(params) + data = xmlrpc.dumps(params, methodname=method) + + elif method in ( + "samp.hub.notify", + "samp.hub.notifyAll", + "samp.hub.call", + "samp.hub.callAll", + "samp.hub.callAndWait", + ): + user = "unknown" + + if method == "samp.hub.callAndWait": + params[2]["host"] = self.address_string() + params[2]["user"] = user + else: + params[-1]["host"] = self.address_string() + params[-1]["user"] = user + + data = xmlrpc.dumps(params, methodname=method) + + data = self.decode_request_content(data) + if data is None: + return # response has been sent + + # In previous versions of SimpleXMLRPCServer, _dispatch + # could be overridden in this class, instead of in + # SimpleXMLRPCDispatcher. To maintain backwards compatibility, + # check to see if a subclass implements _dispatch and dispatch + # using that method if present. + response = self.server._marshaled_dispatch( + data, getattr(self, "_dispatch", None), self.path + ) + except Exception as e: + # This should only happen if the module is buggy + # internal error, report as HTTP server error + self.send_response(500) + + # Send information about the exception if requested + if ( + hasattr(self.server, "_send_traceback_header") + and self.server._send_traceback_header + ): + self.send_header("X-exception", str(e)) + trace = traceback.format_exc() + trace = str(trace.encode("ASCII", "backslashreplace"), "ASCII") + self.send_header("X-traceback", trace) + + self.send_header("Content-length", "0") + self.end_headers() + else: + # got a valid XML RPC response + self.send_response(200) + self.send_header("Content-type", "text/xml") + if self.encode_threshold is not None: + if len(response) > self.encode_threshold: + q = self.accept_encodings().get("gzip", 0) + if q: + try: + response = xmlrpc.gzip_encode(response) + self.send_header("Content-Encoding", "gzip") + except NotImplementedError: + pass + self.send_header("Content-length", str(len(response))) + self.end_headers() + self.wfile.write(response) + + +class ThreadingXMLRPCServer(socketserver.ThreadingMixIn, SimpleXMLRPCServer): + """ + Asynchronous multithreaded XMLRPC server. + """ + + def __init__( + self, + addr, + log=None, + requestHandler=SAMPSimpleXMLRPCRequestHandler, + logRequests=True, + allow_none=True, + encoding=None, + ): + self.log = log + SimpleXMLRPCServer.__init__( + self, addr, requestHandler, logRequests, allow_none, encoding + ) + + def handle_error(self, request, client_address): + if self.log is None: + socketserver.BaseServer.handle_error(self, request, client_address) + else: + warnings.warn( + "Exception happened during processing of request from {}: {}".format( + client_address, sys.exc_info()[1] + ), + SAMPWarning, + ) diff --git a/pyvo/astropy_samp/tests/__init__.py b/pyvo/astropy_samp/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pyvo/astropy_samp/tests/test_client.py b/pyvo/astropy_samp/tests/test_client.py new file mode 100644 index 000000000..14fdaefd8 --- /dev/null +++ b/pyvo/astropy_samp/tests/test_client.py @@ -0,0 +1,60 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + +import pytest + +# By default, tests should not use the internet. +from pyvo.astropy_samp import SAMPWarning, conf +from pyvo.astropy_samp.client import SAMPClient +from pyvo.astropy_samp.hub import SAMPHubServer +from pyvo.astropy_samp.hub_proxy import SAMPHubProxy +from pyvo.astropy_samp.integrated_client import SAMPIntegratedClient + + +def setup_module(module): + conf.use_internet = False + + +def test_SAMPHubProxy(): + """Test that SAMPHubProxy can be instantiated""" + SAMPHubProxy() + + +def test_SAMPClient(): + """Test that SAMPClient can be instantiated""" + proxy = SAMPHubProxy() + SAMPClient(proxy) + + +def test_SAMPIntegratedClient(): + """Test that SAMPIntegratedClient can be instantiated""" + SAMPIntegratedClient() + + +@pytest.fixture +def samp_hub(): + """A fixture that can be used by client tests that require a HUB.""" + my_hub = SAMPHubServer() + my_hub.start() + yield + my_hub.stop() + + +def test_SAMPIntegratedClient_notify_all(samp_hub): + """Test that SAMP returns a warning if no receiver got the message.""" + client = SAMPIntegratedClient() + client.connect() + message = {"samp.mtype": "coverage.load.moc.fits"} + with pytest.warns(SAMPWarning): + client.notify_all(message) + client.disconnect() + + +def test_reconnect(samp_hub): + """Test that SAMPIntegratedClient can reconnect. + This is a regression test for bug [#2673] + https://github.com/astropy/astropy/issues/2673 + """ + my_client = SAMPIntegratedClient() + my_client.connect() + my_client.disconnect() + my_client.connect() diff --git a/pyvo/astropy_samp/tests/test_errors.py b/pyvo/astropy_samp/tests/test_errors.py new file mode 100644 index 000000000..a6a24f678 --- /dev/null +++ b/pyvo/astropy_samp/tests/test_errors.py @@ -0,0 +1,24 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + +# By default, tests should not use the internet. +from pyvo.astropy_samp import conf +from pyvo.astropy_samp.errors import SAMPClientError, SAMPHubError, SAMPProxyError + + +def setup_module(module): + conf.use_internet = False + + +def test_SAMPHubError(): + """Test that SAMPHubError can be instantiated""" + SAMPHubError("test") + + +def test_SAMPClientError(): + """Test that SAMPClientError can be instantiated""" + SAMPClientError("test") + + +def test_SAMPProxyError(): + """Test that SAMPProxyError can be instantiated""" + SAMPProxyError("test", "any") diff --git a/pyvo/astropy_samp/tests/test_helpers.py b/pyvo/astropy_samp/tests/test_helpers.py new file mode 100644 index 000000000..45da8d047 --- /dev/null +++ b/pyvo/astropy_samp/tests/test_helpers.py @@ -0,0 +1,70 @@ +import os +import pickle +import random +import string +import time + +from pyvo.astropy_samp import SAMP_STATUS_OK + +TEST_REPLY = {"samp.status": SAMP_STATUS_OK, "samp.result": {"txt": "test"}} + + +def write_output(mtype, private_key, sender_id, params): + filename = params["verification_file"] + f = open(filename, "wb") + pickle.dump(mtype, f) + pickle.dump(private_key, f) + pickle.dump(sender_id, f) + pickle.dump(params, f) + f.close() + + +def assert_output(mtype, private_key, sender_id, params, timeout=None): + filename = params["verification_file"] + start = time.time() + while True: + try: + with open(filename, "rb") as f: + rec_mtype = pickle.load(f) + rec_private_key = pickle.load(f) + rec_sender_id = pickle.load(f) + rec_params = pickle.load(f) + break + except (OSError, EOFError): + if timeout is not None and time.time() - start > timeout: + raise Exception(f"Timeout while waiting for file: {filename}") + + assert rec_mtype == mtype + assert rec_private_key == private_key + assert rec_sender_id == sender_id + assert rec_params == params + + +class Receiver: + def __init__(self, client): + self.client = client + + def receive_notification(self, private_key, sender_id, mtype, params, extra): + write_output(mtype, private_key, sender_id, params) + + def receive_call(self, private_key, sender_id, msg_id, mtype, params, extra): + # Here we need to make sure that we first reply, *then* write out the + # file, otherwise the tests see the file and move to the next call + # before waiting for the reply to be received. + self.client.reply(msg_id, TEST_REPLY) + self.receive_notification(private_key, sender_id, mtype, params, extra) + + def receive_response(self, private_key, sender_id, msg_id, response): + pass + + +def random_id(length=16): + return "".join(random.sample(string.ascii_letters + string.digits, length)) + + +def random_params(directory): + return { + "verification_file": os.path.join(directory, random_id()), + "parameter1": "abcde", + "parameter2": 1331, + } diff --git a/pyvo/astropy_samp/tests/test_hub.py b/pyvo/astropy_samp/tests/test_hub.py new file mode 100644 index 000000000..e6ba57e20 --- /dev/null +++ b/pyvo/astropy_samp/tests/test_hub.py @@ -0,0 +1,42 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + +import time + +import pytest + +from pyvo.astropy_samp import conf +from pyvo.astropy_samp.hub import SAMPHubServer + + +def setup_module(module): + conf.use_internet = False + + +def test_SAMPHubServer(): + """Test that SAMPHub can be instantiated""" + SAMPHubServer(web_profile=False, mode="multiple", pool_size=1) + + +def test_SAMPHubServer_run(): + """Test that SAMPHub can be run""" + hub = SAMPHubServer(web_profile=False, mode="multiple", pool_size=1) + hub.start() + time.sleep(1) + hub.stop() + + +@pytest.mark.slow +def test_SAMPHubServer_run_repeated(): + """ + Test that SAMPHub can be restarted after it has been stopped, including + when web profile support is enabled. + """ + + hub = SAMPHubServer(web_profile=True, mode="multiple", pool_size=1) + hub.start() + time.sleep(1) + hub.stop() + time.sleep(1) + hub.start() + time.sleep(1) + hub.stop() diff --git a/pyvo/astropy_samp/tests/test_hub_proxy.py b/pyvo/astropy_samp/tests/test_hub_proxy.py new file mode 100644 index 000000000..d94afe7be --- /dev/null +++ b/pyvo/astropy_samp/tests/test_hub_proxy.py @@ -0,0 +1,47 @@ +from pyvo.astropy_samp import conf +from pyvo.astropy_samp.hub import SAMPHubServer +from pyvo.astropy_samp.hub_proxy import SAMPHubProxy + + +def setup_module(module): + conf.use_internet = False + + +class TestHubProxy: + def setup_method(self, method): + self.hub = SAMPHubServer(web_profile=False, mode="multiple", pool_size=1) + self.hub.start() + + self.proxy = SAMPHubProxy() + self.proxy.connect(hub=self.hub, pool_size=1) + + def teardown_method(self, method): + if self.proxy.is_connected: + self.proxy.disconnect() + + self.hub.stop() + + def test_is_connected(self): + assert self.proxy.is_connected + + def test_disconnect(self): + self.proxy.disconnect() + + def test_ping(self): + self.proxy.ping() + + def test_registration(self): + result = self.proxy.register(self.proxy.lockfile["samp.secret"]) + self.proxy.unregister(result["samp.private-key"]) + + +def test_custom_lockfile(tmp_path): + lockfile = str(tmp_path / ".samptest") + + hub = SAMPHubServer(web_profile=False, lockfile=lockfile, pool_size=1) + hub.start() + + proxy = SAMPHubProxy() + proxy.connect(hub=hub, pool_size=1) + + hub.stop() diff --git a/pyvo/astropy_samp/tests/test_hub_script.py b/pyvo/astropy_samp/tests/test_hub_script.py new file mode 100644 index 000000000..7a5e73b39 --- /dev/null +++ b/pyvo/astropy_samp/tests/test_hub_script.py @@ -0,0 +1,26 @@ +import sys + +import pytest + +from pyvo.astropy_samp import conf +from pyvo.astropy_samp.hub_script import hub_script + + +def setup_module(module): + conf.use_internet = False + + +def setup_function(function): + function.sys_argv_orig = sys.argv + sys.argv = ["samp_hub"] + + +def teardown_function(function): + sys.argv = function.sys_argv_orig + + +@pytest.mark.slow +def test_hub_script(): + sys.argv.append("-m") # run in multiple mode + sys.argv.append("-w") # disable web profile + hub_script(timeout=3) diff --git a/pyvo/astropy_samp/tests/test_standard_profile.py b/pyvo/astropy_samp/tests/test_standard_profile.py new file mode 100644 index 000000000..491fc6075 --- /dev/null +++ b/pyvo/astropy_samp/tests/test_standard_profile.py @@ -0,0 +1,292 @@ +import pytest + +# By default, tests should not use the internet. +from pyvo.astropy_samp import conf +from pyvo.astropy_samp.errors import SAMPProxyError +from pyvo.astropy_samp.hub import SAMPHubServer +from pyvo.astropy_samp.integrated_client import SAMPIntegratedClient +from astropy.tests.helper import CI + +from .test_helpers import TEST_REPLY, Receiver, assert_output, random_params + + +def setup_module(module): + conf.use_internet = False + + +@pytest.mark.skipif(CI, reason="flaky in CI") +class TestStandardProfile: + @property + def hub_init_kwargs(self): + return {} + + @property + def client_init_kwargs(self): + return {} + + @property + def client_connect_kwargs(self): + return {} + + @pytest.fixture(autouse=True) + def setup_method(self, tmp_path): + self.tmpdir = str(tmp_path) + + self.hub = SAMPHubServer( + web_profile=False, mode="multiple", pool_size=1, **self.hub_init_kwargs + ) + self.hub.start() + + self.client1 = SAMPIntegratedClient(**self.client_init_kwargs) + self.client1.connect(hub=self.hub, pool_size=1, **self.client_connect_kwargs) + + self.client2 = SAMPIntegratedClient(**self.client_init_kwargs) + self.client2.connect(hub=self.hub, pool_size=1, **self.client_connect_kwargs) + + def teardown_method(self): + if self.client1.is_connected: + self.client1.disconnect() + if self.client2.is_connected: + self.client2.disconnect() + + self.hub.stop() + + def test_main(self): + self.client1_id = self.client1.get_public_id() + self.client2_id = self.client2.get_public_id() + + self.metadata1 = { + "samp.name": "Client 1", + "samp.description.text": "Client 1 Description", + "client.version": "1.1", + } + + self.metadata2 = { + "samp.name": "Client 2", + "samp.description.text": "Client 2 Description", + "client.version": "1.2", + } + + # Check that the clients are connected + + assert self.client1.is_connected + assert self.client2.is_connected + + # Check that ping works + + self.client1.ping() + self.client2.ping() + + # Check that get_registered_clients works as expected. + + assert self.client1_id not in self.client1.get_registered_clients() + assert self.client2_id in self.client1.get_registered_clients() + assert self.client1_id in self.client2.get_registered_clients() + assert self.client2_id not in self.client2.get_registered_clients() + + # Check that get_metadata works as expected + + assert self.client1.get_metadata(self.client1_id) == {} + assert self.client1.get_metadata(self.client2_id) == {} + assert self.client2.get_metadata(self.client1_id) == {} + assert self.client2.get_metadata(self.client2_id) == {} + + self.client1.declare_metadata(self.metadata1) + + assert self.client1.get_metadata(self.client1_id) == self.metadata1 + assert self.client2.get_metadata(self.client1_id) == self.metadata1 + assert self.client1.get_metadata(self.client2_id) == {} + assert self.client2.get_metadata(self.client2_id) == {} + + self.client2.declare_metadata(self.metadata2) + + assert self.client1.get_metadata(self.client1_id) == self.metadata1 + assert self.client2.get_metadata(self.client1_id) == self.metadata1 + assert self.client1.get_metadata(self.client2_id) == self.metadata2 + assert self.client2.get_metadata(self.client2_id) == self.metadata2 + + # Check that, without subscriptions, sending a notification from one + # client to another raises an error. + + message = {} + message["samp.mtype"] = "table.load.votable" + message["samp.params"] = {} + + with pytest.raises(SAMPProxyError): + self.client1.notify(self.client2_id, message) + + # Check that there are no currently active subscriptions + + assert self.client1.get_subscribed_clients("table.load.votable") == {} + assert self.client2.get_subscribed_clients("table.load.votable") == {} + + # We now test notifications and calls + + rec1 = Receiver(self.client1) + rec2 = Receiver(self.client2) + + self.client2.bind_receive_notification( + "table.load.votable", rec2.receive_notification + ) + + self.client2.bind_receive_call("table.load.votable", rec2.receive_call) + + self.client1.bind_receive_response("test-tag", rec1.receive_response) + + # Check resulting subscriptions + + assert self.client1.get_subscribed_clients("table.load.votable") == { + self.client2_id: {} + } + assert self.client2.get_subscribed_clients("table.load.votable") == {} + + assert "table.load.votable" in self.client1.get_subscriptions(self.client2_id) + assert "table.load.votable" in self.client2.get_subscriptions(self.client2_id) + + # Once we have finished with the calls and notifications, we will + # check the data got across correctly. + + # Test notify + + params = random_params(self.tmpdir) + self.client1.notify( + self.client2.get_public_id(), + {"samp.mtype": "table.load.votable", "samp.params": params}, + ) + + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + params = random_params(self.tmpdir) + self.client1.enotify( + self.client2.get_public_id(), "table.load.votable", **params + ) + + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + # Test notify_all + + params = random_params(self.tmpdir) + self.client1.notify_all( + {"samp.mtype": "table.load.votable", "samp.params": params} + ) + + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + params = random_params(self.tmpdir) + self.client1.enotify_all("table.load.votable", **params) + + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + # Test call + + params = random_params(self.tmpdir) + self.client1.call( + self.client2.get_public_id(), + "test-tag", + {"samp.mtype": "table.load.votable", "samp.params": params}, + ) + + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + params = random_params(self.tmpdir) + self.client1.ecall( + self.client2.get_public_id(), "test-tag", "table.load.votable", **params + ) + + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + # Test call_all + + params = random_params(self.tmpdir) + self.client1.call_all( + "tag1", {"samp.mtype": "table.load.votable", "samp.params": params} + ) + + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + params = random_params(self.tmpdir) + self.client1.ecall_all("tag2", "table.load.votable", **params) + + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + # Test call_and_wait + + params = random_params(self.tmpdir) + result = self.client1.call_and_wait( + self.client2.get_public_id(), + {"samp.mtype": "table.load.votable", "samp.params": params}, + timeout=5, + ) + + assert result == TEST_REPLY + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + params = random_params(self.tmpdir) + result = self.client1.ecall_and_wait( + self.client2.get_public_id(), "table.load.votable", timeout=5, **params + ) + + assert result == TEST_REPLY + assert_output( + "table.load.votable", + self.client2.get_private_key(), + self.client1_id, + params, + timeout=60, + ) + + # TODO: check that receive_response received the right data diff --git a/pyvo/astropy_samp/tests/test_web_profile.py b/pyvo/astropy_samp/tests/test_web_profile.py new file mode 100644 index 000000000..9277b6f6b --- /dev/null +++ b/pyvo/astropy_samp/tests/test_web_profile.py @@ -0,0 +1,87 @@ +""" +Test the web profile using Python classes that have been adapted to act like a +web client. We can only put a single test here because only one hub can run +with the web profile active, and the user might want to run the tests in +parallel. +""" +import threading +from urllib.request import Request, urlopen + +import pytest + +from astropy.samp import SAMPHubServer, SAMPIntegratedClient, conf +from astropy.samp.web_profile import CLIENT_ACCESS_POLICY, CROSS_DOMAIN +from astropy.tests.helper import CI +from astropy.utils.data import get_readable_fileobj + +from .test_standard_profile import TestStandardProfile as BaseTestStandardProfile +from .web_profile_test_helpers import ( + AlwaysApproveWebProfileDialog, + SAMPIntegratedWebClient, +) + + +def setup_module(module): + conf.use_internet = False + + +@pytest.mark.skipif(CI, reason="flaky in CI") +class TestWebProfile(BaseTestStandardProfile): + @pytest.fixture(autouse=True) + def setup_method(self, tmp_path): + self.dialog = AlwaysApproveWebProfileDialog() + t = threading.Thread(target=self.dialog.poll) + t.start() + + self.tmpdir = str(tmp_path) + lockfile = str(tmp_path / ".samp") + + self.hub = SAMPHubServer( + web_profile_dialog=self.dialog, lockfile=lockfile, web_port=0, pool_size=1 + ) + self.hub.start() + + self.client1 = SAMPIntegratedClient() + self.client1.connect(hub=self.hub, pool_size=1) + self.client1_id = self.client1.get_public_id() + self.client1_key = self.client1.get_private_key() + + self.client2 = SAMPIntegratedWebClient() + self.client2.connect(web_port=self.hub._web_port, pool_size=2) + self.client2_id = self.client2.get_public_id() + self.client2_key = self.client2.get_private_key() + + def teardown_method(self): + if self.client1.is_connected: + self.client1.disconnect() + if self.client2.is_connected: + self.client2.disconnect() + + self.hub.stop() + self.dialog.stop() + + # The full communication tests are run since TestWebProfile inherits + # test_main from TestStandardProfile + + def test_web_profile(self): + # Check some additional queries to the server + + with get_readable_fileobj( + f"http://localhost:{self.hub._web_port}/crossdomain.xml" + ) as f: + assert f.read() == CROSS_DOMAIN + + with get_readable_fileobj( + f"http://localhost:{self.hub._web_port}/clientaccesspolicy.xml" + ) as f: + assert f.read() == CLIENT_ACCESS_POLICY + + # Check headers + + req = Request(f"http://localhost:{self.hub._web_port}/crossdomain.xml") + req.add_header("Origin", "test_web_profile") + resp = urlopen(req) + + assert resp.getheader("Access-Control-Allow-Origin") == "test_web_profile" + assert resp.getheader("Access-Control-Allow-Headers") == "Content-Type" + assert resp.getheader("Access-Control-Allow-Credentials") == "true" diff --git a/pyvo/astropy_samp/tests/web_profile_test_helpers.py b/pyvo/astropy_samp/tests/web_profile_test_helpers.py new file mode 100644 index 000000000..41ac945ff --- /dev/null +++ b/pyvo/astropy_samp/tests/web_profile_test_helpers.py @@ -0,0 +1,282 @@ +import threading +import time +import xmlrpc.client as xmlrpc + +from pyvo.astropy_samp.client import SAMPClient +from pyvo.astropy_samp.errors import SAMPClientError, SAMPHubError +from pyvo.astropy_samp.hub import WebProfileDialog +from pyvo.astropy_samp.hub_proxy import SAMPHubProxy +from pyvo.astropy_samp.integrated_client import SAMPIntegratedClient +from pyvo.astropy_samp.utils import ServerProxyPool + + +class AlwaysApproveWebProfileDialog(WebProfileDialog): + def __init__(self): + self.polling = True + WebProfileDialog.__init__(self) + + def show_dialog(self, *args): + self.consent() + + def poll(self): + while self.polling: + self.handle_queue() + time.sleep(0.1) + + def stop(self): + self.polling = False + + +class SAMPWebHubProxy(SAMPHubProxy): + """ + Proxy class to simplify the client interaction with a SAMP hub (via the web + profile). + + In practice web clients should run from the browser, so this is provided as + a means of testing a hub's support for the web profile from Python. + """ + + def connect(self, pool_size=20, web_port=21012): + """ + Connect to the current SAMP Hub on localhost:web_port. + + Parameters + ---------- + pool_size : int, optional + The number of socket connections opened to communicate with the + Hub. + """ + self._connected = False + + try: + self.proxy = ServerProxyPool( + pool_size, + xmlrpc.ServerProxy, + f"http://127.0.0.1:{web_port}", + allow_none=1, + ) + self.ping() + self._connected = True + except xmlrpc.ProtocolError as p: + raise SAMPHubError(f"Protocol Error {p.errcode}: {p.errmsg}") + + @property + def _samp_hub(self): + """ + Property to abstract away the path to the hub, which allows this class + to be used for both the standard and the web profile. + """ + return self.proxy.samp.webhub + + def set_xmlrpc_callback(self, private_key, xmlrpc_addr): + raise NotImplementedError( + "set_xmlrpc_callback is not defined for the web profile" + ) + + def register(self, identity_info): + """ + Proxy to ``register`` SAMP Hub method. + """ + return self._samp_hub.register(identity_info) + + def allow_reverse_callbacks(self, private_key, allow): + """ + Proxy to ``allowReverseCallbacks`` SAMP Hub method. + """ + return self._samp_hub.allowReverseCallbacks(private_key, allow) + + def pull_callbacks(self, private_key, timeout): + """ + Proxy to ``pullCallbacks`` SAMP Hub method. + """ + return self._samp_hub.pullCallbacks(private_key, timeout) + + +class SAMPWebClient(SAMPClient): + """ + Utility class which provides facilities to create and manage a SAMP + compliant XML-RPC server that acts as SAMP callable web client application. + + In practice web clients should run from the browser, so this is provided as + a means of testing a hub's support for the web profile from Python. + + Parameters + ---------- + hub : :class:`~pyvo.astropy_samp.hub_proxy.SAMPWebHubProxy` + An instance of :class:`~pyvo.astropy_samp.hub_proxy.SAMPWebHubProxy` to + be used for messaging with the SAMP Hub. + + name : str, optional + Client name (corresponding to ``samp.name`` metadata keyword). + + description : str, optional + Client description (corresponding to ``samp.description.text`` metadata + keyword). + + metadata : dict, optional + Client application metadata in the standard SAMP format. + + callable : bool, optional + Whether the client can receive calls and notifications. If set to + `False`, then the client can send notifications and calls, but can not + receive any. + """ + + def __init__(self, hub, name=None, description=None, metadata=None, callable=True): + # GENERAL + self._is_running = False + self._is_registered = False + + if metadata is None: + metadata = {} + + if name is not None: + metadata["samp.name"] = name + + if description is not None: + metadata["samp.description.text"] = description + + self._metadata = metadata + + self._callable = callable + + # HUB INTERACTION + self.client = None + self._public_id = None + self._private_key = None + self._hub_id = None + self._notification_bindings = {} + self._call_bindings = { + "samp.app.ping": [self._ping, {}], + "client.env.get": [self._client_env_get, {}], + } + self._response_bindings = {} + + self.hub = hub + + self._registration_lock = threading.Lock() + self._registered_event = threading.Event() + if self._callable: + self._thread = threading.Thread(target=self._serve_forever) + self._thread.daemon = True + + def _serve_forever(self): + while self.is_running: + # Wait until we are actually registered before trying to do + # anything, to avoid busy looping + # Watch for callbacks here + self._registered_event.wait() + with self._registration_lock: + if not self._is_registered: + return + + results = self.hub.pull_callbacks(self.get_private_key(), 0) + for result in results: + if result["samp.methodName"] == "receiveNotification": + self.receive_notification( + self._private_key, *result["samp.params"] + ) + elif result["samp.methodName"] == "receiveCall": + self.receive_call(self._private_key, *result["samp.params"]) + elif result["samp.methodName"] == "receiveResponse": + self.receive_response(self._private_key, *result["samp.params"]) + + self.hub.disconnect() + + def register(self): + """ + Register the client to the SAMP Hub. + """ + if self.hub.is_connected: + if self._private_key is not None: + raise SAMPClientError("Client already registered") + + result = self.hub.register("Astropy SAMP Web Client") + + if result["samp.self-id"] == "": + raise SAMPClientError( + "Registration failed - samp.self-id was not set by the hub." + ) + + if result["samp.private-key"] == "": + raise SAMPClientError( + "Registration failed - samp.private-key was not set by the hub." + ) + + self._public_id = result["samp.self-id"] + self._private_key = result["samp.private-key"] + self._hub_id = result["samp.hub-id"] + + if self._callable: + self._declare_subscriptions() + self.hub.allow_reverse_callbacks(self._private_key, True) + + if self._metadata != {}: + self.declare_metadata() + + self._is_registered = True + # Let the client thread proceed + self._registered_event.set() + + else: + raise SAMPClientError( + "Unable to register to the SAMP Hub. Hub proxy not connected." + ) + + def unregister(self): + # We have to hold the registration lock if the client is callable + # to avoid a race condition where the client queries the hub for + # pushCallbacks after it has already been unregistered from the hub + with self._registration_lock: + super().unregister() + + +class SAMPIntegratedWebClient(SAMPIntegratedClient): + """ + A Simple SAMP web client. + + In practice web clients should run from the browser, so this is provided as + a means of testing a hub's support for the web profile from Python. + + This class is meant to simplify the client usage providing a proxy class + that merges the :class:`~pyvo.astropy_samp.client.SAMPWebClient` and + :class:`~pyvo.astropy_samp.hub_proxy.SAMPWebHubProxy` functionalities in a + simplified API. + + Parameters + ---------- + name : str, optional + Client name (corresponding to ``samp.name`` metadata keyword). + + description : str, optional + Client description (corresponding to ``samp.description.text`` metadata + keyword). + + metadata : dict, optional + Client application metadata in the standard SAMP format. + + callable : bool, optional + Whether the client can receive calls and notifications. If set to + `False`, then the client can send notifications and calls, but can not + receive any. + """ + + def __init__(self, name=None, description=None, metadata=None, callable=True): + self.hub = SAMPWebHubProxy() + + self.client = SAMPWebClient(self.hub, name, description, metadata, callable) + + def connect(self, pool_size=20, web_port=21012): + """ + Connect with the current or specified SAMP Hub, start and register the + client. + + Parameters + ---------- + pool_size : int, optional + The number of socket connections opened to communicate with the + Hub. + """ + self.hub.connect(pool_size, web_port=web_port) + self.client.start() + self.client.register() diff --git a/pyvo/astropy_samp/utils.py b/pyvo/astropy_samp/utils.py new file mode 100644 index 000000000..d695a9fb8 --- /dev/null +++ b/pyvo/astropy_samp/utils.py @@ -0,0 +1,169 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +""" +Utility functions and classes. +""" + + +import inspect +import queue +import traceback +import xmlrpc.client as xmlrpc +from io import StringIO +from urllib.request import urlopen + +from .constants import SAMP_STATUS_ERROR +from .errors import SAMPProxyError + + +def internet_on(): + from . import conf + + if not conf.use_internet: + return False + else: + try: + urlopen("http://google.com", timeout=1.0) + except Exception: + return False + else: + return True + + +__all__ = ["SAMPMsgReplierWrapper"] + +__doctest_skip__ = ["."] + + +def getattr_recursive(variable, attribute): + """ + Get attributes recursively. + """ + if "." in attribute: + top, remaining = attribute.split(".", 1) + return getattr_recursive(getattr(variable, top), remaining) + else: + return getattr(variable, attribute) + + +class _ServerProxyPoolMethod: + # some magic to bind an XML-RPC method to an RPC server. + # supports "nested" methods (e.g. examples.getStateName) + + def __init__(self, proxies, name): + self.__proxies = proxies + self.__name = name + + def __getattr__(self, name): + return _ServerProxyPoolMethod(self.__proxies, f"{self.__name}.{name}") + + def __call__(self, *args, **kwrds): + proxy = self.__proxies.get() + function = getattr_recursive(proxy, self.__name) + try: + response = function(*args, **kwrds) + except xmlrpc.Fault as exc: + raise SAMPProxyError(exc.faultCode, exc.faultString) + finally: + self.__proxies.put(proxy) + return response + + +class ServerProxyPool: + """ + A thread-safe pool of `xmlrpc.ServerProxy` objects. + """ + + def __init__(self, size, proxy_class, *args, **keywords): + self._proxies = queue.Queue(size) + for i in range(size): + self._proxies.put(proxy_class(*args, **keywords)) + + def __getattr__(self, name): + # magic method dispatcher + return _ServerProxyPoolMethod(self._proxies, name) + + def shutdown(self): + """Shut down the proxy pool by closing all active connections.""" + while True: + try: + proxy = self._proxies.get_nowait() + except queue.Empty: + break + # An undocumented but apparently supported way to call methods on + # an ServerProxy that are not dispatched to the remote server + proxy("close") + + +class SAMPMsgReplierWrapper: + """ + Function decorator that allows to automatically grab errors and returned + maps (if any) from a function bound to a SAMP call (or notify). + + Parameters + ---------- + cli : :class:`~astropy.samp.SAMPIntegratedClient` or :class:`~astropy.samp.SAMPClient` + SAMP client instance. Decorator initialization, accepting the instance + of the client that receives the call or notification. + """ + + def __init__(self, cli): + self.cli = cli + + def __call__(self, f): + def wrapped_f(*args): + if get_num_args(f) == 5 or args[2] is None: # notification + f(*args) + + else: # call + try: + result = f(*args) + if result: + self.cli.hub.reply( + self.cli.get_private_key(), + args[2], + {"samp.status": SAMP_STATUS_ERROR, "samp.result": result}, + ) + except Exception: + err = StringIO() + traceback.print_exc(file=err) + txt = err.getvalue() + self.cli.hub.reply( + self.cli.get_private_key(), + args[2], + {"samp.status": SAMP_STATUS_ERROR, "samp.result": {"txt": txt}}, + ) + + return wrapped_f + + +class _HubAsClient: + def __init__(self, handler): + self._handler = handler + + def __getattr__(self, name): + # magic method dispatcher + return _HubAsClientMethod(self._handler, name) + + +class _HubAsClientMethod: + def __init__(self, send, name): + self.__send = send + self.__name = name + + def __getattr__(self, name): + return _HubAsClientMethod(self.__send, f"{self.__name}.{name}") + + def __call__(self, *args): + return self.__send(self.__name, args) + + +def get_num_args(f): + """ + Find the number of arguments a function or method takes (excluding ``self``). + """ + if inspect.ismethod(f): + return f.__func__.__code__.co_argcount - 1 + elif inspect.isfunction(f): + return f.__code__.co_argcount + else: + raise TypeError("f should be a function or a method") diff --git a/pyvo/astropy_samp/web_profile.py b/pyvo/astropy_samp/web_profile.py new file mode 100644 index 000000000..47aa957d6 --- /dev/null +++ b/pyvo/astropy_samp/web_profile.py @@ -0,0 +1,178 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + + +from urllib.parse import parse_qs +from urllib.request import urlopen + +from astropy.utils.data import get_pkg_data_contents + +from .standard_profile import SAMPSimpleXMLRPCRequestHandler, ThreadingXMLRPCServer + +__all__ = [] + +CROSS_DOMAIN = get_pkg_data_contents("data/crossdomain.xml") +CLIENT_ACCESS_POLICY = get_pkg_data_contents("data/clientaccesspolicy.xml") + + +class WebProfileRequestHandler(SAMPSimpleXMLRPCRequestHandler): + """ + Handler of XMLRPC requests performed through the Web Profile. + """ + + def _send_CORS_header(self): + if self.headers.get("Origin") is not None: + method = self.headers.get("Access-Control-Request-Method") + if method and self.command == "OPTIONS": + # Preflight method + self.send_header("Content-Length", "0") + self.send_header( + "Access-Control-Allow-Origin", self.headers.get("Origin") + ) + self.send_header("Access-Control-Allow-Methods", method) + self.send_header("Access-Control-Allow-Headers", "Content-Type") + self.send_header("Access-Control-Allow-Credentials", "true") + else: + # Simple method + self.send_header( + "Access-Control-Allow-Origin", self.headers.get("Origin") + ) + self.send_header("Access-Control-Allow-Headers", "Content-Type") + self.send_header("Access-Control-Allow-Credentials", "true") + + def end_headers(self): + self._send_CORS_header() + SAMPSimpleXMLRPCRequestHandler.end_headers(self) + + def _serve_cross_domain_xml(self): + cross_domain = False + + if self.path == "/crossdomain.xml": + # Adobe standard + response = CROSS_DOMAIN + + self.send_response(200, "OK") + self.send_header("Content-Type", "text/x-cross-domain-policy") + self.send_header("Content-Length", f"{len(response)}") + self.end_headers() + self.wfile.write(response.encode("utf-8")) + self.wfile.flush() + cross_domain = True + + elif self.path == "/clientaccesspolicy.xml": + # Microsoft standard + response = CLIENT_ACCESS_POLICY + + self.send_response(200, "OK") + self.send_header("Content-Type", "text/xml") + self.send_header("Content-Length", f"{len(response)}") + self.end_headers() + self.wfile.write(response.encode("utf-8")) + self.wfile.flush() + cross_domain = True + + return cross_domain + + def do_POST(self): + if self._serve_cross_domain_xml(): + return + + return SAMPSimpleXMLRPCRequestHandler.do_POST(self) + + def do_HEAD(self): + if not self.is_http_path_valid(): + self.report_404() + return + + if self._serve_cross_domain_xml(): + return + + def do_OPTIONS(self): + self.send_response(200, "OK") + self.end_headers() + + def do_GET(self): + if not self.is_http_path_valid(): + self.report_404() + return + + split_path = self.path.split("?") + + if split_path[0] in [f"/translator/{clid}" for clid in self.server.clients]: + # Request of a file proxying + urlpath = parse_qs(split_path[1]) + try: + proxyfile = urlopen(urlpath["ref"][0]) + self.send_response(200, "OK") + self.end_headers() + self.wfile.write(proxyfile.read()) + proxyfile.close() + except OSError: + self.report_404() + return + + if self._serve_cross_domain_xml(): + return + + def is_http_path_valid(self): + valid_paths = ["/clientaccesspolicy.xml", "/crossdomain.xml"] + [ + f"/translator/{clid}" for clid in self.server.clients + ] + return self.path.split("?")[0] in valid_paths + + +class WebProfileXMLRPCServer(ThreadingXMLRPCServer): + """ + XMLRPC server supporting the SAMP Web Profile. + """ + + def __init__( + self, + addr, + log=None, + requestHandler=WebProfileRequestHandler, + logRequests=True, + allow_none=True, + encoding=None, + ): + self.clients = [] + ThreadingXMLRPCServer.__init__( + self, addr, log, requestHandler, logRequests, allow_none, encoding + ) + + def add_client(self, client_id): + self.clients.append(client_id) + + def remove_client(self, client_id): + try: + self.clients.remove(client_id) + except ValueError: + # No warning here because this method gets called for all clients, + # not just web clients, and we expect it to fail for non-web + # clients. + pass + + +def web_profile_text_dialog(request, queue): + samp_name = "unknown" + + if isinstance(request[0], str): + # To support the old protocol version + samp_name = request[0] + else: + samp_name = request[0]["samp.name"] + + text = f"""A Web application which declares to be + +Name: {samp_name} +Origin: {request[2]} + +is requesting to be registered with the SAMP Hub. +Pay attention that if you permit its registration, such +application will acquire all current user privileges, like +file read/write. + +Do you give your consent? [yes|no]""" + + print(text) + answer = input(">>> ") + queue.put(answer.lower() in ["yes", "y"]) diff --git a/pyvo/samp.py b/pyvo/samp.py index 535f1a872..a46a43472 100644 --- a/pyvo/samp.py +++ b/pyvo/samp.py @@ -6,7 +6,7 @@ import os import tempfile -from astropy.samp import SAMPIntegratedClient +from .astropy_samp import SAMPIntegratedClient __all__ = [