From f6aa2ab34d32d9ee1095f79e603fe6ff62f1c9d7 Mon Sep 17 00:00:00 2001 From: moe-ad Date: Thu, 5 Dec 2024 14:46:48 +0100 Subject: [PATCH] fix: reverted src .py files --- src/ansys/dpf/core/core.py | 29 ++++++------- src/ansys/dpf/core/custom_operator.py | 59 +++++++++++++-------------- src/ansys/dpf/core/data_sources.py | 33 +++++++-------- 3 files changed, 56 insertions(+), 65 deletions(-) diff --git a/src/ansys/dpf/core/core.py b/src/ansys/dpf/core/core.py index 69a1a15d20..199f5295c4 100644 --- a/src/ansys/dpf/core/core.py +++ b/src/ansys/dpf/core/core.py @@ -29,7 +29,6 @@ import logging import warnings import weakref -from pathlib import Path from ansys.dpf.core import errors, misc from ansys.dpf.core import server as server_module @@ -430,6 +429,7 @@ def load_library(self, file_path, name="", symbol="LoadOperators", generate_oper ) if generate_operators: # TODO: fix code generation upload posix + import os def __generate_code(TARGET_PATH, filename, name, symbol): from ansys.dpf.core.dpf_operator import Operator @@ -444,8 +444,8 @@ def __generate_code(TARGET_PATH, filename, name, symbol): except Exception as e: warnings.warn("Unable to generate the python code with error: " + str(e.args)) - local_dir = Path(__file__).parent - LOCAL_PATH = local_dir / "operators" + local_dir = os.path.dirname(os.path.abspath(__file__)) + LOCAL_PATH = os.path.join(local_dir, "operators") if not self._server().local_server: if self._server().os != "posix" or (not self._server().os and os.name != "posix"): # send local generated code @@ -761,25 +761,24 @@ def upload_files_in_folder( new file paths server side """ server_paths = [] - client_folder_path = Path(client_folder_path) - for root, subdirectories, files in client_folder_path.walk(): + for root, subdirectories, files in os.walk(client_folder_path): for subdirectory in subdirectories: - subdir = root / subdirectory - for filename in subdir.iterdir(): - f = subdir / filename + subdir = os.path.join(root, subdirectory) + for filename in os.listdir(subdir): + f = os.path.join(subdir, filename) server_paths = self._upload_and_get_server_path( specific_extension, - str(f), + f, filename, server_paths, str(to_server_folder_path), subdirectory, ) for file in files: - f = root / file + f = os.path.join(root, file) server_paths = self._upload_and_get_server_path( specific_extension, - str(f), + f, file, server_paths, str(to_server_folder_path), @@ -837,8 +836,7 @@ def upload_file(self, file_path, to_server_file_path): server_file_path : str path generated server side """ - file_path = Path(file_path) - if file_path.stat().st_size == 0: + if os.stat(file_path).st_size == 0: raise ValueError(file_path + " is empty") if not self._server().has_client(): txt = """ @@ -870,12 +868,11 @@ def upload_file_in_tmp_folder(self, file_path, new_file_name=None): server_file_path : str path generated server side """ - file_path = Path(file_path) if new_file_name: file_name = new_file_name else: - file_name = Path(file_path).name - if file_path.stat().st_size == 0: + file_name = os.path.basename(file_path) + if os.stat(file_path).st_size == 0: raise ValueError(file_path + " is empty") if not self._server().has_client(): txt = """ diff --git a/src/ansys/dpf/core/custom_operator.py b/src/ansys/dpf/core/custom_operator.py index 9ba5b9b49b..f5e8941951 100644 --- a/src/ansys/dpf/core/custom_operator.py +++ b/src/ansys/dpf/core/custom_operator.py @@ -31,7 +31,7 @@ import abc import ctypes import os -from pathlib import Path +import pathlib import re import shutil import tempfile @@ -85,23 +85,23 @@ def update_virtual_environment_for_custom_operators( raise NotImplementedError( "Updating the dpf-site.zip of a DPF Server is only available when InProcess." ) - current_dpf_site_zip_path = Path(server.ansys_path) / "dpf" / "python" / "dpf-site.zip" + current_dpf_site_zip_path = os.path.join(server.ansys_path, "dpf", "python", "dpf-site.zip") # Get the path to where we store the original dpf-site.zip - original_dpf_site_zip_path = ( - Path(server.ansys_path) / "dpf" / "python" / "original" / "dpf-site.zip" + original_dpf_site_zip_path = os.path.join( + server.ansys_path, "dpf", "python", "original", "dpf-site.zip" ) # Restore the original dpf-site.zip if restore_original: - if original_dpf_site_zip_path.exists(): + if os.path.exists(original_dpf_site_zip_path): shutil.move(src=original_dpf_site_zip_path, dst=current_dpf_site_zip_path) - original_dpf_site_zip_path.parent.rmdir() + os.rmdir(os.path.dirname(original_dpf_site_zip_path)) else: warnings.warn("No original dpf-site.zip found. Current is most likely the original.") else: # Store original dpf-site.zip for this DPF Server if no original is stored - if not original_dpf_site_zip_path.parent.exists(): - original_dpf_site_zip_path.parent.mkdir() - if not original_dpf_site_zip_path.exists(): + if not os.path.exists(os.path.dirname(original_dpf_site_zip_path)): + os.mkdir(os.path.dirname(original_dpf_site_zip_path)) + if not os.path.exists(original_dpf_site_zip_path): shutil.move(src=current_dpf_site_zip_path, dst=original_dpf_site_zip_path) # Get the current paths to site_packages import site @@ -111,47 +111,46 @@ def update_virtual_environment_for_custom_operators( # Get the first one targeting an actual site-packages folder for path_to_site_packages in paths_to_current_site_packages: if path_to_site_packages[-13:] == "site-packages": - current_site_packages_path = Path(path_to_site_packages) + current_site_packages_path = pathlib.Path(path_to_site_packages) break if current_site_packages_path is None: warnings.warn("Could not find a currently loaded site-packages folder to update from.") return # If an ansys.dpf.core.path file exists, then the installation is editable - search_path = current_site_packages_path + search_path = pathlib.Path(current_site_packages_path) potential_editable = list(search_path.rglob("__editable__.ansys_dpf_core-*.pth")) if potential_editable: path_file = potential_editable[0] else: # Keep for older setuptools versions - path_file = current_site_packages_path / "ansys.dpf.core.pth" - if path_file.exists(): + path_file = os.path.join(current_site_packages_path, "ansys.dpf.core.pth") + if os.path.exists(path_file): # Treat editable installation of ansys-dpf-core - with path_file.open("r") as f: + with open(path_file, "r") as f: current_site_packages_path = f.readline().strip() with tempfile.TemporaryDirectory() as tmpdir: - tmpdir = Path(tmpdir) - ansys_dir = tmpdir / "ansys_dpf_core" - ansys_dir.mkdir() - ansys_dir.joinpath("ansys").mkdir() - ansys_dir.joinpath("ansys", "dpf").mkdir() - ansys_dir.joinpath("ansys", "grpc").mkdir() + os.mkdir(os.path.join(tmpdir, "ansys_dpf_core")) + ansys_dir = os.path.join(tmpdir, "ansys_dpf_core") + os.mkdir(os.path.join(ansys_dir, "ansys")) + os.mkdir(os.path.join(ansys_dir, "ansys", "dpf")) + os.mkdir(os.path.join(ansys_dir, "ansys", "grpc")) shutil.copytree( - src=current_site_packages_path / "ansys" / "dpf" / "core", - dst=ansys_dir / "ansys" / "dpf" / "core", + src=os.path.join(current_site_packages_path, "ansys", "dpf", "core"), + dst=os.path.join(ansys_dir, "ansys", "dpf", "core"), ignore=lambda directory, contents: ["__pycache__", "result_files"], ) shutil.copytree( - src=current_site_packages_path / "ansys" / "dpf" / "gate", - dst=ansys_dir / "ansys" / "dpf" / "gate", + src=os.path.join(current_site_packages_path, "ansys", "dpf", "gate"), + dst=os.path.join(ansys_dir, "ansys", "dpf", "gate"), ignore=lambda directory, contents: ["__pycache__"], ) shutil.copytree( - src=current_site_packages_path / "ansys" / "grpc" / "dpf", - dst=ansys_dir / "ansys" / "grpc" / "dpf", + src=os.path.join(current_site_packages_path, "ansys", "grpc", "dpf"), + dst=os.path.join(ansys_dir, "ansys", "grpc", "dpf"), ignore=lambda directory, contents: ["__pycache__"], ) # Find the .dist_info folder pattern = re.compile(r"^ansys_dpf_core\S*") - for p in current_site_packages_path.iterdir(): + for p in pathlib.Path(current_site_packages_path).iterdir(): if p.is_dir(): # print(p.stem) if re.search(pattern, p.stem): @@ -159,12 +158,12 @@ def update_virtual_environment_for_custom_operators( break shutil.copytree( src=dist_info_path, - dst=ansys_dir / dist_info_path.name, + dst=os.path.join(ansys_dir, dist_info_path.name), ) # Zip the files as dpf-site.zip - base_name = tmpdir / "ansys_dpf_core_zip" + base_name = os.path.join(tmpdir, "ansys_dpf_core_zip") base_dir = "." - root_dir = tmpdir / "ansys_dpf_core" # OK + root_dir = os.path.join(tmpdir, "ansys_dpf_core") # OK shutil.make_archive( base_name=base_name, root_dir=root_dir, base_dir=base_dir, format="zip" ) diff --git a/src/ansys/dpf/core/data_sources.py b/src/ansys/dpf/core/data_sources.py index 0275f6f4f6..d694f54b5a 100644 --- a/src/ansys/dpf/core/data_sources.py +++ b/src/ansys/dpf/core/data_sources.py @@ -28,7 +28,6 @@ """ import os -from pathlib import Path import warnings import traceback from typing import Union @@ -143,7 +142,7 @@ def set_result_file_path(self, filepath, key=""): ['/tmp/file.rst'] """ - extension = Path(filepath).suffix + extension = os.path.splitext(filepath)[1] # Handle .res files from CFX if key == "" and extension == ".res": key = "cas" @@ -163,7 +162,7 @@ def set_result_file_path(self, filepath, key=""): def guess_result_key(filepath: str) -> str: """Guess result key for files without a file extension.""" result_keys = ["d3plot", "binout"] - base_name = Path(filepath).name + base_name = os.path.basename(filepath) # Handle files without extension for result_key in result_keys: if result_key in base_name: @@ -173,13 +172,14 @@ def guess_result_key(filepath: str) -> str: @staticmethod def guess_second_key(filepath: str) -> str: """For files with an h5 or cff extension, look for another extension.""" - - # These files usually end with .cas.h5 or .dat.h5 accepted = ["cas", "dat"] - new_split = Path(filepath).suffixes + without_ext = os.path.splitext(filepath)[0] + new_split = os.path.splitext(without_ext) new_key = "" - if new_split[0] in accepted: - new_key = new_split[0] + if len(new_split) > 1: + key = new_split[1][1:] + if key in accepted: + new_key = key return new_key def set_domain_result_file_path( @@ -241,12 +241,9 @@ def add_file_path(self, filepath, key="", is_domain: bool = False, domain_id=0): """ # The filename needs to be a fully qualified file name - # if not os.path.dirname(filepath) - - filepath = Path(filepath) - if not filepath.parent.name: + if not os.path.dirname(filepath): # append local path - filepath = Path.cwd() / filepath.name + filepath = os.path.join(os.getcwd(), os.path.basename(filepath)) if is_domain: if key == "": raise NotImplementedError("A key must be given when using is_domain=True.") @@ -283,10 +280,9 @@ def add_domain_file_path(self, filepath, key, domain_id): """ # The filename needs to be a fully qualified file name - filepath = Path(filepath) - if not filepath.parent.name: + if not os.path.dirname(filepath): # append local path - filepath = Path.cwd() / filepath.name + filepath = os.path.join(os.getcwd(), os.path.basename(filepath)) self._api.data_sources_add_domain_file_path_with_key_utf8( self, str(filepath), key, domain_id ) @@ -311,10 +307,9 @@ def add_file_path_for_specified_result(self, filepath, key="", result_key=""): The default is ``""``, in which case the key is found directly. """ # The filename needs to be a fully qualified file name - filepath = Path(filepath) - if not filepath.parent.name: + if not os.path.dirname(filepath): # append local path - filepath = Path.cwd() / filepath.name + filepath = os.path.join(os.getcwd(), os.path.basename(filepath)) self._api.data_sources_add_file_path_for_specified_result_utf8( self, str(filepath), key, result_key