Merge pull request #108 from constantinpape/fix-label
Fix parallel.label and add documentation strings
constantinpape authored Dec 29, 2024
2 parents 2eda443 + 187ba01 commit 2d52133
Showing 21 changed files with 760 additions and 459 deletions.
23 changes: 0 additions & 23 deletions .github/environment.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -21,7 +21,7 @@ jobs:
- name: Setup micromamba
uses: mamba-org/setup-micromamba@v1
with:
environment-file: .github/environment.yaml
environment-file: environment.yaml
create-args: >-
python=${{ matrix.python-version }}
2 changes: 2 additions & 0 deletions elf/parallel/common.py
@@ -7,6 +7,8 @@ def _is_chunk_aligned(shape, chunks):


def get_blocking(data, block_shape, roi, n_threads):
"""@private
"""
# check if we have a chunked data storage
try:
chunks = data.chunks
42 changes: 24 additions & 18 deletions elf/parallel/copy_dataset.py
@@ -1,29 +1,35 @@
import multiprocessing
from concurrent import futures
from typing import Optional, Tuple

from numpy.typing import ArrayLike
from tqdm import tqdm
from .common import get_blocking


def copy_dataset(ds_in, ds_out,
roi_in=None,
roi_out=None,
block_shape=None,
n_threads=None,
verbose=False):
""" Copy input to output dataset in parallel.
Arguments:
ds_in [dataset] - input dataset (h5py, z5py or zarr dataset)
ds_out [dataset] - output dataset (h5py, z5py or zarr dataset)
roi_in [tuple[slice]] - region of interest (for the input dataset) (default: None)
roi_out [tuple[slice]] - region of interest (for the output dataset) (default: None)
block_shape [tuple] - shape of the blocks used for parallelisation,
by default chunks of the output dataset will be used (default: None)
n_threads [int] - number of threads, by default all are used (default: None)
verbose [bool] - verbosity flag (default: False)
def copy_dataset(
ds_in: ArrayLike,
ds_out: ArrayLike,
roi_in: Optional[Tuple[slice, ...]] = None,
roi_out: Optional[Tuple[slice, ...]] = None,
block_shape: Optional[Tuple[int, ...]] = None,
n_threads: Optional[int] = None,
verbose: bool = False,
) -> ArrayLike:
"""Copy input to an output dataset or other array-like object in parallel.
Args:
ds_in: The input dataset or array-like object, like h5py, z5py or zarr dataset.
ds_out: The output dataset, like h5py, z5py or zarr dataset.
roi_in: Region of interest for the input dataset.
roi_out: Region of interest for the output dataset.
block_shape: Shape of the blocks to use for parallelisation,
by default chunks of the output dataset will be used.
n_threads: Number of threads, by default all available ones are used.
verbose: Verbosity flag.
Returns:
array_like - output
The output dataset / array-like object.
"""

n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads
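
A minimal usage sketch of the new copy_dataset signature (the file names, dataset keys, and chunk shape are illustrative and not part of this commit; copy_dataset is assumed to be re-exported from elf.parallel, otherwise import it from elf.parallel.copy_dataset):

import h5py
from elf.parallel import copy_dataset

with h5py.File("data.h5", "r") as f_in, h5py.File("copy.h5", "a") as f_out:
    ds_in = f_in["raw"]
    # Create the output dataset with matching shape and dtype and explicit chunks;
    # block_shape defaults to the chunks of the output dataset.
    ds_out = f_out.require_dataset(
        "raw", shape=ds_in.shape, dtype=ds_in.dtype, chunks=(64, 64, 64)
    )
    copy_dataset(ds_in, ds_out, n_threads=4, verbose=True)
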
49 changes: 37 additions & 12 deletions elf/parallel/distance_transform.py
@@ -1,30 +1,55 @@
# IMPORTANT do threadctl import first (before numpy imports)
from threadpoolctl import threadpool_limits
from typing import Optional, Tuple, Union

import multiprocessing
# would be nice to use dask, so that we can also run this on the cluster
from concurrent import futures

import numpy as np
from numpy.typing import ArrayLike
from scipy.ndimage import distance_transform_edt
from tqdm import tqdm

from .common import get_blocking


def distance_transform(
data,
halo,
sampling=None,
return_distances=True,
return_indices=False,
distances=None,
indices=None,
block_shape=None,
n_threads=None,
verbose=False,
roi=None,
):
data: ArrayLike,
halo: Tuple[int, ...],
sampling: Optional[Union[float, Tuple[float, ...]]] = None,
return_distances: bool = True,
return_indices: bool = False,
distances: Optional[ArrayLike] = None,
indices: Optional[ArrayLike] = None,
block_shape: Optional[Tuple[int, ...]] = None,
n_threads: Optional[int] = None,
verbose: bool = False,
roi: Optional[Tuple[slice, ...]] = None,
) -> Union[ArrayLike, Tuple[ArrayLike, ArrayLike]]:
"""Compute distance transform in parallel over blocks.
The results are only correct up to the distance to the block boundary plus halo.
The function `scipy.ndimage.distance_transform_edt` is used to compute the distances.
Args:
data: The input data.
halo: The halo, which is the padding added at each side of the block.
sampling: The sampling value passed to distance_transform_edt.
return_distances: Whether to return the computed distances.
return_indices: Whether to return the computed indices.
distances: Pre-allocated array-like object for the distances.
indices: Pre-allocated array-like object for the indices.
block_shape: Shape of the blocks to use for parallelisation,
by default chunks of the input will be used, if available.
n_threads: Number of threads, by default all available threads are used.
verbose: Verbosity flag.
roi: Region of interest for this computation.
Returns:
The distances, if return_distances is set to True.
The indices, if return_indices is set to True.
"""
if data.ndim not in (2, 3):
raise ValueError(
f"Invalid input dimensionality. Expected input to have 2 or 3 dimensions, got {data.ndim}."
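
A short sketch of the blocked distance transform (the halo and block shape are illustrative; the function is assumed to allocate the output arrays when no pre-allocated distances/indices are passed):

import numpy as np
from elf.parallel.distance_transform import distance_transform

# Binary mask; distance_transform_edt computes, for each non-zero pixel,
# the distance to the nearest zero pixel.
mask = np.zeros((256, 256), dtype="uint8")
mask[100:150, 100:150] = 1

# Results are only correct up to the block size plus halo, so the halo should
# cover the largest distance that matters for the application.
distances = distance_transform(mask, halo=(16, 16), block_shape=(64, 64), n_threads=4)
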
173 changes: 109 additions & 64 deletions elf/parallel/filters.py
@@ -3,8 +3,10 @@
from concurrent import futures
from functools import partial
from tqdm import tqdm
from typing import Optional, Tuple

import numpy as np
from numpy.typing import ArrayLike
try:
import fastfilters as ff
except ImportError:
@@ -16,43 +18,58 @@

# helper function to choose a channel from filter output
def choose_channel(data, sigma, function, channel):
"""@private
"""
return function(data, sigma)[..., channel]


def block_to_bb(block):
"""@private
"""
return tuple(slice(beg, end) for beg, end in zip(block.begin, block.end))


def get_halo(sigma, order, ndim, outer_scale=None):
"""@private
"""
sigma_ = sigma if outer_scale is None else sigma + outer_scale
halo = sigma_to_halo(sigma_, order)
if isinstance(halo, int):
halo = ndim * [halo]
return halo


def apply_filter(data, filter_name, sigma,
outer_scale=None, return_channel=None,
out=None, block_shape=None,
n_threads=None, mask=None,
verbose=False, roi=None):
""" Apply filter to data in parallel.
Arguments:
data [array_like] - input data, numpy array or similar like h5py or zarr dataset
filter_name [str] - name of the filter to apply
sigma [float or list[float]] - sigma value for filter
outer_scale [float] - outer scale value for structure tensor (default: None)
return_channel [int] - channel to select for multi-scale response (default: None)
out [array_like] - output, by default the filter is applied inplace (default: None)
block_shape [tuple] - shape of the blocks used for parallelisation,
by default chunks of the input will be used, if available (default: None)
n_threads [int] - number of threads, by default all are used (default: None)
mask [array_like] - mask to exclude data from the computation (default: None)
verbose [bool] - verbosity flag (default: False)
roi [tuple[slice]] - region of interest for this computation (default: None)
def apply_filter(
data: ArrayLike,
filter_name: str,
sigma: float,
outer_scale: Optional[float] = None,
return_channel: Optional[int] = None,
out: Optional[ArrayLike] = None,
block_shape: Optional[Tuple[int, ...]] = None,
n_threads: Optional[int] = None,
mask: Optional[ArrayLike] = None,
verbose: bool = False,
roi: Optional[Tuple[slice, ...]] = None,
) -> ArrayLike:
"""Apply filter to data in parallel.
Args:
data: Input data, numpy array or similar like h5py or zarr dataset.
filter_name: Name of the filter to apply.
sigma: Sigma value for filter.
outer_scale: Outer scale value for structure tensor.
return_channel: Channel to select for multi-channel response.
out: Output, by default the filter is applied inplace.
block_shape: Shape of the blocks used for parallelisation,
by default chunks of the input will be used, if available.
n_threads: Number of threads, by default all are used.
mask: Mask to exclude data from the computation.
verbose: Verbosity flag.
roi: Region of interest for this computation.
Returns:
array_like - filter response
The filter response.
"""
n_threads = multiprocessing.cpu_count() if n_threads is None else n_threads

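A usage sketch for apply_filter (the filter name "gaussianSmoothing" follows the fastfilters/vigra naming convention and is an assumption here; check the filter library in use for the valid names):

import numpy as np
from elf.parallel.filters import apply_filter

data = np.random.rand(128, 128, 128).astype("float32")
out = np.zeros_like(data)

# Write to a separate output array; without out= the filter is applied in place.
smoothed = apply_filter(
    data, "gaussianSmoothing", sigma=2.0,
    out=out, block_shape=(64, 64, 64), n_threads=4,
)
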
@@ -147,22 +164,31 @@ def _generate_filter(filter_name):
doc_str =\
"""Comppute %s response block-wise and in parallel.
Arguments:
data [array_like] - input data, numpy array or similar like h5py or zarr dataset
sigma [float or list[float]] - sigma value for filter
out [array_like] - output, by default the filter is applied inplace (default: None)
block_shape [tuple] - shape of the blocks used for parallelisation,
by default chunks of the input will be used, if available (default: None)
n_threads [int] - number of threads, by default all are used (default: None)
mask [array_like] - mask to exclude data from the computation (default: None)
verbose [bool] - verbosity flag (default: False)
roi [tuple[slice]] - region of interest for this computation (default: None)
Args:
data: Input data, numpy array or similar like h5py or zarr dataset.
sigma: Sigma value for filter.
out: Output, by default the filter is applied inplace.
block_shape: Shape of the blocks used for parallelisation,
by default chunks of the input will be used, if available.
n_threads: Number of threads, by default all are used.
mask: Mask to exclude data from the computation.
verbose: Verbosity flag.
roi: Region of interest for this computation.
Returns:
array_like - filter response
The filter response.
""" % elf_name

def op(data, sigma, out=None, block_shape=None,
n_threads=None, mask=None, verbose=False, roi=None):
def op(
data: ArrayLike,
sigma: float,
out: Optional[ArrayLike] = None,
block_shape: Optional[Tuple[int, ...]] = None,
n_threads: Optional[int] = None,
mask: Optional[ArrayLike] = None,
verbose: bool = False,
roi: Optional[Tuple[slice, ...]] = None,
) -> ArrayLike:
return apply_filter(data, filter_name, sigma, outer_scale=None,
return_channel=None, out=out, block_shape=block_shape,
n_threads=n_threads, mask=mask, verbose=verbose, roi=roi)
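
The generated wrappers expose each filter as its own function; a sketch, assuming the generation scheme produces snake_case names such as gaussian_smoothing (check elf.parallel.filters for the exact names that are generated):

import numpy as np
from elf.parallel import filters

data = np.random.rand(64, 64, 64).astype("float32")
# Equivalent to apply_filter(data, "gaussianSmoothing", sigma=1.6, ...).
smoothed = filters.gaussian_smoothing(data, sigma=1.6, block_shape=(32, 32, 32))
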
@@ -179,24 +205,34 @@ def _generate_structure_tensor(filter_name):
"""Comppute %s response block-wise and in parallel.
Arguments:
data [array_like] - input data, numpy array or similar like h5py or zarr dataset
sigma [float or list[float]] - sigma value for filter
outer_scale [float] - outer scale value
return_channel [int] - return selected channel (default: None)
out [array_like] - output, by default the filter is applied inplace (default: None)
block_shape [tuple] - shape of the blocks used for parallelisation,
by default chunks of the input will be used, if available (default: None)
n_threads [int] - number of threads, by default all are used (default: None)
mask [array_like] - mask to exclude data from the computation (default: None)
verbose [bool] - verbosity flag (default: False)
roi [tuple[slice]] - region of interest for this computation (default: None)
Args:
data: Input data, numpy array or similar like h5py or zarr dataset.
sigma: Sigma value for filter.
outer_scale: Outer scale value.
return_channel: Return selected channel.
out: Output, by default the filter is applied inplace.
block_shape: Shape of the blocks used for parallelisation,
by default chunks of the input will be used, if available.
n_threads: Number of threads, by default all are used.
mask: Mask to exclude data from the computation.
verbose: Verbosity flag.
roi: Region of interest for this computation.
Returns:
array_like - filter response
The filter response.
""" % elf_name

def op(data, sigma, outer_scale,
return_channel=None, out=None, block_shape=None,
n_threads=None, mask=None, verbose=False, roi=None):
def op(
data: ArrayLike,
sigma: float,
outer_scale: float,
return_channel: Optional[int] = None,
out: Optional[ArrayLike] = None,
block_shape: Optional[Tuple[int, ...]] = None,
n_threads: Optional[int] = None,
mask: Optional[ArrayLike] = None,
verbose: bool = False,
roi: Optional[Tuple[slice, ...]] = None,
) -> ArrayLike:
return apply_filter(data, filter_name, sigma, outer_scale=outer_scale,
return_channel=return_channel, out=out, block_shape=block_shape,
n_threads=n_threads, mask=mask, verbose=verbose, roi=roi)
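
A sketch for a structure-tensor style wrapper, which additionally takes an outer scale and can select a single channel from the multi-channel response (the name structure_tensor_eigenvalues is an assumption based on the generation scheme):

import numpy as np
from elf.parallel import filters

data = np.random.rand(64, 64, 64).astype("float32")
# Return only the first eigenvalue channel of the structure tensor response.
ev0 = filters.structure_tensor_eigenvalues(
    data, sigma=1.0, outer_scale=2.0, return_channel=0, block_shape=(32, 32, 32)
)
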
@@ -212,24 +248,33 @@ def _generate_hessian(filter_name):
doc_str =\
"""Comppute %s response block-wise and in parallel.
Arguments:
data [array_like] - input data, numpy array or similar like h5py or zarr dataset
sigma [float or list[float]] - sigma value for filter
return_channel [int] - return selected channel (default: None)
out [array_like] - output, by default the filter is applied inplace (default: None)
block_shape [tuple] - shape of the blocks used for parallelisation,
by default chunks of the input will be used, if available (default: None)
n_threads [int] - number of threads, by default all are used (default: None)
mask [array_like] - mask to exclude data from the computation (default: None)
verbose [bool] - verbosity flag (default: False)
roi [tuple[slice]] - region of interest for this computation (default: None)
Args:
data: Input data, numpy array or similar like h5py or zarr dataset.
sigma: Sigma value for filter.
return_channel: Return selected channel.
out: Output, by default the filter is applied inplace.
block_shape: Shape of the blocks used for parallelisation,
by default chunks of the input will be used, if available.
n_threads: Number of threads, by default all are used.
mask: Mask to exclude data from the computation.
verbose: Verbosity flag.
roi: Region of interest for this computation.
Returns:
array_like - filter response
The filter response.
""" % elf_name

def op(data, sigma, return_channel=None,
out=None, block_shape=None,
n_threads=None, mask=None, verbose=False, roi=None):
def op(
data: ArrayLike,
sigma: float,
return_channel: Optional[int] = None,
out: Optional[ArrayLike] = None,
block_shape: Optional[Tuple[int, ...]] = None,
n_threads: Optional[int] = None,
mask: Optional[ArrayLike] = None,
verbose: bool = False,
roi: Optional[Tuple[slice, ...]] = None,
):
return apply_filter(data, filter_name, sigma, return_channel=return_channel,
out=out, block_shape=block_shape,
n_threads=n_threads, mask=mask, verbose=verbose, roi=roi)
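
The hessian-style wrappers follow the same pattern but take no outer scale; a sketch assuming the generated name hessian_of_gaussian_eigenvalues:

import numpy as np
from elf.parallel import filters

data = np.random.rand(64, 64, 64).astype("float32")
# Select the first eigenvalue channel of the Hessian of Gaussian response.
hog0 = filters.hessian_of_gaussian_eigenvalues(
    data, sigma=2.0, return_channel=0, block_shape=(32, 32, 32)
)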