跳到内容

代理

简介

代理是 PydanticAI 与 LLM 交互的主要接口。

在某些用例中,单个代理将控制整个应用程序或组件,但多个代理也可以交互以体现更复杂的工作流程。

Agent 类具有完整的 API 文档,但从概念上讲,您可以将代理视为以下内容的容器

组件 描述
系统提示 由开发人员为 LLM 编写的一组指令。
函数工具 LLM 在生成响应时可以调用的函数,以获取信息。
结构化结果类型 LLM 必须在运行结束时返回的结构化数据类型(如果已指定)。
依赖类型约束 系统提示函数、工具和结果验证器在运行时都可以使用依赖项。
LLM 模型 与代理关联的可选默认 LLM 模型。也可以在运行代理时指定。
模型设置 可选的默认模型设置,以帮助微调请求。也可以在运行代理时指定。

在类型术语中,代理在其依赖类型和结果类型方面是通用的,例如,一个代理需要 Foobar 类型的依赖项并返回 list[str] 类型的结果,则其类型为 Agent[Foobar, list[str]]。在实践中,您不必关心这一点,这应该只是意味着您的 IDE 可以告诉您何时类型正确,并且如果您选择使用 静态类型检查,它应该与 PydanticAI 配合良好。

这是一个模拟轮盘赌的玩具代理示例

roulette_wheel.py
from pydantic_ai import Agent, RunContext

roulette_agent = Agent(  # (1)!
    'openai:gpt-4o',
    deps_type=int,
    result_type=bool,
    system_prompt=(
        'Use the `roulette_wheel` function to see if the '
        'customer has won based on the number they provide.'
    ),
)


@roulette_agent.tool
async def roulette_wheel(ctx: RunContext[int], square: int) -> str:  # (2)!
    """check if the square is a winner"""
    return 'winner' if square == ctx.deps else 'loser'


# Run the agent
success_number = 18  # (3)!
result = roulette_agent.run_sync('Put my money on square eighteen', deps=success_number)
print(result.data)  # (4)!
#> True

result = roulette_agent.run_sync('I bet five is the winner', deps=success_number)
print(result.data)
#> False
  1. 创建一个代理,它期望一个整数依赖项并返回一个布尔结果。此代理的类型将为 Agent[int, bool]
  2. 定义一个工具,用于检查方块是否为获胜者。在这里,RunContext 使用依赖类型 int 参数化;如果您弄错了依赖类型,您将收到类型错误。
  3. 实际上,您可能希望在此处使用随机数,例如 random.randint(0, 36)
  4. result.data 将是一个布尔值,指示方块是否为获胜者。Pydantic 执行结果验证,它将被键入为 bool,因为其类型是从代理的 result_type 泛型参数派生的。

代理被设计为可重用的,就像 FastAPI 应用程序一样

代理旨在实例化一次(通常作为模块全局变量),并在整个应用程序中重用,类似于小型 FastAPI 应用程序或 APIRouter

运行代理

有四种运行代理的方法

  1. agent.run() — 一个协程,返回一个包含已完成响应的 RunResult
  2. agent.run_sync() — 一个普通的同步函数,返回一个包含已完成响应的 RunResult(在内部,这只是调用 loop.run_until_complete(self.run()))。
  3. agent.run_stream() — 一个协程,返回一个 StreamedRunResult,其中包含将响应作为异步可迭代对象流式传输的方法。
  4. agent.iter() — 一个上下文管理器,返回一个 AgentRun,它是代理底层 Graph 节点的异步可迭代对象。

这是一个简单的示例,演示了前三个方法

run_agent.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

result_sync = agent.run_sync('What is the capital of Italy?')
print(result_sync.data)
#> Rome


async def main():
    result = await agent.run('What is the capital of France?')
    print(result.data)
    #> Paris

    async with agent.run_stream('What is the capital of the UK?') as response:
        print(await response.get_data())
        #> London
