Cette question rejoint ma question précédente. la base de données pyspark agrège une colonne par fenêtre temporelle glissante

Mais, je voudrais créer un post afin de clarifier certains points clés qui manquent dans ma question précédente.

La base de données d'origine :

client_id    value1    name1    a_date
 dhd         561       ecdu     2019-10-8
 dhd         561       tygp     2019-10-8  
 dhd         561       rdsr     2019-10-8
 dhd         561       rgvd     2019-8-12
 dhd         561       bhnd     2019-8-12
 dhd         561       prti     2019-8-12
 dhd         561       teuq     2019-5-7
 dhd         561       wnva     2019-5-7
 dhd         561       pqhn     2019-5-7

J'ai besoin de trouver les valeurs de "name1" pour chaque "client_id", chaque "value1" et pour une fenêtre temporelle glissante donnée.

J'ai défini une fonction fenêtre :

 w = window().partitionBy("client_id", "value1").orderBy("a_date")

Mais je ne sais pas comment sélectionner les valeurs de "name1" pour la taille de fenêtre de 1, 2, 6, 9, 12.

Ici, la taille de la fenêtre signifie que la longueur du mois à partir du mois en cours de "a_date".

Par exemple.

 client_id     value1    names1_within_window_size_1  names1_within_window_size_2
  dhd           561       [ecdu,tygp,rdsr]             [ecdu,tygp,rdsr]   

  names1_within_window_size_6
  [ecdu,tygp,rdsr, rgvd,bhnd,prti, teuq, wnva,pqhn ]  

 names1_within_window_size_1   : the month window 2019-10
 names1_within_window_size_2    : the month window 2019-10 and 2019-9 (no data in 2019-9 so just keep the data from 2019-10)
 names1_within_window_size_6    : the month window 2019-10 and 2019-9 (no data in 2019-9 so just keep the data from 2019-10) but there are data in 2019-8

Merci

============================================= MISE À JOUR

from pyspark.sql import functions as F
from pyspark.sql.window import Window

data=  [['dhd',589,'ecdu','2020-1-5'],
    ['dhd',575,'tygp','2020-1-5'],  
    ['dhd',821,'rdsr','2020-1-5'],
    ['dhd',872,'rgvd','2019-12-1'],
    ['dhd',619,'bhnd','2019-12-15'],
    ['dhd',781,'prti','2019-12-18'],
    ['dhd',781,'prti1','2019-12-18'],
    ['dhd',781,'prti2','2019-11-18'],
    ['dhd',781,'prti3','2019-10-31'],
    ['dhd',781,'prti4','2019-09-30'],
    ['dhd',781,'prt1','2019-07-31'],
    ['dhd',781,'pr4','2019-06-30'],
    ['dhd',781,'pr2','2019-08-31'],
    ['dhd',781,'prt4','2019-01-31'],
    ['dhd',781,'prti6','2019-02-28'],
    ['dhd',781,'prti7','2019-02-02'],
    ['dhd',781,'prti8','2019-03-29'],
    ['dhd',781,'prti9','2019-04-29'],
    ['dhd',781,'prti10','2019-05-04'],
    ['dhd',781,'prti11','2019-03-01'],
    ['dhd',781,'prti12','2018-12-17'],
    ['dhd',781,'prti15','2018-11-21'],
    ['dhd',781,'prti17','2018-10-31'],
    ['dhd',781,'prti19','2018-09-5']

   ]
columns= ['client_id','value1','name1','a_date']

df= spark.createDataFrame(data,columns)

df2 = df.withColumn("year_val", F.year("a_date"))\
    .withColumn("month_val", F.month("a_date"))\
    .withColumn("year_month", F.year(F.col("a_date")) * 100 + 
    F.month(F.col("a_date")))\
    .groupBy("client_id", "value1", "year_month")\
    .agg(F.concat_ws(", ", F.collect_list("name1")).alias("init_list"))

 df2.sort(F.col("value1").desc(), F.col("year_month").desc()).show()

 w = Window().partitionBy("client_id", "value1")\
    .orderBy("year_month")
