Dask and ITK for large scale image analysis
By John Kirkham, Matthew Rocklin, Matthew McCormick
This post explores using the ITK suite of image processing utilities in parallel with Dask Array.
We cover …
- A simple but common example of applying deconvolution across a stack of 3d images
- Tips on how to make these two libraries work well together
- Challenges that we ran into and opportunities for future improvements.
A Worked Example
Let’s start with a full example applying Richardson Lucy deconvolution to a stack of light sheet microscopy data. This is the same data that we showed how to load in our last blogpost on image loading:
# Load our data from last time¶ import dask.array as da imgs = da.from_zarr("AOLLSMData_m4_raw.zarr/", "data")
# Load our Point Spread Function (PSF) import dask.array.image psf = dask.array.image.imread("AOLLSMData/m4/psfs_z0p1/*.tif")[:, None, ...]
# Convert data to float32 for computation¶ import numpy as np imgs = imgs.astype(np.float32) # Note: the psf needs to be sampled with a voxel spacing # consistent with the image's sampling psf = psf.astype(np.float32)
# Apply Richardson-Lucy Deconvolution¶ def richardson_lucy_deconvolution(img, psf, iterations=1): """ Apply deconvolution to a single chunk of data """ import itk img = img[0, 0, ...] # remove leading two length-one dimensions psf = psf[0, 0, ...] # remove leading two length-one dimensions image = itk.image_view_from_array(img) # Convert to ITK object kernel = itk.image_view_from_array(psf) # Convert to ITK object deconvolved = itk.richardson_lucy_deconvolution_image_filter( image, kernel_image=kernel, number_of_iterations=iterations ) result = itk.array_from_image(deconvolved) # Convert back to Numpy array result = result[None, None, ...] # Add back the leading length-one dimensions return result out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
# Create a local cluster of dask worker processes # (this could also point to a distributed cluster if you have it) from dask.distributed import LocalCluster, Client cluster = LocalCluster(n_workers=20, threads_per_process=1) client = Client(cluster) # now dask operations use this cluster by default # Trigger computation and store out.to_zarr("AOLLSMData_m4_raw.zarr", "deconvolved", overwrite=True)
So in the example above we …
- Load data both from Zarr and TIFF files into multi-chunked Dask arrays
- Construct a function to apply an ITK routine onto each chunk
- Apply that function across the dask array with the dask.array.map_blocks function.
- Store the result back into Zarr format
From the perspective of an imaging scientist,
the new piece of technology here is the
Given a Dask array composed of many NumPy arrays and a function,
map_blocks applies that function across each block in parallel, returning a Dask array as a result.
It’s a great tool whenever you want to apply an operation across many blocks in a simple fashion.
Because Dask arrays are just made out of Numpy arrays it’s an easy way to
compose Dask with the rest of the Scientific Python ecosystem.
Building the right function
However in this case there are a few challenges to constructing the right Numpy -> Numpy function, due to both idiosyncrasies in ITK and Dask Array. Let’s look at our function again:
def richardson_lucy_deconvolution(img, psf, iterations=1): """ Apply deconvolution to a single chunk of data """ import itk img = img[0, 0, ...] # remove leading two length-one dimensions psf = psf[0, 0, ...] # remove leading two length-one dimensions image = itk.image_view_from_array(img) # Convert to ITK object kernel = itk.image_view_from_array(psf) # Convert to ITK object deconvolved = itk.richardson_lucy_deconvolution_image_filter( image, kernel_image=kernel, number_of_iterations=iterations ) result = itk.array_from_image(deconvolved) # Convert back to Numpy array result = result[None, None, ...] # Add back the leading length-one dimensions return result out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
This is longer than we would like.
Instead, we would have preferred to just use the
itk function directly,
without all of the steps before and after.
deconvolved = da.map_blocks(itk.richardson_lucy_deconvolution_image_filter, imgs, psf)
What were the extra steps in our function and why were they necessary?
Convert to and from ITK Image objects: ITK functions don’t consume and produce Numpy arrays, they consume and produce their own
Imagedata structure. There are convenient functions to convert back and forth, so handling this is straightforward, but it does need to be handled each time. See ITK #1136 for a feature request that would remove the need for this step.
Unpack and pack singleton dimensions: Our Dask arrays have shapes like the following:
Array Shape: (3, 199, 201, 1024, 768) Chunk Shape: (1, 1, 201, 1024, 768)
map_blocksfunction gets NumPy arrays of the chunk size,
(1, 1, 201, 1024, 768). However, our ITK functions are meant to work on 3d arrays, not 5d arrays, so we need to remove those first two dimensions.
img = img[0, 0, ...] # remove leading two length-one dimensions psf = psf[0, 0, ...] # remove leading two length-one dimensions
And then when we’re done, Dask expects to get back 5d arrays like what it provided, so we add these singleton dimensions back in
result = result[None, None, ...] # Add back the leading length-one dimensions
Again, this is straightforward for users who are accustomed to NumPy slicing syntax, but does need to be done each time. This adds some friction to our development process, and is another step that can confuse users.
But if you’re comfortable working around things like this,
then ITK and
map_blocks can be a powerful combination
if you want to parallelize out ITK operations across a cluster.
Defining a Dask Cluster
Above we used
dask.distributed.LocalCluster to set up 20 single-threaded
workers on our local workstation:
from dask.distributed import LocalCluster, Client cluster = LocalCluster(n_workers=20, threads_per_process=1) client = Client(cluster) # now dask operations use this cluster by default
If you had a distributed resource, this is where you would connect it.
You would swap out
LocalCluster with one of
Dask’s other deployment options.
Also, we found that we needed to use many single-threaded processes rather than one multi-threaded process because ITK functions seem to still hold onto the GIL. This is fine, we just need to be aware of it so that we set up our Dask workers appropriately with one thread per process for maximum efficiency. See ITK #1134 for an active Github issue on this topic.
We had some difficulty when using the ITK library across multiple processes, because the library itself didn’t serialize well. (If you don’t understand what that means, don’t worry). We solved a bit of this in ITK #1090, but some issues still remain.
We got around this by including the import in the function rather than outside of it.
def richardson_lucy_deconvolution(img, psf, iterations=1): import itk # <--- we work around serialization issues by importing within the function
That way each task imports itk individually, and we sidestep this issue.
We also tried out the Richardson Lucy deconvolution operation in Scikit-Image. Scikit-Image is known for being more Scipy/Numpy native, but not always as fast as ITK. Our experience confirmed this perception.
First, we were glad to see that the scikit-image function worked with
map_blocks immediately without any packing/unpacking, dimensionality, or
import skimage.restoration out = da.map_blocks(skimage.restoration.richardson_lucy, imgs, psf) # just works
So all of that converting to and from image objects or removing and adding singleton dimensions isn’t necessary here.
In terms of performance we were also happy to see that Scikit-Image released the GIL, so we were able to get very high reported CPU utilization when using a small number of multi-threaded processes. However, even though CPU utilization was high, our parallel performance was poor enough that we stuck with the ITK solution, warts and all. More information about this is available in Github issue scikit-image #4083.
Note: sequentially on a single chunk, ITK ran in around 2 minutes while scikit-image ran in 3 minutes. It was only once we started parallelizing that things became slow.
Regardless, our goal in this experiment was to see how well ITK and Dask array played together. It was nice to see what smooth integration looks like, if only to motivate future development in ITK+Dask relations.
An alternative to
da.map_blocks are Generalized Universal Functions (gufuncs)
These are functions that have many magical properties, one of which is that
they operate equally well on both NumPy and Dask arrays. If libraries like
ITK or Scikit-Image make their functions into gufuncs then they work without
users having to do anything special.
The easiest way to implement gufuncs today is with Numba. I did this on our
richardson_lucy function, just to show how it could work, in case
other libraries want to take this on in the future.
import numba @numba.guvectorize( ["float32[:,:,:], float32[:,:,:], float32[:,:,:]"], # we have to specify types "(i,j,k),(a,b,c)->(i,j,k)", # and dimensionality explicitly forceobj=True, ) def richardson_lucy_deconvolution(img, psf, out): # <---- no dimension unpacking! iterations = 1 image = itk.image_view_from_array(np.ascontiguousarray(img)) kernel = itk.image_view_from_array(np.ascontiguousarray(psf)) deconvolved = itk.richardson_lucy_deconvolution_image_filter( image, kernel_image=kernel, number_of_iterations=iterations ) out[:] = itk.array_from_image(deconvolved) # Now this function works natively on either NumPy or Dask arrays out = richardson_lucy_deconvolution(imgs, psf) # <-- no map_blocks call!
Note that we’ve both lost the dimension unpacking and the
Our function now knows enough information about how it can broadcast that Dask
can do the parallelization without being told what to do explicitly.
This adds some burden onto library maintainers, but makes the user experience much more smooth.
When doing some user research on image processing and Dask, almost everyone we interviewed said that they wanted faster deconvolution. This seemed to be a major pain point. Now we know why. It’s both very common, and very slow.
Running deconvolution on a single chunk of this size takes around 2-4 minutes, and we have hundreds of chunks in a single dataset. Multi-core parallelism can help a bit here, but this problem may also be ripe for GPU acceleration. Similar operations typically have 100x speedups on GPUs. This might be a more pragmatic solution than scaling out to large distributed clusters.
This experiment both …
- Gives us an example that other imaging scientists can copy and modify to be effective with Dask and ITK together.
Highlights areas of improvement where developers from the different libraries can work to remove some of these rough interactions spots in the future.
It’s worth noting that Dask has done this with lots of libraries within the Scipy ecosystem, including Pandas, Scikit-Image, Scikit-Learn, and others.
We’re also going to continue with our imaging experiment, while these technical issues get worked out in the background. Next up, segmentation!
blog comments powered by Disqus