J'appelle une API et je reçois un tableau de résultats, je vérifie la pagination et si plus de pages existent, j'appelle la page suivante, répétez jusqu'à ce qu'il n'y ait plus de pages.

Pour chaque tableau de résultats, j'appelle un autre point de terminaison et fais exactement la même chose: je reçois un tableau de résultats, recherche une autre page et appelle à nouveau le point de terminaison. Laver, rincer à répétition.

Par exemple:

Je veux saisir une liste de pays qui pourraient être une réponse paginée, puis pour chaque pays, je veux saisir une liste de villes, qui pourraient également être paginées. Et pour chaque ville, j'exécute un ensemble de transformations, puis je stocke dans une base de données.

J'ai déjà essayé cela, mais je suis resté coincé:


const grabCountries = Observable.create(async (observer) => {
    const url = 'http://api.com/countries'
    let cursor = url
    do {

        const results = fetch(cursor)

        // results = { 
        //   data: [ 'Canada', 'France', 'Spain' ],
        //   next: '47asd8f76358df8f4058898fd8fab'
        // }

        results.data.forEach(country => { observer.next(country) })

        cursor = results.next ? `${url}/${results.next}` : undefined

    } while(cursor)

})


const getCities = {
    next: (country) => {
        const url = 'http://api.com/cities'
        let cursor = url
        do {

            const results = fetch(cursor)

            // results = {
            //     data: [ 
            //         'Montreal', 'Toronto', 
            //         'Paris', 'Marseilles', 
            //         'Barcelona', 'Madrid' 
            //     ],
            //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
            // }

            results.data.forEach(city => { 
                `**** What do I do here?? ****` 
            })

            cursor = results.next ? `${url}/${results.next}` : undefined

        } while(cursor)
    }
}

J'ai essayé quelques approches:

Faire un sujet (parfois je devrai faire un traitement parallèle basé sur les résultats de 'grabCountries'. Par exemple, je peux vouloir stocker les pays dans une base de données en parallèle avec la capture des villes.)

const intermediateSubject = new Subject()

intermediateSubject.subscribe(storeCountriesInDatabase)
intermediateSubject.subscribe(getCities)

J'ai également essayé la tuyauterie et le mappage, mais il semble que ce soit essentiellement la même chose.

Alors que j'écrivais ceci, j'ai pensé à cette solution et elle semble fonctionner correctement, je voudrais juste savoir si je complique les choses. Il peut y avoir des cas où je dois faire plus que quelques appels API consécutifs. (Imaginez, les pays => les États => les villes => les boulangeries => les critiques => les commentaires => les réponses).

Voilà donc ce que j'ai maintenant essentiellement:

// grabCountries stays the same as above, but the rest is as follows:

const grabCities = (country) =>
  Observable.create(async (observer) => {
    const url = `http://api.com/${country}/cities`
      let cursor = url
      do {
       const results = fetch(cursor)

       // results = {
       //     data: [
       //         'Montreal', 'Toronto',
       //         'Paris', 'Marseilles',
       //         'Barcelona', 'Madrid'
       //     ],
       //     next: '89ghjg98nd8g8sdfg98gs9h868hfoig'
       // }

       results.data.forEach(city => {
         observer.next(city)
       })

    cursor = results.next ? `${url}/${results.next}` : undefined

    } while (cursor)
})

const multiCaster = new Subject()

grabCountries.subscribe(multiCaster)
multiCaster.pipe(map((country) => {
    grabCities(country).pipe(map(saveCityToDB)).subscribe()
})).subscribe()
multiCaster.pipe(map(saveCountryToDB)).subscribe()

Tl; dr - J'appelle une API qui reçoit un ensemble de résultats paginé dans un tableau et j'ai besoin de mapper chaque élément et d'appeler une autre API qui reçoit un autre ensemble de résultats paginé, chaque ensemble étant également dans un tableau.

Est-ce que l'imbrication d'un observable à l'intérieur d'un autre et la cartographie à travers les résultats via 'callApiForCountries.pipe (map (forEachCountryCallApiForCities))' sont la meilleure méthode ou avez-vous d'autres recommandations?

3
tim.breeding 27 janv. 2019 à 20:49

3 réponses

Meilleure réponse

OK, j'ai donc dépensé beaucoup de puissance cérébrale à ce sujet et j'ai trouvé deux solutions qui semblent fonctionner.

const nestedFlow = () => {
	fetchAccountIDs.pipe(map(accountIds => {
		getAccountPostIDs(accountIds) // Has the do loop for paging inside
			.pipe(
				map(fetchPostDetails),
				map(mapToDBFormat),
				map(storeInDB)
			).subscribe()
	})).subscribe()
}


const expandedflow = () => {
	fetchAccountIDs.subscribe((accountId) => {
		// accountId { accountId: '345367geg55sy'}
		getAccountPostIDs(accountId).pipe(
			expand((results) => {
				/*
				results : {
					postIDs: [
						131424234,
						247345345,
					],
					cursor: '374fg8v0ggfgt94',
				}
				*/
				const { postIDs, cursor } = results
				if (cursor) return getAccountPostIDs({...accountId, cursor})
				return { postIDs, cursor }
			}),
			takeWhile(hasCursor, true), // recurs until cursor is undefined
			concatMap(data => data.postIDs), 
			map(data => ({ post_id: data })), 
			map(fetchPostDetails), 
			map(mapToDBFormat), 
			map(storeInDB) 
		).subscribe()
	})
}

Les deux semblent fonctionner avec des performances similaires. J'ai lu un peu où laisser le flux de données est une mauvaise pratique et vous devriez tout diriger, mais je ne sais pas comment éliminer la première sortie dans le 'ExpandFlow' parce que le 'expand' doit rappeler un observable, mais peut-être ça peut être fait.

Il ne me reste plus qu'à résoudre les problèmes de conditions de concurrence entre le moment où le 'complete' est appelé dans getAccountPostIDs et le dernier enregistrement est stocké dans la base de données. Actuellement dans mon test, le observer.complete se termine avant 3 des actions upsert.

Tous les commentaires sont appréciés et j'espère que cela aidera quelqu'un à l'avenir.

1
tim.breeding 4 févr. 2019 à 06:36

Vous avez besoin de l 'expand opérateur. Il se comporte de manière récursive et correspond donc à l'idée d'avoir des résultats paginés.

0
Dzhavat Ushev 27 janv. 2019 à 21:51

Voici le code qui devrait fonctionner avec l'exploration séquentielle de la prochaine URL. Vous commencez par un {next: url} jusqu'à ce que res.next ne soit pas disponible.

of({next:http://api.com/cities}).pipe(
    expand(res=>results.next ? `${url}/${results.next}` : undefined
    takeWhile(res=>res.next!==undefined)
).subscribe()
1
Fan Cheung 28 janv. 2019 à 06:17