df4 = df2.withColumn("a_rank", F.dense_rank().over(w))
df4.sort(F.col("value1"), F.col("year_month")).show()


month_range = 3
w = Window().partitionBy("client_id", "value1")\
    .orderBy("a_rank")\
    .rangeBetween(-(month_range),0)

 df5 = df4.withColumn("last_" + str(month_range) + "_month", F.collect_list(F.col("init_list")).over(w))\
    .orderBy("value1", "a_rank")

 df6 = df5.sort(F.col("value1").desc(), F.col("year_month").desc())
 df6.show(100,False)
0
user3448011 11 févr. 2020 à 04:16

1 réponse

Meilleure réponse

J'ai volé des données de votre question précédente pour cela car j'étais trop paresseux pour le faire moi-même et un gars sympa avait créé la liste des données d'entrée là-bas.

Alors que la fenêtre glisse sur le nombre d'enregistrements, plutôt que sur le nombre de mois, j'ai combiné tous les enregistrements pour un mois donné (regroupés par client_id et value1, bien sûr) dans un seul enregistrement dans .groupBy("client_id", "value1", "year_val", "month_val") qui est présent dans le calcul pour df2

from pyspark.sql import functions as F
from pyspark.sql.window import Window

data=  [['dhd',589,'ecdu','2020-1-5'],
        ['dhd',575,'tygp','2020-1-5'],  
        ['dhd',821,'rdsr','2020-1-5'],
        ['dhd',872,'rgvd','2019-12-1'],
        ['dhd',619,'bhnd','2019-12-15'],
        ['dhd',781,'prti','2019-12-18'],
        ['dhd',781,'prti1','2019-12-18'],
        ['dhd',781,'prti2','2019-11-18'],
        ['dhd',781,'prti3','2019-10-31'],
        ['dhd',781,'prti4','2019-09-30'],
        ['dhd',781,'prt1','2019-07-31'],
        ['dhd',781,'pr4','2019-06-30'],
        ['dhd',781,'pr2','2019-08-31'],
        ['dhd',781,'prt4','2019-01-31'],
        ['dhd',781,'prti6','2019-02-28'],
        ['dhd',781,'prti7','2019-02-02'],
        ['dhd',781,'prti8','2019-03-29'],
        ['dhd',781,'prti9','2019-04-29'],
        ['dhd',781,'prti10','2019-05-04'],
        ['dhd',781,'prti11','2019-03-01']]
columns= ['client_id','value1','name1','a_date']

df= spark.createDataFrame(data,columns)

df2 = df.withColumn("year_val", F.year("a_date"))\
        .withColumn("month_val", F.month("a_date"))\
        .groupBy("client_id", "value1", "year_val", "month_val")\
        .agg(F.concat_ws(", ", F.collect_list("name1")).alias("init_list"))

df2.show()

Ici, nous obtenons init_list comme :

+---------+------+--------+---------+-------------+
|client_id|value1|year_val|month_val|    init_list|
+---------+------+--------+---------+-------------+
|      dhd|   781|    2019|       12|  prti, prti1|
|      dhd|   589|    2020|        1|         ecdu|
|      dhd|   781|    2019|        8|          pr2|
|      dhd|   781|    2019|        3|prti8, prti11|
|      dhd|   575|    2020|        1|         tygp|
|      dhd|   781|    2019|        5|       prti10|
|      dhd|   781|    2019|        9|        prti4|
|      dhd|   781|    2019|       11|        prti2|
|      dhd|   781|    2019|       10|        prti3|
|      dhd|   821|    2020|        1|         rdsr|
|      dhd|   781|    2019|        6|          pr4|
|      dhd|   619|    2019|       12|         bhnd|
|      dhd|   781|    2019|        7|         prt1|
|      dhd|   781|    2019|        4|        prti9|
|      dhd|   781|    2019|        1|         prt4|
|      dhd|   781|    2019|        2| prti6, prti7|
|      dhd|   872|    2019|       12|         rgvd|
+---------+------+--------+---------+-------------+

