Skip to content

PySPIM Core API Reference

pyspim

Modules

conv

data

Modules
dispim

utilities for loading data acquired in micro-manager with the ASI diSPIM plugin

Functions
uManagerAcquisition(path: os.PathLike, multi_pos: bool = False, xp: ModuleType = numpy)

uManagerAcquisition Create new acquisition object for fetching raw acquisition data.

Parameters:

Name Type Description Default
path PathLike

path to root directory of acquisition

required
multi_pos bool

flag for if this is a multi-position acquisition. Defaults to False.

False
xp ModuleType

array module to load data into (numpy or cupy). Defaults to numpy.

numpy
Source code in packages/pyspim/src/pyspim/data/dispim.py
def uManagerAcquisition(
    path: os.PathLike, multi_pos: bool = False, xp: ModuleType = numpy
):
    """uManagerAcquisition Create new acquisition object for fetching raw acquisition data.

    Args:
        path (os.PathLike): path to root directory of acquisition
        multi_pos (bool, optional): flag for if this is a multi-position acquisition. Defaults to False.
        xp (ModuleType, optional): array module to load data into (``numpy`` or ``cupy``). Defaults to numpy.
    """
    if multi_pos:
        return uManagerAcquisitionMultiPos(path, xp)
    else:
        return uManagerAcquisitionOnePos(path, xp)
parse_position_list(path: os.PathLike) -> numpy.ndarray

parse_position_list Parse uManager position list into numpy array

Parameters:

Name Type Description Default
path PathLike

path to the PositionList.pos file

required

Returns:

Type Description
ndarray

numpy.ndarray: 6-column array [position_index, grid_z, grid_y, z, y, x]

Source code in packages/pyspim/src/pyspim/data/dispim.py
def parse_position_list(path: os.PathLike) -> numpy.ndarray:
    """parse_position_list Parse uManager position list into numpy array

    Args:
        path (os.PathLike): path to the PositionList.pos file

    Returns:
        numpy.ndarray: 6-column array [position_index, grid_z, grid_y, z, y, x]
    """
    with open(path) as f:
        pos_dict = json.load(f)
    positions = pos_dict["POSITIONS"]
    rows = []
    for idx, position in enumerate(positions):
        gz, gy = __label_to_grid_location(position["LABEL"])
        x, y, z = __physical_position_from_devices(position["DEVICES"])
        rows.append([idx, gz, gy, z, y, x])
    return numpy.vstack(rows)
subtract_constant_uint16arr(arr: NDArray, const: int) -> NDArray

subtract_constant_uint16arr Subtract constant value from uint16 array, preventing underflow.

Parameters:

Name Type Description Default
arr NDArray

input array

required
const int

value to subtract

required

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/data/dispim.py
def subtract_constant_uint16arr(arr: NDArray, const: int) -> NDArray:
    """subtract_constant_uint16arr Subtract constant value from uint16 array, preventing underflow.

    Args:
        arr (NDArray): input array
        const (int): value to subtract

    Returns:
        NDArray
    """
    try:
        xp = cupy.get_array_module(arr)
    except (NameError, ImportError):
        xp = numpy
    return (arr.astype(xp.int32) - const).clip(0, 2**16).astype(xp.uint16)

decon

Functions
Modules
rl
Modules
dualview_fft

Deconvolution algorithms for jointly deconvolving dual-view microscopy data.

References

[1] Wu, et. al., "Spatially isotropic four-dimensional...", doi:10.1038/nbt.2713 [2] Preibsch, et al. "Efficient Bayesian-based...", doi:10.1038/nmeth.2929 [3] Guo, et al. "Rapid image deconvolution..." doi:10.1038/s41587-020-0560-x [4] Anconelli, B., et al. "Reduction of boundary effects...", doi:10.1051/0004-6361:20053848

Functions
deconvolve(view_a: NDArray, view_b: NDArray, est_i: NDArray | None, psf_a: NDArray | Iterable[NDArray], psf_b: NDArray | Iterable[NDArray], backproj_a: NDArray | Iterable[NDArray], backproj_b: NDArray | Iterable[NDArray], decon_function: str, num_iter: int, epsilon: float, req_both: bool, boundary_correction: bool, zero_padding: Optional[PadType], boundary_sigma_a: float, boundary_sigma_b: float, verbose: bool) -> NDArray

Joint deconvolution of 2 views into a single one.

Parameters:

Name Type Description Default
view_a NDArray

array from 1st view

required
view_b NDArray

array from 2nd view

required
est_i NDArray | None

initial estimate

required
psf_a NDArray | Iterable[NDArray]

psf arrays for view A (single one, or 1 per channel)

required
psf_b NDArray | Iterable[NDArray]

psf arrays for view B (single, or 1 per channel)

required
backproj_a NDArray | Iterable[NDArray]

backprojector array(s) for view A

required
backproj_b NDArray | Iterable[NDArray]

backprojector array(s) for view B

required
decon_function str

style of deconvolution to use

required
num_iter int

number of iterations

required
epsilon float

small parameter to prevent division by zero

required
req_both bool

set all areas where either view doesn't have data to 0

required
boundary_correction bool

do boundary correction

required
zero_padding Optional[PadType]

zero padding for boundary correction

required
boundary_sigma_a float

significance level for pixels, view a

required
boundary_sigma_b float

significance level for pixels, view b

required
verbose bool

show progressbar for iterations

required

Raises:

Type Description
ValueError

input array is not 3 or 4D

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def deconvolve(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray | None,
    psf_a: NDArray | Iterable[NDArray],
    psf_b: NDArray | Iterable[NDArray],
    backproj_a: NDArray | Iterable[NDArray],
    backproj_b: NDArray | Iterable[NDArray],
    decon_function: str,
    num_iter: int,
    epsilon: float,
    req_both: bool,
    boundary_correction: bool,
    zero_padding: Optional[PadType],
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    verbose: bool,
) -> NDArray:
    """Joint deconvolution of 2 views into a single one.

    Args:
        view_a (NDArray): array from 1st view
        view_b (NDArray): array from 2nd view
        est_i (NDArray | None): initial estimate
        psf_a (NDArray | Iterable[NDArray]): psf arrays for view A (single one, or 1 per channel)
        psf_b (NDArray | Iterable[NDArray]): psf arrays for view B (single, or 1 per channel)
        backproj_a (NDArray | Iterable[NDArray]): backprojector array(s) for view A
        backproj_b (NDArray | Iterable[NDArray]): backprojector array(s) for view B
        decon_function (str): style of deconvolution to use
        num_iter (int): number of iterations
        epsilon (float): small parameter to prevent division by zero
        req_both (bool): set all areas where either view doesn't have data to 0
        boundary_correction (bool): do boundary correction
        zero_padding (Optional[PadType]): zero padding for boundary correction
        boundary_sigma_a (float): significance level for pixels, view a
        boundary_sigma_b (float): significance level for pixels, view b
        verbose (bool): show progressbar for iterations

    Raises:
        ValueError: input array is not 3 or 4D

    Returns:
        NDArray
    """
    if len(view_a.shape) == 4:
        return _deconvolve_multichannel(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            decon_function,
            num_iter,
            epsilon,
            boundary_correction,
            req_both,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            verbose,
        )
    elif len(view_a.shape) == 3:
        return _deconvolve_single_channel(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            decon_function,
            num_iter,
            epsilon,
            boundary_correction,
            req_both,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            verbose,
        )
    else:
        raise ValueError("invalid shape, must be 3- or 4D")
joint_rl_dispim(view_a: NDArray, view_b: NDArray, est_i: NDArray, psf_a: NDArray, psf_b: NDArray, backproj_a: Optional[NDArray] = None, backproj_b: Optional[NDArray] = None, num_iter: int = 10, epsilon: float = 1e-05, boundary_correction: bool = True, req_both: bool = False, zero_padding: Optional[PadType] = None, boundary_sigma_a: float = 0.01, boundary_sigma_b: float = 0.01, verbose: bool = False) -> NDArray

joint_rl_dispim Modified Richardson-Lucy joint deconvolution, as described in [1].

originally developed by the Shroff group at the NIH for use in diSPIM imaging. NOTE: boundary correction is not implemented for this method, as at the time of writing, there is no known way of doing this

Parameters:

Name Type Description Default
view_a NDArray

data array corresponding to first view (A)

required
view_b NDArray

data array corresponding to second view (B)

required
est_i NDArray

initial estimate for deconvolution. If None, will use (A+B)/2.

required
psf_a NDArray

array with real-space point spread function for view A

required
psf_b NDArray

array with real-space point spread function for view B

required
backproj_a NDArray

array with real-space backprojector for view A. If None, the mirrored point spread function is used.

None
backproj_b NDArray

array with real-space backprojector for view B. If None, the mirrored point spread function is used.

None
num_iter int

number of iterations to deconvolve for. Defaults to 10.

10
epsilon float

small parameter to prevent division by zero errors. Defaults to 1e-5.

1e-05
boundary_correction bool

correct boundary effects, defaults to True.

True
req_both bool

only deconvolve areas where views a & b have data, defaults to False.

False
zero_padding PadType

amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.

None
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

0.01
boundary_sigma_b float

threshold for determining significant pixels in view B, defaults to 1e-2.

0.01
verbose bool

display progress bar, defaults to False

False

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def joint_rl_dispim(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray,
    psf_a: NDArray,
    psf_b: NDArray,
    backproj_a: Optional[NDArray] = None,
    backproj_b: Optional[NDArray] = None,
    num_iter: int = 10,
    epsilon: float = 1e-5,
    boundary_correction: bool = True,
    req_both: bool = False,
    zero_padding: Optional[PadType] = None,
    boundary_sigma_a: float = 1e-2,
    boundary_sigma_b: float = 1e-2,
    verbose: bool = False,
) -> NDArray:
    """joint_rl_dispim Modified Richardson-Lucy joint deconvolution, as described in [1].

    originally developed by the Shroff group at the NIH for use in diSPIM imaging.
    NOTE: boundary correction is not implemented for this method, as at the time of writing, there is no known way of doing this

    Args:
        view_a (NDArray): data array corresponding to first view (A)
        view_b (NDArray): data array corresponding to second view (B)
        est_i (NDArray): initial estimate for deconvolution. If ``None``, will use (A+B)/2.
        psf_a (NDArray): array with real-space point spread function for view A
        psf_b (NDArray): array with real-space point spread function for view B
        backproj_a (NDArray, optional): array with real-space backprojector for view A. If `None`, the mirrored point spread function is used.
        backproj_b (NDArray, optional): array with real-space backprojector for view B. If `None`, the mirrored point spread function is used.
        num_iter (int, optional): number of iterations to deconvolve for. Defaults to 10.
        epsilon (float, optional): small parameter to prevent division by zero errors. Defaults to 1e-5.
        boundary_correction (bool, optional): correct boundary effects, defaults to True.
        req_both (bool, optional): only deconvolve areas where views a & b have data, defaults to False.
        zero_padding (PadType, optional): amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.
        boundary_sigma_a (float, optional): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float, optional): threshold for determining significant pixels in view B, defaults to 1e-2.
        verbose (bool, optional): display progress bar, defaults to False

    Returns:
        NDArray
    """
    if boundary_correction:
        return _additive_joint_rl_boundcorr(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            num_iter,
            epsilon,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            req_both,
            verbose,
        )
    else:
        return _joint_rl_dispim_uncorr(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            num_iter,
            epsilon,
            req_both,
            verbose,
        )
efficient_bayesian_backprojectors(psf_a: NDArray, psf_b: NDArray) -> Tuple[NDArray, NDArray]

efficient_bayesian_backprojectors Calculate proper backprojectors for "Efficient Bayesian" deconvolution.

Parameters:

Name Type Description Default
psf_a NDArray

point spread function for view A

required
psf_b NDArray

point spread function for view B

required

Returns:

Type Description
Tuple[NDArray, NDArray]

Tuple[NDArray,NDArray]: tuple of backprojectors, one for each view.

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def efficient_bayesian_backprojectors(
    psf_a: NDArray, psf_b: NDArray
) -> Tuple[NDArray, NDArray]:
    """efficient_bayesian_backprojectors Calculate proper backprojectors for "Efficient Bayesian" deconvolution.

    Args:
        psf_a (NDArray): point spread function for view A
        psf_b (NDArray): point spread function for view B

    Returns:
        Tuple[NDArray,NDArray]: tuple of backprojectors, one for each view.
    """
    xp = cupy.get_array_module(psf_a)
    if xp == cupy:
        conv = lambda k, i: fftconv_gpu(i, k, mode="same")
    else:
        conv = lambda k, i: fftconv_cpu(i, k, mode="same")
    float_type = supported_float_type(psf_a.dtype)
    psf_a = psf_a.astype(float_type, copy=False)
    psf_b = psf_b.astype(float_type, copy=False)
    # formulate backprojectors for "virtual view" updates
    # TODO: write this s.t. it's valid for 2D images, too
    flp_a = xp.ascontiguousarray(psf_a[::-1, ::-1, ::-1])
    flp_b = xp.ascontiguousarray(psf_b[::-1, ::-1, ::-1])
    # calculate backprojectors
    bp_a = flp_a * conv(conv(flp_a, psf_b), flp_b)
    bp_b = flp_b * conv(conv(flp_b, psf_a), flp_a)
    return bp_a, bp_b
efficient_bayesian(view_a: NDArray, view_b: NDArray, est_i: NDArray, psf_a: NDArray, psf_b: NDArray, num_iter: int = 10, epsilon: float = 1e-05, boundary_correction: bool = True, req_both: bool = False, zero_padding: Optional[PadType] = None, boundary_sigma_a: float = 0.01, boundary_sigma_b: float = 0.01, verbose: bool = False) -> NDArray

efficient_bayesian Efficient bayesian multiview deconvolution.

Originally described in [2], this is just additive joint deconvolution with backprojectors calculated as described in the "quadruple-view deconvolution" section of [3]

Parameters:

Name Type Description Default
view_a NDArray

data array corresponding to first view (A)

required
view_b NDArray

data array corresponding to second view (B)

required
est_i NDArray

initial estimate for deconvolution. If None, will use (A+B)/2.

required
psf_a NDArray

array with real-space point spread function for view A

required
psf_b NDArray

array with real-space point spread function for view B

required
backproj_a NDArray

array with real-space backprojector for view A. If None, the mirrored point spread function is used.

required
backproj_b NDArray

array with real-space backprojector for view B. If None, the mirrored point spread function is used.

required
num_iter int

number of iterations to deconvolve for. Defaults to 10.

10
epsilon float

small parameter to prevent division by zero errors. Defaults to 1e-5.

1e-05
boundary_correction bool

correct boundary effects, defaults to True.

True
req_both bool

only deconvolve areas where views a & b have data, defaults to False.

False
zero_padding PadType

amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.

None
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

0.01
boundary_sigma_b float

threshold for determining significant pixels in view B, defaults to 1e-2.

0.01
verbose bool

display progress bar, defaults to False

False

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def efficient_bayesian(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray,
    psf_a: NDArray,
    psf_b: NDArray,
    num_iter: int = 10,
    epsilon: float = 1e-5,
    boundary_correction: bool = True,
    req_both: bool = False,
    zero_padding: Optional[PadType] = None,
    boundary_sigma_a: float = 1e-2,
    boundary_sigma_b: float = 1e-2,
    verbose: bool = False,
) -> NDArray:
    """efficient_bayesian Efficient bayesian multiview deconvolution.

    Originally described in [2], this is just additive joint deconvolution with backprojectors calculated as described in the "quadruple-view deconvolution" section of [3]

    Args:
        view_a (NDArray): data array corresponding to first view (A)
        view_b (NDArray): data array corresponding to second view (B)
        est_i (NDArray): initial estimate for deconvolution. If ``None``, will use (A+B)/2.
        psf_a (NDArray): array with real-space point spread function for view A
        psf_b (NDArray): array with real-space point spread function for view B
        backproj_a (NDArray, optional): array with real-space backprojector for view A. If `None`, the mirrored point spread function is used.
        backproj_b (NDArray, optional): array with real-space backprojector for view B. If `None`, the mirrored point spread function is used.
        num_iter (int, optional): number of iterations to deconvolve for. Defaults to 10.
        epsilon (float, optional): small parameter to prevent division by zero errors. Defaults to 1e-5.
        boundary_correction (bool, optional): correct boundary effects, defaults to True.
        req_both (bool, optional): only deconvolve areas where views a & b have data, defaults to False.
        zero_padding (PadType, optional): amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.
        boundary_sigma_a (float, optional): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float, optional): threshold for determining significant pixels in view B, defaults to 1e-2.
        verbose (bool, optional): display progress bar, defaults to False

    Returns:
        NDArray
    """
    # compute backprojectors
    backproj_a, backproj_b = efficient_bayesian_backprojectors(psf_a, psf_b)
    return additive_joint_rl(
        view_a,
        view_b,
        est_i,
        psf_a,
        psf_b,
        backproj_a,
        backproj_b,
        num_iter,
        epsilon,
        boundary_correction,
        req_both,
        zero_padding,
        boundary_sigma_a,
        boundary_sigma_b,
        verbose,
    )
additive_joint_rl(view_a: NDArray, view_b: NDArray, est_i: NDArray, psf_a: NDArray, psf_b: NDArray, backproj_a: Optional[NDArray] = None, backproj_b: Optional[NDArray] = None, num_iter: int = 10, epsilon: float = 1e-05, boundary_correction: bool = True, req_both: bool = False, zero_padding: Optional[PadType] = None, boundary_sigma_a: float = 0.01, boundary_sigma_b: float = 0.01, verbose: bool = False) -> NDArray

additive_joint_rl Additive joint deconvolution.

With boundary correction turned off, each view is deconvolved separately and then averaged at each iteration. With boundary correction turned on, an ordered subset EM algorithm from [4] is used.

Parameters:

Name Type Description Default
view_a NDArray

data array corresponding to first view (A)

required
view_b NDArray

data array corresponding to second view (B)

required
est_i NDArray

initial estimate for deconvolution. If None, will use (A+B)/2.

required
psf_a NDArray

array with real-space point spread function for view A

required
psf_b NDArray

array with real-space point spread function for view B

required
backproj_a NDArray

array with real-space backprojector for view A. If None, the mirrored point spread function is used.

None
backproj_b NDArray

array with real-space backprojector for view B. If None, the mirrored point spread function is used.

None
num_iter int

number of iterations to deconvolve for. Defaults to 10.

10
epsilon float

small parameter to prevent division by zero errors. Defaults to 1e-5.

1e-05
boundary_correction bool

correct boundary effects, defaults to True.

True
req_both bool

only deconvolve areas where views a & b have data, defaults to False.

False
zero_padding PadType

amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.

None
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

0.01
boundary_sigma_b float

threshold for determining significant pixels in view B, defaults to 1e-2.

0.01
verbose bool

display progress bar, defaults to False

False

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def additive_joint_rl(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray,
    psf_a: NDArray,
    psf_b: NDArray,
    backproj_a: Optional[NDArray] = None,
    backproj_b: Optional[NDArray] = None,
    num_iter: int = 10,
    epsilon: float = 1e-5,
    boundary_correction: bool = True,
    req_both: bool = False,
    zero_padding: Optional[PadType] = None,
    boundary_sigma_a: float = 1e-2,
    boundary_sigma_b: float = 1e-2,
    verbose: bool = False,
) -> NDArray:
    """additive_joint_rl Additive joint deconvolution.

    With boundary correction turned off, each view is deconvolved separately and then averaged at each iteration.
    With boundary correction turned on, an ordered subset EM algorithm from [4] is used.

    Args:
        view_a (NDArray): data array corresponding to first view (A)
        view_b (NDArray): data array corresponding to second view (B)
        est_i (NDArray): initial estimate for deconvolution. If ``None``, will use (A+B)/2.
        psf_a (NDArray): array with real-space point spread function for view A
        psf_b (NDArray): array with real-space point spread function for view B
        backproj_a (NDArray, optional): array with real-space backprojector for view A. If `None`, the mirrored point spread function is used.
        backproj_b (NDArray, optional): array with real-space backprojector for view B. If `None`, the mirrored point spread function is used.
        num_iter (int, optional): number of iterations to deconvolve for. Defaults to 10.
        epsilon (float, optional): small parameter to prevent division by zero errors. Defaults to 1e-5.
        boundary_correction (bool, optional): correct boundary effects, defaults to True.
        req_both (bool, optional): only deconvolve areas where views a & b have data, defaults to False.
        zero_padding (PadType, optional): amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.
        boundary_sigma_a (float, optional): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float, optional): threshold for determining significant pixels in view B, defaults to 1e-2.
        verbose (bool, optional): display progress bar, defaults to False

    Returns:
        NDArray
    """
    if boundary_correction:
        return _additive_joint_rl_boundcorr(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            num_iter,
            epsilon,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            req_both,
            verbose,
        )
    else:
        return _additive_joint_rl_uncorr(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            num_iter,
            epsilon,
            req_both,
            verbose,
        )
estimate_initialization(view_a: NDArray, view_b: NDArray, init_constant: bool) -> NDArray

estimate_initialization Initialize estimate for deconvolution

Parameters:

Name Type Description Default
view_a NDArray

input volume, view A

required
view_b NDArray

input volume, view B

required
init_constant bool

initialize as constant matrix, otherwise (A + B)/2

required

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def estimate_initialization(
    view_a: NDArray, view_b: NDArray, init_constant: bool
) -> NDArray:
    """estimate_initialization Initialize estimate for deconvolution

    Args:
        view_a (NDArray): input volume, view A
        view_b (NDArray): input volume, view B
        init_constant (bool): initialize as constant matrix, otherwise (A + B)/2

    Returns:
        NDArray
    """
    xp = cupy.get_array_module(view_a)
    if init_constant:
        # initialize the estimate as a constant value that can satisfy
        # the flux constraint, (Eqn. 16 of [4])
        c = xp.sum((view_a + view_b) / 2)
        return xp.ones_like(view_a) * (c / xp.prod(xp.asarray(view_a.shape)))
    else:
        # do (view_a + view_b) / 2. as is done typically in RL decon.
        # (NOTE: this still satisfies flux constraint, so ok)
        return (view_a + view_b) / 2
dualview_sep

Separable deconvolution of dual-view volumes.

TODO: add more detail here.

Functions
deconvolve(view_a: NDArray, view_b: NDArray, est_i: NDArray | None, psf_az: NDArray | Iterable[NDArray], psf_ay: NDArray | Iterable[NDArray], psf_ax: NDArray | Iterable[NDArray], psf_bz: NDArray | Iterable[NDArray], psf_by: NDArray | Iterable[NDArray], psf_bx: NDArray | Iterable[NDArray], bp_az: NDArray | Iterable[NDArray], bp_ay: NDArray | Iterable[NDArray], bp_ax: NDArray | Iterable[NDArray], bp_bz: NDArray | Iterable[NDArray], bp_by: NDArray | Iterable[NDArray], bp_bx: NDArray | Iterable[NDArray], decon_function: str, num_iter: int, epsilon: float, boundary_correction: bool, zero_padding: PadType, boundary_sigma_a: float, boundary_sigma_b: float, verbose: bool) -> NDArray

deconvolve Joint deconvolution of the 2 input views into a single volume.

Input volumes should be either 3D (ZRC) or 4D (CZRC).

Parameters:

Name Type Description Default
view_a NDArray

input volume from first view (A)

required
view_b NDArray

input volume from second view (B)

required
est_i NDArray | None

initial estimate for deconvolution. If None, will be initialized as (A + B)/2.

required
psf_az NDArray | Iterable[NDArray]

psf for view a in z-direction

required
psf_ay NDArray | Iterable[NDArray]

psf for view a in y-direction

required
psf_ax NDArray | Iterable[NDArray]

psf for view a in x-direction

required
psf_bz NDArray | Iterable[NDArray]

psf for view b in z-direction

required
psf_by NDArray | Iterable[NDArray]

psf for view b in y-direction

required
psf_bx NDArray | Iterable[NDArray]

psf for view b in x-direction

required
bp_az NDArray | Iterable[NDArray]

backprojector for view a in z-direction

required
bp_ay NDArray | Iterable[NDArray]

backprojector for view a in y-direction

required
bp_ax NDArray | Iterable[NDArray]

backprojector for view a in x-direction

required
bp_bz NDArray | Iterable[NDArray]

backprojector for view b in z-direction

required
bp_by NDArray | Iterable[NDArray]

backprojector for view b in y-direction

required
bp_bx NDArray | Iterable[NDArray]

backprojector for view b in x-direction

required
decon_function str

deconvolution function to use. One of "additive", "dispim".

required
num_iter int

number of deconvolution iterations

required
epsilon float

small parameter for preventing division by zero

required
boundary_correction bool

whether or not to do boundary correction

required
zero_padding Optional[PadType]

zero padding for boundary correction

required
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
boundary_sigma_b float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
verbose bool

show progress bar for iterations

required

Raises:

Type Description
ValueError

if input volume isn't 3- (single channel) or 4D (multichannel).

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_sep.py
def deconvolve(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray | None,
    psf_az: NDArray | Iterable[NDArray],
    psf_ay: NDArray | Iterable[NDArray],
    psf_ax: NDArray | Iterable[NDArray],
    psf_bz: NDArray | Iterable[NDArray],
    psf_by: NDArray | Iterable[NDArray],
    psf_bx: NDArray | Iterable[NDArray],
    bp_az: NDArray | Iterable[NDArray],
    bp_ay: NDArray | Iterable[NDArray],
    bp_ax: NDArray | Iterable[NDArray],
    bp_bz: NDArray | Iterable[NDArray],
    bp_by: NDArray | Iterable[NDArray],
    bp_bx: NDArray | Iterable[NDArray],
    decon_function: str,
    num_iter: int,
    epsilon: float,
    boundary_correction: bool,
    zero_padding: PadType,
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    verbose: bool,
) -> NDArray:
    """deconvolve Joint deconvolution of the 2 input views into a single volume.

    Input volumes should be either 3D (ZRC) or 4D (CZRC).

    Args:
        view_a (NDArray): input volume from first view (A)
        view_b (NDArray): input volume from second view (B)
        est_i (NDArray | None): initial estimate for deconvolution. If ``None``, will be initialized as (A + B)/2.
        psf_az (NDArray | Iterable[NDArray]): psf for view a in z-direction
        psf_ay (NDArray | Iterable[NDArray]): psf for view a in y-direction
        psf_ax (NDArray | Iterable[NDArray]): psf for view a in x-direction
        psf_bz (NDArray | Iterable[NDArray]): psf for view b in z-direction
        psf_by (NDArray | Iterable[NDArray]): psf for view b in y-direction
        psf_bx (NDArray | Iterable[NDArray]): psf for view b in x-direction
        bp_az (NDArray | Iterable[NDArray]): backprojector for view a in z-direction
        bp_ay (NDArray | Iterable[NDArray]): backprojector for view a in y-direction
        bp_ax (NDArray | Iterable[NDArray]): backprojector for view a in x-direction
        bp_bz (NDArray | Iterable[NDArray]): backprojector for view b in z-direction
        bp_by (NDArray | Iterable[NDArray]): backprojector for view b in y-direction
        bp_bx (NDArray | Iterable[NDArray]): backprojector for view b in x-direction
        decon_function (str): deconvolution function to use. One of ``"additive", "dispim"``.
        num_iter (int): number of deconvolution iterations
        epsilon (float): small parameter for preventing division by zero
        boundary_correction (bool): whether or not to do boundary correction
        zero_padding (Optional[PadType]): zero padding for boundary correction
        boundary_sigma_a (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        verbose (bool): show progress bar for iterations

    Raises:
        ValueError: if input volume isn't 3- (single channel) or 4D (multichannel).

    Returns:
        NDArray
    """
    if len(view_a.shape) == 4:
        fun = _deconvolve_multichannel
    elif len(view_a.shape) == 3:
        fun = _deconvolve_single_channel
    else:
        raise ValueError("invalid shape, must be 3 or 4D")
    return fun(
        view_a,
        view_b,
        est_i,
        psf_az,
        psf_ay,
        psf_ax,
        psf_bz,
        psf_by,
        psf_bx,
        bp_az,
        bp_ay,
        bp_ax,
        bp_bz,
        bp_by,
        bp_bx,
        decon_function,
        num_iter,
        epsilon,
        boundary_correction,
        zero_padding,
        boundary_sigma_a,
        boundary_sigma_b,
        verbose,
    )
additive_joint_rl(view_a: cupy.ndarray, view_b: cupy.ndarray, est_i: cupy.ndarray, psf_az: cupy.ndarray, psf_ay: cupy.ndarray, psf_ax: cupy.ndarray, psf_bz: cupy.ndarray, psf_by: cupy.ndarray, psf_bx: cupy.ndarray, bp_az: cupy.ndarray, bp_ay: cupy.ndarray, bp_ax: cupy.ndarray, bp_bz: cupy.ndarray, bp_by: cupy.ndarray, bp_bx: cupy.ndarray, num_iter: int, epsilon: float, boundary_correction: bool, zero_padding: PadType, boundary_sigma_a: float, boundary_sigma_b: float, conv_module: Optional[cupy.RawModule], launch_pars: Optional[CuLaunchParameters], verbose: bool) -> cupy.ndarray

additive_joint_rl Additive joint RL deconvolution.

Parameters:

Name Type Description Default
view_a ndarray

input volume for view A

required
view_b ndarray

input volume for view B

required
est_i ndarray

initial estimate

required
psf_az ndarray

PSF for view A, z-direction

required
psf_ay ndarray

PSF for view A, y-direction

required
psf_ax ndarray

PSF for view A, x-direction

required
psf_bz ndarray

PSF for view B, z-direction

required
psf_by ndarray

PSF for view B, y-direction

required
psf_bx ndarray

PSF for view B, x-direction

required
bp_az ndarray

backprojector for view A, z-direction

required
bp_ay ndarray

backprojector for view A, y-direction

required
bp_ax ndarray

backprojector for view A, x-direction

required
bp_bz ndarray

backprojector for view B, z-direction

required
bp_by ndarray

backprojector for view B, y-direction

required
bp_bx ndarray

backprojector for view B, x-direction

required
num_iter int

number of deconvolution iterations

required
epsilon float

small parameter to prevent division by zero

required
boundary_correction bool

whether or not to do boundary correction

required
zero_padding Optional[PadType]

zero-padding for boundary correction

required
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
boundary_sigma_b float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
conv_module Optional[RawModule]

cupy.RawModule that implements separable convolution for appropriate kernel. If None, will be generated on the fly.

required
launch_pars Optional[CuLaunchParameters]

kernel launch parameters. If None, will be computed for the input volume.

required
verbose bool

description

required

Returns:

Type Description
ndarray

