跳转到内容

智能体

介绍

代理是 Pydantic AI 与 LLM 交互的主要接口。

在某些用例中,单个代理将控制整个应用程序或组件,但多个代理也可以相互交互以实现更复杂的工作流程。

Agent 类有完整的 API 文档,但从概念上讲,您可以将代理视为一个容器,其中包含:

组件 描述
指令 由开发人员编写的一组给 LLM 的指令。
函数工具工具集 LLM 在生成响应时可能调用的用于获取信息的函数。
结构化输出类型 如果指定,LLM 在运行结束时必须返回的结构化数据类型。
依赖类型约束 动态指令函数、工具和输出函数在运行时都可以使用依赖项。
LLM 模型 与代理关联的可选默认 LLM 模型。也可以在运行代理时指定。
模型设置 可选的默认模型设置,用于微调请求。也可以在运行代理时指定。

在类型术语中,代理在其依赖项和输出类型上是泛型的,例如,一个需要 Foobar 类型依赖项并产生 list[str] 类型输出的代理,其类型为 Agent[Foobar, list[str]]。在实践中,您不需要关心这一点,这只意味着您的 IDE 能够告诉您类型是否正确,并且如果您选择使用静态类型检查,它应该能与 Pydantic AI 很好地协同工作。

下面是一个模拟轮盘赌的代理的简单示例:

roulette_wheel.py
from pydantic_ai import Agent, RunContext

roulette_agent = Agent(  # (1)!
    'openai:gpt-4o',
    deps_type=int,
    output_type=bool,
    system_prompt=(
        'Use the `roulette_wheel` function to see if the '
        'customer has won based on the number they provide.'
    ),
)


@roulette_agent.tool
async def roulette_wheel(ctx: RunContext[int], square: int) -> str:  # (2)!
    """check if the square is a winner"""
    return 'winner' if square == ctx.deps else 'loser'


# Run the agent
success_number = 18  # (3)!
result = roulette_agent.run_sync('Put my money on square eighteen', deps=success_number)
print(result.output)  # (4)!
#> True

result = roulette_agent.run_sync('I bet five is the winner', deps=success_number)
print(result.output)
#> False
  1. 创建一个代理,它期望一个整数依赖项并产生一个布尔输出。该代理的类型将为 Agent[int, bool]
  2. 定义一个检查方格是否中奖的工具。这里 RunContext 使用依赖类型 int 进行了参数化;如果您弄错了依赖类型,将会得到一个类型错误。
  3. 在实际情况中,您可能希望在这里使用一个随机数,例如 random.randint(0, 36)
  4. result.output 将是一个布尔值,指示该方格是否中奖。Pydantic 会执行输出验证,并且它的类型将被确定为 bool,因为其类型派生自代理的 output_type 泛型参数。

代理被设计为可重用,就像 FastAPI 应用一样

代理旨在被实例化一次(通常作为模块全局变量)并在您的整个应用程序中重用,类似于一个小的 FastAPI 应用或一个 APIRouter

运行代理

有四种方式可以运行一个代理:

  1. agent.run() — 一个异步函数,返回一个包含完整响应的 RunResult
  2. agent.run_sync() — 一个普通的同步函数,返回一个包含完整响应的 RunResult(在内部,它只是调用 loop.run_until_complete(self.run()))。
  3. agent.run_stream() — 一个异步上下文管理器,返回一个 StreamedRunResult,其中包含以异步可迭代方式流式传输文本和结构化输出的方法。
  4. agent.iter() — 一个上下文管理器,返回一个 AgentRun,这是一个对代理底层 Graph 节点进行异步迭代的对象。

下面是一个演示前三种方法的简单示例:

run_agent.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

result_sync = agent.run_sync('What is the capital of Italy?')
print(result_sync.output)
#> The capital of Italy is Rome.


async def main():
    result = await agent.run('What is the capital of France?')
    print(result.output)
    #> The capital of France is Paris.

    async with agent.run_stream('What is the capital of the UK?') as response:
        async for text in response.stream_text():
            print(text)
            #> The capital of
            #> The capital of the UK is
            #> The capital of the UK is London.

