Merge pull request #17 from CDLUC3/feature/dmp-works-matching
Match works to DMPs
briri authored Nov 12, 2024
2 parents 2b81a92 + 8c8c092 commit a3d1a92
Showing 38 changed files with 2,737 additions and 0 deletions.
2 changes: 2 additions & 0 deletions queries/dmptool-workflows/.astro/config.yaml
@@ -0,0 +1,2 @@
project:
  name: dmptool-workflows
@@ -0,0 +1 @@
# Add dag files to exempt from parse test below. ex: dags/<test-file>
8 changes: 8 additions & 0 deletions queries/dmptool-workflows/.dockerignore
@@ -0,0 +1,8 @@
astro
.git
.env
airflow_settings.yaml
logs/
.venv
airflow.db
airflow.cfg
13 changes: 13 additions & 0 deletions queries/dmptool-workflows/.gitignore
@@ -0,0 +1,13 @@
.env
.DS_Store
airflow_settings.yaml
__pycache__/
astro
.venv
airflow-webserver.pid
webserver_config.py
airflow.cfg
airflow.db
.idea
workflows-config.yaml
docker-compose.override.yml
12 changes: 12 additions & 0 deletions queries/dmptool-workflows/Dockerfile
@@ -0,0 +1,12 @@
FROM quay.io/astronomer/astro-runtime:9.10.0

# Root user for installations
USER root

# Install git
RUN apt-get update && apt-get install git -y
USER astro

# Install Observatory Platform
RUN git clone --branch feature/astro_kubernetes https://github.com/The-Academic-Observatory/observatory-platform.git
RUN pip install -e ./observatory-platform/ --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-no-providers-3.10.txt
21 changes: 21 additions & 0 deletions queries/dmptool-workflows/LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 UC Curation Center (California Digital Library)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
98 changes: 98 additions & 0 deletions queries/dmptool-workflows/README.md
@@ -0,0 +1,98 @@
# DMPTool Apache Airflow Workflows
Astronomer.io-based Apache Airflow workflows for matching academic works to DMPTool DMPs using the Academic Observatory
BigQuery datasets.

## Dependencies
Install the Astro CLI: https://www.astronomer.io/docs/astro/cli/install-cli

Clone the project and enter the `dmptool-workflows` directory:
```bash
git clone --branch feature/dmp-works-matching git@github.com:CDLUC3/dmsp_api_prototype.git
cd dmsp_api_prototype/queries/dmptool-workflows
```

Set up a Python virtual environment:
```bash
python3.10 -m venv venv
source venv/bin/activate
```

Install Python dependencies:
```bash
pip install git+https://github.com/The-Academic-Observatory/observatory-platform.git@feature/astro_kubernetes --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-no-providers-3.10.txt
```

## Local Development Setup
Add the following to your `.env` file:
```bash
GOOGLE_APPLICATION_CREDENTIALS=/usr/local/airflow/gcloud/application_default_credentials.json
```
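
This is the path where the credentials file will be mounted inside the Airflow containers (see the Docker Compose override below). If you do not already have an application default credentials file on the host, one can typically be created with the gcloud CLI:
```bash
gcloud auth application-default login
```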

Add `docker-compose.override.yml` to the root of this project and customise the path to the Google Cloud credentials file:
```yaml
version: "3.1"
services:
  scheduler:
    volumes:
      - /path/to/host/google-application-credentials.json:/usr/local/airflow/gcloud/application_default_credentials.json:ro
  webserver:
    volumes:
      - /path/to/host/google-application-credentials.json:/usr/local/airflow/gcloud/application_default_credentials.json:ro
  triggerer:
    volumes:
      - /path/to/host/google-application-credentials.json:/usr/local/airflow/gcloud/application_default_credentials.json:ro
```

Customise the `workflows-config.yaml` file:
```yaml
cloud_workspaces:
  - workspace: &dmptool_dev
      project_id: academic-observatory-project-id
      output_project_id: my-project-id
      download_bucket: my-download-bucket
      transform_bucket: my-transform-bucket
      data_location: us

workflows:
  - dag_id: "dmp_match_workflow"
    name: "DMP Match Workflow"
    class_name: "dmptool_workflows.dmp_match_workflow.workflow"
    cloud_workspace: *dmptool_dev
```
Convert `workflows-config.yaml` to JSON:
```bash
yq -o=json '.workflows' workflows-config.yaml | jq -c .
```
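
For the example `workflows-config.yaml` above, this prints a single line of compact JSON, roughly as follows (the `*dmptool_dev` anchor is expanded and the values are the illustrative ones from the example):
```json
[{"dag_id":"dmp_match_workflow","name":"DMP Match Workflow","class_name":"dmptool_workflows.dmp_match_workflow.workflow","cloud_workspace":{"project_id":"academic-observatory-project-id","output_project_id":"my-project-id","download_bucket":"my-download-bucket","transform_bucket":"my-transform-bucket","data_location":"us"}}]
```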

Create or add the following to `airflow_settings.yaml`, making sure to paste the JSON output from above into the
WORKFLOWS variable_value:
```yaml
airflow:
  variables:
    - variable_name: DATA_PATH
      variable_value: /home/astro/data
    - variable_name: WORKFLOWS
      variable_value: REPLACE WITH WORKFLOWS JSON
```
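
For illustration, with the JSON from the previous step pasted in, the WORKFLOWS entry would look something like this (single-quoted so YAML treats the value as a string):
```yaml
    - variable_name: WORKFLOWS
      variable_value: '[{"dag_id":"dmp_match_workflow","name":"DMP Match Workflow","class_name":"dmptool_workflows.dmp_match_workflow.workflow","cloud_workspace":{"project_id":"academic-observatory-project-id","output_project_id":"my-project-id","download_bucket":"my-download-bucket","transform_bucket":"my-transform-bucket","data_location":"us"}}]'
```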

## Running Airflow locally
Run the following command:
```bash
astro dev start
```

Then open the Airflow UI at http://localhost:8080 and trigger the workflow.

## Running the Queries
You can also run or generate the queries directly. Customise the project IDs and the shard date (the shard date of the `dmps_raw`
table). Add `--dry_run` to generate the SQL queries without running them.
```bash
cd bin
export PYTHONPATH=/path/to/dmptool-workflows/dags:$PYTHONPATH
python3 run_queries.py ao-project-id my-project-id YYYY-MM-DD
```
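
For example, a dry run against hypothetical project IDs, using the shard date of an existing `dmps_raw` table:
```bash
python3 run_queries.py academic-observatory-project-id my-project-id 2024-11-01 --dry_run
```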

## Deploy
TODO
1 change: 1 addition & 0 deletions queries/dmptool-workflows/bin/.gitignore
@@ -0,0 +1 @@
*.sql
56 changes: 56 additions & 0 deletions queries/dmptool-workflows/bin/run_queries.py
@@ -0,0 +1,56 @@
import argparse

import pendulum
from pendulum.exceptions import ParserError

from dmptool_workflows.dmp_match_workflow.tasks import create_dmp_matches


def valid_date(date_str):
    try:
        return pendulum.parse(date_str).date()
    except ParserError:
        raise argparse.ArgumentTypeError(f"Not a valid date: '{date_str}'. Expected format: YYYY-MM-DD.")


def parse_args():
    parser = argparse.ArgumentParser(description="Process input and output project IDs, and dataset ID.")

    # Required arguments
    parser.add_argument("ao_project_id", type=str, help="The Academic Observatory Google Cloud project ID")
    parser.add_argument("dmps_project_id", type=str, help="The DMPs Google Cloud project ID")
    parser.add_argument("release_date", type=valid_date, help="The date for sharding the results.")

    # Optional argument with a default value
    parser.add_argument(
        "--dataset_id", type=str, default="cdl_dmps_test", help="The BigQuery dataset ID where results should be stored"
    )
    parser.add_argument(
        "--vertex_ai_model_id", type=str, default="text-multilingual-embedding-002", help="The Vertex AI model ID"
    )
    parser.add_argument(
        "--weighted_count_threshold",
        type=int,
        default=3,
        help="The threshold to pre-filter intermediate matches before running embeddings and vector search",
    )
    parser.add_argument(
        "--max_matches", type=int, default=100, help="The maximum number of matches to return for each DMP"
    )
    parser.add_argument("--dry_run", action="store_true", help="Whether to do a dry run of the queries")

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    create_dmp_matches(
        ao_project_id=args.ao_project_id,
        dmps_project_id=args.dmps_project_id,
        dataset_id=args.dataset_id,
        release_date=args.release_date,
        vertex_ai_model_id=args.vertex_ai_model_id,
        weighted_count_threshold=args.weighted_count_threshold,
        max_matches=args.max_matches,
        dry_run=args.dry_run,
    )
Empty file.
Empty file.
Empty file.
16 changes: 16 additions & 0 deletions queries/dmptool-workflows/dags/dmptool_workflows/config.py
@@ -0,0 +1,16 @@
import os

from observatory_platform.config import module_file_path


def project_path(*subpaths: str) -> str:
"""Make a path to a file or folder within this project.
:param subpaths: any sub paths.
:return: a path to a file or folder.
"""

path = os.path.join(module_file_path("dmptool_workflows"), *subpaths)
if not os.path.exists(path):
raise FileNotFoundError(f"project_path: path or file {path} does not exist!")
return path
Empty file.
@@ -0,0 +1,52 @@
from functools import cached_property

import observatory_platform.google.bigquery as bq


class AcademicObservatoryDataset:
    def __init__(self, project_id: str):
        self.project_id = project_id
        self.ror_dataset = RORDataset(project_id)
        self.openalex_dataset = OpenAlexDataset(project_id)
        self.crossref_metadata_dataset = CrossrefMetadataDataset(project_id)
        self.datacite_dataset = DataCiteDataset(project_id)


class RORDataset:
    def __init__(self, project_id: str):
        self.project_id = project_id

    @cached_property
    def ror(self):
        return bq.bq_select_latest_table(bq.bq_table_id(self.project_id, "ror", "ror"))


class OpenAlexDataset:
    def __init__(self, project_id: str):
        self.project_id = project_id

    @property
    def works(self):
        return bq.bq_table_id(self.project_id, "openalex", "works")

    @property
    def funders(self):
        return bq.bq_table_id(self.project_id, "openalex", "funders")


class CrossrefMetadataDataset:
    def __init__(self, project_id: str):
        self.project_id = project_id

    @cached_property
    def crossref_metadata(self):
        return bq.bq_select_latest_table(bq.bq_table_id(self.project_id, "crossref_metadata", "crossref_metadata"))


class DataCiteDataset:
    def __init__(self, project_id: str):
        self.project_id = project_id

    @cached_property
    def datacite(self):
        return bq.bq_select_latest_table(bq.bq_table_id(self.project_id, "datacite", "datacite"))
Empty file.
@@ -0,0 +1,91 @@
import os

import observatory_platform.google.bigquery as bq
import pendulum


class DMPToolDataset:
    def __init__(self, project_id: str, dataset_id: str, release_date: pendulum.Date):
        self.dmp_dataset = DMPDataset(project_id, dataset_id, release_date)
        self.openalex_match_dataset = MatchDataset(project_id, dataset_id, "openalex", release_date)
        self.crossref_match_dataset = MatchDataset(project_id, dataset_id, "crossref", release_date)
        self.datacite_match_dataset = MatchDataset(project_id, dataset_id, "datacite", release_date)
        self.all_datasets = [
            self.dmp_dataset,
            self.openalex_match_dataset,
            self.crossref_match_dataset,
            self.datacite_match_dataset,
        ]
        self.match_datasets = [self.openalex_match_dataset, self.crossref_match_dataset, self.datacite_match_dataset]


class DMPDataset:
    def __init__(self, project_id: str, dataset_id: str, release_date: pendulum.Date):
        self.name = "dmps"
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.release_date = release_date

    @property
    def dmps_raw(self):
        return bq.bq_sharded_table_id(self.project_id, self.dataset_id, "dmps_raw", self.release_date)

    @property
    def normalised(self):
        return bq.bq_sharded_table_id(self.project_id, self.dataset_id, "dmps", self.release_date)

    @property
    def content(self):
        return bq.bq_sharded_table_id(self.project_id, self.dataset_id, "dmps_content", self.release_date)

    @property
    def content_embeddings(self):
        return bq.bq_sharded_table_id(self.project_id, self.dataset_id, "dmps_content_embeddings", self.release_date)


class MatchDataset:
    def __init__(self, project_id: str, dataset_id: str, name: str, release_date: pendulum.Date):
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.name = name
        self.release_date = release_date

    @property
    def normalised(self):
        return bq.bq_sharded_table_id(self.project_id, self.dataset_id, self.name, self.release_date)

    @property
    def match_intermediate(self):
        return bq.bq_sharded_table_id(
            self.project_id, self.dataset_id, f"{self.name}_match_intermediate", self.release_date
        )

    @property
    def content(self):
        return bq.bq_sharded_table_id(self.project_id, self.dataset_id, f"{self.name}_content", self.release_date)

    @property
    def content_embeddings(self):
        return bq.bq_sharded_table_id(
            self.project_id, self.dataset_id, f"{self.name}_content_embeddings", self.release_date
        )

    @property
    def match(self):
        return bq.bq_sharded_table_id(self.project_id, self.dataset_id, f"{self.name}_match", self.release_date)

    def destination_uri(self, bucket_name: str, dag_id: str) -> str:
        return make_destination_uri(bucket_name, dag_id, self.release_date, self.name)

    def local_file_path(self, download_folder: str) -> str:
        return os.path.join(download_folder, f"{self.name}.jsonl.gz")


def make_destination_uri(bucket_name: str, dag_id: str, release_date: pendulum.Date, source: str) -> str:
    date_str = release_date.strftime("%Y%m%d")
    return f"gs://{bucket_name}/{dag_id}_{date_str}/{source}-*.jsonl.gz"


def make_prefix(dag_id: str, release_date: pendulum.Date) -> str:
    date_str = release_date.strftime("%Y%m%d")
    return f"{dag_id}_{date_str}"