-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Invoke validations from import executor #1141
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -111,6 +111,12 @@ class ExecutorConfig: | |
user_script_args: List[str] = () | ||
# Environment variables for the user script | ||
user_script_env: dict = None | ||
# Skip uploading the data to GCS (for local testing). | ||
skip_gcs_upload: bool = True | ||
# Invoke validations before upload. | ||
invoke_import_validation: bool = True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's keep this as False until we add call to dc-import so the previous version of MCF is stored in GCS There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
# Import validation config file. | ||
validation_config_file: str = 'tools/validation/config.json' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we call this validation_config.json? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
# Maximum time venv creation can take in seconds. | ||
venv_create_timeout: float = 3600 | ||
# Maximum time downloading a file can take in seconds. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
from app.service import file_uploader | ||
from app.service import github_api | ||
from app.service import import_service | ||
from google.cloud import storage | ||
|
||
# Email address for status messages. | ||
_DEBUG_EMAIL_ADDR = '[email protected]' | ||
|
@@ -318,6 +319,100 @@ def _import_one( | |
) | ||
raise exc | ||
|
||
def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str, | ||
absolute_import_dir: str, import_spec: dict, | ||
interpreter_path: str, | ||
process: subprocess.CompletedProcess) -> None: | ||
""" | ||
Performs validations on import data. | ||
""" | ||
import_inputs = import_spec.get('import_inputs', []) | ||
for import_input in import_inputs: | ||
mcf_path = import_input['node_mcf'] | ||
current_data_path = os.path.join(absolute_import_dir, mcf_path) | ||
previous_data_path = os.path.join(absolute_import_dir, | ||
mcf_path) + '.old' | ||
differ_results_path = os.path.join(absolute_import_dir, 'results') | ||
config_file_path = os.path.join(absolute_import_dir, | ||
self.config.validation_config_file) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be an import specific validation config declared in the manifest.json and default to config.validation_config_file if not declared? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I have added a conditional assignment to check for it in the import spec first, else use the default value from the config |
||
|
||
# Download previous import data. | ||
bucket = storage.Client(self.config.gcs_project_id).bucket( | ||
self.config.storage_prod_bucket_name) | ||
folder = relative_import_dir + '/' + import_spec['import_name'] + '/' | ||
blob = bucket.blob(folder + 'latest_version.txt') | ||
blob = bucket.blob(folder + blob.download_as_text() + '/' + | ||
mcf_path) | ||
blob.download_to_filename(previous_data_path) | ||
|
||
# Invoke data differ script. | ||
differ_script_path = os.path.join(repo_dir, 'tools', 'differ', | ||
'differ.py') | ||
differ_script_args: List[str] = ('--current_data=' + | ||
current_data_path, | ||
'--previous_data=' + | ||
previous_data_path, | ||
'--output_location=' + | ||
differ_results_path) | ||
process = _run_user_script( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we call the differ module directly instead of a process? |
||
interpreter_path=interpreter_path, | ||
script_path=differ_script_path, | ||
timeout=self.config.user_script_timeout, | ||
args=differ_script_args, | ||
cwd=absolute_import_dir, | ||
env=self.config.user_script_env, | ||
) | ||
_log_process(process=process) | ||
process.check_returncode() | ||
|
||
# Invoke data validation script. | ||
validation_script_path = os.path.join(repo_dir, 'tools', | ||
'validation', 'validation.py') | ||
validation_script_args: List[str] = ('--differ_output_location=' + | ||
differ_results_path, | ||
'--config_file=' + | ||
config_file_path) | ||
process = _run_user_script( | ||
interpreter_path=interpreter_path, | ||
script_path=validation_script_path, | ||
timeout=self.config.user_script_timeout, | ||
args=validation_script_args, | ||
cwd=absolute_import_dir, | ||
env=self.config.user_script_env, | ||
) | ||
_log_process(process=process) | ||
process.check_returncode() | ||
|
||
def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict, | ||
version: str, interpreter_path: str, | ||
process: subprocess.CompletedProcess) -> None: | ||
script_paths = import_spec.get('scripts') | ||
for path in script_paths: | ||
script_path = os.path.join(absolute_import_dir, path) | ||
simple_job = cloud_run_simple_import.get_simple_import_job_id( | ||
import_spec, script_path) | ||
if simple_job: | ||
# Running simple import as cloud run job. | ||
cloud_run_simple_import.cloud_run_simple_import_job( | ||
import_spec=import_spec, | ||
config_file=script_path, | ||
env=self.config.user_script_env, | ||
version=version, | ||
image=import_spec.get('image'), | ||
) | ||
else: | ||
# Run import script locally. | ||
process = _run_user_script( | ||
interpreter_path=interpreter_path, | ||
script_path=script_path, | ||
timeout=self.config.user_script_timeout, | ||
args=self.config.user_script_args, | ||
cwd=absolute_import_dir, | ||
env=self.config.user_script_env, | ||
) | ||
_log_process(process=process) | ||
process.check_returncode() | ||
|
||
def _import_one_helper( | ||
self, | ||
repo_dir: str, | ||
|
@@ -351,36 +446,23 @@ def _import_one_helper( | |
_log_process(process=process) | ||
process.check_returncode() | ||
|
||
script_paths = import_spec.get('scripts') | ||
for path in script_paths: | ||
script_path = os.path.join(absolute_import_dir, path) | ||
simple_job = cloud_run_simple_import.get_simple_import_job_id( | ||
import_spec, script_path) | ||
if simple_job: | ||
# Running simple import as cloud run job. | ||
cloud_run_simple_import.cloud_run_simple_import_job( | ||
import_spec=import_spec, | ||
config_file=script_path, | ||
env=self.config.user_script_env, | ||
version=version, | ||
image=import_spec.get('image'), | ||
) | ||
else: | ||
# Run import script locally. | ||
script_interpreter = _get_script_interpreter( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pls retain this as the interpreter path changes for imports using shell scripts There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic is not removed..it's just refactored out into a separate function |
||
script_path, interpreter_path) | ||
process = _run_user_script( | ||
interpreter_path=script_interpreter, | ||
script_path=script_path, | ||
timeout=self.config.user_script_timeout, | ||
args=self.config.user_script_args, | ||
cwd=absolute_import_dir, | ||
env=self.config.user_script_env, | ||
name=import_name, | ||
) | ||
_log_process(process=process) | ||
process.check_returncode() | ||
|
||
self._invoke_import_job(absolute_import_dir=absolute_import_dir, | ||
import_spec=import_spec, | ||
version=version, | ||
interpreter_path=interpreter_path, | ||
process=process) | ||
|
||
if self.config.invoke_import_validation: | ||
self._invoke_import_validation( | ||
repo_dir=repo_dir, | ||
relative_import_dir=relative_import_dir, | ||
absolute_import_dir=absolute_import_dir, | ||
import_spec=import_spec, | ||
interpreter_path=interpreter_path, | ||
process=process) | ||
|
||
if self.config.skip_gcs_upload: | ||
return | ||
inputs = self._upload_import_inputs( | ||
import_dir=absolute_import_dir, | ||
output_dir=f'{relative_import_dir}/{import_name}', | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we turn this to False in the config_overrides.json?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have removed this flag from this PR