Executive Summary

This post explores using the ITK suite of image processing utilities in parallel with Dask Array.

We cover …

  1. A simple but common example of applying deconvolution across a stack of 3d images
  2. Tips on how to make these two libraries work well together
  3. Challenges that we ran into and opportunities for future improvements.

A Worked Example

Let’s start with a full example applying Richardson Lucy deconvolution to a stack of light sheet microscopy data. This is the same data that we showed how to load in our last blogpost on image loading:

# Load our data from last time¶
import dask.array as da
imgs = da.from_zarr("AOLLSMData_m4_raw.zarr/", "data")
Array Chunk
Bytes 188.74 GB 316.15 MB
Shape (3, 199, 201, 1024, 768) (1, 1, 201, 1024, 768)
Count 598 Tasks 597 Chunks
Type uint16 numpy.ndarray
199 3 768 1024 201
# Load our Point Spread Function (PSF)
import dask.array.image
psf = dask.array.image.imread("AOLLSMData/m4/psfs_z0p1/*.tif")[:, None, ...]
Array Chunk
Bytes 2.48 MB 827.39 kB
Shape (3, 1, 101, 64, 64) (1, 1, 101, 64, 64)
Count 6 Tasks 3 Chunks
Type uint16 numpy.ndarray
1 3 64 64 101
# Convert data to float32 for computation¶
import numpy as np
imgs = imgs.astype(np.float32)
# Note: the psf needs to be sampled with a voxel spacing
# consistent with the image's sampling
psf = psf.astype(np.float32)
# Apply Richardson-Lucy Deconvolution¶
def richardson_lucy_deconvolution(img, psf, iterations=1):
    """ Apply deconvolution to a single chunk of data """
    import itk

    img = img[0, 0, ...]  # remove leading two length-one dimensions
    psf = psf[0, 0, ...]  # remove leading two length-one dimensions

    image = itk.image_view_from_array(img)   # Convert to ITK object
    kernel = itk.image_view_from_array(psf)  # Convert to ITK object

    deconvolved = itk.richardson_lucy_deconvolution_image_filter(
        image,
        kernel_image=kernel,
        number_of_iterations=iterations
    )

    result = itk.array_from_image(deconvolved)  # Convert back to Numpy array
    result = result[None, None, ...]  # Add back the leading length-one dimensions

    return result

out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
# Create a local cluster of dask worker processes
# (this could also point to a distributed cluster if you have it)
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=20, threads_per_process=1)
client = Client(cluster)  # now dask operations use this cluster by default

# Trigger computation and store
out.to_zarr("AOLLSMData_m4_raw.zarr", "deconvolved", overwrite=True)

So in the example above we …

  1. Load data both from Zarr and TIFF files into multi-chunked Dask arrays
  2. Construct a function to apply an ITK routine onto each chunk
  3. Apply that function across the dask array with the dask.array.map_blocks function.
  4. Store the result back into Zarr format

From the perspective of an imaging scientist, the new piece of technology here is the dask.array.map_blocks function. Given a Dask array composed of many NumPy arrays and a function, map_blocks applies that function across each block in parallel, returning a Dask array as a result. It’s a great tool whenever you want to apply an operation across many blocks in a simple fashion. Because Dask arrays are just made out of Numpy arrays it’s an easy way to compose Dask with the rest of the Scientific Python ecosystem.

Building the right function

However in this case there are a few challenges to constructing the right Numpy -> Numpy function, due to both idiosyncrasies in ITK and Dask Array. Let’s look at our function again:

def richardson_lucy_deconvolution(img, psf, iterations=1):
    """ Apply deconvolution to a single chunk of data """
    import itk

    img = img[0, 0, ...]  # remove leading two length-one dimensions
    psf = psf[0, 0, ...]  # remove leading two length-one dimensions

    image = itk.image_view_from_array(img)   # Convert to ITK object
    kernel = itk.image_view_from_array(psf)  # Convert to ITK object

    deconvolved = itk.richardson_lucy_deconvolution_image_filter(
        image,
        kernel_image=kernel,
        number_of_iterations=iterations
    )

    result = itk.array_from_image(deconvolved)  # Convert back to Numpy array
    result = result[None, None, ...]  # Add back the leading length-one dimensions

    return result

out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)

This is longer than we would like. Instead, we would have preferred to just use the itk function directly, without all of the steps before and after.

deconvolved = da.map_blocks(itk.richardson_lucy_deconvolution_image_filter, imgs, psf)

What were the extra steps in our function and why were they necessary?

  1. Convert to and from ITK Image objects: ITK functions don’t consume and produce Numpy arrays, they consume and produce their own Image data structure. There are convenient functions to convert back and forth, so handling this is straightforward, but it does need to be handled each time. See ITK #1136 for a feature request that would remove the need for this step.

  2. Unpack and pack singleton dimensions: Our Dask arrays have shapes like the following:

    Array Shape: (3, 199, 201, 1024, 768)
    Chunk Shape: (1,   1, 201, 1024, 768)
    

    So our map_blocks function gets NumPy arrays of the chunk size, (1, 1, 201, 1024, 768). However, our ITK functions are meant to work on 3d arrays, not 5d arrays, so we need to remove those first two dimensions.

    img = img[0, 0, ...]  # remove leading two length-one dimensions
    psf = psf[0, 0, ...]  # remove leading two length-one dimensions
    

    And then when we’re done, Dask expects to get back 5d arrays like what it provided, so we add these singleton dimensions back in

    result = result[None, None, ...]  # Add back the leading length-one dimensions
    

    Again, this is straightforward for users who are accustomed to NumPy slicing syntax, but does need to be done each time. This adds some friction to our development process, and is another step that can confuse users.

