Apache Spark est un moteur de traitement de données distribué open source écrit en Scala fournissant une API unifiée et des ensembles de données distribués aux utilisateurs. Les cas d'utilisation d'Apache Spark sont souvent liés à la machine / apprentissage en profondeur, au traitement graphique.

apache-spark...

Je voulais filtrer les lignes qui ont des valeurs nulles pour toutes les colonnes d'une liste. Supposons par exemple que nous ayons le df suivant, df = spark.createDataFrame([(0, 1, 1, 2,1), (0, 0, 1, 0, 1), (1, 0, 1, 1 ,1)], ['a', 'b', 'c', 'd', 'e']) +---+---+---+---+---+ ....
J'ai écrit une classe parent: class Parent(): def __init__(self, spark_session=None): try: # Instantiate Spark Session self.spark = spark_session if not self.spark: self.spark = SparkSession.builder.config("spark.sql.debug.maxToStringFi....
6 janv. 2020 à 14:22
J'ai un dossier avec des To de données structurées et ils adhèrent tous à un schéma fixe. La structure des dossiers est la suivante: s3://main-bucket/ folder-1/ <= One folder will only contain data from one algorithm part-0000-1.csv <= Created on 06/01....
J'ai un rdd sous cette forme, rdd = sc.parallelize([('A', [1, 2, 4, 1, 2, 5]), ('B', [2, 3, 2, 1, 5, 10]), ('C', [3, 2, 5, 10, 5, 2])]) mais je veux transformer le rdd comme ci-dessous, newrdd = [('A', [1, 2, 4, 5]), ('B', [2, 3, 1, 5, 10], ('C', [3, 2, 5, 10])] ce qui signifie, je dois obtenir le....
27 déc. 2019 à 02:50
Nous avons une installation personnalisée d'Apache Hadoop sur les nœuds de calcul Azure et utilisons Apache Oozie pour planifier des flux de travail. Tous les fichiers xml de workflow et de coordinateur sont déployés dans le stockage externe Microsoft Azure Data Lake. Il existe actuellement un pyspa....
J'essaie de combiner la logique de cet extrait de code dans PySpark pour réduire la répétition de code. Tout conseil serait vraiment apprécié: col0 = when(visit[1] == '0', to_date(visit[0])).otherwise(None) col1 = when(visit[1] == '1', to_date(visit[0])).otherwise(None) ....
21 déc. 2019 à 02:46
J'essaie d'exécuter un programme Spark pour le traitement sémantique, mais il est bloqué à l'étape 2. Je me demande quel est le problème ici? # create Spark Context spark = SparkSession.builder.master("Semantic Processing")\ .config('spark.master', 'local')\ .getOrCreate() sqlc = SQL....
18 déc. 2019 à 18:41
J'ai le tableau des événements suivant, je voudrais les regrouper en petits groupes de temps comme spécifié ci-dessous. Le tableau doit être divisé en ensembles plus petits où la ligne de début et de fin de l'ensemble est déterminée par geohash si le geohash est le même, puis set keep incluant les l....
J'essaye de combiner deux fichiers csv avec rien en commun (aucune clé n'est commune) dans un rdd apparié clé-valeur en utilisant pyspark Disons que A.csv a a b c et B.csv a 1 2 3 y a-t-il une option dans pyspark pour obtenir un rdd en joignant ces deux, comme ceci a:1 b:2 c:3 bien sûr, le nombre....
17 déc. 2019 à 15:23
Je veux trouver un moyen efficace de copier un dossier / préfixe S3 avec beaucoup d'objets dans un autre dossier / préfixe sur le même compartiment. Voilà ce que j'ai essayé. Données de test: autour de 200 objets, autour de 100 MB chacun. 1) aws s3 cp --recursive. Il a fallu environ 150 secs. 2) s3-....
J'utilise le code Scala ci-dessous pour renommer un fichier CSV en fichier TXT et déplacer le fichier TXT. J'ai besoin de traduire ce code en Python / Pyspark mais j'ai des problèmes (pas bien versé en Python). J'apprécierais grandement votre aide. Merci d'avance! //Prepare to rename file import org....
J'utilise un docker avec quelques conteneurs (un pour Jupyter-Lab, un pour Spark et 3 pour chaque produit d'ELK (ElasticSearch, Kibana et Logstash). J'utilise également sparkmagic pour les cahiers de mes jupyter. Donc, ce que j'essaie de faire, c'est d'envoyer une sortie d'une cellule à spark puis d....
16 déc. 2019 à 19:03
J'ai un cadre de données df avec les données ci-dessous: Name Value Code a 1 1 b 2 1 c 3 2 d 4 2 Je souhaite convertir cette trame de données en un dictionnaire. J'ai essayé d'utiliser asDict (): map(lambda row: row....
16 déc. 2019 à 15:11
J'ai l'environnement cloudera-quickstart-vm-5.13.0. Dans cet environnement, Hadoop et Spark sont déjà installés. J'ai mis un fichier csv dans hdfs. Ensuite, j'ai écrit ce code java pour lire le csv et essayer de compter le nombre de routes de taxi pour chaque jour (par exemple, pour la journée 10/10....
15 déc. 2019 à 20:50
J'ai une liste de transactions où les utilisateurs prennent une planche d'une station à une autre. Il s'agit d'un tableau de tableaux, appelé trans: Board: User: Station: Action: Time: [ ['1', 'Ana', 'Tribeca', 'check_out', '1:00pm'], ['1', 'Ana', 'Soho' , 'park' , '2:00pm'], [....
15 déc. 2019 à 00:54
J'ai besoin de réduire toutes les deux lignes consécutives d'une trame de données et de remplacer la colonne Marques par des notes moyennes de deux lignes pour chaque catégorie. J'utilise Pyspark 2.4.4 sur Azure Databricks. Toute idée comment puis-je aborder la même chose. Mon exemple de trame de do....
Disons que j'ai une liste de noms de colonnes et qu'ils existent tous dans la trame de données Cols = ['A', 'B', 'C', 'D'], Je cherche un moyen rapide d'obtenir une table / un dataframe comme NA_counts min max A 5 0 100 B 10 0 120 C 8 1 99 ....
J'essaie d'écraser complètement une table postgres en utilisant un cadre de données spark. Pour une raison quelconque, même lorsque je spécifie mode("overwrite"), j'obtiens une erreur relation already exists postgres. Pourquoi mon code n'écrase-t-il pas les données dans la base de données comme il e....
12 déc. 2019 à 22:31
J'ai des problèmes pour diviser un fichier CSV via PySpark. J'essaie de sortir le pays et le nom du vin (c'est juste pour prouver que l'analyse fonctionne), mais je reçois une erreur. Voici à quoi ressemble le fichier CSV: , pays, description, désignation, points, prix, province, région_1, région_2,....
12 déc. 2019 à 01:32
J'ai créé une trame de données avec le schéma ci-dessous, j'essaie d'extraire les 10 premières valeurs dans "contents.monid" de chaque ligne pour laquelle j'ai créé un UDF 'udfTop'. >>> df.printSchema() |-- userid: long (nullable = true) |-- contents: array (nullable = true) | |-- element: str....
11 déc. 2019 à 23:01
Nous travaillons sur un programme qui trie les informations d'un ensemble de données et nous voulons .split () le fichier CSV. Le problème est que le champ que nous voulons diviser se trouve entre "" et a des virgules. (Nous séparons déjà les virgules). Il s'agit d'une description d'un produit inuti....
11 déc. 2019 à 22:19
Je développe une application sur Java Spark. Généré et chargé avec succès le .jar dans le cluster EMR. Il y a une ligne du code qui lit: JsonReader jsonReader = new JsonReader(new FileReader("s3://naturgy-sabt-dev/QUERY/input.json")); Je suis sûr à 100% de: Ce fichier existe. Lors de l'exécution de....
J'ai des données délimitées par des tabulations (fichier csv) comme ci-dessous: 201911240130 a 201911250132 b 201911250143 c 201911250223 z 201911250224 d ... Je veux écrire un groupe d'annuaires par année, mois, jour, heure. hdfs://dest/2019/11/24/01/xxxx.csv hdfs://dest/2019/11/25/01/xxxx.csv hdf....
11 déc. 2019 à 07:17
Je consomme un sujet Kafka avec un streaming d'étincelles et je dois compter toutes les occurrences de valeurs dans un tableau. Il est similaire aux exemples de nombre de mots canoniques, sauf que mes données d'entrée sont une liste de chaînes. Divulgation complète: je suis nouveau dans toutes les c....
11 déc. 2019 à 00:54
J'essaie d'utiliser withColumn pour annuler les mauvaises dates dans une colonne dans une trame de données, j'utilise une fonction when () pour effectuer la mise à jour. J'ai deux conditions pour les "mauvaises" dates. dates antérieures à janvier 1900 ou dates futures. Mon code actuel ressemble à ce....
10 déc. 2019 à 18:03