This work is supported by Continuum Analytics and the Data Driven Discovery Initiative from the Moore Foundation.

To increase transparency I’m blogging weekly(ish) about the work done on Dask and related projects during the previous week. This log covers work done between 2017-04-20 and 2017-04-28. Nothing here is ready for production. This blogpost is written in haste, so refined polish should not be expected.

Development in Dask and Dask-related projects during the last week includes the following notable changes:

1. Improved Joblib support, accelerating existing Scikit-Learn code
2. A dask-glm powered LogisticRegression estimator that is scikit-learn compatible
3. Additional Parquet support by Arrow
4. Sparse arrays
5. Better spill-to-disk behavior
6. AsyncIO compatible Client
7. TLS (SSL) support
8. NumPy __array_ufunc__ protocol

Joblib

Scikit-Learn parallelizes most of its algorithms with Joblib, which provides a simple interface for embarrassingly parallel computations. Dask has been able to hijack Joblib code and serve as the backend for some time now, but it had some limitations, particularly because we would repeatedly send data back and forth between the client and the workers for every batch of computations.

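import distributed.joblib
from joblib import Parallel, parallel_backend

# 'HOST:PORT' is a placeholder for the address of your Dask scheduler
with parallel_backend('dask.distributed', scheduler_host='HOST:PORT'):
    # normal Joblib code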
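
For readers who have not used Joblib directly, here is a minimal sketch of the kind of "normal Joblib code" the comment above refers to. It uses only Joblib's own Parallel and delayed with local threads, so it is an assumption about what such code typically looks like rather than anything taken from Scikit-Learn or Dask; the function square_root is made up for illustration.

```python
from math import sqrt

from joblib import Parallel, delayed


def square_root(x):
    """Toy stand-in for an expensive, independent task (hypothetical)."""
    return sqrt(x)


# Run the independent calls on two workers. prefer="threads" keeps the
# example light by avoiding extra processes.
results = Parallel(n_jobs=2, prefer="threads")(
    delayed(square_root)(i ** 2) for i in range(6)
)
print(results)
```

Placing a Parallel call like this inside a parallel_backend context manager is what hands the work to a different backend, without changing the Joblib code itself.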


Now there is a scatter= keyword, which allows you to pre-scatter select variables out to all of the Dask workers. This significantly cuts down on overhead, especially on machine learning workloads where most of the data doesn’t change very much.

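# Send the training data only once to each worker
# ('HOST:PORT' is a placeholder for the address of your Dask scheduler)
with parallel_backend('dask.distributed', scheduler_host='HOST:PORT',
                      scatter=[digits.data, digits.target]):
    search.fit(digits.data, digits.target)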
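
To see why pre-scattering cuts overhead, here is a toy model of the traffic involved. It is only a sketch: it uses pickle sizes as a stand-in for network bytes, models a single worker as a plain dict, and the key "training-data" is a hypothetical name. It is not how Dask or Joblib move data internally.

```python
import pickle

data = list(range(100_000))                          # stand-in for training data
batches = [(i, i + 10) for i in range(0, 50, 10)]    # five small tasks

# Without scattering: the data travels along with every batch.
bytes_without = sum(len(pickle.dumps((data, batch))) for batch in batches)

# With scattering: ship the data once under a key, then send only the key.
worker_store = {"training-data": pickle.loads(pickle.dumps(data))}
bytes_with = len(pickle.dumps(data)) + sum(
    len(pickle.dumps(("training-data", batch))) for batch in batches
)

print(bytes_without, bytes_with)
```

In this model the scattered version sends the large payload once instead of five times, which is the effect the scatter= keyword is aiming for.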


Early trials indicate that computations like scikit-learn’s RandomForest scale nicely on a cluster without any additional code.

This is particularly nice because it allows Dask and Scikit-Learn to play well together without having to introduce Dask within the Scikit-Learn codebase at all. From a maintenance perspective this combination is very attractive.

Work done by Jim Crist in dask/distributed #1022.

Dask-GLM Logistic Regression

The convex optimization solvers in the dask-glm project allow us to solve common machine learning and statistics problems in parallel and at scale. Historically this young library has contained only optimization solvers and relatively little in the way of user API.

This week dask-glm grew new LogisticRegression and LinearRegression estimators that expose the scalable convex optimization algorithms within dask-glm through a Scikit-Learn compatible interface. This can either speed up solutions on a single computer or provide solutions for datasets that were previously too large to fit in memory.

from dask_glm.estimators import LogisticRegression

est = LogisticRegression()
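
To make "logistic regression as a convex optimization problem" concrete, here is a small NumPy-only sketch that fits the model with plain gradient descent. This is an illustration under my own assumptions (the function fit_logistic, the learning rate, and the synthetic data are all made up here), and it is not dask-glm's implementation, whose solvers are more sophisticated.

```python
import numpy as np


def fit_logistic(X, y, learning_rate=0.5, n_iter=500):
    """Minimize the mean logistic loss with plain gradient descent."""
    beta = np.zeros(X.shape[1])
    for _ in range(n_iter):
        p = 1.0 / (1.0 + np.exp(-X @ beta))   # predicted probabilities
        gradient = X.T @ (p - y) / len(y)     # gradient of the mean loss
        beta -= learning_rate * gradient
    return beta


# Synthetic data drawn from a known coefficient vector.
rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 3))
true_beta = np.array([2.0, -1.0, 0.5])
y = (rng.random(1000) < 1.0 / (1.0 + np.exp(-X @ true_beta))).astype(float)

beta = fit_logistic(X, y)
accuracy = np.mean(((X @ beta) > 0) == (y == 1))
print(beta, accuracy)
```

Every step in the loop is a matrix product or an elementwise operation, which is the kind of arithmetic that can be carried out chunk by chunk on data too large for memory.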


This notebook compares performance to the latest release of scikit-learn on a 5,000,000-sample dataset running on a single machine. Dask-glm beats scikit-learn by a factor of four, which is also roughly the number of cores on the development machine. However, in response, this notebook by Olivier Grisel shows the development version of scikit-learn (with a new algorithm) beating out dask-glm by a factor of six. This just goes to show you that being smarter about your algorithms is almost always a better use of time than adopting parallelism.

Parquet with Arrow

The Parquet format is quickly becoming a standard for parallel and distributed dataframes. There are currently two Parquet readers/writers accessible from Python: fastparquet, a NumPy/Numba solution, and Parquet-CPP, a C++ solution with wrappers provided by Arrow. Dask.dataframe has supported Parquet for a while now with fastparquet.

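However, users will now have the option to use Arrow instead by changing the value of the engine= keyword in the dd.read_parquet function. The call below selects fastparquet; passing the Arrow engine's name instead selects Arrow.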

df = dd.read_parquet('/path/to/mydata.parquet', engine='fastparquet')


Hopefully this capability increases the use of both projects and results in greater feedback to those libraries so that they can continue to advance Python’s access to the Parquet format. As a gentle reminder, you can typically get much faster query times by switching from CSV to Parquet. This is often much more effective than parallel computing.

Sparse Arrays

There is a small multi-dimensional sparse array library here: https://github.com/mrocklin/sparse. It allows us to represent arrays compactly in memory when most entries are zero. This differs from the standard solution in scipy.sparse, which can only support arrays of dimension two (matrices) and not greater.

pip install sparse

>>> import numpy as np
>>> x = np.random.random(size=(10, 10, 10, 10))
>>> x[x < 0.9] = 0
>>> x.nbytes
80000

>>> import sparse
>>> s = sparse.COO(x)
>>> s
<COO: shape=(10, 10, 10, 10), dtype=float64, nnz=1074>

>>> s.nbytes
12888

>>> sparse.tensordot(s, s, axes=((1, 0, 3), (2, 1, 0))).sum(axis=1)
array([ 100.93868073,  128.72312323,  119.12997217,  118.56304153,
        133.24522101,   98.33555365,   90.25304866,   98.99823973,
        100.57555847,   78.27915528])
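
The memory savings above come from storing only the nonzero entries. Here is a rough sketch of the coordinate (COO) idea in plain NumPy: keep one array of coordinates and one array of values. This is not the sparse library's actual implementation; the uint8 coordinate dtype is my assumption, chosen because every dimension here is shorter than 256.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.random(size=(10, 10, 10, 10))
x[x < 0.9] = 0                       # roughly 90% of entries become zero

# Coordinate format: one column of indices per nonzero value.
coords = np.array(np.nonzero(x), dtype=np.uint8)    # shape (ndim, nnz)
data = x[tuple(coords)]                             # shape (nnz,)

nnz = data.size
sparse_nbytes = coords.nbytes + data.nbytes         # 4 * nnz + 8 * nnz bytes

# Rebuild the dense array to check that nothing was lost.
dense = np.zeros(x.shape, dtype=x.dtype)
dense[tuple(coords)] = data

print(x.nbytes, sparse_nbytes, nnz)
```

With four one-byte coordinates and one eight-byte float per entry, this layout costs 12 bytes per nonzero value, which is consistent with the 12888 bytes reported for 1074 nonzeros in the session above.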


Additionally, this sparse library more faithfully follows the numpy.ndarray API, which is exactly what dask.array expects. Because of this close API matching, dask.array is able to parallelize around sparse arrays just as easily as it parallelizes around dense numpy arrays. This gives us a decent distributed multidimensional sparse array library relatively cheaply.

>>> import dask.array as da
>>> x = da.random.random(size=(10000, 10000, 10000, 10000),
...                      chunks=(100, 100, 100, 100))
>>> x[x < 0.9] = 0

>>> s = x.map_blocks(sparse.COO)  # parallel array of sparse arrays


Work on the sparse library is so far by myself and Jake VanderPlas and is available at https://github.com/mrocklin/sparse. Work connecting this up to Dask.array is in dask/dask #2234.

Better spill-to-disk behavior

I’ve been playing with a 50GB sample of the 1TB Criteo dataset on my laptop (this is where I’m using sparse arrays). To make computations flow a bit faster I’ve improved the performance of Dask’s spill-to-disk policies.

Now, rather than depend on (cloud)pickle we use Dask’s network protocol, which handles data more efficiently, compresses well, and has special handling for common and important types like NumPy arrays and things built out of NumPy arrays (like sparse arrays).

As a result, reading and writing excess data to disk is significantly faster. When performing machine learning computations (which are fairly heavy-weight), disk access is now fast enough that I don’t notice it in practice, and running out of memory doesn’t significantly impact performance.

This is only really relevant when using common types (like numpy arrays) and when your computation to disk access ratio is relatively high (such as is the case for analytic workloads), but it was a simple fix and yielded a nice boost to my personal productivity.
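
As an illustration of why type-aware serialization helps here, the sketch below writes a NumPy array to disk as a small header plus its compressed raw buffer, then reads it back. It is a toy format of my own (the functions dump_array and load_array, the JSON header, and the use of zlib are all assumptions) and not Dask's actual network protocol.

```python
import json
import os
import tempfile
import zlib

import numpy as np


def dump_array(path, arr):
    """Write a small JSON header followed by the compressed raw buffer."""
    arr = np.ascontiguousarray(arr)
    header = json.dumps({"dtype": arr.dtype.str, "shape": arr.shape}).encode()
    payload = zlib.compress(arr.tobytes(), 1)    # fast compression level
    with open(path, "wb") as f:
        f.write(len(header).to_bytes(8, "little"))
        f.write(header)
        f.write(payload)


def load_array(path):
    """Read the header, then rebuild the array from the decompressed bytes."""
    with open(path, "rb") as f:
        header_len = int.from_bytes(f.read(8), "little")
        header = json.loads(f.read(header_len))
        raw = zlib.decompress(f.read())
    return np.frombuffer(raw, dtype=header["dtype"]).reshape(header["shape"])


x = np.zeros((1000, 100))
x[::10] = 1.0                        # mostly zeros, so it compresses well

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "spill.bin")
    dump_array(path, x)
    size_on_disk = os.path.getsize(path)
    y = load_array(path)

print(x.nbytes, size_on_disk)
```

Because the array's dtype and shape are known up front, the buffer can be written directly and compressed as one block, without going through a general-purpose object serializer.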

Work by myself in dask/distributed #946.

AsyncIO compatible Client

The Dask.distributed scheduler maintains a fully asynchronous API for use with non-blocking systems like Tornado or AsyncIO. Because Dask supports Python 2, all of our internal code is written with Tornado. While Tornado and AsyncIO can work together, this generally requires a bit of excess book-keeping, like turning Tornado futures into AsyncIO futures, etc.

Now there is an AsyncIO specific Client that only includes non-blocking methods that are AsyncIO native. This allows for more idiomatic asynchronous code in Python 3.

async with AioClient('scheduler-address:8786') as c:
    future = c.submit(func, *args, **kwargs)
    result = await future
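
The book-keeping mentioned earlier, converting one kind of future into another, can be sketched with the standard library alone. The example below is an analogy rather than Dask or Tornado code: it bridges a concurrent.futures future into AsyncIO with asyncio.wrap_future, and the function inc is a made-up task.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    """Hypothetical task to run outside the event loop."""
    return x + 1


async def main():
    with ThreadPoolExecutor(max_workers=1) as pool:
        foreign_future = pool.submit(inc, 41)          # not awaitable as-is
        future = asyncio.wrap_future(foreign_future)   # bridge into asyncio
        return await future


result = asyncio.run(main())
print(result)
```

A client whose methods return AsyncIO-native awaitables removes the need for this kind of wrapping step in user code.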


Work by Krisztián Szűcs in dask/distributed #1029.

TLS (SSL) support

TLS (previously called SSL) is a common and trusted solution to authentication and encryption. It is a commonly requested feature by companies or institutions where intra-network security is important. This is currently being worked on at dask/distributed #1034. I encourage anyone whom this may affect to engage on that pull request.
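
For readers unfamiliar with what "authentication and encryption" involves in practice, here is a sketch using only Python's standard ssl module. It says nothing about how Dask will expose TLS configuration, since that pull request is still in progress; it only shows the two contexts a mutually authenticated connection generally needs.

```python
import ssl

# Client side: verify the server's certificate and hostname.
client_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

# Server side: additionally require a certificate from every client,
# which gives authentication in both directions.
server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
server_ctx.verify_mode = ssl.CERT_REQUIRED

# Refuse protocol versions older than TLS 1.2.
client_ctx.minimum_version = ssl.TLSVersion.TLSv1_2
server_ctx.minimum_version = ssl.TLSVersion.TLSv1_2

# In a real deployment the certificate and key files would be loaded here,
# for example with load_cert_chain() and load_verify_locations().
print(client_ctx.verify_mode, server_ctx.verify_mode)
```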

Work by Antoine Pitrou in dask/distributed #1034 and previously by Marius van Niekerk in dask/distributed #866.

NumPy __array_ufunc__

This recent change in NumPy (literally merged as I was typing this blogpost) allows other array libraries to take control of the existing NumPy ufuncs, so if you call something like np.exp(my_dask_array) this will no longer convert to a NumPy array, but will rather call the appropriate dask.array.exp function. This is a big step towards writing generic array code that works both on NumPy arrays as well as other array projects like dask.array, xarray, bcolz, sparse, etc.
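
Here is a minimal sketch of the protocol from the point of view of an array library. The class WrappedArray is a made-up toy container, not dask.array's implementation, and the example assumes a NumPy version that includes the protocol.

```python
import numpy as np


class WrappedArray:
    """Toy container that intercepts NumPy ufuncs instead of being converted."""

    def __init__(self, data):
        self.data = np.asarray(data)

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        if method != "__call__":
            return NotImplemented
        # Unwrap our own type, apply the ufunc, and wrap the result again.
        raw = [i.data if isinstance(i, WrappedArray) else i for i in inputs]
        return WrappedArray(ufunc(*raw, **kwargs))


x = WrappedArray([0.0, 1.0, 2.0])
y = np.exp(x)        # dispatches to WrappedArray.__array_ufunc__
z = np.add(x, 1)     # binary ufuncs are routed the same way
print(type(y).__name__, y.data, z.data)
```

The calling code uses ordinary NumPy functions throughout, yet the result stays in the container's own type, which is what makes generic array code possible.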

As with all large changes in NumPy, this was accomplished through a collaboration of many people. PR in numpy/numpy #8247.