# Developing Convex Optimization Algorithms in Dask parallel math is fun

*This work is supported by Continuum Analytics,
the XDATA Program,
and the Data Driven Discovery Initiative from the Moore
Foundation.*

## Summary

We build distributed optimization algorithms with Dask. We show both simple examples and also benchmarks from a nascent dask-glm library for generalized linear models. We also talk about the experience of learning Dask to do this kind of work.

This blogpost is co-authored by Chris White (Capital One) who knows optimization and Matthew Rocklin (Continuum Analytics) who knows distributed computing.

## Introduction

Many machine learning and statistics models (such as logistic regression) depend on convex optimization algorithms like Newton’s method, stochastic gradient descent, and others. These optimization algorithms are both pragmatic (they’re used in many applications) and mathematically interesting. As a result these algorithms have been the subject of study by researchers and graduate students around the world for years both in academia and in industry.

Things got interesting about five or ten years ago when datasets grew beyond the size of working memory and “Big Data” became a buzzword. Parallel and distributed solutions for these algorithms have become the norm, and a researcher’s skillset now has to extend beyond linear algebra and optimization theory to include parallel algorithms and possibly even network programming, especially if you want to explore and create more interesting algorithms.

However, relatively few people understand both mathematical optimization theory and the details of distributed systems. Typically algorithmic researchers depend on the APIs of distributed computing libraries like Spark or Flink to implement their algorithms. In this blogpost we explore the extent to which Dask can be helpful in these applications. We approach this from two perspectives:

**Algorithmic researcher**(Chris): someone who knows optimization and iterative algorithms like Conjugate Gradient, Dual Ascent, or GMRES but isn’t so hot on distributed computing topics like sockets, MPI, load balancing, and so on**Distributed systems developer**(Matt): someone who knows how to move bytes around and keep machines busy but doesn’t know the right way to do a line search or handle a poorly conditioned matrix

## Prototyping Algorithms in Dask

Given knowledge of algorithms and of NumPy array computing it is easy to write parallel algorithms with Dask. For a range of complicated algorithmic structures we have two straightforward choices:

- Use parallel multi-dimensional arrays to construct algorithms from common operations like matrix multiplication, SVD, and so on. This mirrors mathematical algorithms well but lacks some flexibility.
- Create algorithms by hand that track operations on individual chunks of in-memory data and dependencies between them. This is very flexible but requires a bit more care.

Coding up either of these options from scratch can be a daunting task, but with Dask it can be as simple as writing NumPy code.

Let’s build up an example of fitting a large linear regression model using both built-in array parallelism and fancier, more customized parallelization features that Dask offers. The dask.array module helps us to easily parallelize standard NumPy functionality using the same syntax – we’ll start there.

### Data Creation

Dask has many ways to create dask arrays; to get us started quickly prototyping let’s create some random data in a way that should look familiar to NumPy users.

```
import dask
import dask.array as da
import numpy as np
from dask.distributed import Client
client = Client()
## create inputs with a bunch of independent normals
beta = np.random.random(100) # random beta coefficients, no intercept
X = da.random.normal(0, 1, size=(1000000, 100), chunks=(100000, 100))
y = X.dot(beta) + da.random.normal(0, 1, size=1000000, chunks=(100000,))
## make sure all chunks are ~equally sized
X, y = dask.persist(X, y)
client.rebalance([X, y])
```

Observe that `X`

is a dask array stored in 10 chunks, each of size `(100000, 100)`

. Also note that `X.dot(beta)`

runs smoothly for both `numpy`

and `dask`

arrays, so we can write code that basically works in either world.

**Caveat:** If `X`

is a `numpy`

array and `beta`

is a `dask`

array, `X.dot(beta)`

will output an *in-memory* `numpy`

array. This is usually not desirable as you want to carefully choose when to load something into memory. One fix is to use `multipledispatch`

to handle odd edge cases; for a starting example, check out the `dot`

code here.

Dask also has convenient visualization features built in that we will leverage; below we visualize our data in its 10 independent chunks:

### Array Programming

*If you can write iterative array-based algorithms in NumPy, then you can write iterative parallel algorithms in Dask*

As we’ve already seen, Dask inherits much of the NumPy API that we are familiar with, so we can write simple NumPy-style iterative optimization algorithms that will leverage the parallelism `dask.array`

