Je souhaite filtrer un Pyspark DataFrame avec une clause IN de type SQL, comme dans

sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')

a est le tuple (1, 2, 3). Je reçois cette erreur:

java.lang.RuntimeException: [1.67] échec: `` ('' attendu mais identifiant un trouvé

Ce qui signifie essentiellement qu'il s'attendait à quelque chose comme '(1, 2, 3)' au lieu d'un. Le problème est que je ne peux pas écrire manuellement les valeurs dans un car il est extrait d'un autre travail.

Comment filtrer dans ce cas?

37
mar tin 8 mars 2016 à 18:00

5 réponses

Meilleure réponse

Chaîne que vous passez à SQLContext qu'il a évaluée dans le cadre de l'environnement SQL. Il ne capture pas la fermeture. Si vous voulez passer une variable, vous devrez le faire explicitement en utilisant la mise en forme des chaînes:

df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df WHERE v IN {0}".format(("foo", "bar"))).count()
##  2 

Évidemment, ce n'est pas quelque chose que vous utiliseriez dans un "vrai" environnement SQL pour des raisons de sécurité, mais cela ne devrait pas avoir d'importance ici.

En pratique, DataFrame DSL est un choix très important lorsque vous souhaitez créer des requêtes dynamiques:

from pyspark.sql.functions import col

df.where(col("v").isin({"foo", "bar"})).count()
## 2

Il est facile à construire et à composer et gère tous les détails de HiveQL / Spark SQL pour vous.

55
zero323 9 mars 2016 à 08:49

Juste un petit ajout / mise à jour:

choice_list = ["foo", "bar", "jack", "joan"]

Si vous souhaitez filtrer votre cadre de données "df", de sorte que vous souhaitez conserver les lignes basées sur une colonne "v" en prenant uniquement les valeurs de choice_list, alors

df_filtered = df.where( ( col("v").isin (choice_list) ) )
5
Vega 27 juin 2018 à 16:05

Réitérant ce que @ zero323 a mentionné ci-dessus: nous pouvons faire la même chose en utilisant une liste également (pas seulement set) comme ci-dessous

from pyspark.sql.functions import col

df.where(col("v").isin(["foo", "bar"])).count()
20
zero323 15 mai 2017 à 21:47

Une approche légèrement différente qui a fonctionné pour moi consiste à filtrer avec une fonction de filtre personnalisée.

def filter_func(a):
"""wrapper function to pass a in udf"""
    def filter_func_(col):
    """filtering function"""
        if col in a.value:
            return True

    return False

return udf(filter_func_, BooleanType())

# Broadcasting allows to pass large variables efficiently
a = sc.broadcast((1, 2, 3))
df = my_df.filter(filter_func(a)(col('field1'))) \
0
Alex_Gidiotis 9 nov. 2018 à 15:12

Vous pouvez également le faire pour les colonnes entières:

df_filtered = df.filter("field1 in (1,2,3)")

Ou ceci pour les colonnes de chaînes:

df_filtered = df.filter("field1 in ('a','b','c')")
2
BICube 5 juin 2019 à 22:20