1. Intro

OpenManus 是一个基于大语言模型的 AI Agent Framework。它实现了一套完整的 Agent 系统,能够接收用户的自然语言指令,通过"思考-行动-观察"循环(ReAct 模式)自主调用各种工具来完成复杂任务。

核心能力包括

  • 通过 LLM 理解用户意图并规划执行步骤
  • 调用多种工具(Python 执行、Shell 命令、浏览器操作、网页搜索、文件编辑等)
  • 支持单 Agent 直接执行和多 Agent 协同的 Planning 流程
  • 支持 MCP(Model Context Protocol)协议,可动态连接外部工具服务
  • 支持 Docker 沙箱隔离执行

2. 架构简述

OpenManus 采用分层模块化架构

┌──────────────────────────────────────────────────────────┐
│                     入口层 (main.py 等)                   │
└──────────────────────────┬───────────────────────────────┘
                           │
         ┌─────────────────┼─────────────────┐
         ▼                 ▼                 ▼
   ┌───────────┐    ┌───────────┐    ┌───────────┐
   │   Agent   │    │   Flow    │    │  Config   │
   │  代理模块  │    │  流程编排  │    │  配置管理  │
   └─────┬─────┘    └─────┬─────┘    └───────────┘
         │                │
         ▼                ▼
   ┌───────────┐    ┌───────────┐
   │   Tool    │    │  Prompt   │
   │  工具模块  │    │  提示词    │
   └─────┬─────┘    └───────────┘
         │
   ┌─────┴─────┐
   │    LLM    │        ┌──────────┐
   │  模型封装  │        │  Schema  │
   └───────────┘        │  数据模型 │
                        └──────────┘

各个模块职责如下:

模块 路径 职责
Agent(代理) app/agent/ 接收用户指令,执行"思考-行动"循环,协调工具调用
Tool(工具) app/tool/ 具体功能执行单元(执行代码、搜索、浏览器操作等)
Flow(流程) app/flow/ 多 Agent 任务编排与调度
LLM(语言模型) app/llm.py 封装 LLM API 调用,管理 Token 计数
Schema(数据模型) app/schema.py 定义核心数据结构(Message、Memory 等)
Config(配置) app/config.py 全局配置加载与管理
Prompt(提示词) app/prompt/ 各 Agent 的系统提示词模板
Sandbox(沙箱) app/sandbox/ Docker 容器隔离执行环境
MCP(协议) app/mcp/ Model Context Protocol 服务端实现

OpenManus 的整体架构特点可以概括为:

  • ReAct 循环架构:核心执行模式是 Reasoning + Acting 的循环
  • 插件式工具系统:Tool 类似插件,可以独立开发和注册
  • 分层继承体系:BaseAgent → ReActAgent → ToolCallAgent → 具体 Agent
  • 事件驱动/异步架构:全异步 (async/await) 设计,适合 I/O 密集型操作

单 Agent 模式的执行主线为:

  1. 入口文件(main.py)通过异步工厂方法创建 Agent 实例
  2. 调用 agent.run(prompt) 进入主循环
  3. 每个迭代步执行 think()act() → 观察结果,循环往复
  4. 当 Agent 调用终止工具或达到最大步数时退出循环
  5. 执行资源清理并返回结果

多 Agent 模式则在此基础上增加了一个 Planning 层:先由 LLM 将用户请求拆解为多步计划,再按步骤将子任务分配给不同 Agent 执行。

3. 关键设计

3.1. Agent模块

Agent 模块是OpenManus 最核心的组件,采用了模板方法模式,定义多层继承体系,每一层解决一个特定的抽象问题:

BaseAgent          → 执行引擎:循环控制、状态管理、死循环检测
    └── ReActAgent     → 执行节奏:将 step() 拆分为 think() + act()
        └── ToolCallAgent  → 执行实现:LLM 交互、工具调用、结果观察
            ├── Manus          → 场景配置:通用 Agent,含 MCP 集成
            ├── SWEAgent       → 场景配置:软件工程 Agent
            └── DataAnalysis   → 场景配置:数据分析 Agent

3.1.1. BaseAgent

