Skip to content

Commit

Permalink
Merge pull request #164 from PrefectHQ/compat-for-0.14
Browse files Browse the repository at this point in the history
Compatibility for 0.14
  • Loading branch information
cicdw authored Dec 15, 2020
2 parents 0343526 + e25b3a9 commit f28a797
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 132 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ aliases:
run:
name: Set environment variables
command: |
# set prefect tag -- currently pinning to 0.13.19 as 0.14 is prepared
echo 'export PREFECT_VERSION=0.13.19' >> $BASH_ENV
# set prefect tag -- currently pinning to master
echo 'export PREFECT_VERSION=master' >> $BASH_ENV
- &install_prefect_server
run:
Expand Down
2 changes: 1 addition & 1 deletion src/prefect_server/database/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
pydantic.json.ENCODERS_BY_TYPE[pendulum.Time] = str
pydantic.json.ENCODERS_BY_TYPE[pendulum.Duration] = lambda x: str(x.total_seconds())
pydantic.json.ENCODERS_BY_TYPE[pendulum.Period] = lambda x: str(x.total_seconds())
pydantic.json.ENCODERS_BY_TYPE[prefect.engine.result.NoResultType] = str
pydantic.json.ENCODERS_BY_TYPE[prefect.engine.result.Result] = str


def _as_pendulum(value: Union[str, datetime.datetime]) -> pendulum.DateTime:
Expand Down
4 changes: 2 additions & 2 deletions tests/api/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ async def test_get_flow_run_in_queue_works_if_environment_labels_are_none(
"""

flow = await models.Flow.where(id=flow_id).first({"environment"})
flow.environment["labels"] = None
flow.environment = dict(labels=None)
await models.Flow.where(id=flow_id).update({"environment": flow.environment})
check_flow = await models.Flow.where(id=flow_id).first({"environment"})
assert check_flow.environment["labels"] is None
Expand All @@ -795,7 +795,7 @@ async def test_get_flow_run_in_queue_works_if_environment_labels_are_missing(
"""

flow = await models.Flow.where(id=flow_id).first({"environment"})
del flow.environment["labels"]
flow.environment = dict()
await models.Flow.where(id=flow_id).update({"environment": flow.environment})
check_flow = await models.Flow.where(id=flow_id).first({"environment"})
assert "labels" not in check_flow.environment
Expand Down
85 changes: 0 additions & 85 deletions tests/api/test_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from box import Box

from prefect import api, models
from prefect.engine.result import SafeResult
from prefect.engine.result_handlers import JSONResultHandler
from prefect.engine.state import (
Cancelled,
Failed,
Expand Down Expand Up @@ -84,89 +82,6 @@ async def test_trigger_failed_state_does_not_set_end_time(self, task_run_id):
assert not task_run_info.start_time
assert not task_run_info.end_time

@pytest.mark.parametrize(
"state",
[s() for s in State.children() if s not in _MetaState.children()],
ids=[s.__name__ for s in State.children() if s not in _MetaState.children()],
)
async def test_setting_a_task_run_state_pulls_cached_inputs_if_possible(
self, task_run_id, state, running_flow_run_id
):

res1 = SafeResult(1, result_handler=JSONResultHandler())
res2 = SafeResult({"z": 2}, result_handler=JSONResultHandler())
complex_result = {"x": res1, "y": res2}
cached_state = Failed(cached_inputs=complex_result)
await models.TaskRun.where(id=task_run_id).update(
set=dict(serialized_state=cached_state.serialize())
)

# try to schedule the task run to scheduled
await api.states.set_task_run_state(task_run_id=task_run_id, state=state)

task_run = await models.TaskRun.where(id=task_run_id).first(
{"serialized_state"}
)

# ensure the state change took place
assert task_run.serialized_state["type"] == type(state).__name__
assert task_run.serialized_state["cached_inputs"]["x"]["value"] == 1
assert task_run.serialized_state["cached_inputs"]["y"]["value"] == {"z": 2}

@pytest.mark.parametrize(
"state",
[
s(cached_inputs=None)
for s in State.children()
if s not in _MetaState.children()
],
ids=[s.__name__ for s in State.children() if s not in _MetaState.children()],
)
async def test_task_runs_with_null_cached_inputs_do_not_overwrite_cache(
self, state, task_run_id, running_flow_run_id
):

await api.states.set_task_run_state(task_run_id=task_run_id, state=state)
# set up a Retrying state with non-null cached inputs
res1 = SafeResult(1, result_handler=JSONResultHandler())
res2 = SafeResult({"z": 2}, result_handler=JSONResultHandler())
complex_result = {"x": res1, "y": res2}
cached_state = Retrying(cached_inputs=complex_result)
await api.states.set_task_run_state(task_run_id=task_run_id, state=cached_state)
run = await models.TaskRun.where(id=task_run_id).first({"serialized_state"})

assert run.serialized_state["cached_inputs"]["x"]["value"] == 1
assert run.serialized_state["cached_inputs"]["y"]["value"] == {"z": 2}

@pytest.mark.parametrize(
"state_cls", [s for s in State.children() if s not in _MetaState.children()]
)
async def test_task_runs_cached_inputs_give_preference_to_new_cached_inputs(
self, state_cls, task_run_id, running_flow_run_id
):

# set up a Failed state with null cached inputs
res1 = SafeResult(1, result_handler=JSONResultHandler())
res2 = SafeResult({"a": 2}, result_handler=JSONResultHandler())
complex_result = {"b": res1, "c": res2}
cached_state = state_cls(cached_inputs=complex_result)
await api.states.set_task_run_state(task_run_id=task_run_id, state=cached_state)
# set up a Retrying state with non-null cached inputs
res1 = SafeResult(1, result_handler=JSONResultHandler())
res2 = SafeResult({"z": 2}, result_handler=JSONResultHandler())
complex_result = {"x": res1, "y": res2}
cached_state = Retrying(cached_inputs=complex_result)
await api.states.set_task_run_state(task_run_id=task_run_id, state=cached_state)
run = Box(
await models.TaskRun.where(id=task_run_id).first({"serialized_state"})
)

# verify that we have cached inputs, and that preference has been given to the new
# cached inputs
assert run.serialized_state.cached_inputs
assert run.serialized_state.cached_inputs.x.value == 1
assert run.serialized_state.cached_inputs.y.value == {"z": 2}

@pytest.mark.parametrize(
"flow_run_state", [Pending(), Running(), Failed(), Success()]
)
Expand Down
46 changes: 4 additions & 42 deletions tests/graphql/test_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import pytest

from prefect import api, models
from prefect.engine.result import Result, SafeResult
from prefect.engine.result_handlers import JSONResultHandler
from prefect.engine.result import Result
from prefect.engine.results import PrefectResult
from prefect.engine.state import Retrying, Running, Submitted, Success


Expand Down Expand Up @@ -133,26 +133,7 @@ async def test_set_multiple_flow_run_states_with_one_failed(
)

async def test_set_flow_run_state_with_result(self, run_query, flow_run_id):
result = Result(10, result_handler=JSONResultHandler())
result.store_safe_value()
state = Success(result=result)

result = await run_query(
query=self.mutation,
variables=dict(
input=dict(
states=[dict(flow_run_id=flow_run_id, state=state.serialize())]
)
),
)
fr = await models.FlowRun.where(
id=result.data.set_flow_run_states.states[0].id
).first({"state", "version"})
assert fr.version == 3
assert fr.state == "Success"

async def test_set_flow_run_state_with_saferesult(self, run_query, flow_run_id):
result = SafeResult("10", result_handler=JSONResultHandler())
result = PrefectResult(location="10")
state = Success(result=result)

result = await run_query(
Expand Down Expand Up @@ -325,26 +306,7 @@ async def test_set_multiple_task_run_states_with_one_failed(
)

async def test_set_task_run_state_with_result(self, run_query, task_run_id):
result = Result(10, result_handler=JSONResultHandler())
result.store_safe_value()
state = Success(result=result)

result = await run_query(
query=self.mutation,
variables=dict(
input=dict(
states=[dict(task_run_id=task_run_id, state=state.serialize())]
)
),
)
tr = await models.TaskRun.where(
id=result.data.set_task_run_states.states[0].id
).first({"state", "version"})
assert tr.version == 2
assert tr.state == "Success"

async def test_set_task_run_state_with_safe_result(self, run_query, task_run_id):
result = SafeResult("10", result_handler=JSONResultHandler())
result = PrefectResult(location="10")
state = Success(result=result)

result = await run_query(
Expand Down

0 comments on commit f28a797

Please sign in to comment.