跳转到内容

pydantic_ai.models.fallback

FallbackModel dataclass

基类:Model

一个在失败时使用一个或多个备用模型的模型。

除了 __init__ 外,所有方法都是私有的或与基类的方法匹配。

源代码位于 pydantic_ai_slim/pydantic_ai/models/fallback.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@dataclass(init=False)
class FallbackModel(Model):
    """A model that uses one or more fallback models upon failure.

    Models are tried in the order they were passed to `__init__`; the response of the
    first model that does not fail is returned.

    Apart from `__init__`, all methods are private or match those of the base class.
    """

    models: list[Model]

    # `__init__` never assigns this field (the public name is computed by the `model_name`
    # property), so it is excluded from the dataclass-generated `__eq__`; comparing two
    # instances would otherwise raise `AttributeError` when reading it.
    _model_name: str = field(repr=False, compare=False)
    _fallback_on: Callable[[Exception], bool]

    def __init__(
        self,
        default_model: Model | KnownModelName | str,
        *fallback_models: Model | KnownModelName | str,
        fallback_on: Callable[[Exception], bool] | tuple[type[Exception], ...] = (ModelHTTPError,),
    ):
        """Initialize a fallback model instance.

        Args:
            default_model: The name or instance of the default model to use.
            fallback_models: The names or instances of the fallback models to use upon failure.
            fallback_on: A callable or tuple of exceptions that should trigger a fallback.
        """
        super().__init__()
        # The default model always comes first, followed by the fallbacks in the given order.
        self.models = [infer_model(default_model), *[infer_model(m) for m in fallback_models]]

        # A tuple is passed through `_default_fallback_condition_factory` to obtain the
        # predicate; a callable is stored unchanged.
        if isinstance(fallback_on, tuple):
            self._fallback_on = _default_fallback_condition_factory(fallback_on)
        else:
            self._fallback_on = fallback_on

    @property
    def model_name(self) -> str:
        """The model name: `fallback:` followed by the comma-joined names of all models."""
        return f'fallback:{",".join(model.model_name for model in self.models)}'

    @property
    def system(self) -> str:
        """The system: `fallback:` followed by the comma-joined systems of all models."""
        return f'fallback:{",".join(model.system for model in self.models)}'

    @property
    def base_url(self) -> str | None:
        """The base URL of the first (default) model."""
        return self.models[0].base_url

    async def request(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> ModelResponse:
        """Try each model in sequence until one succeeds.

        In case of failure, raise a FallbackExceptionGroup with all exceptions.

        Args:
            messages: The messages forwarded to each model.
            model_settings: Settings merged over each model's own settings.
            model_request_parameters: Parameters customized per model before the request.

        Returns:
            The response of the first model whose request did not raise.

        Raises:
            FallbackExceptionGroup: If every model raised an exception accepted by `fallback_on`.
            Exception: Any exception rejected by `fallback_on` is re-raised immediately.
        """
        exceptions: list[Exception] = []

        for model in self.models:
            customized_model_request_parameters = model.customize_request_parameters(model_request_parameters)
            merged_settings = merge_model_settings(model.settings, model_settings)
            try:
                response = await model.request(messages, merged_settings, customized_model_request_parameters)
            except Exception as exc:
                if self._fallback_on(exc):
                    # Remember the failure and move on to the next model.
                    exceptions.append(exc)
                    continue
                raise

            self._set_span_attributes(model)
            return response

        raise FallbackExceptionGroup('All models from FallbackModel failed', exceptions)

    @asynccontextmanager
    async def request_stream(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
        run_context: RunContext[Any] | None = None,
    ) -> AsyncIterator[StreamedResponse]:
        """Try each model in sequence until one succeeds.

        Only failures raised while opening a model's stream trigger a fallback; once a
        stream has been yielded, errors raised by the caller's `async with` body propagate.

        Args:
            messages: The messages forwarded to each model.
            model_settings: Settings merged over each model's own settings.
            model_request_parameters: Parameters customized per model before the request.
            run_context: Optional run context forwarded to each model's `request_stream`.

        Yields:
            The streamed response of the first model whose stream could be opened.

        Raises:
            FallbackExceptionGroup: If every model raised an exception accepted by `fallback_on`.
            Exception: Any exception rejected by `fallback_on` is re-raised immediately.
        """
        exceptions: list[Exception] = []

        for model in self.models:
            customized_model_request_parameters = model.customize_request_parameters(model_request_parameters)
            merged_settings = merge_model_settings(model.settings, model_settings)
            # The exit stack keeps the model's stream context open while the caller consumes it
            # and closes it when this iteration is left.
            async with AsyncExitStack() as stack:
                try:
                    response = await stack.enter_async_context(
                        model.request_stream(
                            messages, merged_settings, customized_model_request_parameters, run_context
                        )
                    )
                except Exception as exc:
                    if self._fallback_on(exc):
                        exceptions.append(exc)
                        continue
                    raise  # pragma: no cover

                self._set_span_attributes(model)
                yield response
                return

        raise FallbackExceptionGroup('All models from FallbackModel failed', exceptions)

    def _set_span_attributes(self, model: Model) -> None:
        """Record the attributes of the model that answered on the current span.

        Best effort: any exception raised here is suppressed. The span is only updated when it
        is recording and its `gen_ai.request.model` attribute equals this fallback model's name.
        """
        with suppress(Exception):
            span = get_current_span()
            if span.is_recording():
                attributes = getattr(span, 'attributes', {})
                if attributes.get('gen_ai.request.model') == self.model_name:  # pragma: no branch
                    span.set_attributes(InstrumentedModel.model_attributes(model))

__init__

__init__(
    default_model: Model | KnownModelName | str,
    *fallback_models: Model | KnownModelName | str,
    fallback_on: (
        Callable[[Exception], bool]
        | tuple[type[Exception], ...]
    ) = (ModelHTTPError,)
)

初始化一个备用模型实例。

参数

名称 类型 描述 默认值
default_model Model | KnownModelName | str

要使用的默认模型的名称或实例。

必需
fallback_models Model | KnownModelName | str

在失败时要使用的备用模型的名称或实例。

()
fallback_on Callable[[Exception], bool] | tuple[type[Exception], ...]

一个可调用对象或一个异常类型元组,用于判断哪些异常应触发切换到下一个备用模型。

(ModelHTTPError,)
源代码位于 pydantic_ai_slim/pydantic_ai/models/fallback.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    default_model: Model | KnownModelName | str,
    *fallback_models: Model | KnownModelName | str,
    fallback_on: Callable[[Exception], bool] | tuple[type[Exception], ...] = (ModelHTTPError,),
):
    """Build a fallback model from a default model and any number of fallbacks.

    Args:
        default_model: The name or instance of the model tried first.
        fallback_models: The names or instances of the models tried afterwards, in order.
        fallback_on: Either a predicate over the raised exception, or a tuple of exception
            types, deciding whether a failure should trigger a fallback.
    """
    super().__init__()

    # Resolve the default model first, then every fallback, preserving the given order.
    candidates = (default_model, *fallback_models)
    self.models = [infer_model(candidate) for candidate in candidates]

    # A tuple goes through `_default_fallback_condition_factory`; a callable is kept as is.
    self._fallback_on = (
        _default_fallback_condition_factory(fallback_on)
        if isinstance(fallback_on, tuple)
        else fallback_on
    )