BaseAgentapp/agent/base.py)定义了 Agent 的核心运行时,其 run() 方法构成整个框架的执行骨架,提供step()抽象方法供子类实现:

class BaseAgent(BaseModel, ABC):
    max_steps: int = Field(default=10)
    current_step: int = Field(default=0)
    state: AgentState = Field(default=AgentState.IDLE)
    memory: Memory = Field(default_factory=Memory)
    duplicate_threshold: int = 2

    async def run(self, request: Optional[str] = None) -> str:
        """Execute the agent's main loop asynchronously.

        Args:
            request: Optional initial user request to process.

        Returns:
            A string summarizing the execution results.

        Raises:
            RuntimeError: If the agent is not in IDLE state at start.
        """
        if self.state != AgentState.IDLE:
            raise RuntimeError(f"Cannot run agent from state: {self.state}")

        if request:
            self.update_memory("user", request)

        results: List[str] = []
        async with self.state_context(AgentState.RUNNING):
            while (
                self.current_step < self.max_steps and self.state != AgentState.FINISHED
            ):
                self.current_step += 1
                logger.info(f"Executing step {self.current_step}/{self.max_steps}")
                step_result = await self.step()

                # Check for stuck state
                if self.is_stuck():
                    self.handle_stuck_state()

                results.append(f"Step {self.current_step}: {step_result}")

            if self.current_step >= self.max_steps:
                self.current_step = 0
                self.state = AgentState.IDLE
                results.append(f"Terminated: Reached max steps ({self.max_steps})")
        await SANDBOX_CLIENT.cleanup()
        return "\n".join(results) if results else "No steps executed"

    @abstractmethod
    async def step(self) -> str:
        """Execute a single step in the agent's workflow.

        Must be implemented by subclasses to define specific behavior.
        """

agent状态机与上下文管理器,通过state_context上下文管理器进行状态转换:

@asynccontextmanager
    async def state_context(self, new_state: AgentState):
        """Context manager for safe agent state transitions.

        Args:
            new_state: The state to transition to during the context.

        Yields:
            None: Allows execution within the new state.

        Raises:
            ValueError: If the new_state is invalid.
        """
        if not isinstance(new_state, AgentState):
            raise ValueError(f"Invalid state: {new_state}")

        previous_state = self.state
        self.state = new_state
        try:
            yield
        except Exception as e:
            self.state = AgentState.ERROR  # Transition to ERROR on failure
            raise e
        finally:
            self.state = previous_state

is_stuck() 方法通过对 Memory 中的消息序列进行反向扫描,统计与最新 assistant 消息内容完全相同的历史消息数量:

def is_stuck(self) -> bool:
    """Check if the agent is stuck in a loop by detecting duplicate content"""
    if len(self.memory.messages) < 2:
        return False

    last_message = self.memory.messages[-1]
    if not last_message.content:
        return False

    # Count identical content occurrences
    duplicate_count = sum(
        1
        for msg in reversed(self.memory.messages[:-1])
        if msg.role == "assistant" and msg.content == last_message.content
    )

    return duplicate_count >= self.duplicate_threshold

当重复次数达到阈值(默认 2 次)时,handle_stuck_state() 会向 next_step_prompt 注入引导语句,要求 LLM 调整策略。

def handle_stuck_state(self):
        """Handle stuck state by adding a prompt to change strategy"""
        stuck_prompt = "\
        Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
        self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
        logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")

3.1.2. ReactAgent

ReActAgent继承自 BaseAgent ,将 step() 拆分为 think()act() 两个阶段,确立了 ReAct(Reasoning + Acting)模式的执行节奏,这一层的设计仅定义了think->act的执行顺序约束,不涉及任何具体实现:

class ReActAgent(BaseAgent, ABC):
    async def step(self) -> str:
        should_act = await self.think()
        if not should_act:
            return "Thinking complete - no action needed"
        return await self.act()

    @abstractmethod
    async def think(self) -> bool: ...
    @abstractmethod
    async def act(self) -> str: ...

3.1.3. ToolCallAgent

