Apache Spark est un moteur de traitement de données distribué open source écrit en Scala fournissant une API unifiée et des ensembles de données distribués aux utilisateurs. Les cas d'utilisation d'Apache Spark sont souvent liés à la machine / apprentissage en profondeur, au traitement graphique.

apache-spark...

Quelqu'un peut-il m'aider à créer une trame de données où la valeur d'horodatage de début et de fin est donnée et nous devons imprimer toutes les heures par incrément? Exemple d'horodatage de début: 02-05-2020 01:00 Horodatage de fin: 03-05-2020 02:00 Donc, le df devrait avoir toutes ces valeurs: 0....
2 juin 2020 à 10:18
J'ai un fichier texte comme ci-dessous 1234_4567_DigitalDoc_XRay-01.pdf 2345_5678_DigitalDoc_CTC-03.png 1234_5684_DigitalDoc_XRay-05.pdf 1234_3345_DigitalDoc_XRay-02.pdf J'attends la sortie comme | catg|sub_catg| doc_name |revision_label|extension| |1234| 4567|DigitalDoc_XRay-01.pdf....
1 juin 2020 à 13:26
J'ai une colonne de dataframe pyspark qui contient des données comme ci-dessous. event_list PL:1547497782:1547497782~ST:1548593509:1547497782 PU:1547497782:1547497782~MU:1548611698:1547497782:1~MU:1548612195:1547497782:0~ST:1548627786:1547497782 PU:1547497782:1547497782~PU:1547497782:1547497782~ST:1....
J'ai besoin de rechercher une valeur dans toutes les colonnes Spark DataFrame. J'ai essayé ça; for col_name in df.columns: filter_df = df.where(df[col_name].like('%searchterm%')) Mais il n'y a pas de correspondance dans filter_df (filter.df.count() = 0) Si je le teste avec un nom de colonne dont ....
26 mai 2020 à 09:46
Je suis nouveau sur pyspark. Je travaille généralement avec des pandas. I pour parcourir ligne par ligne en utilisant une colonne dans pyspark. Mon ensemble de données ressemble à: - +-------------------+--------------------+--------+-----+ | DateTime| user_name|keyboard|mouse| +....
J'essaie de lire à partir d'une table en utilisant com.microsoft.azure. Voici l'extrait de code import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SparkSession import com.microsoft.azure.sqldb.spark.config.Config import com.microsoft.azure.sqldb.spark.connect._ import com.microsoft.....
20 mai 2020 à 00:06
J'essaie d'accéder au fichier hadoop dans spark mais j'obtiens cette erreur org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/ex1/cen.csv at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287) Je peux afficher....
18 mai 2020 à 19:00
val data = Seq( ("India","Pakistan","India"), ("Australia","India","India"), ("New Zealand","Zimbabwe","New Zealand"), ("West Indies", "Bangladesh","Bangladesh"), ("Sri Lanka","Bangladesh","Bangladesh"), ("Sri Lanka","Bangladesh","Bangladesh"), ("Sri Lanka","Bangladesh","Bangladesh") )....
Actuellement, le schéma de ma table est: root |-- product_id: integer (nullable = true) |-- product_name: string (nullable = true) |-- aisle_id: string (nullable = true) |-- department_id: string (nullable = true) Je veux appliquer le schéma ci-dessous sur le tableau ci-dessus et supprimer tout....
14 mai 2020 à 01:14
Selon les documents: Pour le type d'étape, choisissez l'application Spark. Mais dans Amazon EMR -> Clusters -> mycluster -> Steps -> Add step -> Step type, les seules options sont: ....
Je suis assez nouveau dans PySpark et j'ai du mal à effectuer ce que je pense devrait être une tâche simple ... J'ai un dataframe PySpark, où 1 colonne se compose de listes de chaînes. Je voudrais compter le nombre d'instances de chaque élément dans chaque liste de chaînes sur toutes les lignes. Le ....
8 mai 2020 à 18:43
J'essaye de créer un tableau de carte basé sur quelques conditions. Voici ma fonction. Même si je suis fourni Map scala me force à avoir un tuple comme type de retour. Y a-t-il un moyen de résoudre ce problème? def getSchemaMap(schema: StructType): Array[(String, String)] ={ schema.fields.flatM....
8 mai 2020 à 00:14
Je lis un fichier csv en utilisant l'option inferschema activée dans le cadre de données en utilisant la commande ci-dessous. df2 = spark.read.options(Map("inferSchema"->"true","header"->"true")).csv("s3://Bucket-Name/Fun/Map/file.csv") df2.printSchema() Output: root |-- CC|Fun|Head|Country|Send....
7 mai 2020 à 12:11
J'ai une trame de données: +------------+------------+-------------+ | id| column1| column2| +------------+------------+-------------+ | 1| 1| 5| | 1| 2| 5| | 1| 3| 5| | 2| ....
7 mai 2020 à 00:33
J'essaie de lire les messages Kafka en JSON dans Spark Structured Streaming. Voici un exemple des messages dans Kafka: { "_id": { "$oid": "5eb292531c7d910b8c98dbce" }, "Id": 37, "Timestamp": { "$date": 1582889068616 }, "TTNR": "R902170286", "SNR": 91177446, "State": 0, "I_A....
J'ai un cluster K8s opérationnel, sur des machines virtuelles à l'intérieur de VMWare Workstation, à partir de maintenant. J'essaye de déployer une application Spark de manière native en utilisant la documentation officielle de ici. Cependant, j'ai également atterri sur cet article qui l'a rendu plu....
J'ai une trame de données qui ressemble à ceci: root |-- value: int (nullable = true) |-- date: date (nullable = true) Je voudrais retourner la valeur où la valeur est la dernière date dans le dataframe. Ce problème change-t-il si je dois créer un groupBy et un agg? Mon problème réel ressemble à ....
J'ai une trame de données: +---------+---------------------+ | id| Name| +---------+---------------------+ | 1| 'Gary'| | 1| 'Danny'| | 2| 'Christopher'| | 2| ....
1 mai 2020 à 17:47
Donc, je veux créer une nouvelle colonne dans mon dataframe, dont les lignes dépendent des valeurs de deux colonnes, et implique également une condition. J'ai essayé ça, mais ça ne marche pas. some_value = ... df = df.withColumn("new_col", col("col1") if col("col2") == some_value else None) Quelle ....
Comment parcourir les lignes Spark DataFrame et les ajouter à une séquence d'objets de classe de cas? DF1: val someDF = Seq( ("202003101750", "202003101700",122), ("202003101800", "202003101700",12), ("202003101750", "202003101700",42) ).toDF("number", "word","value") Classe de cas: case clas....
J'ai une trame de données comme: Name_Index City_Index 2.0 1.0 0.0 2.0 1.0 0.0 J'ai une nouvelle liste de valeurs. list(1.0,1.0) Je souhaite ajouter ces valeurs à une nouvelle ligne dans dataframe dans le cas où toutes les lignes précédentes sont supprimées. Mon code....
23 avril 2020 à 13:31
J'ai une trame de données qui ressemble à ceci val df = Seq( (1,"a,b,c"), (2,"b,c") ).toDF("id","page_path") df.createOrReplaceTempView("df") df.show() +---+---------+ | id|page_path| +---+---------+ | 1| a,b,c| | 2| b,c| +---+---------+ Je souhaite effectuer un encodage à chaud sur ce....
20 avril 2020 à 18:50
J'ai demandé des données à HDFS et j'aimerais obtenir les métadonnées des fichiers à partir desquels elles ont été lues. Cela me permettra de créer des rapports qui ressembleront à des données disponibles à un moment donné. J'ai trouvé la solution qui consiste à utiliser org.apache.hadoop.fs.FileSys....
20 avril 2020 à 10:56
J'ai actuellement le code suivant: def _join_intent_types(df): mappings = { 'PastNews': 'ContextualInformation', 'ContinuingNews': 'News', 'KnownAlready': 'OriginalEvent', 'SignificantEventChange': 'NewSubEvent', } return df.withColumn('Categories', posexplode('Categories').ali....
17 avril 2020 à 12:35
J'ai une colonne avec la valeur * NZ, je veux supprimer le *, df.groupBy('State1').count().show() (5) Spark Jobs +-----------+-----+ | State1|count| +-----------+-----+ | NT| 1423| | ACT| 2868| | SA|12242| | TAS| 4603| | WA|35848| | *NZ| 806| | ....
16 avril 2020 à 04:31