cupy.ndarray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_sep.py
def additive_joint_rl(
    view_a: cupy.ndarray,
    view_b: cupy.ndarray,
    est_i: cupy.ndarray,
    psf_az: cupy.ndarray,
    psf_ay: cupy.ndarray,
    psf_ax: cupy.ndarray,
    psf_bz: cupy.ndarray,
    psf_by: cupy.ndarray,
    psf_bx: cupy.ndarray,
    bp_az: cupy.ndarray,
    bp_ay: cupy.ndarray,
    bp_ax: cupy.ndarray,
    bp_bz: cupy.ndarray,
    bp_by: cupy.ndarray,
    bp_bx: cupy.ndarray,
    num_iter: int,
    epsilon: float,
    boundary_correction: bool,
    zero_padding: PadType,
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    conv_module: Optional[cupy.RawModule],
    launch_pars: Optional[CuLaunchParameters],
    verbose: bool,
) -> cupy.ndarray:
    """additive_joint_rl Additive joint RL deconvolution.

    Args:
        view_a (cupy.ndarray): input volume for view A
        view_b (cupy.ndarray): input volume for view B
        est_i (cupy.ndarray): initial estimate
        psf_az (cupy.ndarray): PSF for view A, z-direction
        psf_ay (cupy.ndarray): PSF for view A, y-direction
        psf_ax (cupy.ndarray): PSF for view A, x-direction
        psf_bz (cupy.ndarray): PSF for view B, z-direction
        psf_by (cupy.ndarray): PSF for view B, y-direction
        psf_bx (cupy.ndarray): PSF for view B, x-direction
        bp_az (cupy.ndarray): backprojector for view A, z-direction
        bp_ay (cupy.ndarray): backprojector for view A, y-direction
        bp_ax (cupy.ndarray): backprojector for view A, x-direction
        bp_bz (cupy.ndarray): backprojector for view B, z-direction
        bp_by (cupy.ndarray): backprojector for view B, y-direction
        bp_bx (cupy.ndarray): backprojector for view B, x-direction
        num_iter (int): number of deconvolution iterations
        epsilon (float): small parameter to prevent division by zero
        boundary_correction (bool): whether or not to do boundary correction
        zero_padding (Optional[PadType]): zero-padding for boundary correction
        boundary_sigma_a (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        conv_module (Optional[cupy.RawModule]): ``cupy.RawModule`` that implements separable convolution for appropriate kernel. If ``None``, will be generated on the fly.
        launch_pars (Optional[CuLaunchParameters]): kernel launch parameters. If ``None``, will be computed for the input volume.
        verbose (bool): _description_

    Returns:
        cupy.ndarray
    """
    # compile the convolution module
    if conv_module is None:
        conv_module = make_conv_module(len(psf_az) // 2)
    if launch_pars is None:
        launch_pars = calc_launch_params(len(psf_az) // 2, view_a.shape)
    if boundary_correction:
        # determine zero padding
        if zero_padding is None:
            pad = tuple([(d // 2, d // 2) for d in view_a.shape])
        else:
            if isinstance(zero_padding, int):
                pad = tuple([(zero_padding, zero_padding) for _ in view_a.shape])
            else:
                assert len(zero_padding) == len(view_a.shape), (
                    "zero-padding must be specified for all dimensions of input"
                )
                pad = tuple([(p, p) for p in zero_padding])
        return _additive_joint_rl_boundcorr(
            view_a,
            view_b,
            est_i,
            psf_az,
            psf_ay,
            psf_ax,
            psf_bz,
            psf_by,
            psf_bx,
            bp_az,
            bp_ay,
            bp_ax,
            bp_bz,
            bp_by,
            bp_bx,
            num_iter,
            epsilon,
            pad,
            boundary_sigma_a,
            boundary_sigma_b,
            conv_module,
            launch_pars,
            verbose,
        )
    else:
        return _additive_joint_rl_uncorr(
            view_a,
            view_b,
            est_i,
            psf_az,
            psf_ay,
            psf_ax,
            psf_bz,
            psf_by,
            psf_bx,
            bp_az,
            bp_ay,
            bp_ax,
            bp_bz,
            bp_by,
            bp_bx,
            num_iter,
            epsilon,
            conv_module,
            launch_pars,
            False,
        )
joint_rl_dispim(view_a: cupy.ndarray, view_b: cupy.ndarray, est_i: cupy.ndarray, psf_az: cupy.ndarray, psf_ay: cupy.ndarray, psf_ax: cupy.ndarray, psf_bz: cupy.ndarray, psf_by: cupy.ndarray, psf_bx: cupy.ndarray, bp_az: cupy.ndarray, bp_ay: cupy.ndarray, bp_ax: cupy.ndarray, bp_bz: cupy.ndarray, bp_by: cupy.ndarray, bp_bx: cupy.ndarray, num_iter: int, epsilon: float, boundary_correction: bool, zero_padding: PadType, boundary_sigma_a: float, boundary_sigma_b: float, conv_module: Optional[cupy.RawModule], launch_pars: Optional[CuLaunchParameters], verbose: bool) -> cupy.ndarray

Joint deconvolution specifically for diSPIM volumes.

Parameters:

Name Type Description Default
view_a ndarray

input volume for view A

required
view_b ndarray

input volume for view B

required
est_i ndarray

initial estimate

required
psf_az ndarray

PSF for view A, z-direction

required
psf_ay ndarray

PSF for view A, y-direction

required
psf_ax ndarray

PSF for view A, x-direction

required
psf_bz ndarray

PSF for view B, z-direction

required
psf_by ndarray

PSF for view B, y-direction

required
psf_bx ndarray

PSF for view B, x-direction

required
bp_az ndarray

backprojector for view A, z-direction

required
bp_ay ndarray

backprojector for view A, y-direction

required
bp_ax ndarray

backprojector for view A, x-direction

required
bp_bz ndarray

backprojector for view B, z-direction

required
bp_by ndarray

backprojector for view B, y-direction

required
bp_bx ndarray

backprojector for view B, x-direction

required
num_iter int

number of deconvolution iterations

required
epsilon float

small parameter to prevent division by zero

required
boundary_correction bool

whether or not to do boundary correction

required
zero_padding Optional[PadType]

zero-padding for boundary correction

required
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
boundary_sigma_b float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
conv_module Optional[RawModule]

cupy.RawModule that implements separable convolution for appropriate kernel. If None, will be generated on the fly.

required
launch_pars Optional[CuLaunchParameters]

kernel launch parameters. If None, will be computed for the input volume.

required
verbose bool

description

required

Returns:

Type Description
ndarray

cupy.ndarray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_sep.py
def joint_rl_dispim(
    view_a: cupy.ndarray,
    view_b: cupy.ndarray,
    est_i: cupy.ndarray,
    psf_az: cupy.ndarray,
    psf_ay: cupy.ndarray,
    psf_ax: cupy.ndarray,
    psf_bz: cupy.ndarray,
    psf_by: cupy.ndarray,
    psf_bx: cupy.ndarray,
    bp_az: cupy.ndarray,
    bp_ay: cupy.ndarray,
    bp_ax: cupy.ndarray,
    bp_bz: cupy.ndarray,
    bp_by: cupy.ndarray,
    bp_bx: cupy.ndarray,
    num_iter: int,
    epsilon: float,
    boundary_correction: bool,
    zero_padding: PadType,
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    conv_module: Optional[cupy.RawModule],
    launch_pars: Optional[CuLaunchParameters],
    verbose: bool,
) -> cupy.ndarray:
    """Joint deconvolution specifically for diSPIM volumes.

    Args:
        view_a (cupy.ndarray): input volume for view A
        view_b (cupy.ndarray): input volume for view B
        est_i (cupy.ndarray): initial estimate
        psf_az (cupy.ndarray): PSF for view A, z-direction
        psf_ay (cupy.ndarray): PSF for view A, y-direction
        psf_ax (cupy.ndarray): PSF for view A, x-direction
        psf_bz (cupy.ndarray): PSF for view B, z-direction
        psf_by (cupy.ndarray): PSF for view B, y-direction
        psf_bx (cupy.ndarray): PSF for view B, x-direction
        bp_az (cupy.ndarray): backprojector for view A, z-direction
        bp_ay (cupy.ndarray): backprojector for view A, y-direction
        bp_ax (cupy.ndarray): backprojector for view A, x-direction
        bp_bz (cupy.ndarray): backprojector for view B, z-direction
        bp_by (cupy.ndarray): backprojector for view B, y-direction
        bp_bx (cupy.ndarray): backprojector for view B, x-direction
        num_iter (int): number of deconvolution iterations
        epsilon (float): small parameter to prevent division by zero
        boundary_correction (bool): whether or not to do boundary correction
        zero_padding (Optional[PadType]): zero-padding for boundary correction
        boundary_sigma_a (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        conv_module (Optional[cupy.RawModule]): ``cupy.RawModule`` that implements separable convolution for appropriate kernel. If ``None``, will be generated on the fly.
        launch_pars (Optional[CuLaunchParameters]): kernel launch parameters. If ``None``, will be computed for the input volume.
        verbose (bool): _description_

    Returns:
        cupy.ndarray
    """
    # compile the convolution module
    if conv_module is None:
        conv_module = make_conv_module(len(psf_az) // 2)
    if launch_pars is None:
        launch_pars = calc_launch_params(len(psf_az) // 2, view_a.shape)
    # compile the
    if boundary_correction:
        warn("not implemented, falling back to additive algorithm")
        return _joint_rl_dispim_corr(
            view_a,
            view_b,
            est_i,
            psf_az,
            psf_ay,
            psf_ax,
            psf_bz,
            psf_by,
            psf_bx,
            bp_az,
            bp_ay,
            bp_ax,
            bp_bz,
            bp_by,
            bp_bx,
            num_iter,
            epsilon,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            conv_module,
            launch_pars,
            verbose,
        )
    else:
        return _joint_rl_dispim_uncorr(
            view_a,
            view_b,
            est_i,
            psf_az,
            psf_ay,
            psf_ax,
            psf_bz,
            psf_by,
            psf_bx,
            bp_az,
            bp_ay,
            bp_ax,
            bp_bz,
            bp_by,
            bp_bx,
            num_iter,
            epsilon,
            conv_module,
            launch_pars,
            verbose,
        )
deconvolve_chunkwise(view_a: zarr.Array, view_b: zarr.Array, out: zarr.Array, chunk_size: int | Tuple[int, int, int], overlap: int | Tuple[int, int, int], sigma_az: float, sigma_ay: float, sigma_ax: float, sigma_bz: float, sigma_by: float, sigma_bx: float, kernel_radius_z: int, kernel_radius_y: int, kernel_radius_x: int, decon_function: str, num_iter: int, epsilon: float, boundary_correction: bool, zero_padding: PadType, boundary_sigma_a: float, boundary_sigma_b: float, verbose: bool)

Joint deconvolution of the input views A & B, done chunk-by-chunk.

Parameters:

Name Type Description Default
view_a Array

zarr array for view A data

required
view_b Array

zarr array for view B data

required
out Array

zarr array to place deconvolved, output data into

required
chunk_size int | Tuple[int, int, int]

size of chunks

required
overlap int | Tuple[int, int, int]

amount of overlap between chunks

required
sigma_az float

standard deviation of PSF (assumed Gaussian) for view A in z-direction

required
sigma_ay float

standard deviation of PSF (assumed Gaussian) for view A in y-direction

required
sigma_ax float

standard deviation of PSF (assumed Gaussian) for view A in x-direction

required
sigma_bz float

standard deviation of PSF (assumed Gaussian) for view B in z-direction

required
sigma_by float

standard deviation of PSF (assumed Gaussian) for view B in y-direction

required
sigma_bx float

standard deviation of PSF (assumed Gaussian) for view B in x-direction

required
kernel_radius_z int

radius of PSF kernel in z-direction

required
kernel_radius_y int

radius of PSF kernel in y-direction

required
kernel_radius_x int

radius of PSF kernel in x-direction

required
decon_function str

deconvolution function to use. Either "dispim","additive".

required
num_iter int

number of deconvolution iterations.

required
epsilon float

small parameter to prevent division by zero

required
boundary_correction bool

whether or not to do boundary correction

required
zero_padding Optional[PadType]

zero-padding for boundary correction

required
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
boundary_sigma_b float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
verbose bool

display a progress bar showing how many chunks have been/will be computed.

required
Source code in packages/pyspim/src/pyspim/decon/rl/dualview_sep.py
def deconvolve_chunkwise(
    view_a: zarr.Array,
    view_b: zarr.Array,
    out: zarr.Array,
    chunk_size: int | Tuple[int, int, int],
    overlap: int | Tuple[int, int, int],
    sigma_az: float,
    sigma_ay: float,
    sigma_ax: float,
    sigma_bz: float,
    sigma_by: float,
    sigma_bx: float,
    kernel_radius_z: int,
    kernel_radius_y: int,
    kernel_radius_x: int,
    decon_function: str,
    num_iter: int,
    epsilon: float,
    boundary_correction: bool,
    zero_padding: PadType,
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    verbose: bool,
):
    """Joint deconvolution of the input views A & B, done chunk-by-chunk.

    Args:
        view_a (zarr.Array): zarr array for view A data
        view_b (zarr.Array): zarr array for view B data
        out (zarr.Array): zarr array to place deconvolved, output data into
        chunk_size (int | Tuple[int,int,int]): size of chunks
        overlap (int | Tuple[int,int,int]): amount of overlap between chunks
        sigma_az (float): standard deviation of PSF (assumed Gaussian) for view A in z-direction
        sigma_ay (float): standard deviation of PSF (assumed Gaussian) for view A in y-direction
        sigma_ax (float): standard deviation of PSF (assumed Gaussian) for view A in x-direction
        sigma_bz (float): standard deviation of PSF (assumed Gaussian) for view B in z-direction
        sigma_by (float): standard deviation of PSF (assumed Gaussian) for view B in y-direction
        sigma_bx (float): standard deviation of PSF (assumed Gaussian) for view B in x-direction
        kernel_radius_z (int): radius of PSF kernel in z-direction
        kernel_radius_y (int): radius of PSF kernel in y-direction
        kernel_radius_x (int): radius of PSF kernel in x-direction
        decon_function (str): deconvolution function to use. Either ``"dispim","additive"``.
        num_iter (int): number of deconvolution iterations.
        epsilon (float): small parameter to prevent division by zero
        boundary_correction (bool): whether or not to do boundary correction
        zero_padding (Optional[PadType]): zero-padding for boundary correction
        boundary_sigma_a (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        verbose (bool): display a progress bar showing how many chunks have been/will be computed.
    """
    if len(view_a.shape) == 4:
        channel_slice = slice(None)
        ch_a, z_a, r_a, c_a = view_a.shape
        ch_b, z_b, r_b, c_b = view_b.shape
        assert ch_a == ch_b, "input volumes must have same # channels"
    else:
        channel_slice = None
        z_a, r_a, c_a = view_a.shape
        z_b, r_b, c_b = view_b.shape
    assert all([a == b for a, b in zip([z_a, r_a, c_a], [z_b, r_b, c_b])]), (
        "input volumes must be same shape"
    )
    chunks = calculate_conv_chunks(z_a, r_a, c_a, chunk_size, overlap, channel_slice)
    n_gpu = cupy.cuda.runtime.getDeviceCount()
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=n_gpu, mp_context=multiprocessing.get_context("spawn")
    ) as executor, multiprocessing.Manager() as manager:
        gpu_queue = manager.Queue()
        for gpu_id in range(n_gpu):
            gpu_queue.put(gpu_id)
        fun = partial(
            _decon_chunk,
            out,
            view_a,
            view_b,
            sigma_az,
            sigma_ay,
            sigma_ax,
            sigma_bz,
            sigma_by,
            sigma_bx,
            kernel_radius_z,
            kernel_radius_y,
            kernel_radius_x,
            decon_function,
            num_iter,
            epsilon,
            boundary_correction,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
        )
        futures = []
        for _, chunk in chunks.items():
            future = executor.submit(fun, chunk, gpu_queue)
            futures.append(future)
        if verbose:
            pbar = tqdm(total=len(futures), desc="Deconvolving Chunks")
        else:
            pbar = nullcontext
        with pbar:
            for future in concurrent.futures.as_completed(futures):
                val = future.result()
                if verbose:
                    pbar.update(1)
                    pbar.set_postfix({"ind": val})
singleview_fft

Single-view Richardson Lucy deconvolution using FFT-space convolution.

References

[1] Bertero & Boccacci, "A simple method...", doi:10.1051/0004-6361:20052717

Functions
richardson_lucy(image: NDArray, psf: NDArray, bp: NDArray, num_iter: int = 50, boundary_correction: bool = True, epsilon: Optional[float] = None, boundary_padding: Optional[int] = None, boundary_sigma: float = 0.01, verbose: bool = False) -> NDArray

richardson_lucy Richardson-Lucy deconvolution.

Parameters:

Name Type Description Default
image NDArray

input volume to be deconvolved

required
psf NDArray

point spread function

required
bp NDArray

back projector for deconvolution

required
num_iter int

number of iterations. Defaults to 50.

50
boundary_correction bool

whether or not to do boundary correction. Defaults to True.

True
epsilon Optional[float]

small parameter to prevent division by zero. Defaults to None.

None
boundary_padding Optional[int]

zero-padding for boundary correction. Defaults to None.

None
boundary_sigma float

significance level for pixels when doing boundary correction. Defaults to 1e-2.

0.01
verbose bool

show a progress bar. Defaults to False.

False

Raises:

Type Description
NotImplementedError

uncorrected version of RL deconvolution.

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/singleview_fft.py
def richardson_lucy(
    image: NDArray,
    psf: NDArray,
    bp: NDArray,
    num_iter: int = 50,
    boundary_correction: bool = True,
    epsilon: Optional[float] = None,
    boundary_padding: Optional[int] = None,
    boundary_sigma: float = 1e-2,
    verbose: bool = False,
) -> NDArray:
    """richardson_lucy Richardson-Lucy deconvolution.

    Args:
        image (NDArray): input volume to be deconvolved
        psf (NDArray): point spread function
        bp (NDArray): back projector for deconvolution
        num_iter (int, optional): number of iterations. Defaults to 50.
        boundary_correction (bool, optional): whether or not to do boundary correction. Defaults to True.
        epsilon (Optional[float], optional): small parameter to prevent division by zero. Defaults to None.
        boundary_padding (Optional[int], optional): zero-padding for boundary correction. Defaults to None.
        boundary_sigma (float, optional): significance level for pixels when doing boundary correction. Defaults to 1e-2.
        verbose (bool, optional): show a progress bar. Defaults to False.

    Raises:
        NotImplementedError: uncorrected version of RL deconvolution.

    Returns:
        NDArray
    """
    if boundary_correction:
        return richardson_lucy_boundcorr(
            image,
            psf,
            bp,
            num_iter,
            epsilon,
            boundary_padding,
            boundary_sigma,
            init_constant=False,
            verbose=verbose,
        )
    else:
        raise NotImplementedError("TODO")
richardson_lucy_boundcorr(image: NDArray, psf: NDArray, bp: Optional[NDArray] = None, num_iter: int = 50, epsilon: float = 0.0001, zero_padding: Optional[PadType] = None, boundary_sigma: float = 0.01, init_constant: bool = False, verbose: bool = False) -> NDArray

richardson_lucy_boundcorr Richardson-Lucy deconvolution with boundary correction described in [1], this method reduces Gibbs oscillations that occur due to boundary effects when doing RL deconvolution.

Parameters:

Name Type Description Default
image NDArray

input volume to be deconvolved

required
psf NDArray

point spread function

required
bp Optional[NDArray]

backprojector. Defaults to None, if so will used mirrored PSF.

None
num_iter int

number of iterations. Defaults to 50.

50
epsilon float

small parameter to prevent division by zero. Defaults to 1e-4.

0.0001
zero_padding Optional[PadType]

amount of zero-padding. Defaults to None.

None
boundary_sigma float

significance level for pixels when doing boundary correction. Defaults to 1e-2.

0.01
init_constant bool

whether to make the inital guess a constant array, (True) or (A+B)/2 (False). Defaults to False.

False
verbose bool

show progress bar. Defaults to False.

False

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/singleview_fft.py
def richardson_lucy_boundcorr(
    image: NDArray,
    psf: NDArray,
    bp: Optional[NDArray] = None,
    num_iter: int = 50,
    epsilon: float = 1e-4,
    zero_padding: Optional[PadType] = None,
    boundary_sigma: float = 1e-2,
    init_constant: bool = False,
    verbose: bool = False,
) -> NDArray:
    """richardson_lucy_boundcorr Richardson-Lucy deconvolution with boundary correction described in [1], this method reduces Gibbs oscillations that occur due to boundary effects when doing RL deconvolution.

    Args:
        image (NDArray): input volume to be deconvolved
        psf (NDArray): point spread function
        bp (Optional[NDArray], optional): backprojector. Defaults to None, if so will used mirrored PSF.
        num_iter (int, optional): number of iterations. Defaults to 50.
        epsilon (float, optional): small parameter to prevent division by zero. Defaults to 1e-4.
        zero_padding (Optional[PadType], optional): amount of zero-padding. Defaults to None.
        boundary_sigma (float, optional): significance level for pixels when doing boundary correction. Defaults to 1e-2.
        init_constant (bool, optional): whether to make the inital guess a constant array, (``True``) or (A+B)/2 (``False``). Defaults to False.
        verbose (bool, optional): show progress bar. Defaults to False.

    Returns:
        NDArray
    """
    assert len(image.shape) == 2 or len(image.shape) == 3, (
        "RL deconvolution only implemented for 2- and 3d inputs"
    )
    xp = cupy.get_array_module(image)
    # make sure all inputs are floats
    float_type = supported_float_type(image.dtype)
    image = image.astype(float_type, copy=False)
    psf = psf.astype(float_type, copy=False)
    # default back-projector is mirrored PSF
    if bp is None:
        bp = xp.ascontiguousarray(psf[::-1, ::-1, ::-1])
    # figure out padding
    if zero_padding is None:  # use default value from paper (2N)
        # default is to pad such that an NxN image is 2Nx2N
        pad = tuple([(d // 2, d // 2) for d in image.shape])
    else:
        if isinstance(zero_padding, int):
            pad = tuple([(zero_padding, zero_padding) for _ in image.shape])
        else:
            assert len(zero_padding) == len(image.shape), (
                "zero padding must be specified for all dimensions of input"
            )
            pad = tuple([(p, p) for p in zero_padding])
    # define utility function for doing convolution
    # NOTE: by default `fftconvolve` wants the larger array as first arg
    # but for clarity, I like specifying the PSF first
    if xp == cupy:
        conv = lambda k, i: fftconv_gpu(i, k, mode="same")
    else:
        conv = lambda k, i: fftconv_cpu(i, k, mode="same")
    # generate the mask that we'll use to determine $\overbar{alpha}$
    mask = xp.ones_like(image)
    # pad the image and the mask with zeros
    image = xp.pad(image, padding=pad, mode="constant", constant_values=0)
    mask = xp.pad(mask, padding=pad, mode="constant", constant_values=0)
    # calculate the window, $\overbar{w}(n')
    alpha = conv(bp, mask)
    window = xp.where(alpha > boundary_sigma, 1.0 / alpha, 0.0)
    # initialization conditions
    # if *not* init_constant, just start with the data
    if init_constant:
        est = xp.ones_like(image) * xp.mean(image)
    else:
        est = image
    # Lucy-Richardson iterations
    for _ in trange(num_iter) if verbose else range(num_iter):
        con = conv(psf, est)
        est = xp.multiply(
            xp.multiply(window, est),  # conv(window, est)
            conv(bp, xp.where(con < epsilon, 0, image / con)),
        )
    # trim out padding
    if len(est) == 2:
        return est[pad[0][0] : -pad[0][1], pad[1][0] : -pad[1][1]]
    else:
        return est[
            pad[0][0] : -pad[0][1], pad[1][0] : -pad[1][1], pad[2][0] : -pad[2][1]
        ]
singleview_sep

Separable deconvolution for single view volumes.

TODO: more detail here.

Functions
deconvolve(volume: NDArray, psf_z: NDArray, psf_y: NDArray, psf_x: NDArray, bp_z: NDArray, bp_y: NDArray, bp_x: NDArray, num_iter: int, boundary_correction: bool, epsilon: Optional[float], init_constant: bool, boundary_padding: Optional[int], boundary_sigma: float, verbose: bool) -> NDArray

deconvolve do deconvolution of volume.

Parameters:

Name Type Description Default
volume NDArray

input volume to be deconvolved

required
psf_z NDArray

PSF kernel in z-direction

required
psf_y NDArray

PSF kernel in y-direction

required
psf_x NDArray

PSF kernel in x-direction

required
bp_z NDArray

backprojector kernel in z-direction

required
bp_y NDArray

backprojector kernel in y-direction

required
bp_x NDArray

backprojector kernel in x-direction

required
num_iter int

number of iterations to do deconvolution for

required
boundary_correction bool

whether or not to do boundary correction

required
epsilon Optional[float]

small parameter to prevent division by zero

required
init_constant bool

initialize deconvolution with a constant array (if False, will use input volume)

required
boundary_padding Optional[int]

zero-padding for boundary correction

required
boundary_sigma float

significance value for pixels when doing boundary correction

required
verbose bool

show progress bar

required

Raises:

Type Description
NotImplementedError

boundary correction

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/singleview_sep.py
def deconvolve(
    volume: NDArray,
    psf_z: NDArray,
    psf_y: NDArray,
    psf_x: NDArray,
    bp_z: NDArray,
    bp_y: NDArray,
    bp_x: NDArray,
    num_iter: int,
    boundary_correction: bool,
    epsilon: Optional[float],
    init_constant: bool,
    boundary_padding: Optional[int],
    boundary_sigma: float,
    verbose: bool,
) -> NDArray:
    """deconvolve do deconvolution of ``volume``.

    Args:
        volume (NDArray): input volume to be deconvolved
        psf_z (NDArray): PSF kernel in z-direction
        psf_y (NDArray): PSF kernel in y-direction
        psf_x (NDArray): PSF kernel in x-direction
        bp_z (NDArray): backprojector kernel in z-direction
        bp_y (NDArray): backprojector kernel in y-direction
        bp_x (NDArray): backprojector kernel in x-direction
        num_iter (int): number of iterations to do deconvolution for
        boundary_correction (bool): whether or not to do boundary correction
        epsilon (Optional[float]): small parameter to prevent division by zero
        init_constant (bool): initialize deconvolution with a constant array (if ``False``, will use input volume)
        boundary_padding (Optional[int]): zero-padding for boundary correction
        boundary_sigma (float): significance value for pixels when doing boundary correction
        verbose (bool): show progress bar

    Raises:
        NotImplementedError: boundary correction

    Returns:
        NDArray
    """
    volume = cupy.asarray(volume, dtype=cupy.float32, order="F")
    psf_z = cupy.asarray(psf_z, dtype=cupy.float32)
    psf_y = cupy.asarray(psf_y, dtype=cupy.float32)
    psf_x = cupy.asarray(psf_x, dtype=cupy.float32)
    bp_z = cupy.asarray(bp_z, dtype=cupy.float32)
    bp_y = cupy.asarray(bp_y, dtype=cupy.float32)
    bp_x = cupy.asarray(bp_x, dtype=cupy.float32)
    if init_constant:
        raise NotImplementedError("todo")
    else:
        est_i = volume
    if boundary_correction:
        raise NotImplementedError("todo")
    else:
        return _deconvolve_uncorrected(
            volume, psf_z, psf_y, psf_x, bp_z, bp_y, bp_x, num_iter, epsilon, verbose
        )

deskew

Functions
Modules
affine

Deskew by shear-warp algorithm.

Deskew the input volume using an affine transformation. Should return the same result as dispim but using shear-warp algorithm and (possibly higher-order) interpolation instead of texture-based interpolation.

Functions
fwd_deskew_matrix(pixel_size: float, step_size: float, direction: int, zed: int, row: int, col: int) -> numpy.ndarray

Forward deskewing matrix

Parameters:

Name Type Description Default
pixel_size float

size of pixels, in real space

required
step_size float

spacing between sheets, in real units

required
direction int

scan direction (±1)

required
z

input volume size in z

required
r

input volume size in r

required
c

input volume size in c

required

Returns:

Type Description
ndarray

numpy.ndarray

Source code in packages/pyspim/src/pyspim/deskew/affine.py
def fwd_deskew_matrix(
    pixel_size: float, step_size: float, direction: int,
    zed: int, row: int, col: int
) -> numpy.ndarray:
    """Forward deskewing matrix

    Args:
        pixel_size (float): size of pixels, in real space
        step_size (float): spacing between sheets, in real units
        direction (int): scan direction (+/-1)
        z: input volume size in z
        r: input volume size in r
        c: input volume size in c

    Returns:
        numpy.ndarray
    """
    sq2 = sqrt(2)
    a = 1 / sqrt(2)
    r = step_size / pixel_size
    u_c = (col - 1) / 2
    v_c = (row - 1) / 2
    w_c = (zed - 1) / 2

    if direction > 0:
        return numpy.asarray([
            [ a, 0, 0, -a * u_c],
            [ 0, 1, 0, -v_c],
            [ a, 0, r * sq2, -(a * u_c + r * sq2 * w_c)],
            [ 0, 0, 0, 1],
        ])
    else:
        return numpy.asarray([
            [-a, 0, 0, a * u_c],
            [ 0, 1, 0, -v_c],
            [ a, 0, -r * sq2, -(a * u_c - r * sq2 * w_c)],
            [ 0, 0, 0, 1],
        ])
output_shape(z: int, r: int, c: int, pixel_size: float, step_size: float, direction: int) -> Tuple[int, int, int]

Compute output shape of deskewing transform.

Parameters:

Name Type Description Default
z int

shape of input volume, z-direction

required
r int

shape of input volume, r-direction (# rows)

required
c int

shape of input volume, c-direction (# cols)

required
pixel_size float

size of pixels, in real units

required
step_size float

spacing between steps, in real units

required
direction int

scan direction (± 1)

required
auto_crop bool

automatically crop the output to account for rotation of B-view

required

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]

Source code in packages/pyspim/src/pyspim/deskew/affine.py
def output_shape(
    z: int,
    r: int,
    c: int,
    pixel_size: float,
    step_size: float,
    direction: int,
) -> Tuple[int, int, int]:
    """Compute output shape of deskewing transform.

    Args:
        z (int): shape of input volume, z-direction
        r (int): shape of input volume, r-direction (# rows)
        c (int): shape of input volume, c-direction (# cols)
        pixel_size (float): size of pixels, in real units
        step_size (float): spacing between steps, in real units
        direction (int): scan direction (+/- 1)
        auto_crop (bool): automatically crop the output to account for rotation of B-view

    Returns:
        Tuple[int,int,int]
    """
    full_shp = output_shape_for_transform(
        fwd_deskew_matrix(pixel_size, step_size, direction, z, r, c), 
        (z, r, c)
    )
    return full_shp
deskewing_transform(z: int, r: int, c: int, pixel_size: float, step_size: float, direction: int) -> Tuple[NDArray, Tuple[int, int, int]]

Compute the deskewing transform for the input volume.

Parameters:

Name Type Description Default
z int

size of input volume, z-direction (pixels)

required
r int

size of input volume, r-direction (pixels, # rows)

required
c int

size of input volume, c-direction (pixels, # cols)

required
pixel_size float

size of pixels, in real units

required
step_size float

step size, in real units

required
direction int

scan direction (±1)

required
auto_crop bool

auto-crop outputs to account for rotation of B into coordinate system of A.

required
rotation_thetas Tuple[int, int, int] | None

rotation angles for each axis. If None, no rotation is done.

required

Returns:

Type Description
Tuple[NDArray, Tuple[int, int, int]]

numpy.ndarray, List[int]: deskewing transform & the output shape

Source code in packages/pyspim/src/pyspim/deskew/affine.py
def deskewing_transform(
    z: int,
    r: int,
    c: int,
    pixel_size: float,
    step_size: float,
    direction: int,
) -> Tuple[NDArray, Tuple[int,int,int]]:
    """Compute the deskewing transform for the input volume.

    Args:
        z (int): size of input volume, z-direction (pixels)
        r (int): size of input volume, r-direction (pixels, # rows)
        c (int): size of input volume, c-direction (pixels, # cols)
        pixel_size (float): size of pixels, in real units
        step_size (float): step size, in real units
        direction (int): scan direction (+/-1)
        auto_crop (bool): auto-crop outputs to account for rotation of B into coordinate system of A.
        rotation_thetas (Tuple[int,int,int] | None): rotation angles for each axis. If ``None``, no rotation is done.

    Returns:
        numpy.ndarray, List[int]: deskewing transform & the output shape
    """
    T = fwd_deskew_matrix(pixel_size, step_size, direction, z, r, c)
    shp = output_shape_for_transform(T, (z, r, c))

    # Translate output volume coordinates (0 to shape-1) to origin-centered coordinates
    u_c_out = (shp[2] - 1) / 2
    v_c_out = (shp[1] - 1) / 2
    w_c_out = (shp[0] - 1) / 2

    t = translation_matrix(-u_c_out, -v_c_out, -w_c_out)
    D = numpy.linalg.inv(T) @ t
    return D, shp
deskew_stage_scan(im: cupy.ndarray, pixel_size: float, step_size: float, direction: int, interp_method: str, preserve_dtype: bool, block_size: Tuple[int, int, int]) -> cupy.ndarray

Deskew the input volume using the shear-warp algorithm.

Parameters:

Name Type Description Default
im ndarray

input volume to be deskewed.

required
pixel_size float

pixel size, in real units

required
step_size float

step size, in real units

required
direction int

scan direction (± 1)

required
rotation_thetas Tuple[float, float, float] | None

rotation angles for each axis. If None, no rotation is done.

required
interp_method str

interpolation method to use

required
auto_crop bool

auto-crop output volumes to account for rotation of view B

required
preserve_dtype bool

make output volume have same datatype as input volume (probably uint16).

required
block_size Tuple[int, int, int]

size of blocks for CUDA kernel launch.

required

Returns:

Type Description
ndarray

cupy.ndarray

Source code in packages/pyspim/src/pyspim/deskew/affine.py
def deskew_stage_scan(
    im: cupy.ndarray,
    pixel_size: float,
    step_size: float,
    direction: int,
    interp_method: str,
    preserve_dtype: bool,
    block_size: Tuple[int, int, int],
) -> cupy.ndarray:
    """Deskew the input volume using the shear-warp algorithm.

    Args:
        im (cupy.ndarray): input volume to be deskewed.
        pixel_size (float): pixel size, in real units
        step_size (float): step size, in real units
        direction (int): scan direction (+/- 1)
        rotation_thetas (Tuple[float,float,float] | None): rotation angles for each axis. If ``None``, no rotation is done.
        interp_method (str): interpolation method to use
        auto_crop (bool): auto-crop output volumes to account for rotation of view B
        preserve_dtype (bool): make output volume have same datatype as input volume (probably uint16).
        block_size (Tuple[int,int,int]): size of blocks for CUDA kernel launch.

    Returns:
        cupy.ndarray
    """
    T_inv, out_shape = deskewing_transform(
        im.shape[0],
        im.shape[1],
        im.shape[2],
        pixel_size,
        step_size,
        direction,
    )
    T_inv = cupy.asarray(T_inv).astype(cupy.float32)
    dsk = transform(
        im, T_inv, interp_method, preserve_dtype, 
        out_shape, *block_size
    )
    return dsk.swapaxes(0, 2)
dispim

Deskew stage-scanned volume into "normal" diSPIM coordinate system (see [1], esp. Figure 1).

We mimic the original Shroff lab Fiji plugin, but use a CUDA kernel for speed. Note that interpolation is done using CUDA texture memory.

References

[1] Kumar, et. al, "Using Stage- and Slit-...", doi: 10.1086/689589

Functions
deskew_stage_scan(im: NDArray, pixel_size: float, step_size: float, direction: int, **kwargs) -> NDArray

Deskew the input volume, im.

Parameters:

Name Type Description Default
im NDArray

input volume

required
pixel_size float

pixel size, in real (space) units

required
step_size float

step size, in real (space) units

required
direction int

±1, direction of objective movement rel. to x-axis.

required
**kwargs

block_size will be passed as kernel launch parameter.

{}

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/deskew/dispim.py
def deskew_stage_scan(
    im: NDArray, pixel_size: float, step_size: float, direction: int, **kwargs
) -> NDArray:
    """Deskew the input volume, ``im``.

    Args:
        im (NDArray): input volume
        pixel_size (float): pixel size, in real (space) units
        step_size (float): step size, in real (space) units
        direction (int): +/-1, direction of objective movement rel. to x-axis.
        **kwargs: ``block_size`` will be passed as kernel launch parameter.

    Returns:
        NDArray
    """
    return _deskew_texture(im, pixel_size, step_size, direction, **kwargs)
output_width(depth: int, width: int, step_size: float, pixel_size: float) -> int

output_width Compute output width of deskewed volume

Parameters:

Name Type Description Default
depth int

depth of input (pre-deskewing) volume

required
width int

width of input (pre-deskewing) volume

required
step_size float

step size, in real (space) units

required
pixel_size float

pixel size, in real (space) units

required

Returns:

Type Description
int

int

Source code in packages/pyspim/src/pyspim/deskew/dispim.py
def output_width(depth: int, width: int, step_size: float, pixel_size: float) -> int:
    """output_width Compute output width of deskewed volume

    Args:
        depth (int): depth of input (pre-deskewing) volume
        width (int): width of input (pre-deskewing) volume
        step_size (float): step size, in real (space) units
        pixel_size (float): pixel size, in real (space) units

    Returns:
        int
    """
    return width + round(depth * abs(step_size / pixel_size))
ortho

Deskewing by orthogonal interpolation.

Uses the 'orthogonal interpolation' scheme where interpolation is done orthogonally to the direction of the light sheet. this was originally proposed/implemented in V. Maioli's Ph.D. thesis [1]. Our code is based heavily on the implementation by D. Shepherd's group [2].

References

[1] Vincent Miaioli's PhD Thesis doi: 10.25560/68022 [2] github.com/QI2lab/OPM

Functions
deskew_stage_scan(im: NDArray, pixel_size: float, step_size: float, direction: int, theta: float = math.pi / 4, preserve_dtype: bool = False, stream: cupy.cuda.Stream | None = None) -> NDArray

Deskew stage scan data into xyz coordinate system.

Output format is zyx where z is normal to the coverslip, and x & y are the coverslip, y is along the long axis of the sheet, x along is the scan direction.

Parameters:

Name Type Description Default
im NDArray

input volume to be deskewed

required
pixel_size float

size of pixels, in real (space) units

required
step_size float

real space spacing between sheets

required
direction int

scan direction (±1)

required
theta float

angle of objective w.r.t coverslip (in radians)

pi / 4
preserve_dtype bool

preserve input datatype in output. If False, will return single-precision float.

False

Returns: NDArray

Source code in packages/pyspim/src/pyspim/deskew/ortho.py
def deskew_stage_scan(
    im: NDArray,
    pixel_size: float,
    step_size: float,
    direction: int,
    theta: float = math.pi / 4,
    preserve_dtype: bool = False,
    stream: cupy.cuda.Stream | None = None,
) -> NDArray:
    """Deskew stage scan data into xyz coordinate system.

    Output format is zyx where z is normal to the coverslip, and x & y are the coverslip, y is along the long axis of the sheet, x along is the scan direction.

    Args:
        im (NDArray): input volume to be deskewed
        pixel_size (float): size of pixels, in real (space) units
        step_size (float): real space spacing between sheets
        direction (int): scan direction (+/-1)
        theta (float): angle of objective w.r.t coverslip (in radians)
        preserve_dtype (bool): preserve input datatype in output. If ``False``, will return single-precision float.
    Returns:
        NDArray
    """
    xp = cupy.get_array_module(im)
    if xp == numpy: # type: ignore
        dsk = _deskew_orthogonal_cpu(
            im, pixel_size, step_size, direction, theta, preserve_dtype
        )
        #dsk = _zero_triangle_cpu(dsk, direction) # FIXME: why is this here, not necessary?
    else:
        dsk = _deskew_gpu(im, pixel_size, step_size, direction, theta, 
                          preserve_dtype, stream)
        dsk = _zero_triangle_gpu(dsk, direction)
    return dsk
output_shape(z: int, r: int, c: int, pixel_size: float, step_size: float, theta: float = math.pi / 4) -> Tuple[int, int, int]

output_shape Calculate output shape of deskewing a volume.

Parameters:

Name Type Description Default
z int

size of input volume in z-direction

required
r int

size of input volume in r-direction (# rows)

required
c int

size of input volume in c-direction (# cols)

required
pixel_size float

pixel size, in real units

required
step_size float

step size, in real units

required
theta float

angle of objective w.r.t coverslip. Defaults to math.pi/4.

pi / 4

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]: (n_z, n_y, n_x) shape of deskewed volume

Source code in packages/pyspim/src/pyspim/deskew/ortho.py
def output_shape(
    z: int,
    r: int,
    c: int,
    pixel_size: float,
    step_size: float,
    theta: float = math.pi / 4,
) -> Tuple[int, int, int]:
    """output_shape Calculate output shape of deskewing a volume.

    Args:
        z (int): size of input volume in z-direction
        r (int): size of input volume in r-direction (# rows)
        c (int): size of input volume in c-direction (# cols)
        pixel_size (float): pixel size, in real units
        step_size (float): step size, in real units
        theta (float, optional): angle of objective w.r.t coverslip. Defaults to math.pi/4.

    Returns:
        Tuple[int,int,int]: (n_z, n_y, n_x) shape of deskewed volume
    """
    step_size_lat = step_size / math.cos(theta)
    step_pix = step_size_lat / pixel_size
    # precompute useful trig quantities
    # determine output shape & preallocate
    cos_theta = math.cos(theta)
    sin_theta = math.sin(theta)
    n_x = numpy.int64(numpy.ceil(z * step_pix + c * cos_theta))
    n_y = numpy.int64(r)
    n_z = numpy.int64(numpy.ceil(c * sin_theta))
    return (n_z, n_y, n_x) # type: ignore
shear

Deskew by shear-warp algorithm.

Deskew the input volume using an affine transformation. Should return the same result as dispim but using shear-warp algorithm and (possibly higher-order) interpolation instead of texture-based interpolation.

Functions
inv_deskew_matrix(pixel_size: float, step_size: float, direction: int) -> numpy.ndarray

Inverse deskewing matrix

Parameters:

Name Type Description Default
pixel_size float

size of pixels, in real space

required
step_size float

spacing between sheets, in real units

required
direction int

scan direction (±1)

required

Returns:

Type Description
ndarray

numpy.ndarray

Source code in packages/pyspim/src/pyspim/deskew/shear.py
def inv_deskew_matrix(
    pixel_size: float, step_size: float, direction: int
) -> numpy.ndarray:
    """Inverse deskewing matrix

    Args:
        pixel_size (float): size of pixels, in real space
        step_size (float): spacing between sheets, in real units
        direction (int): scan direction (+/-1)

    Returns:
        numpy.ndarray
    """
    return numpy.asarray(
        [
            [1, 0, direction * step_size / pixel_size, 0],
            [0, 1, 0, 0],
            [0, 0, step_size / pixel_size, 0],
            [0, 0, 0, 1],
        ]
    )
output_shape(z: int, r: int, c: int, pixel_size: float, step_size: float, direction: int, auto_crop: bool) -> Tuple[int, int, int]

Compute output shape of deskewing transform.

Parameters:

Name Type Description Default
z int

shape of input volume, z-direction

required
r int

shape of input volume, r-direction (# rows)

required
c int

shape of input volume, c-direction (# cols)

required
pixel_size float

size of pixels, in real units

required
step_size float

spacing between steps, in real units

required
direction int

scan direction (± 1)

required
auto_crop bool

automatically crop the output to account for rotation of B-view

required

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]

Source code in packages/pyspim/src/pyspim/deskew/shear.py
def output_shape(
    z: int,
    r: int,
    c: int,
    pixel_size: float,
    step_size: float,
    direction: int,
    auto_crop: bool,
) -> Tuple[int, int, int]:
    """Compute output shape of deskewing transform.

    Args:
        z (int): shape of input volume, z-direction
        r (int): shape of input volume, r-direction (# rows)
        c (int): shape of input volume, c-direction (# cols)
        pixel_size (float): size of pixels, in real units
        step_size (float): spacing between steps, in real units
        direction (int): scan direction (+/- 1)
        auto_crop (bool): automatically crop the output to account for rotation of B-view

    Returns:
        Tuple[int,int,int]
    """
    full_shp = output_shape_for_transform(
        inv_deskew_matrix(pixel_size, step_size, direction), (z, r, c)
    )
    if auto_crop:
        return (full_shp[0], full_shp[1], full_shp[0])
    else:
        return full_shp
deskewing_transform(z: int, r: int, c: int, pixel_size: float, step_size: float, direction: int, auto_crop: bool, rotation_thetas: Tuple[float, float, float] | None) -> Tuple[NDArray, Tuple[int, int, int]]

Compute the deskewing transform for the input volume.

Parameters:

Name Type Description Default
z int

size of input volume, z-direction (pixels)

required
r int

size of input volume, r-direction (pixels, # rows)

required
c int

size of input volume, c-direction (pixels, # cols)

required
pixel_size float

size of pixels, in real units

required
step_size float

step size, in real units

required
direction int

scan direction (±1)

required
auto_crop bool

auto-crop outputs to account for rotation of B into coordinate system of A.

required
rotation_thetas Tuple[int, int, int] | None

rotation angles for each axis. If None, no rotation is done.

required

Returns:

Type Description
Tuple[NDArray, Tuple[int, int, int]]

numpy.ndarray, List[int]: deskewing transform & the output shape

Source code in packages/pyspim/src/pyspim/deskew/shear.py
def deskewing_transform(
    z: int,
    r: int,
    c: int,
    pixel_size: float,
    step_size: float,
    direction: int,
    auto_crop: bool,
    rotation_thetas: Tuple[float, float, float] | None,
) -> Tuple[NDArray, Tuple[int,int,int]]:
    """Compute the deskewing transform for the input volume.

    Args:
        z (int): size of input volume, z-direction (pixels)
        r (int): size of input volume, r-direction (pixels, # rows)
        c (int): size of input volume, c-direction (pixels, # cols)
        pixel_size (float): size of pixels, in real units
        step_size (float): step size, in real units
        direction (int): scan direction (+/-1)
        auto_crop (bool): auto-crop outputs to account for rotation of B into coordinate system of A.
        rotation_thetas (Tuple[int,int,int] | None): rotation angles for each axis. If ``None``, no rotation is done.

    Returns:
        numpy.ndarray, List[int]: deskewing transform & the output shape
    """
    D = inv_deskew_matrix(pixel_size, step_size, direction)
    full_shp = output_shape_for_transform(D, (z, r, c))
    if direction < 0:
        t = translation_matrix(full_shp[0], 0, 0)
        D = D @ t
    if auto_crop:
        # rotating B into view of A implies that some of the B view
        # this will take care of automatically cropping the outputs
        # to this overlap region (helps save memory)
        out_shp = (full_shp[0], full_shp[1], full_shp[0])
        t = translation_matrix(-(full_shp[2] - full_shp[0]) / 2, 0, 0)
        D = D @ t
    else:
        out_shp = full_shp
    if rotation_thetas is None:
        T = numpy.linalg.inv(D).astype(numpy.float32)
    else:
        R = rotation_about_point_matrix(
            *rotation_thetas, *[s / 2 for s in out_shp[::-1]]
        )
        T = (numpy.linalg.inv(D) @ R).astype(numpy.float32)
    return T, out_shp
deskew_stage_scan(im: cupy.ndarray, pixel_size: float, step_size: float, direction: int, rotation_thetas: Tuple[float, float, float] | None, interp_method: str, auto_crop: bool, preserve_dtype: bool, block_size: Tuple[int, int, int]) -> cupy.ndarray

Deskew the input volume using the shear-warp algorithm.

Parameters:

Name Type Description Default
im ndarray

input volume to be deskewed.

required
pixel_size float

pixel size, in real units

required
step_size float

step size, in real units

required
direction int

scan direction (± 1)

required
rotation_thetas Tuple[float, float, float] | None

rotation angles for each axis. If None, no rotation is done.

required
interp_method str

interpolation method to use

required
auto_crop bool

auto-crop output volumes to account for rotation of view B

required
preserve_dtype bool

make output volume have same datatype as input volume (probably uint16).

required
block_size Tuple[int, int, int]

size of blocks for CUDA kernel launch.

required

Returns:

Type Description
ndarray

cupy.ndarray

Source code in packages/pyspim/src/pyspim/deskew/shear.py
def deskew_stage_scan(
    im: cupy.ndarray,
    pixel_size: float,
    step_size: float,
    direction: int,
    rotation_thetas: Tuple[float, float, float] | None,
    interp_method: str,
    auto_crop: bool,
    preserve_dtype: bool,
    block_size: Tuple[int, int, int],
) -> cupy.ndarray:
    """Deskew the input volume using the shear-warp algorithm.

    Args:
        im (cupy.ndarray): input volume to be deskewed.
        pixel_size (float): pixel size, in real units
        step_size (float): step size, in real units
        direction (int): scan direction (+/- 1)
        rotation_thetas (Tuple[float,float,float] | None): rotation angles for each axis. If ``None``, no rotation is done.
        interp_method (str): interpolation method to use
        auto_crop (bool): auto-crop output volumes to account for rotation of view B
        preserve_dtype (bool): make output volume have same datatype as input volume (probably uint16).
        block_size (Tuple[int,int,int]): size of blocks for CUDA kernel launch.

    Returns:
        cupy.ndarray
    """
    T, out_shape = deskewing_transform(
        im.shape[0],
        im.shape[1],
        im.shape[2],
        pixel_size,
        step_size,
        direction,
        auto_crop,
        rotation_thetas,
    )
    T = cupy.asarray(T).astype(cupy.float32)
    dsk = transform(im, T, interp_method, preserve_dtype, out_shape, *block_size)
    return dsk

dispimfusion

Reimplementation of the diSPIMFusion API, as first described in [1].

References

[1] Guo M., et al. "Rapid image deconvolution and multiview fusion..." Nature Biotechnology 38.11 (2020): 1337-1346. [2] https://github.com/eguomin/diSPIMFusion [3] https://github.com/eguomin/microImageLib

filter

Functions
Modules
adaptive_median

Adaptive Median Filtering

as described in: Weisong Zhao et al. "Sparse deconvolution improves..." Nature Biotechnology 40, 606–617 (2022).

Functions
fftwavelet
Functions
remove_stripes(im: NDArray, dec_num: int, wavelet: str, sigma: float, direction: str = 'horizontal', verbose: bool = False) -> NDArray

remove stripes from input image by the FFTWavelet algorithm

:param im: input image :type im: NDArray :param dec_num: number of decimation levels (loops) :type dec_num: int :param wavelet: wavelet to use :type wavelet: str :param sigma: description :type sigma: float :param direction: direction of stripes in input to be removed, defaults to 'horizontal' :type direction: str, optional :param verbose: show progress through loop, defaults to False :type verbose: bool, optional :raises ValueError: if direction isn't 'horizontal' or 'vertical' :return: input image with stripes removed :rtype: NDArray

Source code in packages/pyspim/src/pyspim/filter/fftwavelet.py
def remove_stripes(
    im: NDArray,
    dec_num: int,
    wavelet: str,
    sigma: float,
    direction: str = "horizontal",
    verbose: bool = False,
) -> NDArray:
    """remove stripes from input image by the FFTWavelet algorithm

    :param im: input image
    :type im: NDArray
    :param dec_num: number of decimation levels (loops)
    :type dec_num: int
    :param wavelet: wavelet to use
    :type wavelet: str
    :param sigma: _description_
    :type sigma: float
    :param direction: direction of stripes in input to be removed, defaults to 'horizontal'
    :type direction: str, optional
    :param verbose: show progress through loop, defaults to False
    :type verbose: bool, optional
    :raises ValueError: if direction isn't `'horizontal'` or `'vertical'`
    :return: input image with stripes removed
    :rtype: NDArray
    """
    input_is_gpu = cupy.get_array_module(im) == cupy
    if input_is_gpu:
        warnings.warn("remove_stripes is computed on the cpu, will move gpu->cpu->gpu")
    im = im.get() if input_is_gpu else im
    dir_char = direction.lower()[0]
    scales = []
    prev_cA = im.copy()
    for _ in trange(dec_num, desc="dwt") if verbose else range(dec_num):
        (cA, (cH, cV, cD)) = dwt2(prev_cA, wavelet)
        if dir_char == "v" or dir_char == "c":
            fCv = numpy.fft.fftshift(numpy.fft.fft2(cV))
            my, mx = fCv.shape
            flrmy2 = numpy.floor(my / 2)
            denom = 2 * numpy.square(sigma)
            damp = 1 - numpy.exp(
                -numpy.square(numpy.arange(-flrmy2, -flrmy2 + my)) / denom
            )
            fCv *= numpy.repeat(damp[:, None], mx, 1)
            cV = numpy.fft.ifft(numpy.fft.ifftshift(fCv))
        elif dir_char == "h" or dir_char == "r":
            fCh = numpy.fft.fftshift(numpy.fft.fft2(cH))
            my, mx = fCh.shape
            flrmx2 = numpy.floor(mx / 2)
            denom = 2 * numpy.square(sigma)
            damp = 1 - numpy.exp(
                -numpy.square(numpy.arange(-flrmx2, -flrmx2 + mx)) / denom
            )
            fCh *= numpy.repeat(damp[None, :], my, 0)
            cH = numpy.fft.ifft(numpy.fft.ifftshift(fCh))
        else:
            raise ValueError("invalid direction")
        prev_cA = cA.copy()
        scales.insert(0, (cH, cV, cD))
    recon = prev_cA
    for dec in trange(dec_num, desc="idwt") if verbose else range(dec_num):
        cH, cV, cD = scales[dec]
        recon = recon[: cH.shape[0], : cH.shape[1]]
        recon = idwt2((recon, (cH, cV, cD)), wavelet)
    recon = numpy.real(recon)
    if input_is_gpu:
        return cupy.asarray(recon)
    return recon

fsc

Modules
checkerboard

image/volume splitting by checkerboard subsampling

Functions
checkerboard_split(arr: NDArray, split: int) -> Tuple[NDArray, NDArray]

checkerboard split images or volumes for FSC analysis generate pairs of images for FS(R)C analysis by subsampling them, as described in Fig. 2(a) of [1]

References

[1] Koho, et al. "Fourier ring correlation..." doi:10.1038/s41467-019-11024-z

:param arr: input image/volume :type arr: NDArray :param split: type of split to do :type split: int :returns: checkerboard-subsampled halves of input array :rtype: Tuple[NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/fsc/checkerboard.py
def checkerboard_split(arr: NDArray, split: int) -> Tuple[NDArray, NDArray]:
    """checkerboard split images or volumes for FSC analysis
        generate pairs of images for FS(R)C analysis by subsampling
        them, as described in Fig. 2(a) of [1]

    References
    ---
    [1] Koho, et al. "Fourier ring correlation..." doi:10.1038/s41467-019-11024-z

    :param arr: input image/volume
    :type arr: NDArray
    :param split: type of split to do
    :type split: int
    :returns: checkerboard-subsampled halves of input array
    :rtype: Tuple[NDArray,NDArray]
    """
    if len(arr.shape) == 3:
        return _checkerboard_split_vol(arr, split)
    elif len(arr.shape) == 2:
        return _checkerboard_split_img(arr, split)
    else:
        raise ValueError("input arr must be 2- or 3-dimensional")
main
Functions
fourier_ring_correlation(im1: NDArray, im2: NDArray, bin_edges: Sequence, pixel_size: Optional[float] = None) -> FRCOutput

compute Fourier Ring Correlation for pair of input images pair should be checkerboard-subsampled pair of images originating from a single image

:param im1: image 1 :type im1: NDArray :param im2: image 2 :type im2: NDArray :param bin_edges: edges of frequency bins that FRC is calculated over :type bin_edges: Sequence :param pixel_size: pixel size, in real-space units. if None, output spatial frequencies will be in units pix^{-1} if specified, output spatial frequencies will have correct units :type pixel_size: Optional[float] :returns: FRC values and corresponding spatial frequencies :rtype: Tuple[NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/fsc/main.py
def fourier_ring_correlation(
    im1: NDArray, im2: NDArray, bin_edges: Sequence, pixel_size: Optional[float] = None
) -> FRCOutput:
    """compute Fourier Ring Correlation for pair of input images
        pair should be checkerboard-subsampled pair of images originating
        from a single image

    :param im1: image 1
    :type im1: NDArray
    :param im2: image 2
    :type im2: NDArray
    :param bin_edges: edges of frequency bins that FRC is calculated over
    :type bin_edges: Sequence
    :param pixel_size: pixel size, in real-space units.
        if `None`, output spatial frequencies will be in units pix^{-1}
        if specified, output spatial frequencies will have correct units
    :type pixel_size: Optional[float]
    :returns: FRC values and corresponding spatial frequencies
    :rtype: Tuple[NDArray,NDArray]
    """
    pixel_size = 1.0 if pixel_size is None else pixel_size
    assert len(im1.shape) == 2 and len(im2.shape) == 2, "both inputs must be 2D images"
    # figure out auto-dispatch
    xp = cupy.get_array_module(im1)
    assert xp == cupy.get_array_module(im2), (
        "both images (`im1` & `im2`) must be on same device"
    )
    # compute fourier transform
    fft1 = xp.fft.fftshift(xp.fft.fft2(im1))
    fft2 = xp.fft.fftshift(xp.fft.fft2(im2))
    # figure out center coordinate of fft
    ysze, xsze = fft1.shape
    yc, xc = ysze / 2.0, xsze / 2.0
    # formulate a grid of coordinates & use to calculate radial
    # distance from center at each point in the image
    grid = xp.meshgrid(xp.arange(xsze) - xc, xp.arange(ysze) - yc, indexing="xy")
    R = xp.sqrt(functools.reduce(operator.add, map(xp.square, grid)))
    # calculate spatial frequencies (in real units) being probed
    nyq_frq = xp.floor(im1.shape[0] * pixel_size / 2.0)  # Nyquist frequency
    bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2.0
    spt_frq = bin_centers / nyq_frq
    frc, npts = __fourier_correlation(fft1, fft2, R, bin_edges)
    return frc, spt_frq
fourier_shell_correlation(vol1: NDArray, vol2: NDArray, bin_edges: Sequence, voxel_size: Optional[float] = None) -> FSCOutput

compute Fourier Shell Correlation for pair of input volumes pair should be checkerboard-subsampled pair of volumes originating from a single volume

:param vol1: volume 1 :type vol1: NDArray :param vol2: volume 2 :type vol2: NDArray :param bin_edges: :type bin_edges: Sequence :param voxel_size: :type voxel_size: Optional[float] :returns: :rtype: Tuple[NDArray,NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/fsc/main.py
def fourier_shell_correlation(
    vol1: NDArray,
    vol2: NDArray,
    bin_edges: Sequence,
    voxel_size: Optional[float] = None,
) -> FSCOutput:
    """compute Fourier Shell Correlation for pair of input volumes
        pair should be checkerboard-subsampled pair of volumes originating
        from a single volume

    :param vol1: volume 1
    :type vol1: NDArray
    :param vol2: volume 2
    :type vol2: NDArray
    :param bin_edges:
    :type bin_edges: Sequence
    :param voxel_size:
    :type voxel_size: Optional[float]
    :returns:
    :rtype: Tuple[NDArray,NDArray,NDArray]
    """
    voxel_size = 1.0 if voxel_size is None else voxel_size
    assert len(vol1.shape) == 3 and len(vol2.shape) == 3, (
        "both inputs must be 3D volumes"
    )
    # figure out auto-dispatch
    xp = cupy.get_array_module(vol1)
    assert xp == cupy.get_array_module(vol2), (
        "both images (`vol1` & `vol2`) must be on same device"
    )
    # compute fourier transform
    fft1 = xp.fft.fftshift(xp.fft.fftn(vol1))
    fft2 = xp.fft.fftshift(xp.fft.fftn(vol2))
    # figure out center coordinate of fft
    zsze, ysze, xsze = fft1.shape
    zc, yc, xc = zsze / 2.0, ysze / 2.0, xsze / 2.0
    # formulate a grid of coordinates & use to calculate radial
    # distance from center at each voxel in volume
    grid = xp.meshgrid(
        xp.arange(xsze) - xc, xp.arange(ysze) - yc, xp.arange(zsze) - zc, indexing="xy"
    )
    R = xp.sqrt(functools.reduce(operator.add, map(xp.square, grid)))
    # calculate spatial frequencies (in real units) being probed
    nyq_frq = xp.floor(vol1.shape[0] * voxel_size / 2.0)  # Nyquist frequency
    bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2.0
    spt_frq = bin_centers / nyq_frq
    fsc, npts = __fourier_correlation(fft1, fft2, R, bin_edges)
    return fsc, spt_frq, npts

interp

Functions
Modules
affine
Functions
output_shape_for_transform(T: NDArray, input_shape: tuple[int, int, int]) -> Tuple[int, int, int]

Calculate output shape of transformed volume.

Parameters:

Name Type Description Default
T NDArray

affine transform matrix

required
input_shape Iterable

shape of input volume (ZRC)

required

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]: output shape (ZRC)

Source code in packages/pyspim/src/pyspim/interp/affine.py
def output_shape_for_transform(
    T: NDArray, input_shape: tuple[int,int,int],
) -> Tuple[int, int, int]:
    """Calculate output shape of transformed volume.

    Args:
        T (NDArray): affine transform matrix
        input_shape (Iterable): shape of input volume (ZRC)

    Returns:
        Tuple[int,int,int]: output shape (ZRC)
    """
    t = T.get() if cupy.get_array_module(T) == cupy else T # type: ignore
    coord = list(product(*[(0, s) for s in input_shape[::-1]]))
    coord = numpy.asarray(coord).T
    coord = numpy.vstack([coord, numpy.zeros_like(coord[0, :])])
    coordT = (t @ coord)[:-1, :]
    ptp = numpy.ceil(numpy.ptp(coordT, axis=1))
    x, y, z = int(ptp[0]), int(ptp[1]), int(ptp[2])
    return z, y, x
output_shape_for_inv_transform(T: NDArray, input_shape: tuple[int, int, int]) -> Tuple[int, int, int]

Calculate output shape of (inverse)-transformed volume.

Parameters:

Name Type Description Default
T NDArray

affine transform matrix (to be inverted)

required
input_shape Iterable

shape of input volume (ZRC)

required

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]: shape of output volume (ZRC)

Source code in packages/pyspim/src/pyspim/interp/affine.py
def output_shape_for_inv_transform(
    T: NDArray, input_shape: tuple[int,int,int]
) -> Tuple[int, int, int]:
    """Calculate output shape of (inverse)-transformed volume.

    Args:
        T (NDArray): affine transform matrix (to be inverted)
        input_shape (Iterable): shape of input volume (ZRC)

    Returns:
        Tuple[int,int,int]: shape of output volume (ZRC)
    """
    xp = cupy.get_array_module(T)
    fwd = xp.linalg.inv(T).get() if xp == cupy else xp.linalg.inv(T) # type: ignore
    return output_shape_for_transform(fwd, input_shape)
transform(A: NDArray, T: NDArray, interp_method: str, preserve_dtype: bool, out_shp: Tuple[int, int, int] | None, block_size_z: int, block_size_y: int, block_size_x: int, gpu_id: int = 0, n_gpu: int = 1) -> cupy.ndarray

Apply affine transform to input volume.

Parameters:

Name Type Description Default
A NDArray

input volume (ZRC)

required
T NDArray

affine transform matrix (4x4), rows of matrix corresp. to XYZ

required
interp_method str

interpolation method to use when interpolating points in the transformed volume. One of 'nearest','linear','cubspl'.

required
preserve_dtype bool

make output datatype match that of the input (uint16), if False, output is single-precision float.

required
out_shp Tuple[int, int, int] | None

shape of output volume. if None, will be calculated by this function

required
block_size_z int

size of kernel launch block, in z dimension

required
block_size_y int

size of kernel launch block, in y dimension

required
block_size_x int

size of kernel launch block, in x dimension

required

Raises:

Type Description
ValueError

if input is not a cupy.ndarray

Returns:

Type Description
ndarray

cupy.ndarray: transformed volume

Notes

The input transform, T, should be the forward transform mapping the input to the desired output domain. Internally, this function takes the inverse of the specified transform and then iterates over the output domain to extract values from the input array.

Source code in packages/pyspim/src/pyspim/interp/affine.py
def transform(
    A: NDArray,
    T: NDArray,
    interp_method: str,
    preserve_dtype: bool,
    out_shp: Tuple[int, int, int] | None,
    block_size_z: int,
    block_size_y: int,
    block_size_x: int,
    gpu_id: int = 0,
    n_gpu: int = 1,
) -> cupy.ndarray:
    """Apply affine transform to input volume.

    Args:
        A (NDArray): input volume (ZRC)
        T (NDArray): affine transform matrix (4x4), rows of matrix corresp. to XYZ
        interp_method (str): interpolation method to use when interpolating points in the transformed volume. One of ``'nearest','linear','cubspl'``.
        preserve_dtype (bool): make output datatype match that of the input (uint16), if False, output is single-precision float.
        out_shp (Tuple[int,int,int] | None): shape of output volume. if ``None``, will be calculated by this function
        block_size_z (int): size of kernel launch block, in z dimension
        block_size_y (int): size of kernel launch block, in y dimension
        block_size_x (int): size of kernel launch block, in x dimension

    Raises:
        ValueError: if input is not a ``cupy.ndarray``

    Returns:
        cupy.ndarray: transformed volume

    Notes:
        The input transform, T, should be the forward transform mapping the input to the desired output domain. Internally, this function takes the inverse of the specified transform and then iterates over the output domain to extract values from the input array.
    """
    if cupy.get_array_module(A) == cupy: # type: ignore
        kernel = _get_kernel(A.dtype, interp_method, preserve_dtype)
        if out_shp is None:
            out_shp = output_shape_for_transform(T, A.shape)
        launch_params = launch_params_for_volume(
            out_shp, block_size_z, block_size_y, block_size_x
        )
        T = cupy.linalg.inv(cupy.asarray(T)).astype(cupy.float32)
        # preallocate output and call kernel
        out_dtype = A.dtype if preserve_dtype else cupy.float32
        out = cupy.zeros(out_shp, dtype=out_dtype)
        kernel(
            launch_params[0], launch_params[1], 
            (out, A, T, *out_shp, *A.shape, gpu_id, n_gpu)
        )
        return out
    else:
        raise ValueError("only works on cupy arrays")

isotropize

Isotropization utilities.

Individual views from diSPIM volumes are typically acquired with uneven spacing in one of the directions, but analysis requires cubic pixels. These are utilities to upsample the axis with uneven spacing relative to the camera pixel size.

This can be done either by direct interpolation (interpolate) or by upsampling in Fourier space (fourier_upsample), using the technique described in [1].

References

[1] Stein, et al. "Fourier interpolation stochastic..." doi:10.1364/OE.23.016154

Functions
interpolate(vol: NDArray, pixel_size: float, step_size: float, axis: int = 0, **kwargs) -> NDArray

interpolate Upsample an axis by interpolation. Function will upsample specified axis by assuming it has spacing step_size such that the output voxel is cubic, pixel_size**3.

Parameters:

Name Type Description Default
vol NDArray

input volume

required
pixel_size float

pixel size in real (space) units

required
step_size float

step size in real (space) units

required
axis int

axis to upsample. Defaults to 0.

0
**kwargs

passed to ndimage.zoom

{}

Returns:

Name Type Description
NDArray NDArray
Source code in packages/pyspim/src/pyspim/isotropize.py
def interpolate(
    vol: NDArray, pixel_size: float, step_size: float, axis: int = 0, **kwargs
) -> NDArray:
    """interpolate Upsample an axis by interpolation. Function will upsample specified `axis` by assuming it has spacing `step_size` such that the output voxel is cubic, `pixel_size**3`.

    Args:
        vol (NDArray): input volume
        pixel_size (float): pixel size in real (space) units
        step_size (float): step size in real (space) units
        axis (int, optional): axis to upsample. Defaults to 0.
        **kwargs: passed to `ndimage.zoom`

    Returns:
        NDArray:
    """

    n_dim = len(vol.shape)
    assert axis < n_dim and axis >= 0, "axis must be in range [0, len(vol.shape))"
    ndi = get_ndimage_module(vol)
    resc = [
        1.0,
    ] * n_dim
    resc[axis] = step_size / pixel_size
    return ndi.zoom(vol, resc, **kwargs)
fourier_upsample(vol: NDArray, pixel_size: float, step_size: float, axis: int = 0) -> NDArray

fourier_upsample Upsample an axis by Fourier upsampling.

Parameters:

Name Type Description Default
vol NDArray

input volume

required
pixel_size float

pixel size in real (space) units.

required
step_size float

step size in real (space) units

required
axis int

axis to upsample. Defaults to 0.

0

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/isotropize.py
def fourier_upsample(
    vol: NDArray, pixel_size: float, step_size: float, axis: int = 0
) -> NDArray:
    """fourier_upsample Upsample an axis by Fourier upsampling.

    Args:
        vol (NDArray): input volume
        pixel_size (float): pixel size in real (space) units.
        step_size (float): step size in real (space) units
        axis (int, optional): axis to upsample. Defaults to 0.

    Returns:
        NDArray
    """

    n_dim = len(vol.shape)
    assert n_dim == 2 or n_dim == 3, "input must be a 2D image or 3D volume"
    assert axis < n_dim and axis >= 0, "axis must be in range [0, len(vol.shape))"
    xp = cupy.get_array_module(vol)
    fft = get_fft_module(vol)
    # mirror the volume on the relevant axis and convert to fourier domain
    ax_sze = vol.shape[axis]
    mirror_padding = [(0, 0) for _ in range(n_dim)]
    mirror_padding[axis] = (ax_sze // 2, ax_sze // 2)
    fourier = fft.fftshift(fft.rfftn(xp.pad(vol, mirror_padding, mode="symmetric")))
    # calculate how much we need to pad to make the upsampling work
    # see Eqn. 9 of fSOFI paper
    n12 = math.ceil((2 * ax_sze - 1) / 2)
    delta = int(math.ceil((step_size / pixel_size) * n12 - n12))
    # pad fourier domain image with zeros
    zero_padding = [(0, 0) for _ in range(n_dim)]
    zero_padding[axis] = (delta, delta)
    fourier = xp.pad(fourier, zero_padding, mode="constant", constant_values=0)
    out = fft.irfftn(fft.ifftshift(fourier))
    # trim out extra stuff generated by the padding
    n_mid = math.floor(step_size / pixel_size * ax_sze)
    nmh = n_mid // 2
    cent = out.shape[axis]
    if n_dim == 3:
        if axis == 0:
            return out[cent - nmh : cent + nmh, ...]
        elif axis == 1:
            return out[:, cent - nmh, cent + nmh, :]
        else:
            return out[..., cent - nmh : cent + nmh]
    else:  # n_dim == 2
        if axis == 0:
            return out[cent - nmh : cent + nmh, :]
        else:
            return out[:, cent - nmh : cent + nmh]

opt

Modules
powell

Powell's method

psf

Modules
gauss
Functions
gaussian_fwhm(sigma: float) -> float

gaussian_fwhm Compute full-width at half-max for Gaussian from std. dev.

Parameters:

Name Type Description Default
sigma float

standard deviation of Gaussian

required

Returns:

Type Description
float

float

Source code in packages/pyspim/src/pyspim/psf/gauss.py
def gaussian_fwhm(sigma: float) -> float:
    """gaussian_fwhm Compute full-width at half-max for Gaussian from std. dev.

    Args:
        sigma (float): standard deviation of Gaussian

    Returns:
        float
    """
    return 2 * numpy.sqrt(2 * numpy.log(2)) * sigma

reg

Functions
Modules
mutinfo
Functions
mutual_information(ref: NDArray, tar: NDArray, tM: NDArray, nbin_ref: int, nbin_tar: int) -> float

mutual information between input images ref & tar where tar is first transformed by matrix tM

:param ref: reference volume :type ref: NDArray :param tar: target (AKA "moving") volume :type tar: NDArray :param tM: matrix to transform tar with :type tM: NDArray :param nbin_ref: # of bins in histogram of ref :type nbin_ref: int :param nbin_tar: # of bins in histogram of tar :type nbin_tar: int :returns: mutual information between ref and transformed tar :rtype: float

Source code in packages/pyspim/src/pyspim/reg/mutinfo.py
def mutual_information(
    ref: NDArray, tar: NDArray, tM: NDArray, nbin_ref: int, nbin_tar: int
) -> float:
    """mutual information between input images `ref` & `tar` where `tar`
        is first transformed by matrix `tM`

    :param ref: reference volume
    :type ref: NDArray
    :param tar: target (AKA "moving") volume
    :type tar: NDArray
    :param tM: matrix to transform `tar` with
    :type tM: NDArray
    :param nbin_ref: # of bins in histogram of `ref`
    :type nbin_ref: int
    :param nbin_tar: # of bins in histogram of `tar`
    :type nbin_tar: int
    :returns: mutual information between `ref` and transformed `tar`
    :rtype: float
    """
    ref_tex, tex_arr = create_texture_object(ref, "border", "linear", "element_type")
    # j1, jx = _preprocess_image_pair_reftex(
    #    ref_tex, tar, tM, nbin_ref, nbin_tar
    # )
    # compute entropy of target
    P_tar, _ = cupy.histogram(tar, nbin_tar)
    P_tar = P_tar / cupy.sum(P_tar) + cupy.finfo(float).eps
    H_tar = discrete_entropy(P_tar) / cupy.log2(nbin_tar)
    mi = _mutual_information_precomp_target(
        ref_tex, tar, cupy.eye(4), nbin_ref, nbin_tar, H_tar
    )
    del ref_tex, tex_arr
    return mi
entropy_correlation_coeff(ref: NDArray, tar: NDArray, tM: NDArray, nbin_ref: int, nbin_tar: int) -> float

entropy correlation coefficient between input images ref & tar where tar is first transformed by matrix tM

:param ref: reference volume :type ref: NDArray :param tar: target (AKA "moving") volume :type tar: NDArray :param tM: matrix to transform tar with :type tM: NDArray :param nbin_ref: # of bins in histogram of ref :type nbin_ref: int :param nbin_tar: # of bins in histogram of tar :type nbin_tar: int :returns: entropy correlation coefficient between ref and transformed tar :rtype: float

Source code in packages/pyspim/src/pyspim/reg/mutinfo.py
def entropy_correlation_coeff(
    ref: NDArray, tar: NDArray, tM: NDArray, nbin_ref: int, nbin_tar: int
) -> float:
    """entropy correlation coefficient between input images `ref` & `tar`
        where `tar` is first transformed by matrix `tM`

    :param ref: reference volume
    :type ref: NDArray
    :param tar: target (AKA "moving") volume
    :type tar: NDArray
    :param tM: matrix to transform `tar` with
    :type tM: NDArray
    :param nbin_ref: # of bins in histogram of `ref`
    :type nbin_ref: int
    :param nbin_tar: # of bins in histogram of `tar`
    :type nbin_tar: int
    :returns: entropy correlation coefficient between `ref` and transformed `tar`
    :rtype: float
    """
    ref_tex, tex_arr = create_texture_object(ref, "border", "linear", "element_type")
    # j1, jx = _preprocess_image_pair_reftex(
    #    ref_tex, tar, tM, nbin_ref, nbin_tar
    # )
    # compute entropy of target
    P_tar, _ = cupy.histogram(tar, nbin_tar)
    P_tar = P_tar / cupy.sum(P_tar) + cupy.finfo(float).eps
    H_tar = discrete_entropy(P_tar) / cupy.log2(nbin_tar)
    ec = _entropy_correlation_coeff_precomp_target(
        ref_tex, tar, tM, nbin_ref, nbin_tar, H_tar
    )
    del ref_tex, tex_arr
    return ec
pcc
Functions
translation(ref: NDArray, mov: NDArray, upsample_factor: int = 1) -> NDArray

Determine translation between ref and mov.

:param ref: reference image :type ref: NDArray :param mov: image to register. must be same dimensionality as ref :type mov: NDArray :param upsample_factor: upsampling factor, images are registered to 1/upsample_factor. defaults to 1. :type upsample_factor: int :returns: translation along each axis :rtype: NDArray

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def translation(ref: NDArray, mov: NDArray, upsample_factor: int = 1) -> NDArray:
    """Determine translation between `ref` and `mov`.

    :param ref: reference image
    :type ref: NDArray
    :param mov: image to register. must be same dimensionality as `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor, images are registered to
        `1/upsample_factor`. defaults to 1.
    :type upsample_factor: int
    :returns: translation along each axis
    :rtype: NDArray
    """
    # get relevant modules for cpu/gpu
    xp = cupy.get_array_module(ref)
    fft = get_fft_module(xp)
    if any([rd != md for rd, md in zip(ref.shape, mov.shape)]):
        ref, mov = pad_to_same_size(ref, mov, style="right")
    # compute fft's and then do phase cross-correlation
    ref_fft = fft.fft2(ref)
    mov_fft = fft.fft2(mov)
    if xp == cupy: # type: ignore
        ref_fft, mov_fft = ref_fft.get(), mov_fft.get()
    trans, _, _ = skimage.registration.phase_cross_correlation(
        ref_fft,
        mov_fft,
        space="fourier",
        upsample_factor=upsample_factor,
    )
    return trans
scale_rotation(ref: NDArray, mov: NDArray, upsample_factor: int = 1) -> Tuple[float, float]

determine scaling & rotation between ref & mov using phase cross correlation of the log-polar transformed inputs

:param ref: reference image :type ref: NDArray :param mov: image to register. must be same dimensionality as ref :type mov: NDArray :param upsample_factor: upsampling factor, images are registered to 1/upsample_factor. defaults to 1. :type upsample_factor: int :returns: rotation (in degrees) and scale difference between ref & mov :rtype: Tuple[float,float]

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def scale_rotation(
    ref: NDArray, mov: NDArray, upsample_factor: int = 1
) -> Tuple[float, float]:
    """determine scaling & rotation between `ref` & `mov` using phase cross
        correlation of the log-polar transformed inputs

    :param ref: reference image
    :type ref: NDArray
    :param mov: image to register. must be same dimensionality as `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor, images are registered to
        `1/upsample_factor`. defaults to 1.
    :type upsample_factor: int
    :returns: rotation (in degrees) and scale difference between `ref` & `mov`
    :rtype: Tuple[float,float]
    """
    xp = cupy.get_array_module(ref)
    if xp == cupy: # type: ignore
        ref, mov = ref.get(), mov.get() # type: ignore (we know there are cupy)
    if any([rd != md for rd, md in zip(ref.shape, mov.shape)]):
        ref, mov = pad_to_same_size(ref, mov, style="right")
    radius = min([min(ref.shape), min(mov.shape)])
    ref_polar = skimage.transform.warp_polar(ref, radius=radius, scaling="log")
    mov_polar = skimage.transform.warp_polar(mov, radius=radius, scaling="log")
    shift, err, _ = skimage.registration.phase_cross_correlation(
        ref_polar, mov_polar, upsample_factor=upsample_factor
    )
    cc_rot = shift[0]
    scale = 1 / (math.exp(shift[1] / (radius / math.log(radius))))
    return scale, cc_rot
rotation_scale_for_volumes(ref: NDArray, mov: NDArray, upsample_factor: int = 1.0)

calculate rotation, and scaling between input volumes by phase cross correlation of the maximum projections in each axis

:param ref: reference volume :type ref: NDArray :param mov: 'moving' volume to be registered to ref :type mov: NDArray :param upsample_factor: upsampling factor. images are registered to 1/upsample_factor. defaults to 1 :type upsample_factor: float :returns: rotation and scaling for each axis :rtype: Tuple[NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def rotation_scale_for_volumes(
    ref: NDArray, mov: NDArray, upsample_factor: int = 1.0
):
    """calculate rotation, and scaling between input volumes by
        phase cross correlation of the maximum projections in each axis

    :param ref: reference volume
    :type ref: NDArray
    :param mov: 'moving' volume to be registered to `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor. images are registered to
        `1/upsample_factor`. defaults to 1
    :type upsample_factor: float
    :returns: rotation and scaling for each axis
    :rtype: Tuple[NDArray,NDArray]
    """
    xp = cupy.get_array_module(ref)
    # calculate max-projections for XY
    ref_xy, mov_xy = xp.amax(ref, axis=0), xp.amax(mov, axis=0)
    scale_xy, rot_xy = scale_rotation(ref_xy, mov_xy, upsample_factor)
    # repeat for ZX
    ref_zx, mov_zx = xp.amax(ref, axis=1), xp.amax(mov, axis=1)
    scale_zx, rot_zx = scale_rotation(ref_zx, mov_zx, upsample_factor)
    # and again for ZY
    ref_zy, mov_zy = xp.amax(ref, axis=2), xp.amax(mov, axis=2)
    scale_zy, rot_zy = scale_rotation(ref_zy, mov_zy, upsample_factor)
    # rotations & scalings are scalars, just concat them
    rot = numpy.asarray([rot_zy, rot_zx, rot_xy])
    scale = numpy.asarray([scale_zy, scale_zx, scale_xy])
    return [float(r) for r in rot[::-1]], [float(s) for s in scale[::-1]]
translation_for_volumes(ref: NDArray, mov: NDArray, upsample_factor: int = 1)

calculate relative translation, rotation, and scaling between input volumes by phase cross correlation of the maximum projections in each axis

:param ref: reference volume :type ref: NDArray :param mov: 'moving' volume to be registered to ref :type mov: NDArray :param upsample_factor: upsampling factor. images are registered to 1/upsample_factor. defaults to 1 :type upsample_factor: float :returns: translation, rotation, and scaling for each axis :rtype: Tuple[NDArray,NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def translation_for_volumes(ref: NDArray, mov: NDArray, upsample_factor: int = 1):
    """calculate relative translation, rotation, and scaling between input
        volumes by phase cross correlation of the maximum projections
        in each axis

    :param ref: reference volume
    :type ref: NDArray
    :param mov: 'moving' volume to be registered to `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor. images are registered to
        `1/upsample_factor`. defaults to 1
    :type upsample_factor: float
    :returns: translation, rotation, and scaling for each axis
    :rtype: Tuple[NDArray,NDArray,NDArray]
    """
    xp = cupy.get_array_module(ref)
    # calculate max-projections for XY
    ref_xy = xp.amax(ref, axis=0)
    mov_xy = xp.amax(mov, axis=0)
    trans_xy = translation(ref_xy, mov_xy, upsample_factor)
    trans_xy = numpy.concatenate([numpy.asarray([numpy.inf]), numpy.asarray(trans_xy)])
    # repeat for ZX
    ref_zx = xp.amax(ref, axis=1)
    mov_zx = xp.amax(mov, axis=1)
    trans_zx = translation(ref_zx, mov_zx, upsample_factor)
    trans_zx = numpy.concatenate(
        [
            numpy.asarray([trans_zx[0]]),
            numpy.asarray([numpy.inf]),
            numpy.asarray([trans_zx[1]]),
        ]
    )
    # and again for ZY
    ref_zy = xp.amax(ref, axis=2)
    mov_zy = xp.amax(mov, axis=2)
    trans_zy = translation(ref_zy, mov_zy, upsample_factor)
    trans_zy = numpy.concatenate([numpy.asarray(trans_zy), numpy.asarray([numpy.inf])])
    # assemble outputs
    # starting translation is just whichever the smallest one
    # calculated for each dimension
    trans = numpy.amin(numpy.vstack([trans_xy, trans_zx, trans_zy]), axis=0)
    # NOTE: _ab corresponds to the translation for the 'c' axis so right
    # now its formatted like ZRC
    return [float(t) for t in trans[::-1]]  # this makes output XYZ
translation_rotation_scale_for_volumes(ref: NDArray, mov: NDArray, upsample_factor: int = 1)

calculate relative translation, rotation, and scaling between input volumes by phase cross correlation of the maximum projections in each axis

:param ref: reference volume :type ref: NDArray :param mov: 'moving' volume to be registered to ref :type mov: NDArray :param upsample_factor: upsampling factor. images are registered to 1/upsample_factor. defaults to 1 :type upsample_factor: float :returns: translation, rotation, and scaling for each axis :rtype: Tuple[NDArray,NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def translation_rotation_scale_for_volumes(
    ref: NDArray, mov: NDArray, upsample_factor: int = 1
):
    """calculate relative translation, rotation, and scaling between input
        volumes by phase cross correlation of the maximum projections
        in each axis

    :param ref: reference volume
    :type ref: NDArray
    :param mov: 'moving' volume to be registered to `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor. images are registered to
        `1/upsample_factor`. defaults to 1
    :type upsample_factor: float
    :returns: translation, rotation, and scaling for each axis
    :rtype: Tuple[NDArray,NDArray,NDArray]
    """
    xp = cupy.get_array_module(ref)
    # calculate max-projections for XY
    ref_xy = xp.amax(ref, axis=0)
    mov_xy = xp.amax(mov, axis=0)
    trans_xy = translation(ref_xy, mov_xy, upsample_factor)
    trans_xy = numpy.concatenate([numpy.asarray([numpy.inf]), numpy.asarray(trans_xy)])
    scale_xy, rot_xy = scale_rotation(ref_xy, mov_xy, upsample_factor)
    # repeat for ZX
    ref_zx = xp.amax(ref, axis=1)
    mov_zx = xp.amax(mov, axis=1)
    trans_zx = translation(ref_zx, mov_zx, upsample_factor)
    trans_zx = numpy.concatenate(
        [
            numpy.asarray([trans_zx[0]]),
            numpy.asarray([numpy.inf]),
            numpy.asarray([trans_zx[1]]),
        ]
    )
    scale_zx, rot_zx = scale_rotation(ref_zx, mov_zx, upsample_factor)
    # and again for ZY
    ref_zy = xp.amax(ref, axis=2)
    mov_zy = xp.amax(mov, axis=2)
    trans_zy = translation(ref_zy, mov_zy, upsample_factor)
    trans_zy = numpy.concatenate([numpy.asarray(trans_zy), numpy.asarray([numpy.inf])])
    scale_zy, rot_zy = scale_rotation(ref_zy, mov_zy, upsample_factor)
    # assemble outputs
    # starting translation is just whichever the smallest one
    # calculated for each dimension
    trans = numpy.amin(numpy.vstack([trans_xy, trans_zx, trans_zy]), axis=0)[::-1]
    # rotations & scalings are scalars, just concat them
    rot = numpy.asarray([rot_zy, rot_zx, rot_xy])[::-1]
    scale = numpy.asarray([scale_zy, scale_zx, scale_xy])[::-1]
    return trans, rot, scale
powell
Functions
optimize_affine(ref: cupy.ndarray, mov: cupy.ndarray, metric: str, transform: str, interp_method: str, par0: List[float], bounds: Optional[Union[OptBounds, OptBoundMargins]] = None, kernel_launch_params: Optional[CuLaunchParameters] = None, verbose: bool = False, **opt_kwargs)

Find optimal affine transform to register mov with ref by Powell's method

**opt_kwargs are passed to scipy.optimize.minimize

:param ref: reference volume :type ref: cupy.ndarray :param mov: moving volume, to be registered :type mov: cupy.ndarray :param metric: metric to optimize for registration one of ('nip', 'cr', 'ncc'). 'nip' : normalized inner product 'cr' : correlation ratio 'ncc' : normalized cross correlation :type metric: str :param transform: transform to optimize :type transform: str :param interp: interpolation method to use during transformation one of ('linear', 'cubspl') 'linear' : trilinear interpolation 'cubspl' : cubic b-spline interpolation :type interp: str :param par0: initial guess for parameters :type par0: list :param bounds: bounds for parameters :type bounds: list :param verbose: show intermediate results with tqdm progress bar :type verbose: bool :returns: transform and optimization results :rtype: Tuple[NDArray,OptimizeResult]

Source code in packages/pyspim/src/pyspim/reg/powell.py
def optimize_affine(
    ref: cupy.ndarray,
    mov: cupy.ndarray,
    metric: str,
    transform: str,
    interp_method: str,
    par0: List[float],
    bounds: Optional[Union[OptBounds, OptBoundMargins]] = None,
    kernel_launch_params: Optional[CuLaunchParameters] = None,
    verbose: bool = False,
    **opt_kwargs,
):
    """Find optimal affine transform to register `mov` with `ref` by Powell's method

    `**opt_kwargs` are passed to `scipy.optimize.minimize`

    :param ref: reference volume
    :type ref: cupy.ndarray
    :param mov: moving volume, to be registered
    :type mov: cupy.ndarray
    :param metric: metric to optimize for registration
        one of ('nip', 'cr', 'ncc').
        'nip' : normalized inner product
        'cr'  : correlation ratio
        'ncc' : normalized cross correlation
    :type metric: str
    :param transform: transform to optimize
    :type transform: str
    :param interp: interpolation method to use during transformation
        one of ('linear', 'cubspl')
        'linear' : trilinear interpolation
        'cubspl' : cubic b-spline interpolation
    :type interp: str
    :param par0: initial guess for parameters
    :type par0: list
    :param bounds: bounds for parameters
    :type bounds: list
    :param verbose: show intermediate results with tqdm progress bar
    :type verbose: bool
    :returns: transform and optimization results
    :rtype: Tuple[NDArray,OptimizeResult]
    """
    # make sure both input images are already on the GPU and are floating point
    # TODO: update so that this works on CPU, too
    assert cupy.get_array_module(ref) == cupy, "reference image must be on the GPU" # type: ignore
    assert cupy.get_array_module(mov) == cupy, "moving image must be on the GPU" # type: ignore
    # figure out coords of centroid of image
    msze_z, msze_y, msze_x = mov.shape
    cx, cy, cz = msze_x / 2.0, msze_y / 2.0, msze_z / 2.0
    # make function that will generate the transform matrix
    # also get # of params req'd
    mat_fun, postfix_fun, ipar0 = parse_transform_string(transform)
    if par0 is None:
        par0 = ipar0
    if len(par0) != len(ipar0):
        raise ValueError("invalid # of initial params for transform")
    if bounds is not None:
        if len(bounds) != len(ipar0):
            raise ValueError("invalid # of bounds for transform")
        if isinstance(bounds[0], float) or isinstance(bounds[0], int):
            bounds = [(p - b, p + b) for p, b in zip(par0, bounds)] # type: ignore
        elif isinstance(bounds[0], tuple):
            pass  # already correctly formatted
        else:
            raise ValueError("invalid bound formatting")
    # par_fun takes in the parameter vector and generates the matrix that then gets passed to the kernel
    par_fun = lambda p: mat_fun(p, cx, cy, cz).astype(float).flatten()[:12]
    # get shape of reference and moving images
    sz_ref, sy_ref, sx_ref = ref.shape
    sz_mov, sy_mov, sx_mov = mov.shape
    # figure out launch parameters
    if kernel_launch_params is None:
        block_size = 8
        kernel_launch_params = launch_params_for_volume(
            [sz_mov, sy_mov, sx_mov], block_size, block_size, block_size
        )
    # initialize the computation
    # this makes all the kernels, CUDA streams, and per-device arguments that will get used per-device.
    kernels, streams, kernel_args = kern.initialize_computation(
        metric, interp_method, ref, mov, cupy.mean(ref), cupy.mean(mov),
        sz_ref, sy_ref, sx_ref, sz_mov, sy_mov, sx_mov
    )
    # formulate optimization function
    # NOTE: scipy only has a `minimize` function and these metrics should be maximized. 
    # They are all confined to [0,1] so to minimize, do 1.0 - kernel_value.
    def opt_fun(pars: numpy.ndarray) -> float:
        T = par_fun(pars) # generate transform matrix
        val = kern.compute(
            T, kernels, streams, kernel_args, kernel_launch_params
        )
        return 1.0 - val.get()

    # do powell's registration
    opt_call = partial(
        minimize, opt_fun, x0=par0, bounds=bounds, method="powell", options=opt_kwargs
    )
    if verbose:
        def cback(x, pbar, postfix_fun):
            pbar.update(1)
            pbar.set_postfix(postfix_fun(x))
        with tqdm(desc="Registration", leave=False) as pbar:
            _cback = partial(cback, pbar=pbar, postfix_fun=postfix_fun)
            res = opt_call(callback=_cback)
    else:
        res = opt_call()
    # calculate transform
    T = mat_fun(res.x, cx, cy, cz).reshape(4, 4)
    return T, res

roi

Functions
detect_roi_2d(im: NDArray, method: str = 'triangle', quantile: float = 0, **kwargs) -> BBox2D

detect_roi_2d Determine ROI in 2D image by thresholding. If using method=threshold, the threshold=[value] keyword arg must be passed into function. Args: im (NDArray): image to find ROI in method (str, optional): thresholding method, one of ('triangle', 'otsu', 'threshold'). Defaults to 'triangle'. quantile (float, optional): quantile to truncate coordinates with (helps get rid of outliers). Defaults to 0.

Returns:

Name Type Description
BBox2D BBox2D

bounding box ROI

Source code in packages/pyspim/src/pyspim/roi.py
def detect_roi_2d(
    im: NDArray, method: str = "triangle", quantile: float = 0, **kwargs
) -> BBox2D:
    """detect_roi_2d Determine ROI in 2D image by thresholding. If using `method=threshold`, the `threshold=[value]` keyword arg must be passed into function.
    Args:
        im (NDArray): image to find ROI in
        method (str, optional): thresholding method, one of ('triangle', 'otsu', 'threshold'). Defaults to 'triangle'.
        quantile (float, optional): quantile to truncate coordinates with (helps get rid of outliers). Defaults to 0.

    Returns:
        BBox2D: bounding box ROI
    """
    assert method in ("triangle", "otsu", "threshold"), (
        "invalid segmentation method specified"
    )
    if method == "threshold":
        thresh = kwargs["threshold"]
    xp = cupy.get_array_module(im)
    ndi = get_ndimage_module(xp)
    # get rid of line artifacts
    vert_line = xp.zeros((5, 5))
    vert_line[:, 2] = 1
    thrim = ndi.grey_erosion(
        ndi.grey_erosion(im, structure=vert_line), structure=vert_line.T
    )
    # threshold the image by triangle method
    if method == "triangle":
        thresh = threshold_triangle(im)
    elif method == "otsu":
        thresh = skimage.filters.threshold_otsu(im)
    else:  # method == 'threshold'
        pass  # already defined this, above when we grabbed from **kwargs

    # dilate 3 times to remove small particles
    thrim = ndi.binary_dilation(thrim > thresh, iterations=3, brute_force=True)
    # figure out how to draw bounding box/ROI
    row, col = xp.nonzero(thrim)
    if quantile > 0:
        rq = xp.quantile(row, [quantile, 1 - quantile])
        assert len(rq) == 2, "should only return 2 values"
        rs, re = int(rq[0]), int(rq[1])
        cq = xp.quantile(col, [quantile, 1 - quantile])
        cs, ce = int(cq[0]), int(cq[1])
    else:
        rs, re = int(xp.amin(row)), int(xp.amax(row) + 1)
        cs, ce = int(xp.amin(col)), int(xp.amax(col) + 1)
    return (rs, re), (cs, ce)
detect_roi_3d(im: NDArray, method: str = 'triangle', quantile_rc: float = 0, quantile_zc: float = 0, **kwargs) -> BBox3D

detect_roi_3d Detect ROI of 3D volume by combining ROIs of 2D max-projections.

Parameters:

Name Type Description Default
im NDArray

volume to find ROI of

required
method str

thresholding method. Defaults to 'triangle'.

'triangle'
quantile_rc float

qunatile used for determining row-col bounding box. Defaults to 0.

0
quantile_zc float

quantile used for determine z-row bounding box. Defaults to 0.

0

Returns:

Name Type Description
BBox3D BBox3D

format is ((lb_z, ub_z), (lb_r,ub_r), (lb_c,ub_c))

Source code in packages/pyspim/src/pyspim/roi.py
def detect_roi_3d(
    im: NDArray,
    method: str = "triangle",
    quantile_rc: float = 0,
    quantile_zc: float = 0,
    **kwargs,
) -> BBox3D:
    """detect_roi_3d Detect ROI of 3D volume by combining ROIs of 2D max-projections.

    Args:
        im (NDArray): volume to find ROI of
        method (str, optional): thresholding method. Defaults to 'triangle'.
        quantile_rc (float, optional): qunatile used for determining row-col bounding box. Defaults to 0.
        quantile_zc (float, optional): quantile used for determine z-row bounding box. Defaults to 0.

    Returns:
        BBox3D: format is `((lb_z, ub_z), (lb_r,ub_r), (lb_c,ub_c))`
    """
    xp = cupy.get_array_module(im)
    rc = xp.amax(im, axis=0)
    (rs, re), (cs, ce) = detect_roi_2d(rc, method, quantile_rc, **kwargs)
    zc = xp.amax(im, axis=1)
    (zs, ze), _ = detect_roi_2d(zc, method, quantile_zc, **kwargs)
    return (zs, ze), (rs, re), (cs, ce)
combine_rois(a: BBox3D, b: BBox3D, ensure_even: bool = False) -> BBox3D

combine_rois Combine 2 ROIs into smallest possible one that encompasses both.

Parameters:

Name Type Description Default
a BBox3D

3d bounding box for first ROI

required
b BBox3D

3d bounding box for second ROI

required
ensure_even bool

make all dimensions of output ROI even. Defaults to False.

False

Returns:

Type Description
BBox3D

BBox3D

Source code in packages/pyspim/src/pyspim/roi.py
def combine_rois(a: BBox3D, b: BBox3D, ensure_even: bool = False) -> BBox3D:
    """combine_rois Combine 2 ROIs into smallest possible one that encompasses both.

    Args:
        a (BBox3D): 3d bounding box for first ROI
        b (BBox3D): 3d bounding box for second ROI
        ensure_even (bool, optional): make all dimensions of output ROI even. Defaults to False.

    Returns:
        BBox3D
    """
    (azs, aze), (ars, are), (acs, ace) = a
    (bzs, bze), (brs, bre), (bcs, bce) = b
    zs, ze = min([azs, bzs]), max([aze, bze])
    rs, re = min([ars, brs]), max([are, bre])
    cs, ce = min([acs, bcs]), max([ace, bce])
    if ensure_even:
        if (ze - zs) % 2 > 0:
            zs += 1
        if (re - rs) % 2 > 0:
            rs += 1
        if (ce - cs) % 2 > 0:
            cs += 1
    return (zs, ze), (rs, re), (cs, ce)
crop_with_roi(a: NDArray, roi: BBox3D | BBox2D) -> NDArray

crop_with_roi Utility function to crop input using specified bounding box ROI.

Parameters:

Name Type Description Default
a NDArray

input to be cropped

required
roi BBox3D | BBox2D

bounding box ROI

required

Raises:

Type Description
ValueError

if input isn't 2 or 3D

Returns:

Name Type Description
NDArray NDArray

cropped input

Source code in packages/pyspim/src/pyspim/roi.py
def crop_with_roi(a: NDArray, roi: BBox3D | BBox2D) -> NDArray:
    """crop_with_roi Utility function to crop input using specified bounding box ROI.

    Args:
        a (NDArray): input to be cropped
        roi (BBox3D | BBox2D): bounding box ROI

    Raises:
        ValueError: if input isn't 2 or 3D

    Returns:
        NDArray: cropped input
    """
    if len(a.shape) == 3:
        return a[roi[0][0] : roi[0][1], roi[1][0] : roi[1][1], roi[2][0] : roi[2][1]]
    elif len(a.shape) == 2:
        return a[roi[0][0] : roi[0][1], roi[1][0] : roi[1][1]]
    else:
        raise ValueError("invalid shape, must be 2 or 3D input")

util

public utility functions

Functions
center_crop(vol: NDArray, *args) -> NDArray

crop out the center of the input volume

:param vol: input volume :type vol: NDArray :returns: crop of input volume :rtype: NDArray

Source code in packages/pyspim/src/pyspim/_util.py
def center_crop(vol: NDArray, *args) -> NDArray:
    """crop out the center of the input volume

    :param vol: input volume
    :type vol: NDArray
    :returns: crop of input volume
    :rtype: NDArray
    """
    # unpack args into crop_z, crop_r, crop_c
    n_dim = len(vol.shape)
    assert n_dim == 2 or n_dim == 3, "input must be 2D image or 3D volume"
    if len(args) == 1:
        crop_z, crop_r, crop_c = args[0], args[0], args[0]
    else:
        assert len(args) == n_dim, "must specify either 1 or `n_dim` crop inputs"
        if n_dim == 2:
            crop_r, crop_c = args[0], args[1]
            crop_z = 0 # not needed, but suppresses typing error
        else:  # n_dim == 3
            crop_z, crop_r, crop_c = args[0], args[1], args[2]
    if n_dim == 2:
        size_r, size_c = vol.shape
        sz = 0 # not needed, but suppresses typing error
    else:
        size_z, size_r, size_c = vol.shape
        crop_z = crop_z if crop_z < size_z else size_z
        sz = size_z // 2 - crop_z // 2
    crop_r = crop_r if crop_r < size_r else size_r
    crop_c = crop_c if crop_c < size_c else size_c
    sr = size_r // 2 - crop_r // 2
    sc = size_c // 2 - crop_c // 2
    if n_dim == 2:
        return vol[sr : sr + crop_r, sc : sc + crop_c]
    else:
        return vol[sz : sz + crop_z, sr : sr + crop_r, sc : sc + crop_c]
launch_params_for_volume(shp: tuple[int, int, int], block_size_z: int, block_size_r: int, block_size_c: int) -> CuLaunchParameters

Automatically calculate good launch parameters for CUDA kernel that will be run on a volume of specified shape.

Parameters:

Name Type Description Default
shp Iterable[int]

shape of input volume kernel will be run over (ZRC).

required
block_size_z int

block size in z dimension (0 axis)

required
block_size_r int

block size in r dimension (1 axis)

required
block_size_c int

block size in c dimension (2 axis)

required

Returns:

Type Description
CuLaunchParameters

CuLaunchParameters

Source code in packages/pyspim/src/pyspim/_util.py
def launch_params_for_volume(
    shp: tuple[int,int,int], block_size_z: int, block_size_r: int, block_size_c: int
) -> CuLaunchParameters:
    """Automatically calculate good launch parameters for CUDA kernel that will be run on a volume of specified `shape`.

    Args:
        shp (Iterable[int]): shape of input volume kernel will be run over (ZRC).
        block_size_z (int): block size in z dimension (0 axis)
        block_size_r (int): block size in r dimension (1 axis)
        block_size_c (int): block size in c dimension (2 axis)

    Returns:
        CuLaunchParameters
    """
    # gz = _cuda_gridsize_for_blocksize(shp[0], block_size_z)
    # gr = _cuda_gridsize_for_blocksize(shp[1], block_size_r)
    # gc = _cuda_gridsize_for_blocksize(shp[2], block_size_c)

    #return (gz, gr, gc), (block_size_c, block_size_r, block_size_z)
    return (8, 8, 8), (block_size_z, block_size_r, block_size_c)
shared_bbox_from_proj_threshold(v1: NDArray, v2: NDArray, proj_fun: str = 'max', thresh_val: float = 0) -> BBox3D

determine region shared by both volumes by thresholding 'shared' means that both have data in the region

:param v1: first volume :type v1: NDArray :param v2: second volume :type v2: NDArray :param proj_fun: function to project with one of ('max', 'mean', 'median') :type proj_fun: str :param thresh_val: value to threshold with to make mask :type thresh_val: float :returns: 3d bounding box of region common to v1 and v2 :rtype: BBox3D

Source code in packages/pyspim/src/pyspim/_util.py
def shared_bbox_from_proj_threshold(
    v1: NDArray, v2: NDArray, proj_fun: str = "max", thresh_val: float = 0
) -> BBox3D:
    """determine region shared by both volumes by thresholding
        'shared' means that both have data in the region

    :param v1: first volume
    :type v1: NDArray
    :param v2: second volume
    :type v2: NDArray
    :param proj_fun: function to project with
       one of ('max', 'mean', 'median')
    :type proj_fun: str
    :param thresh_val: value to threshold with to make mask
    :type thresh_val: float
    :returns: 3d bounding box of region common to `v1` and `v2`
    :rtype: BBox3D
    """
    assert proj_fun in ("max", "mean", "median"), (
        "invalid `proj_fun` must be one of ('max', 'mean', 'median')"
    )
    xp = cupy.get_array_module(v1)
    if proj_fun == "max":
        fun = xp.amax
    elif proj_fun == "mean":
        fun = xp.mean
    else:
        fun = xp.median
    bb_yx = bbox_for_mask(
        xp.logical_and(fun(v1, 0) > thresh_val, fun(v2, 0) > thresh_val)
    )
    bb_yx = [0, xp.inf] + list(bb_yx[0]) + list(bb_yx[1])
    bb_zx = bbox_for_mask(
        xp.logical_and(fun(v1, 1) > thresh_val, fun(v2, 1) > thresh_val)
    )
    bb_zx = list(bb_zx[0]) + [0, xp.inf] + list(bb_zx[1])
    bb_zy = bbox_for_mask(
        xp.logical_and(fun(v1, 2) > thresh_val, fun(v2, 2) > thresh_val)
    )
    bb_zy = list(bb_zy[0]) + list(bb_zy[1]) + [0, xp.inf]
    bb = numpy.vstack([bb_yx, bb_zx, bb_zy])
    starts = xp.amax(bb, 0)[::2].astype(int)
    ends = xp.amin(bb, 0)[1::2].astype(int)
    bb = [(int(starts[i]), int(ends[i])) for i in range(3)]
    return tuple(bb) # type: ignore
threshold_triangle(im: NDArray, nbins: int = 256) -> float

threshold_triangle Compute threshold value by triangle method. Copied from skimage.filters but modified so that numpy or cupy is used based on input device.

Parameters:

Name Type Description Default
im NDArray

array to calculate threshold for

required
nbins int

number of bins to use in histogram. Defaults to 256.

256

Returns:

Type Description
float

float

Source code in packages/pyspim/src/pyspim/_util.py
def threshold_triangle(im: NDArray, nbins: int = 256) -> float:
    """threshold_triangle Compute threshold value by triangle method. Copied from `skimage.filters` but modified so that numpy or cupy is used based on input device.

    Args:
        im (NDArray): array to calculate threshold for
        nbins (int, optional): number of bins to use in histogram. Defaults to 256.

    Returns:
        float
    """
    xp = cupy.get_array_module(im)

    hist, bin_edges = xp.histogram(im, bins=nbins)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    nbins = len(hist)

    # Find peak, lowest and highest gray levels.
    arg_pk_hgt = xp.argmax(hist)
    peak_height = hist[arg_pk_hgt]
    arg_llev, arg_hlev = xp.flatnonzero(hist)[[0, -1]]

    # Flip is True if left tail is shorter.
    flip = arg_pk_hgt - arg_llev < arg_hlev - arg_pk_hgt
    if flip:
        hist = hist[::-1]
        arg_llev = nbins - arg_hlev - 1
        arg_pk_hgt = nbins - arg_pk_hgt - 1

    # If flip == True, arg_hlev becomes incorrect
    # but we don't need it anymore.
    del arg_hlev

    # set up coordinate system
    width = arg_pk_hgt - arg_llev
    x1 = xp.arange(width)
    y1 = hist[x1 + arg_llev]

    # normalize
    norm = int(xp.sqrt(peak_height**2 + width**2))
    peak_height = peak_height / norm
    width = width / norm

    # maximize the length
    length = peak_height * x1 - width * y1
    arg_level = xp.argmax(length) + arg_llev

    if flip:
        arg_level = nbins - arg_level - 1
    return float(bin_centers[arg_level])

pyspim.roi

Functions

detect_roi_2d(im: NDArray, method: str = 'triangle', quantile: float = 0, **kwargs) -> BBox2D

detect_roi_2d Determine ROI in 2D image by thresholding. If using method=threshold, the threshold=[value] keyword arg must be passed into function. Args: im (NDArray): image to find ROI in method (str, optional): thresholding method, one of ('triangle', 'otsu', 'threshold'). Defaults to 'triangle'. quantile (float, optional): quantile to truncate coordinates with (helps get rid of outliers). Defaults to 0.

Returns:

Name Type Description
BBox2D BBox2D

bounding box ROI

Source code in packages/pyspim/src/pyspim/roi.py
def detect_roi_2d(
    im: NDArray, method: str = "triangle", quantile: float = 0, **kwargs
) -> BBox2D:
    """detect_roi_2d Determine ROI in 2D image by thresholding. If using `method=threshold`, the `threshold=[value]` keyword arg must be passed into function.
    Args:
        im (NDArray): image to find ROI in
        method (str, optional): thresholding method, one of ('triangle', 'otsu', 'threshold'). Defaults to 'triangle'.
        quantile (float, optional): quantile to truncate coordinates with (helps get rid of outliers). Defaults to 0.

    Returns:
        BBox2D: bounding box ROI
    """
    assert method in ("triangle", "otsu", "threshold"), (
        "invalid segmentation method specified"
    )
    if method == "threshold":
        thresh = kwargs["threshold"]
    xp = cupy.get_array_module(im)
    ndi = get_ndimage_module(xp)
    # get rid of line artifacts
    vert_line = xp.zeros((5, 5))
    vert_line[:, 2] = 1
    thrim = ndi.grey_erosion(
        ndi.grey_erosion(im, structure=vert_line), structure=vert_line.T
    )
    # threshold the image by triangle method
    if method == "triangle":
        thresh = threshold_triangle(im)
    elif method == "otsu":
        thresh = skimage.filters.threshold_otsu(im)
    else:  # method == 'threshold'
        pass  # already defined this, above when we grabbed from **kwargs

    # dilate 3 times to remove small particles
    thrim = ndi.binary_dilation(thrim > thresh, iterations=3, brute_force=True)
    # figure out how to draw bounding box/ROI
    row, col = xp.nonzero(thrim)
    if quantile > 0:
        rq = xp.quantile(row, [quantile, 1 - quantile])
        assert len(rq) == 2, "should only return 2 values"
        rs, re = int(rq[0]), int(rq[1])
        cq = xp.quantile(col, [quantile, 1 - quantile])
        cs, ce = int(cq[0]), int(cq[1])
    else:
        rs, re = int(xp.amin(row)), int(xp.amax(row) + 1)
        cs, ce = int(xp.amin(col)), int(xp.amax(col) + 1)
    return (rs, re), (cs, ce)
detect_roi_3d(im: NDArray, method: str = 'triangle', quantile_rc: float = 0, quantile_zc: float = 0, **kwargs) -> BBox3D

detect_roi_3d Detect ROI of 3D volume by combining ROIs of 2D max-projections.

Parameters:

Name Type Description Default
im NDArray

volume to find ROI of

required
method str

thresholding method. Defaults to 'triangle'.

'triangle'
quantile_rc float

qunatile used for determining row-col bounding box. Defaults to 0.

0
quantile_zc float

quantile used for determine z-row bounding box. Defaults to 0.

0

Returns:

Name Type Description
BBox3D BBox3D

format is ((lb_z, ub_z), (lb_r,ub_r), (lb_c,ub_c))

Source code in packages/pyspim/src/pyspim/roi.py
def detect_roi_3d(
    im: NDArray,
    method: str = "triangle",
    quantile_rc: float = 0,
    quantile_zc: float = 0,
    **kwargs,
) -> BBox3D:
    """detect_roi_3d Detect ROI of 3D volume by combining ROIs of 2D max-projections.

    Args:
        im (NDArray): volume to find ROI of
        method (str, optional): thresholding method. Defaults to 'triangle'.
        quantile_rc (float, optional): qunatile used for determining row-col bounding box. Defaults to 0.
        quantile_zc (float, optional): quantile used for determine z-row bounding box. Defaults to 0.

    Returns:
        BBox3D: format is `((lb_z, ub_z), (lb_r,ub_r), (lb_c,ub_c))`
    """
    xp = cupy.get_array_module(im)
    rc = xp.amax(im, axis=0)
    (rs, re), (cs, ce) = detect_roi_2d(rc, method, quantile_rc, **kwargs)
    zc = xp.amax(im, axis=1)
    (zs, ze), _ = detect_roi_2d(zc, method, quantile_zc, **kwargs)
    return (zs, ze), (rs, re), (cs, ce)
combine_rois(a: BBox3D, b: BBox3D, ensure_even: bool = False) -> BBox3D

combine_rois Combine 2 ROIs into smallest possible one that encompasses both.

Parameters:

Name Type Description Default
a BBox3D

3d bounding box for first ROI

required
b BBox3D

3d bounding box for second ROI

required
ensure_even bool

make all dimensions of output ROI even. Defaults to False.

False

Returns:

Type Description
BBox3D

BBox3D

Source code in packages/pyspim/src/pyspim/roi.py
def combine_rois(a: BBox3D, b: BBox3D, ensure_even: bool = False) -> BBox3D:
    """combine_rois Combine 2 ROIs into smallest possible one that encompasses both.

    Args:
        a (BBox3D): 3d bounding box for first ROI
        b (BBox3D): 3d bounding box for second ROI
        ensure_even (bool, optional): make all dimensions of output ROI even. Defaults to False.

    Returns:
        BBox3D
    """
    (azs, aze), (ars, are), (acs, ace) = a
    (bzs, bze), (brs, bre), (bcs, bce) = b
    zs, ze = min([azs, bzs]), max([aze, bze])
    rs, re = min([ars, brs]), max([are, bre])
    cs, ce = min([acs, bcs]), max([ace, bce])
    if ensure_even:
        if (ze - zs) % 2 > 0:
            zs += 1
        if (re - rs) % 2 > 0:
            rs += 1
        if (ce - cs) % 2 > 0:
            cs += 1
    return (zs, ze), (rs, re), (cs, ce)
crop_with_roi(a: NDArray, roi: BBox3D | BBox2D) -> NDArray

crop_with_roi Utility function to crop input using specified bounding box ROI.

Parameters:

Name Type Description Default
a NDArray

input to be cropped

required
roi BBox3D | BBox2D

bounding box ROI

required

Raises:

Type Description
ValueError

if input isn't 2 or 3D

Returns:

Name Type Description
NDArray NDArray

cropped input

Source code in packages/pyspim/src/pyspim/roi.py
def crop_with_roi(a: NDArray, roi: BBox3D | BBox2D) -> NDArray:
    """crop_with_roi Utility function to crop input using specified bounding box ROI.

    Args:
        a (NDArray): input to be cropped
        roi (BBox3D | BBox2D): bounding box ROI

    Raises:
        ValueError: if input isn't 2 or 3D

    Returns:
        NDArray: cropped input
    """
    if len(a.shape) == 3:
        return a[roi[0][0] : roi[0][1], roi[1][0] : roi[1][1], roi[2][0] : roi[2][1]]
    elif len(a.shape) == 2:
        return a[roi[0][0] : roi[0][1], roi[1][0] : roi[1][1]]
    else:
        raise ValueError("invalid shape, must be 2 or 3D input")

pyspim.util

public utility functions

Functions

center_crop(vol: NDArray, *args) -> NDArray

crop out the center of the input volume

:param vol: input volume :type vol: NDArray :returns: crop of input volume :rtype: NDArray

Source code in packages/pyspim/src/pyspim/_util.py
def center_crop(vol: NDArray, *args) -> NDArray:
    """crop out the center of the input volume

    :param vol: input volume
    :type vol: NDArray
    :returns: crop of input volume
    :rtype: NDArray
    """
    # unpack args into crop_z, crop_r, crop_c
    n_dim = len(vol.shape)
    assert n_dim == 2 or n_dim == 3, "input must be 2D image or 3D volume"
    if len(args) == 1:
        crop_z, crop_r, crop_c = args[0], args[0], args[0]
    else:
        assert len(args) == n_dim, "must specify either 1 or `n_dim` crop inputs"
        if n_dim == 2:
            crop_r, crop_c = args[0], args[1]
            crop_z = 0 # not needed, but suppresses typing error
        else:  # n_dim == 3
            crop_z, crop_r, crop_c = args[0], args[1], args[2]
    if n_dim == 2:
        size_r, size_c = vol.shape
        sz = 0 # not needed, but suppresses typing error
    else:
        size_z, size_r, size_c = vol.shape
        crop_z = crop_z if crop_z < size_z else size_z
        sz = size_z // 2 - crop_z // 2
    crop_r = crop_r if crop_r < size_r else size_r
    crop_c = crop_c if crop_c < size_c else size_c
    sr = size_r // 2 - crop_r // 2
    sc = size_c // 2 - crop_c // 2
    if n_dim == 2:
        return vol[sr : sr + crop_r, sc : sc + crop_c]
    else:
        return vol[sz : sz + crop_z, sr : sr + crop_r, sc : sc + crop_c]
launch_params_for_volume(shp: tuple[int, int, int], block_size_z: int, block_size_r: int, block_size_c: int) -> CuLaunchParameters

Automatically calculate good launch parameters for CUDA kernel that will be run on a volume of specified shape.

Parameters:

Name Type Description Default
shp Iterable[int]

shape of input volume kernel will be run over (ZRC).

required
block_size_z int

block size in z dimension (0 axis)

required
block_size_r int

block size in r dimension (1 axis)

required
block_size_c int

block size in c dimension (2 axis)

required

Returns:

Type Description
CuLaunchParameters

CuLaunchParameters

Source code in packages/pyspim/src/pyspim/_util.py
def launch_params_for_volume(
    shp: tuple[int,int,int], block_size_z: int, block_size_r: int, block_size_c: int
) -> CuLaunchParameters:
    """Automatically calculate good launch parameters for CUDA kernel that will be run on a volume of specified `shape`.

    Args:
        shp (Iterable[int]): shape of input volume kernel will be run over (ZRC).
        block_size_z (int): block size in z dimension (0 axis)
        block_size_r (int): block size in r dimension (1 axis)
        block_size_c (int): block size in c dimension (2 axis)

    Returns:
        CuLaunchParameters
    """
    # gz = _cuda_gridsize_for_blocksize(shp[0], block_size_z)
    # gr = _cuda_gridsize_for_blocksize(shp[1], block_size_r)
    # gc = _cuda_gridsize_for_blocksize(shp[2], block_size_c)

    #return (gz, gr, gc), (block_size_c, block_size_r, block_size_z)
    return (8, 8, 8), (block_size_z, block_size_r, block_size_c)
shared_bbox_from_proj_threshold(v1: NDArray, v2: NDArray, proj_fun: str = 'max', thresh_val: float = 0) -> BBox3D

determine region shared by both volumes by thresholding 'shared' means that both have data in the region

:param v1: first volume :type v1: NDArray :param v2: second volume :type v2: NDArray :param proj_fun: function to project with one of ('max', 'mean', 'median') :type proj_fun: str :param thresh_val: value to threshold with to make mask :type thresh_val: float :returns: 3d bounding box of region common to v1 and v2 :rtype: BBox3D

Source code in packages/pyspim/src/pyspim/_util.py
def shared_bbox_from_proj_threshold(
    v1: NDArray, v2: NDArray, proj_fun: str = "max", thresh_val: float = 0
) -> BBox3D:
    """determine region shared by both volumes by thresholding
        'shared' means that both have data in the region

    :param v1: first volume
    :type v1: NDArray
    :param v2: second volume
    :type v2: NDArray
    :param proj_fun: function to project with
       one of ('max', 'mean', 'median')
    :type proj_fun: str
    :param thresh_val: value to threshold with to make mask
    :type thresh_val: float
    :returns: 3d bounding box of region common to `v1` and `v2`
    :rtype: BBox3D
    """
    assert proj_fun in ("max", "mean", "median"), (
        "invalid `proj_fun` must be one of ('max', 'mean', 'median')"
    )
    xp = cupy.get_array_module(v1)
    if proj_fun == "max":
        fun = xp.amax
    elif proj_fun == "mean":
        fun = xp.mean
    else:
        fun = xp.median
    bb_yx = bbox_for_mask(
        xp.logical_and(fun(v1, 0) > thresh_val, fun(v2, 0) > thresh_val)
    )
    bb_yx = [0, xp.inf] + list(bb_yx[0]) + list(bb_yx[1])
    bb_zx = bbox_for_mask(
        xp.logical_and(fun(v1, 1) > thresh_val, fun(v2, 1) > thresh_val)
    )
    bb_zx = list(bb_zx[0]) + [0, xp.inf] + list(bb_zx[1])
    bb_zy = bbox_for_mask(
        xp.logical_and(fun(v1, 2) > thresh_val, fun(v2, 2) > thresh_val)
    )
    bb_zy = list(bb_zy[0]) + list(bb_zy[1]) + [0, xp.inf]
    bb = numpy.vstack([bb_yx, bb_zx, bb_zy])
    starts = xp.amax(bb, 0)[::2].astype(int)
    ends = xp.amin(bb, 0)[1::2].astype(int)
    bb = [(int(starts[i]), int(ends[i])) for i in range(3)]
    return tuple(bb) # type: ignore
threshold_triangle(im: NDArray, nbins: int = 256) -> float

threshold_triangle Compute threshold value by triangle method. Copied from skimage.filters but modified so that numpy or cupy is used based on input device.

Parameters:

Name Type Description Default
im NDArray

array to calculate threshold for

required
nbins int

number of bins to use in histogram. Defaults to 256.

256

Returns:

Type Description
float

float

Source code in packages/pyspim/src/pyspim/_util.py
def threshold_triangle(im: NDArray, nbins: int = 256) -> float:
    """threshold_triangle Compute threshold value by triangle method. Copied from `skimage.filters` but modified so that numpy or cupy is used based on input device.

    Args:
        im (NDArray): array to calculate threshold for
        nbins (int, optional): number of bins to use in histogram. Defaults to 256.

    Returns:
        float
    """
    xp = cupy.get_array_module(im)

    hist, bin_edges = xp.histogram(im, bins=nbins)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    nbins = len(hist)

    # Find peak, lowest and highest gray levels.
    arg_pk_hgt = xp.argmax(hist)
    peak_height = hist[arg_pk_hgt]
    arg_llev, arg_hlev = xp.flatnonzero(hist)[[0, -1]]

    # Flip is True if left tail is shorter.
    flip = arg_pk_hgt - arg_llev < arg_hlev - arg_pk_hgt
    if flip:
        hist = hist[::-1]
        arg_llev = nbins - arg_hlev - 1
        arg_pk_hgt = nbins - arg_pk_hgt - 1

    # If flip == True, arg_hlev becomes incorrect
    # but we don't need it anymore.
    del arg_hlev

    # set up coordinate system
    width = arg_pk_hgt - arg_llev
    x1 = xp.arange(width)
    y1 = hist[x1 + arg_llev]

    # normalize
    norm = int(xp.sqrt(peak_height**2 + width**2))
    peak_height = peak_height / norm
    width = width / norm

    # maximize the length
    length = peak_height * x1 - width * y1
    arg_level = xp.argmax(length) + arg_llev

    if flip:
        arg_level = nbins - arg_level - 1
    return float(bin_centers[arg_level])

pyspim.typing

pyspim.dispimfusion

Reimplementation of the diSPIMFusion API, as first described in [1].

References

[1] Guo M., et al. "Rapid image deconvolution and multiview fusion..." Nature Biotechnology 38.11 (2020): 1337-1346. [2] https://github.com/eguomin/diSPIMFusion [3] https://github.com/eguomin/microImageLib

pyspim.isotropize

Isotropization utilities.

Individual views from diSPIM volumes are typically acquired with uneven spacing in one of the directions, but analysis requires cubic pixels. These are utilities to upsample the axis with uneven spacing relative to the camera pixel size.

This can be done either by direct interpolation (interpolate) or by upsampling in Fourier space (fourier_upsample), using the technique described in [1].

References

[1] Stein, et al. "Fourier interpolation stochastic..." doi:10.1364/OE.23.016154

Functions

interpolate(vol: NDArray, pixel_size: float, step_size: float, axis: int = 0, **kwargs) -> NDArray

interpolate Upsample an axis by interpolation. Function will upsample specified axis by assuming it has spacing step_size such that the output voxel is cubic, pixel_size**3.

Parameters:

Name Type Description Default
vol NDArray

input volume

required
pixel_size float

pixel size in real (space) units

required
step_size float

step size in real (space) units

required
axis int

axis to upsample. Defaults to 0.

0
**kwargs

passed to ndimage.zoom

{}

Returns:

Name Type Description
NDArray NDArray
Source code in packages/pyspim/src/pyspim/isotropize.py
def interpolate(
    vol: NDArray, pixel_size: float, step_size: float, axis: int = 0, **kwargs
) -> NDArray:
    """interpolate Upsample an axis by interpolation. Function will upsample specified `axis` by assuming it has spacing `step_size` such that the output voxel is cubic, `pixel_size**3`.

    Args:
        vol (NDArray): input volume
        pixel_size (float): pixel size in real (space) units
        step_size (float): step size in real (space) units
        axis (int, optional): axis to upsample. Defaults to 0.
        **kwargs: passed to `ndimage.zoom`

    Returns:
        NDArray:
    """

    n_dim = len(vol.shape)
    assert axis < n_dim and axis >= 0, "axis must be in range [0, len(vol.shape))"
    ndi = get_ndimage_module(vol)
    resc = [
        1.0,
    ] * n_dim
    resc[axis] = step_size / pixel_size
    return ndi.zoom(vol, resc, **kwargs)
fourier_upsample(vol: NDArray, pixel_size: float, step_size: float, axis: int = 0) -> NDArray

fourier_upsample Upsample an axis by Fourier upsampling.

Parameters:

Name Type Description Default
vol NDArray

input volume

required
pixel_size float

pixel size in real (space) units.

required
step_size float

step size in real (space) units

required
axis int

axis to upsample. Defaults to 0.

0

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/isotropize.py
def fourier_upsample(
    vol: NDArray, pixel_size: float, step_size: float, axis: int = 0
) -> NDArray:
    """fourier_upsample Upsample an axis by Fourier upsampling.

    Args:
        vol (NDArray): input volume
        pixel_size (float): pixel size in real (space) units.
        step_size (float): step size in real (space) units
        axis (int, optional): axis to upsample. Defaults to 0.

    Returns:
        NDArray
    """

    n_dim = len(vol.shape)
    assert n_dim == 2 or n_dim == 3, "input must be a 2D image or 3D volume"
    assert axis < n_dim and axis >= 0, "axis must be in range [0, len(vol.shape))"
    xp = cupy.get_array_module(vol)
    fft = get_fft_module(vol)
    # mirror the volume on the relevant axis and convert to fourier domain
    ax_sze = vol.shape[axis]
    mirror_padding = [(0, 0) for _ in range(n_dim)]
    mirror_padding[axis] = (ax_sze // 2, ax_sze // 2)
    fourier = fft.fftshift(fft.rfftn(xp.pad(vol, mirror_padding, mode="symmetric")))
    # calculate how much we need to pad to make the upsampling work
    # see Eqn. 9 of fSOFI paper
    n12 = math.ceil((2 * ax_sze - 1) / 2)
    delta = int(math.ceil((step_size / pixel_size) * n12 - n12))
    # pad fourier domain image with zeros
    zero_padding = [(0, 0) for _ in range(n_dim)]
    zero_padding[axis] = (delta, delta)
    fourier = xp.pad(fourier, zero_padding, mode="constant", constant_values=0)
    out = fft.irfftn(fft.ifftshift(fourier))
    # trim out extra stuff generated by the padding
    n_mid = math.floor(step_size / pixel_size * ax_sze)
    nmh = n_mid // 2
    cent = out.shape[axis]
    if n_dim == 3:
        if axis == 0:
            return out[cent - nmh : cent + nmh, ...]
        elif axis == 1:
            return out[:, cent - nmh, cent + nmh, :]
        else:
            return out[..., cent - nmh : cent + nmh]
    else:  # n_dim == 2
        if axis == 0:
            return out[cent - nmh : cent + nmh, :]
        else:
            return out[:, cent - nmh : cent + nmh]

pyspim.decon.util

pyspim.decon.rl.dualview_sep

Separable deconvolution of dual-view volumes.

TODO: add more detail here.

Functions

deconvolve(view_a: NDArray, view_b: NDArray, est_i: NDArray | None, psf_az: NDArray | Iterable[NDArray], psf_ay: NDArray | Iterable[NDArray], psf_ax: NDArray | Iterable[NDArray], psf_bz: NDArray | Iterable[NDArray], psf_by: NDArray | Iterable[NDArray], psf_bx: NDArray | Iterable[NDArray], bp_az: NDArray | Iterable[NDArray], bp_ay: NDArray | Iterable[NDArray], bp_ax: NDArray | Iterable[NDArray], bp_bz: NDArray | Iterable[NDArray], bp_by: NDArray | Iterable[NDArray], bp_bx: NDArray | Iterable[NDArray], decon_function: str, num_iter: int, epsilon: float, boundary_correction: bool, zero_padding: PadType, boundary_sigma_a: float, boundary_sigma_b: float, verbose: bool) -> NDArray

deconvolve Joint deconvolution of the 2 input views into a single volume.

Input volumes should be either 3D (ZRC) or 4D (CZRC).

Parameters:

Name Type Description Default
view_a NDArray

input volume from first view (A)

required
view_b NDArray

input volume from second view (B)

required
est_i NDArray | None

initial estimate for deconvolution. If None, will be initialized as (A + B)/2.

required
psf_az NDArray | Iterable[NDArray]

psf for view a in z-direction

required
psf_ay NDArray | Iterable[NDArray]

psf for view a in y-direction

required
psf_ax NDArray | Iterable[NDArray]

psf for view a in x-direction

required
psf_bz NDArray | Iterable[NDArray]

psf for view b in z-direction

required
psf_by NDArray | Iterable[NDArray]

psf for view b in y-direction

required
psf_bx NDArray | Iterable[NDArray]

psf for view b in x-direction

required
bp_az NDArray | Iterable[NDArray]

backprojector for view a in z-direction

required
bp_ay NDArray | Iterable[NDArray]

backprojector for view a in y-direction

required
bp_ax NDArray | Iterable[NDArray]

backprojector for view a in x-direction

required
bp_bz NDArray | Iterable[NDArray]

backprojector for view b in z-direction

required
bp_by NDArray | Iterable[NDArray]

backprojector for view b in y-direction

required
bp_bx NDArray | Iterable[NDArray]

backprojector for view b in x-direction

required
decon_function str

deconvolution function to use. One of "additive", "dispim".

required
num_iter int

number of deconvolution iterations

required
epsilon float

small parameter for preventing division by zero

required
boundary_correction bool

whether or not to do boundary correction

required
zero_padding Optional[PadType]

zero padding for boundary correction

required
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
boundary_sigma_b float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
verbose bool

show progress bar for iterations

required

Raises:

Type Description
ValueError

if input volume isn't 3- (single channel) or 4D (multichannel).

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_sep.py
def deconvolve(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray | None,
    psf_az: NDArray | Iterable[NDArray],
    psf_ay: NDArray | Iterable[NDArray],
    psf_ax: NDArray | Iterable[NDArray],
    psf_bz: NDArray | Iterable[NDArray],
    psf_by: NDArray | Iterable[NDArray],
    psf_bx: NDArray | Iterable[NDArray],
    bp_az: NDArray | Iterable[NDArray],
    bp_ay: NDArray | Iterable[NDArray],
    bp_ax: NDArray | Iterable[NDArray],
    bp_bz: NDArray | Iterable[NDArray],
    bp_by: NDArray | Iterable[NDArray],
    bp_bx: NDArray | Iterable[NDArray],
    decon_function: str,
    num_iter: int,
    epsilon: float,
    boundary_correction: bool,
    zero_padding: PadType,
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    verbose: bool,
) -> NDArray:
    """deconvolve Joint deconvolution of the 2 input views into a single volume.

    Input volumes should be either 3D (ZRC) or 4D (CZRC).

    Args:
        view_a (NDArray): input volume from first view (A)
        view_b (NDArray): input volume from second view (B)
        est_i (NDArray | None): initial estimate for deconvolution. If ``None``, will be initialized as (A + B)/2.
        psf_az (NDArray | Iterable[NDArray]): psf for view a in z-direction
        psf_ay (NDArray | Iterable[NDArray]): psf for view a in y-direction
        psf_ax (NDArray | Iterable[NDArray]): psf for view a in x-direction
        psf_bz (NDArray | Iterable[NDArray]): psf for view b in z-direction
        psf_by (NDArray | Iterable[NDArray]): psf for view b in y-direction
        psf_bx (NDArray | Iterable[NDArray]): psf for view b in x-direction
        bp_az (NDArray | Iterable[NDArray]): backprojector for view a in z-direction
        bp_ay (NDArray | Iterable[NDArray]): backprojector for view a in y-direction
        bp_ax (NDArray | Iterable[NDArray]): backprojector for view a in x-direction
        bp_bz (NDArray | Iterable[NDArray]): backprojector for view b in z-direction
        bp_by (NDArray | Iterable[NDArray]): backprojector for view b in y-direction
        bp_bx (NDArray | Iterable[NDArray]): backprojector for view b in x-direction
        decon_function (str): deconvolution function to use. One of ``"additive", "dispim"``.
        num_iter (int): number of deconvolution iterations
        epsilon (float): small parameter for preventing division by zero
        boundary_correction (bool): whether or not to do boundary correction
        zero_padding (Optional[PadType]): zero padding for boundary correction
        boundary_sigma_a (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        verbose (bool): show progress bar for iterations

    Raises:
        ValueError: if input volume isn't 3- (single channel) or 4D (multichannel).

    Returns:
        NDArray
    """
    if len(view_a.shape) == 4:
        fun = _deconvolve_multichannel
    elif len(view_a.shape) == 3:
        fun = _deconvolve_single_channel
    else:
        raise ValueError("invalid shape, must be 3 or 4D")
    return fun(
        view_a,
        view_b,
        est_i,
        psf_az,
        psf_ay,
        psf_ax,
        psf_bz,
        psf_by,
        psf_bx,
        bp_az,
        bp_ay,
        bp_ax,
        bp_bz,
        bp_by,
        bp_bx,
        decon_function,
        num_iter,
        epsilon,
        boundary_correction,
        zero_padding,
        boundary_sigma_a,
        boundary_sigma_b,
        verbose,
    )
additive_joint_rl(view_a: cupy.ndarray, view_b: cupy.ndarray, est_i: cupy.ndarray, psf_az: cupy.ndarray, psf_ay: cupy.ndarray, psf_ax: cupy.ndarray, psf_bz: cupy.ndarray, psf_by: cupy.ndarray, psf_bx: cupy.ndarray, bp_az: cupy.ndarray, bp_ay: cupy.ndarray, bp_ax: cupy.ndarray, bp_bz: cupy.ndarray, bp_by: cupy.ndarray, bp_bx: cupy.ndarray, num_iter: int, epsilon: float, boundary_correction: bool, zero_padding: PadType, boundary_sigma_a: float, boundary_sigma_b: float, conv_module: Optional[cupy.RawModule], launch_pars: Optional[CuLaunchParameters], verbose: bool) -> cupy.ndarray

additive_joint_rl Additive joint RL deconvolution.

Parameters:

Name Type Description Default
view_a ndarray

input volume for view A

required
view_b ndarray

input volume for view B

required
est_i ndarray

initial estimate

required
psf_az ndarray

PSF for view A, z-direction

required
psf_ay ndarray

PSF for view A, y-direction

required
psf_ax ndarray

PSF for view A, x-direction

required
psf_bz ndarray

PSF for view B, z-direction

required
psf_by ndarray

PSF for view B, y-direction

required
psf_bx ndarray

PSF for view B, x-direction

required
bp_az ndarray

backprojector for view A, z-direction

required
bp_ay ndarray

backprojector for view A, y-direction

required
bp_ax ndarray

backprojector for view A, x-direction

required
bp_bz ndarray

backprojector for view B, z-direction

required
bp_by ndarray

backprojector for view B, y-direction

required
bp_bx ndarray

backprojector for view B, x-direction

required
num_iter int

number of deconvolution iterations

required
epsilon float

small parameter to prevent division by zero

required
boundary_correction bool

whether or not to do boundary correction

required
zero_padding Optional[PadType]

zero-padding for boundary correction

required
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
boundary_sigma_b float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
conv_module Optional[RawModule]

cupy.RawModule that implements separable convolution for appropriate kernel. If None, will be generated on the fly.

required
launch_pars Optional[CuLaunchParameters]

kernel launch parameters. If None, will be computed for the input volume.

required
verbose bool

description

required

Returns:

Type Description
ndarray

cupy.ndarray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_sep.py
def additive_joint_rl(
    view_a: cupy.ndarray,
    view_b: cupy.ndarray,
    est_i: cupy.ndarray,
    psf_az: cupy.ndarray,
    psf_ay: cupy.ndarray,
    psf_ax: cupy.ndarray,
    psf_bz: cupy.ndarray,
    psf_by: cupy.ndarray,
    psf_bx: cupy.ndarray,
    bp_az: cupy.ndarray,
    bp_ay: cupy.ndarray,
    bp_ax: cupy.ndarray,
    bp_bz: cupy.ndarray,
    bp_by: cupy.ndarray,
    bp_bx: cupy.ndarray,
    num_iter: int,
    epsilon: float,
    boundary_correction: bool,
    zero_padding: PadType,
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    conv_module: Optional[cupy.RawModule],
    launch_pars: Optional[CuLaunchParameters],
    verbose: bool,
) -> cupy.ndarray:
    """additive_joint_rl Additive joint RL deconvolution.

    Args:
        view_a (cupy.ndarray): input volume for view A
        view_b (cupy.ndarray): input volume for view B
        est_i (cupy.ndarray): initial estimate
        psf_az (cupy.ndarray): PSF for view A, z-direction
        psf_ay (cupy.ndarray): PSF for view A, y-direction
        psf_ax (cupy.ndarray): PSF for view A, x-direction
        psf_bz (cupy.ndarray): PSF for view B, z-direction
        psf_by (cupy.ndarray): PSF for view B, y-direction
        psf_bx (cupy.ndarray): PSF for view B, x-direction
        bp_az (cupy.ndarray): backprojector for view A, z-direction
        bp_ay (cupy.ndarray): backprojector for view A, y-direction
        bp_ax (cupy.ndarray): backprojector for view A, x-direction
        bp_bz (cupy.ndarray): backprojector for view B, z-direction
        bp_by (cupy.ndarray): backprojector for view B, y-direction
        bp_bx (cupy.ndarray): backprojector for view B, x-direction
        num_iter (int): number of deconvolution iterations
        epsilon (float): small parameter to prevent division by zero
        boundary_correction (bool): whether or not to do boundary correction
        zero_padding (Optional[PadType]): zero-padding for boundary correction
        boundary_sigma_a (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        conv_module (Optional[cupy.RawModule]): ``cupy.RawModule`` that implements separable convolution for appropriate kernel. If ``None``, will be generated on the fly.
        launch_pars (Optional[CuLaunchParameters]): kernel launch parameters. If ``None``, will be computed for the input volume.
        verbose (bool): _description_

    Returns:
        cupy.ndarray
    """
    # compile the convolution module
    if conv_module is None:
        conv_module = make_conv_module(len(psf_az) // 2)
    if launch_pars is None:
        launch_pars = calc_launch_params(len(psf_az) // 2, view_a.shape)
    if boundary_correction:
        # determine zero padding
        if zero_padding is None:
            pad = tuple([(d // 2, d // 2) for d in view_a.shape])
        else:
            if isinstance(zero_padding, int):
                pad = tuple([(zero_padding, zero_padding) for _ in view_a.shape])
            else:
                assert len(zero_padding) == len(view_a.shape), (
                    "zero-padding must be specified for all dimensions of input"
                )
                pad = tuple([(p, p) for p in zero_padding])
        return _additive_joint_rl_boundcorr(
            view_a,
            view_b,
            est_i,
            psf_az,
            psf_ay,
            psf_ax,
            psf_bz,
            psf_by,
            psf_bx,
            bp_az,
            bp_ay,
            bp_ax,
            bp_bz,
            bp_by,
            bp_bx,
            num_iter,
            epsilon,
            pad,
            boundary_sigma_a,
            boundary_sigma_b,
            conv_module,
            launch_pars,
            verbose,
        )
    else:
        return _additive_joint_rl_uncorr(
            view_a,
            view_b,
            est_i,
            psf_az,
            psf_ay,
            psf_ax,
            psf_bz,
            psf_by,
            psf_bx,
            bp_az,
            bp_ay,
            bp_ax,
            bp_bz,
            bp_by,
            bp_bx,
            num_iter,
            epsilon,
            conv_module,
            launch_pars,
            False,
        )
joint_rl_dispim(view_a: cupy.ndarray, view_b: cupy.ndarray, est_i: cupy.ndarray, psf_az: cupy.ndarray, psf_ay: cupy.ndarray, psf_ax: cupy.ndarray, psf_bz: cupy.ndarray, psf_by: cupy.ndarray, psf_bx: cupy.ndarray, bp_az: cupy.ndarray, bp_ay: cupy.ndarray, bp_ax: cupy.ndarray, bp_bz: cupy.ndarray, bp_by: cupy.ndarray, bp_bx: cupy.ndarray, num_iter: int, epsilon: float, boundary_correction: bool, zero_padding: PadType, boundary_sigma_a: float, boundary_sigma_b: float, conv_module: Optional[cupy.RawModule], launch_pars: Optional[CuLaunchParameters], verbose: bool) -> cupy.ndarray

Joint deconvolution specifically for diSPIM volumes.

Parameters:

Name Type Description Default
view_a ndarray

input volume for view A

required
view_b ndarray

input volume for view B

required
est_i ndarray

initial estimate

required
psf_az ndarray

PSF for view A, z-direction

required
psf_ay ndarray

PSF for view A, y-direction

required
psf_ax ndarray

PSF for view A, x-direction

required
psf_bz ndarray

PSF for view B, z-direction

required
psf_by ndarray

PSF for view B, y-direction

required
psf_bx ndarray

PSF for view B, x-direction

required
bp_az ndarray

backprojector for view A, z-direction

required
bp_ay ndarray

backprojector for view A, y-direction

required
bp_ax ndarray

backprojector for view A, x-direction

required
bp_bz ndarray

backprojector for view B, z-direction

required
bp_by ndarray

backprojector for view B, y-direction

required
bp_bx ndarray

backprojector for view B, x-direction

required
num_iter int

number of deconvolution iterations

required
epsilon float

small parameter to prevent division by zero

required
boundary_correction bool

whether or not to do boundary correction

required
zero_padding Optional[PadType]

zero-padding for boundary correction

required
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
boundary_sigma_b float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
conv_module Optional[RawModule]

cupy.RawModule that implements separable convolution for appropriate kernel. If None, will be generated on the fly.

required
launch_pars Optional[CuLaunchParameters]

kernel launch parameters. If None, will be computed for the input volume.

required
verbose bool

description

required

Returns:

Type Description
ndarray

cupy.ndarray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_sep.py
def joint_rl_dispim(
    view_a: cupy.ndarray,
    view_b: cupy.ndarray,
    est_i: cupy.ndarray,
    psf_az: cupy.ndarray,
    psf_ay: cupy.ndarray,
    psf_ax: cupy.ndarray,
    psf_bz: cupy.ndarray,
    psf_by: cupy.ndarray,
    psf_bx: cupy.ndarray,
    bp_az: cupy.ndarray,
    bp_ay: cupy.ndarray,
    bp_ax: cupy.ndarray,
    bp_bz: cupy.ndarray,
    bp_by: cupy.ndarray,
    bp_bx: cupy.ndarray,
    num_iter: int,
    epsilon: float,
    boundary_correction: bool,
    zero_padding: PadType,
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    conv_module: Optional[cupy.RawModule],
    launch_pars: Optional[CuLaunchParameters],
    verbose: bool,
) -> cupy.ndarray:
    """Joint deconvolution specifically for diSPIM volumes.

    Args:
        view_a (cupy.ndarray): input volume for view A
        view_b (cupy.ndarray): input volume for view B
        est_i (cupy.ndarray): initial estimate
        psf_az (cupy.ndarray): PSF for view A, z-direction
        psf_ay (cupy.ndarray): PSF for view A, y-direction
        psf_ax (cupy.ndarray): PSF for view A, x-direction
        psf_bz (cupy.ndarray): PSF for view B, z-direction
        psf_by (cupy.ndarray): PSF for view B, y-direction
        psf_bx (cupy.ndarray): PSF for view B, x-direction
        bp_az (cupy.ndarray): backprojector for view A, z-direction
        bp_ay (cupy.ndarray): backprojector for view A, y-direction
        bp_ax (cupy.ndarray): backprojector for view A, x-direction
        bp_bz (cupy.ndarray): backprojector for view B, z-direction
        bp_by (cupy.ndarray): backprojector for view B, y-direction
        bp_bx (cupy.ndarray): backprojector for view B, x-direction
        num_iter (int): number of deconvolution iterations
        epsilon (float): small parameter to prevent division by zero
        boundary_correction (bool): whether or not to do boundary correction
        zero_padding (Optional[PadType]): zero-padding for boundary correction
        boundary_sigma_a (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        conv_module (Optional[cupy.RawModule]): ``cupy.RawModule`` that implements separable convolution for appropriate kernel. If ``None``, will be generated on the fly.
        launch_pars (Optional[CuLaunchParameters]): kernel launch parameters. If ``None``, will be computed for the input volume.
        verbose (bool): _description_

    Returns:
        cupy.ndarray
    """
    # compile the convolution module
    if conv_module is None:
        conv_module = make_conv_module(len(psf_az) // 2)
    if launch_pars is None:
        launch_pars = calc_launch_params(len(psf_az) // 2, view_a.shape)
    # compile the
    if boundary_correction:
        warn("not implemented, falling back to additive algorithm")
        return _joint_rl_dispim_corr(
            view_a,
            view_b,
            est_i,
            psf_az,
            psf_ay,
            psf_ax,
            psf_bz,
            psf_by,
            psf_bx,
            bp_az,
            bp_ay,
            bp_ax,
            bp_bz,
            bp_by,
            bp_bx,
            num_iter,
            epsilon,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            conv_module,
            launch_pars,
            verbose,
        )
    else:
        return _joint_rl_dispim_uncorr(
            view_a,
            view_b,
            est_i,
            psf_az,
            psf_ay,
            psf_ax,
            psf_bz,
            psf_by,
            psf_bx,
            bp_az,
            bp_ay,
            bp_ax,
            bp_bz,
            bp_by,
            bp_bx,
            num_iter,
            epsilon,
            conv_module,
            launch_pars,
            verbose,
        )
deconvolve_chunkwise(view_a: zarr.Array, view_b: zarr.Array, out: zarr.Array, chunk_size: int | Tuple[int, int, int], overlap: int | Tuple[int, int, int], sigma_az: float, sigma_ay: float, sigma_ax: float, sigma_bz: float, sigma_by: float, sigma_bx: float, kernel_radius_z: int, kernel_radius_y: int, kernel_radius_x: int, decon_function: str, num_iter: int, epsilon: float, boundary_correction: bool, zero_padding: PadType, boundary_sigma_a: float, boundary_sigma_b: float, verbose: bool)

Joint deconvolution of the input views A & B, done chunk-by-chunk.

Parameters:

Name Type Description Default
view_a Array

zarr array for view A data

required
view_b Array

zarr array for view B data

required
out Array

zarr array to place deconvolved, output data into

required
chunk_size int | Tuple[int, int, int]

size of chunks

required
overlap int | Tuple[int, int, int]

amount of overlap between chunks

required
sigma_az float

standard deviation of PSF (assumed Gaussian) for view A in z-direction

required
sigma_ay float

standard deviation of PSF (assumed Gaussian) for view A in y-direction

required
sigma_ax float

standard deviation of PSF (assumed Gaussian) for view A in x-direction

required
sigma_bz float

standard deviation of PSF (assumed Gaussian) for view B in z-direction

required
sigma_by float

standard deviation of PSF (assumed Gaussian) for view B in y-direction

required
sigma_bx float

standard deviation of PSF (assumed Gaussian) for view B in x-direction

required
kernel_radius_z int

radius of PSF kernel in z-direction

required
kernel_radius_y int

radius of PSF kernel in y-direction

required
kernel_radius_x int

radius of PSF kernel in x-direction

required
decon_function str

deconvolution function to use. Either "dispim","additive".

required
num_iter int

number of deconvolution iterations.

required
epsilon float

small parameter to prevent division by zero

required
boundary_correction bool

whether or not to do boundary correction

required
zero_padding Optional[PadType]

zero-padding for boundary correction

required
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
boundary_sigma_b float

threshold for determining significant pixels in view A, defaults to 1e-2.

required
verbose bool

display a progress bar showing how many chunks have been/will be computed.

required
Source code in packages/pyspim/src/pyspim/decon/rl/dualview_sep.py
def deconvolve_chunkwise(
    view_a: zarr.Array,
    view_b: zarr.Array,
    out: zarr.Array,
    chunk_size: int | Tuple[int, int, int],
    overlap: int | Tuple[int, int, int],
    sigma_az: float,
    sigma_ay: float,
    sigma_ax: float,
    sigma_bz: float,
    sigma_by: float,
    sigma_bx: float,
    kernel_radius_z: int,
    kernel_radius_y: int,
    kernel_radius_x: int,
    decon_function: str,
    num_iter: int,
    epsilon: float,
    boundary_correction: bool,
    zero_padding: PadType,
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    verbose: bool,
):
    """Joint deconvolution of the input views A & B, done chunk-by-chunk.

    Args:
        view_a (zarr.Array): zarr array for view A data
        view_b (zarr.Array): zarr array for view B data
        out (zarr.Array): zarr array to place deconvolved, output data into
        chunk_size (int | Tuple[int,int,int]): size of chunks
        overlap (int | Tuple[int,int,int]): amount of overlap between chunks
        sigma_az (float): standard deviation of PSF (assumed Gaussian) for view A in z-direction
        sigma_ay (float): standard deviation of PSF (assumed Gaussian) for view A in y-direction
        sigma_ax (float): standard deviation of PSF (assumed Gaussian) for view A in x-direction
        sigma_bz (float): standard deviation of PSF (assumed Gaussian) for view B in z-direction
        sigma_by (float): standard deviation of PSF (assumed Gaussian) for view B in y-direction
        sigma_bx (float): standard deviation of PSF (assumed Gaussian) for view B in x-direction
        kernel_radius_z (int): radius of PSF kernel in z-direction
        kernel_radius_y (int): radius of PSF kernel in y-direction
        kernel_radius_x (int): radius of PSF kernel in x-direction
        decon_function (str): deconvolution function to use. Either ``"dispim","additive"``.
        num_iter (int): number of deconvolution iterations.
        epsilon (float): small parameter to prevent division by zero
        boundary_correction (bool): whether or not to do boundary correction
        zero_padding (Optional[PadType]): zero-padding for boundary correction
        boundary_sigma_a (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float): threshold for determining significant pixels in view A, defaults to 1e-2.
        verbose (bool): display a progress bar showing how many chunks have been/will be computed.
    """
    if len(view_a.shape) == 4:
        channel_slice = slice(None)
        ch_a, z_a, r_a, c_a = view_a.shape
        ch_b, z_b, r_b, c_b = view_b.shape
        assert ch_a == ch_b, "input volumes must have same # channels"
    else:
        channel_slice = None
        z_a, r_a, c_a = view_a.shape
        z_b, r_b, c_b = view_b.shape
    assert all([a == b for a, b in zip([z_a, r_a, c_a], [z_b, r_b, c_b])]), (
        "input volumes must be same shape"
    )
    chunks = calculate_conv_chunks(z_a, r_a, c_a, chunk_size, overlap, channel_slice)
    n_gpu = cupy.cuda.runtime.getDeviceCount()
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=n_gpu, mp_context=multiprocessing.get_context("spawn")
    ) as executor, multiprocessing.Manager() as manager:
        gpu_queue = manager.Queue()
        for gpu_id in range(n_gpu):
            gpu_queue.put(gpu_id)
        fun = partial(
            _decon_chunk,
            out,
            view_a,
            view_b,
            sigma_az,
            sigma_ay,
            sigma_ax,
            sigma_bz,
            sigma_by,
            sigma_bx,
            kernel_radius_z,
            kernel_radius_y,
            kernel_radius_x,
            decon_function,
            num_iter,
            epsilon,
            boundary_correction,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
        )
        futures = []
        for _, chunk in chunks.items():
            future = executor.submit(fun, chunk, gpu_queue)
            futures.append(future)
        if verbose:
            pbar = tqdm(total=len(futures), desc="Deconvolving Chunks")
        else:
            pbar = nullcontext
        with pbar:
            for future in concurrent.futures.as_completed(futures):
                val = future.result()
                if verbose:
                    pbar.update(1)
                    pbar.set_postfix({"ind": val})

pyspim.decon.rl.singleview_sep

Separable deconvolution for single view volumes.

TODO: more detail here.

Functions

deconvolve(volume: NDArray, psf_z: NDArray, psf_y: NDArray, psf_x: NDArray, bp_z: NDArray, bp_y: NDArray, bp_x: NDArray, num_iter: int, boundary_correction: bool, epsilon: Optional[float], init_constant: bool, boundary_padding: Optional[int], boundary_sigma: float, verbose: bool) -> NDArray

deconvolve do deconvolution of volume.

Parameters:

Name Type Description Default
volume NDArray

input volume to be deconvolved

required
psf_z NDArray

PSF kernel in z-direction

required
psf_y NDArray

PSF kernel in y-direction

required
psf_x NDArray

PSF kernel in x-direction

required
bp_z NDArray

backprojector kernel in z-direction

required
bp_y NDArray

backprojector kernel in y-direction

required
bp_x NDArray

backprojector kernel in x-direction

required
num_iter int

number of iterations to do deconvolution for

required
boundary_correction bool

whether or not to do boundary correction

required
epsilon Optional[float]

small parameter to prevent division by zero

required
init_constant bool

initialize deconvolution with a constant array (if False, will use input volume)

required
boundary_padding Optional[int]

zero-padding for boundary correction

required
boundary_sigma float

significance value for pixels when doing boundary correction

required
verbose bool

show progress bar

required

Raises:

Type Description
NotImplementedError

boundary correction

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/singleview_sep.py
def deconvolve(
    volume: NDArray,
    psf_z: NDArray,
    psf_y: NDArray,
    psf_x: NDArray,
    bp_z: NDArray,
    bp_y: NDArray,
    bp_x: NDArray,
    num_iter: int,
    boundary_correction: bool,
    epsilon: Optional[float],
    init_constant: bool,
    boundary_padding: Optional[int],
    boundary_sigma: float,
    verbose: bool,
) -> NDArray:
    """deconvolve do deconvolution of ``volume``.

    Args:
        volume (NDArray): input volume to be deconvolved
        psf_z (NDArray): PSF kernel in z-direction
        psf_y (NDArray): PSF kernel in y-direction
        psf_x (NDArray): PSF kernel in x-direction
        bp_z (NDArray): backprojector kernel in z-direction
        bp_y (NDArray): backprojector kernel in y-direction
        bp_x (NDArray): backprojector kernel in x-direction
        num_iter (int): number of iterations to do deconvolution for
        boundary_correction (bool): whether or not to do boundary correction
        epsilon (Optional[float]): small parameter to prevent division by zero
        init_constant (bool): initialize deconvolution with a constant array (if ``False``, will use input volume)
        boundary_padding (Optional[int]): zero-padding for boundary correction
        boundary_sigma (float): significance value for pixels when doing boundary correction
        verbose (bool): show progress bar

    Raises:
        NotImplementedError: boundary correction

    Returns:
        NDArray
    """
    volume = cupy.asarray(volume, dtype=cupy.float32, order="F")
    psf_z = cupy.asarray(psf_z, dtype=cupy.float32)
    psf_y = cupy.asarray(psf_y, dtype=cupy.float32)
    psf_x = cupy.asarray(psf_x, dtype=cupy.float32)
    bp_z = cupy.asarray(bp_z, dtype=cupy.float32)
    bp_y = cupy.asarray(bp_y, dtype=cupy.float32)
    bp_x = cupy.asarray(bp_x, dtype=cupy.float32)
    if init_constant:
        raise NotImplementedError("todo")
    else:
        est_i = volume
    if boundary_correction:
        raise NotImplementedError("todo")
    else:
        return _deconvolve_uncorrected(
            volume, psf_z, psf_y, psf_x, bp_z, bp_y, bp_x, num_iter, epsilon, verbose
        )

pyspim.decon.rl.dualview_fft

Deconvolution algorithms for jointly deconvolving dual-view microscopy data.

References

[1] Wu, et. al., "Spatially isotropic four-dimensional...", doi:10.1038/nbt.2713 [2] Preibsch, et al. "Efficient Bayesian-based...", doi:10.1038/nmeth.2929 [3] Guo, et al. "Rapid image deconvolution..." doi:10.1038/s41587-020-0560-x [4] Anconelli, B., et al. "Reduction of boundary effects...", doi:10.1051/0004-6361:20053848

Functions

deconvolve(view_a: NDArray, view_b: NDArray, est_i: NDArray | None, psf_a: NDArray | Iterable[NDArray], psf_b: NDArray | Iterable[NDArray], backproj_a: NDArray | Iterable[NDArray], backproj_b: NDArray | Iterable[NDArray], decon_function: str, num_iter: int, epsilon: float, req_both: bool, boundary_correction: bool, zero_padding: Optional[PadType], boundary_sigma_a: float, boundary_sigma_b: float, verbose: bool) -> NDArray

Joint deconvolution of 2 views into a single one.

Parameters:

Name Type Description Default
view_a NDArray

array from 1st view

required
view_b NDArray

array from 2nd view

required
est_i NDArray | None

initial estimate

required
psf_a NDArray | Iterable[NDArray]

psf arrays for view A (single one, or 1 per channel)

required
psf_b NDArray | Iterable[NDArray]

psf arrays for view B (single, or 1 per channel)

required
backproj_a NDArray | Iterable[NDArray]

backprojector array(s) for view A

required
backproj_b NDArray | Iterable[NDArray]

backprojector array(s) for view B

required
decon_function str

style of deconvolution to use

required
num_iter int

number of iterations

required
epsilon float

small parameter to prevent division by zero

required
req_both bool

set all areas where either view doesn't have data to 0

required
boundary_correction bool

do boundary correction

required
zero_padding Optional[PadType]

zero padding for boundary correction

required
boundary_sigma_a float

significance level for pixels, view a

required
boundary_sigma_b float

significance level for pixels, view b

required
verbose bool

show progressbar for iterations

required

Raises:

Type Description
ValueError

input array is not 3 or 4D

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def deconvolve(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray | None,
    psf_a: NDArray | Iterable[NDArray],
    psf_b: NDArray | Iterable[NDArray],
    backproj_a: NDArray | Iterable[NDArray],
    backproj_b: NDArray | Iterable[NDArray],
    decon_function: str,
    num_iter: int,
    epsilon: float,
    req_both: bool,
    boundary_correction: bool,
    zero_padding: Optional[PadType],
    boundary_sigma_a: float,
    boundary_sigma_b: float,
    verbose: bool,
) -> NDArray:
    """Joint deconvolution of 2 views into a single one.

    Args:
        view_a (NDArray): array from 1st view
        view_b (NDArray): array from 2nd view
        est_i (NDArray | None): initial estimate
        psf_a (NDArray | Iterable[NDArray]): psf arrays for view A (single one, or 1 per channel)
        psf_b (NDArray | Iterable[NDArray]): psf arrays for view B (single, or 1 per channel)
        backproj_a (NDArray | Iterable[NDArray]): backprojector array(s) for view A
        backproj_b (NDArray | Iterable[NDArray]): backprojector array(s) for view B
        decon_function (str): style of deconvolution to use
        num_iter (int): number of iterations
        epsilon (float): small parameter to prevent division by zero
        req_both (bool): set all areas where either view doesn't have data to 0
        boundary_correction (bool): do boundary correction
        zero_padding (Optional[PadType]): zero padding for boundary correction
        boundary_sigma_a (float): significance level for pixels, view a
        boundary_sigma_b (float): significance level for pixels, view b
        verbose (bool): show progressbar for iterations

    Raises:
        ValueError: input array is not 3 or 4D

    Returns:
        NDArray
    """
    if len(view_a.shape) == 4:
        return _deconvolve_multichannel(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            decon_function,
            num_iter,
            epsilon,
            boundary_correction,
            req_both,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            verbose,
        )
    elif len(view_a.shape) == 3:
        return _deconvolve_single_channel(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            decon_function,
            num_iter,
            epsilon,
            boundary_correction,
            req_both,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            verbose,
        )
    else:
        raise ValueError("invalid shape, must be 3- or 4D")
joint_rl_dispim(view_a: NDArray, view_b: NDArray, est_i: NDArray, psf_a: NDArray, psf_b: NDArray, backproj_a: Optional[NDArray] = None, backproj_b: Optional[NDArray] = None, num_iter: int = 10, epsilon: float = 1e-05, boundary_correction: bool = True, req_both: bool = False, zero_padding: Optional[PadType] = None, boundary_sigma_a: float = 0.01, boundary_sigma_b: float = 0.01, verbose: bool = False) -> NDArray

joint_rl_dispim Modified Richardson-Lucy joint deconvolution, as described in [1].

originally developed by the Shroff group at the NIH for use in diSPIM imaging. NOTE: boundary correction is not implemented for this method, as at the time of writing, there is no known way of doing this

Parameters:

Name Type Description Default
view_a NDArray

data array corresponding to first view (A)

required
view_b NDArray

data array corresponding to second view (B)

required
est_i NDArray

initial estimate for deconvolution. If None, will use (A+B)/2.

required
psf_a NDArray

array with real-space point spread function for view A

required
psf_b NDArray

array with real-space point spread function for view B

required
backproj_a NDArray

array with real-space backprojector for view A. If None, the mirrored point spread function is used.

None
backproj_b NDArray

array with real-space backprojector for view B. If None, the mirrored point spread function is used.

None
num_iter int

number of iterations to deconvolve for. Defaults to 10.

10
epsilon float

small parameter to prevent division by zero errors. Defaults to 1e-5.

1e-05
boundary_correction bool

correct boundary effects, defaults to True.

True
req_both bool

only deconvolve areas where views a & b have data, defaults to False.

False
zero_padding PadType

amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.

None
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

0.01
boundary_sigma_b float

threshold for determining significant pixels in view B, defaults to 1e-2.

0.01
verbose bool

display progress bar, defaults to False

False

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def joint_rl_dispim(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray,
    psf_a: NDArray,
    psf_b: NDArray,
    backproj_a: Optional[NDArray] = None,
    backproj_b: Optional[NDArray] = None,
    num_iter: int = 10,
    epsilon: float = 1e-5,
    boundary_correction: bool = True,
    req_both: bool = False,
    zero_padding: Optional[PadType] = None,
    boundary_sigma_a: float = 1e-2,
    boundary_sigma_b: float = 1e-2,
    verbose: bool = False,
) -> NDArray:
    """joint_rl_dispim Modified Richardson-Lucy joint deconvolution, as described in [1].

    originally developed by the Shroff group at the NIH for use in diSPIM imaging.
    NOTE: boundary correction is not implemented for this method, as at the time of writing, there is no known way of doing this

    Args:
        view_a (NDArray): data array corresponding to first view (A)
        view_b (NDArray): data array corresponding to second view (B)
        est_i (NDArray): initial estimate for deconvolution. If ``None``, will use (A+B)/2.
        psf_a (NDArray): array with real-space point spread function for view A
        psf_b (NDArray): array with real-space point spread function for view B
        backproj_a (NDArray, optional): array with real-space backprojector for view A. If `None`, the mirrored point spread function is used.
        backproj_b (NDArray, optional): array with real-space backprojector for view B. If `None`, the mirrored point spread function is used.
        num_iter (int, optional): number of iterations to deconvolve for. Defaults to 10.
        epsilon (float, optional): small parameter to prevent division by zero errors. Defaults to 1e-5.
        boundary_correction (bool, optional): correct boundary effects, defaults to True.
        req_both (bool, optional): only deconvolve areas where views a & b have data, defaults to False.
        zero_padding (PadType, optional): amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.
        boundary_sigma_a (float, optional): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float, optional): threshold for determining significant pixels in view B, defaults to 1e-2.
        verbose (bool, optional): display progress bar, defaults to False

    Returns:
        NDArray
    """
    if boundary_correction:
        return _additive_joint_rl_boundcorr(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            num_iter,
            epsilon,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            req_both,
            verbose,
        )
    else:
        return _joint_rl_dispim_uncorr(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            num_iter,
            epsilon,
            req_both,
            verbose,
        )
efficient_bayesian_backprojectors(psf_a: NDArray, psf_b: NDArray) -> Tuple[NDArray, NDArray]

efficient_bayesian_backprojectors Calculate proper backprojectors for "Efficient Bayesian" deconvolution.

Parameters:

Name Type Description Default
psf_a NDArray

point spread function for view A

required
psf_b NDArray

point spread function for view B

required

Returns:

Type Description
Tuple[NDArray, NDArray]

Tuple[NDArray,NDArray]: tuple of backprojectors, one for each view.

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def efficient_bayesian_backprojectors(
    psf_a: NDArray, psf_b: NDArray
) -> Tuple[NDArray, NDArray]:
    """efficient_bayesian_backprojectors Calculate proper backprojectors for "Efficient Bayesian" deconvolution.

    Args:
        psf_a (NDArray): point spread function for view A
        psf_b (NDArray): point spread function for view B

    Returns:
        Tuple[NDArray,NDArray]: tuple of backprojectors, one for each view.
    """
    xp = cupy.get_array_module(psf_a)
    if xp == cupy:
        conv = lambda k, i: fftconv_gpu(i, k, mode="same")
    else:
        conv = lambda k, i: fftconv_cpu(i, k, mode="same")
    float_type = supported_float_type(psf_a.dtype)
    psf_a = psf_a.astype(float_type, copy=False)
    psf_b = psf_b.astype(float_type, copy=False)
    # formulate backprojectors for "virtual view" updates
    # TODO: write this s.t. it's valid for 2D images, too
    flp_a = xp.ascontiguousarray(psf_a[::-1, ::-1, ::-1])
    flp_b = xp.ascontiguousarray(psf_b[::-1, ::-1, ::-1])
    # calculate backprojectors
    bp_a = flp_a * conv(conv(flp_a, psf_b), flp_b)
    bp_b = flp_b * conv(conv(flp_b, psf_a), flp_a)
    return bp_a, bp_b
efficient_bayesian(view_a: NDArray, view_b: NDArray, est_i: NDArray, psf_a: NDArray, psf_b: NDArray, num_iter: int = 10, epsilon: float = 1e-05, boundary_correction: bool = True, req_both: bool = False, zero_padding: Optional[PadType] = None, boundary_sigma_a: float = 0.01, boundary_sigma_b: float = 0.01, verbose: bool = False) -> NDArray

efficient_bayesian Efficient bayesian multiview deconvolution.

Originally described in [2], this is just additive joint deconvolution with backprojectors calculated as described in the "quadruple-view deconvolution" section of [3]

Parameters:

Name Type Description Default
view_a NDArray

data array corresponding to first view (A)

required
view_b NDArray

data array corresponding to second view (B)

required
est_i NDArray

initial estimate for deconvolution. If None, will use (A+B)/2.

required
psf_a NDArray

array with real-space point spread function for view A

required
psf_b NDArray

array with real-space point spread function for view B

required
backproj_a NDArray

array with real-space backprojector for view A. If None, the mirrored point spread function is used.

required
backproj_b NDArray

array with real-space backprojector for view B. If None, the mirrored point spread function is used.

required
num_iter int

number of iterations to deconvolve for. Defaults to 10.

10
epsilon float

small parameter to prevent division by zero errors. Defaults to 1e-5.

1e-05
boundary_correction bool

correct boundary effects, defaults to True.

True
req_both bool

only deconvolve areas where views a & b have data, defaults to False.

False
zero_padding PadType

amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.

None
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

0.01
boundary_sigma_b float

threshold for determining significant pixels in view B, defaults to 1e-2.

0.01
verbose bool

display progress bar, defaults to False

False

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def efficient_bayesian(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray,
    psf_a: NDArray,
    psf_b: NDArray,
    num_iter: int = 10,
    epsilon: float = 1e-5,
    boundary_correction: bool = True,
    req_both: bool = False,
    zero_padding: Optional[PadType] = None,
    boundary_sigma_a: float = 1e-2,
    boundary_sigma_b: float = 1e-2,
    verbose: bool = False,
) -> NDArray:
    """efficient_bayesian Efficient bayesian multiview deconvolution.

    Originally described in [2], this is just additive joint deconvolution with backprojectors calculated as described in the "quadruple-view deconvolution" section of [3]

    Args:
        view_a (NDArray): data array corresponding to first view (A)
        view_b (NDArray): data array corresponding to second view (B)
        est_i (NDArray): initial estimate for deconvolution. If ``None``, will use (A+B)/2.
        psf_a (NDArray): array with real-space point spread function for view A
        psf_b (NDArray): array with real-space point spread function for view B
        backproj_a (NDArray, optional): array with real-space backprojector for view A. If `None`, the mirrored point spread function is used.
        backproj_b (NDArray, optional): array with real-space backprojector for view B. If `None`, the mirrored point spread function is used.
        num_iter (int, optional): number of iterations to deconvolve for. Defaults to 10.
        epsilon (float, optional): small parameter to prevent division by zero errors. Defaults to 1e-5.
        boundary_correction (bool, optional): correct boundary effects, defaults to True.
        req_both (bool, optional): only deconvolve areas where views a & b have data, defaults to False.
        zero_padding (PadType, optional): amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.
        boundary_sigma_a (float, optional): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float, optional): threshold for determining significant pixels in view B, defaults to 1e-2.
        verbose (bool, optional): display progress bar, defaults to False

    Returns:
        NDArray
    """
    # compute backprojectors
    backproj_a, backproj_b = efficient_bayesian_backprojectors(psf_a, psf_b)
    return additive_joint_rl(
        view_a,
        view_b,
        est_i,
        psf_a,
        psf_b,
        backproj_a,
        backproj_b,
        num_iter,
        epsilon,
        boundary_correction,
        req_both,
        zero_padding,
        boundary_sigma_a,
        boundary_sigma_b,
        verbose,
    )
additive_joint_rl(view_a: NDArray, view_b: NDArray, est_i: NDArray, psf_a: NDArray, psf_b: NDArray, backproj_a: Optional[NDArray] = None, backproj_b: Optional[NDArray] = None, num_iter: int = 10, epsilon: float = 1e-05, boundary_correction: bool = True, req_both: bool = False, zero_padding: Optional[PadType] = None, boundary_sigma_a: float = 0.01, boundary_sigma_b: float = 0.01, verbose: bool = False) -> NDArray

additive_joint_rl Additive joint deconvolution.

With boundary correction turned off, each view is deconvolved separately and then averaged at each iteration. With boundary correction turned on, an ordered subset EM algorithm from [4] is used.

Parameters:

Name Type Description Default
view_a NDArray

data array corresponding to first view (A)

required
view_b NDArray

data array corresponding to second view (B)

required
est_i NDArray

initial estimate for deconvolution. If None, will use (A+B)/2.

required
psf_a NDArray

array with real-space point spread function for view A

required
psf_b NDArray

array with real-space point spread function for view B

required
backproj_a NDArray

array with real-space backprojector for view A. If None, the mirrored point spread function is used.

None
backproj_b NDArray

array with real-space backprojector for view B. If None, the mirrored point spread function is used.

None
num_iter int

number of iterations to deconvolve for. Defaults to 10.

10
epsilon float

small parameter to prevent division by zero errors. Defaults to 1e-5.

1e-05
boundary_correction bool

correct boundary effects, defaults to True.

True
req_both bool

only deconvolve areas where views a & b have data, defaults to False.

False
zero_padding PadType

amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.

None
boundary_sigma_a float

threshold for determining significant pixels in view A, defaults to 1e-2.

0.01
boundary_sigma_b float

threshold for determining significant pixels in view B, defaults to 1e-2.

0.01
verbose bool

display progress bar, defaults to False

False

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def additive_joint_rl(
    view_a: NDArray,
    view_b: NDArray,
    est_i: NDArray,
    psf_a: NDArray,
    psf_b: NDArray,
    backproj_a: Optional[NDArray] = None,
    backproj_b: Optional[NDArray] = None,
    num_iter: int = 10,
    epsilon: float = 1e-5,
    boundary_correction: bool = True,
    req_both: bool = False,
    zero_padding: Optional[PadType] = None,
    boundary_sigma_a: float = 1e-2,
    boundary_sigma_b: float = 1e-2,
    verbose: bool = False,
) -> NDArray:
    """additive_joint_rl Additive joint deconvolution.

    With boundary correction turned off, each view is deconvolved separately and then averaged at each iteration.
    With boundary correction turned on, an ordered subset EM algorithm from [4] is used.

    Args:
        view_a (NDArray): data array corresponding to first view (A)
        view_b (NDArray): data array corresponding to second view (B)
        est_i (NDArray): initial estimate for deconvolution. If ``None``, will use (A+B)/2.
        psf_a (NDArray): array with real-space point spread function for view A
        psf_b (NDArray): array with real-space point spread function for view B
        backproj_a (NDArray, optional): array with real-space backprojector for view A. If `None`, the mirrored point spread function is used.
        backproj_b (NDArray, optional): array with real-space backprojector for view B. If `None`, the mirrored point spread function is used.
        num_iter (int, optional): number of iterations to deconvolve for. Defaults to 10.
        epsilon (float, optional): small parameter to prevent division by zero errors. Defaults to 1e-5.
        boundary_correction (bool, optional): correct boundary effects, defaults to True.
        req_both (bool, optional): only deconvolve areas where views a & b have data, defaults to False.
        zero_padding (PadType, optional): amount of zero-padding to add to each axis, defaults to None. if None, each axis of size N is padded on each side by N/2 so that the padded image has dimension 2N in that axis.
        boundary_sigma_a (float, optional): threshold for determining significant pixels in view A, defaults to 1e-2.
        boundary_sigma_b (float, optional): threshold for determining significant pixels in view B, defaults to 1e-2.
        verbose (bool, optional): display progress bar, defaults to False

    Returns:
        NDArray
    """
    if boundary_correction:
        return _additive_joint_rl_boundcorr(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            num_iter,
            epsilon,
            zero_padding,
            boundary_sigma_a,
            boundary_sigma_b,
            req_both,
            verbose,
        )
    else:
        return _additive_joint_rl_uncorr(
            view_a,
            view_b,
            est_i,
            psf_a,
            psf_b,
            backproj_a,
            backproj_b,
            num_iter,
            epsilon,
            req_both,
            verbose,
        )
estimate_initialization(view_a: NDArray, view_b: NDArray, init_constant: bool) -> NDArray

estimate_initialization Initialize estimate for deconvolution

Parameters:

Name Type Description Default
view_a NDArray

input volume, view A

required
view_b NDArray

input volume, view B

required
init_constant bool

initialize as constant matrix, otherwise (A + B)/2

required

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/dualview_fft.py
def estimate_initialization(
    view_a: NDArray, view_b: NDArray, init_constant: bool
) -> NDArray:
    """estimate_initialization Initialize estimate for deconvolution

    Args:
        view_a (NDArray): input volume, view A
        view_b (NDArray): input volume, view B
        init_constant (bool): initialize as constant matrix, otherwise (A + B)/2

    Returns:
        NDArray
    """
    xp = cupy.get_array_module(view_a)
    if init_constant:
        # initialize the estimate as a constant value that can satisfy
        # the flux constraint, (Eqn. 16 of [4])
        c = xp.sum((view_a + view_b) / 2)
        return xp.ones_like(view_a) * (c / xp.prod(xp.asarray(view_a.shape)))
    else:
        # do (view_a + view_b) / 2. as is done typically in RL decon.
        # (NOTE: this still satisfies flux constraint, so ok)
        return (view_a + view_b) / 2

pyspim.decon.rl.singleview_fft

Single-view Richardson Lucy deconvolution using FFT-space convolution.

References

[1] Bertero & Boccacci, "A simple method...", doi:10.1051/0004-6361:20052717

Functions

richardson_lucy(image: NDArray, psf: NDArray, bp: NDArray, num_iter: int = 50, boundary_correction: bool = True, epsilon: Optional[float] = None, boundary_padding: Optional[int] = None, boundary_sigma: float = 0.01, verbose: bool = False) -> NDArray

richardson_lucy Richardson-Lucy deconvolution.

Parameters:

Name Type Description Default
image NDArray

input volume to be deconvolved

required
psf NDArray

point spread function

required
bp NDArray

back projector for deconvolution

required
num_iter int

number of iterations. Defaults to 50.

50
boundary_correction bool

whether or not to do boundary correction. Defaults to True.

True
epsilon Optional[float]

small parameter to prevent division by zero. Defaults to None.

None
boundary_padding Optional[int]

zero-padding for boundary correction. Defaults to None.

None
boundary_sigma float

significance level for pixels when doing boundary correction. Defaults to 1e-2.

0.01
verbose bool

show a progress bar. Defaults to False.

False

Raises:

Type Description
NotImplementedError

uncorrected version of RL deconvolution.

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/singleview_fft.py
def richardson_lucy(
    image: NDArray,
    psf: NDArray,
    bp: NDArray,
    num_iter: int = 50,
    boundary_correction: bool = True,
    epsilon: Optional[float] = None,
    boundary_padding: Optional[int] = None,
    boundary_sigma: float = 1e-2,
    verbose: bool = False,
) -> NDArray:
    """richardson_lucy Richardson-Lucy deconvolution.

    Args:
        image (NDArray): input volume to be deconvolved
        psf (NDArray): point spread function
        bp (NDArray): back projector for deconvolution
        num_iter (int, optional): number of iterations. Defaults to 50.
        boundary_correction (bool, optional): whether or not to do boundary correction. Defaults to True.
        epsilon (Optional[float], optional): small parameter to prevent division by zero. Defaults to None.
        boundary_padding (Optional[int], optional): zero-padding for boundary correction. Defaults to None.
        boundary_sigma (float, optional): significance level for pixels when doing boundary correction. Defaults to 1e-2.
        verbose (bool, optional): show a progress bar. Defaults to False.

    Raises:
        NotImplementedError: uncorrected version of RL deconvolution.

    Returns:
        NDArray
    """
    if boundary_correction:
        return richardson_lucy_boundcorr(
            image,
            psf,
            bp,
            num_iter,
            epsilon,
            boundary_padding,
            boundary_sigma,
            init_constant=False,
            verbose=verbose,
        )
    else:
        raise NotImplementedError("TODO")
richardson_lucy_boundcorr(image: NDArray, psf: NDArray, bp: Optional[NDArray] = None, num_iter: int = 50, epsilon: float = 0.0001, zero_padding: Optional[PadType] = None, boundary_sigma: float = 0.01, init_constant: bool = False, verbose: bool = False) -> NDArray

richardson_lucy_boundcorr Richardson-Lucy deconvolution with boundary correction described in [1], this method reduces Gibbs oscillations that occur due to boundary effects when doing RL deconvolution.

Parameters:

Name Type Description Default
image NDArray

input volume to be deconvolved

required
psf NDArray

point spread function

required
bp Optional[NDArray]

backprojector. Defaults to None, if so will used mirrored PSF.

None
num_iter int

number of iterations. Defaults to 50.

50
epsilon float

small parameter to prevent division by zero. Defaults to 1e-4.

0.0001
zero_padding Optional[PadType]

amount of zero-padding. Defaults to None.

None
boundary_sigma float

significance level for pixels when doing boundary correction. Defaults to 1e-2.

0.01
init_constant bool

whether to make the inital guess a constant array, (True) or (A+B)/2 (False). Defaults to False.

False
verbose bool

show progress bar. Defaults to False.

False

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/decon/rl/singleview_fft.py
def richardson_lucy_boundcorr(
    image: NDArray,
    psf: NDArray,
    bp: Optional[NDArray] = None,
    num_iter: int = 50,
    epsilon: float = 1e-4,
    zero_padding: Optional[PadType] = None,
    boundary_sigma: float = 1e-2,
    init_constant: bool = False,
    verbose: bool = False,
) -> NDArray:
    """richardson_lucy_boundcorr Richardson-Lucy deconvolution with boundary correction described in [1], this method reduces Gibbs oscillations that occur due to boundary effects when doing RL deconvolution.

    Args:
        image (NDArray): input volume to be deconvolved
        psf (NDArray): point spread function
        bp (Optional[NDArray], optional): backprojector. Defaults to None, if so will used mirrored PSF.
        num_iter (int, optional): number of iterations. Defaults to 50.
        epsilon (float, optional): small parameter to prevent division by zero. Defaults to 1e-4.
        zero_padding (Optional[PadType], optional): amount of zero-padding. Defaults to None.
        boundary_sigma (float, optional): significance level for pixels when doing boundary correction. Defaults to 1e-2.
        init_constant (bool, optional): whether to make the inital guess a constant array, (``True``) or (A+B)/2 (``False``). Defaults to False.
        verbose (bool, optional): show progress bar. Defaults to False.

    Returns:
        NDArray
    """
    assert len(image.shape) == 2 or len(image.shape) == 3, (
        "RL deconvolution only implemented for 2- and 3d inputs"
    )
    xp = cupy.get_array_module(image)
    # make sure all inputs are floats
    float_type = supported_float_type(image.dtype)
    image = image.astype(float_type, copy=False)
    psf = psf.astype(float_type, copy=False)
    # default back-projector is mirrored PSF
    if bp is None:
        bp = xp.ascontiguousarray(psf[::-1, ::-1, ::-1])
    # figure out padding
    if zero_padding is None:  # use default value from paper (2N)
        # default is to pad such that an NxN image is 2Nx2N
        pad = tuple([(d // 2, d // 2) for d in image.shape])
    else:
        if isinstance(zero_padding, int):
            pad = tuple([(zero_padding, zero_padding) for _ in image.shape])
        else:
            assert len(zero_padding) == len(image.shape), (
                "zero padding must be specified for all dimensions of input"
            )
            pad = tuple([(p, p) for p in zero_padding])
    # define utility function for doing convolution
    # NOTE: by default `fftconvolve` wants the larger array as first arg
    # but for clarity, I like specifying the PSF first
    if xp == cupy:
        conv = lambda k, i: fftconv_gpu(i, k, mode="same")
    else:
        conv = lambda k, i: fftconv_cpu(i, k, mode="same")
    # generate the mask that we'll use to determine $\overbar{alpha}$
    mask = xp.ones_like(image)
    # pad the image and the mask with zeros
    image = xp.pad(image, padding=pad, mode="constant", constant_values=0)
    mask = xp.pad(mask, padding=pad, mode="constant", constant_values=0)
    # calculate the window, $\overbar{w}(n')
    alpha = conv(bp, mask)
    window = xp.where(alpha > boundary_sigma, 1.0 / alpha, 0.0)
    # initialization conditions
    # if *not* init_constant, just start with the data
    if init_constant:
        est = xp.ones_like(image) * xp.mean(image)
    else:
        est = image
    # Lucy-Richardson iterations
    for _ in trange(num_iter) if verbose else range(num_iter):
        con = conv(psf, est)
        est = xp.multiply(
            xp.multiply(window, est),  # conv(window, est)
            conv(bp, xp.where(con < epsilon, 0, image / con)),
        )
    # trim out padding
    if len(est) == 2:
        return est[pad[0][0] : -pad[0][1], pad[1][0] : -pad[1][1]]
    else:
        return est[
            pad[0][0] : -pad[0][1], pad[1][0] : -pad[1][1], pad[2][0] : -pad[2][1]
        ]

pyspim.filter.adaptive_median

Adaptive Median Filtering

as described in: Weisong Zhao et al. "Sparse deconvolution improves..." Nature Biotechnology 40, 606–617 (2022).

Functions

pyspim.filter.fftwavelet

Functions

remove_stripes(im: NDArray, dec_num: int, wavelet: str, sigma: float, direction: str = 'horizontal', verbose: bool = False) -> NDArray

remove stripes from input image by the FFTWavelet algorithm

:param im: input image :type im: NDArray :param dec_num: number of decimation levels (loops) :type dec_num: int :param wavelet: wavelet to use :type wavelet: str :param sigma: description :type sigma: float :param direction: direction of stripes in input to be removed, defaults to 'horizontal' :type direction: str, optional :param verbose: show progress through loop, defaults to False :type verbose: bool, optional :raises ValueError: if direction isn't 'horizontal' or 'vertical' :return: input image with stripes removed :rtype: NDArray

Source code in packages/pyspim/src/pyspim/filter/fftwavelet.py
def remove_stripes(
    im: NDArray,
    dec_num: int,
    wavelet: str,
    sigma: float,
    direction: str = "horizontal",
    verbose: bool = False,
) -> NDArray:
    """remove stripes from input image by the FFTWavelet algorithm

    :param im: input image
    :type im: NDArray
    :param dec_num: number of decimation levels (loops)
    :type dec_num: int
    :param wavelet: wavelet to use
    :type wavelet: str
    :param sigma: _description_
    :type sigma: float
    :param direction: direction of stripes in input to be removed, defaults to 'horizontal'
    :type direction: str, optional
    :param verbose: show progress through loop, defaults to False
    :type verbose: bool, optional
    :raises ValueError: if direction isn't `'horizontal'` or `'vertical'`
    :return: input image with stripes removed
    :rtype: NDArray
    """
    input_is_gpu = cupy.get_array_module(im) == cupy
    if input_is_gpu:
        warnings.warn("remove_stripes is computed on the cpu, will move gpu->cpu->gpu")
    im = im.get() if input_is_gpu else im
    dir_char = direction.lower()[0]
    scales = []
    prev_cA = im.copy()
    for _ in trange(dec_num, desc="dwt") if verbose else range(dec_num):
        (cA, (cH, cV, cD)) = dwt2(prev_cA, wavelet)
        if dir_char == "v" or dir_char == "c":
            fCv = numpy.fft.fftshift(numpy.fft.fft2(cV))
            my, mx = fCv.shape
            flrmy2 = numpy.floor(my / 2)
            denom = 2 * numpy.square(sigma)
            damp = 1 - numpy.exp(
                -numpy.square(numpy.arange(-flrmy2, -flrmy2 + my)) / denom
            )
            fCv *= numpy.repeat(damp[:, None], mx, 1)
            cV = numpy.fft.ifft(numpy.fft.ifftshift(fCv))
        elif dir_char == "h" or dir_char == "r":
            fCh = numpy.fft.fftshift(numpy.fft.fft2(cH))
            my, mx = fCh.shape
            flrmx2 = numpy.floor(mx / 2)
            denom = 2 * numpy.square(sigma)
            damp = 1 - numpy.exp(
                -numpy.square(numpy.arange(-flrmx2, -flrmx2 + mx)) / denom
            )
            fCh *= numpy.repeat(damp[None, :], my, 0)
            cH = numpy.fft.ifft(numpy.fft.ifftshift(fCh))
        else:
            raise ValueError("invalid direction")
        prev_cA = cA.copy()
        scales.insert(0, (cH, cV, cD))
    recon = prev_cA
    for dec in trange(dec_num, desc="idwt") if verbose else range(dec_num):
        cH, cV, cD = scales[dec]
        recon = recon[: cH.shape[0], : cH.shape[1]]
        recon = idwt2((recon, (cH, cV, cD)), wavelet)
    recon = numpy.real(recon)
    if input_is_gpu:
        return cupy.asarray(recon)
    return recon

pyspim.stitch.dispim

Functions

pyspim.stitch.grid

Functions

Modules

pyspim.psf.gauss

Functions

gaussian_fwhm(sigma: float) -> float

gaussian_fwhm Compute full-width at half-max for Gaussian from std. dev.

Parameters:

Name Type Description Default
sigma float

standard deviation of Gaussian

required

Returns:

Type Description
float

float

Source code in packages/pyspim/src/pyspim/psf/gauss.py
def gaussian_fwhm(sigma: float) -> float:
    """gaussian_fwhm Compute full-width at half-max for Gaussian from std. dev.

    Args:
        sigma (float): standard deviation of Gaussian

    Returns:
        float
    """
    return 2 * numpy.sqrt(2 * numpy.log(2)) * sigma

pyspim.opt.powell

Powell's method

pyspim.opt.affine

Functions

pyspim.deskew.ortho

Deskewing by orthogonal interpolation.

Uses the 'orthogonal interpolation' scheme where interpolation is done orthogonally to the direction of the light sheet. this was originally proposed/implemented in V. Maioli's Ph.D. thesis [1]. Our code is based heavily on the implementation by D. Shepherd's group [2].

References

[1] Vincent Miaioli's PhD Thesis doi: 10.25560/68022 [2] github.com/QI2lab/OPM

Functions

deskew_stage_scan(im: NDArray, pixel_size: float, step_size: float, direction: int, theta: float = math.pi / 4, preserve_dtype: bool = False, stream: cupy.cuda.Stream | None = None) -> NDArray

Deskew stage scan data into xyz coordinate system.

Output format is zyx where z is normal to the coverslip, and x & y are the coverslip, y is along the long axis of the sheet, x along is the scan direction.

Parameters:

Name Type Description Default
im NDArray

input volume to be deskewed

required
pixel_size float

size of pixels, in real (space) units

required
step_size float

real space spacing between sheets

required
direction int

scan direction (±1)

required
theta float

angle of objective w.r.t coverslip (in radians)

pi / 4
preserve_dtype bool

preserve input datatype in output. If False, will return single-precision float.

False

Returns: NDArray

Source code in packages/pyspim/src/pyspim/deskew/ortho.py
def deskew_stage_scan(
    im: NDArray,
    pixel_size: float,
    step_size: float,
    direction: int,
    theta: float = math.pi / 4,
    preserve_dtype: bool = False,
    stream: cupy.cuda.Stream | None = None,
) -> NDArray:
    """Deskew stage scan data into xyz coordinate system.

    Output format is zyx where z is normal to the coverslip, and x & y are the coverslip, y is along the long axis of the sheet, x along is the scan direction.

    Args:
        im (NDArray): input volume to be deskewed
        pixel_size (float): size of pixels, in real (space) units
        step_size (float): real space spacing between sheets
        direction (int): scan direction (+/-1)
        theta (float): angle of objective w.r.t coverslip (in radians)
        preserve_dtype (bool): preserve input datatype in output. If ``False``, will return single-precision float.
    Returns:
        NDArray
    """
    xp = cupy.get_array_module(im)
    if xp == numpy: # type: ignore
        dsk = _deskew_orthogonal_cpu(
            im, pixel_size, step_size, direction, theta, preserve_dtype
        )
        #dsk = _zero_triangle_cpu(dsk, direction) # FIXME: why is this here, not necessary?
    else:
        dsk = _deskew_gpu(im, pixel_size, step_size, direction, theta, 
                          preserve_dtype, stream)
        dsk = _zero_triangle_gpu(dsk, direction)
    return dsk
output_shape(z: int, r: int, c: int, pixel_size: float, step_size: float, theta: float = math.pi / 4) -> Tuple[int, int, int]

output_shape Calculate output shape of deskewing a volume.

Parameters:

Name Type Description Default
z int

size of input volume in z-direction

required
r int

size of input volume in r-direction (# rows)

required
c int

size of input volume in c-direction (# cols)

required
pixel_size float

pixel size, in real units

required
step_size float

step size, in real units

required
theta float

angle of objective w.r.t coverslip. Defaults to math.pi/4.

pi / 4

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]: (n_z, n_y, n_x) shape of deskewed volume

Source code in packages/pyspim/src/pyspim/deskew/ortho.py
def output_shape(
    z: int,
    r: int,
    c: int,
    pixel_size: float,
    step_size: float,
    theta: float = math.pi / 4,
) -> Tuple[int, int, int]:
    """output_shape Calculate output shape of deskewing a volume.

    Args:
        z (int): size of input volume in z-direction
        r (int): size of input volume in r-direction (# rows)
        c (int): size of input volume in c-direction (# cols)
        pixel_size (float): pixel size, in real units
        step_size (float): step size, in real units
        theta (float, optional): angle of objective w.r.t coverslip. Defaults to math.pi/4.

    Returns:
        Tuple[int,int,int]: (n_z, n_y, n_x) shape of deskewed volume
    """
    step_size_lat = step_size / math.cos(theta)
    step_pix = step_size_lat / pixel_size
    # precompute useful trig quantities
    # determine output shape & preallocate
    cos_theta = math.cos(theta)
    sin_theta = math.sin(theta)
    n_x = numpy.int64(numpy.ceil(z * step_pix + c * cos_theta))
    n_y = numpy.int64(r)
    n_z = numpy.int64(numpy.ceil(c * sin_theta))
    return (n_z, n_y, n_x) # type: ignore

pyspim.deskew._util

Functions

extract_coefficients(p, degree)

Extract polynomial coefficients from a polynomial function.

Given a polynomial function p(x) of specified degree, this function extracts the coefficients by evaluating the polynomial at n+1 points and solving a linear system to find the coefficients.

Parameters:

Name Type Description Default
p callable

A polynomial function that takes a single argument.

required
degree int

The degree of the polynomial.

required

Returns:

Type Description

numpy.ndarray: Array of coefficients from highest to lowest degree.

Source code in packages/pyspim/src/pyspim/deskew/_util.py
def extract_coefficients(p, degree):
    """Extract polynomial coefficients from a polynomial function.

    Given a polynomial function p(x) of specified degree, this function
    extracts the coefficients by evaluating the polynomial at n+1 points
    and solving a linear system to find the coefficients.

    Args:
        p (callable): A polynomial function that takes a single argument.
        degree (int): The degree of the polynomial.

    Returns:
        numpy.ndarray: Array of coefficients from highest to lowest degree.
    """
    n = degree + 1
    sample_x = [x for x in range(n)]
    sample_y = [p(x) for x in sample_x]
    A = [[0 for _ in range(n)] for _ in range(n)]
    for line in range(n):
        for column in range(n):
            A[line][column] = sample_x[line] ** column
    c = numpy.linalg.solve(A, sample_y)
    return c[::-1]
characteristic_polynomial(A)

Compute the characteristic polynomial of a matrix.

This function is currently a placeholder and not implemented.

Parameters:

Name Type Description Default
A ndarray

Input matrix.

required

Returns:

Name Type Description
NotImplemented

This function is not yet implemented.

Source code in packages/pyspim/src/pyspim/deskew/_util.py
def characteristic_polynomial(A):
    """Compute the characteristic polynomial of a matrix.

    This function is currently a placeholder and not implemented.

    Args:
        A (numpy.ndarray): Input matrix.

    Returns:
        NotImplemented: This function is not yet implemented.
    """
    pass

pyspim.deskew.dispim

Deskew stage-scanned volume into "normal" diSPIM coordinate system (see [1], esp. Figure 1).

We mimic the original Shroff lab Fiji plugin, but use a CUDA kernel for speed. Note that interpolation is done using CUDA texture memory.

References

[1] Kumar, et. al, "Using Stage- and Slit-...", doi: 10.1086/689589

Functions

deskew_stage_scan(im: NDArray, pixel_size: float, step_size: float, direction: int, **kwargs) -> NDArray

Deskew the input volume, im.

Parameters:

Name Type Description Default
im NDArray

input volume

required
pixel_size float

pixel size, in real (space) units

required
step_size float

step size, in real (space) units

required
direction int

±1, direction of objective movement rel. to x-axis.

required
**kwargs

block_size will be passed as kernel launch parameter.

{}

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/deskew/dispim.py
def deskew_stage_scan(
    im: NDArray, pixel_size: float, step_size: float, direction: int, **kwargs
) -> NDArray:
    """Deskew the input volume, ``im``.

    Args:
        im (NDArray): input volume
        pixel_size (float): pixel size, in real (space) units
        step_size (float): step size, in real (space) units
        direction (int): +/-1, direction of objective movement rel. to x-axis.
        **kwargs: ``block_size`` will be passed as kernel launch parameter.

    Returns:
        NDArray
    """
    return _deskew_texture(im, pixel_size, step_size, direction, **kwargs)
output_width(depth: int, width: int, step_size: float, pixel_size: float) -> int

output_width Compute output width of deskewed volume

Parameters:

Name Type Description Default
depth int

depth of input (pre-deskewing) volume

required
width int

width of input (pre-deskewing) volume

required
step_size float

step size, in real (space) units

required
pixel_size float

pixel size, in real (space) units

required

Returns:

Type Description
int

int

Source code in packages/pyspim/src/pyspim/deskew/dispim.py
def output_width(depth: int, width: int, step_size: float, pixel_size: float) -> int:
    """output_width Compute output width of deskewed volume

    Args:
        depth (int): depth of input (pre-deskewing) volume
        width (int): width of input (pre-deskewing) volume
        step_size (float): step size, in real (space) units
        pixel_size (float): pixel size, in real (space) units

    Returns:
        int
    """
    return width + round(depth * abs(step_size / pixel_size))

pyspim.deskew.shear

Deskew by shear-warp algorithm.

Deskew the input volume using an affine transformation. Should return the same result as dispim but using shear-warp algorithm and (possibly higher-order) interpolation instead of texture-based interpolation.

Functions

inv_deskew_matrix(pixel_size: float, step_size: float, direction: int) -> numpy.ndarray

Inverse deskewing matrix

Parameters:

Name Type Description Default
pixel_size float

size of pixels, in real space

required
step_size float

spacing between sheets, in real units

required
direction int

scan direction (±1)

required

Returns:

Type Description
ndarray

numpy.ndarray

Source code in packages/pyspim/src/pyspim/deskew/shear.py
def inv_deskew_matrix(
    pixel_size: float, step_size: float, direction: int
) -> numpy.ndarray:
    """Inverse deskewing matrix

    Args:
        pixel_size (float): size of pixels, in real space
        step_size (float): spacing between sheets, in real units
        direction (int): scan direction (+/-1)

    Returns:
        numpy.ndarray
    """
    return numpy.asarray(
        [
            [1, 0, direction * step_size / pixel_size, 0],
            [0, 1, 0, 0],
            [0, 0, step_size / pixel_size, 0],
            [0, 0, 0, 1],
        ]
    )
output_shape(z: int, r: int, c: int, pixel_size: float, step_size: float, direction: int, auto_crop: bool) -> Tuple[int, int, int]

Compute output shape of deskewing transform.

Parameters:

Name Type Description Default
z int

shape of input volume, z-direction

required
r int

shape of input volume, r-direction (# rows)

required
c int

shape of input volume, c-direction (# cols)

required
pixel_size float

size of pixels, in real units

required
step_size float

spacing between steps, in real units

required
direction int

scan direction (± 1)

required
auto_crop bool

automatically crop the output to account for rotation of B-view

required

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]

Source code in packages/pyspim/src/pyspim/deskew/shear.py
def output_shape(
    z: int,
    r: int,
    c: int,
    pixel_size: float,
    step_size: float,
    direction: int,
    auto_crop: bool,
) -> Tuple[int, int, int]:
    """Compute output shape of deskewing transform.

    Args:
        z (int): shape of input volume, z-direction
        r (int): shape of input volume, r-direction (# rows)
        c (int): shape of input volume, c-direction (# cols)
        pixel_size (float): size of pixels, in real units
        step_size (float): spacing between steps, in real units
        direction (int): scan direction (+/- 1)
        auto_crop (bool): automatically crop the output to account for rotation of B-view

    Returns:
        Tuple[int,int,int]
    """
    full_shp = output_shape_for_transform(
        inv_deskew_matrix(pixel_size, step_size, direction), (z, r, c)
    )
    if auto_crop:
        return (full_shp[0], full_shp[1], full_shp[0])
    else:
        return full_shp
deskewing_transform(z: int, r: int, c: int, pixel_size: float, step_size: float, direction: int, auto_crop: bool, rotation_thetas: Tuple[float, float, float] | None) -> Tuple[NDArray, Tuple[int, int, int]]

Compute the deskewing transform for the input volume.

Parameters:

Name Type Description Default
z int

size of input volume, z-direction (pixels)

required
r int

size of input volume, r-direction (pixels, # rows)

required
c int

size of input volume, c-direction (pixels, # cols)

required
pixel_size float

size of pixels, in real units

required
step_size float

step size, in real units

required
direction int

scan direction (±1)

required
auto_crop bool

auto-crop outputs to account for rotation of B into coordinate system of A.

required
rotation_thetas Tuple[int, int, int] | None

rotation angles for each axis. If None, no rotation is done.

required

Returns:

Type Description
Tuple[NDArray, Tuple[int, int, int]]

numpy.ndarray, List[int]: deskewing transform & the output shape

Source code in packages/pyspim/src/pyspim/deskew/shear.py
def deskewing_transform(
    z: int,
    r: int,
    c: int,
    pixel_size: float,
    step_size: float,
    direction: int,
    auto_crop: bool,
    rotation_thetas: Tuple[float, float, float] | None,
) -> Tuple[NDArray, Tuple[int,int,int]]:
    """Compute the deskewing transform for the input volume.

    Args:
        z (int): size of input volume, z-direction (pixels)
        r (int): size of input volume, r-direction (pixels, # rows)
        c (int): size of input volume, c-direction (pixels, # cols)
        pixel_size (float): size of pixels, in real units
        step_size (float): step size, in real units
        direction (int): scan direction (+/-1)
        auto_crop (bool): auto-crop outputs to account for rotation of B into coordinate system of A.
        rotation_thetas (Tuple[int,int,int] | None): rotation angles for each axis. If ``None``, no rotation is done.

    Returns:
        numpy.ndarray, List[int]: deskewing transform & the output shape
    """
    D = inv_deskew_matrix(pixel_size, step_size, direction)
    full_shp = output_shape_for_transform(D, (z, r, c))
    if direction < 0:
        t = translation_matrix(full_shp[0], 0, 0)
        D = D @ t
    if auto_crop:
        # rotating B into view of A implies that some of the B view
        # this will take care of automatically cropping the outputs
        # to this overlap region (helps save memory)
        out_shp = (full_shp[0], full_shp[1], full_shp[0])
        t = translation_matrix(-(full_shp[2] - full_shp[0]) / 2, 0, 0)
        D = D @ t
    else:
        out_shp = full_shp
    if rotation_thetas is None:
        T = numpy.linalg.inv(D).astype(numpy.float32)
    else:
        R = rotation_about_point_matrix(
            *rotation_thetas, *[s / 2 for s in out_shp[::-1]]
        )
        T = (numpy.linalg.inv(D) @ R).astype(numpy.float32)
    return T, out_shp
deskew_stage_scan(im: cupy.ndarray, pixel_size: float, step_size: float, direction: int, rotation_thetas: Tuple[float, float, float] | None, interp_method: str, auto_crop: bool, preserve_dtype: bool, block_size: Tuple[int, int, int]) -> cupy.ndarray

Deskew the input volume using the shear-warp algorithm.

Parameters:

Name Type Description Default
im ndarray

input volume to be deskewed.

required
pixel_size float

pixel size, in real units

required
step_size float

step size, in real units

required
direction int

scan direction (± 1)

required
rotation_thetas Tuple[float, float, float] | None

rotation angles for each axis. If None, no rotation is done.

required
interp_method str

interpolation method to use

required
auto_crop bool

auto-crop output volumes to account for rotation of view B

required
preserve_dtype bool

make output volume have same datatype as input volume (probably uint16).

required
block_size Tuple[int, int, int]

size of blocks for CUDA kernel launch.

required

Returns:

Type Description
ndarray

cupy.ndarray

Source code in packages/pyspim/src/pyspim/deskew/shear.py
def deskew_stage_scan(
    im: cupy.ndarray,
    pixel_size: float,
    step_size: float,
    direction: int,
    rotation_thetas: Tuple[float, float, float] | None,
    interp_method: str,
    auto_crop: bool,
    preserve_dtype: bool,
    block_size: Tuple[int, int, int],
) -> cupy.ndarray:
    """Deskew the input volume using the shear-warp algorithm.

    Args:
        im (cupy.ndarray): input volume to be deskewed.
        pixel_size (float): pixel size, in real units
        step_size (float): step size, in real units
        direction (int): scan direction (+/- 1)
        rotation_thetas (Tuple[float,float,float] | None): rotation angles for each axis. If ``None``, no rotation is done.
        interp_method (str): interpolation method to use
        auto_crop (bool): auto-crop output volumes to account for rotation of view B
        preserve_dtype (bool): make output volume have same datatype as input volume (probably uint16).
        block_size (Tuple[int,int,int]): size of blocks for CUDA kernel launch.

    Returns:
        cupy.ndarray
    """
    T, out_shape = deskewing_transform(
        im.shape[0],
        im.shape[1],
        im.shape[2],
        pixel_size,
        step_size,
        direction,
        auto_crop,
        rotation_thetas,
    )
    T = cupy.asarray(T).astype(cupy.float32)
    dsk = transform(im, T, interp_method, preserve_dtype, out_shape, *block_size)
    return dsk

pyspim.reg.mutinfo

Functions

mutual_information(ref: NDArray, tar: NDArray, tM: NDArray, nbin_ref: int, nbin_tar: int) -> float

mutual information between input images ref & tar where tar is first transformed by matrix tM

:param ref: reference volume :type ref: NDArray :param tar: target (AKA "moving") volume :type tar: NDArray :param tM: matrix to transform tar with :type tM: NDArray :param nbin_ref: # of bins in histogram of ref :type nbin_ref: int :param nbin_tar: # of bins in histogram of tar :type nbin_tar: int :returns: mutual information between ref and transformed tar :rtype: float

Source code in packages/pyspim/src/pyspim/reg/mutinfo.py
def mutual_information(
    ref: NDArray, tar: NDArray, tM: NDArray, nbin_ref: int, nbin_tar: int
) -> float:
    """mutual information between input images `ref` & `tar` where `tar`
        is first transformed by matrix `tM`

    :param ref: reference volume
    :type ref: NDArray
    :param tar: target (AKA "moving") volume
    :type tar: NDArray
    :param tM: matrix to transform `tar` with
    :type tM: NDArray
    :param nbin_ref: # of bins in histogram of `ref`
    :type nbin_ref: int
    :param nbin_tar: # of bins in histogram of `tar`
    :type nbin_tar: int
    :returns: mutual information between `ref` and transformed `tar`
    :rtype: float
    """
    ref_tex, tex_arr = create_texture_object(ref, "border", "linear", "element_type")
    # j1, jx = _preprocess_image_pair_reftex(
    #    ref_tex, tar, tM, nbin_ref, nbin_tar
    # )
    # compute entropy of target
    P_tar, _ = cupy.histogram(tar, nbin_tar)
    P_tar = P_tar / cupy.sum(P_tar) + cupy.finfo(float).eps
    H_tar = discrete_entropy(P_tar) / cupy.log2(nbin_tar)
    mi = _mutual_information_precomp_target(
        ref_tex, tar, cupy.eye(4), nbin_ref, nbin_tar, H_tar
    )
    del ref_tex, tex_arr
    return mi
entropy_correlation_coeff(ref: NDArray, tar: NDArray, tM: NDArray, nbin_ref: int, nbin_tar: int) -> float

entropy correlation coefficient between input images ref & tar where tar is first transformed by matrix tM

:param ref: reference volume :type ref: NDArray :param tar: target (AKA "moving") volume :type tar: NDArray :param tM: matrix to transform tar with :type tM: NDArray :param nbin_ref: # of bins in histogram of ref :type nbin_ref: int :param nbin_tar: # of bins in histogram of tar :type nbin_tar: int :returns: entropy correlation coefficient between ref and transformed tar :rtype: float

Source code in packages/pyspim/src/pyspim/reg/mutinfo.py
def entropy_correlation_coeff(
    ref: NDArray, tar: NDArray, tM: NDArray, nbin_ref: int, nbin_tar: int
) -> float:
    """entropy correlation coefficient between input images `ref` & `tar`
        where `tar` is first transformed by matrix `tM`

    :param ref: reference volume
    :type ref: NDArray
    :param tar: target (AKA "moving") volume
    :type tar: NDArray
    :param tM: matrix to transform `tar` with
    :type tM: NDArray
    :param nbin_ref: # of bins in histogram of `ref`
    :type nbin_ref: int
    :param nbin_tar: # of bins in histogram of `tar`
    :type nbin_tar: int
    :returns: entropy correlation coefficient between `ref` and transformed `tar`
    :rtype: float
    """
    ref_tex, tex_arr = create_texture_object(ref, "border", "linear", "element_type")
    # j1, jx = _preprocess_image_pair_reftex(
    #    ref_tex, tar, tM, nbin_ref, nbin_tar
    # )
    # compute entropy of target
    P_tar, _ = cupy.histogram(tar, nbin_tar)
    P_tar = P_tar / cupy.sum(P_tar) + cupy.finfo(float).eps
    H_tar = discrete_entropy(P_tar) / cupy.log2(nbin_tar)
    ec = _entropy_correlation_coeff_precomp_target(
        ref_tex, tar, tM, nbin_ref, nbin_tar, H_tar
    )
    del ref_tex, tex_arr
    return ec

pyspim.reg.powell

Functions

optimize_affine(ref: cupy.ndarray, mov: cupy.ndarray, metric: str, transform: str, interp_method: str, par0: List[float], bounds: Optional[Union[OptBounds, OptBoundMargins]] = None, kernel_launch_params: Optional[CuLaunchParameters] = None, verbose: bool = False, **opt_kwargs)

Find optimal affine transform to register mov with ref by Powell's method

**opt_kwargs are passed to scipy.optimize.minimize

:param ref: reference volume :type ref: cupy.ndarray :param mov: moving volume, to be registered :type mov: cupy.ndarray :param metric: metric to optimize for registration one of ('nip', 'cr', 'ncc'). 'nip' : normalized inner product 'cr' : correlation ratio 'ncc' : normalized cross correlation :type metric: str :param transform: transform to optimize :type transform: str :param interp: interpolation method to use during transformation one of ('linear', 'cubspl') 'linear' : trilinear interpolation 'cubspl' : cubic b-spline interpolation :type interp: str :param par0: initial guess for parameters :type par0: list :param bounds: bounds for parameters :type bounds: list :param verbose: show intermediate results with tqdm progress bar :type verbose: bool :returns: transform and optimization results :rtype: Tuple[NDArray,OptimizeResult]

Source code in packages/pyspim/src/pyspim/reg/powell.py
def optimize_affine(
    ref: cupy.ndarray,
    mov: cupy.ndarray,
    metric: str,
    transform: str,
    interp_method: str,
    par0: List[float],
    bounds: Optional[Union[OptBounds, OptBoundMargins]] = None,
    kernel_launch_params: Optional[CuLaunchParameters] = None,
    verbose: bool = False,
    **opt_kwargs,
):
    """Find optimal affine transform to register `mov` with `ref` by Powell's method

    `**opt_kwargs` are passed to `scipy.optimize.minimize`

    :param ref: reference volume
    :type ref: cupy.ndarray
    :param mov: moving volume, to be registered
    :type mov: cupy.ndarray
    :param metric: metric to optimize for registration
        one of ('nip', 'cr', 'ncc').
        'nip' : normalized inner product
        'cr'  : correlation ratio
        'ncc' : normalized cross correlation
    :type metric: str
    :param transform: transform to optimize
    :type transform: str
    :param interp: interpolation method to use during transformation
        one of ('linear', 'cubspl')
        'linear' : trilinear interpolation
        'cubspl' : cubic b-spline interpolation
    :type interp: str
    :param par0: initial guess for parameters
    :type par0: list
    :param bounds: bounds for parameters
    :type bounds: list
    :param verbose: show intermediate results with tqdm progress bar
    :type verbose: bool
    :returns: transform and optimization results
    :rtype: Tuple[NDArray,OptimizeResult]
    """
    # make sure both input images are already on the GPU and are floating point
    # TODO: update so that this works on CPU, too
    assert cupy.get_array_module(ref) == cupy, "reference image must be on the GPU" # type: ignore
    assert cupy.get_array_module(mov) == cupy, "moving image must be on the GPU" # type: ignore
    # figure out coords of centroid of image
    msze_z, msze_y, msze_x = mov.shape
    cx, cy, cz = msze_x / 2.0, msze_y / 2.0, msze_z / 2.0
    # make function that will generate the transform matrix
    # also get # of params req'd
    mat_fun, postfix_fun, ipar0 = parse_transform_string(transform)
    if par0 is None:
        par0 = ipar0
    if len(par0) != len(ipar0):
        raise ValueError("invalid # of initial params for transform")
    if bounds is not None:
        if len(bounds) != len(ipar0):
            raise ValueError("invalid # of bounds for transform")
        if isinstance(bounds[0], float) or isinstance(bounds[0], int):
            bounds = [(p - b, p + b) for p, b in zip(par0, bounds)] # type: ignore
        elif isinstance(bounds[0], tuple):
            pass  # already correctly formatted
        else:
            raise ValueError("invalid bound formatting")
    # par_fun takes in the parameter vector and generates the matrix that then gets passed to the kernel
    par_fun = lambda p: mat_fun(p, cx, cy, cz).astype(float).flatten()[:12]
    # get shape of reference and moving images
    sz_ref, sy_ref, sx_ref = ref.shape
    sz_mov, sy_mov, sx_mov = mov.shape
    # figure out launch parameters
    if kernel_launch_params is None:
        block_size = 8
        kernel_launch_params = launch_params_for_volume(
            [sz_mov, sy_mov, sx_mov], block_size, block_size, block_size
        )
    # initialize the computation
    # this makes all the kernels, CUDA streams, and per-device arguments that will get used per-device.
    kernels, streams, kernel_args = kern.initialize_computation(
        metric, interp_method, ref, mov, cupy.mean(ref), cupy.mean(mov),
        sz_ref, sy_ref, sx_ref, sz_mov, sy_mov, sx_mov
    )
    # formulate optimization function
    # NOTE: scipy only has a `minimize` function and these metrics should be maximized. 
    # They are all confined to [0,1] so to minimize, do 1.0 - kernel_value.
    def opt_fun(pars: numpy.ndarray) -> float:
        T = par_fun(pars) # generate transform matrix
        val = kern.compute(
            T, kernels, streams, kernel_args, kernel_launch_params
        )
        return 1.0 - val.get()

    # do powell's registration
    opt_call = partial(
        minimize, opt_fun, x0=par0, bounds=bounds, method="powell", options=opt_kwargs
    )
    if verbose:
        def cback(x, pbar, postfix_fun):
            pbar.update(1)
            pbar.set_postfix(postfix_fun(x))
        with tqdm(desc="Registration", leave=False) as pbar:
            _cback = partial(cback, pbar=pbar, postfix_fun=postfix_fun)
            res = opt_call(callback=_cback)
    else:
        res = opt_call()
    # calculate transform
    T = mat_fun(res.x, cx, cy, cz).reshape(4, 4)
    return T, res

pyspim.reg.pcc

Functions

translation(ref: NDArray, mov: NDArray, upsample_factor: int = 1) -> NDArray

Determine translation between ref and mov.

:param ref: reference image :type ref: NDArray :param mov: image to register. must be same dimensionality as ref :type mov: NDArray :param upsample_factor: upsampling factor, images are registered to 1/upsample_factor. defaults to 1. :type upsample_factor: int :returns: translation along each axis :rtype: NDArray

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def translation(ref: NDArray, mov: NDArray, upsample_factor: int = 1) -> NDArray:
    """Determine translation between `ref` and `mov`.

    :param ref: reference image
    :type ref: NDArray
    :param mov: image to register. must be same dimensionality as `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor, images are registered to
        `1/upsample_factor`. defaults to 1.
    :type upsample_factor: int
    :returns: translation along each axis
    :rtype: NDArray
    """
    # get relevant modules for cpu/gpu
    xp = cupy.get_array_module(ref)
    fft = get_fft_module(xp)
    if any([rd != md for rd, md in zip(ref.shape, mov.shape)]):
        ref, mov = pad_to_same_size(ref, mov, style="right")
    # compute fft's and then do phase cross-correlation
    ref_fft = fft.fft2(ref)
    mov_fft = fft.fft2(mov)
    if xp == cupy: # type: ignore
        ref_fft, mov_fft = ref_fft.get(), mov_fft.get()
    trans, _, _ = skimage.registration.phase_cross_correlation(
        ref_fft,
        mov_fft,
        space="fourier",
        upsample_factor=upsample_factor,
    )
    return trans
scale_rotation(ref: NDArray, mov: NDArray, upsample_factor: int = 1) -> Tuple[float, float]

determine scaling & rotation between ref & mov using phase cross correlation of the log-polar transformed inputs

:param ref: reference image :type ref: NDArray :param mov: image to register. must be same dimensionality as ref :type mov: NDArray :param upsample_factor: upsampling factor, images are registered to 1/upsample_factor. defaults to 1. :type upsample_factor: int :returns: rotation (in degrees) and scale difference between ref & mov :rtype: Tuple[float,float]

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def scale_rotation(
    ref: NDArray, mov: NDArray, upsample_factor: int = 1
) -> Tuple[float, float]:
    """determine scaling & rotation between `ref` & `mov` using phase cross
        correlation of the log-polar transformed inputs

    :param ref: reference image
    :type ref: NDArray
    :param mov: image to register. must be same dimensionality as `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor, images are registered to
        `1/upsample_factor`. defaults to 1.
    :type upsample_factor: int
    :returns: rotation (in degrees) and scale difference between `ref` & `mov`
    :rtype: Tuple[float,float]
    """
    xp = cupy.get_array_module(ref)
    if xp == cupy: # type: ignore
        ref, mov = ref.get(), mov.get() # type: ignore (we know there are cupy)
    if any([rd != md for rd, md in zip(ref.shape, mov.shape)]):
        ref, mov = pad_to_same_size(ref, mov, style="right")
    radius = min([min(ref.shape), min(mov.shape)])
    ref_polar = skimage.transform.warp_polar(ref, radius=radius, scaling="log")
    mov_polar = skimage.transform.warp_polar(mov, radius=radius, scaling="log")
    shift, err, _ = skimage.registration.phase_cross_correlation(
        ref_polar, mov_polar, upsample_factor=upsample_factor
    )
    cc_rot = shift[0]
    scale = 1 / (math.exp(shift[1] / (radius / math.log(radius))))
    return scale, cc_rot
rotation_scale_for_volumes(ref: NDArray, mov: NDArray, upsample_factor: int = 1.0)

calculate rotation, and scaling between input volumes by phase cross correlation of the maximum projections in each axis

:param ref: reference volume :type ref: NDArray :param mov: 'moving' volume to be registered to ref :type mov: NDArray :param upsample_factor: upsampling factor. images are registered to 1/upsample_factor. defaults to 1 :type upsample_factor: float :returns: rotation and scaling for each axis :rtype: Tuple[NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def rotation_scale_for_volumes(
    ref: NDArray, mov: NDArray, upsample_factor: int = 1.0
):
    """calculate rotation, and scaling between input volumes by
        phase cross correlation of the maximum projections in each axis

    :param ref: reference volume
    :type ref: NDArray
    :param mov: 'moving' volume to be registered to `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor. images are registered to
        `1/upsample_factor`. defaults to 1
    :type upsample_factor: float
    :returns: rotation and scaling for each axis
    :rtype: Tuple[NDArray,NDArray]
    """
    xp = cupy.get_array_module(ref)
    # calculate max-projections for XY
    ref_xy, mov_xy = xp.amax(ref, axis=0), xp.amax(mov, axis=0)
    scale_xy, rot_xy = scale_rotation(ref_xy, mov_xy, upsample_factor)
    # repeat for ZX
    ref_zx, mov_zx = xp.amax(ref, axis=1), xp.amax(mov, axis=1)
    scale_zx, rot_zx = scale_rotation(ref_zx, mov_zx, upsample_factor)
    # and again for ZY
    ref_zy, mov_zy = xp.amax(ref, axis=2), xp.amax(mov, axis=2)
    scale_zy, rot_zy = scale_rotation(ref_zy, mov_zy, upsample_factor)
    # rotations & scalings are scalars, just concat them
    rot = numpy.asarray([rot_zy, rot_zx, rot_xy])
    scale = numpy.asarray([scale_zy, scale_zx, scale_xy])
    return [float(r) for r in rot[::-1]], [float(s) for s in scale[::-1]]
translation_for_volumes(ref: NDArray, mov: NDArray, upsample_factor: int = 1)

calculate relative translation, rotation, and scaling between input volumes by phase cross correlation of the maximum projections in each axis

:param ref: reference volume :type ref: NDArray :param mov: 'moving' volume to be registered to ref :type mov: NDArray :param upsample_factor: upsampling factor. images are registered to 1/upsample_factor. defaults to 1 :type upsample_factor: float :returns: translation, rotation, and scaling for each axis :rtype: Tuple[NDArray,NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def translation_for_volumes(ref: NDArray, mov: NDArray, upsample_factor: int = 1):
    """calculate relative translation, rotation, and scaling between input
        volumes by phase cross correlation of the maximum projections
        in each axis

    :param ref: reference volume
    :type ref: NDArray
    :param mov: 'moving' volume to be registered to `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor. images are registered to
        `1/upsample_factor`. defaults to 1
    :type upsample_factor: float
    :returns: translation, rotation, and scaling for each axis
    :rtype: Tuple[NDArray,NDArray,NDArray]
    """
    xp = cupy.get_array_module(ref)
    # calculate max-projections for XY
    ref_xy = xp.amax(ref, axis=0)
    mov_xy = xp.amax(mov, axis=0)
    trans_xy = translation(ref_xy, mov_xy, upsample_factor)
    trans_xy = numpy.concatenate([numpy.asarray([numpy.inf]), numpy.asarray(trans_xy)])
    # repeat for ZX
    ref_zx = xp.amax(ref, axis=1)
    mov_zx = xp.amax(mov, axis=1)
    trans_zx = translation(ref_zx, mov_zx, upsample_factor)
    trans_zx = numpy.concatenate(
        [
            numpy.asarray([trans_zx[0]]),
            numpy.asarray([numpy.inf]),
            numpy.asarray([trans_zx[1]]),
        ]
    )
    # and again for ZY
    ref_zy = xp.amax(ref, axis=2)
    mov_zy = xp.amax(mov, axis=2)
    trans_zy = translation(ref_zy, mov_zy, upsample_factor)
    trans_zy = numpy.concatenate([numpy.asarray(trans_zy), numpy.asarray([numpy.inf])])
    # assemble outputs
    # starting translation is just whichever the smallest one
    # calculated for each dimension
    trans = numpy.amin(numpy.vstack([trans_xy, trans_zx, trans_zy]), axis=0)
    # NOTE: _ab corresponds to the translation for the 'c' axis so right
    # now its formatted like ZRC
    return [float(t) for t in trans[::-1]]  # this makes output XYZ
translation_rotation_scale_for_volumes(ref: NDArray, mov: NDArray, upsample_factor: int = 1)

calculate relative translation, rotation, and scaling between input volumes by phase cross correlation of the maximum projections in each axis

:param ref: reference volume :type ref: NDArray :param mov: 'moving' volume to be registered to ref :type mov: NDArray :param upsample_factor: upsampling factor. images are registered to 1/upsample_factor. defaults to 1 :type upsample_factor: float :returns: translation, rotation, and scaling for each axis :rtype: Tuple[NDArray,NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/reg/pcc.py
def translation_rotation_scale_for_volumes(
    ref: NDArray, mov: NDArray, upsample_factor: int = 1
):
    """calculate relative translation, rotation, and scaling between input
        volumes by phase cross correlation of the maximum projections
        in each axis

    :param ref: reference volume
    :type ref: NDArray
    :param mov: 'moving' volume to be registered to `ref`
    :type mov: NDArray
    :param upsample_factor: upsampling factor. images are registered to
        `1/upsample_factor`. defaults to 1
    :type upsample_factor: float
    :returns: translation, rotation, and scaling for each axis
    :rtype: Tuple[NDArray,NDArray,NDArray]
    """
    xp = cupy.get_array_module(ref)
    # calculate max-projections for XY
    ref_xy = xp.amax(ref, axis=0)
    mov_xy = xp.amax(mov, axis=0)
    trans_xy = translation(ref_xy, mov_xy, upsample_factor)
    trans_xy = numpy.concatenate([numpy.asarray([numpy.inf]), numpy.asarray(trans_xy)])
    scale_xy, rot_xy = scale_rotation(ref_xy, mov_xy, upsample_factor)
    # repeat for ZX
    ref_zx = xp.amax(ref, axis=1)
    mov_zx = xp.amax(mov, axis=1)
    trans_zx = translation(ref_zx, mov_zx, upsample_factor)
    trans_zx = numpy.concatenate(
        [
            numpy.asarray([trans_zx[0]]),
            numpy.asarray([numpy.inf]),
            numpy.asarray([trans_zx[1]]),
        ]
    )
    scale_zx, rot_zx = scale_rotation(ref_zx, mov_zx, upsample_factor)
    # and again for ZY
    ref_zy = xp.amax(ref, axis=2)
    mov_zy = xp.amax(mov, axis=2)
    trans_zy = translation(ref_zy, mov_zy, upsample_factor)
    trans_zy = numpy.concatenate([numpy.asarray(trans_zy), numpy.asarray([numpy.inf])])
    scale_zy, rot_zy = scale_rotation(ref_zy, mov_zy, upsample_factor)
    # assemble outputs
    # starting translation is just whichever the smallest one
    # calculated for each dimension
    trans = numpy.amin(numpy.vstack([trans_xy, trans_zx, trans_zy]), axis=0)[::-1]
    # rotations & scalings are scalars, just concat them
    rot = numpy.asarray([rot_zy, rot_zx, rot_xy])[::-1]
    scale = numpy.asarray([scale_zy, scale_zx, scale_xy])[::-1]
    return trans, rot, scale

pyspim.reg.kernels

pyspim.data.dispim

utilities for loading data acquired in micro-manager with the ASI diSPIM plugin

Functions

uManagerAcquisition(path: os.PathLike, multi_pos: bool = False, xp: ModuleType = numpy)

uManagerAcquisition Create new acquisition object for fetching raw acquisition data.

Parameters:

Name Type Description Default
path PathLike

path to root directory of acquisition

required
multi_pos bool

flag for if this is a multi-position acquisition. Defaults to False.

False
xp ModuleType

array module to load data into (numpy or cupy). Defaults to numpy.

numpy
Source code in packages/pyspim/src/pyspim/data/dispim.py
def uManagerAcquisition(
    path: os.PathLike, multi_pos: bool = False, xp: ModuleType = numpy
):
    """uManagerAcquisition Create new acquisition object for fetching raw acquisition data.

    Args:
        path (os.PathLike): path to root directory of acquisition
        multi_pos (bool, optional): flag for if this is a multi-position acquisition. Defaults to False.
        xp (ModuleType, optional): array module to load data into (``numpy`` or ``cupy``). Defaults to numpy.
    """
    if multi_pos:
        return uManagerAcquisitionMultiPos(path, xp)
    else:
        return uManagerAcquisitionOnePos(path, xp)
parse_position_list(path: os.PathLike) -> numpy.ndarray

parse_position_list Parse uManager position list into numpy array

Parameters:

Name Type Description Default
path PathLike

path to the PositionList.pos file

required

Returns:

Type Description
ndarray

numpy.ndarray: 6-column array [position_index, grid_z, grid_y, z, y, x]

Source code in packages/pyspim/src/pyspim/data/dispim.py
def parse_position_list(path: os.PathLike) -> numpy.ndarray:
    """parse_position_list Parse uManager position list into numpy array

    Args:
        path (os.PathLike): path to the PositionList.pos file

    Returns:
        numpy.ndarray: 6-column array [position_index, grid_z, grid_y, z, y, x]
    """
    with open(path) as f:
        pos_dict = json.load(f)
    positions = pos_dict["POSITIONS"]
    rows = []
    for idx, position in enumerate(positions):
        gz, gy = __label_to_grid_location(position["LABEL"])
        x, y, z = __physical_position_from_devices(position["DEVICES"])
        rows.append([idx, gz, gy, z, y, x])
    return numpy.vstack(rows)
subtract_constant_uint16arr(arr: NDArray, const: int) -> NDArray

subtract_constant_uint16arr Subtract constant value from uint16 array, preventing underflow.

Parameters:

Name Type Description Default
arr NDArray

input array

required
const int

value to subtract

required

Returns:

Type Description
NDArray

NDArray

Source code in packages/pyspim/src/pyspim/data/dispim.py
def subtract_constant_uint16arr(arr: NDArray, const: int) -> NDArray:
    """subtract_constant_uint16arr Subtract constant value from uint16 array, preventing underflow.

    Args:
        arr (NDArray): input array
        const (int): value to subtract

    Returns:
        NDArray
    """
    try:
        xp = cupy.get_array_module(arr)
    except (NameError, ImportError):
        xp = numpy
    return (arr.astype(xp.int32) - const).clip(0, 2**16).astype(xp.uint16)

pyspim.interp.affine

Functions

output_shape_for_transform(T: NDArray, input_shape: tuple[int, int, int]) -> Tuple[int, int, int]

Calculate output shape of transformed volume.

Parameters:

Name Type Description Default
T NDArray

affine transform matrix

required
input_shape Iterable

shape of input volume (ZRC)

required

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]: output shape (ZRC)

Source code in packages/pyspim/src/pyspim/interp/affine.py
def output_shape_for_transform(
    T: NDArray, input_shape: tuple[int,int,int],
) -> Tuple[int, int, int]:
    """Calculate output shape of transformed volume.

    Args:
        T (NDArray): affine transform matrix
        input_shape (Iterable): shape of input volume (ZRC)

    Returns:
        Tuple[int,int,int]: output shape (ZRC)
    """
    t = T.get() if cupy.get_array_module(T) == cupy else T # type: ignore
    coord = list(product(*[(0, s) for s in input_shape[::-1]]))
    coord = numpy.asarray(coord).T
    coord = numpy.vstack([coord, numpy.zeros_like(coord[0, :])])
    coordT = (t @ coord)[:-1, :]
    ptp = numpy.ceil(numpy.ptp(coordT, axis=1))
    x, y, z = int(ptp[0]), int(ptp[1]), int(ptp[2])
    return z, y, x
output_shape_for_inv_transform(T: NDArray, input_shape: tuple[int, int, int]) -> Tuple[int, int, int]

Calculate output shape of (inverse)-transformed volume.

Parameters:

Name Type Description Default
T NDArray

affine transform matrix (to be inverted)

required
input_shape Iterable

shape of input volume (ZRC)

required

Returns:

Type Description
Tuple[int, int, int]

Tuple[int,int,int]: shape of output volume (ZRC)

Source code in packages/pyspim/src/pyspim/interp/affine.py
def output_shape_for_inv_transform(
    T: NDArray, input_shape: tuple[int,int,int]
) -> Tuple[int, int, int]:
    """Calculate output shape of (inverse)-transformed volume.

    Args:
        T (NDArray): affine transform matrix (to be inverted)
        input_shape (Iterable): shape of input volume (ZRC)

    Returns:
        Tuple[int,int,int]: shape of output volume (ZRC)
    """
    xp = cupy.get_array_module(T)
    fwd = xp.linalg.inv(T).get() if xp == cupy else xp.linalg.inv(T) # type: ignore
    return output_shape_for_transform(fwd, input_shape)
transform(A: NDArray, T: NDArray, interp_method: str, preserve_dtype: bool, out_shp: Tuple[int, int, int] | None, block_size_z: int, block_size_y: int, block_size_x: int, gpu_id: int = 0, n_gpu: int = 1) -> cupy.ndarray

Apply affine transform to input volume.

Parameters:

Name Type Description Default
A NDArray

input volume (ZRC)

required
T NDArray

affine transform matrix (4x4), rows of matrix corresp. to XYZ

required
interp_method str

interpolation method to use when interpolating points in the transformed volume. One of 'nearest','linear','cubspl'.

required
preserve_dtype bool

make output datatype match that of the input (uint16), if False, output is single-precision float.

required
out_shp Tuple[int, int, int] | None

shape of output volume. if None, will be calculated by this function

required
block_size_z int

size of kernel launch block, in z dimension

required
block_size_y int

size of kernel launch block, in y dimension

required
block_size_x int

size of kernel launch block, in x dimension

required

Raises:

Type Description
ValueError

if input is not a cupy.ndarray

Returns:

Type Description
ndarray

cupy.ndarray: transformed volume

Notes

The input transform, T, should be the forward transform mapping the input to the desired output domain. Internally, this function takes the inverse of the specified transform and then iterates over the output domain to extract values from the input array.

Source code in packages/pyspim/src/pyspim/interp/affine.py
def transform(
    A: NDArray,
    T: NDArray,
    interp_method: str,
    preserve_dtype: bool,
    out_shp: Tuple[int, int, int] | None,
    block_size_z: int,
    block_size_y: int,
    block_size_x: int,
    gpu_id: int = 0,
    n_gpu: int = 1,
) -> cupy.ndarray:
    """Apply affine transform to input volume.

    Args:
        A (NDArray): input volume (ZRC)
        T (NDArray): affine transform matrix (4x4), rows of matrix corresp. to XYZ
        interp_method (str): interpolation method to use when interpolating points in the transformed volume. One of ``'nearest','linear','cubspl'``.
        preserve_dtype (bool): make output datatype match that of the input (uint16), if False, output is single-precision float.
        out_shp (Tuple[int,int,int] | None): shape of output volume. if ``None``, will be calculated by this function
        block_size_z (int): size of kernel launch block, in z dimension
        block_size_y (int): size of kernel launch block, in y dimension
        block_size_x (int): size of kernel launch block, in x dimension

    Raises:
        ValueError: if input is not a ``cupy.ndarray``

    Returns:
        cupy.ndarray: transformed volume

    Notes:
        The input transform, T, should be the forward transform mapping the input to the desired output domain. Internally, this function takes the inverse of the specified transform and then iterates over the output domain to extract values from the input array.
    """
    if cupy.get_array_module(A) == cupy: # type: ignore
        kernel = _get_kernel(A.dtype, interp_method, preserve_dtype)
        if out_shp is None:
            out_shp = output_shape_for_transform(T, A.shape)
        launch_params = launch_params_for_volume(
            out_shp, block_size_z, block_size_y, block_size_x
        )
        T = cupy.linalg.inv(cupy.asarray(T)).astype(cupy.float32)
        # preallocate output and call kernel
        out_dtype = A.dtype if preserve_dtype else cupy.float32
        out = cupy.zeros(out_shp, dtype=out_dtype)
        kernel(
            launch_params[0], launch_params[1], 
            (out, A, T, *out_shp, *A.shape, gpu_id, n_gpu)
        )
        return out
    else:
        raise ValueError("only works on cupy arrays")

pyspim.fsc.checkerboard

image/volume splitting by checkerboard subsampling

Functions

checkerboard_split(arr: NDArray, split: int) -> Tuple[NDArray, NDArray]

checkerboard split images or volumes for FSC analysis generate pairs of images for FS(R)C analysis by subsampling them, as described in Fig. 2(a) of [1]

References

[1] Koho, et al. "Fourier ring correlation..." doi:10.1038/s41467-019-11024-z

:param arr: input image/volume :type arr: NDArray :param split: type of split to do :type split: int :returns: checkerboard-subsampled halves of input array :rtype: Tuple[NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/fsc/checkerboard.py
def checkerboard_split(arr: NDArray, split: int) -> Tuple[NDArray, NDArray]:
    """checkerboard split images or volumes for FSC analysis
        generate pairs of images for FS(R)C analysis by subsampling
        them, as described in Fig. 2(a) of [1]

    References
    ---
    [1] Koho, et al. "Fourier ring correlation..." doi:10.1038/s41467-019-11024-z

    :param arr: input image/volume
    :type arr: NDArray
    :param split: type of split to do
    :type split: int
    :returns: checkerboard-subsampled halves of input array
    :rtype: Tuple[NDArray,NDArray]
    """
    if len(arr.shape) == 3:
        return _checkerboard_split_vol(arr, split)
    elif len(arr.shape) == 2:
        return _checkerboard_split_img(arr, split)
    else:
        raise ValueError("input arr must be 2- or 3-dimensional")

pyspim.fsc.main

Functions

fourier_ring_correlation(im1: NDArray, im2: NDArray, bin_edges: Sequence, pixel_size: Optional[float] = None) -> FRCOutput

compute Fourier Ring Correlation for pair of input images pair should be checkerboard-subsampled pair of images originating from a single image

:param im1: image 1 :type im1: NDArray :param im2: image 2 :type im2: NDArray :param bin_edges: edges of frequency bins that FRC is calculated over :type bin_edges: Sequence :param pixel_size: pixel size, in real-space units. if None, output spatial frequencies will be in units pix^{-1} if specified, output spatial frequencies will have correct units :type pixel_size: Optional[float] :returns: FRC values and corresponding spatial frequencies :rtype: Tuple[NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/fsc/main.py
def fourier_ring_correlation(
    im1: NDArray, im2: NDArray, bin_edges: Sequence, pixel_size: Optional[float] = None
) -> FRCOutput:
    """compute Fourier Ring Correlation for pair of input images
        pair should be checkerboard-subsampled pair of images originating
        from a single image

    :param im1: image 1
    :type im1: NDArray
    :param im2: image 2
    :type im2: NDArray
    :param bin_edges: edges of frequency bins that FRC is calculated over
    :type bin_edges: Sequence
    :param pixel_size: pixel size, in real-space units.
        if `None`, output spatial frequencies will be in units pix^{-1}
        if specified, output spatial frequencies will have correct units
    :type pixel_size: Optional[float]
    :returns: FRC values and corresponding spatial frequencies
    :rtype: Tuple[NDArray,NDArray]
    """
    pixel_size = 1.0 if pixel_size is None else pixel_size
    assert len(im1.shape) == 2 and len(im2.shape) == 2, "both inputs must be 2D images"
    # figure out auto-dispatch
    xp = cupy.get_array_module(im1)
    assert xp == cupy.get_array_module(im2), (
        "both images (`im1` & `im2`) must be on same device"
    )
    # compute fourier transform
    fft1 = xp.fft.fftshift(xp.fft.fft2(im1))
    fft2 = xp.fft.fftshift(xp.fft.fft2(im2))
    # figure out center coordinate of fft
    ysze, xsze = fft1.shape
    yc, xc = ysze / 2.0, xsze / 2.0
    # formulate a grid of coordinates & use to calculate radial
    # distance from center at each point in the image
    grid = xp.meshgrid(xp.arange(xsze) - xc, xp.arange(ysze) - yc, indexing="xy")
    R = xp.sqrt(functools.reduce(operator.add, map(xp.square, grid)))
    # calculate spatial frequencies (in real units) being probed
    nyq_frq = xp.floor(im1.shape[0] * pixel_size / 2.0)  # Nyquist frequency
    bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2.0
    spt_frq = bin_centers / nyq_frq
    frc, npts = __fourier_correlation(fft1, fft2, R, bin_edges)
    return frc, spt_frq
fourier_shell_correlation(vol1: NDArray, vol2: NDArray, bin_edges: Sequence, voxel_size: Optional[float] = None) -> FSCOutput

compute Fourier Shell Correlation for pair of input volumes pair should be checkerboard-subsampled pair of volumes originating from a single volume

:param vol1: volume 1 :type vol1: NDArray :param vol2: volume 2 :type vol2: NDArray :param bin_edges: :type bin_edges: Sequence :param voxel_size: :type voxel_size: Optional[float] :returns: :rtype: Tuple[NDArray,NDArray,NDArray]

Source code in packages/pyspim/src/pyspim/fsc/main.py
def fourier_shell_correlation(
    vol1: NDArray,
    vol2: NDArray,
    bin_edges: Sequence,
    voxel_size: Optional[float] = None,
) -> FSCOutput:
    """compute Fourier Shell Correlation for pair of input volumes
        pair should be checkerboard-subsampled pair of volumes originating
        from a single volume

    :param vol1: volume 1
    :type vol1: NDArray
    :param vol2: volume 2
    :type vol2: NDArray
    :param bin_edges:
    :type bin_edges: Sequence
    :param voxel_size:
    :type voxel_size: Optional[float]
    :returns:
    :rtype: Tuple[NDArray,NDArray,NDArray]
    """
    voxel_size = 1.0 if voxel_size is None else voxel_size
    assert len(vol1.shape) == 3 and len(vol2.shape) == 3, (
        "both inputs must be 3D volumes"
    )
    # figure out auto-dispatch
    xp = cupy.get_array_module(vol1)
    assert xp == cupy.get_array_module(vol2), (
        "both images (`vol1` & `vol2`) must be on same device"
    )
    # compute fourier transform
    fft1 = xp.fft.fftshift(xp.fft.fftn(vol1))
    fft2 = xp.fft.fftshift(xp.fft.fftn(vol2))
    # figure out center coordinate of fft
    zsze, ysze, xsze = fft1.shape
    zc, yc, xc = zsze / 2.0, ysze / 2.0, xsze / 2.0
    # formulate a grid of coordinates & use to calculate radial
    # distance from center at each voxel in volume
    grid = xp.meshgrid(
        xp.arange(xsze) - xc, xp.arange(ysze) - yc, xp.arange(zsze) - zc, indexing="xy"
    )
    R = xp.sqrt(functools.reduce(operator.add, map(xp.square, grid)))
    # calculate spatial frequencies (in real units) being probed
    nyq_frq = xp.floor(vol1.shape[0] * voxel_size / 2.0)  # Nyquist frequency
    bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2.0
    spt_frq = bin_centers / nyq_frq
    fsc, npts = __fourier_correlation(fft1, fft2, R, bin_edges)
    return fsc, spt_frq, npts