跳到内容

多智能体应用

使用 PydanticAI 构建应用程序时,大致有四个复杂程度级别

  1. 单智能体工作流程 — 大部分 pydantic_ai 文档涵盖的内容
  2. 智能体委托 — 智能体通过工具使用另一个智能体
  3. 程序化智能体交接 — 一个智能体运行,然后应用程序代码调用另一个智能体
  4. 基于图的控制流 — 对于最复杂的情况,可以使用基于图的状态机来控制多个智能体的执行

当然,您可以在单个应用程序中组合多种策略。

智能体委托

“智能体委托”指的是一个智能体将工作委托给另一个智能体,然后在委托智能体(从工具内部调用的智能体)完成时取回控制权的情况。

由于智能体是无状态的并且被设计为全局的,因此您不需要将智能体本身包含在智能体依赖项中。

您通常需要将 ctx.usage 传递给委托智能体运行的 usage 关键字参数,以便该运行中的使用量计入父智能体运行的总使用量。

多个模型

智能体委托不需要为每个智能体使用相同的模型。如果您选择在一个运行中使用不同的模型,则无法从运行的最终 result.usage() 计算货币成本,但您仍然可以使用 UsageLimits 来避免意外成本。

agent_delegation_simple.py
from pydantic_ai import Agent, RunContext
from pydantic_ai.usage import UsageLimits

joke_selection_agent = Agent(  # (1)!
    'openai:gpt-4o',
    system_prompt=(
        'Use the `joke_factory` to generate some jokes, then choose the best. '
        'You must return just a single joke.'
    ),
)
joke_generation_agent = Agent(  # (2)!
    'google-gla:gemini-1.5-flash', result_type=list[str]
)


@joke_selection_agent.tool
async def joke_factory(ctx: RunContext[None], count: int) -> list[str]:
    r = await joke_generation_agent.run(  # (3)!
        f'Please generate {count} jokes.',
        usage=ctx.usage,  # (4)!
    )
    return r.data  # (5)!


result = joke_selection_agent.run_sync(
    'Tell me a joke.',
    usage_limits=UsageLimits(request_limit=5, total_tokens_limit=300),
)
print(result.data)
#> Did you hear about the toothpaste scandal? They called it Colgate.
print(result.usage())
"""
Usage(
    requests=3, request_tokens=204, response_tokens=24, total_tokens=228, details=None
)
"""
  1. “父”或控制智能体。
  2. “委托”智能体,从父智能体的工具内部调用。
  3. 从父智能体的工具内部调用委托智能体。
  4. 将父智能体的使用量传递给委托智能体,以便最终的 result.usage() 包括来自两个智能体的使用量。
  5. 由于函数返回 list[str],并且 joke_generation_agentresult_type 也是 list[str],我们可以简单地从工具返回 r.data

(此示例是完整的,可以直接“按原样”运行)

此示例的控制流程非常简单,可以概括如下

graph TD
  START --> joke_selection_agent
  joke_selection_agent --> joke_factory["joke_factory (tool)"]
  joke_factory --> joke_generation_agent
  joke_generation_agent --> joke_factory
  joke_factory --> joke_selection_agent
  joke_selection_agent --> END

智能体委托和依赖项

通常,委托智能体需要具有与调用智能体相同的依赖项,或者依赖项是调用智能体依赖项的子集。

初始化依赖项

我们上面说“通常”,因为没有什么可以阻止您在工具调用中初始化依赖项,因此在父智能体上不可用的委托智能体中使用相互依赖性,这通常应该避免,因为它可能比重用来自父智能体的连接等慢得多。

agent_delegation_deps.py
from dataclasses import dataclass

import httpx

from pydantic_ai import Agent, RunContext


@dataclass
class ClientAndKey:  # (1)!
    http_client: httpx.AsyncClient
    api_key: str


