跳转到内容

使用 Temporal 实现持久化执行

Pydantic AI 使您能够构建持久的代理,这些代理可以在瞬态 API 故障、应用程序错误或重启后保持其进度,并以生产级的可靠性处理长时间运行、异步和有人类参与的工作流。持久代理完全支持流式处理MCP,并增加了容错的优点。

Temporal 是一个流行的持久执行平台,Pydantic AI 原生支持它。该集成仅使用 Pydantic AI 的公共接口,因此它也可以作为如何与其他持久执行系统集成的参考。

持久执行

在 Temporal 的持久执行实现中,一个在与模型或 API 交互时崩溃或遇到异常的程序将不断重试,直到成功完成。

Temporal 主要依赖重放机制从故障中恢复。随着程序的进展,Temporal 会保存关键的输入和决策,从而允许重新启动的程序从中断的地方继续执行。

实现这一点的关键是将应用程序的可重复(确定性)部分和不可重复(非确定性)部分分开。

  1. 确定性部分,称为工作流 (workflows),在使用相同输入重新运行时会以相同的方式执行。
  2. 非确定性部分,称为活动 (activities),可以运行任意代码,执行 I/O 和任何其他操作。

工作流代码可以长时间运行,如果中断,可以精确地从中断处恢复。关键的是,工作流代码通常不能包含任何类型的 I/O,无论是通过网络、磁盘等。活动代码对 I/O 或外部交互没有限制,但如果活动在执行中途失败,它会从头开始重启。

注意

如果您熟悉 Celery,将 Temporal 的活动看作类似于 Celery 的任务可能会有所帮助,但是在工作流的下一步之前,您需要等待任务完成并获取其结果。然而,Temporal 的工作流和活动比 Celery 的任务提供了更大的灵活性和功能。

更多信息请参见 Temporal 文档

对于 Pydantic AI 代理,与 Temporal 的集成意味着模型请求、可能需要 I/O 的工具调用以及MCP 服务器通信都因其 I/O 要求而需要被卸载到 Temporal 的活动中,而协调它们的逻辑(即代理运行)则存在于工作流中。处理计划任务或 Web 请求的代码可以执行工作流,工作流将根据需要执行活动。

下图展示了在 Temporal 中一个代理应用的整体架构。Temporal 服务器负责跟踪程序执行,并确保相关状态被可靠地保存(即存储到内部数据库,并可能跨云区域复制)。Temporal 服务器以加密形式管理数据,因此所有数据处理都在运行工作流和活动的 Worker 上进行。

            +---------------------+
            |   Temporal Server   |      (Stores workflow state,
            +---------------------+       schedules activities,
                     ^                    persists progress)
                     |
        Save state,  |   Schedule Tasks,
        progress,    |   load state on resume
        timeouts     |
                     |
+------------------------------------------------------+
|                      Worker                          |
|   +----------------------------------------------+   |
|   |              Workflow Code                   |   |
|   |       (Agent Run Loop)                       |   |
|   +----------------------------------------------+   |
|          |          |                |               |
|          v          v                v               |
|   +-----------+ +------------+ +-------------+       |
|   | Activity  | | Activity   | |  Activity   |       |
|   | (Tool)    | | (MCP Tool) | | (Model API) |       |
|   +-----------+ +------------+ +-------------+       |
|         |           |                |               |
+------------------------------------------------------+
          |           |                |
          v           v                v
      [External APIs, services, databases, etc.]

更多信息请参见 Temporal 文档

持久代理

任何代理都可以被包装在 TemporalAgent 中,以获得一个可以在确定性的 Temporal 工作流内部使用的持久代理。这是通过自动将所有需要 I/O 的工作(即模型请求、工具调用和 MCP 服务器通信)卸载到非确定性的活动中来实现的。

在包装时,代理的模型工具集(包括在代理上注册的函数工具和 MCP 服务器)被冻结,为每个都动态创建活动,并且原始的模型和工具集被包装以调用 worker 来执行相应的活动,而不是直接在工作流内部执行操作。原始代理在 Temporal 工作流之外仍然可以正常使用,但在包装后对其模型或工具集的任何更改都不会反映在持久代理中。

这里有一个简单但完整的示例,展示了如何包装一个代理以实现持久执行,创建一个带有持久执行逻辑的 Temporal 工作流,连接到 Temporal 服务器,并从非持久代码中运行工作流。它只需要一个在本地运行的 Temporal 服务器。

brew install temporal
temporal server start-dev
temporal_agent.py
import uuid

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

