Single-Node Multi-GPU Dataframe Joins
By Matthew Rocklin
Summary
We experiment with single-node multi-GPU joins using cuDF and Dask. We find that the in-GPU computation is faster than communication. We also present context and plans for near-future work, including improving high performance communication in Dask with UCX.
Here is a notebook of the experiment in this post
Introduction
In a recent post we showed how Dask + cuDF could accelerate reading CSV files using multiple GPUs in parallel. That operation quickly became bound by the speed of our disk after we added a few GPUs. Now we try a very different kind of operation, multi-GPU joins.
This workload can be communication-heavy, especially if the column on which we are joining is not sorted nicely, and so provides a good example on the other extreme from parsing CSV.
Benchmark
Construct random data using the CPU
Here we use Dask array and Dask dataframe to construct two random tables with a
shared id
column. We can play with the number of rows of each table and the
number of keys to make the join challenging in a variety of ways.
import dask.array as da
import dask.dataframe as dd
n_rows = 1000000000
n_keys = 5000000
left = dd.concat([
da.random.random(n_rows).to_dask_dataframe(columns='x'),
da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1)
n_rows = 10000000
right = dd.concat([
da.random.random(n_rows).to_dask_dataframe(columns='y'),
da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1)
Send to the GPUs
We have two Dask dataframes composed of many Pandas dataframes of our random
data. We now map the cudf.from_pandas
function across these to make a Dask
dataframe of cuDF dataframes.
import dask
import cudf
gleft = left.map_partitions(cudf.from_pandas)
gright = right.map_partitions(cudf.from_pandas)
gleft, gright = dask.persist(gleft, gright) # persist data in device memory
What’s nice here is that there wasn’t any special
dask_pandas_dataframe_to_dask_cudf_dataframe
function. Dask composed nicely
with cuDF. We didn’t need to do anything special to support it.
We’ll also persisted the data in device memory.
After this, simple operations are easy and fast and use our eight GPUs.
>>> gleft.x.sum().compute() # this takes 250ms
500004719.254711
Join
We’ll use standard Pandas syntax to merge the datasets, persist the result in RAM, and then wait
out = gleft.merge(gright, on=['id']) # this is lazy
Profile and analyze results
We now look at the Dask diagnostic plots for this computation.
Task stream and communication
When we look at Dask’s task stream plot we see that each of our eight threads (each of which manages a single GPU) spent most of its time in communication (red is communication time). The actual merge and concat tasks are quite fast relative to the data transfer time.
That’s not too surprising. For this computation I’ve turned off any attempt to communicate between devices (more on this below) so the data is being moved from the GPU to the CPU memory, then serialized and put onto a TCP socket. We’re moving tens of GB on a single machine, so we’re seeing about 1GB/s total throughput of the system, which is typical for TCP-on-localhost in Python.
Flamegraph of computation
We can also look more deeply at the computational costs in Dask’s flamegraph-style plot. This shows which lines of our functions were taking up the most time (down to the Python level at least).
This Flame graph shows which lines of cudf code we spent time on while computing (excluding the main communication costs mentioned above). It may be interesting for those trying to further optimize performance. It shows that most of our costs are in memory allocation. Like communication, this has actually also been fixed in RAPIDS’ optional memory management pool, it just isn’t default yet, so I didn’t use it here.
Plans for efficient communication
The cuDF library actually has a decent approach to single-node multi-GPU communication that I’ve intentionally turned off for this experiment. That approach cleverly used Dask to communicate device pointer information using Dask’s normal channels (this is small and fast) and then used that information to initiate a side-channel communication for the bulk of the data. This approach was effective, but somewhat fragile. I’m inclined to move on for it in favor of …
UCX. The UCX project provides a single API that wraps around several transports like TCP, Infiniband, shared memory, and also GPU-specific transports. UCX claims to find the best way to communicate data between two points given the hardware available. If Dask were able to use this for communication then it would provide both efficient GPU-to-GPU communication on a single machine, and also efficient inter-machine communication when efficient networking hardware like Infiniband was present, even outside the context of GPUs.
There is some work we need to do here:
- We need to make a Python wrapper around UCX
- We need to make an optional Dask Comm
around this ucx-py library that allows users to specify endpoints like
ucx://path-to-scheduler
- We need to make Python memoryview-like objects that refer to device memory
- …
This work is already in progress by Akshay Vekatesh, who works on UCX, and Tom Augspurger a core Dask/Pandas developer. I suspect that they’ll write about it soon. I’m looking forward to seeing what comes of it, both for Dask and for high performance Python generally.
It’s worth pointing out that this effort won’t just help GPU users. It should help anyone on advanced networking hardware, including the mainstream scientific HPC community.
Summary
Single-node Mutli-GPU joins have a lot of promise. In fact, earlier RAPIDS developers got this running much faster than I was able to do above through the clever communication tricks I briefly mentioned. The main purpose of this post is to provide a benchmark for joins that we can use in the future, and to highlight when communication can be essential in parallel computing.
Now that GPUs have accelerated the computation time of each of our chunks of work we increasingly find that other systems become the bottleneck. We didn’t care as much about communication before because computational costs were comparable. Now that computation is an order of magnitude cheaper, other aspects of our stack become much more important.
I’m looking forward to seeing where this goes.
Come help!
If the work above sounds interesting to you then come help! There is a lot of low-hanging and high impact work to do.
If you’re interested in being paid to focus more on these topics, then consider applying for a job. NVIDIA’s RAPIDS team is looking to hire engineers for Dask development with GPUs and other data analytics library development projects.
blog comments powered by Disqus