Managing dask workloads with Flyte
By Bernhard Stadlbauer
It is now possible to manage `dask` workloads using Flyte 🎉!
The major advantages are:
- Each Flyte task spins up its own ephemeral `dask` cluster using a Docker image tailored to the task, ensuring consistency in the Python environment across the client, scheduler, and workers.
- Flyte uses the existing Kubernetes infrastructure to spin up the `dask` clusters.
- Spot/Preemptible instances are natively supported.
- The whole `dask` task can be cached.

Adding `dask` support to an already running Flyte setup can be done in just a few minutes.
This is what a Flyte task backed by a `dask` cluster with four workers looks like:
```python
from typing import List

from distributed import Client
from flytekit import task, Resources
from flytekitplugins.dask import Dask, WorkerGroup, Scheduler


def inc(x):
    return x + 1


@task(
    task_config=Dask(
        scheduler=Scheduler(requests=Resources(cpu="2")),
        workers=WorkerGroup(
            number_of_workers=4,
            limits=Resources(cpu="8", mem="32Gi"),
        ),
    )
)
def increment_numbers(list_length: int) -> List[int]:
    client = Client()
    futures = client.map(inc, range(list_length))
    return client.gather(futures)
```
This task can run locally using a standard `distributed.Client()` and can scale to arbitrary cluster sizes once registered with Flyte.
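Conceptually, the map/gather pattern in the task above is the familiar executor pattern from the Python standard library. A stand-in sketch without a `dask` cluster (using `concurrent.futures` purely for illustration, not how Flyte runs the task):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def inc(x: int) -> int:
    return x + 1


# Stand-in for `client.map(...)` / `client.gather(...)`: submit `inc`
# over the range and collect the results in input order.
def increment_numbers(list_length: int) -> List[int]:
    with ThreadPoolExecutor() as executor:
        return list(executor.map(inc, range(list_length)))


print(increment_numbers(5))  # → [1, 2, 3, 4, 5]
```

With `distributed`, swapping the executor for a `Client` moves the same computation onto the cluster's workers.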
What is Flyte?
Flyte is a Kubernetes-native workflow orchestration engine. Originally developed at Lyft, it is now open source (GitHub) and a graduate project under the Linux Foundation. It stands out among similar tools such as Airflow or Argo due to its key features, which include:
- Caching/memoization of previously executed tasks for improved performance
- Kubernetes native
- Workflow definitions in Python, not e.g., YAML
- Strong typing between tasks and workflows using Python type hints
- Dynamic generation of workflow DAGs at runtime
- Ability to run workflows locally
A simple workflow would look something like the following:
```python
from typing import List

import pandas as pd
from flytekit import task, workflow, Resources
from flytekitplugins.dask import Dask, WorkerGroup, Scheduler


@task(
    task_config=Dask(
        scheduler=Scheduler(requests=Resources(cpu="2")),
        workers=WorkerGroup(
            number_of_workers=4,
            limits=Resources(cpu="8", mem="32Gi"),
        ),
    )
)
def expensive_data_preparation(input_files: List[str]) -> pd.DataFrame:
    # Expensive, highly parallel `dask` code
    ...
    return pd.DataFrame(...)  # Some large DataFrame, Flyte will handle serialization


@task
def train(input_data: pd.DataFrame) -> str:
    # Model training, can also use GPU, etc.
    ...
    return "s3://path-to-model"


@workflow
def train_model(input_files: List[str]) -> str:
    prepared_data = expensive_data_preparation(input_files=input_files)
    return train(input_data=prepared_data)
```
In the above, both `expensive_data_preparation()` and `train()` run in their own Pod(s) in Kubernetes, while the `train_model()` workflow is written in a DSL that creates a Directed Acyclic Graph (DAG) of the workflow. Flyte determines the order of tasks based on their inputs and outputs. Input and output types (based on the type hints) are validated at registration time to avoid surprises at runtime.
After registration with Flyte, the workflow can be started from the UI:
Why use the `dask` plugin for Flyte?
At first glance, Flyte and `dask` look similar in what they are trying to achieve: both are capable of creating a DAG from user functions, managing inputs and outputs, and so on. However, there is a major conceptual difference in their approach. While `dask` uses long-lived workers to run tasks, each Flyte task runs in its own dedicated Kubernetes Pod, which adds significant per-task runtime overhead: `dask` tasks incur an overhead of around one millisecond (refer to the docs), whereas spinning up a new Kubernetes Pod takes several seconds. The long-lived nature of the `dask` workers also allows for optimization of the DAG, running tasks that operate on the same data on the same node and reducing the need for inter-worker data serialization (known as shuffling). With Flyte tasks being ephemeral, this optimization is not possible, and task outputs are serialized to blob storage instead.
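A rough back-of-the-envelope comparison makes the difference in scheduling overhead concrete (the ~1 ms figure is from the `dask` docs; the 5 s pod start-up is an assumed, illustrative figure):

```python
# Illustrative numbers only: ~1 ms per dask task vs. an assumed
# ~5 s to start a Kubernetes pod per task.
n_tasks = 1_000
dask_overhead_s = n_tasks * 0.001  # total dask scheduling overhead
pod_overhead_s = n_tasks * 5       # total pod start-up overhead

print(dask_overhead_s, pod_overhead_s)  # → 1.0 5000
```

This is why fine-grained parallelism belongs inside a single `dask`-backed Flyte task, while Flyte tasks themselves are best kept coarse-grained.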
Given the limitations discussed above, why use Flyte? Flyte is not intended to replace tools such as dask or Apache Spark, but rather provides an orchestration layer on top. While workloads can be run directly in Flyte, such as training a single GPU model, Flyte offers numerous integrations with other popular data processing tools.
With Flyte managing the `dask` cluster lifecycle, each `dask` Flyte task runs on its own dedicated `dask` cluster made up of Kubernetes pods. When the Flyte task is triggered from the UI, Flyte spins up a `dask` cluster tailored to the task, which is then used to execute the user code. This enables the use of different Docker images with varying dependencies for different tasks, whilst always ensuring that the dependencies of the client, scheduler, and workers are consistent.
What prerequisites are required to run `dask` tasks in Flyte?

- The Kubernetes cluster needs to have the dask operator installed.
- Flyte version `1.3.0` or higher is required.
- The `dask` plugin needs to be enabled in the Flyte Propeller config (refer to the docs).
- The Docker image associated with the task must have the `flytekitplugins-dask` package installed in its Python environment.
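Two of the prerequisites above can be sketched as shell commands (taken from the dask-kubernetes and flytekit docs respectively; verify chart and package versions against the current documentation):

```shell
# Install the dask operator into the Kubernetes cluster
helm install --repo https://helm.dask.org \
  --create-namespace -n dask-operator \
  --generate-name dask-kubernetes-operator

# Install the dask plugin into the task image's Python environment
pip install flytekitplugins-dask
```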
How do things work under the hood?
Note: The following is for reference only and is not necessary for users who only use the plugin. However, it could be useful for easier debugging.
On a high level, the following steps occur when a `dask` task is initiated in Flyte:

- A `FlyteWorkflow` Custom Resource (CR) is created in Kubernetes.
- Flyte Propeller, a Kubernetes operator, detects the creation of the workflow.
- The operator inspects the task's spec and identifies it as a `dask` task. It verifies that the required plugin is associated with the task and locates the `dask` plugin within Flyte Propeller.
- The `dask` plugin picks up the task definition and creates a `DaskJob` Custom Resource using the dask-k8s-operator-go-client.
- The dask operator picks up the `DaskJob` resource and runs the job accordingly. It spins up a pod to run the client/job-runner, one for the scheduler, and additional worker pods as designated in the Flyte task decorator.
- While the `dask` task is running, Flyte Propeller continuously monitors the `DaskJob` resource, waiting for it to report success or failure. Once the job has finished or the Flyte task has been terminated, all `dask`-related resources are cleaned up.
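The monitor-until-terminal pattern in the last step can be sketched generically (a toy loop with illustrative phase names; Flyte Propeller is written in Go and uses the Kubernetes watch machinery, not this code):

```python
# Toy reconcile loop: poll a resource's status until it reaches a
# terminal phase, then trigger cleanup of the associated resources.
def reconcile(get_status, cleanup, terminal=("Successful", "Failed")):
    while True:
        phase = get_status()
        if phase in terminal:
            cleanup()
            return phase


# Simulated sequence of DaskJob status reports.
statuses = iter(["Pending", "Running", "Successful"])
cleaned = []
result = reconcile(lambda: next(statuses), lambda: cleaned.append("deleted"))
print(result, cleaned)  # → Successful ['deleted']
```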
- Flyte documentation
- Flyte community
- flytekitplugins-dask user documentation
- flytekitplugins-dask deployment documentation
- dask-kubernetes documentation
- Blog post on the dask kubernetes operator