Rxjs - Operators - forEach

这个forEach 不完全算一个operator

Signature

/**
* @method forEach
* @param {Function} next a handler for each value emitted by the observable
* @param {PromiseConstructor} [PromiseCtor] a constructor function used to instantiate the Promise
* @return {Promise} a promise that either resolves on observable completion or
* rejects with the handled error
*/
forEach(next, PromiseCtor)

接受两个参数:

  • 第一个是funtion,forEach的流会订阅后,把value 给这个function当参数,并执行.
  • 第二个参数 是要给一个Promise的构造,让他知道如何new 一个promise

Source Code

forEach(next, PromiseCtor) {
if (!PromiseCtor) {
if (root.Rx && root.Rx.config && root.Rx.config.Promise) {
PromiseCtor = root.Rx.config.Promise;
}
else if (root.Promise) {
PromiseCtor = root.Promise;
}
}
if (!PromiseCtor) {
throw new Error('no Promise impl found');
}
return new PromiseCtor((resolve, reject) => {
const subscription = this.subscribe((value) => {
if (subscription) {
// if there is a subscription, then we can surmise
// the next handling is asynchronous. Any errors thrown
// need to be rejected explicitly and unsubscribe must be
// called manually
try {
next(value);
}
catch (err) {
reject(err);
subscription.unsubscribe();
}
}
else {
// if there is NO subscription, then we're getting a nexted
// value synchronously during subscription. We can just call it.
// If it errors, Observable's `subscribe` imple will ensure the
// unsubscription logic is called, then synchronously rethrow the error.
// After that, Promise will trap the error and send it
// down the rejection path.
next(value);
}
}, reject, resolve);
});
}

重点在他new 的promise里
他判断是否是异步流,如果是就把值给参数方法执行,有erro了就执行reject,取消订阅
如果是forEach的调用者是不是异步流就执行(当数组的forEach用了)
onError的时候调用reject, complete的时候调用resolve,因为他返回的是 promise

var button = $("button");
var clickStream = Rx.Observable.fromEvent(button, 'click').take(5);
clickStream.map(function () {
return 1;
}).scan(function (a, b) {
return a + b;
}).forEach(function (x) {
console.log(x);
// throw new Error("haha");
$("span").html(x);
}).then(_ => console.log("complete")).catch(e => {
console.log("catch the errror", e);
})

看例子:

  • 直接执行他会走then,因为他complete调用了resolve.所以5次后打印”complete”
  • 打开throw error的注释, 他会打印 “catch the error”,因为onError 调用 reject走到catch里了

forEach相当于:

  • 内部订阅并执行某个方法(提供的next)
  • 返回promise方便处理error 和complete