J'ai un fichier d'entrée foo.txt avec le contenu suivant:

c1|c2|c3|c4|c5|c6|c7|c8|
00| |1.0|1.0|9|27.0|0||
01|2|3.0|4.0|1|10.0|1|1|

Je veux le transformer en Dataframe pour effectuer des requêtes Sql:

var text = sc.textFile("foo.txt")
var header = text.first()
var rdd = text.filter(row => row != header)
case class Data(c1: String, c2: String, c3: String, c4: String, c5: String, c6: String, c7: String, c8: String)

Jusqu'à ce point, tout va bien, le problème vient dans la phrase suivante:

var df = rdd.map(_.split("\\|")).map(p => Data(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7))).toDF()

Si j'essaie d'imprimer df avec df.show, j'obtiens un message d'erreur:

scala> df.show()
java.lang.ArrayIndexOutOfBoundsException: 7

Je sais que l'erreur peut être due à la phrase scindée. J'ai également essayé de diviser foo.txt en utilisant la syntaxe suivante:

var df = rdd.map(_.split("""|""")).map(p => Data(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7))).toDF()

Et puis j'obtiens quelque chose comme ça:

scala> df.show()
+------+---------+----------+-----------+-----+-----------+----------------+----------------+
|  c1  |     c2  |    c3    |     c4    |  c5 |     c6    |        c7      |       c8       |
+------+---------+----------+-----------+-----+-----------+----------------+----------------+
|     0|        0|         ||           |    ||          1|               .|               0|
|     0|        1|         ||          2|    ||          3|               .|               0|
+------+---------+----------+-----------+-----+-----------+----------------+----------------+

Par conséquent, ma question est de savoir comment puis-je transmettre correctement ce fichier à un Dataframe.

EDIT: L'erreur est dans la première ligne en raison du champ || sans espace intermédiaire. Ce type de définition de champ en fonction des exemples fonctionne bien ou se bloque.

4
qwerty 6 avril 2017 à 14:30

2 réponses

Meilleure réponse

En effet, l'une de vos lignes est plus courte que les autres:

scala> var df = rdd.map(_.split("\\|")).map(_.length).collect()
df: Array[Int] = Array(7, 8)

Vous pouvez remplir les lignes manuellement (mais vous devez gérer chaque cas manuellement):

val df = rdd.map(_.split("\\|")).map{row =>
  row match {
    case Array(a,b,c,d,e,f,g,h) => Data(a,b,c,d,e,f,g,h)
    case Array(a,b,c,d,e,f,g) => Data(a,b,c,d,e,f,g," ")
  }
}

scala> df.show()
+---+---+---+---+---+----+---+---+
| c1| c2| c3| c4| c5|  c6| c7| c8|
+---+---+---+---+---+----+---+---+
| 00|   |1.0|1.0|  9|27.0|  0|   |
| 01|  2|3.0|4.0|  1|10.0|  1|  1|
+---+---+---+---+---+----+---+---+

ÉDITER:

Une solution plus générique serait quelque chose comme ceci:

val df = rdd.map(_.split("\\|", -1)).map(_.slice(0,8)).map(p => Data(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7))).toDF()

Si vous supposez que vous avez toujours le bon nombre de délimiteurs, vous pouvez utiliser cette syntaxe en toute sécurité pour tronquer la dernière valeur.

5
jamborta 14 avril 2017 à 13:53

Ma suggestion serait d'utiliser l'analyseur csv de databrick.

Lien: https://github.com/databricks/spark-csv

Pour charger votre exemple:

J'ai chargé un exemple de fichier similaire au vôtre:

c1|c2|c3|c4|c5|c6|c7|c8|
00| |1.0|1.0|9|27.0|0||
01|2|3.0|4.0|1|10.0|1|1|

Pour créer le dataframe, utilisez le code ci-dessous:

  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .option("delimiter", "|") // default is ","
    .load("foo.txt")
    .show

J'ai la sortie ci-dessous

+---+---+---+---+---+----+---+----+---+
| c1| c2| c3| c4| c5|  c6| c7|  c8|   |
+---+---+---+---+---+----+---+----+---+
|  0|   |1.0|1.0|  9|27.0|  0|null|   |
|  1|  2|3.0|4.0|  1|10.0|  1|   1|   |
+---+---+---+---+---+----+---+----+---+

De cette façon, vous n'avez pas à vous soucier de l'analyse du fichier vous-même. Vous obtenez un dataframe directement

3
Sanchit Grover 10 avril 2017 à 09:26