1. Intro
OpenManus 是一个基于大语言模型的 AI Agent Framework。它实现了一套完整的 Agent 系统,能够接收用户的自然语言指令,通过"思考-行动-观察"循环(ReAct 模式)自主调用各种工具来完成复杂任务。
核心能力包括:
- 通过 LLM 理解用户意图并规划执行步骤
- 调用多种工具(Python 执行、Shell 命令、浏览器操作、网页搜索、文件编辑等)
- 支持单 Agent 直接执行和多 Agent 协同的 Planning 流程
- 支持 MCP(Model Context Protocol)协议,可动态连接外部工具服务
- 支持 Docker 沙箱隔离执行
2. 架构简述
OpenManus 采用分层模块化架构:
┌──────────────────────────────────────────────────────────┐
│ 入口层 (main.py 等) │
└──────────────────────────┬───────────────────────────────┘
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Agent │ │ Flow │ │ Config │
│ 代理模块 │ │ 流程编排 │ │ 配置管理 │
└─────┬─────┘ └─────┬─────┘ └───────────┘
│ │
▼ ▼
┌───────────┐ ┌───────────┐
│ Tool │ │ Prompt │
│ 工具模块 │ │ 提示词 │
└─────┬─────┘ └───────────┘
│
┌─────┴─────┐
│ LLM │ ┌──────────┐
│ 模型封装 │ │ Schema │
└───────────┘ │ 数据模型 │
└──────────┘
各个模块职责如下:
| 模块 | 路径 | 职责 |
|---|---|---|
| Agent(代理) | app/agent/ |
接收用户指令,执行"思考-行动"循环,协调工具调用 |
| Tool(工具) | app/tool/ |
具体功能执行单元(执行代码、搜索、浏览器操作等) |
| Flow(流程) | app/flow/ |
多 Agent 任务编排与调度 |
| LLM(语言模型) | app/llm.py |
封装 LLM API 调用,管理 Token 计数 |
| Schema(数据模型) | app/schema.py |
定义核心数据结构(Message、Memory 等) |
| Config(配置) | app/config.py |
全局配置加载与管理 |
| Prompt(提示词) | app/prompt/ |
各 Agent 的系统提示词模板 |
| Sandbox(沙箱) | app/sandbox/ |
Docker 容器隔离执行环境 |
| MCP(协议) | app/mcp/ |
Model Context Protocol 服务端实现 |
OpenManus 的整体架构特点可以概括为:
- ReAct 循环架构:核心执行模式是 Reasoning + Acting 的循环
- 插件式工具系统:Tool 类似插件,可以独立开发和注册
- 分层继承体系:BaseAgent → ReActAgent → ToolCallAgent → 具体 Agent
- 事件驱动/异步架构:全异步 (async/await) 设计,适合 I/O 密集型操作
单 Agent 模式的执行主线为:
- 入口文件(main.py)通过异步工厂方法创建 Agent 实例
- 调用
agent.run(prompt)进入主循环 - 每个迭代步执行
think()→act()→ 观察结果,循环往复 - 当 Agent 调用终止工具或达到最大步数时退出循环
- 执行资源清理并返回结果
多 Agent 模式则在此基础上增加了一个 Planning 层:先由 LLM 将用户请求拆解为多步计划,再按步骤将子任务分配给不同 Agent 执行。
3. 关键设计
3.1. Agent模块
Agent 模块是OpenManus 最核心的组件,采用了模板方法模式,定义多层继承体系,每一层解决一个特定的抽象问题:
BaseAgent → 执行引擎:循环控制、状态管理、死循环检测
└── ReActAgent → 执行节奏:将 step() 拆分为 think() + act()
└── ToolCallAgent → 执行实现:LLM 交互、工具调用、结果观察
├── Manus → 场景配置:通用 Agent,含 MCP 集成
├── SWEAgent → 场景配置:软件工程 Agent
└── DataAnalysis → 场景配置:数据分析 Agent
3.1.1. BaseAgent
BaseAgent(app/agent/base.py)定义了 Agent 的核心运行时,其 run() 方法构成整个框架的执行骨架,提供step()抽象方法供子类实现:
class BaseAgent(BaseModel, ABC):
max_steps: int = Field(default=10)
current_step: int = Field(default=0)
state: AgentState = Field(default=AgentState.IDLE)
memory: Memory = Field(default_factory=Memory)
duplicate_threshold: int = 2
async def run(self, request: Optional[str] = None) -> str:
"""Execute the agent's main loop asynchronously.
Args:
request: Optional initial user request to process.
Returns:
A string summarizing the execution results.
Raises:
RuntimeError: If the agent is not in IDLE state at start.
"""
if self.state != AgentState.IDLE:
raise RuntimeError(f"Cannot run agent from state: {self.state}")
if request:
self.update_memory("user", request)
results: List[str] = []
async with self.state_context(AgentState.RUNNING):
while (
self.current_step < self.max_steps and self.state != AgentState.FINISHED
):
self.current_step += 1
logger.info(f"Executing step {self.current_step}/{self.max_steps}")
step_result = await self.step()
# Check for stuck state
if self.is_stuck():
self.handle_stuck_state()
results.append(f"Step {self.current_step}: {step_result}")
if self.current_step >= self.max_steps:
self.current_step = 0
self.state = AgentState.IDLE
results.append(f"Terminated: Reached max steps ({self.max_steps})")
await SANDBOX_CLIENT.cleanup()
return "\n".join(results) if results else "No steps executed"
@abstractmethod
async def step(self) -> str:
"""Execute a single step in the agent's workflow.
Must be implemented by subclasses to define specific behavior.
"""
agent状态机与上下文管理器,通过state_context上下文管理器进行状态转换:
@asynccontextmanager
async def state_context(self, new_state: AgentState):
"""Context manager for safe agent state transitions.
Args:
new_state: The state to transition to during the context.
Yields:
None: Allows execution within the new state.
Raises:
ValueError: If the new_state is invalid.
"""
if not isinstance(new_state, AgentState):
raise ValueError(f"Invalid state: {new_state}")
previous_state = self.state
self.state = new_state
try:
yield
except Exception as e:
self.state = AgentState.ERROR # Transition to ERROR on failure
raise e
finally:
self.state = previous_state
is_stuck() 方法通过对 Memory 中的消息序列进行反向扫描,统计与最新 assistant 消息内容完全相同的历史消息数量:
def is_stuck(self) -> bool:
"""Check if the agent is stuck in a loop by detecting duplicate content"""
if len(self.memory.messages) < 2:
return False
last_message = self.memory.messages[-1]
if not last_message.content:
return False
# Count identical content occurrences
duplicate_count = sum(
1
for msg in reversed(self.memory.messages[:-1])
if msg.role == "assistant" and msg.content == last_message.content
)
return duplicate_count >= self.duplicate_threshold
当重复次数达到阈值(默认 2 次)时,handle_stuck_state() 会向 next_step_prompt 注入引导语句,要求 LLM 调整策略。
def handle_stuck_state(self):
"""Handle stuck state by adding a prompt to change strategy"""
stuck_prompt = "\
Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
3.1.2. ReactAgent
ReActAgent继承自 BaseAgent ,将 step() 拆分为 think() 和 act() 两个阶段,确立了 ReAct(Reasoning + Acting)模式的执行节奏,这一层的设计仅定义了think->act的执行顺序约束,不涉及任何具体实现:
class ReActAgent(BaseAgent, ABC):
async def step(self) -> str:
should_act = await self.think()
if not should_act:
return "Thinking complete - no action needed"
return await self.act()
@abstractmethod
async def think(self) -> bool: ...
@abstractmethod
async def act(self) -> str: ...
3.1.3. ToolCallAgent
ToolCallAgent是整个继承链中代码量最大、逻辑最密集的一层,think()将对话历史和可用工具的JSON schema发送给LLM,并解析返回的工具调用列表:
async def think(self) -> bool:
if self.next_step_prompt:
self.messages += [Message.user_message(self.next_step_prompt)]
try:
response = await self.llm.ask_tool(
messages=self.messages,
system_msgs=[Message.system_message(self.system_prompt)] if self.system_prompt else None,
tools=self.available_tools.to_params(),
tool_choice=self.tool_choices,
)
except Exception as e:
if hasattr(e, "__cause__") and isinstance(e.__cause__, TokenLimitExceeded):
self.state = AgentState.FINISHED
return False
raise
self.tool_calls = response.tool_calls if response and response.tool_calls else []
# ... 将 assistant 消息加入 Memory
return bool(self.tool_calls)
act()通过遍历tool_calls列表,逐个执行工具:
async def act(self) -> str:
results = []
for command in self.tool_calls:
self._current_base64_image = None
result = await self.execute_tool(command)
if self.max_observe:
result = result[: self.max_observe] # 截断过长的观察结果
tool_msg = Message.tool_message(
content=result, tool_call_id=command.id, name=command.function.name,
base64_image=self._current_base64_image,
)
self.memory.add_message(tool_msg)
results.append(result)
return "\n\n".join(results)
execute_tool() 方法封装了工具执行的完整生命周期:参数解析(JSON 反序列化)→ 工具查找(通过 ToolCollection 的名称映射)→ 异步执行 → 特殊工具处理 → 结果格式化:
async def execute_tool(self, command: ToolCall) -> str:
name = command.function.name
args = json.loads(command.function.arguments or "{}")
result = await self.available_tools.execute(name=name, tool_input=args)
await self._handle_special_tool(name=name, result=result)
return f"Observed output of cmd `{name}` executed:\n{str(result)}" if result else ...
ToolCallAgent 覆写了 run() 方法,在 finally 块中调用 cleanup(),遍历所有注册工具并调用其各自的清理方法(如关闭浏览器、断开 MCP 连接等):
async def cleanup(self):
for tool_name, tool_instance in self.available_tools.tool_map.items():
if hasattr(tool_instance, "cleanup") and asyncio.iscoroutinefunction(tool_instance.cleanup):
await tool_instance.cleanup()
3.1.4. Manus通用Agent
Manus(app/agent/manus.py)是面向用户的主力 Agent。主要解决两个问题:工具集组装和 MCP 集成。Manus重写了think()方法,引入浏览器上下文感知,如果检测到最近的工具调用中包含浏览器操作,自动注入页面信息到next_step_prompt中
async def think(self) -> bool:
"""Process current state and decide next actions with appropriate context."""
if not self._initialized:
await self.initialize_mcp_servers()
self._initialized = True
original_prompt = self.next_step_prompt
recent_messages = self.memory.messages[-3:] if self.memory.messages else []
browser_in_use = any(
tc.function.name == BrowserUseTool().name
for msg in recent_messages
if msg.tool_calls
for tc in msg.tool_calls
)
if browser_in_use:
self.next_step_prompt = (
await self.browser_context_helper.format_next_step_prompt()
)
result = await super().think()
# Restore original prompt
self.next_step_prompt = original_prompt
return result
工具集成通过声明式配置完成:
class Manus(ToolCallAgent):
available_tools: ToolCollection = Field(
default_factory=lambda: ToolCollection(
PythonExecute(), BrowserUseTool(), StrReplaceEditor(), AskHuman(), Terminate(),
)
)
max_observe: int = 10000
max_steps: int = 20
MCP通过异步工厂方法实现,其中initialize_mcp_servers() 遍历配置文件中的 MCP 服务器列表,根据传输类型(SSE 或 stdio)建立连接,并将远程工具注册到本地的 available_tools 中。这使得 Manus 的能力集在运行时可以动态扩展。
@classmethod
async def create(cls, **kwargs) -> "Manus":
instance = cls(**kwargs)
await instance.initialize_mcp_servers()
return instance
3.2. Tool模块
Tool 为 Agent 提供了与外部世界交互的能力。其设计核心是建立一套标准化的接口协议,使 LLM 能够理解、选择和调用任意工具。
3.2.1. BaseTool & ToolResult
BaseTool是工具基类,提供了 execute()抽象方法给子类实现。
class BaseTool(ABC, BaseModel):
name: str # 工具标识符
description: str # 功能描述,供 LLM 理解工具用途
parameters: Optional[dict] # 参数定义,遵循 JSON Schema 规范
@abstractmethod
async def execute(self, **kwargs) -> Any: ...
def to_param(self) -> Dict:
return {
"type": "function",
"function": {"name": self.name, "description": self.description, "parameters": self.parameters},
}
ToolResult 是Tool执行结果的标准化封装,其中 base64_image 字段支持工具返回视觉信息(如浏览器截图),由 Agent 在后续轮次中传递给多模态 LLM 进行视觉推理。:
class ToolResult(BaseModel):
output: Any = Field(default=None)
error: Optional[str] = Field(default=None)
base64_image: Optional[str] = Field(default=None)
system: Optional[str] = Field(default=None)
3.2.2. ToolCollection
ToolCollection通过名称映射管理一组工具,to_params() 批量导出工具的 JSON Schema 列表,在 think() 阶段随消息一同发送给 LLM。execute() 根据工具名查找实例并调用其 __call__ 方法(最终委托到 execute())。add_tool() 和 add_tools() 支持运行时动态注册工具,这为 MCP 集成提供了基础设施。
class ToolCollection:
def __init__(self, *tools: BaseTool):
self.tools = tools
self.tool_map = {tool.name: tool for tool in tools}
def to_params(self) -> List[Dict[str, Any]]:
return [tool.to_param() for tool in self.tools]
async def execute(self, *, name: str, tool_input: Dict[str, Any] = None) -> ToolResult:
tool = self.tool_map.get(name)
if not tool:
return ToolFailure(error=f"Tool {name} is invalid")
return await tool(**tool_input)
3.2.3. WebSearch Tool
WebSearch工具样例:
class WebSearch(BaseTool):
_search_engine: dict = {
"google": GoogleSearchEngine(),
"baidu": BaiduSearchEngine(),
"duckduckgo": DuckDuckGoSearchEngine(),
"bing": BingSearchEngine(),
}
async def execute(self, query, num_results=5, ...):
for retry_count in range(max_retries + 1):
results = await self._try_all_engines(query, num_results, search_params)
if results:
return SearchResponse(query=query, results=results, ...)
if retry_count < max_retries:
await asyncio.sleep(retry_delay)
return SearchResponse(query=query, error="All search engines failed...")
3.3. LLM封装层
LLM 类(app/llm.py)是与大语言模型交互的统一抽象层,其设计解决了三个核心问题:多 Provider 适配、Token 精算与限额管控、以及调用可靠性。
class LLM:
_instances: Dict[str, "LLM"] = {}
def __new__(cls, config_name: str = "default", llm_config=None):
if config_name not in cls._instances:
instance = super().__new__(cls)
instance.__init__(config_name, llm_config)
cls._instances[config_name] = instance
return cls._instances[config_name]
def __init__(self, config_name: str = "default", llm_config=None):
if not hasattr(self, "client"):
# 初始化逻辑...
LLM 采用了按 key 缓存的单例模式:相同配置名返回同一实例,不同配置名创建独立实例。这允许不同 Agent 使用不同的模型配置(如 Manus 使用 GPT-4o、DataAnalysis 使用专用模型),同时避免重复创建客户端。__init__ 中的 if not hasattr(self, "client") 防护是必要的——Python 的 __new__ 返回已有实例后,__init__ 仍会被调用。
3.3.1. Token计算
TokenCounter 类实现了精确的 Token 计算,覆盖文本、图像和工具调用三种场景:
class TokenCounter:
BASE_MESSAGE_TOKENS = 4
FORMAT_TOKENS = 2
LOW_DETAIL_IMAGE_TOKENS = 85
HIGH_DETAIL_TILE_TOKENS = 170
def count_message_tokens(self, messages: List[dict]) -> int:
total_tokens = self.FORMAT_TOKENS
for message in messages:
tokens = self.BASE_MESSAGE_TOKENS
tokens += self.count_text(message.get("role", ""))
if "content" in message:
tokens += self.count_content(message["content"])
if "tool_calls" in message:
tokens += self.count_tool_calls(message["tool_calls"])
total_tokens += tokens
return total_tokens
在 ask_tool() 方法中,Token 计算不仅覆盖消息列表,还包括工具描述的 Token 消耗:
input_tokens = self.count_message_tokens(messages)
if tools:
for tool in tools:
tools_tokens += self.count_tokens(str(tool))
input_tokens += tools_tokens
这意味着注册的工具数量越多,实际可用于对话的上下文窗口越小。这一约束在系统设计层面需要权衡。
3.3.2. API重试策略
LLM 的所有公开方法均使用 tenacity 的 @retry 装饰器实现可靠调用:
@retry(
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
retry=retry_if_exception_type((OpenAIError, Exception, ValueError)),
)
async def ask_tool(self, ...):
...
3.4. 关键数据模型
3.4.1. Message
Message(app/schema.py)是贯穿整个系统的数据流载体:
class Message(BaseModel):
role: ROLE_TYPE # system | user | assistant | tool
content: Optional[str] = None
tool_calls: Optional[List[ToolCall]] = None # LLM 发起的工具调用请求
tool_call_id: Optional[str] = None # 工具调用 ID(tool 角色使用)
base64_image: Optional[str] = None # 图像数据
四种角色的消息分别承载不同的语义:
- system:系统提示词,定义 Agent 的身份和能力边界
- user:用户输入或 Agent 的
next_step_prompt引导 - assistant:LLM 的推理输出,可能包含
tool_calls - tool:工具执行结果,通过
tool_call_id与对应的工具调用请求关联
Message 提供了一组工厂方法(user_message()、system_message()、tool_message() 等)简化实例创建,并通过 __add__ / __radd__ 运算符重载支持消息的拼接操作。
3.4.2. Memory
Memory 采用固定长度滑动窗口管理对话历史。当消息数量超过阈值时,最早的消息被截断丢弃。这是一种简洁有效的上下文管理策略,但也意味着在长任务中,早期的关键信息可能丢失。
class Memory(BaseModel):
messages: List[Message] = Field(default_factory=list)
max_messages: int = Field(default=100)
def add_message(self, message: Message) -> None:
self.messages.append(message)
if len(self.messages) > self.max_messages:
self.messages = self.messages[-self.max_messages:]
3.5. Flow设计
PlanningFlow(app/flow/planning.py)实现了"先规划、再执行"的多 Agent 协同模式。其执行流程分为四个阶段:
阶段一:计划创建。调用 LLM 将用户请求分解为若干步骤,通过 PlanningTool 以结构化方式存储:
async def _create_initial_plan(self, request: str) -> None:
system_message = Message.system_message(
"You are a planning assistant. Create a concise, actionable plan with clear steps..."
)
user_message = Message.user_message(f"Create a reasonable plan...: {request}")
response = await self.llm.ask_tool(
messages=[user_message], system_msgs=[system_message],
tools=[self.planning_tool.to_param()], tool_choice=ToolChoice.AUTO,
)
# 解析 tool_calls,将计划存入 planning_tool.plans 字典
阶段二:步骤调度。主循环查找第一个未完成的步骤,根据步骤类型选择合适的 Agent:
async def execute(self, input_text: str) -> str:
await self._create_initial_plan(input_text)
while True:
self.current_step_index, step_info = await self._get_current_step_info()
if self.current_step_index is None:
break
executor = self.get_executor(step_info.get("type"))
step_result = await self._execute_step(executor, step_info)
阶段三:步骤执行。将当前计划的全局状态和当前步骤的具体任务组合为 prompt,调用选中 Agent 的 run() 方法:
async def _execute_step(self, executor: BaseAgent, step_info: dict) -> str:
plan_status = await self._get_plan_text()
step_prompt = f"""
CURRENT PLAN STATUS:
{plan_status}
YOUR CURRENT TASK:
You are now working on step {self.current_step_index}: "{step_text}"
"""
step_result = await executor.run(step_prompt)
await self._mark_step_completed()
return step_result
阶段四:计划总结。所有步骤完成后,调用 LLM 生成执行摘要。
3.6. MCP集成
MCP(Model Context Protocol)集成使 OpenManus 的工具集可以在运行时动态扩展,MCPClients(app/tool/mcp.py)继承自 ToolCollection,管理与多个 MCP 服务器的连接,并将远程工具注册为本地可用:
async def _initialize_and_list_tools(self, server_id: str) -> None:
session = self.sessions[server_id]
await session.initialize()
response = await session.list_tools()
for tool in response.tools:
tool_name = self._sanitize_tool_name(f"mcp_{server_id}_{tool.name}")
server_tool = MCPClientTool(
name=tool_name, description=tool.description,
parameters=tool.inputSchema, session=session, server_id=server_id,
original_name=tool.name,
)
self.tool_map[tool_name] = server_tool
self.tools = tuple(self.tool_map.values())
MCPClientTool 作为代理类,将工具执行请求转发到远程 MCP 服务器,工具名称经过 _sanitize_tool_name() 处理,添加 mcp_{server_id}_ 前缀以避免命名冲突,并截断至 64 字符以满足 API 约束。:
class MCPClientTool(BaseTool):
async def execute(self, **kwargs) -> ToolResult:
if not self.session:
return ToolResult(error="Not connected to MCP server")
result = await self.session.call_tool(self.original_name, kwargs)
content_str = ", ".join(item.text for item in result.content if isinstance(item, TextContent))
return ToolResult(output=content_str or "No output returned.")
4. 总结
4.1. 架构优势
-
清晰的分层抽象。四层 Agent 继承体系(BaseAgent → ReActAgent → ToolCallAgent → 具体 Agent)将执行控制、执行节奏、执行实现和场景配置严格分离。每一层只回答一个问题,职责边界清晰。
-
标准化的工具协议。BaseTool 通过
name/description/parameters三元组 + JSON Schema 建立了工具与 LLM 之间的通信契约。工具的开发、注册和调用完全解耦,开发者无需理解 Agent 的内部机制即可编写新工具。 -
全异步架构。从入口到底层工具执行,全程使用
async/await,适配 Agent 系统天然的 I/O 密集型工作负载(LLM API 调用、网络请求、浏览器操作等),避免阻塞主线程。 -
健壮的容错设计。LLM 调用的指数退避重试、搜索引擎的多路回退策略、Token 限额的精确管控、死循环检测与自然语言干预,这些机制共同保障了系统在真实环境下的可靠运行。
-
可扩展的能力边界。通过 MCP 协议支持运行时动态发现和注册远程工具,Agent 的能力不再局限于本地安装的工具集。ToolCollection 的动态增删接口为此提供了基础设施。
4.2. 改进方向
- Memory 管理机制较为粗糙。当前采用固定数量(100 条)的滑动窗口截断消息,未考虑消息的 Token 权重差异。一条包含大量工具输出的消息和一条简短的 user 消息被同等对待。
- 工具调用的串行执行瓶颈。
ToolCallAgent.act()中的多个工具调用按顺序逐一执行。当 LLM 一次返回多个相互独立的工具调用时(如同时搜索多个关键词),串行执行造成不必要的等待。可以通过分析工具调用间的依赖关系,对独立调用使用asyncio.gather()实现并行化。