This post is about experimental and rapidly changing software. Code examples in this post should not be relied upon to work in the future.

## Executive Summary

This post talks about connecting UCX, a high performance networking library, to Dask, a parallel Python library, to accelerate communication-heavy workloads, particularly when using GPUs.

Additionally, we do this work on a DGX, a high-end multi-CPU multi-GPU machine with a complex internal network. Working in this context forced us to improve how Dask is set up in heterogeneous situations targeting different network cards, CPU sockets, GPUs, and so on.

## Motivation

Many distributed computing workloads are communication-bound. This is common in cases like the following:

1. Dataframe joins
2. Machine learning algorithms
3. Complex array computations

Communication becomes a bigger bottleneck as we accelerate our computation, such as when we use GPUs for computing.

Historically, high performance communication was only available using MPI, or with custom solutions. This post describes an effort to get close to the communication bandwidth of MPI while still maintaining the ease of programmability and accessibility of a dynamic system like Dask.

To get high performance networking in Dask, we wrapped UCX with Python and then connected that to Dask.

The OpenUCX project provides a uniform API around various high performance networking libraries like InfiniBand, traditional networking protocols like TCP/shared memory, and GPU-specific protocols like NVLink. It is a layer beneath something like OpenMPI (the main user of OpenUCX today) that figures out which networking system to use.

Python users today don’t have much access to these network libraries, except through MPI, which is sometimes not ideal. (Try searching for “infiniband” on PyPI.)

This led us to create UCX-Py. UCX-Py is a Python wrapper around the UCX C library. It provides a Pythonic API, both with blocking syntax appropriate for traditional HPC programs and with non-blocking async/await syntax for more concurrent programs (like Dask). For more information on UCX I recommend watching Akshay’s UCX talk from the GPU Technology Conference 2019.
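
The difference between the two styles matters most when many messages are in flight at once. The following sketch is not UCX-Py code: it uses only the standard library's asyncio, with `asyncio.sleep` standing in for a network transfer and a hypothetical `fake_transfer` coroutine, to show how async/await lets several communications overlap.

```python
import asyncio
import time


async def fake_transfer(name, seconds):
    """Stand-in for a network send/receive; it only sleeps."""
    await asyncio.sleep(seconds)
    return name


async def main():
    start = time.perf_counter()
    # Four simulated transfers are in flight at the same time
    results = await asyncio.gather(
        *(fake_transfer(f"message-{i}", 0.05) for i in range(4))
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Run one after another, the four simulated transfers would take about 0.2 seconds. Awaited together they finish in roughly the time of a single transfer, which is the behavior a concurrent system like Dask wants from its communication layer.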

Note: UCX-Py was primarily developed by Akshay Venkatesh (UCX, NVIDIA), Tom Augspurger (Dask, Pandas, Anaconda), and Ben Zaitlen (NVIDIA, RAPIDS, Dask).

We then extended Dask communications to optionally use UCX. If you have UCX and UCX-Py installed, then you can use the ucx:// protocol in addresses or the --protocol ucx flag when starting things up, something like this.

    $ dask-scheduler --protocol ucx
    Scheduler started at ucx://127.0.0.1:8786

    $ dask-worker ucx://127.0.0.1:8786

    >>> from dask.distributed import Client
    >>> client = Client('ucx://127.0.0.1:8786')
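
If UCX-Py may or may not be present on a machine, one option is to pick the protocol at runtime. The sketch below is only an illustration and not part of Dask: `choose_protocol` and `scheduler_address` are hypothetical helpers, and the code assumes that UCX-Py is importable under the module name `ucp`.

```python
import importlib.util


def choose_protocol(module_name="ucp"):
    """Return "ucx" if the given module can be found, otherwise "tcp".

    Hypothetical helper for illustration; assumes UCX-Py installs a
    top-level module named ``ucp``.
    """
    found = importlib.util.find_spec(module_name) is not None
    return "ucx" if found else "tcp"


def scheduler_address(host="127.0.0.1", port=8786, protocol=None):
    """Build an address such as ucx://127.0.0.1:8786."""
    protocol = protocol or choose_protocol()
    return f"{protocol}://{host}:{port}"


print(scheduler_address())
```

Finding the module only shows that the Python package is installed. Whether the underlying UCX transports actually work on a given machine is a separate question.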


## Experiment

We modified our SVD with Dask and CuPy benchmark to use the UCX protocol for inter-process communication and ran it on half of a DGX machine, using four GPUs. Here is a minimal implementation of the UCX-enabled code:

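    import cupy
    import dask
    import dask.array as da
    from dask.distributed import Client, wait
    from dask_cuda import DGX

    # Define DGX cluster and client
    cluster = DGX(CUDA_VISIBLE_DEVICES=[0, 1, 2, 3])
    client = Client(cluster)

    # Create random data on the GPUs, backed by CuPy
    rs = da.random.RandomState(RandomState=cupy.random.RandomState)
    x = rs.random((1000000, 1000), chunks=(10000, 1000))
    x = x.persist()

    # Perform distributed SVD
    u, s, v = da.linalg.svd(x)
    u, s, v = dask.persist(u, s, v)
    _ = wait([u, s, v])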


By using UCX the overall communication times are reduced by an order of magnitude. To produce the task-stream figures below, the benchmark was run on a DGX-1 with CUDA_VISIBLE_DEVICES=[0,1,2,3]. It is clear that the red task bars, corresponding to inter-process communication, are significantly compressed. Communications that were taking 500ms-1s before now take around 20ms.

Before UCX:

After UCX:

## Diving into the Details

On a GPU using NVLink we can get somewhere between 5-10 GB/s throughput between pairs of GPUs. On a CPU this drops down to 1-2 GB/s (which seems well below optimal). These speeds can affect all Dask workloads (array, dataframe, xarray, ML, …), but when the proper hardware is present, other bottlenecks may occur, such as serialization when dealing with text or JSON-like data.

This, of course, depends on this fancy networking hardware being present. In the GPU example above we’re mostly relying on NVLink, but we would also get improved performance on an HPC InfiniBand network or even on a single laptop machine using shared memory transports.

The examples above were run on a DGX machine, which includes all of these transports and more (as well as numerous GPUs).
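
To get a feel for what the bandwidths quoted in this section mean in practice, the following back-of-the-envelope sketch estimates how long one chunk would take to move. It assumes a 10000 x 1000 chunk (the chunk shape from the SVD example) of 8-byte float64 values and takes 1 GB to be 1e9 bytes. `transfer_time` is a hypothetical helper, and real transfers also pay latency and serialization costs that this ignores.

```python
def transfer_time(nbytes, bandwidth_gb_per_s):
    """Seconds needed to move nbytes at the given bandwidth (1 GB = 1e9 bytes)."""
    return nbytes / (bandwidth_gb_per_s * 1e9)


# One chunk of the SVD example: 10000 x 1000 elements, assumed float64 (8 bytes)
chunk_nbytes = 10000 * 1000 * 8  # 80 MB

# Bandwidth ranges quoted in the text, in GB/s
for label, low, high in [("NVLink", 5, 10), ("CPU", 1, 2)]:
    slowest_ms = transfer_time(chunk_nbytes, low) * 1000
    fastest_ms = transfer_time(chunk_nbytes, high) * 1000
    print(f"{label}: {fastest_ms:.0f}-{slowest_ms:.0f} ms per chunk")
```

Under these assumptions a chunk moves in about 8-16 ms over NVLink and about 40-80 ms on the CPU path.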

## DGX

The test machine used above was a DGX-1, which has eight GPUs, two CPU sockets, four InfiniBand network cards, and a complex NVLink arrangement. This is a good example of non-uniform hardware. Certain CPUs are closer to certain GPUs and network cards, and understanding this proximity has an order-of-magnitude effect on performance. This situation isn’t unique to DGX machines. The same situation arises when we have …

• Multiple workers in one node, with several nodes in a cluster
• Multiple nodes in one rack, with several racks in a data center
• Multiple data centers, such as is the case with hybrid cloud

Working with the DGX was interesting because it forced us to start thinking about heterogeneity, and making it easier to specify complex deployment scenarios with Dask.

Here is a diagram showing how the GPUs, CPUs, and InfiniBand cards are connected to each other in a DGX-1:

And here is the output of nvidia-smi showing the NVLink, networking, and CPU affinity structure (this is mostly orthogonal to the structure displayed above).

    $ nvidia-smi topo -m
          GPU0  GPU1  GPU2  GPU3  GPU4  GPU5  GPU6  GPU7  ib0   ib1   ib2   ib3
    GPU0  X     NV1   NV1   NV2   NV2   SYS   SYS   SYS   PIX   SYS   PHB   SYS
    GPU1  NV1   X     NV2   NV1   SYS   NV2   SYS   SYS   PIX   SYS   PHB   SYS
    GPU2  NV1   NV2   X     NV2   SYS   SYS   NV1   SYS   PHB   SYS   PIX   SYS
    GPU3  NV2   NV1   NV2   X     SYS   SYS   SYS   NV1   PHB   SYS   PIX   SYS
    GPU4  NV2   SYS   SYS   SYS   X     NV1   NV1   NV2   SYS   PIX   SYS   PHB
    GPU5  SYS   NV2   SYS   SYS   NV1   X     NV2   NV1   SYS   PIX   SYS   PHB
    GPU6  SYS   SYS   NV1   SYS   NV1   NV2   X     NV2   SYS   PHB   SYS   PIX
    GPU7  SYS   SYS   SYS   NV1   NV2   NV1   NV2   X     SYS   PHB   SYS   PIX
    ib0   PIX   PIX   PHB   PHB   SYS   SYS   SYS   SYS   X     SYS   PHB   SYS
    ib1   SYS   SYS   SYS   SYS   PIX   PIX   PHB   PHB   SYS   X     SYS   PHB
    ib2   PHB   PHB   PIX   PIX   SYS   SYS   SYS   SYS   PHB   SYS   X     SYS
    ib3   SYS   SYS   SYS   SYS   PHB   PHB   PIX   PIX   SYS   PHB   SYS   X

          CPU Affinity
    GPU0  0-19,40-59
    GPU1  0-19,40-59
    GPU2  0-19,40-59
    GPU3  0-19,40-59
    GPU4  20-39,60-79
    GPU5  20-39,60-79
    GPU6  20-39,60-79
    GPU7  20-39,60-79

    Legend:

      X    = Self
      SYS  = Traverse PCIe as well as the SMP interconnect between NUMA nodes
      NODE = Traverse PCIe as well as the interconnect between PCIe Host Bridges
      PHB  = Traverse PCIe as well as a PCIe Host Bridge (typically the CPU)
      PXB  = Traverse multiple PCIe switches (without PCIe Host Bridge)
      PIX  = Traverse a single PCIe switch
      NV#  = Traverse a bonded set of # NVLinks

The DGX was originally designed for deep learning applications. The complex network infrastructure above can be well used by specialized NVIDIA networking libraries like NCCL, which knows how to route things correctly, but is something of a challenge for other more general purpose systems like Dask to adapt to.

Fortunately, in meeting this challenge we were able to clean up a number of related issues in Dask. In particular we can now:

1. Specify a more heterogeneous worker configuration when starting up a local cluster (dask/distributed #2675)
2. Learn bandwidth over time (dask/distributed #2658)
3. Add Worker plugins to help handle things like CPU affinity, though this is quite general (dask/distributed #2453)

With these changes we’re now able to describe most of the DGX structure as configuration in the Python function below:

    import os

    from dask.distributed import Nanny, SpecCluster, Scheduler
    from distributed.worker import TOTAL_MEMORY

    from dask_cuda.local_cuda_cluster import cuda_visible_devices


    class CPUAffinity:
        """ A Worker plugin to pin CPU affinity """

        def __init__(self, cores):
            self.cores = cores

        def setup(self, worker=None):
            os.sched_setaffinity(0, self.cores)


    affinity = {  # See nvidia-smi topo -m
        0: list(range(0, 20)) + list(range(40, 60)),
        1: list(range(0, 20)) + list(range(40, 60)),
        2: list(range(0, 20)) + list(range(40, 60)),
        3: list(range(0, 20)) + list(range(40, 60)),
        # range() excludes its end point, so 60-79 is range(60, 80)
        4: list(range(20, 40)) + list(range(60, 80)),
        5: list(range(20, 40)) + list(range(60, 80)),
        6: list(range(20, 40)) + list(range(60, 80)),
        7: list(range(20, 40)) + list(range(60, 80)),
    }


    def DGX(
        interface="ib",
        dashboard_address=":8787",
        threads_per_worker=1,
        silence_logs=True,
        CUDA_VISIBLE_DEVICES=None,
        **kwargs
    ):
        """ A Local Cluster for a DGX 1 machine

        NVIDIA's DGX-1 machine has a complex architecture mapping CPUs, GPUs, and
        network hardware.  This function creates a local cluster that tries to
        respect this hardware as much as possible.

        It creates one Dask worker process per GPU, and assigns each worker process
        the correct CPU cores and Network interface cards to maximize performance.

        That being said, things aren't perfect.  Today a DGX has very high
        performance between certain sets of GPUs and not others.  A Dask DGX
        cluster that uses only certain tightly coupled parts of the computer will
        have significantly higher bandwidth than a deployment on the entire thing.

        Parameters
        ----------
        interface: str
            The interface prefix for the infiniband networking cards.  This is
            often "ib" or "bond".  We will add the numeric suffix 0,1,2,3 as
            appropriate.  Defaults to "ib".
        dashboard_address: str
            The address for the scheduler dashboard.  Defaults to ":8787".
        CUDA_VISIBLE_DEVICES: str
            String like "0,1,2,3" or [0, 1, 2, 3] to restrict activity to
            different GPUs

        Examples
        --------
        >>> from dask_cuda import DGX
        >>> from dask.distributed import Client
        >>> cluster = DGX(interface='ib')
        >>> client = Client(cluster)
        """
        if CUDA_VISIBLE_DEVICES is None:
            CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "0,1,2,3,4,5,6,7")
        if isinstance(CUDA_VISIBLE_DEVICES, str):
            CUDA_VISIBLE_DEVICES = CUDA_VISIBLE_DEVICES.split(",")
        CUDA_VISIBLE_DEVICES = list(map(int, CUDA_VISIBLE_DEVICES))
        memory_limit = TOTAL_MEMORY / 8

        spec = {
            i: {
                "cls": Nanny,
                "options": {
                    "env": {
                        "CUDA_VISIBLE_DEVICES": cuda_visible_devices(
                            ii, CUDA_VISIBLE_DEVICES
                        ),
                        "UCX_TLS": "rc,cuda_copy,cuda_ipc",
                    },
                    "interface": interface + str(i // 2),
                    "protocol": "ucx",
                    "ncores": threads_per_worker,
                    "data": dict,
                    "preload": ["dask_cuda.initialize_context"],
                    "dashboard_address": ":0",
                    "plugins": [CPUAffinity(affinity[i])],
                    "silence_logs": silence_logs,
                    "memory_limit": memory_limit,
                },
            }
            for ii, i in enumerate(CUDA_VISIBLE_DEVICES)
        }

        scheduler = {
            "cls": Scheduler,
            "options": {
                "interface": interface + str(CUDA_VISIBLE_DEVICES[0] // 2),
                "protocol": "ucx",
                "dashboard_address": dashboard_address,
            },
        }

        return SpecCluster(
            workers=spec, scheduler=scheduler, silence_logs=silence_logs, **kwargs
        )

However, we never got the NVLink structure down. The Dask scheduler currently still assumes uniform bandwidths between workers. We’ve started to make small steps towards changing this, but we’re not there yet (this will be useful as well for people that want to think about in-rack or cross-data-center deployments).

As usual, in solving a highly specific problem, we were able to address a number of lingering general issues, which then made our specific problem easy to write down.

## Future Work

There has been significant effort over the last few months to make everything above work.
In particular we …

• Modified UCX to support client-server workloads
• Wrapped UCX with UCX-Py and designed a Python async-await friendly interface
• Wrapped UCX-Py with Dask
• Hooked everything together to make generic workloads function well

The result is quite nice, especially for more communication heavy workloads. However there is still plenty to do. This section details what we’re thinking about now to continue this work.

• Routing within complex networks: If you restrict yourself to four of the eight GPUs in a DGX, you can get 5-12 GB/s between pairs of GPUs. For some workloads this can be significant. It makes the system feel much more like a single unit than a bunch of isolated machines.

  However we still can’t get great performance across the whole DGX because there are many GPU-pairs that are not connected by NVLink, and so we get 10x slower speeds. These dominate communication costs if you naively try to use the full DGX.

  This might be solved either by:

  1. Teaching Dask to avoid these communications
  2. Teaching UCX to route communications like these through a chain of multiple NVLink connections
  3. Avoiding complex networks altogether. Newer systems like the DGX-2 use NVSwitch, which provides uniform connectivity, with each GPU connected to every other GPU.

  Edit: I’ve since learned that UCX should be able to handle this. We should still get PCIe speeds (around 4-7 GB/s) even when we don’t have NVLink once an upstream bug gets fixed. Hooray!

• CPU: We can get 1-2 GB/s across InfiniBand, which isn’t bad, but also wasn’t the full 5-8 GB/s that we were hoping for.

  This deserves more serious profiling to determine what is going wrong. The current guess is that this has to do with memory allocations.

      In [1]: %time _ = b'0' * 1000000000  # 1 GB
      CPU times: user 248 ms, sys: 223 ms, total: 472 ms
      Wall time: 470 ms   # <<----- Around 2 GB/s.  Slower than I expected

  Probably we’re just doing something dumb here.

• Package UCX: Currently I’m building the UCX and UCX-Py libraries from source (see appendix below for instructions). Ideally these would become conda packages. John Kirkham (Conda Forge, NVIDIA, Dask) is taking a look at this along with the UCX developers from Mellanox.

  See ucx-py #65 for more information.

• Learn Heterogeneous Bandwidths: In order to make good scheduling decisions Dask needs to estimate how long it will take to move data between machines. This question is now becoming much more complex, and depends on the source and destination machines (the network topology), the data type (NumPy array, GPU array, Pandas Dataframe with text), and more. In complex situations our bandwidths can span a 100x range (100 MB/s to 10 GB/s).

  Dask will have to develop more complex models for bandwidth, and learn these over time.

  See dask/distributed #2743 for more information.

• Support other GPU libraries: To send GPU data around we need to teach Dask how to serialize Python objects into GPU buffers. There is code in the dask/distributed repository to do this for Numba, CuPy, and RAPIDS cuDF objects, but we’ve really only tested CuPy seriously. We should expand this by some of the following steps:

  1. Try a distributed Dask cuDF join computation. See dask/distributed #2746 for initial work here.
  2. Teach Dask to serialize array GPU libraries, like PyTorch and TensorFlow, or possibly anything that supports the __cuda_array_interface__ protocol.

• Track down communication failures: We still occasionally get unexplained communication failures. We should stress test this system to discover rough corners.

• TCP: Groups with high performing TCP networks can’t yet make use of UCX+Dask (though they can use either one individually). Currently using UCX in a client-server mode as we’re doing with Dask requires access to RDMA libraries, which are often not found on systems without networking systems like InfiniBand. Work on this is in progress at openucx/ucx #3570.

• Commodity Hardware: Currently this code is only really useful on high performance Linux systems that have InfiniBand or NVLink. However, it would be nice to also use this on more commodity systems, including personal laptop computers using TCP and shared memory. Currently Dask uses TCP for inter-process communication on a single machine. Using UCX on a personal computer would give us access to shared memory speeds, which tend to be an order of magnitude faster.

  See openucx/ucx #3663 for more information.

• Tune Performance: The 5-10 GB/s bandwidths that we see with NVLink today are sub-optimal. With UCX-Py alone we’re able to get something like 15 GB/s on large message transfers. We should benchmark and tune our implementation to see what is taking up the extra time. Until things work more robustly though, this is a secondary priority.

# Appendix: Setup

Performing these experiments depends currently on development branches in a few repositories. This section includes my current setup.

## Create Conda Environment

    conda create -n ucx python=3.7 libtool cmake automake autoconf cython bokeh pytest pkg-config ipython dask numba -y

Note: for some reason using conda-forge makes the autogen step below fail.

## Set up UCX

    # Clone UCX repository and get branch
    git clone https://github.com/openucx/ucx
    cd ucx
    git remote add Akshay-Venkatesh git@github.com:Akshay-Venkatesh/ucx.git
    git remote update Akshay-Venkatesh
    git checkout ucx-cuda

    # Build
    git clean -xfd
    export CUDA_HOME=/usr/local/cuda-9.2/
    ./autogen.sh
    mkdir build
    cd build
    ../configure --prefix=$CONDA_PREFIX --enable-debug --with-cuda=$CUDA_HOME --enable-mt --disable-cma CPPFLAGS="-I/usr/local/cuda-9.2/include"
    make -j install

    # Verify
    ucx_info -d
    which ucx_info  # verify that this is in the conda environment

    # Verify that we see NVLink speeds
    ucx_perftest -t tag_bw -m cuda -s 1048576 -n 1000 & ucx_perftest dgx15 -t tag_bw -m cuda -s 1048576 -n 1000

## Set up UCX-Py

    git clone git@github.com:rapidsai/ucx-py
    cd ucx-py
    export UCX_PATH=$CONDA_PREFIX
    make install


## Set up Dask

    git clone git@github.com:dask/dask.git
    cd dask
    pip install -e .
    cd ..

    git clone git@github.com:dask/distributed.git
    cd distributed
    pip install -e .
    cd ..


## Optionally set up cupy

    pip install cupy-cuda92==6

## Optionally set up cudf

    conda install -c rapidsai-nightly -c conda-forge -c numba cudf dask-cudf cudatoolkit=9.2

## Optionally set up JupyterLab

    conda install ipykernel jupyterlab nb_conda_kernels nodejs

    pip install dask_labextension


## My Benchmark

I’ve been using the following benchmark to test communication. It allocates a chunked Dask array, and then adds it to its transpose, which forces a lot of communication, but not much computation.

    from collections import defaultdict
    import asyncio
    import numpy as np
    from pprint import pprint

    import cupy
    import dask.array as da
    from dask.distributed import Client, wait
    from dask_cuda import DGX
    from distributed.utils import format_bytes


    async def f():

        # Set up workers on the local machine
        async with DGX(asynchronous=True, silence_logs=True) as cluster:
            async with Client(cluster, asynchronous=True) as client:

                # Create a simple random array
                rs = da.random.RandomState(RandomState=cupy.random.RandomState)
                x = rs.random((40000, 40000), chunks='128 MiB').persist()
                print(x.npartitions, 'chunks')
                await wait(x)

                # Add X to its transpose, forcing computation
                y = (x + x.T).sum()
                result = await client.compute(y)

                # Fetch each worker's log of incoming transfers
                incoming_logs = await client.run(
                    lambda dask_worker: dask_worker.incoming_transfer_log
                )

                # Collect, aggregate, and print peer-to-peer bandwidths
                bandwidths = defaultdict(list)
                for k, L in incoming_logs.items():
                    for d in L:
                        if d['total'] > 1_000_000:
                            bandwidths[k, d['who']].append(d['bandwidth'])
                bandwidths = {
                    (cluster.scheduler.workers[w1].name,
                     cluster.scheduler.workers[w2].name): [
                        format_bytes(x) + '/s'
                        for x in np.quantile(v, [0.25, 0.50, 0.75])
                    ]
                    for (w1, w2), v in bandwidths.items()
                }
                pprint(bandwidths)


    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(f())

Note: most of this example is just getting back diagnostics, which can be easily ignored. Also, you can drop the async/await code if you like. I think that there should probably be more examples in the world using Dask with async/await syntax, so I decided to leave it in.
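
The diagnostic part of the benchmark can also be tried without a cluster. The sketch below applies the same filter-and-quartile logic to a small hand-written log. The worker names and numbers are made up purely for illustration, `summarize_bandwidths` is a hypothetical helper rather than anything Dask provides, and it swaps `np.quantile` for the standard library's `statistics.quantiles` (Python 3.8+) so that it has no dependencies.

```python
from collections import defaultdict
from statistics import quantiles


def summarize_bandwidths(incoming_logs, min_bytes=1_000_000):
    """Quartiles of observed bandwidth for each (receiver, sender) pair.

    ``incoming_logs`` maps a receiving worker to a list of transfer records,
    each a dict with 'total' (bytes), 'who' (sender), and 'bandwidth' (bytes/s).
    Transfers of ``min_bytes`` or fewer are ignored.
    """
    grouped = defaultdict(list)
    for receiver, records in incoming_logs.items():
        for record in records:
            if record["total"] > min_bytes:
                grouped[receiver, record["who"]].append(record["bandwidth"])

    summary = {}
    for pair, values in grouped.items():
        if len(values) < 2:
            # quantiles() needs at least two samples; repeat the single value
            summary[pair] = [values[0]] * 3
        else:
            summary[pair] = quantiles(values, n=4, method="inclusive")
    return summary


# Made-up log entries, for illustration only
logs = {
    "worker-a": [
        {"total": 128_000_000, "who": "worker-b", "bandwidth": 6e9},
        {"total": 128_000_000, "who": "worker-b", "bandwidth": 8e9},
        {"total": 128_000_000, "who": "worker-b", "bandwidth": 10e9},
        {"total": 500, "who": "worker-b", "bandwidth": 1e6},  # too small, skipped
    ],
}
print(summarize_bandwidths(logs))
```

For the three large transfers above, the quartiles come out at 7, 8, and 9 GB/s, and the 500-byte transfer is dropped by the size filter.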