Skip to content

Commit

Permalink
refactor: docstrings and type hints for configuration module
Browse files Browse the repository at this point in the history
  • Loading branch information
markuslevonyak committed May 2, 2024
1 parent ba0c253 commit 33b9533
Showing 1 changed file with 77 additions and 48 deletions.
125 changes: 77 additions & 48 deletions pantos/common/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import pathlib
import typing

import cerberus # type: ignore
import dotenv
Expand Down Expand Up @@ -57,63 +58,101 @@ class ConfigError(BaseError):


class Config:
"""TODO
"""Class that loads and parses a configuration file and provides
dictionary-like access to the configuration values.
Attributes
----------
default_file_name : str
The default configuration file name to be used when no explicit
file path is provided for loading the configuration.
"""
def __init__(self, default_file_name):
"""TODO
def __init__(self, default_file_name: str):
"""Initialize a configuration instance.
Parameters
----------
default_file_name : str
The default configuration file name to be used when no
explicit file path is provided for loading the
configuration.
"""
assert isinstance(default_file_name, str)
self.default_file_name = default_file_name
self.__config_dict = None
self.__config_dict: dict[str, typing.Any] | None = None

def __getitem__(self, key: str) -> typing.Any:
"""Get the configuration value for a given key.
Parameters
----------
key : str
The key to get the configuration value for.
def __getitem__(self, key):
"""TODO
Returns
-------
Any
The configuration value.
Raises
------
ConfigError
If the configuration has not been loaded.
"""
assert isinstance(key, str)
if self.__config_dict is None:
raise ConfigError('configuration not yet loaded')
return self.__config_dict[key]

def __str__(self):
"""TODO
"""
def __str__(self) -> str:
return str(self.__config_dict)

def is_loaded(self):
"""TODO
def is_loaded(self) -> bool:
"""Determine if the configuration has been loaded.
Returns
-------
bool
True if the configuration has been loaded.
"""
return self.__config_dict is not None

def load(self, validation_schema, file_path=None):
"""TODO
def load(self, validation_schema: dict[str, typing.Any],
file_path: str | None = None) -> None:
"""Load the configuration from a file.
Parameters
----------
validation_schema : dict
The Cerberus validation schema used for validating the
loaded configuration.
file_path : str, optional
The path to the configuration file to load. If no file path
is provided, the configuration is loaded from a default
configuration file path.
Raises
------
ConfigError
If the validation schema is invalid, the configuration file
cannot be found, or the configuration file is invalid.
"""
assert isinstance(validation_schema, dict)
assert file_path is None or isinstance(file_path, str)
# Find the configuration file
path = self.__find_file(file_path)
_logger.info('loading configuration from file {}'.format(path))
# Parse the configuration file
_logger.info(f'loading configuration from file {path}')
config_dict = self.__parse_file(path)
# Validate the configuration and add default configuration values
self.__config_dict = self.__validate(config_dict, validation_schema)

def __find_file(self, file_path=None):
"""TODO
"""
assert file_path is None or isinstance(file_path, str)
def __find_file(self, file_path: str | None) -> pathlib.Path:
if file_path is not None:
# Use the specified configuration file
path = pathlib.Path(file_path)
if not path.is_file():
raise ConfigError('no configuration file found at '
'{}'.format(file_path))
raise ConfigError(
f'no configuration file found at {file_path}')
return path
# Find the configuration file at common locations
for path in _CONFIGURATION_PATHS:
Expand All @@ -130,17 +169,12 @@ def __find_file(self, file_path=None):
# No configuration file found at common locations
raise ConfigError('no configuration file found')

def __parse_file(self, path):
"""TODO
"""
assert isinstance(path, pathlib.Path)
def __parse_file(self, path: pathlib.Path) -> dict[str, typing.Any]:
# List of potential .env file paths
env_files = [
pathlib.Path(path.as_posix() + '.env'),
pathlib.Path(path.with_suffix('.env').as_posix())
]

# Iterate over the potential .env file paths
for env_file in env_files:
if env_file.is_file():
Expand All @@ -157,27 +191,22 @@ def __parse_file(self, path):
if hasattr(error, 'problem_mark'):
line = error.problem_mark.line + 1
column = error.problem_mark.column + 1
raise ConfigError('YAML code in configuration file '
'invalid at line {} and '
'column {}'.format(line, column))
raise ConfigError('YAML code in configuration file invalid at '
f'line {line} and column {column}')
else:
raise ConfigError('YAML code in configuration file '
'invalid')

def __validate(self, config_dict, validation_schema):
"""TODO
raise ConfigError('YAML code in configuration file invalid')

"""
assert isinstance(config_dict, dict)
assert isinstance(validation_schema, dict)
def __validate(
self, config_dict: dict[str, typing.Any],
validation_schema: dict[str, typing.Any]) -> dict[str, typing.Any]:
# Create the validator and validate the validation schema
try:
validator = _CustomValidator(validation_schema)
except cerberus.schema.SchemaError as error:
raise ConfigError('validation schema invalid: {}'.format(error))
raise ConfigError(f'validation schema invalid: {error}')
# Validate the configuration
if not validator.validate(config_dict):
raise ConfigError('configuration file invalid: '
'{}'.format(validator.errors))
raise ConfigError(
f'configuration file invalid: {validator.errors}')
# Add default configuration values
return validator.normalized(config_dict)

0 comments on commit 33b9533

Please sign in to comment.