Towards Out-of-core ND-Arrays -- Dask + Toolz = Bag
tl;dr We use dask to build a parallel Python list.
This is the seventh in a sequence of posts constructing an out-of-core nd-array using NumPy and dask. You can view these posts here:
- Simple task scheduling
- Frontend usability
- A multi-threaded scheduler
- Matrix Multiply Benchmark
- Spilling to disk
- Slicing and Stacking
Today we take a break from ND-Arrays and show how task scheduling can attack
other collections like the simple
list of Python objects.
Often before we have an ndarray or a table/DataFrame we have unstructured
data like log files. In this case tuned subsets of the language (e.g.
pandas) aren’t sufficient; we need the full Python language.
My usual approach to the inconveniently large dump of log files is to use
streaming Python iterators along with multiprocessing or IPython Parallel on a
single large machine. I often write/speak about this workflow when discussing
toolz.
This workflow grows complex for most users when you introduce many processes.
To resolve this we build our normal tricks into a new dask.bag collection.
In the same way that dask.array mimics NumPy operations (e.g. matrix multiply),
dask.bag mimics functional operations like map, filter, and reduce found in the
standard library as well as many of the streaming functions found in toolz.
- Dask array = NumPy + threads
- Dask bag = Python/Toolz + processes
Here’s the obligatory wordcount example
We use all of our cores and stream through memory on each core. We use
multiprocessing but could get fancier with some work.
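A minimal sketch of the same idea using only the standard library. This is not the dask.bag API; the sample lines and the helper names `count_words` and `merge_counts` are invented for illustration.

```python
from collections import Counter
from functools import reduce

# Hypothetical input, already split into partitions (one list of lines per core).
partitions = [
    ["Alice Bob", "Bob Charlie"],
    ["Alice Alice", "Charlie Bob"],
]

def count_words(lines):
    """Stream through one partition, keeping only a small Counter in memory."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts

def merge_counts(a, b):
    """Combine the partial results of two partitions."""
    return a + b

# The builtin map runs serially. multiprocessing.Pool.map has the same shape
# and would hand one partition to each worker process.
partials = map(count_words, partitions)
word_counts = reduce(merge_counts, partials, Counter())
```

Each partition reduces to a small `Counter`, so only those small results need to travel between processes.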
There are many larger, more powerful systems with a similar API, notably Spark and DPark. If you have Big Data and need to use very many machines then you should stop reading this and go install them.
I mostly made dask.bag because
- It was very easy given the work already done on dask.array
- I often only need multiprocessing + a heavy machine
- I wanted something trivially pip installable that didn’t use the JVM
But again, if you have Big Data, then this isn’t for you.
As before, a Bag is just a dict holding tasks, along with a little metadata.
In this way we break up one collection
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
into three independent pieces
[0, 1, 2, 3, 4] [0, 1, 2, 3, 4] [0, 1, 2, 3, 4]
When we abstractly operate on the large collection…
… we generate new tasks to operate on each of the components.
And when we ask for concrete results (the call to
list) we spin up a
scheduler to execute the resulting dependency graph of tasks.
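A minimal sketch of this idea in plain Python. The key names, the `map_partition` helper, and the toy `get` function are illustrative assumptions, not dask's actual internals or scheduler.

```python
def inc(x):
    return x + 1

def map_partition(func, seq):
    """Apply func to every element of one partition."""
    return [func(item) for item in seq]

# One collection stored as three independent partitions, keyed by (name, index).
dsk = {("x", i): [0, 1, 2, 3, 4] for i in range(3)}

# Operating on the whole collection adds one task per partition.
# A task is a tuple (function, arg, ...); an arg may be another key in the dict.
dsk.update({("y", i): (map_partition, inc, ("x", i)) for i in range(3)})

def is_key(graph, arg):
    """True if arg names another entry in the graph."""
    try:
        return arg in graph
    except TypeError:  # unhashable arguments such as lists are never keys
        return False

def get(graph, key):
    """Evaluate one key serially by recursing into its dependencies."""
    value = graph[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, *args = value
        return func(*[get(graph, a) if is_key(graph, a) else a for a in args])
    return value

# Asking for concrete results walks the graph. The three tasks are independent,
# so a real scheduler could run them in parallel.
result = [item for i in range(3) for item in get(dsk, ("y", i))]
```

Nothing is computed when the tasks are added to the dict; work only happens when `get` is called on a key.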
More complex operations yield more complex dasks. Beware, dask code is pretty Lispy. Fortunately these dasks are internal; users don’t interact with them.
The current interface for
Bag has the following operations:
all, any, count, filter, fold, foldby, frequencies, join, map, map_partitions, max, mean, min, product, std, sum, topk, var
Manipulations of bags create task dependency graphs. We eventually execute these graphs in parallel.
We repurpose the threaded scheduler we used for arrays to support
multiprocessing, which provides parallelism even on pure Python code. We’re
careful to avoid unnecessary data transfer. None of the operations listed above
requires significant communication. Notably we don’t have any concept of
shuffling or scatter/gather.
These tricks remove the need for user expertise.
Productive Sweet Spot
I think that there is a productive sweet spot in the following configuration
- Pure Python functions
- Streaming/lazy data
- A single large machine or a few machines in an informal cluster
This setup is common and it’s capable of handling terabyte-scale workflows. In my brief experience people rarely take this route. They use single-threaded in-memory Python until it breaks, and then seek out Big Data Infrastructure like Hadoop/Spark at relatively high productivity overhead.
Your workstation can scale bigger than you think.
Here is about a gigabyte of network flow data, recording which computers made connections to which other computers on the UC-Berkeley campus in 1996.
846890339:661920 846890339:755475 846890340:197141 188.8.131.52:1163 184.108.40.206:80 2 8 4294967295 4294967295 846615753 176 2462 39 GET 21068906053917068819..html HTTP/1.0
846890340:989181 846890341:2147 846890341:2268 220.127.116.11:1269 18.104.22.168:80 10 0 842099997 4294967295 4294967295 64 1 38 GET 20271810743860818265..gif HTTP/1.0
846890341:80714 846890341:90331 846890341:90451 22.214.171.124:1270 126.96.36.199:80 10 0 842099995 4294967295 4294967295 64 1 38 GET 38127854093537985420..gif HTTP/1.0
This is actually relatively clean. Many of the fields (not all) are space delimited, and I’ve already compiled and run the decade-old C code needed to decompress it from its original format.
Let’s use Bag and regular expressions to parse this.
This returns instantly. We only compute results when necessary. We trigger
computation by calling list.
Because bag operates lazily this small result also returns immediately.
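A rough sketch of the parsing step in plain Python rather than dask.bag. It assumes the fourth and fifth fields are the client and server `ip:port` pairs, which is a guess from the sample lines; the names `ADDRESS` and `parse` are invented here. A generator stands in for the lazy evaluation: nothing is parsed until a record is requested.

```python
import re

# Assumption: the only dotted-quad `ip:port` tokens on a line are the client
# address followed by the server address.
ADDRESS = re.compile(r"(\d+\.\d+\.\d+\.\d+):(\d+)")

def parse(line):
    """Return the client/server addresses of one record, or None if malformed."""
    found = ADDRESS.findall(line)
    if len(found) < 2:
        return None
    (client, client_port), (server, server_port) = found[:2]
    return {
        "client": client,
        "client_port": int(client_port),
        "server": server,
        "server_port": int(server_port),
    }

# First sample record from the data shown above.
line = (
    "846890339:661920 846890339:755475 846890340:197141 "
    "188.8.131.52:1163 184.108.40.206:80 2 8 4294967295 4294967295 "
    "846615753 176 2462 39 GET 21068906053917068819..html HTTP/1.0"
)

lines = [line]
records = (parse(text) for text in lines)  # lazy: no parsing has happened yet
first = next(records)                      # parses exactly one line
```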
To demonstrate depth we find the ten client/server pairs with the most connections.
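The shape of that computation, count each pair and then keep the largest counts, can be sketched with the standard library. This mirrors what the `frequencies` and `topk` operations are for, but it is not dask.bag code, and the pairs below are made-up placeholders rather than real results.

```python
from collections import Counter
import heapq

# Hypothetical (client, server) pairs, one per connection record.
pairs = [
    ("a", "x"), ("b", "x"), ("a", "x"),
    ("a", "y"), ("b", "x"), ("a", "x"),
]

# Count how often each pair occurs.
counts = Counter(pairs)

# Keep the k pairs with the highest counts (k=2 here, 10 in the text).
top = heapq.nlargest(2, counts.items(), key=lambda item: item[1])
```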
Comparison with Spark
First, it is silly and unfair to compare with PySpark running locally. PySpark offers much more in a distributed context.
So, given a compute-bound, mostly embarrassingly parallel task (regexes are comparatively expensive) on a single machine, dask.bag and PySpark are comparable.
Reasons you would want to use Spark
- You want to use many machines and interact with HDFS
- Shuffling operations
Reasons you would want to use dask.bag
- Trivial installation
- No mucking about with JVM heap sizes or config files
- Nice error reporting. Spark error reporting includes the typical giant Java stack trace that comes with any JVM solution.
- Easier/simpler for Python programmers to hack on. The implementation is 350 lines including comments.
Again, this is really just a toy experiment to show that the dask model isn’t just about arrays. I absolutely do not want to throw Dask in the ring with Spark.
However, I do want to stress the importance of single-machine parallelism. Dask.bag targets this application well and leverages common hardware in a simple way that is both natural and accessible to most Python programmers.
A skilled developer could extend this to work in a distributed memory context. The logic to create the task dependency graphs is separate from the scheduler.
Special thanks to Erik Welch for finely crafting the dask optimization passes that keep the data flowing smoothly.