J'ai récemment trouvé le module dask qui vise à être un module de traitement parallèle Python facile à utiliser. Le grand argument de vente pour moi est que cela fonctionne avec les pandas.

Après avoir lu un peu sa page de manuel, je ne trouve pas de moyen de faire cette tâche trivialement parallélisable:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply

Pour le moment, pour y parvenir en dask, AFAIK,

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame

Qui est une syntaxe laide et est en fait plus lente que pure et simple

df.apply(func, axis = 1) # for pandas DF row apply

Toute suggestion?

Edit: Merci @MRocklin pour la fonction de carte. Cela semble être plus lent que les pandas ordinaires. Est-ce lié au problème de publication de pandas GIL ou est-ce que je le fais mal?

import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
49
jf328 11 juil. 2015 à 23:52

2 réponses

Meilleure réponse

map_partitions

Vous pouvez appliquer votre fonction à toutes les partitions de votre dataframe avec la fonction map_partitions.

df.map_partitions(func, columns=...)

Notez que func ne recevra qu'une partie de l'ensemble de données à la fois, pas l'ensemble de données complet comme avec pandas apply (ce que vous ne voudriez probablement pas si vous voulez faire du parallélisme.)

map / apply

Vous pouvez mapper une fonction par ligne sur une série avec map

df.mycolumn.map(func)

Vous pouvez mapper une fonction par ligne sur une trame de données avec apply

df.apply(func, axis=1)

Threads vs processus

Depuis la version 0.6.0 dask.dataframes parallélise avec les threads. Les fonctions Python personnalisées ne bénéficieront pas beaucoup du parallélisme basé sur les threads. Vous pouvez essayer des processus à la place

df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')

Mais évitez apply

Cependant, vous devriez vraiment éviter apply avec les fonctions Python personnalisées, à la fois dans Pandas et dans Dask. Ceci est souvent une source de mauvaises performances. Il se peut que si vous trouvez un moyen de faire votre opération de manière vectorisée, il se peut que votre code Pandas soit 100 fois plus rapide et que vous n'ayez pas du tout besoin de dask.dataframe.

Considérez numba

Pour votre problème particulier, vous pouvez envisager numba. Cela améliore considérablement vos performances.

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms

Clause de non-responsabilité, je travaille pour la société qui fabrique à la fois numba et dask et emploie de nombreux développeurs pandas.

61
MRocklin 16 mars 2019 à 19:07

À partir de v dask.dataframe. Appliquer la responsabilité des délégués à map_partitions:

@insert_meta_param_description(pad=12)
def apply(self, func, convert_dtype=True, meta=no_default, args=(), **kwds):
    """ Parallel version of pandas.Series.apply
    ...
    """
    if meta is no_default:
        msg = ("`meta` is not specified, inferred from partial data. "
               "Please provide `meta` if the result is unexpected.\n"
               "  Before: .apply(func)\n"
               "  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
               "  or:     .apply(func, meta=('x', 'f8'))            for series result")
        warnings.warn(msg)

        meta = _emulate(M.apply, self._meta_nonempty, func,
                        convert_dtype=convert_dtype,
                        args=args, **kwds)

    return map_partitions(M.apply, self, func,
                          convert_dtype, args, meta=meta, **kwds)
2
Shubham Chaudhary 30 juin 2017 à 04:30