This work is supported by Anaconda Inc

Summary

Dask DataFrame works well with pandas’ new Extension Array interface, including third-party extension arrays. This lets Dask

  1. easily support pandas’ new extension arrays, like their new nullable integer array
  2. support third-party extension array arrays, like cyberpandas’s IPArray

Background

Pandas 0.23 introduced the ExtensionArray, a way to store things other than a simple NumPy array in a DataFrame or Series. Internally pandas uses this for data types that aren’t handled natively by NumPy like datetimes with timezones, Categorical, or (the new!) nullable integer arrays.

>>> s = pd.Series(pd.date_range('2000', periods=4, tz="US/Central"))
>>> s
0   2000-01-01 00:00:00-06:00
1   2000-01-02 00:00:00-06:00
2   2000-01-03 00:00:00-06:00
3   2000-01-04 00:00:00-06:00
dtype: datetime64[ns, US/Central]

dask.dataframe has always supported the extension types that pandas defines.

>>> import dask.dataframe as dd
>>> dd.from_pandas(s, npartitions=2)
Dask Series Structure:
npartitions=2
0    datetime64[ns, US/Central]
2                           ...
3                           ...
dtype: datetime64[ns, US/Central]
Dask Name: from_pandas, 2 tasks

The Challenge

Newer versions of pandas allow third-party libraries to write custom extension arrays. These arrays can be placed inside a DataFrame or Series, and work just as well as any extension array defined within pandas itself. However, third-party extension arrays provide a slight challenge for Dask.

Recall: dask.dataframe is lazy. We use a familiar pandas-like API to build up a task graph, rather than executing immediately. But if Dask DataFrame is lazy, then how do things like the following work?

>>> df = pd.DataFrame({"A": [1, 2], 'B': [3, 4]})
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf[['B']].columns
Index(['B'], dtype='object')

ddf[['B']] (lazily) selects the column 'B' from the dataframe. But accessing .columns immediately returns a pandas Index object with just the selected columns.

No real computation has happened (you could just as easily swap out the from_pandas for a dd.read_parquet on a larger-than-memory dataset, and the behavior would be the same). Dask is able to do these kinds of “metadata-only” computations, where the output depends only on the columns and the dtypes, without executing the task graph. Internally, Dask does this by keeping a pair of dummy pandas DataFrames on each Dask DataFrame.

>>> ddf._meta
Empty DataFrame
Columns: [A, B]
Index: []

>>> ddf._meta_nonempty
ddf._meta_nonempty
   A  B
0  1  1
1  1  1

We need the _meta_nonempty, since some operations in pandas behave differently on an Empty DataFrame than on a non-empty one (either by design or, occasionally, a bug in pandas).

The issue with third-party extension arrays is that Dask doesn’t know what values to put in the _meta_nonempty. We’re quite happy to do it for each NumPy dtype and each of pandas’ own extension dtypes. But any third-party library could create an ExtensionArray for any type, and Dask would have no way of knowing what’s a valid value for it.

The Solution

Rather than Dask guessing what values to use for the _meta_nonempty, extension array authors (or users) can register their extension dtype with Dask. Once registered, Dask will be able to generate the _meta_nonempty, and things should work fine from there. For example, we can register the dummy DecimalArray that pandas uses for testing (this isn’t part of pandas’ public API) with Dask.

from decimal import Decimal
from pandas.tests.extension.decimal import DecimalArray, DecimalDtype

# The actual registration that would be done in the 3rd-party library
from dask.dataframe.extensions import make_array_nonempty


@make_array_nonempty.register(DecimalDtype)
def _(dtype):
    return DecimalArray._from_sequence([Decimal('0'), Decimal('NaN')],
                                       dtype=dtype)

Now users of that extension type can place those arrays inside a Dask DataFrame or Series.

>>> df = pd.DataFrame({"A": DecimalArray([Decimal('1.0'), Decimal('2.0'),
...                                       Decimal('3.0')])})

>>> ddf = dd.from_pandas(df, 2)
>>> ddf
Dask DataFrame Structure:
                     A
npartitions=1
0              decimal
2                  ...
Dask Name: from_pandas, 1 tasks

>>> ddf.dtypes
A    decimal
dtype: object

And from there, the usual operations just as they would in pandas.

>>> from random import choices
>>> df = pd.DataFrame({"A": DecimalArray(choices([Decimal('1.0'),
...                                               Decimal('2.0')],
...                                              k=100)),
...                    "B": np.random.choice([0, 1, 2, 3], size=(100,))})
>>> ddf = dd.from_pandas(df, 2)
In [35]: ddf.groupby("A").B.mean().compute()
Out[35]:
A
1.0    1.50
2.0    1.48
Name: B, dtype: float64

The Real Lesson

It’s neat that Dask now supports extension arrays. But to me, the exciting thing is just how little work this took. The PR implementing support for third-party extension arrays is quite short, just defining the object that third-parties register with, and using it to generate the data when dtype is detected. Supporting the three new extension arrays in pandas 0.24.0 (IntegerArray, PeriodArray, and IntervalArray), takes a handful of lines of code

@make_array_nonempty.register(pd.Interval):
def _(dtype):
    return IntervalArray.from_breaks([0, 1, 2], closed=dtype.closed)


@make_array_nonempty.register(pd.Period):
def _(dtype):
    return period_array([2000, 2001], freq=dtype.freq)


@make_array_nonempty.register(_IntegerDtype):
def _(dtype):
    return integer_array([0, None], dtype=dtype)

Dask benefits directly from improvements made to pandas. Dask didn’t have to build out a new parallel extension array interface, and reimplement all the new extension arrays using the parallel interface. We just re-used what pandas already did, and it fits into the existing Dask structure.

For third-party extension array authors, like cyberpandas, the work is similarly minimal. They don’t need to re-implement everything from the ground up, just to play well with Dask.

This highlights the importance of one of the Dask project’s core values: working with the community. If you visit dask.org, you’ll see phrases like

Integrates with existing projects

and

Built with the broader community

At the start of Dask, the developers could have gone off and re-written pandas or NumPy from scratch to be parallel friendly (though we’d probably still be working on that part today, since that’s such a massive undertaking). Instead, the Dask developers worked with the community, occasionally nudging it in directions that would help out dask. For example, many places in pandas held the GIL, preventing thread-based parallelism. Rather than abandoning pandas, the Dask and pandas developers worked together to release the GIL where possible when it was a bottleneck for dask.dataframe. This benefited Dask and anyone else trying to do thread-based parallelism with pandas DataFrames.

And now, when pandas introduces new features like nullable integers, dask.dataframe just needs to register it as an extension type and immediately benefits from it. And third-party extension array authors can do the same for their extension arrays.

If you’re writing an ExtensionArray, make sure to add it to the pandas ecosystem page, and register it with Dask!


blog comments powered by Disqus