model_name 属性

model_name: str

模型名称。

request async

request(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
) -> ModelResponse

按顺序尝试每个模型,直到有一个成功为止。

如果全部失败,则抛出一个包含所有异常的 FallbackExceptionGroup。

源代码位于 pydantic_ai_slim/pydantic_ai/models/fallback.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def request(
    self,
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
) -> ModelResponse:
    """Send the request to each model in turn and return the first successful response.

    A failure accepted by `self._fallback_on` is recorded and the next model is tried; any
    other failure is re-raised at once. If no model succeeds, a FallbackExceptionGroup
    carrying every recorded exception is raised.
    """
    failures: list[Exception] = []

    for candidate in self.models:
        request_parameters = candidate.customize_request_parameters(model_request_parameters)
        settings = merge_model_settings(candidate.settings, model_settings)
        try:
            result = await candidate.request(messages, settings, request_parameters)
        except Exception as error:
            if not self._fallback_on(error):
                raise error
            # Accepted failure: keep it for the final report and try the next model.
            failures.append(error)
        else:
            self._set_span_attributes(candidate)
            return result

    raise FallbackExceptionGroup('All models from FallbackModel failed', failures)

request_stream async

request_stream(
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
    run_context: RunContext[Any] | None = None,
) -> AsyncIterator[StreamedResponse]

按顺序尝试每个模型,直到有一个成功为止。如果全部失败,则抛出一个包含所有异常的 FallbackExceptionGroup。

源代码位于 pydantic_ai_slim/pydantic_ai/models/fallback.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@asynccontextmanager
async def request_stream(
    self,
    messages: list[ModelMessage],
    model_settings: ModelSettings | None,
    model_request_parameters: ModelRequestParameters,
    run_context: RunContext[Any] | None = None,
) -> AsyncIterator[StreamedResponse]:
    """Open a stream on each model in turn and yield the first one that opens successfully.

    A failure accepted by `self._fallback_on` while opening a stream is recorded and the next
    model is tried; any other failure is re-raised at once. If no stream can be opened, a
    FallbackExceptionGroup carrying every recorded exception is raised.
    """
    failures: list[Exception] = []

    for candidate in self.models:
        request_parameters = candidate.customize_request_parameters(model_request_parameters)
        settings = merge_model_settings(candidate.settings, model_settings)
        # The exit stack owns the candidate's stream context for the duration of this iteration.
        async with AsyncExitStack() as exit_stack:
            try:
                streamed = await exit_stack.enter_async_context(
                    candidate.request_stream(messages, settings, request_parameters, run_context)
                )
            except Exception as error:
                if not self._fallback_on(error):
                    raise error  # pragma: no cover
                # Accepted failure: leaving the `async with` moves on to the next model.
                failures.append(error)
            else:
                self._set_span_attributes(candidate)
                yield streamed
                return

    raise FallbackExceptionGroup('All models from FallbackModel failed', failures)