En utilisant cela, nous pouvons obtenir le résultat final en exécutant simplement la fenêtre sur les enregistrements :

month_range = 6
w = Window().partitionBy("client_id", "value1")\
        .orderBy("month_val")\
        .rangeBetween(-(month_range+1),0)

df3 = df2.withColumn("last_0_month", F.collect_list(F.col("init_list")).over(w))\
        .orderBy("value1", "year_val", "month_val")

df3.show(100,False)

Ce qui nous donne:

+---------+------+--------+---------+-------------+-------------------------------------------------------------------+
|client_id|value1|year_val|month_val|init_list    |last_0_month                                                       |
+---------+------+--------+---------+-------------+-------------------------------------------------------------------+
|dhd      |575   |2020    |1        |tygp         |[tygp]                                                             |
|dhd      |589   |2020    |1        |ecdu         |[ecdu]                                                             |
|dhd      |619   |2019    |12       |bhnd         |[bhnd]                                                             |
|dhd      |781   |2019    |1        |prt4         |[prt4]                                                             |
|dhd      |781   |2019    |2        |prti6, prti7 |[prt4, prti6, prti7]                                               |
|dhd      |781   |2019    |3        |prti8, prti11|[prt4, prti6, prti7, prti8, prti11]                                |
|dhd      |781   |2019    |4        |prti9        |[prt4, prti6, prti7, prti8, prti11, prti9]                         |
|dhd      |781   |2019    |5        |prti10       |[prt4, prti6, prti7, prti8, prti11, prti9, prti10]                 |
|dhd      |781   |2019    |6        |pr4          |[prt4, prti6, prti7, prti8, prti11, prti9, prti10, pr4]            |
|dhd      |781   |2019    |7        |prt1         |[prt4, prti6, prti7, prti8, prti11, prti9, prti10, pr4, prt1]      |
|dhd      |781   |2019    |8        |pr2          |[prt4, prti6, prti7, prti8, prti11, prti9, prti10, pr4, prt1, pr2] |
|dhd      |781   |2019    |9        |prti4        |[prti6, prti7, prti8, prti11, prti9, prti10, pr4, prt1, pr2, prti4]|
|dhd      |781   |2019    |10       |prti3        |[prti8, prti11, prti9, prti10, pr4, prt1, pr2, prti4, prti3]       |
|dhd      |781   |2019    |11       |prti2        |[prti9, prti10, pr4, prt1, pr2, prti4, prti3, prti2]               |
|dhd      |781   |2019    |12       |prti, prti1  |[prti10, pr4, prt1, pr2, prti4, prti3, prti2, prti, prti1]         |
|dhd      |821   |2020    |1        |rdsr         |[rdsr]                                                             |
|dhd      |872   |2019    |12       |rgvd         |[rgvd]                                                             |
+---------+------+--------+---------+-------------+-------------------------------------------------------------------+

Limitations:

Malheureusement, dans la deuxième partie, le champ a_date est perdu et pour les opérations de fenêtre coulissante avec une plage définie dessus, le orderBy ne peut pas spécifier plusieurs colonnes (notez que orderBy dans la définition de fenêtre est uniquement sur month_val). Pour cette raison, cette solution exacte ne fonctionnera pas pour les données couvrant plusieurs années. Cependant, cela peut être facilement surmonté en ayant quelque chose comme un month_id en tant que colonne unique combinant les valeurs de l'année et du mois, puis en l'utilisant dans la clause orderBy.

Si vous souhaitez avoir plusieurs fenêtres, vous pouvez convertir month_range en une liste et la parcourir dans le dernier extrait de code pour couvrir toutes les plages.

Bien que la dernière colonne (last_0_month) ressemble à un tableau, elle contient des chaînes séparées par des virgules de l'opération précédente agg. Vous voudrez peut-être aussi le nettoyer.

1
xenodevil 11 févr. 2020 à 07:49