Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

再次咨询,action not found问题 #51

Open
chenxushao opened this issue Jan 8, 2024 · 21 comments
Open

再次咨询,action not found问题 #51

chenxushao opened this issue Jan 8, 2024 · 21 comments

Comments

@chenxushao
Copy link

情况是这样的,fastflow部署了一共2台机器。运行的时候,经常报action not found
报这个错误的代码在这里:
execturor.go
func (e *DefExecutor) runAction(taskIns *entity.TaskInstance) error {
act := ActionMap[taskIns.ActionName]
if act == nil {
return fmt.Errorf("action not found: %s", taskIns.ActionName)
}

再翻看源代码,ActionMap 是定义的一个全局变量。
var (
ActionMap = map[string]run.Action{}

defExc Executor
defStore Store
defKeeper Keeper
defParser Parser
defCommander Commander
)

Action 的注册,是将 Action 放到这个 Map 中,相当于数据是存储在‹内存›中,并没有持久化。
在程序中,我是这么注册Action 的。

// 注册 action
fastflow.RegisterAction(actions)

我看到官方文档上是这样描述:
当你开始运行一个 Dag 后,则会为本次执行生成一个执行记录,它被称为 DagInstance,当它生成以后,会由 Leader 实例将其分发到一个健康的 Worker,再由其解析、执行。

假设fastflow.RegisterAction(actions)在2台机器中的A执行,那么是不有可能由另一台机器去执行,是不是因为这个造成的报action not found呢?

@chenxushao
Copy link
Author

另外,部署的代码是同一套。

@chenxushao
Copy link
Author

Dag和注册Action我这边均是由前端发起请求,动态注册的,假如是A机器收到请求,那么这个action肯定是在A机器了。
但是真正的执行Action根据文档描述,可能是由另一台worker机器B执行,这是不是出现了action not found的根因呢

@chenxushao
Copy link
Author

诚盼回复。

@philhuan
Copy link
Contributor

philhuan commented Jan 8, 2024

这里 action 的注册用法是在服务启动的时候注册,不是通过接口注册的。

@philhuan
Copy link
Contributor

philhuan commented Jan 8, 2024

要注册得有一个实现了接口的结构体吧,通过接口注册怎么关联这个结构体,新增action 得改代码吧?

@chenxushao
Copy link
Author

动态不支持吗,比如新增一个编排任务,这时候会新创建dag和新注册action,应该是比较合理的需求。

@chenxushao
Copy link
Author

要注册得有一个实现了接口的结构体吧,通过接口注册怎么关联这个结构体,新增action 得改代码吧?

您好,是有两个固化的结构体,只是每次注册name不一样,因为处理的逻辑不一样。

@ShiningRush
Copy link
Owner

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

@chenxushao
Copy link
Author

chenxushao commented Jan 10, 2024

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗?

目前fastflow的初始化,是在go main方法中进行的。
之后,通过controller(使用的gin框架)来触发提交的任务。

在main语文教学中初始化fastflow的代码如下:

func InitFlow() {
fastFlowConf := configs.Get().FastFlow
connStr := fastFlowConf.ConnStr
dataBase := fastFlowConf.Database
prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
	Key: "worker-" + util.StringUtils.AsString(workKeyNum),
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
	errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
	notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
	errMsg := fmt.Sprintf("init store failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
	Keeper:             Keeper,
	Store:              st,
	DagScheduleTimeout: 300 * time.Second,
	ExecutorTimeout:    150 * time.Second,
}); err != nil {
	errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("start flow finish")

}

之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

@chenxushao
Copy link
Author

通过阅读源码, fastflow.RegisterAction(actions) 只会在接收到请求的那台机器的内存中(是一个map)才会有数据。我想的是,之所以报action not found,是否是由于具体的执行被调度到另一台机器上了。 另外,还有一个问题,一个Dag上的N外节点,只会被同一台机器执行吧?因为一个dag节点还存在数据共享和传递。

@chenxushao
Copy link
Author

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

第1点有一个疑问:
fastflow.RegisterAction
必须在
fastflow.start之前执行是吗?

@ShiningRush
Copy link
Owner

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

第1点有一个疑问: fastflow.RegisterAction 必须在 fastflow.start之前执行是吗?

最好是,最晚必须在工作流实例被执行前,否则就会在action集合中找不到

@ShiningRush
Copy link
Owner

通过阅读源码, fastflow.RegisterAction(actions) 只会在接收到请求的那台机器的内存中(是一个map)才会有数据。我想的是,之所以报action not found,是否是由于具体的执行被调度到另一台机器上了。 另外,还有一个问题,一个Dag上的N外节点,只会被同一台机器执行吧?因为一个dag节点还存在数据共享和传递。

你的理解是对的,同一个DagInstance只会被同一个worker执行

@chenxushao
Copy link
Author

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗?

目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。

在main语文教学中初始化fastflow的代码如下:

func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
	Key: "worker-" + util.StringUtils.AsString(workKeyNum),
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
	errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
	notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
	errMsg := fmt.Sprintf("init store failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
	Keeper:             Keeper,
	Store:              st,
	DagScheduleTimeout: 300 * time.Second,
	ExecutorTimeout:    150 * time.Second,
}); err != nil {
	errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("start flow finish")

}

之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

这种方式可行吗?

@chenxushao
Copy link
Author

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗?
目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。
在main语文教学中初始化fastflow的代码如下:
func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
	Key: "worker-" + util.StringUtils.AsString(workKeyNum),
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
	errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
	notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
	errMsg := fmt.Sprintf("init store failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
	Keeper:             Keeper,
	Store:              st,
	DagScheduleTimeout: 300 * time.Second,
	ExecutorTimeout:    150 * time.Second,
}); err != nil {
	errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("start flow finish")

}
之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

这种方式可行吗?

目前这样,就是遇到 了action not found异常。 我部署了2台机器

@ShiningRush
Copy link
Owner

我看你Resgiter是在接口里面,你有2台机器,可能在A机器接受的请求,注册Action,但是被分发到了B机器,导致这个错误。
为什么不在初始化的时候注册Action呢

@chenxushao
Copy link
Author

因为我们目前的Dag是由业务方构建然后提交的,如果只能在初始化构建,那这个编排任务是否只能在系统启动时才能初始化呢?

@chenxushao
Copy link
Author

我们目前是2类固化的Action,一类是查询类,一类是执行动作的变更,我目前是固化了2个Action实现。然后由前端动态构建Dag,构建好后,regionAction(因为action的name每次不一样),之后创建dag并持久化到mongodb,然后执行CreateDagAndInstance。

@ShiningRush
Copy link
Owner

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

@gaohaotian010
Copy link

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

也就是说,如果像 @chenxushao 这种用法,其实需要预定义所有的“能力”到Action里,然后前端构建DAG的时候只能通过填写“能力”的参数构建DAG的形式,可以这样理解吗?

@ShiningRush
Copy link
Owner

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

也就是说,如果像 @chenxushao 这种用法,其实需要预定义所有的“能力”到Action里,然后前端构建DAG的时候只能通过填写“能力”的参数构建DAG的形式,可以这样理解吗?

是的,你理解的没问题。
如果需要支持动态插入Action,需要将Action用Plugin来实现,目前没有强需求

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants