Skip to content

Commit

Permalink
Add run_job module
Browse files Browse the repository at this point in the history
Towards #123
  • Loading branch information
penelopeysm committed Jul 1, 2024
1 parent 86ab0ed commit 6d0a49d
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ dev = [
"types-requests", # Required for type checking requests
"urllib3<2", # Pin this, pending this PR for dagster https://github.com/dagster-io/dagster/pull/16738
"pre-commit", # Used for managing pre-commit hooks
"pyright >=1.1.339" # Used for static type checking (mypy is not yet compatible with Dagster)
"pyright >=1.1.339", # Used for static type checking (mypy is not yet compatible with Dagster)
"python-dotenv >=1.0.1", # For sourcing .env
]
docs = [
"mkdocs >=1.6.0"
Expand Down
132 changes: 132 additions & 0 deletions python/popgetter/run_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
run_job.py
----------
This module can be used to materialise all assets within a job in an
'intelligent' way.
Specifically, this module was written to solve a specific problem. One can run
an _unpartitioned_ job using the Dagster CLI:
dagster job launch -j <job_name>
However, that doesn't work for _partitioned_ assets. When it reaches the first
partitioned asset, the command will crash. For those you need to do:
dagster job backfill -j <job_name> --noprompt
but if the partitioned assets depend on unpartitioned ones, the upstream
unpartitioned assets will be run one time for each partition, which is
extremely inefficient.
This module provides a way to sequentially materialise assets within a job in
much the same way as one might do manually via the web UI: that is,
materialising the most upstream asset first, then its reverse dependencies, and
so on. It handles both assets that are unpartitioned as well as those with
dynamic partitions (static partitions are not supported, but popgetter does not
have any such assets, so this is not a problem for now).
To use it, run:
python -m popgetter.run_job <job_name>
Note that you must set a $DAGSTER_HOME environment variable, and any other
environment variables that are required for the job to run successfully. This
script will source a `.env` file in your working directory, which is similar to
Dagster's behaviour, so you can simply use that file.
"""

from . import defs
import time
from dagster import materialize, DagsterInstance, DynamicPartitionsDefinition
import argparse
from dotenv import load_dotenv

def find_materialisable_asset_names(dep_list, done_asset_names: set[str]) -> set[str]:
"""Given a dictionary of {node: dependencies} and a set of asset names
which have already been materialised, return a set of asset names which
haven't been materialized yet but can now be.
dep_list should be obtained from
defs.get_job_def(job_name)._graph_def._dependencies.
"""
materialisable_asset_names = set()

for asset, dep_dict in dep_list.items():
if asset.name in done_asset_names:
continue

if all(dep.node in done_asset_names for dep in dep_dict.values()):
materialisable_asset_names.add(asset.name)

return materialisable_asset_names


def run_job(job_name: str, delay: float):
load_dotenv()
job = defs.get_job_def(job_name)

# Required for persisting outputs in $DAGSTER_HOME/storage
instance = DagsterInstance.get()

dependency_list = job._graph_def._dependencies
all_assets = {node_handle.name: definition
for node_handle, definition in
job._asset_layer.assets_defs_by_node_handle.items()}

materialised_asset_names = set()
while len(materialised_asset_names) < len(all_assets):
asset_names_to_materialise = find_materialisable_asset_names(dependency_list, materialised_asset_names)

if len(asset_names_to_materialise) == 0:
print("No more assets to materialise")
break

asset_name_to_materialise = asset_names_to_materialise.pop()
asset_to_materialise = all_assets.get(asset_name_to_materialise)

print(f"Materialising: {asset_name_to_materialise}")

partitions_def = asset_to_materialise.partitions_def

if partitions_def is None:
# Unpartitioned

# https://docs.dagster.io/_apidocs/execution#dagster.materialize -- note
# that the `assets` keyword argument needs to include upstream assets as
# well. We use `selection` to specify the asset that is actually being
# materialised.
materialize(assets=[asset_to_materialise,
*(all_assets.get(k) for k in materialised_asset_names)],
selection=[asset_to_materialise],
instance=instance)
time.sleep(delay)
materialised_asset_names.add(asset_name_to_materialise)

else:
# Partitioned
if type(partitions_def) != DynamicPartitionsDefinition:
# Everything in popgetter is dynamically partitioned so we
# should not run into this.
raise NotImplementedError("Non-dynamic partitions not implemented yet")
partition_names = instance.get_dynamic_partitions(partitions_def.name)

for partition in partition_names:
print(f" - with partition key: {partition}")
time.sleep(delay)
materialize(assets=[asset_to_materialise,
*(all_assets.get(k) for k in materialised_asset_names)],
selection=[asset_to_materialise],
partition_key=partition,
instance=instance)
materialised_asset_names.add(asset_name_to_materialise)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a job (in an intelligent way)")
parser.add_argument("job_name", type=str,
help="Name of the job to run")
parser.add_argument("--delay", type=float, default=0.5,
help="Delay between materialising successive assets")
args = parser.parse_args()
run_job(args.job_name, args.delay)

0 comments on commit 6d0a49d

Please sign in to comment.