(此示例是完整的,可以“按原样”运行 — 您需要添加 asyncio.run(main()) 才能运行 main

您还可以传递先前运行中的消息以继续对话或提供上下文,如 消息和聊天记录 中所述。

迭代代理的图

在底层,PydanticAI 中的每个 Agent 都使用 pydantic-graph 来管理其执行流程。 pydantic-graph 是一个通用的、以类型为中心的库,用于在 Python 中构建和运行有限状态机。它实际上并不依赖于 PydanticAI — 您可以将其独立用于与 GenAI 无关的工作流程 — 但 PydanticAI 利用它来编排代理运行中模型请求和模型响应的处理。

在许多情况下,您根本不需要担心 pydantic-graph;调用 agent.run(...) 只是从头到尾遍历底层图。但是,如果您需要更深入的洞察力或控制 — 例如捕获每个工具调用,或在特定阶段注入您自己的逻辑 — PydanticAI 通过 Agent.iter 公开较低级别的迭代过程。此方法返回一个 AgentRun,您可以异步迭代它,或通过 next 方法手动逐节点驱动。一旦代理的图返回一个 End,您将获得最终结果以及所有步骤的详细历史记录。

async for 迭代

这是一个使用 async foriter 记录代理执行的每个节点的示例

agent_iter_async_for.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    nodes = []
    # Begin an AgentRun, which is an async-iterable over the nodes of the agent's graph
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            # Each node represents a step in the agent's execution
            nodes.append(node)
    print(nodes)
    """
    [
        ModelRequestNode(
            request=ModelRequest(
                parts=[
                    UserPromptPart(
                        content='What is the capital of France?',
                        timestamp=datetime.datetime(...),
                        part_kind='user-prompt',
                    )
                ],
                kind='request',
            )
        ),
        CallToolsNode(
            model_response=ModelResponse(
                parts=[TextPart(content='Paris', part_kind='text')],
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
                kind='response',
            )
        ),
        End(data=FinalResult(data='Paris', tool_name=None, tool_call_id=None)),
    ]
    """
    print(agent_run.result.data)
    #> Paris
  • AgentRun 是一个异步迭代器,它产生流程中的每个节点(BaseNodeEnd)。
  • 当返回 End 节点时,运行结束。

手动使用 .next(...)

您还可以通过将要运行的下一个节点传递给 AgentRun.next(...) 方法来手动驱动迭代。这允许您在节点执行之前检查或修改节点,或根据您自己的逻辑跳过节点,并更轻松地捕获 next() 中的错误

agent_iter_next.py
from pydantic_ai import Agent
from pydantic_graph import End

agent = Agent('openai:gpt-4o')


async def main():
    async with agent.iter('What is the capital of France?') as agent_run:
        node = agent_run.next_node  # (1)!

        all_nodes = [node]

        # Drive the iteration manually:
        while not isinstance(node, End):  # (2)!
            node = await agent_run.next(node)  # (3)!
            all_nodes.append(node)  # (4)!

        print(all_nodes)
        """
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                            part_kind='user-prompt',
                        )
                    ],
                    kind='request',
                )
            ),
            CallToolsNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='Paris', part_kind='text')],
                    model_name='gpt-4o',
                    timestamp=datetime.datetime(...),
                    kind='response',
                )
            ),
            End(data=FinalResult(data='Paris', tool_name=None, tool_call_id=None)),
        ]
        """
  1. 我们首先获取将在代理图中运行的第一个节点。
  2. 一旦生成 End 节点,代理运行就完成; End 的实例无法传递给 next
  3. 当您调用 await agent_run.next(node) 时,它会在代理图中执行该节点,更新运行历史记录,并返回要运行的下一个节点。
  4. 您也可以在此处根据需要检查或更改新的 node

访问使用情况和最终结果

您可以随时通过 agent_run.usage()AgentRun 对象检索使用情况统计信息(令牌、请求等)。此方法返回一个包含使用情况数据的 Usage 对象。

运行完成后,agent_run.final_result 将变为一个包含最终输出(和相关元数据)的 AgentRunResult 对象。


流式传输

这是一个流式传输代理运行的示例,结合了 async for 迭代

streaming.py
import asyncio
from dataclasses import dataclass
from datetime import date

from pydantic_ai import Agent
from pydantic_ai.messages import (
    FinalResultEvent,
    FunctionToolCallEvent,
    FunctionToolResultEvent,
    PartDeltaEvent,
    PartStartEvent,
    TextPartDelta,
    ToolCallPartDelta,
)
from pydantic_ai.tools import RunContext


@dataclass
class WeatherService:
    async def get_forecast(self, location: str, forecast_date: date) -> str:
        # In real code: call weather API, DB queries, etc.
        return f'The forecast in {location} on {forecast_date} is 24°C and sunny.'

    async def get_historic_weather(self, location: str, forecast_date: date) -> str:
        # In real code: call a historical weather API or DB
        return (
            f'The weather in {location} on {forecast_date} was 18°C and partly cloudy.'
        )


weather_agent = Agent[WeatherService, str](
    'openai:gpt-4o',
    deps_type=WeatherService,
    result_type=str,  # We'll produce a final answer as plain text
    system_prompt='Providing a weather forecast at the locations the user provides.',
)


@weather_agent.tool
async def weather_forecast(
    ctx: RunContext[WeatherService],
    location: str,
    forecast_date: date,
) -> str:
    if forecast_date >= date.today():
        return await ctx.deps.get_forecast(location, forecast_date)
    else:
        return await ctx.deps.get_historic_weather(location, forecast_date)


output_messages: list[str] = []


async def main():
    user_prompt = 'What will the weather be like in Paris on Tuesday?'

    # Begin a node-by-node, streaming iteration
    async with weather_agent.iter(user_prompt, deps=WeatherService()) as run:
        async for node in run:
            if Agent.is_user_prompt_node(node):
                # A user prompt node => The user has provided input
                output_messages.append(f'=== UserPromptNode: {node.user_prompt} ===')
            elif Agent.is_model_request_node(node):
                # A model request node => We can stream tokens from the model's request
                output_messages.append(
                    '=== ModelRequestNode: streaming partial request tokens ==='
                )
                async with node.stream(run.ctx) as request_stream:
                    async for event in request_stream:
                        if isinstance(event, PartStartEvent):
                            output_messages.append(
                                f'[Request] Starting part {event.index}: {event.part!r}'
                            )
                        elif isinstance(event, PartDeltaEvent):
                            if isinstance(event.delta, TextPartDelta):
                                output_messages.append(
                                    f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}'
                                )
                            elif isinstance(event.delta, ToolCallPartDelta):
                                output_messages.append(
                                    f'[Request] Part {event.index} args_delta={event.delta.args_delta}'
                                )
                        elif isinstance(event, FinalResultEvent):
                            output_messages.append(
                                f'[Result] The model produced a final result (tool_name={event.tool_name})'
                            )
            elif Agent.is_call_tools_node(node):
                # A handle-response node => The model returned some data, potentially calls a tool
                output_messages.append(
                    '=== CallToolsNode: streaming partial response & tool usage ==='
                )
                async with node.stream(run.ctx) as handle_stream:
                    async for event in handle_stream:
                        if isinstance(event, FunctionToolCallEvent):
                            output_messages.append(
                                f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
                            )
                        elif isinstance(event, FunctionToolResultEvent):
                            output_messages.append(
                                f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}'
                            )
            elif Agent.is_end_node(node):
                assert run.result.data == node.data.data
                # Once an End node is reached, the agent run is complete
                output_messages.append(f'=== Final Agent Output: {run.result.data} ===')


