消息和聊天历史
Pydantic AI 提供了访问智能体运行期间交换的消息的功能。这些消息既可以用来继续连贯的对话,也可以用来了解智能体的执行情况。
从结果中访问消息
运行智能体后,您可以从 result
对象访问该次运行期间交换的消息。
RunResult
(由 Agent.run
、Agent.run_sync
返回)和 StreamedRunResult
(由 Agent.run_stream
返回)都具有以下方法:
all_messages()
: 返回所有消息,包括之前运行的消息。还有一个返回 JSON 字节的变体:all_messages_json()
。new_messages()
: 仅返回当前运行的消息。还有一个返回 JSON 字节的变体:new_messages_json()
。
StreamedRunResult 和完整的消息
在 StreamedRunResult
上,从这些方法返回的消息只有在流结束后才会包含最终的结果消息。
例如,您已经等待了以下协程之一:
StreamedRunResult.stream_output()
StreamedRunResult.stream_text()
StreamedRunResult.stream_responses()
StreamedRunResult.get_output()
注意: 如果您使用 .stream_text(delta=True)
,最终的结果消息将不会被添加到结果消息中,因为在这种情况下,结果内容从未被构造成一个完整的字符串。
在 RunResult
上访问方法的示例
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')
result = agent.run_sync('Tell me a joke.')
print(result.output)
#> Did you hear about the toothpaste scandal? They called it Colgate.
# all messages from the run
print(result.all_messages())
"""
[
ModelRequest(
parts=[
SystemPromptPart(
content='Be a helpful assistant.',
timestamp=datetime.datetime(...),
),
UserPromptPart(
content='Tell me a joke.',
timestamp=datetime.datetime(...),
),
]
),
ModelResponse(
parts=[
TextPart(
content='Did you hear about the toothpaste scandal? They called it Colgate.'
)
],
usage=RequestUsage(input_tokens=60, output_tokens=12),
model_name='gpt-4o',
timestamp=datetime.datetime(...),
),
]
"""
(这个例子是完整的,可以“按原样”运行)
在 StreamedRunResult
上访问方法的示例
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')
async def main():
async with agent.run_stream('Tell me a joke.') as result:
# incomplete messages before the stream finishes
print(result.all_messages())
"""
[
ModelRequest(
parts=[
SystemPromptPart(
content='Be a helpful assistant.',
timestamp=datetime.datetime(...),
),
UserPromptPart(
content='Tell me a joke.',
timestamp=datetime.datetime(...),
),
]
)
]
"""
async for text in result.stream_text():
print(text)
#> Did you hear
#> Did you hear about the toothpaste
#> Did you hear about the toothpaste scandal? They called
#> Did you hear about the toothpaste scandal? They called it Colgate.
# complete messages once the stream finishes
print(result.all_messages())
"""
[
ModelRequest(
parts=[
SystemPromptPart(
content='Be a helpful assistant.',
timestamp=datetime.datetime(...),
),
UserPromptPart(
content='Tell me a joke.',
timestamp=datetime.datetime(...),
),
]
),
ModelResponse(
parts=[
TextPart(
content='Did you hear about the toothpaste scandal? They called it Colgate.'
)
],
usage=RequestUsage(input_tokens=50, output_tokens=12),
model_name='gpt-4o',
timestamp=datetime.datetime(...),
),
]
"""
(此示例是完整的,可以“按原样”运行——您需要添加 asyncio.run(main())
来运行 main
)
使用消息作为后续智能体运行的输入
在 Pydantic AI 中,消息记录的主要用途是在多次智能体运行之间保持上下文。
要在一次运行中使用现有消息,请将它们传递给 Agent.run
、Agent.run_sync
或 Agent.run_stream
的 message_history
参数。
如果 message_history
已设置且不为空,则不会生成新的系统提示——我们假定现有的消息记录中已包含系统提示。
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')
result1 = agent.run_sync('Tell me a joke.')
print(result1.output)
#> Did you hear about the toothpaste scandal? They called it Colgate.
result2 = agent.run_sync('Explain?', message_history=result1.new_messages())
print(result2.output)
#> This is an excellent joke invented by Samuel Colvin, it needs no explanation.
print(result2.all_messages())
"""
[
ModelRequest(
parts=[
SystemPromptPart(
content='Be a helpful assistant.',
timestamp=datetime.datetime(...),
),
UserPromptPart(
content='Tell me a joke.',
timestamp=datetime.datetime(...),
),
]
),
ModelResponse(
parts=[
TextPart(
content='Did you hear about the toothpaste scandal? They called it Colgate.'
)
],
usage=RequestUsage(input_tokens=60, output_tokens=12),
model_name='gpt-4o',
timestamp=datetime.datetime(...),
),
ModelRequest(
parts=[
UserPromptPart(
content='Explain?',
timestamp=datetime.datetime(...),
)
]
),
ModelResponse(
parts=[
TextPart(
content='This is an excellent joke invented by Samuel Colvin, it needs no explanation.'
)
],
usage=RequestUsage(input_tokens=61, output_tokens=26),
model_name='gpt-4o',
timestamp=datetime.datetime(...),
),
]
"""
(这个例子是完整的,可以“按原样”运行)
存储和加载消息(转为 JSON)
虽然在内存中维护对话状态对于许多应用来说已经足够,但有时您可能希望将智能体运行的消息记录存储在磁盘或数据库中。这可能是为了评估、在 Python 和 JavaScript/TypeScript 之间共享数据,或任何其他用例。
实现这一点的预期方法是使用 TypeAdapter
。
我们导出了可用于此目的的 ModelMessagesTypeAdapter
,您也可以创建自己的。
以下是一个示例,展示了如何操作:
from pydantic_core import to_jsonable_python
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessagesTypeAdapter # (1)!
agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')
result1 = agent.run_sync('Tell me a joke.')
history_step_1 = result1.all_messages()
as_python_objects = to_jsonable_python(history_step_1) # (2)!
same_history_as_step_1 = ModelMessagesTypeAdapter.validate_python(as_python_objects)
result2 = agent.run_sync( # (3)!
'Tell me a different joke.', message_history=same_history_as_step_1
)
- 或者,您可以从头创建一个
TypeAdapter
:from pydantic import TypeAdapter from pydantic_ai.messages import ModelMessage ModelMessagesTypeAdapter = TypeAdapter(list[ModelMessage])
- 或者,您也可以直接序列化为 JSON 或从 JSON 反序列化:
from pydantic_core import to_json ... as_json_objects = to_json(history_step_1) same_history_as_step_1 = ModelMessagesTypeAdapter.validate_json(as_json_objects)
- 现在,尽管创建了一个新的智能体运行,您仍然可以使用记录
same_history_as_step_1
继续对话。
(这个例子是完整的,可以“按原样”运行)
使用消息的其他方式
由于消息是由简单的数据类(dataclasses)定义的,您可以手动创建和操作它们,例如用于测试。
消息格式与所使用的模型无关,因此您可以在不同的智能体中使用消息,或者在同一个智能体中使用不同的模型。
在下面的示例中,我们重用了第一个智能体运行(使用 openai:gpt-4o
模型)中的消息,并在第二个使用 google-gla:gemini-1.5-pro
模型的智能体运行中使用它。
from pydantic_ai import Agent
agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')
result1 = agent.run_sync('Tell me a joke.')
print(result1.output)
#> Did you hear about the toothpaste scandal? They called it Colgate.
result2 = agent.run_sync(
'Explain?',
model='google-gla:gemini-1.5-pro',
message_history=result1.new_messages(),
)
print(result2.output)
#> This is an excellent joke invented by Samuel Colvin, it needs no explanation.
print(result2.all_messages())
"""
[
ModelRequest(
parts=[
SystemPromptPart(
content='Be a helpful assistant.',
timestamp=datetime.datetime(...),
),
UserPromptPart(
content='Tell me a joke.',
timestamp=datetime.datetime(...),
),
]
),
ModelResponse(
parts=[
TextPart(
content='Did you hear about the toothpaste scandal? They called it Colgate.'
)
],
usage=RequestUsage(input_tokens=60, output_tokens=12),
model_name='gpt-4o',
timestamp=datetime.datetime(...),
),
ModelRequest(
parts=[
UserPromptPart(
content='Explain?',
timestamp=datetime.datetime(...),
)
]
),
ModelResponse(
parts=[
TextPart(
content='This is an excellent joke invented by Samuel Colvin, it needs no explanation.'
)
],
usage=RequestUsage(input_tokens=61, output_tokens=26),
model_name='gemini-1.5-pro',
timestamp=datetime.datetime(...),
),
]
"""
处理消息记录
有时,您可能希望在将消息记录发送给模型之前对其进行修改。这可能是出于隐私原因(过滤掉敏感信息)、节省 token 成本、为 LLM 提供较少上下文,或实现自定义处理逻辑。
Pydantic AI 在 Agent
上提供了一个 history_processors
参数,允许您在每次模型请求之前拦截和修改消息记录。
记录处理器会替换消息记录
记录处理器会用处理过的消息(包括新的用户提示部分)替换状态中的消息记录。这意味着,如果您想保留原始的消息记录,您需要创建它的一个副本。
用法
history_processors
是一个可调用对象(callables)的列表,它们接受一个 ModelMessage
列表,并返回一个修改过的同类型列表。
每个处理器按顺序应用,处理器可以是同步的也可以是异步的。
from pydantic_ai import Agent
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
ModelResponse,
TextPart,
UserPromptPart,
)
def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
"""Remove all ModelResponse messages, keeping only ModelRequest messages."""
return [msg for msg in messages if isinstance(msg, ModelRequest)]
# Create agent with history processor
agent = Agent('openai:gpt-4o', history_processors=[filter_responses])
# Example: Create some conversation history
message_history = [
ModelRequest(parts=[UserPromptPart(content='What is 2+2?')]),
ModelResponse(parts=[TextPart(content='2+2 equals 4')]), # This will be filtered out
]
# When you run the agent, the history processor will filter out ModelResponse messages
# result = agent.run_sync('What about 3+3?', message_history=message_history)
仅保留最近的消息
您可以使用 history_processor
只保留最近的消息:
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage
async def keep_recent_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
"""Keep only the last 5 messages to manage token usage."""
return messages[-5:] if len(messages) > 5 else messages
agent = Agent('openai:gpt-4o', history_processors=[keep_recent_messages])
# Example: Even with a long conversation history, only the last 5 messages are sent to the model
long_conversation_history: list[ModelMessage] = [] # Your long conversation history here
# result = agent.run_sync('What did we discuss?', message_history=long_conversation_history)
切片消息记录时要小心
在对消息记录进行切片时,您需要确保工具调用和返回是成对的,否则 LLM 可能会返回错误。更多详情,请参阅此 GitHub issue。
RunContext
参数
记录处理器可以选择性地接受一个 RunContext
参数,以访问有关当前运行的附加信息,例如依赖项、模型信息和使用情况统计信息。
from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import ModelMessage
def context_aware_processor(
ctx: RunContext[None],
messages: list[ModelMessage],
) -> list[ModelMessage]:
# Access current usage
current_tokens = ctx.usage.total_tokens
# Filter messages based on context
if current_tokens > 1000:
return messages[-3:] # Keep only recent messages when token usage is high
return messages
agent = Agent('openai:gpt-4o', history_processors=[context_aware_processor])
这允许基于智能体运行的当前状态进行更复杂的消息处理。
总结旧消息
使用 LLM 总结较旧的消息,以在减少 token 的同时保留上下文。
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage
# Use a cheaper model to summarize old messages.
summarize_agent = Agent(
'openai:gpt-4o-mini',
instructions="""
Summarize this conversation, omitting small talk and unrelated topics.
Focus on the technical discussion and next steps.
""",
)
async def summarize_old_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
# Summarize the oldest 10 messages
if len(messages) > 10:
oldest_messages = messages[:10]
summary = await summarize_agent.run(message_history=oldest_messages)
# Return the last message and the summary
return summary.new_messages() + messages[-1:]
return messages
agent = Agent('openai:gpt-4o', history_processors=[summarize_old_messages])
总结消息记录时要小心
在总结消息记录时,您需要确保工具调用和返回是成对的,否则 LLM 可能会返回错误。更多详情,请参阅此 GitHub issue,您可以在其中找到总结消息记录的示例。
测试记录处理器
您可以使用 FunctionModel
来测试实际发送给模型提供商的消息。
import pytest
from pydantic_ai import Agent
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
ModelResponse,
TextPart,
UserPromptPart,
)
from pydantic_ai.models.function import AgentInfo, FunctionModel
@pytest.fixture
def received_messages() -> list[ModelMessage]:
return []
@pytest.fixture
def function_model(received_messages: list[ModelMessage]) -> FunctionModel:
def capture_model_function(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
# Capture the messages that the provider actually receives
received_messages.clear()
received_messages.extend(messages)
return ModelResponse(parts=[TextPart(content='Provider response')])
return FunctionModel(capture_model_function)
def test_history_processor(function_model: FunctionModel, received_messages: list[ModelMessage]):
def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
return [msg for msg in messages if isinstance(msg, ModelRequest)]
agent = Agent(function_model, history_processors=[filter_responses])
message_history = [
ModelRequest(parts=[UserPromptPart(content='Question 1')]),
ModelResponse(parts=[TextPart(content='Answer 1')]),
]
agent.run_sync('Question 2', message_history=message_history)
assert received_messages == [
ModelRequest(parts=[UserPromptPart(content='Question 1')]),
ModelRequest(parts=[UserPromptPart(content='Question 2')]),
]
多个处理器
您也可以使用多个处理器:
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelRequest
def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
return [msg for msg in messages if isinstance(msg, ModelRequest)]
def summarize_old_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
return messages[-5:]
agent = Agent('openai:gpt-4o', history_processors=[filter_responses, summarize_old_messages])
在这种情况下,filter_responses
处理器将首先应用,然后 summarize_old_messages
处理器将其次应用。
示例
有关在对话中使用消息的更完整示例,请参阅聊天应用示例。