Je suis très nouveau en étincelle et j'en suis encore à mes premiers tests avec. J'ai installé un seul nœud et je l'utilise comme maître sur un serveur décent exécutant:
pyspark --master local[20]
Et bien sûr, je rencontre des difficultés avec mes premiers pas avec pyspark.
J'ai un fichier CSV de 40 Go et environ 300 millions de lignes dessus. Ce que je veux faire, c'est de trouver le moyen le plus rapide de fractionner ce fichier et d'en faire de petits paquets et de les stocker également sous forme de fichiers CSV. Pour cela, j'ai deux scénarios:
Premier. Fractionnez le fichier sans aucun critère. Il suffit de le diviser également en disons 100 pièces (3 millions de lignes chacune).
Deuxième. Les données CSV que je charge sont tabulaires et j'ai une colonne X avec 100 000 ID différents. Ce que j'aimerais faire, c'est créer un ensemble de dictionnaires et créer de plus petits morceaux de fichiers CSV où mes dictionnaires me diront dans quel paquet chaque ligne doit aller.
Jusqu'à présent, c'est là que je suis maintenant:
sc=SparkContext.getOrCreate()
file_1 = r'D:\PATH\TOFILE\data.csv'
sdf = spark.read.option("header","true").csv(file_1, sep=";", encoding='cp1252')
Merci de votre aide!
3 réponses
La meilleure façon (et probablement "la plus rapide") de le faire serait de profiter du partitionnement intégré des RDD par Spark et d'écrire dans un fichier CSV à partir de chaque partition. Vous pouvez repartition
ou coalesce
pour créer le nombre souhaité de partitions (disons, 100) que vous souhaitez. Cela vous donnera un parallélisme maximal (en fonction de vos ressources et configurations de cluster) car chaque Spark Executor travaille sur la tâche sur une partition à la fois.
Vous pouvez effectuer l'une de ces actions:
Faites un
mapPartition
sur le Dataframe et écrivez chaquepartition
dans un fichier CSV unique.OU
df.write.partitionBy("X").csv('mycsv.csv')
, qui créera une partition (et donc un fichier) par entrée unique dans"X"
Remarque. Si vous utilisez HDFS pour stocker vos fichiers CSV, Spark créera automatiquement plusieurs fichiers pour stocker les différentes partitions (nombre de fichiers créés = nombre de partitions RDD).
Ce que j'ai finalement fait, c'est de charger les données en tant que trame de données spark et spark crée automatiquement des partitions de taille égale de 128 Mo (configuration par défaut de la ruche), puis j'ai utilisé la méthode repartition
pour redistribuer mes lignes en fonction des valeurs d'un fichier spécifique. colonne sur mon dataframe.
# This will load my CSV data on a spark dataframe and will generate the requiered amount of 128MB partitions to store my raw data.
sdf = spark.read.option('header','true').csv(file_1, sep=';', encoding='utf-8')
# This line will redistribute the rows of each paritition according the values on a specific column. Here I'm placing all rows with the same set of values on the same partition and I'm creating 20 of them. (Sparks handle to allocate the rows so the partitions will be the same size)
sdf_2 = sdf.repartition(20, 'TARGET_COLUMN')
# This line will save all my 20 partitions on different csv files
sdf_2.write.saveAsTable('CSVBuckets', format='csv', sep=';', mode='overwrite', path=output_path, header='True')
Vous pouvez charger des fichiers csv de grande taille avec Pandas de manière efficace. Quelqu'un a effectué une comparaison sur stackexchange. Vous pouvez charger des données dans des blocs de données, puis diviser des blocs de données à de petits morceaux avec Numpy. Vous pouvez également enregistrer de nouvelles trames de données en tant que fichier csv avec Pandas.
Exemples d'extraits de code:
import pandas as pd
import numpy as np
Chargement du fichier csv dans la trame de données
df = pd.read_csv('YOUR_CSV_FILE_PATH')
Pour diviser l'ensemble du bloc de données en 3 parties
np.split(df, 3)
Enregistrer le bloc de données en tant que fichier csv
df_1.to_csv('YOUR_NEW_CSV_FILE_PATH')
Questions connexes
De nouvelles questions
python
Python est un langage de programmation multi-paradigme, typé dynamiquement et polyvalent. Il est conçu pour être rapide à apprendre, comprendre, utiliser et appliquer une syntaxe propre et uniforme. Veuillez noter que Python 2 est officiellement hors support à partir du 01-01-2020. Néanmoins, pour les questions Python spécifiques à la version, ajoutez la balise [python-2.7] ou [python-3.x]. Lorsque vous utilisez une variante Python (par exemple, Jython, PyPy) ou une bibliothèque (par exemple, Pandas et NumPy), veuillez l'inclure dans les balises.