diff --git a/elf/parallel/common.py b/elf/parallel/common.py
index 9cad57b..8f0b42e 100644
--- a/elf/parallel/common.py
+++ b/elf/parallel/common.py
@@ -7,6 +7,8 @@ def _is_chunk_aligned(shape, chunks):
 
 
 def get_blocking(data, block_shape, roi, n_threads):
+    """@private
+    """
     # check if we have a chunked data storage
     try:
         chunks = data.chunks
diff --git a/elf/parallel/copy_dataset.py b/elf/parallel/copy_dataset.py
index ba2e8bc..bdea4f0 100644
--- a/elf/parallel/copy_dataset.py
+++ b/elf/parallel/copy_dataset.py
@@ -1,29 +1,35 @@
 import multiprocessing
 from concurrent import futures
+from typing import Optional, Tuple
+
+from numpy.typing import ArrayLike
 from tqdm import tqdm
 
 from .common import get_blocking
 
 
-def copy_dataset(ds_in, ds_out,
-                 roi_in=None,
-                 roi_out=None,
-                 block_shape=None,
-                 n_threads=None,
-                 verbose=False):
-    """ Copy input to output dataset in parallel.
-
-    Arguments:
-        ds_in [dataset] - input dataset (h5py, z5py or zarr dataset)
-        ds_out [dataset] - output dataset (h5py, z5py or zarr dataset)
-        roi_in [tuple[slice]] - region of interest (for the input dataset) (default: None)
-        roi_out [tuple[slice]] - region of interest (for the output dataset) (default: None)
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the output dataset will be used (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        verbose [bool] - verbosity flag (default: False)
+def copy_dataset(
+    ds_in: ArrayLike,
+    ds_out: ArrayLike,
+    roi_in: Optional[Tuple[slice, ...]] = None,
+    roi_out: Optional[Tuple[slice, ...]] = None,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    verbose: bool = False,
+) -> ArrayLike:
+    """Copy input to an output dataset or other array-like object in parallel.
+
+    Args:
+        ds_in: The input dataset or array-like object, like h5py, z5py or zarr dataset.
+        ds_out: The output dataset, like h5py, z5py or zarr dataset.
+        roi_in: Region of interest for the input dataset.
+        roi_out: Region of interest for the output dataset.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the output dataset will be used.
+        n_threads: Number of threads, by default all available ones are used.
+        verbose: Verbosity flag.
+
     Returns:
-        array_like - output
+        The output dataset / array-like object.
""" n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads diff --git a/elf/parallel/distance_transform.py b/elf/parallel/distance_transform.py index afe9615..0062528 100644 --- a/elf/parallel/distance_transform.py +++ b/elf/parallel/distance_transform.py @@ -1,11 +1,13 @@ # IMPORTANT do threadctl import first (before numpy imports) from threadpoolctl import threadpool_limits +from typing import Optional, Tuple, Union import multiprocessing # would be nice to use dask, so that we can also run this on the cluster from concurrent import futures import numpy as np +from numpy.typing import ArrayLike from scipy.ndimage import distance_transform_edt from tqdm import tqdm @@ -13,18 +15,41 @@ def distance_transform( - data, - halo, - sampling=None, - return_distances=True, - return_indices=False, - distances=None, - indices=None, - block_shape=None, - n_threads=None, - verbose=False, - roi=None, -): + data: ArrayLike, + halo: Tuple[int, ...], + sampling: Optional[Union[float, Tuple[float, ...]]] = None, + return_distances: bool = True, + return_indices: bool = False, + distances: Optional[ArrayLike] = None, + indices: Optional[ArrayLike] = None, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, +) -> Union[ArrayLike, Tuple[ArrayLike, ArrayLike]]: + """Compute distance transform in parallel over blocks. + + The results are only correct up to the distance to the block boundary plus halo. + The function `scipy.ndimage.distance_transform_edt` is used to compute the distances. + + Args: + data: The input data. + halo: The halo, which is the padding added at each side of the block. + sampling: The sampling value passed to distance_transfor_edt. + return_distances: Whether to return the computed distances. + return_indices: Whether to return the computed indices. + distances: Pre-allocated array-like object for the distances. + indices: Pre-allocated array-like object for the indices. + block_shape: Shape of the blocks to use for parallelisation, + by default chunks of the input will be used, if available. + n_threads: Number of threads, by default all available threads are used. + verbose: Verbosity flag. + roi: Region of interest for this computation. + + Returns: + The distances, if return_distances is set to True. + The indices, if return_distances is set ot True. + """ if data.ndim not in (2, 3): raise ValueError( f"Invalid input dimensionality. Expected input to have 2 or 3 dimensions, got {data.ndim}." 
diff --git a/elf/parallel/filters.py b/elf/parallel/filters.py
index fc8b9ff..2dfcfb3 100644
--- a/elf/parallel/filters.py
+++ b/elf/parallel/filters.py
@@ -3,8 +3,10 @@
 from concurrent import futures
 from functools import partial
 from tqdm import tqdm
+from typing import Optional, Tuple
 
 import numpy as np
+from numpy.typing import ArrayLike
 try:
     import fastfilters as ff
 except ImportError:
@@ -16,14 +18,20 @@
 
 # helper function to choose a channel from filter output
 def choose_channel(data, sigma, function, channel):
+    """@private
+    """
     return function(data, sigma)[..., channel]
 
 
 def block_to_bb(block):
+    """@private
+    """
     return tuple(slice(beg, end) for beg, end in zip(block.begin, block.end))
 
 
 def get_halo(sigma, order, ndim, outer_scale=None):
+    """@private
+    """
     sigma_ = sigma if outer_scale is None else sigma + outer_scale
     halo = sigma_to_halo(sigma_, order)
     if isinstance(halo, int):
@@ -31,28 +39,37 @@ def get_halo(sigma, order, ndim, outer_scale=None):
     return halo
 
 
-def apply_filter(data, filter_name, sigma,
-                 outer_scale=None, return_channel=None,
-                 out=None, block_shape=None,
-                 n_threads=None, mask=None,
-                 verbose=False, roi=None):
-    """ Apply filter to data in parallel.
-
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        filter_name [str] - name of the filter to apply
-        sigma [float or list[float]] - sigma value for filter
-        outer_scale [float] - outer scale value for structure tensor (default: None)
-        return_channel [int] - channel to select for multi-scale response (default: None)
-        out [array_like] - output, by default the filter is applied inplace (default: None)
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+def apply_filter(
+    data: ArrayLike,
+    filter_name: str,
+    sigma: float,
+    outer_scale: Optional[float] = None,
+    return_channel: Optional[int] = None,
+    out: Optional[ArrayLike] = None,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> ArrayLike:
+    """Apply filter to data in parallel.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        filter_name: Name of the filter to apply.
+        sigma: Sigma value for filter.
+        outer_scale: Outer scale value for structure tensor.
+        return_channel: Channel to select for multi-channel response.
+        out: Output, by default the filter is applied inplace.
+        block_shape: Shape of the blocks used for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
+
     Returns:
-        array_like - filter response
+        The filter response.
     """
     n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads
@@ -147,22 +164,31 @@ def _generate_filter(filter_name):
     doc_str =\
-        """Comppute %s response block-wise and in parallel.
+        """Compute %s response block-wise and in parallel.
 
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        sigma [float or list[float]] - sigma value for filter
-        out [array_like] - output, by default the filter is applied inplace (default: None)
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        sigma: Sigma value for filter.
+        out: Output, by default the filter is applied inplace.
+        block_shape: Shape of the blocks used for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
+
     Returns:
-        array_like - filter response
+        The filter response.
     """ % elf_name
 
-    def op(data, sigma, out=None, block_shape=None,
-           n_threads=None, mask=None, verbose=False, roi=None):
+    def op(
+        data: ArrayLike,
+        sigma: float,
+        out: Optional[ArrayLike] = None,
+        block_shape: Optional[Tuple[int, ...]] = None,
+        n_threads: Optional[int] = None,
+        mask: Optional[ArrayLike] = None,
+        verbose: bool = False,
+        roi: Optional[Tuple[slice, ...]] = None,
+    ) -> ArrayLike:
         return apply_filter(data, filter_name, sigma, outer_scale=None,
                             return_channel=None, out=out, block_shape=block_shape,
                             n_threads=n_threads, mask=mask, verbose=verbose, roi=roi)
@@ -179,24 +205,34 @@ def _generate_structure_tensor(filter_name):
-    """Comppute %s response block-wise and in parallel.
+    """Compute %s response block-wise and in parallel.
 
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        sigma [float or list[float]] - sigma value for filter
-        outer_scale [float] - outer scale value
-        return_channel [int] - return selected channel (default: None)
-        out [array_like] - output, by default the filter is applied inplace (default: None)
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        sigma: Sigma value for filter.
+        outer_scale: Outer scale value.
+        return_channel: Return selected channel.
+        out: Output, by default the filter is applied inplace.
+        block_shape: Shape of the blocks used for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
+
     Returns:
-        array_like - filter response
+        The filter response.
""" % elf_name - def op(data, sigma, outer_scale, - return_channel=None, out=None, block_shape=None, - n_threads=None, mask=None, verbose=False, roi=None): + def op( + data: ArrayLike, + sigma: float, + outer_scale: float, + return_channel: Optional[int] = None, + out: Optional[ArrayLike] = None, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, + ) -> ArrayLike: return apply_filter(data, filter_name, sigma, outer_scale=outer_scale, return_channel=return_channel, out=out, block_shape=block_shape, n_threads=n_threads, mask=mask, verbose=verbose, roi=roi) @@ -212,24 +248,33 @@ def _generate_hessian(filter_name): doc_str =\ """Comppute %s response block-wise and in parallel. - Arguments: - data [array_like] - input data, numpy array or similar like h5py or zarr dataset - sigma [float or list[float]] - sigma value for filter - return_channel [int] - return selected channel (default: None) - out [array_like] - output, by default the filter is applied inplace (default: None) - block_shape [tuple] - shape of the blocks used for parallelisation, - by default chunks of the input will be used, if available (default: None) - n_threads [int] - number of threads, by default all are used (default: None) - mask [array_like] - mask to exclude data from the computation (default: None) - verbose [bool] - verbosity flag (default: False) - roi [tuple[slice]] - region of interest for this computation (default: None) + Args: + data: Input data, numpy array or similar like h5py or zarr dataset. + sigma: Sigma value for filter. + return_channel: Return selected channel. + out: Output, by default the filter is applied inplace. + block_shape: Shape of the blocks used for parallelisation, + by default chunks of the input will be used, if available. + n_threads: Number of threads, by default all are used. + mask: Mask to exclude data from the computation. + verbose: Verbosity flag. + roi: Region of interest for this computation. + Returns: - array_like - filter response + The filter response. """ % elf_name - def op(data, sigma, return_channel=None, - out=None, block_shape=None, - n_threads=None, mask=None, verbose=False, roi=None): + def op( + data: ArrayLike, + sigma: float, + return_channel: Optional[int] = None, + out: Optional[ArrayLike] = None, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, + ): return apply_filter(data, filter_name, sigma, return_channel=return_channel, out=out, block_shape=block_shape, n_threads=n_threads, mask=mask, verbose=verbose, roi=roi) diff --git a/elf/parallel/io.py b/elf/parallel/io.py index 0cf0922..0123770 100644 --- a/elf/parallel/io.py +++ b/elf/parallel/io.py @@ -1,28 +1,37 @@ import multiprocessing from concurrent import futures from functools import partial -from tqdm import tqdm +from typing import Optional, Tuple, Union import numpy as np +from numpy.typing import ArrayLike +from tqdm import tqdm from .common import get_blocking -def copy(data, out, - block_shape=None, n_threads=None, - mask=None, verbose=False, roi=None): - """ Copy a dataset in parallel. 
diff --git a/elf/parallel/io.py b/elf/parallel/io.py
index 0cf0922..0123770 100644
--- a/elf/parallel/io.py
+++ b/elf/parallel/io.py
@@ -1,28 +1,37 @@
 import multiprocessing
 from concurrent import futures
 from functools import partial
-from tqdm import tqdm
+from typing import Callable, Optional, Tuple, Union
 
 import numpy as np
+from numpy.typing import ArrayLike
+from tqdm import tqdm
 
 from .common import get_blocking
 
 
-def copy(data, out,
-         block_shape=None, n_threads=None,
-         mask=None, verbose=False, roi=None):
-    """ Copy a dataset in parallel.
-
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        out [array_like] - output dataset
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the output will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+def copy(
+    data: ArrayLike,
+    out: ArrayLike,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> ArrayLike:
+    """Copy a dataset or array-like object in parallel.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        out: Output dataset or array-like object.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the output will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
+
     Returns:
-        array_like - the copied dataset
+        The copied object.
     """
     n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads
@@ -75,33 +84,45 @@ def _ds_interpolate(data, out_shape, order):
 
 cubic_downscaling = partial(_ds_interpolate, order=3)
 
 
-def downscale(data, out, downscaling_function=None,
-              block_shape=None, n_threads=None,
-              mask=None, verbose=False, roi=None):
-    """ Downscale a dataset in parallel.
-
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        out [array_like] - output dataset
-        downscaling_function [str or callable] - the function used for downscaling the blocks.
-            By default mean downscaling is used (default: fNone)
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the output will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+def downscale(
+    data: ArrayLike,
+    out: ArrayLike,
+    downscaling_function: Optional[Union[str, Callable]] = None,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> ArrayLike:
+    """Downscale a dataset in parallel.
+
+    This functionality is not yet implemented. Calling it will raise a NotImplementedError.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        out: Output dataset / array-like object.
+        downscaling_function: The function used for downscaling the blocks.
+            By default mean downscaling is used.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the output will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
+
     Returns:
-        array_like - the downscaled dataset
+        The downscaled output.
""" - ds_function_dict = {'mean_downscaling': mean_downscaling, - 'max_downscaling': max_downscaling, - 'min_downscaling': min_downscaling, - 'nearest_downscaling': nearest_downscaling, - 'linear_downscaling': linear_downscaling, - 'quadratic_downscaling': quadratic_downscaling, - 'cubic_downscaling': cubic_downscaling} - ds_function_dict.update({name.replace('_downscaling', ''): func + raise NotImplementedError + + ds_function_dict = {"mean_downscaling": mean_downscaling, + "max_downscaling": max_downscaling, + "min_downscaling": min_downscaling, + "nearest_downscaling": nearest_downscaling, + "linear_downscaling": linear_downscaling, + "quadratic_downscaling": quadratic_downscaling, + "cubic_downscaling": cubic_downscaling} + ds_function_dict.update({name.replace("_downscaling", ""): func for name, func in ds_function_dict.items()}) if downscaling_function is None: diff --git a/elf/parallel/operations.py b/elf/parallel/operations.py index ed16d90..dc238a7 100644 --- a/elf/parallel/operations.py +++ b/elf/parallel/operations.py @@ -5,12 +5,15 @@ # would be nice to use dask for all of this instead of concurrent.futures # so that this could be used on a cluster as well from concurrent import futures -from numbers import Number from functools import partial +from numbers import Number +from typing import Optional, Tuple, Union + +import numpy as np +from numpy.typing import ArrayLike from tqdm import tqdm from .common import get_blocking -import numpy as np def _compute_broadcast(shapex, shapey): @@ -25,24 +28,31 @@ def _compute_broadcast(shapex, shapey): return broadcast -def isin(x, y, out=None, - block_shape=None, n_threads=None, - mask=None, verbose=False, roi=None): - """ Compute np.isin in parallel. - - Arguments: - x [array_like] - operand 1, numpy array or similar like h5py or zarr dataset - y [array_like or scalar] - operand 2, scalar, numpy array or list - out [array_like] - output, by default the operation - is done inplace in the first operand (default: None) - block_shape [tuple] - shape of the blocks used for parallelisation, - by default chunks of the input will be used, if available (default: None) - n_threads [int] - number of threads, by default all are used (default: None) - mask [array_like] - mask to exclude data from the computation (default: None) - verbose [bool] - verbosity flag (default: False) - roi [tuple[slice]] - region of interest for this computation (default: None) +def isin( + x: ArrayLike, + y: Union[ArrayLike, Number], + out: Optional[ArrayLike] = None, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, +) -> ArrayLike: + """Compute np.isin in parallel. + + Args: + x: Operand 1, numpy array or similar, like h5py or zarr dataset. + y: Operand 2, scalar, numpy array or list. + out: Output, by default the operation is done inplace in the first operand. + block_shape: Shape of the blocks to use for parallelisation, + by default chunks of the input will be used, if available. + n_threads: Number of threads, by default all are used. + mask: Mask to exclude data from the computation. + verbose: Verbosity flag. + roi: Region of interest for this computation. + Returns: - array_like - output + The output. 
""" # check the mask if given @@ -86,26 +96,33 @@ def _isin(block_id): return out -def apply_operation(x, y, operation, out=None, - block_shape=None, n_threads=None, - mask=None, verbose=False, roi=None): - """ Apply operation to two operands in parallel. - - Arguments: - x [array_like] - operand 1, numpy array or similar like h5py or zarr dataset - y [array_like or scalar] - operand 2, numpy array or similar like h5py or zarr dataset - or scalar - operation [callable] - operation applied to the two operands - out [array_like] - output, by default the operation - is done inplace in the first operand (default: None) - block_shape [tuple] - shape of the blocks used for parallelisation, - by default chunks of the input will be used, if available (default: None) - n_threads [int] - number of threads, by default all are used (default: None) - mask [array_like] - mask to exclude data from the computation (default: None) - verbose [bool] - verbosity flag (default: False) - roi [tuple[slice]] - region of interest for this computation (default: None) +def apply_operation( + x: ArrayLike, + y: Union[ArrayLike, Number], + operation: callable, + out: Optional[ArrayLike] = None, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, +) -> ArrayLike: + """Apply operation to two operands in parallel. + + Args: + x: Operand 1, numpy array or similar like h5py or zarr dataset. + y: Operand 2, numpy array or similar like h5py or zarr dataset or scalar. + operation: Operation applied to the two operands. + out: Output, by default the operation is done inplace in the first operand. + block_shape: Shape of the blocks to use for parallelisation, + by default chunks of the input will be used, if available. + n_threads: Number of threads, by default all are used. + mask: Mask to exclude data from the computation. + verbose: Verbosity flag. + roi: Region of interest for this computation. + Returns: - array_like - output + The output. """ # check type and dimension of the second operand and check if we need to broadcast @@ -193,25 +210,33 @@ def _apply_array(block_id): return out -def apply_operation_single(x, operation, axis=None, out=None, - block_shape=None, n_threads=None, - mask=None, verbose=False, roi=None): - """ Apply operation to single operand in parallel. 
-
-    Arguments:
-        x [array_like] - operand 1, numpy array or similar like h5py or zarr dataset
-        operation [callable] - operation applied to the two operands
-        axis [int] - axis along which to apply the operation (default: Naone)
-        out [array_like] - output, by default the operation
-            is done inplace in the first operand (default: None)
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+def apply_operation_single(
+    x: ArrayLike,
+    operation: Callable,
+    axis: Optional[int] = None,
+    out: Optional[ArrayLike] = None,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> ArrayLike:
+    """Apply operation to a single operand in parallel.
+
+    Args:
+        x: The operand, numpy array or similar like h5py or zarr dataset.
+        operation: Operation applied to the operand.
+        axis: Axis along which to apply the operation.
+        out: Output, by default the operation is done inplace in the operand.
+        block_shape: Shape of the blocks used for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
+
     Returns:
-        array_like - output
+        The output.
     """
     shape = x.shape
@@ -270,23 +295,31 @@ def _generate_operation(op_name):
     doc_str =\
         """Apply np.%s block-wise and in parallel.
 
-    Arguments:
-        x [array_like] - operand 1, numpy array or similar like h5py or zarr dataset
-        y [array_like or scalar] - operand 2, numpy array, h5py or zarr dataset or scalar
-        out [array_like] - output, by default the operation
-            is done inplace in the first operand (default: None)
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+    Args:
+        x: Operand 1, numpy array or similar like h5py or zarr dataset.
+        y: Operand 2, numpy array, h5py or zarr dataset or scalar.
+        out: Output, by default the operation is done inplace in the first operand.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
+
     Returns:
-        array_like - output
+        The output.
""" % op_name - def op(x, y, out=None, block_shape=None, n_threads=None, - mask=None, verbose=False, roi=None): + def op( + x: ArrayLike, + y: Union[ArrayLike, Number], + out: Optional[ArrayLike] = None, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, + ) -> ArrayLike: return apply_operation(x, y, getattr(np, op_name), block_shape=block_shape, n_threads=n_threads, mask=mask, verbose=verbose, out=out, roi=roi) diff --git a/elf/parallel/relabel.py b/elf/parallel/relabel.py index 085da91..fdf35ce 100644 --- a/elf/parallel/relabel.py +++ b/elf/parallel/relabel.py @@ -5,35 +5,47 @@ # would be nice to use dask, so that we can also run this on the cluster from concurrent import futures -from tqdm import tqdm +from typing import Dict, Optional, Tuple + import nifty.tools as nt +from tqdm import tqdm from .unique import unique from .common import get_blocking import numpy as np +from numpy.typing import ArrayLike -def relabel_consecutive(data, start_label=0, keep_zeros=True, out=None, - block_shape=None, n_threads=None, - mask=None, verbose=False, roi=None): +def relabel_consecutive( + data: ArrayLike, + start_label: int = 0, + keep_zeros: bool = True, + out: Optional[ArrayLike] = None, + block_shape: Tuple[int, ...] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Tuple[slice, ...] = None, +) -> Tuple[ArrayLike, int, Dict[int, int]]: """Compute the unique values of the data. - Arguments: - data [array_like] - input data, numpy array or similar like h5py or zarr dataset - start_label [int] - start value for relabeling (default: 0) - keep_zeros [bool] - whether to always keep zero mapped to zero (default: True) - out [array_like] - output, by default the relabeling is done inplace (default: None) - block_shape [tuple] - shape of the blocks used for parallelisation, - by default chunks of the input will be used, if available (default: None) - n_threads [int] - number of threads, by default all are used (default: None) - mask [array_like] - mask to exclude data from the computation (default: None) - verbose [bool] - verbosity flag (default: False) - roi [tuple[slice]] - region of interest for this computation (default: None) + Args: + data: Input data, numpy array or similar like h5py or zarr dataset. + start_label: Start value for relabeling. + keep_zeros: Whether to always keep zero mapped to zero. + out: Output, by default the relabeling is done inplace. + block_shape: Shape of the blocks used for parallelisation, + by default chunks of the input will be used, if available. + n_threads: Number of threads, by default all are used. + mask: Mask to exclude data from the computation. + verbose: Verbosity flag. + roi: Region of interest for this computation. + Returns: - array_like - the output data - int - the max id after relabeling - dict - mapping of old to new labels + The relabeled output data. + The max id after relabeling. + The mapping of old to new labels. 
""" n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads diff --git a/elf/parallel/seeded_watershed.py b/elf/parallel/seeded_watershed.py index 2d30d03..a6948d5 100644 --- a/elf/parallel/seeded_watershed.py +++ b/elf/parallel/seeded_watershed.py @@ -1,15 +1,43 @@ import multiprocessing # would be nice to use dask, so that we can also run this on the cluster from concurrent import futures +from typing import Optional, Tuple +from numpy.typing import ArrayLike from skimage.segmentation import watershed from tqdm import tqdm from elf.parallel.common import get_blocking -def seeded_watershed(hmap, seeds, out, block_shape, halo, - mask=None, n_threads=None, verbose=False, roi=None): +def seeded_watershed( + hmap: ArrayLike, + seeds: ArrayLike, + out: ArrayLike, + block_shape: Tuple[int, ...], + halo: Tuple[int, ...], + mask: Optional[ArrayLike] = None, + n_threads: Optional[int] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, +) -> ArrayLike: + """Compute seeded watershed in parallel over blocks. + + Args: + hmap: The heightmap for the watershed. + seeds: The seeds for the watershed. + out: The output for the watershed. + block_shape: Shape of the blocks used for parallelisation, + by default chunks of the input will be used, if available. + halo: The halo for enlarging the blocks used for parallelization. + mask: Mask to exclude data from the watershed. + n_threads: Number of threads, by default all are used. + verbose: Verbosity flag. + roi: Region of interest for this computation. + + Returns: + The watershed output. + """ n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads blocking = get_blocking(hmap, block_shape, roi, n_threads) diff --git a/elf/parallel/size_filter.py b/elf/parallel/size_filter.py index 94988d7..97281cc 100644 --- a/elf/parallel/size_filter.py +++ b/elf/parallel/size_filter.py @@ -4,6 +4,7 @@ import multiprocessing # would be nice to use dask, so that we can also run this on the cluster from concurrent import futures +from typing import Optional, Tuple import nifty.tools as nt from tqdm import tqdm @@ -12,11 +13,37 @@ from .unique import unique import numpy as np +from numpy.typing import ArrayLike + + +def segmentation_filter( + data: ArrayLike, + out: ArrayLike, + filter_function: callable, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, + relabel: Optional[callable] = None, +) -> ArrayLike: + """Filter a segmentation based on a custom criterion. + + Args: + data: Input data, numpy array or similar like h5py or zarr dataset. + out: Output data, numpy array or similar like h5py or zarr dataset. + filter_function: The function to express the custom filter criterion. + block_shape: Shape of the blocks to use for parallelisation, + by default chunks of the input will be used, if available. + n_threads: Number of threads, by default all are used. + mask: Mask to exclude data from the computation. + verbose: Verbosity flag. + roi: Region of interest for this computation. + relabel: Function for relabeling the segmentation. - -def segmentation_filter(data, out, filter_function, block_shape=None, - n_threads=None, mask=None, verbose=False, roi=None, relabel=None): - + Returns: + The filtered segmentation. 
+ """ n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads blocking = get_blocking(data, block_shape, roi, n_threads) n_blocks = blocking.numberOfBlocks @@ -46,26 +73,38 @@ def apply_filter(block_id): return out -def size_filter(data, out, min_size=None, max_size=None, - block_shape=None, n_threads=None, mask=None, - verbose=False, roi=None, relabel=True): - """Filter objects small and/or large objects from a segmentation and set them to 0. +def size_filter( + data: ArrayLike, + out: ArrayLike, + min_size: Optional[int] = None, + max_size: Optional[int] = None, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, + relabel: bool = True, +) -> ArrayLike: + """Filter small objects and/or large objects from a segmentation and set them to 0. By default this functions relabels the segmentation result consecutively. Set relabel=False to avoid this behavior. - Arguments: - data [array_like] - input data, numpy array or similar like h5py or zarr dataset - out [array_like] - output data, numpy array or similar like h5py or zarr dataset - block_shape [tuple] - shape of the blocks used for parallelisation, - by default chunks of the input will be used, if available (default: None) - n_threads [int] - number of threads, by default all are used (default: None) - mask [array_like] - mask to exclude data from the computation (default: None) - verbose [bool] - verbosity flag (default: False) - roi [tuple[slice]] - region of interest for this computation (default: None) - relabel [bool] - whether to relabel the segmentation consecutively after filtering (default: True) + Args: + data: Input data, numpy array or similar like h5py or zarr dataset. + out: Output data, numpy array or similar like h5py or zarr dataset. + min_size: The minimal object size. + max_size: The maximum object size. + block_shape: Shape of the blocks to use for parallelisation, + by default chunks of the input will be used, if available. + n_threads: Number of threads, by default all are used. + mask: Mask to exclude data from the computation. + verbose: Verbosity flag. + roi: Region of interest for this computation. + relabel: Whether to relabel the segmentation consecutively after filtering. + Returns: - np.ndarray - + The filtered segmentation. """ assert (min_size is not None) or (max_size is not None) ids, counts = unique(data, return_counts=True, block_shape=block_shape, diff --git a/elf/parallel/stats.py b/elf/parallel/stats.py index e6a91af..e770a31 100644 --- a/elf/parallel/stats.py +++ b/elf/parallel/stats.py @@ -5,26 +5,36 @@ # would be nice to use dask for all of this instead of concurrent.futures # so that this could be used on a cluster as well from concurrent import futures +from typing import Optional, Tuple from tqdm import tqdm from .common import get_blocking import numpy as np +from numpy.typing import ArrayLike + + +def mean( + data: ArrayLike, + block_shape: Optional[Tuple[int, ...]] = None, + n_threads: Optional[int] = None, + mask: Optional[ArrayLike] = None, + verbose: bool = False, + roi: Optional[Tuple[slice, ...]] = None, +) -> float: + """Compute the mean of the data in parallel. + + Args: + data: Input data, numpy array or similar like h5py or zarr dataset. + block_shape: Shape of the blocks to use for parallelisation, + by default chunks of the input will be used, if available. + n_threads: Number of threads, by default all are used. 
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
-
-def mean(data, block_shape=None, n_threads=None, mask=None, verbose=False, roi=None):
-    """ Compute the mean of the data in parallel.
-
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+
     Returns:
-        float - mean of the data
+        Mean of the data.
     """
     n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads
@@ -57,20 +67,28 @@ def _mean(block_id):
     return np.mean(means)
 
 
-def mean_and_std(data, block_shape=None, n_threads=None, mask=None, verbose=False, roi=None):
-    """ Compute the mean and the standard deviation of the data in parallel.
+def mean_and_std(
+    data: ArrayLike,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> Tuple[float, float]:
+    """Compute the mean and the standard deviation of the data in parallel.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
 
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
     Returns:
-        float - mean of the data
-        float - standard deviation of the data
+        Mean of the data.
+        Standard deviation of the data.
     """
     n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads
@@ -115,37 +133,53 @@ def _mean_and_std(block_id):
     return mean_val, std_val
 
 
-def std(data, block_shape=None, n_threads=None, mask=None, verbose=False, roi=None):
-    """ Compute the standard deviation of the data in parallel.
+def std(
+    data: ArrayLike,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> float:
+    """Compute the standard deviation of the data in parallel.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
 
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
     Returns:
-        float - standard deviation of the data
+        Standard deviation of the data.
     """
     return mean_and_std(data, block_shape, n_threads, mask, verbose, roi)[1]
 
 
-def min_and_max(data, block_shape=None, n_threads=None, mask=None, verbose=False, roi=None):
-    """ Compute the minimum and maximum of the data in parallel.
+def min_and_max(
+    data: ArrayLike,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> Tuple[float, float]:
+    """Compute the minimum and maximum of the data in parallel.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
 
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
     Returns:
-        scalar - minimum value of the data
-        scalar - maximum value of the data
+        Minimum value of the data.
+        Maximum value of the data.
     """
     n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads
     blocking = get_blocking(data, block_shape, roi, n_threads)
@@ -183,35 +217,51 @@ def _min_and_max(block_id):
     return mins.min(), maxs.max()
 
 
-def min(data, block_shape=None, n_threads=None, mask=None, verbose=False, roi=None):
-    """ Compute the minimum of the data in parallel.
+def min(
+    data: ArrayLike,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> float:
+    """Compute the minimum of the data in parallel.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
 
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
     Returns:
-        scalar - minimum value of the data
+        Minimum value of the data.
     """
     return min_and_max(data, block_shape, n_threads, mask, verbose, roi)[0]
 
 
-def max(data, block_shape=None, n_threads=None, mask=None, verbose=False, roi=None):
-    """ Compute the maximum of the data in parallel.
+def max(
+    data: ArrayLike,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> float:
+    """Compute the maximum of the data in parallel.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
 
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
     Returns:
-        scalar - maximum value of the data
+        Maximum value of the data.
     """
     return min_and_max(data, block_shape, n_threads, mask, verbose, roi)[1]
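Usage sketch for the stats helpers (import path per this patch's file layout):

```python
import numpy as np
from elf.parallel.stats import mean_and_std, min_and_max

data = np.random.rand(256, 256)

# Both aggregate per-block results, so the full data never has to be
# loaded at once when a chunked dataset is passed instead of an array.
data_mean, data_std = mean_and_std(data, block_shape=(64, 64), n_threads=4)
data_min, data_max = min_and_max(data, block_shape=(64, 64), n_threads=4)
```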
diff --git a/elf/parallel/unique.py b/elf/parallel/unique.py
index 32e1253..975dbf9 100644
--- a/elf/parallel/unique.py
+++ b/elf/parallel/unique.py
@@ -4,29 +4,40 @@ import multiprocessing
 # would be nice to use dask, so that we can also run this on the cluster
 from concurrent import futures
+from typing import Optional, Tuple, Union
 
 from tqdm import tqdm
 from .common import get_blocking
 
 import numpy as np
+from numpy.typing import ArrayLike
+
+
+def unique(
+    data: ArrayLike,
+    return_counts: bool = False,
+    block_shape: Optional[Tuple[int, ...]] = None,
+    n_threads: Optional[int] = None,
+    mask: Optional[ArrayLike] = None,
+    verbose: bool = False,
+    roi: Optional[Tuple[slice, ...]] = None,
+) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
+    """Compute the unique values of the data.
+
+    Args:
+        data: Input data, numpy array or similar like h5py or zarr dataset.
+        return_counts: Whether to return the counts.
+        block_shape: Shape of the blocks to use for parallelisation,
+            by default chunks of the input will be used, if available.
+        n_threads: Number of threads, by default all are used.
+        mask: Mask to exclude data from the computation.
+        verbose: Verbosity flag.
+        roi: Region of interest for this computation.
-
-def unique(data, return_counts=False, block_shape=None, n_threads=None,
-           mask=None, verbose=False, roi=None):
-    """ Compute the unique values of the data.
-
-    Arguments:
-        data [array_like] - input data, numpy array or similar like h5py or zarr dataset
-        return_counts [bool] - whether to return the counts (default: False)
-        block_shape [tuple] - shape of the blocks used for parallelisation,
-            by default chunks of the input will be used, if available (default: None)
-        n_threads [int] - number of threads, by default all are used (default: None)
-        mask [array_like] - mask to exclude data from the computation (default: None)
-        verbose [bool] - verbosity flag (default: False)
-        roi [tuple[slice]] - region of interest for this computation (default: None)
+
     Returns:
-        np.ndarray - unique values
-        np.ndarray - count values (only if return_counts is True)
+        The unique values.
+        The counts of the unique values, if return_counts is set to True.
     """
     n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads
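Finally, a sketch tying together the label-oriented helpers touched by this patch (unique, size_filter, relabel_consecutive); the toy segmentation and import paths are illustrative assumptions:

```python
import numpy as np
from elf.parallel.unique import unique
from elf.parallel.size_filter import size_filter
from elf.parallel.relabel import relabel_consecutive

# Toy segmentation with non-consecutive ids.
seg = 3 * np.random.randint(0, 50, size=(256, 256)).astype("uint32")

ids, counts = unique(seg, return_counts=True, block_shape=(64, 64))

# Drop objects smaller than 100 pixels; relabels consecutively by default.
filtered = size_filter(seg, out=np.zeros_like(seg), min_size=100, block_shape=(64, 64))

# Or relabel explicitly; returns the output, the max id and the id mapping.
relabeled, max_id, mapping = relabel_consecutive(seg, out=np.zeros_like(seg), block_shape=(64, 64))
```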