J'ai un générateur Flux (du réacteur du projet) pour le flux qui retarde 1 seconde pour émettre chaque élément. Et comme le flux est infini, je décide également du nombre d'éléments que je souhaite utiliser en utilisant la méthode take(). Je veux tester qu'une telle méthode peut être exécutée dans un temps donné. J'ai essayé de créer ma solution inspirée de cette réponse mais sans succès.

C'est la méthode Flux

import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class FluxAndMonoStreams {
    public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec))
                .map(l -> l.doubleValue())
                .take(numberOfElements)
                .log();
    }
}

Et c'est le test unitaire. J'ai essayé les approches utilisant .thenConsumeWhile(value -> true), .expectNextCount(4), .expectNext(0.0, 1.0, 2.0, 3.0) après avoir attendu 4 secondes sur le .thenAwait(Duration.ofSeconds(4)). Mais aucun d'entre eux ne fonctionne.

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class FluxAndMonoStreamsTest {
    FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();
    @Test
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                // .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }
}

Je reçois cette erreur qui dit que je ne consomme pas les valeurs 1.0, 2.0, ... Fondamentalement, je veux m'assurer de ne consommer que les valeurs 0.0, 1.0, 2.0, 3.0 dans les 4 secondes. Comment je fais ça?

12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()

expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))

Explication supplémentaire : juste pour clarifier une fois de plus le comportement que je souhaite. Si je décommente .expectNext(0.0, 1.0, 2.0, 3.0) et remplace 4 secondes par 2 secondes dans .thenAwait(Duration.ofSeconds(2)), le test devrait échouer. Mais ce n'est pas.

1
Felipe 12 mars 2021 à 14:42

2 réponses

Meilleure réponse

Quelqu'un d'autre m'a aidé et je publie la solution ici. J'utilise JUnit 5 timeout. En utilisant @Timeout(value = 5, unit = TimeUnit.SECONDS) la réussite du test, si j'utilise 4 secondes ou moins, le test échoue. La solution n'est pas basée sur StepVerifier mais atteint toujours ses objectifs.

    @Test
    @Timeout(value = 5, unit = TimeUnit.SECONDS)
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        StepVerifier.create(streamLongFlux)
                .expectSubscription()
                .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete();
    }
0
Felipe 13 mars 2021 à 12:25

Ce test fonctionne "bien" pour moi si je décommente le expectNext.

    public Flux<Double> create(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec)).map(l -> l.doubleValue()).take(numberOfElements).log();
    }

    @Test
    public void virtualTimeTest() {
        Flux<Double> streamLongFlux = create(4,1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                 .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }

Il ne fonctionne pas en temps virtuel car le Flux.interval est appelé avant l'établissement de l'heure virtuelle.

 StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .thenAwait(Duration.ofSeconds(4))
                    .expectNext(0.0, 1.0, 2.0, 3.0)
                    .verifyComplete();

Cela fonctionne en temps virtuel et prend 400 ms au lieu de 4,4 secondes

Un autre test encore plus explicite serait:

@Test
    public void virtualTime(){
        StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(0.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(1.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(2.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(3.0)
                    .verifyComplete();
    }
1
p.streef 12 mars 2021 à 18:58