J'ai une méthode qui fait un certain nombre de requêtes asynchrones à DB pour récupérer une grande quantité de données. Par souci de simplicité, disons que chaque requête renvoie un tableau d'entiers. Je veux transformer cette méthode en nombres observables et en sortie un par un. Cette partie fonctionne très bien.

Le problème commence avec l'opérateur «take» - je voudrais arrêter les requêtes DB si personne n'écoute le résultat de toute façon. Et mon problème est que la fonction 'scroll' n'arrêterait pas de s'exécuter jusqu'à ce qu'elle atteigne sa condition d'arrêt, même si 'largeQueryPromise' ne l'écoute plus à cause de l'opérateur take (10).

Existe-t-il une possibilité d'arrêter l'exécution d'observable, lorsque l'abonné se désabonne pour diverses raisons?

let ind = 0;
function dbRequest(): Promise<number[]> {
    return new Promise(resolve => resolve([ind++, ind++]));
}

async function largeQuery(index: number) {
    let res = await dbRequest();
    return new Observable(observer => scroll(observer, res, index));
}

function scroll(observer: Subscriber<number>, res: number[], index: number) {
    if (Math.round(Math.random() * 5) === 0) {
        console.log(`completed sequence ${index}`);
        observer.complete();
        return;
    }

    res.forEach(value => observer.next(value));
    dbRequest().then(arr => scroll(observer, arr, index));
}

async function largeQueryPromise(index: number) {
    console.log(`started sequence ${index}`);
    const obs = await largeQuery(index);
    obs.pipe(take(10)).subscribe(
        undefined, 
        console.error, 
        () => {
            console.log(`stopped to listen for sequence ${index}`);
            largeQueryPromise(++index).then();
        });
}

largeQueryPromise(0).then();
1
Eugene Shtoka 27 août 2020 à 00:47

2 réponses

Meilleure réponse

L'observateur a un paramètre «fermé» qui indique si cet abonné s'est désabonné. Sachant cela, la solution est triviale:

function scroll(observer: Subscriber<number>, res: number[], index: number) {
    if (Math.round(Math.random() * 5) === 0) {
        console.log(`completed sequence ${index}`);
        observer.complete();
        return;
    }

    for(let i=0; i<res.length && !observer.closed; i++)
        observer.next(res[i]);

    if(!observer.closed)
        dbRequest().then(arr => scroll(observer, arr, index));
}

Edit: Notez que techniquement, vous n'avez pas besoin de vérifier dans la boucle for - tout .next sera un noop.

1
olivarra1 27 août 2020 à 07:38

Votre largeQuery ne peut être effectué qu'avec des opérateurs. Utilisez expand pour appeler de manière récursive dbRequest() lorsque la requête précédente a été émise. Terminez cette récursion en renvoyant EMPTY. Utilisez concatAll pour répartir les émissions des tableaux entrants.

function largeQuery(index: number): Observable<number> {
  console.log("largeQuery2 for", index);
  return from(dbRequest()).pipe(
    expand(res => {
      if (Math.round(Math.random() * 5) === 0) {
        console.log(`completed sequence ${index}`);
        return EMPTY;
      }
      // The observable returned here gets subscribed to before the 'take' operator
      // below ends the subscription. To prevent an additional call of 'dbRequest'
      // at the end, the observable returned here has to be asynchronous. 
      // That's why 'timer' is used. 
      // If this doesn't turn out to be an issue for you, the line below could be 
      // replace with 'return defer(() => dbRequest())' or even 'return from(dbRequest())'
      return timer(0).pipe(switchMap(() => dbRequest()));
    }),
    concatAll()
  );
}

function recursiveLargeQuery(index: number) {
  console.log(`started sequence ${index}`);
  largeQuery(index).pipe(
    take(10),
  ).subscribe(
    v => console.log(v), 
    console.error, 
    () => {
      console.log(`stopped to listen for sequence ${index}`);
      if (index < 2) { // end the recursion at some point
        recursiveLargeQuery(++index);
      }
    });
}

recursiveLargeQuery(0)

https://stackblitz.com/edit/rxjs-ihxkax?file=index.ts

1
fridoo 27 août 2020 à 12:06