From 984bc8056de11f60c6c5c33511fd99449122d43b Mon Sep 17 00:00:00 2001 From: Theodoros Katzalis Date: Tue, 17 Sep 2024 11:48:57 +0200 Subject: [PATCH] Update submodules to the latest changes - bioimageio.spec==0.5.3.post4 - bioimageio.core==0.6.7 --- .gitignore | 1 + README.md | 2 +- setup.py | 4 +- tests/conftest.py | 241 ++++++++---------- tests/test_converters.py | 35 ++- .../test_grpc/test_inference_servicer.py | 159 +++++++----- .../test_training/test_training.py | 2 +- tiktorch/converters.py | 36 ++- tiktorch/rpc/mp.py | 6 +- tiktorch/server/grpc/inference_servicer.py | 12 +- tiktorch/server/session/backend/base.py | 2 +- tiktorch/server/session/backend/supervisor.py | 7 +- tiktorch/server/session/process.py | 167 ++++++------ vendor/core-bioimage-io-python | 2 +- vendor/spec-bioimage-io | 2 +- 15 files changed, 340 insertions(+), 338 deletions(-) diff --git a/.gitignore b/.gitignore index b71865e2..cd0bc192 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ tiktorch/__pycache/ /.#wrapper.py# .py~ unet_sample.zip +build diff --git a/README.md b/README.md index a128daed..b7de76cf 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Reasons to install the server component: We have a how-to for the set up of a tiktorch prediction server (powerful machine with nvidia GPU) to connect to from your client (e.g. ilastik on a laptop): [Installation](#installation). -If you are interested in running Model Zoo networks from Python directly, have a look at [`bioimageio.core`](https://github.com/bioimage-io/core-bioimage-io-python), where we have an [example notebook](https://github.com/bioimage-io/core-bioimage-io-python/blob/main/example/bioimageio-core-usage.ipynb) to get you started. +If you are interested in running Model Zoo networks from Python directly, have a look at [`bioimageio.core`](https://github.com/bioimage-io/core-bioimage-io-python), where we have an [example notebook](https://github.com/bioimage-io/core-bioimage-io-python/blob/main/example/model_usage.ipynb) to get you started. ## Installation diff --git a/setup.py b/setup.py index a6313f3f..8caaaaa9 100644 --- a/setup.py +++ b/setup.py @@ -27,8 +27,8 @@ ], packages=find_packages(exclude=["tests"]), # Required install_requires=[ - "bioimageio.spec==0.4.9.post5", - "bioimageio.core==0.5.11", + "bioimageio.spec==0.5.3.post4", + "bioimageio.core==0.6.7", "grpcio>=1.31", "numpy<2", # pytorch 2.2.2-py3.9_0 for macos is compiled with numpy 1.* "protobuf", diff --git a/tests/conftest.py b/tests/conftest.py index 1118935a..64f1d51b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,41 +1,22 @@ import faulthandler -import io import logging.handlers import multiprocessing as mp import signal import sys import threading -from collections import namedtuple from os import getenv -from pathlib import Path from random import randint -from zipfile import ZipFile +from typing import Generator, List, Tuple +from unittest.mock import create_autospec, patch import numpy as np import pytest -from bioimageio.core.resource_io import export_resource_package +import xarray as xr +from bioimageio.core import AxisId, PredictionPipeline, Sample, Tensor +from bioimageio.spec.model import v0_5 +from bioimageio.spec.model.v0_5 import TensorId -TEST_DATA = "data" -TEST_BIOIMAGEIO_ZIPFOLDER = "unet2d" -TEST_BIOIMAGEIO_ONNX = "unet2d_onnx" -TEST_BIOIMAGEIO_DUMMY_EXPLICIT = "dummy" -TEST_BIOIMAGEIO_DUMMY_EXPLICIT_RDF = f"{TEST_BIOIMAGEIO_DUMMY_EXPLICIT}/Dummy.model.yaml" -TEST_BIOIMAGEIO_DUMMY_PARAM_RDF = "dummy_param/Dummy.model_param.yaml" -TEST_BIOIMAGEIO_TENSORFLOW_DUMMY = "dummy_tensorflow" -TEST_BIOIMAGEIO_TORCHSCRIPT = "unet2d_torchscript" - -NNModel = namedtuple("NNModel", ["model", "state"]) - - -@pytest.fixture -def data_path(): - conf_path = Path(__file__).parent - return conf_path / TEST_DATA - - -def read_bytes(filename): - with open(filename, "rb") as file: - return file.read() +from tiktorch.server.session import process @pytest.fixture @@ -84,121 +65,115 @@ def assert_threads_cleanup(): pytest.fail("Threads still running:\n\t%s" % "\n\t".join(running_threads)) -@pytest.fixture -def bioimageio_model_bytes(data_path): - rdf_source = data_path / TEST_BIOIMAGEIO_ZIPFOLDER / "UNet2DNucleiBroad.model.yaml" - data = io.BytesIO() - export_resource_package(rdf_source, output_path=data) - return data - - -@pytest.fixture -def bioimageio_model_zipfile(bioimageio_model_bytes): - with ZipFile(bioimageio_model_bytes, mode="r") as zf: - yield zf - +MockedPredictionPipeline = Generator[Tuple[PredictionPipeline, Sample], None, None] -@pytest.fixture -def bioimageio_dummy_model_filepath(data_path, tmpdir): - bioimageio_net_dir = Path(data_path) / TEST_BIOIMAGEIO_DUMMY_EXPLICIT - path = tmpdir / "dummy_model.zip" - - with ZipFile(path, mode="w") as zip_model: - for f_path in bioimageio_net_dir.iterdir(): - if str(f_path.name).startswith("__"): - continue - - with f_path.open(mode="rb") as f: - zip_model.writestr(f_path.name, f.read()) - - return path - - -@pytest.fixture -def bioimageio_dummy_explicit_model_bytes(data_path): - rdf_source = data_path / TEST_BIOIMAGEIO_DUMMY_EXPLICIT_RDF - return _bioimageio_package(rdf_source) - - -@pytest.fixture -def bioimageio_dummy_param_model_bytes(data_path): - rdf_source = data_path / TEST_BIOIMAGEIO_DUMMY_PARAM_RDF - return _bioimageio_package(rdf_source) - -@pytest.fixture(params=[(TEST_BIOIMAGEIO_DUMMY_PARAM_RDF, "param"), (TEST_BIOIMAGEIO_DUMMY_EXPLICIT_RDF, "input")]) -def bioimageio_dummy_model(request, data_path): - path, tensor_id = request.param - yield _bioimageio_package(data_path / path), tensor_id - - -def _bioimageio_package(rdf_source): - data = io.BytesIO() - export_resource_package(rdf_source, output_path=data) - return data - - -def archive(directory): - result = io.BytesIO() - - with ZipFile(result, mode="w") as zip_model: - - def _archive(path_to_archive): - for path in path_to_archive.iterdir(): - if str(path.name).startswith("__"): - continue - - if path.is_dir(): - _archive(path) - - else: - with path.open(mode="rb") as f: - zip_model.writestr(str(path).replace(str(directory), ""), f.read()) - - _archive(directory) - - return result - - -@pytest.fixture -def bioimageio_dummy_tensorflow_model_bytes(data_path): - rdf_source = data_path / TEST_BIOIMAGEIO_TENSORFLOW_DUMMY / "rdf.yaml" - data = io.BytesIO() - export_resource_package(rdf_source, output_path=data) - return data - - -@pytest.fixture -def bioimageio_unet2d_onnx_bytes(data_path): - bioimageio_net_dir = Path(data_path) / TEST_BIOIMAGEIO_ONNX - return archive(bioimageio_net_dir) - - -@pytest.fixture -def bioimageio_unet2d_onnx_test_data(data_path): - bioimageio_net_dir = Path(data_path) / TEST_BIOIMAGEIO_ONNX - test_input = bioimageio_net_dir / "test_input.npy" - test_output = bioimageio_net_dir / "test_output.npy" - return {"test_input": test_input, "test_output": test_output} +def patched_prediction_pipeline(mocked_prediction_pipeline: PredictionPipeline): + return patch.object(process, "_get_prediction_pipeline_from_model_bytes", lambda *args: mocked_prediction_pipeline) @pytest.fixture -def npy_zeros_file(tmpdir): - path = str(tmpdir / "zeros.npy") - zeros = np.zeros(shape=(64, 64)) - np.save(path, zeros) - return path +def bioimage_model_explicit_siso() -> MockedPredictionPipeline: + mocked_prediction_pipeline, mocked_output_sample = _bioimage_model_siso( + [ + v0_5.BatchAxis(), + v0_5.ChannelAxis(channel_names=["channel1", "channel2"]), + v0_5.SpaceInputAxis(id="x", size=10), + v0_5.SpaceInputAxis(id="y", size=10), + ] + ) + with patched_prediction_pipeline(mocked_prediction_pipeline): + yield mocked_prediction_pipeline, mocked_output_sample @pytest.fixture -def bioimageio_unet2d_torchscript_bytes(data_path): - bioimageio_net_dir = Path(data_path) / TEST_BIOIMAGEIO_TORCHSCRIPT - return archive(bioimageio_net_dir) +def bioimage_model_param_siso() -> MockedPredictionPipeline: + mocked_prediction_pipeline, mocked_output_sample = _bioimage_model_siso( + [ + v0_5.BatchAxis(), + v0_5.ChannelAxis(channel_names=["channel1", "channel2"]), + v0_5.SpaceInputAxis(id="x", size=v0_5.ParameterizedSize(min=10, step=2)), + v0_5.SpaceInputAxis(id="y", size=v0_5.ParameterizedSize(min=20, step=3)), + ] + ) + with patched_prediction_pipeline(mocked_prediction_pipeline): + yield mocked_prediction_pipeline, mocked_output_sample + + +def _bioimage_model_siso(input_axes: List[v0_5.InputAxis]) -> Tuple[PredictionPipeline, Sample]: + """ + Mocked bioimageio prediction pipeline with single input single output + """ + + mocked_input = create_autospec(v0_5.InputTensorDescr) + mocked_input.id = "input" + mocked_input.axes = input_axes + return _bioimage_model([mocked_input]) @pytest.fixture -def bioimageio_unet2d_torchscript_test_data(data_path): - bioimageio_net_dir = Path(data_path) / TEST_BIOIMAGEIO_TORCHSCRIPT - test_input = bioimageio_net_dir / "test_input.npy" - test_output = bioimageio_net_dir / "test_output.npy" - return {"test_input": test_input, "test_output": test_output} +def bioimage_model_miso() -> MockedPredictionPipeline: + """ + Mocked bioimageio prediction pipeline with three inputs single output + """ + + mocked_input1 = create_autospec(v0_5.InputTensorDescr) + mocked_input1.id = "input1" + mocked_input1.axes = [ + v0_5.BatchAxis(), + v0_5.ChannelAxis(channel_names=["channel1", "channel2"]), + v0_5.SpaceInputAxis(id=AxisId("x"), size=10), + v0_5.SpaceInputAxis(id=AxisId("y"), size=v0_5.SizeReference(tensor_id="input3", axis_id="x")), + ] + + mocked_input2 = create_autospec(v0_5.InputTensorDescr) + mocked_input2.id = "input2" + mocked_input2.axes = [ + v0_5.BatchAxis(), + v0_5.ChannelAxis(channel_names=["channel1", "channel2"]), + v0_5.SpaceInputAxis(id=AxisId("x"), size=v0_5.ParameterizedSize(min=10, step=2)), + v0_5.SpaceInputAxis(id=AxisId("y"), size=v0_5.ParameterizedSize(min=10, step=5)), + ] + + mocked_input3 = create_autospec(v0_5.InputTensorDescr) + mocked_input3.id = "input3" + mocked_input3.axes = [ + v0_5.BatchAxis(), + v0_5.ChannelAxis(channel_names=["channel1", "channel2"]), + v0_5.SpaceInputAxis(id="x", size=v0_5.SizeReference(tensor_id="input2", axis_id="x")), + v0_5.SpaceInputAxis(id="y", size=10), + ] + + mocked_prediction_pipeline, mocked_output_sample = _bioimage_model([mocked_input1, mocked_input2, mocked_input3]) + with patched_prediction_pipeline(mocked_prediction_pipeline): + yield mocked_prediction_pipeline, mocked_output_sample + + +def _bioimage_model(inputs: List[v0_5.InputTensorDescr]) -> Tuple[PredictionPipeline, Sample]: + mocked_descr = create_autospec(v0_5.ModelDescr) + + mocked_output = create_autospec(v0_5.OutputTensorDescr) + mocked_output.id = "output" + mocked_output.axes = [ + v0_5.BatchAxis(), + v0_5.ChannelAxis(channel_names=["channel1", "channel2"]), + v0_5.SpaceInputAxis(id=AxisId("x"), size=20), + v0_5.SpaceInputAxis(id=AxisId("y"), size=20), + ] + mocked_descr.inputs = inputs + mocked_descr.outputs = [mocked_output] + + mocked_output_sample = Sample( + members={ + TensorId("output"): Tensor.from_xarray( + xr.DataArray(np.arange(2 * 20 * 20).reshape((1, 2, 20, 20)), dims=["batch", "channel", "x", "y"]) + ) + }, + id=None, + stat={}, + ) + + mocked_prediction_pipeline = create_autospec(PredictionPipeline) + mocked_prediction_pipeline.model_description = mocked_descr + mocked_prediction_pipeline.predict_sample_without_blocking.return_value = mocked_output_sample + return mocked_prediction_pipeline, mocked_output_sample diff --git a/tests/test_converters.py b/tests/test_converters.py index e112ede9..dd8db5d8 100644 --- a/tests/test_converters.py +++ b/tests/test_converters.py @@ -1,9 +1,18 @@ import numpy as np import pytest import xarray as xr +from bioimageio.spec.model.v0_5 import TensorId from numpy.testing import assert_array_equal -from tiktorch.converters import Sample, numpy_to_pb_tensor, pb_tensor_to_numpy, pb_tensor_to_xarray, xarray_to_pb_tensor +from tiktorch.converters import ( + numpy_to_pb_tensor, + pb_tensor_to_numpy, + pb_tensor_to_xarray, + pb_tensors_to_sample, + sample_to_pb_tensors, + xarray_to_pb_tensor, + xr_tensors_to_sample, +) from tiktorch.proto import inference_pb2 @@ -167,7 +176,7 @@ def test_should_same_data(self, shape): class TestSample: - def test_create_sample_from_pb_tensors(self): + def test_pb_tensors_to_sample(self): arr_1 = np.arange(32 * 32, dtype=np.int64).reshape(32, 32) tensor_1 = inference_pb2.Tensor( dtype="int64", @@ -184,22 +193,22 @@ def test_create_sample_from_pb_tensors(self): shape=[inference_pb2.NamedInt(name="x", size=64), inference_pb2.NamedInt(name="y", size=64)], ) - sample = Sample.from_pb_tensors([tensor_1, tensor_2]) - assert len(sample.tensors) == 2 - assert sample.tensors["input1"].equals(xr.DataArray(arr_1, dims=["x", "y"])) - assert sample.tensors["input2"].equals(xr.DataArray(arr_2, dims=["x", "y"])) + sample = pb_tensors_to_sample([tensor_1, tensor_2]) + assert len(sample.members) == 2 + assert sample.members[TensorId("input1")].data.equals(xr.DataArray(arr_1, dims=["x", "y"])) + assert sample.members[TensorId("input2")].data.equals(xr.DataArray(arr_2, dims=["x", "y"])) - def test_create_sample_from_raw_data(self): + def test_xr_tensors_to_sample(self): arr_1 = np.arange(32 * 32, dtype=np.int64).reshape(32, 32) tensor_1 = xr.DataArray(arr_1, dims=["x", "y"]) arr_2 = np.arange(64 * 64, dtype=np.int64).reshape(64, 64) tensor_2 = xr.DataArray(arr_2, dims=["x", "y"]) tensors_ids = ["input1", "input2"] - actual_sample = Sample.from_xr_tensors(tensors_ids, [tensor_1, tensor_2]) - expected_dict = {tensors_ids[0]: tensor_1, tensors_ids[1]: tensor_2} - expected_sample = Sample(expected_dict) - assert actual_sample == expected_sample + actual_sample = xr_tensors_to_sample(tensors_ids, [tensor_1, tensor_2]) + assert len(actual_sample.members) == 2 + assert actual_sample.members[TensorId("input1")].data.equals(tensor_1) + assert actual_sample.members[TensorId("input2")].data.equals(tensor_2) def test_sample_to_pb_tensors(self): arr_1 = np.arange(32 * 32, dtype=np.int64).reshape(32, 32) @@ -207,7 +216,7 @@ def test_sample_to_pb_tensors(self): arr_2 = np.arange(64 * 64, dtype=np.int64).reshape(64, 64) tensor_2 = xr.DataArray(arr_2, dims=["x", "y"]) tensors_ids = ["input1", "input2"] - sample = Sample.from_xr_tensors(tensors_ids, [tensor_1, tensor_2]) + sample = xr_tensors_to_sample(tensors_ids, [tensor_1, tensor_2]) pb_tensor_1 = inference_pb2.Tensor( dtype="int64", @@ -223,5 +232,5 @@ def test_sample_to_pb_tensors(self): ) expected_tensors = [pb_tensor_1, pb_tensor_2] - actual_tensors = sample.to_pb_tensors() + actual_tensors = sample_to_pb_tensors(sample) assert expected_tensors == actual_tensors diff --git a/tests/test_server/test_grpc/test_inference_servicer.py b/tests/test_server/test_grpc/test_inference_servicer.py index c61f0710..5e8ca066 100644 --- a/tests/test_server/test_grpc/test_inference_servicer.py +++ b/tests/test_server/test_grpc/test_inference_servicer.py @@ -1,10 +1,14 @@ +from unittest.mock import Mock + import grpc import numpy as np import pytest import xarray as xr +from bioimageio.spec.model.v0_5 import TensorId from numpy.testing import assert_array_equal from tiktorch import converters +from tiktorch.converters import pb_tensor_to_xarray from tiktorch.proto import inference_pb2, inference_pb2_grpc from tiktorch.server.data_store import DataStore from tiktorch.server.device_pool import TorchDevicePool @@ -38,10 +42,11 @@ def grpc_stub_cls(grpc_channel): return inference_pb2_grpc.InferenceStub -def valid_model_request(model_bytes, device_ids=None): - return inference_pb2.CreateModelSessionRequest( - model_blob=inference_pb2.Blob(content=model_bytes.getvalue()), deviceIds=device_ids or ["cpu"] +def valid_model_request(device_ids=None): + ret = inference_pb2.CreateModelSessionRequest( + model_blob=inference_pb2.Blob(content=b""), deviceIds=device_ids or ["cpu"] ) + return ret class TestModelManagement: @@ -50,12 +55,8 @@ def method_requiring_session(self, request, grpc_stub): method_name, req = request.param return getattr(grpc_stub, method_name), req - def test_model_session_creation(self, grpc_stub, bioimageio_model_bytes): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes)) - assert model.id - - def test_model_session_creation_using_upload_id(self, grpc_stub, data_store, bioimageio_dummy_explicit_model_bytes): - id_ = data_store.put(bioimageio_dummy_explicit_model_bytes.getvalue()) + def test_model_session_creation_using_upload_id(self, grpc_stub, data_store, bioimage_model_explicit_siso): + id_ = data_store.put(Mock()) rq = inference_pb2.CreateModelSessionRequest(model_uri=f"upload://{id_}", deviceIds=["cpu"]) model = grpc_stub.CreateModelSession(rq) @@ -103,24 +104,24 @@ def test_if_model_create_fails_devices_are_released(self, grpc_stub): assert "cpu" in device_by_id assert inference_pb2.Device.Status.AVAILABLE == device_by_id["cpu"].status - def test_use_device(self, grpc_stub, bioimageio_model_bytes): + def test_use_device(self, grpc_stub, bioimage_model_explicit_siso): device_by_id = self._query_devices(grpc_stub) assert "cpu" in device_by_id assert inference_pb2.Device.Status.AVAILABLE == device_by_id["cpu"].status - grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) + grpc_stub.CreateModelSession(valid_model_request(device_ids=["cpu"])) device_by_id = self._query_devices(grpc_stub) assert "cpu" in device_by_id assert inference_pb2.Device.Status.IN_USE == device_by_id["cpu"].status - def test_using_same_device_fails(self, grpc_stub, bioimageio_model_bytes): - grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) + def test_using_same_device_fails(self, grpc_stub, bioimage_model_explicit_siso): + grpc_stub.CreateModelSession(valid_model_request(device_ids=["cpu"])) with pytest.raises(grpc.RpcError): - grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) + grpc_stub.CreateModelSession(valid_model_request(device_ids=["cpu"])) - def test_closing_session_releases_devices(self, grpc_stub, bioimageio_model_bytes): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes, device_ids=["cpu"])) + def test_closing_session_releases_devices(self, grpc_stub, bioimage_model_explicit_siso): + model = grpc_stub.CreateModelSession(valid_model_request(device_ids=["cpu"])) device_by_id = self._query_devices(grpc_stub) assert "cpu" in device_by_id @@ -134,13 +135,12 @@ def test_closing_session_releases_devices(self, grpc_stub, bioimageio_model_byte class TestGetLogs: - def test_returns_ack_message(self, bioimageio_model_bytes, grpc_stub): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_model_bytes)) + def test_returns_ack_message(self, bioimage_model_explicit_siso, grpc_stub): + grpc_stub.CreateModelSession(valid_model_request()) resp = grpc_stub.GetLogs(inference_pb2.Empty()) record = next(resp) assert inference_pb2.LogEntry.Level.INFO == record.level assert "Sending model logs" == record.content - grpc_stub.CloseModelSession(model) class TestForwardPass: @@ -150,72 +150,95 @@ def test_call_fails_with_unknown_model_session_id(self, grpc_stub): assert grpc.StatusCode.FAILED_PRECONDITION == e.value.code() assert "model-session with id myid1 doesn't exist" in e.value.details() - def test_call_predict_valid_explicit(self, grpc_stub, bioimageio_dummy_explicit_model_bytes): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_dummy_explicit_model_bytes)) - arr = xr.DataArray(np.arange(128 * 128).reshape(1, 1, 128, 128), dims=("b", "c", "x", "y")) - expected = arr + 1 + def test_call_predict_valid_explicit(self, grpc_stub, bioimage_model_explicit_siso): + mocked_pipeline, expected_prediction_output = bioimage_model_explicit_siso + model = grpc_stub.CreateModelSession(valid_model_request()) + arr = xr.DataArray(np.arange(2 * 10 * 10).reshape(1, 2, 10, 10), dims=("batch", "channel", "x", "y")) input_tensor_id = "input" - output_tensor_id = "output" input_tensors = [converters.xarray_to_pb_tensor(input_tensor_id, arr)] res = grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - assert len(res.tensors) == 1 - assert res.tensors[0].tensorId == output_tensor_id - assert_array_equal(expected, converters.pb_tensor_to_numpy(res.tensors[0])) + pb_tensor = res.tensors[0] + assert pb_tensor.tensorId == "output" + assert_array_equal(pb_tensor_to_xarray(res.tensors[0]), expected_prediction_output.members[TensorId("output")]) - def test_call_predict_invalid_shape_explicit(self, grpc_stub, bioimageio_dummy_explicit_model_bytes): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_dummy_explicit_model_bytes)) - arr = xr.DataArray(np.arange(32 * 32).reshape(1, 1, 32, 32), dims=("b", "c", "x", "y")) + def test_call_predict_invalid_shape_explicit(self, grpc_stub, bioimage_model_explicit_siso): + model = grpc_stub.CreateModelSession(valid_model_request()) + arr = xr.DataArray(np.arange(32 * 32).reshape(1, 1, 32, 32), dims=("batch", "channel", "x", "y")) input_tensors = [converters.xarray_to_pb_tensor("input", arr)] - with pytest.raises(grpc.RpcError): + with pytest.raises(grpc.RpcError) as error: grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) + assert error.value.details().startswith("Exception calling application: Incompatible axis") + + def test_call_predict_multiple_inputs_with_reference(self, grpc_stub, bioimage_model_miso): + mocked_pipeline, expected_prediction_output = bioimage_model_miso + model = grpc_stub.CreateModelSession(valid_model_request()) + + arr1 = xr.DataArray(np.arange(2 * 10 * 10).reshape(1, 2, 10, 10), dims=("batch", "channel", "x", "y")) + input_tensor_id1 = "input1" + + arr2 = xr.DataArray(np.arange(2 * 10 * 15).reshape(1, 2, 10, 15), dims=("batch", "channel", "x", "y")) + input_tensor_id2 = "input2" + + arr3 = xr.DataArray(np.arange(2 * 10 * 12).reshape(1, 2, 12, 10), dims=("batch", "channel", "x", "y")) + input_tensor_id3 = "input3" + + input_tensor_ids = [input_tensor_id1, input_tensor_id2, input_tensor_id3] + tensors_arr = [arr1, arr2, arr3] + input_tensors = [ + converters.xarray_to_pb_tensor(tensor_id, arr) for tensor_id, arr in zip(input_tensor_ids, tensors_arr) + ] + + res = grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) + grpc_stub.CloseModelSession(model) + assert len(res.tensors) == 1 + pb_tensor = res.tensors[0] + assert pb_tensor.tensorId == "output" + assert_array_equal(pb_tensor_to_xarray(res.tensors[0]), expected_prediction_output.members[TensorId("output")]) + + @pytest.mark.parametrize("shape", [(1, 2, 10, 20), (1, 2, 12, 20), (1, 2, 10, 23), (1, 2, 12, 23)]) + def test_call_predict_valid_shape_parameterized(self, grpc_stub, shape, bioimage_model_param_siso): + model = grpc_stub.CreateModelSession(valid_model_request()) + arr = xr.DataArray(np.arange(np.prod(shape)).reshape(*shape), dims=("batch", "channel", "x", "y")) + input_tensor_id = "input" + input_tensors = [converters.xarray_to_pb_tensor(input_tensor_id, arr)] + grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) @pytest.mark.parametrize( "shape", - [(1, 1, 64, 32), (1, 1, 32, 64), (1, 1, 64, 32), (0, 1, 64, 64), (1, 0, 64, 64)], + [(1, 1, 10, 20), (1, 2, 8, 20), (1, 2, 11, 20), (1, 2, 10, 21)], ) - def test_call_predict_invalid_shape_parameterized(self, grpc_stub, shape, bioimageio_dummy_param_model_bytes): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_dummy_param_model_bytes)) - arr = xr.DataArray(np.arange(np.prod(shape)).reshape(*shape), dims=("b", "c", "x", "y")) - input_tensors = [converters.xarray_to_pb_tensor("param", arr)] - with pytest.raises(grpc.RpcError): + def test_call_predict_invalid_shape_parameterized(self, grpc_stub, shape, bioimage_model_param_siso): + model = grpc_stub.CreateModelSession(valid_model_request()) + arr = xr.DataArray(np.arange(np.prod(shape)).reshape(*shape), dims=("batch", "channel", "x", "y")) + input_tensor_id = "input" + input_tensors = [converters.xarray_to_pb_tensor(input_tensor_id, arr)] + with pytest.raises(grpc.RpcError) as error: grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) + assert error.value.details().startswith("Exception calling application: Incompatible axis") - def test_call_predict_invalid_tensor_ids(self, grpc_stub, bioimageio_dummy_model): - model_bytes, _ = bioimageio_dummy_model - model = grpc_stub.CreateModelSession(valid_model_request(model_bytes)) - arr = xr.DataArray(np.arange(32 * 32).reshape(32, 32), dims=("x", "y")) + def test_call_predict_invalid_tensor_ids(self, grpc_stub, bioimage_model_explicit_siso): + model = grpc_stub.CreateModelSession(valid_model_request()) + arr = xr.DataArray(np.arange(2 * 10 * 20).reshape(1, 2, 10, 20), dims=("batch", "channel", "x", "y")) input_tensors = [converters.xarray_to_pb_tensor("invalidTensorName", arr)] with pytest.raises(grpc.RpcError) as error: grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) assert error.value.details().startswith("Exception calling application: Spec invalidTensorName doesn't exist") - def test_call_predict_invalid_axes(self, grpc_stub, bioimageio_dummy_model): - model_bytes, tensor_id = bioimageio_dummy_model - model = grpc_stub.CreateModelSession(valid_model_request(model_bytes)) - arr = xr.DataArray(np.arange(32 * 32).reshape(32, 32), dims=("invalidAxis", "y")) - input_tensors = [converters.xarray_to_pb_tensor(tensor_id, arr)] - with pytest.raises(grpc.RpcError) as error: - grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - assert error.value.details().startswith("Exception calling application: Incompatible axes") - - @pytest.mark.parametrize("shape", [(1, 1, 64, 64), (1, 1, 66, 65), (1, 1, 68, 66), (1, 1, 70, 67)]) - def test_call_predict_valid_shape_parameterized(self, grpc_stub, shape, bioimageio_dummy_param_model_bytes): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_dummy_param_model_bytes)) - arr = xr.DataArray(np.arange(np.prod(shape)).reshape(*shape), dims=("b", "c", "x", "y")) - input_tensors = [converters.xarray_to_pb_tensor("param", arr)] - grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - - @pytest.mark.skip - def test_call_predict_tf(self, grpc_stub, bioimageio_dummy_tensorflow_model_bytes): - model = grpc_stub.CreateModelSession(valid_model_request(bioimageio_dummy_tensorflow_model_bytes)) - arr = xr.DataArray(np.arange(32 * 32).reshape(1, 1, 32, 32), dims=("b", "c", "x", "y")) - expected = arr * -1 + @pytest.mark.parametrize( + "axes", + [ + ("channel", "batch", "x", "y"), + ("time", "channel", "x", "y"), + ("batch", "channel", "z", "y"), + ("b", "c", "x", "y"), + ], + ) + def test_call_predict_invalid_axes(self, grpc_stub, axes, bioimage_model_explicit_siso): + model = grpc_stub.CreateModelSession(valid_model_request()) + arr = xr.DataArray(np.arange(2 * 10 * 10).reshape(1, 2, 10, 10), dims=axes) input_tensor_id = "input" - output_tensor_id = "output" input_tensors = [converters.xarray_to_pb_tensor(input_tensor_id, arr)] - res = grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) - - assert len(res.tensors) == 1 - assert res.tensors[0].tensorId == output_tensor_id - assert_array_equal(expected, converters.pb_tensor_to_numpy(res.tensors[0])) + with pytest.raises(grpc.RpcError) as error: + grpc_stub.Predict(inference_pb2.PredictRequest(modelSessionId=model.id, tensors=input_tensors)) + assert error.value.details().startswith("Exception calling application: Incompatible axes names") diff --git a/tests/test_server/test_training/test_training.py b/tests/test_server/test_training/test_training.py index 21601b5d..c015129e 100644 --- a/tests/test_server/test_training/test_training.py +++ b/tests/test_server/test_training/test_training.py @@ -27,7 +27,7 @@ def __init__(self): def set_break_callback(self, cb): self._break_cb = cb - def forward(self, input_tensors): + def predict_sample_without_blocking(self, input_tensors): return [xr.DataArray(np.array([42]), dims=("x",))] def set_max_num_iterations(self, val): diff --git a/tiktorch/converters.py b/tiktorch/converters.py index 5ce890a7..5a6669b8 100644 --- a/tiktorch/converters.py +++ b/tiktorch/converters.py @@ -1,29 +1,37 @@ from __future__ import annotations -import dataclasses -from typing import Dict, List +from typing import List import numpy as np import xarray as xr +from bioimageio.core import Sample, Tensor +from bioimageio.spec.model.v0_5 import TensorId from tiktorch.proto import inference_pb2 -@dataclasses.dataclass(frozen=True) -class Sample: - tensors: Dict[str, xr.DataArray] +def pb_tensors_to_sample(pb_tensors: List[inference_pb2.Tensor]) -> Sample: + return Sample( + members={TensorId(tensor.tensorId): Tensor.from_xarray(pb_tensor_to_xarray(tensor)) for tensor in pb_tensors}, + id=None, + stat={}, + ) + - @classmethod - def from_pb_tensors(cls, pb_tensors: List[inference_pb2.Tensor]) -> Sample: - return Sample({tensor.tensorId: pb_tensor_to_xarray(tensor) for tensor in pb_tensors}) +def xr_tensors_to_sample(tensor_ids: List[str], tensors_data: List[xr.DataArray]) -> Sample: + assert len(tensor_ids) == len(tensors_data) + return Sample( + members={ + TensorId(tensor_id): Tensor.from_xarray(tensor_data) + for tensor_id, tensor_data in zip(tensor_ids, tensors_data) + }, + id=None, + stat={}, + ) - @classmethod - def from_xr_tensors(cls, tensor_ids: List[str], tensors_data: List[xr.DataArray]) -> Sample: - assert len(tensor_ids) == len(tensors_data) - return Sample({tensor_id: tensor_data for tensor_id, tensor_data in zip(tensor_ids, tensors_data)}) - def to_pb_tensors(self) -> List[inference_pb2.Tensor]: - return [xarray_to_pb_tensor(tensor_id, res_tensor) for tensor_id, res_tensor in self.tensors.items()] +def sample_to_pb_tensors(sample: Sample) -> List[inference_pb2.Tensor]: + return [xarray_to_pb_tensor(tensor_id, res_tensor.data) for tensor_id, res_tensor in sample.members.items()] def numpy_to_pb_tensor(array: np.ndarray, axistags=None) -> inference_pb2.Tensor: diff --git a/tiktorch/rpc/mp.py b/tiktorch/rpc/mp.py index 992ed761..3c6efb03 100644 --- a/tiktorch/rpc/mp.py +++ b/tiktorch/rpc/mp.py @@ -9,7 +9,7 @@ from typing import Any, List, Optional, Type, TypeVar from uuid import uuid4 -from bioimageio.core.resource_io import nodes +from bioimageio.spec.model import v0_5 from ..server.session import IRPCModelSession from .exceptions import Shutdown @@ -113,8 +113,8 @@ class _Api: @dataclasses.dataclass(frozen=True) class BioModelClient: api: IRPCModelSession - input_specs: List[nodes.InputTensor] - output_specs: List[nodes.OutputTensor] + input_specs: List[v0_5.InputTensorDescr] + output_specs: List[v0_5.OutputTensorDescr] class MPClient: diff --git a/tiktorch/server/grpc/inference_servicer.py b/tiktorch/server/grpc/inference_servicer.py index 987bb257..64a3b462 100644 --- a/tiktorch/server/grpc/inference_servicer.py +++ b/tiktorch/server/grpc/inference_servicer.py @@ -2,11 +2,11 @@ import grpc -from tiktorch.converters import Sample +from tiktorch.converters import pb_tensors_to_sample, sample_to_pb_tensors from tiktorch.proto import inference_pb2, inference_pb2_grpc from tiktorch.server.data_store import IDataStore from tiktorch.server.device_pool import DeviceStatus, IDevicePool -from tiktorch.server.session.process import InputTensorValidator, start_model_session_process +from tiktorch.server.session.process import SampleValidator, start_model_session_process from tiktorch.server.session_manager import Session, SessionManager @@ -84,13 +84,11 @@ def ListDevices(self, request: inference_pb2.Empty, context) -> inference_pb2.De def Predict(self, request: inference_pb2.PredictRequest, context) -> inference_pb2.PredictResponse: session = self._getModelSession(context, request.modelSessionId) - input_sample = Sample.from_pb_tensors(request.tensors) - tensor_validator = InputTensorValidator(session.bio_model_client.input_specs) + input_sample = pb_tensors_to_sample(request.tensors) + tensor_validator = SampleValidator(session.bio_model_client.input_specs) tensor_validator.check_tensors(input_sample) res = session.bio_model_client.api.forward(input_sample) - output_tensor_ids = [tensor.name for tensor in session.bio_model_client.output_specs] - output_sample = Sample.from_xr_tensors(output_tensor_ids, res) - return inference_pb2.PredictResponse(tensors=output_sample.to_pb_tensors()) + return inference_pb2.PredictResponse(tensors=sample_to_pb_tensors(res)) def _getModelSession(self, context, modelSessionId: str) -> Session: if not modelSessionId: diff --git a/tiktorch/server/session/backend/base.py b/tiktorch/server/session/backend/base.py index f0f2be6f..eab3ea44 100644 --- a/tiktorch/server/session/backend/base.py +++ b/tiktorch/server/session/backend/base.py @@ -5,7 +5,7 @@ import typing from concurrent.futures import Future -from bioimageio.core.prediction_pipeline import PredictionPipeline +from bioimageio.core import PredictionPipeline from tiktorch.configkeys import TRAINING, VALIDATION from tiktorch.server.session import types diff --git a/tiktorch/server/session/backend/supervisor.py b/tiktorch/server/session/backend/supervisor.py index 5e77b1ac..14d00f81 100644 --- a/tiktorch/server/session/backend/supervisor.py +++ b/tiktorch/server/session/backend/supervisor.py @@ -3,8 +3,7 @@ import logging import queue -import xarray as xr -from bioimageio.core.prediction_pipeline import PredictionPipeline +from bioimageio.core import PredictionPipeline from tiktorch.server.session import types from tiktorch.server.session.backend import commands @@ -39,9 +38,7 @@ def has_work(self): return self._pipeline.max_num_iterations and self._pipeline.max_num_iterations > self._pipeline.iteration_count def forward(self, input_tensors): - results = self._pipeline.forward(*input_tensors) - for tensor in results: - isinstance(tensor, xr.DataArray), f"Not a DataArray, but a {type(tensor)}" + results = self._pipeline.predict_sample_without_blocking(input_tensors) return results def transition_to(self, new_state: types.State) -> None: diff --git a/tiktorch/server/session/process.py b/tiktorch/server/session/process.py index c9e0186e..e47ffbd5 100644 --- a/tiktorch/server/session/process.py +++ b/tiktorch/server/session/process.py @@ -4,13 +4,12 @@ import uuid from concurrent.futures import Future from multiprocessing.connection import Connection -from typing import Dict, Iterator, List, Optional, Tuple +from typing import List, Optional, Tuple, Union -import numpy as np -from bioimageio.core import load_resource_description -from bioimageio.core.prediction_pipeline import PredictionPipeline, create_prediction_pipeline -from bioimageio.core.resource_io import nodes -from bioimageio.core.resource_io.nodes import ParametrizedInputShape +from bioimageio.core import PredictionPipeline, Tensor, create_prediction_pipeline +from bioimageio.spec import load_description +from bioimageio.spec.model import v0_5 +from bioimageio.spec.model.v0_5 import BatchAxis from tiktorch import log from tiktorch.rpc import Shutdown @@ -22,90 +21,81 @@ from .rpc_interface import IRPCModelSession -class InputTensorValidator: - def __init__(self, input_specs: List[nodes.InputTensor]): - self._input_specs = input_specs +class SampleValidator: + def __init__(self, specs: Union[List[v0_5.InputTensorDescr], List[v0_5.OutputTensorDescr]]): + self._specs = specs def check_tensors(self, sample: Sample): - for tensor_id, tensor in sample.tensors.items(): - self.check_shape(tensor_id, tensor.dims, tensor.shape) - - def _get_input_tensors_with_names(self) -> Dict[str, nodes.InputTensor]: - return {tensor.name: tensor for tensor in self._input_specs} - - def check_shape(self, tensor_id: str, axes: Tuple[str, ...], shape: Tuple[int, ...]): - shape = self.get_axes_with_size(axes, shape) - spec = self._get_input_spec(tensor_id) - if isinstance(spec.shape, list): - self._check_shape_explicit(spec, shape) - elif isinstance(spec.shape, ParametrizedInputShape): - self._check_shape_parameterized(spec, shape) + for tensor_id, tensor_data in sample.members.items(): + self._check_shape(tensor_id, tensor_data) + + def _check_shape(self, tensor_id: str, tensor: Tensor): + spec = self._get_spec(tensor_id) + dims_spec = tuple(axis.id for axis in spec.axes) + if dims_spec != tensor.dims: + raise ValueError(f"Incompatible axes names, got {tensor.dims} expected {dims_spec}") + for axis in spec.axes: + source_axis_size = axis.size + target_axis_size = tensor.sizes[axis.id] + if axis.id not in tensor.sizes: + ValueError(f"{axis.id} not found in {tensor.sizes}") + if isinstance(axis, BatchAxis) and axis.size is None: + continue + if not self._is_size_valid(source_axis_size, target_axis_size): + raise ValueError( + f"Incompatible axis for axis {axis.id} with {source_axis_size}. Got {target_axis_size}" + ) + + def _is_size_valid(self, source_size: Union[int, v0_5.ParameterizedSize, v0_5.SizeReference], target_size: int): + if isinstance(source_size, v0_5.SizeReference): + source_size = self._realize_size_reference(source_size) + + if isinstance(source_size, int): + return source_size == target_size + elif isinstance(source_size, v0_5.ParameterizedSize): + min_size = source_size.min + step_size = source_size.step + if target_size < min_size: + return False + if step_size == 0: + return min_size == target_size + diff = target_size - min_size + num_increments = diff / step_size + return num_increments % 1 == 0 else: - raise ValueError(f"Unexpected shape {spec.shape}") - - def _get_input_spec(self, tensor_id: str) -> nodes.InputTensor: - self._check_spec_exists(tensor_id) - specs = [spec for spec in self._input_specs if spec.name == tensor_id] + raise ValueError(f"Unexpected size {source_size}") + + def _realize_size_reference(self, size: v0_5.SizeReference) -> Union[int, v0_5.ParameterizedSize]: + visited = {} + + def dfs(tensor_id, axis_id): + """ + If size references to another reference and so on, we need to recursively found the ground truth + """ + if tensor_id in visited: + return + visited[tensor_id] = True + ref_tensor = self._get_spec(tensor_id) + ref_axes = [axis for axis in ref_tensor.axes if axis.id == axis_id] + assert len(ref_axes) == 1 + ref_axis = ref_axes[0] + ref_size = ref_axis.size + if not isinstance(ref_size, v0_5.SizeReference): + return ref_size + return dfs(ref_size.tensor_id, ref_size.axis_id) + + ground_size = dfs(size.tensor_id, size.axis_id) + if ground_size is None: + raise ValueError(f"Couldn't realize size reference {size}") + return ground_size + + def _get_spec(self, tensor_id: str) -> v0_5.InputTensorDescr: + specs = [spec for spec in self._specs if tensor_id == spec.id] + if len(specs) == 0: + raise ValueError(f"Spec {tensor_id} doesn't exist for specs {[spec.id for spec in self._specs]}") assert len(specs) == 1, "ids of tensor specs should be unique" return specs[0] - def _check_spec_exists(self, tensor_id: str): - spec_names = [spec.name for spec in self._input_specs] - if tensor_id not in spec_names: - raise ValueError(f"Spec {tensor_id} doesn't exist for specs {spec_names}") - - def _check_shape_explicit(self, spec: nodes.InputTensor, tensor_shape: Dict[str, int]): - assert self.is_shape_explicit(spec) - reference_shape = {name: size for name, size in zip(spec.axes, spec.shape)} - self.check_same_axes(reference_shape, tensor_shape) - if reference_shape != tensor_shape: - raise ValueError(f"Incompatible shapes found {tensor_shape}, expected {reference_shape}") - - def _check_shape_parameterized(self, spec: nodes.InputTensor, tensor_shape: Dict[str, int]): - assert isinstance(spec.shape, ParametrizedInputShape) - if not self.is_shape(tensor_shape.values()): - raise ValueError(f"Invalid shape's sizes {tensor_shape}") - - min_shape = self.get_axes_with_size(spec.axes, tuple(spec.shape.min)) - step = self.get_axes_with_size(spec.axes, tuple(spec.shape.step)) - self.check_same_axes(tensor_shape, min_shape) - - tensor_shapes_arr = np.array(list(tensor_shape.values())) - min_shape_arr = np.array(list(min_shape.values())) - step_arr = np.array(list(step.values())) - diff = tensor_shapes_arr - min_shape_arr - if any(size < 0 for size in diff): - raise ValueError(f"Tensor shape {tensor_shape} smaller than min shape {min_shape}") - - non_zero_idx = np.nonzero(step_arr) - multipliers = diff[non_zero_idx] / step_arr[non_zero_idx] - multiplier = np.unique(multipliers) - if len(multiplier) == 1 and self.is_natural_number(multiplier[0]): - return - raise ValueError(f"Tensor shape {tensor_shape} not valid for spec {spec}") - - @staticmethod - def check_same_axes(source: Dict[str, int], target: Dict[str, int]): - if source.keys() != target.keys(): - raise ValueError(f"Incompatible axes for tensor {target} and reference {source}") - - @staticmethod - def is_natural_number(n) -> bool: - return n % 1 == 0.0 and n >= 0 - - @staticmethod - def is_shape(shape: Iterator[int]) -> bool: - return all(InputTensorValidator.is_natural_number(dim) for dim in shape) - - @staticmethod - def get_axes_with_size(axes: Tuple[str, ...], shape: Tuple[int, ...]) -> Dict[str, int]: - assert len(axes) == len(shape) - return {name: size for name, size in zip(axes, shape)} - - @staticmethod - def is_shape_explicit(spec: nodes.InputTensor) -> bool: - return isinstance(spec.shape, list) - class ModelSessionProcess(IRPCModelSession[PredictionPipeline]): def __init__(self, model: PredictionPipeline) -> None: @@ -114,8 +104,7 @@ def __init__(self, model: PredictionPipeline) -> None: self._worker = base.SessionBackend(self._model) def forward(self, sample: Sample) -> Future: - tensors_data = [sample.tensors[tensor.name] for tensor in self.model.input_specs] - res = self._worker.forward(tensors_data) + res = self._worker.forward(sample) return res def create_dataset(self, mean, stddev): @@ -165,7 +154,9 @@ def start_model_session_process( proc.start() api = _mp_rpc.create_client_api(iface_cls=IRPCModelSession, conn=client_conn) return proc, BioModelClient( - input_specs=prediction_pipeline.input_specs, output_specs=prediction_pipeline.output_specs, api=api + input_specs=prediction_pipeline.model_description.inputs, + output_specs=prediction_pipeline.model_description.outputs, + api=api, ) @@ -173,5 +164,5 @@ def _get_prediction_pipeline_from_model_bytes(model_zip: bytes, devices: List[st with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as _tmp_file: _tmp_file.write(model_zip) temp_file_path = pathlib.Path(_tmp_file.name) - model = load_resource_description(temp_file_path) + model = load_description(temp_file_path) return create_prediction_pipeline(bioimageio_model=model, devices=devices) diff --git a/vendor/core-bioimage-io-python b/vendor/core-bioimage-io-python index 099cc731..ebe126b5 160000 --- a/vendor/core-bioimage-io-python +++ b/vendor/core-bioimage-io-python @@ -1 +1 @@ -Subproject commit 099cc7311c1257a180eaa97943c142a397112bba +Subproject commit ebe126b5f3e3b4d8d111c3153a4e357a1e0819e2 diff --git a/vendor/spec-bioimage-io b/vendor/spec-bioimage-io index 4a05b37c..ae96a548 160000 --- a/vendor/spec-bioimage-io +++ b/vendor/spec-bioimage-io @@ -1 +1 @@ -Subproject commit 4a05b37cef115bb60a2ed27629bb193e687db3da +Subproject commit ae96a5486e8652ebefd3c23ba1f6d6ca05051a22