Skip to content
导航栏

使用任务

使用异步任务来执行耗时的处理操作。

异步任务的工作机制如下:在程序内部维护了一个可重复调用的任务池,当向任务池中加入新的任务时,并不会马上执行,而是会等待有空闲的处理器才会执行。避免了大量并发处理导致服务器资源短缺。

https://yaoapps.com/doc/手册/Widgets/Task

操作

定义任务,在应用tasks目录创建任务定义配置文件。

  • name 任务名称
  • worker_nums 指定任务池中工作处理器的数量
  • attempts 失败重试次数
  • attempt_after 重试间隔
  • timeout 超时时间
  • process 该 task 绑定的处理器
  • next 生成任务唯一 id,可选,需要返回一个整型的数字,用来生成任务的标识,如果没有传值,内部会根据作业队列自动的生成 id,注意官方版本有一个 bug,内部 ID 无法自动增长
  • add 添加任务时触发的方法,可选
  • success 任务处理成功后触发方法,可选
  • error 任务失败后触发方法,可选
  • progress 任务处理中调用,可选

tasks/test.yao

json
{
  "name": "测试task",
  "worker_nums": 10,
  "attempts": 3,
  "attempt_after": 200,
  "timeout": 2,
  "size": 1000,
  "process": "scripts.task.Send", //必须的
  "event": {
    "next": "scripts.task.NextID", //需要返回一个整型的数字,用来生成任务的标识,返回值一定要保持唯一,要不就留空
    "add": "scripts.task.OnAdd",
    "success": "scripts.task.OnSuccess",
    "error": "scripts.task.OnError",
    "progress": "scripts.task.OnProgress"
  }
}
js
// 如果是在响应api请求时,不要在脚本里使用全局变量,因为不同的会话中,脚本会初始化,id并不会增长
// 可以考虑使用session保存全局唯一的id
var id = 1024;

/**
 * 任务标识生成器
 * Generate job id,需要返回一个整型的数字,用来生成任务的标识,一定要保证ID的唯一性
 * @returns
 */
function NextID() {
  id = id + 1;
  console.log(`NextID: ${id}`);
  return id;
}

/**
 * 任务绑定的处理器,
 * @param {integer} task_id 作业的id,
 * @param {any} args 任务的参数,可以有多个,由tasks.xxx.add处理器传入。
 *
 */
function Send(task_id, ...args) {
  console.log(args);

  // do the job

  const current = 1;
  const totla = 100;

  // 在任务处理器内部过程中调用,汇报任务处理进展,在一个长时间运行的作业中,这个是必要的。
  Progress(
    'tasks.task.process',
    task_id,
    current,
    total,
    fmt.Sprintf('Progress %v/%v', current, total),
  );

  return {
    message: 'ok',
  };
}

/**
 * OnAdd add event
 * @param {*} id 任务id
 */
function OnAdd(id) {
  log.Error('进入add');
  console.log(`OnAdd: #${id}`);
}

/**
 * OnProgress
 * @param {integer} id task id,任务ID
 * @param {integer} current
 * @param {integer} total
 * @param {string} message
 */
function OnProgress(id, current, total, message) {
  console.log(`OnProgress: #${id} ${message} ${current}/${total} `);
}
/**
 * OnSuccess
 * @param {integer} id task id,任务ID
 * @param {any} res
 */
function OnSuccess(id, res) {
  console.log(`OnSuccess: #${id} ${JSON.stringify(res)}`);
}
/**
 * OnError
 * @param {integer} id task id,任务ID
 * @param {err} error
 */
function OnError(id, err) {
  console.log(`OnError: #${id} ${err}`);
}

工作原理

任务处理流程图

当定义 task 后,yao 框架会维护一个作业池。

处理器:

  • tasks.[task_id].add 增加一个任务,增加任务时可以传入任务处理器需要的参数,如果增加成功返回 task_id,失败返回 0。

  • tasks.[task_id].progress 任务进度反馈处理器,在任务处理器内部调用,在处理器内部主动调用,除了会更新 job 内部的状态,还会回调 event.process 事件

js
// 在任务处理器内部过程中调用
Progress(
  'tasks.task.process',
  task_id,
  current,
  total,
  fmt.Sprintf('Progress %v/%v', current, total),
);
  • tasks.[task_id].get 根据任务 ID,获取当前任务的状态,在任务外部主动调用

获取 job 的状态

js
const job = Progress('tasks.task.get', task_id);

return {
  id: job_id,
  status: '', //"WAITING"/"RUNNING"/"SUCCESS"/"FAILURE"
  current: job.curr, //由处理器`tasks.[task_id].progress` 更新
  total: job.total, //由处理器`tasks.[task_id].progress` 更新
  message: job.message, //由处理器`tasks.[task_id].progress` 更新
  response: job.response,
};

调用任务时,是调用tasks.[task_id].add处理器方法,并且传入业务处理器需要参数。这里的 task_id 是 task 的定义标识,而标识即是定义的 task 配置文件的文件名。

任务会保存到缓存中,并等待处理。

js
function task() {
  for (i = 1; i < 100; i++) {
    //
    const task_id = Process('tasks.test.Add', '进入任务' + i);
    if (task_id == 0) {
      console.log('add task failed');
    }
  }
}