pipe 
pipe 是一种高级的流程控制机制,与 flow 类似但功能更加强大。pipe 通过不同类型的节点来实现各种功能,特别增强了客户端执行和人机交互的能力。
pipe 的核心设计结构如下:
- 一个 pipe 由多个 node(节点)组成,每个 node 可以包含多个子节点,形成层级结构。
 - 每个 node 都有其输入和输出: 
- 使用 
$in[0],$in[1]...来获取节点的输入数据 - 使用 
$out[0],$out[1]...来设置节点的输出数据 
 - 使用 
 - pipe 支持多种类型的节点,每种类型都有其特定的功能:
 
- Process:调用 Yao 处理器,执行预定义的处理逻辑
 - Switch:根据条件判断执行不同的分支节点
 - AI:调用 AI 处理器,支持智能对话和处理
 - User Input:等待用户输入,支持多种交互界面(CLI、Web 等)
 - EOF:结束节点,标记流程终止
 
节点类型会根据配置自动推断,无需显式指定。具体规则如下:
- 配置了 prompts,就会自动的识别为 AI 节点。
 - 配置了 process,就会自动的识别为 Process 节点。
 - 配置了 switch,就会自动的识别为 Switch 节点。使用 switch 配置不同的条件来触发不同的执行分支,也可以使用 goto 来改变执行顺序。如果需要跳转到特定的节点,可以使用
goto声明。而"goto": "EOF"会直接结束节点执行。 - 配置了 ui,就会自动的识别为 user-input 节点。UI 类型需要是
cli, web, app, wxapp中的一种 
类型定义 
// Pipe the pipe
type Pipe struct {
	  ID        string
	  Name      string    `json:"name"`
	  Nodes     []Node    `json:"nodes"`
	  Label     string    `json:"label,omitempty"`
	  Hooks     *Hooks    `json:"hooks,omitempty"`
	  Output    any       `json:"output,omitempty"`    // the pipe output expression
	  Input     Input     `json:"input,omitempty"`     // the pipe input expression
	  Whitelist Whitelist `json:"whitelist,omitempty"` // the process whitelist
	  Goto      string    `json:"goto,omitempty"`      // goto node name / EOF
	  parent    *Pipe            // the parent pipe
	  namespace string           // the namespace of the pipe
	  mapping   map[string]*Node // the mapping of the nodes Key:name Value:index
}
// Hooks the Hooks
type Hooks struct {
	  Progress string `json:"progress,omitempty"`
}
// Node the pip node
type Node struct {
	  Name     string           `json:"name"`
	  Type     string           `json:"type,omitempty"`     // user-input, ai, process, switch, request
	  Label    string           `json:"label,omitempty"`    // Display
	  Process  *Process         `json:"process,omitempty"`  // Yao Process
	  Prompts  []Prompt         `json:"prompts,omitempty"`  // AI prompts
	  Model    string           `json:"model,omitempty"`    // AI model name (optional)
	  Options  map[string]any   `json:"options,omitempty"`  // AI or Request options (optional)
	  Request  *Request         `json:"request,omitempty"`  // Http Request
	  UI       string           `json:"ui,omitempty"`       // The User Interface cli, web, app, wxapp ...
	  AutoFill *AutoFill        `json:"autofill,omitempty"` // Autofill the user input with the expression
	  Switch   map[string]*Pipe `json:"case,omitempty"`     // Switch
	  Input    Input            `json:"input,omitempty"`    // the node input expression
	  Output   any              `json:"output,omitempty"`   // the node output expression
	  Goto     string           `json:"goto,omitempty"`     // goto node name / EOF
	  index int // the index of the node
}
// Whitelist the Whitelist
type Whitelist map[string]bool
// Input the input
type Input []any
// Args the args
type Args []any
// Data data for the template
type Data map[string]interface{}
// ResumeContext the resume context
type ResumeContext struct {
	  ID    string `json:"__id"`
	  Type  string `json:"__type"`
	  UI    string `json:"__ui"`
	  Input Input  `json:"input"`
	  Node  *Node  `json:"node"`
	  Data  Data   `json:"data"`
}
// AutoFill the autofill
type AutoFill struct {
	  Value  any    `json:"value"`
	  Action string `json:"action,omitempty"`
}
// Case the switch case section
type Case struct {
	  Input  Input  `json:"input,omitempty"`  // $in
	  Output any    `json:"output,omitempty"` // $out
	  Nodes  []Node `json:"nodes,omitempty"`  // $out
}
// Prompt the switch
type Prompt struct {
	  Role    string `json:"role,omitempty"`
	  Content string `json:"content,omitempty"`
}
// Process the switch
type Process struct {
	  Name string `json:"name"`
	  Args Args   `json:"args,omitempty"`
}
// Request the request
type Request struct{}
// ChatCompletionChunk the chat completion chunk
type ChatCompletionChunk struct {
	  ID                string      `json:"id"`
	  Object            string      `json:"object"`
	  Created           int64       `json:"created"`
	  Model             string      `json:"model"`
	  SystemFingerprint interface{} `json:"system_fingerprint"`
	  Choices           []struct {
		  Index        int         `json:"index"`
		  Delta        DeltaStruct `json:"delta"`
		  Logprobs     interface{} `json:"logprobs"`
		  FinishReason interface{} `json:"finish_reason"`
	  } `json:"choices"`
}
// DeltaStruct the delta struct
type DeltaStruct struct {
	  Content string `json:"content"`
}Context 定义,用于保存节点的执行状态。
// Context the Context
type Context struct {
	  *Pipe
	  id     string
	  parent *Context // the parent context id
	  context context.Context
	  global  map[string]interface{} // $global
	  sid     string                 // $sid
	  current *Node                  // current position
	  in      map[*Node][]any    // $in the current node input value
	  out     map[*Node]any      // $out the current node output value
	  history map[*Node][]Prompt // history of prompts, this is for the AI and auto merge to the prompts of the node
	  input  []any // $input the pipe input value
	  output any   // $output the pipe output value
}创建 pipe 
创建 pipe 定义 
创建一个 pipe 定义,创建 pipe 的方法有两种,一种是直接在目录 pipes 下面创建 pipe 的定义。第二种方法是调用处理器pipe.create或是pipe.createwith创建并立即执行 pipe,注意这两个处理器创建的 pipe 并不会保存在缓存中,这个跟上面的方法是有区别的。
pipes/cli/translator.pip.yaopipes/web/translator.pip.yao
这种直接由文件创建的 pipe 会在系统启动时自动加载。但时并不会立即执行,需要调用处理器pipe.run或是pipe.resume来执行。
使用处理器来调用 pipe.
- pipe.run <pipe_id>
 - pipe.resume <pipe_id>
 - pipe.resumewith <pipe_id>
 - pipe.close <pipe_id>
 
