diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6fb55da..61af795 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -23,7 +23,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Run tests uses: get-woke/woke-action@v0 with: @@ -34,7 +34,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install dependencies run: python3 -m pip install tox - name: Run linters @@ -45,12 +45,23 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install dependencies run: python3 -m pip install tox - name: Run tests run: tox -e unit + type-check: + name: Static type checking + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Install dependencies + run: python3 -m pip install tox + - name: Run tests + run: tox -e type + integration-test: strategy: fail-fast: true @@ -63,9 +74,10 @@ jobs: - inclusive-naming-check - lint - unit-test + - type-check steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup operator environment uses: charmed-kubernetes/actions-operator@main with: diff --git a/.gitignore b/.gitignore index 9ceb2e5..da25869 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ __pycache__/ .idea .vscode/ version +.ruff_cache diff --git a/README.md b/README.md index bb7145f..7fadd0d 100644 --- a/README.md +++ b/README.md @@ -24,13 +24,7 @@ This operator should be used with Juju 3.x or greater. ```shell $ juju deploy slurmctld --channel edge $ juju deploy slurmd --channel edge -$ juju deploy slurmdbd --channel edge -$ juju deploy mysql --channel 8.0/edge -$ juju deploy mysql-router slurmdbd-mysql-router --channel dpe/edge -$ juju integrate slurmctld:slurmd slurmd:slurmd -$ juju integrate slurmdbd-mysql-router:backend-database mysql:database -$ juju integrate slurmdbd:database slurmdbd-mysql-router:database -$ juju integrate slurmctld:slurmdbd slurmdbd:slurmdbd +$ juju integrate slurmctld:slurmd slurmd:slurmctld ``` ## Project & Community diff --git a/charmcraft.yaml b/charmcraft.yaml index 9b57f0f..db30591 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -22,9 +22,6 @@ links: source: - https://github.com/charmed-hpc/slurmctld-operator -peers: - slurmctld-peer: - interface: slurmctld-peer requires: slurmd: interface: slurmd @@ -32,18 +29,8 @@ requires: interface: slurmdbd slurmrestd: interface: slurmrestd - influxdb-api: - interface: influxdb-api - elasticsearch: - interface: elasticsearch fluentbit: interface: fluentbit -provides: - prolog-epilog: - interface: prolog-epilog - grafana-source: - interface: grafana-source - scope: global assumes: - juju @@ -58,138 +45,84 @@ bases: channel: "22.04" architectures: [amd64] -parts: - charm: - build-packages: [git] - charm-python-packages: [setuptools] - - # Create a version file and pack it into the charm. This is dynamically generated - # as part of the build process for a charm to ensure that the git revision of the - # charm is always recorded in this version file. 
- version-file: - plugin: nil - build-packages: - - git - override-build: | - VERSION=$(git -C $CRAFT_PART_SRC/../../charm/src describe --dirty --always) - echo "Setting version to $VERSION" - echo $VERSION > $CRAFT_PART_INSTALL/version - stage: - - version config: options: - custom-slurm-repo: - type: string - default: "" - description: > - Use a custom repository for Slurm installation. - - This can be set to the Organization's local mirror/cache of packages and - supersedes the Omnivector repositories. Alternatively, it can be used to - track a `testing` Slurm version, e.g. by setting to - `ppa:omnivector/osd-testing`. - - Note: The configuration `custom-slurm-repo` must be set *before* - deploying the units. Changing this value after deploying the units will - not reinstall Slurm. cluster-name: type: string default: osd-cluster - description: > + description: | Name to be recorded in database for jobs from this cluster. This is important if a single database is used to record information from multiple Slurm-managed clusters. + default-partition: type: string default: "" - description: > + description: | Default Slurm partition. This is only used if defined, and must match an existing partition. - custom-config: + + slurm-conf-parameters: type: string default: "" - description: > - User supplied Slurm configuration. - - This value supplements the charm supplied `slurm.conf` that is used for - Slurm Controller and Compute nodes. + description: | + User supplied Slurm configuration as a multiline string. Example usage: - $ juju config slurmcltd custom-config="FirstJobId=1234" - proctrack-type: - type: string - default: proctrack/cgroup - description: > - Identifies the plugin to be used for process tracking on a job step - basis. - cgroup-config: + $ juju config slurmcltd slurm-conf-parameters="$(cat additional.conf)" + + cgroup-parameters: type: string default: | CgroupAutomount=yes ConstrainCores=yes - description: > - Configuration content for `cgroup.conf`. + description: | + User supplied configuration for `cgroup.conf`. health-check-params: default: "" type: string - description: > + description: | Extra parameters for NHC command. This option can be used to customize how NHC is called, e.g. to send an - e-mail to an admin when NHC detects an error set this value to + e-mail to an admin when NHC detects an error set this value to. `-M admin@domain.com`. + health-check-interval: default: 600 type: int description: Interval in seconds between executions of the Health Check. + health-check-state: default: "ANY,CYCLE" type: string description: Only run the Health Check on nodes in this state. - - acct-gather-frequency: - type: string - default: "task=30" - description: > - Accounting and profiling sampling intervals for the acct_gather plugins. - - Note: A value of `0` disables the periodic sampling. In this case, the - accounting information is collected when the job terminates. - - Example usage: - $ juju config slurmcltd acct-gather-frequency="task=30,network=30" - acct-gather-custom: - type: string - default: "" - description: > - User supplied `acct_gather.conf` configuration. - - This value supplements the charm supplied `acct_gather.conf` file that is - used for configuring the acct_gather plugins. actions: show-current-config: - description: > + description: | Display the currently used `slurm.conf`. - Note: This file only exists in `slurmctld` charm and is automatically - distributed to all compute nodes by Slurm. 
- Example usage: - $ juju run-action slurmctld/leader --format=json --wait | jq .[].results.slurm.conf | xargs -I % -0 python3 -c 'print(%)' + + ```bash + juju run slurmctld/leader show-current-config \ + --quiet --format=json | jq .[].results.slurm.conf | xargs -I % -0 python3 -c 'print(%)' + ``` + drain: - description: > + description: | Drain specified nodes. Example usage: - $ juju run-action slurmctld/leader drain nodename=node-[1,2] reason="Updating kernel" + $ juju run slurmctld/leader drain nodename="node-[1,2]" reason="Updating kernel" params: nodename: type: string - description: The nodes to drain, using the Slurm format, e.g. `node-[1,2]`. + description: The nodes to drain, using the Slurm format, e.g. `"node-[1,2]"`. reason: type: string description: Reason to drain the nodes. @@ -197,24 +130,17 @@ actions: - nodename - reason resume: - description: > + description: | Resume specified nodes. Note: Newly added nodes will remain in the `down` state until configured, with the `node-configured` action. - Example usage: $ juju run-action slurmctld/leader resume nodename=node-[1,2] + Example usage: $ juju run slurmctld/leader resume nodename="node-[1,2]" params: nodename: type: string - description: > - The nodes to resume, using the Slurm format, e.g. `node-[1,2]`. + description: | + The nodes to resume, using the Slurm format, e.g. `"node-[1,2]"`. required: - nodename - - influxdb-info: - description: > - Get InfluxDB info. - - This action returns the host, port, username, password, database, and - retention policy regarding to InfluxDB. diff --git a/requirements.txt b/requirements.txt index 1e1821d..46ded0a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,3 @@ -ops==2.* -influxdb==5.3.1 -jinja2==3.1.3 -distro -pycryptodome +ops==2.14.0 +distro==1.9.0 +pycryptodome==3.20.0 diff --git a/src/charm.py b/src/charm.py index 6e2cf01..56f96b0 100755 --- a/src/charm.py +++ b/src/charm.py @@ -4,27 +4,44 @@ """SlurmctldCharm.""" -import copy import logging import shlex import subprocess -from pathlib import Path -from typing import List +from typing import Any, Dict, List, Optional, Union from charms.fluentbit.v0.fluentbit import FluentbitClient -from interface_elasticsearch import Elasticsearch -from interface_grafana_source import GrafanaSource -from interface_influxdb import InfluxDB -from interface_prolog_epilog import PrologEpilog -from interface_slurmctld_peer import SlurmctldPeer -from interface_slurmd import Slurmd -from interface_slurmdbd import Slurmdbd -from interface_slurmrestd import Slurmrestd -from ops.charm import CharmBase, LeaderElectedEvent -from ops.framework import StoredState -from ops.main import main -from ops.model import ActiveStatus, BlockedStatus, WaitingStatus -from slurmctld_ops import SlurmctldManager +from constants import CHARM_MAINTAINED_SLURM_CONF_PARAMETERS, FLUENTBIT_CONFIG, SLURM_CONF_PATH +from interface_slurmd import ( + PartitionAvailableEvent, + PartitionUnavailableEvent, + Slurmd, + SlurmdAvailableEvent, + SlurmdDepartedEvent, +) +from interface_slurmdbd import ( + Slurmdbd, + SlurmdbdAvailableEvent, + SlurmdbdUnavailableEvent, +) +from interface_slurmrestd import ( + Slurmrestd, + SlurmrestdAvailableEvent, +) +from ops import ( + ActionEvent, + ActiveStatus, + BlockedStatus, + CharmBase, + ConfigChangedEvent, + InstallEvent, + RelationCreatedEvent, + StoredState, + UpdateStatusEvent, + WaitingStatus, + main, +) +from slurm_conf_editor import slurm_conf_as_string +from slurmctld_ops import SlurmctldManager, is_container logger = 
logging.getLogger() @@ -39,540 +56,384 @@ def __init__(self, *args): super().__init__(*args) self._stored.set_default( + default_partition=str(), jwt_key=str(), munge_key=str(), + new_nodes=[], + nhc_params=str(), slurm_installed=False, - slurmd_available=False, - slurmrestd_available=False, - slurmdbd_available=False, - down_nodes=[], + slurmdbd_host=str(), + user_supplied_slurm_conf_params=str(), ) - self._slurm_manager = SlurmctldManager(self, "slurmctld") + self._slurmctld_manager = SlurmctldManager() + self._fluentbit = FluentbitClient(self, "fluentbit") self._slurmd = Slurmd(self, "slurmd") self._slurmdbd = Slurmdbd(self, "slurmdbd") self._slurmrestd = Slurmrestd(self, "slurmrestd") - self._slurmctld_peer = SlurmctldPeer(self, "slurmctld-peer") - self._prolog_epilog = PrologEpilog(self, "prolog-epilog") - - self._grafana = GrafanaSource(self, "grafana-source") - self._influxdb = InfluxDB(self, "influxdb-api") - self._elasticsearch = Elasticsearch(self, "elasticsearch") - self._fluentbit = FluentbitClient(self, "fluentbit") event_handler_bindings = { + # Charm lifecycle hook events self.on.install: self._on_install, - self.on.upgrade_charm: self._on_upgrade, self.on.update_status: self._on_update_status, - self.on.config_changed: self._on_write_slurm_config, - self.on.leader_elected: self._on_leader_elected, - # slurm component lifecycle events + self.on.config_changed: self._on_config_changed, + # slurmdbd lifecycle hook events self._slurmdbd.on.slurmdbd_available: self._on_slurmdbd_available, self._slurmdbd.on.slurmdbd_unavailable: self._on_slurmdbd_unavailable, - self._slurmd.on.slurmd_available: self._on_write_slurm_config, - self._slurmd.on.slurmd_unavailable: self._on_write_slurm_config, - self._slurmd.on.slurmd_departed: self._on_write_slurm_config, + # slurmd lifecycle hook events + self._slurmd.on.partition_available: self._on_write_slurm_conf, + self._slurmd.on.partition_unavailable: self._on_write_slurm_conf, + self._slurmd.on.slurmd_available: self._on_write_slurm_conf, + self._slurmd.on.slurmd_departed: self._on_write_slurm_conf, + # slurmrestd available self._slurmrestd.on.slurmrestd_available: self._on_slurmrestd_available, - self._slurmrestd.on.slurmrestd_unavailable: self._on_write_slurm_config, - # NOTE: a second slurmctld should get the jwt/munge keys and configure them - self._slurmctld_peer.on.slurmctld_peer_available: self._on_write_slurm_config, # fluentbit self.on["fluentbit"].relation_created: self._on_fluentbit_relation_created, - # Addons lifecycle events - self._prolog_epilog.on.prolog_epilog_available: self._on_write_slurm_config, - self._prolog_epilog.on.prolog_epilog_unavailable: self._on_write_slurm_config, - self._grafana.on.grafana_available: self._on_grafana_available, - self._influxdb.on.influxdb_available: self._on_influxdb_available, - self._influxdb.on.influxdb_unavailable: self._on_write_slurm_config, - self._elasticsearch.on.elasticsearch_available: self._on_elasticsearch_available, - self._elasticsearch.on.elasticsearch_unavailable: self._on_write_slurm_config, # actions - self.on.show_current_config_action: self._on_show_current_config, - self.on.drain_action: self._drain_nodes_action, - self.on.resume_action: self._resume_nodes_action, - self.on.influxdb_info_action: self._infludb_info_action, + self.on.show_current_config_action: self._on_show_current_config_action, + self.on.drain_action: self._on_drain_nodes_action, + self.on.resume_action: self._on_resume_nodes_action, } for event, handler in event_handler_bindings.items(): 
self.framework.observe(event, handler) - @property - def hostname(self): - """Return the hostname.""" - return self._slurm_manager.hostname - - @property - def port(self): - """Return the port.""" - return self._slurm_manager.port - - @property - def cluster_name(self) -> str: - """Return the cluster name.""" - return self.config.get("cluster-name") - - @property - def _slurmctld_info(self): - return self._slurmctld_peer.get_slurmctld_info() - - @property - def slurmdbd_info(self): - """Return slurmdbd_info from relation.""" - return self._slurmdbd.get_slurmdbd_info() - - @property - def _slurmd_info(self) -> list: - return self._slurmd.get_slurmd_info() - - @property - def _cluster_info(self): - """Assemble information about the cluster.""" - cluster_info = {} - cluster_info["cluster_name"] = self.config.get("cluster-name") - cluster_info["custom_config"] = self.config.get("custom-config") - cluster_info["proctrack_type"] = self.config.get("proctrack-type") - cluster_info["cgroup_config"] = self.config.get("cgroup-config") + def _on_install(self, event: InstallEvent) -> None: + """Perform installation operations for slurmctld.""" + self.unit.status = WaitingStatus("Installing slurmctld") - interval = self.config.get("health-check-interval") - state = self.config.get("health-check-state") - nhc = self._slurm_manager.slurm_config_nhc_values(interval, state) - cluster_info.update(nhc) + if self._slurmctld_manager.install(): - return cluster_info + # Store the munge_key and jwt_rsa key in the stored state. + # NOTE: Use secrets instead of stored state when secrets are supported the framework. + if self.model.unit.is_leader(): + jwt_rsa = self._slurmctld_manager.generate_jwt_rsa() + self._stored.jwt_rsa = jwt_rsa - @property - def _addons_info(self): - """Assemble addons for slurm.conf.""" - return { - **self._assemble_prolog_epilog(), - **self._assemble_acct_gather_addon(), - **self._assemble_elastic_search_addon(), - } + munge_key = self._slurmctld_manager.generate_munge_key() + self._stored.munge_key = munge_key - def _assemble_prolog_epilog(self) -> dict: - """Generate the prolog_epilog section of the addons.""" - logger.debug("## Generating prolog epilog configuration") + self._slurmctld_manager.stop_munged() + self._slurmctld_manager.write_munge_key(munge_key) + self._slurmctld_manager.start_munged() - prolog_epilog = self._prolog_epilog.get_prolog_epilog() + self._slurmctld_manager.stop_slurmctld() + self._slurmctld_manager.write_jwt_rsa(jwt_rsa) + self._slurmctld_manager.start_slurmctld() - if prolog_epilog: - return {"prolog_epilog": prolog_epilog} + self.unit.set_workload_version(self._slurmctld_manager.version()) + self.slurm_installed = True + else: + self.unit.status = BlockedStatus("Only singleton slurmctld is supported.") + logger.debug("Secondary slurmctld not supported.") + event.defer() else: - return {} - - def _assemble_acct_gather_addon(self): - """Generate the acct gather section of the addons.""" - logger.debug("## Generating acct gather configuration") - - addons = {} - - influxdb_info = self._get_influxdb_info() - if influxdb_info: - addons["acct_gather"] = influxdb_info - addons["acct_gather"]["default"] = "all" - addons["acct_gather_profile"] = "acct_gather_profile/influxdb" - - # it is possible to setup influxdb or hdf5 profiles without the - # relation, using the custom-config section of slurm.conf. 
We need to - # support setting up the acct_gather configuration for this scenario - acct_gather_custom = self.config.get("acct-gather-custom") - if acct_gather_custom: - if not addons.get("acct_gather"): - addons["acct_gather"] = {} - - addons["acct_gather"]["custom"] = acct_gather_custom - - addons["acct_gather_frequency"] = self.config.get("acct-gather-frequency") - - return addons + self.unit.status = BlockedStatus("Error installing slurmctld") + logger.error("Cannot install slurmctld, please debug.") + event.defer() - def _assemble_elastic_search_addon(self): - """Generate the acct gather section of the addons.""" - logger.debug("## Generating elastic search addon configuration") - addon = {} + self._on_write_slurm_conf(event) + + def _on_config_changed(self, event: ConfigChangedEvent) -> None: + """Perform config-changed operations.""" + charm_config_nhc_params = str(self.config.get("health-check-params", "")) + if (charm_config_nhc_params != self._stored.nhc_params) and ( + charm_config_nhc_params != "" + ): + logger.debug("## NHC user supplied params changed, sending to slurmd.") + self._stored.nhc_params = charm_config_nhc_params + # Send the custom NHC parameters to all slurmd. + self._slurmd.set_nhc_params(charm_config_nhc_params) + + write_slurm_conf = False + if charm_config_default_partition := self.config.get("default-partition"): + if charm_config_default_partition != self._stored.default_partition: + logger.debug("## Default partition configuration changed.") + self._stored.default_partition = charm_config_default_partition + write_slurm_conf = True + + if ( + charm_config_slurm_conf_params := self.config.get("slurm-conf-parameters") + ) is not None: + if charm_config_slurm_conf_params != self._stored.user_supplied_slurm_conf_params: + logger.debug("## User supplied parameters changed.") + self._stored.user_supplied_slurm_conf_params = charm_config_slurm_conf_params + write_slurm_conf = True + + if write_slurm_conf: + logger.debug("## Emitting write-slurm-config event.") + self._on_write_slurm_conf(event) + + def _on_update_status(self, event: UpdateStatusEvent) -> None: + """Handle update status.""" + self._check_status() - elasticsearch_ingress = self._elasticsearch.elasticsearch_ingress - if elasticsearch_ingress: - suffix = f"/{self.cluster_name}/jobcomp" - addon = {"elasticsearch_address": f"{elasticsearch_ingress}{suffix}"} + def _on_show_current_config_action(self, event: ActionEvent) -> None: + """Show current slurm.conf.""" + slurm_conf = SLURM_CONF_PATH.read_text() + event.set_results({"slurm.conf": slurm_conf}) - return addon + def _on_fluentbit_relation_created(self, event: RelationCreatedEvent) -> None: + """Set up Fluentbit log forwarding.""" + logger.debug("## Configuring fluentbit") + self._fluentbit.configure(FLUENTBIT_CONFIG) + + def _on_slurmrestd_available(self, event: SlurmrestdAvailableEvent) -> None: + """Check that we have slurm_config when slurmrestd available otherwise defer the event.""" + if self.model.unit.is_leader(): + if self._check_status(): + slurm_conf = slurm_conf_as_string(self._assemble_slurm_conf()) + self._slurmrestd.set_slurm_config_on_app_relation_data(slurm_conf) + return + logger.debug("Cluster not ready yet, deferring event.") + event.defer() - def set_slurmd_available(self, flag: bool): - """Set stored value of slurmd available.""" - self._stored.slurmd_available = flag + def _on_slurmdbd_available(self, event: SlurmdbdAvailableEvent) -> None: + self._stored.slurmdbd_host = event.slurmdbd_host + self._on_write_slurm_conf(event) 
- def _set_slurmdbd_available(self, flag: bool): - """Set stored value of slurmdbd available.""" - self._stored.slurmdbd_available = flag + def _on_slurmdbd_unavailable(self, event: SlurmdbdUnavailableEvent) -> None: + self._stored.slurmdbd_host = "" + self._check_status() - def set_slurmrestd_available(self, flag: bool): - """Set stored value of slurmdrest available.""" - self._stored.slurmrestd_available = flag + def _on_drain_nodes_action(self, event: ActionEvent) -> None: + """Drain specified nodes.""" + nodes = event.params["nodename"] + reason = event.params["reason"] - def _is_leader(self): - return self.model.unit.is_leader() + logger.debug(f"#### Draining {nodes} because {reason}.") + event.log(f"Draining {nodes} because {reason}.") - def is_slurm_installed(self): - """Return true/false based on whether or not slurm is installed.""" - return self._stored.slurm_installed + try: + cmd = f'scontrol update nodename={nodes} state=drain reason="{reason}"' + subprocess.check_output(shlex.split(cmd)) + event.set_results({"status": "draining", "nodes": nodes}) + except subprocess.CalledProcessError as e: + event.fail(message=f"Error draining {nodes}: {e.output}") - def _on_show_current_config(self, event): - """Show current slurm.conf.""" - slurm_conf = self._slurm_manager.slurm_conf_path.read_text() - event.set_results({"slurm.conf": slurm_conf}) + def _on_resume_nodes_action(self, event: ActionEvent) -> None: + """Resume specified nodes.""" + nodes = event.params["nodename"] - def _on_install(self, event): - """Perform installation operations for slurmctld.""" - self.unit.status = WaitingStatus("Installing slurmctld") + logger.debug(f"#### Resuming {nodes}.") + event.log(f"Resuming {nodes}.") - successful_installation = self._slurm_manager.install() + try: + cmd = f"scontrol update nodename={nodes} state=resume" + subprocess.check_output(shlex.split(cmd)) + event.set_results({"status": "resuming", "nodes": nodes}) + except subprocess.CalledProcessError as e: + event.fail(message=f"Error resuming {nodes}: {e.output}") - self.unit.set_workload_version(self._slurm_manager.version()) + def _on_write_slurm_conf( + self, + event: Union[ + ConfigChangedEvent, + InstallEvent, + SlurmdbdAvailableEvent, + SlurmdDepartedEvent, + SlurmdAvailableEvent, + PartitionUnavailableEvent, + PartitionAvailableEvent, + ], + ) -> None: + """Check that we have what we need before we proceed.""" + logger.debug("### Slurmctld - _on_write_slurm_conf()") - if successful_installation: - self._stored.slurm_installed = True + # only the leader should write the config, restart, and scontrol reconf + if not self.model.unit.is_leader(): + return - # Store the munge_key and jwt_rsa key in the stored state. - # NOTE: Use leadership settings instead of stored state when - # leadership settings support becomes available in the framework. - if self._is_leader(): - # NOTE the backup controller should also have the jwt and munge - # keys configured. We should move these information to the - # peer relation. 
- self._stored.jwt_rsa = self._slurm_manager.generate_jwt_rsa() - self._stored.munge_key = self._slurm_manager.get_munge_key() - self._slurm_manager.write_jwt_rsa(self.get_jwt_rsa()) - else: - # NOTE: the secondary slurmctld should get the jwt and munge - # keys from the peer relation here - logger.debug("secondary slurmctld") + if not self._check_status(): + event.defer() + return - # all slurmctld should restart munged here, as it would assure - # munge is working - self._slurm_manager.restart_munged() + if slurm_config := self._assemble_slurm_conf(): + self._slurmctld_manager.stop_slurmctld() + self._slurmctld_manager.write_slurm_conf(slurm_config) + + # Write out any user_supplied_cgroup_parameters to /etc/slurm/cgroup.conf. + if user_supplied_cgroup_parameters := self.config.get("cgroup-parameters", ""): + self._slurmctld_manager.write_cgroup_conf(str(user_supplied_cgroup_parameters)) + + self._slurmctld_manager.start_slurmctld() + + self._slurmctld_manager.slurm_cmd("scontrol", "reconfigure") + + # Transitioning Nodes + # + # 1) Identify transitioning_nodes by comparing the new_nodes in StoredState with the + # new_nodes that come from slurmd relation data. + # + # 2) If there are transitioning_nodes, resume them, and update the new_nodes in + # StoredState. + new_nodes_from_stored_state = self.new_nodes + new_nodes_from_slurm_config = self._get_new_node_names_from_slurm_config(slurm_config) + + transitioning_nodes: list = [ + node + for node in new_nodes_from_stored_state + if node not in new_nodes_from_slurm_config + ] + + if len(transitioning_nodes) > 0: + self._resume_nodes(transitioning_nodes) + self.new_nodes = new_nodes_from_slurm_config.copy() + + # slurmrestd needs the slurm.conf file, so send it every time it changes. + if self._slurmrestd.is_joined is not False: + slurm_conf = slurm_conf_as_string(slurm_config) + self._slurmrestd.set_slurm_config_on_app_relation_data(slurm_conf) else: - self.unit.status = BlockedStatus("Error installing slurmctld") + logger.debug("## Should write slurm.conf, but we don't have it. 
" "Deferring.") event.defer() - self._check_status() - - def _on_fluentbit_relation_created(self, event): - """Set up Fluentbit log forwarding.""" - logger.debug("## Configuring fluentbit") - cfg = [] - cfg.extend(self._slurm_manager.fluentbit_config_nhc) - cfg.extend(self._slurm_manager.fluentbit_config_slurm) - self._fluentbit.configure(cfg) - - def _on_upgrade(self, event): - """Perform upgrade operations.""" - self.unit.set_workload_version(Path("version").read_text().strip()) - - def _on_update_status(self, event): - """Handle update status.""" - self._check_status() - - def _on_leader_elected(self, event: LeaderElectedEvent) -> None: - logger.debug("## slurmctld - leader elected") - - slurm_config = self._assemble_slurm_config() - accounted_nodes = self._assemble_all_nodes(slurm_config.get("partitions", [])) # noqa + def _assemble_slurm_conf(self) -> Dict[str, Any]: + """Return the slurm.conf parameters.""" + user_supplied_parameters = self._get_user_supplied_parameters() + + slurmd_parameters = self._slurmd.get_new_nodes_and_nodes_and_partitions() + + def _assemble_slurmctld_parameters() -> str: + # Preprocess merging slurmctld_parameters if they exist in the context + slurmctld_param_config = CHARM_MAINTAINED_SLURM_CONF_PARAMETERS[ + "SlurmctldParameters" + ].split(",") + user_config = [] + + if ( + user_supplied_slurmctld_parameters := user_supplied_parameters.get( + "SlurmctldParameters", "" + ) + != "" + ): + user_config.extend(user_supplied_slurmctld_parameters.split(",")) + + return ",".join(slurmctld_param_config + user_config) + + accounting_params = {} + if (slurmdbd_host := self._stored.slurmdbd_host) != "": + accounting_params = { + "AccountingStorageHost": slurmdbd_host, + "AccountingStorageType": "accounting_storage/slurmdbd", + "AccountingStoragePass": "/var/run/munge/munge.socket.2", + "AccountingStoragePort": "6819", + } + + slurm_conf = { + "ClusterName": self.cluster_name, + "SlurmctldAddr": self._slurmd_ingress_address, + "SlurmctldHost": self.hostname, + "SlurmctldParameters": _assemble_slurmctld_parameters(), + "ProctrackType": "proctrack/linuxproc" if is_container() else "proctrack/cgroup", + **accounting_params, + **CHARM_MAINTAINED_SLURM_CONF_PARAMETERS, + **slurmd_parameters, + **user_supplied_parameters, + } - def _check_status(self): # noqa C901 + logger.debug(f"slurm.conf: {slurm_conf}") + return slurm_conf + + def _get_user_supplied_parameters(self) -> Dict[Any, Any]: + """Gather, parse, and return the user supplied parameters.""" + user_supplied_parameters = {} + if custom_config := self.config.get("slurm-conf-parameters"): + user_supplied_parameters = { + line.split("=")[0]: line.split("=")[1] + for line in str(custom_config).split("\n") + if not line.startswith("#") and line.strip() != "" + } + return user_supplied_parameters + + def _get_new_node_names_from_slurm_config( + self, slurm_config: Dict[str, Any] + ) -> List[Optional[str]]: + """Given the slurm_config, return the nodes that are DownNodes with reason 'New node.'.""" + new_node_names = [] + if down_nodes_from_slurm_config := slurm_config.get("down_nodes"): + for down_nodes_entry in down_nodes_from_slurm_config: + for down_node_name in down_nodes_entry["DownNodes"]: + if down_nodes_entry["Reason"] == "New node.": + new_node_names.append(down_node_name) + return new_node_names + + def _check_status(self) -> bool: # noqa C901 """Check for all relations and set appropriate status. This charm needs these conditions to be satisfied in order to be ready: - - Slurm components installed. 
- - Munge running. - - slurmdbd node running. - - slurmd inventory. + - Slurmctld component installed + - Munge running """ - # NOTE: slurmd and slurmrestd are not needed for slurmctld to work, - # only for the cluster to operate. But we need slurmd inventory - # to assemble slurm.conf - if not self._stored.slurm_installed: + if self.slurm_installed is not True: self.unit.status = BlockedStatus("Error installing slurmctld") return False - if not self._slurm_manager.check_munged(): + if not self._slurmctld_manager.check_munged(): self.unit.status = BlockedStatus("Error configuring munge key") return False - # statuses of mandatory components: - # - joined: someone executed juju relate slurmctld foo - # - available: the units exchanged data through the relation - # NOTE: slurmrestd is not mandatory for the cluster to work, that's why - # it is not acounted for in here - statuses = { - "slurmd": { - "available": self._stored.slurmd_available, - "joined": self._slurmd.is_joined, - }, - "slurmdbd": { - "available": self._stored.slurmdbd_available, - "joined": self._slurmdbd.is_joined, - }, - } - - relations_needed = [] - waiting_on = [] - for component in statuses.keys(): - if not statuses[component]["joined"]: - relations_needed.append(component) - if not statuses[component]["available"]: - waiting_on.append(component) - - if len(relations_needed): - msg = f"Need relations: {','.join(relations_needed)}" - self.unit.status = BlockedStatus(msg) - return False - - if len(waiting_on): - msg = f"Waiting on: {','.join(waiting_on)}" - self.unit.status = WaitingStatus(msg) - return False - - self.unit.status = ActiveStatus("slurmctld available") + self.unit.status = ActiveStatus("") return True - def get_munge_key(self): + def get_munge_key(self) -> Optional[str]: """Get the stored munge key.""" - return self._stored.munge_key + return str(self._stored.munge_key) - def get_jwt_rsa(self): + def get_jwt_rsa(self) -> Optional[str]: """Get the stored jwt_rsa key.""" - return self._stored.jwt_rsa - - def _assemble_partitions(self, slurmd_info): - """Make any needed modifications to partition data.""" - slurmd_info_tmp = copy.deepcopy(slurmd_info) - default_partition_from_config = self.config.get("default-partition") - - for partition in slurmd_info: - # Deep copy the partition to a tmp var so we can modify it as - # needed whilst not modifying the object we are iterating over. - partition_tmp = copy.deepcopy(partition) - # Extract the partition_name from the partition. - partition_name = partition["partition_name"] - - # Check that the default_partition isn't defined in the charm - # config. - # If the user hasn't provided a default partition, then we infer - # the partition_default by defaulting to the "configurator" - # partition. 
- if default_partition_from_config: - if default_partition_from_config == partition_name: - partition_tmp["partition_default"] = "YES" - - slurmd_info_tmp.remove(partition) - slurmd_info_tmp.append(partition_tmp) - - return slurmd_info_tmp - - def _assemble_slurm_config(self): - """Assemble and return the slurm config.""" - logger.debug("## Assembling new slurm.conf") - - slurmctld_info = self._slurmctld_info - slurmdbd_info = self.slurmdbd_info - slurmd_info = self._slurmd_info - cluster_info = self._cluster_info - - logger.debug("######## INFO") - logger.debug(f"## slurmd: {slurmd_info}") - logger.debug(f"## slurmctld_info: {slurmctld_info}") - logger.debug(f"## slurmdbd_info: {slurmdbd_info}") - logger.debug(f"## cluster_info: {cluster_info}") - logger.debug("######## INFO - end") - - if not (slurmctld_info and slurmd_info and slurmdbd_info): - return {} - - addons_info = self._addons_info - partitions_info = self._assemble_partitions(slurmd_info) - down_nodes = self._assemble_down_nodes(slurmd_info) - - logger.debug(f"#### addons: {addons_info}") - logger.debug(f"#### partitions_info: {partitions_info}") - logger.debug(f"#### Down nodes: {down_nodes}") - - return { - "partitions": partitions_info, - "down_nodes": down_nodes, - **slurmctld_info, - **slurmdbd_info, - **addons_info, - **cluster_info, - } - - def _on_slurmrestd_available(self, event): - """Set slurm_config on the relation when slurmrestd available.""" - if not self._check_status(): - event.defer() - return - - slurm_config = self._assemble_slurm_config() - - if not slurm_config: - self.unit.status = BlockedStatus("Cannot generate slurm_config - deferring event.") - event.defer() - return - - def _on_slurmdbd_available(self, event): - self._set_slurmdbd_available(True) - self._on_write_slurm_config(event) - - def _on_slurmdbd_unavailable(self, event): - self._set_slurmdbd_available(False) - self._check_status() - - def _on_write_slurm_config(self, event): - """Check that we have what we need before we proceed.""" - logger.debug("### Slurmctld - _on_write_slurm_config()") - - # only the leader should write the config, restart, and scontrol reconf - if not self._is_leader(): - return - - if not self._check_status(): - event.defer() - return - - slurm_config = self._assemble_slurm_config() - if slurm_config: - self._slurm_manager.render_slurm_configs(slurm_config) - - # restart is needed if nodes are added/removed from the cluster - self._slurm_manager.restart_slurmctld() - self._slurm_manager.slurm_cmd("scontrol", "reconfigure") - - # send the custom NHC parameters to all slurmd - self._slurmd.set_nhc_params(self.config.get("health-check-params")) - - # check for "not new anymore" nodes, i.e., nodes that run the - # node-configured action. Those nodes are not anymore in the - # DownNodes section in the slurm.conf, but we need to resume them - # manually and update the internal cache - down_nodes = slurm_config["down_nodes"] - configured_nodes = self._assemble_configured_nodes(down_nodes) - logger.debug(f"### configured nodes: {configured_nodes}") - self._resume_nodes(configured_nodes) - self._stored.down_nodes = down_nodes.copy() - - # slurmrestd needs the slurm.conf file, so send it every time it changes - if self._stored.slurmrestd_available: - self._slurmrestd.set_slurm_config_on_app_relation_data(slurm_config) - # NOTE: scontrol reconfigure does not restart slurmrestd - self._slurmrestd.restart_slurmrestd() - else: - logger.debug("## Should rewrite slurm.conf, but we don't have it. 
" "Deferring.") - event.defer() - - @staticmethod - def _assemble_all_nodes(slurmd_info: list) -> List[str]: - """Parse slurmd_info and return a list with all hostnames.""" - nodes = [] - for partition in slurmd_info: - for node in partition["inventory"]: - nodes.append(node["node_name"]) - return nodes - - @staticmethod - def _assemble_down_nodes(slurmd_info): - """Parse partitions' nodes and assemble a list of DownNodes.""" - down_nodes = [] - for partition in slurmd_info: - for node in partition["inventory"]: - if node["new_node"]: - down_nodes.append(node["node_name"]) - - return down_nodes - - def _assemble_configured_nodes(self, down_nodes): - """Assemble list of nodes that are not new anymore. - - new_node status is removed with an action, this method returns a list - of nodes that were previously new but are not anymore. - """ - configured_nodes = [] - for node in self._stored.down_nodes: - if node not in down_nodes: - configured_nodes.append(node) - - return configured_nodes + return str(self._stored.jwt_rsa) - def _resume_nodes(self, nodelist): + def _resume_nodes(self, nodelist: List[str]) -> None: """Run scontrol to resume the specified node list.""" nodes = ",".join(nodelist) update_cmd = f"update nodename={nodes} state=resume" - self._slurm_manager.slurm_cmd("scontrol", update_cmd) + self._slurmctld_manager.slurm_cmd("scontrol", update_cmd) - def _on_grafana_available(self, event): - """Create the grafana-source if we are the leader and have influxdb.""" - if not self._is_leader(): - return - - influxdb_info = self._get_influxdb_info() - - if influxdb_info: - self._grafana.set_grafana_source_info(influxdb_info) - else: - logger.error("## Can not set Grafana source: missing influxdb relation") - - def _on_influxdb_available(self, event): - """Assemble addons to forward slurm data to influxdb.""" - self._on_write_slurm_config(event) - - def _on_elasticsearch_available(self, event): - """Assemble addons to forward Slurm data to elasticsearch.""" - self._on_write_slurm_config(event) - - def _get_influxdb_info(self) -> dict: - """Return influxdb info.""" - return self._influxdb.get_influxdb_info() - - def _drain_nodes_action(self, event): - """Drain specified nodes.""" - nodes = event.params["nodename"] - reason = event.params["reason"] - - logger.debug(f"#### Draining {nodes} because {reason}.") - event.log(f"Draining {nodes} because {reason}.") + @property + def cluster_name(self) -> str: + """Return the cluster name.""" + cluster_name = "charmedhpc" + if cluster_name_from_config := self.config.get("cluster-name"): + cluster_name = str(cluster_name_from_config) + return cluster_name - try: - cmd = f'scontrol update nodename={nodes} state=drain reason="{reason}"' - subprocess.check_output(shlex.split(cmd)) - event.set_results({"status": "draining", "nodes": nodes}) - except subprocess.CalledProcessError as e: - event.fail(message=f"Error draining {nodes}: {e.output}") + @property + def new_nodes(self) -> list: + """Return new_nodes from StoredState. - def _resume_nodes_action(self, event): - """Resume specified nodes.""" - nodes = event.params["nodename"] + Note: Ignore the attr-defined for now until this is fixed upstream. 
+ """ + return list(self._stored.new_nodes) # type: ignore[call-overload] - logger.debug(f"#### Resuming {nodes}.") - event.log(f"Resuming {nodes}.") + @new_nodes.setter + def new_nodes(self, new_nodes: List[Any]) -> None: + """Set the new nodes.""" + self._stored.new_nodes = new_nodes - try: - cmd = f"scontrol update nodename={nodes} state=resume" - subprocess.check_output(shlex.split(cmd)) - event.set_results({"status": "resuming", "nodes": nodes}) - except subprocess.CalledProcessError as e: - event.fail(message=f"Error resuming {nodes}: {e.output}") + @property + def hostname(self) -> str: + """Return the hostname.""" + return self._slurmctld_manager.hostname - def _infludb_info_action(self, event): - influxdb_info = self._get_influxdb_info() + @property + def _slurmd_ingress_address(self) -> str: + """Return the ingress_address from the slurmd relation if it exists.""" + ingress_address = "" + if binding := self.model.get_binding("slurmd"): + ingress_address = f"{binding.network.ingress_address}" + return ingress_address - if not influxdb_info: - info = "not related" - else: - # Juju does not like underscores in dictionaries - info = {k.replace("_", "-"): v for k, v in influxdb_info.items()} + @property + def slurm_installed(self) -> bool: + """Return slurm_installed from stored state.""" + return True if self._stored.slurm_installed is True else False - logger.debug(f"## InfluxDB-info action: {influxdb_info}") - event.set_results({"influxdb": info}) + @slurm_installed.setter + def slurm_installed(self, slurm_installed: bool) -> None: + """Set slurm_installed in stored state.""" + self._stored.slurm_installed = slurm_installed if __name__ == "__main__": - main(SlurmctldCharm) + main.main(SlurmctldCharm) diff --git a/src/constants.py b/src/constants.py new file mode 100644 index 0000000..c074c34 --- /dev/null +++ b/src/constants.py @@ -0,0 +1,99 @@ +# Copyright 2024 Omnivector, LLC. +# See LICENSE file for licensing details. +"""This module provides constants for the slurmctld-operator charm.""" +from pathlib import Path + +SLURM_CONF_PATH = Path("/etc/slurm/slurm.conf") +SLURM_USER = "slurm" +SLURM_GROUP = "slurm" + +CHARM_MAINTAINED_SLURM_CONF_PARAMETERS = { + "AuthAltParameters": "jwt_key=/var/spool/slurmctldjwt_hs256.key", + "AuthAltTypes": "auth/jwt", + "AuthInfo": "/var/run/munge/munge.socket.2", + "AuthType": "auth/munge", + "GresTypes": "gpu", + "HealthCheckInterval": "600", + "HealthCheckNodeState": "ANY,CYCLE", + "HealthCheckProgram": "/usr/sbin/omni-nhc-wrapper", + "MailProg": "/usr/bin/mail.mailutils", + "PluginDir": "/usr/lib/x86_64-linux-gnu/slurm-wlm", + "PlugStackConfig": "/etc/slurm/plugstack.conf.d/plugstack.conf", + "SelectType": "select/cons_tres", + "SlurmctldPort": "6817", + "SlurmdPort": "6818", + "StateSaveLocation": "/var/spool/slurmctld", + "SlurmdSpoolDir": "/var/spool/slurmd", + "SlurmctldParameters": "enable_configless", + "SlurmctldLogFile": "/var/log/slurm/slurmctld.log", + "SlurmdLogFile": "/var/log/slurm/slurmctld.log", + "SlurmdPidFile": "/var/run/slurmd.pid", + "SlurmctldPidFile": "/var/run/slurmctld.pid", + "SlurmUser": SLURM_USER, + "SlurmdUser": "root", + "RebootProgram": '"/usr/sbin/reboot --reboot"', +} + + +FLUENTBIT_CONFIG = [ + { + "input": [ + ("name", "tail"), + ("path", "/var/log/slurm/slurmctld.log"), + ("path_key", "filename"), + ("tag", "slurmctld"), + ("parser", "slurm"), + ] + }, + { + "parser": [ + ("name", "slurm"), + ("format", "regex"), + ("regex", r"^\[(?