跳转到内容

HTTP 请求重试

Pydantic AI 通过自定义 HTTP 传输为模型提供商发出的 HTTP 请求提供重试功能。这对于处理瞬时故障(如速率限制、网络超时或临时服务器错误)特别有用。

概述

重试功能建立在 tenacity 库之上,并与 httpx 客户端无缝集成。您可以为任何接受自定义 HTTP 客户端的提供商配置重试行为。

安装

要使用重试传输,您需要安装 tenacity,可以通过 retries 依赖组进行安装:

pip install 'pydantic-ai-slim[retries]'
uv add 'pydantic-ai-slim[retries]'

用法示例

以下是一个为 HTTP 请求添加智能重试处理功能的示例:

smart_retry_example.py
from httpx import AsyncClient, HTTPStatusError
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential

from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig, wait_retry_after


def create_retrying_client():
    """Build an AsyncClient whose transport retries transient failures.

    Retries cover HTTPStatusError (raised by the response validator below
    for retryable status codes) and ConnectionError, honouring Retry-After
    headers and falling back to exponential backoff.
    """

    retryable_codes = (429, 502, 503, 504)

    def raise_if_retryable(response):
        """Turn retryable HTTP status codes into HTTPStatusError."""
        if response.status_code in retryable_codes:
            response.raise_for_status()

    retry_config = RetryConfig(
        # Retry on HTTP errors (raised by the validator) and connection issues.
        retry=retry_if_exception_type((HTTPStatusError, ConnectionError)),
        # Prefer the server-supplied Retry-After delay; otherwise use
        # exponential backoff, never waiting longer than max_wait seconds.
        wait=wait_retry_after(
            fallback_strategy=wait_exponential(multiplier=1, max=60),
            max_wait=300,
        ),
        stop=stop_after_attempt(5),  # give up after 5 attempts
        reraise=True,  # surface the last exception if all retries fail
    )
    return AsyncClient(
        transport=AsyncTenacityTransport(
            config=retry_config,
            validate_response=raise_if_retryable,
        )
    )

# Use the retrying client with a model: every HTTP request the provider
# makes now goes through the retrying transport configured above.
client = create_retrying_client()
model = OpenAIChatModel('gpt-4o', provider=OpenAIProvider(http_client=client))
agent = Agent(model)

等待策略

wait_retry_after

wait_retry_after 函数是一个智能等待策略,它会自动遵循 HTTP 的 Retry-After 响应头:

wait_strategy_example.py
from tenacity import wait_exponential

from pydantic_ai.retries import wait_retry_after

# Basic usage - respects Retry-After headers, falls back to tenacity's
# default wait strategy when no fallback is given.
wait_strategy_1 = wait_retry_after()

# Custom configuration: exponential fallback, hard cap on any single wait.
wait_strategy_2 = wait_retry_after(
    fallback_strategy=wait_exponential(multiplier=2, max=120),
    max_wait=600  # Never wait more than 10 minutes
)

此等待策略

  • 自动解析 HTTP 429 响应中的 Retry-After 响应头
  • 支持秒数格式("30")和 HTTP 日期格式("Wed, 21 Oct 2015 07:28:00 GMT")
  • 当响应头不存在时,回退到您选择的策略
  • 遵守 max_wait 限制以防止过度延迟

传输类

AsyncTenacityTransport

对于异步 HTTP 客户端(推荐用于大多数用例)

async_transport_example.py
from httpx import AsyncClient
from tenacity import stop_after_attempt

from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig


def validator(response):
    """Treat responses with HTTP status 4xx/5xx as failures that need to be retried.

    Without a response validator, only network errors and timeouts will
    result in a retry.
    """
    response.raise_for_status()  # raises httpx.HTTPStatusError on 4xx/5xx

# Create the transport: retry up to 3 attempts, then re-raise the last error
transport = AsyncTenacityTransport(
    config=RetryConfig(stop=stop_after_attempt(3), reraise=True),
    validate_response=validator
)

# Create a client using the transport:
client = AsyncClient(transport=transport)

TenacityTransport

对于同步 HTTP 客户端

sync_transport_example.py
from httpx import Client
from tenacity import stop_after_attempt

from pydantic_ai.retries import RetryConfig, TenacityTransport


def validator(response):
    """Treat responses with HTTP status 4xx/5xx as failures that need to be retried.

    Without a response validator, only network errors and timeouts will
    result in a retry.
    """
    response.raise_for_status()  # raises httpx.HTTPStatusError on 4xx/5xx

# Create the transport: retry up to 3 attempts, then re-raise the last error
transport = TenacityTransport(
    config=RetryConfig(stop=stop_after_attempt(3), reraise=True),
    validate_response=validator
)

# Create a client using the transport
client = Client(transport=transport)

常见重试模式

支持 Retry-After 的速率限制处理

rate_limit_handling.py
from httpx import AsyncClient, HTTPStatusError
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential

from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig, wait_retry_after


def create_rate_limit_client():
    """Create a client that respects Retry-After headers from rate limiting responses."""
    retry_config = RetryConfig(
        retry=retry_if_exception_type(HTTPStatusError),
        # Prefer the server-supplied Retry-After delay; otherwise fall back
        # to exponential backoff, waiting at most 5 minutes per attempt.
        wait=wait_retry_after(
            fallback_strategy=wait_exponential(multiplier=1, max=60),
            max_wait=300,
        ),
        stop=stop_after_attempt(10),
        reraise=True,
    )
    transport = AsyncTenacityTransport(
        config=retry_config,
        # raise_for_status turns every 4xx/5xx into HTTPStatusError.
        validate_response=lambda r: r.raise_for_status(),
    )
    return AsyncClient(transport=transport)

