Summary

Over the last six months many Dask developers have worked on making Dask easier to deploy in a wide variety of situations. This post summarizes those efforts, and provides links to ongoing work.

What we mean by Deployment

In order to run Dask on a cluster, you need to set up a scheduler on one machine:

$ dask-scheduler
Scheduler running at tcp://192.168.0.1:8786

And then start Dask workers on many other machines:

$ dask-worker tcp://192.168.0.1:8786
Waiting to connect to:       tcp://192.168.0.1:8786

$ dask-worker tcp://192.168.0.1:8786
Waiting to connect to:       tcp://192.168.0.1:8786

$ dask-worker tcp://192.168.0.1:8786
Waiting to connect to:       tcp://192.168.0.1:8786

$ dask-worker tcp://192.168.0.1:8786
Waiting to connect to:       tcp://192.168.0.1:8786

For informal clusters people might do this manually, logging into each machine and running these commands themselves. However, it’s much more common to use a cluster resource manager such as Kubernetes, Yarn (Hadoop/Spark), an HPC batch scheduler (SGE, PBS, SLURM, LSF, …), a cloud service, or some custom system.

As Dask is used by more institutions, and used more broadly within those institutions, making deployment smooth and natural becomes increasingly important. It is so important, in fact, that there have been seven separate efforts by a few different groups to improve deployment in one regard or another.

We’ll briefly summarize and link to this work below, and then we’ll finish up by talking about some internal design that helped to make this work more consistent.

Dask-SSH

According to our user survey, the most common deployment mechanism was still SSH. Dask has long had a command-line dask-ssh tool to make deploying over SSH easier. We recently updated it to also include a Python interface, which provides more control.

>>> from dask.distributed import Client, SSHCluster
>>> cluster = SSHCluster(
...     ["host1", "host2", "host3", "host4"],
...     connect_options={"known_hosts": None},
...     worker_options={"nthreads": 2},
...     scheduler_options={"port": 0, "dashboard_address": ":8797"}
... )
>>> client = Client(cluster)
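
Once the client above is connected, ordinary Dask work runs on the SSH workers. A minimal sketch to confirm the cluster is alive; the array shape and chunk sizes below are arbitrary placeholders:

>>> import dask.array as da
>>> x = da.random.random((10000, 10000), chunks=(1000, 1000))
>>> x.mean().compute()  # executes on the SSH workers through the client above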

This isn’t what we recommend for large institutions, but it can be helpful for more informal groups who are just getting started.

Dask-Jobqueue and Dask-Kubernetes Rewrite

We’ve rewritten Dask-Jobqueue, which targets the SLURM/PBS/LSF/SGE cluster managers typically found in HPC centers, as well as Dask-Kubernetes. These now share a common codebase with Dask-SSH, and so are much more consistent and hopefully bug-free.

Ideally users shouldn’t notice much difference with existing workloads, but new features like asynchronous operation, integration with the Dask JupyterLab extension, and so on are more consistently available. Also, we’ve been able to unify development and reduce our maintenance burden considerably.
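
For example, starting a cluster through Dask-Jobqueue on a SLURM system looks roughly like the following sketch; the core, memory, and queue values are placeholders you would adapt to your site:

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Resource values below are illustrative placeholders
cluster = SLURMCluster(cores=8, memory="24 GB", queue="regular")
cluster.scale(10)                       # ask the batch system for ten workers
# cluster.adapt(minimum=0, maximum=20)  # or scale adaptively with the workload

client = Client(cluster)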

The new version of Dask Jobqueue where these changes take place is 0.7.0, and the work was done in dask/dask-jobqueue #307. The new version of Dask Kubernetes is 0.10.0 and the work was done in dask/dask-kubernetes #162.

Dask-CloudProvider

For cloud deployments we generally recommend using a hosted Kubernetes or Yarn service, and then using Dask-Kubernetes or Dask-Yarn on top of these.

However, some institutions have made decisions or commitments to use certain vendor-specific technologies, and for them it’s more convenient to use APIs native to their particular cloud. The new Dask-Cloudprovider package handles this today for Amazon’s ECS API, which has been around for a long while and is more universally accepted.

from dask_cloudprovider import ECSCluster

# Connect to an existing ECS cluster by its ARN
cluster = ECSCluster(cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>")

from dask_cloudprovider import FargateCluster

# Or have AWS Fargate provision the underlying resources for you
cluster = FargateCluster()

Dask-Gateway

In some cases users may not have access to the cluster manager. For example, an institution may not give all of its data science users access to the Yarn or Kubernetes cluster. In this case the Dask-Gateway project may be useful. It can launch and manage Dask jobs, and provide a proxy connection to these jobs if necessary. It is typically deployed with elevated permissions but managed directly by IT, giving them a point of greater control.
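
From the user’s side, connecting to a gateway that IT has already deployed looks roughly like the following; the gateway address is a placeholder:

from dask_gateway import Gateway

# The address below is a placeholder for whatever IT deploys
gateway = Gateway("https://gateway.example.com")
cluster = gateway.new_cluster()
cluster.scale(4)
client = cluster.get_client()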

GPUs and Dask-CUDA

While deploying Dask on multi-GPU systems, the NVIDIA RAPIDS team has needed the ability to specify increasingly complex setups of Dask workers. They recommend the following deployment strategy:

  1. One Dask-worker per GPU on a machine
  2. Specify the CUDA_VISIBLE_DEVICES environment variable to pin that worker to that GPU
  3. If your machine has multiple network interfaces then choose the network interface that has the best connection to that GPU
  4. If your machine has multiple CPUs then set thread affinities to use the closest CPU
  5. … and more

For this reason we wanted to specify these configurations in code, like the following:

import dask.distributed

specification = {
    "worker-0": {
        "cls": dask.distributed.Nanny,
        "options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "0,1,2,3"}, "interface": "ib0"},
    },
    "worker-1": {
        "cls": dask.distributed.Nanny,
        "options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "1,2,3,0"}, "interface": "ib0"},
    },
    "worker-2": {
        "cls": dask.distributed.Nanny,
        "options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "2,3,0,1"}, "interface": "ib1"},
    },
    "worker-3": {
        "cls": dask.distributed.Nanny,
        "options": {"nthreads": 1, "env": {"CUDA_VISIBLE_DEVICES": "3,0,1,2"}, "interface": "ib1"},
    },
}

And then use the new SpecCluster class to deploy these workers:

from dask.distributed import SpecCluster

cluster = SpecCluster(workers=specification)

We’ve used this technique in the Dask-CUDA project to provide convenient functions for deployment on multi-GPU systems.
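
On a single multi-GPU machine that convenience layer boils down to something like the following sketch; LocalCUDACluster discovers the local GPUs and starts one worker per device:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# One worker per local GPU, each pinned via CUDA_VISIBLE_DEVICES
cluster = LocalCUDACluster()
client = Client(cluster)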

SpecCluster was generic enough that it ended up forming the base of the SSH, Jobqueue, and Kubernetes solutions as well.

Standards and Conventions

The solutions above are built by different teams that work at different companies. This is great because those teams have hands-on experience with the cluster managers in the wild, but it has historically made it challenging to standardize the user experience. This is particularly true when we build other tools, like IPython widgets or the Dask JupyterLab extension, that want to interoperate with all of the Dask deployment solutions.

The recent rewrites of Dask-SSH, Dask-Jobqueue, and Dask-Kubernetes, together with the new Dask-Cloudprovider and Dask-CUDA libraries, place them all under the same dask.distributed.SpecCluster superclass, so we can expect a high degree of uniformity from them. Additionally, all of these classes now match the dask.distributed.Cluster interface, which standardizes things like adaptivity, IPython widgets, logs, and some basic reporting (see the sketch after the hierarchy below).

  • Cluster
    • SpecCluster
      • Kubernetes
      • JobQueue (PBS/SLURM/LSF/SGE/Torque/Condor/Moab/OAR)
      • SSH
      • CloudProvider (ECS)
      • CUDA (LocalCUDACluster, DGX)
      • LocalCluster
    • Yarn
    • Gateway
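
Because they share this interface, the same handful of operations behaves the same way no matter which class you choose. A minimal sketch, using LocalCluster here only because it needs no external resources; the scaling numbers are arbitrary:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster()               # any of the cluster classes above works the same way
cluster.scale(4)                       # manually ask for four workers
# cluster.adapt(minimum=1, maximum=8)  # or let the cluster scale itself with the load
print(cluster.dashboard_link)          # diagnostic dashboard, same across cluster managers

client = Client(cluster)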

Future Work

There is still plenty to do. Here are some of the themes we’ve seen among current development:

  1. Move the Scheduler off to a separate job/pod/container in the network, which is often helpful for complex networking situations
  2. Improve proxying of the dashboard in these situations
  3. Optionally separate the life-cycle of the cluster from the lifetime of the Python process that requested the cluster
  4. Write up best practices for composing GPU support with all of the cluster managers
