J'utilise Spring Reactor avec Spring Cloud Stream (GCP Pub / Sub Binder) et je rencontre des problèmes de gestion des erreurs. Je suis en mesure de reproduire le problème avec un exemple très simple:

@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .map(msg -> {
            if (true) { 
                throw new RuntimeException("exception encountered!");
            }
            return msg;
        })
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

Le comportement auquel je m'attends est de voir "Impossible de consommer le message" imprimé, cependant, ce n'est pas ce qui semble se produire. Lors de l'ajout d'un appel .log() à la chaîne, je vois des signaux onNext / onComplete, je m'attends à voir des signaux onError.

Mon code actuel ressemble à ceci:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable))
        .then();
}

J'ai remarqué qu'au fond de ma classe de service, j'essayais de gérer les erreurs sur mes éditeurs Reactor. Cependant, le signal onError ne se produirait pas lors de l'utilisation de Spring Cloud Stream. Si j'appelais simplement mon service en tant que tel myService.processMessage(msg) dans un test unitaire et me moquais de l'exception, ma chaîne réactive propagerait correctement les signaux d'erreur.

Cela semble être un problème lorsque je me connecte à Spring Cloud Stream. Je me demande si Spring Cloud Function / Stream effectue un wrapping d'erreur global?

Dans mon code non trivial , je remarque ce message d'erreur qui peut avoir quelque chose à voir avec pourquoi je ne reçois pas de signaux d'erreur?

ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...

Pour aggraver ma confusion, je peux obtenir le signal onError dans ma chaîne réactive si je passe ma liaison Spring Cloud Stream à l'implémentation non réactive comme suit:

@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
    return customMessage -> Mono.just(customMessage)
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
        .subscribe();
}
1
Jon Catanio 2 oct. 2020 à 00:16

2 réponses

Meilleure réponse

C'est donc ce que j'ai retenu de mes propres enquêtes, peut-être que cela pourrait aider d'autres. Attention, je n'utilise peut-être pas le bon "Spring Reactor Language" mais c'est ainsi que j'ai fini par le résoudre ...

Dans Hoxton.SR5, un onErrorContinue était inclus dans la liaison réactive qui gérait l'abonnement au flux. Le problème avec onErrorContinue, c'est qu'il affecte les opérateurs < upstream en appliquant la fonction BiConsumer à l'opérateur qui a échoué (si pris en charge).

Cela signifie que lorsqu'une erreur se produisait dans nos opérateurs map / flatMap, le onErrorContinue BiConsumer se mettait en marche et modifiait le signal en aval en onComplete() (Mono<T> ) ou request(...) (s'il a demandé un nouvel élément à un Flux<T>). Cela a eu pour conséquence que nos opérateurs doOnError(...) ne s'exécutaient pas car il n'y avait pas de signaux onError().

Finalement, l'équipe SCS a décidé de supprimer ce wrapper de gestion des erreurs. Hoxton.SR6 ne l'a plus onErrorContinue. Cependant, cela signifiait que les exceptions se propageant jusqu'à la liaison SCS entraîneraient la rupture de l'abonnement Flux. Les messages suivants n'auraient alors nulle part où être acheminés car il n'y avait pas d'abonnés.

Cette gestion des erreurs a été transmise aux clients, nous ajoutons un opérateur onErrorResume à l ' éditeur interne pour supprimer efficacement les signaux d'erreur. Lorsqu'une erreur est rencontrée dans l'éditeur myService::processMessage, onErrorResume fera basculer les éditeurs vers l'éditeur de secours qui a été transmis en tant que paramètre et reprendra à partir de ce point dans la chaîne d'opérateurs. Dans notre cas, cet éditeur de secours renvoie simplement Mono.empty() ce qui nous permet de supprimer les signaux d'erreur tout en permettant aux mécanismes de gestion des erreurs internes de fonctionner sans affecter l'éditeur source externe.

onErrorResume Exemple / Explication

La technique ci-dessus peut être illustrée par un exemple très simple.

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Flux.just(4, 5, 6))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

Le Flux<Integer> ci-dessus affichera ce qui suit:

Element: 1
Element: 4
Element: 5
Element: 6

Puisqu'une erreur est rencontrée au niveau de l'élément 2, onErrorResume la solution de secours intervient et le nouvel éditeur devient Flux.just(4, 5, 6) effectivement reprise de la solution de secours. Dans notre cas, nous ne voulons pas affecter l'éditeur source (c'est-à-dire Flux.just(1, 2, 3)). Nous voulons simplement supprimer l'élément erroné (2) et passer à l'élément suivant (3).

Nous ne pouvons pas simplement changer Flux.just(4, 5, 6) en Flux.empty() ou Mono.empty() comme ceci:

Flux.just(1, 2, 3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Mono.empty())
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

Cela entraînerait la sortie de ce qui suit:

Element: 1

En effet, onErrorResume a remplacé les éditeurs en amont par l'éditeur de secours (c'est-à-dire Mono.empty()) et a repris à partir de ce moment.

Pour atteindre le résultat souhaité de:

Element: 1
Element: 3

Nous devons placer l'opérateur onErrorResume sur l'éditeur interne de flatMap:

public Mono<Integer> func(int i) {
    return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}

Flux.just(1, 2, 3)
    .flatMap(i -> func(i)
        onErrorResume(t -> Mono.empty()))
    .doOnNext(i -> log.info("Element: {}", i))
    .subscribe();

Désormais, onErrorResume affecte uniquement l'éditeur interne renvoyé par func(i). Si une erreur se produit de la part des opérateurs dans func(i), onErrorResume se repliera pour Mono.empty() terminer efficacement le Mono<T> sans exploser. Cela permet également aux opérateurs de gestion des erreurs (par exemple doOnError) dans func(i) d'être appliqués avant l'exécution de la solution de secours. En effet, contrairement à onErrorContinue, il n'affecte pas les opérateurs en amont et modifie le signal suivant à l'emplacement de l'erreur.

Solution finale

En réutilisant l'extrait de code de ma question, j'ai mis à niveau ma version Spring Cloud vers Hoxton.SR6 et changé le code en quelque chose comme ceci:

@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}", msg))
        .flatMap(msg -> myService.processMessage(msg)
            .onErrorResume(throwable -> Mono.empty())
        )
        .then();
}

Notez que le onErrorResume est sur l'éditeur interne (à l'intérieur du flatMap).

2
Jon Catanio 6 oct. 2020 à 15:18

Je pense que le problème existe dans le code suivant:

    .map(msg -> new RuntimeException("exception encountered!"))

Le lambda dans votre ligne de carte renvoie une exception, pas en lève une.

0
toolkit 1 oct. 2020 à 21:45