RxJS - Creating Observables

create

  • Rx.Observable.create

    // Library definition of create (excerpt): it wraps the given `subscribe`
    // function, together with `parent`, in a new AnonymousObservable.
    function (subscribe, parent) {
    return new AnonymousObservable(subscribe, parent);
    }

      // Build an Observable from a function that pushes three names to the
      // observer it is given and then signals completion.
      var observable = Rx.Observable.create(function(observer) {
    observer.onNext('Simon');
    observer.onNext('Jen');
    observer.onNext('Sergi');
    observer.onCompleted(); // We are done
    });

    create 的参数是一个函数, 这个函数接收一个 Observer
    这个函数定义了如何发出值. 当 subscribe 的时候它会发出 3 个 string 后结束

Observer 监听 Observable, 当 Observable 触发的时候会执行所有 Observers 相应的方法

相当于create 里的observer参数代表的是所有subscribe这个Observable的Observer,
类似于指针template
当触发的时候先走每个Observer的 3次onNext,然后触发每个Observer的onCompleted

Observers have three methods: onNext, onCompleted, and onError

单独创建observer,就是实现它的3个方法

  // Build a standalone Observer from its three handlers, passed in the order
  // onNext, onError, onCompleted; each handler only logs what it receives.
  var observer = Rx.Observer.create(
function onNext(x) { console.log('Next: ' + x); },
function onError(err) { console.log('Error: ' + err); },
function onCompleted() { console.log('Completed'); }
);

使用observer

// Full example: define an Observable, define an Observer, then connect them.
var observable = Rx.Observable.create(function(observer) {
// Define how the values are emitted
observer.onNext('Simon');
observer.onNext('Jen');
observer.onNext('Sergi');
observer.onCompleted(); // We are done
});


var observer = Rx.Observer.create(
// How each notification is handled
function onNext(x) { console.log('Next: ' + x); },
function onError(err) { console.log('Error: ' + err); },
function onCompleted() { console.log('Completed'); }
);

// Run it: subscribe the observer to the observable
observable.subscribe(observer);

可以不单独定义Observer,简写到subscribe里

// Shorthand: pass the three handlers (onNext, onError, onCompleted) straight
// to subscribe instead of building an Observer first. Subscribes to the
// `observable` defined in the previous example.
observable.subscribe(
  function onNext(x) { console.log('Result: ' + x); },
  function onError(err) { console.log('Error: ' + err); },
  function onCompleted() { console.log('Completed'); }
);

针对特定需求的Observable用create,但RxJS里有一些现成的


针对DOM事件流和 ajax请求的有个单独的lib,帮助创建observable
https://github.com/Reactive-Extensions/RxJS-DOM


observable的主要对象

arrays

events

and

callbacks

from

任何可迭代的或类数组的参数都可以用 from

// Create an Observable from an array and log every notification it delivers.
Rx.Observable.from(['Adrià', 'Jen', 'Sergi']).subscribe(
  function onNext(name) {
    console.log('Next: ' + name);
  },
  function onError(error) {
    console.log('Error:', error);
  },
  function onCompleted() {
    console.log('Completed');
  }
);

fromEvent

transform an event into an Observable


// Turn the document's mousemove events into an Observable and log the
// pointer coordinates carried by each event.
var allMoves = Rx.Observable.fromEvent(document, 'mousemove');
allMoves.subscribe(function logPosition(event) {
  var x = event.clientX;
  var y = event.clientY;
  console.log(x, y);
});

流之间的转化: 创建只是开始, 不断转化才是函数式编程的特色

// Derive one stream per half of the window from allMoves and log the
// horizontal position of the events each stream lets through. An event lying
// exactly on the midline satisfies neither predicate.
var movesOnTheRight = allMoves.filter(function isOnRightHalf(event) {
  return event.clientX > window.innerWidth / 2;
});
movesOnTheRight.subscribe(function logRight(event) {
  console.log('Mouse is on the right:', event.clientX);
});

var movesOnTheLeft = allMoves.filter(function isOnLeftHalf(event) {
  return event.clientX < window.innerWidth / 2;
});
movesOnTheLeft.subscribe(function logLeft(event) {
  console.log('Mouse is on the left:', event.clientX);
});

fromCallback fromNodeCallback

异步流

Node.js follows
the convention of always invoking the callback function with an error argument
first to signal to the callback function that there was a problem.

// Node.js example: list a directory through an Observable instead of a callback.
var Rx = require('rx'); // Load RxJS
var fs = require('fs'); // Load Node.js Filesystem module
// Wrap fs.readdir so that calling the wrapper returns an Observable
var readdir = Rx.Observable.fromNodeCallback(fs.readdir);
// Call the wrapped readdir on a directory path to get the Observable
var source = readdir('/Users/sergi');
// Subscribe with three handlers: log the result, log an error, log completion
var subscription = source.subscribe(
function(res) { console.log('List of directories: ' + res); },
function(err) { console.log('Error: ' + err); },
function() { console.log('Done!'); });