Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to use async tools within the task pipeline #550

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 50 additions & 3 deletions src/crewai/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,53 @@ def execute_task(

return result

async def aexecute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
"""Execute a task with the agent.

Args:
task: Task to execute.
context: Context to execute the task in.
tools: Tools to use for the task.

Returns:
Output of the agent
"""
self.tools_handler.last_used_tool = {}

task_prompt = task.prompt()

if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt, context=context
)

tools = self._parse_tools(tools or self.tools)
self.create_agent_executor(tools=tools)
self.agent_executor.tools = tools
self.agent_executor.task = task

self.agent_executor.tools_description = render_text_description(tools)
self.agent_executor.tools_names = self.__tools_names(tools)

invocation_result = await self.agent_executor.ainvoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
}
)
result = invocation_result["output"]

if self.max_rpm:
self._rpm_controller.stop_rpm_counter()

return result

def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.

Expand Down Expand Up @@ -303,9 +350,9 @@ def create_agent_executor(self, tools=None) -> None:
}

if self._rpm_controller:
executor_args[
"request_within_rpm_limit"
] = self._rpm_controller.check_or_wait
executor_args["request_within_rpm_limit"] = (
self._rpm_controller.check_or_wait
)

prompt = Prompts(
i18n=self.i18n,
Expand Down
172 changes: 171 additions & 1 deletion src/crewai/agents/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

from langchain.agents import AgentExecutor
from langchain.agents.agent import ExceptionTool
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain.callbacks.manager import (
CallbackManagerForChainRun,
AsyncCallbackManagerForChainRun,
)
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.exceptions import OutputParserException
from langchain_core.pydantic_v1 import root_validator
Expand Down Expand Up @@ -166,6 +169,58 @@ def _call(

return self._return(output, intermediate_steps, run_manager=run_manager)

async def _acall(
self,
inputs: Dict[str, str],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""Run text through and get agent response."""
# Construct a mapping of tool name to tool for easy lookup
name_to_tool_map = {tool.name: tool for tool in self.tools}
# We construct a mapping from each tool to a color, used for logging.
color_mapping = get_color_mapping(
[tool.name for tool in self.tools], excluded_colors=["green", "red"]
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# Let's start tracking the number of iterations and time elapsed
self.iterations = 0
time_elapsed = 0.0
start_time = time.time()
# We now enter the agent loop (until it returns something).
while self._should_continue(self.iterations, time_elapsed):
if not self.request_within_rpm_limit or self.request_within_rpm_limit():
next_step_output = await self._atake_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager=run_manager,
)

if self.step_callback:
self.step_callback(next_step_output)

if isinstance(next_step_output, AgentFinish):
return await self._areturn(
next_step_output, intermediate_steps, run_manager=run_manager
)

intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
# See if tool should return directly
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
return await self._areturn(
tool_return, intermediate_steps, run_manager=run_manager
)
self.iterations += 1
time_elapsed = time.time() - start_time
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
return await self._areturn(output, intermediate_steps, run_manager=run_manager)

def _iter_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
Expand Down Expand Up @@ -305,3 +360,118 @@ def _ask_human_input(self, final_answer: dict) -> str:
return input(
self._i18n.slice("getting_input").format(final_answer=final_answer)
)

async def _aiter_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
"""Take a single step in the thought-action-observation loop.

Override this to take control of how the agent makes and acts on choices.
"""
try:
if self._should_force_answer():
error = self._i18n.errors("force_final_answer")
output = AgentAction("_Exception", error, error)
self.have_forced_answer = True
yield AgentStep(action=output, observation=error)
return

intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# Call the LLM to see what to do.
output = self.agent.plan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)

except OutputParserException as e:
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"An output parsing error occurred. "
"In order to pass this error back to the agent and have it try "
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
f"This is the error: {str(e)}"
)
str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = f"\n{str(e.observation)}"
str(e.llm_output)
else:
observation = ""
elif isinstance(self.handle_parsing_errors, str):
observation = f"\n{self.handle_parsing_errors}"
elif callable(self.handle_parsing_errors):
observation = f"\n{self.handle_parsing_errors(e)}"
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, "")
if run_manager:
await run_manager.on_agent_action(output, color="green")
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = ExceptionTool().run(
output.tool_input,
verbose=False,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)

if self._should_force_answer():
error = self._i18n.errors("force_final_answer")
output = AgentAction("_Exception", error, error)
yield AgentStep(action=output, observation=error)
return

yield AgentStep(action=output, observation=observation)
return

# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
yield output
return

actions: List[AgentAction]
actions = [output] if isinstance(output, AgentAction) else output
for action in actions:
yield action
for agent_action in actions:
if run_manager:
await run_manager.on_agent_action(agent_action, color="green")
# Otherwise we lookup the tool
tool_usage = ToolUsage(
tools_handler=self.tools_handler,
tools=self.tools,
original_tools=self.original_tools,
tools_description=self.tools_description,
tools_names=self.tools_names,
function_calling_llm=self.function_calling_llm,
task=self.task,
action=agent_action,
)
tool_calling = tool_usage.parse(agent_action.log)

print(f"Tool Calling: {tool_calling}")
if isinstance(tool_calling, ToolUsageErrorException):
observation = tool_calling.message
else:
if tool_calling.tool_name.lower().strip() in [
name.lower().strip() for name in name_to_tool_map
]:
observation = await tool_usage.ause(tool_calling, agent_action.log)
else:
observation = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name for tool in self.tools]),
)
print(f"Observation: {observation}")
yield AgentStep(action=agent_action, observation=observation)
91 changes: 91 additions & 0 deletions src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,97 @@ def _set_tasks_callbacks(self) -> str:
if not task.callback:
task.callback = self.task_callback

async def akickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
self._interpolate_inputs(inputs)

for agent in self.agents:
agent.i18n = I18N(language=self.language)

if not agent.function_calling_llm:
agent.function_calling_llm = self.function_calling_llm
agent.create_agent_executor()
if not agent.step_callback:
agent.step_callback = self.step_callback
agent.create_agent_executor()

metrics = []

if self.process == Process.sequential:
result = await self._arun_sequential_process()
elif self.process == Process.hierarchical:
result, manager_metrics = await self._arun_hierarchical_process()
metrics.append(manager_metrics)

else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)

metrics = metrics + [
agent._token_process.get_summary() for agent in self.agents
]
self.usage_metrics = {
key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
}

return result

async def _arun_sequential_process(self) -> str:
"""Executes tasks sequentially and returns the final output."""
task_output = ""
for task in self.tasks:
if task.agent.allow_delegation:
agents_for_delegation = [
agent for agent in self.agents if agent != task.agent
]
if len(self.agents) > 1 and len(agents_for_delegation) > 0:
task.tools += AgentTools(agents=agents_for_delegation).tools()

role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== Working Agent: {role}", color="bold_yellow")
self._logger.log(
"info", f"== Starting Task: {task.description}", color="bold_yellow"
)

output = await task.aexecute(context=task_output)
if not task.async_execution:
task_output = output

role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")

self._finish_execution(task_output)
return self._format_output(task_output)

async def _arun_hierarchical_process(self) -> str:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""

i18n = I18N(language=self.language)
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
llm=self.manager_llm,
verbose=True,
)

task_output = ""
for task in self.tasks:
self._logger.log("debug", f"Working Agent: {manager.role}")
self._logger.log("info", f"Starting Task: {task.description}")

task_output = await task.aexecute(
agent=manager, context=task_output, tools=manager.tools
)

self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")

self._finish_execution(task_output)
return self._format_output(task_output), manager._token_process.get_summary()

def _interpolate_inputs(self, inputs: Dict[str, Any]) -> str:
"""Interpolates the inputs in the tasks and agents."""
[task.interpolate_inputs(inputs) for task in self.tasks]
Expand Down