Skip to content

Commit

Permalink
Import executor migration to cloud run
Browse files Browse the repository at this point in the history
  • Loading branch information
vish-cs committed Dec 26, 2024
1 parent 4bdde25 commit 22ff841
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 71 deletions.
3 changes: 2 additions & 1 deletion import-automation/executor/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ ADD requirements.txt /workspace/requirements.txt
RUN pip install -r /workspace/requirements.txt

COPY app/. /workspace/app/
COPY main.py /workspace/

CMD gunicorn --timeout 0 --workers 5 -b :$PORT app.main:FLASK_APP
ENTRYPOINT ["python","./main.py"]
6 changes: 6 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ class ExecutorConfig:
dashboard_oauth_client_id: str = ''
# Oauth Client ID used to authenticate with the proxy.
importer_oauth_client_id: str = ''
# Artifact registry for docker images.
artifact_registry: str = ''
# Name for the import executor container image.
docker_image_name: str = 'dc-import'
# Service account for the Cloud Run job.
cloud_run_service_account: str = ''
# Access token of the account used to authenticate with GitHub. This is not
# the account password. See
# https://docs.github.com/en/github/authenticating-to-github/creating-a-personal-access-token.
Expand Down
8 changes: 4 additions & 4 deletions import-automation/executor/app/executor/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ def create_or_update_cloud_run_job(
exe_template = run_v2.ExecutionTemplate(template=run_v2.TaskTemplate(
containers=[container]))
new_job = run_v2.Job(template=exe_template)
logging.info(f"Creating job {job_name}: {new_job}")
logging.info(f"Creating job: {job_name}")

# Look for existing job to update
job = None
try:
job = client.get_job(request=run_v2.GetJobRequest(name=job_name))
logging.info(f"Found existing job {job_name}: {job}")
logging.info(f"Found existing job: {job_name}")
except NotFound:
logging.info(f"No existing job, creating new job: {job_name}")

Expand All @@ -85,11 +85,11 @@ def create_or_update_cloud_run_job(
# Update existing Cloud Run job
# Overrides container settings including image, env
job.template.template.containers = new_job.template.template.containers
logging.info(f"Updating job {job_name}: {job}")
logging.info(f"Updating job: {job_name}")
update_request = run_v2.UpdateJobRequest(job=job)
update_operation = client.update_job(request=update_request)
result = update_operation.result() # Blocks until update completes
logging.info(f"Job updated {job_name}: {result}")
logging.info(f"Job updated: {job_name}")
return result


Expand Down
46 changes: 40 additions & 6 deletions import-automation/executor/app/executor/cloud_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from google.protobuf import json_format
from google.api_core.exceptions import AlreadyExists, NotFound

CLOUD_RUN_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
GKE_SERVICE_DOMAIN = os.getenv('GKE_SERVICE_DOMAIN',
'importautomation.datacommons.org')
GKE_CALLER_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
Expand All @@ -50,15 +51,48 @@ def _base_job_request(absolute_import_name, schedule: str):
# 30m is the max allowed deadline
'seconds': 60 * 30
}
# <'http_request'|'appengine_job_request'>: {...}
# <'gke_job_request'|'appengine_job_request'|'cloud_run_job_request'>: {...}
}


def http_job_request(absolute_import_name,
schedule,
json_encoded_job_body: str,
gke_caller_service_account: str = "",
gke_oauth_audience: str = "") -> Dict:
def cloud_run_job_request(absolute_import_name, schedule,
json_encoded_config: str, cloud_run_job_url: str,
cloud_run_service_account: str) -> Dict:
"""Cloud Scheduler request that targets jossZ [;bs in CLOUD_RUN."""
json_encoded_job_body = json.dumps({
'overrides': {
"containerOverrides": [{
'args': [
f'--import_name={absolute_import_name}',
f'--import_config={json_encoded_config}'
]
}]
}
}).encode("utf-8")
# print(json_encoded_job_body)

job = _base_job_request(absolute_import_name, schedule)
job['name'] = f'{job["name"]}'
job['http_target'] = {
'uri': f'https://{cloud_run_job_url}',
'http_method': 'POST',
'headers': {
'Content-Type': 'application/json',
},
'body': json_encoded_job_body,
'oauth_token': {
'service_account_email': f'{cloud_run_service_account}',
'scope': 'https://www.googleapis.com/auth/cloud-platform'
}
}
return job


def gke_job_request(absolute_import_name,
schedule,
json_encoded_job_body: str,
gke_caller_service_account: str = "",
gke_oauth_audience: str = "") -> Dict:
"""Cloud Scheduler request that targets executors launched in GKE."""

# If the service account and oauth audience are provided as
Expand Down
67 changes: 50 additions & 17 deletions import-automation/executor/app/executor/scheduler_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
import traceback
import tempfile
from typing import Dict
import cloud_run

from app import configs
from app.service import github_api
from app.executor import import_target
from app.executor import import_executor
from app.executor import cloud_scheduler

_GKE_SERVICE_ACCOUNT_KEY: str = 'gke_service_account'
_GKE_OAUTH_AUDIENCE_KEY: str = 'gke_oauth_audience'


def schedule_on_commit(github: github_api.GitHubRepoAPI,
config: configs.ExecutorConfig, commit_sha: str):
Expand Down Expand Up @@ -76,8 +80,8 @@ def schedule_on_commit(github: github_api.GitHubRepoAPI,
relative_dir, spec['import_name'])
logging.info('Scheduling a data update job for %s',
absolute_import_name)
job = _create_or_update_import_schedule(absolute_import_name,
schedule, config)
job = create_or_update_import_schedule(absolute_import_name,
schedule, config, {})
scheduled.append(job)
except Exception:
raise import_executor.ExecutionError(
Expand All @@ -87,27 +91,56 @@ def schedule_on_commit(github: github_api.GitHubRepoAPI,
'No issues')


def _create_or_update_import_schedule(absolute_import_name, schedule: str,
config: configs.ExecutorConfig):
def create_or_update_import_schedule(absolute_import_name: str, schedule: str,
config: configs.ExecutorConfig,
scheduler_config_dict: Dict):
"""Create/Update the import schedule for 1 import."""
# Note: this is the content of what is passed to /update API
# inside each cronjob http calls.
json_encoded_job_body = json.dumps({
'absolute_import_name': absolute_import_name,
'configs': config.get_data_refresh_config()
}).encode()

if config.executor_type == "GKE":
req = cloud_scheduler.http_job_request(absolute_import_name, schedule,
json_encoded_job_body)
# Note: this is the content of what is passed to /update API
# inside each cronjob http calls.
json_encoded_job_body = json.dumps({
'absolute_import_name': absolute_import_name,
'configs': config.get_data_refresh_config()
}).encode('utf-8')
# Before proceeding, ensure that the configs read from GCS have the expected fields.
assert _GKE_SERVICE_ACCOUNT_KEY in scheduler_config_dict
assert _GKE_OAUTH_AUDIENCE_KEY in scheduler_config_dict
service_account_key = scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY]
oauth_audience_key = scheduler_config_dict[_GKE_OAUTH_AUDIENCE_KEY]
req = cloud_scheduler.gke_job_request(absolute_import_name, schedule,
json_encoded_job_body,
service_account_key,
oauth_audience_key)
elif config.executor_type == "GAE":
json_encoded_job_body = json.dumps({
'absolute_import_name': absolute_import_name,
'configs': config.get_data_refresh_config()
}).encode('utf-8')
req = cloud_scheduler.appengine_job_request(absolute_import_name,
schedule,
json_encoded_job_body)
elif config.executor_type == "CLOUD_RUN":
image_version = 'latest'
docker_image = f'{config.artifact_registry}/{config.docker_image_name}:{image_version}'
job_name = absolute_import_name.split(':')[1]
logging.info(absolute_import_name)
logging.info(job_name)
job = cloud_run.create_or_update_cloud_run_job(
config.gcp_project_id, config.scheduler_location, job_name,
docker_image, {})
job_id = job.name.rsplit('/', 1)[1]
if not job:
logging.error(
f'Failed to setup cloud run job for {absolute_import_name}')
json_encoded_config = json.dumps(config.get_data_refresh_config())
cloud_run_job_url = f'{config.scheduler_location}-run.googleapis.com/apis/run.googleapis.com/v1/namespaces/{config.gcp_project_id}/jobs/{job_id}:run'
req = cloud_scheduler.cloud_run_job_request(
job_name, schedule, json_encoded_config,
cloud_run_job_url, config.cloud_run_service_account)
return cloud_scheduler.create_or_update_job(config.gcp_project_id,
config.scheduler_location,
req)
else:
raise Exception(
"Invalid executor_type %s, expects one of ('GKE', 'GAE')",
"Invalid executor_type %s, expects one of ('GKE', 'GAE', 'CLOUD_RUN')",
config.executor_type)

return cloud_scheduler.create_or_update_job(config.gcp_project_id,
config.scheduler_location, req)
28 changes: 28 additions & 0 deletions import-automation/executor/cloudbuild.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Builds the docker image of import executor, pushes it to artificat registry and creates a cloud run job using this images.
#
# gcloud builds submit --region=us-west1 --config=cloudbuild.yaml --substitutions=_ARTIFACT_REGISTRY_REPO="dc-images",_IMAGE_NAME="dc-import",_REGION="us-west1",_JOB_NAME="dc-import" .

steps:
# Install dependencies
- name: python
entrypoint: pip
args: ["install", "-r", "requirements.txt", "--user"]

# Docker Build
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t',
'${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest', '.']

# Docker push to Google Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest']

