From b58ff3aa9913cb977f11f94cc914bfb0925be019 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Thu, 14 Jan 2021 12:12:29 +0000 Subject: [PATCH 01/16] Correct topic names in AMS migrate docs They use dashes rather than dots. --- migrating_to_ams.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/migrating_to_ams.md b/migrating_to_ams.md index 96c7b825..9d0ba131 100644 --- a/migrating_to_ams.md +++ b/migrating_to_ams.md @@ -23,8 +23,8 @@ The sender configuration is usually found under `/etc/apel/sender.cfg`. Follow t ``` 1. To send to the central APEL Accounting server, change `destination` to one of the following depending on your type of accounting: * `gLite-APEL` for Grid Accounting - * `eu.egi.cloud.accounting` for Cloud Accounting - * `eu.egi.storage.accounting` for Storage Accounting + * `eu-egi-cloud-accounting` for Cloud Accounting + * `eu-egi-storage-accounting` for Storage Accounting The next time `ssmsend` runs it should be using the AMS. You can check this by looking in the logs for a successful run, which should look like this: From 26ef823a986722f8c60b0fb220fa7699d5e7e4dd Mon Sep 17 00:00:00 2001 From: sarahbyrnie Date: Wed, 9 Dec 2020 11:53:32 +0000 Subject: [PATCH 02/16] Adds server and host dns to the sender log --- bin/sender.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bin/sender.py b/bin/sender.py index 30fe95b2..0d14a5cc 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -18,7 +18,7 @@ from __future__ import print_function -from ssm import __version__, set_up_logging, LOG_BREAK +from ssm import __version__, set_up_logging, LOG_BREAK, crypto from ssm.ssm2 import Ssm2, Ssm2Exception from ssm.crypto import CryptoException from ssm.brokers import StompBrokerGetter, STOMP_SERVICE, STOMP_SSL_SERVICE @@ -171,6 +171,8 @@ def main(): verify_server_cert = True try: server_cert = cp.get('certificates', 'server_cert') + server_dn = crypto.get_certificate_subject(crypto._from_file(server_cert)) + log.info('Messages will be encrypted using %s', server_dn) try: verify_server_cert = cp.getboolean('certificates', 'verify_server_cert') except ConfigParser.NoOptionError: @@ -193,6 +195,10 @@ def main(): log.info('No path type defined, assuming dirq.') path_type = 'dirq' + host_cert = cp.get('certificates','certificate') + host_dn = crypto.get_certificate_subject(crypto._from_file(host_cert)) + log.info('Messages will be signed using %s', host_dn) + sender = Ssm2(brokers, cp.get('messaging', 'path'), path_type=path_type, From 902cf9c4c58471b5cfed863e6ead31a58b278ff2 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Wed, 17 Mar 2021 15:01:14 +0000 Subject: [PATCH 03/16] Reuse host_cert variable and correct spacing Rather than fetching the host certificate config twice, we can reuse the varible that it gets read into. --- bin/sender.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/sender.py b/bin/sender.py index 0d14a5cc..8f212151 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -195,14 +195,14 @@ def main(): log.info('No path type defined, assuming dirq.') path_type = 'dirq' - host_cert = cp.get('certificates','certificate') + host_cert = cp.get('certificates', 'certificate') host_dn = crypto.get_certificate_subject(crypto._from_file(host_cert)) log.info('Messages will be signed using %s', host_dn) sender = Ssm2(brokers, cp.get('messaging', 'path'), path_type=path_type, - cert=cp.get('certificates', 'certificate'), + cert=host_cert, key=cp.get('certificates', 'key'), dest=cp.get('messaging', 'destination'), use_ssl=use_ssl, From 7461bb74d9ee47f03700cd4d91f58f49dd9160bb Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Wed, 17 Mar 2021 15:12:02 +0000 Subject: [PATCH 04/16] Tidy up imports to match the rest --- bin/sender.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/sender.py b/bin/sender.py index 8f212151..2eb4c2b4 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -18,9 +18,9 @@ from __future__ import print_function -from ssm import __version__, set_up_logging, LOG_BREAK, crypto +from ssm import __version__, set_up_logging, LOG_BREAK from ssm.ssm2 import Ssm2, Ssm2Exception -from ssm.crypto import CryptoException +from ssm.crypto import CryptoException, get_certificate_subject, _from_file from ssm.brokers import StompBrokerGetter, STOMP_SERVICE, STOMP_SSL_SERVICE import logging.config @@ -171,7 +171,7 @@ def main(): verify_server_cert = True try: server_cert = cp.get('certificates', 'server_cert') - server_dn = crypto.get_certificate_subject(crypto._from_file(server_cert)) + server_dn = get_certificate_subject(_from_file(server_cert)) log.info('Messages will be encrypted using %s', server_dn) try: verify_server_cert = cp.getboolean('certificates', 'verify_server_cert') @@ -196,7 +196,7 @@ def main(): path_type = 'dirq' host_cert = cp.get('certificates', 'certificate') - host_dn = crypto.get_certificate_subject(crypto._from_file(host_cert)) + host_dn = get_certificate_subject(_from_file(host_cert)) log.info('Messages will be signed using %s', host_dn) sender = Ssm2(brokers, From 9a9021931b6489f61276dfbf17abcf4ee3f9df74 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Tue, 9 Mar 2021 17:32:47 +0000 Subject: [PATCH 05/16] Add agents.py to refactor reciver and sender into --- ssm/agents.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 ssm/agents.py diff --git a/ssm/agents.py b/ssm/agents.py new file mode 100644 index 00000000..e69de29b From 9b3114eaa64abbb5d0c99bf45ea576e605d4498c Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Wed, 10 Mar 2021 15:13:39 +0000 Subject: [PATCH 06/16] Replace global log with passing logger to function Globals are generally not a good idea so replace the use of a global logger with passing the logger to the function that requires it. --- bin/receiver.py | 8 +++----- test/test_receiver.py | 7 ++++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/bin/receiver.py b/bin/receiver.py index 0afeb759..1f768ea1 100644 --- a/bin/receiver.py +++ b/bin/receiver.py @@ -45,10 +45,9 @@ # How often (in seconds) to read the list of valid DNs. REFRESH_DNS = 600 -log = None -def get_dns(dn_file): +def get_dns(dn_file, log): """Retrieve a list of DNs from a file.""" dns = [] f = None @@ -110,7 +109,6 @@ def main(): print('SSM will exit.') sys.exit(1) - global log log = logging.getLogger('ssmreceive') log.info(LOG_BREAK) @@ -224,7 +222,7 @@ def main(): token=token) log.info('Fetching valid DNs.') - dns = get_dns(options.dn_file) + dns = get_dns(options.dn_file, log) ssm.set_dns(dns) except Exception as e: @@ -251,7 +249,7 @@ def main(): if i % (REFRESH_DNS * 10) == 0: log.info('Refreshing valid DNs and then sending ping.') - dns = get_dns(options.dn_file) + dns = get_dns(options.dn_file, log) ssm.set_dns(dns) if protocol == Ssm2.STOMP_MESSAGING: diff --git a/test/test_receiver.py b/test/test_receiver.py index 70386484..40ef1544 100644 --- a/test/test_receiver.py +++ b/test/test_receiver.py @@ -23,7 +23,8 @@ def setUp(self): def test_get_empty_dns_file(self): """Attempting to read an empty DNs file should raise an exception.""" - self.assertRaises(Ssm2Exception, bin.receiver.get_dns, self.tf_path) + self.assertRaises(Ssm2Exception, bin.receiver.get_dns, + self.tf_path, self.mock_log) def test_get_good_dns(self): dn_text = dedent("""\ @@ -41,7 +42,7 @@ def test_get_good_dns(self): f = open(self.tf_path, 'w') f.write(dn_text) f.close() - self.assertEqual(bin.receiver.get_dns(self.tf_path), output) + self.assertEqual(bin.receiver.get_dns(self.tf_path, self.mock_log), output) def test_get_iffy_dns(self): """Check that the two bad DNs are picked up.""" @@ -56,7 +57,7 @@ def test_get_iffy_dns(self): f = open(self.tf_path, 'w') f.write(dn_text) f.close() - bin.receiver.get_dns(self.tf_path) + bin.receiver.get_dns(self.tf_path, self.mock_log) self.assertEqual(self.mock_log.warn.call_count, 2) def tearDown(self): From 61c58f6647f2ca4b3fe266aa146ad79e917a80f3 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Wed, 10 Mar 2021 17:21:54 +0000 Subject: [PATCH 07/16] Factor out agent code from executables --- bin/receiver.py | 223 +----------------------------- bin/sender.py | 192 +------------------------- ssm/agents.py | 352 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 365 insertions(+), 402 deletions(-) diff --git a/bin/receiver.py b/bin/receiver.py index 1f768ea1..cff0ee6e 100644 --- a/bin/receiver.py +++ b/bin/receiver.py @@ -18,59 +18,19 @@ from __future__ import print_function -from ssm.brokers import StompBrokerGetter, STOMP_SERVICE, STOMP_SSL_SERVICE -from ssm.ssm2 import Ssm2, Ssm2Exception -from ssm import __version__, set_up_logging, LOG_BREAK +import agents +from ssm import __version__, LOG_BREAK -from stomp.exception import NotConnectedException -try: - from argo_ams_library import AmsConnectionException -except ImportError: - # ImportError is raised when Ssm2 initialised if AMS is requested but lib - # not installed. - AmsConnectionException = None - -import time import logging.config -import ldap import os import sys from optparse import OptionParser -from daemon import DaemonContext try: import ConfigParser except ImportError: import configparser as ConfigParser -# How often (in seconds) to read the list of valid DNs. -REFRESH_DNS = 600 - - -def get_dns(dn_file, log): - """Retrieve a list of DNs from a file.""" - dns = [] - f = None - try: - f = open(dn_file, 'r') - lines = f.readlines() - for line in lines: - if line.isspace() or line.strip().startswith('#'): - continue - elif line.strip().startswith('/'): - dns.append(line.strip()) - else: - log.warn('DN in incorrect format: %s', line) - finally: - if f is not None: - f.close() - # If no valid DNs, SSM cannot receive any messages. - if len(dns) == 0: - raise Ssm2Exception('No valid DNs found in %s. SSM will not start' % dn_file) - - log.debug('%s DNs found.', len(dns)) - return dns - def main(): """Set up connection, and listen for messages.""" @@ -96,191 +56,20 @@ def main(): print('Cannot start SSM. Pidfile %s already exists.' % pidfile) sys.exit(1) - # set up logging - try: - if os.path.exists(options.log_config): - logging.config.fileConfig(options.log_config) - else: - set_up_logging(cp.get('logging', 'logfile'), - cp.get('logging', 'level'), - cp.getboolean('logging', 'console')) - except (ConfigParser.Error, ValueError, IOError) as err: - print('Error configuring logging: %s' % err) - print('SSM will exit.') - sys.exit(1) + agents.logging_helper(cp, options.log_config) log = logging.getLogger('ssmreceive') log.info(LOG_BREAK) log.info('Starting receiving SSM version %s.%s.%s.', *__version__) - # Determine the protocol for the SSM to use. - try: - protocol = cp.get('receiver', 'protocol') - - except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): - # If the newer configuration setting 'protocol' is not set, use 'STOMP' - # for backwards compatability. - protocol = Ssm2.STOMP_MESSAGING - log.debug("No option set for 'protocol'. Defaulting to %s.", protocol) + protocol = agents.get_protocol(cp, log) log.info('Setting up SSM with protocol: %s', protocol) - if protocol == Ssm2.STOMP_MESSAGING: - # Set defaults for AMS variables that Ssm2 constructor requires below. - project = None - token = '' - - use_ssl = cp.getboolean('broker', 'use_ssl') - if use_ssl: - service = STOMP_SSL_SERVICE - else: - service = STOMP_SERVICE - - # If we can't get a broker to connect to, we have to give up. - try: - bg = StompBrokerGetter(cp.get('broker', 'bdii')) - brokers = bg.get_broker_hosts_and_ports(service, cp.get('broker', - 'network')) - except ConfigParser.NoOptionError as e: - try: - host = cp.get('broker', 'host') - port = cp.get('broker', 'port') - brokers = [(host, int(port))] - except ConfigParser.NoOptionError: - log.error('Options incorrectly supplied for either single ' - 'broker or broker network. ' - 'Please check configuration') - log.error('System will exit.') - log.info(LOG_BREAK) - sys.exit(1) - except ldap.SERVER_DOWN as e: - log.error('Could not connect to LDAP server: %s', e) - log.error('System will exit.') - log.info(LOG_BREAK) - sys.exit(1) - - elif protocol == Ssm2.AMS_MESSAGING: - # Then we are setting up an SSM to connect to a AMS. - - # 'use_ssl' isn't checked when using AMS (SSL is always used), but it - # is needed for the call to the Ssm2 constructor below. - use_ssl = None - try: - # We only need a hostname, not a port - host = cp.get('broker', 'host') - # Use brokers variable so subsequent code is not dependant on - # the exact destination type. - brokers = [host] - - except ConfigParser.NoOptionError: - log.error('The host must be specified when connecting to AMS, ' - 'please check your configuration') - log.error('System will exit.') - log.info(LOG_BREAK) - print('SSM failed to start. See log file for details.') - sys.exit(1) - - # Attempt to configure AMS specific variables. - try: - token = cp.get('messaging', 'token') - project = cp.get('messaging', 'ams_project') - - except (ConfigParser.Error, ValueError, IOError) as err: - # A token and project are needed to successfully receive from an - # AMS instance, so log and then exit on an error. - log.error('Error configuring AMS values: %s', err) - log.error('SSM will exit.') - print('SSM failed to start. See log file for details.') - sys.exit(1) + brokers, project, token = agents.get_ssm_args(protocol, cp, log) - if len(brokers) == 0: - log.error('No brokers available.') - log.error('System will exit.') - log.info(LOG_BREAK) - sys.exit(1) - - log.info('The SSM will run as a daemon.') - - # We need to preserve the file descriptor for any log files. - rootlog = logging.getLogger() - log_files = [x.stream for x in rootlog.handlers] - dc = DaemonContext(files_preserve=log_files) - - try: - ssm = Ssm2(brokers, - cp.get('messaging', 'path'), - cert=cp.get('certificates', 'certificate'), - key=cp.get('certificates', 'key'), - listen=cp.get('messaging', 'destination'), - use_ssl=use_ssl, - capath=cp.get('certificates', 'capath'), - check_crls=cp.getboolean('certificates', 'check_crls'), - pidfile=pidfile, - protocol=protocol, - project=project, - token=token) - - log.info('Fetching valid DNs.') - dns = get_dns(options.dn_file, log) - ssm.set_dns(dns) - - except Exception as e: - log.fatal('Failed to initialise SSM: %s', e) - log.info(LOG_BREAK) - sys.exit(1) - - try: - # Note: because we need to be compatible with python 2.4, we can't use - # with dc: - # here - we need to call the open() and close() methods - # manually. - dc.open() - ssm.startup() - i = 0 - # The message listening loop. - while True: - try: - time.sleep(0.1) - if protocol == Ssm2.AMS_MESSAGING: - # We need to pull down messages as part of - # this loop when using AMS. - ssm.pull_msg_ams() - - if i % (REFRESH_DNS * 10) == 0: - log.info('Refreshing valid DNs and then sending ping.') - dns = get_dns(options.dn_file, log) - ssm.set_dns(dns) - - if protocol == Ssm2.STOMP_MESSAGING: - ssm.send_ping() - - except (NotConnectedException, AmsConnectionException) as error: - log.warn('Connection lost.') - log.debug(error) - ssm.shutdown() - dc.close() - log.info("Waiting for 10 minutes before restarting...") - time.sleep(10 * 60) - log.info('Restarting SSM.') - dc.open() - ssm.startup() - - i += 1 - - except SystemExit as e: - log.info('Received the shutdown signal: %s', e) - ssm.shutdown() - dc.close() - except Exception as e: - log.error('Unexpected exception: %s', e) - log.error('Exception type: %s', e.__class__) - log.error('The SSM will exit.') - ssm.shutdown() - dc.close() - - log.info('Receiving SSM has shut down.') - log.info(LOG_BREAK) + agents.run_receiver(protocol, brokers, project, token, cp, log, options.dn_file, log) if __name__ == '__main__': diff --git a/bin/sender.py b/bin/sender.py index 2eb4c2b4..88eca409 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -18,15 +18,10 @@ from __future__ import print_function -from ssm import __version__, set_up_logging, LOG_BREAK -from ssm.ssm2 import Ssm2, Ssm2Exception -from ssm.crypto import CryptoException, get_certificate_subject, _from_file -from ssm.brokers import StompBrokerGetter, STOMP_SERVICE, STOMP_SSL_SERVICE +import agents +from ssm import __version__, LOG_BREAK import logging.config -import ldap -import sys -import os from optparse import OptionParser try: @@ -49,193 +44,20 @@ def main(): cp = ConfigParser.ConfigParser({'use_ssl': 'true'}) cp.read(options.config) - # set up logging - try: - if os.path.exists(options.log_config): - logging.config.fileConfig(options.log_config) - else: - set_up_logging(cp.get('logging', 'logfile'), - cp.get('logging', 'level'), - cp.getboolean('logging', 'console')) - except (ConfigParser.Error, ValueError, IOError) as err: - print('Error configuring logging: %s' % err) - print('The system will exit.') - sys.exit(1) + agents.logging_helper(cp, options.log_config) log = logging.getLogger('ssmsend') log.info(LOG_BREAK) log.info('Starting sending SSM version %s.%s.%s.', *__version__) - # Determine the protocol and destination type of the SSM to configure. - try: - protocol = cp.get('sender', 'protocol') - - except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): - # If the newer configuration setting 'protocol' is not set, use 'STOMP' - # for backwards compatability. - protocol = Ssm2.STOMP_MESSAGING - log.debug("No option set for 'protocol'. Defaulting to %s.", protocol) + protocol = agents.get_protocol(cp, log) log.info('Setting up SSM with protocol: %s', protocol) - if protocol == Ssm2.STOMP_MESSAGING: - # Set defaults for AMS variables that Ssm2 constructor requires below. - project = None - token = '' - - use_ssl = cp.getboolean('broker', 'use_ssl') - if use_ssl: - service = STOMP_SSL_SERVICE - else: - service = STOMP_SERVICE - - # If we can't get a broker to connect to, we have to give up. - try: - bdii_url = cp.get('broker', 'bdii') - log.info('Retrieving broker details from %s ...', bdii_url) - bg = StompBrokerGetter(bdii_url) - brokers = bg.get_broker_hosts_and_ports(service, cp.get('broker', - 'network')) - log.info('Found %s brokers.', len(brokers)) - except ConfigParser.NoOptionError as e: - try: - host = cp.get('broker', 'host') - port = cp.get('broker', 'port') - brokers = [(host, int(port))] - except ConfigParser.NoOptionError: - log.error('Options incorrectly supplied for either single ' - 'broker or broker network. ' - 'Please check configuration') - log.error('System will exit.') - log.info(LOG_BREAK) - print('SSM failed to start. See log file for details.') - sys.exit(1) - except ldap.LDAPError as e: - log.error('Could not connect to LDAP server: %s', e) - log.error('System will exit.') - log.info(LOG_BREAK) - print('SSM failed to start. See log file for details.') - sys.exit(1) - - elif protocol == Ssm2.AMS_MESSAGING: - # Then we are setting up an SSM to connect to a AMS. - - # 'use_ssl' isn't checked when using AMS (SSL is always used), but it - # is needed for the call to the Ssm2 constructor below. - use_ssl = None - try: - # We only need a hostname, not a port - host = cp.get('broker', 'host') - # Use brokers variable so subsequent code is not dependant on - # the exact destination type. - brokers = [host] - - except ConfigParser.NoOptionError: - log.error('The host must be specified when connecting to AMS, ' - 'please check your configuration') - log.error('System will exit.') - log.info(LOG_BREAK) - print('SSM failed to start. See log file for details.') - sys.exit(1) - - # Attempt to configure AMS project variable. - try: - project = cp.get('messaging', 'ams_project') - - except (ConfigParser.Error, ValueError, IOError) as err: - # A project is needed to successfully send to an - # AMS instance, so log and then exit on an error. - log.error('Error configuring AMS values: %s', err) - log.error('SSM will exit.') - print('SSM failed to start. See log file for details.') - sys.exit(1) - - try: - token = cp.get('messaging', 'token') - except (ConfigParser.Error, ValueError, IOError) as err: - # A token is not necessarily needed, if the cert and key can be - # used by the underlying auth system to get a suitable token. - log.info('No AMS token provided, using cert/key pair instead.') - # Empty string used by AMS to define absence of token. - token = '' - - if len(brokers) == 0: - log.error('No brokers available.') - log.error('System will exit.') - log.info(LOG_BREAK) - sys.exit(1) - - try: - server_cert = None - verify_server_cert = True - try: - server_cert = cp.get('certificates', 'server_cert') - server_dn = get_certificate_subject(_from_file(server_cert)) - log.info('Messages will be encrypted using %s', server_dn) - try: - verify_server_cert = cp.getboolean('certificates', 'verify_server_cert') - except ConfigParser.NoOptionError: - pass - except ConfigParser.NoOptionError: - log.info('No server certificate supplied. Will not encrypt messages.') - - try: - destination = cp.get('messaging', 'destination') - if destination == '': - raise Ssm2Exception('No destination queue is configured.') - except ConfigParser.NoOptionError as e: - raise Ssm2Exception(e) - - # Determine what type of message store we are interacting with, - # i.e. a dirq QueueSimple object or a plain MessageDirectory directory. - try: - path_type = cp.get('messaging', 'path_type') - except ConfigParser.NoOptionError: - log.info('No path type defined, assuming dirq.') - path_type = 'dirq' - - host_cert = cp.get('certificates', 'certificate') - host_dn = get_certificate_subject(_from_file(host_cert)) - log.info('Messages will be signed using %s', host_dn) - - sender = Ssm2(brokers, - cp.get('messaging', 'path'), - path_type=path_type, - cert=host_cert, - key=cp.get('certificates', 'key'), - dest=cp.get('messaging', 'destination'), - use_ssl=use_ssl, - capath=cp.get('certificates', 'capath'), - enc_cert=server_cert, - verify_enc_cert=verify_server_cert, - protocol=protocol, - project=project, - token=token) - - if sender.has_msgs(): - sender.handle_connect() - sender.send_all() - log.info('SSM run has finished.') - else: - log.info('No messages found to send.') - - except (Ssm2Exception, CryptoException) as e: - print('SSM failed to complete successfully. See log file for details.') - log.error('SSM failed to complete successfully: %s', e) - except Exception as e: - print('SSM failed to complete successfully. See log file for details.') - log.error('Unexpected exception in SSM: %s', e) - log.error('Exception type: %s', e.__class__) - - try: - sender.close_connection() - except UnboundLocalError: - # SSM not set up. - pass - - log.info('SSM has shut down.') - log.info(LOG_BREAK) + brokers, project, token = agents.get_ssm_args(protocol, cp, log) + + agents.run_sender(protocol, brokers, project, token, cp, log) if __name__ == '__main__': diff --git a/ssm/agents.py b/ssm/agents.py index e69de29b..f9b252a7 100644 --- a/ssm/agents.py +++ b/ssm/agents.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python + +# Copyright 2021 UK Research and Innovation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Code for sender and receiver messaging agents/clients.""" + +from __future__ import absolute_import, division, print_function, unicode_literals + +from daemon import DaemonContext +import logging +import ldap +import os +import sys +import time + +try: + import ConfigParser +except ImportError: + import configparser as ConfigParser + +from stomp.exception import NotConnectedException +try: + from argo_ams_library import AmsConnectionException +except ImportError: + # ImportError is raised when Ssm2 initialised if AMS is requested but lib + # not installed. + AmsConnectionException = None + +from ssm import set_up_logging, LOG_BREAK +from ssm.ssm2 import Ssm2, Ssm2Exception +from ssm.crypto import CryptoException, get_certificate_subject, _from_file +from ssm.brokers import StompBrokerGetter, STOMP_SERVICE, STOMP_SSL_SERVICE + +# How often (in seconds) to read the list of valid DNs. +REFRESH_DNS = 600 + + +def logging_helper(cp, log_manual_path=''): + """Take config parser object and set up root logger.""" + try: + if os.path.exists(log_manual_path): + logging.cp.fileConfig(log_manual_path) + else: + set_up_logging(cp.get('logging', 'logfile'), + cp.get('logging', 'level'), + cp.getboolean('logging', 'console')) + except (ConfigParser.Error, ValueError, IOError) as err: + print('Error configuring logging: %s' % err) + print('The system will exit.') + sys.exit(1) + + +def get_protocol(cp, log): + """Get the protocol from a ConfigParser object, defaulting to STOMP.""" + try: + protocol = cp.get('sender', 'protocol') + + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + # If the newer configuration setting 'protocol' is not set, use 'STOMP' + # for backwards compatability. + protocol = Ssm2.STOMP_MESSAGING + log.debug("No option set for 'protocol'. Defaulting to %s.", protocol) + return protocol + + +def get_ssm_args(protocol, cp, log): + """Return brokers, project, and token from config based on protocol.""" + if protocol == Ssm2.STOMP_MESSAGING: + # Set defaults for AMS variables that Ssm2 constructor requires below. + project = None + token = '' + + use_ssl = cp.getboolean('broker', 'use_ssl') + if use_ssl: + service = STOMP_SSL_SERVICE + else: + service = STOMP_SERVICE + + # If we can't get a broker to connect to, we have to give up. + try: + bdii_url = cp.get('broker', 'bdii') + log.info('Retrieving broker details from %s ...', bdii_url) + bg = StompBrokerGetter(bdii_url) + brokers = bg.get_broker_hosts_and_ports(service, cp.get('broker', + 'network')) + log.info('Found %s brokers.', len(brokers)) + except ConfigParser.NoOptionError as e: + try: + host = cp.get('broker', 'host') + port = cp.get('broker', 'port') + brokers = [(host, int(port))] + except ConfigParser.NoOptionError: + log.error('Options incorrectly supplied for either single ' + 'broker or broker network. ' + 'Please check configuration') + log.error('System will exit.') + log.info(LOG_BREAK) + print('SSM failed to start. See log file for details.') + sys.exit(1) + except ldap.LDAPError as e: + log.error('Could not connect to LDAP server: %s', e) + log.error('System will exit.') + log.info(LOG_BREAK) + print('SSM failed to start. See log file for details.') + sys.exit(1) + + elif protocol == Ssm2.AMS_MESSAGING: + # Then we are setting up an SSM to connect to a AMS. + + # TODO: See if setting use_ssl directly in Ssm2 constructor is ok. + # 'use_ssl' isn't checked when using AMS (SSL is always used), but it + # is needed for the call to the Ssm2 constructor below. + use_ssl = None + try: + # We only need a hostname, not a port + host = cp.get('broker', 'host') + # Use brokers variable so subsequent code is not dependant on + # the exact destination type. + brokers = [host] + + except ConfigParser.NoOptionError: + log.error('The host must be specified when connecting to AMS, ' + 'please check your configuration') + log.error('System will exit.') + log.info(LOG_BREAK) + print('SSM failed to start. See log file for details.') + sys.exit(1) + + # Attempt to configure AMS project variable. + try: + project = cp.get('messaging', 'ams_project') + + except (ConfigParser.Error, ValueError, IOError) as err: + # A project is needed to successfully send to an + # AMS instance, so log and then exit on an error. + log.error('Error configuring AMS values: %s', err) + log.error('SSM will exit.') + print('SSM failed to start. See log file for details.') + sys.exit(1) + + try: + token = cp.get('messaging', 'token') + except (ConfigParser.Error, ValueError, IOError) as err: + # A token is not necessarily needed, if the cert and key can be + # used by the underlying auth system to get a suitable token. + log.info('No AMS token provided, using cert/key pair instead.') + # Empty string used by AMS to define absence of token. + token = '' + + if len(brokers) == 0: + log.error('No brokers available.') + log.error('System will exit.') + log.info(LOG_BREAK) + sys.exit(1) + + return brokers, project, token + + +def run_sender(protocol, brokers, project, token, cp, log): + """Run Ssm2 as a sender.""" + try: + server_cert = None + verify_server_cert = True + try: + server_cert = cp.get('certificates', 'server_cert') + server_dn = get_certificate_subject(_from_file(server_cert)) + log.info('Messages will be encrypted using %s', server_dn) + try: + verify_server_cert = cp.getboolean('certificates', 'verify_server_cert') + except ConfigParser.NoOptionError: + pass + except ConfigParser.NoOptionError: + log.info('No server certificate supplied. Will not encrypt messages.') + + try: + destination = cp.get('messaging', 'destination') + if destination == '': + raise Ssm2Exception('No destination queue is configured.') + except ConfigParser.NoOptionError as e: + raise Ssm2Exception(e) + + # Determine what type of message store we are interacting with, + # i.e. a dirq QueueSimple object or a plain MessageDirectory directory. + try: + path_type = cp.get('messaging', 'path_type') + except ConfigParser.NoOptionError: + log.info('No path type defined, assuming dirq.') + path_type = 'dirq' + + host_cert = cp.get('certificates', 'certificate') + host_dn = get_certificate_subject(_from_file(host_cert)) + log.info('Messages will be signed using %s', host_dn) + + sender = Ssm2(brokers, + cp.get('messaging', 'path'), + path_type=path_type, + cert=host_cert, + key=cp.get('certificates', 'key'), + dest=cp.get('messaging', 'destination'), + use_ssl=cp.getboolean('broker', 'use_ssl'), + capath=cp.get('certificates', 'capath'), + enc_cert=server_cert, + verify_enc_cert=verify_server_cert, + protocol=protocol, + project=project, + token=token) + + if sender.has_msgs(): + sender.handle_connect() + sender.send_all() + log.info('SSM run has finished.') + else: + log.info('No messages found to send.') + + except (Ssm2Exception, CryptoException) as e: + print('SSM failed to complete successfully. See log file for details.') + log.error('SSM failed to complete successfully: %s', e) + except Exception as e: + print('SSM failed to complete successfully. See log file for details.') + log.error('Unexpected exception in SSM: %s', e) + log.error('Exception type: %s', e.__class__) + + try: + sender.close_connection() + except UnboundLocalError: + # SSM not set up. + pass + + log.info('SSM has shut down.') + log.info(LOG_BREAK) + + +def run_receiver(protocol, brokers, project, token, cp, log, dn_file): + """Run Ssm2 as a receiver daemon.""" + log.info('The SSM will run as a daemon.') + + # We need to preserve the file descriptor for any log files. + rootlog = logging.getLogger() + log_files = [x.stream for x in rootlog.handlers] + dc = DaemonContext(files_preserve=log_files) + + try: + ssm = Ssm2(brokers, + cp.get('messaging', 'path'), + cert=cp.get('certificates', 'certificate'), + key=cp.get('certificates', 'key'), + listen=cp.get('messaging', 'destination'), + use_ssl=cp.getboolean('broker', 'use_ssl'), + capath=cp.get('certificates', 'capath'), + check_crls=cp.getboolean('certificates', 'check_crls'), + pidfile=cp.get('daemon', 'pidfile'), + protocol=protocol, + project=project, + token=token) + + log.info('Fetching valid DNs.') + dns = get_dns(dn_file, log) + ssm.set_dns(dns) + + except Exception as e: + log.fatal('Failed to initialise SSM: %s', e) + log.info(LOG_BREAK) + sys.exit(1) + + try: + # Note: because we need to be compatible with python 2.4, we can't use + # with dc: + # here - we need to call the open() and close() methods + # manually. + dc.open() + ssm.startup() + i = 0 + # The message listening loop. + while True: + try: + time.sleep(0.1) + if protocol == Ssm2.AMS_MESSAGING: + # We need to pull down messages as part of + # this loop when using AMS. + ssm.pull_msg_ams() + + if i % (REFRESH_DNS * 10) == 0: + log.info('Refreshing valid DNs and then sending ping.') + dns = get_dns(dn_file, log) + ssm.set_dns(dns) + + if protocol == Ssm2.STOMP_MESSAGING: + ssm.send_ping() + + except (NotConnectedException, AmsConnectionException) as error: + log.warn('Connection lost.') + log.debug(error) + ssm.shutdown() + dc.close() + log.info("Waiting for 10 minutes before restarting...") + time.sleep(10 * 60) + log.info('Restarting SSM.') + dc.open() + ssm.startup() + + i += 1 + + except SystemExit as e: + log.info('Received the shutdown signal: %s', e) + ssm.shutdown() + dc.close() + except Exception as e: + log.error('Unexpected exception: %s', e) + log.error('Exception type: %s', e.__class__) + log.error('The SSM will exit.') + ssm.shutdown() + dc.close() + + log.info('Receiving SSM has shut down.') + log.info(LOG_BREAK) + + +def get_dns(dn_file, log): + """Retrieve a list of DNs from a file.""" + dns = [] + f = None + try: + f = open(dn_file, 'r') + lines = f.readlines() + for line in lines: + if line.isspace() or line.strip().startswith('#'): + continue + elif line.strip().startswith('/'): + dns.append(line.strip()) + else: + log.warn('DN in incorrect format: %s', line) + finally: + if f is not None: + f.close() + # If no valid DNs, SSM cannot receive any messages. + if len(dns) == 0: + raise Ssm2Exception('No valid DNs found in %s. SSM will not start' % dn_file) + + log.debug('%s DNs found.', len(dns)) + return dns From 47b2ad81914048265b1a58a7fb29da542d1cf8a2 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Wed, 10 Mar 2021 17:25:56 +0000 Subject: [PATCH 08/16] Correct import to specific module that's used --- bin/receiver.py | 2 +- bin/sender.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/receiver.py b/bin/receiver.py index cff0ee6e..b0c9cb39 100644 --- a/bin/receiver.py +++ b/bin/receiver.py @@ -21,7 +21,7 @@ import agents from ssm import __version__, LOG_BREAK -import logging.config +import logging import os import sys from optparse import OptionParser diff --git a/bin/sender.py b/bin/sender.py index 88eca409..ac3d3444 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -21,7 +21,7 @@ import agents from ssm import __version__, LOG_BREAK -import logging.config +import logging from optparse import OptionParser try: From cbf1dade1a8223850f4e0a5cb0cb621ab7b4e19b Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Thu, 11 Mar 2021 10:00:18 +0000 Subject: [PATCH 09/16] Correct imports of ssm.agents --- bin/receiver.py | 11 ++++++----- bin/sender.py | 10 +++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/bin/receiver.py b/bin/receiver.py index b0c9cb39..15b52421 100644 --- a/bin/receiver.py +++ b/bin/receiver.py @@ -18,7 +18,7 @@ from __future__ import print_function -import agents +import ssm.agents from ssm import __version__, LOG_BREAK import logging @@ -56,20 +56,21 @@ def main(): print('Cannot start SSM. Pidfile %s already exists.' % pidfile) sys.exit(1) - agents.logging_helper(cp, options.log_config) + ssm.agents.logging_helper(cp, options.log_config) log = logging.getLogger('ssmreceive') log.info(LOG_BREAK) log.info('Starting receiving SSM version %s.%s.%s.', *__version__) - protocol = agents.get_protocol(cp, log) + protocol = ssm.agents.get_protocol(cp, log) log.info('Setting up SSM with protocol: %s', protocol) - brokers, project, token = agents.get_ssm_args(protocol, cp, log) + brokers, project, token = ssm.agents.get_ssm_args(protocol, cp, log) - agents.run_receiver(protocol, brokers, project, token, cp, log, options.dn_file, log) + ssm.agents.run_receiver(protocol, brokers, project, token, + cp, log, options.dn_file, log) if __name__ == '__main__': diff --git a/bin/sender.py b/bin/sender.py index ac3d3444..265c022d 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -18,7 +18,7 @@ from __future__ import print_function -import agents +import ssm.agents from ssm import __version__, LOG_BREAK import logging @@ -44,20 +44,20 @@ def main(): cp = ConfigParser.ConfigParser({'use_ssl': 'true'}) cp.read(options.config) - agents.logging_helper(cp, options.log_config) + ssm.agents.logging_helper(cp, options.log_config) log = logging.getLogger('ssmsend') log.info(LOG_BREAK) log.info('Starting sending SSM version %s.%s.%s.', *__version__) - protocol = agents.get_protocol(cp, log) + protocol = ssm.agents.get_protocol(cp, log) log.info('Setting up SSM with protocol: %s', protocol) - brokers, project, token = agents.get_ssm_args(protocol, cp, log) + brokers, project, token = ssm.agents.get_ssm_args(protocol, cp, log) - agents.run_sender(protocol, brokers, project, token, cp, log) + ssm.agents.run_sender(protocol, brokers, project, token, cp, log) if __name__ == '__main__': From e0a005fe84fcff483871488728ccce9944a3e235 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Thu, 11 Mar 2021 10:36:39 +0000 Subject: [PATCH 10/16] Rename test_receiver to test_agents --- test/{test_receiver.py => test_agents.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/{test_receiver.py => test_agents.py} (100%) diff --git a/test/test_receiver.py b/test/test_agents.py similarity index 100% rename from test/test_receiver.py rename to test/test_agents.py From 6609df0b7576237530c82a64c180fd9977f225ba Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Thu, 11 Mar 2021 10:37:28 +0000 Subject: [PATCH 11/16] Correct imports and mocking in moved test --- test/test_agents.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/test_agents.py b/test/test_agents.py index 40ef1544..1db8d01c 100644 --- a/test/test_agents.py +++ b/test/test_agents.py @@ -7,7 +7,7 @@ import mock -import bin.receiver +import ssm.agents from ssm.ssm2 import Ssm2Exception @@ -18,12 +18,12 @@ def setUp(self): os.close(self.tf) # Mock the logging to prevent errors due to it not being set up and so # that we can count how often logging methods are called - self.patcher = mock.patch('bin.receiver.log') + self.patcher = mock.patch('logging.Logger', autospec=True) self.mock_log = self.patcher.start() def test_get_empty_dns_file(self): """Attempting to read an empty DNs file should raise an exception.""" - self.assertRaises(Ssm2Exception, bin.receiver.get_dns, + self.assertRaises(Ssm2Exception, ssm.agents.get_dns, self.tf_path, self.mock_log) def test_get_good_dns(self): @@ -42,7 +42,7 @@ def test_get_good_dns(self): f = open(self.tf_path, 'w') f.write(dn_text) f.close() - self.assertEqual(bin.receiver.get_dns(self.tf_path, self.mock_log), output) + self.assertEqual(ssm.agents.get_dns(self.tf_path, self.mock_log), output) def test_get_iffy_dns(self): """Check that the two bad DNs are picked up.""" @@ -57,7 +57,7 @@ def test_get_iffy_dns(self): f = open(self.tf_path, 'w') f.write(dn_text) f.close() - bin.receiver.get_dns(self.tf_path, self.mock_log) + ssm.agents.get_dns(self.tf_path, self.mock_log) self.assertEqual(self.mock_log.warn.call_count, 2) def tearDown(self): From f68fadf3b0a5693bd0e40f3a50872337348d0454 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Wed, 17 Mar 2021 19:30:26 +0000 Subject: [PATCH 12/16] Correct number of args to run_reciever in receiver --- bin/receiver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/receiver.py b/bin/receiver.py index 15b52421..22ecb63c 100644 --- a/bin/receiver.py +++ b/bin/receiver.py @@ -70,7 +70,7 @@ def main(): brokers, project, token = ssm.agents.get_ssm_args(protocol, cp, log) ssm.agents.run_receiver(protocol, brokers, project, token, - cp, log, options.dn_file, log) + cp, log, options.dn_file) if __name__ == '__main__': From 99e6b8754fefd1c24d1d6890ed420ee68ebeb713 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Wed, 17 Mar 2021 19:42:11 +0000 Subject: [PATCH 13/16] Fix and extend get_protocol function - Fix the function so that it supports both receivers and senders. - Add checking of the protocol set so that invalid protocols lead to a sys.exit(1). - Set the logging level to WARN if the default protocol is set as it should be brought to the user's attention that something isn't quite right in their config. --- ssm/agents.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ssm/agents.py b/ssm/agents.py index f9b252a7..5c1b1d90 100644 --- a/ssm/agents.py +++ b/ssm/agents.py @@ -65,13 +65,28 @@ def logging_helper(cp, log_manual_path=''): def get_protocol(cp, log): """Get the protocol from a ConfigParser object, defaulting to STOMP.""" try: - protocol = cp.get('sender', 'protocol') + if 'sender' in cp.sections(): + protocol = cp.get('sender', 'protocol') + elif 'receiver' in cp.sections(): + protocol = cp.get('receiver', 'protocol') + else: + raise ConfigParser.NoSectionError('sender or receiver') + + if protocol not in (Ssm2.STOMP_MESSAGING, Ssm2.AMS_MESSAGING): + raise ValueError except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): # If the newer configuration setting 'protocol' is not set, use 'STOMP' # for backwards compatability. protocol = Ssm2.STOMP_MESSAGING - log.debug("No option set for 'protocol'. Defaulting to %s.", protocol) + log.warn("No option set for 'protocol'. Defaulting to %s.", protocol) + except ValueError: + log.critical("Invalid protocol '%s' set. Must be either '%s' or '%s'.", + protocol, Ssm2.STOMP_MESSAGING, Ssm2.AMS_MESSAGING) + log.critical('SSM will exit.') + print('SSM failed to start. See log file for details.') + sys.exit(1) + return protocol From 5db3141f3b27def861b42228a8dd8794d0ba9837 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Thu, 18 Mar 2021 14:15:29 +0000 Subject: [PATCH 14/16] Add stomp.py DEB dependency to match other OSes --- scripts/ssm-build-deb.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/ssm-build-deb.sh b/scripts/ssm-build-deb.sh index f145b53b..a03ef312 100755 --- a/scripts/ssm-build-deb.sh +++ b/scripts/ssm-build-deb.sh @@ -54,6 +54,7 @@ fpm -s python -t deb \ --no-auto-depends \ --depends python2.7 \ --depends python-pip \ +--depends 'python-stomp < 5.0.0' \ --depends python-ldap \ --depends libssl-dev \ --depends libsasl2-dev \ From 72ee4174943ee50390a580a7737ffe0a213deb4b Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Thu, 18 Mar 2021 15:38:49 +0000 Subject: [PATCH 15/16] Update version numbers for 3.2.0 --- apel-ssm.spec | 2 +- scripts/ssm-build-deb.sh | 2 +- scripts/ssm-build-rpm.sh | 2 +- ssm/__init__.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apel-ssm.spec b/apel-ssm.spec index d9ebd767..9f8719d4 100644 --- a/apel-ssm.spec +++ b/apel-ssm.spec @@ -4,7 +4,7 @@ %endif Name: apel-ssm -Version: 3.1.1 +Version: 3.2.0 %define releasenumber 1 Release: %{releasenumber}%{?dist} Summary: Secure stomp messenger diff --git a/scripts/ssm-build-deb.sh b/scripts/ssm-build-deb.sh index a03ef312..7e996616 100755 --- a/scripts/ssm-build-deb.sh +++ b/scripts/ssm-build-deb.sh @@ -16,7 +16,7 @@ set -eu -TAG=3.1.1-1 +TAG=3.2.0-1 SOURCE_DIR=~/debbuild/source BUILD_DIR=~/debbuild/build diff --git a/scripts/ssm-build-rpm.sh b/scripts/ssm-build-rpm.sh index 9b6cf6ab..d4e3aa8f 100644 --- a/scripts/ssm-build-rpm.sh +++ b/scripts/ssm-build-rpm.sh @@ -10,7 +10,7 @@ rpmdev-setuptree RPMDIR=/home/rpmb/rpmbuild -VERSION=3.1.1-1 +VERSION=3.2.0-1 SSMDIR=apel-ssm-$VERSION # Remove old sources and RPMS diff --git a/ssm/__init__.py b/ssm/__init__.py index d282b9e3..0dfbaae1 100644 --- a/ssm/__init__.py +++ b/ssm/__init__.py @@ -19,7 +19,7 @@ import logging import sys -__version__ = (3, 1, 1) +__version__ = (3, 2, 0) LOG_BREAK = '========================================' From 4bdafb4892f87cb7f522d2440c470f7b3f5a2866 Mon Sep 17 00:00:00 2001 From: Adrian Coveney Date: Thu, 18 Mar 2021 15:45:49 +0000 Subject: [PATCH 16/16] Update changelogs for 3.2.0 --- CHANGELOG | 6 ++++++ apel-ssm.spec | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/CHANGELOG b/CHANGELOG index 1b0a6082..80e5eeff 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,11 @@ Changelog for ssm ================= +* Thu Mar 18 2021 Adrian Coveney - 3.2.0-1 + - Added logging of what certificate DNs/subjects are being used to facilitate + troubleshooting. + - Added stomp.py as .deb package dependency to avoid broken installations. + - Refactored code to enable simpler external calls and better testing. + * Wed Dec 16 2020 Adrian Coveney - 3.1.1-1 - Changed logging to reduce how verbose the logging of a 3rd-party module is. diff --git a/apel-ssm.spec b/apel-ssm.spec index 9f8719d4..b8ea0e86 100644 --- a/apel-ssm.spec +++ b/apel-ssm.spec @@ -100,6 +100,12 @@ rm -rf $RPM_BUILD_ROOT %doc %_defaultdocdir/%{name} %changelog +* Thu Mar 18 2021 Adrian Coveney - 3.2.0-1 + - Added logging of what certificate DNs/subjects are being used to facilitate + troubleshooting. + - Added stomp.py as .deb package dependency to avoid broken installations. + - Refactored code to enable simpler external calls and better testing. + * Wed Dec 16 2020 Adrian Coveney - 3.1.1-1 - Changed logging to reduce how verbose the logging of a 3rd-party module is.