异步任务 
概述 
异步任务是一种用于处理耗时操作、消耗大量资源或容易出错的逻辑的机制。它可以将这些操作从请求主流程中剥离出来,异步执行,从而提高系统的响应速度和稳定性。
使用场景 
- 耗时的数据处理操作
 - 资源密集型任务(如图片处理)
 - 第三方服务调用(如发送邮件)
 - 批量数据导入导出
 - 需要重试机制的操作
 
例如:新用户注册成功后发送欢迎邮件,可以将发送邮件的操作作为异步任务执行,这样可以让用户更快地得到注册响应,同时避免邮件发送失败影响主流程。
工作机制 
异步任务的工作机制如下:
- 在程序内部维护一个可重复调用的任务池
 - 当向任务池中加入新的任务时,任务会进入等待状态
 - 系统会根据配置的工作进程数(worker_nums)来并发处理任务
 - 任务执行过程中可以实时反馈进度
 - 支持失败重试机制
 
任务配置 
在应用的 tasks 目录下创建任务定义配置文件(如:tasks/example.yao):
json
{
  "name": "示例任务",
  "worker_nums": 10,
  "attempts": 3,
  "attempt_after": 200,
  "timeout": 2,
  "size": 1000,
  "process": "scripts.task.Send",
  "event": {
    "next": "scripts.task.NextID",
    "add": "scripts.task.OnAdd",
    "success": "scripts.task.OnSuccess",
    "error": "scripts.task.OnError",
    "progress": "scripts.task.OnProgress"
  }
}配置参数说明 
| 参数 | 说明 | 是否必填 | 
|---|---|---|
| name | 任务名称 | 是 | 
| worker_nums | 任务池中工作进程数量 | 是 | 
| attempts | 失败重试次数 | 是 | 
| attempt_after | 重试间隔(毫秒) | 是 | 
| timeout | 任务超时时间(秒) | 是 | 
| size | 任务队列大小 | 是 | 
| process | 任务处理器 | 是 | 
| event.next | 生成任务唯一ID的处理器 | 否 | 
| event.add | 添加任务时的回调处理器 | 否 | 
| event.success | 任务成功时的回调处理器 | 否 | 
| event.error | 任务失败时的回调处理器 | 否 | 
| event.progress | 任务进度更新的回调处理器 | 否 | 
事件处理器说明 
event.next 
- 触发时机:每次添加新任务时,用于生成任务的唯一标识
 - 参数:无
 - 返回值:number 类型,表示任务ID
 - 使用场景: 
- 需要自定义任务ID生成规则
 - 需要保证任务ID的全局唯一性
 - 需要使用特定格式的任务ID
 
 
event.add 
- 触发时机:任务被成功添加到任务池后
 - 参数: 
- id: number - 任务ID
 
 - 使用场景: 
- 记录任务创建日志
 - 更新任务相关的业务状态
 - 发送任务创建通知
 
 
event.success 
- 触发时机:任务执行成功后
 - 参数: 
- id: number - 任务ID
 - res: any - 任务执行结果
 
 - 使用场景: 
- 处理任务执行结果
 - 更新相关业务状态
 - 发送成功通知
 - 触发后续业务流程
 
 
event.error 
- 触发时机:任务执行失败时(包括重试后仍然失败)
 - 参数: 
- id: number - 任务ID
 - err: Error - 错误信息
 
 - 使用场景: 
- 记录错误日志
 - 发送失败告警
 - 执行补偿逻辑
 - 更新任务状态
 
 
event.progress 
- 触发时机:任务执行过程中调用 Progress 处理器时
 - 参数: 
- id: number - 任务ID
 - current: number - 当前进度
 - total: number - 总进度
 - message: string - 进度信息
 
 - 使用场景: 
- 更新任务进度显示
 - 记录执行过程日志
 - 实时反馈任务状态
 
 
事件处理器示例 
javascript
/**
 * 生成任务唯一ID
 * @returns {number} 任务ID
 */
function NextID() {
  // 使用时间戳和随机数生成唯一ID
  const timestamp = Date.now();
  const random = Math.floor(Math.random() * 1000);
  return parseInt(`${timestamp}${random}`);
}
/**
 * 任务添加时的回调
 * @param {number} id 任务ID
 */
function OnAdd(id) {
  // 记录任务创建日志
  console.log(`Task #${id} created at ${new Date().toISOString()}`);
  // 更新任务状态
  Process('models.task.save', {
    id: id,
    status: 'created',
    created_at: new Date().toISOString()
  });
}
/**
 * 任务成功回调
 * @param {number} id 任务ID
 * @param {any} res 任务结果
 */
