diff --git a/SRAMsync/cba_script_generator.py b/SRAMsync/cba_script_generator.py index 3c7d4f1..370b6e9 100644 --- a/SRAMsync/cba_script_generator.py +++ b/SRAMsync/cba_script_generator.py @@ -6,7 +6,7 @@ class is derived from CuaScriptGenerator, because of its strong relation. By import json import sys -from typing import Any, Callable, Union +from typing import Any, Callable, Union, cast import click from pathlib import Path @@ -17,7 +17,7 @@ class is derived from CuaScriptGenerator, because of its strong relation. By from SRAMsync.sramlogger import logger from SRAMsync.state import State from SRAMsync.sync_with_sram import ConfigValidationError -from SRAMsync.typing import EventHandlerConfig +from SRAMsync.typing import CbaNotificationConfig, EventHandlerConfig class CbaScriptGenerator(CuaScriptGenerator): @@ -40,19 +40,23 @@ class CbaScriptGenerator(CuaScriptGenerator): "required": ["cba_add_cmd", "cba_del_cmd", "cba_machine", "cba_budget_account", "cua_config"], } - def __init__(self, service: str, cfg: dict[str, Any], state: JsonFile, path: Path) -> None: + def __init__(self, service: str, cfg: EventHandlerConfig, state: JsonFile, path: Path) -> None: try: validate( schema=CbaScriptGenerator._schema, instance=cfg["event_handler_config"], format_checker=Draft202012Validator.FORMAT_CHECKER, ) + cua_config: EventHandlerConfig = { - "event_handler_config": cfg["event_handler_config"]["cua_config"], + "event_handler_config": cast(CbaNotificationConfig, cfg["event_handler_config"])[ + "cua_config" + ], "secrets": cfg["secrets"], } super().__init__(service, cfg=cua_config, state=state, cfg_path=path) - self.cfg = cfg["event_handler_config"] + + self.cfg = cast(CbaNotificationConfig, cfg["event_handler_config"]) self._cba_co_budget_mapping_filename = "" except ConfigValidationError as e: raise e diff --git a/SRAMsync/config.py b/SRAMsync/config.py index 5c9cfbb..f19b47b 100644 --- a/SRAMsync/config.py +++ b/SRAMsync/config.py @@ -18,7 +18,7 @@ from SRAMsync.event_handler_proxy import EventHandlerProxy from SRAMsync.sramlogger import logger from SRAMsync.state import NoGracePeriodForGroupError, State -from SRAMsync.typing import StateFile, StatusConfig, EventHandlerConfig, ConfigGroup +from SRAMsync.typing import Secrets, StateFile, StatusConfig, EventHandlerConfig, ConfigGroup import SRAMsync.typing @@ -185,7 +185,7 @@ def __init__(self, config_file: str, args: Union[dict[str, str], None] = None) - self.config = config - self.secrets: dict[str, dict[str, str]] = {} + self.secrets: Secrets = cast(Secrets, {}) if "secrets" in config: with open(file=config["secrets"]["file"], encoding="utf8") as fd: self.secrets = yaml.safe_load(stream=fd) @@ -231,7 +231,7 @@ def get_event_handlers(self, state: State, args: dict[str, str]) -> list[EventHa EventHandler, deduct_event_handler_class(event_handler_full_name=event_handler["name"]) ) - event_handler_cfg = cast(EventHandlerConfig, {}) + event_handler_cfg: EventHandlerConfig = cast(EventHandlerConfig, {}) if "config" in event_handler: event_handler_cfg["event_handler_config"] = event_handler["config"] diff --git a/SRAMsync/cua_script_generator.py b/SRAMsync/cua_script_generator.py index e4477fa..d8051b7 100644 --- a/SRAMsync/cua_script_generator.py +++ b/SRAMsync/cua_script_generator.py @@ -24,7 +24,7 @@ from SRAMsync.json_file import JsonFile from SRAMsync.sramlogger import logger from SRAMsync.sync_with_sram import ConfigValidationError -from SRAMsync.typing import EventHandlerConfig +from SRAMsync.typing import CuaNotificationsConfig, EventHandlerConfig class CuaScriptGenerator(EventHandler): @@ -62,7 +62,7 @@ def __init__(self, service: str, cfg: EventHandlerConfig, state: JsonFile, cfg_p self.run = False self.org_co_uuids = cast(dict[str, str], {}) - self.cfg = cfg["event_handler_config"] + self.cfg = cast(CuaNotificationsConfig, cfg["event_handler_config"]) self.state = state self.script_name = render_templated_string(template_string=self.cfg["filename"], service=service) self.script_file_descriptor = open(file=self.script_name, mode="w+", encoding="utf8") diff --git a/SRAMsync/dummy_event_handler.py b/SRAMsync/dummy_event_handler.py index d443f13..3d078d4 100644 --- a/SRAMsync/dummy_event_handler.py +++ b/SRAMsync/dummy_event_handler.py @@ -43,7 +43,7 @@ def add_new_user( self, entry: dict[str, list[bytes]], **kwargs: str, - ): + ) -> None: """Log the add_new_user event.""" org: str = kwargs["org"] co: str = kwargs["co"] @@ -68,7 +68,7 @@ def get_supported_arguments( ) -> dict[str, dict[str, Union[Union[Callable[[str], None], Callable[[], None]], str]]]: return {} - def add_public_ssh_key(self, co: str, user: str, key: str): + def add_public_ssh_key(self, co: str, user: str, key: str) -> None: """Log the add_public_ssh_key event.""" logger.info( " add_public_ssh_key(%s, %s, %s)", @@ -77,11 +77,11 @@ def add_public_ssh_key(self, co: str, user: str, key: str): click.style(text=key, fg="white", dim=True), ) - def delete_public_ssh_key(self, co: str, user: str, key: str): + def delete_public_ssh_key(self, co: str, user: str, key: str) -> None: """Log the delete_public_ssh_key event.""" logger.info(" delete_public_ssh_key(%s, %s, %s)", co, user, key) - def add_new_groups(self, co: str, groups: list[str], group_attributes: list[str]): + def add_new_groups(self, co: str, groups: list[str], group_attributes: list[str]) -> None: """Log the add_new_group event.""" logger.info( " add_new_group(%s, %s, %s)", @@ -90,7 +90,7 @@ def add_new_groups(self, co: str, groups: list[str], group_attributes: list[str] group_attributes, ) - def remove_group(self, co: str, group: str, group_attributes: list[str]): + def remove_group(self, co: str, group: str, group_attributes: list[str]) -> None: """Log the remove_group event.""" logger.info( " remove_group(%s, %s, %s)", @@ -99,7 +99,7 @@ def remove_group(self, co: str, group: str, group_attributes: list[str]): group_attributes, ) - def add_user_to_group(self, **kwargs: str): + def add_user_to_group(self, **kwargs: str) -> None: """Log the add_user_to_group event.""" try: org: str = kwargs["org"] @@ -133,7 +133,7 @@ def start_grace_period_for_user( duration, ) - def remove_user_from_group(self, co: str, group: str, group_attributes: list[str], user: str): + def remove_user_from_group(self, co: str, group: str, group_attributes: list[str], user: str) -> None: """Log the remove_user_from_group event.""" logger.info( " remove_user_from_group(%s, %s, %s, %s)", @@ -143,7 +143,9 @@ def remove_user_from_group(self, co: str, group: str, group_attributes: list[str click.style(text=user, fg="cyan"), ) - def remove_graced_user_from_group(self, co: str, group: str, group_attributes: list[str], user: str): + def remove_graced_user_from_group( + self, co: str, group: str, group_attributes: list[str], user: str + ) -> None: """Log the remove_graced_user_from_group event.""" logger.info( " remove_graced_user_from_group(%s, %s, %s, %s)", diff --git a/SRAMsync/email_notifications.py b/SRAMsync/email_notifications.py index 5531455..3008da2 100644 --- a/SRAMsync/email_notifications.py +++ b/SRAMsync/email_notifications.py @@ -4,21 +4,31 @@ applied. """ +from datetime import timedelta import smtplib import ssl import sys from email.message import EmailMessage from email.utils import formatdate -from typing import Any, Dict, List +from typing import Any, Dict, List, cast import click from jsonschema import Draft202012Validator, ValidationError, validate +from SRAMsync.cba_script_generator import Path from SRAMsync.common import get_attribute_from_entry, render_templated_string from SRAMsync.event_handler import EventHandler from SRAMsync.sramlogger import logger from SRAMsync.state import State from SRAMsync.sync_with_sram import ConfigValidationError, PasswordNotFound +from SRAMsync.typing import ( + COmessage, + EmailNotificationConfig, + EventHandlerConfig, + Message, + MessageContent, + SMTPconfig, +) class SMTPclient: @@ -32,7 +42,7 @@ class SMTPclient: def __init__( self, - cfg: dict, + cfg: SMTPconfig, service: str, mail_to: str, mail_from: str, @@ -42,34 +52,31 @@ def __init__( ) -> None: self.mail_to = mail_to self.mail_from = mail_from - self.mail_subject = render_templated_string(mail_subject, service=service) + self.mail_subject = render_templated_string(template_string=mail_subject, service=service) self.mail_message = mail_message self.mail_to_stdout = mail_to_stdout if not mail_to_stdout: self.server = self.conntect_to_smtp_server(cfg) if "login" in cfg or "passwd" in cfg: - got_credentials = True - if not "login" in cfg: + if "login" not in cfg: logger.error("Incomplete SMTP credentials. Need login name as well.") - got_credentials = False - if not "passwd" in cfg: + elif "passwd" not in cfg: logger.error("Incomplete SMTP credentials. Need passwd as well.") - got_credentials = False - if got_credentials: - self.login(cfg["login"], cfg["passwd"]) + else: + self.login(login_name=cfg["login"], passwd=cfg["passwd"]) - def __del__(self): + def __del__(self) -> None: if hasattr(self, "server"): logger.debug("Disconnecting SMTP server") self.server.quit() @staticmethod - def conntect_to_smtp_server(cfg: dict) -> smtplib.SMTP: + def conntect_to_smtp_server(cfg: SMTPconfig) -> smtplib.SMTP: """Connect to an SMTP server.""" - msg = f"SMTP: connecting to: {cfg['host']}" - host = cfg["host"] - port = 0 # If port not in cfg, then use system default port. + msg: str = f"SMTP: connecting to: {cfg['host']}" + host: str = cfg["host"] + port: int = 0 # If port not in cfg, then use system default port. if "port" in cfg: msg = msg + f":{cfg['port']}" @@ -100,10 +107,10 @@ def login(self, login_name: str, passwd: str) -> None: self.server.starttls(context=ctx) logger.debug("SMTP: trying to login") - self.server.login(login_name, passwd) + self.server.login(user=login_name, password=passwd) logger.debug("SMTP: login successful") - def send_message(self, message: str, service: str): + def send_message(self, message: str, service: str) -> None: """Send a message through an opened SMTP server.""" try: logger.debug("Sending message") @@ -113,15 +120,17 @@ def send_message(self, message: str, service: str): msg["from"] = self.mail_from msg["subject"] = self.mail_subject msg["Date"] = formatdate(localtime=True) - content = render_templated_string(self.mail_message, service=service, message=message) + content = render_templated_string( + template_string=self.mail_message, service=service, message=message + ) msg.set_content(content) if not self.mail_to_stdout: self.server.send_message(msg) else: - logger.info(click.style("Want to sent message:", bold=True, fg="yellow")) + logger.info(click.style(text="Want to sent message:", bold=True, fg="yellow")) logger.info(msg) - logger.info(click.style("end-of-message", bold=True, fg="yellow")) + logger.info(click.style(text="end-of-message", bold=True, fg="yellow")) logger.debug("Message sent") except smtplib.SMTPServerDisconnected: @@ -209,8 +218,8 @@ class EmailNotifications(EventHandler): "!eNULL:!MD5" ) - def __init__(self, service: str, cfg: dict, state: State, config_path) -> None: - super().__init__(service, cfg, state, config_path) + def __init__(self, service: str, cfg: EventHandlerConfig, state: State, config_path: Path) -> None: + super().__init__(service=service, cfg=cfg, state=state, cfg_path=config_path) try: validate( schema=self._schema, @@ -220,7 +229,7 @@ def __init__(self, service: str, cfg: dict, state: State, config_path) -> None: # self.mail_to_stdout = "mail-to-stdout" in args self.mail_to_stdout = False - self.cfg = cfg["event_handler_config"] + self.cfg = cast(EmailNotificationConfig, cfg["event_handler_config"]) self.collect_events = cfg["event_handler_config"].get("collect_events", True) if "aggregate_mails" in cfg["event_handler_config"]: @@ -233,19 +242,19 @@ def __init__(self, service: str, cfg: dict, state: State, config_path) -> None: self.aggregate_mails = True if "passwd_from_secrets" in self.cfg["smtp"]: - login_name = self.cfg["smtp"]["login"] - host = self.cfg["smtp"]["host"] + login_name: str = self.cfg["smtp"]["login"] + host: str = self.cfg["smtp"]["host"] self.cfg["smtp"]["passwd"] = cfg["secrets"]["smtp"][host][login_name] self.service = service - self._messages = {} + self._messages: Message = {} self.finalize_message = "" except ValidationError as e: - raise ConfigValidationError(e, config_path) from e + raise ConfigValidationError(exception=e, path=config_path) from e except KeyError as e: raise PasswordNotFound( - f"SMTP password for host {cfg['event_handler_config']['smtp']['host']} not found. " + f"SMTP password for host {cast(EmailNotificationConfig, cfg['event_handler_config'])['smtp']['host']} not found. " "Check your password source." ) from e @@ -271,7 +280,7 @@ def add_message_to_current_co_group( self._messages[co] = {} if event not in self._messages[co]: - self._messages[co][event] = {} + self._messages[co][event] = cast(MessageContent, {}) self._messages[co][event]["important"] = important self._messages[co][event]["messages"] = set() @@ -291,7 +300,7 @@ def add_message_to_current_co_group( def send_queued_messages(self) -> None: """Send all queued message.""" - message = "" + message: str = "" if "smtp" in self.cfg: smtp_client = SMTPclient( @@ -310,111 +319,111 @@ def send_queued_messages(self) -> None: self.cleanup_messages() for co_messages in self._messages.values(): - message = message + self.render_message(co_messages) + message = message + self.render_message(messages=co_messages) if message and not self.aggregate_mails: if self.finalize_message: message = message + self.finalize_message message = message.strip() - smtp_client.send_message(message, self.service) + smtp_client.send_message(message=message, service=self.service) message = "" if message and self.aggregate_mails: if self.finalize_message: message = message + self.finalize_message message = message.strip() - smtp_client.send_message(message, self.service) + smtp_client.send_message(message=message, service=self.service) def cleanup_messages(self) -> None: """Clean up some of the messages to make the resulting e-mail a bit easier to read.""" try: - self._messages[self.service]["add-group"]["messages"] = sorted( - self._messages[self.service]["add-group"]["messages"] + self._messages[self.service]["add-group"]["messages"] = set( + sorted(self._messages[self.service]["add-group"]["messages"]) ) - except: + except KeyError: pass try: - self._messages[self.service]["add-user-to-group"]["messages"] = sorted( - self._messages[self.service]["add-user-to-group"]["messages"] + self._messages[self.service]["add-user-to-group"]["messages"] = set( + sorted(self._messages[self.service]["add-user-to-group"]["messages"]) ) - except: + except KeyError: pass - def render_message(self, messages: dict) -> str: + def render_message(self, messages: COmessage) -> str: """Render a final message for collected event messages.""" - events = [k for k, v in messages.items() if v["important"] is True] + events: list[str] = [k for k, v in messages.items() if v["important"] is True] if not events: return "" final_message = "" for event, event_values in messages.items(): - message_part = "" + message_part: str = "" for line in event_values["messages"]: message_part = f"{message_part}{line}\n" if event in self.report_events and "header" in self.report_events[event]: - header = self.report_events[event]["header"] + header: str = cast(str, self.report_events[event]["header"]) message_part = f"{header}\n{message_part}" - final_message = final_message + message_part + final_message: str = final_message + message_part return final_message - def process_co_attributes(self, attributes: Dict[str, str], org: str, co: str) -> None: + def process_co_attributes(self, attributes: dict[str, list[bytes]], org: str, co: str) -> None: super().process_co_attributes(attributes, org, co) - def add_event_message(self, co: str, event: str, important: bool = True, **args) -> None: + def add_event_message(self, co: str, event: str, important: bool = True, **args: Any) -> None: """Add an event message and apply formatting to it.""" if event in self.report_events: - event_message = f"{self.report_events[event]['line']}".format(co=co, **args) + event_message: str = f"{self.report_events[event]['line']}".format(co=co, **args) self.add_message_to_current_co_group(co, event, event_message, important) def start_of_co_processing(self, co: str) -> None: """Add start event message to the message queue.""" - self.add_event_message(co, "start-co-processing", important=False) + self.add_event_message(co, event="start-co-processing", important=False) - def add_new_user(self, entry: dict, **kwargs) -> None: + def add_new_user(self, entry: dict[str, list[bytes]], **kwargs: str) -> None: """Add add-new-user event message to the message queue.""" try: - co = kwargs["co"] - groups = kwargs["groups"] - user = kwargs["user"] + co: str = kwargs["co"] + groups: str = kwargs["groups"] + user: str = kwargs["user"] except KeyError as e: logger.error("Missing(email_notification) argument: %s", e) sys.exit(1) - givenname = get_attribute_from_entry(entry, "givenName") - sn = get_attribute_from_entry(entry, "sn") - mail = get_attribute_from_entry(entry, "mail") + givenname: str = get_attribute_from_entry(entry=entry, attribute="givenName") + sn: str = get_attribute_from_entry(entry=entry, attribute="sn") + mail: str = get_attribute_from_entry(entry=entry, attribute="mail") self.add_event_message( - co, "add-new-user", groups=groups, givenname=givenname, sn=sn, user=user, mail=mail + co=co, event="add-new-user", groups=groups, givenname=givenname, sn=sn, user=user, mail=mail ) def add_public_ssh_key(self, co: str, user: str, key: str) -> None: """Add add-shh-key event message to the message queue.""" - self.add_event_message(co, "add-ssh-key", user=user, key=key) + self.add_event_message(co, event="add-ssh-key", user=user, key=key) def delete_public_ssh_key(self, co: str, user: str, key: str) -> None: """Add delete-ssh-key event message to the message queue.""" - self.add_event_message(co, "delete-ssh-key", user=user, key=key) + self.add_event_message(co, event="delete-ssh-key", user=user, key=key) - def add_new_groups(self, co: str, groups: List[str], group_attributes: list) -> None: + def add_new_groups(self, co: str, groups: List[str], group_attributes: list[str]) -> None: """Add add-group event message to the message queue.""" for group in groups: - self.add_event_message(co, "add-group", group=group, attributes=group_attributes) + self.add_event_message(co, event="add-group", group=group, attributes=group_attributes) - def remove_group(self, co: str, group: str, group_attributes: list) -> None: + def remove_group(self, co: str, group: str, group_attributes: list[str]) -> None: """Add remove-group event message to the message queue.""" - self.add_event_message(co, "remove-group", group=group, attributes=group_attributes) + self.add_event_message(co, event="remove-group", group=group, attributes=group_attributes) - def add_user_to_group(self, **kwargs) -> None: + def add_user_to_group(self, **kwargs: str) -> None: try: - co = kwargs["co"] - groups = kwargs["groups"] - group_attributes = kwargs["group_attributes"] - user = kwargs["user"] + co: str = kwargs["co"] + groups: str = kwargs["groups"] + group_attributes: str = kwargs["group_attributes"] + user: str = kwargs["user"] except KeyError as e: logger.error("Missing(email_notification) argument: %s", e) sys.exit(1) @@ -422,34 +431,39 @@ def add_user_to_group(self, **kwargs) -> None: """Add add-user-to-group event message to the message queue.""" for group in groups: self.add_event_message( - co, "add-user-to-group", group=group, user=user, attributes=group_attributes + co=co, event="add-user-to-group", group=group, user=user, attributes=group_attributes ) def start_grace_period_for_user( - self, co: str, group: str, group_attributes: list, user: str, duration: str - ): + self, co: str, group: str, group_attributes: list[str], user: str, duration: timedelta + ) -> None: """The grace period for the users has started.""" self.add_event_message( co, - "start-grace-period-for-user", + event="start-grace-period-for-user", group=group, attributes=group_attributes, user=user, duration=duration, ) - def remove_user_from_group(self, co, group: str, group_attributes: list, user: str) -> None: + def remove_user_from_group(self, co: str, group: str, group_attributes: list[str], user: str) -> None: """Add remove-user-from-group event message to the message queue.""" self.add_event_message( - co, "remove-user-from-group", group=group, user=user, attributes=group_attributes + co=co, event="remove-user-from-group", group=group, user=user, attributes=group_attributes ) - def remove_graced_user_from_group(self, co, group: str, group_attributes: list, user: str) -> None: + def remove_graced_user_from_group( + self, co: str, group: str, group_attributes: list[str], user: str + ) -> None: """Add remove-grace-users-from-group event message to the message queue.""" self.add_event_message( - co, "remove-graced-user-from-group", group=group, user=user, attributes=group_attributes + co=co, event="remove-graced-user-from-group", group=group, user=user, attributes=group_attributes ) + def remove_user(self, user: str, state: State) -> None: + super().remove_user(user, state) + def finalize(self) -> None: """ Render finalize event message. The finalize event is not associated @@ -457,5 +471,5 @@ def finalize(self) -> None: finishing the synchronization. """ if "finalize" in self.report_events: - event_message = f"{self.report_events['finalize']['line']}" + event_message: str = f"{self.report_events['finalize']['line']}" self.finalize_message = event_message diff --git a/SRAMsync/typing.py b/SRAMsync/typing.py index 363525b..c0b7b30 100644 --- a/SRAMsync/typing.py +++ b/SRAMsync/typing.py @@ -1,9 +1,21 @@ from typing import NewType, TypedDict, Union +from typing_extensions import NotRequired -class EventHandler(TypedDict): - name: str - config: dict[str, str] +class MessageContent(TypedDict): + important: bool + messages: set[str] + + +COmessage = dict[str, MessageContent] +Message = dict[str, COmessage] + + +class SMTPconfig(TypedDict): + host: str + port: int + login: NotRequired[str] + passwd: NotRequired[str] class ConfigGroup(TypedDict): @@ -11,9 +23,74 @@ class ConfigGroup(TypedDict): destination: list[str] +Secrets = TypedDict("Secrets", {"sram-ldap": dict[str, str], "smtp": dict[str, dict[str, str]]}) + + +class HeaderLine(TypedDict): + header: str + line: str + + +EmailHeaders = TypedDict( + "EmailHeaders", + { + "mail-to": str, + "mail-from": str, + "mail-message": str, + "mail-subject": str, + }, +) +report_events = TypedDict( + "report_events", + { + "start-co-processing": HeaderLine, + "add-new-user": HeaderLine, + "add-group": HeaderLine, + "add-user-to-group": HeaderLine, + "remove-user-from-group": HeaderLine, + "remove-graced-user-from-group": HeaderLine, + "finalize": HeaderLine, + }, +) + + +class EmailNotificationConfig(EmailHeaders): + aggregate_mails: bool + collect_events: bool + smtp: SMTPconfig + report_events: report_events + + +class CuaNotificationsConfig(TypedDict): + filename: str + add_cmd: str + modify_cmd: str + check_cmd: str + sshkey_cmd: str + + +class CbaNotificationConfig(TypedDict): + cua_config: CuaNotificationsConfig + cba_budget_account: str + cba_machine: str + cba_add_cmd: str + cba_del_cmd: str + + +class DummyEventHandler(TypedDict): + pass + + class EventHandlerConfig(TypedDict): - event_handler_config: dict[str, str] - secrets: dict[str, dict[str, str]] + event_handler_config: Union[ + EmailNotificationConfig, CuaNotificationsConfig, CbaNotificationConfig, DummyEventHandler + ] + secrets: Secrets + + +class EventHandler(TypedDict): + name: str + config: EventHandlerConfig class Sync(TypedDict):