But if you’re comfortable working around things like this, then ITK and map_blocks can be a powerful combination if you want to parallelize out ITK operations across a cluster.

Defining a Dask Cluster

Above we used dask.distributed.LocalCluster to set up 20 single-threaded workers on our local workstation:

from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=20, threads_per_process=1)
client = Client(cluster)  # now dask operations use this cluster by default

If you had a distributed resource, this is where you would connect it. You would swap out LocalCluster with one of Dask’s other deployment options.

Also, we found that we needed to use many single-threaded processes rather than one multi-threaded process because ITK functions seem to still hold onto the GIL. This is fine, we just need to be aware of it so that we set up our Dask workers appropriately with one thread per process for maximum efficiency. See ITK #1134 for an active Github issue on this topic.

Serialization

We had some difficulty when using the ITK library across multiple processes, because the library itself didn’t serialize well. (If you don’t understand what that means, don’t worry). We solved a bit of this in ITK #1090, but some issues still remain.

We got around this by including the import in the function rather than outside of it.

def richardson_lucy_deconvolution(img, psf, iterations=1):
    import itk   # <--- we work around serialization issues by importing within the function

That way each task imports itk individually, and we sidestep this issue.

Trying Scikit-Image

We also tried out the Richardson Lucy deconvolution operation in Scikit-Image. Scikit-Image is known for being more Scipy/Numpy native, but not always as fast as ITK. Our experience confirmed this perception.

First, we were glad to see that the scikit-image function worked with map_blocks immediately without any packing/unpacking, dimensionality, or serialization issues:

import skimage.restoration

out = da.map_blocks(skimage.restoration.richardson_lucy, imgs, psf)  # just works

So all of that converting to and from image objects or removing and adding singleton dimensions isn’t necessary here.

In terms of performance we were also happy to see that Scikit-Image released the GIL, so we were able to get very high reported CPU utilization when using a small number of multi-threaded processes. However, even though CPU utilization was high, our parallel performance was poor enough that we stuck with the ITK solution, warts and all. More information about this is available in Github issue scikit-image #4083.

Note: sequentially on a single chunk, ITK ran in around 2 minutes while scikit-image ran in 3 minutes. It was only once we started parallelizing that things became slow.

Regardless, our goal in this experiment was to see how well ITK and Dask array played together. It was nice to see what smooth integration looks like, if only to motivate future development in ITK+Dask relations.

Numba GUFuncs

An alternative to da.map_blocks are Generalized Universal Functions (gufuncs) These are functions that have many magical properties, one of which is that they operate equally well on both NumPy and Dask arrays. If libraries like ITK or Scikit-Image make their functions into gufuncs then they work without users having to do anything special.

The easiest way to implement gufuncs today is with Numba. I did this on our wrapped richardson_lucy function, just to show how it could work, in case other libraries want to take this on in the future.

import numba

@numba.guvectorize(
    ["float32[:,:,:], float32[:,:,:], float32[:,:,:]"],  # we have to specify types
    "(i,j,k),(a,b,c)->(i,j,k)",                          # and dimensionality explicitly
    forceobj=True,
)
def richardson_lucy_deconvolution(img, psf, out):
    # <---- no dimension unpacking!
    iterations = 1
    image = itk.image_view_from_array(np.ascontiguousarray(img))
    kernel = itk.image_view_from_array(np.ascontiguousarray(psf))

    deconvolved = itk.richardson_lucy_deconvolution_image_filter(
        image, kernel_image=kernel, number_of_iterations=iterations
    )
    out[:] = itk.array_from_image(deconvolved)

# Now this function works natively on either NumPy or Dask arrays
out = richardson_lucy_deconvolution(imgs, psf)  # <-- no map_blocks call!

Note that we’ve both lost the dimension unpacking and the map_blocks call. Our function now knows enough information about how it can broadcast that Dask can do the parallelization without being told what to do explicitly.

This adds some burden onto library maintainers, but makes the user experience much more smooth.

GPU Acceleration

When doing some user research on image processing and Dask, almost everyone we interviewed said that they wanted faster deconvolution. This seemed to be a major pain point. Now we know why. It’s both very common, and very slow.

Running deconvolution on a single chunk of this size takes around 2-4 minutes, and we have hundreds of chunks in a single dataset. Multi-core parallelism can help a bit here, but this problem may also be ripe for GPU acceleration. Similar operations typically have 100x speedups on GPUs. This might be a more pragmatic solution than scaling out to large distributed clusters.

What’s next?

This experiment both …

  • Gives us an example that other imaging scientists can copy and modify to be effective with Dask and ITK together.
  • Highlights areas of improvement where developers from the different libraries can work to remove some of these rough interactions spots in the future.

    It’s worth noting that Dask has done this with lots of libraries within the Scipy ecosystem, including Pandas, Scikit-Image, Scikit-Learn, and others.

We’re also going to continue with our imaging experiment, while these technical issues get worked out in the background. Next up, segmentation!


blog comments powered by Disqus