joke_selection_agent = Agent(
    'openai:gpt-4o',
    deps_type=ClientAndKey,  # (2)!
    system_prompt=(
        'Use the `joke_factory` tool to generate some jokes on the given subject, '
        'then choose the best. You must return just a single joke.'
    ),
)
joke_generation_agent = Agent(
    'gemini-1.5-flash',
    deps_type=ClientAndKey,  # (4)!
    result_type=list[str],
    system_prompt=(
        'Use the "get_jokes" tool to get some jokes on the given subject, '
        'then extract each joke into a list.'
    ),
)


@joke_selection_agent.tool
async def joke_factory(ctx: RunContext[ClientAndKey], count: int) -> list[str]:
    r = await joke_generation_agent.run(
        f'Please generate {count} jokes.',
        deps=ctx.deps,  # (3)!
        usage=ctx.usage,
    )
    return r.data


@joke_generation_agent.tool  # (5)!
async def get_jokes(ctx: RunContext[ClientAndKey], count: int) -> str:
    response = await ctx.deps.http_client.get(
        'https://example.com',
        params={'count': count},
        headers={'Authorization': f'Bearer {ctx.deps.api_key}'},
    )
    response.raise_for_status()
    return response.text


async def main():
    async with httpx.AsyncClient() as client:
        deps = ClientAndKey(client, 'foobar')
        result = await joke_selection_agent.run('Tell me a joke.', deps=deps)
        print(result.data)
        #> Did you hear about the toothpaste scandal? They called it Colgate.
        print(result.usage())  # (6)!
        """
        Usage(
            requests=4,
            request_tokens=309,
            response_tokens=32,
            total_tokens=341,
            details=None,
        )
        """
  1. 定义一个数据类来保存客户端和 API 密钥依赖项。
  2. 设置调用智能体的 deps_type — 此处为 joke_selection_agent
  3. 在工具调用中将依赖项传递给委托智能体的 run 方法。
  4. 还要设置委托智能体的 deps_type — 此处为 joke_generation_agent
  5. 在委托智能体上定义一个工具,该工具使用依赖项来发出 HTTP 请求。
  6. 使用量现在包括 4 个请求 — 2 个来自调用智能体,2 个来自委托智能体。

