代理
简介
代理是 PydanticAI 与 LLM 交互的主要接口。
在某些用例中,单个代理将控制整个应用程序或组件,但多个代理也可以交互以体现更复杂的工作流程。
Agent
类具有完整的 API 文档,但从概念上讲,您可以将代理视为以下内容的容器
组件 | 描述 |
---|---|
系统提示 | 由开发人员为 LLM 编写的一组指令。 |
函数工具 | LLM 在生成响应时可以调用的函数,以获取信息。 |
结构化结果类型 | LLM 必须在运行结束时返回的结构化数据类型(如果已指定)。 |
依赖类型约束 | 系统提示函数、工具和结果验证器在运行时都可以使用依赖项。 |
LLM 模型 | 与代理关联的可选默认 LLM 模型。也可以在运行代理时指定。 |
模型设置 | 可选的默认模型设置,以帮助微调请求。也可以在运行代理时指定。 |
在类型术语中,代理在其依赖类型和结果类型方面是通用的,例如,一个代理需要 Foobar
类型的依赖项并返回 list[str]
类型的结果,则其类型为 Agent[Foobar, list[str]]
。在实践中,您不必关心这一点,这应该只是意味着您的 IDE 可以告诉您何时类型正确,并且如果您选择使用 静态类型检查,它应该与 PydanticAI 配合良好。
这是一个模拟轮盘赌的玩具代理示例
from pydantic_ai import Agent, RunContext
roulette_agent = Agent( # (1)!
'openai:gpt-4o',
deps_type=int,
result_type=bool,
system_prompt=(
'Use the `roulette_wheel` function to see if the '
'customer has won based on the number they provide.'
),
)
@roulette_agent.tool
async def roulette_wheel(ctx: RunContext[int], square: int) -> str: # (2)!
"""check if the square is a winner"""
return 'winner' if square == ctx.deps else 'loser'
# Run the agent
success_number = 18 # (3)!
result = roulette_agent.run_sync('Put my money on square eighteen', deps=success_number)
print(result.data) # (4)!
#> True
result = roulette_agent.run_sync('I bet five is the winner', deps=success_number)
print(result.data)
#> False
- 创建一个代理,它期望一个整数依赖项并返回一个布尔结果。此代理的类型将为
Agent[int, bool]
。 - 定义一个工具,用于检查方块是否为获胜者。在这里,
RunContext
使用依赖类型int
参数化;如果您弄错了依赖类型,您将收到类型错误。 - 实际上,您可能希望在此处使用随机数,例如
random.randint(0, 36)
。 result.data
将是一个布尔值,指示方块是否为获胜者。Pydantic 执行结果验证,它将被键入为bool
,因为其类型是从代理的result_type
泛型参数派生的。
运行代理
有四种运行代理的方法
agent.run()
— 一个协程,返回一个包含已完成响应的RunResult
。agent.run_sync()
— 一个普通的同步函数,返回一个包含已完成响应的RunResult
(在内部,这只是调用loop.run_until_complete(self.run())
)。agent.run_stream()
— 一个协程,返回一个StreamedRunResult
,其中包含将响应作为异步可迭代对象流式传输的方法。agent.iter()
— 一个上下文管理器,返回一个AgentRun
,它是代理底层Graph
节点的异步可迭代对象。
这是一个简单的示例,演示了前三个方法
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o')
result_sync = agent.run_sync('What is the capital of Italy?')
print(result_sync.data)
#> Rome
async def main():
result = await agent.run('What is the capital of France?')
print(result.data)
#> Paris
async with agent.run_stream('What is the capital of the UK?') as response:
print(await response.get_data())
#> London
asyncio.run(main())
才能运行 main
)
您还可以传递先前运行中的消息以继续对话或提供上下文,如 消息和聊天记录 中所述。
迭代代理的图
在底层,PydanticAI 中的每个 Agent
都使用 pydantic-graph 来管理其执行流程。 pydantic-graph 是一个通用的、以类型为中心的库,用于在 Python 中构建和运行有限状态机。它实际上并不依赖于 PydanticAI — 您可以将其独立用于与 GenAI 无关的工作流程 — 但 PydanticAI 利用它来编排代理运行中模型请求和模型响应的处理。
在许多情况下,您根本不需要担心 pydantic-graph;调用 agent.run(...)
只是从头到尾遍历底层图。但是,如果您需要更深入的洞察力或控制 — 例如捕获每个工具调用,或在特定阶段注入您自己的逻辑 — PydanticAI 通过 Agent.iter
公开较低级别的迭代过程。此方法返回一个 AgentRun
,您可以异步迭代它,或通过 next
方法手动逐节点驱动。一旦代理的图返回一个 End
,您将获得最终结果以及所有步骤的详细历史记录。
async for
迭代
这是一个使用 async for
和 iter
记录代理执行的每个节点的示例
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o')
async def main():
nodes = []
# Begin an AgentRun, which is an async-iterable over the nodes of the agent's graph
async with agent.iter('What is the capital of France?') as agent_run:
async for node in agent_run:
# Each node represents a step in the agent's execution
nodes.append(node)
print(nodes)
"""
[
ModelRequestNode(
request=ModelRequest(
parts=[
UserPromptPart(
content='What is the capital of France?',
timestamp=datetime.datetime(...),
part_kind='user-prompt',
)
],
kind='request',
)
),
CallToolsNode(
model_response=ModelResponse(
parts=[TextPart(content='Paris', part_kind='text')],
model_name='gpt-4o',
timestamp=datetime.datetime(...),
kind='response',
)
),
End(data=FinalResult(data='Paris', tool_name=None, tool_call_id=None)),
]
"""
print(agent_run.result.data)
#> Paris
AgentRun
是一个异步迭代器,它产生流程中的每个节点(BaseNode
或End
)。- 当返回
End
节点时,运行结束。
手动使用 .next(...)
您还可以通过将要运行的下一个节点传递给 AgentRun.next(...)
方法来手动驱动迭代。这允许您在节点执行之前检查或修改节点,或根据您自己的逻辑跳过节点,并更轻松地捕获 next()
中的错误
from pydantic_ai import Agent
from pydantic_graph import End
agent = Agent('openai:gpt-4o')
async def main():
async with agent.iter('What is the capital of France?') as agent_run:
node = agent_run.next_node # (1)!
all_nodes = [node]
# Drive the iteration manually:
while not isinstance(node, End): # (2)!
node = await agent_run.next(node) # (3)!
all_nodes.append(node) # (4)!
print(all_nodes)
"""
[
UserPromptNode(
user_prompt='What is the capital of France?',
system_prompts=(),
system_prompt_functions=[],
system_prompt_dynamic_functions={},
),
ModelRequestNode(
request=ModelRequest(
parts=[
UserPromptPart(
content='What is the capital of France?',
timestamp=datetime.datetime(...),
part_kind='user-prompt',
)
],
kind='request',
)
),
CallToolsNode(
model_response=ModelResponse(
parts=[TextPart(content='Paris', part_kind='text')],
model_name='gpt-4o',
timestamp=datetime.datetime(...),
kind='response',
)
),
End(data=FinalResult(data='Paris', tool_name=None, tool_call_id=None)),
]
"""
- 我们首先获取将在代理图中运行的第一个节点。
- 一旦生成
End
节点,代理运行就完成;End
的实例无法传递给next
。 - 当您调用
await agent_run.next(node)
时,它会在代理图中执行该节点,更新运行历史记录,并返回要运行的下一个节点。 - 您也可以在此处根据需要检查或更改新的
node
。
访问使用情况和最终结果
您可以随时通过 agent_run.usage()
从 AgentRun
对象检索使用情况统计信息(令牌、请求等)。此方法返回一个包含使用情况数据的 Usage
对象。
运行完成后,agent_run.final_result
将变为一个包含最终输出(和相关元数据)的 AgentRunResult
对象。
流式传输
这是一个流式传输代理运行的示例,结合了 async for
迭代
import asyncio
from dataclasses import dataclass
from datetime import date
from pydantic_ai import Agent
from pydantic_ai.messages import (
FinalResultEvent,
FunctionToolCallEvent,
FunctionToolResultEvent,
PartDeltaEvent,
PartStartEvent,
TextPartDelta,
ToolCallPartDelta,
)
from pydantic_ai.tools import RunContext
@dataclass
class WeatherService:
async def get_forecast(self, location: str, forecast_date: date) -> str:
# In real code: call weather API, DB queries, etc.
return f'The forecast in {location} on {forecast_date} is 24°C and sunny.'
async def get_historic_weather(self, location: str, forecast_date: date) -> str:
# In real code: call a historical weather API or DB
return (
f'The weather in {location} on {forecast_date} was 18°C and partly cloudy.'
)
weather_agent = Agent[WeatherService, str](
'openai:gpt-4o',
deps_type=WeatherService,
result_type=str, # We'll produce a final answer as plain text
system_prompt='Providing a weather forecast at the locations the user provides.',
)
@weather_agent.tool
async def weather_forecast(
ctx: RunContext[WeatherService],
location: str,
forecast_date: date,
) -> str:
if forecast_date >= date.today():
return await ctx.deps.get_forecast(location, forecast_date)
else:
return await ctx.deps.get_historic_weather(location, forecast_date)
output_messages: list[str] = []
async def main():
user_prompt = 'What will the weather be like in Paris on Tuesday?'
# Begin a node-by-node, streaming iteration
async with weather_agent.iter(user_prompt, deps=WeatherService()) as run:
async for node in run:
if Agent.is_user_prompt_node(node):
# A user prompt node => The user has provided input
output_messages.append(f'=== UserPromptNode: {node.user_prompt} ===')
elif Agent.is_model_request_node(node):
# A model request node => We can stream tokens from the model's request
output_messages.append(
'=== ModelRequestNode: streaming partial request tokens ==='
)
async with node.stream(run.ctx) as request_stream:
async for event in request_stream:
if isinstance(event, PartStartEvent):
output_messages.append(
f'[Request] Starting part {event.index}: {event.part!r}'
)
elif isinstance(event, PartDeltaEvent):
if isinstance(event.delta, TextPartDelta):
output_messages.append(
f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}'
)
elif isinstance(event.delta, ToolCallPartDelta):
output_messages.append(
f'[Request] Part {event.index} args_delta={event.delta.args_delta}'
)
elif isinstance(event, FinalResultEvent):
output_messages.append(
f'[Result] The model produced a final result (tool_name={event.tool_name})'
)
elif Agent.is_call_tools_node(node):
# A handle-response node => The model returned some data, potentially calls a tool
output_messages.append(
'=== CallToolsNode: streaming partial response & tool usage ==='
)
async with node.stream(run.ctx) as handle_stream:
async for event in handle_stream:
if isinstance(event, FunctionToolCallEvent):
output_messages.append(
f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
)
elif isinstance(event, FunctionToolResultEvent):
output_messages.append(
f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}'
)
elif Agent.is_end_node(node):
assert run.result.data == node.data.data
# Once an End node is reached, the agent run is complete
output_messages.append(f'=== Final Agent Output: {run.result.data} ===')
if __name__ == '__main__':
asyncio.run(main())
print(output_messages)
"""
[
'=== ModelRequestNode: streaming partial request tokens ===',
'[Request] Starting part 0: ToolCallPart(tool_name=\'weather_forecast\', args=\'{"location":"Pa\', tool_call_id=\'0001\', part_kind=\'tool-call\')',
'[Request] Part 0 args_delta=ris","forecast_',
'[Request] Part 0 args_delta=date":"2030-01-',
'[Request] Part 0 args_delta=01"}',
'=== CallToolsNode: streaming partial response & tool usage ===',
'[Tools] The LLM calls tool=\'weather_forecast\' with args={"location":"Paris","forecast_date":"2030-01-01"} (tool_call_id=\'0001\')',
"[Tools] Tool call '0001' returned => The forecast in Paris on 2030-01-01 is 24°C and sunny.",
'=== ModelRequestNode: streaming partial request tokens ===',
"[Request] Starting part 0: TextPart(content='It will be ', part_kind='text')",
'[Result] The model produced a final result (tool_name=None)',
"[Request] Part 0 text delta: 'warm and sunny '",
"[Request] Part 0 text delta: 'in Paris on '",
"[Request] Part 0 text delta: 'Tuesday.'",
'=== CallToolsNode: streaming partial response & tool usage ===',
'=== Final Agent Output: It will be warm and sunny in Paris on Tuesday. ===',
]
"""
其他配置
使用限制
PydanticAI 提供了一个 UsageLimits
结构,以帮助您限制模型运行的使用量(令牌和/或请求)。
您可以通过将 usage_limits
参数传递给 run{_sync,_stream}
函数来应用这些设置。
考虑以下示例,我们限制了响应令牌的数量
from pydantic_ai import Agent
from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.usage import UsageLimits
agent = Agent('anthropic:claude-3-5-sonnet-latest')
result_sync = agent.run_sync(
'What is the capital of Italy? Answer with just the city.',
usage_limits=UsageLimits(response_tokens_limit=10),
)
print(result_sync.data)
#> Rome
print(result_sync.usage())
"""
Usage(requests=1, request_tokens=62, response_tokens=1, total_tokens=63, details=None)
"""
try:
result_sync = agent.run_sync(
'What is the capital of Italy? Answer with a paragraph.',
usage_limits=UsageLimits(response_tokens_limit=10),
)
except UsageLimitExceeded as e:
print(e)
#> Exceeded the response_tokens_limit of 10 (response_tokens=32)
限制请求数量对于防止无限循环或过度工具调用非常有用
from typing_extensions import TypedDict
from pydantic_ai import Agent, ModelRetry
from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.usage import UsageLimits
class NeverResultType(TypedDict):
"""
Never ever coerce data to this type.
"""
never_use_this: str
agent = Agent(
'anthropic:claude-3-5-sonnet-latest',
retries=3,
result_type=NeverResultType,
system_prompt='Any time you get a response, call the `infinite_retry_tool` to produce another response.',
)
@agent.tool_plain(retries=5) # (1)!
def infinite_retry_tool() -> int:
raise ModelRetry('Please try again.')
try:
result_sync = agent.run_sync(
'Begin infinite retry loop!', usage_limits=UsageLimits(request_limit=3) # (2)!
)
except UsageLimitExceeded as e:
print(e)
#> The next request would exceed the request_limit of 3
- 此工具能够在出错之前重试 5 次,模拟一个可能陷入循环的工具。
- 此运行将在 3 个请求后出错,从而防止无限工具调用。
注意
如果您注册了很多工具,这一点尤其重要。 request_limit
可用于防止模型在循环中调用它们太多次。
模型(运行)设置
PydanticAI 提供了一个 settings.ModelSettings
结构,以帮助您微调请求。此结构允许您配置影响模型行为的常见参数,例如 temperature
、max_tokens
、timeout
等。
有两种方法可以应用这些设置:1. 通过 model_settings
参数传递给 run{_sync,_stream}
函数。这允许在每个请求的基础上进行微调。 2. 通过 model_settings
参数在 Agent
初始化期间进行设置。这些设置将默认应用于使用所述代理的所有后续运行调用。但是,在特定运行调用期间提供的 model_settings
将覆盖代理的默认设置。
例如,如果您想将 temperature
设置设置为 0.0
以确保较少的随机行为,您可以执行以下操作
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o')
result_sync = agent.run_sync(
'What is the capital of Italy?', model_settings={'temperature': 0.0}
)
print(result_sync.data)
#> Rome
模型特定设置
如果您希望进一步自定义模型行为,您可以使用 ModelSettings
的子类,例如与您选择的模型关联的 GeminiModelSettings
。
例如
from pydantic_ai import Agent, UnexpectedModelBehavior
from pydantic_ai.models.gemini import GeminiModelSettings
agent = Agent('google-gla:gemini-1.5-flash')
try:
result = agent.run_sync(
'Write a list of 5 very rude things that I might say to the universe after stubbing my toe in the dark:',
model_settings=GeminiModelSettings(
temperature=0.0, # general model settings can also be specified
gemini_safety_settings=[
{
'category': 'HARM_CATEGORY_HARASSMENT',
'threshold': 'BLOCK_LOW_AND_ABOVE',
},
{
'category': 'HARM_CATEGORY_HATE_SPEECH',
'threshold': 'BLOCK_LOW_AND_ABOVE',
},
],
),
)
except UnexpectedModelBehavior as e:
print(e) # (1)!
"""
Safety settings triggered, body:
<safety settings details>
"""
- 引发此错误是因为超过了安全阈值。通常,
result
将包含正常的ModelResponse
。
运行 vs. 对话
代理运行可能代表整个对话 — 单次运行中可以交换的消息数量没有限制。但是,对话也可能由多次运行组成,特别是如果您需要在单独的交互或 API 调用之间维护状态。
这是一个由多次运行组成的对话示例
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o')
# First run
result1 = agent.run_sync('Who was Albert Einstein?')
print(result1.data)
#> Albert Einstein was a German-born theoretical physicist.
# Second run, passing previous messages
result2 = agent.run_sync(
'What was his most famous equation?',
message_history=result1.new_messages(), # (1)!
)
print(result2.data)
#> Albert Einstein's most famous equation is (E = mc^2).
- 继续对话;如果没有
message_history
,模型将不知道“his”指的是谁。
(此示例是完整的,可以“按原样”运行)
设计上的类型安全
PydanticAI 旨在与静态类型检查器(如 mypy 和 pyright)良好协作。
类型提示在某种程度上是可选的
PydanticAI 旨在尽可能使类型检查对您有用(如果您选择使用它),但您不必一直到处使用类型。
也就是说,由于 PydanticAI 使用 Pydantic,而 Pydantic 使用类型提示作为模式和验证的定义,因此某些类型(特别是工具参数上的类型提示,以及 Agent
的 result_type
参数)在运行时使用。
如果类型提示给您带来的困惑多于帮助,我们(库开发人员)就搞砸了,如果您发现这种情况,请创建一个 issue 解释是什么让您感到恼火!
特别是,代理在其依赖项类型和它们返回的结果类型方面都是通用的,因此您可以使用类型提示来确保您正在使用正确的类型。
考虑以下带有类型错误的脚本
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
@dataclass
class User:
name: str
agent = Agent(
'test',
deps_type=User, # (1)!
result_type=bool,
)
@agent.system_prompt
def add_user_name(ctx: RunContext[str]) -> str: # (2)!
return f"The user's name is {ctx.deps}."
def foobar(x: bytes) -> None:
pass
result = agent.run_sync('Does their name start with "A"?', deps=User('Anne'))
foobar(result.data) # (3)!
- 该代理被定义为期望
User
的实例作为deps
。 - 但是这里的
add_user_name
被定义为接受str
作为依赖项,而不是User
。 - 由于代理被定义为返回
bool
,这将引发类型错误,因为foobar
期望bytes
。
在此运行 mypy
将给出以下输出
➤ uv run mypy type_mistakes.py
type_mistakes.py:18: error: Argument 1 to "system_prompt" of "Agent" has incompatible type "Callable[[RunContext[str]], str]"; expected "Callable[[RunContext[User]], str]" [arg-type]
type_mistakes.py:28: error: Argument 1 to "foobar" has incompatible type "bool"; expected "bytes" [arg-type]
Found 2 errors in 1 file (checked 1 source file)
运行 pyright
将识别出相同的问题。
系统提示
系统提示乍一看似乎很简单,因为它们只是字符串(或连接的字符串序列),但制作正确的系统提示是使模型按您希望的方式行为的关键。
一般来说,系统提示分为两类
- 静态系统提示:这些在编写代码时已知,可以通过
Agent
构造函数 的system_prompt
参数定义。 - 动态系统提示:这些在某种程度上取决于运行时才知道的上下文,应通过使用
@agent.system_prompt
修饰器装饰的函数来定义。
您可以将两者都添加到单个代理;它们按照运行时定义的顺序附加。
这是一个使用两种类型的系统提示的示例
from datetime import date
from pydantic_ai import Agent, RunContext
agent = Agent(
'openai:gpt-4o',
deps_type=str, # (1)!
system_prompt="Use the customer's name while replying to them.", # (2)!
)
@agent.system_prompt # (3)!
def add_the_users_name(ctx: RunContext[str]) -> str:
return f"The user's name is {ctx.deps}."
@agent.system_prompt
def add_the_date() -> str: # (4)!
return f'The date is {date.today()}.'
result = agent.run_sync('What is the date?', deps='Frank')
print(result.data)
#> Hello Frank, the date today is 2032-01-02.
- 该代理期望一个字符串依赖项。
- 在代理创建时定义的静态系统提示。
- 通过带有
RunContext
的装饰器定义的动态系统提示,这在run_sync
之后立即调用,而不是在创建代理时调用,因此可以受益于运行时信息,例如该运行中使用的依赖项。 - 另一个动态系统提示,系统提示不必具有
RunContext
参数。
(此示例是完整的,可以“按原样”运行)
反思和自我纠正
来自函数工具参数验证和 结构化结果验证 的验证错误可以传递回模型,并请求重试。
您还可以从 工具 或 结果验证器函数 中引发 ModelRetry
,以告知模型应重试生成响应。
这是一个示例
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext, ModelRetry
from fake_database import DatabaseConn
class ChatResult(BaseModel):
user_id: int
message: str
agent = Agent(
'openai:gpt-4o',
deps_type=DatabaseConn,
result_type=ChatResult,
)
@agent.tool(retries=2)
def get_user_by_name(ctx: RunContext[DatabaseConn], name: str) -> int:
"""Get a user's ID from their full name."""
print(name)
#> John
#> John Doe
user_id = ctx.deps.users.get(name=name)
if user_id is None:
raise ModelRetry(
f'No user found with name {name!r}, remember to provide their full name'
)
return user_id
result = agent.run_sync(
'Send a message to John Doe asking for coffee next week', deps=DatabaseConn()
)
print(result.data)
"""
user_id=123 message='Hello John, would you be free for coffee sometime next week? Let me know what works for you!'
"""
模型错误
如果模型行为异常(例如,超出重试限制,或其 API 返回 503
),代理运行将引发 UnexpectedModelBehavior
。
在这些情况下,可以使用 capture_run_messages
来访问运行期间交换的消息,以帮助诊断问题。
from pydantic_ai import Agent, ModelRetry, UnexpectedModelBehavior, capture_run_messages
agent = Agent('openai:gpt-4o')
@agent.tool_plain
def calc_volume(size: int) -> int: # (1)!
if size == 42:
return size**3
else:
raise ModelRetry('Please try again.')
with capture_run_messages() as messages: # (2)!
try:
result = agent.run_sync('Please get me the volume of a box with size 6.')
except UnexpectedModelBehavior as e:
print('An error occurred:', e)
#> An error occurred: Tool exceeded max retries count of 1
print('cause:', repr(e.__cause__))
#> cause: ModelRetry('Please try again.')
print('messages:', messages)
"""
messages:
[
ModelRequest(
parts=[
UserPromptPart(
content='Please get me the volume of a box with size 6.',
timestamp=datetime.datetime(...),
part_kind='user-prompt',
)
],
kind='request',
),
ModelResponse(
parts=[
ToolCallPart(
tool_name='calc_volume',
args={'size': 6},
tool_call_id=None,
part_kind='tool-call',
)
],
model_name='gpt-4o',
timestamp=datetime.datetime(...),
kind='response',
),
ModelRequest(
parts=[
RetryPromptPart(
content='Please try again.',
tool_name='calc_volume',
tool_call_id=None,
timestamp=datetime.datetime(...),
part_kind='retry-prompt',
)
],
kind='request',
),
ModelResponse(
parts=[
ToolCallPart(
tool_name='calc_volume',
args={'size': 6},
tool_call_id=None,
part_kind='tool-call',
)
],
model_name='gpt-4o',
timestamp=datetime.datetime(...),
kind='response',
),
]
"""
else:
print(result.data)
- 定义一个工具,在这种情况下会重复引发
ModelRetry
。 capture_run_messages
用于捕获运行期间交换的消息。
(此示例是完整的,可以“按原样”运行)
注意
如果您在单个 capture_run_messages
上下文中多次调用 run
、run_sync
或 run_stream
,则 messages
将仅表示第一次调用期间交换的消息。