From 405d5a42c028d68f7f577c7ce967498ee05d67f0 Mon Sep 17 00:00:00 2001
From: Vladyslav Tarasenko
Date: Fri, 15 Nov 2024 22:17:07 +0200
Subject: [PATCH 1/5] Add binlog_utils_udf plugin.

---
 lib/charms/mysql/v0/mysql.py | 1 +
 src/charm.py | 1 +
 src/upgrade.py | 1 +
 3 files changed, 3 insertions(+)

diff --git a/lib/charms/mysql/v0/mysql.py b/lib/charms/mysql/v0/mysql.py
index 535e3c3fc..e107ab9a1 100644
--- a/lib/charms/mysql/v0/mysql.py
+++ b/lib/charms/mysql/v0/mysql.py
@@ -1100,6 +1100,7 @@ def install_plugins(self, plugins: list[str]) -> None:
         supported_plugins = {
             "audit_log": ("INSTALL PLUGIN audit_log SONAME", "audit_log.so"),
             "audit_log_filter": ("INSTALL PLUGIN audit_log_filter SONAME", "audit_log_filter.so"),
+            "binlog_utils_udf": ("INSTALL PLUGIN binlog_utils_udf SONAME", "binlog_utils_udf.so"),
         }
 
         try:
diff --git a/src/charm.py b/src/charm.py
index d2c1c295d..3b6d96267 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -657,6 +657,7 @@ def workload_initialise(self) -> None:
 
         if self.config.plugin_audit_enabled:
             self._mysql.install_plugins(["audit_log", "audit_log_filter"])
+        self._mysql.install_plugins(["binlog_utils_udf"])
 
         # ensure hostname can be resolved
         self.hostname_resolution.update_etc_hosts(None)
diff --git a/src/upgrade.py b/src/upgrade.py
index eee76c26c..ac5cfd2b4 100644
--- a/src/upgrade.py
+++ b/src/upgrade.py
@@ -199,6 +199,7 @@ def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None:  # noqa: C901
             self.charm._mysql.start_mysqld()
             if self.charm.config.plugin_audit_enabled:
                 self.charm._mysql.install_plugins(["audit_log", "audit_log_filter"])
+            self.charm._mysql.install_plugins(["binlog_utils_udf"])
             self.charm._mysql.setup_logrotate_and_cron()
         except VersionError:
             logger.exception("Failed to upgrade MySQL dependencies")

From ae58701b580333836be9f78f4b2b5af2edd2d20a Mon Sep 17 00:00:00 2001
From: Vladyslav Tarasenko
Date: Fri, 15 Nov 2024 22:19:03 +0200
Subject: [PATCH 2/5] Enable gtid_mode and enforce_gtid_consistency for MySQL.

---
 lib/charms/mysql/v0/mysql.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/charms/mysql/v0/mysql.py b/lib/charms/mysql/v0/mysql.py
index e107ab9a1..8a1d62037 100644
--- a/lib/charms/mysql/v0/mysql.py
+++ b/lib/charms/mysql/v0/mysql.py
@@ -1009,6 +1009,8 @@ def render_mysqld_configuration(  # noqa: C901
             "binlog_expire_logs_seconds": f"{binlog_retention_seconds}",
             "loose-audit_log_policy": "LOGINS",
             "loose-audit_log_file": f"{snap_common}/var/log/mysql/audit.log",
+            "gtid_mode": "ON",
+            "enforce_gtid_consistency": "ON",
         }
 
         if audit_log_enabled:

From 2ce63f734a11457c7495bc21798080c2cba4483c Mon Sep 17 00:00:00 2001
From: Vladyslav Tarasenko
Date: Thu, 21 Nov 2024 23:05:23 +0200
Subject: [PATCH 3/5] Add S3 compatibility check based on the group replication id.
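
The check works as follows: on every S3 credentials change (and on
leader election) the leader compares the cluster's
group_replication_group_name against a group_replication_id.txt marker
object stored under the configured bucket path. A missing marker means
the repository is unclaimed, and it is initialized with this cluster's
id; a mismatching marker means the repository belongs to another
cluster, and the app is blocked. A minimal sketch of the decision rule
(ensure_s3_compatible_group_replication_id() below implements it; this
standalone function is illustrative only):

    def repository_accepts_cluster(local_id: str, s3_id: str | None) -> bool:
        # No marker object yet: the repository is uninitialized and may be
        # claimed by this cluster (the real helper then uploads local_id).
        if s3_id is None:
            return True
        # Marker present: only the cluster that claimed the repository
        # may keep writing backups to it.
        return s3_id == local_id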
--- lib/charms/mysql/v0/backups.py | 88 ++++++++++++++++++++++++++--- lib/charms/mysql/v0/mysql.py | 27 +++++++++ lib/charms/mysql/v0/s3_helpers.py | 93 +++++++++++++++++++++++++++++++ src/charm.py | 4 ++ 4 files changed, 205 insertions(+), 7 deletions(-) diff --git a/lib/charms/mysql/v0/backups.py b/lib/charms/mysql/v0/backups.py index 76024878c..21ee580ba 100644 --- a/lib/charms/mysql/v0/backups.py +++ b/lib/charms/mysql/v0/backups.py @@ -51,7 +51,11 @@ def is_unit_blocked(self) -> bool: import typing from typing import Dict, List, Optional, Tuple -from charms.data_platform_libs.v0.s3 import S3Requirer +from charms.data_platform_libs.v0.s3 import ( + CredentialsChangedEvent, + CredentialsGoneEvent, + S3Requirer, +) from charms.mysql.v0.mysql import ( MySQLConfigureInstanceError, MySQLCreateClusterError, @@ -76,6 +80,7 @@ def is_unit_blocked(self) -> bool: MySQLUnableToGetMemberStateError, ) from charms.mysql.v0.s3_helpers import ( + ensure_s3_compatible_group_replication_id, fetch_and_check_existence_of_s3_path, list_backups_in_s3_path, upload_content_to_s3, @@ -102,6 +107,10 @@ def is_unit_blocked(self) -> bool: # to 0 if you are raising the major API version LIBPATCH = 12 +ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE = "S3 repository claimed by another cluster" +MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR = ( + "Move restored cluster to another S3 repository" +) if typing.TYPE_CHECKING: from charm import MySQLOperatorCharm @@ -119,6 +128,13 @@ def __init__(self, charm: "MySQLOperatorCharm", s3_integrator: S3Requirer) -> No self.framework.observe(self.charm.on.create_backup_action, self._on_create_backup) self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups) self.framework.observe(self.charm.on.restore_action, self._on_restore) + self.framework.observe( + self.s3_integrator.on.credentials_changed, self._on_s3_credentials_changed + ) + self.framework.observe(self.charm.on.leader_elected, self._on_s3_credentials_changed) + self.framework.observe( + self.s3_integrator.on.credentials_gone, self._on_s3_credentials_gone + ) # ------------------ Helpers ------------------ @property @@ -235,18 +251,33 @@ def _on_list_backups(self, event: ActionEvent) -> None: # ------------------ Create Backup ------------------ - def _on_create_backup(self, event: ActionEvent) -> None: - """Handle the create backup action.""" - logger.info("A backup has been requested on unit") + def _pre_create_backup_checks(self, event: ActionEvent) -> bool: + """Run some checks before creating the backup. + Returns: a boolean indicating whether operation should be run. 
+        """
         if not self._s3_integrator_relation_exists:
             logger.error("Backup failed: missing relation with S3 integrator charm")
             event.fail("Missing relation with S3 integrator charm")
-            return
+            return False
+
+        if "s3-block-message" in self.charm.app_peer_data:
+            logger.error("Backup failed: S3 relation is blocked for write")
+            event.fail("S3 relation is blocked for write")
+            return False
 
         if not self.charm._mysql.is_mysqld_running():
             logger.error(f"Backup failed: process mysqld is not running on {self.charm.unit.name}")
             event.fail("Process mysqld not running")
+            return False
+
+        return True
+
+    def _on_create_backup(self, event: ActionEvent) -> None:
+        """Handle the create backup action."""
+        logger.info("A backup has been requested on unit")
+
+        if not self._pre_create_backup_checks(event):
             return
 
         datetime_backup_requested = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
@@ -519,14 +550,19 @@ def _on_restore(self, event: ActionEvent) -> None:
         if not success:
             logger.error(f"Restore failed: {error_message}")
             event.fail(error_message)
-
             if recoverable:
                 self._clean_data_dir_and_start_mysqld()
             else:
+                self.charm.app_peer_data.update({
+                    "s3-block-message": MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR,
+                })
                 self.charm.unit.status = BlockedStatus(error_message)
-
             return
 
+        self.charm.app_peer_data.update({
+            "s3-block-message": MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR,
+        })
+
         # Run post-restore operations
         self.charm.unit.status = MaintenanceStatus("Running post-restore operations")
         success, error_message = self._post_restore()
@@ -674,3 +710,41 @@ def _post_restore(self) -> Tuple[bool, str]:
             return False, "Failed to rescan the cluster"
 
         return True, ""
+
+    def _on_s3_credentials_changed(self, event: CredentialsChangedEvent) -> None:
+        if not self.charm.unit.is_leader():
+            logger.debug("Early exit on _on_s3_credentials_changed: unit is not a leader")
+            return
+
+        if not self._s3_integrator_relation_exists:
+            logger.debug(
+                "Early exit on _on_s3_credentials_changed: s3 integrator relation does not exist"
+            )
+            return
+
+        logger.info("Retrieving s3 parameters from the s3-integrator relation")
+        s3_parameters, missing_parameters = self._retrieve_s3_parameters()
+        if missing_parameters:
+            logger.error(f"Missing S3 parameters: {missing_parameters}")
+            return
+
+        logger.info("Ensuring compatibility with the provided S3 repository")
+        if ensure_s3_compatible_group_replication_id(
+            self.charm._mysql.get_current_group_replication_id(), s3_parameters
+        ):
+            self.charm.app_peer_data.update({
+                "s3-block-message": "",
+            })
+        else:
+            self.charm.app_peer_data.update({
+                "s3-block-message": ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE,
+            })
+
+    def _on_s3_credentials_gone(self, event: CredentialsGoneEvent) -> None:
+        if not self.charm.unit.is_leader():
+            logger.debug("Early exit on _on_s3_credentials_gone: unit is not a leader")
+            return
+
+        self.charm.app_peer_data.update({
+            "s3-block-message": "",
+        })
diff --git a/lib/charms/mysql/v0/mysql.py b/lib/charms/mysql/v0/mysql.py
index 8a1d62037..b81d10073 100644
--- a/lib/charms/mysql/v0/mysql.py
+++ b/lib/charms/mysql/v0/mysql.py
@@ -419,6 +419,10 @@ class MySQLPluginInstallError(Error):
     """Exception raised when there is an issue installing a MySQL plugin."""
 
 
+class MySQLGetGroupReplicationIDError(Error):
+    """Exception raised when there is an issue acquiring the current group replication id."""
+
+
 @dataclasses.dataclass
 class RouterUser:
     """MySQL Router user."""
@@ -3111,6 +3115,29 @@ def strip_off_passwords(self, input_string: str) -> str:
             stripped_input = stripped_input.replace(password, "xxxxxxxxxxxx")
         return stripped_input
 
+    def get_current_group_replication_id(self) -> str:
+        """Get the current group replication id."""
+        logger.debug("Getting current group replication id")
+
+        commands = (
+            f"shell.connect('{self.instance_def(self.server_config_user)}')",
+            'result = session.run_sql("SELECT @@GLOBAL.group_replication_group_name")',
+            'print(f"{result.fetch_one()[0]}")',
+        )
+
+        try:
+            output = self._run_mysqlsh_script("\n".join(commands))
+        except MySQLClientError as e:
+            logger.warning("Failed to get current group replication id", exc_info=e)
+            raise MySQLGetGroupReplicationIDError(e.message)
+
+        matches = re.search(r"(.+)", output)
+
+        if not matches:
+            raise MySQLGetGroupReplicationIDError("Failed to get current group replication id")
+
+        return matches.group(1)
+
     @abstractmethod
     def is_mysqld_running(self) -> bool:
         """Returns whether mysqld is running."""
diff --git a/lib/charms/mysql/v0/s3_helpers.py b/lib/charms/mysql/v0/s3_helpers.py
index 2dce393d1..4e4953a63 100644
--- a/lib/charms/mysql/v0/s3_helpers.py
+++ b/lib/charms/mysql/v0/s3_helpers.py
@@ -16,11 +16,15 @@
 
 import base64
 import logging
+import pathlib
 import tempfile
 import time
+from io import BytesIO
 from typing import Dict, List, Tuple
 
 import boto3
+import botocore
+import botocore.exceptions
 
 logger = logging.getLogger(__name__)
 
@@ -34,6 +38,8 @@
 # to 0 if you are raising the major API version
 LIBPATCH = 9
 
+S3_GROUP_REPLICATION_ID_FILE = "group_replication_id.txt"
+
 # botocore/urllib3 clutter the logs when on debug
 logging.getLogger("botocore").setLevel(logging.WARNING)
 logging.getLogger("urllib3").setLevel(logging.WARNING)
@@ -93,6 +99,68 @@ def upload_content_to_s3(content: str, content_path: str, s3_parameters: Dict) -
     return True
 
 
+def _read_content_from_s3(content_path: str, s3_parameters: dict) -> str | None:
+    """Reads specified content from the provided S3 bucket.
+
+    Args:
+        content_path: The S3 path from which to download the content
+        s3_parameters: A dictionary containing the S3 parameters
+            The following are expected keys in the dictionary: bucket, region,
+            endpoint, access-key and secret-key
+
+    Returns:
+        a string with the content if the object is successfully downloaded and None if the file does not exist or an error
+ """ + ca_file = None + try: + logger.info(f"Reading content from bucket={s3_parameters['bucket']}, path={content_path}") + session = boto3.session.Session( + aws_access_key_id=s3_parameters["access-key"], + aws_secret_access_key=s3_parameters["secret-key"], + region_name=s3_parameters["region"], + ) + verif = True + if ca_chain := s3_parameters.get("tls-ca-chain"): + ca_file = tempfile.NamedTemporaryFile() + ca = "\n".join([base64.b64decode(s).decode() for s in ca_chain]) + ca_file.write(ca.encode()) + ca_file.flush() + verif = ca_file.name + + s3 = session.resource( + "s3", + endpoint_url=s3_parameters["endpoint"], + verify=verif, + ) + + bucket = s3.Bucket(s3_parameters["bucket"]) + + with BytesIO() as buf: + bucket.download_fileobj(content_path, buf) + return buf.getvalue().decode("utf-8") + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "404": + logger.info( + f"No such object to read from S3 bucket={s3_parameters['bucket']}, path={content_path}" + ) + else: + logger.exception( + f"Failed to read content from S3 bucket={s3_parameters['bucket']}, path={content_path}", + exc_info=e, + ) + except Exception as e: + logger.exception( + f"Failed to read content from S3 bucket={s3_parameters['bucket']}, path={content_path}", + exc_info=e, + ) + finally: + if ca_file: + ca_file.close() + + return None + + def _compile_backups_from_file_ids( metadata_ids: List[str], md5_ids: List[str], log_ids: List[str] ) -> List[Tuple[str, str]]: @@ -217,3 +285,28 @@ def fetch_and_check_existence_of_s3_path(path: str, s3_parameters: Dict[str, str exc_info=e, ) raise + + +def ensure_s3_compatible_group_replication_id( + group_replication_id: str, s3_parameters: Dict[str, str] +) -> bool: + """Checks if group replication id is equal to the one in the provided S3 repository. + + If S3 doesn't have this claim (so it's not initialized), + then it will be populated automatically with the provided id. + + Args: + group_replication_id: group replication id of the current cluster + s3_parameters: A dictionary containing the S3 parameters + The following are expected keys in the dictionary: bucket, region, + endpoint, access-key and secret-key + """ + s3_id_path = str(pathlib.Path(s3_parameters["path"]) / S3_GROUP_REPLICATION_ID_FILE) + s3_id = _read_content_from_s3(s3_id_path, s3_parameters) + if s3_id and s3_id != group_replication_id: + logger.info( + f"s3 repository is not compatible based on group replication id: {group_replication_id} != {s3_id}" + ) + return False + upload_content_to_s3(group_replication_id, s3_id_path, s3_parameters) + return True diff --git a/src/charm.py b/src/charm.py index 3b6d96267..26b8ee001 100755 --- a/src/charm.py +++ b/src/charm.py @@ -535,6 +535,10 @@ def _on_update_status(self, _) -> None: # noqa: C901 self.unit.status = MaintenanceStatus("Unable to find cluster primary") return + if "s3-block-message" in self.app_peer_data: + self.app.status = BlockedStatus(self.app_peer_data["s3-block-message"]) + return + # Set active status when primary is known self.app.status = ActiveStatus() From 7872d7d668e1ac390972c4efb73bbb1c42c22e3a Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Sun, 8 Dec 2024 03:51:41 +0200 Subject: [PATCH 4/5] Point-in-time-recovery. 
--- actions.yaml | 3 + lib/charms/mysql/v0/backups.py | 132 +++++++++++++++++++++++++++++++-- lib/charms/mysql/v0/mysql.py | 82 ++++++++++++++++++++ src/charm.py | 3 + src/constants.py | 5 ++ src/mysql_vm_helpers.py | 45 +++++++++++ 6 files changed, 262 insertions(+), 8 deletions(-) diff --git a/actions.yaml b/actions.yaml index e0ce6291b..44598c1e1 100644 --- a/actions.yaml +++ b/actions.yaml @@ -55,6 +55,9 @@ restore: backup-id: type: string description: A backup-id to identify the backup to restore (format = %Y-%m-%dT%H:%M:%SZ) + restore-to-time: + type: string + description: Point-in-time-recovery target in MySQL timestamp format. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. diff --git a/lib/charms/mysql/v0/backups.py b/lib/charms/mysql/v0/backups.py index 21ee580ba..93b9873ab 100644 --- a/lib/charms/mysql/v0/backups.py +++ b/lib/charms/mysql/v0/backups.py @@ -46,11 +46,13 @@ def is_unit_blocked(self) -> bool: """ import datetime +import io import logging import pathlib import typing from typing import Dict, List, Optional, Tuple +import yaml from charms.data_platform_libs.v0.s3 import ( CredentialsChangedEvent, CredentialsGoneEvent, @@ -71,6 +73,7 @@ def is_unit_blocked(self) -> bool: MySQLPrepareBackupForRestoreError, MySQLRescanClusterError, MySQLRestoreBackupError, + MySQLRestorePitrError, MySQLRetrieveBackupWithXBCloudError, MySQLServiceNotRunningError, MySQLSetInstanceOfflineModeError, @@ -90,7 +93,12 @@ def is_unit_blocked(self) -> bool: from ops.jujuversion import JujuVersion from ops.model import BlockedStatus, MaintenanceStatus -from constants import MYSQL_DATA_DIR +from constants import ( + MYSQL_BINLOGS_COLLECTOR_CONFIG_FILE, + MYSQL_DATA_DIR, + SERVER_CONFIG_PASSWORD_KEY, + SERVER_CONFIG_USERNAME, +) logger = logging.getLogger(__name__) @@ -510,7 +518,7 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: return True - def _on_restore(self, event: ActionEvent) -> None: + def _on_restore(self, event: ActionEvent) -> None: # noqa: C901 """Handle the restore backup action event. 
Restore a backup from S3 (parameters for which can be retrieved from the
         """
         if not self._pre_restore_checks(event):
             return
 
         backup_id = event.params["backup-id"].strip().strip("/")
-        logger.info(f"A restore with backup-id {backup_id} has been requested on unit")
+        restore_to_time = event.params.get("restore-to-time")
+        logger.info(
+            f"A restore with backup-id {backup_id}"
+            f"{f' to time point {restore_to_time}' if restore_to_time else ''}"
+            f" has been requested on the unit"
+        )
 
         # Retrieve and validate missing S3 parameters
         s3_parameters, missing_parameters = self._retrieve_s3_parameters()
@@ -555,13 +568,35 @@ def _on_restore(self, event: ActionEvent) -> None:
             else:
                 self.charm.app_peer_data.update({
                     "s3-block-message": MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR,
+                    "binlogs-collecting": "",
                 })
+                if not self.charm._mysql.start_stop_binlogs_collecting():
+                    logger.error("Failed to stop binlogs collecting after failed restore")
                 self.charm.unit.status = BlockedStatus(error_message)
             return
 
         self.charm.app_peer_data.update({
             "s3-block-message": MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR,
+            "binlogs-collecting": "",
         })
+        if not self.charm._mysql.start_stop_binlogs_collecting():
+            logger.error("Failed to stop binlogs collecting prior to restore")
+
+        success, error_message = self._clean_data_dir_and_start_mysqld()
+        if not success:
+            logger.error(f"Restore failed: {error_message}")
+            self.charm.unit.status = BlockedStatus(error_message)
+            event.fail(error_message)
+            return
+
+        if restore_to_time is not None:
+            self.charm.unit.status = MaintenanceStatus("Running point-in-time-recovery operations")
+            success, error_message = self._pitr_restore(restore_to_time, s3_parameters)
+            if not success:
+                logger.error(f"Restore failed: {error_message}")
+                event.fail(error_message)
+                self.charm.unit.status = BlockedStatus(error_message)
+                return
 
         # Run post-restore operations
         self.charm.unit.status = MaintenanceStatus("Running post-restore operations")
@@ -672,15 +707,29 @@ def _clean_data_dir_and_start_mysqld(self) -> Tuple[bool, str]:
 
         return True, ""
 
+    def _pitr_restore(
+        self, restore_to_time: str, s3_parameters: Dict[str, str]
+    ) -> Tuple[bool, str]:
+        try:
+            logger.info("Restoring point-in-time-recovery")
+            stdout, stderr = self.charm._mysql.restore_pitr(
+                host=self.charm.get_unit_address(self.charm.unit),
+                mysql_user=SERVER_CONFIG_USERNAME,
+                password=self.charm.get_secret("app", SERVER_CONFIG_PASSWORD_KEY),
+                s3_parameters=s3_parameters,
+                restore_to_time=restore_to_time,
+            )
+            logger.debug(f"Stdout of mysql-pitr-helper restore command: {stdout}")
+            logger.debug(f"Stderr of mysql-pitr-helper restore command: {stderr}")
+        except MySQLRestorePitrError:
+            return False, f"Failed to restore point-in-time-recovery to {restore_to_time}"
+        return True, ""
+
     def _post_restore(self) -> Tuple[bool, str]:
         """Run operations required after restoring a backup.
Returns: tuple of (success, error_message) """ - success, error_message = self._clean_data_dir_and_start_mysqld() - if not success: - return success, error_message - try: logger.info("Configuring instance to be part of an InnoDB cluster") self.charm._mysql.configure_instance(create_cluster_admin=False) @@ -722,6 +771,17 @@ def _on_s3_credentials_changed(self, event: CredentialsChangedEvent) -> None: ) return + if ( + not self.charm._mysql.is_mysqld_running() + or not self.charm.unit_initialized + or not self.charm.upgrade.idle + ): + logger.debug( + "Deferring _on_s3_credentials_changed: mysql cluster is not started yet or upgrade is occurring" + ) + event.defer() + return + logger.info("Retrieving s3 parameters from the s3-integrator relation") s3_parameters, missing_parameters = self._retrieve_s3_parameters() if missing_parameters: @@ -734,17 +794,73 @@ def _on_s3_credentials_changed(self, event: CredentialsChangedEvent) -> None: ): self.charm.app_peer_data.update({ "s3-block-message": "", + "binlogs-collecting": "true", }) else: self.charm.app_peer_data.update({ "s3-block-message": ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE, + "binlogs-collecting": "", }) + if not self.charm._mysql.start_stop_binlogs_collecting(True): + logger.error("Failed to restart binlogs collecting after S3 relation update") + def _on_s3_credentials_gone(self, event: CredentialsGoneEvent) -> None: if not self.charm.unit.is_leader(): logger.debug("Early exit on _on_s3_credentials_gone: unit is not a leader") return - self.charm.app_peer_data.update({ "s3-block-message": "", + "binlogs-collecting": "", }) + if not self.charm._mysql.start_stop_binlogs_collecting(): + logger.error("Failed to stop binlogs collecting after S3 relation depart") + + def update_binlogs_collector_config(self) -> bool: + """Update binlogs collector service config file. + + Returns: whether this operation was successful. 
+ """ + if not self._s3_integrator_relation_exists: + logger.error( + "Cannot update binlogs collector config: s3 integrator relation does not exist" + ) + return False + + logger.info("Retrieving s3 parameters from the s3-integrator relation") + s3_parameters, missing_parameters = self._retrieve_s3_parameters() + if missing_parameters: + logger.error( + f"Cannot update binlogs collector config: Missing S3 parameters: {missing_parameters}" + ) + return False + + bucket_url = ( + f"{s3_parameters['bucket']}/{s3_parameters['path']}binlogs" + if s3_parameters["path"][-1] == "/" + else f"{s3_parameters['bucket']}/{s3_parameters['path']}/binlogs" + ) + + with io.StringIO() as string_io: + yaml.dump( + { + "endpoint": s3_parameters["endpoint"], + "hosts": self.charm._mysql.get_cluster_members(), + "user": SERVER_CONFIG_USERNAME, + "pass": self.charm.get_secret("app", SERVER_CONFIG_PASSWORD_KEY), + "storage_type": "s3", + "s3": { + "access_key_id": s3_parameters["access-key"], + "secret_access_key": s3_parameters["secret-key"], + "bucket_url": bucket_url, + "default_region": s3_parameters["region"], + }, + }, + string_io, + ) + self.charm._mysql.write_content_to_file( + path=MYSQL_BINLOGS_COLLECTOR_CONFIG_FILE, + content=string_io.getvalue(), + ) + + return True diff --git a/lib/charms/mysql/v0/mysql.py b/lib/charms/mysql/v0/mysql.py index 997722dcb..aee3dc375 100644 --- a/lib/charms/mysql/v0/mysql.py +++ b/lib/charms/mysql/v0/mysql.py @@ -106,6 +106,7 @@ def wait_until_mysql_connection(self) -> None: from constants import ( BACKUPS_PASSWORD_KEY, BACKUPS_USERNAME, + CHARMED_MYSQL_PITR_HELPER, CLUSTER_ADMIN_PASSWORD_KEY, CLUSTER_ADMIN_USERNAME, COS_AGENT_RELATION_NAME, @@ -338,6 +339,10 @@ class MySQLRestoreBackupError(Error): """Exception raised when there is an error restoring a backup.""" +class MySQLRestorePitrError(Error): + """Exception raised when there is an error during point-in-time-recovery restore.""" + + class MySQLDeleteTempRestoreDirectoryError(Error): """Exception raised when there is an error deleting the temp restore directory.""" @@ -2903,6 +2908,62 @@ def restore_backup( logger.exception("Failed to restore backup") raise MySQLRestoreBackupError + def restore_pitr( + self, + host: str, + mysql_user: str, + password: str, + s3_parameters: Dict[str, str], + restore_to_time: str, + user: str | None = None, + group: str | None = None, + ) -> Tuple[str, str]: + """Run point-in-time-recovery using binary logs from the S3 repository. + + Args: + host: the MySQL host to connect to. + mysql_user: the MySQL user to connect to. + password: the password of the provided MySQL user. + s3_parameters: S3 relation parameters. + restore_to_time: the MySQL timestamp to restore to or keyword `latest`. + user: the user with which to execute the commands. + group: the group with which to execute the commands. 
+ """ + bucket_url = ( + f"{s3_parameters['bucket']}/{s3_parameters['path']}binlogs" + if s3_parameters["path"][-1] == "/" + else f"{s3_parameters['bucket']}/{s3_parameters['path']}/binlogs" + ) + + try: + return self._execute_commands( + [ + CHARMED_MYSQL_PITR_HELPER, + "recover", + ], + user=user, + group=group, + env_extra={ + "BINLOG_S3_ENDPOINT": s3_parameters["endpoint"], + "HOST": host, + "USER": mysql_user, + "PASS": password, + "PITR_DATE": restore_to_time if restore_to_time != "latest" else "", + "PITR_RECOVERY_TYPE": "latest" if restore_to_time == "latest" else "date", + "STORAGE_TYPE": "s3", + "BINLOG_ACCESS_KEY_ID": s3_parameters["access-key"], + "BINLOG_SECRET_ACCESS_KEY": s3_parameters["secret-key"], + "BINLOG_S3_REGION": s3_parameters["region"], + "BINLOG_S3_BUCKET_URL": bucket_url, + }, + ) + except MySQLExecError as e: + logger.exception("Failed to restore pitr") + raise MySQLRestorePitrError(e.message) + except Exception: + logger.exception("Failed to restore pitr") + raise MySQLRestorePitrError + def delete_temp_restore_directory( self, temp_restore_directory: str, @@ -3194,3 +3255,24 @@ def _file_exists(self, path: str) -> bool: path: Path to the file to check """ raise NotImplementedError + + @abstractmethod + def start_stop_binlogs_collecting(self, force_restart: bool = False) -> bool: + """Start or stop binlogs collecting service. + + Based on the `binlogs-collecting` app peer data value and unit leadership. + + Args: + force_restart: whether to restart service even if it's already running. + + Returns: whether the operation was successful. + """ + raise NotImplementedError + + @abstractmethod + def get_cluster_members(self) -> list[str]: + """Get cluster members in MySQL MEMBER_HOST format. + + Returns: list of the cluster members in the MySQL MEMBER_HOST format. 
+ """ + raise NotImplementedError diff --git a/src/charm.py b/src/charm.py index f95af2f88..e77e76d48 100755 --- a/src/charm.py +++ b/src/charm.py @@ -387,6 +387,9 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None: except subprocess.CalledProcessError: logger.exception(f"failed to open port {port}") + if not self._mysql.start_stop_binlogs_collecting(): + logger.error("Failed to start or stop binlogs collecting during peer relation event") + def _on_database_storage_detaching(self, _) -> None: """Handle the database storage detaching event.""" # Only executes if the unit was initialised diff --git a/src/constants.py b/src/constants.py index 00c774682..ed1d57230 100644 --- a/src/constants.py +++ b/src/constants.py @@ -29,12 +29,17 @@ CHARMED_MYSQLD_SERVICE = "mysqld" CHARMED_MYSQL = "charmed-mysql.mysql" CHARMED_MYSQLSH = "charmed-mysql.mysqlsh" +CHARMED_MYSQL_PITR_HELPER = "charmed-mysql.mysql-pitr-helper" +CHARMED_MYSQL_BINLOGS_COLLECTOR_SERVICE = "mysql-pitr-helper-collector" CHARMED_MYSQL_COMMON_DIRECTORY = "/var/snap/charmed-mysql/common" CHARMED_MYSQL_DATA_DIRECTORY = "/var/snap/charmed-mysql/current" MYSQLD_SOCK_FILE = f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/run/mysqld/mysqld.sock" MYSQLD_CONFIG_DIRECTORY = f"{CHARMED_MYSQL_DATA_DIRECTORY}/etc/mysql/mysql.conf.d" MYSQLD_DEFAULTS_CONFIG_FILE = f"{CHARMED_MYSQL_DATA_DIRECTORY}/etc/mysql/mysql.cnf" MYSQLD_CUSTOM_CONFIG_FILE = f"{MYSQLD_CONFIG_DIRECTORY}/z-custom-mysqld.cnf" +MYSQL_BINLOGS_COLLECTOR_CONFIG_FILE = ( + f"{CHARMED_MYSQL_DATA_DIRECTORY}/etc/mysql-pitr-helper-collector.yaml" +) MYSQL_SYSTEM_USER = "snap_daemon" MYSQL_DATA_DIR = f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/lib/mysql" CHARMED_MYSQL_XTRABACKUP_LOCATION = "/snap/bin/charmed-mysql.xtrabackup" diff --git a/src/mysql_vm_helpers.py b/src/mysql_vm_helpers.py index 840accc6e..782fd6fbb 100644 --- a/src/mysql_vm_helpers.py +++ b/src/mysql_vm_helpers.py @@ -35,6 +35,7 @@ from constants import ( CHARMED_MYSQL, + CHARMED_MYSQL_BINLOGS_COLLECTOR_SERVICE, CHARMED_MYSQL_COMMON_DIRECTORY, CHARMED_MYSQL_DATA_DIRECTORY, CHARMED_MYSQL_SNAP_NAME, @@ -201,6 +202,9 @@ def install_and_configure_mysql_dependencies() -> None: os.system(f"chown -R {MYSQL_SYSTEM_USER} {CHARMED_MYSQL_COMMON_DIRECTORY}") subprocess.run(["snap", "alias", "charmed-mysql.mysql", "mysql"], check=True) + subprocess.run( + ["snap", "alias", "charmed-mysql.mysqlbinlog", "mysqlbinlog"], check=True + ) installed_by_mysql_server_file.touch(exist_ok=True) except snap.SnapError: @@ -816,6 +820,47 @@ def _file_exists(self, path: str) -> bool: """Check if file exists.""" return os.path.exists(path) + def start_stop_binlogs_collecting(self, force_restart: bool = False) -> bool: + """Start or stop binlogs collecting service. + + Based on the "binlogs-collecting" app peer data value and unit leadership. + + Args: + force_restart: whether to restart service even if it's already running. + + Returns: whether the operation was successful. 
+ """ + cache = snap.SnapCache() + selected_snap = cache[CHARMED_MYSQL_SNAP_NAME] + if not selected_snap.present: + raise SnapServiceOperationError(f"Snap {CHARMED_MYSQL_SNAP_NAME} not installed") + + is_running = selected_snap.services[CHARMED_MYSQL_BINLOGS_COLLECTOR_SERVICE]["active"] + + if is_running and ( + not self.charm.unit.is_leader() or "binlogs-collecting" not in self.charm.app_peer_data + ): + logger.debug("Stopping binlogs collector") + selected_snap.stop([CHARMED_MYSQL_BINLOGS_COLLECTOR_SERVICE]) + + if not self.charm.unit.is_leader(): + return True + + if "binlogs-collecting" in self.charm.app_peer_data and (force_restart or not is_running): + logger.debug("Restarting binlogs collector") + if not self.charm.backups.update_binlogs_collector_config(): + return False + selected_snap.restart([CHARMED_MYSQL_BINLOGS_COLLECTOR_SERVICE]) + + return True + + def get_cluster_members(self) -> list[str]: + """Get cluster members in MySQL MEMBER_HOST format. + + Returns: list of cluster members in MySQL MEMBER_HOST format. + """ + return [host.names[1] for host in self.charm.hostname_resolution._get_host_details()] + @staticmethod def write_content_to_file( path: str, From c730506f05f0bdff73d6be6056822933b9b081d4 Mon Sep 17 00:00:00 2001 From: Vladyslav Tarasenko Date: Tue, 10 Dec 2024 00:46:23 +0200 Subject: [PATCH 5/5] Integration tests. --- tests/integration/helpers.py | 3 + tests/integration/test_backup_aws.py | 37 +++- tests/integration/test_backup_ceph.py | 37 +++- tests/integration/test_backup_gcp.py | 37 +++- tests/integration/test_backup_pitr.py | 295 ++++++++++++++++++++++++++ 5 files changed, 400 insertions(+), 9 deletions(-) create mode 100644 tests/integration/test_backup_pitr.py diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 4d239c655..a14ddd756 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -208,6 +208,7 @@ async def execute_queries_on_unit( password: str, queries: List[str], commit: bool = False, + raw: bool = False, ) -> List: """Execute given MySQL queries on a unit. @@ -217,6 +218,7 @@ async def execute_queries_on_unit( password: The MySQL password queries: A list of queries to execute commit: A keyword arg indicating whether there are any writes queries + raw: Whether MySQL results are returned as is, rather than converted to Python types. 
Returns: A list of rows that were potentially queried @@ -226,6 +228,7 @@ async def execute_queries_on_unit( "password": password, "host": unit_address, "raise_on_warnings": False, + "raw": raw, } with MysqlConnector(config, commit) as cursor: diff --git a/tests/integration/test_backup_aws.py b/tests/integration/test_backup_aws.py index 79b0e6b56..882cfb89b 100644 --- a/tests/integration/test_backup_aws.py +++ b/tests/integration/test_backup_aws.py @@ -37,6 +37,10 @@ ROOT_PASSWORD = "rootpassword" DATABASE_NAME = "backup-database" TABLE_NAME = "backup-table" +ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE = "S3 repository claimed by another cluster" +MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR = ( + "Move restored cluster to another S3 repository" +) backup_id, value_before_backup, value_after_backup = "", None, None @@ -258,10 +262,20 @@ async def test_restore_on_same_cluster( assert sorted(values) == sorted([value_before_backup, value_after_restore]) logger.info("Scaling mysql application to 3 units") - await scale_application(ops_test, mysql_application_name, 3) + await ops_test.model.applications[mysql_application_name].add_unit(2) + await ops_test.model.wait_for_idle( + apps=[mysql_application_name], + wait_for_exact_units=3, + timeout=TIMEOUT, + ) logger.info("Ensuring inserted values before backup and after restore exist on all units") for unit in ops_test.model.applications[mysql_application_name].units: + await ops_test.model.block_until( + lambda: unit.workload_status == "active", + timeout=TIMEOUT, + ) + unit_address = await get_unit_ip(ops_test, unit.name) values = await execute_queries_on_unit( @@ -273,6 +287,11 @@ async def test_restore_on_same_cluster( assert sorted(values) == sorted([value_before_backup, value_after_restore]) + assert ( + ops_test.model.applications[mysql_application_name].status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR + ), "cluster should migrate to blocked status after restore" + # scale down the cluster to preserve resources for the following tests await scale_application(ops_test, mysql_application_name, 0) @@ -295,7 +314,6 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr await ops_test.model.wait_for_idle( apps=[new_mysql_application_name, S3_INTEGRATOR], - status="active", timeout=TIMEOUT, ) @@ -329,7 +347,13 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr await ops_test.model.wait_for_idle( apps=[new_mysql_application_name, S3_INTEGRATOR], - status="active", + timeout=TIMEOUT, + ) + + logger.info("Waiting for blocked application status with another cluster S3 repository") + await ops_test.model.block_until( + lambda: ops_test.model.applications[new_mysql_application_name].status_message + == ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE, timeout=TIMEOUT, ) @@ -370,3 +394,10 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr ) assert value_before_backup assert sorted(values) == sorted([value_before_backup, value_after_restore]) + + logger.info("Waiting for blocked application status after restore") + await ops_test.model.block_until( + lambda: ops_test.model.applications[new_mysql_application_name].status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR, + timeout=TIMEOUT, + ) diff --git a/tests/integration/test_backup_ceph.py b/tests/integration/test_backup_ceph.py index be20c2ec0..874b6f703 100644 --- a/tests/integration/test_backup_ceph.py +++ b/tests/integration/test_backup_ceph.py @@ -38,6 +38,10 
@@ ROOT_PASSWORD = "rootpassword" DATABASE_NAME = "backup-database" TABLE_NAME = "backup-table" +ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE = "S3 repository claimed by another cluster" +MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR = ( + "Move restored cluster to another S3 repository" +) backup_id, value_before_backup, value_after_backup = "", None, None @@ -258,10 +262,20 @@ async def test_restore_on_same_cluster( assert sorted(values) == sorted([value_before_backup, value_after_restore]) logger.info("Scaling mysql application to 3 units") - await scale_application(ops_test, mysql_application_name, 3) + await ops_test.model.applications[mysql_application_name].add_unit(2) + await ops_test.model.wait_for_idle( + apps=[mysql_application_name], + wait_for_exact_units=3, + timeout=TIMEOUT, + ) logger.info("Ensuring inserted values before backup and after restore exist on all units") for unit in ops_test.model.applications[mysql_application_name].units: + await ops_test.model.block_until( + lambda: unit.workload_status == "active", + timeout=TIMEOUT, + ) + unit_address = await get_unit_ip(ops_test, unit.name) values = await execute_queries_on_unit( @@ -273,6 +287,11 @@ async def test_restore_on_same_cluster( assert sorted(values) == sorted([value_before_backup, value_after_restore]) + assert ( + ops_test.model.applications[mysql_application_name].status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR + ), "cluster should migrate to blocked status after restore" + # scale down the cluster to preserve resources for the following tests await scale_application(ops_test, mysql_application_name, 0) @@ -295,7 +314,6 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr await ops_test.model.wait_for_idle( apps=[new_mysql_application_name, S3_INTEGRATOR], - status="active", timeout=TIMEOUT, ) @@ -329,7 +347,13 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr await ops_test.model.wait_for_idle( apps=[new_mysql_application_name, S3_INTEGRATOR], - status="active", + timeout=TIMEOUT, + ) + + logger.info("Waiting for blocked application status with another cluster S3 repository") + await ops_test.model.block_until( + lambda: ops_test.model.applications[new_mysql_application_name].status_message + == ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE, timeout=TIMEOUT, ) @@ -370,3 +394,10 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr ) assert value_before_backup assert sorted(values) == sorted([value_before_backup, value_after_restore]) + + logger.info("Waiting for blocked application status after restore") + await ops_test.model.block_until( + lambda: ops_test.model.applications[new_mysql_application_name].status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR, + timeout=TIMEOUT, + ) diff --git a/tests/integration/test_backup_gcp.py b/tests/integration/test_backup_gcp.py index c58b5363f..bd6d77b3c 100644 --- a/tests/integration/test_backup_gcp.py +++ b/tests/integration/test_backup_gcp.py @@ -37,6 +37,10 @@ ROOT_PASSWORD = "rootpassword" DATABASE_NAME = "backup-database" TABLE_NAME = "backup-table" +ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE = "S3 repository claimed by another cluster" +MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR = ( + "Move restored cluster to another S3 repository" +) backup_id, value_before_backup, value_after_backup = "", None, None @@ -258,10 +262,20 @@ async def test_restore_on_same_cluster( assert sorted(values) == 
sorted([value_before_backup, value_after_restore]) logger.info("Scaling mysql application to 3 units") - await scale_application(ops_test, mysql_application_name, 3) + await ops_test.model.applications[mysql_application_name].add_unit(2) + await ops_test.model.wait_for_idle( + apps=[mysql_application_name], + wait_for_exact_units=3, + timeout=TIMEOUT, + ) logger.info("Ensuring inserted values before backup and after restore exist on all units") for unit in ops_test.model.applications[mysql_application_name].units: + await ops_test.model.block_until( + lambda: unit.workload_status == "active", + timeout=TIMEOUT, + ) + unit_address = await get_unit_ip(ops_test, unit.name) values = await execute_queries_on_unit( @@ -273,6 +287,11 @@ async def test_restore_on_same_cluster( assert sorted(values) == sorted([value_before_backup, value_after_restore]) + assert ( + ops_test.model.applications[mysql_application_name].status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR + ), "cluster should migrate to blocked status after restore" + # scale down the cluster to preserve resources for the following tests await scale_application(ops_test, mysql_application_name, 0) @@ -295,7 +314,6 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr await ops_test.model.wait_for_idle( apps=[new_mysql_application_name, S3_INTEGRATOR], - status="active", timeout=TIMEOUT, ) @@ -329,7 +347,13 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr await ops_test.model.wait_for_idle( apps=[new_mysql_application_name, S3_INTEGRATOR], - status="active", + timeout=TIMEOUT, + ) + + logger.info("Waiting for blocked application status with another cluster S3 repository") + await ops_test.model.block_until( + lambda: ops_test.model.applications[new_mysql_application_name].status_message + == ANOTHER_S3_CLUSTER_REPOSITORY_ERROR_MESSAGE, timeout=TIMEOUT, ) @@ -370,3 +394,10 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, cloud_configs, cloud_cr ) assert value_before_backup assert sorted(values) == sorted([value_before_backup, value_after_restore]) + + logger.info("Waiting for blocked application status after restore") + await ops_test.model.block_until( + lambda: ops_test.model.applications[new_mysql_application_name].status_message + == MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR, + timeout=TIMEOUT, + ) diff --git a/tests/integration/test_backup_pitr.py b/tests/integration/test_backup_pitr.py new file mode 100644 index 000000000..7ca133734 --- /dev/null +++ b/tests/integration/test_backup_pitr.py @@ -0,0 +1,295 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging +import uuid + +import boto3 +import pytest +from pytest_operator.plugin import OpsTest + +from . 
import juju_ +from .helpers import ( + execute_queries_on_unit, + get_primary_unit_wrapper, + get_unit_ip, + rotate_credentials, +) +from .high_availability.high_availability_helpers import ( + deploy_and_scale_mysql, + insert_data_into_mysql_and_validate_replication, +) + +logger = logging.getLogger(__name__) + +S3_INTEGRATOR = "s3-integrator" +S3_INTEGRATOR_CHANNEL = "latest/stable" +TIMEOUT = 10 * 60 +SERVER_CONFIG_USER = "serverconfig" +SERVER_CONFIG_PASSWORD = "serverconfigpassword" +DATABASE_NAME = "backup-database" +TABLE_NAME = "backup-table" +APPLICATION_NAME_AWS = "mysql-aws" +APPLICATION_NAME_GCP = "mysql-gcp" +MOVE_RESTORED_CLUSTER_TO_ANOTHER_S3_REPOSITORY_ERROR = ( + "Move restored cluster to another S3 repository" +) + + +@pytest.fixture(scope="session") +def cloud_configs_aws(github_secrets) -> tuple[dict[str, str], dict[str, str]]: + configs = { + "endpoint": "https://s3.amazonaws.com", + "bucket": "data-charms-testing", + "path": f"mysql/{uuid.uuid4()}", + "region": "us-east-1", + } + credentials = { + "access-key": github_secrets["AWS_ACCESS_KEY"], + "secret-key": github_secrets["AWS_SECRET_KEY"], + } + yield configs, credentials + clean_backups_from_buckets(configs, credentials) + + +@pytest.fixture(scope="session") +def cloud_configs_gcp(github_secrets) -> tuple[dict[str, str], dict[str, str]]: + configs = { + "endpoint": "https://storage.googleapis.com", + "bucket": "data-charms-testing", + "path": f"mysql/{uuid.uuid4()}", + "region": "", + } + credentials = { + "access-key": github_secrets["GCP_ACCESS_KEY"], + "secret-key": github_secrets["GCP_SECRET_KEY"], + } + yield configs, credentials + clean_backups_from_buckets(configs, credentials) + + +def clean_backups_from_buckets(cloud_configs, cloud_credentials) -> None: + """Teardown to clean up created backups from clouds.""" + logger.info("Cleaning backups from cloud buckets") + session = boto3.session.Session( # pyright: ignore + aws_access_key_id=cloud_credentials["access-key"], + aws_secret_access_key=cloud_credentials["secret-key"], + region_name=cloud_configs["region"], + ) + s3 = session.resource("s3", endpoint_url=cloud_configs["endpoint"]) + bucket = s3.Bucket(cloud_configs["bucket"]) + + # GCS doesn't support batch delete operation, so delete the objects one by one + backup_path = cloud_configs["path"] + for bucket_object in bucket.objects.filter(Prefix=backup_path): + bucket_object.delete() + + +@pytest.mark.group("AWS") +@pytest.mark.abort_on_fail +async def test_build_and_deploy_aws( + ops_test: OpsTest, cloud_configs_aws: tuple[dict[str, str], dict[str, str]] +) -> None: + await build_and_deploy_operations( + ops_test, APPLICATION_NAME_AWS, cloud_configs_aws[0], cloud_configs_aws[1] + ) + + +@pytest.mark.group("GCP") +@pytest.mark.abort_on_fail +async def test_build_and_deploy_gcp( + ops_test: OpsTest, cloud_configs_gcp: tuple[dict[str, str], dict[str, str]] +) -> None: + await build_and_deploy_operations( + ops_test, APPLICATION_NAME_GCP, cloud_configs_gcp[0], cloud_configs_gcp[1] + ) + + +async def build_and_deploy_operations( + ops_test: OpsTest, + mysql_application_name: str, + cloud_configs: dict[str, str], + cloud_credentials: dict[str, str], +) -> None: + """Simple test to ensure that the mysql charm gets deployed.""" + logger.info("Deploying s3 integrator") + await ops_test.model.deploy(S3_INTEGRATOR, channel=S3_INTEGRATOR_CHANNEL, base="ubuntu@22.04") + + logger.info("Deploying mysql") + await deploy_and_scale_mysql(ops_test, mysql_application_name=mysql_application_name) + + logger.info("Rotating 
mysql credentials") + primary_mysql = await get_primary_unit_wrapper(ops_test, mysql_application_name) + await rotate_credentials( + primary_mysql, username=SERVER_CONFIG_USER, password=SERVER_CONFIG_PASSWORD + ) + + logger.info("Configuring s3 integrator and integrating it with mysql") + await ops_test.model.wait_for_idle( + apps=[S3_INTEGRATOR], + status="blocked", + raise_on_blocked=False, + timeout=TIMEOUT, + ) + await ops_test.model.applications[S3_INTEGRATOR].set_config(cloud_configs) + await juju_.run_action( + ops_test.model.units[f"{S3_INTEGRATOR}/0"], # pyright: ignore + "sync-s3-credentials", + **cloud_credentials, + ) + await ops_test.model.wait_for_idle( + apps=[mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + await ops_test.model.relate(mysql_application_name, S3_INTEGRATOR) + await ops_test.model.wait_for_idle( + apps=[mysql_application_name, S3_INTEGRATOR], + status="active", + timeout=TIMEOUT, + ) + + +@pytest.mark.group("AWS") +@pytest.mark.abort_on_fail +async def test_pitr_aws(ops_test: OpsTest) -> None: + await pitr_operations(ops_test, APPLICATION_NAME_AWS) + + +@pytest.mark.group("GCP") +@pytest.mark.abort_on_fail +async def test_pitr_gcp(ops_test: OpsTest) -> None: + await pitr_operations(ops_test, APPLICATION_NAME_GCP) + + +async def pitr_operations(ops_test: OpsTest, mysql_application_name: str) -> None: + primary_unit = await get_primary_unit_wrapper(ops_test, mysql_application_name) + non_primary_units = [ + unit + for unit in ops_test.model.applications[mysql_application_name].units + if unit.name != primary_unit.name + ] + primary_ip = await get_unit_ip(ops_test, primary_unit.name) + + logger.info("Creating backup") + results = await juju_.run_action(non_primary_units[0], "create-backup", **{"--wait": "5m"}) + backup_id = results["backup-id"] + + logger.info("Creating test data 1") + td1 = await insert_data_into_mysql_and_validate_replication( + ops_test, DATABASE_NAME, TABLE_NAME, mysql_application_name + ) + ts = await execute_queries_on_unit( + primary_ip, + SERVER_CONFIG_USER, + SERVER_CONFIG_PASSWORD, + ["SELECT CURRENT_TIMESTAMP"], + raw=True, + ) + # This is a raw bytes, so we need to decode it to the utf-8 string + ts = ts[0].decode("utf-8") + ts_year_before = ts.replace(ts[:4], str(int(ts[:4]) - 1), 1) + ts_year_after = ts.replace(ts[:4], str(int(ts[:4]) + 1), 1) + + logger.info("Creating test data 2") + td2 = await insert_data_into_mysql_and_validate_replication( + ops_test, DATABASE_NAME, TABLE_NAME, mysql_application_name + ) + + await execute_queries_on_unit( + primary_ip, + SERVER_CONFIG_USER, + SERVER_CONFIG_PASSWORD, + ["FLUSH BINARY LOGS"], + ) + + logger.info("Scaling down to 1 unit") + for unit_to_destroy in non_primary_units: + await ops_test.model.destroy_units(unit_to_destroy.name) + await ops_test.model.wait_for_idle( + apps=[mysql_application_name], + status="active", + timeout=TIMEOUT, + wait_for_exact_units=1, + ) + + logger.info(f"Restoring backup {backup_id} with bad restore-to-time parameter") + action = await primary_unit.run_action( + "restore", **{"backup-id": backup_id, "restore-to-time": "bad"} + ) + await action.wait() + assert ( + action.status == "failed" + ), "restore should fail with bad restore-to-time parameter, but it succeeded" + + logger.info(f"Restoring backup {backup_id} with year_before restore-to-time parameter") + await juju_.run_action( + primary_unit, "restore", **{"backup-id": backup_id, "restore-to-time": ts_year_before} + ) + await ops_test.model.wait_for_idle( + 
+        apps=[mysql_application_name, S3_INTEGRATOR],
+        timeout=TIMEOUT,
+    )
+    assert await check_test_data_existence(
+        primary_ip, should_not_exist=[td1, td2]
+    ), "test data should not exist"
+
+    logger.info(f"Restoring backup {backup_id} with year_after restore-to-time parameter")
+    await juju_.run_action(
+        primary_unit, "restore", **{"backup-id": backup_id, "restore-to-time": ts_year_after}
+    )
+    await ops_test.model.wait_for_idle(
+        apps=[mysql_application_name, S3_INTEGRATOR],
+        timeout=TIMEOUT,
+    )
+    assert await check_test_data_existence(
+        primary_ip, should_exist=[td1, td2]
+    ), "both test data should exist"
+
+    logger.info(f"Restoring backup {backup_id} with actual restore-to-time parameter")
+    await juju_.run_action(
+        primary_unit, "restore", **{"backup-id": backup_id, "restore-to-time": ts}
+    )
+    await ops_test.model.wait_for_idle(
+        apps=[mysql_application_name, S3_INTEGRATOR],
+        timeout=TIMEOUT,
+    )
+    assert await check_test_data_existence(
+        primary_ip, should_exist=[td1], should_not_exist=[td2]
+    ), "only first test data should exist"
+
+    logger.info(f"Restoring backup {backup_id} with restore-to-time=latest parameter")
+    await juju_.run_action(
+        primary_unit, "restore", **{"backup-id": backup_id, "restore-to-time": "latest"}
+    )
+    await ops_test.model.wait_for_idle(
+        apps=[mysql_application_name, S3_INTEGRATOR],
+        timeout=TIMEOUT,
+    )
+    assert await check_test_data_existence(
+        primary_ip, should_exist=[td1, td2]
+    ), "both test data should exist"
+
+
+async def check_test_data_existence(
+    unit_address: str,
+    should_exist: list[str] | None = None,
+    should_not_exist: list[str] | None = None,
+) -> bool:
+    if should_exist is None:
+        should_exist = []
+    if should_not_exist is None:
+        should_not_exist = []
+    res = await execute_queries_on_unit(
+        unit_address,
+        SERVER_CONFIG_USER,
+        SERVER_CONFIG_PASSWORD,
+        [
+            f"CREATE DATABASE IF NOT EXISTS `{DATABASE_NAME}`",
+            f"CREATE TABLE IF NOT EXISTS `{DATABASE_NAME}`.`{TABLE_NAME}` (id varchar(255), primary key (id))",
+            f"SELECT id FROM `{DATABASE_NAME}`.`{TABLE_NAME}`",
+        ],
+        commit=True,
+    )
+    # Every row expected to exist must be present in the result set, and
+    # none of the forbidden rows may appear in it.
+    return all(elem in res for elem in should_exist) and all(
+        elem not in res for elem in should_not_exist
+    )
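
Note on the point-in-time-recovery coverage: pitr_operations() takes a
full backup before td1 is written, captures the timestamp ts between td1
and td2, and flushes the binary logs after td2, so the restore targets
map to the following expectations (matching the asserts above):

    # restore-to-time       td1 present   td2 present
    # ts minus one year     no            no
    # ts plus one year      yes           yes
    # ts (exact)            yes           no
    # "latest"              yes           yes
    # "bad" (malformed)     restore action fails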