Skip to content

快速开始

安装

bash
# npm
npm install fluth

# yarn
yarn add fluth

# pnpm
pnpm add fluth

使用

第一步:创建和订阅流

创建空流

typescript
import { $ } from 'fluth'

// 创建一个空流
const stream$ = $()

// 订阅流的数据变化
stream$.then((data) => {
  console.log('接收到数据:', data)
})

// 推送数据
stream$.next('第一条消息') // 输出:接收到数据: 第一条消息
stream$.next('第二条消息') // 输出:接收到数据: 第二条消息

创建带初始值的流

typescript
// 创建带初始值的流
const stream$ = $('初始值')

// 立即触发订阅(因为有初始值)
stream$.thenImmediate((data) => {
  console.log('接收到数据:', data) // 输出: 初始值
}) // 输出:接收到数据: 初始值

// 继续推送新数据
stream$.next('新数据') // 输出:接收到数据: 新数据

第二步:链式订阅

和 Promise 一样,fluth 支持链式操作:

typescript
import { $ } from 'fluth'

const stream$ = $()

// 链式处理数据
stream$
  .then((data) => data.toUpperCase()) // 转大写
  .then((data) => `[${data}]`) // 添加括号
  .then((data) => {
    console.log(data)
  })

stream$.next('hello') // 输出: [HELLO]
stream$.next('world') // 输出: [WORLD]

第三步:推流

推流既可以采用 next 方法,也可以采用 set 方法,两者的区别在于:

  • next 方法会直接推送新值,适合简单数据类型
  • set 方法会自动创建不可变对象,适合复杂对象,自动处理深拷贝

使用 next 推送新数据

typescript
const stream$ = $(0)

stream$.then((value) => {
  console.log('当前值:', value)
})

stream$.next(1) // 输出: 当前值: 1
stream$.next(2) // 输出: 当前值: 2

使用 set 进行不可变更新

typescript
const stream$ = $({ key1: { key11: 'test' }, key2: { key22: 'test' } })
const oldValue = stream$.value

stream$.set((state) => {
  state.key2.key22 = 'test2' // 直接修改,fluth 会创建新的不可变对象
})

// 验证不可变性
console.log(oldValue === stream$.value) // false - 根对象引用已改变
console.log(oldValue?.key2 === stream$.value?.key2) // false - 修改的对象引用已改变
console.log(oldValue?.key1 === stream$.value?.key1) // true - 未修改的对象引用保持不变

第四步: 部分订阅

typescript
import { $, change } from 'fluth'

const stream$ = $({ key1: { key11: 'test' }, key2: { key22: 'test' } })

stream$.pipe(change((state) => state.key2)).then((data) => {
  console.log('key2发生变化')
})

stream$.set((state) => {
  state.key1.key11 = 'test1'
}) // 没有输出

stream$.set((state) => {
  state.key2.key22 = 'test2'
}) // 输出:key2发生变化

第五步:条件订阅

typescript
import { $, filter } from 'fluth'
const stream$ = $()

// 只处理偶数
stream$.pipe(filter((num) => num % 2 === 0)).then((evenNum) => {
  console.log('偶数:', evenNum)
})

stream$.next(1) // 不输出
stream$.next(2) // 输出: 偶数: 2
stream$.next(3) // 不输出
stream$.next(4) // 输出: 偶数: 4

第六步:流的组合

合并多个流的最新值

typescript
import { $, combine } from 'fluth'

const name$ = $('john')
const age$ = $(25)

// 合并两个流的最新值
const user$ = combine(name$, age$)

user$.then(([name, age]) => {
  console.log(`用户: ${name}, 年龄: ${age}`)
})

// 当且仅当所有流都推送过数据后,才会输出
name$.next('andy') // 不输出
age$.next(30) // 输出: 用户: andy, 年龄: 30
name$.next('lucy') // 输出: 用户: lucy, 年龄: 30
age$.next(31) // 输出: 用户: lucy, 年龄: 31

等待所有流完成

typescript
import { $, finish } from 'fluth'

const task1$ = $()
const task2$ = $()
const task3$ = $()

// 等待所有任务完成
const allTasks$ = finish(task1$, task2$, task3$)

allTasks$.then(([result1, result2, result3]) => {
  console.log('所有任务完成:', { result1, result2, result3 })
})

// 完成各个任务
task1$.next('任务1完成', true) // true 表示流结束
task2$.next('任务2完成', true)
task3$.next('任务3完成', true)
// 输出: 所有任务完成: { result1: "任务1完成", result2: "任务2完成", result3: "任务3完成" }

第七步:实际应用场景

用户输入防抖

typescript
import { $, throttle } from 'fluth'

const searchInput$ = $()

// 使用节流插件,300ms 内只处理最后一次输入
searchInput$.use(throttle(300)).then((keyword) => {
  console.log('搜索:', keyword)
  // 执行搜索逻辑
})

// 模拟用户快速输入
searchInput$.next('f')
searchInput$.next('fl')
searchInput$.next('flu')
searchInput$.next('fluth')
// 只会输出最后一次: 搜索: fluth

对象状态管理

typescript
import { $ } from 'fluth'

// 应用状态
const appState$ = $({
  user: null,
  loading: false,
  error: null,
})

// 监听用户状态变化
appState$
  .get((state) => state.user)
  .then((user) => {
    if (user) {
      console.log('用户已登录:', user.name)
    } else {
      console.log('用户未登录')
    }
  })

// 监听加载状态
appState$
  .get((state) => state.loading)
  .then((loading) => {
    console.log(loading ? '加载中...' : '加载完成')
  })

// 模拟登录流程
appState$.set((state) => {
  state.loading = true
})

setTimeout(() => {
  appState$.set((state) => {
    state.loading = false
    state.user = { name: 'john', id: 1 }
  })
}, 1000)

常见问题

如何取消订阅?

typescript
const stream$ = $()
const observable$ = stream$.then((data) => console.log(data))

// 取消订阅
observable$.unsubscribe()

如何处理错误?

typescript
const stream$ = $()

stream$.then(
  (data) => console.log('成功:', data),
  (error) => console.log('错误:', error)
)

stream$.next('正常数据')
stream$.next(Promise.reject('错误信息'))