From a5865010e9bb93eb0b7a681b24b0f6266d4df319 Mon Sep 17 00:00:00 2001
From: Vishal Gupta
Date: Sat, 14 Dec 2024 01:29:33 +0000
Subject: [PATCH] Invoke validations from import executor

---
 import-automation/executor/app/configs.py     |   6 +
 .../executor/app/executor/import_executor.py  | 186 +++++++++++++++---
 2 files changed, 162 insertions(+), 30 deletions(-)

diff --git a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py
index 60e615a99..4be7a4e72 100644
--- a/import-automation/executor/app/configs.py
+++ b/import-automation/executor/app/configs.py
@@ -111,6 +111,12 @@ class ExecutorConfig:
     user_script_args: List[str] = ()
     # Environment variables for the user script
     user_script_env: dict = None
+    # Skip uploading the data to GCS (for local testing).
+    skip_gcs_upload: bool = False
+    # Invoke validations before upload.
+    invoke_import_validation: bool = True
+    # Import validation config file.
+    validation_config_file: str = 'tools/validation/config.json'
     # Maximum time venv creation can take in seconds.
     venv_create_timeout: float = 3600
     # Maximum time downloading a file can take in seconds.
diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py
index 8396f9546..c921690b6 100644
--- a/import-automation/executor/app/executor/import_executor.py
+++ b/import-automation/executor/app/executor/import_executor.py
@@ -34,6 +34,7 @@
 from app.service import file_uploader
 from app.service import github_api
 from app.service import import_service
+from google.cloud import storage
 
 # Email address for status messages.
 _DEBUG_EMAIL_ADDR = 'datacommons-debug+imports@google.com'
@@ -318,6 +319,144 @@ def _import_one(
                 )
             raise exc
 