ToolCallAgent是整个继承链中代码量最大、逻辑最密集的一层,think()将对话历史和可用工具的JSON schema发送给LLM,并解析返回的工具调用列表:

async def think(self) -> bool:
    if self.next_step_prompt:
        self.messages += [Message.user_message(self.next_step_prompt)]
    try:
        response = await self.llm.ask_tool(
            messages=self.messages,
            system_msgs=[Message.system_message(self.system_prompt)] if self.system_prompt else None,
            tools=self.available_tools.to_params(),
            tool_choice=self.tool_choices,
        )
    except Exception as e:
        if hasattr(e, "__cause__") and isinstance(e.__cause__, TokenLimitExceeded):
            self.state = AgentState.FINISHED
            return False
        raise
    self.tool_calls = response.tool_calls if response and response.tool_calls else []
    # ... 将 assistant 消息加入 Memory
    return bool(self.tool_calls)

act()通过遍历tool_calls列表,逐个执行工具:

async def act(self) -> str:
    results = []
    for command in self.tool_calls:
        self._current_base64_image = None
        result = await self.execute_tool(command)
        if self.max_observe:
            result = result[: self.max_observe]        # 截断过长的观察结果
        tool_msg = Message.tool_message(
            content=result, tool_call_id=command.id, name=command.function.name,
            base64_image=self._current_base64_image,
        )
        self.memory.add_message(tool_msg)
        results.append(result)
    return "\n\n".join(results)

execute_tool() 方法封装了工具执行的完整生命周期:参数解析(JSON 反序列化)→ 工具查找(通过 ToolCollection 的名称映射)→ 异步执行 → 特殊工具处理 → 结果格式化:

async def execute_tool(self, command: ToolCall) -> str:
    name = command.function.name
    args = json.loads(command.function.arguments or "{}")
    result = await self.available_tools.execute(name=name, tool_input=args)
    await self._handle_special_tool(name=name, result=result)
    return f"Observed output of cmd `{name}` executed:\n{str(result)}" if result else ...

ToolCallAgent 覆写了 run() 方法,在 finally 块中调用 cleanup(),遍历所有注册工具并调用其各自的清理方法(如关闭浏览器、断开 MCP 连接等):

async def cleanup(self):
    for tool_name, tool_instance in self.available_tools.tool_map.items():
        if hasattr(tool_instance, "cleanup") and asyncio.iscoroutinefunction(tool_instance.cleanup):
            await tool_instance.cleanup()

3.1.4. Manus通用Agent

Manusapp/agent/manus.py)是面向用户的主力 Agent。主要解决两个问题:工具集组装MCP 集成。Manus重写了think()方法,引入浏览器上下文感知,如果检测到最近的工具调用中包含浏览器操作,自动注入页面信息到next_step_prompt中

async def think(self) -> bool:
        """Process current state and decide next actions with appropriate context."""
        if not self._initialized:
            await self.initialize_mcp_servers()
            self._initialized = True

        original_prompt = self.next_step_prompt
        recent_messages = self.memory.messages[-3:] if self.memory.messages else []
        browser_in_use = any(
            tc.function.name == BrowserUseTool().name
            for msg in recent_messages
            if msg.tool_calls
            for tc in msg.tool_calls
        )

        if browser_in_use:
            self.next_step_prompt = (
                await self.browser_context_helper.format_next_step_prompt()
            )

        result = await super().think()

        # Restore original prompt
        self.next_step_prompt = original_prompt

        return result

工具集成通过声明式配置完成:

class Manus(ToolCallAgent):
    available_tools: ToolCollection = Field(
        default_factory=lambda: ToolCollection(
            PythonExecute(), BrowserUseTool(), StrReplaceEditor(), AskHuman(), Terminate(),
        )
    )
    max_observe: int = 10000
    max_steps: int = 20

MCP通过异步工厂方法实现,其中initialize_mcp_servers() 遍历配置文件中的 MCP 服务器列表,根据传输类型(SSE 或 stdio)建立连接,并将远程工具注册到本地的 available_tools 中。这使得 Manus 的能力集在运行时可以动态扩展。