使用以下的处理器来调用 pipe:
yao run pipes.<Widget.ID> [args...],如果用户打断了 pipe 的执行,会返回一个 pipe context id。可以使用这个 id 进行 pipe 的恢复执行。yao run pipe.run <pipe.id> [...args]yao run pipe.Resume <Context.ID> [args...]yao run pipe.ResumeWith <Context.ID> '::{"foo":"bar"}' [args...]
动态的创建并执行 pipe 
可以直接通过在运行时,动态的加载 pipe 源码的方式来创建运行 pipe,主要使用到 2 个处理器。
pipe.create <pipe.id> [...args],第一个参数是 dsl 的定义,后面是输入的参数。createWith <pipe.id> <global>, [...args]带全局变量的创建方法。参数一是 dsl 定义,第二个参数是全局变量,后面是输入的参数。全局变量的类型必须是一个字典对象。
使用示例 1:
先创建一个 pipe 的定义,这个跟文件创建的格式是一样的。调用处理器pipe.create。
let in = ['hello world']
let input = ['hello world']
let dsl = `{
	"whitelist": ["utils.fmt.Print"],
	"name": "test",
	"label": "Test",
	"nodes": [
		{
			"name": "print",
			"process": {"name":"utils.fmt.Print", "args": "{{ $in }}"},
			"output": "print"
		}
	],
	"output": {"input": "{{ $input }}" }
}`;
Process('pipe.Create', dsl, 'hello world');使用示例 2:
let in = ['hello world']
let input = ['hello world']
dsl = `{
		"whitelist": ["utils.fmt.Print"],
		"name": "test",
		"label": "Test",
		"input": "{{ $global.placeholder }}",
		"nodes": [
			{
				"name": "print",
				"process": {"name":"utils.fmt.Print", "args": "{{ $in }}"},
				"output": "print"
			}
		],
		"output": {"input": "{{ $input }}" }
	}`;
Process('pipe.CreateWith', dsl, { placeholder: 'hello world' });变量引用 
在 pipe 中,变量引用遵循以下规则:
节点内部变量 
$in[index]:引用节点的输入变量,index 从 0 开始$out[index]:引用节点的输出变量,index 从 0 开始
节点间变量引用 
- 可以通过节点名称引用其他节点的输出,如 
node_name.field_name - 在 output 配置中: 
- 可以使用 
$input引用 pipe 的输入数据 - 可以使用 
$out引用当前节点的输出 - 不能使用 
$in关键字 
 - 可以使用 
 
