Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invoke validations from import executor #1141

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ class ExecutorConfig:
# Additional command-line arguments for the user script.
# NOTE(review): annotated List[str] but defaults to a tuple — confirm
# callers treat this as a read-only sequence.
user_script_args: List[str] = ()
# Environment variables for the user script.
# NOTE(review): None appears to mean "inherit the current environment" —
# verify against how subprocess is invoked.
user_script_env: dict = None
# Skip uploading the data to GCS (for local testing).
skip_gcs_upload: bool = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we turn this to False in the config_overrides.json?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed this flag from this PR

# Invoke validations before upload.
invoke_import_validation: bool = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep this as False until we add call to dc-import so the previous version of MCF is stored in GCS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

# Import validation config file.
validation_config_file: str = 'tools/validation/config.json'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call this validation_config.json?
There are a lot of config.json files in the repo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

# Maximum time venv creation can take in seconds.
venv_create_timeout: float = 3600
# Maximum time downloading a file can take in seconds.
Expand Down
142 changes: 112 additions & 30 deletions import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from app.service import file_uploader
from app.service import github_api
from app.service import import_service
from google.cloud import storage

# Email address for status messages.
_DEBUG_EMAIL_ADDR = '[email protected]'
Expand Down Expand Up @@ -318,6 +319,100 @@ def _import_one(
)
raise exc

def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
                              absolute_import_dir: str, import_spec: dict,
                              interpreter_path: str,
                              process: subprocess.CompletedProcess) -> None:
    """Runs the differ and validation scripts on each import input.

    For every entry in the spec's 'import_inputs', the previous version of
    the node MCF file is downloaded from the prod GCS bucket, diffed
    against the newly generated data, and the differ output is checked by
    the validation script.

    Args:
        repo_dir: Absolute path to the root of the repo checkout; the
            differ and validation scripts live under tools/ there.
        relative_import_dir: Import directory path relative to the repo
            root; used to locate this import's folder in GCS.
        absolute_import_dir: Absolute path to the import directory holding
            the generated data files.
        import_spec: Import specification parsed from the manifest.
        interpreter_path: Interpreter used to run the differ and
            validation scripts.
        process: Result of the previously run step; not read here —
            overwritten by each script invocation.

    Raises:
        subprocess.CalledProcessError: If the differ or the validation
            script exits with a non-zero status.
    """
    # Hoist loop-invariant work out of the per-input loop: one GCS
    # client/bucket, one validation config path, and the script paths
    # serve every import input.
    bucket = storage.Client(self.config.gcs_project_id).bucket(
        self.config.storage_prod_bucket_name)
    config_file_path = os.path.join(absolute_import_dir,
                                    self.config.validation_config_file)
    differ_script_path = os.path.join(repo_dir, 'tools', 'differ',
                                      'differ.py')
    validation_script_path = os.path.join(repo_dir, 'tools', 'validation',
                                          'validation.py')
    folder = relative_import_dir + '/' + import_spec['import_name'] + '/'

    for import_input in import_spec.get('import_inputs', []):
        mcf_path = import_input['node_mcf']
        current_data_path = os.path.join(absolute_import_dir, mcf_path)
        previous_data_path = current_data_path + '.old'
        differ_results_path = os.path.join(absolute_import_dir, 'results')

        # Download the previous version of the import data.
        # 'latest_version.txt' in the import's GCS folder names the most
        # recent version folder.
        latest_version = bucket.blob(
            folder + 'latest_version.txt').download_as_text()
        previous_blob = bucket.blob(folder + latest_version + '/' +
                                    mcf_path)
        previous_blob.download_to_filename(previous_data_path)

        # Invoke the data differ script on current vs. previous data.
        differ_script_args: List[str] = [
            '--current_data=' + current_data_path,
            '--previous_data=' + previous_data_path,
            '--output_location=' + differ_results_path,
        ]
        process = _run_user_script(
            interpreter_path=interpreter_path,
            script_path=differ_script_path,
            timeout=self.config.user_script_timeout,
            args=differ_script_args,
            cwd=absolute_import_dir,
            env=self.config.user_script_env,
        )
        _log_process(process=process)
        process.check_returncode()

        # Invoke the data validation script on the differ output.
        validation_script_args: List[str] = [
            '--differ_output_location=' + differ_results_path,
            '--config_file=' + config_file_path,
        ]
        process = _run_user_script(
            interpreter_path=interpreter_path,
            script_path=validation_script_path,
            timeout=self.config.user_script_timeout,
            args=validation_script_args,
            cwd=absolute_import_dir,
            env=self.config.user_script_env,
        )
        _log_process(process=process)
        process.check_returncode()