@classmethod
async def create(cls, **kwargs) -> "Manus":
    instance = cls(**kwargs)
    await instance.initialize_mcp_servers()
    return instance

3.2. Tool模块

Tool 为 Agent 提供了与外部世界交互的能力。其设计核心是建立一套标准化的接口协议,使 LLM 能够理解、选择和调用任意工具。

3.2.1. BaseTool & ToolResult

BaseTool是工具基类,提供了 execute()抽象方法给子类实现。

class BaseTool(ABC, BaseModel):
    name: str                       # 工具标识符
    description: str                # 功能描述,供 LLM 理解工具用途
    parameters: Optional[dict]      # 参数定义,遵循 JSON Schema 规范

    @abstractmethod
    async def execute(self, **kwargs) -> Any: ...

    def to_param(self) -> Dict:
        return {
            "type": "function",
            "function": {"name": self.name, "description": self.description, "parameters": self.parameters},
    }

ToolResult 是Tool执行结果的标准化封装,其中 base64_image 字段支持工具返回视觉信息(如浏览器截图),由 Agent 在后续轮次中传递给多模态 LLM 进行视觉推理。:

class ToolResult(BaseModel):
    output: Any = Field(default=None)
    error: Optional[str] = Field(default=None)
    base64_image: Optional[str] = Field(default=None)
    system: Optional[str] = Field(default=None)

3.2.2. ToolCollection

ToolCollection通过名称映射管理一组工具,to_params() 批量导出工具的 JSON Schema 列表,在 think() 阶段随消息一同发送给 LLM。execute() 根据工具名查找实例并调用其 __call__ 方法(最终委托到 execute())。add_tool()add_tools() 支持运行时动态注册工具,这为 MCP 集成提供了基础设施。

class ToolCollection:
    def __init__(self, *tools: BaseTool):
        self.tools = tools
        self.tool_map = {tool.name: tool for tool in tools}

    def to_params(self) -> List[Dict[str, Any]]:
        return [tool.to_param() for tool in self.tools]

    async def execute(self, *, name: str, tool_input: Dict[str, Any] = None) -> ToolResult:
        tool = self.tool_map.get(name)
        if not tool:
            return ToolFailure(error=f"Tool {name} is invalid")
        return await tool(**tool_input)

3.2.3. WebSearch Tool

WebSearch工具样例:

class WebSearch(BaseTool):
    _search_engine: dict = {
        "google": GoogleSearchEngine(),
        "baidu": BaiduSearchEngine(),
        "duckduckgo": DuckDuckGoSearchEngine(),
        "bing": BingSearchEngine(),
    }

    async def execute(self, query, num_results=5, ...):
        for retry_count in range(max_retries + 1):
            results = await self._try_all_engines(query, num_results, search_params)
            if results:
                return SearchResponse(query=query, results=results, ...)
            if retry_count < max_retries:
                await asyncio.sleep(retry_delay)
        return SearchResponse(query=query, error="All search engines failed...")

3.3. LLM封装层

LLM 类(app/llm.py)是与大语言模型交互的统一抽象层,其设计解决了三个核心问题:多 Provider 适配、Token 精算与限额管控、以及调用可靠性。

class LLM:
    _instances: Dict[str, "LLM"] = {}

    def __new__(cls, config_name: str = "default", llm_config=None):
        if config_name not in cls._instances:
            instance = super().__new__(cls)
            instance.__init__(config_name, llm_config)
            cls._instances[config_name] = instance
        return cls._instances[config_name]

    def __init__(self, config_name: str = "default", llm_config=None):
        if not hasattr(self, "client"):
            # 初始化逻辑...

LLM 采用了按 key 缓存的单例模式:相同配置名返回同一实例,不同配置名创建独立实例。这允许不同 Agent 使用不同的模型配置(如 Manus 使用 GPT-4o、DataAnalysis 使用专用模型),同时避免重复创建客户端。__init__ 中的 if not hasattr(self, "client") 防护是必要的——Python 的 __new__ 返回已有实例后,__init__ 仍会被调用。

3.3.1. Token计算

TokenCounter 类实现了精确的 Token 计算,覆盖文本、图像和工具调用三种场景:

