diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f6c8f58..ab92611 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,6 +33,10 @@ jobs: run: | . "${CONDA}/bin/activate" test-env pytest + - name: Run mypy + run: | + . "${CONDA}/bin/activate" test-env + mypy - name: Run pylint run: | . "${CONDA}/bin/activate" test-env diff --git a/.github/workflows/prepare_environment.sh b/.github/workflows/prepare_environment.sh index 0446196..7aa2cec 100755 --- a/.github/workflows/prepare_environment.sh +++ b/.github/workflows/prepare_environment.sh @@ -3,9 +3,9 @@ set -euo pipefail PYTHON_VERSION=$1 -conda create --quiet -c conda-forge -c free -n test-env \ +conda create --quiet -c conda-forge -n test-env \ python="$PYTHON_VERSION" \ "pytest>=4.6" pylint pytest-cov pycodestyle \ - setuptools setuptools_scm wheel + setuptools setuptools_scm wheel mypy source "${CONDA}/bin/activate" test-env pip install ".[testing]" diff --git a/pyproject.toml b/pyproject.toml index d06f103..4244047 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,3 +9,10 @@ build-backend = "setuptools.build_meta" [tool.black] line-length = 120 target-version = ['py39'] + +[tool.mypy] +strict = true +files = 'src/diraccfg' + +[tool.pylint."messages control"] +disable = ["unsubscriptable-object"] diff --git a/src/diraccfg/__main__.py b/src/diraccfg/__main__.py index 3249a48..23d0c5e 100644 --- a/src/diraccfg/__main__.py +++ b/src/diraccfg/__main__.py @@ -1,13 +1,16 @@ +from __future__ import annotations + import argparse import json import os import sys +from typing import NoReturn from .cfg import CFG from .versions import parseVersion -def parseArgs(): +def parseArgs() -> NoReturn: parser = argparse.ArgumentParser(description="Parser for DIRAC cfg files") subparsers = parser.add_subparsers(help="Actions to run with the ") @@ -27,7 +30,7 @@ def parseArgs(): sys.exit(0) -def dumpAsJson(cfgFilename): +def dumpAsJson(cfgFilename: str) -> None: if not os.path.isfile(cfgFilename): sys.stderr.write(f"ERROR: {cfgFilename} does not exist\n") sys.exit(1) @@ -35,7 +38,7 @@ def dumpAsJson(cfgFilename): print(json.dumps(res.getAsDict())) -def sortVersions(allow_pre_releases=False): +def sortVersions(allow_pre_releases: bool = False) -> None: try: objs = json.loads(sys.stdin.read()) except getattr(json, "JSONDecodeError", ValueError): @@ -56,7 +59,7 @@ def sortVersions(allow_pre_releases=False): continue parsedVersions[obj] = (major, minor, patch, pre) - print(json.dumps(sorted(parsedVersions, key=parsedVersions.get, reverse=True))) + print(json.dumps(sorted(parsedVersions, key=parsedVersions.__getitem__, reverse=True))) if __name__ == "__main__": diff --git a/src/diraccfg/cfg.py b/src/diraccfg/cfg.py index 32b40e7..3fe6c19 100644 --- a/src/diraccfg/cfg.py +++ b/src/diraccfg/cfg.py @@ -1,5 +1,5 @@ """ This is the main module that interprets DIRAC cfg format""" - +from __future__ import annotations import copy import os @@ -7,26 +7,78 @@ import zipfile import threading +from threading import Lock, RLock +from typing import Generic, Literal, TypeVar, overload, Callable, Union, TYPE_CHECKING, cast +from collections.abc import Iterator + +if TYPE_CHECKING: + from typing_extensions import TypedDict, ParamSpec, NotRequired + from _typeshed import StrOrBytesPath + + T = TypeVar("T") + P = ParamSpec("P") + + class DOKReturnType(TypedDict, Generic[T]): + OK: Literal[True] + Value: T + + class DErrorReturnType(TypedDict): + OK: Literal[False] + Message: str + + DReturnType = Union[DOKReturnType[T], DErrorReturnType] + + class CFGKeySection(TypedDict): + key: str + value: CFG + comment: str + levelsBelow: NotRequired[str] + + class CFGKey(TypedDict): + key: str + value: CFG | str + comment: str + levelsBelow: NotRequired[str] + + CFGAsDict = dict[str, Union["CFGAsDict", str]] + + ModificationType = Union[ + tuple[Literal["delOpt", "delSec"], str, int, str], + tuple[Literal["addOpt", "modOpt", "addSec"], str, int, str, str], + tuple[Literal["modSec"], str, int, list["ModificationType"], str], + ] -def S_ERROR(messageString=""): + +def S_ERROR(messageString: str = "") -> DErrorReturnType: return {"OK": False, "Message": str(messageString)} -def S_OK(value=""): +# mypy doesn't understand default parameter values with generics so use overloads (python/mypy#3737) +@overload +def S_OK() -> DOKReturnType[Literal[""]]: + ... + + +@overload +def S_OK(value: T) -> DOKReturnType[T]: + ... + + +def S_OK(value=""): # type: ignore return {"OK": True, "Value": value} class ListDummy: - def fromChar(self, inputString, sepChar=","): - if not ( - isinstance(inputString, str) and isinstance(sepChar, str) and sepChar - ): # to prevent getting an empty String as argument - return None + def fromChar(self, inputString: str, sepChar: str = ",") -> list[str]: + # to prevent getting an empty String as argument + if not (isinstance(inputString, str) and isinstance(sepChar, str) and sepChar): + # This makes no sense so just ignore type checking here + return None # type: ignore return [fieldString.strip() for fieldString in inputString.split(sepChar) if len(fieldString.strip()) > 0] -List = ListDummy() +List: ListDummy = ListDummy() class Synchronizer: @@ -34,7 +86,10 @@ class Synchronizer: allowing it to be used as a synchronizing decorator making the call thread-safe""" - def __init__(self, lockName="", recursive=False): + lockName: str + lock: Lock | RLock + + def __init__(self, lockName: str = "", recursive: bool = False) -> None: envVar = os.environ.get("DIRAC_FEWER_CFG_LOCKS", "no").lower() self.__locksEnabled = envVar not in ("y", "yes", "t", "true", "on", "1") if self.__locksEnabled: @@ -44,11 +99,11 @@ def __init__(self, lockName="", recursive=False): else: self.lock = threading.Lock() - def __call__(self, funcToCall): + def __call__(self, funcToCall: Callable[P, T]) -> Callable[P, T]: if not self.__locksEnabled: return funcToCall - def lockedFunc(*args, **kwargs): + def lockedFunc(*args: P.args, **kwargs: P.kwargs) -> T: try: if self.lockName: print("LOCKING", self.lockName) @@ -62,21 +117,21 @@ def lockedFunc(*args, **kwargs): return lockedFunc -gCFGSynchro = Synchronizer(recursive=True) +gCFGSynchro: Synchronizer = Synchronizer(recursive=True) class CFG: - def __init__(self): + def __init__(self) -> None: """ Constructor """ - self.__orderedList = [] - self.__commentDict = {} - self.__dataDict = {} + self.__orderedList: list[str] = [] + self.__commentDict: dict[str, str] = {} + self.__dataDict: dict[str, CFG | str] = {} self.reset() @gCFGSynchro - def reset(self): + def reset(self) -> None: """ Empty the CFG """ @@ -85,7 +140,9 @@ def reset(self): self.__dataDict = {} @gCFGSynchro - def createNewSection(self, sectionName, comment="", contents=False): + def createNewSection( + self, sectionName: str, comment: str = "", contents: CFG | None = None + ) -> DErrorReturnType | CFG: """ Create a new section @@ -109,28 +166,14 @@ def createNewSection(self, sectionName, comment="", contents=False): self.__addEntry(sectionName, comment) if sectionName not in self.__dataDict: if not contents: - self.__dataDict[sectionName] = CFG() - else: - self.__dataDict[sectionName] = contents + contents = CFG() + self.__dataDict[sectionName] = contents else: raise KeyError(f"{sectionName} key already exists") - return self.__dataDict[sectionName] - - def __overrideAndCloneSection(self, sectionName, oCFGToClone): - """ - Replace the contents of a section - - :type sectionName: string - :params sectionName: Name of the section - :type oCFGToClone: CFG - :param oCFGToClone: CFG with the contents of the section - """ - if sectionName not in self.listSections(): - raise KeyError(f"Section {sectionName} does not exist") - self.__dataDict[sectionName] = oCFGToClone.clone() + return contents @gCFGSynchro - def setOption(self, optionName, value, comment=""): + def setOption(self, optionName: str, value: str, comment: str = "") -> DErrorReturnType | None: """ Create a new option. @@ -153,8 +196,9 @@ def setOption(self, optionName, value, comment=""): return parentSection.setOption(recDict["levelsBelow"], value, comment) self.__addEntry(optionName, comment) self.__dataDict[optionName] = str(value) + return None - def __addEntry(self, entryName, comment): + def __addEntry(self, entryName: str, comment: str) -> None: """ Add an entry and set the comment @@ -167,7 +211,7 @@ def __addEntry(self, entryName, comment): self.__orderedList.append(entryName) self.__commentDict[entryName] = comment - def existsKey(self, key): + def existsKey(self, key: str) -> bool: """ Check if an option/section with that name exists @@ -177,7 +221,7 @@ def existsKey(self, key): """ return key in self.__orderedList - def sortAlphabetically(self, ascending=True): + def sortAlphabetically(self, ascending: bool = True) -> bool: """ Order this cfg alphabetically returns True if modified @@ -186,7 +230,7 @@ def sortAlphabetically(self, ascending=True): return self.sortByKey(reverse=True) return self.sortByKey() - def sortByKey(self, key=None, reverse=False): + def sortByKey(self, key: Callable[[str], bool] | None = None, reverse: bool = False) -> bool: """ Order this cfg by function refered in key, default is None corresponds to alphabetic sort @@ -197,7 +241,7 @@ def sortByKey(self, key=None, reverse=False): return unordered != self.__orderedList @gCFGSynchro - def deleteKey(self, key): + def deleteKey(self, key: str) -> bool: """ Delete an option/section @@ -219,7 +263,7 @@ def deleteKey(self, key): return False @gCFGSynchro - def copyKey(self, oldName, newName): + def copyKey(self, oldName: str, newName: str) -> bool: """ Copy an option/section @@ -253,7 +297,7 @@ def copyKey(self, oldName, newName): return False @gCFGSynchro - def listOptions(self, ordered=True): + def listOptions(self, ordered: bool = True) -> list[str]: """ List options @@ -267,7 +311,7 @@ def listOptions(self, ordered=True): return [sKey for sKey in self.__dataDict.keys() if isinstance(self.__dataDict[sKey], str)] @gCFGSynchro - def listSections(self, ordered=True): + def listSections(self, ordered: bool = True) -> list[str]: """ List subsections @@ -281,7 +325,7 @@ def listSections(self, ordered=True): return [sKey for sKey in self.__dataDict.keys() if not isinstance(self.__dataDict[sKey], str)] @gCFGSynchro - def isSection(self, key): + def isSection(self, key: str) -> bool: """ Return if a section exists @@ -301,7 +345,7 @@ def isSection(self, key): return key in self.__dataDict and not isinstance(self.__dataDict[key], str) @gCFGSynchro - def isOption(self, key): + def isOption(self, key: str) -> bool: """ Return if an option exists @@ -320,7 +364,7 @@ def isOption(self, key): return section.isOption(secKey) return key in self.__dataDict and isinstance(self.__dataDict[key], str) - def listAll(self): + def listAll(self) -> list[str]: """ List all sections and options @@ -328,7 +372,7 @@ def listAll(self): """ return self.__orderedList - def __recurse(self, pathList): + def __recurse(self, pathList: list[str]) -> CFGKey | Literal[False]: """ Explore recursively a path @@ -337,19 +381,27 @@ def __recurse(self, pathList): :return: Dictionary with the contents { key, value, comment } """ if pathList[0] in self.__dataDict: + value = self.__dataDict[pathList[0]] if len(pathList) == 1: return { "key": pathList[0], - "value": self.__dataDict[pathList[0]], + "value": value, "comment": self.__commentDict[pathList[0]], } - value = self.__dataDict[pathList[0]] if isinstance(value, CFG): return value.__recurse(pathList[1:]) return False + @overload + def getRecursive(self, path: str, levelsAbove: Literal[-1]) -> CFGKeySection | None: + ... + + @overload + def getRecursive(self, path: str, levelsAbove: Literal[0] = 0) -> CFGKey | None: + ... + @gCFGSynchro - def getRecursive(self, path, levelsAbove=0): + def getRecursive(self, path: str, levelsAbove: int = 0) -> CFGKey | CFGKeySection | None: """ Get path contents @@ -383,7 +435,19 @@ def getRecursive(self, path, levelsAbove=0): retDict["levelsBelow"] = levelsBelow return retDict - def getOption(self, opName, defaultValue=None): + @overload + def getOption(self, opName: str, defaultValue: None = ...) -> str | None: + ... + + @overload + def getOption(self, opName: str, defaultValue: list[str]) -> list[str]: + ... + + @overload + def getOption(self, opName: str, defaultValue: bool) -> bool: + ... + + def getOption(self, opName, defaultValue=None): # type: ignore """ Get option value with default applied @@ -396,12 +460,17 @@ def getOption(self, opName, defaultValue=None): :return: Value of the option casted to defaultValue type, or defaultValue """ levels = List.fromChar(opName, "/") - dataD = self.__dataDict + + dataD: CFG | str = self while len(levels) > 0: + assert isinstance(dataD, CFG), (opName, dataD) try: - dataV = dataD[levels.pop(0)] + val = dataD[levels.pop(0)] except KeyError: return defaultValue + if val is False: + return defaultValue + dataV = val dataD = dataV if not isinstance(dataV, str): @@ -440,7 +509,7 @@ def getOption(self, opName, defaultValue=None): except Exception: return defaultValue - def getAsCFG(self, path=""): + def getAsCFG(self, path: str = "") -> CFG: """Return subsection as CFG object. :param str path: Path to the section @@ -453,9 +522,12 @@ def getAsCFG(self, path=""): remainingPath = splitPath[1:] if basePath not in self.__dataDict: return CFG() - return self.__dataDict[basePath].getAsCFG("/".join(remainingPath)) + section = self.__dataDict[basePath] + if not isinstance(section, CFG): + raise ValueError(f"Path {path} is not a section") + return section.getAsCFG("/".join(remainingPath)) - def getAsDict(self, path=""): + def getAsDict(self, path: str = "") -> CFGAsDict: """ Get the contents below a given path as a dict @@ -464,7 +536,7 @@ def getAsDict(self, path=""): :return: Dictionary containing the data """ - resVal = {} + resVal: CFGAsDict = {} if path: reqDict = self.getRecursive(path) if not reqDict: @@ -474,15 +546,16 @@ def getAsDict(self, path=""): return resVal return keyCfg.getAsDict() for op in self.listOptions(): - resVal[op] = self[op] + resVal[op] = cast(str, self[op]) for sec in self.listSections(): - resVal[sec] = self[sec].getAsDict() + resVal[sec] = cast(CFG, self[sec]).getAsDict() return resVal @gCFGSynchro - def appendToOption(self, optionName, value): + def appendToOption(self, optionName: str, value: str) -> None: """ Append a value to an option prepending a comma + # Doesn't actually append commas! :type optionName: string :param optionName: Name of the option to append the value @@ -502,7 +575,7 @@ def appendToOption(self, optionName, value): cfg.__dataDict[end] = current_value + str(value) @gCFGSynchro - def addKey(self, key, value, comment, beforeKey=""): + def addKey(self, key: str, value: str | CFG, comment: str, beforeKey: str = "") -> None: """ Add a new entry (option or section) @@ -532,7 +605,7 @@ def addKey(self, key, value, comment, beforeKey=""): cfg.__orderedList.insert(refKeyPos, end) @gCFGSynchro - def renameKey(self, oldName, newName): + def renameKey(self, oldName: str, newName: str) -> bool: """ Rename a option/section @@ -568,7 +641,8 @@ def renameKey(self, oldName, newName): else: return False - def __getitem__(self, key): + # def __getitem__(self, key: str) -> CFG | str | Literal[False]: + def __getitem__(self, key: str) -> CFG | str: """ Get the contents of a section/option @@ -579,17 +653,18 @@ def __getitem__(self, key): if key.find("/") > -1: subDict = self.getRecursive(key) if not subDict: - return False + return False # type: ignore + # raise KeyError(key) return subDict["value"] return self.__dataDict[key] - def __iter__(self): + def __iter__(self) -> Iterator[str]: """ Iterate though the contents in order """ yield from self.__orderedList - def __contains__(self, key): + def __contains__(self, key: str) -> bool: """ Check if a key is defined """ @@ -597,7 +672,7 @@ def __contains__(self, key): return False return bool(self.getRecursive(key)) - def __str__(self): + def __str__(self) -> str: """ Get a print friendly representation of the CFG @@ -605,7 +680,7 @@ def __str__(self): """ return self.serialize() - def __repr__(self): + def __repr__(self) -> str: """ Get a print friendly representation of the CFG @@ -613,16 +688,18 @@ def __repr__(self): """ return self.serialize() - def __bool__(self): + def __bool__(self) -> Literal[True]: """ CFGs are not zeroes! ;) """ return True - def __eq__(self, cfg): + def __eq__(self, cfg: object) -> bool: """ Check CFGs """ + if not isinstance(cfg, CFG): + return False if not self.__orderedList == cfg.__orderedList: return False for key in self.__orderedList: @@ -633,7 +710,7 @@ def __eq__(self, cfg): return True @gCFGSynchro - def getComment(self, entryName): + def getComment(self, entryName: str) -> str: """ Get the comment for an option/section @@ -647,7 +724,7 @@ def getComment(self, entryName): raise ValueError(f"{entryName} does not have any comment defined") from None @gCFGSynchro - def setComment(self, entryName, comment): + def setComment(self, entryName: str, comment: str) -> bool: """ Set the comment for an option/section @@ -662,7 +739,7 @@ def setComment(self, entryName, comment): return False @gCFGSynchro - def serialize(self, tabLevelString=""): + def serialize(self, tabLevelString: str = "") -> str: """ Generate a human readable serialization of a CFG @@ -678,10 +755,10 @@ def serialize(self, tabLevelString=""): cfgString += f"{tabLevelString}#{commentLine}\n" if entryName in self.listSections(): cfgString += f"{tabLevelString}{entryName}\n{tabLevelString}{{\n" - cfgString += self.__dataDict[entryName].serialize(f"{tabLevelString}{indentation}") + cfgString += cast(CFG, self.__dataDict[entryName]).serialize(f"{tabLevelString}{indentation}") cfgString += "%s}\n" % tabLevelString elif entryName in self.listOptions(): - valueList = List.fromChar(self.__dataDict[entryName]) + valueList = List.fromChar(cast(str, self.__dataDict[entryName])) if len(valueList) == 0: cfgString += f"{tabLevelString}{entryName} = \n" else: @@ -693,7 +770,7 @@ def serialize(self, tabLevelString=""): return cfgString @gCFGSynchro - def clone(self): + def clone(self) -> CFG: """ Create a copy of the CFG @@ -703,13 +780,15 @@ def clone(self): clonedCFG.__orderedList = copy.deepcopy(self.__orderedList) clonedCFG.__commentDict = copy.deepcopy(self.__commentDict) for option in self.listOptions(): - clonedCFG.__dataDict[option] = self[option] + value = self[option] + assert value is not False + clonedCFG.__dataDict[option] = value for section in self.listSections(): - clonedCFG.__dataDict[section] = self[section].clone() + clonedCFG.__dataDict[section] = cast(CFG, self[section]).clone() return clonedCFG @gCFGSynchro - def mergeWith(self, cfgToMergeWith): + def mergeWith(self, cfgToMergeWith: CFG) -> CFG: """ Generate a CFG by merging with the contents of another CFG. @@ -720,21 +799,31 @@ def mergeWith(self, cfgToMergeWith): """ mergedCFG = CFG() for option in self.listOptions(): - mergedCFG.setOption(option, self[option], self.getComment(option)) + mergedCFG.setOption(option, cast(str, self[option]), self.getComment(option)) for option in cfgToMergeWith.listOptions(): - mergedCFG.setOption(option, cfgToMergeWith[option], cfgToMergeWith.getComment(option)) + mergedCFG.setOption(option, cast(str, cfgToMergeWith[option]), cfgToMergeWith.getComment(option)) for section in self.listSections(): + sectionCFG = cast(CFG, self[section]) if section in cfgToMergeWith.listSections(): - oSectionCFG = self[section].mergeWith(cfgToMergeWith[section]) + oSectionCFG = sectionCFG.mergeWith(cast(CFG, cfgToMergeWith[section])) mergedCFG.createNewSection(section, cfgToMergeWith.getComment(section), oSectionCFG) else: - mergedCFG.createNewSection(section, self.getComment(section), self[section].clone()) + mergedCFG.createNewSection(section, self.getComment(section), sectionCFG.clone()) for section in cfgToMergeWith.listSections(): if section not in self.listSections(): - mergedCFG.createNewSection(section, cfgToMergeWith.getComment(section), cfgToMergeWith[section]) + mergedCFG.createNewSection( + section, cfgToMergeWith.getComment(section), cast(CFG, cfgToMergeWith[section]) + ) return mergedCFG - def getModifications(self, newerCfg, ignoreMask=None, parentPath="", ignoreOrder=False, ignoreComments=False): + def getModifications( + self, + newerCfg: CFG, + ignoreMask: list[str] | None = None, + parentPath: str = "", + ignoreOrder: bool = False, + ignoreComments: bool = False, + ) -> list[ModificationType]: """ Compare two cfgs @@ -746,7 +835,7 @@ def getModifications(self, newerCfg, ignoreMask=None, parentPath="", ignoreOrder :param ignoreComments: Do not return changes for changed commens :return: A list of modifications """ - modList = [] + modList: list[ModificationType] = [] # Options oldOptions = self.listOptions(True) newOptions = newerCfg.listOptions(True) @@ -756,7 +845,9 @@ def getModifications(self, newerCfg, ignoreMask=None, parentPath="", ignoreOrder if ignoreMask and newOptPath in ignoreMask: continue if newOption not in oldOptions: - modList.append(("addOpt", newOption, iPos, newerCfg[newOption], newerCfg.getComment(newOption))) + modList.append( + ("addOpt", newOption, iPos, cast(str, newerCfg[newOption]), newerCfg.getComment(newOption)) + ) else: modified = False if iPos != self.__orderedList.index(newOption) and not ignoreOrder: @@ -766,7 +857,9 @@ def getModifications(self, newerCfg, ignoreMask=None, parentPath="", ignoreOrder elif newerCfg.getComment(newOption) != self.getComment(newOption) and not ignoreComments: modified = True if modified: - modList.append(("modOpt", newOption, iPos, newerCfg[newOption], newerCfg.getComment(newOption))) + modList.append( + ("modOpt", newOption, iPos, cast(str, newerCfg[newOption]), newerCfg.getComment(newOption)) + ) for oldOption in oldOptions: oldOptPath = f"{parentPath}/{oldOption}" if ignoreMask and oldOptPath in ignoreMask: @@ -789,8 +882,8 @@ def getModifications(self, newerCfg, ignoreMask=None, parentPath="", ignoreOrder modified = True elif newerCfg.getComment(newSection) != self.getComment(newSection): modified = True - subMod = self[newSection].getModifications( - newerCfg[newSection], ignoreMask, newSecPath, ignoreOrder, ignoreComments + subMod = cast(CFG, self[newSection]).getModifications( + cast(CFG, newerCfg[newSection]), ignoreMask, newSecPath, ignoreOrder, ignoreComments ) if subMod: modified = True @@ -804,7 +897,7 @@ def getModifications(self, newerCfg, ignoreMask=None, parentPath="", ignoreOrder modList.append(("delSec", oldSection, -1, "")) return modList - def applyModifications(self, modList, parentSection=""): + def applyModifications(self, modList: list[ModificationType], parentSection: str = "") -> DReturnType[Literal[""]]: """ Apply modifications to a CFG @@ -813,39 +906,41 @@ def applyModifications(self, modList, parentSection=""): :return: True/False """ for modAction in modList: - action = modAction[0] key = modAction[1] iPos = modAction[2] - value = modAction[3] - if action == "addSec": + if modAction[0] == "addSec": if key in self.listSections(): return S_ERROR(f"Section {parentSection}/{key} already exists") # key, value, comment, beforeKey = "" - value = CFG().loadFromBuffer(value) + sec = CFG().loadFromBuffer(modAction[3]) comment = modAction[4].strip() if iPos < len(self.__orderedList): beforeKey = self.__orderedList[iPos] else: beforeKey = "" - self.addKey(key, value, comment, beforeKey) - elif action == "delSec": + self.addKey(key, sec, comment, beforeKey) + elif modAction[0] == "delSec": if key not in self.listSections(): return S_ERROR(f"Section {parentSection}/{key} does not exist") self.deleteKey(key) - elif action == "modSec": + elif modAction[0] == "modSec": if key not in self.listSections(): return S_ERROR(f"Section {parentSection}/{key} does not exist") + section = self[key] + assert isinstance(section, CFG) + # if not isinstance(section, CFG): + # return S_ERROR(f"Section {parentSection}/{key} does not exist") comment = modAction[4].strip() self.setComment(key, comment) - if value: - result = self[key].applyModifications(value, f"{parentSection}/{key}") + if modAction[3]: + result = section.applyModifications(modAction[3], f"{parentSection}/{key}") if not result["OK"]: return result if iPos >= len(self.__orderedList) or key != self.__orderedList[iPos]: prevPos = self.__orderedList.index(key) del self.__orderedList[prevPos] self.__orderedList.insert(iPos, key) - elif action == "addOpt": + elif modAction[0] == "addOpt": if key in self.listOptions(): return S_ERROR(f"Option {parentSection}/{key} exists already") # key, value, comment, beforeKey = "" @@ -854,17 +949,17 @@ def applyModifications(self, modList, parentSection=""): beforeKey = self.__orderedList[iPos] else: beforeKey = "" - self.addKey(key, value, comment, beforeKey) - elif action == "modOpt": + self.addKey(key, modAction[3], comment, beforeKey) + elif modAction[0] == "modOpt": if key not in self.listOptions(): return S_ERROR(f"Option {parentSection}/{key} does not exist") comment = modAction[4].strip() - self.setOption(key, value, comment) + self.setOption(key, modAction[3], comment) if iPos >= len(self.__orderedList) or key != self.__orderedList[iPos]: prevPos = self.__orderedList.index(key) del self.__orderedList[prevPos] self.__orderedList.insert(iPos, key) - elif action == "delOpt": + elif modAction[0] == "delOpt": if key not in self.listOptions(): return S_ERROR(f"Option {parentSection}/{key} does not exist") self.deleteKey(key) @@ -872,7 +967,7 @@ def applyModifications(self, modList, parentSection=""): return S_OK() # Functions to load a CFG - def loadFromFile(self, fileName): + def loadFromFile(self, fileName: str) -> CFG: """ Load the contents of the CFG from a file @@ -881,19 +976,17 @@ def loadFromFile(self, fileName): :return: This CFG """ if zipfile.is_zipfile(fileName): - # Zipped file - zipHandler = zipfile.ZipFile(fileName) - nameList = zipHandler.namelist() - fileToRead = nameList[0] - fileData = zipHandler.read(fileToRead).decode("utf-8") - zipHandler.close() + with zipfile.ZipFile(fileName) as zipHandler: + nameList = zipHandler.namelist() + fileToRead = nameList[0] + fileData = zipHandler.read(fileToRead).decode("utf-8") else: with open(fileName) as fd: fileData = fd.read() return self.loadFromBuffer(fileData) @gCFGSynchro - def loadFromBuffer(self, data): + def loadFromBuffer(self, data: str) -> CFG: """ Load the contents of the CFG from a string @@ -921,7 +1014,7 @@ def loadFromBuffer(self, data): currentlyParsedString = currentlyParsedString.strip() currentLevel.createNewSection(currentlyParsedString, currentComment) levelList.append(currentLevel) - currentLevel = currentLevel[currentlyParsedString] + currentLevel = cast(CFG, currentLevel[currentlyParsedString]) currentlyParsedString = "" currentComment = "" elif line[index] == "}": @@ -951,7 +1044,7 @@ def loadFromBuffer(self, data): return self @gCFGSynchro - def loadFromDict(self, data): + def loadFromDict(self, data: CFGAsDict) -> CFG: for k in data: value = data[k] if isinstance(value, dict): @@ -962,7 +1055,7 @@ def loadFromDict(self, data): self.setOption(k, str(value), "") return self - def writeToFile(self, fileName): + def writeToFile(self, fileName: StrOrBytesPath) -> bool: """ Write the contents of the cfg to file diff --git a/src/diraccfg/py.typed b/src/diraccfg/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/diraccfg/versions.py b/src/diraccfg/versions.py index 38284fa..79e70db 100644 --- a/src/diraccfg/versions.py +++ b/src/diraccfg/versions.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import re -def parseVersion(versionString): +def parseVersion(versionString: str) -> tuple[int, int, int, int | None]: """Parse a DIRAC-style version sting :param versionString: Version identifier to parse @@ -16,11 +18,9 @@ def parseVersion(versionString): if not match: raise ValueError(f"{versionString} is not a valid version") - segments = match.groupdict() - for k, v in segments.items(): - if k != "pre" and v is None: - segments[k] = 0 - if v is not None: - segments[k] = int(v) - - return (segments["major"], segments["minor"], segments["patch"], segments["pre"]) + return ( + int(match.group("major")), + int(match.group("minor")), + int(match.group("patch") or 0), + None if match.group("pre") is None else int(match.group("pre")), + ) diff --git a/tests/test_cfg.py b/tests/test_cfg.py index 9c78daa..1e21b12 100644 --- a/tests/test_cfg.py +++ b/tests/test_cfg.py @@ -338,7 +338,7 @@ def test_comments(): assert a.getComment("a") == "" assert a.setComment("a", "some comment") is True - a.getComment("a") == "some comment" + assert a.getComment("a") == "some comment" assert "missing" not in a assert a.setComment("missing", "some comment") is False @@ -352,7 +352,7 @@ def test_comments_nested(): a = CFG().loadFromDict({"a": "1", "b": "2", "c": {"d": "3", "e": "4"}}) assert a.getComment("/c/d") == "" assert a.setComment("/c/d", "some other comment") is True - a.getComment("/c/d") == "some other comment" + assert a.getComment("/c/d") == "some other comment" def test_getitem(): @@ -392,7 +392,7 @@ def test_setOption(): assert a.getOption("/c/x") == "hello world" assert a.getOption("/missing/z") is None - assert a.setOption("/missing/z", "invalid")["OK"] == False + assert a.setOption("/missing/z", "invalid")["OK"] is False assert a.getOption("/missing/z") is None with pytest.raises(KeyError, match=r"doesn't seem to be a section"): @@ -708,9 +708,9 @@ def test_createNewSection(): with pytest.raises(KeyError, match="Entry a doesn't seem to"): a.createNewSection("/a/new9") - assert a.createNewSection("/new2/new9/new10")["OK"] == False - assert a.createNewSection("new3/new4")["OK"] == False - assert a.createNewSection("/new3/new4")["OK"] == False + assert a.createNewSection("/new2/new9/new10")["OK"] is False + assert a.createNewSection("new3/new4")["OK"] is False + assert a.createNewSection("/new3/new4")["OK"] is False def test_writeToFile(tmp_path):