has built-in already. For example, if we want to naively fit a linear regression model on the data above, we are trying to solve the following convex optimization problem:

Recall that in non-degenerate situations this problem has a closed-form solution that is given by:

\[\beta^* = \left(X^T X\right)^{-1} X^T y\]We can compute $\beta^*$ using the above formula with Dask:

```
## naive solution
beta_star = da.linalg.solve(X.T.dot(X), X.T.dot(y))
>>> abs(beta_star.compute() - beta).max()
0.0024817567237768179
```

Sometimes a direct solve is too costly, and we want to solve the above problem using only simple matrix-vector multiplications. To this end, let’s take this one step further and actually implement a gradient descent algorithm which exploits parallel matrix operations. Recall that gradient descent iteratively refines an initial estimate of beta via the update:

\[\beta^+ = \beta - \alpha \nabla f(\beta)\]where $\alpha$ can be chosen based on a number of different “step-size” rules; for the purposes of exposition, we will stick with a constant step-size:

```
## quick step-size calculation to guarantee convergence
_, s, _ = np.linalg.svd(2 * X.T.dot(X))
step_size = 1 / s - 1e-8
## define some parameters
max_steps = 100
tol = 1e-8
beta_hat = np.zeros(100) # initial guess
for k in range(max_steps):
Xbeta = X.dot(beta_hat)
func = ((y - Xbeta)**2).sum()
gradient = 2 * X.T.dot(Xbeta - y)
## Update
obeta = beta_hat
beta_hat = beta_hat - step_size * gradient
new_func = ((y - X.dot(beta_hat))**2).sum()
beta_hat, func, new_func = dask.compute(beta_hat, func, new_func) # <--- Dask code
## Check for convergence
change = np.absolute(beta_hat - obeta).max()
if change < tol:
break
>>> abs(beta_hat - beta).max()
0.0024817567259038942
```

It’s worth noting that almost all of this code is exactly the same as the equivalent NumPy code. Because Dask.array and NumPy share the same API it’s pretty easy for people who are already comfortable with NumPy to get started with distributed algorithms right away. The only thing we had to change was how we produce our original data (`da.random.normal`

instead of `np.random.normal`

) and the call to `dask.compute`

at the end of the update state. The `dask.compute`

call tells Dask to go ahead and actually evaluate everything we’ve told it to do so far (Dask is lazy by default). Otherwise, all of the mathematical operations, matrix multiplies, slicing, and so on are exactly the same as with Numpy, except that Dask.array builds up a chunk-wise parallel computation for us and Dask.distributed can execute that computation in parallel.

To better appreciate all the scheduling that is happening in one update step of the above algorithm, here is a visualization of the computation necessary to compute `beta_hat`

and the new function value `new_func`

:

Each rectangle is an in-memory chunk of our distributed array and every circle is a numpy function call on those in-memory chunks. The Dask scheduler determines where and when to run all of these computations on our cluster of machines (or just on the cores of our laptop).

#### Array Programming + dask.delayed

Now that we’ve seen how to use the built-in parallel algorithms offered by Dask.array, let’s go one step further and talk about writing more customized parallel algorithms. Many distributed “consensus” based algorithms in machine learning are based on the idea that each chunk of data can be processed independently in parallel, and send their guess for the optimal parameter value to some master node. The master then computes a *consensus* estimate for the optimal parameters and reports that back to all of the workers. Each worker then processes their chunk of data given this new information, and the process continues until convergence.

From a parallel computing perspective this is a pretty simple map-reduce procedure. Any distributed computing framework should be able to handle this easily. We’ll use this as a very simple example for how to use Dask’s more customizable parallel options.

One such algorithm is the Alternating Direction Method of Multipliers, or ADMM for short. For the sake of this post, we will consider the work done by each worker to be a black box.

We will also be considering a *regularized* version of the problem above, namely:

At the end of the day, all we will do is:

- create NumPy functions which define how each chunk updates its parameter estimates
- wrap those functions in
`dask.delayed`

- call
`dask.compute`

and process the individual estimates, again using NumPy

First we need to define some *local* functions that the chunks will use to update their individual parameter estimates, and import the black box `local_update`

step from `dask_glm`

; also, we will need the so-called *shrinkage* operator (which is the proximal operator for the $l1$-norm in our problem):

