How does task caching work across flow runs? #15560
Bug summary

I have written a fairly simple workflow, and I am expecting the result of the task to be cached, but it is not being cached. Here's an MWE:

```python
from prefect import task, flow
from prefect.futures import wait
from prefect.cache_policies import DEFAULT
from time import sleep


@task(persist_result=True, task_run_name="{year}-{day:>03}", cache_policy=DEFAULT)
def run_long_thing(
    year: int,
    day: int,
) -> str:
    # ... actually some long-running code that returns a Path
    sleep(1.5)
    return f"{year}-{day}"


@flow
def run_mini():
    ydays = [(2016, day) for day in range(50)]
    outputs = []
    for year, day in ydays:
        outputs.append(run_long_thing.submit(year, day))
    wait(outputs)
    print(outputs)


if __name__ == "__main__":
    run_mini()
```

When I look at the state of the tasks in the UI, they are shown as …

I assume I'm doing something silly (just started with …

Version info (…
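In this MWE every submitted call receives a different `day`, so any cache key derived from the inputs differs for every task run, and nothing inside a single flow run can reuse another call's result. The sketch below only illustrates that point: `inputs_key` is a hypothetical stand-in that hashes the parameters with `hashlib`, not the way Prefect actually computes its keys.

```python
import hashlib
import json


# Hypothetical inputs-only cache key, for illustration only (not Prefect's implementation).
def inputs_key(year: int, day: int) -> str:
    payload = json.dumps({"year": year, "day": day}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


# Same parameter list as the MWE's flow.
ydays = [(2016, day) for day in range(50)]
keys = [inputs_key(year, day) for year, day in ydays]

# 50 keys, all distinct: within one run, no call can hit another call's cache entry.
print(len(keys), len(set(keys)))  # prints: 50 50
```

A second flow run over the same list would produce the same 50 keys, so caching can only pay off across runs here, and only if the cache key does not also depend on the run itself.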
Replies: 2 comments 2 replies
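hey @steven-murray, I converted this to a discussion since I think it's more of a question than a bug; hope that's okay with you.

To be clear, here I'm assuming that you're asking about how to cache task results across flow runs, because it appears that you're submitting your task over `range(50)`, which within a single flow run would give unique task inputs for each invocation (i.e. all cache misses inside one flow run). If I'm mischaracterizing the ask, feel free to let me know and I can update the title here.

Consider this variant of your example, where importantly:

- it uses the `INPUTS` and not the `DEFAULT` cache policy, since with `DEFAULT` we will miss the cache in subsequent flow runs, because the …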
```python
from time import sleep

from prefect import flow, task, unmapped
from prefect.cache_policies import INPUTS


@task(persist_result=True, task_run_name="{year}-{day:>03}", cache_policy=INPUTS)
def run_long_thing(
    year: int,
    day: int,
) -> str:
    # ... actually some long-running code that returns a Path
    sleep(1.5)
    return f"{year}-{day}"


@flow
def run_mini():
    # Map over the days; unmapped() passes the same year to every call
    return run_long_thing.map(unmapped(2016), range(50))


if __name__ == "__main__":
    run_mini()
```
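Why `INPUTS` behaves differently from `DEFAULT` across runs can be simulated without Prefect at all. Prefect's caching documentation describes the default policy as keying on the task's inputs, its source code, and the ID of the run it executes in; the toy below keeps only the inputs and run-ID ingredients. `inputs_key`, `inputs_plus_run_key`, and `run_flow` are hypothetical helpers, and a plain dict stands in for persisted results, so treat this as a sketch of the idea rather than Prefect's implementation.

```python
import hashlib
import json
import uuid


# Hypothetical stand-ins for cache policies; not Prefect's actual implementation.
def inputs_key(params: dict) -> str:
    """Key derived only from the task's inputs (the idea behind INPUTS)."""
    payload = json.dumps(params, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def inputs_plus_run_key(params: dict, run_id: str) -> str:
    """Key that also mixes in the run ID (one ingredient of DEFAULT)."""
    payload = json.dumps({"params": params, "run_id": run_id}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def run_flow(cache: dict, use_run_id: bool) -> int:
    """Simulate one flow run over 50 days; return how many task calls hit the cache."""
    run_id = str(uuid.uuid4())  # every simulated flow run gets a fresh ID
    hits = 0
    for day in range(50):
        params = {"year": 2016, "day": day}
        key = inputs_plus_run_key(params, run_id) if use_run_id else inputs_key(params)
        if key in cache:
            hits += 1
        else:
            cache[key] = f"{params['year']}-{params['day']}"  # "persist" the result
    return hits


inputs_cache: dict = {}
run_scoped_cache: dict = {}
print(run_flow(inputs_cache, use_run_id=False), run_flow(inputs_cache, use_run_id=False))
print(run_flow(run_scoped_cache, use_run_id=True), run_flow(run_scoped_cache, use_run_id=True))
```

This prints `0 50` for the inputs-only key (the second run reuses every stored result) and `0 0` for the run-scoped key, because each run's fresh ID yields a new set of keys.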
Thank you @zzstoatzz, you are totally correct; somehow I had missed that the …