from pydantic_ai import Agent
from pydantic_ai.durable_exec.temporal import (
    AgentPlugin,
    PydanticAIPlugin,
    TemporalAgent,
)

agent = Agent(
    'gpt-5',
    instructions="You're an expert in geography.",
    name='geography',  # (10)!
)

temporal_agent = TemporalAgent(agent)  # (1)!


@workflow.defn
class GeographyWorkflow:  # (2)!
    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await temporal_agent.run(prompt)  # (3)!
        return result.output


async def main():
    client = await Client.connect(  # (4)!
        'localhost:7233',  # (5)!
        plugins=[PydanticAIPlugin()],  # (6)!
    )

    async with Worker(  # (7)!
        client,
        task_queue='geography',
        workflows=[GeographyWorkflow],
        plugins=[AgentPlugin(temporal_agent)],  # (8)!
    ):
        output = await client.execute_workflow(  # (9)!
            GeographyWorkflow.run,
            args=['What is the capital of Mexico?'],
            id=f'geography-{uuid.uuid4()}',
            task_queue='geography',
        )
        print(output)
        #> Mexico City (Ciudad de México, CDMX)
  1. 原始的 Agent 不能在确定性的 Temporal 工作流内部使用,但 TemporalAgent 可以。
  2. 如上所述,工作流代表一段确定性代码,它可以使用非确定性活动来执行需要 I/O 的操作。
  3. TemporalAgent.run() 的工作方式与 Agent.run() 完全相同,但它会自动将模型请求、工具调用和 MCP 服务器通信卸载到 Temporal 活动中。
  4. 我们连接到 Temporal 服务器,它负责跟踪工作流和活动的执行。
  5. 这假设 Temporal 服务器正在本地运行
  6. PydanticAIPlugin 告诉 Temporal 使用 Pydantic 进行序列化和反序列化,并将 UserError 异常视为不可重试的。
  7. 我们启动 worker,它将监听指定的任务队列并运行工作流和活动。在实际应用中,这可能会在一个单独的服务中运行。
  8. AgentPluginTemporalAgent 的活动注册到 worker。
  9. 我们调用服务器,在一个正在监听指定任务队列的 worker 上执行工作流。
  10. 代理的 name 用于唯一标识其活动。

