延迟工具
在某些情况下,模型可能需要调用一个不应该或不能在同一个代理(agent)运行期间、同一个 Python 进程内执行的工具。
- 它可能需要先得到用户的批准
- 它可能依赖于上游服务、前端或用户来提供结果
- 生成结果所需的时间可能过长,以至于让代理进程一直运行是不合理的
为了支持这些用例,Pydantic AI 提供了延迟工具(deferred tools)的概念,它有两种类型,如下文所述:
当模型调用延迟工具时,代理运行将以一个 DeferredToolRequests
输出对象结束,该对象包含有关延迟工具调用的信息。一旦批准和/或结果准备就绪,就可以使用原始运行的消息历史记录外加一个包含 `DeferredToolRequests` 中每个工具调用结果的 DeferredToolResults
对象来开始一个新的代理运行,这将从原始运行中断的地方继续执行。
请注意,处理延迟工具调用需要将 DeferredToolRequests
包含在 Agent
的 output_type
中,以便正确推断代理运行输出的可能类型。如果你的代理也可以在没有延迟工具的上下文中使用,并且你不想在使用代理的任何地方都处理该类型,你可以在使用 agent.run()
、agent.run_sync()
、agent.run_stream()
或 agent.iter()
运行代理时传入 output_type
参数。请注意,运行时的 output_type
会覆盖在构建时指定的那个(出于类型推断的原因),因此你需要明确包含原始的输出类型。
需要人工审批的工具
如果一个工具函数总是需要批准,你可以将 requires_approval=True
参数传递给 @agent.tool
装饰器、@agent.tool_plain
装饰器、Tool
类、FunctionToolset.tool
装饰器或 FunctionToolset.add_function()
方法。在函数内部,你就可以假设该工具调用已经被批准。
如果工具函数是否需要批准取决于工具调用参数或代理的运行上下文(例如依赖项或消息历史记录),你可以从工具函数中抛出 ApprovalRequired
异常。RunContext.tool_call_approved
属性将在工具调用已经被批准时为 True
。
要对工具集(如 MCP 服务器)提供的工具调用要求批准,请参阅ApprovalRequiredToolset
文档。
当模型调用需要批准的工具时,代理运行将以一个 DeferredToolRequests
输出对象结束,该对象带有一个 approvals
列表,其中包含 ToolCallPart
,每个部分都含有工具名称、经过验证的参数以及唯一的工具调用 ID。
一旦你收集了用户的批准或拒绝,你就可以构建一个 DeferredToolResults
对象,其 approvals
字典将每个工具调用 ID 映射到一个布尔值、一个 ToolApproved
对象(带有可选的 override_args
)或一个 ToolDenied
对象(带有可选的自定义 message
以提供给模型)。然后,可以将这个 DeferredToolResults
对象作为 deferred_tool_results
提供给某个代理运行方法,同时附上原始运行的消息历史记录。
以下示例展示了如何要求对所有文件删除操作以及对特定受保护文件的更新操作进行批准:
from pydantic_ai import (
Agent,
ApprovalRequired,
DeferredToolRequests,
DeferredToolResults,
RunContext,
ToolDenied,
)
agent = Agent('openai:gpt-5', output_type=[str, DeferredToolRequests])
PROTECTED_FILES = {'.env'}
@agent.tool
def update_file(ctx: RunContext, path: str, content: str) -> str:
if path in PROTECTED_FILES and not ctx.tool_call_approved:
raise ApprovalRequired
return f'File {path!r} updated: {content!r}'
@agent.tool_plain(requires_approval=True)
def delete_file(path: str) -> str:
return f'File {path!r} deleted'
result = agent.run_sync('Delete `__init__.py`, write `Hello, world!` to `README.md`, and clear `.env`')
messages = result.all_messages()
assert isinstance(result.output, DeferredToolRequests)
requests = result.output
print(requests)
"""
DeferredToolRequests(
calls=[],
approvals=[
ToolCallPart(
tool_name='update_file',
args={'path': '.env', 'content': ''},
tool_call_id='update_file_dotenv',
),
ToolCallPart(
tool_name='delete_file',
args={'path': '__init__.py'},
tool_call_id='delete_file',
),
],
)
"""
results = DeferredToolResults()
for call in requests.approvals:
result = False
if call.tool_name == 'update_file':
# Approve all updates
result = True
elif call.tool_name == 'delete_file':
# deny all deletes
result = ToolDenied('Deleting files is not allowed')
results.approvals[call.tool_call_id] = result
result = agent.run_sync(message_history=messages, deferred_tool_results=results)
print(result.output)
"""
I successfully updated `README.md` and cleared `.env`, but was not able to delete `__init__.py`.
"""
print(result.all_messages())
"""
[
ModelRequest(
parts=[
UserPromptPart(
content='Delete `__init__.py`, write `Hello, world!` to `README.md`, and clear `.env`',
timestamp=datetime.datetime(...),
)
]
),
ModelResponse(
parts=[
ToolCallPart(
tool_name='delete_file',
args={'path': '__init__.py'},
tool_call_id='delete_file',
),
ToolCallPart(
tool_name='update_file',
args={'path': 'README.md', 'content': 'Hello, world!'},
tool_call_id='update_file_readme',
),
ToolCallPart(
tool_name='update_file',
args={'path': '.env', 'content': ''},
tool_call_id='update_file_dotenv',
),
],
usage=RequestUsage(input_tokens=63, output_tokens=21),
model_name='gpt-5',
timestamp=datetime.datetime(...),
),
ModelRequest(
parts=[
ToolReturnPart(
tool_name='delete_file',
content='Deleting files is not allowed',
tool_call_id='delete_file',
timestamp=datetime.datetime(...),
),
ToolReturnPart(
tool_name='update_file',
content="File 'README.md' updated: 'Hello, world!'",
tool_call_id='update_file_readme',
timestamp=datetime.datetime(...),
),
ToolReturnPart(
tool_name='update_file',
content="File '.env' updated: ''",
tool_call_id='update_file_dotenv',
timestamp=datetime.datetime(...),
),
]
),
ModelResponse(
parts=[
TextPart(
content='I successfully updated `README.md` and cleared `.env`, but was not able to delete `__init__.py`.'
)
],
usage=RequestUsage(input_tokens=79, output_tokens=39),
model_name='gpt-5',
timestamp=datetime.datetime(...),
),
]
"""
(这个例子是完整的,可以“按原样”运行)
外部工具执行
当工具调用的结果无法在调用它的同一个代理运行中生成时,该工具被视为外部工具。外部工具的例子包括由 Web 或应用前端实现的客户端工具,以及被移交给后台工作进程或外部服务处理的慢速任务,以避免让代理进程持续运行。
如果一个工具调用是否应该在外部执行取决于工具调用参数、代理的运行上下文(例如依赖项或消息历史记录),或者任务预计需要多长时间,你可以定义一个工具函数并有条件地抛出 CallDeferred
异常。在抛出异常之前,工具函数通常会安排某个后台任务,并传递 RunContext.tool_call_id
,以便稍后能将结果与延迟的工具调用匹配起来。
如果一个工具总是外部执行,并且其定义连同其参数的 JSON schema 一起提供给你的代码,你可以使用 ExternalToolset
。如果外部工具是预先知道的,但你手头没有参数的 JSON schema,你也可以定义一个具有适当签名的工具函数,该函数除了抛出 CallDeferred
异常外什么也不做。
当模型调用外部工具时,代理运行将以一个 DeferredToolRequests
输出对象结束,该对象带有一个 calls
列表,其中包含 ToolCallPart
,每个部分都含有工具名称、经过验证的参数以及唯一的工具调用 ID。
一旦工具调用结果准备就绪,你就可以构建一个 DeferredToolResults
对象,其 calls
字典将每个工具调用 ID 映射到一个任意值以返回给模型、一个 ToolReturn
对象,或者在工具调用失败且模型应该重试时映射到一个 ModelRetry
异常。然后,可以将这个 DeferredToolResults
对象作为 deferred_tool_results
提供给某个代理运行方法,同时附上原始运行的消息历史记录。
以下示例展示了如何将一个需要较长时间完成的任务移至后台,并在任务完成后将结果返回给模型:
import asyncio
from dataclasses import dataclass
from typing import Any
from pydantic_ai import (
Agent,
CallDeferred,
DeferredToolRequests,
DeferredToolResults,
ModelRetry,
RunContext,
)
@dataclass
class TaskResult:
tool_call_id: str
result: Any
async def calculate_answer_task(tool_call_id: str, question: str) -> TaskResult:
await asyncio.sleep(1)
return TaskResult(tool_call_id=tool_call_id, result=42)
agent = Agent('openai:gpt-5', output_type=[str, DeferredToolRequests])
tasks: list[asyncio.Task[TaskResult]] = []
@agent.tool
async def calculate_answer(ctx: RunContext, question: str) -> str:
assert ctx.tool_call_id is not None
task = asyncio.create_task(calculate_answer_task(ctx.tool_call_id, question)) # (1)!
tasks.append(task)
raise CallDeferred
async def main():
result = await agent.run('Calculate the answer to the ultimate question of life, the universe, and everything')
messages = result.all_messages()
assert isinstance(result.output, DeferredToolRequests)
requests = result.output
print(requests)
"""
DeferredToolRequests(
calls=[
ToolCallPart(
tool_name='calculate_answer',
args={
'question': 'the ultimate question of life, the universe, and everything'
},
tool_call_id='pyd_ai_tool_call_id',
)
],
approvals=[],
)
"""
done, _ = await asyncio.wait(tasks) # (2)!
task_results = [task.result() for task in done]
task_results_by_tool_call_id = {result.tool_call_id: result.result for result in task_results}
results = DeferredToolResults()
for call in requests.calls:
try:
result = task_results_by_tool_call_id[call.tool_call_id]
except KeyError:
result = ModelRetry('No result for this tool call was found.')
results.calls[call.tool_call_id] = result
result = await agent.run(message_history=messages, deferred_tool_results=results)
print(result.output)
#> The answer to the ultimate question of life, the universe, and everything is 42.
print(result.all_messages())
"""
[
ModelRequest(
parts=[
UserPromptPart(
content='Calculate the answer to the ultimate question of life, the universe, and everything',
timestamp=datetime.datetime(...),
)
]
),
ModelResponse(
parts=[
ToolCallPart(
tool_name='calculate_answer',
args={
'question': 'the ultimate question of life, the universe, and everything'
},
tool_call_id='pyd_ai_tool_call_id',
)
],
usage=RequestUsage(input_tokens=63, output_tokens=13),
model_name='gpt-5',
timestamp=datetime.datetime(...),
),
ModelRequest(
parts=[
ToolReturnPart(
tool_name='calculate_answer',
content=42,
tool_call_id='pyd_ai_tool_call_id',
timestamp=datetime.datetime(...),
)
]
),
ModelResponse(
parts=[
TextPart(
content='The answer to the ultimate question of life, the universe, and everything is 42.'
)
],
usage=RequestUsage(input_tokens=64, output_tokens=28),
model_name='gpt-5',
timestamp=datetime.datetime(...),
),
]
"""
- 在实际应用中,你很可能会使用 Celery 或类似的任务队列来在后台运行任务。
- 在实际应用中,这通常会发生在一个单独的进程中,该进程会轮询任务状态或在所有待处理任务完成时收到通知。
(此示例是完整的,可以“按原样”运行——您需要添加 asyncio.run(main())
来运行 main
)