This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project

A screencast version of this post is available here: https://www.youtube.com/watch?v=FkPlEqB8AnE

TL;DR.

Dask.distributed lets you submit individual tasks to the cluster. We use this ability combined with Scikit Learn to train and run a distributed random forest on distributed tabular NYC Taxi data.

Our machine learning model does not perform well, but we do learn how to execute ad-hoc computations easily.

Motivation

In the past few posts we analyzed data on a cluster with Dask collections:

  1. Dask.bag on JSON records
  2. Dask.dataframe on CSV data
  3. Dask.array on HDF5 data

Often our computations don’t fit neatly into the bag, dataframe, or array abstractions. In these cases we want the flexibility of normal code with for loops, but still with the computational power of a cluster. With the dask.distributed task interface, we achieve something close to this.

Application: Naive Distributed Random Forest Algorithm

As a motivating application we build a random forest algorithm from the ground up using the single-machine Scikit Learn library, and dask.distributed’s ability to quickly submit individual tasks to run on the cluster. Our algorithm will look like the following:

  1. Pull data from some external source (S3) into several dataframes on the cluster
  2. For each dataframe, create and train one RandomForestClassifier
  3. Scatter single testing dataframe to all machines
  4. For each RandomForestClassifier predict output on test dataframe
  5. Aggregate independent predictions from each classifier together by a majority vote. To avoid bringing too much data to any one machine, perform this majority vote as a tree reduction.

Data: NYC Taxi 2015

As in our blogpost on distributed dataframes we use the data on all NYC Taxi rides in 2015. This is around 20GB on disk and 60GB in RAM.

We predict the number of passengers in each cab given the other numeric columns like pickup and destination location, fare breakdown, distance, etc..

We do this first on a small bit of data on a single machine and then on the entire dataset on the cluster. Our cluster is composed of twelve m4.xlarges (4 cores, 15GB RAM each).

Disclaimer and Spoiler Alert: I am not an expert in machine learning. Our algorithm will perform very poorly. If you’re excited about machine learning you can stop reading here. However, if you’re interested in how to build distributed algorithms with Dask then you may want to read on, especially if you happen to know enough machine learning to improve upon my naive solution.

API: submit, map, gather

We use a small number of dask.distributed functions to build our computation:

futures = executor.scatter(data)                     # scatter data
future = executor.submit(function, *args, **kwargs)  # submit single task
futures = executor.map(function, sequence)           # submit many tasks
results = executor.gather(futures)                   # gather results
executor.replicate(futures, n=number_of_replications)

In particular, functions like executor.submit(function, *args) let us send individual functions out to our cluster thousands of times a second. Because these functions consume their own results we can create complex workflows that stay entirely on the cluster and trust the distributed scheduler to move data around intelligently.

Load Pandas from S3

First we load data from Amazon S3. We use the s3.read_csv(..., collection=False) function to load 178 Pandas DataFrames on our cluster from CSV data on S3. We get back a list of Future objects that refer to these remote dataframes. The use of collection=False gives us this list of futures rather than a single cohesive Dask.dataframe object.

from distributed import Executor, s3
e = Executor('52.91.1.177:8786')

dfs = s3.read_csv('dask-data/nyc-taxi/2015',
                  parse_dates=['tpep_pickup_datetime',
                               'tpep_dropoff_datetime'],
                  collection=False)
dfs = e.compute(dfs)

Each of these is a lightweight Future pointing to a pandas.DataFrame on the cluster.

>>> dfs[:5]
[<Future: status: finished, type: DataFrame, key: finalize-a06c3dd25769f434978fa27d5a4cf24b>,
 <Future: status: finished, type: DataFrame, key: finalize-7dcb27364a8701f45cb02d2fe034728a>,
 <Future: status: finished, type: DataFrame, key: finalize-b0dfe075000bd59c3a90bfdf89a990da>,
 <Future: status: finished, type: DataFrame, key: finalize-1c9bb25cefa1b892fac9b48c0aef7e04>,
 <Future: status: finished, type: DataFrame, key: finalize-c8254256b09ae287badca3cf6d9e3142>]