if __name__ == '__main__':
    asyncio.run(main())

    print(output_messages)
    """
    [
        '=== ModelRequestNode: streaming partial request tokens ===',
        '[Request] Starting part 0: ToolCallPart(tool_name=\'weather_forecast\', args=\'{"location":"Pa\', tool_call_id=\'0001\', part_kind=\'tool-call\')',
        '[Request] Part 0 args_delta=ris","forecast_',
        '[Request] Part 0 args_delta=date":"2030-01-',
        '[Request] Part 0 args_delta=01"}',
        '=== CallToolsNode: streaming partial response & tool usage ===',
        '[Tools] The LLM calls tool=\'weather_forecast\' with args={"location":"Paris","forecast_date":"2030-01-01"} (tool_call_id=\'0001\')',
        "[Tools] Tool call '0001' returned => The forecast in Paris on 2030-01-01 is 24°C and sunny.",
        '=== ModelRequestNode: streaming partial request tokens ===',
        "[Request] Starting part 0: TextPart(content='It will be ', part_kind='text')",
        '[Result] The model produced a final result (tool_name=None)',
        "[Request] Part 0 text delta: 'warm and sunny '",
        "[Request] Part 0 text delta: 'in Paris on '",
        "[Request] Part 0 text delta: 'Tuesday.'",
        '=== CallToolsNode: streaming partial response & tool usage ===',
        '=== Final Agent Output: It will be warm and sunny in Paris on Tuesday. ===',
    ]
    """