+    def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
+                                  absolute_import_dir: str, import_spec: dict,
+                                  interpreter_path: str,
+                                  process: subprocess.CompletedProcess) -> None:
+        """Runs the differ and validation tools on each import input.
+
+        For every entry in import_spec['import_inputs'], downloads the MCF of
+        the version named in latest_version.txt from the prod GCS bucket, runs
+        tools/differ/differ.py on the current and previous data, then runs
+        tools/validation/validation.py on the differ output.
+
+        Args:
+            repo_dir: Path to the repository root that contains tools/.
+            relative_import_dir: Relative path of the import directory; used
+                as the GCS folder prefix.
+            absolute_import_dir: Absolute path to the import directory.
+            import_spec: Import specification dict.
+            interpreter_path: Interpreter used to run both tools.
+            process: Unused; rebound to the result of each script run.
+
+        Raises:
+            subprocess.CalledProcessError: If the differ or validation script
+                exits with a non-zero code.
+        """
+        import_inputs = import_spec.get('import_inputs', [])
+        for import_input in import_inputs:
+            mcf_path = import_input['node_mcf']
+            current_data_path = os.path.join(absolute_import_dir, mcf_path)
+            previous_data_path = os.path.join(absolute_import_dir,
+                                              mcf_path) + '.old'
+            differ_results_path = os.path.join(absolute_import_dir, 'results')
+            config_file_path = os.path.join(absolute_import_dir,
+                                            self.config.validation_config_file)
+            if not os.path.exists(config_file_path):
+                # NOTE(review): the default config path looks relative to the
+                # repo root (next to tools/validation/validation.py), so fall
+                # back to it when the import has no config of its own.
+                config_file_path = os.path.join(
+                    repo_dir, self.config.validation_config_file)
+
+            # Download previous import data.
+            bucket = storage.Client(self.config.gcs_project_id).bucket(
+                self.config.storage_prod_bucket_name)
+            folder = relative_import_dir + '/' + import_spec['import_name'] + '/'
+            # Strip whitespace so a trailing newline in the version file does
+            # not end up in the blob path.
+            latest_version = bucket.blob(
+                folder + 'latest_version.txt').download_as_text().strip()
+            blob = bucket.blob(folder + latest_version + '/' + mcf_path)
+            blob.download_to_filename(previous_data_path)
+
+            # Invoke data differ script.
+            differ_script_path = os.path.join(repo_dir, 'tools', 'differ',
+                                              'differ.py')
+            differ_script_args: List[str] = ('--current_data=' +
+                                             current_data_path,
+                                             '--previous_data=' +
+                                             previous_data_path,
+                                             '--output_location=' +
+                                             differ_results_path)
+            process = _run_user_script(
+                interpreter_path=interpreter_path,
+                script_path=differ_script_path,
+                timeout=self.config.user_script_timeout,
+                args=differ_script_args,
+                cwd=absolute_import_dir,
+                env=self.config.user_script_env,
+            )
+            _log_process(process=process)
+            process.check_returncode()
+
+            # Invoke data validation script.
+            validation_script_path = os.path.join(repo_dir, 'tools',
+                                                  'validation', 'validation.py')
+            validation_script_args: List[str] = ('--differ_output_location=' +
+                                                 differ_results_path,
+                                                 '--config_file=' +
+                                                 config_file_path)
+            process = _run_user_script(
+                interpreter_path=interpreter_path,
+                script_path=validation_script_path,
+                timeout=self.config.user_script_timeout,
+                args=validation_script_args,
+                cwd=absolute_import_dir,
+                env=self.config.user_script_env,
+            )
+            _log_process(process=process)
+            process.check_returncode()
+
+    def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
+                           version: str, interpreter_path: str,
+                           process: subprocess.CompletedProcess) -> None:
+        """Runs every script listed in import_spec['scripts'].
+
+        A script for which get_simple_import_job_id returns a job id is run
+        as a Cloud Run job; any other script is run locally.
+
+        Args:
+            absolute_import_dir: Absolute path to the import directory.
+            import_spec: Import specification dict.
+            version: Version passed to the Cloud Run job.
+            interpreter_path: Passed to _get_script_interpreter for local runs.
+            process: Unused; rebound to the result of each local script run.
+
+        Raises:
+            subprocess.CalledProcessError: If a locally run script exits with
+                a non-zero code.
+        """
+        script_paths = import_spec.get('scripts')
+        for path in script_paths:
+            script_path = os.path.join(absolute_import_dir, path)
+            simple_job = cloud_run_simple_import.get_simple_import_job_id(
+                import_spec, script_path)
+            if simple_job:
+                # Running simple import as cloud run job.
+                cloud_run_simple_import.cloud_run_simple_import_job(
+                    import_spec=import_spec,
+                    config_file=script_path,
+                    env=self.config.user_script_env,
+                    version=version,
+                    image=import_spec.get('image'),
+                )
+            else:
+                # Run import script locally.
+                script_interpreter = _get_script_interpreter(
+                    script_path, interpreter_path)
+                process = _run_user_script(
+                    interpreter_path=script_interpreter,
+                    script_path=script_path,
+                    timeout=self.config.user_script_timeout,
+                    args=self.config.user_script_args,
+                    cwd=absolute_import_dir,
+                    env=self.config.user_script_env,
+                    name=import_spec['import_name'],
+                )
+                _log_process(process=process)
+                process.check_returncode()
+
     def _import_one_helper(
         self,
         repo_dir: str,
@@ -351,36 +490,23 @@ def _import_one_helper(
             _log_process(process=process)
             process.check_returncode()
 
-            script_paths = import_spec.get('scripts')
-            for path in script_paths:
-                script_path = os.path.join(absolute_import_dir, path)
-                simple_job = cloud_run_simple_import.get_simple_import_job_id(
-                    import_spec, script_path)
-                if simple_job:
-                    # Running simple import as cloud run job.
-                    cloud_run_simple_import.cloud_run_simple_import_job(
-                        import_spec=import_spec,
-                        config_file=script_path,
-                        env=self.config.user_script_env,
-                        version=version,
-                        image=import_spec.get('image'),
-                    )
-                else:
-                    # Run import script locally.
-                    script_interpreter = _get_script_interpreter(
-                        script_path, interpreter_path)
-                    process = _run_user_script(
-                        interpreter_path=script_interpreter,
-                        script_path=script_path,
-                        timeout=self.config.user_script_timeout,
-                        args=self.config.user_script_args,
-                        cwd=absolute_import_dir,
-                        env=self.config.user_script_env,
-                        name=import_name,
-                    )
-                    _log_process(process=process)
-                    process.check_returncode()
-
+            self._invoke_import_job(absolute_import_dir=absolute_import_dir,
+                                    import_spec=import_spec,
+                                    version=version,
+                                    interpreter_path=interpreter_path,
+                                    process=process)
+
+            if self.config.invoke_import_validation:
+                self._invoke_import_validation(
+                    repo_dir=repo_dir,
+                    relative_import_dir=relative_import_dir,
+                    absolute_import_dir=absolute_import_dir,
+                    import_spec=import_spec,
+                    interpreter_path=interpreter_path,
+                    process=process)
+
+            if self.config.skip_gcs_upload:
+                return
             inputs = self._upload_import_inputs(
                 import_dir=absolute_import_dir,
                 output_dir=f'{relative_import_dir}/{import_name}',