thenPlugin
thenPlugin 是一个在流中特定节点完成后执行插件的工具函数。
类型定义
typescript
thenPlugin: <T>(
plugins: Plugin<T>[],
resolvePrefix?: string,
rejectPrefix?: string,
ignoreUndefined?: boolean
) => {
execute: ({
result,
status,
onfulfilled,
onrejected,
root,
}: {
result: Promise<any> | any
status: PromiseStatus | null
onfulfilled?: OnFulfilled
onrejected?: OnRejected
root: boolean
}) => Promise<any>
}
参数
plugins
(必需): 要执行的插件数组resolvePrefix
(可选): 成功时的控制台前缀,默认为'resolve'
rejectPrefix
(可选): 失败时的控制台前缀,默认为'reject'
ignoreUndefined
(可选): 是否忽略undefined
值的输出,默认为true
详细说明
- 只在当前节点完成后执行指定的插件数组,不会在整个流链上执行
- 等待当前节点的结果(包括 Promise)完全解析后再执行插件
- 按照插件数组的顺序依次执行每个插件
- 插件执行的条件:
- 根节点 (
root=true
) - 有成功处理器的节点 (
onfulfilled
) - 有错误处理器的节点 (
onrejected
) 且状态为REJECTED
- 根节点 (
- 返回原始的
result
,不修改数据流
示例
场景 1:节点完成后的处理
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
const completionPlugin = () => ({
execute: ({ result }: any) => {
console.log('节点完成,结果:', result)
return result
},
})
const timestampPlugin = () => ({
execute: ({ result }: any) => {
console.log('完成时间:', new Date().toISOString())
return result
},
})
const plugins = [completionPlugin(), timestampPlugin()]
const stream$ = $()
.then(async (value) => {
// 模拟异步操作
await new Promise((resolve) => setTimeout(resolve, 500))
return value * 2
})
.use(thenPlugin(plugins)) // 只在这个节点完成后执行
.then((value) => value + 10)
stream$.next(5)
// 500ms后输出:
// 节点完成,结果: 10
// 完成时间: 2023-...
场景 2:特定节点的后处理
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
const validationPlugin = () => ({
execute: ({ result }: any) => {
if (result > 0) {
console.log('验证通过:', result)
} else {
console.log('验证失败:', result)
}
return result
},
})
const cachePlugin = () => ({
execute: ({ result }: any) => {
console.log('缓存结果:', result)
// 模拟缓存操作
return result
},
})
const plugins = [validationPlugin(), cachePlugin()]
const stream$ = $()
.then((value) => value * 3)
.use(thenPlugin(plugins)) // 只在乘法操作完成后执行
.then((value) => value - 5)
stream$.next(4)
// 输出:
// 验证通过: 12
// 缓存结果: 12
// 最终结果: 7
stream$.next(-2)
// 输出:
// 验证失败: -6
// 缓存结果: -6
// 最终结果: -11
场景 3:异步操作完成后的处理
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
const notificationPlugin = () => ({
execute: ({ result }: any) => {
console.log('发送通知: 异步操作完成,结果为', result)
return result
},
})
const metricsPlugin = () => ({
execute: ({ result }: any) => {
console.log('记录指标: 处理了数据', result)
return result
},
})
const plugins = [notificationPlugin(), metricsPlugin()]
const stream$ = $()
.then((value) => value + 1)
.then(async (value) => {
// 模拟数据库操作
await new Promise((resolve) => setTimeout(resolve, 800))
return value * 2
})
.use(thenPlugin(plugins)) // 数据库操作完成后执行
.then((value) => value + 100)
stream$.next(10)
// 800ms后输出:
// 发送通知: 异步操作完成,结果为 22
// 记录指标: 处理了数据 22
// 最终结果: 122
场景 4:错误处理和恢复
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
const errorHandlerPlugin = () => ({
execute: ({ result, status }: any) => {
if (status === 'REJECTED') {
console.log('节点错误处理:', result.message)
// 执行错误恢复逻辑
} else {
console.log('节点成功完成:', result)
}
return result
},
})
const auditPlugin = () => ({
execute: ({ result, status }: any) => {
console.log('审计记录:', {
status,
result: status === 'REJECTED' ? result.message : result,
timestamp: Date.now(),
})
return result
},
})
const plugins = [errorHandlerPlugin(), auditPlugin()]
const stream$ = $()
.then((value) => {
if (value < 0) {
throw new Error('负数不允许')
}
return value * 2
})
.use(thenPlugin(plugins))
.then((value) => value + 1)
stream$.next(5)
// 输出:
// 节点成功完成: 10
// 审计记录: { status: null, result: 10, timestamp: ... }
stream$.next(-3)
// 输出:
// 节点错误处理: 负数不允许
// 审计记录: { status: 'REJECTED', result: '负数不允许', timestamp: ... }
场景 5:性能监控
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
const performancePlugin = (operationName: string) => {
const startTime = Date.now()
return {
execute: ({ result }: any) => {
const endTime = Date.now()
const duration = endTime - startTime
console.log(`${operationName} 执行时间: ${duration}ms`)
return result
},
}
}
const resourcePlugin = () => ({
execute: ({ result }: any) => {
if (typeof process !== 'undefined' && process.memoryUsage) {
const memory = process.memoryUsage()
console.log('内存使用:', Math.round(memory.heapUsed / 1024 / 1024), 'MB')
}
return result
},
})
const plugins = [performancePlugin('数据处理'), resourcePlugin()]
const stream$ = $()
.then(async (value) => {
// 模拟耗时操作
await new Promise((resolve) => setTimeout(resolve, 200))
return value.map((x: number) => x * x)
})
.use(thenPlugin(plugins))
.then((value) => value.reduce((a: number, b: number) => a + b, 0))
stream$.next([1, 2, 3, 4, 5])
// 200ms后输出:
// 数据处理 执行时间: 200ms
// 内存使用: XX MB
场景 6:条件执行插件
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
const conditionalPlugin = (condition: (result: any) => boolean, message: string) => ({
execute: ({ result }: any) => {
if (condition(result)) {
console.log(`${message}: ${result}`)
}
return result
},
})
const plugins = [
conditionalPlugin((x) => x > 50, '高值警告'),
conditionalPlugin((x) => x < 10, '低值提醒'),
conditionalPlugin((x) => x % 10 === 0, '整十数'),
]
const stream$ = $()
.then((value) => value * 5)
.use(thenPlugin(plugins))
.then((value) => value / 2)
stream$.next(12)
// 输出: 高值警告: 60
stream$.next(1)
// 输出: 低值提醒: 5
stream$.next(4)
// 输出: 整十数: 20
场景 7:数据转换和格式化
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
const formatPlugin = () => ({
execute: ({ result }: any) => {
const formatted = JSON.stringify(result, null, 2)
console.log('格式化数据:', formatted)
return result
},
})
const validationPlugin = () => ({
execute: ({ result }: any) => {
const isValid = typeof result === 'object' && result !== null
console.log('数据验证:', isValid ? '通过' : '失败')
return result
},
})
const serializationPlugin = () => ({
execute: ({ result }: any) => {
const serialized = Buffer.from(JSON.stringify(result)).toString('base64')
console.log('序列化长度:', serialized.length)
return result
},
})
const plugins = [formatPlugin(), validationPlugin(), serializationPlugin()]
const stream$ = $()
.then((value) => ({
id: value,
name: `Item ${value}`,
timestamp: Date.now(),
}))
.use(thenPlugin(plugins))
.then((value) => ({ ...value, processed: true }))
stream$.next(42)
// 输出格式化、验证和序列化信息
场景 8:插件状态管理
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
class StatefulPlugin {
private history: any[] = []
private count = 0
getPlugin() {
return {
execute: ({ result }: any) => {
this.count++
this.history.push(result)
console.log(`第 ${this.count} 次处理:`, result)
console.log('历史记录:', this.history.slice(-3)) // 显示最近3次
return result
},
}
}
getStats() {
return {
totalCount: this.count,
history: [...this.history],
}
}
reset() {
this.history = []
this.count = 0
}
}
const statefulPlugin = new StatefulPlugin()
const plugins = [statefulPlugin.getPlugin()]
const stream$ = $()
.then((value) => value * 2)
.use(thenPlugin(plugins))
.then((value) => value + 1)
stream$.next(1) // 第1次处理: 2
stream$.next(2) // 第2次处理: 4
stream$.next(3) // 第3次处理: 6
console.log('统计信息:', statefulPlugin.getStats())
场景 9:移除插件
typescript
import { $ } from 'fluth'
import { thenPlugin } from 'fluth'
const plugin1 = () => ({
execute: ({ result }: any) => {
console.log('插件1执行:', result)
return result
},
})
const plugin2 = () => ({
execute: ({ result }: any) => {
console.log('插件2执行:', result)
return result
},
})
const plugins = [plugin1(), plugin2()]
const pluginCombination = thenPlugin(plugins)
const stream$ = $()
.then((value) => value * 3)
.use(pluginCombination)
.then((value) => value + 5)
stream$.next(2)
// 输出:
// 插件1执行: 6
// 插件2执行: 6
stream$.remove(pluginCombination)
stream$.next(4)
// 无插件输出,直接计算: 4 * 3 + 5 = 17