def _invoke_import_job(self, absolute_import_dir: str, import_spec: dict,
                       version: str, interpreter_path: str,
                       process: subprocess.CompletedProcess) -> None:
    """Runs the import scripts listed in the import spec.

    Each script either runs as a Cloud Run simple-import job (when the
    script resolves to a simple-import job id) or is executed locally.

    Args:
        absolute_import_dir: Absolute path to the import directory
            containing the scripts.
        import_spec: Import specification parsed from the manifest.
        version: Version string passed to the Cloud Run simple-import job.
        interpreter_path: Default interpreter for locally run scripts; the
            actual interpreter per script is resolved by
            _get_script_interpreter (shell scripts use a different one).
        process: Result of the previously run step; not read here —
            overwritten by each local script invocation.

    Raises:
        subprocess.CalledProcessError: If a locally run script exits with
            a non-zero status.
    """
    # 'scripts' may be absent from the spec; treat that as nothing to run
    # instead of raising TypeError when iterating None.
    script_paths = import_spec.get('scripts') or []
    for path in script_paths:
        script_path = os.path.join(absolute_import_dir, path)
        simple_job = cloud_run_simple_import.get_simple_import_job_id(
            import_spec, script_path)
        if simple_job:
            # Running simple import as cloud run job.
            cloud_run_simple_import.cloud_run_simple_import_job(
                import_spec=import_spec,
                config_file=script_path,
                env=self.config.user_script_env,
                version=version,
                image=import_spec.get('image'),
            )
        else:
            # Run import script locally. Resolve the interpreter per
            # script — bug fix: the refactor dropped this resolution and
            # always used the venv interpreter, breaking imports that use
            # shell scripts.
            script_interpreter = _get_script_interpreter(
                script_path, interpreter_path)
            process = _run_user_script(
                interpreter_path=script_interpreter,
                script_path=script_path,
                timeout=self.config.user_script_timeout,
                args=self.config.user_script_args,
                cwd=absolute_import_dir,
                env=self.config.user_script_env,
            )
            _log_process(process=process)
            process.check_returncode()

def _import_one_helper(
self,
repo_dir: str,
Expand Down Expand Up @@ -351,36 +446,23 @@ def _import_one_helper(
_log_process(process=process)
process.check_returncode()

script_paths = import_spec.get('scripts')
for path in script_paths:
script_path = os.path.join(absolute_import_dir, path)
simple_job = cloud_run_simple_import.get_simple_import_job_id(
import_spec, script_path)
if simple_job:
# Running simple import as cloud run job.
cloud_run_simple_import.cloud_run_simple_import_job(
import_spec=import_spec,
config_file=script_path,
env=self.config.user_script_env,
version=version,
image=import_spec.get('image'),
)
else:
# Run import script locally.
script_interpreter = _get_script_interpreter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls retain this as the interpreter path changes for imports using shell scripts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is not removed; it's just refactored out into a separate function.

script_path, interpreter_path)
process = _run_user_script(
interpreter_path=script_interpreter,
script_path=script_path,
timeout=self.config.user_script_timeout,
args=self.config.user_script_args,
cwd=absolute_import_dir,
env=self.config.user_script_env,
name=import_name,
)
_log_process(process=process)
process.check_returncode()

self._invoke_import_job(absolute_import_dir=absolute_import_dir,
import_spec=import_spec,
version=version,
interpreter_path=interpreter_path,
process=process)

if self.config.invoke_import_validation:
self._invoke_import_validation(
repo_dir=repo_dir,
relative_import_dir=relative_import_dir,
absolute_import_dir=absolute_import_dir,
import_spec=import_spec,
interpreter_path=interpreter_path,
process=process)

if self.config.skip_gcs_upload:
return
inputs = self._upload_import_inputs(
import_dir=absolute_import_dir,
output_dir=f'{relative_import_dir}/{import_name}',
Expand Down
Loading