Skip to content

基础概念

fluth中一共有两种流,分别是Stream流和Observable流,所谓的流就是一个可以被订阅的数据源,对Stream流进行订阅后的订阅节点就是Observable流。

StreamObservable的主要区别在于Stream流可以主动推流,而Observable流只能被动的推流或者重复上一次推流。

typescript
import { $ } from "fluth";

const promise$ = $();

const observable$ = promise$.then(xxx);

const observable1$ = observable$.then(xxx);

promise$.next(xxx); // 推送数据

推流

推流就是推送新的数据源给到订阅节点,推流分为主动推流和被动推流;主动推流指节点可以主动推送数据,被动推流指节点只能被动的接收数据处理后再将处理后的数据推流给订阅节点。

  • Stream可以使用next方法进行主动推流,所有订阅节点都能收到推送的数据
typescript
import { Stream } from "fluth";

const promise$ = new Stream();

promise$.then((data) => console.log(data));

promise$.next("hello"); // 打印 hello
  • 也可以使用set方法也可以进行数据的推送,和next方法的区别在于set方法推送的是基于上次数据的immutable数据。
typescript
import { $ } from "fluth";
const promise$ = $({ a: 1, b: { c: 2 } });
const oldValue = promise$.value;

promise$.set((state) => (state.a = 3));
const newValue = promise$.value;

console.log(oldValue === newValue); // false
console.log(oldValue.b === newValue.b); // true

执行

调用订阅节点的observableexecute方法重新执行上一次订阅的数据流,并推流至其所有子订阅节点。

订阅节点

fluth采用promise的形式进行数据流的推送,通过then$thenthenOnce$thenOncethenImmediate$thenImmediategetchangefilter等方法对流新增一个订阅节点,返回订阅节点Observable实例,整体使用和promise保持一致。

链式订阅

调用订阅节点的observable$then等方法进行链式订阅,类似promisethen链式方法。

部分订阅

调用订阅节点的observableget等方法进行部分订阅,只订阅get获取的那部分数据的变化。

条件订阅

只有节点满足条件才会进行推流,调用订阅节点的observablechangefilter方法可以进行条件订阅,只有满足条件节点才会进行推流,两者的区别在于:

  • change方法传入的是一个getter函数,分别传入上一次数据和当前数据并对返回值进行对比,如果有变化才会进行推流。
  • filter方法传入的是一个condition函数,传入的是当前数据,如果返回true才会进行推流。

取消订阅

调用订阅节点的observableunsubscribe方法取消订阅。

结束

只有Stream流才可以结束,结束意味着流不再推送新的数据。

typescript
import { Stream } from "fluth";

const promise$ = new Stream();

promise$.next(1, true); // true表示结束,最后一次推流

每个订阅节点在执行完最后这次数据推送后都会触发afterComplete回调,然后自动取消订阅所有的订阅者,触发所有子节点的afterUnsubscribe