class TokenCounter:
    BASE_MESSAGE_TOKENS = 4
    FORMAT_TOKENS = 2
    LOW_DETAIL_IMAGE_TOKENS = 85
    HIGH_DETAIL_TILE_TOKENS = 170

    def count_message_tokens(self, messages: List[dict]) -> int:
        total_tokens = self.FORMAT_TOKENS
        for message in messages:
            tokens = self.BASE_MESSAGE_TOKENS
            tokens += self.count_text(message.get("role", ""))
            if "content" in message:
                tokens += self.count_content(message["content"])
            if "tool_calls" in message:
                tokens += self.count_tool_calls(message["tool_calls"])
            total_tokens += tokens
        return total_tokens

ask_tool() 方法中,Token 计算不仅覆盖消息列表,还包括工具描述的 Token 消耗

input_tokens = self.count_message_tokens(messages)
if tools:
    for tool in tools:
        tools_tokens += self.count_tokens(str(tool))
input_tokens += tools_tokens

这意味着注册的工具数量越多,实际可用于对话的上下文窗口越小。这一约束在系统设计层面需要权衡。

3.3.2. API重试策略

LLM 的所有公开方法均使用 tenacity 的 @retry 装饰器实现可靠调用:

@retry(
    wait=wait_random_exponential(min=1, max=60),
    stop=stop_after_attempt(6),
    retry=retry_if_exception_type((OpenAIError, Exception, ValueError)),
)
async def ask_tool(self, ...):
    ...

3.4. 关键数据模型

3.4.1. Message

Messageapp/schema.py)是贯穿整个系统的数据流载体:

class Message(BaseModel):
    role: ROLE_TYPE                              # system | user | assistant | tool
    content: Optional[str] = None
    tool_calls: Optional[List[ToolCall]] = None  # LLM 发起的工具调用请求
    tool_call_id: Optional[str] = None           # 工具调用 ID(tool 角色使用)
    base64_image: Optional[str] = None           # 图像数据

四种角色的消息分别承载不同的语义:

  • system:系统提示词,定义 Agent 的身份和能力边界
  • user:用户输入或 Agent 的 next_step_prompt 引导
  • assistant:LLM 的推理输出,可能包含 tool_calls
  • tool:工具执行结果,通过 tool_call_id 与对应的工具调用请求关联

Message 提供了一组工厂方法(user_message()system_message()tool_message() 等)简化实例创建,并通过 __add__ / __radd__ 运算符重载支持消息的拼接操作。

3.4.2. Memory

Memory 采用固定长度滑动窗口管理对话历史。当消息数量超过阈值时,最早的消息被截断丢弃。这是一种简洁有效的上下文管理策略,但也意味着在长任务中,早期的关键信息可能丢失。

class Memory(BaseModel):
    messages: List[Message] = Field(default_factory=list)
    max_messages: int = Field(default=100)

    def add_message(self, message: Message) -> None:
        self.messages.append(message)
        if len(self.messages) > self.max_messages:
            self.messages = self.messages[-self.max_messages:]

3.5. Flow设计

PlanningFlowapp/flow/planning.py)实现了"先规划、再执行"的多 Agent 协同模式。其执行流程分为四个阶段:

阶段一:计划创建。调用 LLM 将用户请求分解为若干步骤,通过 PlanningTool 以结构化方式存储:

async def _create_initial_plan(self, request: str) -> None:
    system_message = Message.system_message(
        "You are a planning assistant. Create a concise, actionable plan with clear steps..."
    )
    user_message = Message.user_message(f"Create a reasonable plan...: {request}")
    response = await self.llm.ask_tool(
        messages=[user_message], system_msgs=[system_message],
        tools=[self.planning_tool.to_param()], tool_choice=ToolChoice.AUTO,
    )
    # 解析 tool_calls,将计划存入 planning_tool.plans 字典

阶段二:步骤调度。主循环查找第一个未完成的步骤,根据步骤类型选择合适的 Agent:

