import { EMPTY, from, of } from "rxjs";
import {
catchError,
delay,
delayWhen,
finalize,
map,
mergeMap,
retry,
tap,
toArray,
} from "rxjs/operators";
const fetchObservable = (data) => {
return from(
new Promise((resolve) => {
setTimeout(() => {
resolve({
data,
});
}, 300);
})
);
};
const concurrency = 5;
const fetchConcurrently$ = from(YOUR_DATA).pipe(
mergeMap((token) => {
return fetchObservable(YOUR_DATA).pipe(
map(({ data }) => data),
delay(1000),
retry(1),
catchError(() => EMPTY)
);
}, concurrency),
map((data) => {
return data.id;
}),
toArray(),
delayWhen(() => Promise.resolve()),
finalize(() => console.log("done"))
);
fetchConcurrently$.subscribe((ids) => console.log(ids));