Je travaille avec Akka (version 2.4.17) pour construire un Flow d'observation en Java (disons des éléments de type <T> pour rester générique).

Mon exigence est que ce flux soit personnalisable pour fournir un nombre maximum d'observations par unité de temps dès leur arrivée . Par exemple, il devrait être capable de fournir au plus 2 observations par minute (la première qui arrive, le reste peut être abandonné).

J'ai regardé de très près la documentation Akka, et en particulier cette page qui détaille les étapes intégrées et leur sémantique.

Jusqu'à présent, j'ai essayé les approches suivantes.

  • En mode throttle et shaping() (pour ne pas fermer le flux lorsque la limite est dépassée):

      Flow.of(T.class)
           .throttle(2, 
                     new FiniteDuration(1, TimeUnit.MINUTES), 
                     0, 
                     ThrottleMode.shaping())
    
  • Avec groupedWith et une méthode personnalisée intermédiaire:

    final int nbObsMax = 2;
    
    Flow.of(T.class)
        .groupedWithin(Integer.MAX_VALUE, new FiniteDuration(1, TimeUnit.MINUTES))
        .map(list -> {
             List<T> listToTransfer = new ArrayList<>();
             for (int i = list.size()-nbObsMax ; i>0 && i<list.size() ; i++) {
                 listToTransfer.add(new T(list.get(i)));
             }
             return listToTransfer;
        })
        .mapConcat(elem -> elem)  // Splitting List<T> in a Flow of T objects
    

Les approches précédentes me donnent le nombre correct d'observations par unité de temps, mais ces observations sont conservées et livrées uniquement à la fin de la fenêtre de temps (et il y a donc un délai supplémentaire).

Pour donner un exemple plus concret, si les observations suivantes arrivent dans mon Flow:

[Obs1 t = 0s] [Obs2 t = 45s] [Obs3 t = 47s] [Obs4 t = 121s] [Obs5 t = 122s]

Il ne devrait afficher les suivants que dès leur arrivée (le temps de traitement peut être négligé ici):

Fenêtre 1: [Obs1 t ~ 0s] [Obs2 t ~ 45s] Fenêtre 2: [Obs4 t ~ 121s] [Obs5 t ~ 122s]

Toute aide sera appréciée, merci d'avoir lu mon premier article StackOverflow;)

3
Antoine 20 avril 2017 à 11:17

3 réponses

Meilleure réponse

Je ne peux pas penser à une solution prête à l'emploi qui fasse ce que vous voulez. Throttle émettra dans un flux constant en raison de la façon dont il est implémenté avec le modèle de compartiment, plutôt que d'avoir un bail autorisé au début de chaque période.

Pour obtenir le comportement exact que vous recherchez, vous devez créer votre propre étape de limite de débit personnalisée (ce qui n'est peut-être pas si difficile). Vous pouvez trouver la documentation sur la création d'étapes personnalisées ici: http://doc.akka.io/docs/akka/2.5.0/java/stream/stream-customize.html#custom-linear-processing-stages -utilisant-graphstage

Une conception qui pourrait fonctionner est d'avoir un compteur d'allocation indiquant le nombre d'éléments qui peuvent être émis que vous réinitialisez à chaque intervalle, pour chaque élément entrant, vous en soustrayez un du compteur et émettez, lorsque l'allocation est épuisée, vous continuez à tirer en amont mais vous rejetez les éléments. plutôt que de les émettre. L'utilisation de TimerGraphStageLogic pour GraphStageLogic vous permet de définir un rappel chronométré qui peut réinitialiser l'allocation.

3
johanandren 26 avril 2017 à 06:49

Grâce à la réponse de @johanandren, j'ai implémenté avec succès un GraphStage basé sur le temps personnalisé qui répond à mes besoins.

Je poste le code ci-dessous, si quelqu'un est intéressé:

import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.*;
import scala.concurrent.duration.FiniteDuration;

public class CustomThrottleGraphStage<A> extends GraphStage<FlowShape<A, A>> {

    private final FiniteDuration silencePeriod;
    private int nbElemsMax;

    public CustomThrottleGraphStage(int nbElemsMax, FiniteDuration silencePeriod) {
        this.silencePeriod = silencePeriod;
        this.nbElemsMax = nbElemsMax;
    }

    public final Inlet<A> in = Inlet.create("TimedGate.in");
    public final Outlet<A> out = Outlet.create("TimedGate.out");

    private final FlowShape<A, A> shape = FlowShape.of(in, out);
    @Override
    public FlowShape<A, A> shape() {
        return shape;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new TimerGraphStageLogic(shape) {

            private boolean open = false;
            private int countElements = 0;

            {
                setHandler(in, new AbstractInHandler() {
                    @Override
                    public void onPush() throws Exception {
                        A elem = grab(in);
                        if (open || countElements >= nbElemsMax) {
                            pull(in);  // we drop all incoming observations since the rate limit has been reached
                        }
                        else {
                            if (countElements == 0) { // we schedule the next instant to reset the observation counter
                                scheduleOnce("resetCounter", silencePeriod);
                            }
                            push(out, elem); // we forward the incoming observation
                            countElements += 1; // we increment the counter
                        }
                    }
                });
                setHandler(out, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        pull(in);
                    }
                });
            }

            @Override
            public void onTimer(Object key) {
                if (key.equals("resetCounter")) {
                    open = false;
                    countElements = 0;
                }
            }
        };
    }
}
1
Antoine 2 mai 2017 à 08:16

Je pense que c'est exactement ce dont vous avez besoin: http://doc.akka.io/docs/akka/2.5.0/java/stream/stream-cookbook.html#Globally_limiting_the_rate_of_a_set_of_streams

2
meln1k 26 avril 2017 à 14:20