thenAllPlugin
thenAllPlugin 是一个在流链的所有节点完成后执行插件的工具函数。
类型定义
typescript
thenAllPlugin: <T>(
plugins: Plugin<T>[],
resolvePrefix?: string,
rejectPrefix?: string,
ignoreUndefined?: boolean
) => {
executeAll: ({
result,
status,
onfulfilled,
onrejected,
root,
}: {
result: Promise<any> | any
status: PromiseStatus | null
onfulfilled?: OnFulfilled
onrejected?: OnRejected
root: boolean
}) => Promise<any>
}
参数
plugins
(必需): 要执行的插件数组resolvePrefix
(可选): 成功时的控制台前缀,默认为'resolve'
rejectPrefix
(可选): 失败时的控制台前缀,默认为'reject'
ignoreUndefined
(可选): 是否忽略undefined
值的输出,默认为true
详细说明
- 在流链的所有节点完成后执行指定的插件数组
- 等待当前节点的结果(包括 Promise)完全解析后再执行插件
- 按照插件数组的顺序依次执行每个插件
- 插件执行的条件与其他插件相同:
- 根节点 (
root=true
) - 有成功处理器的节点 (
onfulfilled
) - 有错误处理器的节点 (
onrejected
) 且状态为REJECTED
- 根节点 (
- 返回原始的
result
,不修改数据流
示例
场景 1:流完成后的清理工作
typescript
import { $ } from 'fluth'
import { thenAllPlugin } from 'fluth'
const cleanupPlugin = () => ({
executeAll: ({ result }: any) => {
console.log('清理资源,最终结果:', result)
// 执行清理逻辑
return result
},
})
const logPlugin = () => ({
executeAll: ({ result }: any) => {
console.log('记录日志:', result)
return result
},
})
const plugins = [cleanupPlugin(), logPlugin()]
const stream$ = $().use(thenAllPlugin(plugins))
stream$.then((value) => value * 2).then((value) => value + 10)
stream$.next(5)
// 等待所有操作完成后输出:
// 清理资源,最终结果: 20
// 记录日志: 20
场景 2:异步操作完成后的处理
typescript
import { $ } from 'fluth'
import { thenAllPlugin } from 'fluth'
const notificationPlugin = () => ({
executeAll: ({ result }: any) => {
console.log('发送通知: 操作完成,结果为', result)
// 模拟发送通知
return result
},
})
const cachePlugin = () => ({
executeAll: ({ result }: any) => {
console.log('缓存结果:', result)
// 模拟缓存操作
return result
},
})
const plugins = [notificationPlugin(), cachePlugin()]
const stream$ = $().use(thenAllPlugin(plugins))
stream$
.then(async (value) => {
// 模拟异步操作
await new Promise((resolve) => setTimeout(resolve, 1000))
return value * 3
})
.then((value) => value + 5)
stream$.next(10)
// 1秒后输出:
// 发送通知: 操作完成,结果为 35
// 缓存结果: 35
场景 3:错误处理和恢复
typescript
import { $ } from 'fluth'
import { thenAllPlugin } from 'fluth'
const errorReportPlugin = () => ({
executeAll: ({ result, status }: any) => {
if (status === 'REJECTED') {
console.log('错误报告:', result.message)
// 发送错误报告
} else {
console.log('操作成功:', result)
}
return result
},
})
const recoveryPlugin = () => ({
executeAll: ({ result, status }: any) => {
if (status === 'REJECTED') {
console.log('尝试恢复操作')
// 执行恢复逻辑
}
return result
},
})
const plugins = [errorReportPlugin(), recoveryPlugin()]
const stream$ = $().use(thenAllPlugin(plugins))
stream$.then((value) => {
if (value < 0) {
throw new Error('负数错误')
}
return value * 2
})
stream$.next(5)
// 输出: 操作成功: 10
stream$.next(-3)
// 输出: 错误报告: 负数错误
// 输出: 尝试恢复操作
场景 4:性能统计和监控
typescript
import { $ } from 'fluth'
import { thenAllPlugin } from 'fluth'
const performancePlugin = () => {
const startTime = Date.now()
return {
executeAll: ({ result }: any) => {
const endTime = Date.now()
const duration = endTime - startTime
console.log(`总执行时间: ${duration}ms`)
console.log(`最终结果: ${result}`)
return result
},
}
}
const memoryPlugin = () => ({
executeAll: ({ result }: any) => {
if (typeof process !== 'undefined' && process.memoryUsage) {
const memory = process.memoryUsage()
console.log('最终内存使用:', Math.round(memory.heapUsed / 1024 / 1024), 'MB')
}
return result
},
})
const plugins = [performancePlugin(), memoryPlugin()]
const stream$ = $().use(thenAllPlugin(plugins))
stream$
.then(async (value) => {
await new Promise((resolve) => setTimeout(resolve, 500))
return value * 2
})
.then(async (value) => {
await new Promise((resolve) => setTimeout(resolve, 300))
return value + 10
})
stream$.next(15)
// 约800ms后输出性能和内存统计
场景 5:数据验证和保存
typescript
import { $ } from 'fluth'
import { thenAllPlugin } from 'fluth'
const validationPlugin = () => ({
executeAll: ({ result }: any) => {
if (typeof result === 'number' && result > 0) {
console.log('数据验证通过:', result)
} else {
console.log('数据验证失败:', result)
}
return result
},
})
const savePlugin = () => ({
executeAll: ({ result }: any) => {
console.log('保存数据到数据库:', result)
// 模拟保存操作
return result
},
})
const auditPlugin = () => ({
executeAll: ({ result }: any) => {
console.log('审计日志: 数据处理完成', {
timestamp: new Date().toISOString(),
result,
})
return result
},
})
const plugins = [validationPlugin(), savePlugin(), auditPlugin()]
const stream$ = $().use(thenAllPlugin(plugins))
stream$.then((value) => value * 2).then((value) => Math.round(value))
stream$.next(3.7)
// 输出:
// 数据验证通过: 7
// 保存数据到数据库: 7
// 审计日志: 数据处理完成 { timestamp: '...', result: 7 }
场景 6:条件执行插件
typescript
import { $ } from 'fluth'
import { thenAllPlugin } from 'fluth'
const conditionalPlugin = (condition: (result: any) => boolean, action: string) => ({
executeAll: ({ result }: any) => {
if (condition(result)) {
console.log(`条件满足,执行 ${action}:`, result)
} else {
console.log(`条件不满足,跳过 ${action}:`, result)
}
return result
},
})
const plugins = [
conditionalPlugin((x) => x > 10, '高值处理'),
conditionalPlugin((x) => x % 2 === 0, '偶数处理'),
conditionalPlugin((x) => x < 5, '低值处理'),
]
const stream$ = $().use(thenAllPlugin(plugins))
stream$.next(12)
// 输出:
// 条件满足,执行 高值处理: 12
// 条件满足,执行 偶数处理: 12
// 条件不满足,跳过 低值处理: 12
stream$.next(3)
// 输出:
// 条件不满足,跳过 高值处理: 3
// 条件不满足,跳过 偶数处理: 3
// 条件满足,执行 低值处理: 3
场景 7:插件链式处理
typescript
import { $ } from 'fluth'
import { thenAllPlugin } from 'fluth'
const formatPlugin = () => ({
executeAll: ({ result }: any) => {
const formatted = `结果: ${result}`
console.log('格式化:', formatted)
return result
},
})
const encryptPlugin = () => ({
executeAll: ({ result }: any) => {
const encrypted = btoa(result.toString()) // 简单的base64编码
console.log('加密:', encrypted)
return result
},
})
const compressPlugin = () => ({
executeAll: ({ result }: any) => {
console.log('压缩数据长度:', result.toString().length)
return result
},
})
const plugins = [formatPlugin(), encryptPlugin(), compressPlugin()]
const stream$ = $().use(thenAllPlugin(plugins))
stream$.next('Hello World')
// 输出:
// 格式化: 结果: Hello World
// 加密: SGVsbG8gV29ybGQ=
// 压缩数据长度: 11
场景 8:移除插件组合
typescript
import { $ } from 'fluth'
import { thenAllPlugin } from 'fluth'
const plugin1 = () => ({
executeAll: ({ result }: any) => {
console.log('插件1执行:', result)
return result
},
})
const plugin2 = () => ({
executeAll: ({ result }: any) => {
console.log('插件2执行:', result)
return result
},
})
const plugins = [plugin1(), plugin2()]
const pluginCombination = thenAllPlugin(plugins)
const stream$ = $().use(pluginCombination)
stream$.then((value) => value * 2)
stream$.next(5)
// 输出:
// 插件1执行: 10
// 插件2执行: 10
stream$.remove(pluginCombination)
stream$.next(3)
// 无插件输出