其他配置

使用限制

PydanticAI 提供了一个 UsageLimits 结构,以帮助您限制模型运行的使用量(令牌和/或请求)。

您可以通过将 usage_limits 参数传递给 run{_sync,_stream} 函数来应用这些设置。

考虑以下示例,我们限制了响应令牌的数量

from pydantic_ai import Agent
from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.usage import UsageLimits

agent = Agent('anthropic:claude-3-5-sonnet-latest')

result_sync = agent.run_sync(
    'What is the capital of Italy? Answer with just the city.',
    usage_limits=UsageLimits(response_tokens_limit=10),
)
print(result_sync.data)
#> Rome
print(result_sync.usage())
"""
Usage(requests=1, request_tokens=62, response_tokens=1, total_tokens=63, details=None)
"""

try:
    result_sync = agent.run_sync(
        'What is the capital of Italy? Answer with a paragraph.',
        usage_limits=UsageLimits(response_tokens_limit=10),
    )
except UsageLimitExceeded as e:
    print(e)
    #> Exceeded the response_tokens_limit of 10 (response_tokens=32)

限制请求数量对于防止无限循环或过度工具调用非常有用

from typing_extensions import TypedDict

from pydantic_ai import Agent, ModelRetry
from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.usage import UsageLimits


class NeverResultType(TypedDict):
    """
    Never ever coerce data to this type.
    """

    never_use_this: str


agent = Agent(
    'anthropic:claude-3-5-sonnet-latest',
    retries=3,
    result_type=NeverResultType,
    system_prompt='Any time you get a response, call the `infinite_retry_tool` to produce another response.',
)


@agent.tool_plain(retries=5)  # (1)!
def infinite_retry_tool() -> int:
    raise ModelRetry('Please try again.')


try:
    result_sync = agent.run_sync(
        'Begin infinite retry loop!', usage_limits=UsageLimits(request_limit=3)  # (2)!
    )
except UsageLimitExceeded as e:
    print(e)
    #> The next request would exceed the request_limit of 3
  1. 此工具能够在出错之前重试 5 次,模拟一个可能陷入循环的工具。
  2. 此运行将在 3 个请求后出错,从而防止无限工具调用。

注意

如果您注册了很多工具,这一点尤其重要。 request_limit 可用于防止模型在循环中调用它们太多次。

模型(运行)设置

PydanticAI 提供了一个 settings.ModelSettings 结构,以帮助您微调请求。此结构允许您配置影响模型行为的常见参数,例如 temperaturemax_tokenstimeout 等。

有两种方法可以应用这些设置:1. 通过 model_settings 参数传递给 run{_sync,_stream} 函数。这允许在每个请求的基础上进行微调。 2. 通过 model_settings 参数在 Agent 初始化期间进行设置。这些设置将默认应用于使用所述代理的所有后续运行调用。但是,在特定运行调用期间提供的 model_settings 将覆盖代理的默认设置。

例如,如果您想将 temperature 设置设置为 0.0 以确保较少的随机行为,您可以执行以下操作

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

result_sync = agent.run_sync(
    'What is the capital of Italy?', model_settings={'temperature': 0.0}
)
print(result_sync.data)
#> Rome

模型特定设置

如果您希望进一步自定义模型行为,您可以使用 ModelSettings 的子类,例如与您选择的模型关联的 GeminiModelSettings

例如

from pydantic_ai import Agent, UnexpectedModelBehavior
from pydantic_ai.models.gemini import GeminiModelSettings

agent = Agent('google-gla:gemini-1.5-flash')

try:
    result = agent.run_sync(
        'Write a list of 5 very rude things that I might say to the universe after stubbing my toe in the dark:',
        model_settings=GeminiModelSettings(
            temperature=0.0,  # general model settings can also be specified
            gemini_safety_settings=[
                {
                    'category': 'HARM_CATEGORY_HARASSMENT',
                    'threshold': 'BLOCK_LOW_AND_ABOVE',
                },
                {
                    'category': 'HARM_CATEGORY_HATE_SPEECH',
                    'threshold': 'BLOCK_LOW_AND_ABOVE',
                },
            ],
        ),
    )
