Fetch, validate and arrange the data required by the Open Targets data pipeline.
PIS was formerly short for Platform Input Support, but after the merge of the platform and genetics products, we felt the name no longer fit.
PIS uses uv as its package manager. It is compatible with pip, so you can fall back to pip if you feel more comfortable with it.
Note
PIS will be uploaded to PyPI once it is ready to use. In the meantime, you can run it locally with `make` or directly with uv:
To run PIS with uv, use the following command:

```bash
uv run pis -h
```
Tip
You can also use PIS with Make. Running `make` without any target shows help.
PIS can also be run as a Docker image:

```bash
docker run ghcr.io/opentargets/pis:latest -h
```
PIS can upload the files it fetches to different cloud storage services; Open Targets uses Google Cloud. To enable this inside a Docker container, you must provide a credentials file. Assuming you have one, you can run the following command:
```bash
docker run \
  -v /path/to/credentials.json:/app/credentials.json \
  -e GOOGLE_APPLICATION_CREDENTIALS=/app/credentials.json \
  ghcr.io/opentargets/pis:latest -h
```
To build your own Docker image, run the following command from the root of the repository:
```bash
docker build -t pis .
```
Note
Take a look at the API documentation; it is a very helpful guide when developing new tasks.
Important
Remember to run `make dev` before starting development. This will set up a very simple git hook that does a few checks before committing.
Development of PIS can be done straight away in the local environment. You can run the application just like before (`uv run pis`) to check the changes you make. Alternatively, you can run the app from inside the virtual environment:

```bash
source .venv/bin/activate
pis -h
```
You can test the changes by running a small step, like so:

```bash
uv run pis --step so
```
PIS is designed to run a series of steps that acquire the data for the Open Targets pipeline. Only one step is run per execution, but the idea is still to run them all; we'll call this a pipeline run (although the pipeline is larger, and PIS is just the first part).
If needed, a simple bash `for` loop could be used to run multiple steps:

```bash
for step in go so; do (pis -s "$step") & done; wait
```
But the idea is to run PIS with the orchestrator, which uses Apache Airflow to run the steps in parallel.
The different steps are defined as a series of tasks in the configuration file. Those tasks must always generate a resource. That resource will be used by the next step in the pipeline. The resource is validated and can be uploaded into a remote location (we have implemented Google Cloud Storage connectors for now).
One execution of PIS will perform the following:
- Parse command line options, environment variables and configuration file.
- Load the available tasks into a registry.
- Ensure the local work directory exists and is writable.
- Run the step, which is divided into four phases:
- Initialization: A series of pretasks that prepare the execution of the step. Examples are getting a file list, or dynamically spawning more tasks to run in the main phase.
- Staging: Main phase of the step. It is made of tasks that perform the actual work. Examples of tasks are downloading a file from a remote location, or fetching an index from Elasticsearch.
- Validation: Once tasks have run, a series of validators is executed on the results.
- Upload: Optionally, the resulting resources are uploaded somewhere.
- Write a report of the execution to a manifest file.
Important
In the staging, validation and upload phases, the tasks are run in parallel.
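To illustrate, that fan-out could be sketched with a thread pool along these lines (purely illustrative; the real runner's internals are not shown here):

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

def run_phase(tasks: list, method_name: str, abort: Event) -> list:
    """Run one phase method (e.g. 'run' or 'validate') on all tasks in parallel."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(getattr(task, method_name), abort) for task in tasks]
        return [future.result() for future in futures]
```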
Pretasks and tasks are defined in the `tasks` module. They both inherit from a base class that provides some common functionality and defines the interface that a task must implement.
A task is a class that defines a `run` method. This method is called by the pipeline runner and is where the actual work is done. The task can also define a `validate` and an `upload` method. Those are optional. Not implementing a `validate` means no validation will be run on the results of the task. `upload` is not usually needed, as the base class provides a default implementation.
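A new task might look roughly like this (a sketch only: the base class here is a stand-in, and the exact signatures in PIS may differ):

```python
from threading import Event

# Stand-in for the real base class in the tasks module (hypothetical).
class Task:
    def __init__(self, definition):
        self.definition = definition

class Download(Task):
    def run(self, abort: Event) -> "Download":
        # ...fetch self.definition.source into self.definition.destination...
        return self  # the runner uses the returned task to track its state

    def validate(self, abort: Event) -> "Download":
        # ...check the fetched resource; optional, omit for no validation...
        return self

    # upload is usually not needed: the base class default implementation
    # takes care of uploading the resource.
```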
Warning
The `run`, `validate` and `upload` methods must return `self`. This is because the pipeline runner uses the return value to know the state of the task.
Tip
The `run`, `validate` and `upload` methods have an `abort` `Event` argument that can be used to stop the execution of the task when a general abort signal is produced anywhere in the step run. This is useful, for example, to stop a download early when another task fails.
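Continuing the sketch above, a download task could poll the abort event between chunks and stop early (the chunk helpers here are hypothetical):

```python
class Download(Task):
    def run(self, abort: Event) -> "Download":
        for chunk in iter_chunks(self.definition.source):  # hypothetical helper
            if abort.is_set():  # another task failed; stop downloading early
                return self
            write_chunk(self.definition.destination, chunk)  # hypothetical helper
        return self
```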
Tasks will be spawned from a registry by parsing the configuration file and using the first word in the task name as the task class name. So for example, if a task is defined as:
```yaml
- name: download an example file
  source: https://example.com/file.txt
  destination: /path/to/file.txt
```
PIS will spawn a `Download` task with the arguments `source` and `destination`.
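One plausible way to derive the class from the task name is to take its first word (illustrative only; the actual registry lookup may differ):

```python
def task_class_name(task_name: str) -> str:
    """Map 'download an example file' to 'Download'."""
    return task_name.split()[0].capitalize()

assert task_class_name("download an example file") == "Download"
```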
Each task can have a different set of arguments, which are defined in the `TaskDefinition` class. There are two requirements for the arguments of a task (see the sketch after this list):

- All tasks, including pretasks, must have a `name` argument, for obvious reasons.
- Main tasks must have a `destination` argument. This is to remind implementers that the purpose of a task is to generate something that will be used by the next step in the pipeline.
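A rough sketch of what a task definition could carry, with hypothetical fields beyond the two required ones:

```python
from dataclasses import dataclass

@dataclass
class TaskDefinition:  # hypothetical shape; the real class may differ
    name: str                       # required for all tasks and pretasks
    destination: str | None = None  # required for main tasks
    source: str | None = None      # example of a task-specific argument
```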
There is an example task: `HelloWorld`.
Validators are defined in the `validators` module. They are just functions that return a boolean value. They are run in the `validate` method of tasks by using the `v` wrapper. `v` takes a validator and a list of arguments to pass to it. If, during the execution of the `validate` method, any call to `v` returns `False`, the validation stops, a `ValidationError` is raised, and the task is marked as failed.
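Putting it together, a validate method might look something like this sketch (the validator functions are made up here, and the exact signature of `v` is an assumption):

```python
import os
from threading import Event

# Validators are plain functions returning a boolean (hypothetical examples).
def file_exists(path: str) -> bool:
    return os.path.isfile(path)

def file_is_not_empty(path: str) -> bool:
    return os.path.getsize(path) > 0

class Download(Task):
    def validate(self, abort: Event) -> "Download":
        # If either validator fails, v stops validation with a
        # ValidationError and the task is marked as failed.
        v(file_exists, self.definition.destination)
        v(file_is_not_empty, self.definition.destination)
        return self
```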
PIS automatically generates a report of every execution. This report is appended to a JSON manifest file that spans a complete run of the pipeline (all the steps).
The manifest file contains:
- A log of the executions in the context of a whole pipeline run, including a timestamp, the duration, and a summary of the state the whole run is in afterwards.
- A report for the last run of each step in the pipeline. The report includes the state of the step, the duration of the execution, a very simple log of what happened and a list of the resources generated by the step.
- For each step, a list of reports on all the tasks run by it. These include the resulting state of the task, the timestamp, a detailed log, and the whole configuration of the task.
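As an illustration, a downstream script could read the manifest with plain JSON tooling (the field names below are guesses; check a real manifest for the actual schema):

```python
import json

with open("manifest.json") as f:
    manifest = json.load(f)

# Hypothetical keys: report the last state of each step.
for step in manifest.get("steps", []):
    print(step.get("name"), step.get("state"))
```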
Once a step run has finished, PIS attempts to retrieve a previous manifest from the remote URI specified in the config file and from the local work directory. If it finds one, it appends the new report to it. Then the new manifest is saved locally and uploaded again.
The manifest management is automated in the tasks, so there is no need to handle it; the base class takes care of it. Any errors raised will be caught and logged, and any logs will also be directed to a handler that writes to the manifest.