pydantic_ai.models.gemini

Custom interface to the generativelanguage.googleapis.com API using HTTPX and Pydantic.

The Google SDK for interacting with the generativelanguage.googleapis.com API, google-generativeai, reads like it was written by a Java developer who thought they knew everything about OOP, spent 30 minutes trying to learn Python, gave up, and decided to build the library to prove how horrible Python is. It also doesn't use httpx for HTTP requests, and tries to implement tool calling itself, without using Pydantic or an equivalent library for validation.

We therefore implement support for the API directly.

Despite these shortcomings, the Gemini models are actually quite powerful and very fast.

Setup

For details on how to set up authentication with this model, see model configuration for Gemini.
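
As a quick orientation before the API reference, here is a minimal usage sketch. It assumes an API key is available to the google-gla provider (typically via a GEMINI_API_KEY environment variable, which is the provider's convention rather than something defined in this module); result.output follows recent Pydantic AI releases (older versions exposed result.data instead):

from pydantic_ai import Agent
from pydantic_ai.models.gemini import GeminiModel

# Assumes the google-gla provider can find an API key, e.g. via GEMINI_API_KEY.
model = GeminiModel('gemini-2.5-flash')
agent = Agent(model)

result = agent.run_sync('Tell me a one-line joke.')
print(result.output)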

LatestGeminiModelNames module-attribute

LatestGeminiModelNames = Literal[
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
    "gemini-2.5-pro",
]

Latest Gemini models.

GeminiModelName module-attribute

GeminiModelName = Union[str, LatestGeminiModelNames]

Possible Gemini model names.

Since Gemini supports a variety of date-stamped models, we explicitly list the latest models but allow any name in the type hints. See the Gemini API docs for a full list.
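
Because the union includes plain str, a date-stamped variant that is not in the Literal still type-checks; for instance (the specific dated name below is illustrative, not taken from this module):

from pydantic_ai.models.gemini import GeminiModel

# 'gemini-2.0-flash-001' is an illustrative date-stamped name; any str is accepted.
model = GeminiModel('gemini-2.0-flash-001')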

GeminiModelSettings

Bases: ModelSettings

Settings used for a Gemini model request.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
class GeminiModelSettings(ModelSettings, total=False):
    """Settings used for a Gemini model request."""

    # ALL FIELDS MUST BE `gemini_` PREFIXED SO YOU CAN MERGE THEM WITH OTHER MODELS.

    gemini_safety_settings: list[GeminiSafetySettings]
    """Safety settings options for Gemini model request."""

    gemini_thinking_config: ThinkingConfig
    """Thinking is "on" by default in both the API and AI Studio.

    Being on by default doesn't mean the model will send back thoughts. For that, you would need to set `include_thoughts`
    to `True`, but since end of January 2025, `thoughts` are not returned anymore, and are only displayed in the Google
    AI Studio. See https://discuss.ai.google.dev/t/thoughts-are-missing-cot-not-included-anymore/63653 for more details.

    If you want to avoid the model spending any tokens on thinking, you can set `thinking_budget` to `0`.

    See more about it on <https://ai.google.dev/gemini-api/docs/thinking>.
    """

    gemini_labels: dict[str, str]
    """User-defined metadata to break down billed charges. Only supported by the Vertex AI provider.

    See the [Gemini API docs](https://cloud.google.com/vertex-ai/generative-ai/docs/multimodal/add-labels-to-api-calls) for use cases and limitations.
    """

gemini_safety_settings instance-attribute

gemini_safety_settings: list[GeminiSafetySettings]

Safety settings options for Gemini model requests.

gemini_thinking_config instance-attribute

gemini_thinking_config: ThinkingConfig

Thinking is "on" by default in both the API and AI Studio.

Being on by default doesn't mean the model will send back thoughts. For that, you would need to set include_thoughts to True; but since the end of January 2025, thoughts are no longer returned and are only displayed in Google AI Studio. See https://discuss.ai.google.dev/t/thoughts-are-missing-cot-not-included-anymore/63653 for more details.

If you want to avoid the model spending any tokens on thinking, you can set thinking_budget to 0.

See https://ai.google.dev/gemini-api/docs/thinking for more about it.

gemini_labels instance-attribute

gemini_labels: dict[str, str]

User-defined metadata to break down billed charges. Only supported by the Vertex AI provider.

See the Gemini API docs for use cases and limitations.
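
Since these are all optional TypedDict fields, a request can set only the ones it needs. A minimal sketch that turns thinking off via a zero token budget, per the note above (the model name is arbitrary and a live API key is assumed):

from pydantic_ai import Agent
from pydantic_ai.models.gemini import GeminiModel, GeminiModelSettings

agent = Agent(GeminiModel('gemini-2.5-flash'))

# All keys are gemini_ prefixed, so these settings can be merged with
# settings for other models without key collisions.
settings = GeminiModelSettings(gemini_thinking_config={'thinking_budget': 0})

result = agent.run_sync('Summarize httpx in one sentence.', model_settings=settings)
print(result.output)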

GeminiModel dataclass

Bases: Model

A model that uses Gemini via the generativelanguage.googleapis.com API.

This is implemented from scratch rather than using a dedicated SDK; good API documentation is available at https://ai.google.dev/api.

Apart from __init__, all methods are private or match those of the base class.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
@dataclass(init=False)
class GeminiModel(Model):
    """A model that uses Gemini via `generativelanguage.googleapis.com` API.

    This is implemented from scratch rather than using a dedicated SDK, good API documentation is
    available [here](https://ai.google.dev/api).

    Apart from `__init__`, all methods are private or match those of the base class.
    """

    client: httpx.AsyncClient = field(repr=False)

    _model_name: GeminiModelName = field(repr=False)
    _provider: Literal['google-gla', 'google-vertex'] | Provider[httpx.AsyncClient] | None = field(repr=False)
    _auth: AuthProtocol | None = field(repr=False)
    _url: str | None = field(repr=False)
    _system: str = field(default='gemini', repr=False)

    def __init__(
        self,
        model_name: GeminiModelName,
        *,
        provider: Literal['google-gla', 'google-vertex'] | Provider[httpx.AsyncClient] = 'google-gla',
        profile: ModelProfileSpec | None = None,
        settings: ModelSettings | None = None,
    ):
        """Initialize a Gemini model.

        Args:
            model_name: The name of the model to use.
            provider: The provider to use for authentication and API access. Can be either the string
                'google-gla' or 'google-vertex' or an instance of `Provider[httpx.AsyncClient]`.
                If not provided, a new provider will be created using the other parameters.
            profile: The model profile to use. Defaults to a profile picked by the provider based on the model name.
            settings: Default model settings for this model instance.
        """
        self._model_name = model_name
        self._provider = provider

        if isinstance(provider, str):
            provider = infer_provider(provider)
        self._system = provider.name
        self.client = provider.client
        self._url = str(self.client.base_url)

        super().__init__(settings=settings, profile=profile or provider.model_profile)

    @property
    def base_url(self) -> str:
        assert self._url is not None, 'URL not initialized'  # pragma: no cover
        return self._url  # pragma: no cover

    async def request(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> ModelResponse:
        check_allow_model_requests()
        async with self._make_request(
            messages, False, cast(GeminiModelSettings, model_settings or {}), model_request_parameters
        ) as http_response:
            data = await http_response.aread()
            response = _gemini_response_ta.validate_json(data)
        return self._process_response(response)

    @asynccontextmanager
    async def request_stream(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> AsyncIterator[StreamedResponse]:
        check_allow_model_requests()
        async with self._make_request(
            messages, True, cast(GeminiModelSettings, model_settings or {}), model_request_parameters
        ) as http_response:
            yield await self._process_streamed_response(http_response)

    @property
    def model_name(self) -> GeminiModelName:
        """The model name."""
        return self._model_name

    @property
    def system(self) -> str:
        """The system / model provider."""
        return self._system

    def _get_tools(self, model_request_parameters: ModelRequestParameters) -> _GeminiTools | None:
        tools = [_function_from_abstract_tool(t) for t in model_request_parameters.function_tools]
        if model_request_parameters.output_tools:
            tools += [_function_from_abstract_tool(t) for t in model_request_parameters.output_tools]
        return _GeminiTools(function_declarations=tools) if tools else None

    def _get_tool_config(
        self, model_request_parameters: ModelRequestParameters, tools: _GeminiTools | None
    ) -> _GeminiToolConfig | None:
        if not model_request_parameters.allow_text_output and tools:
            return _tool_config([t['name'] for t in tools['function_declarations']])
        else:
            return None

    @asynccontextmanager
    async def _make_request(
        self,
        messages: list[ModelMessage],
        streamed: bool,
        model_settings: GeminiModelSettings,
        model_request_parameters: ModelRequestParameters,
    ) -> AsyncIterator[HTTPResponse]:
        tools = self._get_tools(model_request_parameters)
        tool_config = self._get_tool_config(model_request_parameters, tools)
        sys_prompt_parts, contents = await self._message_to_gemini_content(messages)

        request_data = _GeminiRequest(contents=contents)
        if sys_prompt_parts:
            request_data['systemInstruction'] = _GeminiTextContent(role='user', parts=sys_prompt_parts)
        if tools is not None:
            request_data['tools'] = tools
        if tool_config is not None:
            request_data['toolConfig'] = tool_config

        generation_config = _settings_to_generation_config(model_settings)
        if model_request_parameters.output_mode == 'native':
            if tools:
                raise UserError('Gemini does not support structured output and tools at the same time.')

            generation_config['response_mime_type'] = 'application/json'

            output_object = model_request_parameters.output_object
            assert output_object is not None
            generation_config['response_schema'] = self._map_response_schema(output_object)
        elif model_request_parameters.output_mode == 'prompted' and not tools:
            generation_config['response_mime_type'] = 'application/json'

        if generation_config:
            request_data['generationConfig'] = generation_config

        if gemini_safety_settings := model_settings.get('gemini_safety_settings'):
            request_data['safetySettings'] = gemini_safety_settings

        if gemini_labels := model_settings.get('gemini_labels'):
            if self._system == 'google-vertex':
                request_data['labels'] = gemini_labels

        headers = {'Content-Type': 'application/json', 'User-Agent': get_user_agent()}
        url = f'/{self._model_name}:{"streamGenerateContent" if streamed else "generateContent"}'

        request_json = _gemini_request_ta.dump_json(request_data, by_alias=True)
        async with self.client.stream(
            'POST',
            url,
            content=request_json,
            headers=headers,
            timeout=model_settings.get('timeout', USE_CLIENT_DEFAULT),
        ) as r:
            if (status_code := r.status_code) != 200:
                await r.aread()
                if status_code >= 400:
                    raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=r.text)
                raise UnexpectedModelBehavior(  # pragma: no cover
                    f'Unexpected response from gemini {status_code}', r.text
                )
            yield r

    def _process_response(self, response: _GeminiResponse) -> ModelResponse:
        vendor_details: dict[str, Any] | None = None

        if len(response['candidates']) != 1:
            raise UnexpectedModelBehavior('Expected exactly one candidate in Gemini response')  # pragma: no cover
        if 'content' not in response['candidates'][0]:
            if response['candidates'][0].get('finish_reason') == 'SAFETY':
                raise UnexpectedModelBehavior('Safety settings triggered', str(response))
            else:
                raise UnexpectedModelBehavior(  # pragma: no cover
                    'Content field missing from Gemini response', str(response)
                )
        parts = response['candidates'][0]['content']['parts']
        vendor_id = response.get('vendor_id', None)
        finish_reason = response['candidates'][0].get('finish_reason')
        if finish_reason:
            vendor_details = {'finish_reason': finish_reason}
        usage = _metadata_as_usage(response)
        usage.requests = 1
        return _process_response_from_parts(
            parts,
            response.get('model_version', self._model_name),
            usage,
            vendor_id=vendor_id,
            vendor_details=vendor_details,
        )

    async def _process_streamed_response(self, http_response: HTTPResponse) -> StreamedResponse:
        """Process a streamed response, and prepare a streaming response to return."""
        aiter_bytes = http_response.aiter_bytes()
        start_response: _GeminiResponse | None = None
        content = bytearray()

        async for chunk in aiter_bytes:
            content.extend(chunk)
            responses = _gemini_streamed_response_ta.validate_json(
                _ensure_decodeable(content),
                experimental_allow_partial='trailing-strings',
            )
            if responses:  # pragma: no branch
                last = responses[-1]
                if last['candidates'] and last['candidates'][0].get('content', {}).get('parts'):
                    start_response = last
                    break

        if start_response is None:
            raise UnexpectedModelBehavior('Streamed response ended without content or tool calls')

        return GeminiStreamedResponse(_model_name=self._model_name, _content=content, _stream=aiter_bytes)

    async def _message_to_gemini_content(
        self, messages: list[ModelMessage]
    ) -> tuple[list[_GeminiTextPart], list[_GeminiContent]]:
        sys_prompt_parts: list[_GeminiTextPart] = []
        contents: list[_GeminiContent] = []
        for m in messages:
            if isinstance(m, ModelRequest):
                message_parts: list[_GeminiPartUnion] = []

                for part in m.parts:
                    if isinstance(part, SystemPromptPart):
                        sys_prompt_parts.append(_GeminiTextPart(text=part.content))
                    elif isinstance(part, UserPromptPart):
                        message_parts.extend(await self._map_user_prompt(part))
                    elif isinstance(part, ToolReturnPart):
                        message_parts.append(_response_part_from_response(part.tool_name, part.model_response_object()))
                    elif isinstance(part, RetryPromptPart):
                        if part.tool_name is None:
                            message_parts.append(_GeminiTextPart(text=part.model_response()))  # pragma: no cover
                        else:
                            response = {'call_error': part.model_response()}
                            message_parts.append(_response_part_from_response(part.tool_name, response))
                    else:
                        assert_never(part)

                if message_parts:  # pragma: no branch
                    contents.append(_GeminiContent(role='user', parts=message_parts))
            elif isinstance(m, ModelResponse):
                contents.append(_content_model_response(m))
            else:
                assert_never(m)
        if instructions := self._get_instructions(messages):
            sys_prompt_parts.insert(0, _GeminiTextPart(text=instructions))
        return sys_prompt_parts, contents

    async def _map_user_prompt(self, part: UserPromptPart) -> list[_GeminiPartUnion]:
        if isinstance(part.content, str):
            return [{'text': part.content}]
        else:
            content: list[_GeminiPartUnion] = []
            for item in part.content:
                if isinstance(item, str):
                    content.append({'text': item})
                elif isinstance(item, BinaryContent):
                    base64_encoded = base64.b64encode(item.data).decode('utf-8')
                    content.append(
                        _GeminiInlineDataPart(inline_data={'data': base64_encoded, 'mime_type': item.media_type})
                    )
                elif isinstance(item, VideoUrl) and item.is_youtube:
                    file_data = _GeminiFileDataPart(file_data={'file_uri': item.url, 'mime_type': item.media_type})
                    content.append(file_data)
                elif isinstance(item, FileUrl):
                    if self.system == 'google-gla' or item.force_download:
                        downloaded_item = await download_item(item, data_format='base64')
                        inline_data = _GeminiInlineDataPart(
                            inline_data={'data': downloaded_item['data'], 'mime_type': downloaded_item['data_type']}
                        )
                        content.append(inline_data)
                    else:
                        file_data = _GeminiFileDataPart(file_data={'file_uri': item.url, 'mime_type': item.media_type})
                        content.append(file_data)
                else:
                    assert_never(item)
        return content

    def _map_response_schema(self, o: OutputObjectDefinition) -> dict[str, Any]:
        response_schema = o.json_schema.copy()
        if o.name:
            response_schema['title'] = o.name
        if o.description:
            response_schema['description'] = o.description

        return response_schema

__init__

__init__(
    model_name: GeminiModelName,
    *,
    provider: (
        Literal["google-gla", "google-vertex"]
        | Provider[AsyncClient]
    ) = "google-gla",
    profile: ModelProfileSpec | None = None,
    settings: ModelSettings | None = None
)

Initialize a Gemini model.

Parameters:

model_name (GeminiModelName): The name of the model to use. Required.

provider (Literal['google-gla', 'google-vertex'] | Provider[AsyncClient]): The provider to use for authentication and API access. Can be either the string 'google-gla' or 'google-vertex', or an instance of Provider[httpx.AsyncClient]. If not provided, a new provider will be created using the other parameters. Both forms are sketched after the source listing below. Default: 'google-gla'.

profile (ModelProfileSpec | None): The model profile to use. Defaults to a profile picked by the provider based on the model name. Default: None.

settings (ModelSettings | None): Default model settings for this model instance. Default: None.
Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
def __init__(
    self,
    model_name: GeminiModelName,
    *,
    provider: Literal['google-gla', 'google-vertex'] | Provider[httpx.AsyncClient] = 'google-gla',
    profile: ModelProfileSpec | None = None,
    settings: ModelSettings | None = None,
):
    """Initialize a Gemini model.

    Args:
        model_name: The name of the model to use.
        provider: The provider to use for authentication and API access. Can be either the string
            'google-gla' or 'google-vertex' or an instance of `Provider[httpx.AsyncClient]`.
            If not provided, a new provider will be created using the other parameters.
        profile: The model profile to use. Defaults to a profile picked by the provider based on the model name.
        settings: Default model settings for this model instance.
    """
    self._model_name = model_name
    self._provider = provider

    if isinstance(provider, str):
        provider = infer_provider(provider)
    self._system = provider.name
    self.client = provider.client
    self._url = str(self.client.base_url)

    super().__init__(settings=settings, profile=profile or provider.model_profile)
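
Both accepted provider forms, as a sketch; the GoogleVertexProvider import path and its region keyword are assumptions about the companion providers package, not defined in this module:

from pydantic_ai.models.gemini import GeminiModel
from pydantic_ai.providers.google_vertex import GoogleVertexProvider  # assumed import path

# String form: infer_provider() turns the name into a provider instance.
gla_model = GeminiModel('gemini-2.5-flash', provider='google-gla')

# Instance form: any Provider[httpx.AsyncClient] works; region is an assumed kwarg.
vertex_model = GeminiModel('gemini-2.5-pro', provider=GoogleVertexProvider(region='us-central1'))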

model_name property

model_name: GeminiModelName

The model name.

system property

system: str

The system / model provider.

AuthProtocol

Bases: Protocol

Abstract definition for Gemini authentication.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
class AuthProtocol(Protocol):
    """Abstract definition for Gemini authentication."""

    async def headers(self) -> dict[str, str]: ...
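
Since this is a Protocol, any object with a matching async headers method satisfies it, no inheritance required. A hypothetical bearer-token implementation, as a sketch:

from dataclasses import dataclass


@dataclass
class BearerAuth:
    """Hypothetical AuthProtocol implementation sending an OAuth2 bearer token."""

    token: str

    async def headers(self) -> dict[str, str]:
        # Structural typing: this matches AuthProtocol.headers() without subclassing.
        return {'Authorization': f'Bearer {self.token}'}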

ApiKeyAuth dataclass

Authentication using an API key for the X-Goog-Api-Key header.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
@dataclass
class ApiKeyAuth:
    """Authentication using an API key for the `X-Goog-Api-Key` header."""

    api_key: str

    async def headers(self) -> dict[str, str]:
        # https://cloud.google.com/docs/authentication/api-keys-use#using-with-rest
        return {'X-Goog-Api-Key': self.api_key}  # pragma: no cover

GeminiStreamedResponse dataclass

Bases: StreamedResponse

Implementation of StreamedResponse for the Gemini model.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
@dataclass
class GeminiStreamedResponse(StreamedResponse):
    """Implementation of `StreamedResponse` for the Gemini model."""

    _model_name: GeminiModelName
    _content: bytearray
    _stream: AsyncIterator[bytes]
    _timestamp: datetime = field(default_factory=_utils.now_utc, init=False)

    async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
        async for gemini_response in self._get_gemini_responses():
            candidate = gemini_response['candidates'][0]
            if 'content' not in candidate:
                raise UnexpectedModelBehavior('Streamed response has no content field')  # pragma: no cover
            gemini_part: _GeminiPartUnion
            for gemini_part in candidate['content']['parts']:
                if 'text' in gemini_part:
                    # Using vendor_part_id=None means we can produce multiple text parts if their deltas are sprinkled
                    # amongst the tool call deltas
                    maybe_event = self._parts_manager.handle_text_delta(
                        vendor_part_id=None, content=gemini_part['text']
                    )
                    if maybe_event is not None:  # pragma: no branch
                        yield maybe_event

                elif 'function_call' in gemini_part:
                    # Here, we assume all function_call parts are complete and don't have deltas.
                    # We do this by assigning a unique randomly generated "vendor_part_id".
                    # We need to confirm whether this is actually true, but if it isn't, we can still handle it properly
                    # it would just be a bit more complicated. And we'd need to confirm the intended semantics.
                    maybe_event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=uuid4(),
                        tool_name=gemini_part['function_call']['name'],
                        args=gemini_part['function_call']['args'],
                        tool_call_id=None,
                    )
                    if maybe_event is not None:  # pragma: no branch
                        yield maybe_event
                else:
                    if not any([key in gemini_part for key in ['function_response', 'thought']]):
                        raise AssertionError(f'Unexpected part: {gemini_part}')  # pragma: no cover

    async def _get_gemini_responses(self) -> AsyncIterator[_GeminiResponse]:
        # This method exists to ensure we only yield completed items, so we don't need to worry about
        # partial gemini responses, which would make everything more complicated

        gemini_responses: list[_GeminiResponse] = []
        current_gemini_response_index = 0
        # Right now, there are some circumstances where we will have information that could be yielded sooner than it is
        # But changing that would make things a lot more complicated.
        async for chunk in self._stream:
            self._content.extend(chunk)

            gemini_responses = _gemini_streamed_response_ta.validate_json(
                _ensure_decodeable(self._content),
                experimental_allow_partial='trailing-strings',
            )

            # The idea: yield only up to the latest response, which might still be partial.
            # Note that if the latest response is complete, we could yield it immediately, but there's not a good
            # allow_partial API to determine if the last item in the list is complete.
            responses_to_yield = gemini_responses[:-1]
            for r in responses_to_yield[current_gemini_response_index:]:
                current_gemini_response_index += 1
                yield r

        # Now yield the final response, which should be complete
        if gemini_responses:  # pragma: no branch
            r = gemini_responses[-1]
            self._usage = _metadata_as_usage(r)
            yield r

    @property
    def model_name(self) -> GeminiModelName:
        """Get the model name of the response."""
        return self._model_name

    @property
    def timestamp(self) -> datetime:
        """Get the timestamp of the response."""
        return self._timestamp

model_name property

model_name: GeminiModelName

Get the model name of the response.

timestamp property

timestamp: datetime

Get the timestamp of the response.
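
GeminiStreamedResponse is constructed internally by request_stream and normally consumed through an agent's streaming interface rather than directly; a minimal sketch, assuming the standard Agent.run_stream API:

import asyncio

from pydantic_ai import Agent
from pydantic_ai.models.gemini import GeminiModel


async def main() -> None:
    agent = Agent(GeminiModel('gemini-2.5-flash'))
    # Under the hood, each completed item of the streamed JSON array is
    # validated and turned into text / tool-call events by GeminiStreamedResponse.
    async with agent.run_stream('Write a haiku about httpx.') as result:
        async for text in result.stream_text():
            print(text)


asyncio.run(main())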

GeminiSafetySettings

Bases: TypedDict

Safety settings options for Gemini model requests.

See the Gemini API docs for safety category and threshold descriptions. For an example of how to use GeminiSafetySettings, see the model-specific settings section of the agents documentation.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
class GeminiSafetySettings(TypedDict):
    """Safety settings options for Gemini model request.

    See [Gemini API docs](https://ai.google.dev/gemini-api/docs/safety-settings) for safety category and threshold descriptions.
    For an example on how to use `GeminiSafetySettings`, see [here](../../agents.md#model-specific-settings).
    """

    category: Literal[
        'HARM_CATEGORY_UNSPECIFIED',
        'HARM_CATEGORY_HARASSMENT',
        'HARM_CATEGORY_HATE_SPEECH',
        'HARM_CATEGORY_SEXUALLY_EXPLICIT',
        'HARM_CATEGORY_DANGEROUS_CONTENT',
        'HARM_CATEGORY_CIVIC_INTEGRITY',
    ]
    """
    Safety settings category.
    """

    threshold: Literal[
        'HARM_BLOCK_THRESHOLD_UNSPECIFIED',
        'BLOCK_LOW_AND_ABOVE',
        'BLOCK_MEDIUM_AND_ABOVE',
        'BLOCK_ONLY_HIGH',
        'BLOCK_NONE',
        'OFF',
    ]
    """
    Safety settings threshold.
    """

category instance-attribute

category: Literal[
    "HARM_CATEGORY_UNSPECIFIED",
    "HARM_CATEGORY_HARASSMENT",
    "HARM_CATEGORY_HATE_SPEECH",
    "HARM_CATEGORY_SEXUALLY_EXPLICIT",
    "HARM_CATEGORY_DANGEROUS_CONTENT",
    "HARM_CATEGORY_CIVIC_INTEGRITY",
]

Safety settings category.

threshold instance-attribute

threshold: Literal[
    "HARM_BLOCK_THRESHOLD_UNSPECIFIED",
    "BLOCK_LOW_AND_ABOVE",
    "BLOCK_MEDIUM_AND_ABOVE",
    "BLOCK_ONLY_HIGH",
    "BLOCK_NONE",
    "OFF",
]

Safety settings threshold.
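
As a plain TypedDict, entries can be written as dict literals and passed through the gemini_safety_settings model setting described above; a hedged sketch:

from pydantic_ai import Agent
from pydantic_ai.models.gemini import (
    GeminiModel,
    GeminiModelSettings,
    GeminiSafetySettings,
)

safety: list[GeminiSafetySettings] = [
    {'category': 'HARM_CATEGORY_HARASSMENT', 'threshold': 'BLOCK_ONLY_HIGH'},
    {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'threshold': 'BLOCK_MEDIUM_AND_ABOVE'},
]

agent = Agent(GeminiModel('gemini-2.5-flash'))
result = agent.run_sync(
    'Explain basic lab safety.',
    model_settings=GeminiModelSettings(gemini_safety_settings=safety),
)
print(result.output)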

ThinkingConfig

Bases: TypedDict

The thinking features configuration.

Source code in pydantic_ai_slim/pydantic_ai/models/gemini.py
class ThinkingConfig(TypedDict, total=False):
    """The thinking features configuration."""

    include_thoughts: Annotated[bool, pydantic.Field(alias='includeThoughts')]
    """Indicates whether to include thoughts in the response. If true, thoughts are returned only if the model supports thought and thoughts are available."""

    thinking_budget: Annotated[int, pydantic.Field(alias='thinkingBudget')]
    """Indicates the thinking budget in tokens."""

include_thoughts instance-attribute

include_thoughts: Annotated[
    bool, Field(alias='includeThoughts')
]

Indicates whether to include thoughts in the response. If true, thoughts are returned only if the model supports thought and thoughts are available.

thinking_budget instance-attribute

thinking_budget: Annotated[int, Field(alias='thinkingBudget')]

Indicates the thinking budget in tokens.
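
The pydantic.Field aliases mean these snake_case keys serialize as includeThoughts / thinkingBudget in the request body. A closing sketch that combines both fields (whether thoughts actually come back is subject to the caveats under gemini_thinking_config above):

from pydantic_ai import Agent
from pydantic_ai.models.gemini import GeminiModel, GeminiModelSettings, ThinkingConfig

# snake_case keys here; the aliases handle the camelCase wire format.
thinking: ThinkingConfig = {'include_thoughts': True, 'thinking_budget': 1024}

agent = Agent(GeminiModel('gemini-2.5-pro'))
result = agent.run_sync(
    'What is the 10th Fibonacci number?',
    model_settings=GeminiModelSettings(gemini_thinking_config=thinking),
)
print(result.output)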