(此示例是完整的,可以“按原样”运行——您需要添加 asyncio.run(main()) 来运行 main

您还可以传递先前运行的消息来继续对话或提供上下文,如 消息和聊天历史 中所述。

流式传输事件和最终输出

如上例所示,run_stream() 可以轻松地在代理的最终输出到来时进行流式传输。它还接受一个可选的 event_stream_handler 参数,您可以使用它来了解在最终输出产生之前运行期间发生了什么。

下面的示例展示了如何流式传输事件和文本输出。您也可以流式传输结构化输出

注意

由于 run_stream() 方法会将第一个匹配 output_type 的输出视为最终输出,它将停止运行代理图,并且不会执行模型在该“最终”输出之后进行的任何工具调用。

如果您希望始终将代理图运行到完成,并流式传输模型流式响应和代理执行工具的所有事件,请使用带有 event_stream_handleragent.run()agent.iter(),如下几节所述。

run_stream_events.py
import asyncio
from collections.abc import AsyncIterable
from datetime import date

from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import (
    AgentStreamEvent,
    FinalResultEvent,
    FunctionToolCallEvent,
    FunctionToolResultEvent,
    PartDeltaEvent,
    PartStartEvent,
    TextPartDelta,
    ThinkingPartDelta,
    ToolCallPartDelta,
)

weather_agent = Agent(
    'openai:gpt-4o',
    system_prompt='Providing a weather forecast at the locations the user provides.',
)


@weather_agent.tool
async def weather_forecast(
    ctx: RunContext,
    location: str,
    forecast_date: date,
) -> str:
    return f'The forecast in {location} on {forecast_date} is 24°C and sunny.'


output_messages: list[str] = []


async def event_stream_handler(
    ctx: RunContext,
    event_stream: AsyncIterable[AgentStreamEvent],
):
    async for event in event_stream:
        if isinstance(event, PartStartEvent):
            output_messages.append(f'[Request] Starting part {event.index}: {event.part!r}')
        elif isinstance(event, PartDeltaEvent):
            if isinstance(event.delta, TextPartDelta):
                output_messages.append(f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}')
            elif isinstance(event.delta, ThinkingPartDelta):
                output_messages.append(f'[Request] Part {event.index} thinking delta: {event.delta.content_delta!r}')
            elif isinstance(event.delta, ToolCallPartDelta):
                output_messages.append(f'[Request] Part {event.index} args delta: {event.delta.args_delta}')
        elif isinstance(event, FunctionToolCallEvent):
            output_messages.append(
                f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
            )
        elif isinstance(event, FunctionToolResultEvent):
            output_messages.append(f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}')
        elif isinstance(event, FinalResultEvent):
            output_messages.append(f'[Result] The model starting producing a final result (tool_name={event.tool_name})')


async def main():
    user_prompt = 'What will the weather be like in Paris on Tuesday?'

    async with weather_agent.run_stream(user_prompt, event_stream_handler=event_stream_handler) as run:
        async for output in run.stream_text():
            output_messages.append(f'[Output] {output}')


if __name__ == '__main__':
    asyncio.run(main())

    print(output_messages)
    """
    [
        "[Request] Starting part 0: ToolCallPart(tool_name='weather_forecast', tool_call_id='0001')",
        '[Request] Part 0 args delta: {"location":"Pa',
        '[Request] Part 0 args delta: ris","forecast_',
        '[Request] Part 0 args delta: date":"2030-01-',
        '[Request] Part 0 args delta: 01"}',
        '[Tools] The LLM calls tool=\'weather_forecast\' with args={"location":"Paris","forecast_date":"2030-01-01"} (tool_call_id=\'0001\')',
        "[Tools] Tool call '0001' returned => The forecast in Paris on 2030-01-01 is 24°C and sunny.",
        "[Request] Starting part 0: TextPart(content='It will be ')",
        '[Result] The model starting producing a final result (tool_name=None)',
        '[Output] It will be ',
        '[Output] It will be warm and sunny ',
        '[Output] It will be warm and sunny in Paris on ',
        '[Output] It will be warm and sunny in Paris on Tuesday.',
    ]
    """

流式传输所有事件

agent.run_stream() 类似,agent.run() 接受一个可选的 event_stream_handler 参数,让您可以流式传输模型流式响应和代理执行工具的所有事件。与 run_stream() 不同的是,即使在工具调用之前收到了看起来可能是最终结果的文本,它也总是将代理图运行到完成。

注意

当与 event_stream_handler 一起使用时,run() 方法目前需要您自己从 PartStartEvent 和随后的 PartDeltaEvent 中拼接流式文本,而不是提供一个 stream_text() 便捷方法。

为了两全其美,但会增加一些复杂性,您可以使用下一节中描述的 agent.iter(),它允许您遍历代理图在每一步流式传输事件和输出

run_events.py
import asyncio

from run_stream_events import event_stream_handler, output_messages, weather_agent


async def main():
    user_prompt = 'What will the weather be like in Paris on Tuesday?'

    run = await weather_agent.run(user_prompt, event_stream_handler=event_stream_handler)

    output_messages.append(f'[Final Output] {run.output}')


if __name__ == '__main__':
    asyncio.run(main())

    print(output_messages)
    """
    [
        "[Request] Starting part 0: ToolCallPart(tool_name='weather_forecast', tool_call_id='0001')",
        '[Request] Part 0 args delta: {"location":"Pa',
        '[Request] Part 0 args delta: ris","forecast_',
        '[Request] Part 0 args delta: date":"2030-01-',
        '[Request] Part 0 args delta: 01"}',
        '[Tools] The LLM calls tool=\'weather_forecast\' with args={"location":"Paris","forecast_date":"2030-01-01"} (tool_call_id=\'0001\')',
        "[Tools] Tool call '0001' returned => The forecast in Paris on 2030-01-01 is 24°C and sunny.",
        "[Request] Starting part 0: TextPart(content='It will be ')",
        '[Result] The model starting producing a final result (tool_name=None)',
        "[Request] Part 0 text delta: 'warm and sunny '",
        "[Request] Part 0 text delta: 'in Paris on '",
        "[Request] Part 0 text delta: 'Tuesday.'",
        '[Final Output] It will be warm and sunny in Paris on Tuesday.',
    ]
    """

(这个例子是完整的,可以“按原样”运行)

遍历代理的图

在底层,Pydantic AI 中的每个 Agent 都使用 pydantic-graph 来管理其执行流。pydantic-graph 是一个通用的、以类型为中心的库,用于在 Python 中构建和运行有限状态机。它实际上不依赖于 Pydantic AI — 您可以将其独立用于与生成式AI无关的工作流程 — 但 Pydantic AI 利用它来协调代理运行中模型请求和模型响应的处理。

在许多情况下,您根本不需要担心 pydantic-graph;调用 agent.run(...) 只是从头到尾遍历底层图。但是,如果您需要更深入的洞察或控制 — 例如在特定阶段注入您自己的逻辑 — Pydantic AI 通过 Agent.iter 暴露了底层的迭代过程。该方法返回一个 AgentRun,您可以对其进行异步迭代,或通过 next 方法手动逐节点驱动。一旦代理的图返回一个 End,您就得到了最终结果以及所有步骤的详细历史记录。

async for 迭代

以下是使用 async foriter 记录代理执行的每个节点的示例:

agent_iter_async_for.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    nodes = []
    # Begin an AgentRun, which is an async-iterable over the nodes of the agent's graph
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            # Each node represents a step in the agent's execution
            nodes.append(node)
    print(nodes)
    """
    [
        UserPromptNode(
            user_prompt='What is the capital of France?',
            instructions=None,
            instructions_functions=[],
            system_prompts=(),
            system_prompt_functions=[],
            system_prompt_dynamic_functions={},
        ),
        ModelRequestNode(
            request=ModelRequest(
                parts=[
                    UserPromptPart(
                        content='What is the capital of France?',
                        timestamp=datetime.datetime(...),
                    )
                ]
            )
        ),
        CallToolsNode(
            model_response=ModelResponse(
                parts=[TextPart(content='The capital of France is Paris.')],
                usage=RequestUsage(input_tokens=56, output_tokens=7),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            )
        ),
        End(data=FinalResult(output='The capital of France is Paris.')),
    ]
    """
    print(agent_run.result.output)
    #> The capital of France is Paris.

(此示例是完整的,可以“按原样”运行——您需要添加 asyncio.run(main()) 来运行 main

  • AgentRun 是一个异步迭代器,它会产生流程中的每个节点(BaseNodeEnd)。
  • 当返回 End 节点时,运行结束。

手动使用 .next(...)

您还可以通过将要运行的下一个节点传递给 AgentRun.next(...) 方法来手动驱动迭代。这允许您在节点执行前检查或修改它,或根据您自己的逻辑跳过节点,并且更容易地在 next() 中捕获错误。

agent_iter_next.py
from pydantic_ai import Agent
from pydantic_graph import End

agent = Agent('openai:gpt-4o')


async def main():
    async with agent.iter('What is the capital of France?') as agent_run:
        node = agent_run.next_node  # (1)!

        all_nodes = [node]

        # Drive the iteration manually:
        while not isinstance(node, End):  # (2)!
            node = await agent_run.next(node)  # (3)!
            all_nodes.append(node)  # (4)!

        print(all_nodes)
        """
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                instructions=None,
                instructions_functions=[],
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                        )
                    ]
                )
            ),
            CallToolsNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='The capital of France is Paris.')],
                    usage=RequestUsage(input_tokens=56, output_tokens=7),
                    model_name='gpt-4o',
                    timestamp=datetime.datetime(...),
                )
            ),
            End(data=FinalResult(output='The capital of France is Paris.')),
        ]
        """
  1. 我们首先获取将在代理图中运行的第一个节点。
  2. 一旦产生了一个 End 节点,代理运行就完成了;End 的实例不能传递给 next
  3. 当您调用 await agent_run.next(node) 时,它会执行代理图中的那个节点,更新运行的历史记录,并返回要运行的*下一个*节点。
  4. 您也可以在这里根据需要检查或修改新的 node

(此示例是完整的,可以“按原样”运行——您需要添加 asyncio.run(main()) 来运行 main

访问用量和最终输出

您可以随时通过 agent_run.usage()AgentRun 对象中检索用量统计信息(令牌、请求等)。该方法返回一个包含用量数据的 RunUsage 对象。

一旦运行完成,agent_run.result 就会变成一个 AgentRunResult 对象,其中包含最终输出(以及相关元数据)。

流式传输所有事件和输出

这是一个结合 async for 迭代流式传输代理运行的示例:

streaming_iter.py
import asyncio
from dataclasses import dataclass
from datetime import date

from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import (
    FinalResultEvent,
    FunctionToolCallEvent,
    FunctionToolResultEvent,
    PartDeltaEvent,
    PartStartEvent,
    TextPartDelta,
    ThinkingPartDelta,
    ToolCallPartDelta,
)


@dataclass
class WeatherService:
    async def get_forecast(self, location: str, forecast_date: date) -> str:
        # In real code: call weather API, DB queries, etc.
        return f'The forecast in {location} on {forecast_date} is 24°C and sunny.'

    async def get_historic_weather(self, location: str, forecast_date: date) -> str:
        # In real code: call a historical weather API or DB
        return f'The weather in {location} on {forecast_date} was 18°C and partly cloudy.'


weather_agent = Agent[WeatherService, str](
    'openai:gpt-4o',
    deps_type=WeatherService,
    output_type=str,  # We'll produce a final answer as plain text
    system_prompt='Providing a weather forecast at the locations the user provides.',
)


@weather_agent.tool
async def weather_forecast(
    ctx: RunContext[WeatherService],
    location: str,
    forecast_date: date,
) -> str:
    if forecast_date >= date.today():
        return await ctx.deps.get_forecast(location, forecast_date)
    else:
        return await ctx.deps.get_historic_weather(location, forecast_date)


output_messages: list[str] = []


async def main():
    user_prompt = 'What will the weather be like in Paris on Tuesday?'

    # Begin a node-by-node, streaming iteration
    async with weather_agent.iter(user_prompt, deps=WeatherService()) as run:
        async for node in run:
            if Agent.is_user_prompt_node(node):
                # A user prompt node => The user has provided input
                output_messages.append(f'=== UserPromptNode: {node.user_prompt} ===')
            elif Agent.is_model_request_node(node):
                # A model request node => We can stream tokens from the model's request
                output_messages.append('=== ModelRequestNode: streaming partial request tokens ===')
                async with node.stream(run.ctx) as request_stream:
                    final_result_found = False
                    async for event in request_stream:
                        if isinstance(event, PartStartEvent):
                            output_messages.append(f'[Request] Starting part {event.index}: {event.part!r}')
                        elif isinstance(event, PartDeltaEvent):
                            if isinstance(event.delta, TextPartDelta):
                                output_messages.append(
                                    f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}'
                                )
                            elif isinstance(event.delta, ThinkingPartDelta):
                                output_messages.append(
                                    f'[Request] Part {event.index} thinking delta: {event.delta.content_delta!r}'
                                )
                            elif isinstance(event.delta, ToolCallPartDelta):
                                output_messages.append(
                                    f'[Request] Part {event.index} args delta: {event.delta.args_delta}'
                                )
                        elif isinstance(event, FinalResultEvent):
                            output_messages.append(
                                f'[Result] The model started producing a final result (tool_name={event.tool_name})'
                            )
                            final_result_found = True
                            break

                    if final_result_found:
                        # Once the final result is found, we can call `AgentStream.stream_text()` to stream the text.
                        # A similar `AgentStream.stream_output()` method is available to stream structured output.
                        async for output in request_stream.stream_text():
                            output_messages.append(f'[Output] {output}')
            elif Agent.is_call_tools_node(node):
                # A handle-response node => The model returned some data, potentially calls a tool
                output_messages.append('=== CallToolsNode: streaming partial response & tool usage ===')
                async with node.stream(run.ctx) as handle_stream:
                    async for event in handle_stream:
                        if isinstance(event, FunctionToolCallEvent):
                            output_messages.append(
                                f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
                            )
                        elif isinstance(event, FunctionToolResultEvent):
                            output_messages.append(
                                f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}'
                            )
            elif Agent.is_end_node(node):
                # Once an End node is reached, the agent run is complete
                assert run.result is not None
                assert run.result.output == node.data.output
                output_messages.append(f'=== Final Agent Output: {run.result.output} ===')


if __name__ == '__main__':
    asyncio.run(main())

    print(output_messages)
    """
    [
        '=== UserPromptNode: What will the weather be like in Paris on Tuesday? ===',
        '=== ModelRequestNode: streaming partial request tokens ===',
        "[Request] Starting part 0: ToolCallPart(tool_name='weather_forecast', tool_call_id='0001')",
        '[Request] Part 0 args delta: {"location":"Pa',
        '[Request] Part 0 args delta: ris","forecast_',
        '[Request] Part 0 args delta: date":"2030-01-',
        '[Request] Part 0 args delta: 01"}',
        '=== CallToolsNode: streaming partial response & tool usage ===',
        '[Tools] The LLM calls tool=\'weather_forecast\' with args={"location":"Paris","forecast_date":"2030-01-01"} (tool_call_id=\'0001\')',
        "[Tools] Tool call '0001' returned => The forecast in Paris on 2030-01-01 is 24°C and sunny.",
        '=== ModelRequestNode: streaming partial request tokens ===',
        "[Request] Starting part 0: TextPart(content='It will be ')",
        '[Result] The model started producing a final result (tool_name=None)',
        '[Output] It will be ',
        '[Output] It will be warm and sunny ',
        '[Output] It will be warm and sunny in Paris on ',
        '[Output] It will be warm and sunny in Paris on Tuesday.',
        '=== CallToolsNode: streaming partial response & tool usage ===',
        '=== Final Agent Output: It will be warm and sunny in Paris on Tuesday. ===',
    ]
    """

(这个例子是完整的,可以“按原样”运行)

附加配置

用量限制

Pydantic AI 提供了一个 UsageLimits 结构,帮助您限制模型运行中的用量(令牌、请求和工具调用)。

您可以通过将 usage_limits 参数传递给 run{_sync,_stream} 函数来应用这些设置。

考虑以下示例,我们限制了响应令牌的数量:

from pydantic_ai import Agent, UsageLimitExceeded, UsageLimits

agent = Agent('anthropic:claude-3-5-sonnet-latest')

result_sync = agent.run_sync(
    'What is the capital of Italy? Answer with just the city.',
    usage_limits=UsageLimits(response_tokens_limit=10),
)
print(result_sync.output)
#> Rome
print(result_sync.usage())
#> RunUsage(input_tokens=62, output_tokens=1, requests=1)

try:
    result_sync = agent.run_sync(
        'What is the capital of Italy? Answer with a paragraph.',
        usage_limits=UsageLimits(response_tokens_limit=10),
    )
except UsageLimitExceeded as e:
    print(e)
    #> Exceeded the output_tokens_limit of 10 (output_tokens=32)

限制请求数量对于防止无限循环或过多的工具调用非常有用。

from typing_extensions import TypedDict

from pydantic_ai import Agent, ModelRetry, UsageLimitExceeded, UsageLimits


class NeverOutputType(TypedDict):
    """
    Never ever coerce data to this type.
    """

    never_use_this: str


agent = Agent(
    'anthropic:claude-3-5-sonnet-latest',
    retries=3,
    output_type=NeverOutputType,
    system_prompt='Any time you get a response, call the `infinite_retry_tool` to produce another response.',
)


@agent.tool_plain(retries=5)  # (1)!
def infinite_retry_tool() -> int:
    raise ModelRetry('Please try again.')


try:
    result_sync = agent.run_sync(
        'Begin infinite retry loop!', usage_limits=UsageLimits(request_limit=3)  # (2)!
    )
except UsageLimitExceeded as e:
    print(e)
    #> The next request would exceed the request_limit of 3
  1. 这个工具在出错前可以重试 5 次,模拟一个可能会陷入循环的工具。
  2. 这次运行将在 3 次请求后出错,从而防止了无限的工具调用。
限制工具调用次数

如果您需要限制单次运行中成功工具调用的次数,请使用 tool_calls_limit

from pydantic_ai import Agent
from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.usage import UsageLimits

agent = Agent('anthropic:claude-3-5-sonnet-latest')

@agent.tool_plain
def do_work() -> str:
    return 'ok'

try:
    # Allow at most one executed tool call in this run
    agent.run_sync('Please call the tool twice', usage_limits=UsageLimits(tool_calls_limit=1))
except UsageLimitExceeded as e:
    print(e)
    #> The next tool call would exceed the tool_calls_limit of 1 (tool_calls=1)

注意

  • 如果您注册了许多工具,用量限制就尤其重要。使用 request_limit 来限制模型轮次的数量,使用 tool_calls_limit 来限制单次运行中成功工具执行的次数上限。
  • 这些限制在调用 LLM 之前的最后阶段强制执行。如果您的限制比重试设置更严格,用量限制将在所有重试尝试完成之前达到。

模型(运行)设置

Pydantic AI 提供了一个 settings.ModelSettings 结构来帮助您微调请求。这个结构允许您配置影响模型行为的常见参数,例如 temperaturemax_tokenstimeout 等。

有三种方式可以应用这些设置,优先级顺序明确:

  1. 模型级别默认值 - 在创建模型实例时通过 settings 参数设置。这些作为该模型的基础默认值。
  2. 代理级别默认值 - 在 Agent 初始化期间通过 model_settings 参数设置。这些会与模型默认值合并,代理设置优先。
  3. 运行时覆盖 - 通过 model_settings 参数传递给 run{_sync,_stream} 函数。这些具有最高优先级,并与合并后的代理和模型默认值合并。

例如,如果您想将 temperature 设置为 0.0 以确保行为更少随机性,您可以这样做:

from pydantic_ai import Agent, ModelSettings
from pydantic_ai.models.openai import OpenAIChatModel

# 1. Model-level defaults
model = OpenAIChatModel(
    'gpt-4o',
    settings=ModelSettings(temperature=0.8, max_tokens=500)  # Base defaults
)

# 2. Agent-level defaults (overrides model defaults by merging)
agent = Agent(model, model_settings=ModelSettings(temperature=0.5))

# 3. Run-time overrides (highest priority)
result_sync = agent.run_sync(
    'What is the capital of Italy?',
    model_settings=ModelSettings(temperature=0.0)  # Final temperature: 0.0
)
print(result_sync.output)
#> The capital of Italy is Rome.

最终请求使用了 temperature=0.0(运行时),max_tokens=500(来自模型),展示了设置如何合并,且运行时设置优先。

模型设置支持

所有具体的模型实现(OpenAI、Anthropic、Google 等)都支持模型级别的设置。像 FallbackModelWrapperModelInstrumentedModel 这样的包装模型没有自己的设置 - 它们使用其底层模型的设置。

模型特定设置

如果您希望进一步自定义模型行为,可以使用 ModelSettings 的子类,例如与您选择的模型相关联的 GoogleModelSettings

例如:

from pydantic_ai import Agent, UnexpectedModelBehavior
from pydantic_ai.models.google import GoogleModelSettings

agent = Agent('google-gla:gemini-1.5-flash')

try:
    result = agent.run_sync(
        'Write a list of 5 very rude things that I might say to the universe after stubbing my toe in the dark:',
        model_settings=GoogleModelSettings(
            temperature=0.0,  # general model settings can also be specified
            gemini_safety_settings=[
                {
                    'category': 'HARM_CATEGORY_HARASSMENT',
                    'threshold': 'BLOCK_LOW_AND_ABOVE',
                },
                {
                    'category': 'HARM_CATEGORY_HATE_SPEECH',
                    'threshold': 'BLOCK_LOW_AND_ABOVE',
                },
            ],
        ),
    )
except UnexpectedModelBehavior as e:
    print(e)  # (1)!
    """
    Safety settings triggered, body:
    <safety settings details>
    """
  1. 引发此错误是因为超出了安全阈值。

运行 vs. 对话

一次代理运行可能代表整个对话——单次运行中可以交换的消息数量没有限制。然而,一次对话也可能由多次运行组成,特别是如果您需要在独立的交互或 API 调用之间维护状态。

下面是一个由多次运行组成的对话示例:

conversation_example.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

# First run
result1 = agent.run_sync('Who was Albert Einstein?')
print(result1.output)
#> Albert Einstein was a German-born theoretical physicist.

# Second run, passing previous messages
result2 = agent.run_sync(
    'What was his most famous equation?',
    message_history=result1.new_messages(),  # (1)!
)
print(result2.output)
#> Albert Einstein's most famous equation is (E = mc^2).
  1. 继续对话;如果没有 message_history,模型将不知道“他”指的是谁。

(这个例子是完整的,可以“按原样”运行)

设计上的类型安全

Pydantic AI 的设计旨在与静态类型检查器(如 mypy 和 pyright)良好协作。

类型是(某种程度上)可选的

Pydantic AI 的设计旨在使类型检查在您选择使用时尽可能有用,但您不必随时随地都使用类型。

话虽如此,因为 Pydantic AI 使用 Pydantic,而 Pydantic 使用类型提示作为模式和验证的定义,所以某些类型(特别是工具参数上的类型提示,以及传递给 Agentoutput_type 参数)在运行时会被使用。

如果我们(库的开发者)搞砸了,导致类型提示给您带来的困惑多于帮助,如果您发现这种情况,请创建一个问题,说明是什么让您感到烦恼!

特别是,代理在其依赖项类型和返回的输出类型上都是泛型的,因此您可以使用类型提示来确保您使用的是正确的类型。

考虑以下带有类型错误的脚本:

type_mistakes.py
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext


@dataclass
class User:
    name: str


agent = Agent(
    'test',
    deps_type=User,  # (1)!
    output_type=bool,
)


@agent.system_prompt
def add_user_name(ctx: RunContext[str]) -> str:  # (2)!
    return f"The user's name is {ctx.deps}."


def foobar(x: bytes) -> None:
    pass


result = agent.run_sync('Does their name start with "A"?', deps=User('Anne'))
foobar(result.output)  # (3)!
  1. 代理被定义为期望一个 User 实例作为 deps
  2. 但这里 add_user_name 被定义为接受一个 str 作为依赖项,而不是一个 User
  3. 由于代理被定义为返回一个 bool,这里会引发一个类型错误,因为 foobar 期望的是 bytes

对此运行 mypy 将得到以下输出:

 uv run mypy type_mistakes.py
type_mistakes.py:18: error: Argument 1 to "system_prompt" of "Agent" has incompatible type "Callable[[RunContext[str]], str]"; expected "Callable[[RunContext[User]], str]"  [arg-type]
type_mistakes.py:28: error: Argument 1 to "foobar" has incompatible type "bool"; expected "bytes"  [arg-type]
Found 2 errors in 1 file (checked 1 source file)

运行 pyright 会识别出相同的问题。

系统提示

系统提示初看起来可能很简单,因为它们只是字符串(或连接起来的字符串序列),但精心设计正确的系统提示是让模型按您期望的方式行事的关键。

提示

对于大多数用例,您应该使用“指令”(instructions)而不是“系统提示”(system prompts)。

不过,如果您清楚自己的目的,并希望在后续的补全请求中保留发送给 LLM 的消息历史中的系统提示消息,您可以使用 system_prompt 参数/装饰器来实现这一点。

有关更多信息,请参阅下面关于指令的部分。

通常,系统提示分为两类:

  1. 静态系统提示:这些在编写代码时是已知的,可以通过 Agent 构造函数system_prompt 参数定义。
  2. 动态系统提示:这些以某种方式依赖于直到运行时才知道的上下文,应该通过使用 @agent.system_prompt 装饰的函数来定义。

您可以将两者都添加到一个代理中;它们在运行时按定义的顺序附加。

下面是一个同时使用两种类型系统提示的示例:

system_prompts.py
from datetime import date

from pydantic_ai import Agent, RunContext

agent = Agent(
    'openai:gpt-4o',
    deps_type=str,  # (1)!
    system_prompt="Use the customer's name while replying to them.",  # (2)!
)


@agent.system_prompt  # (3)!
def add_the_users_name(ctx: RunContext[str]) -> str:
    return f"The user's name is {ctx.deps}."


@agent.system_prompt
def add_the_date() -> str:  # (4)!
    return f'The date is {date.today()}.'


result = agent.run_sync('What is the date?', deps='Frank')
print(result.output)
#> Hello Frank, the date today is 2032-01-02.
  1. 代理期望一个字符串依赖项。
  2. 在代理创建时定义的静态系统提示。
  3. 通过带有 RunContext 的装饰器定义的动态系统提示,它在 run_sync 之后被调用,而不是在代理创建时,因此可以利用运行时的信息,比如该次运行中使用的依赖项。
  4. 另一个动态系统提示,系统提示不必有 RunContext 参数。

(这个例子是完整的,可以“按原样”运行)

指令

指令(Instructions)与系统提示类似。主要区别在于,当在调用 Agent.run 及类似方法时提供了明确的 message_history 时,历史记录中任何现有消息的*指令*都不会包含在对模型的请求中——只有*当前*代理的指令会被包含。

您应该使用:

  • instructions 当您希望对模型的请求只包含*当前*代理的系统提示时
  • system_prompt 当您希望对模型的请求*保留*先前请求中使用的系统提示(可能由其他代理发出)时

总的来说,我们建议使用 instructions 而不是 system_prompt,除非您有特定的理由使用 system_prompt

指令,像系统提示一样,分为两类:

  1. 静态指令:这些在编写代码时是已知的,可以通过 Agent 构造函数instructions 参数定义。
  2. 动态指令:这些依赖于仅在运行时可用的上下文,应该使用 @agent.instructions 装饰的函数来定义。与动态系统提示不同(当存在 message_history 时可能会被重用),动态指令总是被重新评估。

静态和动态指令都可以添加到单个代理中,它们在运行时按定义的顺序附加。

下面是一个同时使用两种类型指令的示例:

instructions.py
from datetime import date

from pydantic_ai import Agent, RunContext

agent = Agent(
    'openai:gpt-4o',
    deps_type=str,  # (1)!
    instructions="Use the customer's name while replying to them.",  # (2)!
)


@agent.instructions  # (3)!
def add_the_users_name(ctx: RunContext[str]) -> str:
    return f"The user's name is {ctx.deps}."


@agent.instructions
def add_the_date() -> str:  # (4)!
    return f'The date is {date.today()}.'


result = agent.run_sync('What is the date?', deps='Frank')
print(result.output)
#> Hello Frank, the date today is 2032-01-02.
  1. 代理期望一个字符串依赖项。
  2. 在代理创建时定义的静态指令。
  3. 通过带有 RunContext 的装饰器定义的动态指令,它在 run_sync 之后被调用,而不是在代理创建时,因此可以利用运行时的信息,比如该次运行中使用的依赖项。
  4. 另一个动态指令,指令不必有 RunContext 参数。

(这个例子是完整的,可以“按原样”运行)

请注意,返回一个空字符串将导致不添加任何指令消息。

反思与自我纠正

来自函数工具参数验证和结构化输出验证的验证错误都可以被传回给模型,并请求重试。

您也可以在工具输出函数内部引发 ModelRetry 来告诉模型它应该重试生成响应。

这是一个例子:

tool_retry.py
from pydantic import BaseModel

from pydantic_ai import Agent, RunContext, ModelRetry

from fake_database import DatabaseConn


class ChatResult(BaseModel):
    user_id: int
    message: str


agent = Agent(
    'openai:gpt-4o',
    deps_type=DatabaseConn,
    output_type=ChatResult,
)


@agent.tool(retries=2)
def get_user_by_name(ctx: RunContext[DatabaseConn], name: str) -> int:
    """Get a user's ID from their full name."""
    print(name)
    #> John
    #> John Doe
    user_id = ctx.deps.users.get(name=name)
    if user_id is None:
        raise ModelRetry(
            f'No user found with name {name!r}, remember to provide their full name'
        )
    return user_id


result = agent.run_sync(
    'Send a message to John Doe asking for coffee next week', deps=DatabaseConn()
)
print(result.output)
"""
user_id=123 message='Hello John, would you be free for coffee sometime next week? Let me know what works for you!'
"""

模型错误

如果模型的行为出乎意料(例如,超过重试限制,或其 API 返回 503),代理运行将引发 UnexpectedModelBehavior

在这些情况下,可以使用 capture_run_messages 来访问运行期间交换的消息,以帮助诊断问题。

agent_model_errors.py
from pydantic_ai import Agent, ModelRetry, UnexpectedModelBehavior, capture_run_messages

agent = Agent('openai:gpt-4o')


@agent.tool_plain
def calc_volume(size: int) -> int:  # (1)!
    if size == 42:
        return size**3
    else:
        raise ModelRetry('Please try again.')


with capture_run_messages() as messages:  # (2)!
    try:
        result = agent.run_sync('Please get me the volume of a box with size 6.')
    except UnexpectedModelBehavior as e:
        print('An error occurred:', e)
        #> An error occurred: Tool 'calc_volume' exceeded max retries count of 1
        print('cause:', repr(e.__cause__))
        #> cause: ModelRetry('Please try again.')
        print('messages:', messages)
        """
        messages:
        [
            ModelRequest(
                parts=[
                    UserPromptPart(
                        content='Please get me the volume of a box with size 6.',
                        timestamp=datetime.datetime(...),
                    )
                ]
            ),
            ModelResponse(
                parts=[
                    ToolCallPart(
                        tool_name='calc_volume',
                        args={'size': 6},
                        tool_call_id='pyd_ai_tool_call_id',
                    )
                ],
                usage=RequestUsage(input_tokens=62, output_tokens=4),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            ),
            ModelRequest(
                parts=[
                    RetryPromptPart(
                        content='Please try again.',
                        tool_name='calc_volume',
                        tool_call_id='pyd_ai_tool_call_id',
                        timestamp=datetime.datetime(...),
                    )
                ]
            ),
            ModelResponse(
                parts=[
                    ToolCallPart(
                        tool_name='calc_volume',
                        args={'size': 6},
                        tool_call_id='pyd_ai_tool_call_id',
                    )
                ],
                usage=RequestUsage(input_tokens=72, output_tokens=8),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            ),
        ]
        """
    else:
        print(result.output)
  1. 定义一个在这种情况下会重复引发 ModelRetry 的工具。
  2. 使用 capture_run_messages 来捕获运行期间交换的消息。

(这个例子是完整的,可以“按原样”运行)

注意

如果您在单个 capture_run_messages 上下文中多次调用 runrun_syncrun_streammessages 将仅代表第一次调用期间交换的消息。