xref #5751
Questions from SO; mrocklin's nice example of using .apply.
So here is an example of how to do a parallel apply using dask. This could be baked into .apply() in pandas via the following signature enhancement:
current:
DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
args=(), **kwds)
proposed:
DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
engine=None, chunksize=None, args=(), **kwds)
where engine='dask' (or numba at some point) are possibilities. chunksize would map directly to npartitions and would default to the number of cores if not specified. Further, engine could be allowed to be a meta object like Dask(scheduler='multiprocessing') to support other options one would commonly pass (chunksize could also move inside that object instead of being a separate keyword). A rough sketch of the dispatch follows.
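Something like this, totally hypothetically (apply_with_engine and its engine handling are an illustration, not an existing pandas API):

import multiprocessing as mp

import dask.dataframe as dd

def apply_with_engine(df, func, axis=0, engine=None, chunksize=None, **kwds):
    # hypothetical sketch of the proposed dispatch -- not actual pandas code
    if engine is None:
        # current behavior: plain pandas apply
        return df.apply(func, axis=axis, **kwds)
    if engine == 'dask':
        # chunksize maps directly to npartitions,
        # defaulting to the number of cores
        npartitions = chunksize or mp.cpu_count()
        ddf = dd.from_pandas(df, npartitions=npartitions, sort=False)
        return ddf.apply(func, axis=axis, **kwds).compute()
    raise ValueError("unknown engine: %r" % engine)

so that df.apply(slow_func, axis=1, engine='dask') would take the dask path above.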
Implementation and timings:
from functools import partial
from time import sleep

import pandas as pd
import dask
import dask.dataframe as dd
from dask import threaded, multiprocessing

pd.__version__
dask.__version__

def make_frame(N):
    return pd.DataFrame({'A': range(N)})

def slow_func(x):
    # simulate a non-trivial per-row computation
    sleep(0.5)
    return x

df = make_frame(40)

# regular pandas apply
def f1(df):
    return df.apply(slow_func, axis=1)

# dask apply: convert, apply in parallel on 8 partitions, convert back
# (columns= is the dask-0.8-era keyword; later versions use meta=)
def f2(df, get):
    ddf = dd.from_pandas(df, npartitions=8, sort=False)
    return ddf.apply(slow_func, columns=df.columns, axis=1).compute(get=get)

f1 = partial(f1, df)
f2_threaded = partial(f2, df, threaded.get)
f2_multi = partial(f2, df, multiprocessing.get)

result1 = f1()
result2 = f2_threaded()
result3 = f2_multi()
In [18]: result1.equals(result2)
Out[18]: True
In [19]: result1.equals(result3)
Out[19]: True
In [20]: %timeit -n 1 -r 1 f1()
1 loop, best of 1: 20.6 s per loop
In [21]: %timeit -n 1 -r 1 f2_threaded()
1 loop, best of 1: 3.03 s per loop
In [22]: %timeit -n 1 -r 1 f2_multi()
1 loop, best of 1: 3.07 s per loop
Now for some caveats.
People often want to parallelize a poor implementation. Generally you should proceed through the following steps first (a quick vectorization example follows the list):
- get your problem correct; optimizing incorrect results is useless
- profile, profile, profile; this is always the first thing to do
- use built-in pandas / numpy vectorized routines
- use cython or numba on the user-defined function
- .apply is always the last choice
- if it's still not enough, parallelization
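As a quick illustration of the vectorize-first point (a generic example, not from this thread):

import numpy as np
import pandas as pd

df = pd.DataFrame({'A': np.arange(100000)})

# .apply is JUST A FOR LOOP: one Python-level call per row
slow = df.apply(lambda row: row['A'] * 2 + 1, axis=1)

# the vectorized equivalent operates on the whole column at C speed
fast = df['A'] * 2 + 1

assert (slow == fast).all()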
You always want to make code simpler, not more complex. It's hard to know a priori where the bottlenecks are. People think .apply is some magical thing; it's NOT, it's JUST A FOR LOOP. The problem is that people tend to throw in the kitchen sink, which is just a terrible idea.
OK, my 2c about optimizing things.
In order for parallelization to actually matter, the function you are computing should take a non-trivial amount of time relative to:
- the iteration costs of the loop
- serialization time (especially if using multi-processing / distributed computing)
- whether the function releases the GIL; if not, threading will probably not help much
- development resources (your time)
If these criteria are met, then sure, give it a try. A small demonstration of the GIL point follows.
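To make the GIL point concrete (a generic sketch, not from the original thread): with a thread pool, a function that releases the GIL (like sleep, or most numpy routines) overlaps across threads, while a pure-Python loop serializes.

from concurrent.futures import ThreadPoolExecutor
from time import sleep, time

def releases_gil(x):
    sleep(0.01)  # sleep drops the GIL, so threads overlap
    return x

def holds_gil(x):
    total = 0
    for i in range(200000):  # pure-Python bytecode holds the GIL
        total += i
    return x

for func in (releases_gil, holds_gil):
    start = time()
    with ThreadPoolExecutor(max_workers=8) as pool:
        list(pool.map(func, range(64)))
    print(func.__name__, round(time() - start, 3))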
I think providing pandas a first-class way to parallelize things, even though people will often use it naively, is probably not a bad thing.
Further extensions to this are: to_dask() (returning a dask.dataframe to the user directly), and engine='dask' syntax for .groupby().
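The hypothetical to_dask() would essentially be sugar over what we already do by hand (the method name is the proposal here, not an existing pandas API):

import dask.dataframe as dd

# proposed: ddf = df.to_dask(npartitions=8)
# today's equivalent:
ddf = dd.from_pandas(df, npartitions=8, sort=False)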
Comment From: jreback
cc @michaelaye cc @jseabold @jorisvandenbossche @sinhrks @TomAugspurger @shoyer cc @mrocklin @wesm
Comment From: jreback
cc @mcg1969
Comment From: mcg1969
I too worry about premature parallelization, but I'm not sure how you prevent that. I think I'd rather try to find ways to encourage Numba compilation (and if there are missing Numba features preventing that from being effective, address them). At least then you could engage target='parallel' and get a multicore apply.
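For reference, a rough sketch of what the Numba parallel target looks like (numba.vectorize with target='parallel' requires explicit signatures; wiring it into pandas below is our own glue, not a pandas feature):

import numpy as np
import pandas as pd
from numba import vectorize

@vectorize(['float64(float64)'], target='parallel')
def udf(x):
    # elementwise kernel compiled by numba, run across cores
    return x * 2.0 + 1.0

df = pd.DataFrame({'A': np.arange(1e6)})
result = pd.Series(udf(df['A'].values), index=df.index)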
Comment From: mrocklin
As a reminder, here is the recipe for a simple convert-apply-convert with minimal overhead, for dask > 0.8.2:
out = dd.from_pandas(df, npartitions=..., sort=False, name='x').apply(udf).compute()
Comment From: rtkaleta
For those also wondering, the dd above comes from import dask.dataframe as dd.
You must also pass axis=1 in the call to apply, because at present dask only supports applying functions to each row. Consider specifying an appropriate meta too.
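Putting the recipe and these corrections together (the udf and meta below are placeholders; meta should describe your function's actual output):

import pandas as pd
import dask.dataframe as dd

def udf(row):
    return row  # stand-in user-defined function

df = pd.DataFrame({'A': range(40)})
out = (dd.from_pandas(df, npartitions=8, sort=False)
         .apply(udf, axis=1, meta=df)  # meta describes the output
         .compute())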
Comment From: enlighter
What kind of effort would be required, exactly, to implement this feature?
I think a fairly detailed answer to that question would help everyone concerned in the community.
Comment From: TomAugspurger
@enlighter does the dask-based solution work for you? Properly supporting this in pandas would be a decent amount of effort. Dask already takes care of much of that.
Comment From: jreback
Closing this for now. If a specific solution is wanted by the community, a new issue can be opened for that.