If we’re willing to wait a bit then we can pull data from any future back to our local process using the .result() method. We don’t want to do this too much though, data transfer can be expensive and we can’t hold the entire dataset in the memory of a single machine. Here we just bring back one of the dataframes:

>>> df = dfs[0].result()
>>> df.head()
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
0 2 2015-01-15 19:05:39 2015-01-15 19:23:42 1 1.59 -73.993896 40.750111 1 N -73.974785 40.750618 1 12.0 1.0 0.5 3.25 0 0.3 17.05
1 1 2015-01-10 20:33:38 2015-01-10 20:53:28 1 3.30 -74.001648 40.724243 1 N -73.994415 40.759109 1 14.5 0.5 0.5 2.00 0 0.3 17.80
2 1 2015-01-10 20:33:38 2015-01-10 20:43:41 1 1.80 -73.963341 40.802788 1 N -73.951820 40.824413 2 9.5 0.5 0.5 0.00 0 0.3 10.80
3 1 2015-01-10 20:33:39 2015-01-10 20:35:31 1 0.50 -74.009087 40.713818 1 N -74.004326 40.719986 2 3.5 0.5 0.5 0.00 0 0.3 4.80
4 1 2015-01-10 20:33:39 2015-01-10 20:52:58 1 3.00 -73.971176 40.762428 1 N -74.004181 40.742653 2 15.0 0.5 0.5 0.00 0 0.3 16.30

Train on a single machine

To start lets go through the standard Scikit Learn fit/predict/score cycle with this small bit of data on a single machine.

from sklearn.ensemble import RandomForestClassifier
from sklearn.cross_validation import train_test_split

df_train, df_test = train_test_split(df)

columns = ['trip_distance', 'pickup_longitude', 'pickup_latitude',
           'dropoff_longitude', 'dropoff_latitude', 'payment_type',
           'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount']

est = RandomForestClassifier(n_estimators=4)
est.fit(df_train[columns], df_train.passenger_count)

This builds a RandomForestClassifer with four decision trees and then trains it against the numeric columns in the data, trying to predict the passenger_count column. It takes around 10 seconds to train on a single core. We now see how well we do on the holdout testing data:

>>> est.score(df_test[columns], df_test.passenger_count)
0.65808188654721012

This 65% accuracy is actually pretty poor. About 70% of the rides in NYC have a single passenger, so the model of “always guess one” would out-perform our fancy random forest.

>>> from sklearn.metrics import accuracy_score
>>> import numpy as np
>>> accuracy_score(df_test.passenger_count,
...                np.ones_like(df_test.passenger_count))
0.70669390028780987

This is where my ignorance in machine learning really kills us. There is likely a simple way to improve this. However, because I’m more interested in showing how to build distributed computations with Dask than in actually doing machine learning I’m going to go ahead with this naive approach. Spoiler alert: we’re going to do a lot of computation and still not beat the “always guess one” strategy.

Fit across the cluster with executor.map

First we build a function that does just what we did before, builds a random forest and then trains it on a dataframe.

def fit(df):
    est = RandomForestClassifier(n_estimators=4)
    est.fit(df[columns], df.passenger_count)
    return est

Second we call this function on all of our training dataframes on the cluster using the standard e.map(function, sequence) function. This sends out many small tasks for the cluster to run. We use all but the last dataframe for training data and hold out the last dataframe for testing. There are more principled ways to do this, but again we’re going to charge ahead here.

train = dfs[:-1]
test = dfs[-1]

estimators = e.map(fit, train)

This takes around two minutes to train on all of the 177 dataframes and now we have 177 independent estimators, each capable of guessing how many passengers a particular ride had. There is relatively little overhead in this computation.

Predict on testing data

Recall that we kept separate a future, test, that points to a Pandas dataframe on the cluster that was not used to train any of our 177 estimators. We’re going to replicate this dataframe across all workers on the cluster and then ask each estimator to predict the number of passengers for each ride in this dataset.

e.replicate([test], n=48)

def predict(est, X):
    return est.predict(X[columns])

