计划组件
Plan 提供了灵活的任务编排系统,具有共享状态管理和信号控制功能。
特性
- 任务排序和并行执行
- 任务间共享状态管理
- 任务生命周期控制(暂停/恢复/停止)
- 基于信号的任务通信
- 可配置的状态检查间隔
- 线程安全操作
- 资源清理管理
核心概念
Plan
Plan 是一个具有共享状态空间的任务集合。它管理任务的执行顺序和生命周期。
Task
Task 是一个工作单元,具有以下特性:
- 可以访问共享状态
- 可以响应控制信号
- 可以与同一顺序的其他任务并行执行
- 可以报告其状态并存储任务特定数据
共享空间
共享空间提供了一个线程安全的存储机制,用于任务之间共享数据和通信。
API 参考
Plan 创建
typescript
class Plan {
constructor(id: string) {
// 初始化计划
}
}
类型定义
typescript
/**
* 计划或任务的状态
*/
export type PlanStatus =
| 'created' //初始状态
| 'running' //任务执行中
| 'paused' //任务暂停
| 'completed' //任务成功完成
| 'failed' //任务执行失败
| 'destroyed' //任务已终止
| 'unknown';
/**
* Plan 类
*/
export declare class Plan {
/**
* @param plan_id - 计划ID
*/
constructor(plan_id: string);
/**
* 订阅计划
* @param key - 要订阅的键
* @param subscribe_fn - 计划变更时执行的函数
* @param subscribe_args - 传递给订阅函数的参数,The rest of the arguments are the arguments to the process
*/
Subscribe(key: string, subscribe_fn: string, ...subscribe_args: any[]): void;
/**
* 添加任务到计划
*
* will Trigger the TaskStarted signal "TaskStarted"
* @param task_id - 任务ID
* @param order - 任务顺序
* @param task_process - 要执行的任务处理函数
* @param task_args - 传递给任务的参数
*/
Add(
task_id: string,
order: number,
task_process: string,
...task_args: any[]
): void;
/**
* 同步运行计划
*/
Run(): void;
/**
* 释放计划,清空共享空间
*/
Release(): void;
/**
* 获取计划状态和每个任务的状态
*/
Status(): {
plan: PlanStatus;
tasks: Record<string, PlanStatus>;
};
/**
* 获取任务状态
*/
TaskStatus(task_id: string): PlanStatus;
/**
* 获取或设置任务数据,如果未提供数据则返回当前数据,否则设置数据并返回之前的数据
*/
TaskData(task_id: string, data?: any): any;
/**
* 从共享空间获取值
*/
Get(key: string): any;
/**
* 在共享空间中设置值
*/
Set(key: string, value: any): void;
/**
* 从共享空间删除值
*/
Del(key: string): void;
/**
* 清空共享空间
*/
Clear(): void;
}
示例
typescript
import { Plan, time } from '@yao/runtime';
function Test() {
const namespace = 'scripts.runtime.api.plan';
//same id with same namespace,will be the same plan
//同一个id,多次new Plan,会返回同一个plan
const plan = new Plan('test-plan');
// 订阅任务的变化
plan.Subscribe('TaskStarted', `${namespace}.TaskStarted`, 'foo');
plan.Subscribe('TaskCompleted', `${namespace}.TaskCompleted`, 'foo');
// 订阅共享空间的变化
plan.Subscribe('some-key', `${namespace}.SomeKey`, 'bar');
plan.Add('task-1', 1, `${namespace}.Task1`, 'foo1');
plan.Add('task-2', 1, `${namespace}.Task2`, 'foo2');
plan.Add('task-3', 2, `${namespace}.Task3`, 'foo3');
plan.Add('task-4', 2, `${namespace}.Task4`, 'foo4');
// 运行计划
plan.Run();
// 释放计划,资源,也会触发subscribe事件。
plan.Release();
return 'Done';
}
function Task1(plan_id: string, task_id: string, foo: string) {
const plan = new Plan(plan_id);
// 更新共享数据,will trigger the SomeKey signal
plan.Set('some-key', `foo-${foo}`);
time.Sleep(200);
const ts = new Date().getTime();
return ts;
}
function Task2(plan_id: string, task_id: string, foo: string) {
time.Sleep(300);
const ts = new Date().getTime();
return ts;
}
function Task3(plan_id: string, task_id: string, foo: string) {
const plan = new Plan(plan_id);
const some = plan.Get('some-key');
// 更新共享数据
plan.Set('some-key', `bar-${foo}`);
time.Sleep(400);
const ts = new Date().getTime();
return { ts: ts, shared: some };
}
function Task4(plan_id: string, task_id: string, foo: string) {
const plan = new Plan(plan_id);
time.Sleep(500);
const some = plan.Get('some-key');
const ts = new Date().getTime();
return { ts: ts, shared: some };
}
function SomeKey(plan_id: string, key: string, data: any, foo: string) {
const plan = new Plan(plan_id);
const ts = new Date().getTime();
console.log(`SomeKey ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`);
console.log(plan.Status());
}
function TaskStarted(plan_id: string, key: string, data: any, foo: string) {
const ts = new Date().getTime();
console.log(
`TaskStarted ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`
);
}
function TaskCompleted(plan_id: string, key: string, data: any, foo: string) {
const ts = new Date().getTime();
console.log(
`TaskCompleted ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`
);
}
最佳实践
- 错误处理: 始终检查计划操作返回的错误。
- 信号处理: 在长时间运行的任务中实现适当的信号处理。
- 资源清理: 使用
stop()
正确清理资源。 - 上下文使用: 在任务实现中尊重上下文取消。
- 共享状态: 使用共享空间进行任务通信,而不是外部变量。
线程安全
计划组件设计为线程安全:
- 所有计划操作都是同步的
- 共享空间操作受互斥锁保护
- 信号通道是缓冲的,以防止阻塞
事件触发
内置事件:
- TaskCompleted:当任务完成时触发
- TaskError:当任务发生错误时触发
- TaskStarted:当任务被添加到计划时触发
- Released:当共享空间中的所有数据被删除时触发