except UnexpectedModelBehavior as e:
    print(e)  # (1)!
    """
    Safety settings triggered, body:
    <safety settings details>
    """
  1. 引发此错误是因为超过了安全阈值。通常,result 将包含正常的 ModelResponse

运行 vs. 对话

代理运行可能代表整个对话 — 单次运行中可以交换的消息数量没有限制。但是,对话也可能由多次运行组成,特别是如果您需要在单独的交互或 API 调用之间维护状态。

这是一个由多次运行组成的对话示例

conversation_example.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

# First run
result1 = agent.run_sync('Who was Albert Einstein?')
print(result1.data)
#> Albert Einstein was a German-born theoretical physicist.

# Second run, passing previous messages
result2 = agent.run_sync(
    'What was his most famous equation?',
    message_history=result1.new_messages(),  # (1)!
)
print(result2.data)
#> Albert Einstein's most famous equation is (E = mc^2).
  1. 继续对话;如果没有 message_history,模型将不知道“his”指的是谁。

(此示例是完整的,可以“按原样”运行)

设计上的类型安全

PydanticAI 旨在与静态类型检查器(如 mypy 和 pyright)良好协作。

类型提示在某种程度上是可选的

PydanticAI 旨在尽可能使类型检查对您有用(如果您选择使用它),但您不必一直到处使用类型。

也就是说,由于 PydanticAI 使用 Pydantic,而 Pydantic 使用类型提示作为模式和验证的定义,因此某些类型(特别是工具参数上的类型提示,以及 Agentresult_type 参数)在运行时使用。

如果类型提示给您带来的困惑多于帮助,我们(库开发人员)就搞砸了,如果您发现这种情况,请创建一个 issue 解释是什么让您感到恼火!

特别是,代理在其依赖项类型和它们返回的结果类型方面都是通用的,因此您可以使用类型提示来确保您正在使用正确的类型。

考虑以下带有类型错误的脚本

type_mistakes.py
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext


@dataclass
class User:
    name: str


agent = Agent(
    'test',
    deps_type=User,  # (1)!
    result_type=bool,
)


@agent.system_prompt
def add_user_name(ctx: RunContext[str]) -> str:  # (2)!
    return f"The user's name is {ctx.deps}."


def foobar(x: bytes) -> None:
    pass


result = agent.run_sync('Does their name start with "A"?', deps=User('Anne'))
foobar(result.data)  # (3)!
  1. 该代理被定义为期望 User 的实例作为 deps
  2. 但是这里的 add_user_name 被定义为接受 str 作为依赖项,而不是 User
  3. 由于代理被定义为返回 bool,这将引发类型错误,因为 foobar 期望 bytes

在此运行 mypy 将给出以下输出

 uv run mypy type_mistakes.py
type_mistakes.py:18: error: Argument 1 to "system_prompt" of "Agent" has incompatible type "Callable[[RunContext[str]], str]"; expected "Callable[[RunContext[User]], str]"  [arg-type]
type_mistakes.py:28: error: Argument 1 to "foobar" has incompatible type "bool"; expected "bytes"  [arg-type]
Found 2 errors in 1 file (checked 1 source file)

运行 pyright 将识别出相同的问题。

系统提示

系统提示乍一看似乎很简单,因为它们只是字符串(或连接的字符串序列),但制作正确的系统提示是使模型按您希望的方式行为的关键。

一般来说,系统提示分为两类

  1. 静态系统提示:这些在编写代码时已知,可以通过 Agent 构造函数system_prompt 参数定义。
  2. 动态系统提示:这些在某种程度上取决于运行时才知道的上下文,应通过使用 @agent.system_prompt 修饰器装饰的函数来定义。

您可以将两者都添加到单个代理;它们按照运行时定义的顺序附加。

这是一个使用两种类型的系统提示的示例

system_prompts.py
from datetime import date

from pydantic_ai import Agent, RunContext

agent = Agent(
    'openai:gpt-4o',
    deps_type=str,  # (1)!
    system_prompt="Use the customer's name while replying to them.",  # (2)!
)


@agent.system_prompt  # (3)!
def add_the_users_name(ctx: RunContext[str]) -> str:
    return f"The user's name is {ctx.deps}."


@agent.system_prompt
def add_the_date() -> str:  # (4)!
    return f'The date is {date.today()}.'