predictions = [e.submit(predict, est, test) for est in estimators]

Here we used the executor.submit(function, *args, **kwrags) function in a list comprehension to individually launch many tasks. The scheduler determines when and where to run these tasks for optimal computation time and minimal data transfer. As with all functions, this returns futures that we can use to collect data if we want in the future.

Developers note: we explicitly replicate here in order to take advantage of efficient tree-broadcasting algorithms. This is purely a performance consideration, everything would have worked fine without this, but the explicit broadcast turns a 30s communication+computation into a 2s communication+computation.

Aggregate predictions by majority vote

For each estimator we now have an independent prediction of the passenger counts for all of the rides in our test data. In other words for each ride we have 177 different opinions on how many passengers were in the cab. By averaging these opinions together we hope to achieve a more accurate consensus opinion.

For example, consider the first four prediction arrays:

>>> a_few_predictions = e.gather(predictions[:4])  # remote futures -> local arrays
>>> a_few_predictions
[array([1, 2, 1, ..., 2, 2, 1]),
 array([1, 1, 1, ..., 1, 1, 1]),
 array([2, 1, 1, ..., 1, 1, 1]),
 array([1, 1, 1, ..., 1, 1, 1])]

For the first ride/column we see that three of the four predictions are for a single passenger while one prediction disagrees and is for two passengers. We create a consensus opinion by taking the mode of the stacked arrays:

from scipy.stats import mode
import numpy as np

def mymode(*arrays):
    array = np.stack(arrays, axis=0)
    return mode(array)[0][0]

>>> mymode(*a_few_predictions)
array([1, 1, 1, ..., 1, 1, 1])

And so when we average these four prediction arrays together we see that the majority opinion of one passenger dominates for all of the six rides visible here.

Tree Reduction

We could call our mymode function on all of our predictions like this:

>>> mode_prediction = e.submit(mymode, *predictions)  # this doesn't scale well

Unfortunately this would move all of our results to a single machine to compute the mode there. This might swamp that single machine.

Instead we batch our predictions into groups of size 10, average each group, and then repeat the process with the smaller set of predictions until we have only one left. This sort of multi-step reduction is called a tree reduction. We can write it up with a couple nested loops and executor.submit. This is only an approximation of the mode, but it’s a much more scalable computation. This finishes in about 1.5 seconds.

from toolz import partition_all

while len(predictions) > 1:
    predictions = [e.submit(mymode, *chunk)
                   for chunk in partition_all(10, predictions)]

result = e.gather(predictions)[0]

>>> result
array([1, 1, 1, ..., 1, 1, 1])

Final Score

Finally, after completing all of our work on our cluster we can see how well our distributed random forest algorithm does.

>>> accuracy_score(result, test.result().passenger_count)
0.67061974451423045

Still worse than the naive “always guess one” strategy. This just goes to show that, no matter how sophisticated your Big Data solution is, there is no substitute for common sense and a little bit of domain expertise.

What didn’t work

As always I’ll have a section like this that honestly says what doesn’t work well and what I would have done with more time.

  • Clearly this would have benefited from more machine learning knowledge. What would have been a good approach for this problem?
  • I’ve been thinking a bit about memory management of replicated data on the cluster. In this exercise we specifically replicated out the test data. Everything would have worked fine without this step but it would have been much slower as every worker gathered data from the single worker that originally had the test dataframe. Replicating data is great until you start filling up distributed RAM. It will be interesting to think of policies about when to start cleaning up redundant data and when to keep it around.
  • Several people from both open source users and Continuum customers have asked about a general Dask library for machine learning, something akin to Spark’s MLlib. Ideally a future Dask.learn module would leverage Scikit-Learn in the same way that Dask.dataframe leverages Pandas. It’s not clear how to cleanly break up and parallelize Scikit-Learn algorithms.

Conclusion

This blogpost gives a concrete example using basic task submission with executor.map and executor.submit to build a non-trivial computation. This approach is straightforward and not restrictive. Personally this interface excites me more than collections like Dask.dataframe; there is a lot of freedom in arbitrary task submission.


blog comments powered by Disqus