全局变量 
$global.field_name:访问全局变量$sid:获取当前会话 ID
在 nodes 中可以通过 node 的名称引用上一个 node 的输出。比如使用 user.cmd 引用上一个节点 name=user 的 output 输出。
比如有以下的节点配置,需要使用 user.cmd 或是 user.args 来引用上一个节点的输出。而不是使用 user.output.cmd 也不是 user.output.args。
节点 1 配置:
{
  "name": "user",
  "label": "Enter the command",
  "ui": "cli",
  "autofill": {
    "value": "{{ $in[0].placeholder }}",
    "action": "exit"
  },
  "output": {
    "cmd": "{{$out[0]}}",
    "args": "{{$out[1:]}}"
  }
}节点 2 配置如下,使用{{user.args}}来引用上一个节点的输出。
{
  "input": "{{ user.args }}",
  "nodes": [
    {
      "name": "ping",
      "process": {
        "name": "utils.app.Ping",
        "args": "{{ $in[1:] }}"
      }
    }
  ],
  "output": ["run", "{{ user  }}", "{{ ping.engine  }}", "{{ ping.version }}"],
  "goto": "print"
}再看一个详细测试用例:
{
  "name": "AI Translator",
  "hooks": { "progress": "scripts.pipe.onProgress" },
  "whitelist": ["utils.json.Validate", "utils.fmt.Print", "utils.app.Ping"],
  "nodes": [
    {
      "name": "user",
      "label": "Enter the command",
      "ui": "cli",
      "autofill": { "value": "{{ $in[0].placeholder }}", "action": "exit" },
      "output": { "cmd": "{{$out[0]}}", "args": "{{$out[1:]}}" }
    },
    {
      "name": "switch",
      "case": {
        "user.cmd == 'translate'": {
          "input": "{{ user.args[0] }}",
          "nodes": [
            {
              "name": "translate",
              "prompts": [
                {
                  "role": "system",
                  "content": "you will act as a translator, helping me translate the words I give you into Chinese and Arabic."
                },
                {
                  "role": "system",
                  "content": "Use the following JSON format to answer my question. {\"Chinese\":\"...\", \"Arabic\", \"...\"}"
                },
                { "role": "user", "content": "{{ $in[0] }}" }
              ]
            },
            {
              "name": "validate",
              "process": {
                "name": "utils.json.Validate",
                "args": [
                  "{{ translate }}",
                  [{ "haskey": "Chinese" }, { "haskey": "Arabic" }]
                ]
              },
              "goto": "{{ $out == false ? 'print' : 'EOF' }}"
            },
            {
              "name": "print",
              "process": {
                "name": "utils.fmt.Print",
                "args": "{{ translate }}"
              },
              "output": null
            },
            {
              "name": "user",
              "label": "Enter the words to translate",
              "ui": "cli",
              "output": { "args": "{{ $out[0] }}" },
              "autofill": { "value": "{{ translate }}" },
              "goto": "translate"
            }
          ],
          "output": ["{{ translate.Chinese }}", "{{ translate.Arabic }}"],
          "goto": "print"
        },
        "user.cmd == 'run'": {
          "input": "{{ user.args }}",
          "nodes": [
            {
              "name": "ping",
              "process": { "name": "utils.app.Ping", "args": "{{ $in[1:] }}" }
            }
          ],
          "output": ["run", "{{ ping.engine  }}", "{{ ping.version }}"],
          "goto": "print"
        },
        "user.cmd == 'print'": {
          "input": "{{ user.args }}",
          "output": ["print", "{{ user.args }}"],
          "goto": "print"
        },
        "user.cmd == 'exit'": { "goto": "EOF" },
        "default": { "goto": "help", "input": "{{ user }}" }
      }
    },
    {
      "name": "print",
      "process": { "name": "utils.fmt.Print", "args": "{{ $in }}" },
      "output": null,
      "goto": "EOF"
    },
    {
      "name": "help",
      "process": {
        "name": "utils.fmt.Print",
        "args": ["help", "{{ $in }}"]
      },
      "output": null
    }
  ],
  "output": {
    "switch": "{{ switch }}",
    "output": "{{ $output }}",
    "input": "{{ $input }}",
    "sid": "{{ $sid }}",
    "global": "{{ $global }}"
  }
}它的功能如下:
接收用户输入命令
如果命令是
translate,则调用 ai 翻译单词,在调用 ai 的过程中,又有子节点- 进行检查 ai 返回的信息。
 - 如果 ai 返回的信息符合要求,则跳转到
print节点,打印输出结果,然后再等待用户输入,否则跳转到EOF节点。 
如果命令是
run,则作一个 ping 的操作如果命令是
print,则打印输出用户的输入如果命令是
exit,则退出程序
user-input 节点 
当类型为 UI 的节点配置成 cli 类型时,适合于在使用 yao run 命令执行 pipe。遇到 cli 节点时,会变成一个交互界面,程序会等待用户输入。用户退出交互时,需要单独输入一行exit()的命令。继续执行其它节点。
当类型不是 cli,而是 web ,会返回一个 resume context,类型为ResumeContext,用户可以根据这个 context.ID 在下一次调用来恢复执行。比如在 web 端时,会返回一个 resume context,用户可以根据这个上下文信息在下一次请求调用处理器pipe.Resume context.ID继续执行。
由于 pipe 是根据 id 保存在内存中。所有 web 请求都会共用一套 pipe,如果是多个人使用,pipe.id 需要根据用户 ID 来区分,这样才能保证不同用户的请求不会互相影响。