Stream
Stream
inherits from Observable
. In addition to the properties and methods of Observable
, the following methods are added:
next
Type
typescriptnext(payload: any, finishFlag?: boolean): void;
Details
- Pushes data to the current stream. When
payload
isPromise.reject(xxx)
, subsequentthen
behavior is consistent withpromise
'sthen
. - The second parameter indicates whether the current stream is finished. When set to
true
, subsequentset
andnext
will not execute, and after the stream completes each node, it will trigger the node'safterComplete
callback function, then automatically call the node'sunsubscribe
method.
- Pushes data to the current stream. When
Example
typescriptimport { $ } from "fluth"; const promise$ = $("1"); promise$.then((value) => { console.log(value); }); promise$.next("2"); // prints 2
set
Type
typescriptset(setter: (value: T) => void, finishFlag?: boolean): void;
Details
Pushes data to the current stream. The difference from
next
is thatset
accepts asetter
(which can be synchronous or asynchronous) and pushes a newimmutable
data. The second parameter indicates whether the current stream is finished. When set totrue
, subsequentset
andnext
will not execute.Example
typescriptimport { $ } from "fluth"; const promise$ = $({ a: 1, b: { c: 2 } }); const oldValue = promise$.value; promise$.then((value) => { console.log(value); }); promise$.set((value) => { value.a = 2; }); // prints { a: 1, b: { c: 3 } } const newValue = promise$.value; console.log(oldValue === newValue); // prints false console.log(oldValue.b === newValue.b); // prints true
use
Type
plugin
typetypescripttype thenPlugin = (unsubscribe: () => void) => void type ChainPluginFn<T extends Observable = Observable> = (observer: T) => Record<string, any> type executePlugin = <T>(params: { result: Promise<T> | T set: (setter: (value: T) => Promise<void> | void) => Promise<T> | T unsubscribe: () => void }) => Promise<any> | any type plugin: { then?: thenPluginFn | thenPluginFn[] execute?: executePlugin | executePlugin[] chain?: ChainPluginFn }
use
typetypescriptuse<P extends Plugin>(plugin: P): Stream<T, I, E & ChainReturn<P, T, E>> & E & ChainReturn<P, T, E>;
Details
Calling
use
allows you to use three types of plugins:then
plugins,execute
plugins, andchain
plugins:then
plugins are executed when the then method is called. They take the current node'sunsubscribe
function as a parameter and can implement unified unsubscription functionality.execute
plugins are executed when the execute method is called. They take the current node's execution result, aset
function that can generateimmutable
data, and the current node'sunsubscribe
function as parameters. The returnedpromise
will be passed to the nextexecute
plugin, and the final returnedpromise
data will be passed to the nextthen
node.chain
plugins can add new properties and methods to the current stream's chain operations.
Example
typescriptimport { $, delay } from "fluth"; const promise$ = $("1").use(delay); promise$.delay(1000).then((value) => { console.log(value); }); promise$.next("2"); // prints 2 after 1s
remove
Type
typescriptinterface ThenOrExecutePlugin { then?: thenPluginFn | thenPluginFn[]; execute?: executePlugin | executePlugin[]; } remove(plugin: ThenOrExecutePlugin | ThenOrExecutePlugin[]): void;
Details
Removes the specified
plugin
. Theplugin
can only be athen
plugin or anexecute
plugin.Example
typescriptimport { $, console } from "fluth"; const promise$ = $("1").use(console); promise$.next("2"); // prints 2 promise$.remove(console); promise$.next("3"); // doesn't print 3
pause
Type
typescriptpause: () => void
Details
Pauses the current stream. After executing the
pause
method, all subscribed nodes will not execute.Example
typescriptimport { $, console } from "fluth"; const promise$ = $("1"); promise$.then((value) => { console.log(value); }); promise$.next("2"); // prints 2 promise$.pause(); promise$.next("3"); // doesn't print 3
restart
Type
typescriptrestart: () => void
Details
Restarts the current stream. After executing the
restart
method, all subscribed nodes start accepting and executing stream pushes.Example
typescriptimport { $, console } from "fluth"; const promise$ = $("1"); promise$.then((value) => { console.log(value); }); promise$.pause(); promise$.next("2"); // doesn't print 2 promise$.restart(); promise$.next("3"); // prints 3