result = agent.run_sync('What is the date?', deps='Frank')
print(result.data)
#> Hello Frank, the date today is 2032-01-02.
  1. 该代理期望一个字符串依赖项。
  2. 在代理创建时定义的静态系统提示。
  3. 通过带有 RunContext 的装饰器定义的动态系统提示,这在 run_sync 之后立即调用,而不是在创建代理时调用,因此可以受益于运行时信息,例如该运行中使用的依赖项。
  4. 另一个动态系统提示,系统提示不必具有 RunContext 参数。

(此示例是完整的,可以“按原样”运行)

反思和自我纠正

来自函数工具参数验证和 结构化结果验证 的验证错误可以传递回模型,并请求重试。

您还可以从 工具结果验证器函数 中引发 ModelRetry,以告知模型应重试生成响应。

这是一个示例

tool_retry.py
from pydantic import BaseModel

from pydantic_ai import Agent, RunContext, ModelRetry

from fake_database import DatabaseConn


class ChatResult(BaseModel):
    user_id: int
    message: str


agent = Agent(
    'openai:gpt-4o',
    deps_type=DatabaseConn,
    result_type=ChatResult,
)


@agent.tool(retries=2)
def get_user_by_name(ctx: RunContext[DatabaseConn], name: str) -> int:
    """Get a user's ID from their full name."""
    print(name)
    #> John
    #> John Doe
    user_id = ctx.deps.users.get(name=name)
    if user_id is None:
        raise ModelRetry(
            f'No user found with name {name!r}, remember to provide their full name'
        )
    return user_id


result = agent.run_sync(
    'Send a message to John Doe asking for coffee next week', deps=DatabaseConn()
)
print(result.data)
"""
user_id=123 message='Hello John, would you be free for coffee sometime next week? Let me know what works for you!'
"""

模型错误

如果模型行为异常(例如,超出重试限制,或其 API 返回 503),代理运行将引发 UnexpectedModelBehavior

在这些情况下,可以使用 capture_run_messages 来访问运行期间交换的消息,以帮助诊断问题。

agent_model_errors.py
from pydantic_ai import Agent, ModelRetry, UnexpectedModelBehavior, capture_run_messages

agent = Agent('openai:gpt-4o')


@agent.tool_plain
def calc_volume(size: int) -> int:  # (1)!
    if size == 42:
        return size**3
    else:
        raise ModelRetry('Please try again.')


with capture_run_messages() as messages:  # (2)!
    try:
        result = agent.run_sync('Please get me the volume of a box with size 6.')
    except UnexpectedModelBehavior as e:
        print('An error occurred:', e)
        #> An error occurred: Tool exceeded max retries count of 1
        print('cause:', repr(e.__cause__))
        #> cause: ModelRetry('Please try again.')
        print('messages:', messages)
        """
        messages:
        [
            ModelRequest(
                parts=[
                    UserPromptPart(
                        content='Please get me the volume of a box with size 6.',
                        timestamp=datetime.datetime(...),
                        part_kind='user-prompt',
                    )
                ],
                kind='request',
            ),
            ModelResponse(
                parts=[
                    ToolCallPart(
                        tool_name='calc_volume',
                        args={'size': 6},
                        tool_call_id=None,
                        part_kind='tool-call',
                    )
                ],
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
                kind='response',
            ),
            ModelRequest(
                parts=[
                    RetryPromptPart(
                        content='Please try again.',
                        tool_name='calc_volume',
                        tool_call_id=None,
                        timestamp=datetime.datetime(...),
                        part_kind='retry-prompt',
                    )
                ],
                kind='request',
            ),
            ModelResponse(
                parts=[
                    ToolCallPart(
                        tool_name='calc_volume',
                        args={'size': 6},
                        tool_call_id=None,
                        part_kind='tool-call',
                    )
                ],
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
                kind='response',
            ),
        ]
        """
    else:
        print(result.data)
  1. 定义一个工具,在这种情况下会重复引发 ModelRetry
  2. capture_run_messages 用于捕获运行期间交换的消息。

(此示例是完整的,可以“按原样”运行)

注意

如果您在单个 capture_run_messages 上下文中多次调用 runrun_syncrun_stream,则 messages 将仅表示第一次调用期间交换的消息。