Observable
Observable instance's then, thenOnce, thenImmediate, pipe methods return Observable instances
value
Type
typescriptvalue: T | undefined
Details
Current node's data
status
Type
typescriptenum PromiseStatus { PENDING = 'pending', RESOLVED = 'resolved', REJECTED = 'rejected', } status: PromiseStatus | null
Details
Current node's status, generally in pending, fulfilled, rejected states. When the stream hasn't passed through this node or the node has been unsubscribed, the status is null
then
Type
typescripttype OnFulfilled<T> = Parameters<Promise<T>['then']>[0] type OnRejected<T> = Parameters<Promise<T>['catch']>[0] then<T>( onFulfilled: OnFulfilled<T>, onRejected?: OnRejected<unknown>, ): Observable
Details
then subscriber, usage consistent with promise, returns the subscribed node's Observable instance
Example
typescriptimport { $ } from 'fluth' const promise$ = $('1') const observable$ = promise$.then((value) => Number(value)) // Automatically infers observable.value type as number
thenSet
Type
typescript$then(setter: (value: T) => void | Promise<void>): Observable<T extends PromiseLike<infer V> ? V : T, E> & E;
Details
thenSet subscriber, unlike then subscriber, thenSet subscriber can only perform immutable operations on data and cannot handle reject errors from previous node, returns the subscribed node's Observable instance.
Example
typescriptimport { $ } from 'fluth' const promise$ = $<{ a: number; b: { c: number } }>() const observable$ = promise$.$then((value) => { value.a = value.a + 1 }) promise$.next({ a: 1, b: { c: 1 } }) // observable$.value === { a: 2, b: { c: 1 } } promise$.value.b === observable$.value.b // true
thenOnce
Type
typescripttype OnFulfilled<T> = Parameters<Promise<T>['then']>[0] type OnRejected<T> = Parameters<Promise<T>['catch']>[0] thenOnce<T>( onFulfilled: OnFulfilled<T>, onRejected?: OnRejected<unknown>, ): Observable
Details
thenOnce differs from then method in that once the subscribed node executes, it will automatically unsubscribe.
Example
typescriptimport { $ } from 'fluth' const promise$ = $('1') const observable$ = promise$.thenOnce((value) => console.log(value)) promise$.next('2') // Output 2 promise$.next('3') // Won't output 3
thenOnceSet
- TypetypescriptthenOnceSet differs from thenSet method in that once the subscribed node executes, it will automatically unsubscribe.
$thenOnce(setter: (value: T) => void | Promise<void>): Observable<T extends PromiseLike<infer V>? V : T, E> & E;
thenImmediate
Type
typescripttype OnFulfilled<T> = Parameters<Promise<T>['then']>[0] type OnRejected<T> = Parameters<Promise<T>['catch']>[0] thenImmediate<T>( onFulfilled: OnFulfilled<T>, onRejected?: OnRejected<unknown>, ): Observable
Details
thenImmediate differs from then method in that:
- If parent node is Stream instance and has initial value, using thenImmediate will immediately trigger subscribed child node's execute
- If parent subscribed node is Observable and has executed, using thenImmediate will immediately trigger subscribed child node's execute
Example
typescriptimport { $ } from 'fluth' const promise$ = $('1') const observable$ = promise$.thenImmediate((value) => console.log(value)) // Output 1
thenImmediateSet
- TypetypescriptthenImmediateSet differs from thenSet method in that if parent subscribed node has executed, using thenImmediateSet will immediately trigger subscribed child node's execute.
$thenImmediate(setter: (value: T) => void | Promise<void>): Observable<T extends PromiseLike<infer V>? V : T, E> & E;
catch
Type
typescripttype OnRejected<T> = Parameters<Promise<T>['catch']>[0] catch(onRejected: OnRejected<unknown>): Observable
Details
Catch for subscribed node, usage consistent with promise, returns the subscribed node's Observable instance.
Example
typescriptimport { $ } from 'fluth' const promise$ = $() observable$ = promise$.catch((error) => { console.log(error) }) promise$.next(Promise.reject('error')) // Output error
finally
Type
typescripttype OnFinally<T> = Parameters<Promise<T>['finally']>[0] finally(onFinally: OnFinally<unknown>): Observable
Details
Finally for subscribed node, usage consistent with promise, returns the subscribed node's Observable instance
Example
typescriptimport { $ } from 'fluth' const promise$ = $() observable$ = promise$.finally(() => { console.log('finally') }) promise$.next(1) // Output finally
pipe
Type
typescriptpipe(operator: Operator): Observable
Details
Pipe for subscribed node, pipe method can chain multiple operators, returns an Observable instance
Example
typescriptimport { $, delay } from 'fluth' const promise$ = $() promise$.pipe(delay(1000)).then((value) => { console.log(value) })
use
Type
plugin
typetypescripttype thenPlugin = (unsubscribe: () => void) => void type executePlugin = <T>(params: { result: Promise<T> | T set: (setter: (value: T) => Promise<void> | void) => Promise<T> | T root: boolean onfulfilled?: OnFulfilled onrejected?: OnRejected unsubscribe: () => void }) => Promise<any> | any type plugin: { then?: thenPlugin | thenPlugin[] thenAll?: thenPlugin | thenPlugin[] execute?: executePlugin | executePlugin[] executeAll?: executePlugin | executePlugin[] }
use
typetypescriptuse<P extends Plugin>(plugin: P): Stream<T, I, E & ChainReturn<P, T, E>> & E & ChainReturn<P, T, E>;
Details
Calling use can use four types of plugins: then plugin, execute plugin, thenAll plugin, executeAll plugin:
- then plugins execute when then method is called. They take the current node's unsubscribe function as parameter, can implement unified unsubscribe functionality.
- execute plugins execute when execute method is called. They take the current node's execution result, set function that can generate immutable data, current node's unsubscribe function as parameters, returned promise will be passed to next execute plugin, finally returned promise data will be passed to next then node.
- thenAll plugins trigger on then operations of root stream and all its child nodes, can only be used on root stream, child nodes cannot use.
- executeAll plugins trigger on execute operations of root stream and all its child nodes, can only be used on root stream, child nodes cannot use.
Example
typescriptimport { $, delay } from 'fluth' const promise$ = $('1').use(delay) promise$.delay(1000).then((value) => { console.log(value) }) promise$.next('2') // Output 2 after 1s
remove
Type
typescriptinterface PluginParams { then?: thenPlugin | thenPlugin[]; thenAll?: thenPlugin | thenPlugin[]; execute?: executePlugin | executePlugin[]; executeAll?: executePlugin | executePlugin[]; } remove(plugin: PluginParams | PluginParams[]): void;
Details
Remove specified plugin, plugin can be then, execute, thenAll, executeAll plugin
Example
typescriptimport { $, console } from 'fluth' const promise$ = $('1').use(console) promise$.next('2') // Output 2 promise$.remove(console) promise$.next('3') // Won't output 3
execute
Type
typescriptexecute: () => void
Details
Manually execute current node, data uses the last data that flowed through this node, if node has never executed, it won't execute.
WARNING
Execute current node, nodes after current node's then will also execute, equivalent to pushing old data of current node at current node
Example
typescriptimport { $ } from 'fluth' const promise$ = $(1) const observable$ = promise$.then((value) => value + 1) observable$.then((value) => console.log(value + 1)) observable$.execute() // No output promise$.next(1) // Output 3 observable$.execute() // Output 3
unsubscribe
Type
typescriptunsubscribe(): void
Details
Unsubscribe node, unlike promise's inability to cancel, stream's subscription can be cancelled anytime
Warning
Unsubscribe current node, all nodes after current node's then will also be unsubscribed
Example
typescriptimport { $ } from 'fluth' const promise$ = $(1) const observable$ = promise$.then((value) => value + 1) observable$.then((value) => console.log(value + 1)) promise$.next(2) // Output 2 observable$.unsubscribe() promise$.next(3) // No output
afterUnsubscribe
Type
typescriptafterUnsubscribe(callback: () => void): void
Details
Set callback function for node unsubscription
Example
typescriptimport { $ } from 'fluth' const promise$ = $(1) const observable$ = promise$.then((value) => value + 1) observable$.afterUnsubscribe(() => { console.log('unsubscribe') }) observable$.unsubscribe() // Output unsubscribe
offUnsubscribe
Type
typescriptoffUnsubscribe(callback: () => void): void
Details
Cancel callback function set through afterUnsubscribe
afterComplete
Type
typescriptafterComplete(callback: (value: T, status: PromiseStatus) => void): void;
Details
Callback function triggered after stream ends, will trigger earlier than subscribed node's automatic unsubscribe
Example
typescriptimport { $ } from 'fluth' const promise$ = $(1) const observable$ = promise$.then((value) => console.log(value)) observable$.afterComplete(() => console.log('complete')) observable$.afterUnsubscribe(() => console.log('unsubscribe')) promise$.next(2, true) // Output 2 complete unsubscribe
offComplete
Type
typescriptoffComplete(callback: (value: T, status: PromiseStatus) => void): void
Details
Cancel callback function set through afterComplete
afterSetValue
Type
typescriptafterSetValue(callback: (value: T) => void)
Details Callback function triggered after observable node modifies node value
Example
typescriptimport { $ } from 'fluth' const promise$ = $(1) promise$.afterSetValue((value) => console.log(value)) promise$.next(2) // Output 2
offAfterSetValue
Type
typescriptoffAfterSetValue(callback: (value: T) => void): void
Details
Cancel callback function set through afterSetValue