Je suis très nouveau en étincelle et j'en suis encore à mes premiers tests avec. J'ai installé un seul nœud et je l'utilise comme maître sur un serveur décent exécutant:

pyspark --master local[20]

Et bien sûr, je rencontre des difficultés avec mes premiers pas avec pyspark.

J'ai un fichier CSV de 40 Go et environ 300 millions de lignes dessus. Ce que je veux faire, c'est de trouver le moyen le plus rapide de fractionner ce fichier et d'en faire de petits paquets et de les stocker également sous forme de fichiers CSV. Pour cela, j'ai deux scénarios:

Premier. Fractionnez le fichier sans aucun critère. Il suffit de le diviser également en disons 100 pièces (3 millions de lignes chacune).

Deuxième. Les données CSV que je charge sont tabulaires et j'ai une colonne X avec 100 000 ID différents. Ce que j'aimerais faire, c'est créer un ensemble de dictionnaires et créer de plus petits morceaux de fichiers CSV où mes dictionnaires me diront dans quel paquet chaque ligne doit aller.

Jusqu'à présent, c'est là que je suis maintenant:

sc=SparkContext.getOrCreate()

file_1 = r'D:\PATH\TOFILE\data.csv'

sdf = spark.read.option("header","true").csv(file_1, sep=";", encoding='cp1252')

Merci de votre aide!

0
oso_ted 16 avril 2018 à 11:25

3 réponses

Meilleure réponse

La meilleure façon (et probablement "la plus rapide") de le faire serait de profiter du partitionnement intégré des RDD par Spark et d'écrire dans un fichier CSV à partir de chaque partition. Vous pouvez repartition ou coalesce pour créer le nombre souhaité de partitions (disons, 100) que vous souhaitez. Cela vous donnera un parallélisme maximal (en fonction de vos ressources et configurations de cluster) car chaque Spark Executor travaille sur la tâche sur une partition à la fois.

Vous pouvez effectuer l'une de ces actions:

  1. Faites un mapPartition sur le Dataframe et écrivez chaque partition dans un fichier CSV unique.

  2. OU df.write.partitionBy("X").csv('mycsv.csv'), qui créera une partition (et donc un fichier) par entrée unique dans "X"

Remarque. Si vous utilisez HDFS pour stocker vos fichiers CSV, Spark créera automatiquement plusieurs fichiers pour stocker les différentes partitions (nombre de fichiers créés = nombre de partitions RDD).

2
void 17 avril 2018 à 08:04

Ce que j'ai finalement fait, c'est de charger les données en tant que trame de données spark et spark crée automatiquement des partitions de taille égale de 128 Mo (configuration par défaut de la ruche), puis j'ai utilisé la méthode repartition pour redistribuer mes lignes en fonction des valeurs d'un fichier spécifique. colonne sur mon dataframe.

# This will load my CSV data on a spark dataframe and will generate the requiered amount of 128MB partitions to store my raw data.
sdf = spark.read.option('header','true').csv(file_1, sep=';', encoding='utf-8')
# This line will redistribute the rows of each paritition according the values on a specific column. Here I'm placing all rows with the same set of values on the same partition and I'm creating 20 of them. (Sparks handle to allocate the rows so the partitions will be the same size)
sdf_2 = sdf.repartition(20, 'TARGET_COLUMN')
# This line will save all my 20 partitions on different csv files
sdf_2.write.saveAsTable('CSVBuckets', format='csv', sep=';', mode='overwrite', path=output_path, header='True')
0
oso_ted 17 avril 2018 à 08:09

Vous pouvez charger des fichiers csv de grande taille avec Pandas de manière efficace. Quelqu'un a effectué une comparaison sur stackexchange. Vous pouvez charger des données dans des blocs de données, puis diviser des blocs de données à de petits morceaux avec Numpy. Vous pouvez également enregistrer de nouvelles trames de données en tant que fichier csv avec Pandas.

Exemples d'extraits de code:

import pandas as pd
import numpy as np

Chargement du fichier csv dans la trame de données

df = pd.read_csv('YOUR_CSV_FILE_PATH')

Pour diviser l'ensemble du bloc de données en 3 parties

np.split(df, 3)

Enregistrer le bloc de données en tant que fichier csv

df_1.to_csv('YOUR_NEW_CSV_FILE_PATH')
1
abdullahselek 16 avril 2018 à 09:16