RxJS Observable:一个奇特的函数
<h2>前言</h2> <p>RxJS 的 Observable 有点难理解,其实 RxJS 相关的概念都有点难理解。毕竟 RxJS 引入了响应式编程这种新的模式,会不习惯是正常的。不过总得去理解嘛,而认识新的事物时,如果能够参照一个合适的已知事物比对着,会比较容易理解吧。对于 Observable,类比 JS 中的函数,还是比较好的。</p> <h2>开始</h2> <h3>封装</h3> <p>先来看一个普通函数调用的例子:</p> <pre> <code class="language-javascript">function foo() { console.log('process...') } foo() // 输出: // process...</code></pre> <p>很简单,函数 foo() 封装了一段逻辑(这里只是向控制台输出),然后通过调用函数,函数执行内部的逻辑。</p> <p>再来看 RxJS Observable 的一个例子:</p> <pre> <code class="language-javascript">var foo = Rx.Observable.create(() => { console.log('process...') }) foo.subscribe() // 输出: // process...</code></pre> <p>上例中,通过 Rx.Observable.create() 来创建 Observable 对象,将同样将一段代码逻辑封装到 Observable 对象 foo 中,然后通过 foo.subscribe() 来执行封装的代码逻辑。</p> <p>对于普通函数和 Observable 对象,封装的代码逻辑在每次调用时都会重新执行一次。从这一点来看,Observable 能够和普通函数一样实现封装代码进行复用。</p> <h3>返回值</h3> <p>函数调用后可以有返回值:</p> <pre> <code class="language-javascript">function foo() { console.log('process...') return 42 } console.log(foo()) // 输出: // process... // 42</code></pre> <p>Observable 执行后也会产生值,不过和函数直接返回的方式不同,要通过回调函数方式获取:</p> <pre> <code class="language-javascript">var foo = Rx.Observable.create((observer) => { console.log('process...') observer.next(42) }) foo.subscribe(value => console.log(value)) // 输出: // process... // 42</code></pre> <p>Observable 对象内部是通过 observer.next(42) 这种方式返回值,而调用方则通过回调函数来接收返回的数据。形式上比普通函数直接返回值啰嗦一些。</p> <p>从调用方的角度来看,两个过程分别是:</p> <ul> <li>普通函数: <strong>调用</strong> > 执行逻辑 > 返回数据</li> <li>Observable: <strong>订阅(subscribe)</strong> > 执行逻辑 > 返回数据</li> </ul> <p>从获取返回值方式来看,调用函数是一种直接获取数据的模式,从函数那里“拿”(pull)数据;而 Observable 订阅后,是要由 Observable 通过间接调用回调函数的方式,将数据“推”(push)给调用方。</p> <p>这里 pull 和 push 的重要区别在于,push 模式下,Observable 可以决定什么时候返回值,以及返回几个值(即调用回调函数的次数)。</p> <pre> <code class="language-javascript">var foo = Rx.Observable.create((observer) => { console.log('process...') observer.next(1) setTimeout(() => observer.next(2), 1000) }) console.log('before') foo.subscribe(value => console.log(value)) console.log('after') // 输出: // before // process... // 1 // after // 2</code></pre> <p>上面例子中,Observable 返回了两个值,第1个值同步返回,第2个值则是过了1秒后异步返回。</p> <p>也就是说,从返回值来说,Observable 相比普通函数区别在于:</p> <ul> <li>可以返回多个值</li> <li>可以异步返回值</li> </ul> <h3>异常处理</h3> <p>函数执行可能出现异常情况,例如:</p> <pre> <code class="language-javascript">function foo() { console.log('process...') throw new Error('BUG!') }</code></pre> <p>我们可以捕获到异常状态进行处理:</p> <pre> <code class="language-javascript">try { foo() } catch(e) { console.log('error: ' + e) }</code></pre> <p>对于 Observable,也有错误处理的机制:</p> <pre> <code class="language-javascript">var foo = Rx.Observable.create((observer) => { console.log('process...') observer.error(new Error('BUG!')) }) foo.subscribe( value => console.log(value), e => console.log('error: ' + e) )</code></pre> <p>Observable 的 subscribe() 方法支持传入额外的回调函数,用于处理异常情况。和函数执行类似,出现错误之后,Observable 就不再继续返回数据了。</p> <p>subscribe() 方法还支持另一种形式传入回调函数:</p> <pre> <code class="language-javascript">foo.subscribe({ next(value) { console.log(value) }, error(e) { console.log('error: ' + e) } })</code></pre> <p>而这种形式下,传入的对象和 Observable 内部执行函数中的 observer 参数在形式上就比较一致了。</p> <h3>中止执行</h3> <p>Observable 内部的逻辑可以异步多个返回值,甚至返回无数个值:</p> <pre> <code class="language-javascript">var foo = Rx.Observable.create((observer) => { let i = 0 setInterval(() => observer.next(i++), 1000) }) foo.subscribe(i => console.log(i)) // 输出: // 0 // 1 // 2 // ...</code></pre> <p>上面例子中,Observable 对象每隔 1 秒会返回一个值给调用方。即使调用方不再需要数据,仍旧会继续通过回调函数向调用推送数据。</p> <p>RxJS 提供了中止 Observable 执行的机制:</p> <pre> <code class="language-javascript">var foo = Rx.Observable.create((observer) => { console.log('start') let i = 0 let timer = setInterval(() => observer.next(i++), 1000) return () => { clearInterval(timer) console.log('end') } }) var subscription = foo.subscribe(i => console.log(i)) setTimeout(() => subscription.unsubscribe(), 2500) // 输出: // start // 0 // 1 // 2 // end</code></pre> <p>subscribe() 方法返回一个订阅对象(subscription),该对象上的 unsubscribe() 方法用于取消订阅,也就是中止 Observable 内部逻辑的执行,停止返回新的数据。</p> <p>对于具体的 Observable 对象是如何中止执行,则要由 Observable 在执行后返回一个用于中止执行的函数,像上面例子中的这种方式。</p> <p>Observable 执行结束后,会触发观察者的 complete 回调,所以可以这样:</p> <pre> <code class="language-javascript">foo.subscribe({ next(value) { console.log(value) }, complete() { console.log('completed') } })</code></pre> <p>Observable 的观察者共有上面三种回调:</p> <ul> <li>next:获得数据</li> <li>error:处理异常</li> <li>complete:执行结束</li> </ul> <p>其中 next 可以被多次调用,error 和 complete 最多只有一个被调用一次(任意一个被调用后不再触发其他回调)。</p> <h3>数据转换</h3> <p>对于函数返回值,有时候我们要转换后再使用,例如:</p> <pre> <code class="language-javascript">function foo() { return 1 } console.log(f00() * 2) // 输出: // 2</code></pre> <p>对于 Observable 返回的值,也会有类似的情况,不过通常采用下面的方式:</p> <pre> <code class="language-javascript">var foo = Rx.Observable.create((observer) => { let i = 0 setInterval(() => observer.next(i++), 1000) }) foo.map(i => i * 2).subscribe(i => console.log(i)) // 输出: // 0 // 2 // 4 // ...</code></pre> <p>其实 foo.map() 返回了新的 Observable 对象,上面代码等价于:</p> <pre> <code class="language-javascript">var foo2 = foo.map(i => i * 2) foo2.subscribe(i => console.log(i))</code></pre> <p>Observable 对象 foo2 被订阅时执行的内部逻辑可以简单视为:</p> <pre> <code class="language-javascript">function subscribe(observer) { let mapFn = v => v * 2 foo.subscribe(v => { observer.next(mapFn(v)) }) }</code></pre> <p>将这种对数据的处理和数组进行比较看看:</p> <pre> <code class="language-javascript">var array = [0, 1, 2, 3, 4, 5] array.map(i => i * 2).forEach(i => console.log(i))</code></pre> <p>是不是有点像?</p> <p>除了 map() 方法,Observable 还提供了多种转换方法,如 filter() 用于过滤数据,find() 值返回第一个满足条件的数据,reduce() 对数据进行累积处理,在执行结束后返回最终的数据。这些方法和数组方法功能是类似的,只不过是对异步返回的数据进行处理。还有一些转换方法更加强大,例如可以 debounceTime() 可以在时间维度上对数据进行拦截等等。</p> <p>Observable 的转换方法,本质不过是创建了一个新的 Observable,新的 Observable 基于一定的逻辑对原 Observable 的返回值进行转换处理,然后再推送给观察者。</p> <h2>总结</h2> <p>Observable 就是一个奇怪的函数,它有和函数类似的东西,例如封装了一段逻辑,每次调用时都会重新执行逻辑,执行有返回数据等;也有更特殊的特性,例如数据是推送(push)的方式返回给调用方法,返回值可以是异步,可以返回多个值等。</p> <p>不过将 Observable 视作特殊函数,至少对于理解 Observable 上是比较有帮助的。</p> <p>Observable 也被视为 data stream(数据流),这是从 Observable 可以返回多个值的角度来看的,而数据转换则是基于当前数据流创建新的数据流,例如:</p> <p><img src="https://simg.open-open.com/show/d194f74ecadfc2e3580b2e9a37213ea8.png"></p> <p>observable.map(x => 10 * x)</p> <p>不过上图看到的只是数据,而将 Observable 视为特殊函数时,不应该忘了其内部逻辑,不然数据是怎么产生的呢。</p> <p> </p> <p>来自:http://www.jianshu.com/p/b05aa5d152c8</p> <p> </p>
本文由用户 EthFritzsch 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
转载本站原创文章,请注明出处,并保留原始链接、图片水印。
本站是一个以用户分享为主的开源技术平台,欢迎各类分享!