async def execute(self, input_text: str) -> str:
    await self._create_initial_plan(input_text)
    while True:
        self.current_step_index, step_info = await self._get_current_step_info()
        if self.current_step_index is None:
            break
        executor = self.get_executor(step_info.get("type"))
        step_result = await self._execute_step(executor, step_info)

阶段三:步骤执行。将当前计划的全局状态和当前步骤的具体任务组合为 prompt,调用选中 Agent 的 run() 方法:

async def _execute_step(self, executor: BaseAgent, step_info: dict) -> str:
    plan_status = await self._get_plan_text()
    step_prompt = f"""
    CURRENT PLAN STATUS:
    {plan_status}
    YOUR CURRENT TASK:
    You are now working on step {self.current_step_index}: "{step_text}"
    """
    step_result = await executor.run(step_prompt)
    await self._mark_step_completed()
    return step_result

阶段四:计划总结。所有步骤完成后,调用 LLM 生成执行摘要。

3.6. MCP集成

MCP(Model Context Protocol)集成使 OpenManus 的工具集可以在运行时动态扩展,MCPClientsapp/tool/mcp.py)继承自 ToolCollection,管理与多个 MCP 服务器的连接,并将远程工具注册为本地可用:

async def _initialize_and_list_tools(self, server_id: str) -> None:
    session = self.sessions[server_id]
    await session.initialize()
    response = await session.list_tools()
    for tool in response.tools:
        tool_name = self._sanitize_tool_name(f"mcp_{server_id}_{tool.name}")
        server_tool = MCPClientTool(
            name=tool_name, description=tool.description,
            parameters=tool.inputSchema, session=session, server_id=server_id,
            original_name=tool.name,
        )
        self.tool_map[tool_name] = server_tool
    self.tools = tuple(self.tool_map.values())

MCPClientTool 作为代理类,将工具执行请求转发到远程 MCP 服务器,工具名称经过 _sanitize_tool_name() 处理,添加 mcp_{server_id}_ 前缀以避免命名冲突,并截断至 64 字符以满足 API 约束。:

class MCPClientTool(BaseTool):
    async def execute(self, **kwargs) -> ToolResult:
        if not self.session:
            return ToolResult(error="Not connected to MCP server")
        result = await self.session.call_tool(self.original_name, kwargs)
        content_str = ", ".join(item.text for item in result.content if isinstance(item, TextContent))
        return ToolResult(output=content_str or "No output returned.")

4. 总结

4.1. 架构优势

  • 清晰的分层抽象。四层 Agent 继承体系(BaseAgent → ReActAgent → ToolCallAgent → 具体 Agent)将执行控制、执行节奏、执行实现和场景配置严格分离。每一层只回答一个问题,职责边界清晰。

  • 标准化的工具协议。BaseTool 通过 name/description/parameters 三元组 + JSON Schema 建立了工具与 LLM 之间的通信契约。工具的开发、注册和调用完全解耦,开发者无需理解 Agent 的内部机制即可编写新工具。

  • 全异步架构。从入口到底层工具执行,全程使用 async/await,适配 Agent 系统天然的 I/O 密集型工作负载(LLM API 调用、网络请求、浏览器操作等),避免阻塞主线程。

  • 健壮的容错设计。LLM 调用的指数退避重试、搜索引擎的多路回退策略、Token 限额的精确管控、死循环检测与自然语言干预,这些机制共同保障了系统在真实环境下的可靠运行。

  • 可扩展的能力边界。通过 MCP 协议支持运行时动态发现和注册远程工具,Agent 的能力不再局限于本地安装的工具集。ToolCollection 的动态增删接口为此提供了基础设施。

4.2. 改进方向

  • Memory 管理机制较为粗糙。当前采用固定数量(100 条)的滑动窗口截断消息,未考虑消息的 Token 权重差异。一条包含大量工具输出的消息和一条简短的 user 消息被同等对待。
  • 工具调用的串行执行瓶颈ToolCallAgent.act() 中的多个工具调用按顺序逐一执行。当 LLM 一次返回多个相互独立的工具调用时(如同时搜索多个关键词),串行执行造成不必要的等待。可以通过分析工具调用间的依赖关系,对独立调用使用asyncio.gather() 实现并行化。