Spark 2.2 a introduit une source de streaming structurée de Kafka. Si je comprends bien, il s'appuie sur le répertoire des points de contrôle HDFS pour stocker les décalages et garantir une livraison de message "exactement une fois".

Mais les vieux quais (comme https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) indique que les points de contrôle Spark Streaming ne sont pas récupérables entre les applications ou des mises à niveau Spark et donc pas très fiables. En tant que solution, il existe une pratique pour prendre en charge le stockage des décalages dans un stockage externe qui prend en charge des transactions telles que MySQL ou RedshiftDB.

Si je souhaite stocker des décalages de la source Kafka vers une base de données transactionnelle, comment puis-je obtenir un décalage à partir d'un lot de flux structuré?

Auparavant, cela pouvait être fait en castant RDD vers HasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    

Mais avec la nouvelle API Streaming, j'ai un Dataset de InternalRow et je ne trouve pas de moyen facile de récupérer les décalages. L'API Sink n'a que la méthode addBatch(batchId: Long, data: DataFrame) et comment puis-je supposer obtenir un décalage pour un ID de lot donné?

28
dnaumenko 11 sept. 2017 à 13:07

2 réponses

L'ensemble de données en streaming avec la source Kafka a offset comme l'un des champ. Vous pouvez simplement rechercher tous les décalages dans la requête et les enregistrer dans JDBC Sink

1
T. Gawęda 11 sept. 2017 à 10:43

Spark 2.2 a introduit une source de streaming structurée de Kafka. Si je comprends bien, il repose sur le répertoire de point de contrôle HDFS pour stocker les décalages et garantir une livraison de message "exactement une fois".

Correcte.

Chaque déclencheur Spark Structured Streaming enregistre les décalages dans le répertoire offset à l'emplacement du point de contrôle (défini à l'aide de l'option checkpointLocation ou de la spark.sql.streaming.checkpointLocation propriété Spark ou attribuée de manière aléatoire) qui est censé garantir que les décalages sont traités < strong> au plus une fois . Cette fonctionnalité s'appelle Écrire des journaux d'avance .

L'autre répertoire dans l'emplacement du point de contrôle est le répertoire commits pour les lots de streaming terminés avec un seul fichier par lot (avec un nom de fichier étant l'ID du lot).

Citant la documentation officielle dans Fault Tolerance Semantics:

Pour ce faire, nous avons conçu les sources de Streaming Structuré, les puits et le moteur d'exécution pour suivre de manière fiable la progression exacte du traitement afin qu'il puisse gérer tout type de panne en redémarrant et / ou en retraitant. Chaque source de streaming est supposée avoir des décalages (similaires aux décalages Kafka ou aux numéros de séquence Kinesis) pour suivre la position de lecture dans le flux. Le moteur utilise le point de contrôle et l'écriture anticipée des journaux pour enregistrer la plage de décalage des données en cours de traitement dans chaque déclencheur. Les récepteurs de streaming sont conçus pour être idempotents pour la gestion du retraitement. Ensemble, en utilisant des sources rejouables et des récepteurs idempotents, le streaming structuré peut garantir une sémantique de bout en bout exactement une fois en cas de panne.

Chaque fois qu'un déclencheur est exécuté, StreamExecution vérifie les répertoires et "calcule" quels décalages ont déjà été traités. Cela vous donne une sémantique au moins une fois et exactement une fois au total.

Mais d'anciens documents (...) indiquent que les points de contrôle Spark Streaming ne sont pas récupérables entre les applications ou les mises à niveau Spark et donc pas très fiables.

Il y avait une raison pour laquelle vous les appeliez "vieux", n'y était-il pas?

Ils font référence à l'ancien et (à mon avis) mort Spark Streaming qui gardait non seulement les décalages, mais tout le code de requête qui a conduit à des situations où le point de contrôle était presque inutilisable, par exemple. lorsque vous modifiez le code.

Les temps sont révolus maintenant et le streaming structuré est plus prudent en ce qui concerne les points de contrôle et quand.

Si je souhaite stocker des décalages de la source Kafka vers une base de données transactionnelle, comment puis-je obtenir un décalage à partir d'un lot de flux structuré?

Une solution pourrait être d’implémenter ou d’utiliser en quelque sorte MetadataLog interface qui est utilisée pour gérer les points de contrôle décalés. Cela pourrait fonctionner.

comment puis-je supposer obtenir un décalage pour un identifiant de lot donné?

Ce n'est actuellement pas possible.

Je crois comprendre que vous ne pourrez pas le faire car la sémantique du streaming vous est cachée. Vous ne devriez tout simplement pas vous occuper de cette "chose" de bas niveau appelée offsets que Spark Structured Streaming utilise pour offrir des garanties une seule fois.

Citant Michael Armbrust lors de son discours au Spark Summit Traitement de flux simple, évolutif et tolérant aux pannes avec le streaming structuré dans Apache Spark:

vous ne devriez pas avoir à raisonner sur le streaming

Et plus loin dans l'exposé (sur la diapositive suivante):

vous devez écrire des requêtes simples et Spark doit constamment mettre à jour la réponse


Il existe un moyen d'obtenir des décalages (à partir de n'importe quelle source, y compris Kafka) en utilisant StreamingQueryProgress que vous pouvez intercepter en utilisant StreamingQueryListener et onQueryProgress callback.

onQueryProgress (event: QueryProgressEvent): Unit Appelé en cas de mise à jour de l'état (taux d'ingestion mis à jour, etc.)

Avec StreamingQueryProgress, vous pouvez accéder à la propriété sources avec SourceProgress qui vous donne ce que vous voulez.

48
Jacek Laskowski 14 sept. 2017 à 13:32