# Example usage
client = create_rate_limit_client()
# Client is now ready to use with any HTTP requests and will respect Retry-After headers

wait_retry_after 函数会自动检测 429(速率限制)响应中的 Retry-After 响应头,并等待指定的时间。如果响应头不存在,它会回退到指数退避策略。

网络错误处理

network_error_handling.py
import httpx
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential

from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig


def create_network_resilient_client():
    """Create a client that handles network errors with retries."""
    # Only transport-level failures are retried; HTTP error statuses are not.
    network_errors = (
        httpx.TimeoutException,
        httpx.ConnectError,
        httpx.ReadError,
    )
    return httpx.AsyncClient(
        transport=AsyncTenacityTransport(
            config=RetryConfig(
                retry=retry_if_exception_type(network_errors),
                wait=wait_exponential(multiplier=1, max=10),  # capped backoff
                stop=stop_after_attempt(3),
                reraise=True,  # re-raise the final error after the last attempt
            )
        )
    )

# Example usage
client = create_network_resilient_client()
# Client will now retry on timeout, connection, and read errors

自定义重试逻辑

custom_retry_logic.py
import httpx
from tenacity import stop_after_attempt, wait_exponential

from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig, wait_retry_after


def create_custom_retry_client():
    """Create a client whose retry predicate is a custom callable."""

    def is_retryable(exc):
        """Retry on 5xx responses and network errors, never on 4xx."""
        if isinstance(exc, httpx.HTTPStatusError):
            return 500 <= exc.response.status_code < 600
        return isinstance(exc, httpx.TimeoutException | httpx.ConnectError)

    return httpx.AsyncClient(
        transport=AsyncTenacityTransport(
            config=RetryConfig(
                retry=is_retryable,
                # Honour Retry-After on rate limits; otherwise fall back to
                # custom exponential backoff.
                wait=wait_retry_after(
                    fallback_strategy=wait_exponential(multiplier=2, max=30),
                    max_wait=120,
                ),
                stop=stop_after_attempt(5),
                reraise=True,
            ),
            # Raise HTTPStatusError for 4xx/5xx so the predicate can inspect it.
            validate_response=lambda r: r.raise_for_status(),
        )
    )

client = create_custom_retry_client()
# Client will retry server errors (5xx) and network errors, but not client errors (4xx)

与不同提供商一起使用

重试传输适用于任何接受自定义 HTTP 客户端的提供商

OpenAI

openai_with_retries.py
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider

from smart_retry_example import create_retrying_client

# Reuse the retry-enabled HTTP client for the OpenAI provider.
client = create_retrying_client()
model = OpenAIChatModel('gpt-4o', provider=OpenAIProvider(http_client=client))
agent = Agent(model)

Anthropic

anthropic_with_retries.py
from pydantic_ai import Agent
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.providers.anthropic import AnthropicProvider

from smart_retry_example import create_retrying_client

# Reuse the retry-enabled HTTP client for the Anthropic provider.
client = create_retrying_client()
model = AnthropicModel('claude-3-5-sonnet-20241022', provider=AnthropicProvider(http_client=client))
agent = Agent(model)

任何与 OpenAI 兼容的提供商

openai_compatible_with_retries.py
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider

from smart_retry_example import create_retrying_client

# Reuse the retry-enabled HTTP client for any OpenAI-compatible provider.
client = create_retrying_client()
model = OpenAIChatModel(
    'your-model-name',  # Replace with actual model name
    provider=OpenAIProvider(
        base_url='https://api.example.com/v1',  # Replace with actual API URL
        api_key='your-api-key',  # Replace with actual API key
        http_client=client
    )
)
agent = Agent(model)

最佳实践

  1. 从保守开始:从较少的重试次数(3-5 次)和合理的等待时间开始。

  2. 使用指数退避:这有助于避免在服务中断期间压垮服务器。

  3. 设置最大等待时间:通过合理的最大等待时间防止无限期延迟。

  4. 正确处理速率限制:尽可能遵守 Retry-After 响应头。

  5. 记录重试尝试:添加日志记录以监控生产环境中的重试行为。(如果您对 httpx 进行了埋点,Logfire 将自动捕获这些日志。)

  6. 考虑熔断器:对于高流量应用,可以考虑实现熔断器模式。

错误处理

如果所有重试尝试都失败,重试传输将重新抛出最后一个异常。请确保在您的应用程序中妥善处理这些异常:

error_handling_example.py
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider

from smart_retry_example import create_retrying_client

# If every retry attempt fails, calls through this agent re-raise the last
# exception (reraise=True in the retry config) — handle it at the call site.
client = create_retrying_client()
model = OpenAIChatModel('gpt-4o', provider=OpenAIProvider(http_client=client))
agent = Agent(model)

性能考量

  • 重试会增加请求的延迟,尤其是在使用指数退避时
  • 在配置重试行为时,请考虑应用程序的总超时时间
  • 监控重试率以检测系统性问题
  • 在处理多个请求时,使用异步传输以获得更好的并发性

有关更高级的重试配置,请参阅 tenacity 文档