function OnSuccess(id, res) {
  // 更新业务状态
  Process('models.task.save', {
    id: id,
    status: 'completed',
    result: res,
    completed_at: new Date().toISOString()
  });
  // 发送成功通知
  Process('scripts.notification.send', {
    type: 'task_complete',
    task_id: id,
    result: res
  });
}
/**
 * 任务失败回调
 * @param {number} id 任务ID
 * @param {Error} err 错误信息
 */
function OnError(id, err) {
  // 记录错误日志
  console.error(`Task #${id} failed: ${err.message}`);
  // 更新任务状态
  Process('models.task.save', {
    id: id,
    status: 'failed',
    error: err.message,
    failed_at: new Date().toISOString()
  });
  // 发送失败告警
  Process('scripts.notification.send', {
    type: 'task_error',
    task_id: id,
    error: err.message,
    level: 'error'
  });
}
/**
 * 任务进度更新回调
 * @param {number} id 任务ID
 * @param {number} current 当前进度
 * @param {number} total 总进度
 * @param {string} message 进度信息
 */
function OnProgress(id, current, total, message) {
  // 计算进度百分比
  const percentage = Math.floor((current / total) * 100);
  // 更新任务进度
  Process('models.task.save', {
    id: id,
    progress: percentage,
    progress_message: message,
    updated_at: new Date().toISOString()
  });
  // 记录进度日志
  if (percentage % 10 === 0) {
    // 每完成10%记录一次
    console.log(`Task #${id}: ${percentage}% completed - ${message}`);
  }
}任务处理器 
在 scripts 目录下创建任务处理脚本(如:scripts/task.js):
javascript
// 如果是在响应api请求时,不要在脚本里使用全局变量
// 可以考虑使用session保存全局唯一的id
var id = 1024;
/**
 * 生成任务唯一ID
 * @returns {number} 任务ID
 */
function NextID() {
  id = id + 1;
  console.log(`NextID: ${id}`);
  return id;
}
/**
 * 任务处理器
 * @param {number} task_id 任务ID
 * @param {...any} args 任务参数
 */
function Send(task_id, ...args) {
  console.log(args);
  // 任务处理逻辑
  const current = 1;
  const total = 100;
  // 更新任务进度
  Progress(
    'tasks.task.process',
    task_id,
    current,
    total,
    `Progress ${current}/${total}`
  );
  return { message: 'ok' };
}
/**
 * 任务添加时的回调
 * @param {number} id 任务ID
 */
function OnAdd(id) {
  console.log(`OnAdd: #${id}`);
}
/**
 * 任务进度更新回调
 * @param {number} id 任务ID
 * @param {number} current 当前进度
 * @param {number} total 总进度
 * @param {string} message 进度信息
 */
function OnProgress(id, current, total, message) {
  console.log(`OnProgress: #${id} ${message} ${current}/${total}`);
}
/**
 * 任务成功回调
 * @param {number} id 任务ID
 * @param {any} res 任务结果
 */
function OnSuccess(id, res) {
  console.log(`OnSuccess: #${id} ${JSON.stringify(res)}`);
}
/**
 * 任务失败回调
 * @param {number} id 任务ID
 * @param {Error} err 错误信息
 */
function OnError(id, err) {
  console.log(`OnError: #${id} ${err}`);
}任务处理流程 
添加任务
- 使用 
tasks.[task_id].add处理器添加任务 - 可传入任务处理器所需的参数
 - 返回任务ID(成功)或0(失败)
 
- 使用 
 任务状态更新
- 使用 
tasks.[task_id].progress处理器更新任务进度 - 在任务处理器内部调用
 - 会触发 event.progress 回调
 
- 使用 
 查询任务状态
- 使用 
tasks.[task_id].get处理器获取任务状态 - 返回任务的详细信息:javascript
{ id: task_id, status: '', // "WAITING"/"RUNNING"/"SUCCESS"/"FAILURE" current: number, // 当前进度 total: number, // 总进度 message: string, // 进度信息 response: any // 任务结果 } 
- 使用 
 
使用示例 
javascript
function task() {
  for (let i = 1; i < 100; i++) {
    const task_id = Process('tasks.test.add', '任务-' + i);
    if (task_id === 0) {
      console.log('添加任务失败');
      continue;
    }
    console.log(`添加任务成功:${task_id}`);
  }
}最佳实践 
任务设计
- 将耗时操作从主流程中剥离
 - 任务应该是独立且原子的
 - 考虑任务的幂等性
 
资源管理
- 根据服务器资源合理设置 worker_nums
 - 设置合适的任务超时时间
 - 控制任务队列大小
 
错误处理
- 实现完善的重试机制
 - 记录详细的错误日志
 - 设置合理的重试间隔
 
监控告警
- 监控任务队列大小
 - 监控任务执行时间
 - 监控失败率和重试情况