2

origin destination distance
1      2           0.5
2      3           0.7
3      4           0.8
4      5           0.5
1      3           1.1

J'ai besoin de transformer cette matrice en une matrice nxn avec la forme suivante :

destination 2    3    4    5
origin
1           0.5  1.1  NA   NA
2           NA   0.7  NA   NA
3           NA   NA   0.8  NA
4           NA   NA   NA   0.5

Le problème ici est la taille de la matrice. Il est fondamentalement impossible de lire l'intégralité de la matrice de 100 Go en mémoire pour la faire pivoter, j'ai donc cherché des moyens de fragmenter et de paralléliser ce processus. En utilisant python, j'ai pensé que quelque chose comme ceci pourrait fonctionner:

chunksize = 10 ** 7
dtypes = {"origin":np.int, "destination":np.int, "agg_cost":np.float32}
col_names = ["origin", "destination", "distance"]

def get_chunk(chunk):
    return chunk.pivot(index='origin', columns='destination', values='agg_cost')

results = pool.map(get_chunk, pd.read_csv("matrix.csv", usecols=col_names, dtype=dtypes, chunksize=chunksize))

pd.concat(results).to_csv("finished_matrix.csv")

Mais cela nécessite toujours de lire une quantité énorme en mémoire. De plus, étant donné que la taille de segment ne prend pas en compte le début et la fin des sections d'ID d'origine répétés, il existe des indices de ligne répétés dans le résultat concaténé final.

Existe-t-il un moyen de paralléliser efficacement cette opération de manière à ce qu'elle puisse être exécutée avec une quantité de mémoire normale (16 Go) ?

2
Dan Snow 16 mars 2019 à 20:54

2 réponses

Meilleure réponse

Étant donné que le fichier d'entrée est trop volumineux pour la mémoire, la sortie transformée sera également trop volumineuse. Je suppose donc que l'objectif est de produire un nouveau fichier de sortie, pas de trouver un moyen de conserver toutes les informations en mémoire en même temps (cette dernière question peut impliquer des matrices clairsemées ou une autre technique).

Par exemple, supposons que nous commencions avec ces données.

1   2   0.5
3   4   0.8
5   6   2.7
2   3   0.7
1   3   1.1
3   6   3.1
4   5   0.5
1   6   4.6

Divisez d'abord le fichier d'entrée en un groupe de fichiers d'entrée intermédiaires, un par ORIGINE. Dans notre exemple, nous nous retrouvons avec 5 fichiers.

1   2   0.5
1   3   1.1
1   6   4.6

2   3   0.7

3   4   0.8
3   6   3.1

4   5   0.5

5   6   2.7

Utilisez ensuite plusieurs processus pour transformer les fichiers d'entrée intermédiaires en fichiers de sortie intermédiaires, chacun ayant la nouvelle structure matricielle. Voici les fichiers résultants basés sur l'exemple.

1   .   0.5   1.1   .     .     4.6

2   .   .     0.7   .     .     .

3   .   .     .     0.8   .     3.1

4   .   .     .     .     0.5   .

5   .   .     .     .     .     2.7

Concaténez ensuite les fichiers de sortie intermédiaires pour produire la sortie finale.

La stratégie générale décrite ci-dessus peut probablement être optimisée pour la vitesse de diverses manières en sautant certains des fichiers intermédiaires. Par exemple, vous pourriez probablement éviter d'avoir un tas de fichiers intermédiaires en procédant comme suit : (A) créer un seul fichier d'entrée intermédiaire, trié par fusion par ORIGIN ; (B) tout en faisant cela, gardez également une trace des emplacements de recherche de fichiers (DEBUT, FIN) pour chaque ORIGINE ; puis (C) utiliser plusieurs processus pour produire la sortie finale, basée sur le fichier trié par fusion et les métadonnées de recherche. Cette approche pourrait être plus rapide (ce n'est peut-être pas le cas non plus), mais elle nécessite un peu plus de comptabilité. Mon premier réflexe serait de commencer simplement et d'évoluer à partir de là.

1
FMc 16 mars 2019 à 19:57

Sur la base des suggestions de chacun, j'ai écrit le script suivant. J'ai corrigé ma matrice d'entrée afin qu'elle ait des origines regroupées en séquence. Le script s'exécute très rapidement. Il traite une matrice de 50 Go en 2 minutes environ en utilisant 48 cœurs sur un serveur.

import csv
import itertools
import os
import operator
import shutil

import pandas as pd
import multiprocessing as mp

dir_path = "temp/"
dtypes = {0:str, 1:str, 2:np.float64}
col_names = ["origin", "destination", "distance"]

os.makedirs(os.path.dirname(dir_path), exist_ok=True)

for key, rows in itertools.groupby(csv.reader(open("temp.csv")), operator.itemgetter(0)):
    with open(dir_path + "%s.csv" % key, "w") as output:
        for row in rows:
            output.write(",".join(row[0:3]) + "\n")

if os.path.isfile(dir_path + "origin.csv"):
    os.remove(dir_path + "origin.csv")
files = [os.path.join(dir_path, f) for f in os.listdir(dir_path) if \
             os.path.isfile(os.path.join(dir_path, f)) and f != "origin.csv"]

destinations = pd.read_csv("temp.csv", usecols=["destination"], dtype=dtypes, squeeze=True).unique()

def convert_row(file):
    row = pd.read_csv(file, dtype=dtypes, names=col_names) \
    .pivot(index="origin", columns="destination", values="distance") \
    .reindex(columns=destinations) \
    .to_csv(file)

pool = mp.Pool(mp.cpu_count())
results = pool.map(convert_row, files)

with open('output.csv', 'wb') as outfile:
    for i, file in enumerate(files):
        with open(file, 'rb') as infile:
            if i != 0:
                infile.readline()  
            shutil.copyfileobj(infile, outfile)
0
Dan Snow 17 mars 2019 à 03:29