J'ai écrit un code python sur la somme de tous les nombres dans la première colonne pour chaque fichier csv qui est comme suit:

import os, sys, inspect, csv

### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]

### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir

### Setup pyspark directory path
pyspark_dir = python_dir
sys.path.append(pyspark_dir)

### Import the pyspark
from pyspark import SparkConf, SparkContext

### Specify the data file directory, and load the data files
data_path = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "./test_dir")))

### myfunc is to add all numbers in the first column.
def myfunc(s):
    total = 0
    if s.endswith(".csv"):
            cr = csv.reader(open(s,"rb"))
            for row in cr:
                total += int(row[0])
                return total

def main():
### Initialize the SparkConf and SparkContext
    conf = SparkConf().setAppName("ruofan").setMaster("spark://ec2-52-26-177-197.us-west-2.compute.amazonaws.com:7077")
    sc = SparkContext(conf = conf)
    datafile = sc.wholeTextFiles(data_path)

    ### Sent the application in each of the slave node
    temp = datafile.map(lambda (path, content): myfunc(str(path).strip('file:')))

    ### Collect the result and print it out.
    for x in temp.collect():
            print x

if __name__ == "__main__":
    main()

Je voudrais utiliser Apache-Spark pour paralléliser le processus de sommation de plusieurs fichiers csv en utilisant le même code python. J'ai déjà effectué les étapes suivantes:

  1. J'ai créé un nœud maître et deux nœuds esclaves sur AWS.
  2. J'ai utilisé la commande bash $ scp -r -i my-key-pair.pem my_dir root@ec2-52-27-82-124.us-west-2.compute.amazonaws.com pour télécharger le répertoire my_dir comprenant mon code python avec les fichiers csv sur le nœud maître du cluster.
  3. Je me suis connecté à mon nœud maître, et à partir de là, j'ai utilisé la commande bash $ ./spark/copy-dir my_dir pour envoyer mon code python ainsi que des fichiers csv à tous les nœuds esclaves.
  4. J'ai configuré les variables d'environnement sur le nœud maître:

    $ export SPARK_HOME=~/spark

    $ export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH

Cependant, lorsque j'exécute le code python sur le nœud maître: $ python sum.py, il affiche l'erreur suivante:

Traceback (most recent call last):
  File "sum.py", line 18, in <module>
    from pyspark import SparkConf, SparkContext
  File "/root/spark/python/pyspark/__init__.py", line 41, in <module>
    from pyspark.context import SparkContext
  File "/root/spark/python/pyspark/context.py", line 31, in <module>
    from pyspark.java_gateway import launch_gateway
  File "/root/spark/python/pyspark/java_gateway.py", line 31, in <module>
    from py4j.java_gateway import java_import, JavaGateway, GatewayClient
ImportError: No module named py4j.java_gateway

Je n'ai aucune idée de cette erreur. De plus, je me demande si le nœud maître appelle automatiquement tous les nœuds esclaves à fonctionner en parallèle. J'apprécie vraiment si quelqu'un peut m'aider.

1
Ruofan Kong 17 juil. 2015 à 00:31

2 réponses

Meilleure réponse

Voici comment je déboguerais cette erreur d'importation particulière.

  1. ssh sur votre nœud maître
  2. Exécutez le Python REPL avec $ python
  3. Essayez la ligne d'importation qui a échoué >> from py4j.java_gateway import java_import, JavaGateway, GatewayClient
  4. En cas d'échec, essayez simplement d'exécuter >> import py4j
  5. Si cela échoue, cela signifie que py4j n'est pas installé sur votre système ou ne peut pas le trouver.
  6. Quittez le REPL >> exit()
  7. Essayez d'installer py4j $ pip install py4j (vous devrez installer pip)
  8. Ouvrez le REPL $ python
  9. Réessayez d'importer >> from py4j.java_gateway import java_import, JavaGateway, GatewayClient
  10. Si cela fonctionne, >> exit() et réessayez d'exécuter votre $ python sum.py
2
eatCitrus 18 juil. 2015 à 21:03

Je pense que vous posez deux questions distinctes. Il semble que vous ayez une erreur d'importation. Est-il possible que vous ayez une version différente du package py4j installée sur votre ordinateur local que vous n'avez pas installée sur votre nœud maître?

Je ne peux pas m'empêcher d'exécuter cela en parallèle.

0
eatCitrus 17 juil. 2015 à 02:25