pydantic_ai.retries

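Retry utilities based on tenacity, especially for HTTP requests.

This module provides HTTP transport wrappers and wait strategies that integrate with the tenacity library to add retry capabilities to HTTP requests. The transports can be used with HTTP clients that support custom transports (such as httpx), while the wait strategies can be used with any tenacity retry decorator.

The module includes:
- TenacityTransport: Synchronous HTTP transport with retry capabilities
- AsyncTenacityTransport: Asynchronous HTTP transport with retry capabilities
- wait_retry_after: Wait strategy that respects HTTP Retry-After headers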

RetryConfig

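Bases: TypedDict

The configuration for tenacity-based retrying.

These are precisely the arguments to the tenacity retry decorator, and they are generally used internally by passing them to that decorator via @retry(**config) or similar.

All fields are optional; any field that is not provided falls back to the default of the tenacity.retry decorator.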
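
As a rough illustration of what optional fields mean in practice, the sketch below uses only the standard library. `MiniRetryConfig` and `mini_retry` are hypothetical stand-ins for `RetryConfig` and `tenacity.retry`, not real APIs, and their two options are invented for the example. The point is only that a `total=False` TypedDict may omit any key, and the callee's own defaults apply when it is unpacked with `**config`.

```python
from typing import Any, Callable, TypedDict


class MiniRetryConfig(TypedDict, total=False):
    """Hypothetical stand-in for RetryConfig: any key may be left out."""

    max_attempts: int
    reraise: bool


def mini_retry(max_attempts: int = 1, reraise: bool = False) -> Callable[[Callable[[], Any]], Callable[[], Any]]:
    """Hypothetical stand-in for tenacity.retry with its own defaults."""

    def decorator(func: Callable[[], Any]) -> Callable[[], Any]:
        def wrapper() -> Any:
            last_exc = None
            for _ in range(max_attempts):
                try:
                    return func()
                except Exception as exc:
                    last_exc = exc
            if reraise and last_exc is not None:
                # Surface the original error unchanged.
                raise last_exc
            # Otherwise wrap it, loosely mirroring the RetryError behaviour.
            raise RuntimeError('retries exhausted') from last_exc

        return wrapper

    return decorator


# 'reraise' is omitted, so mini_retry's default (False) is used.
config: MiniRetryConfig = {'max_attempts': 3}
calls = []


@mini_retry(**config)
def flaky() -> int:
    calls.append(1)
    if len(calls) < 3:
        raise ValueError('transient failure')
    return 42


result = flaky()
print(result, len(calls))  # 42 3
```

With the real `RetryConfig`, the same unpacking happens inside the transports via `@retry(**self.config)`, as the source listings on this page show.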

Source code in pydantic_ai_slim/pydantic_ai/retries.py
class RetryConfig(TypedDict, total=False):
    """The configuration for tenacity-based retrying.

    These are precisely the arguments to the tenacity `retry` decorator, and they are generally
    used internally by passing them to that decorator via `@retry(**config)` or similar.

    All fields are optional, and if not provided, the default values from the `tenacity.retry` decorator will be used.
    """

    sleep: Callable[[int | float], None | Awaitable[None]]
    """A sleep strategy to use for sleeping between retries.

    Tenacity's default for this argument is `tenacity.nap.sleep`."""

    stop: StopBaseT
    """
    A stop strategy to determine when to stop retrying.

    Tenacity's default for this argument is `tenacity.stop.stop_never`."""

    wait: WaitBaseT
    """
    A wait strategy to determine how long to wait between retries.

    Tenacity's default for this argument is `tenacity.wait.wait_none`."""

    retry: SyncRetryBaseT | RetryBaseT
    """A retry strategy to determine which exceptions should trigger a retry.

    Tenacity's default for this argument is `tenacity.retry.retry_if_exception_type()`."""

    before: Callable[[RetryCallState], None | Awaitable[None]]
    """
    A callable that is called before each retry attempt.

    Tenacity's default for this argument is `tenacity.before.before_nothing`."""

    after: Callable[[RetryCallState], None | Awaitable[None]]
    """
    A callable that is called after each retry attempt.

    Tenacity's default for this argument is `tenacity.after.after_nothing`."""

    before_sleep: Callable[[RetryCallState], None | Awaitable[None]] | None
    """
    An optional callable that is called before sleeping between retries.

    Tenacity's default for this argument is `None`."""

    reraise: bool
    """Whether to reraise the last exception if the retry attempts are exhausted, or raise a RetryError instead.

    Tenacity's default for this argument is `False`."""

    retry_error_cls: type[RetryError]
    """The exception class to raise when the retry attempts are exhausted and `reraise` is False.

    Tenacity's default for this argument is `tenacity.RetryError`."""

    retry_error_callback: Callable[[RetryCallState], Any | Awaitable[Any]] | None
    """An optional callable that is called when the retry attempts are exhausted and `reraise` is False.

    Tenacity's default for this argument is `None`."""

sleep instance-attribute

sleep: Callable[[int | float], None | Awaitable[None]]

A sleep strategy to use for sleeping between retries.

Tenacity's default for this argument is tenacity.nap.sleep.

stop instance-attribute

stop: StopBaseT

A stop strategy to determine when to stop retrying.

Tenacity's default for this argument is tenacity.stop.stop_never.

wait instance-attribute

wait: WaitBaseT

A wait strategy to determine how long to wait between retries.

Tenacity's default for this argument is tenacity.wait.wait_none.

retry instance-attribute

retry: SyncRetryBaseT | RetryBaseT

A retry strategy to determine which exceptions should trigger a retry.

Tenacity's default for this argument is tenacity.retry.retry_if_exception_type().

before instance-attribute

before: Callable[[RetryCallState], None | Awaitable[None]]

A callable that is called before each retry attempt.

Tenacity's default for this argument is tenacity.before.before_nothing.

after instance-attribute

after: Callable[[RetryCallState], None | Awaitable[None]]

A callable that is called after each retry attempt.

Tenacity's default for this argument is tenacity.after.after_nothing.

before_sleep instance-attribute

before_sleep: (
    Callable[[RetryCallState], None | Awaitable[None]]
    | None
)

An optional callable that is called before sleeping between retries.

Tenacity's default for this argument is None.

reraise instance-attribute

reraise: bool

Whether to reraise the last exception if the retry attempts are exhausted, or raise a RetryError instead.

Tenacity's default for this argument is False.

retry_error_cls instance-attribute

retry_error_cls: type[RetryError]

The exception class to raise when the retry attempts are exhausted and reraise is False.

Tenacity's default for this argument is tenacity.RetryError.

retry_error_callback instance-attribute

retry_error_callback: (
    Callable[[RetryCallState], Any | Awaitable[Any]] | None
)

An optional callable that is called when the retry attempts are exhausted and reraise is False.

Tenacity's default for this argument is None.

TenacityTransport

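Bases: BaseTransport

Synchronous HTTP transport with tenacity-based retry functionality.

This transport wraps another BaseTransport and adds retry capabilities using the tenacity library. It can be configured to retry requests based on various conditions such as specific exception types, response status codes, or custom validation logic.

The transport works by intercepting HTTP requests and responses, allowing the tenacity controller to determine when and how to retry failed requests. The validate_response function can be used to convert HTTP responses into exceptions that trigger retries.

Parameters

Name Type Description Default
wrapped BaseTransport | None

The underlying transport to wrap and add retry functionality to. If None, a default HTTPTransport is used.

None
config RetryConfig

The arguments to use for the tenacity retry decorator, including retry conditions, wait strategy, stop conditions, etc. See the tenacity docs for more info.

required
validate_response Callable[[Response], Any] | None

Optional callable that takes a Response and can raise an exception to be handled by the controller if the response should trigger a retry. A common use case is to raise exceptions for certain HTTP status codes. If None, no response validation is performed.

None
Example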
from httpx import Client, HTTPStatusError, HTTPTransport
from tenacity import retry_if_exception_type, stop_after_attempt

from pydantic_ai.retries import RetryConfig, TenacityTransport, wait_retry_after

transport = TenacityTransport(
    RetryConfig(
        retry=retry_if_exception_type(HTTPStatusError),
        wait=wait_retry_after(max_wait=300),
        stop=stop_after_attempt(5),
        reraise=True
    ),
    HTTPTransport(),
    validate_response=lambda r: r.raise_for_status()
)
client = Client(transport=transport)
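
To make the role of validate_response more concrete, here is a minimal standard-library sketch of the control flow described above. It does not use httpx or tenacity: `FakeResponse`, `StatusError`, `raise_for_5xx`, and `send_with_retries` are hypothetical names invented for the illustration, and the fixed attempt count stands in for a tenacity stop strategy.

```python
from dataclasses import dataclass
from typing import Callable, Optional


class StatusError(Exception):
    """Hypothetical error raised by the validator for retryable responses."""


@dataclass
class FakeResponse:
    """Hypothetical stand-in for an HTTP response."""

    status_code: int
    closed: bool = False

    def close(self) -> None:
        self.closed = True


def raise_for_5xx(response: FakeResponse) -> None:
    # The validator turns an unwanted response into an exception.
    if response.status_code >= 500:
        raise StatusError(f'server error {response.status_code}')


def send_with_retries(
    send: Callable[[], FakeResponse],
    validate_response: Optional[Callable[[FakeResponse], None]] = None,
    max_attempts: int = 3,
) -> FakeResponse:
    """Call `send` until validation passes or the attempts run out."""
    if max_attempts < 1:
        raise ValueError('max_attempts must be at least 1')
    for attempt in range(1, max_attempts + 1):
        response = send()
        if validate_response is None:
            return response
        try:
            validate_response(response)
        except StatusError:
            # Release the failed response before retrying or giving up.
            response.close()
            if attempt == max_attempts:
                raise
        else:
            return response
    raise AssertionError('unreachable')


# Simulated server: two 503 responses, then a 200.
responses = [FakeResponse(503), FakeResponse(503), FakeResponse(200)]
queue = iter(responses)
final = send_with_retries(lambda: next(queue), validate_response=raise_for_5xx)
print(final.status_code, [r.closed for r in responses])  # 200 [True, True, False]
```

Closing each rejected response before the next attempt mirrors what the source listing below does with `response.close()` when the validator raises.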
Source code in pydantic_ai_slim/pydantic_ai/retries.py
class TenacityTransport(BaseTransport):
    """Synchronous HTTP transport with tenacity-based retry functionality.

    This transport wraps another BaseTransport and adds retry capabilities using the tenacity library.
    It can be configured to retry requests based on various conditions such as specific exception types,
    response status codes, or custom validation logic.

    The transport works by intercepting HTTP requests and responses, allowing the tenacity controller
    to determine when and how to retry failed requests. The validate_response function can be used
    to convert HTTP responses into exceptions that trigger retries.

    Args:
        wrapped: The underlying transport to wrap and add retry functionality to.
        config: The arguments to use for the tenacity `retry` decorator, including retry conditions,
            wait strategy, stop conditions, etc. See the tenacity docs for more info.
        validate_response: Optional callable that takes a Response and can raise an exception
            to be handled by the controller if the response should trigger a retry.
            Common use case is to raise exceptions for certain HTTP status codes.
            If None, no response validation is performed.

    Example:
        ```python
        from httpx import Client, HTTPStatusError, HTTPTransport
        from tenacity import retry_if_exception_type, stop_after_attempt

        from pydantic_ai.retries import RetryConfig, TenacityTransport, wait_retry_after

        transport = TenacityTransport(
            RetryConfig(
                retry=retry_if_exception_type(HTTPStatusError),
                wait=wait_retry_after(max_wait=300),
                stop=stop_after_attempt(5),
                reraise=True
            ),
            HTTPTransport(),
            validate_response=lambda r: r.raise_for_status()
        )
        client = Client(transport=transport)
        ```
    """

    def __init__(
        self,
        config: RetryConfig,
        wrapped: BaseTransport | None = None,
        validate_response: Callable[[Response], Any] | None = None,
    ):
        self.config = config
        self.wrapped = wrapped or HTTPTransport()
        self.validate_response = validate_response

    def handle_request(self, request: Request) -> Response:
        """Handle an HTTP request with retry logic.

        Args:
            request: The HTTP request to handle.

        Returns:
            The HTTP response.

        Raises:
            RuntimeError: If the retry controller did not make any attempts.
            Exception: Any exception raised by the wrapped transport or validation function.
        """

        @retry(**self.config)
        def handle_request(req: Request) -> Response:
            response = self.wrapped.handle_request(req)

            # this is normally set by httpx _after_ calling this function, but we want the request in the validator:
            response.request = req

            if self.validate_response:
                try:
                    self.validate_response(response)
                except Exception:
                    response.close()
                    raise
            return response

        return handle_request(request)

    def __enter__(self) -> TenacityTransport:
        self.wrapped.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        self.wrapped.__exit__(exc_type, exc_value, traceback)

    def close(self) -> None:
        self.wrapped.close()  # pragma: no cover

handle_request

handle_request(request: Request) -> Response

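Handle an HTTP request with retry logic.

Parameters

Name Type Description Default
request Request

The HTTP request to handle.

required

Returns

Type Description
Response

The HTTP response.

Raises

Type Description
RuntimeError

If the retry controller did not make any attempts.

Exception

Any exception raised by the wrapped transport or validation function.

Source code in pydantic_ai_slim/pydantic_ai/retries.py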
def handle_request(self, request: Request) -> Response:
    """Handle an HTTP request with retry logic.

    Args:
        request: The HTTP request to handle.

    Returns:
        The HTTP response.

    Raises:
        RuntimeError: If the retry controller did not make any attempts.
        Exception: Any exception raised by the wrapped transport or validation function.
    """

    @retry(**self.config)
    def handle_request(req: Request) -> Response:
        response = self.wrapped.handle_request(req)

        # this is normally set by httpx _after_ calling this function, but we want the request in the validator:
        response.request = req

        if self.validate_response:
            try:
                self.validate_response(response)
            except Exception:
                response.close()
                raise
        return response

    return handle_request(request)

AsyncTenacityTransport

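Bases: AsyncBaseTransport

Asynchronous HTTP transport with tenacity-based retry functionality.

This transport wraps another AsyncBaseTransport and adds retry capabilities using the tenacity library. It can be configured to retry requests based on various conditions such as specific exception types, response status codes, or custom validation logic.

The transport works by intercepting HTTP requests and responses, allowing the tenacity controller to determine when and how to retry failed requests. The validate_response function can be used to convert HTTP responses into exceptions that trigger retries.

Parameters

Name Type Description Default
wrapped AsyncBaseTransport | None

The underlying async transport to wrap and add retry functionality to. If None, a default AsyncHTTPTransport is used.

None
config RetryConfig

The arguments to use for the tenacity retry decorator, including retry conditions, wait strategy, stop conditions, etc. See the tenacity docs for more info.

required
validate_response Callable[[Response], Any] | None

Optional callable that takes a Response and can raise an exception to be handled by the controller if the response should trigger a retry. A common use case is to raise exceptions for certain HTTP status codes. If None, no response validation is performed.

None
Example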
from httpx import AsyncClient, HTTPStatusError
from tenacity import retry_if_exception_type, stop_after_attempt

from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig, wait_retry_after

transport = AsyncTenacityTransport(
    RetryConfig(
        retry=retry_if_exception_type(HTTPStatusError),
        wait=wait_retry_after(max_wait=300),
        stop=stop_after_attempt(5),
        reraise=True
    ),
    validate_response=lambda r: r.raise_for_status()
)
client = AsyncClient(transport=transport)
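
The asynchronous variant differs mainly in what gets awaited: the wrapped transport call, the pause between attempts, and the cleanup of a rejected response. The sketch below illustrates that with asyncio alone. `FakeAsyncResponse`, `fake_send`, and `send_with_retries` are hypothetical names, and the fixed attempt count and delay are assumptions standing in for tenacity's stop and wait strategies.

```python
import asyncio
from typing import Awaitable, Callable


class FakeAsyncResponse:
    """Hypothetical stand-in for an async HTTP response."""

    def __init__(self, status_code: int) -> None:
        self.status_code = status_code
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def send_with_retries(
    send: Callable[[], Awaitable[FakeAsyncResponse]],
    max_attempts: int = 3,
    delay: float = 0.0,
) -> FakeAsyncResponse:
    """Await `send` until a non-5xx response arrives or the attempts run out."""
    if max_attempts < 1:
        raise ValueError('max_attempts must be at least 1')
    for attempt in range(1, max_attempts + 1):
        response = await send()
        if response.status_code < 500:
            return response
        # Close the rejected response asynchronously before the next attempt.
        await response.aclose()
        if attempt == max_attempts:
            raise RuntimeError(f'giving up after {attempt} attempts')
        # Yield to the event loop instead of blocking the thread.
        await asyncio.sleep(delay)
    raise AssertionError('unreachable')


# Simulated server: one 503 response, then a 200.
responses = [FakeAsyncResponse(503), FakeAsyncResponse(200)]
pending = list(responses)


async def fake_send() -> FakeAsyncResponse:
    return pending.pop(0)


final = asyncio.run(send_with_retries(fake_send))
print(final.status_code, [r.closed for r in responses])  # 200 [True, False]
```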
Source code in pydantic_ai_slim/pydantic_ai/retries.py
class AsyncTenacityTransport(AsyncBaseTransport):
    """Asynchronous HTTP transport with tenacity-based retry functionality.

    This transport wraps another AsyncBaseTransport and adds retry capabilities using the tenacity library.
    It can be configured to retry requests based on various conditions such as specific exception types,
    response status codes, or custom validation logic.

    The transport works by intercepting HTTP requests and responses, allowing the tenacity controller
    to determine when and how to retry failed requests. The validate_response function can be used
    to convert HTTP responses into exceptions that trigger retries.

    Args:
        wrapped: The underlying async transport to wrap and add retry functionality to.
        config: The arguments to use for the tenacity `retry` decorator, including retry conditions,
            wait strategy, stop conditions, etc. See the tenacity docs for more info.
        validate_response: Optional callable that takes a Response and can raise an exception
            to be handled by the controller if the response should trigger a retry.
            Common use case is to raise exceptions for certain HTTP status codes.
            If None, no response validation is performed.

    Example:
        ```python
        from httpx import AsyncClient, HTTPStatusError
        from tenacity import retry_if_exception_type, stop_after_attempt

        from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig, wait_retry_after

        transport = AsyncTenacityTransport(
            RetryConfig(
                retry=retry_if_exception_type(HTTPStatusError),
                wait=wait_retry_after(max_wait=300),
                stop=stop_after_attempt(5),
                reraise=True
            ),
            validate_response=lambda r: r.raise_for_status()
        )
        client = AsyncClient(transport=transport)
        ```
    """

    def __init__(
        self,
        config: RetryConfig,
        wrapped: AsyncBaseTransport | None = None,
        validate_response: Callable[[Response], Any] | None = None,
    ):
        self.config = config
        self.wrapped = wrapped or AsyncHTTPTransport()
        self.validate_response = validate_response

    async def handle_async_request(self, request: Request) -> Response:
        """Handle an async HTTP request with retry logic.

        Args:
            request: The HTTP request to handle.

        Returns:
            The HTTP response.

        Raises:
            RuntimeError: If the retry controller did not make any attempts.
            Exception: Any exception raised by the wrapped transport or validation function.
        """

        @retry(**self.config)
        async def handle_async_request(req: Request) -> Response:
            response = await self.wrapped.handle_async_request(req)

            # this is normally set by httpx _after_ calling this function, but we want the request in the validator:
            response.request = req

            if self.validate_response:
                try:
                    self.validate_response(response)
                except Exception:
                    await response.aclose()
                    raise
            return response

        return await handle_async_request(request)

    async def __aenter__(self) -> AsyncTenacityTransport:
        await self.wrapped.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        await self.wrapped.__aexit__(exc_type, exc_value, traceback)

    async def aclose(self) -> None:
        await self.wrapped.aclose()

handle_async_request async

handle_async_request(request: Request) -> Response

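Handle an async HTTP request with retry logic.

Parameters

Name Type Description Default
request Request

The HTTP request to handle.

required

Returns

Type Description
Response

The HTTP response.

Raises

Type Description
RuntimeError

If the retry controller did not make any attempts.

Exception

Any exception raised by the wrapped transport or validation function.

Source code in pydantic_ai_slim/pydantic_ai/retries.py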
async def handle_async_request(self, request: Request) -> Response:
    """Handle an async HTTP request with retry logic.

    Args:
        request: The HTTP request to handle.

    Returns:
        The HTTP response.

    Raises:
        RuntimeError: If the retry controller did not make any attempts.
        Exception: Any exception raised by the wrapped transport or validation function.
    """

    @retry(**self.config)
    async def handle_async_request(req: Request) -> Response:
        response = await self.wrapped.handle_async_request(req)

        # this is normally set by httpx _after_ calling this function, but we want the request in the validator:
        response.request = req

        if self.validate_response:
            try:
                self.validate_response(response)
            except Exception:
                await response.aclose()
                raise
        return response

    return await handle_async_request(request)

wait_retry_after

wait_retry_after(
    fallback_strategy: (
        Callable[[RetryCallState], float] | None
    ) = None,
    max_wait: float = 300,
) -> Callable[[RetryCallState], float]

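Create a tenacity-compatible wait strategy that respects HTTP Retry-After headers.

This wait strategy checks whether the exception from the last attempt is an HTTPStatusError whose response carries a Retry-After header, and if so, waits for the time specified in the header. If the header is absent, cannot be parsed, or names a date that has already passed, it falls back to the provided strategy.

The Retry-After header can be in two formats:
- An integer representing seconds to wait
- An HTTP date string representing when to retry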
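
The two header formats can be handled with the standard library alone. The following is a minimal sketch under stated assumptions, not the module's implementation: `parse_retry_after` is a hypothetical helper, it takes `now` as an argument so the result is reproducible, and it returns None where the real strategy would hand over to its fallback.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: str, now: datetime, max_wait: float = 300.0) -> Optional[float]:
    """Return the capped wait in seconds, or None if the value is unusable."""
    try:
        # Format 1: a delay in whole seconds, e.g. "120".
        return min(float(int(value)), max_wait)
    except ValueError:
        pass
    try:
        # Format 2: an HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
        retry_time = parsedate_to_datetime(value)
    except (ValueError, TypeError):
        # Older Python versions raise TypeError for unparseable input.
        return None
    if retry_time.tzinfo is None:
        # Assumption: treat a date without an offset as UTC.
        retry_time = retry_time.replace(tzinfo=timezone.utc)
    wait_seconds = (retry_time - now).total_seconds()
    if wait_seconds <= 0:
        # The date has already passed, so there is nothing to wait for.
        return None
    return min(wait_seconds, max_wait)


now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after('120', now))                            # 120.0
print(parse_retry_after('900', now))                            # 300.0 (capped)
print(parse_retry_after('Wed, 21 Oct 2015 07:28:00 GMT', now))  # 60.0
print(parse_retry_after('not-a-date', now))                     # None
```

Trying the integer form first keeps the common case cheap; date parsing only runs when the value is not a plain number.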

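Parameters

Name Type Description Default
fallback_strategy Callable[[RetryCallState], float] | None

Wait strategy to use when no Retry-After header is present or parsing fails. Defaults to exponential backoff capped at 60 seconds.

None
max_wait float

Maximum time to wait in seconds, regardless of the header value. Defaults to 300 (5 minutes). The cap applies to waits derived from the header; the fallback strategy's result is returned as-is.

300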
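
For a feel of what the default fallback produces, the sketch below tabulates an exponential backoff schedule. It assumes the delay for attempt n is multiplier * 2 ** (n - 1), capped at the maximum, which is how recent tenacity releases appear to compute wait_exponential; check the installed version before relying on the exact numbers. `exponential_wait` is a hypothetical helper, not part of tenacity or this module.

```python
def exponential_wait(attempt_number: int, multiplier: float = 1.0, max_wait: float = 60.0) -> float:
    """Assumed formula: multiplier * 2 ** (attempt_number - 1), capped at max_wait."""
    return min(multiplier * 2 ** (attempt_number - 1), max_wait)


# Waits after attempts 1 through 8 with multiplier=1 and a 60 second cap.
schedule = [exponential_wait(n) for n in range(1, 9)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Under this assumption the cap is reached on the seventh attempt, so a stop strategy such as stop_after_attempt(5) would end retries well before the 60 second ceiling matters.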

Returns

Type Description
Callable[[RetryCallState], float]

A wait function that can be used with tenacity retry decorators.

Example
from httpx import AsyncClient, HTTPStatusError
from tenacity import retry_if_exception_type, stop_after_attempt

from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig, wait_retry_after

transport = AsyncTenacityTransport(
    RetryConfig(
        retry=retry_if_exception_type(HTTPStatusError),
        wait=wait_retry_after(max_wait=120),
        stop=stop_after_attempt(5),
        reraise=True
    ),
    validate_response=lambda r: r.raise_for_status()
)
client = AsyncClient(transport=transport)
Source code in pydantic_ai_slim/pydantic_ai/retries.py
def wait_retry_after(
    fallback_strategy: Callable[[RetryCallState], float] | None = None, max_wait: float = 300
) -> Callable[[RetryCallState], float]:
    """Create a tenacity-compatible wait strategy that respects HTTP Retry-After headers.

    This wait strategy checks if the exception contains an HTTPStatusError with a
    Retry-After header, and if so, waits for the time specified in the header.
    If no header is present or parsing fails, it falls back to the provided strategy.

    The Retry-After header can be in two formats:
    - An integer representing seconds to wait
    - An HTTP date string representing when to retry

    Args:
        fallback_strategy: Wait strategy to use when no Retry-After header is present
                          or parsing fails. Defaults to exponential backoff with max 60s.
        max_wait: Maximum time to wait in seconds, regardless of header value.
                 Defaults to 300 (5 minutes).

    Returns:
        A wait function that can be used with tenacity retry decorators.

    Example:
        ```python
        from httpx import AsyncClient, HTTPStatusError
        from tenacity import retry_if_exception_type, stop_after_attempt

        from pydantic_ai.retries import AsyncTenacityTransport, RetryConfig, wait_retry_after

        transport = AsyncTenacityTransport(
            RetryConfig(
                retry=retry_if_exception_type(HTTPStatusError),
                wait=wait_retry_after(max_wait=120),
                stop=stop_after_attempt(5),
                reraise=True
            ),
            validate_response=lambda r: r.raise_for_status()
        )
        client = AsyncClient(transport=transport)
        ```
    """
    if fallback_strategy is None:
        fallback_strategy = wait_exponential(multiplier=1, max=60)

    def wait_func(state: RetryCallState) -> float:
        exc = state.outcome.exception() if state.outcome else None
        if isinstance(exc, HTTPStatusError):
            retry_after = exc.response.headers.get('retry-after')
            if retry_after:
                try:
                    # Try parsing as seconds first
                    wait_seconds = int(retry_after)
                    return min(float(wait_seconds), max_wait)
                except ValueError:
                    # Try parsing as HTTP date
                    try:
                        retry_time = cast(datetime, parsedate_to_datetime(retry_after))
                        assert isinstance(retry_time, datetime)
                        now = datetime.now(timezone.utc)
                        wait_seconds = (retry_time - now).total_seconds()

                        if wait_seconds > 0:
                            return min(wait_seconds, max_wait)
                    except (ValueError, TypeError, AssertionError):
                        # If date parsing fails, fall back to fallback strategy
                        pass

        # Use fallback strategy
        return fallback_strategy(state)

    return wait_func