Je m'enseigne actuellement la programmation réactive avec rxjs, et je me suis lancé le défi de créer un flux observable qui émettra toujours le même résultat à un abonné quoi qu'il arrive.

J'ai mémorisé la création d'un flux HTTP "GET" en fonction d'une URL spécifique, et j'essaie d'agir sur ce flux toutes les deux secondes, le résultat étant que pour chaque tick du minuteur, j'extraire un en cache / a mémorisé le résultat HTTP du flux d'origine.

import superagent from 'superagent';
import _ from 'lodash';

// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  req = req.end.bind(req);
  return Rx.Observable.fromNodeCallback(req)();
});

// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');

// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
  .map(() => response$)
  .flatMap($ => $)
  .subscribe(response => {
    console.log('Got the response!');
  });

J'étais sûr que je devrais coller un appel à replay() quelque part, mais quoi que je fasse, un nouvel appel HTTP est lancé toutes les deux secondes. Comment puis-je structurer cela afin de pouvoir construire une observable à partir d'une URL et qu'elle émette toujours le même résultat HTTP à tous les abonnés suivants?

MODIFIER
J'ai trouvé un moyen d'obtenir le résultat que je souhaite, mais j'ai l'impression qu'il me manque quelque chose et que je devrais pouvoir le refactoriser avec une approche beaucoup plus simplifiée:

var httpget = _.memoize(function(url) {
  var subject = new Rx.ReplaySubject();
  try {
    superagent.get(url).end((err, res) => {
      if(err) {
        subject.onError(err);
      }
      else {
        subject.onNext(res);
        subject.onCompleted();
      }
    });
  }
  catch(e) {
    subject.onError(e);
  }
  return subject.asObservable();
});
2
Nathan Ridley 17 juil. 2015 à 15:10

2 réponses

Meilleure réponse

Votre premier exemple de code est en fait plus proche de la façon de le faire

var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  return Rx.Observable.fromNodeCallback(req.end, req)();
});

Cependant, cela ne fonctionne pas car il semble y avoir un bogue dans fromNodeCallback. Pour contourner ce problème, je pense que vous recherchez en fait le AsyncSubject au lieu de ReplaySubject. Ce dernier fonctionne, mais le premier est conçu exactement pour ce scénario (et n'a pas la surcharge d'une création de tableau + les vérifications d'exécution du cache si cela vous intéresse).

var httpget = _.memoize(function(url) {

  var subject = new Rx.AsyncSubject();
  var req = superagent.get(url);
  Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
  return subject.asObservable();

});

Enfin, bien que map apprécie que vous y réfléchissiez, vous pouvez simplifier votre code timer en utilisant la surcharge flatMap qui prend directement un Observable:

Rx.Observable.timer(0, 2000)
  .flatMap($response)
  .subscribe(response => {
    console.log('Got the response');
  });
1
paulpdaniels 18 juil. 2015 à 01:03

À moins que je ne me trompe dans votre question, Observable.combineLatest fait exactement cela pour vous, il cache la dernière valeur émise de votre observable.

Ce code envoie la demande une fois, puis donne la même réponse en cache toutes les 200 ms:

import reqPromise from 'request-promise';
import {Observable} from 'rx';

let httpGet_ = (url) =>
      Observable
      .combineLatest(
        Observable.interval(200),
        reqPromise(url),
        (count, response) => response
      );

httpGet_('http://google.com/')
  .subscribe(
    x => console.log(x),
    e => console.error(e)
  );
0
channikhabra 28 juil. 2015 à 12:51