diff --git a/.circleci/config.yml b/.circleci/config.yml
index 2378774a..4babccf0 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -3,8 +3,8 @@ aliases:
     run:
       name: Set environment variables
       command: |
-        # set prefect tag -- currently pinning to 0.13.19 as 0.14 is prepared
-        echo 'export PREFECT_VERSION=0.13.19' >> $BASH_ENV
+        # set prefect tag -- currently pinning to master
+        echo 'export PREFECT_VERSION=master' >> $BASH_ENV
 
   - &install_prefect_server
     run:
diff --git a/src/prefect_server/database/orm.py b/src/prefect_server/database/orm.py
index 31790afc..34bf3cdf 100644
--- a/src/prefect_server/database/orm.py
+++ b/src/prefect_server/database/orm.py
@@ -22,7 +22,7 @@
 pydantic.json.ENCODERS_BY_TYPE[pendulum.Time] = str
 pydantic.json.ENCODERS_BY_TYPE[pendulum.Duration] = lambda x: str(x.total_seconds())
 pydantic.json.ENCODERS_BY_TYPE[pendulum.Period] = lambda x: str(x.total_seconds())
-pydantic.json.ENCODERS_BY_TYPE[prefect.engine.result.NoResultType] = str
+pydantic.json.ENCODERS_BY_TYPE[prefect.engine.result.Result] = str
 
 
 def _as_pendulum(value: Union[str, datetime.datetime]) -> pendulum.DateTime:
diff --git a/tests/api/test_runs.py b/tests/api/test_runs.py
index d787b175..47ecf91d 100644
--- a/tests/api/test_runs.py
+++ b/tests/api/test_runs.py
@@ -773,7 +773,7 @@ async def test_get_flow_run_in_queue_works_if_environment_labels_are_none(
     """
 
     flow = await models.Flow.where(id=flow_id).first({"environment"})
-    flow.environment["labels"] = None
+    flow.environment = dict(labels=None)
     await models.Flow.where(id=flow_id).update({"environment": flow.environment})
     check_flow = await models.Flow.where(id=flow_id).first({"environment"})
     assert check_flow.environment["labels"] is None
@@ -795,7 +795,7 @@ async def test_get_flow_run_in_queue_works_if_environment_labels_are_missing(
     """
 
     flow = await models.Flow.where(id=flow_id).first({"environment"})
-    del flow.environment["labels"]
+    flow.environment = dict()
     await models.Flow.where(id=flow_id).update({"environment": flow.environment})
     check_flow = await models.Flow.where(id=flow_id).first({"environment"})
     assert "labels" not in check_flow.environment
diff --git a/tests/api/test_states.py b/tests/api/test_states.py
index 24ad8c1c..73af453e 100644
--- a/tests/api/test_states.py
+++ b/tests/api/test_states.py
@@ -5,8 +5,6 @@
 from box import Box
 
 from prefect import api, models
-from prefect.engine.result import SafeResult
-from prefect.engine.result_handlers import JSONResultHandler
 from prefect.engine.state import (
     Cancelled,
     Failed,
@@ -84,89 +82,6 @@ async def test_trigger_failed_state_does_not_set_end_time(self, task_run_id):
         assert not task_run_info.start_time
         assert not task_run_info.end_time
 
-    @pytest.mark.parametrize(
-        "state",
-        [s() for s in State.children() if s not in _MetaState.children()],
-        ids=[s.__name__ for s in State.children() if s not in _MetaState.children()],
-    )
-    async def test_setting_a_task_run_state_pulls_cached_inputs_if_possible(
-        self, task_run_id, state, running_flow_run_id
-    ):
-
-        res1 = SafeResult(1, result_handler=JSONResultHandler())
-        res2 = SafeResult({"z": 2}, result_handler=JSONResultHandler())
-        complex_result = {"x": res1, "y": res2}
-        cached_state = Failed(cached_inputs=complex_result)
-        await models.TaskRun.where(id=task_run_id).update(
-            set=dict(serialized_state=cached_state.serialize())
-        )
-
-        # try to schedule the task run to scheduled
-        await api.states.set_task_run_state(task_run_id=task_run_id, state=state)
-
-        task_run = await models.TaskRun.where(id=task_run_id).first(
-            {"serialized_state"}
-        )
-
-        # ensure the state change took place
-        assert task_run.serialized_state["type"] == type(state).__name__
-        assert task_run.serialized_state["cached_inputs"]["x"]["value"] == 1
-        assert task_run.serialized_state["cached_inputs"]["y"]["value"] == {"z": 2}
-
-    @pytest.mark.parametrize(
-        "state",
-        [
-            s(cached_inputs=None)
-            for s in State.children()
-            if s not in _MetaState.children()
-        ],
-        ids=[s.__name__ for s in State.children() if s not in _MetaState.children()],
-    )
-    async def test_task_runs_with_null_cached_inputs_do_not_overwrite_cache(
-        self, state, task_run_id, running_flow_run_id
-    ):
-
-        await api.states.set_task_run_state(task_run_id=task_run_id, state=state)
-        # set up a Retrying state with non-null cached inputs
-        res1 = SafeResult(1, result_handler=JSONResultHandler())
-        res2 = SafeResult({"z": 2}, result_handler=JSONResultHandler())
-        complex_result = {"x": res1, "y": res2}
-        cached_state = Retrying(cached_inputs=complex_result)
-        await api.states.set_task_run_state(task_run_id=task_run_id, state=cached_state)
-        run = await models.TaskRun.where(id=task_run_id).first({"serialized_state"})
-
-        assert run.serialized_state["cached_inputs"]["x"]["value"] == 1
-        assert run.serialized_state["cached_inputs"]["y"]["value"] == {"z": 2}
-
-    @pytest.mark.parametrize(
-        "state_cls", [s for s in State.children() if s not in _MetaState.children()]
-    )
-    async def test_task_runs_cached_inputs_give_preference_to_new_cached_inputs(
-        self, state_cls, task_run_id, running_flow_run_id
-    ):
-
-        # set up a Failed state with null cached inputs
-        res1 = SafeResult(1, result_handler=JSONResultHandler())
-        res2 = SafeResult({"a": 2}, result_handler=JSONResultHandler())
-        complex_result = {"b": res1, "c": res2}
-        cached_state = state_cls(cached_inputs=complex_result)
-        await api.states.set_task_run_state(task_run_id=task_run_id, state=cached_state)
-        # set up a Retrying state with non-null cached inputs
-        res1 = SafeResult(1, result_handler=JSONResultHandler())
-        res2 = SafeResult({"z": 2}, result_handler=JSONResultHandler())
-        complex_result = {"x": res1, "y": res2}
-        cached_state = Retrying(cached_inputs=complex_result)
-        await api.states.set_task_run_state(task_run_id=task_run_id, state=cached_state)
-        run = Box(
-            await models.TaskRun.where(id=task_run_id).first({"serialized_state"})
-        )
-
-        # verify that we have cached inputs, and that preference has been given to the new
-        # cached inputs
-        assert run.serialized_state.cached_inputs
-        assert run.serialized_state.cached_inputs.x.value == 1
-        assert run.serialized_state.cached_inputs.y.value == {"z": 2}
-
     @pytest.mark.parametrize(
         "flow_run_state", [Pending(), Running(), Failed(), Success()]
     )
diff --git a/tests/graphql/test_states.py b/tests/graphql/test_states.py
index c4d9d2c6..e65711bb 100644
--- a/tests/graphql/test_states.py
+++ b/tests/graphql/test_states.py
@@ -4,8 +4,8 @@
 import pytest
 
 from prefect import api, models
-from prefect.engine.result import Result, SafeResult
-from prefect.engine.result_handlers import JSONResultHandler
+from prefect.engine.result import Result
+from prefect.engine.results import PrefectResult
 from prefect.engine.state import Retrying, Running, Submitted, Success
 
 
@@ -133,26 +133,7 @@ async def test_set_multiple_flow_run_states_with_one_failed(
         )
 
     async def test_set_flow_run_state_with_result(self, run_query, flow_run_id):
-        result = Result(10, result_handler=JSONResultHandler())
-        result.store_safe_value()
-        state = Success(result=result)
-
-        result = await run_query(
-            query=self.mutation,
-            variables=dict(
-                input=dict(
-                    states=[dict(flow_run_id=flow_run_id, state=state.serialize())]
-                )
-            ),
-        )
-        fr = await models.FlowRun.where(
-            id=result.data.set_flow_run_states.states[0].id
-        ).first({"state", "version"})
-        assert fr.version == 3
-        assert fr.state == "Success"
-
-    async def test_set_flow_run_state_with_saferesult(self, run_query, flow_run_id):
-        result = SafeResult("10", result_handler=JSONResultHandler())
+        result = PrefectResult(location="10")
         state = Success(result=result)
 
         result = await run_query(
@@ -325,26 +306,7 @@ async def test_set_multiple_task_run_states_with_one_failed(
         )
 
     async def test_set_task_run_state_with_result(self, run_query, task_run_id):
-        result = Result(10, result_handler=JSONResultHandler())
-        result.store_safe_value()
-        state = Success(result=result)
-
-        result = await run_query(
-            query=self.mutation,
-            variables=dict(
-                input=dict(
-                    states=[dict(task_run_id=task_run_id, state=state.serialize())]
-                )
-            ),
-        )
-        tr = await models.TaskRun.where(
-            id=result.data.set_task_run_states.states[0].id
-        ).first({"state", "version"})
-        assert tr.version == 2
-        assert tr.state == "Success"
-
-    async def test_set_task_run_state_with_safe_result(self, run_query, task_run_id):
-        result = SafeResult("10", result_handler=JSONResultHandler())
+        result = PrefectResult(location="10")
         state = Success(result=result)
 
         result = await run_query(
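
Note (not part of the diff): the test changes swap the deprecated `SafeResult`/`JSONResultHandler` pair for the 0.14-style results API, where a `PrefectResult` carries its JSON-encoded value in `location`. A minimal sketch of the equivalence the updated GraphQL tests rely on, assuming a Prefect master / 0.14+ install:

```python
# Sketch only -- assumes prefect master / 0.14+, where results are
# location-based instead of SafeResult + ResultHandler.
from prefect.engine.results import PrefectResult
from prefect.engine.state import Success

# Old style removed by this diff:
#   result = SafeResult("10", result_handler=JSONResultHandler())
# New style: the JSON-encoded value *is* the location.
result = PrefectResult(location="10")
state = Success(result=result)

# Reading the location back recovers the value, so location="10" plays the
# role the JSON-handled SafeResult value did in the old tests.
assert result.read(result.location).value == 10

# state.serialize() produces the payload the set_flow_run_states /
# set_task_run_states mutations receive; it carries the state type and the
# result's location rather than a raw value.
serialized = state.serialize()
assert serialized["type"] == "Success"
```

This is also why the dedicated `*_with_saferesult` tests could be dropped: with location-only results there is no separate "safe" representation left to exercise.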
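
Similarly, a short sketch of the `ENCODERS_BY_TYPE` registration that `orm.py` now keys on the `Result` base class, presumably because `NoResultType` is gone from the 0.14 engine. The registry is pydantic v1's lookup table of fallback JSON encoders; the `location` value below is illustrative only:

```python
# Sketch only -- assumes pydantic 1.x and prefect master / 0.14+.
import json

import pydantic.json
from prefect.engine.result import Result

# Same registration as src/prefect_server/database/orm.py: stringify any
# prefect Result that shows up inside a JSON-encoded model field.
pydantic.json.ENCODERS_BY_TYPE[Result] = str

payload = {"result": Result(location="s3://bucket/flow-result")}  # illustrative

# json.dumps alone would raise TypeError on the Result; pydantic_encoder
# consults ENCODERS_BY_TYPE and falls back to str() for it.
print(json.dumps(payload, default=pydantic.json.pydantic_encoder))
```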