(此示例是完整的,可以“按原样”运行——您需要添加 asyncio.run(main()) 来运行 main

在实际应用中,代理、工作流和 worker 通常与调用工作流执行的代码分开定义。因为 Temporal 工作流需要在文件的顶层定义,并且在工作流内部以及启动 worker 时(用于注册活动)需要 TemporalAgent 实例,所以它也需要在文件的顶层定义。

有关如何在 Python 应用程序中使用 Temporal 的更多信息,请参见他们的Python SDK 指南

Temporal 集成注意事项

在使用 Temporal 进行持久执行时,有一些特定于代理和工具集的注意事项。理解这些对于确保您的代理和工具集能够与 Temporal 的工作流和活动模型正确工作非常重要。

代理名称和工具集 ID

为了确保 Temporal 在活动失败或中断然后重新启动时知道要运行什么代码,即使您的代码在此期间发生了变化,每个活动都需要有一个稳定且唯一的名称。

TemporalAgent 为被包装代理的模型请求和工具集(特别是那些实现自己的工具列表和调用的工具集,即 FunctionToolsetMCPServer)动态创建活动时,它们的名称来源于代理的 name 和工具集的 id。这些字段通常是可选的,但在使用 Temporal 时是必须设置的。一旦持久代理部署到生产环境,就不应更改它们,因为这会破坏活动中的工作流。

除此之外,任何代理和工具集都能正常工作!

指令函数、输出函数和历史处理器

Pydantic AI 在线程中运行非异步的指令系统提示函数、历史处理器输出函数输出验证器,这在 Temporal 工作流中不受支持,并且需要一个活动。请确保这些函数是异步的。

支持同步的工具函数,因为工具会自动在活动中运行,除非明确禁用。尽管如此,还是建议将工具函数也设置为异步以提高性能。

代理运行上下文和依赖项

由于工作流和活动在不同的进程中运行,它们之间传递的任何值都需要是可序列化的。由于这些载荷存储在工作流执行事件历史中,Temporal 将其大小限制为 2MB。

为了考虑这些限制,在活动中运行的工具函数和事件流处理器会收到代理 RunContext 的一个受限版本,您有责任确保提供给 TemporalAgent.run()依赖项对象可以使用 Pydantic 进行序列化。

具体来说,默认情况下只有 depsretriestool_call_idtool_nametool_call_approvedretryrun_step 字段可用,尝试访问 modelusagepromptmessagestracer 会引发错误。如果您需要在活动中使用这些属性中的一个或多个,可以创建一个带有自定义 serialize_run_contextdeserialize_run_context 类方法的 TemporalRunContext 子类,并将其作为 run_context_type 传递给 TemporalAgent

流式处理

由于 Temporal 活动无法直接将输出流式传输到活动调用处,因此不支持 Agent.run_stream()Agent.iter()

作为替代方案,您可以通过在 AgentTemporalAgent 实例上设置 event_stream_handler 并在工作流内部使用 TemporalAgent.run() 来实现流式处理。事件流处理器函数将接收代理的运行上下文以及一个包含来自模型流式响应和代理工具执行的事件的异步可迭代对象。有关示例,请参见流式处理文档

由于流式模型请求活动、工作流和工作流执行调用都在不同的进程中进行,因此在它们之间传递数据需要谨慎处理。

  • 要将数据从工作流调用处或工作流传递到事件流处理器,您可以使用依赖项对象
  • 要将数据从事件流处理器传递到工作流、工作流调用处或前端,您需要使用一个外部系统,事件流处理器可以向其写入,而事件消费者可以从中读取,例如消息队列。您可以使用依赖项对象来确保在所有需要它的地方都可以使用相同的连接字符串或其他唯一 ID。

活动配置

可以通过向 TemporalAgent 构造函数传递 temporalio.workflow.ActivityConfig 对象来自定义 Temporal 的活动配置,例如超时和重试策略。

  • activity_config:用于所有活动的基本 Temporal 活动配置。如果未提供配置,则使用 60 秒的 start_to_close_timeout
  • model_activity_config:用于模型请求活动的 Temporal 活动配置。此配置会与基本活动配置合并。
  • toolset_activity_config:用于特定工具集(通过 ID 标识)的获取工具和调用工具活动的 Temporal 活动配置。此配置会与基本活动配置合并。
  • tool_activity_config:用于特定工具调用活动(通过工具集 ID 和工具名称标识)的 Temporal 活动配置。此配置会与基本配置和特定于工具集的活动配置合并。

    如果某个工具不使用 I/O,您可以指定 False 来禁用使用活动。请注意,该工具必须被定义为 async 函数,因为非异步工具在线程中运行,这是非确定性的,因此在活动之外不受支持。

活动重试

除了 Temporal 会执行的请求失败自动重试之外,Pydantic AI 和各种提供商的 API 客户端也有自己的请求重试逻辑。同时启用这些可能会导致请求重试次数超出预期,并且 Retry-After 处理不当。

使用 Temporal 时,建议不要使用HTTP 请求重试,并关闭您的提供商 API 客户端自身的重试逻辑,例如,通过在自定义的 OpenAIProvider API 客户端上设置 max_retries=0

您可以使用活动配置来自定义 Temporal 的重试策略。

使用 Logfire 实现可观测性

Temporal 为每个工作流和活动执行生成遥测事件和指标,Pydantic AI 为每个代理运行、模型请求和工具调用生成事件。这些可以发送到 Pydantic Logfire,以全面了解您的应用程序中发生的情况。

要将 Logfire 与 Temporal 一起使用,您需要向 Temporal 的 Client.connect() 传递一个 LogfirePlugin 对象。

logfire_plugin.py
from temporalio.client import Client

from pydantic_ai.durable_exec.temporal import LogfirePlugin, PydanticAIPlugin


async def main():
    client = await Client.connect(
        'localhost:7233',
        plugins=[PydanticAIPlugin(), LogfirePlugin()],
    )

默认情况下,LogfirePlugin 会检测 Temporal(包括指标)和 Pydantic AI,并将所有数据发送到 Logfire。要自定义 Logfire 配置和检测,您可以向 LogfirePlugin 构造函数传递一个 logfire_setup 函数,并返回一个自定义的 Logfire 实例(即 logfire.configure() 的结果)。要禁用向 Logfire 发送 Temporal 指标,您可以向 LogfirePlugin 构造函数传递 metrics=False

已知问题

Pandas

当在活动内部使用 logfire.info 并且您的项目依赖项中包含 pandas 包时,您可能会遇到以下错误,这似乎是导入竞争条件的结果。

AttributeError: partially initialized module 'pandas' has no attribute '_pandas_parser_CAPI' (most likely due to a circular import)

要解决此问题,您可以使用 temporalio.workflow.unsafe.imports_passed_through() 上下文管理器来主动导入该包,使其不会在工作流沙箱中重新加载。

temporal_activity.py
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pandas