diff --git a/discopop_library/PatchApplicator/apply.py b/discopop_library/PatchApplicator/apply.py index d0fd75d01..d17322643 100644 --- a/discopop_library/PatchApplicator/apply.py +++ b/discopop_library/PatchApplicator/apply.py @@ -38,11 +38,14 @@ def apply_patches( if suggestion_id in applicable_suggestions: if arguments.verbose: print("Applying suggestion ", suggestion_id) - __apply_file_patches(file_mapping, suggestion_id, patch_generator_dir, arguments) - applied_suggestions["applied"].append(suggestion_id) - # write updated applied suggestions to file - with open(applied_suggestions_file, "w") as f: - f.write(json.dumps(applied_suggestions)) + successful = __apply_file_patches(file_mapping, suggestion_id, patch_generator_dir, arguments) + if successful: + applied_suggestions["applied"].append(suggestion_id) + # write updated applied suggestions to file + with open(applied_suggestions_file, "w") as f: + f.write(json.dumps(applied_suggestions)) + else: + print("Applying suggestion", suggestion_id, "not successful.") else: if arguments.verbose: print("Nothing to apply for suggestion ", suggestion_id) @@ -50,17 +53,18 @@ def apply_patches( def __apply_file_patches( file_mapping: Dict[int, Path], suggestion_id: str, patch_generator_dir: str, arguments: PatchApplicatorArguments -): +) -> bool: # get a list of patches for the given suggestion patch_files = os.listdir(os.path.join(patch_generator_dir, suggestion_id)) if arguments.verbose: print("\tFound patch files:", patch_files) + encountered_error = False + already_patched: List[str] = [] for patch_file_name in patch_files: patch_file_id = int(patch_file_name.rstrip(".patch")) patch_target = file_mapping[patch_file_id] patch_file_path = os.path.join(patch_generator_dir, suggestion_id, patch_file_name) - command = [ "patch", patch_target.as_posix(), @@ -80,5 +84,40 @@ def __apply_file_patches( print("RESULT: ", result.returncode) print("STDERR:") print(result.stderr) + print("STDOUT: ") + print(result.stdout) + encountered_error = True + break + else: + already_patched.append(patch_file_name) + + # cleanup in case of an error + if encountered_error: + for patch_file_name in already_patched: + patch_file_id = int(patch_file_name.rstrip(".patch")) + patch_target = file_mapping[patch_file_id] + patch_file_path = os.path.join(patch_generator_dir, suggestion_id, patch_file_name) + command = [ + "patch", + "-R", + patch_target.as_posix(), + patch_file_path, + ] + if arguments.verbose: + print("\tcleanup: applying: ", " ".join(command)) + result = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + cwd=os.getcwd(), + ) + if result.returncode != 0: + if arguments.verbose: + print("RESULT: ", result.returncode) + print("STDERR:") + print(result.stderr) print("STDOUT: ") print(result.stdout) + + return not encountered_error diff --git a/discopop_library/PatchApplicator/load.py b/discopop_library/PatchApplicator/load.py index f2c770729..fa0bc0d57 100644 --- a/discopop_library/PatchApplicator/load.py +++ b/discopop_library/PatchApplicator/load.py @@ -5,5 +5,30 @@ # This software may be modified and distributed under the terms of # the 3-Clause BSD License. See the LICENSE file in the package base # directory for details. -def load_patches(): - raise NotImplementedError() +import json +import os.path +from pathlib import Path +from typing import Dict + +from discopop_library.PatchApplicator.PatchApplicatorArguments import PatchApplicatorArguments +from discopop_library.PatchApplicator.apply import apply_patches + + +def load_patches( + file_mapping: Dict[int, Path], + arguments: PatchApplicatorArguments, + applied_suggestions_file: str, + patch_generator_dir: str, +): + # check if a saved configuration exists + if not os.path.exists(applied_suggestions_file + ".save"): + raise FileNotFoundError( + "Nothing to be loaded. Exiting." "\nExpected file: ", applied_suggestions_file + ".save" + ) + + # get the list of suggestions to be applied + with open(applied_suggestions_file + ".save", "r") as f: + suggestions_to_be_applied = json.loads(f.read())["applied"] + + # apply all suggestions + apply_patches(suggestions_to_be_applied, file_mapping, arguments, applied_suggestions_file, patch_generator_dir) diff --git a/discopop_library/PatchApplicator/patch_applicator.py b/discopop_library/PatchApplicator/patch_applicator.py index 10e3b595d..40736b8bf 100644 --- a/discopop_library/PatchApplicator/patch_applicator.py +++ b/discopop_library/PatchApplicator/patch_applicator.py @@ -68,7 +68,7 @@ def run(arguments: PatchApplicatorArguments): elif arguments.clear: clear_patches(file_mapping, arguments, applied_suggestions_file, patch_generator_dir) elif arguments.load: - load_patches() + load_patches(file_mapping, arguments, applied_suggestions_file, patch_generator_dir) elif arguments.list: print("Applied suggestions: ", list_applied_suggestions(arguments, applied_suggestions_file)) diff --git a/discopop_library/PatchApplicator/rollback.py b/discopop_library/PatchApplicator/rollback.py index 5bf1d2ee1..9d363fa60 100644 --- a/discopop_library/PatchApplicator/rollback.py +++ b/discopop_library/PatchApplicator/rollback.py @@ -37,12 +37,15 @@ def rollback_patches( continue if suggestion_id in applicable_suggestions: if arguments.verbose: - print("Applying rollback of suggestion ", suggestion_id) - __rollback_file_patches(file_mapping, suggestion_id, patch_generator_dir, arguments) - applied_suggestions["applied"].remove(suggestion_id) - # write updated applied suggestions to file - with open(applied_suggestions_file, "w") as f: - f.write(json.dumps(applied_suggestions)) + print("Rollback suggestion ", suggestion_id) + successul = __rollback_file_patches(file_mapping, suggestion_id, patch_generator_dir, arguments) + if successul: + applied_suggestions["applied"].remove(suggestion_id) + # write updated applied suggestions to file + with open(applied_suggestions_file, "w") as f: + f.write(json.dumps(applied_suggestions)) + else: + print("Rollback of suggestion", suggestion_id, "not successful.") else: if arguments.verbose: print("Nothing to rollback for suggestion ", suggestion_id) @@ -50,12 +53,14 @@ def rollback_patches( def __rollback_file_patches( file_mapping: Dict[int, Path], suggestion_id: str, patch_generator_dir: str, arguments: PatchApplicatorArguments -): +) -> bool: # get a list of patches for the given suggestion patch_files = os.listdir(os.path.join(patch_generator_dir, suggestion_id)) if arguments.verbose: print("\tFound patch files:", patch_files) + encountered_error = False + already_patched: List[str] = [] for patch_file_name in patch_files: patch_file_id = int(patch_file_name.rstrip(".patch")) patch_target = file_mapping[patch_file_id] @@ -81,5 +86,39 @@ def __rollback_file_patches( print("RESULT: ", result.returncode) print("STDERR:") print(result.stderr) + print("STDOUT: ") + print(result.stdout) + encountered_error = True + break + else: + already_patched.append(patch_file_name) + + # cleanup in case of an error + if encountered_error: + for patch_file_name in already_patched: + patch_file_id = int(patch_file_name.rstrip(".patch")) + patch_target = file_mapping[patch_file_id] + patch_file_path = os.path.join(patch_generator_dir, suggestion_id, patch_file_name) + command = [ + "patch", + patch_target.as_posix(), + patch_file_path, + ] + if arguments.verbose: + print("\tcleanup: applying: ", " ".join(command)) + result = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + cwd=os.getcwd(), + ) + if result.returncode != 0: + if arguments.verbose: + print("RESULT: ", result.returncode) + print("STDERR:") + print(result.stderr) print("STDOUT: ") print(result.stdout) + + return not encountered_error