J'ai un service qui diffuse des données vers un deuxième service qui reçoit un flux d'objets et les enregistre dans mon MongoDB. dans ma fonction d'abonnement sur l'objet Flux que je reçois du service de streaming, j'utilise la méthode save de l'interface ReactiveMongoRepository. lorsque j'essaie d'utiliser la fonction de blocage et d'obtenir les données, j'obtiens l'erreur suivante :

2019-10-11 13:30:38.559  INFO 19584 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:25}] to localhost:27017
2019-10-11 13:30:38.566  INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 1]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300}
2019-10-11 13:30:39.158  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : cancel()
2019-10-11 13:30:39.429  INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:26}] to localhost:27017
2019-10-11 13:30:39.437  WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil         : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at

Mon code:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                    Mono<Quote> savedQuote = quoteRepository.save(quote);
                    System.out.println("I saved a quote! Id: " +savedQuote.block().getId());
                });

Après quelques fouilles, j'arrive à le faire fonctionner mais je ne comprends pas pourquoi cela fonctionne maintenant. le nouveau code :

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                       Mono<Quote> savedQuote = quoteRepository.insert(quote);
                       savedQuote.subscribe(result ->
                                 System.out.println("I saved a quote! Id :: " + result.getId()));
    });

La définition de block() : Abonnez-vous à ce Mono et bloquez indéfiniment jusqu'à ce qu'un prochain signal soit reçu.

La définition de s'abonner () : Abonnez-vous à ce Mono et demandez une demande illimitée.

Quelqu'un peut-il m'aider à comprendre pourquoi le blocage n'a pas fonctionné et l'abonnement a fonctionné ? Qu'est-ce que j'oublie ici?

5
israel berko 11 oct. 2019 à 13:51

1 réponse

Meilleure réponse

Le blocage est mauvais, car il bloque un thread en attente d'une réponse. C'est très mauvais dans un framework réactif qui a peu de threads à sa disposition, et est conçu pour que aucun d'entre eux ne soit inutilement bloqué.

C'est précisément ce que les frameworks réactifs sont conçus pour éviter, donc dans ce cas, cela vous empêche simplement de le faire :

block()/blockFirst()/blockLast() sont bloquants, ce qui n'est pas pris en charge dans le thread réacteur-http-nio-4

Votre nouveau code, en revanche, fonctionne de manière asynchrone. Le thread n'est pas bloqué, car rien ne se passe réellement jusqu'à ce que le référentiel renvoie une valeur (puis le lambda que vous avez transmis à savedQuote.subscribe() est exécuté, affichant votre résultat sur la console.)

Cependant, le nouveau code n'est toujours pas optimal / normal du point de vue des flux réactifs, car vous faites toute votre logique dans votre méthode d'abonnement. La chose normale à faire est de nous envoyer une série d'appels flatMap/map pour transformer les éléments du flux et d'utiliser doOnNext() pour les effets secondaires (comme l'impression d'une valeur) :

stockQuoteClient.getQuoteStream()
            .log("quote-monitor-service")
            .flatMap(quoteRepository::insert)
            .doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId())))
            .subscribe();

Si vous effectuez un travail sérieux avec des réacteurs / flux réactifs, cela vaudrait la peine de les lire en général. Ils sont très puissants pour le travail non bloquant, mais ils nécessitent une façon de penser (et de coder) différente de celle de Java plus "standard".

3
Michael Berry 11 oct. 2019 à 11:13