```
from dask_glm.algorithms import local_update
def local_f(beta, X, y, z, u, rho):
return ((y - X.dot(beta)) **2).sum() + (rho / 2) * np.dot(beta - z + u,
beta - z + u)
def local_grad(beta, X, y, z, u, rho):
return 2 * X.T.dot(X.dot(beta) - y) + rho * (beta - z + u)
def shrinkage(beta, t):
return np.maximum(0, beta - t) - np.maximum(0, -beta - t)
## set some algorithm parameters
max_steps = 10
lamduh = 7.2
rho = 1.0
(n, p) = X.shape
nchunks = X.npartitions
XD = X.to_delayed().flatten().tolist() # A list of pointers to remote numpy arrays
yD = y.to_delayed().flatten().tolist() # ... one for each chunk
# the initial consensus estimate
z = np.zeros(p)
# an array of the individual "dual variables" and parameter estimates,
# one for each chunk of data
u = np.array([np.zeros(p) for i in range(nchunks)])
betas = np.array([np.zeros(p) for i in range(nchunks)])
for k in range(max_steps):
# process each chunk in parallel, using the black-box 'local_update' magic
new_betas = [dask.delayed(local_update)(xx, yy, bb, z, uu, rho,
f=local_f,
fprime=local_grad)
for xx, yy, bb, uu in zip(XD, yD, betas, u)]
new_betas = np.array(dask.compute(*new_betas))
# everything else is NumPy code occurring at "master"
beta_hat = 0.9 * new_betas + 0.1 * z
# create consensus estimate
zold = z.copy()
ztilde = np.mean(beta_hat + np.array(u), axis=0)
z = shrinkage(ztilde, lamduh / (rho * nchunks))
# update dual variables
u += beta_hat - z
>>> # Number of coefficients zeroed out due to L1 regularization
>>> print((z == 0).sum())
12
```

There is of course a little bit more work occurring in the above algorithm, but it should be clear that the distributed operations are *not* one of the difficult pieces. Using dask.delayed we were able to express a simple map-reduce algorithm like ADMM with similarly simple Python for loops and delayed function calls. Dask.delayed is keeping track of all of the function calls we wanted to make and what other function calls they depend on. For example all of the `local_update`

calls can happen independent of each other, but the consensus computation blocks on all of them.

We hope that both parallel algorithms shown above (gradient descent, ADMM) were straightforward to someone reading with an optimization background. These implementations run well on a laptop, a single multi-core workstation, or a thousand-node cluster if necessary. We’ve been building somewhat more sophisticated implementations of these algorithms (and others) in dask-glm. They are more sophisticated from an optimization perspective (stopping criteria, step size, asynchronicity, and so on) but remain as simple from a distributed computing perspective.

## Experiment

*We compare dask-glm implementations against Scikit-learn on a laptop, and then
show them running on a cluster.*

Reproducible notebook is available here

We’re building more sophisticated versions of the algorithms above in dask-glm. This project has convex optimization algorithms for gradient descent, proximal gradient descent, Newton’s method, and ADMM. These implementations extend the implementations above by also thinking about stopping criteria, step sizes, and other niceties that we avoided above for simplicity.

In this section we show off these algorithms by performing a simple numerical experiment that compares the numerical performance of proximal gradient descent and ADMM alongside Scikit-Learn’s LogisticRegression and SGD implementations on a single machine (a personal laptop) and then follows up by scaling the dask-glm options to a moderate cluster.

*Disclaimer: These experiments are crude. We’re using artificial data, we’re not tuning
parameters or even finding parameters at which these algorithms are producing
results of the same accuracy. The goal of this section is just to give a
general feeling of how things compare.*

We create data

```
## size of problem (no. observations)
N = 8e6
chunks = 1e6
seed = 20009
beta = (np.random.random(15) - 0.5) * 3
X = da.random.random((N,len(beta)), chunks=chunks)
y = make_y(X, beta=np.array(beta), chunks=chunks)
X, y = dask.persist(X, y)
client.rebalance([X, y])
```

And run each of our algorithms as follows:

