2 回答

TA貢獻1801條經驗 獲得超16個贊
在 sagas(生成器)和 epics(observables)之間,重要的是要改變您對事件如何到達您的代碼的看法。
生成器滿足迭代器和可迭代協議,這涉及從源中提取 值/事件(在本例中為 Redux 操作),并在這些事件到達之前阻止執行。
Observables 是推而不是拉。我們描述和命名我們感興趣的事件流,然后訂閱它們。沒有阻塞調用,因為我們所有的代碼都是由事件發生時觸發的。
此代碼復制了 saga 示例中的行為。
import { interval, timer } from 'rxjs';
import { withLatestFrom, mapTo, exhaustMap, takeUntil } from 'rxjs/operators';
import { ofType } from 'redux-observable';
const myEpic = (action$, state$) => {
// A stream of all the "cancel start" actions
const cancelStart$ = action$.pipe(ofType('CANCEL_START'));
// This observable will emit delayed start events that are not cancelled.
const delayedCancellableStarts$ = action$.pipe(
// When a start action occurs...
ofType('START'),
// Grab the latest start delay value from state...
withLatestFrom(state$, (_, { startDelay }) => startDelay),
exhaustMap(
// ...and emit an event after our delay, unless our cancel stream
// emits first, then do nothing until the next start event arrives.
// exhaustMap means we ignore all other start events while we handle
// this one.
(startDelay) => timer(startDelay).pipe(takeUntil(cancelStart$))
)
);
// On subscribe, emit a tick action every 10ms
const tick$ = interval(10).pipe(mapTo({ type: 'TICK' }));
// On subscribe, emit only STOP or RESET actions
const stopTick$ = action$.pipe(ofType('STOP', 'RESET'));
// When a start event arrives, start ticking until we get a message to
// stop. Ignore all start events until we stop ticking.
return delayedCancellableStarts$.pipe(
exhaustMap(() => tick$.pipe(takeUntil(stopTick$)))
);
};
重要的是,即使我們正在創建和命名這些可觀察的流,它們的行為也是惰性的——在訂閱之前它們都不會被“激活”,當你向redux-observable中間件提供這個史詩功能時就會發生這種情況。

TA貢獻1946條經驗 獲得超4個贊
我假設take()返回一個可觀察的,沒有測試代碼。它可能可以轉換為如下所示的 rx 時尚。
這里的關鍵是repeat()和takeUntil()
// outter condition for starting ticker
forkJoin(take('START'), select())
.pipe(
switchMap(([, startDelay]) =>
// inner looping ticker
timer(10).pipe(switchMap(_ => put({type: 'TICK'})), repeat(),
takeUntil(race(
take('CANCEL_START'),
delay(startDelay)
))
)
/////////////////////
)
)
添加回答
舉報