(此示例是完整的,可以直接“按原样”运行 — 您需要添加 asyncio.run(main()) 来运行 main

此示例展示了即使是相当简单的智能体委托也可能导致复杂的控制流程

graph TD
  START --> joke_selection_agent
  joke_selection_agent --> joke_factory["joke_factory (tool)"]
  joke_factory --> joke_generation_agent
  joke_generation_agent --> get_jokes["get_jokes (tool)"]
  get_jokes --> http_request["HTTP request"]
  http_request --> get_jokes
  get_jokes --> joke_generation_agent
  joke_generation_agent --> joke_factory
  joke_factory --> joke_selection_agent
  joke_selection_agent --> END

程序化智能体交接

“程序化智能体交接”指的是连续调用多个智能体的场景,其中应用程序代码和/或人工负责决定接下来调用哪个智能体。

在这里,智能体不需要使用相同的 deps。

在这里,我们展示了连续使用的两个智能体,第一个用于查找航班,第二个用于提取用户的座位偏好。

programmatic_handoff.py
from typing import Literal, Union

from pydantic import BaseModel, Field
from rich.prompt import Prompt

from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import ModelMessage
from pydantic_ai.usage import Usage, UsageLimits


class FlightDetails(BaseModel):
    flight_number: str


class Failed(BaseModel):
    """Unable to find a satisfactory choice."""


flight_search_agent = Agent[None, Union[FlightDetails, Failed]](  # (1)!
    'openai:gpt-4o',
    result_type=Union[FlightDetails, Failed],  # type: ignore
    system_prompt=(
        'Use the "flight_search" tool to find a flight '
        'from the given origin to the given destination.'
    ),
)


@flight_search_agent.tool  # (2)!
async def flight_search(
    ctx: RunContext[None], origin: str, destination: str
) -> Union[FlightDetails, None]:
    # in reality, this would call a flight search API or
    # use a browser to scrape a flight search website
    return FlightDetails(flight_number='AK456')


usage_limits = UsageLimits(request_limit=15)  # (3)!


async def find_flight(usage: Usage) -> Union[FlightDetails, None]:  # (4)!
    message_history: Union[list[ModelMessage], None] = None
    for _ in range(3):
        prompt = Prompt.ask(
            'Where would you like to fly from and to?',
        )
        result = await flight_search_agent.run(
            prompt,
            message_history=message_history,
            usage=usage,
            usage_limits=usage_limits,
        )
        if isinstance(result.data, FlightDetails):
            return result.data
        else:
            message_history = result.all_messages(
                result_tool_return_content='Please try again.'
            )


class SeatPreference(BaseModel):
    row: int = Field(ge=1, le=30)
    seat: Literal['A', 'B', 'C', 'D', 'E', 'F']


# This agent is responsible for extracting the user's seat selection
seat_preference_agent = Agent[None, Union[SeatPreference, Failed]](  # (5)!
    'openai:gpt-4o',
    result_type=Union[SeatPreference, Failed],  # type: ignore
    system_prompt=(
        "Extract the user's seat preference. "
        'Seats A and F are window seats. '
        'Row 1 is the front row and has extra leg room. '
        'Rows 14, and 20 also have extra leg room. '
    ),
)


async def find_seat(usage: Usage) -> SeatPreference:  # (6)!
    message_history: Union[list[ModelMessage], None] = None
    while True:
        answer = Prompt.ask('What seat would you like?')

        result = await seat_preference_agent.run(
            answer,
            message_history=message_history,
            usage=usage,
            usage_limits=usage_limits,
        )
        if isinstance(result.data, SeatPreference):
            return result.data
        else:
            print('Could not understand seat preference. Please try again.')
            message_history = result.all_messages()


async def main():  # (7)!
    usage: Usage = Usage()

    opt_flight_details = await find_flight(usage)
    if opt_flight_details is not None:
        print(f'Flight found: {opt_flight_details.flight_number}')
        #> Flight found: AK456
        seat_preference = await find_seat(usage)
        print(f'Seat preference: {seat_preference}')
        #> Seat preference: row=1 seat='A'
  1. 定义第一个智能体,用于查找航班。在我们使用显式类型注解直到 PEP-747 落地之前,请参阅结构化结果。我们使用联合作为结果类型,以便模型可以沟通它是否无法找到令人满意的选择;在内部,联合的每个成员都将注册为单独的工具。
  2. 在智能体上定义一个工具来查找航班。在这个简单的例子中,我们可以不用工具,而只是定义智能体来返回结构化数据,然后搜索航班,但在更复杂的情况下,工具是必要的。
  3. 为整个应用程序定义使用量限制。
  4. 定义一个查找航班的函数,该函数询问用户偏好,然后调用智能体查找航班。
  5. 与上面的 flight_search_agent 一样,我们使用显式类型注解来定义智能体。
  6. 定义一个查找用户座位偏好的函数,该函数询问用户座位偏好,然后调用智能体提取座位偏好。
  7. 现在我们已经将运行每个智能体的逻辑放入单独的函数中,我们的主应用程序变得非常简单。

(此示例是完整的,可以直接“按原样”运行 — 您需要添加 asyncio.run(main()) 来运行 main

此示例的控制流程可以概括如下

graph TB
  START --> ask_user_flight["ask user for flight"]

  subgraph find_flight
    flight_search_agent --> ask_user_flight
    ask_user_flight --> flight_search_agent
  end

  flight_search_agent --> ask_user_seat["ask user for seat"]
  flight_search_agent --> END

  subgraph find_seat
    seat_preference_agent --> ask_user_seat
    ask_user_seat --> seat_preference_agent
  end

  seat_preference_agent --> END

Pydantic 图

请参阅关于何时以及如何使用图的文档。

示例

以下示例演示如何在 PydanticAI 中使用依赖项