```
# Dask-GLM Proximal Gradient
result = proximal_grad(X, y, lamduh=alpha)
# Dask-GLM ADMM
X2 = X.rechunk((1e5, None)).persist() # ADMM prefers smaller chunks
y2 = y.rechunk(1e5).persist()
result = admm(X2, y2, lamduh=alpha)
# Scikit-Learn LogisticRegression
nX, ny = dask.compute(X, y) # sklearn wants numpy arrays
result = LogisticRegression(penalty='l1', C=1).fit(nX, ny).coef_
# Scikit-Learn Stochastic Gradient Descent
result = SGDClassifier(loss='log',
penalty='l1',
l1_ratio=1,
n_iter=10,
fit_intercept=False).fit(nX, ny).coef_
```

We then compare with the $L_{\infty}$ norm (largest different value).

```
abs(result - beta).max()
```

Times and $L_\infty$ distance from the true “generative beta” for these parameters are shown in the table below:

Algorithm | Error | Duration (s) |
---|---|---|

Proximal Gradient | 0.0227 | 128 |

ADMM | 0.0125 | 34.7 |

LogisticRegression | 0.0132 | 79 |

SGDClassifier | 0.0456 | 29.4 |

Again, please don’t take these numbers too seriously: these algorithms all solve regularized problems, so we don’t expect the results to necessarily be close to the underlying generative beta (even asymptotically). The numbers above are meant to demonstrate that they all return results which were roughly the same distance from the beta above. Also, Dask-glm is using a full four-core laptop while SKLearn is restricted to use a single core.

In the sections below we include profile plots for proximal gradient and ADMM. These show the operations that each of eight threads was doing over time. You can mouse-over rectangles/tasks and zoom in using the zoom tools in the upper right. You can see the difference in complexity of the algorithms. ADMM is much simpler from Dask’s perspective but also saturates hardware better for this chunksize.

### Profile Plot for Proximal Gradient Descent

### Profile Plot for ADMM

The general takeaway here is that dask-glm performs comparably to Scikit-Learn
on a single machine. If your problem fits in memory on a single machine you
should continue to use Scikit-Learn and Statsmodels. The real benefit to the
dask-glm algorithms is that they *scale* and can run efficiently on data that is
larger-than-memory by operating from disk on a single computer or on a
cluster of computers working together.

### Cluster Computing

As a demonstration, we run a larger version of the data above on a cluster of
eight `m4.2xlarges`

on EC2 (8 cores and 30GB of RAM each.)

We create a larger dataset with 800,000,000 rows and 15 columns across eight processes.

```
N = 8e8
chunks = 1e7
seed = 20009
beta = (np.random.random(15) - 0.5) * 3
X = da.random.random((N,len(beta)), chunks=chunks)
y = make_y(X, beta=np.array(beta), chunks=chunks)
X, y = dask.persist(X, y)
```

We then run the same `proximal_grad`

and `admm`

operations from before:

```
# Dask-GLM Proximal Gradient
result = proximal_grad(X, y, lamduh=alpha)
# Dask-GLM ADMM
X2 = X.rechunk((1e6, None)).persist() # ADMM prefers smaller chunks
y2 = y.rechunk(1e6).persist()
result = admm(X2, y2, lamduh=alpha)
```

Proximal grad completes in around seventeen minutes while ADMM completes in around four minutes. Profiles for the two computations are included below:

#### Profile Plot for Proximal Gradient Descent

We include only the first few iterations here. Otherwise this plot is several megabytes.

#### Profile Plot for ADMM

These both obtained similar $L_{\infty}$ errors to what we observed before.

Algorithm | Error | Duration (s) |
---|---|---|

Proximal Gradient | 0.0306 | 1020 |

ADMM | 0.00159 | 270 |

Although this time we had to be careful about a couple of things:

- We explicitly deleted the old data after rechunking (ADMM prefers different chunksizes than proximal_gradient) because our full dataset, 100GB, is close enough to our total distributed RAM (240GB) that it’s a good idea to avoid keeping replias around needlessly. Things would have run fine, but spilling excess data to disk would have negatively affected performance.
- We set the
`OMP_NUM_THREADS=1`

environment variable to avoid over-subscribing our CPUs. Surprisingly not doing so led both to worse performance and to non-deterministic results. An issue that we’re still tracking down.

### Analysis

