
Messages and chat history

Pydantic AI provides access to the messages exchanged during an agent run. These messages can be used both to continue a coherent conversation and to understand how the agent performed.

Accessing Messages from Results

After running an agent, you can access the messages exchanged during that run from the result object.

Both RunResult (returned by Agent.run and Agent.run_sync) and StreamedRunResult (returned by Agent.run_stream) have the following methods:

  * all_messages(): returns all messages, including messages from prior runs
  * new_messages(): returns only the messages from the current run

StreamedRunResult and complete messages

On StreamedRunResult, the messages returned from these methods will only include the final result message once the stream has finished, e.g. once you've awaited one of the streaming coroutines such as StreamedRunResult.stream_text().

Note: if you use .stream_text(delta=True), the final result message will not be added to the result messages, as in that case the result content is never built up as one string.

Example of accessing methods on a RunResult

run_result_messages.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')

result = agent.run_sync('Tell me a joke.')
print(result.output)
#> Did you hear about the toothpaste scandal? They called it Colgate.

# all messages from the run
print(result.all_messages())
"""
[
    ModelRequest(
        parts=[
            SystemPromptPart(
                content='Be a helpful assistant.',
                timestamp=datetime.datetime(...),
            ),
            UserPromptPart(
                content='Tell me a joke.',
                timestamp=datetime.datetime(...),
            ),
        ]
    ),
    ModelResponse(
        parts=[
            TextPart(
                content='Did you hear about the toothpaste scandal? They called it Colgate.'
            )
        ],
        usage=RequestUsage(input_tokens=60, output_tokens=12),
        model_name='gpt-4o',
        timestamp=datetime.datetime(...),
    ),
]
"""

(This example is complete, it can be run "as is")

Example of accessing methods on a StreamedRunResult

streamed_run_result_messages.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')


async def main():
    async with agent.run_stream('Tell me a joke.') as result:
        # incomplete messages before the stream finishes
        print(result.all_messages())
        """
        [
            ModelRequest(
                parts=[
                    SystemPromptPart(
                        content='Be a helpful assistant.',
                        timestamp=datetime.datetime(...),
                    ),
                    UserPromptPart(
                        content='Tell me a joke.',
                        timestamp=datetime.datetime(...),
                    ),
                ]
            )
        ]
        """

        async for text in result.stream_text():
            print(text)
            #> Did you hear
            #> Did you hear about the toothpaste
            #> Did you hear about the toothpaste scandal? They called
            #> Did you hear about the toothpaste scandal? They called it Colgate.

        # complete messages once the stream finishes
        print(result.all_messages())
        """
        [
            ModelRequest(
                parts=[
                    SystemPromptPart(
                        content='Be a helpful assistant.',
                        timestamp=datetime.datetime(...),
                    ),
                    UserPromptPart(
                        content='Tell me a joke.',
                        timestamp=datetime.datetime(...),
                    ),
                ]
            ),
            ModelResponse(
                parts=[
                    TextPart(
                        content='Did you hear about the toothpaste scandal? They called it Colgate.'
                    )
                ],
                usage=RequestUsage(input_tokens=50, output_tokens=12),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            ),
        ]
        """

(This example is complete, it can be run "as is"; you'll need to add asyncio.run(main()) to run main)

Using Messages as Input for Further Agent Runs

The primary use of message histories in Pydantic AI is to maintain context across multiple agent runs.

To use existing messages in a run, pass them to the message_history keyword argument of Agent.run, Agent.run_sync, or Agent.run_stream.

If message_history is set and not empty, a new system prompt is not generated; we assume the existing message history includes a system prompt.

Reusing messages in a conversation
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')

result1 = agent.run_sync('Tell me a joke.')
print(result1.output)
#> Did you hear about the toothpaste scandal? They called it Colgate.

result2 = agent.run_sync('Explain?', message_history=result1.new_messages())
print(result2.output)
#> This is an excellent joke invented by Samuel Colvin, it needs no explanation.

print(result2.all_messages())
"""
[
    ModelRequest(
        parts=[
            SystemPromptPart(
                content='Be a helpful assistant.',
                timestamp=datetime.datetime(...),
            ),
            UserPromptPart(
                content='Tell me a joke.',
                timestamp=datetime.datetime(...),
            ),
        ]
    ),
    ModelResponse(
        parts=[
            TextPart(
                content='Did you hear about the toothpaste scandal? They called it Colgate.'
            )
        ],
        usage=RequestUsage(input_tokens=60, output_tokens=12),
        model_name='gpt-4o',
        timestamp=datetime.datetime(...),
    ),
    ModelRequest(
        parts=[
            UserPromptPart(
                content='Explain?',
                timestamp=datetime.datetime(...),
            )
        ]
    ),
    ModelResponse(
        parts=[
            TextPart(
                content='This is an excellent joke invented by Samuel Colvin, it needs no explanation.'
            )
        ],
        usage=RequestUsage(input_tokens=61, output_tokens=26),
        model_name='gpt-4o',
        timestamp=datetime.datetime(...),
    ),
]
"""

(This example is complete, it can be run "as is")

Storing and loading messages (to JSON)

While maintaining conversation state in memory is enough for many applications, sometimes you'll want to store the message history of an agent run on disk or in a database. This might be for evals, for sharing data between Python and JavaScript/TypeScript, or for any other use case.

The intended way to do this is using a TypeAdapter.

We export ModelMessagesTypeAdapter that can be used for this purpose, or you can create your own.

Here's an example showing how:

Serializing messages to JSON
from pydantic_core import to_jsonable_python

from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessagesTypeAdapter  # (1)!

agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')

result1 = agent.run_sync('Tell me a joke.')
history_step_1 = result1.all_messages()
as_python_objects = to_jsonable_python(history_step_1)  # (2)!
same_history_as_step_1 = ModelMessagesTypeAdapter.validate_python(as_python_objects)

result2 = agent.run_sync(  # (3)!
    'Tell me a different joke.', message_history=same_history_as_step_1
)
  1. Alternatively, you can create a TypeAdapter from scratch:
    from pydantic import TypeAdapter
    from pydantic_ai.messages import ModelMessage
    ModelMessagesTypeAdapter = TypeAdapter(list[ModelMessage])

  2. Alternatively, you can serialize to/from JSON directly:
    from pydantic_core import to_json
    ...
    as_json_objects = to_json(history_step_1)
    same_history_as_step_1 = ModelMessagesTypeAdapter.validate_json(as_json_objects)

  3. You can now continue the conversation with the history same_history_as_step_1 despite creating a new agent run.

(This example is complete, it can be run "as is")

Other ways of using messages

Since messages are defined by simple dataclasses, you can manually create and manipulate them, e.g. for testing.

The message format is independent of the model used, so you can use messages in different agents, or the same agent with different models.

In the example below, we reuse the messages from the first agent run, which uses the openai:gpt-4o model, in a second agent run using the google-gla:gemini-1.5-pro model.

Reusing messages with a different model
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', system_prompt='Be a helpful assistant.')

result1 = agent.run_sync('Tell me a joke.')
print(result1.output)
#> Did you hear about the toothpaste scandal? They called it Colgate.

result2 = agent.run_sync(
    'Explain?',
    model='google-gla:gemini-1.5-pro',
    message_history=result1.new_messages(),
)
print(result2.output)
#> This is an excellent joke invented by Samuel Colvin, it needs no explanation.

print(result2.all_messages())
"""
[
    ModelRequest(
        parts=[
            SystemPromptPart(
                content='Be a helpful assistant.',
                timestamp=datetime.datetime(...),
            ),
            UserPromptPart(
                content='Tell me a joke.',
                timestamp=datetime.datetime(...),
            ),
        ]
    ),
    ModelResponse(
        parts=[
            TextPart(
                content='Did you hear about the toothpaste scandal? They called it Colgate.'
            )
        ],
        usage=RequestUsage(input_tokens=60, output_tokens=12),
        model_name='gpt-4o',
        timestamp=datetime.datetime(...),
    ),
    ModelRequest(
        parts=[
            UserPromptPart(
                content='Explain?',
                timestamp=datetime.datetime(...),
            )
        ]
    ),
    ModelResponse(
        parts=[
            TextPart(
                content='This is an excellent joke invented by Samuel Colvin, it needs no explanation.'
            )
        ],
        usage=RequestUsage(input_tokens=61, output_tokens=26),
        model_name='gemini-1.5-pro',
        timestamp=datetime.datetime(...),
    ),
]
"""

Processing Message History

Sometimes you may want to modify the message history before it's sent to the model. This could be for privacy reasons (filtering out sensitive information), to save on token costs, to give the LLM less context, or to implement custom processing logic.

Pydantic AI provides a history_processors parameter on Agent that allows you to intercept and modify the message history before each model request.

History processors replace the message history

History processors replace the message history in the state with the processed messages, including the new user prompt part. This means that if you want to keep the original message history, you need to make a copy of it.

Usage

history_processors is a list of callables that take a list of ModelMessage and return a modified list of the same type.

Each processor is applied in sequence, and processors can be either synchronous or asynchronous.
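The ordering and sync/async handling can be illustrated with a rough sketch. The strings standing in for messages and the apply_processors helper are illustrative only, not Pydantic AI's actual internals:

```python
import asyncio
import inspect


def drop_short(messages: list[str]) -> list[str]:
    # Synchronous processor: drop very short stand-in messages.
    return [m for m in messages if len(m) > 3]


async def keep_last_two(messages: list[str]) -> list[str]:
    # Asynchronous processor: keep only the two most recent.
    return messages[-2:]


async def apply_processors(processors, messages):
    # Conceptually, each processor receives the output of the previous one,
    # and asynchronous processors are awaited.
    for processor in processors:
        result = processor(messages)
        messages = await result if inspect.isawaitable(result) else result
    return messages


history = ['hi', 'hello there', 'ok', 'long question', 'long answer']
processed = asyncio.run(apply_processors([drop_short, keep_last_two], history))
print(processed)
#> ['long question', 'long answer']
```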

simple_history_processor.py
from pydantic_ai import Agent
from pydantic_ai.messages import (
    ModelMessage,
    ModelRequest,
    ModelResponse,
    TextPart,
    UserPromptPart,
)


def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
    """Remove all ModelResponse messages, keeping only ModelRequest messages."""
    return [msg for msg in messages if isinstance(msg, ModelRequest)]

# Create agent with history processor
agent = Agent('openai:gpt-4o', history_processors=[filter_responses])

# Example: Create some conversation history
message_history = [
    ModelRequest(parts=[UserPromptPart(content='What is 2+2?')]),
    ModelResponse(parts=[TextPart(content='2+2 equals 4')]),  # This will be filtered out
]

# When you run the agent, the history processor will filter out ModelResponse messages
# result = agent.run_sync('What about 3+3?', message_history=message_history)

Keep Only Recent Messages

You can use a history_processor to keep only the most recent messages:

keep_recent_messages.py
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage


async def keep_recent_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
    """Keep only the last 5 messages to manage token usage."""
    return messages[-5:] if len(messages) > 5 else messages

agent = Agent('openai:gpt-4o', history_processors=[keep_recent_messages])

# Example: Even with a long conversation history, only the last 5 messages are sent to the model
long_conversation_history: list[ModelMessage] = []  # Your long conversation history here
# result = agent.run_sync('What did we discuss?', message_history=long_conversation_history)

Be careful when slicing the message history

When slicing the message history, you need to make sure tool calls and tool returns stay paired, otherwise the LLM may return an error. For more details, see this GitHub issue.

RunContext parameter

History processors can optionally accept a RunContext parameter to access additional information about the current run, such as dependencies, model information, and usage statistics.

context_aware_processor.py
from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import ModelMessage


def context_aware_processor(
    ctx: RunContext[None],
    messages: list[ModelMessage],
) -> list[ModelMessage]:
    # Access current usage
    current_tokens = ctx.usage.total_tokens

    # Filter messages based on context
    if current_tokens > 1000:
        return messages[-3:]  # Keep only recent messages when token usage is high
    return messages

agent = Agent('openai:gpt-4o', history_processors=[context_aware_processor])

This allows for more sophisticated message processing based on the current state of the agent run.

Summarize Old Messages

Use an LLM to summarize older messages to preserve context while reducing token usage.

summarize_old_messages.py
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage

# Use a cheaper model to summarize old messages.
summarize_agent = Agent(
    'openai:gpt-4o-mini',
    instructions="""
Summarize this conversation, omitting small talk and unrelated topics.
Focus on the technical discussion and next steps.
""",
)


async def summarize_old_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
    # Summarize the oldest 10 messages
    if len(messages) > 10:
        oldest_messages = messages[:10]
        summary = await summarize_agent.run(message_history=oldest_messages)
        # Return the last message and the summary
        return summary.new_messages() + messages[-1:]

    return messages


agent = Agent('openai:gpt-4o', history_processors=[summarize_old_messages])

Be careful when summarizing the message history

When summarizing the message history, you need to make sure tool calls and tool returns stay paired, otherwise the LLM may return an error. For more details, see this GitHub issue, where you can also find an example of summarizing the message history.

Testing History Processors

You can test which messages are actually sent to the model provider using FunctionModel.

test_history_processor.py
import pytest

from pydantic_ai import Agent
from pydantic_ai.messages import (
    ModelMessage,
    ModelRequest,
    ModelResponse,
    TextPart,
    UserPromptPart,
)
from pydantic_ai.models.function import AgentInfo, FunctionModel


@pytest.fixture
def received_messages() -> list[ModelMessage]:
    return []


@pytest.fixture
def function_model(received_messages: list[ModelMessage]) -> FunctionModel:
    def capture_model_function(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
        # Capture the messages that the provider actually receives
        received_messages.clear()
        received_messages.extend(messages)
        return ModelResponse(parts=[TextPart(content='Provider response')])

    return FunctionModel(capture_model_function)


def test_history_processor(function_model: FunctionModel, received_messages: list[ModelMessage]):
    def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
        return [msg for msg in messages if isinstance(msg, ModelRequest)]

    agent = Agent(function_model, history_processors=[filter_responses])

    message_history = [
        ModelRequest(parts=[UserPromptPart(content='Question 1')]),
        ModelResponse(parts=[TextPart(content='Answer 1')]),
    ]

    agent.run_sync('Question 2', message_history=message_history)
    assert received_messages == [
        ModelRequest(parts=[UserPromptPart(content='Question 1')]),
        ModelRequest(parts=[UserPromptPart(content='Question 2')]),
    ]

Multiple Processors

You can also use multiple processors:

multiple_history_processors.py
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelRequest


def filter_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
    return [msg for msg in messages if isinstance(msg, ModelRequest)]


def summarize_old_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
    return messages[-5:]


agent = Agent('openai:gpt-4o', history_processors=[filter_responses, summarize_old_messages])

In this case, the filter_responses processor is applied first, then the summarize_old_messages processor.

Examples

For a more complete example of using messages in conversations, see the chat app example.