# Deploy to Cloud Run
- name: google/cloud-sdk
args: ['gcloud', 'run', 'jobs', 'deploy', '${_JOB_NAME}',
'--image=${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest',
'--region', '${_REGION}']

#Store images in Google Artifact Registry
images:
- ${_REGION}-docker.pkg.dev/${PROJECT_ID}/${_ARTIFACT_REGISTRY_REPO}/${_IMAGE_NAME}:latest
43 changes: 43 additions & 0 deletions import-automation/executor/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from app import configs
from app.executor import import_executor
from app.service import file_uploader
from app.service import github_api
from app.service import email_notifier

from absl import flags
from absl import app
import logging
import json

FLAGS = flags.FLAGS
flags.DEFINE_string('import_name', '', 'Absoluate import name.')
flags.DEFINE_string('import_config', '', 'Import executor configuration.')


def scheduled_updates(absolute_import_name: str, import_config: str):
logging.info(absolute_import_name)
cfg = json.loads(import_config)
config = configs.ExecutorConfig(**cfg)
logging.info(config)
executor = import_executor.ImportExecutor(
uploader=file_uploader.GCSFileUploader(
project_id=config.gcs_project_id,
bucket_name=config.storage_prod_bucket_name),
github=github_api.GitHubRepoAPI(
repo_owner_username=config.github_repo_owner_username,
repo_name=config.github_repo_name,
auth_username=config.github_auth_username,
auth_access_token=config.github_auth_access_token),
config=config,
notifier=email_notifier.EmailNotifier(config.email_account,
config.email_token))
result = executor.execute_imports_on_update(absolute_import_name)
logging.info(result)


def main(_):
scheduled_updates(FLAGS.import_name, FLAGS.import_config)


if __name__ == '__main__':
app.run(main)
46 changes: 6 additions & 40 deletions import-automation/executor/schedule_update_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@
from app.executor import import_target
from app.executor import import_executor
from app.executor import cloud_scheduler
from app.executor import scheduler_job_manager
from app.executor import validation
from app.service import email_notifier
from app.service import file_uploader
from app.service import github_api
from google.cloud import storage

_CONFIG_OVERRIDE_FILE: str = 'config_override.json'
_GKE_SERVICE_ACCOUNT_KEY: str = 'gke_service_account'
_GKE_OAUTH_AUDIENCE_KEY: str = 'gke_oauth_audience'

_FLAGS = flags.FLAGS

Expand Down Expand Up @@ -238,34 +237,6 @@ def update(cfg: configs.ExecutorConfig,
return executor.execute_imports_on_update(absolute_import_path)


def schedule(cfg: configs.ExecutorConfig,
absolute_import_name: str,
repo_dir: str,
gke_service_account: str = "",
gke_oauth_audience: str = "") -> Dict:
# This is the content of what is passed to /update API
# inside each cronjob http calls from Cloud Scheduler.
json_encoded_job_body = json.dumps({
'absolute_import_name': absolute_import_name,
'configs': cfg.get_data_refresh_config()
}).encode("utf-8")

# Retrieve the cron schedule.
cron_schedule = _get_cron_schedule(repo_dir, absolute_import_name,
cfg.manifest_filename)

# Create an HTTP Job Request.
req = cloud_scheduler.http_job_request(
absolute_import_name,
cron_schedule,
json_encoded_job_body,
gke_caller_service_account=gke_service_account,
gke_oauth_audience=gke_oauth_audience)

return cloud_scheduler.create_or_update_job(cfg.gcp_project_id,
cfg.scheduler_location, req)


def main(_):
mode = _FLAGS.mode
absolute_import_path = _FLAGS.absolute_import_path
Expand Down Expand Up @@ -332,19 +303,14 @@ def main(_):
_print_fileupload_results(cfg, absolute_import_path)

elif mode == 'schedule':
# Before proceeding, ensure that the configs read from GCS have the expected fields.
assert _GKE_SERVICE_ACCOUNT_KEY in scheduler_config_dict
assert _GKE_OAUTH_AUDIENCE_KEY in scheduler_config_dict

logging.info("*************************************************")
logging.info("***** Beginning Schedule Operation **************")
logging.info("*************************************************")
res = schedule(
cfg,
absolute_import_path,
repo_dir,
gke_service_account=scheduler_config_dict[_GKE_SERVICE_ACCOUNT_KEY],
gke_oauth_audience=scheduler_config_dict[_GKE_OAUTH_AUDIENCE_KEY])
# Retrieve the cron schedule.
cron_schedule = _get_cron_schedule(repo_dir, absolute_import_path,
cfg.manifest_filename)
res = scheduler_job_manager.create_or_update_import_schedule(
absolute_import_path, cron_schedule, cfg, scheduler_config_dict)
logging.info("*************************************************")
logging.info("*********** Schedule Operation Complete. ********")
logging.info("*************************************************")
Expand Down
Loading

0 comments on commit 22ff841

Please sign in to comment.