The algorithms in Dask-GLM are new and need development, but are in a usable state by people comfortable operating at this technical level. Additionally, we would like to attract other mathematical and algorithmic developers to this work. We’ve found that Dask provides a nice balance between being flexible enough to support interesting algorithms, while being managed enough to be usable by researchers without a strong background in distributed systems. In this section we’re going to discuss the things that we learned from both Chris’ (mathematical algorithms) and Matt’s (distributed systems) perspective and then talk about possible future work. We encourage people to pay attention to future work; we’re open to collaboration and think that this is a good opportunity for new researchers to meaningfully engage.

#### Chris’s perspective

- Creating distributed algorithms with Dask was surprisingly easy; there is
still a small learning curve around when to call things like
`persist`

,`compute`

,`rebalance`

, and so on, but that can’t be avoided. Using Dask for algorithm development has been a great learning environment for understanding the unique challenges associated with distributed algorithms (including communication costs, among others). - Getting the particulars of algorithms correct is non-trivial; there is still work to be done in better understanding the tolerance settings vs. accuracy tradeoffs that are occurring in many of these algorithms, as well as fine-tuning the convergence criteria for increased precision.
- On the software development side, reliably testing optimization algorithms
is hard. Finding provably correct optimality conditions that should be
satisfied
*which are also numerically stable*has been a challenge for me. - Working on algorithms in isolation is not nearly as fun as collaborating on them; please join the conversation and contribute!
- Most importantly from my perspective, I’ve found there is a surprisingly
large amount of misunderstanding in “the community” surrounding what
optimization algorithms do in the world of predictive modeling, what
problems they each individually solve, and whether or not they are
interchangeable for a given problem. For example, Newton’s method can’t be
used to optimize an l1-regularized problem, and the coefficient estimates
from an l1-regularized problem are fundamentally (and numerically) different
from those of an l2-regularized problem (and from those of an unregularized
problem). My own personal goal is that the API for
`dask-glm`

exposes these subtle distinctions more transparently and leads to more thoughtful modeling decisions “in the wild”.

#### Matt’s perspective

This work triggered a number of concrete changes within the Dask library:

- We can convert Dask.dataframes to Dask.arrays. This is particularly important because people want to do pre-processing with dataframes but then switch to efficient multi-dimensional arrays for algorithms.
- We had to unify the single-machine scheduler and distributed scheduler APIs
a bit, notably adding a
`persist`

function to the single machine scheduler. This was particularly important because Chris generally prototyped on his laptop but we wanted to write code that was effective on clusters. - Scheduler overhead can be a problem for the iterative dask-array algorithms (gradient descent, proximal gradient descent, BFGS). This is particularly a problem because NumPy is very fast. Often our tasks take only a few milliseconds, which makes Dask’s overhead of 200us per task become very relevant (this is why you see whitespace in the profile plots above). We’ve started resolving this problem in a few ways like more aggressive task fusion and lower overheads generally, but this will be a medium-term challenge. In practice for dask-glm we’ve started handling this just by choosing chunksizes well. I suspect that for the dask-glm in particular we’ll just develop auto-chunksize heuristics that will mostly solve this problem. However we expect this problem to recur in other work with scientists on HPC systems who have similar situations.
- A couple of things can be tricky for algorithmic users:
- Placing the calls to asynchronously start computation (persist, compute). In practice Chris did a good job here and then I came through and tweaked things afterwards. The web diagnostics ended up being crucial to identify issues.
- Avoiding accidentally calling NumPy functions on dask.arrays and vice versa. We’ve improved this on the dask.array side, and they now operate intelligently when given numpy arrays. Changing this on the NumPy side is harder until NumPy protocols change (which is planned).

#### Future work

There are a number of things we would like to do, both in terms of measurement and for the dask-glm project itself. We welcome people to voice their opinions (and join development) on the following issues:

- Asynchronous Algorithms
- User APIs
- Extend GLM families
- Write more extensive rigorous algorithm testing - for satisfying provable optimality criteria, and for robustness to various input data
- Begin work on smart initialization routines

What is your perspective here, gentle reader? Both Matt and Chris can use help on this project. We hope that some of the issues above provide seeds for community engagement. We welcome other questions, comments, and contributions either as github issues or comments below.

## Acknowledgements

Thanks also go to Hussain Sultan (Capital One) and Tom Augspurger for collaboration on Dask-GLM and to Will Warner (Continuum) for reviewing and editing this post.

blog comments powered by Disqus