diff --git a/lib/charms/data_platform_libs/v0/upgrade.py b/lib/charms/data_platform_libs/v0/upgrade.py deleted file mode 100644 index ef74644de..000000000 --- a/lib/charms/data_platform_libs/v0/upgrade.py +++ /dev/null @@ -1,1091 +0,0 @@ -# Copyright 2023 Canonical Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -r"""Library to manage in-place upgrades for charms running on VMs and K8s. - -This library contains handlers for `upgrade` relation events used to coordinate -between units in an application during a `juju refresh`, as well as `Pydantic` models -for instantiating, validating and comparing dependencies. - -An upgrade on VMs is initiated with the command `juju refresh`. Once executed, the following -events are emitted to each unit at random: - - `upgrade-charm` - - `config-changed` - - `leader-settings-changed` - Non-leader only - -Charm authors can implement the classes defined in this library to streamline the process of -coordinating which unit updates when, achieved through updating of unit-data `state` throughout. - -At a high-level, the upgrade steps are as follows: - - Run pre-checks on the cluster to confirm it is safe to upgrade - - Create stack of unit.ids, to serve as the upgrade order (generally workload leader is last) - - Start the upgrade by issuing a Juju CLI command - - The unit at the top of the stack gets permission to upgrade - - The unit handles the upgrade and restarts their service - - Repeat, until all units have restarted - -### Usage by charm authors - -#### `upgrade` relation - -Charm authors must implement an additional peer-relation. - -As this library uses relation data exchanged between units to coordinate, charm authors -need to add a new relation interface. The relation name does not matter. - -`metadata.yaml` -```yaml -peers: - upgrade: - interface: upgrade -``` - -#### Dependencies JSON/Dict - -Charm authors must implement a dict object tracking current charm versions, requirements + upgradability. - -Many workload versions may be incompatible with older/newer versions. This same idea also can apply to -charm or snap versions. Workloads with required related applications (e.g Kafka + ZooKeeper) also need to -ensure their versions are compatible during an upgrade, to avoid cluster failure. - -As such, it is necessasry to freeze any dependencies within each published charm. An example of this could -be creating a `DEPENDENCIES` dict within the charm code, with the following structure: - -`src/literals.py` -```python -DEPENDENCIES = { - "kafka_charm": { - "dependencies": {"zookeeper": ">50"}, - "name": "kafka", - "upgrade_supported": ">90", - "version": "100", - }, - "kafka_service": { - "dependencies": {"zookeeper": "^3"}, - "name": "kafka", - "upgrade_supported": ">=0.8", - "version": "3.3.2", - }, -} -``` - -The first-level key names are arbitrary labels for tracking what those versions+dependencies are for. 
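(Illustrative aside, not part of the library: the sketch below shows how such a constraint string is evaluated, using the same poetry-core helpers this library imports for `verify_requirements`; the constraint and version values here are hypothetical.)

```python
# Minimal sketch, assuming poetry-core is available (it is a declared PYDEPS of this library).
import poetry.core.constraints.version as poetry_version

constraint = poetry_version.parse_constraint("^3")   # e.g. a "zookeeper" requirement
current = poetry_version.Version.parse("3.6.3")      # hypothetical running version
print(constraint.allows(current))                    # True -> requirement satisfied
```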
-The `dependencies` second-level values are a key-value map of any required external applications, - and the versions this packaged charm can support. -The `upgrade_suppported` second-level values are requirements from which an in-place upgrade can be - supported by the charm. -The `version` second-level values correspond to the current version of this packaged charm. - -Any requirements comply with [`poetry`'s dependency specifications](https://python-poetry.org/docs/dependency-specification/#caret-requirements). - -### Dependency Model - -Charm authors must implement their own class inheriting from `DependencyModel`. - -Using a `Pydantic` model to instantiate the aforementioned `DEPENDENCIES` dict gives stronger type safety and additional -layers of validation. - -Implementation just needs to ensure that the top-level key names from `DEPENDENCIES` are defined as attributed in the model. - -`src/upgrade.py` -```python -from pydantic import BaseModel - -class KafkaDependenciesModel(BaseModel): - kafka_charm: DependencyModel - kafka_service: DependencyModel -``` - -### Overrides for `DataUpgrade` - -Charm authors must define their own class, inheriting from `DataUpgrade`, overriding all required `abstractmethod`s. - -```python -class ZooKeeperUpgrade(DataUpgrade): - def __init__(self, charm: "ZooKeeperUpgrade", **kwargs): - super().__init__(charm, **kwargs) - self.charm = charm -``` - -#### Implementation of `pre_upgrade_check()` - -Before upgrading a cluster, it's a good idea to check that it is stable and healthy before permitting it. -Here, charm authors can validate upgrade safety through API calls, relation-data checks, etc. -If any of these checks fail, raise `ClusterNotReadyError`. - -```python - @override - def pre_upgrade_check(self) -> None: - default_message = "Pre-upgrade check failed and cannot safely upgrade" - try: - if not self.client.members_broadcasting or not len(self.client.server_members) == len( - self.charm.cluster.peer_units - ): - raise ClusterNotReadyError( - message=default_message, - cause="Not all application units are connected and broadcasting in the quorum", - ) - - if self.client.members_syncing: - raise ClusterNotReadyError( - message=default_message, cause="Some quorum members are syncing data" - ) - - if not self.charm.cluster.stable: - raise ClusterNotReadyError( - message=default_message, cause="Charm has not finished initialising" - ) - - except QuorumLeaderNotFoundError: - raise ClusterNotReadyError(message=default_message, cause="Quorum leader not found") - except ConnectionClosedError: - raise ClusterNotReadyError( - message=default_message, cause="Unable to connect to the cluster" - ) -``` - -#### Implementation of `build_upgrade_stack()` - VM ONLY - -Oftentimes, it is necessary to ensure that the workload leader is the last unit to upgrade, -to ensure high-availability during the upgrade process. -Here, charm authors can create a LIFO stack of unit.ids, represented as a list of unit.id strings, -with the leader unit being at i[0]. 
- -```python -@override -def build_upgrade_stack(self) -> list[int]: - upgrade_stack = [] - for unit in self.charm.cluster.peer_units: - config = self.charm.cluster.unit_config(unit=unit) - - # upgrade quorum leader last - if config["host"] == self.client.leader: - upgrade_stack.insert(0, int(config["unit_id"])) - else: - upgrade_stack.append(int(config["unit_id"])) - - return upgrade_stack -``` - -#### Implementation of `_on_upgrade_granted()` - -On relation-changed events, each unit will check the current upgrade-stack persisted to relation data. -If that unit is at the top of the stack, it will emit an `upgrade-granted` event, which must be handled. -Here, workloads can be re-installed with new versions, checks can be made, data synced etc. -If the new unit successfully rejoined the cluster, call `set_unit_completed()`. -If the new unit failed to rejoin the cluster, call `set_unit_failed()`. - -NOTE - It is essential here to manually call `on_upgrade_changed` if the unit is the current leader. -This ensures that the leader gets it's own relation-changed event, and updates the upgrade-stack for -other units to follow suit. - -```python -@override -def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: - self.charm.snap.stop_snap_service() - - if not self.charm.snap.install(): - logger.error("Unable to install ZooKeeper Snap") - self.set_unit_failed() - return None - - logger.info(f"{self.charm.unit.name} upgrading service...") - self.charm.snap.restart_snap_service() - - try: - logger.debug("Running post-upgrade check...") - self.pre_upgrade_check() - - logger.debug("Marking unit completed...") - self.set_unit_completed() - - # ensures leader gets it's own relation-changed when it upgrades - if self.charm.unit.is_leader(): - logger.debug("Re-emitting upgrade-changed on leader...") - self.on_upgrade_changed(event) - - except ClusterNotReadyError as e: - logger.error(e.cause) - self.set_unit_failed() -``` - -#### Implementation of `log_rollback_instructions()` - -If the upgrade fails, manual intervention may be required for cluster recovery. -Here, charm authors can log out any necessary steps to take to recover from a failed upgrade. -When a unit fails, this library will automatically log out this message. - -```python -@override -def log_rollback_instructions(self) -> None: - logger.error("Upgrade failed. Please run `juju refresh` to previous version.") -``` - -### Instantiating in the charm and deferring events - -Charm authors must add a class attribute for the child class of `DataUpgrade` in the main charm. -They must also ensure that any non-upgrade related events that may be unsafe to handle during -an upgrade, are deferred if the unit is not in the `idle` state - i.e not currently upgrading. 
- -```python -class ZooKeeperCharm(CharmBase): - def __init__(self, *args): - super().__init__(*args) - self.upgrade = ZooKeeperUpgrade( - self, - relation_name = "upgrade", - substrate = "vm", - dependency_model=ZooKeeperDependencyModel( - **DEPENDENCIES - ), - ) - - def restart(self, event) -> None: - if not self.upgrade.state == "idle": - event.defer() - return None - - self.restart_snap_service() -``` -""" - -import json -import logging -from abc import ABC, abstractmethod -from typing import Dict, List, Literal, Optional, Set, Tuple - -import poetry.core.constraints.version as poetry_version -from ops.charm import ( - ActionEvent, - CharmBase, - CharmEvents, - RelationCreatedEvent, - UpgradeCharmEvent, -) -from ops.framework import EventBase, EventSource, Object -from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, Relation, Unit, WaitingStatus -from pydantic import BaseModel, root_validator, validator - -# The unique Charmhub library identifier, never change it -LIBID = "156258aefb79435a93d933409a8c8684" - -# Increment this major API version when introducing breaking changes -LIBAPI = 0 - -# Increment this PATCH version before using `charmcraft publish-lib` or reset -# to 0 if you are raising the major API version -LIBPATCH = 16 - -PYDEPS = ["pydantic>=1.10,<2", "poetry-core"] - -logger = logging.getLogger(__name__) - -# --- DEPENDENCY RESOLUTION FUNCTIONS --- - - -def verify_requirements(version: str, requirement: str) -> bool: - """Verifies a specified version against defined constraint. - - Supports Poetry version constraints - https://python-poetry.org/docs/dependency-specification/#version-constraints - - Args: - version: the version currently in use - requirement: Poetry version constraint - - Returns: - True if `version` meets defined `requirement`. Otherwise False - """ - return poetry_version.parse_constraint(requirement).allows( - poetry_version.Version.parse(version) - ) - - -# --- DEPENDENCY MODEL TYPES --- - - -class DependencyModel(BaseModel): - """Manager for a single dependency. - - To be used as part of another model representing a collection of arbitrary dependencies. - - Example:: - - class KafkaDependenciesModel(BaseModel): - kafka_charm: DependencyModel - kafka_service: DependencyModel - - deps = { - "kafka_charm": { - "dependencies": {"zookeeper": ">5"}, - "name": "kafka", - "upgrade_supported": ">5", - "version": "10", - }, - "kafka_service": { - "dependencies": {"zookeeper": "^3.6"}, - "name": "kafka", - "upgrade_supported": "~3.3", - "version": "3.3.2", - }, - } - - model = KafkaDependenciesModel(**deps) # loading dict in to model - - print(model.dict()) # exporting back validated deps - """ - - dependencies: Dict[str, str] - name: str - upgrade_supported: str - version: str - - @validator("dependencies", "upgrade_supported", each_item=True) - @classmethod - def dependencies_validator(cls, value): - """Validates version constraint.""" - if isinstance(value, dict): - deps = value.values() - else: - deps = [value] - - for dep in deps: - poetry_version.parse_constraint(dep) - - return value - - @root_validator(skip_on_failure=True) - @classmethod - def version_upgrade_supported_validator(cls, values): - """Validates specified `version` meets `upgrade_supported` requirement.""" - if not verify_requirements( - version=values.get("version"), requirement=values.get("upgrade_supported") - ): - raise ValueError( - f"upgrade_supported value {values.get('upgrade_supported')} greater than version value {values.get('version')} for {values.get('name')}." 
- ) - - return values - - def can_upgrade(self, dependency: "DependencyModel") -> bool: - """Compares two instances of :class:`DependencyModel` for upgradability. - - Args: - dependency: a dependency model to compare this model against - - Returns: - True if current model can upgrade from dependent model. Otherwise False - """ - return verify_requirements(version=self.version, requirement=dependency.upgrade_supported) - - -# --- CUSTOM EXCEPTIONS --- - - -class UpgradeError(Exception): - """Base class for upgrade related exceptions in the module.""" - - def __init__(self, message: str, cause: Optional[str], resolution: Optional[str]): - super().__init__(message) - self.message = message - self.cause = cause or "" - self.resolution = resolution or "" - - def __repr__(self): - """Representation of the UpgradeError class.""" - return f"{type(self).__module__}.{type(self).__name__} - {str(vars(self))}" - - def __str__(self): - """String representation of the UpgradeError class.""" - return repr(self) - - -class ClusterNotReadyError(UpgradeError): - """Exception flagging that the cluster is not ready to start upgrading. - - For example, if the cluster fails :class:`DataUpgrade._on_pre_upgrade_check_action` - - Args: - message: string message to be logged out - cause: short human-readable description of the cause of the error - resolution: short human-readable instructions for manual error resolution (optional) - """ - - def __init__(self, message: str, cause: str, resolution: Optional[str] = None): - super().__init__(message, cause=cause, resolution=resolution) - - -class KubernetesClientError(UpgradeError): - """Exception flagging that a call to Kubernetes API failed. - - For example, if the cluster fails :class:`DataUpgrade._set_rolling_update_partition` - - Args: - message: string message to be logged out - cause: short human-readable description of the cause of the error - resolution: short human-readable instructions for manual error resolution (optional) - """ - - def __init__(self, message: str, cause: str, resolution: Optional[str] = None): - super().__init__(message, cause=cause, resolution=resolution) - - -class VersionError(UpgradeError): - """Exception flagging that the old `version` fails to meet the new `upgrade_supported`s. - - For example, upgrades from version `2.x` --> `4.x`, - but `4.x` only supports upgrading from `3.x` onwards - - Args: - message: string message to be logged out - cause: short human-readable description of the cause of the error - resolution: short human-readable instructions for manual solutions to the error (optional) - """ - - def __init__(self, message: str, cause: str, resolution: Optional[str] = None): - super().__init__(message, cause=cause, resolution=resolution) - - -class DependencyError(UpgradeError): - """Exception flagging that some new `dependency` is not being met. 
- - For example, new version requires related App version `2.x`, but currently is `1.x` - - Args: - message: string message to be logged out - cause: short human-readable description of the cause of the error - resolution: short human-readable instructions for manual solutions to the error (optional) - """ - - def __init__(self, message: str, cause: str, resolution: Optional[str] = None): - super().__init__(message, cause=cause, resolution=resolution) - - -# --- CUSTOM EVENTS --- - - -class UpgradeGrantedEvent(EventBase): - """Used to tell units that they can process an upgrade.""" - - -class UpgradeFinishedEvent(EventBase): - """Used to tell units that they finished the upgrade.""" - - -class UpgradeEvents(CharmEvents): - """Upgrade events. - - This class defines the events that the lib can emit. - """ - - upgrade_granted = EventSource(UpgradeGrantedEvent) - upgrade_finished = EventSource(UpgradeFinishedEvent) - - -# --- EVENT HANDLER --- - - -class DataUpgrade(Object, ABC): - """Manages `upgrade` relation operations for in-place upgrades.""" - - STATES = ["recovery", "failed", "idle", "ready", "upgrading", "completed"] - - on = UpgradeEvents() # pyright: ignore [reportAssignmentType] - - def __init__( - self, - charm: CharmBase, - dependency_model: BaseModel, - relation_name: str = "upgrade", - substrate: Literal["vm", "k8s"] = "vm", - ): - super().__init__(charm, relation_name) - self.charm = charm - self.dependency_model = dependency_model - self.relation_name = relation_name - self.substrate = substrate - self._upgrade_stack = None - - # events - self.framework.observe( - self.charm.on[relation_name].relation_created, self._on_upgrade_created - ) - self.framework.observe( - self.charm.on[relation_name].relation_changed, self.on_upgrade_changed - ) - self.framework.observe(self.charm.on.upgrade_charm, self._on_upgrade_charm) - self.framework.observe(getattr(self.on, "upgrade_granted"), self._on_upgrade_granted) - self.framework.observe(getattr(self.on, "upgrade_finished"), self._on_upgrade_finished) - - # actions - self.framework.observe( - getattr(self.charm.on, "pre_upgrade_check_action"), self._on_pre_upgrade_check_action - ) - if self.substrate == "k8s": - self.framework.observe( - getattr(self.charm.on, "resume_upgrade_action"), self._on_resume_upgrade_action - ) - - @property - def peer_relation(self) -> Optional[Relation]: - """The upgrade peer relation.""" - return self.charm.model.get_relation(self.relation_name) - - @property - def app_units(self) -> Set[Unit]: - """The peer-related units in the application.""" - if not self.peer_relation: - return set() - - return set([self.charm.unit] + list(self.peer_relation.units)) - - @property - def state(self) -> Optional[str]: - """The unit state from the upgrade peer relation.""" - if not self.peer_relation: - return None - - return self.peer_relation.data[self.charm.unit].get("state", None) - - @property - def stored_dependencies(self) -> Optional[BaseModel]: - """The application dependencies from the upgrade peer relation.""" - if not self.peer_relation: - return None - - if not (deps := self.peer_relation.data[self.charm.app].get("dependencies", "")): - return None - - return type(self.dependency_model)(**json.loads(deps)) - - @property - def upgrade_stack(self) -> Optional[List[int]]: - """Gets the upgrade stack from the upgrade peer relation. - - Unit.ids are ordered Last-In-First-Out (LIFO). - i.e unit.id at index `-1` is the first unit to upgrade. - unit.id at index `0` is the last unit to upgrade. 
- - Returns: - List of integer unit.ids, ordered in upgrade order in a stack - """ - if not self.peer_relation: - return None - - # lazy-load - if self._upgrade_stack is None: - self._upgrade_stack = ( - json.loads(self.peer_relation.data[self.charm.app].get("upgrade-stack", "[]")) - or None - ) - - return self._upgrade_stack - - @upgrade_stack.setter - def upgrade_stack(self, stack: List[int]) -> None: - """Sets the upgrade stack to the upgrade peer relation. - - Unit.ids are ordered Last-In-First-Out (LIFO). - i.e unit.id at index `-1` is the first unit to upgrade. - unit.id at index `0` is the last unit to upgrade. - """ - if not self.peer_relation: - return - - self.peer_relation.data[self.charm.app].update({"upgrade-stack": json.dumps(stack)}) - self._upgrade_stack = stack - - @property - def other_unit_states(self) -> list: - """Current upgrade state for other units. - - Returns: - Unsorted list of upgrade states for other units. - """ - if not self.peer_relation: - return [] - - return [ - self.peer_relation.data[unit].get("state", "") - for unit in list(self.peer_relation.units) - ] - - @property - def unit_states(self) -> list: - """Current upgrade state for all units. - - Returns: - Unsorted list of upgrade states for all units. - """ - if not self.peer_relation: - return [] - - return [self.peer_relation.data[unit].get("state", "") for unit in self.app_units] - - @property - def cluster_state(self) -> Optional[str]: - """Current upgrade state for cluster units. - - Determined from :class:`DataUpgrade.STATE`, taking the lowest ordinal unit state. - - For example, if units in have states: `["ready", "upgrading", "completed"]`, - the overall state for the cluster is `ready`. - - Returns: - String of upgrade state from the furthest behind unit. - """ - if not self.unit_states: - return None - - try: - return sorted(self.unit_states, key=self.STATES.index)[0] - except (ValueError, KeyError): - return None - - @property - def idle(self) -> Optional[bool]: - """Flag for whether the cluster is in an idle upgrade state. - - Returns: - True if all application units in idle state. Otherwise False - """ - return set(self.unit_states) == {"idle"} - - @abstractmethod - def pre_upgrade_check(self) -> None: - """Runs necessary checks validating the cluster is in a healthy state to upgrade. - - Called by all units during :meth:`_on_pre_upgrade_check_action`. - - Raises: - :class:`ClusterNotReadyError`: if cluster is not ready to upgrade - """ - pass - - def build_upgrade_stack(self) -> List[int]: - """Builds ordered iterable of all application unit.ids to upgrade in. - - Called by leader unit during :meth:`_on_pre_upgrade_check_action`. - - Returns: - Iterable of integer unit.ids, LIFO ordered in upgrade order - i.e `[5, 2, 4, 1, 3]`, unit `3` upgrades first, `5` upgrades last - """ - # don't raise if k8s substrate, uses default statefulset order - if self.substrate == "k8s": - return [] - - raise NotImplementedError - - @abstractmethod - def log_rollback_instructions(self) -> None: - """Sets charm state and logs out rollback instructions. - - Called by all units when `state=failed` found during :meth:`_on_upgrade_changed`. 
- """ - pass - - def _repair_upgrade_stack(self) -> None: - """Ensures completed units are re-added to the upgrade-stack after failure.""" - # need to update the stack as it was not refreshed by rollback run of pre-upgrade-check - # avoids difficult health check implementation by charm-authors needing to exclude dead units - - # if the first unit in the stack fails, the stack will be the same length as units - # i.e this block not ran - if ( - self.cluster_state in ["failed", "recovery"] - and self.upgrade_stack - and len(self.upgrade_stack) != len(self.app_units) - and self.charm.unit.is_leader() - ): - new_stack = self.upgrade_stack - for unit in self.app_units: - unit_id = int(unit.name.split("/")[1]) - - # if a unit fails, it rolls back first - if unit_id not in new_stack: - new_stack.insert(-1, unit_id) - logger.debug(f"Inserted {unit_id} in to upgrade-stack - {new_stack}") - - self.upgrade_stack = new_stack - - def set_unit_failed(self, cause: Optional[str] = None) -> None: - """Sets unit `state=failed` to the upgrade peer data. - - Args: - cause: short description of cause of failure - """ - if not self.peer_relation: - return None - - # needed to refresh the stack - # now leader pulls a fresh stack from newly updated relation data - if self.charm.unit.is_leader(): - self._upgrade_stack = None - - self.charm.unit.status = BlockedStatus(cause if cause else "") - self.peer_relation.data[self.charm.unit].update({"state": "failed"}) - self.log_rollback_instructions() - - def set_unit_completed(self) -> None: - """Sets unit `state=completed` to the upgrade peer data.""" - if not self.peer_relation: - return None - - # needed to refresh the stack - # now leader pulls a fresh stack from newly updated relation data - if self.charm.unit.is_leader(): - self._upgrade_stack = None - - self.charm.unit.status = MaintenanceStatus("upgrade completed") - self.peer_relation.data[self.charm.unit].update({"state": "completed"}) - - # Emit upgrade_finished event to run unit's post upgrade operations. - if self.substrate == "k8s": - logger.debug( - f"{self.charm.unit.name} has completed the upgrade, emitting `upgrade_finished` event..." 
- ) - getattr(self.on, "upgrade_finished").emit() - - def _on_upgrade_created(self, event: RelationCreatedEvent) -> None: - """Handler for `upgrade-relation-created` events.""" - if not self.peer_relation: - event.defer() - return - - # setting initial idle state needed to avoid execution on upgrade-changed events - self.peer_relation.data[self.charm.unit].update({"state": "idle"}) - - if self.charm.unit.is_leader(): - logger.debug("Persisting dependencies to upgrade relation data...") - self.peer_relation.data[self.charm.app].update( - {"dependencies": json.dumps(self.dependency_model.dict())} - ) - - def _on_pre_upgrade_check_action(self, event: ActionEvent) -> None: - """Handler for `pre-upgrade-check-action` events.""" - if not self.peer_relation: - event.fail(message="Could not find upgrade relation.") - return - - if not self.charm.unit.is_leader(): - event.fail(message="Action must be ran on the Juju leader.") - return - - if self.cluster_state == "failed": - logger.info("Entering recovery state for rolling-back to previous version...") - self._repair_upgrade_stack() - self.charm.unit.status = BlockedStatus("ready to rollback application") - self.peer_relation.data[self.charm.unit].update({"state": "recovery"}) - return - - # checking if upgrade in progress - if self.cluster_state != "idle": - event.fail("Cannot run pre-upgrade checks, cluster already upgrading.") - return - - try: - logger.info("Running pre-upgrade-check...") - self.pre_upgrade_check() - - if self.substrate == "k8s": - logger.info("Building upgrade-stack for K8s...") - built_upgrade_stack = sorted( - [int(unit.name.split("/")[1]) for unit in self.app_units] - ) - else: - logger.info("Building upgrade-stack for VMs...") - built_upgrade_stack = self.build_upgrade_stack() - - logger.debug(f"Built upgrade stack of {built_upgrade_stack}") - - except ClusterNotReadyError as e: - logger.error(e) - event.fail(message=e.message) - return - except Exception as e: - logger.error(e) - event.fail(message="Unknown error found.") - return - - logger.info("Setting upgrade-stack to relation data...") - self.upgrade_stack = built_upgrade_stack - - def _on_resume_upgrade_action(self, event: ActionEvent) -> None: - """Handle resume upgrade action. - - Continue the upgrade by setting the partition to the next unit. - """ - if not self.peer_relation: - event.fail(message="Could not find upgrade relation.") - return - - if not self.charm.unit.is_leader(): - event.fail(message="Action must be ran on the Juju leader.") - return - - if not self.upgrade_stack: - event.fail(message="Nothing to resume, upgrade stack unset.") - return - - # Check whether this is being run after juju refresh was called - # (the size of the upgrade stack should match the number of total - # unit minus one). - if len(self.upgrade_stack) != len(self.peer_relation.units): - event.fail(message="Upgrade can be resumed only once after juju refresh is called.") - return - - try: - next_partition = self.upgrade_stack[-1] - self._set_rolling_update_partition(partition=next_partition) - event.set_results({"message": f"Upgrade will resume on unit {next_partition}"}) - except KubernetesClientError: - event.fail(message="Cannot set rolling update partition.") - - def _upgrade_supported_check(self) -> None: - """Checks if previous versions can be upgraded to new versions. 
- - Raises: - :class:`VersionError` if upgrading to existing `version` is not supported - """ - keys = self.dependency_model.__fields__.keys() - - compatible = True - incompatibilities: List[Tuple[str, str, str, str]] = [] - for key in keys: - old_dep: DependencyModel = getattr(self.stored_dependencies, key) - new_dep: DependencyModel = getattr(self.dependency_model, key) - - if not old_dep.can_upgrade(dependency=new_dep): - compatible = False - incompatibilities.append( - (key, old_dep.version, new_dep.version, new_dep.upgrade_supported) - ) - - base_message = "Versions incompatible" - base_cause = "Upgrades only supported for specific versions" - if not compatible: - for incompat in incompatibilities: - base_message += ( - f", {incompat[0]} {incompat[1]} can not be upgraded to {incompat[2]}" - ) - base_cause += f", {incompat[0]} versions satisfying requirement {incompat[3]}" - - raise VersionError( - message=base_message, - cause=base_cause, - ) - - def _on_upgrade_charm(self, event: UpgradeCharmEvent) -> None: - """Handler for `upgrade-charm` events.""" - # defer if not all units have pre-upgraded - if not self.peer_relation: - event.defer() - return - - if not self.upgrade_stack: - logger.error("Cluster upgrade failed, ensure pre-upgrade checks are ran first.") - return - - if self.substrate == "vm": - # for VM run version checks on leader only - if self.charm.unit.is_leader(): - try: - self._upgrade_supported_check() - except VersionError as e: # not ready if not passed check - logger.error(e) - self.set_unit_failed() - return - self.charm.unit.status = WaitingStatus("other units upgrading first...") - self.peer_relation.data[self.charm.unit].update({"state": "ready"}) - - if self.charm.app.planned_units() == 1: - # single unit upgrade, emit upgrade_granted event right away - getattr(self.on, "upgrade_granted").emit() - - else: - # for k8s run version checks only on highest ordinal unit - if ( - self.charm.unit.name - == f"{self.charm.app.name}/{self.charm.app.planned_units() -1}" - ): - try: - self._upgrade_supported_check() - except VersionError as e: # not ready if not passed check - logger.error(e) - self.set_unit_failed() - return - # On K8s an unit that receives the upgrade-charm event is upgrading - self.charm.unit.status = MaintenanceStatus("upgrading unit") - self.peer_relation.data[self.charm.unit].update({"state": "upgrading"}) - - def on_upgrade_changed(self, event: EventBase) -> None: - """Handler for `upgrade-relation-changed` events.""" - if not self.peer_relation: - return - - # if any other unit failed, don't continue with upgrade - if self.cluster_state == "failed": - logger.debug("Cluster failed to upgrade, exiting...") - return - - if self.substrate == "vm" and self.cluster_state == "recovery": - # skip run while in recovery. 
The event will be retrigged when the cluster is ready - logger.debug("Cluster in recovery, skip...") - return - - # if all units completed, mark as complete - if not self.upgrade_stack: - if self.state == "completed" and self.cluster_state in ["idle", "completed"]: - logger.info("All units completed upgrade, setting idle upgrade state...") - self.charm.unit.status = ActiveStatus() - self.peer_relation.data[self.charm.unit].update({"state": "idle"}) - - if self.charm.unit.is_leader(): - logger.debug("Persisting new dependencies to upgrade relation data...") - self.peer_relation.data[self.charm.app].update( - {"dependencies": json.dumps(self.dependency_model.dict())} - ) - return - - if self.cluster_state == "idle": - logger.debug("upgrade-changed event handled before pre-checks, exiting...") - return - - logger.debug("Did not find upgrade-stack or completed cluster state, skipping...") - return - - # upgrade ongoing, set status for waiting units - if "upgrading" in self.unit_states and self.state in ["idle", "ready"]: - self.charm.unit.status = WaitingStatus("other units upgrading first...") - - # pop mutates the `upgrade_stack` attr - top_unit_id = self.upgrade_stack.pop() - top_unit = self.charm.model.get_unit(f"{self.charm.app.name}/{top_unit_id}") - top_state = self.peer_relation.data[top_unit].get("state") - - # if top of stack is completed, leader pops it - if self.charm.unit.is_leader() and top_state == "completed": - logger.debug(f"{top_unit} has finished upgrading, updating stack...") - - # writes the mutated attr back to rel data - self.peer_relation.data[self.charm.app].update( - {"upgrade-stack": json.dumps(self.upgrade_stack)} - ) - - # recurse on leader to ensure relation changed event not lost - # in case leader is next or the last unit to complete - self.on_upgrade_changed(event) - - # if unit top of stack and all units ready (i.e stack), emit granted event - if ( - self.charm.unit == top_unit - and top_state in ["ready", "upgrading"] - and self.cluster_state == "ready" - and "upgrading" not in self.other_unit_states - ): - logger.debug( - f"{top_unit.name} is next to upgrade, emitting `upgrade_granted` event and upgrading..." - ) - self.charm.unit.status = MaintenanceStatus("upgrading...") - self.peer_relation.data[self.charm.unit].update({"state": "upgrading"}) - - try: - getattr(self.on, "upgrade_granted").emit() - except DependencyError as e: - logger.error(e) - self.set_unit_failed() - return - - def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: - """Handler for `upgrade-granted` events. - - Handlers of this event must meet the following: - - SHOULD check for related application deps from :class:`DataUpgrade.dependencies` - - MAY raise :class:`DependencyError` if dependency not met - - MUST update unit `state` after validating the success of the upgrade, calling one of: - - :class:`DataUpgrade.set_unit_failed` if the unit upgrade fails - - :class:`DataUpgrade.set_unit_completed` if the unit upgrade succeeds - - MUST call :class:`DataUpgarde.on_upgrade_changed` on exit so event not lost on leader - """ - # don't raise if k8s substrate, only return - if self.substrate == "k8s": - return - - raise NotImplementedError - - def _on_upgrade_finished(self, _) -> None: - """Handler for `upgrade-finished` events.""" - if self.substrate == "vm" or not self.peer_relation: - return - - # Emit the upgrade relation changed event in the leader to update the upgrade_stack. 
- if self.charm.unit.is_leader(): - self.charm.on[self.relation_name].relation_changed.emit( - self.model.get_relation(self.relation_name) - ) - - # This hook shouldn't run for the last unit (the first that is upgraded). For that unit it - # should be done through an action after the upgrade success on that unit is double-checked. - unit_number = int(self.charm.unit.name.split("/")[1]) - if unit_number == len(self.peer_relation.units): - logger.info( - f"{self.charm.unit.name} unit upgraded. Evaluate and run `resume-upgrade` action to continue upgrade" - ) - return - - # Also, the hook shouldn't run for the first unit (the last that is upgraded). - if unit_number == 0: - logger.info(f"{self.charm.unit.name} unit upgraded. Upgrade is complete") - return - - try: - # Use the unit number instead of the upgrade stack to avoid race conditions - # (i.e. the leader updates the upgrade stack after this hook runs). - next_partition = unit_number - 1 - logger.debug(f"Set rolling update partition to unit {next_partition}") - self._set_rolling_update_partition(partition=next_partition) - except KubernetesClientError: - logger.exception("Cannot set rolling update partition") - self.set_unit_failed() - self.log_rollback_instructions() - - def _set_rolling_update_partition(self, partition: int) -> None: - """Patch the StatefulSet's `spec.updateStrategy.rollingUpdate.partition`. - - Args: - partition: partition to set. - - K8s only. It should decrement the rolling update strategy partition by using a code - like the following: - - from lightkube.core.client import Client - from lightkube.core.exceptions import ApiError - from lightkube.resources.apps_v1 import StatefulSet - - try: - patch = {"spec": {"updateStrategy": {"rollingUpdate": {"partition": partition}}}} - Client().patch(StatefulSet, name=self.charm.model.app.name, namespace=self.charm.model.name, obj=patch) - logger.debug(f"Kubernetes StatefulSet partition set to {partition}") - except ApiError as e: - if e.status.code == 403: - cause = "`juju trust` needed" - else: - cause = str(e) - raise KubernetesClientError("Kubernetes StatefulSet patch failed", cause) - """ - if self.substrate == "vm": - return - - raise NotImplementedError diff --git a/lib/charms/mongodb/v0/config_server_interface.py b/lib/charms/mongodb/v0/config_server_interface.py index dadf4199f..bab57c086 100644 --- a/lib/charms/mongodb/v0/config_server_interface.py +++ b/lib/charms/mongodb/v0/config_server_interface.py @@ -82,11 +82,6 @@ def pass_hook_checks(self, event: EventBase) -> bool: ) return False - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return False - if not self.charm.unit.is_leader(): return False diff --git a/lib/charms/mongodb/v0/mongodb_tls.py b/lib/charms/mongodb/v0/mongodb_tls.py index c061de7f3..2193e2aa0 100644 --- a/lib/charms/mongodb/v0/mongodb_tls.py +++ b/lib/charms/mongodb/v0/mongodb_tls.py @@ -73,10 +73,6 @@ def is_tls_enabled(self, internal: bool): def _on_set_tls_private_key(self, event: ActionEvent) -> None: """Set the TLS private key, which will be used for requesting the certificate.""" - if not self.charm.upgrade.idle: - event.fail("Cannot set TLS key - upgrade is in progress.") - return - logger.debug("Request to set TLS private key received.") if self.charm.is_role(Config.Role.MONGOS) and not self.charm.has_config_server(): logger.error( @@ -145,21 +141,11 @@ def _on_tls_relation_joined(self, event: RelationJoinedEvent) -> None: event.defer() return - if not 
self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - self.request_certificate(None, internal=True) self.request_certificate(None, internal=False) def _on_tls_relation_broken(self, event: RelationBrokenEvent) -> None: """Disable TLS when TLS relation broken.""" - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - logger.debug("Disabling external and internal TLS for unit: %s", self.charm.unit.name) for internal in [True, False]: @@ -179,11 +165,6 @@ def _on_tls_relation_broken(self, event: RelationBrokenEvent) -> None: def _on_certificate_available(self, event: CertificateAvailableEvent) -> None: """Enable TLS when TLS certificate available.""" - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - if self.charm.is_role(Config.Role.MONGOS) and not self.charm.config_server_db: logger.debug( "mongos requires config-server in order to start, do not restart with TLS until integrated to config-server" @@ -251,11 +232,6 @@ def waiting_for_certs(self): def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None: """Request the new certificate when old certificate is expiring.""" - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - if self.charm.is_role(Config.Role.MONGOS) and not self.charm.has_config_server(): logger.info( "mongos is not running (not integrated to config-server) deferring renewal of certificates." diff --git a/lib/charms/mongodb/v1/mongodb_backups.py b/lib/charms/mongodb/v1/mongodb_backups.py index 9760553e9..3d73a8cc4 100644 --- a/lib/charms/mongodb/v1/mongodb_backups.py +++ b/lib/charms/mongodb/v1/mongodb_backups.py @@ -125,11 +125,6 @@ def __init__(self, charm): def on_s3_relation_joined(self, event: RelationJoinedEvent) -> None: """Checks for valid integration for s3-integrations.""" - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return False - if not self.is_valid_s3_integration(): logger.debug( "Shard does not support s3 relations, please relate s3-integrator to config-server only." @@ -329,11 +324,6 @@ def _pass_sanity_checks(self, event, action) -> bool: No matter what backup-action is being run, these requirements must be met. """ - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return False - if not self.is_valid_s3_integration(): self._fail_action_with_error_log( event, diff --git a/lib/charms/mongodb/v1/mongodb_provider.py b/lib/charms/mongodb/v1/mongodb_provider.py index 1b59ea7ab..59c12f1c4 100644 --- a/lib/charms/mongodb/v1/mongodb_provider.py +++ b/lib/charms/mongodb/v1/mongodb_provider.py @@ -89,11 +89,6 @@ def pass_hook_checks(self, event: EventBase) -> bool: """Runs the pre-hooks checks for MongoDBProvider, returns True if all pass.""" # We shouldn't try to create or update users if the database is not # initialised. We will create users as part of initialisation. 
- if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return False - if not self.charm.db_initialised: return False diff --git a/lib/charms/mongodb/v1/shards_interface.py b/lib/charms/mongodb/v1/shards_interface.py index fda0315bd..0d881e3e8 100644 --- a/lib/charms/mongodb/v1/shards_interface.py +++ b/lib/charms/mongodb/v1/shards_interface.py @@ -146,11 +146,6 @@ def _on_relation_joined(self, event): def pass_hook_checks(self, event: EventBase) -> bool: """Runs the pre-hooks checks for ShardingProvider, returns True if all pass.""" - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return False - if not self.charm.db_initialised: logger.info("Deferring %s. db is not initialised.", str(type(event))) event.defer() @@ -547,11 +542,6 @@ def _handle_changed_secrets(self, event) -> None: Changes in secrets do not re-trigger a relation changed event, so it is necessary to listen to secret changes events. """ - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return False - if ( not self.charm.unit.is_leader() or not event.secret.label @@ -655,11 +645,6 @@ def sync_cluster_passwords( def _on_relation_joined(self, event: RelationJoinedEvent): """Sets status and flags in relation data relevant to sharding.""" - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - # if re-using an old shard, re-set flags. self.charm.unit_peer_data["drained"] = json.dumps(False) self.charm.unit.status = MaintenanceStatus("Adding shard to config-server") @@ -708,11 +693,6 @@ def _on_relation_changed(self, event): def pass_hook_checks(self, event): """Runs the pre-hooks checks for ConfigServerRequirer, returns True if all pass.""" - if not self.charm.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return False - if not self.charm.db_initialised: logger.info("Deferring %s. db is not initialised.", str(type(event))) event.defer() diff --git a/src/charm.py b/src/charm.py index cba18f686..b35c14c5c 100755 --- a/src/charm.py +++ b/src/charm.py @@ -76,9 +76,9 @@ from tenacity import Retrying, before_log, retry, stop_after_attempt, wait_fixed from config import Config, Package -from events.upgrade import MongoDBDependencyModel, MongoDBUpgrade from exceptions import AdminUserCreationError, ApplicationHostNotFoundError from machine_helpers import MONGO_USER, ROOT_USER_GID, update_mongod_service +from upgrades.mongodb_upgrades import MongoDBUpgrade AUTH_FAILED_CODE = 18 UNAUTHORISED_CODE = 13 @@ -131,12 +131,7 @@ def __init__(self, *args): self.legacy_client_relations = MongoDBLegacyProvider(self) self.tls = MongoDBTLS(self, Config.Relations.PEERS, substrate=Config.SUBSTRATE) self.backups = MongoDBBackups(self) - self.upgrade = MongoDBUpgrade( - self, - dependency_model=MongoDBDependencyModel( - **Config.DEPENDENCIES # pyright: ignore[reportGeneralTypeIssues, reportArgumentType] - ), - ) # TODO future PR add dependency_model + self.upgrade = MongoDBUpgrade(self) self.config_server = ShardingProvider(self) self.cluster = ClusterProvider(self) self.shard = ConfigServerRequirer(self) @@ -368,7 +363,7 @@ def _on_config_changed(self, event: ConfigChangedEvent) -> None: unresponsive therefore causing a cluster failure, error the component. This prevents it from executing other hooks with a new role. 
""" - if self.upgrade.idle and self.is_role_changed(): + if self.is_role_changed(): # TODO in the future (24.04) support migration of components logger.error( f"cluster migration currently not supported, cannot change from { self.model.config['role']} to {self.role}" @@ -442,11 +437,6 @@ def _on_relation_joined(self, event: RelationJoinedEvent) -> None: if not self.unit.is_leader(): return - if not self.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - self._on_relation_handler(event) self._update_related_hosts(event) @@ -457,11 +447,6 @@ def _on_relation_handler(self, event: RelationEvent) -> None: Args: event: The triggering relation joined/changed event. """ - if not self.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - # changing the monitor password will lead to non-leader units receiving a relation changed # event. We must update the monitor and pbm URI if the password changes so that COS/pbm # can continue to work @@ -505,11 +490,6 @@ def _on_relation_handler(self, event: RelationEvent) -> None: def _on_leader_elected(self, event: LeaderElectedEvent) -> None: """Generates necessary keyfile and updates replica hosts.""" - if not self.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - if not self.get_secret(APP_SCOPE, Config.Secrets.SECRET_KEYFILE_NAME): self._generate_secrets() @@ -525,11 +505,6 @@ def _on_relation_departed(self, event: RelationDepartedEvent) -> None: if not self.unit.is_leader() or event.departing_unit == self.unit: return - if not self.upgrade.idle: - logger.info("cannot process %s, upgrade is in progress", event) - event.defer() - return - self._update_hosts(event) def _on_storage_detaching(self, event: StorageDetachingEvent) -> None: @@ -579,10 +554,6 @@ def _on_storage_detaching(self, event: StorageDetachingEvent) -> None: logger.error("Failed to remove %s from replica set, error=%r", self.unit.name, e) def _on_update_status(self, event: UpdateStatusEvent): - if not self.upgrade.idle: - logger.info("Processing upgrade, wait to check status") - return - # user-made mistakes might result in other incorrect statues. Prioritise informing users of # their mistake. 
invalid_integration_status = self.get_invalid_integration_status() @@ -632,10 +603,6 @@ def _on_get_password(self, event: ActionEvent) -> None: def _on_set_password(self, event: ActionEvent) -> None: """Set the password for the admin user.""" - if not self.upgrade.idle: - event.fail("Cannot set password, upgrade is in progress.") - return - # check conditions for setting the password and fail if necessary if not self.pass_pre_set_password_checks(event): return diff --git a/src/config.py b/src/config.py index dd9a398f7..73ede9d44 100644 --- a/src/config.py +++ b/src/config.py @@ -20,15 +20,6 @@ class Config: MONGOD_CONF_DIR = f"{MONGODB_SNAP_DATA_DIR}/etc/mongod" MONGOD_CONF_FILE_PATH = f"{MONGOD_CONF_DIR}/mongod.conf" SNAP_PACKAGES = [("charmed-mongodb", "6/edge", 118)] - DEPENDENCIES = { - "mongod_service": { - "dependencies": {}, - "name": "mongod", - "upgrade_supported": "^6.0.0,<7", - "version": "6.0.6", - }, - # TODO: Future PR - implement mongos deps when supporting sharding upgrades - } # Keep these alphabetically sorted class Actions: diff --git a/src/events/upgrade.py b/src/events/upgrade.py deleted file mode 100644 index 237653a15..000000000 --- a/src/events/upgrade.py +++ /dev/null @@ -1,282 +0,0 @@ -# Copyright 2024 Canonical Ltd. -# See LICENSE file for licensing details. - -"""Manager for handling MongoDB in-place upgrades.""" - -import logging -import secrets -import string -from typing import Tuple - -from charms.data_platform_libs.v0.upgrade import ( - ClusterNotReadyError, - DataUpgrade, - DependencyModel, - UpgradeGrantedEvent, -) -from charms.mongodb.v0.mongodb import MongoDBConfiguration, MongoDBConnection -from charms.operator_libs_linux.v1 import snap -from ops.charm import CharmBase -from ops.model import ActiveStatus -from pydantic import BaseModel -from tenacity import Retrying, retry, stop_after_attempt, wait_fixed -from typing_extensions import override - -from config import Config - -logger = logging.getLogger(__name__) - -WRITE_KEY = "write_value" -MONGOD_SERVICE = "mongod" - - -ROLLBACK_INSTRUCTIONS = """Unit failed to upgrade and requires manual rollback to previous stable version. - 1. Re-run `pre-upgrade-check` action on the leader unit to enter 'recovery' state - 2. Run `juju refresh` to the previously deployed charm revision -""" - - -class FailedToElectNewPrimaryError(Exception): - """Raised when a new primary isn't elected after stepping down.""" - - -class MongoDBDependencyModel(BaseModel): - """Model for MongoDB Operator dependencies.""" - - mongod_service: DependencyModel - # in future have a mongos service here too - - -class MongoDBUpgrade(DataUpgrade): - """Implementation of :class:`DataUpgrade` overrides for in-place upgrades.""" - - def __init__(self, charm: CharmBase, **kwargs): - super().__init__(charm, **kwargs) - self.charm = charm - - @property - def idle(self) -> bool: - """Checks if cluster has completed upgrade. - - Returns: - True if cluster has completed upgrade. 
Otherwise False - """ - return not bool(self.upgrade_stack) - - @override - def pre_upgrade_check(self) -> None: - """Verifies that an upgrade can be done on the MongoDB deployment.""" - default_message = "Pre-upgrade check failed and cannot safely upgrade" - - if self.charm.is_role(Config.Role.SHARD): - raise ClusterNotReadyError( - message=default_message, - cause="Cannot run pre-upgrade check on shards", - resolution="Run this action on config-server.", - ) - - if not self.is_cluster_healthy(): - raise ClusterNotReadyError( - message=default_message, - cause="Cluster is not healthy", - resolution="Please check juju status for information", - ) - - if not self.is_cluster_able_to_read_write(): - raise ClusterNotReadyError( - message=default_message, cause="Cluster cannot read/write - please check logs" - ) - - # Future PR - sharding based checks - - @retry( - stop=stop_after_attempt(20), - wait=wait_fixed(1), - reraise=True, - ) - def post_upgrade_check(self) -> None: - """Runs necessary checks validating the unit is in a healthy state after upgrade.""" - if not self.is_cluster_able_to_read_write(): - raise ClusterNotReadyError( - message="post-upgrade check failed and cannot safely upgrade", - cause="Cluster cannot read/write", - ) - - @override - def build_upgrade_stack(self) -> list[int]: - """Builds an upgrade stack, specifying the order of nodes to upgrade.""" - if self.charm.is_role(Config.Role.CONFIG_SERVER): - # TODO implement in a future PR a stack for shards and config server - pass - elif self.charm.is_role(Config.Role.REPLICATION): - return self.get_replica_set_upgrade_stack() - - def get_replica_set_upgrade_stack(self) -> list[int]: - """Builds an upgrade stack, specifying the order of nodes to upgrade. - - MongoDB Specific: The primary should be upgraded last, so the unit with the primary is - put at the very bottom of the stack. 
- """ - upgrade_stack = [] - units = set([self.charm.unit] + list(self.charm.peers.units)) # type: ignore[reportOptionalMemberAccess] - primary_unit_id = None - for unit in units: - unit_id = int(unit.name.split("/")[-1]) - if unit.name == self.charm.primary: - primary_unit_id = unit_id - continue - - upgrade_stack.append(unit_id) - - upgrade_stack.insert(0, primary_unit_id) - return upgrade_stack - - @override - def log_rollback_instructions(self) -> None: - """Logs the rollback instructions in case of failure to upgrade.""" - logger.critical(ROLLBACK_INSTRUCTIONS) - - @override - def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None: - """Execute a series of upgrade steps.""" - # TODO: Future PR - check compatibility of new mongod version with current mongos versions - self.charm.stop_charm_services() - - try: - self.charm.install_snap_packages(packages=Config.SNAP_PACKAGES) - except snap.SnapError: - logger.error("Unable to install Snap") - self.set_unit_failed() - return - - if self.charm.unit.name == self.charm.primary: - logger.debug("Stepping down current primary, before upgrading service...") - self.step_down_primary_and_wait_reelection() - - logger.info(f"{self.charm.unit.name} upgrading service...") - self.charm.restart_charm_services() - - try: - logger.debug("Running post-upgrade check...") - self.post_upgrade_check() - - logger.debug("Marking unit completed...") - self.set_unit_completed() - - # ensures leader gets it's own relation-changed when it upgrades - if self.charm.unit.is_leader(): - logger.debug("Re-emitting upgrade-changed on leader...") - self.on_upgrade_changed(event) - - except ClusterNotReadyError as e: - logger.error(e.cause) - self.set_unit_failed() - - def step_down_primary_and_wait_reelection(self) -> bool: - """Steps down the current primary and waits for a new one to be elected.""" - old_primary = self.charm.primary - with MongoDBConnection(self.charm.mongodb_config) as mongod: - mongod.step_down_primary() - - for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(1), reraise=True): - with attempt: - new_primary = self.charm.primary - if new_primary != old_primary: - raise FailedToElectNewPrimaryError() - - def is_cluster_healthy(self) -> bool: - """Returns True if all nodes in the cluster/replcia set are healthy.""" - if self.charm.is_role(Config.Role.SHARD): - logger.debug("Cannot run full cluster health check on shards") - return False - - charm_status = self.charm.process_statuses() - return self.are_nodes_healthy() and isinstance(charm_status, ActiveStatus) - - def are_nodes_healthy(self) -> bool: - """Returns True if all nodes in the MongoDB deployment are healthy.""" - if self.charm.is_role(Config.Role.CONFIG_SERVER): - # TODO future PR implement this - pass - - if self.charm.is_role(Config.Role.REPLICATION): - with MongoDBConnection(self.charm.mongodb_config) as mongod: - rs_status = mongod.get_replset_status() - rs_status = mongod.client.admin.command("replSetGetStatus") - return not mongod.is_any_sync(rs_status) - - def is_cluster_able_to_read_write(self) -> bool: - """Returns True if read and write is feasible for cluster.""" - if self.charm.is_role(Config.Role.SHARD): - logger.debug("Cannot run read/write check on shard, must run via config-server.") - return False - elif self.charm.is_role(Config.Role.CONFIG_SERVER): - return self.is_sharded_cluster_able_to_read_write() - else: - return self.is_replica_set_able_read_write() - - def is_replica_set_able_read_write(self) -> bool: - """Returns True if is possible to write 
to primary and read from replicas.""" - collection_name, write_value = self.get_random_write_and_collection() - # add write to primary - self.add_write(self.charm.mongodb_config, collection_name, write_value) - - # verify writes on secondaries - with MongoDBConnection(self.charm.mongodb_config) as mongod: - primary_ip = mongod.primary() - - replica_ips = set(self.charm._unit_ips) - secondary_ips = replica_ips - set(primary_ip) - for secondary_ip in secondary_ips: - if not self.is_excepted_write_on_replica(secondary_ip, collection_name, write_value): - # do not return False immediately - as it is - logger.debug("Secondary with IP %s, does not contain the expected write.") - self.clear_tmp_collection(self.charm.mongodb_config, collection_name) - return False - - self.clear_tmp_collection(self.charm.mongodb_config, collection_name) - return True - - def is_sharded_cluster_able_to_read_write(self) -> bool: - """Returns True if is possible to write each shard and read value from all nodes. - - TODO: Implement in a future PR. - """ - return False - - def clear_tmp_collection( - self, mongodb_config: MongoDBConfiguration, collection_name: str - ) -> None: - """Clears the temporary collection.""" - with MongoDBConnection(mongodb_config) as mongod: - db = mongod.client["admin"] - db.drop_collection(collection_name) - - def is_excepted_write_on_replica( - self, host: str, collection: str, expected_write_value: str - ) -> bool: - """Returns True if the replica contains the expected write in the provided collection.""" - secondary_config = self.charm.mongodb_config - secondary_config.hosts = {host} - with MongoDBConnection(secondary_config, direct=True) as direct_seconary: - db = direct_seconary.client["admin"] - test_collection = db[collection] - query = test_collection.find({}, {WRITE_KEY: 1}) - return query[0][WRITE_KEY] == expected_write_value - - def get_random_write_and_collection(self) -> Tuple[str, str]: - """Returns a tutple for a random collection name and a unique write to add to it.""" - choices = string.ascii_letters + string.digits - collection_name = "collection_" + "".join([secrets.choice(choices) for _ in range(16)]) - write_value = "unique_write_" + "".join([secrets.choice(choices) for _ in range(16)]) - return (collection_name, write_value) - - def add_write( - self, mongodb_config: MongoDBConfiguration, collection_name, write_value - ) -> None: - """Adds a the provided write to the admin database with the provided collection.""" - with MongoDBConnection(mongodb_config) as mongod: - db = mongod.client["admin"] - test_collection = db[collection_name] - write = {WRITE_KEY: write_value} - test_collection.insert_one(write) diff --git a/src/upgrades/machine_upgrade.py b/src/upgrades/machine_upgrade.py new file mode 100644 index 000000000..7078b90b9 --- /dev/null +++ b/src/upgrades/machine_upgrade.py @@ -0,0 +1,150 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""In-place upgrades on machines. 
+ +Derived from specification: DA058 - In-Place Upgrades - Kubernetes v2 +(https://docs.google.com/document/d/1tLjknwHudjcHs42nzPVBNkHs98XxAOT2BXGGpP7NyEU/) +""" +import json +import logging +import time +import typing + +import ops +import upgrade + +from config import Config + +logger = logging.getLogger(__name__) + +_SNAP_REVISION = str(Config.SNAP_PACKAGES[0][2]) + + +class Upgrade(upgrade.Upgrade): + """In-place upgrades on machines.""" + + @property + def unit_state(self) -> typing.Optional[str]: + """Returns the unit state.""" + if ( + self._unit_workload_version is not None + and self._unit_workload_version != self._app_workload_version + ): + logger.debug("Unit upgrade state: outdated") + return "outdated" + return super().unit_state + + @unit_state.setter + def unit_state(self, value: str) -> None: + """Sets the unit state.""" + if value == "healthy": + # Set snap revision on first install + self._unit_databag["snap_revision"] = _SNAP_REVISION + logger.debug(f"Saved {_SNAP_REVISION} in unit databag while setting state healthy") + # Super call + upgrade.Upgrade.unit_state.fset(self, value) + + def _get_unit_healthy_status( + self, *, workload_status: typing.Optional[ops.StatusBase] + ) -> ops.StatusBase: + if self._unit_workload_version == self._app_workload_version: + if isinstance(workload_status, ops.WaitingStatus): + return ops.WaitingStatus( + f'Router {self._current_versions["workload"]} rev {self._unit_workload_version}' + ) + return ops.ActiveStatus( + f'Router {self._current_versions["workload"]} rev {self._unit_workload_version} running' + ) + if isinstance(workload_status, ops.WaitingStatus): + return ops.WaitingStatus( + f'Charmed operator upgraded. Router {self._current_versions["workload"]} rev {self._unit_workload_version}' + ) + return ops.WaitingStatus( + f'Charmed operator upgraded. Router {self._current_versions["workload"]} rev {self._unit_workload_version} running' + ) + + @property + def app_status(self) -> typing.Optional[ops.StatusBase]: + """Returns the status of the upgrade for the application.""" + if not self.is_compatible: + logger.info( + "Upgrade incompatible. If you accept potential *data loss* and *downtime*, you can continue by running `force-upgrade` action on each remaining unit" + ) + return ops.BlockedStatus( + "Upgrade incompatible. Rollback to previous revision with `juju refresh`" + ) + return super().app_status + + @property + def _unit_workload_versions(self) -> typing.Dict[str, str]: + """{Unit name: installed snap revision}.""" + versions = {} + for unit in self._sorted_units: + if version := (self._peer_relation.data[unit].get("snap_revision")): + versions[unit.name] = version + return versions + + @property + def _unit_workload_version(self) -> typing.Optional[str]: + """Installed snap revision for this unit.""" + return self._unit_databag.get("snap_revision") + + @property + def _app_workload_version(self) -> str: + """Snap revision for current charm code.""" + return _SNAP_REVISION + + def reconcile_partition(self, *, action_event: ops.ActionEvent = None) -> None: + """Handle Juju action to confirm first upgraded unit is healthy and resume upgrade.""" + if action_event: + self.upgrade_resumed = True + message = "Upgrade resumed." + action_event.set_results({"result": message}) + logger.debug(f"Resume upgrade event succeeded: {message}") + + @property + def upgrade_resumed(self) -> bool: + """Whether user has resumed upgrade with Juju action. 
+
+        Reset to `False` after each `juju refresh`
+        """
+        return json.loads(self._app_databag.get("upgrade-resumed", "false"))
+
+    @upgrade_resumed.setter
+    def upgrade_resumed(self, value: bool):
+        # Trigger peer relation_changed event even if value does not change
+        # (Needed when leader sets value to False during `ops.UpgradeCharmEvent`)
+        self._app_databag["-unused-timestamp-upgrade-resume-last-updated"] = str(time.time())
+
+        self._app_databag["upgrade-resumed"] = json.dumps(value)
+        logger.debug(f"Set upgrade-resumed to {value=}")
+
+    @property
+    def authorized(self) -> bool:
+        """Returns True if the unit is authorized to upgrade."""
+        assert self._unit_workload_version != self._app_workload_version
+        for index, unit in enumerate(self._sorted_units):
+            if unit.name == self._unit.name:
+                # Higher number units have already upgraded
+                if index == 1:
+                    # User confirmation needed to resume upgrade (i.e. upgrade second unit)
+                    logger.debug(f"Second unit authorized to upgrade if {self.upgrade_resumed=}")
+                    return self.upgrade_resumed
+                return True
+            if (
+                self._unit_workload_versions.get(unit.name) != self._app_workload_version
+                or self._peer_relation.data[unit].get("state") != "healthy"
+            ):
+                # Waiting for higher number units to upgrade
+                return False
+        return False
+
+    def upgrade_unit(self, *, snap) -> None:
+        """Runs the upgrade procedure."""
+        # TODO: Future PR - run mongodb specific checks for upgrade
+        logger.debug(f"Upgrading {self.authorized=}")
+        self.unit_state = "upgrading"
+        snap.install()
+        self._unit_databag["snap_revision"] = _SNAP_REVISION
+        logger.debug(f"Saved {_SNAP_REVISION} in unit databag after upgrade")
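Note: this diff adds `machine_upgrade.Upgrade` but does not show the charm-side wiring. A minimal sketch of how it might be driven from `src/charm.py` is given below for orientation; the `_reconcile_upgrade` handler and the `self._snap` helper are illustrative assumptions, not part of this change.

```python
# Illustrative only -- not part of this diff. Import paths follow the modules'
# own convention (machine_upgrade.py does `import upgrade`), i.e. src/upgrades
# is assumed to be on the charm's import path.
import ops

import machine_upgrade
import upgrade


class MongodbOperatorCharm(ops.CharmBase):
    # (event observation elided; assume this runs on upgrade-charm and on
    # changes to the upgrade peer relation)
    def _reconcile_upgrade(self, _event=None) -> None:
        try:
            self._upgrade = machine_upgrade.Upgrade(self)
        except upgrade.PeerRelationNotReadyError:
            # Upgrade peer relation not available to this unit yet; retry later
            return
        if not self._upgrade.in_progress:
            if self.unit.is_leader():
                # App databag writes are leader-only; record current versions so the
                # next `juju refresh` can check compatibility
                self._upgrade.set_versions_in_app_databag()
            self._upgrade.unit_state = "healthy"
            return
        # `unit_state` reports "outdated" while this unit's installed snap revision
        # differs from the revision shipped with the current charm code
        if self._upgrade.unit_state == "outdated" and self._upgrade.authorized:
            self._upgrade.upgrade_unit(snap=self._snap)  # `self._snap` is hypothetical
        if self.unit.is_leader() and (status := self._upgrade.app_status):
            self.app.status = status
```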
diff --git a/src/upgrades/mongodb_upgrades.py b/src/upgrades/mongodb_upgrades.py
new file mode 100644
index 000000000..cb3a19adb
--- /dev/null
+++ b/src/upgrades/mongodb_upgrades.py
@@ -0,0 +1,189 @@
+# Copyright 2024 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+"""Manager for handling MongoDB in-place upgrades."""
+
+import logging
+import secrets
+import string
+from typing import Tuple
+
+from charms.mongodb.v0.mongodb import MongoDBConfiguration, MongoDBConnection
+from ops.charm import CharmBase
+from ops.framework import Object
+from ops.model import ActiveStatus
+from pymongo.errors import OperationFailure, PyMongoError, ServerSelectionTimeoutError
+
+from config import Config
+
+logger = logging.getLogger(__name__)
+
+
+WRITE_KEY = "write_value"
+UPGRADE_RELATION = "upgrade"
+
+
+class FailedToMovePrimaryError(Exception):
+    """Raised when attempt to move a primary fails."""
+
+
+class MongoDBUpgrade(Object):
+    """Handlers for upgrade events."""
+
+    def __init__(self, charm: CharmBase):
+        self.charm = charm
+        super().__init__(charm, UPGRADE_RELATION)
+        self.framework.observe(self.charm.on.pre_upgrade_check_action, self.on_pre_upgrade_check)
+
+    def on_pre_upgrade_check(self, event) -> None:
+        """Verifies that an upgrade can be done on the MongoDB deployment."""
+        # TODO Future PR - integrate this into the juju refresh procedure so that it runs automatically.
+
+        if self.charm.is_role(Config.Role.SHARD):
+            event.fail(
+                "Cannot run pre-upgrade check on shards, run this action on the related config-server."
+            )
+            return
+
+        if not self.is_cluster_healthy():
+            event.fail(
+                "Cluster is not healthy, do not proceed with upgrade. Please check juju debug-log for more information."
+            )
+            return
+
+        # We do not get to decide the order of units to upgrade, so we move the primary to the
+        # last unit to upgrade. This prevents the primary from jumping around from unit to unit
+        # during the upgrade procedure.
+        try:
+            self.move_primary_to_last_upgrade_unit()
+        except FailedToMovePrimaryError:
+            event.fail(
+                "Cluster failed to move primary before re-election, do not proceed with upgrade."
+            )
+            return
+
+        if not self.is_cluster_able_to_read_write():
+            event.fail(
+                "Cluster is not healthy: cannot read/write to replicas, do not proceed with upgrade. Please check juju debug-log for more information."
+            )
+            return
+
+        event.set_results({"message": "Pre-upgrade check successful. Proceed with upgrade."})
+
+    def move_primary_to_last_upgrade_unit(self) -> None:
+        """Moves the primary to the last unit that gets upgraded (the unit with the lowest id).
+
+        TODO implement in a future PR
+        """
+
+    def is_cluster_healthy(self) -> bool:
+        """Returns True if all nodes in the cluster/replica set are healthy."""
+        if self.charm.is_role(Config.Role.SHARD):
+            logger.debug("Cannot run full cluster health check on shards")
+            return False
+
+        # TODO Future PR - find a way to check shard statuses from config-server
+        charm_status = self.charm.process_statuses()
+        return self.are_nodes_healthy() and isinstance(charm_status, ActiveStatus)
+
+    def are_nodes_healthy(self) -> bool:
+        """Returns True if all nodes in the MongoDB deployment are healthy."""
+        try:
+            if self.charm.is_role(Config.Role.CONFIG_SERVER):
+                # TODO Future PR - implement node healthy check for sharded cluster
+                pass
+            if self.charm.is_role(Config.Role.REPLICATION):
+                return self.are_replica_set_nodes_healthy(self.charm.mongodb_config)
+        except (PyMongoError, OperationFailure, ServerSelectionTimeoutError) as e:
+            logger.debug(
+                "Cannot proceed with upgrade. Failed to check cluster health, error: %s", e
+            )
+            return False
+
+    def are_replica_set_nodes_healthy(self, mongodb_config: MongoDBConfiguration) -> bool:
+        """Returns True if all nodes in the MongoDB replica set are healthy."""
+        with MongoDBConnection(mongodb_config) as mongod:
+            rs_status = mongod.client.admin.command("replSetGetStatus")
+            return not mongod.is_any_sync(rs_status)
+
+    def is_cluster_able_to_read_write(self) -> bool:
+        """Returns True if reads and writes are feasible for the cluster."""
+        if self.charm.is_role(Config.Role.SHARD):
+            logger.debug("Cannot run read/write check on shard, must run via config-server.")
+            return False
+        elif self.charm.is_role(Config.Role.CONFIG_SERVER):
+            # TODO Future PR - implement node healthy check for sharded cluster
+            pass
+        else:
+            return self.is_replica_set_able_read_write()
+
+    def is_replica_set_able_read_write(self) -> bool:
+        """Returns True if it is possible to write to the primary and read from replicas."""
+        collection_name, write_value = self.get_random_write_and_collection()
+        self.add_write_to_replica_set(self.charm.mongodb_config, collection_name, write_value)
+        write_replicated = self.is_write_on_secondaries(
+            self.charm.mongodb_config, collection_name, write_value
+        )
+        self.clear_tmp_collection(self.charm.mongodb_config, collection_name)
+        return write_replicated
+
+    def clear_tmp_collection(
+        self, mongodb_config: MongoDBConfiguration, collection_name: str
+    ) -> None:
+        """Clears the temporary collection."""
+        with MongoDBConnection(mongodb_config) as mongod:
+            db = mongod.client["admin"]
+            db.drop_collection(collection_name)
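+    # Note on the read/write probe used by `is_replica_set_able_read_write` above: a unique
+    # document is written to a temporary collection in the `admin` database through the
+    # replica-set connection (so the write lands on the primary), each secondary is then
+    # queried over a direct connection for that document, and the temporary collection is
+    # dropped afterwards.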
+    def is_expected_write_on_replica(
+        self,
+        host: str,
+        db_name: str,
+        collection: str,
+        expected_write_value: str,
+        secondary_config: MongoDBConfiguration,
+    ) -> bool:
+        """Returns True if the replica contains the expected write in the provided collection."""
+        secondary_config.hosts = {host}
+        with MongoDBConnection(secondary_config, direct=True) as direct_secondary:
+            db = direct_secondary.client[db_name]
+            test_collection = db[collection]
+            query = test_collection.find({}, {WRITE_KEY: 1})
+            return query[0][WRITE_KEY] == expected_write_value
+
+    def get_random_write_and_collection(self) -> Tuple[str, str]:
+        """Returns a tuple of a random collection name and a unique write to add to it."""
+        choices = string.ascii_letters + string.digits
+        collection_name = "collection_" + "".join([secrets.choice(choices) for _ in range(32)])
+        write_value = "unique_write_" + "".join([secrets.choice(choices) for _ in range(16)])
+        return (collection_name, write_value)
+
+    def add_write_to_replica_set(
+        self, mongodb_config: MongoDBConfiguration, collection_name: str, write_value: str
+    ) -> None:
+        """Adds the provided write to the admin database in the provided collection."""
+        with MongoDBConnection(mongodb_config) as mongod:
+            db = mongod.client["admin"]
+            test_collection = db[collection_name]
+            write = {WRITE_KEY: write_value}
+            test_collection.insert_one(write)
+
+    def is_write_on_secondaries(
+        self,
+        mongodb_config: MongoDBConfiguration,
+        collection_name: str,
+        expected_write_value: str,
+        db_name: str = "admin",
+    ) -> bool:
+        """Returns True if the expected write is present on all secondaries."""
+        with MongoDBConnection(mongodb_config) as mongod:
+            primary_ip = mongod.primary()
+
+        replica_ips = mongodb_config.hosts
+        secondary_ips = replica_ips - {primary_ip}
+        for secondary_ip in secondary_ips:
+            if not self.is_expected_write_on_replica(
+                secondary_ip, db_name, collection_name, expected_write_value, mongodb_config
+            ):
+                logger.debug(
+                    "Secondary with IP %s does not contain the expected write.", secondary_ip
+                )
+                return False
+
+        return True
diff --git a/src/upgrades/upgrade.py b/src/upgrades/upgrade.py
new file mode 100644
index 000000000..b61cac2d8
--- /dev/null
+++ b/src/upgrades/upgrade.py
@@ -0,0 +1,227 @@
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+"""In-place upgrades.
+ +Based off specification: DA058 - In-Place Upgrades - Kubernetes v2 +(https://docs.google.com/document/d/1tLjknwHudjcHs42nzPVBNkHs98XxAOT2BXGGpP7NyEU/) +""" + +import abc +import copy +import json +import logging +import pathlib +import typing + +import ops +import poetry.core.constraints.version as poetry_version + +logger = logging.getLogger(__name__) + +PEER_RELATION_ENDPOINT_NAME = "upgrade-version-a" +RESUME_ACTION_NAME = "resume-upgrade" + + +def unit_number(unit_: ops.Unit) -> int: + """Get unit number.""" + return int(unit_.name.split("/")[-1]) + + +class PeerRelationNotReadyError(Exception): + """Upgrade peer relation not available (to this unit).""" + + +class Upgrade(abc.ABC): + """In-place upgrades.""" + + def __init__(self, charm_: ops.CharmBase) -> None: + relations = charm_.model.relations[PEER_RELATION_ENDPOINT_NAME] + if not relations: + raise PeerRelationNotReadyError + assert len(relations) == 1 + self._peer_relation = relations[0] + self._unit: ops.Unit = charm_.unit + self._unit_databag = self._peer_relation.data[self._unit] + self._app_databag = self._peer_relation.data[charm_.app] + self._app_name = charm_.app.name + self._current_versions = {} # For this unit + for version, file_name in { + "charm": "charm_version", + "workload": "workload_version", + }.items(): + self._current_versions[version] = pathlib.Path(file_name).read_text().strip() + + @property + def unit_state(self) -> typing.Optional[str]: + """Unit upgrade state.""" + return self._unit_databag.get("state") + + @unit_state.setter + def unit_state(self, value: str) -> None: + """Set unit upgrade state.""" + self._unit_databag["state"] = value + + @property + def is_compatible(self) -> bool: + """Whether upgrade is supported from previous.""" + assert self.versions_set + try: + previous_version_strs: typing.Dict[str, str] = json.loads( + self._app_databag["versions"] + ) + except KeyError as exception: + logger.debug("`versions` missing from peer relation", exc_info=exception) + return False + # TODO charm versioning: remove `.split("+")` (which removes git hash before comparing) + previous_version_strs["charm"] = previous_version_strs["charm"].split("+")[0] + previous_versions: typing.Dict[str, poetry_version.Version] = { + key: poetry_version.Version.parse(value) + for key, value in previous_version_strs.items() + } + current_version_strs = copy.copy(self._current_versions) + current_version_strs["charm"] = current_version_strs["charm"].split("+")[0] + current_versions = { + key: poetry_version.Version.parse(value) for key, value in current_version_strs.items() + } + try: + # TODO Future PR: change this > sign to support downgrades + if ( + previous_versions["charm"] > current_versions["charm"] + or previous_versions["charm"].major != current_versions["charm"].major + ): + logger.debug( + f'{previous_versions["charm"]=} incompatible with {current_versions["charm"]=}' + ) + return False + if ( + previous_versions["workload"] > current_versions["workload"] + or previous_versions["workload"].major != current_versions["workload"].major + or previous_versions["workload"].minor != current_versions["workload"].minor + ): + logger.debug( + f'{previous_versions["workload"]=} incompatible with {current_versions["workload"]=}' + ) + return False + logger.debug( + f"Versions before upgrade compatible with versions after upgrade {previous_version_strs=} {self._current_versions=}" + ) + return True + except KeyError as exception: + logger.debug(f"Version missing from {previous_versions=}", exc_info=exception) + 
return False + + @property + def in_progress(self) -> bool: + """Returns True if the upgrade is in progress.""" + logger.debug(f"{self._app_workload_version=} {self._unit_workload_versions=}") + return any( + version != self._app_workload_version + for version in self._unit_workload_versions.values() + ) + + @property + def _sorted_units(self) -> typing.List[ops.Unit]: + """Units sorted from highest to lowest unit number.""" + return sorted((self._unit, *self._peer_relation.units), key=unit_number, reverse=True) + + @abc.abstractmethod + def _get_unit_healthy_status( + self, *, workload_status: typing.Optional[ops.StatusBase] + ) -> ops.StatusBase: + """Status shown during upgrade if unit is healthy.""" + + def get_unit_juju_status( + self, *, workload_status: typing.Optional[ops.StatusBase] + ) -> typing.Optional[ops.StatusBase]: + """Returns the unit status for the upgrade.""" + # TODO: revise status handling & priority + if self.in_progress: + return self._get_unit_healthy_status(workload_status=workload_status) + + @property + def app_status(self) -> typing.Optional[ops.StatusBase]: + """Return the app status for the upgrade.""" + if not self.in_progress: + return + if not self.upgrade_resumed: + # User confirmation needed to resume upgrade (i.e. upgrade second unit) + # Statuses over 120 characters are truncated in `juju status` as of juju 3.1.6 and + # 2.9.45 + return ops.BlockedStatus( + f"Upgrading. Verify highest unit is healthy & run `{RESUME_ACTION_NAME}` action. To rollback, `juju refresh` to last revision" + ) + return ops.MaintenanceStatus( + "Upgrading. To rollback, `juju refresh` to the previous revision" + ) + + @property + def versions_set(self) -> bool: + """Whether versions have been saved in app databag. + + Should only be `False` during first charm install. + + If a user upgrades from a charm that does not set versions, this charm will get stuck. + """ + return self._app_databag.get("versions") is not None + + def set_versions_in_app_databag(self) -> None: + """Save current versions in app databag. + + Used after next upgrade to check compatibility (i.e. whether that upgrade should be + allowed). + """ + assert not self.in_progress + logger.debug(f"Setting {self._current_versions=} in upgrade peer relation app databag") + self._app_databag["versions"] = json.dumps(self._current_versions) + logger.debug(f"Set {self._current_versions=} in upgrade peer relation app databag") + + @property + @abc.abstractmethod + def upgrade_resumed(self) -> bool: + """Whether user has resumed upgrade with Juju action.""" + + @property + @abc.abstractmethod + def _unit_workload_versions(self) -> typing.Dict[str, str]: + """{Unit name: unique identifier for unit's workload version}. + + If and only if this version changes, the workload will restart (during upgrade or + rollback). + + On Kubernetes, the workload & charm are upgraded together + On machines, the charm is upgraded before the workload + + This identifier should be comparable to `_app_workload_version` to determine if the unit & + app are the same workload version. + """ + + @property + @abc.abstractmethod + def _app_workload_version(self) -> str: + """Unique identifier for the app's workload version. + + This should match the workload version in the current Juju app charm version. + + This identifier should be comparable to `_get_unit_workload_version` to determine if the + app & unit are the same workload version. 
+ """ + + @abc.abstractmethod + def reconcile_partition(self, *, action_event: ops.ActionEvent = None) -> None: + """If ready, allow next unit to upgrade.""" + + @property + @abc.abstractmethod + def authorized(self) -> bool: + """Whether this unit is authorized to upgrade. + + Only applies to machine charm + """ + + @abc.abstractmethod + def upgrade_unit(self, *, snap) -> None: + """Upgrade this unit. + + Only applies to machine charm + """ diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 3c7ba8b5e..da5edd101 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -59,6 +59,7 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: await ops_test.model.wait_for_idle() +@pytest.mark.skip("Skip this test until upgrade are implemented in the new way") @pytest.mark.group(1) async def test_consistency_between_workload_and_metadata(ops_test: OpsTest): """Verifies that the dependencies in the charm version are accurate.""" @@ -70,6 +71,8 @@ async def test_consistency_between_workload_and_metadata(ops_test: OpsTest): # version has format x.y.z-a mongod_version = client.server_info()["version"].split("-")[0] + # Future PR - change the dependency check to check the file for workload and charm version + # instead assert ( mongod_version == Config.DEPENDENCIES["mongod_service"]["version"] ), f"Version of mongod running does not match dependency matrix, update DEPENDENCIES in src/config.py to {mongod_version}" diff --git a/tests/integration/upgrade/test_upgrade.py b/tests/integration/upgrade/test_upgrade.py index 67d2c5af8..c441487ea 100644 --- a/tests/integration/upgrade/test_upgrade.py +++ b/tests/integration/upgrade/test_upgrade.py @@ -34,14 +34,17 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None: await check_or_scale_app(ops_test, app_name, required_units=3) return - # TODO: When `6/stable` track supports upgrades deploy and test that revision instead. - await ops_test.model.deploy("mongodb", channel="edge", num_units=3) + # TODO: When upgrades are supported, deploy with most recent revision (6/stable when possible, + # but 6/edge as soon as available) + charm = await ops_test.build_charm(".") + await ops_test.model.deploy(charm, channel="edge", num_units=3) await ops_test.model.wait_for_idle( apps=["mongodb"], status="active", timeout=1000, idle_period=120 ) +@pytest.mark.skip("re-enable these tests once upgrades are functioning") @pytest.mark.group(1) async def test_upgrade(ops_test: OpsTest, continuous_writes) -> None: """Verifies that the upgrade can run successfully.""" diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py deleted file mode 100644 index 93df5c721..000000000 --- a/tests/unit/test_upgrade.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2024 Canonical Ltd. -# See LICENSE file for licensing details. 
-import unittest -from unittest import mock -from unittest.mock import MagicMock, patch - -from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError -from charms.operator_libs_linux.v1 import snap -from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus -from ops.testing import Harness - -from charm import MongodbOperatorCharm - -from .helpers import patch_network_get - - -class TestCharm(unittest.TestCase): - def setUp(self, *unused): - self.harness = Harness(MongodbOperatorCharm) - self.addCleanup(self.harness.cleanup) - self.harness.begin() - self.peer_rel_id = self.harness.add_relation("database-peers", "database-peers") - self.peer_rel_id = self.harness.add_relation("upgrade", "upgrade") - - @patch_network_get(private_address="1.1.1.1") - @patch("events.upgrade.MongoDBConnection") - def test_is_cluster_healthy(self, connection): - """Test is_cluster_healthy function.""" - - def is_shard_mock_call(*args): - return args == ("shard",) - - def is_replication_mock_call(*args): - return args == ("replication",) - - active_status = mock.Mock() - active_status.return_value = ActiveStatus() - - blocked_status = mock.Mock() - blocked_status.return_value = BlockedStatus() - - # case 1: running on a shard - self.harness.charm.is_role = is_shard_mock_call - assert not self.harness.charm.upgrade.is_cluster_healthy() - - # case 2: cluster is still syncing - self.harness.charm.is_role = is_replication_mock_call - self.harness.charm.process_statuses = active_status - connection.return_value.__enter__.return_value.is_any_sync.return_value = True - assert not self.harness.charm.upgrade.is_cluster_healthy() - - # case 3: unit is not active - self.harness.charm.process_statuses = blocked_status - connection.return_value.__enter__.return_value.is_any_sync.return_value = False - assert not self.harness.charm.upgrade.is_cluster_healthy() - - # # case 4: cluster is helathy - self.harness.charm.process_statuses = active_status - assert self.harness.charm.upgrade.is_cluster_healthy() - - @patch_network_get(private_address="1.1.1.1") - @patch("events.upgrade.MongoDBConnection") - @patch("charm.MongoDBUpgrade.is_excepted_write_on_replica") - def test_is_replica_set_able_read_write(self, is_excepted_write_on_replica, connection): - """Test test_is_replica_set_able_read_write function.""" - # case 1: writes are not present on secondaries - is_excepted_write_on_replica.return_value = False - assert not self.harness.charm.upgrade.is_replica_set_able_read_write() - - # case 2: writes are present on secondaries - is_excepted_write_on_replica.return_value = True - assert self.harness.charm.upgrade.is_replica_set_able_read_write() - - @patch_network_get(private_address="1.1.1.1") - @patch("charm.MongoDBConnection") - def test_build_upgrade_stack(self, connection): - """Tests that build upgrade stack puts the primary unit at the bottom of the stack.""" - rel_id = self.harness.charm.model.get_relation("database-peers").id - self.harness.add_relation_unit(rel_id, "mongodb/1") - connection.return_value.__enter__.return_value.primary.return_value = "1.1.1.1" - assert self.harness.charm.upgrade.build_upgrade_stack() == [0, 1] - - @patch_network_get(private_address="1.1.1.1") - @patch("events.upgrade.Retrying") - @patch("charm.MongoDBUpgrade.is_excepted_write_on_replica") - @patch("charm.MongodbOperatorCharm.restart_charm_services") - @patch("charm.MongoDBConnection") - @patch("events.upgrade.MongoDBConnection") - @patch("charm.MongodbOperatorCharm.install_snap_packages") - 
@patch("charm.MongodbOperatorCharm.stop_charm_services") - @patch("charm.MongoDBUpgrade.post_upgrade_check") - def test_on_upgrade_granted( - self, - post_upgrade_check, - stop_charm_services, - install_snap_packages, - connection_1, - connection_2, - restart, - is_excepted_write_on_replica, - retrying, - ): - # upgrades need a peer relation to proceed - rel_id = self.harness.charm.model.get_relation("database-peers").id - self.harness.add_relation_unit(rel_id, "mongodb/1") - - # case 1: fails to install snap_packages - install_snap_packages.side_effect = snap.SnapError - mock_event = MagicMock() - self.harness.charm.upgrade._on_upgrade_granted(mock_event) - restart.assert_not_called() - - # case 2: post_upgrade_check fails - install_snap_packages.side_effect = None - # disable_retry - post_upgrade_check.side_effect = ClusterNotReadyError( - "post-upgrade check failed and cannot safely upgrade", - cause="Cluster cannot read/write", - ) - mock_event = MagicMock() - self.harness.charm.upgrade._on_upgrade_granted(mock_event) - restart.assert_called() - self.assertTrue(isinstance(self.harness.charm.unit.status, BlockedStatus)) - - # case 3: everything works - install_snap_packages.side_effect = None - is_excepted_write_on_replica.return_value = True - post_upgrade_check.side_effect = None - mock_event = MagicMock() - self.harness.charm.upgrade._on_upgrade_granted(mock_event) - restart.assert_called() - self.assertTrue(isinstance(self.harness.charm.unit.status, MaintenanceStatus))